cross_doctor/app/task_worker.py

1340 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/10 17:56
# @Description:
import json
from app.global_source import db_user, db_task
from flask import request, send_file
from pypinyin import lazy_pinyin, Style
from app.common_worker import *
from proto.phase_grpc import LedgerTaskDetailPhaseState
from tool.qcos_func import get_client, CosFolderManager
ALLOWED_EXTENSION_TASK = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'doc', 'docx', 'xlsx', 'xls', 'pptx', 'ppt'}
ALLOWED_CROSS_PIC_FILE_TYPE = {'png', 'jpg', 'jpeg'}
g_cos_root = 'https://xinglu-1324629296.cos.ap-beijing.myqcloud.com'
g_cos_bucket = 'xinglu-1324629296'
def do_query_task_list_parameter(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
res = make_common_res(0, 'ok')
res['data'] = {}
#任务类型
task_types = db_task.query_task_type(nodeid, area_id)
task_types_res = {}
for task_type in task_types:
task_types_res[task_type['task_type_no']] = task_type['task_type']
res['data']['task_type'] = task_types_res
# 路网(数据)类型
res['data']['data_type'] = {0: '不关联', 1: '路口任务', 2: '干线协调任务'}
#任务等级 0低无需审批1中可事后审批2高审批后执行
res['data']['task_class'] = {0: '低(无需审批)', 1: '中(可事后审批)', 2: '高(审批后执行)'}
#任务状态
res['data']['task_state'] = {0:'待下发', 1: '待审批', 2: '进行中未审批', 3: '进行中', 4: '完成'}
#负责人
executors = db_task.query_task_executor(nodeid, area_id)
if len(executors)<=0:
logging.error(' query_task_executor没有数据!')
executor_list = []
for executor in executors:
executor_list.append(executor['user_name'])
res['data']['executor'] = executor_list
#需求来源
srcs = db_task.query_task_src(nodeid, area_id)
if len(srcs)<=0:
logging.error(' query_task_src没有数据!')
src_list = []
for src in srcs:
src_list.append(src['src'])
res['data']['task_src'] = src_list
return json.dumps(res)
def do_query_task_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
#任务名称`task_name` varchar(300) DEFAULT NULL COMMENT '任务名称',
#任务类型`task_type` varchar(300) DEFAULT NULL COMMENT '任务类型',
#任务等级`task_class` varchar(300) DEFAULT NULL COMMENT '需求等级';
#计划开始时间`plan_begin_time` bigint DEFAULT NULL COMMENT '计划开始时间',
#计划结束时间`plan_end_time` bigint DEFAULT NULL COMMENT '计划结束时间',
#任务状态`task_state` int NOT NULL COMMENT '任务状态 0未开始1进行中2完成3挂起4废止',
#负责人`executor` varchar(300) DEFAULT NULL COMMENT '负责人',
#需求来源 `task_src` varchar(300) DEFAULT NULL COMMENT '需求来源';
all_task_list = db_task.query_all_tak_list(nodeid, area_id)
upload_file_dict = db_task.query_task_file(nodeid, area_id)
for task_info in all_task_list:
if task_info['taskno'] in upload_file_dict.keys():
task_info['download_url_list'] = upload_file_dict[task_info['taskno']]
else:
task_info['download_url_list'] = []
name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list = [], [], [], [], [], [], [], []
task_name = params.get('task_name')
task_type = params.get('task_type')
task_class = params.get('task_class')
plan_begin_time = params.get('plan_begin_time')
plan_end_time = params.get('plan_end_time')
task_state = params.get('task_state')
executor = params.get('executor')
task_src = params.get('task_src')
task_no = None
if task_name.isdigit():
task_no = task_name
else:
if task_name and any('\u4e00' <= c <= '\u9fff' for c in task_name):
# 判定关键字是否包含中文 如果是 则认为是关键字 进行模糊查询
for task_info in all_task_list:
if task_name in task_info['task_name']:
name_list.append(task_info)
elif task_name and not any('\u4e00' <= c <= '\u9fff' for c in task_name):
# 如果不存在中文 则认为是拼音或者首字母 进行模糊查询
for task_info in all_task_list:
if any('\u4e00' <= c <= '\u9fff' for c in task_info['task_name']):
task_name_pinyin = lazy_pinyin(task_info['task_name'], style=Style.NORMAL)
else:
task_name_pinyin = task_info['task_name']
if len(task_name) > 1 and task_name in ''.join(task_name_pinyin):
name_list.append(task_info)
continue
for i in range(len(task_name)):
if any('\u4e00' <= c <= '\u9fff' for c in task_info['task_name']) and task_name[i] == list(task_name_pinyin)[i][0] and task_info not in name_list:
name_list.append(task_info)
if task_type and len(task_type) > 0:
type_list.extend([task_info for task_info in all_task_list if int(task_type) == task_info['task_type']])
if len(type_list) == 0:
# 用于多种筛选条件取交集使用
type_list.append({'arteryids': -1})
if task_class and len(task_class) > 0:
class_list.extend([task_info for task_info in all_task_list if int(task_class) == task_info['task_class']])
if len(class_list) == 0:
# 用于多种筛选条件取交集使用
class_list.append({'arteryids': -1})
if plan_begin_time and len(plan_begin_time) > 0:
begin_list.extend([task_info for task_info in all_task_list if int(plan_begin_time) <= task_info['plan_begin_time']])
if len(begin_list) == 0:
# 用于多种筛选条件取交集使用
begin_list.append({'arteryids': -1})
if plan_end_time and len(plan_end_time) > 0:
end_list.extend([task_info for task_info in all_task_list if int(plan_end_time) >= task_info['plan_end_time']])
if len(end_list) == 0:
# 用于多种筛选条件取交集使用
end_list.append({'arteryids': -1})
if task_state and len(task_state) > 0:
state_list.extend([task_info for task_info in all_task_list if int(task_state) == task_info['task_state']])
if len(state_list) == 0:
# 用于多种筛选条件取交集使用
state_list.append({'arteryids': -1})
if executor and len(executor) > 0:
executor_list.extend([task_info for task_info in all_task_list if executor == task_info['executor']])
if len(executor_list) == 0:
# 用于多种筛选条件取交集使用
executor_list.append({'arteryids': -1})
if task_src and len(task_src) > 0:
task_src_list.extend([task_info for task_info in all_task_list if task_src == task_info['task_src']])
if len(task_src_list) == 0:
# 用于多种筛选条件取交集使用
task_src_list.append({'arteryids': -1})
non_empty_lists = [lst for lst in [name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list] if len(lst) > 0]
if non_empty_lists:
res_list = intersect_dicts(non_empty_lists)
else:
res_list = []
if task_no and len(task_no) > 0:
res_list = [task_info for task_info in all_task_list if int(task_no) == task_info['taskno']]
if not task_name and not task_type and not task_class and not plan_begin_time and not plan_end_time and not task_state and not executor and not task_src and not task_no:
res_list = all_task_list
# task_list = db_task.query_task_list(task_name,task_type,data_type, task_class,plan_begin_time,plan_end_time,publish_time,task_state,executor,task_src, nodeid)
filtered_list = [d for d in res_list if d['arteryids'] != -1]
cross_name_info = db_tmnet.query_cross_list4task(nodeid, area_id)
task_crosses_info = db_task.query_all_ledger_task_crosses_info(nodeid, area_id)
for task_info in filtered_list:
comment = task_info['comment']
if '_split_suffix_for_query_info' in comment:
comment_list = comment.split('_split_suffix_for_query_info')
comment_list.pop()
else:
comment_list = [comment]
task_info['comment'] = comment_list
if task_info['task_type_class'] == 1:
item_task_no = task_info['taskno']
ledger_task_add_info = query_ledger_task_additional_info(item_task_no, nodeid, area_id, cross_name_info, task_crosses_info[item_task_no])
task_info['entered_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['entered_percent']
task_info['approve_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['approve_percent']
sorted_list = sorted(filtered_list, key=sort_key)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['area_id'] = area_id
res['data'] = sorted_list
res['desc'] = ''
return json.dumps(res)
def sort_key(item):
return (item['task_state'], item['progress'], -item['plan_end_time'])
def make_hashable(obj):
"""将对象转换为可哈希的类型"""
if isinstance(obj, dict):
return tuple(sorted((k, make_hashable(v)) for k, v in obj.items()))
elif isinstance(obj, list):
return tuple(make_hashable(v) for v in obj)
else:
return obj
def intersect_dicts(list_of_dicts):
"""计算多个字典列表的交集"""
if not list_of_dicts:
return []
# 将第一个列表中的字典转换为可哈希的元组
intersection = set(make_hashable(d) for d in list_of_dicts[0])
# 与后续的每个列表进行交集操作
for lst in list_of_dicts[1:]:
current_set = set(make_hashable(d) for d in lst)
intersection &= current_set
# 将交集中的元组转换回字典
return [dict(t) for t in intersection]
def do_remove_task(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
tasknos = check_param(params, 'tasknos')
if not tasknos or len(tasknos) < 1:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
count = db_task.delete_task(tasknos, nodeid, area_id)
if count != len(tasknos):
logging.error(str(params) + ' do_remove_task删除失败!')
res = make_common_res(-1, '删除失败')
return json.dumps(res)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
def do_distribute_task(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
operator = check_param(params, 'operator')
if not operator:
return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试'))
task_old = db_task.query_task(taskno, nodeid, area_id)
if task_old is None:
logging.error(str(params) + ' do_distribute_task找不到对应的任务!')
return json.dumps(make_common_res(-1, '找不到对应的任务'))
#任务等级,0正常1紧急2特急
#任务状态 0待下发1待审批2进行中未审批3进行中4完成
if task_old['task_class'] == 2:
new_state = '1'
elif task_old['task_class'] == 1:
new_state = '2'
else:
new_state = '3'
count = db_task.update_task_state(nodeid, area_id, taskno, new_state)
if count!=1:
logging.error(str(params) + ' do_distribute_task更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return json.dumps(res)
basemark = {'operation': '下发任务', 'content': {}}
count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if count!=1:
logging.error(str(params) + ' do_distribute_task添加任务履历失败!')
res = make_common_res(-1, '添加任务履历失败')
return json.dumps(res)
task_old_info = db_task.query_task(taskno, nodeid, area_id)
# 20241022 补充开发下发任务的同时修改任务基本信息的功能
modify_data, modify_item = gen_update_sql(params, task_old_info)
if len(modify_data) > 0:
ret = db_task.update_task_info(taskno, modify_data, nodeid, area_id)
if ret != 1:
return json.dumps(make_common_res(3, '任务基本信息修改失败'))
basemark = {'operation': '编辑任务', 'content': {'content': modify_item}}
his_ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if his_ret != 1:
return json.dumps(make_common_res(3, '任务履历添加失败'))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
def do_approval(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
operator = check_param(params, 'operator')
if not operator:
return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试'))
#task_old = db_task.query_task(taskno, nodeid)
# 任务等级,0正常1紧急2特急
# 任务状态 0待下发1待审批2进行中未审批3进行中4完成
new_state = '3'
count = db_task.update_task_state(nodeid, area_id, taskno, new_state)
if count!=1:
logging.error(str(params) + ' do_approval更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return json.dumps(res)
basemark = {'operation': '审批任务', 'content': {}}
count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if count!=1:
logging.error(str(params) + ' do_approval添加任务履历失败!')
res = make_common_res(-1, '添加任务履历失败')
return json.dumps(res)
# 20241022 新增审批任务的同时修改任务部分基础属性的功能
task_old_info = db_task.query_task(taskno, nodeid, area_id)
modify_data, modify_item = gen_update_sql(params, task_old_info)
if len(modify_data) > 0:
ret = db_task.update_task_info(taskno, modify_data, nodeid, area_id)
if ret != 1:
return json.dumps(make_common_res(3, '任务基本信息修改失败'))
basemark = {'operation': '编辑任务', 'content': {'content': modify_item}}
his_ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if his_ret != 1:
return json.dumps(make_common_res(3, '任务履历添加失败'))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
def do_complete_task(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
operator = check_param(params, 'operator')
if not operator:
return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试'))
task_old = db_task.query_task(taskno, nodeid, area_id)
if not task_old:
return json.dumps(make_common_res(3, '找不到对应的任务'))
if task_old['task_state'] == '4':
return json.dumps(make_common_res(0, '当前任务已完成'))
# 任务等级,0正常1紧急2特急
# 任务状态 0待下发1待审批2进行中未审批3进行中4完成
new_state = '4'
count = db_task.update_task_state(nodeid, area_id, taskno, new_state)
if count!=1:
logging.error(str(params) + ' do_complete_task更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return json.dumps(res)
basemark = {'operation': '完成任务', 'content': {}}
count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if count!=1:
logging.error(str(params) + ' do_complete_task添加任务履历失败!')
res = make_common_res(-1, '添加任务履历失败')
return json.dumps(res)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
def do_add_task(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
task_type = check_param(params, 'task_type')
if not task_type:
return json.dumps(make_common_res(2, '任务类型缺失,请检查后重试'))
task_type_class = check_param(params, 'task_type_class')
if not task_type_class:
task_type_class = 0
task_type_class = int(task_type_class)
full_review = check_param(params, 'full_review')
if not full_review:
full_review = 0
full_review = int(full_review)
task_name = check_param(params, 'task_name')
if not task_name:
return json.dumps(make_common_res(2, '任务名称缺失,请检查后重试'))
task_class = check_param(params, 'task_class') # 需求等级,0低无需审批1中可事后审批2高审批后执行
if not task_class:
return json.dumps(make_common_res(2, '任务等级缺失,请检查后重试'))
data_type = check_param(params, 'data_type')
if not data_type:
return json.dumps(make_common_res(2, '关联路网状态缺失,请检查后重试'))
crossids, arteryids, sectionids = '', '', ''
if int(data_type) == 1:
crossids = check_param(params, 'crossids')
if not crossids:
return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联路口,但关联路口信息缺失,请检查后重试'))
elif int(data_type) == 2:
arteryids = check_param(params, 'arteryids')
if not arteryids:
return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联干线,但关联干线信息缺失,请检查后重试'))
add_type = check_param(params, 'add_type')
if not add_type or add_type == 'normal':
plan_begin_time = check_param(params, 'plan_begin_time')
if not plan_begin_time:
return json.dumps(make_common_res(2, '计划开始时间缺失,请检查后重试'))
plan_end_time = check_param(params, 'plan_end_time')
if not plan_end_time:
return json.dumps(make_common_res(2, '计划结束时间缺失,请检查后重试'))
else:
plan_begin_time = 0
plan_end_time = 0
task_src = check_param(params, 'task_src')
if not task_src:
return json.dumps(make_common_res(2, '需求来源缺失,请检查后重试'))
executor = check_param(params, 'executor')
if not executor:
return json.dumps(make_common_res(2, '任务负责人缺失,请检查后重试'))
publish_time = check_param(params, 'publish_time')
if not publish_time:
return json.dumps(make_common_res(2, '发布时间缺失,请检查后重试'))
publish_time = int(publish_time)
timestamp = publish_time
description = check_param(params, 'description')
if not description:
return json.dumps(make_common_res(2, '任务描述缺失,请检查后重试'))
comment = check_param(params, 'comment')
if not comment:
comment = ''
operator = check_param(params, 'operator')
if not operator:
return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试'))
creatorid = operator
progress, record_task_state, task_state = 0, 0, 0
if int(task_class) == 0:
task_state = 3
elif int(task_class) == 1:
task_state = 2
elif int(task_class) == 2:
task_state = 1
if comment and add_type and add_type != 'normal':
comment = "【推荐策略】" + comment + "_split_suffix_for_query_info"
if comment and (not add_type or add_type == 'normal'):
comment = "" + datetime.now().date().strftime('%Y-%m-%d') + "" + comment + "_split_suffix_for_query_info"
if add_type and add_type != 'normal':
task_state = 0
count = db_task.add_task(timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time,
executor, progress, task_state, description, crossids, sectionids, arteryids, comment,
record_task_state, task_src, task_class, nodeid, area_id, task_type_class, full_review)
if count != 1:
logging.error(str(params) + ' 添加任务报错!')
res = make_common_res(-1, '添加任务报错。')
return json.dumps(res)
basemark = {'operation': '创建任务', 'content': {}}
taskno = db_task.get_task_no(nodeid, area_id, task_name, task_type, task_class, data_type, plan_begin_time, plan_end_time, executor, task_src, comment)
if taskno is None:
logging.error(str(params) + ' do_add_task更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return json.dumps(res)
if add_type and add_type != 'normal':
basemark = {'operation': '诊断问题确认', 'content': {}}
count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if count != 1:
logging.error(str(params) + ' do_add_task添加任务履历失败!')
res = make_common_res(-1, '添加任务履历失败')
return json.dumps(res)
if task_type_class == 1:
values = []
crossid_list = crossids.split(',')
for crossid in crossid_list:
values.append((taskno, crossid, nodeid, area_id))
ret = db_task.insert_ledger_task_cross(values, crossid_list, taskno, nodeid)
if not ret:
logging.error(str(params) + ' do_add_task添加任务关联路口失败!')
res = make_common_res(-1, '添加任务关联路口失败, 请反馈该情况至管理员')
return json.dumps(res)
# 更新配时方案状态信息函数调用
ledger_update_res, e = LedgerTaskDetailPhaseState(int(nodeid), crossid_list)
if e:
logging.error(e)
return json.dumps(make_common_res(2, '台账任务创建成功,但路口列表配时方案状态更新失败,请反馈该情况至管理员!'))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
res['taskno'] = taskno
return json.dumps(res)
#获取所有具有'已完成'任务的路口
#params
#nodeid
def do_query_completed_task_cross_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
task_list = db_task.query_completed_task_cross_list(nodeid, area_id)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['data'] = task_list
res['desc'] = ''
return json.dumps(res)
#获取指定路口的所有的'已完成'任务
#params
#nodeid
#crossid
def do_query_completed_task_by_cross(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(2, '缺少crossid 请刷新后重试'))
task_list = db_task.query_completed_task_by_cross(crossid, nodeid, area_id)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['data'] = task_list
res['desc'] = ''
return json.dumps(res)
def do_update_task_progress_field(task_old, params, filed_name, filed_desc, taskno):
field = params.get(filed_name)
basemarkinfo = ''
if field is None:
return basemarkinfo
old_field = task_old[filed_name]
if type(task_old[filed_name]) is int and type(field) is str:
old_field = str(old_field)
if field != old_field:
count = db_task.do_update('`task`', filed_name, field, 'taskno', taskno)
if count != 1:
logging.error(str(params) + ' do_update_task_progress_field更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return basemarkinfo
if filed_name == 'progress' and old_field != field:
basemarkinfo = '【编辑】任务进度:%s%%->%s%%' %(old_field, field)
return basemarkinfo
# 无调用
def do_update_task_field(task_old, params, filed_name, filed_desc, taskno):
field = params.get(filed_name)
basemarkinfo = ''
if field is None:
return basemarkinfo
old_field = task_old[filed_name]
if type(task_old[filed_name]) is int and type(field) is str:
old_field = str(old_field)
if field != old_field:
basemarkinfo = '%s ' %(filed_desc)
count = db_task.do_update('`task`', filed_name, field, 'taskno', taskno)
if count != 1:
logging.error(str(params) + ' do_update_task_progress_field更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return json.dumps(res)
return basemarkinfo
def do_update_task(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
operator = check_param(params, 'operator')
if not operator:
return json.dumps(make_common_res(3, '操作者信息缺失,请检查后重试'))
task_old = db_task.query_task(taskno, nodeid, area_id)
if task_old is None:
logging.error(str(params) + ' do_update_task 任务在数据库中不存在!')
res = make_common_res(-1, '任务在数据库中不存在。')
return json.dumps(res)
task_old_info = db_task.query_task(taskno, nodeid, area_id)
update_sql, basemarkinfo = gen_update_sql(params, task_old_info)
# print(update_sql, basemarkinfo)
if len(update_sql) > 0:
ret = db_task.update_task_info(taskno, update_sql, nodeid, area_id)
if ret != 1:
return json.dumps(make_common_res(3, '任务基本信息修改失败'))
progress_info = do_update_task_progress_field(task_old, params, 'progress', '任务进度', taskno)
if len(progress_info) > 0:
basemark = {
'operation': '编辑任务',
'content': {
'progress_info': progress_info,
'content': basemarkinfo
}
}
else:
basemark = {
'operation': '编辑任务',
'content': {
'content': basemarkinfo
}
}
if (len(basemarkinfo) > 0 and basemarkinfo != '') or len(progress_info) > 0:
ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if ret != 1:
return json.dumps(make_common_res(3, '任务履历信息添加失败'))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
def allowed_file_for_task(filename):
# 检查文件名是否为空,并且文件扩展名是否在允许的列表中
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSION_TASK
def do_task_upload(params):
nodeid = check_param({'nodeid': request.form.get('nodeid')}, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(3, "nodeid检查报错"))
area_id = check_param({'area_id': request.form.get('area_id')}, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, "area_id检查报错"))
taskno = check_param({'taskno': request.form.get('taskno')}, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id异常请检查后重试'))
userid = check_param({'userid': request.form.get('userid')}, 'userid')
if not userid:
return json.dumps(make_common_res(4, '用户id缺失请检查后重试'))
if len(request.files.keys()) < 1:
return json.dumps(make_common_res(1, '文件不存在'))
suc_file_num, suc_mysql_num = 0, 0
for key in request.files.keys():
file = request.files[key]
try:
if not (file and file.filename and allowed_file_for_task(file.filename)):
return json.dumps(make_common_res(1, '文件格式错误'))
file_name = file.filename
cos_key = '/user/cross_doctor/task_file/%s' % ('task_no_' + str(taskno) + '_' + file_name)
file_stream = file.stream
cos_client = get_client()
cos_client.put_object(Bucket=g_cos_bucket, Key=cos_key, Body=file_stream)
suc_file_num += 1
download_url = f'{g_cos_root}{cos_key}'
ret = db_task.insert_upload_file_record(taskno, nodeid, download_url, area_id)
if ret == 1:
suc_mysql_num += 1
else:
break
except Exception as e:
logging.error(str(e) + '任务文件上传失败')
break
if suc_file_num == suc_mysql_num == len(request.files.keys()):
res = make_common_res(0, 'ok')
res['nodeid'] = ''
res['desc'] = ''
return json.dumps(res)
else:
return json.dumps(make_common_res(4, '任务文件上传失败'))
def do_task_file_download(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id异常请检查后重试'))
file_name = check_param(params, 'file_name')
if not file_name:
return json.dumps(make_common_res(2, '文件名异常,请检查后重试'))
current_dir_path = os.path.dirname(os.path.abspath(__file__))
file_path = f"{current_dir_path}/static/task_file/{nodeid}/task_no_{taskno}/{file_name}"
if not os.path.isfile(file_path):
logging.error(f"文件不存在: {file_path}")
return json.dumps(make_common_res(2, '文件不存在,请检查后重试'))
return send_file(file_path, as_attachment=True)
def do_query_task_history(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id异常请检查后重试'))
task_history_list = db_task.query_task_history(taskno, nodeid, area_id)
download_url_list = db_task.query_task_file_by_taskno(nodeid, taskno, area_id)
for task_history in task_history_list:
task_history['content'] = task_history['content'].replace('_split_suffix_for_query_info', '')
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
res['data'] = {
'task_history_list': task_history_list,
'download_url_list': download_url_list
}
return json.dumps(res)
def del_task_file_api(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id异常请检查后重试'))
file_id = check_param(params, 'file_id')
if not file_id:
return json.dumps(make_common_res(2, '文件id异常请检查后重试'))
ret = db_task.del_task_file(nodeid, taskno, file_id)
if ret == 1:
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
else:
return json.dumps(make_common_res(4, '删除失败'))
def do_query_task_progress_history(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
# taskno = params.get('taskno')
yesterday_date = check_param(params, 'yesterday_date')
task_history_list = db_task.query_task_progress_history(yesterday_date, nodeid, area_id)
if not yesterday_date or yesterday_date == '':
return json.dumps(make_common_res(3, "查询日期缺失"))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
res['data'] = task_history_list
return json.dumps(res)
def do_query_task_detail(params):
global db_task
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
res = make_common_res(0, 'ok')
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
task = db_task.query_task(taskno, nodeid, area_id)
if task is None:
logging.error(str(params) + ' do_query_task_detail 任务在数据库中不存在!')
res = make_common_res(-1, '任务在数据库中不存在。')
return json.dumps(res)
if task['task_type_class'] == 1:
ledger_task_additional_info = query_ledger_task_additional_info(taskno, nodeid, area_id)
task['ledger_task_additional_info'] = ledger_task_additional_info
if task['task_state'] == 4:
task['ledger_task_additional_info']['entered_percent'] = 100
task['ledger_task_additional_info']['approve_percent'] = 100
task.pop('update_time')
res['desc'] = ''
res['data'] = task
return json.dumps(res)
def gen_update_sql(params, task_old_info):
modify_data, modify_item = '', ''
task_name = check_param(params, 'task_name')
if task_name and task_name != task_old_info['task_name']:
modify_data += f"task_name = '{task_name}'"
modify_item += f"【编辑】任务名称:{task_old_info['task_name']} -> {task_name}"
task_type = check_param(params, 'task_type')
if task_type and int(task_type) != task_old_info['task_type']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f", task_type = {int(task_type)}"
else:
modify_data += f'task_type = {int(task_type)}'
modify_item += prefix
modify_item += f"【编辑】任务类型:{task_old_info['task_type']} -> {task_type} "
task_class = check_param(params, 'task_class')
if task_class and int(task_class) != task_old_info['task_class']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f', task_class = {int(task_class)}'
else:
modify_data += f'task_class = {int(task_class)}'
modify_item += prefix
modify_item += f"【编辑】任务等级:{task_old_info['task_class']} -> {task_class} "
data_type = check_param(params, 'data_type')
if data_type and int(data_type) != task_old_info['data_type']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f', data_type = {int(data_type)}'
else:
modify_data += f'data_type = {int(data_type)}'
if int(data_type) == 1:
crossids = check_param(params, 'crossids')
if crossids and crossids != '' and crossids != task_old_info['crossids']:
modify_data += f", crossids = '{crossids}'"
elif int(data_type) == 2:
arteryids = check_param(params, 'arteryids')
if arteryids and arteryids != '' and arteryids != task_old_info['arteryids']:
modify_data += f", arteryids = '{arteryids}'"
modify_item += prefix
modify_item += f"【编辑】关联路网信息:{task_old_info['data_type']} -> {data_type}"
executor = check_param(params, 'executor')
if executor and executor != task_old_info['executor']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f", executor = '{executor}'"
else:
modify_data += f"executor = '{executor}'"
modify_item += prefix
modify_item += f"【编辑】负责人:{task_old_info['executor']} -> {executor} "
task_src = check_param(params, 'task_src')
if task_src and task_src != task_old_info['task_src']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f", task_src = '{task_src}'"
else:
modify_data += f"task_src = '{task_src}'"
modify_item += prefix
modify_item += f"【编辑】任务来源:{task_old_info['task_src']} -> {task_src} "
description = check_param(params, 'description')
if description and description != task_old_info['description']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f", description = '{description}'"
else:
modify_data += f"description = '{description}'"
modify_item += prefix
modify_item += f"【编辑】任务描述:{task_old_info['description']} -> {description}"
comment = check_param(params, 'comment')
if comment and comment != '':
new_comment = task_old_info['comment'] + "" + datetime.now().date().strftime('%Y-%m-%d') + "" + comment + "_split_suffix_for_query_info"
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f", comment = '{new_comment}'"
else:
modify_data += f"comment = '{new_comment}'"
modify_item += prefix
modify_item += f"【添加】任务备注:{comment}"
return modify_data, modify_item
# 新增台账管理路口类别任务 圈选路口 返回所有可用路口列表
def get_undistributed_cross_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
all_area_cross_list = db_tmnet.query_cross_list4task(nodeid, area_id)
# 查询出所有已经分配过的路口列表
task_crosses = db_task.query_ledger_task_list(nodeid, area_id)
distributed_cross_list = [row['crossid'] for row in task_crosses]
row_list = [cross for cross in all_area_cross_list if cross['crossid'] not in distributed_cross_list]
distributed_cross_info_list = [cross for cross in all_area_cross_list if cross['crossid'] in distributed_cross_list]
res = make_common_res(0, 'ok')
res['data'] = {
'cross_list': row_list,
'distributed_cross_list': distributed_cross_info_list
}
return json.dumps(res)
# 查询台账路口任务详情-> 路口详细列表、完成情况占比
def query_ledger_task_additional_info(task_no, nodeid, area_id, cross_name_info, task_crosses_info):
task_cross_num = len(task_crosses_info)
entered_cross_num = len([cross for cross in task_crosses_info if cross['ledger_status'] == 2 and cross['phase_status'] in (1, 3)])
entered_percent = int(entered_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0
approve_cross_num = len([cross for cross in task_crosses_info if cross['submit_status'] == 2])
approve_percent = int(approve_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0
cross_name_dict = {row['crossid']: row['name'] for row in cross_name_info}
cross_info_dict = {}
this_week_entering_cross_num, this_week_approve_cross_num = 0, 0
for row in task_crosses_info:
row['name'] = cross_name_dict[row['crossid']]
ledger_time_info, phase_time_info, approve_time_info = get_year_week(row['ledger_status_update_time']), get_year_week(row['phase_status_update_time']), get_year_week(row['approver_time'])
now_time_info = get_year_week(datetime.now())
if row['ledger_status'] == 2 and row['phase_status'] == 1 and ledger_time_info == now_time_info and phase_time_info == now_time_info:
this_week_entering_cross_num += 1
if row['submit_status'] == 2 and approve_time_info == now_time_info:
this_week_approve_cross_num += 1
for item in ['ledger_status_update_time', 'phase_status_update_time', 'approver_time', 'create_time', 'update_time']:
if row[item]:
row[item] = row[item].strftime('%Y-%m-%d %H:%M:%S')
cross_info_dict[row['crossid']] = row
cross_info_dict[row['crossid']]['high_pic_num'] = 0
cross_info_dict[row['crossid']]['ground_pic_num'] = 0
cross_pics = db_task.query_ledger_task_crosses_pics(list(cross_info_dict.keys()))
for row in cross_pics:
crossid = row['crossid']
if row['pic_type'] == 1:
# 高空图最大数量为2
cross_info_dict[crossid]['high_pic_num'] += 1
else:
# 地面图最大数量为10
cross_info_dict[crossid]['ground_pic_num'] += 1
ledger_task_additional_info = {
'entered_percent': entered_percent,
'approve_percent': approve_percent,
'this_week_entering_cross_num': this_week_entering_cross_num,
'this_week_approve_cross_num': this_week_approve_cross_num,
'cross_list': list(cross_info_dict.values())
}
return ledger_task_additional_info
# 重新分配台账任务路口
def do_redistributed_cross_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
old_task_no = check_param(params, 'old_task_no')
if not old_task_no:
return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试'))
crossid_list = check_param(params, 'crossid_list')
if not crossid_list or len(crossid_list) < 1:
return json.dumps(make_common_res(7, '缺少路口列表, 请刷新后重试'))
ret = db_task.drop_old_task_cross(crossid_list, old_task_no, nodeid, area_id)
if not ret:
return json.dumps(make_common_res(8, '删除旧任务路口失败,请检查后重试'))
return json.dumps(make_common_res(0, 'ok'))
# 操作台账任务路口状态
def do_op_ledger_task_cross_status(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
task_no = check_param(params, 'task_no')
if not task_no:
return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试'))
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(7, '缺少路口编号, 请刷新后重试'))
status = check_param(params, 'status')
if not status:
return json.dumps(make_common_res(8, '缺少状态信息'))
status = int(status)
record = db_task.query_ledger_task_cross_record(task_no, crossid)
if not record:
return json.dumps(make_common_res(8, '路口信息异常,请检查后重试'))
if record['phase_status'] != 1 and record['ledger_status'] != 2:
return json.dumps(make_common_res(9, '路口当前未完成信息录入,请确认路口状态信息'))
if status in (2, 3):
approver = check_param(params, 'approver')
if not approver:
return json.dumps(make_common_res(10, '缺少审核人信息'))
approver_id = check_param(params, 'approver_id')
if not approver_id:
return json.dumps(make_common_res(11, '缺少审核人id信息'))
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
ret = db_task.approval_ledger_task_cross(task_no, crossid, status, approver, approver_id, now_time)
else:
# 提交状态字段 submit 0未提交 1待审核 2审核通过 3审核未通过
ret = db_task.update_ledger_task_cross_record(task_no, crossid, 'submit_status', status)
if ret:
return json.dumps(make_common_res(0, 'ok'))
return json.dumps(make_common_res(10, '更新路口提交状态失败,请检查后重试'))
# 批量上传路口图片
def do_batch_upload_cross_pics(params):
nodeid = check_param({'nodeid': request.form.get('nodeid')}, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param({'area_id': request.form.get('area_id')}, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param({'userid': request.form.get('userid')}, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
task_no = check_param({'task_no': request.form.get('task_no')}, 'task_no')
if not task_no:
return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试'))
file_type = check_param({'file_type': request.form.get('file_type')}, 'file_type')
if not file_type:
file_type = 1
# file_type 1: 高空图片 2地面图片
file_type = int(file_type)
if len(request.files.keys()) < 1:
return json.dumps(make_common_res(7, '缺少文件列表, 请刷新后重试'))
task_cross_list = db_task.query_ledger_task_crosses(task_no)
crossid_list = [row['crossid'] for row in task_cross_list]
cross_info_list = db_tmnet.query_cross_infos(crossid_list)
cross_info_dict = {row['name']: row['crossid'] for row in cross_info_list}
checker = RegexSingleMatcher(cross_info_dict)
fail_list, suc_dict, records = [], {}, []
cross_file_dict = {}
cos_client = get_client()
folder_manager = CosFolderManager(cos_client, g_cos_bucket)
for key in request.files.keys():
file_info = request.files[key]
name = file_info.filename
if not '.' in name and name.rsplit('.', 1)[1].lower() in ALLOWED_CROSS_PIC_FILE_TYPE:
continue
find_res = checker.find(name)
if find_res:
# 如果找到了路口id说明文件名称匹配到了路口将该文件按照需求上传到cos目录上的路口文件夹下,同时写表记录文件路径
file_stream = file_info.stream
crossid = cross_info_dict[find_res]
cos_path = f'user/ledger/{nodeid}/{area_id}/{crossid}'
folder_manager.ensure_folder(cos_path)
cos_key = f'{cos_path}/{name}'
cos_client.put_object(Bucket=g_cos_bucket, Key=cos_key, Body=file_stream)
suc_dict[name] = find_res
download_url = f'{g_cos_root}/{cos_key}'
if crossid not in cross_file_dict.keys():
cross_file_dict[crossid] = []
cross_file_dict[crossid].append((crossid, file_type, download_url))
else:
fail_list.append(name)
continue
for crossid in cross_file_dict.keys():
his_pics = db_tmnet.query_cross_pics(crossid, file_type)
if file_type == 2:
if len(his_pics) + len(cross_file_dict[crossid]) > 10:
db_tmnet.del_cross_pic(crossid, file_type, len(his_pics) + len(cross_file_dict[crossid]) - 10)
elif file_type == 1:
if len(his_pics) + len(cross_file_dict[crossid]) > 2:
db_tmnet.del_cross_pic(crossid, file_type, len(his_pics) + len(cross_file_dict[crossid]) - 2)
records.extend(cross_file_dict[crossid])
# 批量写入路口图片记录
if records:
ret = db_tmnet.batch_insert_cross_pic(records)
else:
ret = 0
if ret == len(records):
res = make_common_res(0, 'ok')
res['data'] = {
'total': len(request.files.keys()),
'suc_info': suc_dict,
'fail_list': fail_list
}
return json.dumps(res)
else:
return json.dumps(make_common_res(8, '批量上传路口图片失败'))
class RegexSingleMatcher:
def __init__(self, key_dict):
self.key_dict = key_dict
# 转义并编译正则
keys = [re.escape(k) for k in key_dict.keys() if k]
if keys:
# 正则 search 默认就是找到第一个就停止
self.pattern = re.compile("|".join(keys))
else:
self.pattern = None
def find(self, target_string):
if not self.pattern:
return None
match = self.pattern.search(target_string)
if match:
return match.group() # 返回匹配到的具体字符串
return None
def get_year_week(time_input):
"""
支持 datetime 对象、时间戳整数、时间字符串
返回格式:'2023-40'
"""
if not time_input:
return None
# 使用 ISO 标准获取年周
year, week, _ = time_input.isocalendar()
return f"{year}-{week:02d}"