cross_doctor/app/task_worker.py

1042 lines
46 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/10 17:56
# @Description:
import json
from flask import request, send_file
from pypinyin import lazy_pinyin, Style
from app.common_worker import *
# File extensions (without the dot) accepted for task attachments.
# allowed_file_for_task() lower-cases the uploaded file's extension before
# testing it against this set.
ALLOWED_EXTENSION_TASK = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'doc', 'docx', 'xlsx', 'xls', 'pptx', 'ppt'}
def do_query_task_list_parameter(params):
    """Build the option lists used by the task-list filter controls.

    ``params`` must carry ``nodeid``, ``area_id`` and ``userid``, and the area
    has to be one of the areas ``db_user.query_user_areas`` returns for the
    user.

    Returns:
        A JSON string.  On success its ``data`` object holds ``task_type``,
        ``data_type``, ``task_class``, ``task_state``, ``executor`` and
        ``task_src``; on a validation failure it is the ``make_common_res``
        error with code 2-6.
    """
    checked = {}
    for field, code, hint in (
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    ):
        checked[field] = check_param(params, field)
        if not checked[field]:
            return json.dumps(make_common_res(code, hint))
    nodeid, area_id = checked['nodeid'], checked['area_id']

    # The requested area must be one the user is assigned to.
    user_areas = db_user.query_user_areas(checked['userid'])
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_ids = {int(row['area_id']) for row in user_areas}
    if int(area_id) not in permitted_ids:
        return json.dumps(make_common_res(6, '用户信息异常'))

    type_rows = db_task.query_task_type(nodeid, area_id)
    data = {
        # Task types configured for this node/area: number -> display name.
        'task_type': {row['task_type_no']: row['task_type'] for row in type_rows},
        # Road-network (data) type.
        'data_type': {0: '不关联', 1: '路口任务', 2: '干线协调任务'},
        # Task level: 0 low (no approval), 1 medium (approval may follow),
        # 2 high (approval required before execution).
        'task_class': {0: '低(无需审批)', 1: '中(可事后审批)', 2: '高(审批后执行)'},
        # Task state.
        'task_state': {0: '待下发', 1: '待审批', 2: '进行中未审批', 3: '进行中', 4: '完成'},
    }

    # Owners (executors); an empty result is logged but still returned.
    executor_rows = db_task.query_task_executor(nodeid, area_id)
    if len(executor_rows) < 1:
        logging.error(' query_task_executor没有数据!')
    data['executor'] = [row['user_name'] for row in executor_rows]

    # Requirement sources; an empty result is logged but still returned.
    src_rows = db_task.query_task_src(nodeid, area_id)
    if len(src_rows) < 1:
        logging.error(' query_task_src没有数据!')
    data['task_src'] = [row['src'] for row in src_rows]

    res = make_common_res(0, 'ok')
    res['data'] = data
    return json.dumps(res)
def _contains_chinese(text):
    """Return True when *text* has a character in the range U+4E00..U+9FFF."""
    return any('\u4e00' <= ch <= '\u9fff' for ch in text)


def _task_name_matches(keyword, task_name):
    """Return True when the search *keyword* matches *task_name*.

    * A keyword containing Chinese characters is a plain substring search.
    * Otherwise the keyword is treated as pinyin: it matches when it (two or
      more characters) is a substring of the concatenated pinyin of the name,
      or when, for a name containing Chinese, any keyword character equals the
      initial of the syllable at the same position.
    """
    if _contains_chinese(keyword):
        return keyword in task_name

    name_has_chinese = _contains_chinese(task_name)
    if name_has_chinese:
        syllables = lazy_pinyin(task_name, style=Style.NORMAL)
    else:
        syllables = task_name
    if len(keyword) > 1 and keyword in ''.join(syllables):
        return True
    if not name_has_chinese:
        return False
    # zip() stops at the shorter sequence, so a keyword longer than the
    # syllable list can no longer index past its end.
    # NOTE(review): a single matching position is enough here, as in the
    # previous implementation — confirm this is the intended initials search.
    return any(ch == syllable[:1] for ch, syllable in zip(keyword, syllables))


def do_query_task_list(params):
    """Return the tasks of a node/area, optionally filtered, as a JSON string.

    Required ``params``: ``nodeid``, ``area_id``, ``userid`` (the area must be
    one of the user's areas).  Optional filters, combined with AND:

    * ``task_name``: an all-digit value is a task number and returns exactly
      the task(s) with that ``taskno``, ignoring every other filter;
      otherwise it is a name keyword (see ``_task_name_matches``).
    * ``task_type``, ``task_class``, ``task_state``: compared as integers.
    * ``plan_begin_time``: keeps tasks starting at or after this value.
    * ``plan_end_time``: keeps tasks ending at or before this value.
    * ``executor``, ``task_src``: exact match.

    Each returned task gets a ``download_url_list`` and its ``comment`` is
    split into a list on the ``_split_suffix_for_query_info`` marker.  The
    result is ordered with ``sort_key``.

    Tasks are filtered in place instead of being round-tripped through a
    hash-based intersection, so nested values keep their original types, and
    a name keyword that matches nothing now yields an empty result even when
    other filters are present.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_user_areas(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = [int(row['area_id']) for row in area_list]
    if int(area_id) not in area_list:
        return json.dumps(make_common_res(6, '用户信息异常'))

    all_task_list = db_task.query_all_tak_list(nodeid, area_id)
    upload_file_dict = db_task.query_task_file(nodeid, area_id)
    for task_info in all_task_list:
        task_info['download_url_list'] = upload_file_dict.get(task_info['taskno'], [])

    task_name = params.get('task_name')
    task_type = params.get('task_type')
    task_class = params.get('task_class')
    plan_begin_time = params.get('plan_begin_time')
    plan_end_time = params.get('plan_end_time')
    task_state = params.get('task_state')
    executor = params.get('executor')
    task_src = params.get('task_src')

    # A missing task_name used to crash on `.isdigit()`; normalise it first.
    keyword = '' if task_name is None else str(task_name)

    if keyword.isdecimal():
        # Numeric keyword: direct lookup by task number, other filters ignored.
        wanted_no = int(keyword)
        res_list = [t for t in all_task_list if t['taskno'] == wanted_no]
    else:
        res_list = all_task_list
        if keyword:
            res_list = [t for t in res_list
                        if _task_name_matches(keyword, t['task_name'] or '')]
        if task_type:
            wanted_type = int(task_type)
            res_list = [t for t in res_list if t['task_type'] == wanted_type]
        if task_class:
            wanted_class = int(task_class)
            res_list = [t for t in res_list if t['task_class'] == wanted_class]
        if plan_begin_time:
            earliest = int(plan_begin_time)
            res_list = [t for t in res_list if earliest <= t['plan_begin_time']]
        if plan_end_time:
            latest = int(plan_end_time)
            res_list = [t for t in res_list if latest >= t['plan_end_time']]
        if task_state:
            wanted_state = int(task_state)
            res_list = [t for t in res_list if t['task_state'] == wanted_state]
        if executor:
            res_list = [t for t in res_list if t['executor'] == executor]
        if task_src:
            res_list = [t for t in res_list if t['task_src'] == task_src]

    # Stored comments are entries terminated by a marker; expose them as a
    # list (the piece after the final marker is dropped).
    marker = '_split_suffix_for_query_info'
    for task_info in res_list:
        comment = task_info['comment']
        if marker in comment:
            comment_list = comment.split(marker)
            comment_list.pop()
        else:
            comment_list = [comment]
        task_info['comment'] = comment_list

    res = make_common_res(0, 'ok')
    res['nodeid'] = nodeid
    res['area_id'] = area_id
    res['data'] = sorted(res_list, key=sort_key)
    res['desc'] = ''
    return json.dumps(res)
def sort_key(item):
    """Return the ordering key of a task: state, then progress, then planned end."""
    ordering_fields = ('task_state', 'progress', 'plan_end_time')
    return tuple(item[field] for field in ordering_fields)
def make_hashable(obj):
    """Recursively convert *obj* into a hashable equivalent.

    Lists become tuples of converted items, dicts become tuples of
    ``(key, converted value)`` pairs sorted by pair, and every other object is
    returned unchanged.
    """
    if isinstance(obj, dict):
        pairs = [(key, make_hashable(value)) for key, value in obj.items()]
        pairs.sort()
        return tuple(pairs)
    if isinstance(obj, list):
        return tuple(map(make_hashable, obj))
    return obj
def intersect_dicts(list_of_dicts):
    """Return the dicts that occur in every list of *list_of_dicts*.

    Dicts are compared by value through ``make_hashable``.  The result holds
    one shallow copy per distinct common dict, in the order of their first
    appearance in the first list.

    Copies are taken from the original dicts rather than rebuilt from the
    hashable form, so nested lists and dicts keep their types (rebuilding
    with ``dict(...)`` only restored the top level and left nested values as
    tuples).

    Args:
        list_of_dicts: a list of lists of dicts.

    Returns:
        A list of dicts; empty when *list_of_dicts* is empty.
    """
    if not list_of_dicts:
        return []

    first, *others = list_of_dicts
    keyed_first = [(make_hashable(d), d) for d in first]
    common = {key for key, _ in keyed_first}
    for lst in others:
        common &= {make_hashable(d) for d in lst}

    result = []
    emitted = set()
    for key, original in keyed_first:
        if key in common and key not in emitted:
            emitted.add(key)
            result.append(dict(original))
    return result
def do_remove_task(params):
    """Delete the tasks listed in ``params['tasknos']``.

    Validates ``nodeid``, ``area_id`` and ``userid`` (the area must be one of
    the user's areas), then calls ``db_task.delete_task(tasknos, nodeid)``.
    The deletion counts as successful only when the returned count equals
    ``len(tasknos)``.  Note that ``area_id`` is validated here but is not
    passed on to ``delete_task``.

    Returns a JSON string built from ``make_common_res``.
    """
    checked = {}
    for field, code, hint in (
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    ):
        checked[field] = check_param(params, field)
        if not checked[field]:
            return json.dumps(make_common_res(code, hint))
    nodeid, area_id = checked['nodeid'], checked['area_id']

    user_areas = db_user.query_user_areas(checked['userid'])
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_ids = {int(row['area_id']) for row in user_areas}
    if int(area_id) not in permitted_ids:
        return json.dumps(make_common_res(6, '用户信息异常'))

    tasknos = check_param(params, 'tasknos')
    if not (tasknos and len(tasknos) >= 1):
        return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))

    deleted = db_task.delete_task(tasknos, nodeid)
    if deleted == len(tasknos):
        res = make_common_res(0, 'ok')
        res['nodeid'] = nodeid
        res['desc'] = ''
        return json.dumps(res)

    logging.error(str(params) + ' do_remove_task删除失败!')
    return json.dumps(make_common_res(-1, '删除失败'))
def do_distribute_task(params):
    """Dispatch a task and record the action in its history.

    The state written depends on the stored ``task_class``:
    2 -> '1' (awaiting approval), 1 -> '2' (running, not yet approved),
    anything else -> '3' (running).  Afterwards, any basic task fields that
    ``gen_update_sql`` reports as changed by ``params`` are updated and
    logged as an edit.

    Required ``params``: ``nodeid``, ``area_id``, ``userid``, ``taskno``,
    ``operator``.  Returns a JSON string built from ``make_common_res``.
    """
    checked = {}
    for field, code, hint in (
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    ):
        checked[field] = check_param(params, field)
        if not checked[field]:
            return json.dumps(make_common_res(code, hint))
    nodeid, area_id = checked['nodeid'], checked['area_id']

    user_areas = db_user.query_user_areas(checked['userid'])
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_ids = {int(row['area_id']) for row in user_areas}
    if int(area_id) not in permitted_ids:
        return json.dumps(make_common_res(6, '用户信息异常'))

    for field, hint in (
        ('taskno', '任务id缺失请检查后重试'),
        ('operator', '操作员信息缺失,请检查后重试'),
    ):
        checked[field] = check_param(params, field)
        if not checked[field]:
            return json.dumps(make_common_res(2, hint))
    taskno, operator = checked['taskno'], checked['operator']

    current_task = db_task.query_task(taskno, nodeid, area_id)
    if current_task is None:
        logging.error(str(params) + ' do_distribute_task找不到对应的任务!')
        return json.dumps(make_common_res(-1, '找不到对应的任务'))

    # Task states: 0 to dispatch, 1 awaiting approval, 2 running (not yet
    # approved), 3 running, 4 done.
    level = current_task['task_class']
    new_state = '1' if level == 2 else ('2' if level == 1 else '3')

    if db_task.update_task_state(nodeid, area_id, taskno, new_state) != 1:
        logging.error(str(params) + ' do_distribute_task更新任务状态失败!')
        return json.dumps(make_common_res(-1, '更新任务状态失败'))

    dispatch_record = json.dumps({'operation': '下发任务', 'content': {}}, ensure_ascii=False)
    if db_task.add_task_history(taskno, nodeid, area_id, operator, dispatch_record) != 1:
        logging.error(str(params) + ' do_distribute_task添加任务履历失败!')
        return json.dumps(make_common_res(-1, '添加任务履历失败'))

    # Dispatching may also edit the task's basic information.
    refreshed_task = db_task.query_task(taskno, nodeid, area_id)
    modify_data, modify_item = gen_update_sql(params, refreshed_task)
    if len(modify_data) > 0:
        if db_task.update_task_info(taskno, modify_data, nodeid, area_id) != 1:
            return json.dumps(make_common_res(3, '任务基本信息修改失败'))
        edit_record = json.dumps(
            {'operation': '编辑任务', 'content': {'content': modify_item}},
            ensure_ascii=False)
        if db_task.add_task_history(taskno, nodeid, area_id, operator, edit_record) != 1:
            return json.dumps(make_common_res(3, '任务履历添加失败'))

    res = make_common_res(0, 'ok')
    res['nodeid'] = nodeid
    res['desc'] = ''
    return json.dumps(res)
def do_approval(params):
    """Approve a task: set its state to '3' (running) and log the approval.

    After the approval is recorded, any basic task fields that
    ``gen_update_sql`` reports as changed by ``params`` are updated and
    logged as an edit.

    Required ``params``: ``nodeid``, ``area_id``, ``userid``, ``taskno``,
    ``operator``.  Returns a JSON string built from ``make_common_res``.
    """
    checked = {}
    for field, code, hint in (
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    ):
        checked[field] = check_param(params, field)
        if not checked[field]:
            return json.dumps(make_common_res(code, hint))
    nodeid, area_id = checked['nodeid'], checked['area_id']

    user_areas = db_user.query_user_areas(checked['userid'])
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_ids = {int(row['area_id']) for row in user_areas}
    if int(area_id) not in permitted_ids:
        return json.dumps(make_common_res(6, '用户信息异常'))

    for field, hint in (
        ('taskno', '任务id缺失请检查后重试'),
        ('operator', '操作员信息缺失,请检查后重试'),
    ):
        checked[field] = check_param(params, field)
        if not checked[field]:
            return json.dumps(make_common_res(2, hint))
    taskno, operator = checked['taskno'], checked['operator']

    # Task states: 0 to dispatch, 1 awaiting approval, 2 running (not yet
    # approved), 3 running, 4 done.  Approval always moves the task to 3.
    if db_task.update_task_state(nodeid, area_id, taskno, '3') != 1:
        logging.error(str(params) + ' do_approval更新任务状态失败!')
        return json.dumps(make_common_res(-1, '更新任务状态失败'))

    approval_record = json.dumps({'operation': '审批任务', 'content': {}}, ensure_ascii=False)
    if db_task.add_task_history(taskno, nodeid, area_id, operator, approval_record) != 1:
        logging.error(str(params) + ' do_approval添加任务履历失败!')
        return json.dumps(make_common_res(-1, '添加任务履历失败'))

    # Approving may also edit some of the task's basic attributes.
    stored_task = db_task.query_task(taskno, nodeid, area_id)
    modify_data, modify_item = gen_update_sql(params, stored_task)
    if len(modify_data) > 0:
        if db_task.update_task_info(taskno, modify_data, nodeid, area_id) != 1:
            return json.dumps(make_common_res(3, '任务基本信息修改失败'))
        edit_record = json.dumps(
            {'operation': '编辑任务', 'content': {'content': modify_item}},
            ensure_ascii=False)
        if db_task.add_task_history(taskno, nodeid, area_id, operator, edit_record) != 1:
            return json.dumps(make_common_res(3, '任务履历添加失败'))

    res = make_common_res(0, 'ok')
    res['nodeid'] = nodeid
    res['desc'] = ''
    return json.dumps(res)
def do_complete_task(params):
    """Mark a task as completed (state '4') and log the completion.

    A task that is already in state 4 is reported as completed without being
    updated again.  The stored state is compared through ``str()`` because
    the rest of this module treats ``task_state`` as an integer, so a direct
    comparison with the string ``'4'`` would not match an integer value.

    Required ``params``: ``nodeid``, ``area_id``, ``userid``, ``taskno``,
    ``operator``.  Returns a JSON string built from ``make_common_res``.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_user_areas(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = [int(row['area_id']) for row in area_list]
    if int(area_id) not in area_list:
        return json.dumps(make_common_res(6, '用户信息异常'))

    taskno = check_param(params, 'taskno')
    if not taskno:
        return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
    operator = check_param(params, 'operator')
    if not operator:
        return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试'))

    task_old = db_task.query_task(taskno, nodeid, area_id)
    if not task_old:
        return json.dumps(make_common_res(3, '找不到对应的任务'))
    # Works whether the stored state is the integer 4 or the string '4'.
    if str(task_old['task_state']) == '4':
        return json.dumps(make_common_res(0, '当前任务已完成'))

    # Task states: 0 to dispatch, 1 awaiting approval, 2 running (not yet
    # approved), 3 running, 4 done.
    new_state = '4'
    count = db_task.update_task_state(nodeid, area_id, taskno, new_state)
    if count != 1:
        logging.error(str(params) + ' do_complete_task更新任务状态失败!')
        return json.dumps(make_common_res(-1, '更新任务状态失败'))

    basemark = {'operation': '完成任务', 'content': {}}
    count = db_task.add_task_history(taskno, nodeid, area_id, operator,
                                     json.dumps(basemark, ensure_ascii=False))
    if count != 1:
        logging.error(str(params) + ' do_complete_task添加任务履历失败!')
        return json.dumps(make_common_res(-1, '添加任务履历失败'))

    res = make_common_res(0, 'ok')
    res['nodeid'] = nodeid
    res['desc'] = ''
    return json.dumps(res)
def _to_int_or_none(value):
    """Return ``int(value)``, or None when *value* is not a valid integer."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def do_add_task(params):
    """Create a task, then record its creation in the task history.

    Required ``params``: ``nodeid``, ``area_id``, ``userid``, ``task_type``,
    ``task_name``, ``task_class``, ``data_type``, ``task_src``, ``executor``,
    ``publish_time``, ``description``, ``operator``.  ``crossids`` is required
    when ``data_type`` is 1 and ``arteryids`` when it is 2.  ``comment`` is
    optional.

    ``add_type`` selects the flavour:

    * missing or ``'normal'``: ``plan_begin_time`` and ``plan_end_time`` are
      required, the initial state follows ``task_class`` (0 -> 3, 1 -> 2,
      2 -> 1) and a non-empty comment is prefixed with today's date;
    * anything else: plan times are stored as 0, the initial state is 0, a
      non-empty comment is prefixed with ``【推荐策略】`` and the history entry
      is ``诊断问题确认`` instead of ``创建任务``.

    Malformed integers in ``task_class``, ``data_type`` or ``publish_time``
    produce a code-2 error response instead of raising ``ValueError``.

    Returns a JSON string built from ``make_common_res``; on success it also
    carries the new ``taskno``.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_user_areas(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = [int(row['area_id']) for row in area_list]
    if int(area_id) not in area_list:
        return json.dumps(make_common_res(6, '用户信息异常'))

    task_type = check_param(params, 'task_type')
    if not task_type:
        return json.dumps(make_common_res(2, '任务类型缺失,请检查后重试'))
    task_name = check_param(params, 'task_name')
    if not task_name:
        return json.dumps(make_common_res(2, '任务名称缺失,请检查后重试'))

    # Task level: 0 low (no approval), 1 medium (approval may follow),
    # 2 high (approval required before execution).
    task_class = check_param(params, 'task_class')
    if not task_class:
        return json.dumps(make_common_res(2, '任务等级缺失,请检查后重试'))
    task_class_no = _to_int_or_none(task_class)
    if task_class_no is None:
        return json.dumps(make_common_res(2, '任务等级参数异常,请检查后重试'))

    data_type = check_param(params, 'data_type')
    if not data_type:
        return json.dumps(make_common_res(2, '关联路网状态缺失,请检查后重试'))
    data_type_no = _to_int_or_none(data_type)
    if data_type_no is None:
        return json.dumps(make_common_res(2, '关联路网状态参数异常,请检查后重试'))

    crossids, arteryids, sectionids = '', '', ''
    if data_type_no == 1:
        crossids = check_param(params, 'crossids')
        if not crossids:
            return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联路口,但关联路口信息缺失,请检查后重试'))
    elif data_type_no == 2:
        arteryids = check_param(params, 'arteryids')
        if not arteryids:
            return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联干线,但关联干线信息缺失,请检查后重试'))

    add_type = check_param(params, 'add_type')
    is_normal = not add_type or add_type == 'normal'
    if is_normal:
        plan_begin_time = check_param(params, 'plan_begin_time')
        if not plan_begin_time:
            return json.dumps(make_common_res(2, '计划开始时间缺失,请检查后重试'))
        plan_end_time = check_param(params, 'plan_end_time')
        if not plan_end_time:
            return json.dumps(make_common_res(2, '计划结束时间缺失,请检查后重试'))
    else:
        plan_begin_time = 0
        plan_end_time = 0

    task_src = check_param(params, 'task_src')
    if not task_src:
        return json.dumps(make_common_res(2, '需求来源缺失,请检查后重试'))
    executor = check_param(params, 'executor')
    if not executor:
        return json.dumps(make_common_res(2, '任务负责人缺失,请检查后重试'))

    publish_time = check_param(params, 'publish_time')
    if not publish_time:
        return json.dumps(make_common_res(2, '发布时间缺失,请检查后重试'))
    publish_time = _to_int_or_none(publish_time)
    if publish_time is None:
        return json.dumps(make_common_res(2, '发布时间参数异常,请检查后重试'))
    timestamp = publish_time

    description = check_param(params, 'description')
    if not description:
        return json.dumps(make_common_res(2, '任务描述缺失,请检查后重试'))
    comment = check_param(params, 'comment') or ''
    operator = check_param(params, 'operator')
    if not operator:
        return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试'))
    creatorid = operator

    progress, record_task_state = 0, 0
    if is_normal:
        # Unknown levels keep the default state 0.
        task_state = {0: 3, 1: 2, 2: 1}.get(task_class_no, 0)
    else:
        task_state = 0

    if comment:
        if is_normal:
            # NOTE(review): the date is concatenated with no separator or
            # brackets, exactly as before — confirm this is intended.
            comment = datetime.now().date().strftime('%Y-%m-%d') + comment + "_split_suffix_for_query_info"
        else:
            comment = "【推荐策略】" + comment + "_split_suffix_for_query_info"

    count = db_task.add_task(timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,
                             publish_time, executor, progress, task_state, description, crossids, sectionids,
                             arteryids, comment, record_task_state, task_src, task_class, nodeid, area_id)
    if count != 1:
        logging.error(str(params) + ' 添加任务报错!')
        return json.dumps(make_common_res(-1, '添加任务报错。'))

    taskno = db_task.get_task_no(nodeid, area_id, task_name, task_type, task_class, data_type, plan_begin_time,
                                 plan_end_time, executor, task_src, comment)
    if taskno is None:
        # This failure is about looking up the new task number, not about a
        # state update, so the message says so.
        logging.error(str(params) + ' do_add_task获取任务编号失败!')
        return json.dumps(make_common_res(-1, '获取任务编号失败'))

    basemark = {'operation': '创建任务' if is_normal else '诊断问题确认', 'content': {}}
    count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
    if count != 1:
        logging.error(str(params) + ' do_add_task添加任务履历失败!')
        return json.dumps(make_common_res(-1, '添加任务履历失败'))

    res = make_common_res(0, 'ok')
    res['nodeid'] = nodeid
    res['desc'] = ''
    res['taskno'] = taskno
    return json.dumps(res)
# List every intersection that has at least one completed task.
# params: nodeid, area_id, userid
def do_query_completed_task_cross_list(params):
    """Return the result of ``db_task.query_completed_task_cross_list`` as JSON.

    Validates ``nodeid``, ``area_id`` and ``userid`` first; the area must be
    one of the areas ``db_user.query_user_areas`` returns for the user.
    """
    checked = {}
    for field, code, hint in (
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    ):
        checked[field] = check_param(params, field)
        if not checked[field]:
            return json.dumps(make_common_res(code, hint))
    nodeid, area_id = checked['nodeid'], checked['area_id']

    user_areas = db_user.query_user_areas(checked['userid'])
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_ids = {int(row['area_id']) for row in user_areas}
    if int(area_id) not in permitted_ids:
        return json.dumps(make_common_res(6, '用户信息异常'))

    completed = db_task.query_completed_task_cross_list(nodeid, area_id)
    res = make_common_res(0, 'ok')
    res['nodeid'] = nodeid
    res['data'] = completed
    res['desc'] = ''
    return json.dumps(res)
# List all completed tasks of one intersection.
# params: nodeid, area_id, userid, crossid
def do_query_completed_task_by_cross(params):
    """Return the result of ``db_task.query_completed_task_by_cross`` as JSON.

    Validates ``nodeid``, ``area_id`` and ``userid`` (the area must be one of
    the user's areas) and requires ``crossid``.
    """
    checked = {}
    for field, code, hint in (
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    ):
        checked[field] = check_param(params, field)
        if not checked[field]:
            return json.dumps(make_common_res(code, hint))
    nodeid, area_id = checked['nodeid'], checked['area_id']

    user_areas = db_user.query_user_areas(checked['userid'])
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_ids = {int(row['area_id']) for row in user_areas}
    if int(area_id) not in permitted_ids:
        return json.dumps(make_common_res(6, '用户信息异常'))

    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(2, '缺少crossid 请刷新后重试'))

    completed = db_task.query_completed_task_by_cross(crossid, nodeid, area_id)
    res = make_common_res(0, 'ok')
    res['nodeid'] = nodeid
    res['data'] = completed
    res['desc'] = ''
    return json.dumps(res)
def do_update_task_progress_field(task_old, params, filed_name, filed_desc, taskno):
    """Persist one changed task field and describe a progress change.

    Args:
        task_old: the stored task row, indexable by field name.
        params: request parameters; ``params.get(filed_name)`` is the new value.
        filed_name: column to update in the ``task`` table.
        filed_desc: unused; kept for interface compatibility.
        taskno: task number identifying the row to update.

    Returns:
        The history text ``【编辑】任务进度:<old>%-><new>%`` when the field is
        ``progress`` and its value was changed successfully; otherwise ``''``
        (value absent, unchanged, update failed, or a different field).
    """
    field = params.get(filed_name)
    if field is None:
        return ''

    old_field = task_old[filed_name]
    # Compare like with like when the stored value is an int and the request
    # carries a string.
    if type(old_field) is int and type(field) is str:
        old_field = str(old_field)
    if field == old_field:
        return ''

    count = db_task.do_update('`task`', filed_name, field, 'taskno', taskno)
    if count != 1:
        # The failure is only logged; the caller receives an empty description.
        logging.error(str(params) + ' do_update_task_progress_field更新任务状态失败!')
        return ''

    if filed_name == 'progress':
        return '【编辑】任务进度:%s%%->%s%%' % (old_field, field)
    return ''
# Not called anywhere.
def do_update_task_field(task_old, params, filed_name, filed_desc, taskno):
    """Persist one changed task field and return its description for the history.

    Args:
        task_old: the stored task row, indexable by field name.
        params: request parameters; ``params.get(filed_name)`` is the new value.
        filed_name: column to update in the ``task`` table.
        filed_desc: human-readable field name used in the history text.
        taskno: task number identifying the row to update.

    Returns:
        ``'<filed_desc> '`` when the value changed and was stored; ``''`` when
        the value is absent, unchanged, or the update failed.  A failed update
        returns ``''`` like ``do_update_task_progress_field`` does, rather
        than a JSON error string that a caller would mistake for history text.
    """
    field = params.get(filed_name)
    if field is None:
        return ''

    old_field = task_old[filed_name]
    # Compare like with like when the stored value is an int and the request
    # carries a string.
    if type(old_field) is int and type(field) is str:
        old_field = str(old_field)
    if field == old_field:
        return ''

    count = db_task.do_update('`task`', filed_name, field, 'taskno', taskno)
    if count != 1:
        logging.error(str(params) + ' do_update_task_field更新任务状态失败!')
        return ''
    return '%s ' % (filed_desc)
def do_update_task(params):
    """Edit a task's basic information and/or progress and log the edit.

    The stored task is loaded once and used both for ``gen_update_sql`` (basic
    fields) and for ``do_update_task_progress_field`` (progress).  A history
    entry ``编辑任务`` is written only when at least one of them reports a
    change.

    Required ``params``: ``nodeid``, ``area_id``, ``userid``, ``taskno``,
    ``operator``.  Returns a JSON string built from ``make_common_res``.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_user_areas(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = [int(row['area_id']) for row in area_list]
    if int(area_id) not in area_list:
        return json.dumps(make_common_res(6, '用户信息异常'))

    taskno = check_param(params, 'taskno')
    if not taskno:
        return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
    operator = check_param(params, 'operator')
    if not operator:
        return json.dumps(make_common_res(3, '操作者信息缺失,请检查后重试'))

    task_old = db_task.query_task(taskno, nodeid, area_id)
    if task_old is None:
        logging.error(str(params) + ' do_update_task 任务在数据库中不存在!')
        return json.dumps(make_common_res(-1, '任务在数据库中不存在。'))

    # The row fetched above is reused; issuing the identical query a second
    # time added a database round-trip without new information.
    update_sql, basemarkinfo = gen_update_sql(params, task_old)
    if len(update_sql) > 0:
        ret = db_task.update_task_info(taskno, update_sql, nodeid, area_id)
        if ret != 1:
            return json.dumps(make_common_res(3, '任务基本信息修改失败'))

    progress_info = do_update_task_progress_field(task_old, params, 'progress', '任务进度', taskno)

    if len(basemarkinfo) > 0 or len(progress_info) > 0:
        content = {}
        if len(progress_info) > 0:
            content['progress_info'] = progress_info
        content['content'] = basemarkinfo
        basemark = {'operation': '编辑任务', 'content': content}
        ret = db_task.add_task_history(taskno, nodeid, area_id, operator,
                                       json.dumps(basemark, ensure_ascii=False))
        if ret != 1:
            return json.dumps(make_common_res(3, '任务履历信息添加失败'))

    res = make_common_res(0, 'ok')
    res['nodeid'] = nodeid
    res['desc'] = ''
    return json.dumps(res)
def allowed_file_for_task(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSION_TASK.

    The extension is the text after the last dot, compared case-insensitively.
    A name without a dot is rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSION_TASK
def do_task_upload(params):
    """Store the uploaded files of a task and register their download URLs.

    ``nodeid`` and ``taskno`` are read from ``request.form`` and the files
    from ``request.files``; *params* is not used.  Each file is saved under
    ``./app/static/task_file/<nodeid>/task_no_<taskno>/`` and recorded through
    ``db_task.insert_upload_file_record``.

    Only the final path component of the client-supplied file name is used,
    and the task directory must resolve to a location inside the upload root,
    so crafted names or ids cannot write outside it.  Processing stops at the
    first failure, and the file that failed (exception or database insert) is
    removed again.

    Returns a JSON string: code 0 when every file was saved and recorded,
    otherwise an error built by ``make_common_res``.
    """
    nodeid = check_param({'nodeid': request.form.get('nodeid')}, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(3, "nodeid检查报错"))
    taskno = check_param({'taskno': request.form.get('taskno')}, 'taskno')
    if not taskno:
        return json.dumps(make_common_res(2, '任务id异常请检查后重试'))
    if len(request.files.keys()) < 1:
        return json.dumps(make_common_res(1, '文件不存在'))

    upload_root = os.path.realpath('./app/static/task_file')
    task_dir = f'./app/static/task_file/{nodeid}/task_no_{taskno}'
    # nodeid / taskno come straight from the form; refuse values such as
    # "../.." that would move the target directory out of the upload root.
    if not os.path.realpath(task_dir).startswith(upload_root + os.sep):
        return json.dumps(make_common_res(2, '任务id异常请检查后重试'))

    suc_file_num, suc_mysql_num = 0, 0
    for key in request.files.keys():
        file = request.files[key]
        # Drop any directory part ("/" or "\") sent by the client.
        safe_name = os.path.basename((file.filename or '').replace('\\', '/'))
        if not (file and safe_name and allowed_file_for_task(safe_name)):
            return json.dumps(make_common_res(1, '文件格式错误'))
        target_path = f'{task_dir}/{safe_name}'
        try:
            if not os.path.exists(task_dir):
                os.makedirs(task_dir)
                os.chmod(task_dir, 0o777)
            file.save(target_path)
            suc_file_num += 1
            download_url = f'/api/download_task_file?nodeid={nodeid}&taskno={taskno}&file_name={safe_name}'
            ret = db_task.insert_upload_file_record(taskno, nodeid, download_url)
            if ret == 1:
                suc_mysql_num += 1
            else:
                # No database record was written: do not leave an orphan file.
                if os.path.exists(target_path):
                    os.remove(target_path)
                    logging.info('回滚上传操作')
                break
        except Exception as e:
            logging.error(str(e) + '任务文件上传失败')
            if os.path.exists(target_path):
                os.remove(target_path)
                logging.info('回滚上传操作')
            break

    if suc_file_num == suc_mysql_num == len(request.files.keys()):
        res = make_common_res(0, 'ok')
        res['nodeid'] = ''
        res['desc'] = ''
        return json.dumps(res)
    return json.dumps(make_common_res(4, '任务文件上传失败'))
def do_task_file_download(params):
    """Send a previously uploaded task file as an attachment.

    Required ``params``: ``nodeid``, ``area_id``, ``userid``, ``taskno``,
    ``file_name`` (the area must be one of the user's areas).  The file is
    looked up under ``static/task_file/<nodeid>/task_no_<taskno>/`` next to
    this module.

    The resolved path must stay inside the ``static/task_file`` directory;
    requests whose ``file_name``, ``taskno`` or ``nodeid`` escape it (for
    example through ``../``) are rejected instead of being passed to
    ``send_file``.

    Returns the ``send_file`` response on success, otherwise a JSON error
    string built from ``make_common_res``.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_user_areas(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = [int(row['area_id']) for row in area_list]
    if int(area_id) not in area_list:
        return json.dumps(make_common_res(6, '用户信息异常'))

    taskno = check_param(params, 'taskno')
    if not taskno:
        return json.dumps(make_common_res(2, '任务id异常请检查后重试'))
    file_name = check_param(params, 'file_name')
    if not file_name:
        return json.dumps(make_common_res(2, '文件名异常,请检查后重试'))

    current_dir_path = os.path.dirname(os.path.abspath(__file__))
    storage_root = os.path.realpath(os.path.join(current_dir_path, 'static', 'task_file'))
    # realpath() collapses ".." segments and symlinks before the containment
    # check, so the comparison is made on the location actually served.
    file_path = os.path.realpath(
        f"{current_dir_path}/static/task_file/{nodeid}/task_no_{taskno}/{file_name}")
    if not file_path.startswith(storage_root + os.sep):
        logging.error(f"非法的文件路径: {file_path}")
        return json.dumps(make_common_res(2, '文件名异常,请检查后重试'))

    if not os.path.isfile(file_path):
        logging.error(f"文件不存在: {file_path}")
        return json.dumps(make_common_res(2, '文件不存在,请检查后重试'))
    return send_file(file_path, as_attachment=True)
def do_query_task_history(params):
    """Return a task's history entries and its uploaded-file list as JSON.

    Required ``params``: ``nodeid``, ``area_id``, ``userid``, ``taskno`` (the
    area must be one of the user's areas).  The internal marker
    ``_split_suffix_for_query_info`` is stripped from every history entry's
    ``content`` before it is returned.
    """
    checked = {}
    for field, code, hint in (
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    ):
        checked[field] = check_param(params, field)
        if not checked[field]:
            return json.dumps(make_common_res(code, hint))
    nodeid, area_id = checked['nodeid'], checked['area_id']

    user_areas = db_user.query_user_areas(checked['userid'])
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_ids = {int(row['area_id']) for row in user_areas}
    if int(area_id) not in permitted_ids:
        return json.dumps(make_common_res(6, '用户信息异常'))

    taskno = check_param(params, 'taskno')
    if not taskno:
        return json.dumps(make_common_res(2, '任务id异常请检查后重试'))

    history_rows = db_task.query_task_history(taskno, nodeid, area_id)
    file_rows = db_task.query_task_file_by_taskno(nodeid, taskno, area_id)
    for entry in history_rows:
        entry['content'] = entry['content'].replace('_split_suffix_for_query_info', '')

    res = make_common_res(0, 'ok')
    res['nodeid'] = nodeid
    res['desc'] = ''
    res['data'] = {'task_history_list': history_rows, 'download_url_list': file_rows}
    return json.dumps(res)
def del_task_file_api(params):
    """Delete one uploaded task file through ``db_task.del_task_file``.

    Required ``params``: ``nodeid``, ``area_id``, ``userid``, ``taskno``,
    ``file_name``, ``file_id`` (the area must be one of the user's areas).
    ``area_id`` is validated here but is not passed on to ``del_task_file``.

    Returns a JSON string: code 0 when ``del_task_file`` returns 1, otherwise
    code 4.
    """
    checked = {}
    for field, code, hint in (
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    ):
        checked[field] = check_param(params, field)
        if not checked[field]:
            return json.dumps(make_common_res(code, hint))
    nodeid, area_id = checked['nodeid'], checked['area_id']

    user_areas = db_user.query_user_areas(checked['userid'])
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_ids = {int(row['area_id']) for row in user_areas}
    if int(area_id) not in permitted_ids:
        return json.dumps(make_common_res(6, '用户信息异常'))

    for field, hint in (
        ('taskno', '任务id异常请检查后重试'),
        ('file_name', '文件名异常,请检查后重试'),
        ('file_id', '文件id异常请检查后重试'),
    ):
        checked[field] = check_param(params, field)
        if not checked[field]:
            return json.dumps(make_common_res(2, hint))

    removed = db_task.del_task_file(nodeid, checked['taskno'], checked['file_name'], checked['file_id'])
    if removed != 1:
        return json.dumps(make_common_res(4, '删除失败'))

    res = make_common_res(0, 'ok')
    res['nodeid'] = nodeid
    res['desc'] = ''
    return json.dumps(res)
def do_query_task_progress_history(params):
    """Return the task progress history for ``params['yesterday_date']`` as JSON.

    Required ``params``: ``nodeid``, ``area_id``, ``userid``,
    ``yesterday_date`` (the area must be one of the user's areas).  The date
    is validated before the database is queried, so a request without it no
    longer runs ``db_task.query_task_progress_history`` with an empty value.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_user_areas(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = [int(row['area_id']) for row in area_list]
    if int(area_id) not in area_list:
        return json.dumps(make_common_res(6, '用户信息异常'))

    yesterday_date = check_param(params, 'yesterday_date')
    if not yesterday_date:
        return json.dumps(make_common_res(3, "查询日期缺失"))

    task_history_list = db_task.query_task_progress_history(yesterday_date, nodeid, area_id)
    res = make_common_res(0, 'ok')
    res['nodeid'] = nodeid
    res['desc'] = ''
    res['data'] = task_history_list
    return json.dumps(res)
def do_query_task_detail(params):
    """Look up a single task and return it as a JSON string.

    Reads ``nodeid``, ``area_id``, ``userid`` and ``taskno`` from ``params``
    via ``check_param``. Each missing value, a user without areas, or an
    area the user is not associated with produces an error response built
    by ``make_common_res``. If ``db_task.query_task`` returns ``None`` the
    condition is logged and a ``-1`` response is returned.

    Returns:
        A JSON string: on success the ``make_common_res(0, 'ok')`` payload
        with an empty ``desc`` and ``data`` set to the task record.
    """
    def _fail(code, message):
        # Serialise an error response in one step.
        return json.dumps(make_common_res(code, message))

    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return _fail(2, '缺少nodeid 请刷新后重试')

    area_id = check_param(params, 'area_id')
    if not area_id:
        return _fail(3, '缺少area_id 请刷新后重试')

    userid = check_param(params, 'userid')
    if not userid:
        return _fail(4, '缺少userid 请刷新后重试')

    # The requested area must be one of the areas bound to this user.
    user_areas = db_user.query_user_areas(userid)
    if not user_areas:
        return _fail(5, '用户信息异常')
    permitted_area_ids = [int(entry['area_id']) for entry in user_areas]
    if int(area_id) not in permitted_area_ids:
        return _fail(6, '用户信息异常')

    taskno = check_param(params, 'taskno')
    if not taskno:
        return _fail(2, '任务id缺失请检查后重试')

    task = db_task.query_task(taskno, nodeid)
    if task is None:
        logging.error(str(params) + ' do_query_task_detail 任务在数据库中不存在!')
        return _fail(-1, '任务在数据库中不存在。')

    payload = make_common_res(0, 'ok')
    payload['desc'] = ''
    payload['data'] = task
    return json.dumps(payload)
def gen_update_sql(params, task_old_info):
    """Build the SQL assignment list and change log for a task edit.

    Each editable field is read from ``params`` with ``check_param`` and
    compared against the stored value in ``task_old_info``. Only fields that
    are present and different contribute output.

    Args:
        params: request parameters, read through ``check_param``.
        task_old_info: mapping holding the task's current values, indexed by
            ``task_name``, ``task_type``, ``task_class``, ``data_type``,
            ``crossids``, ``arteryids``, ``executor``, ``task_src``,
            ``description`` and ``comment``.

    Returns:
        A ``(modify_data, modify_item)`` tuple of strings. ``modify_data``
        is a comma-separated list of ``column = value`` assignments;
        ``modify_item`` is the matching list of human-readable change
        entries separated by the literal ``prefix_for_split_``. Both are
        empty strings when nothing changed.

    NOTE(review): values are interpolated directly into the SQL text. If
    ``params`` can carry untrusted input this is open to SQL injection;
    consider switching the caller to parameterized queries.
    """
    assignments = []   # "column = value" fragments, joined with ", "
    change_log = []    # change entries, joined with "prefix_for_split_"

    task_name = check_param(params, 'task_name')
    if task_name and task_name != task_old_info['task_name']:
        assignments.append(f"task_name = '{task_name}'")
        change_log.append(f"【编辑】任务名称:{task_old_info['task_name']} -> {task_name}")

    task_type = check_param(params, 'task_type')
    if task_type:
        type_no = int(task_type)
        if type_no != task_old_info['task_type']:
            assignments.append(f"task_type = {type_no}")
            change_log.append(f"【编辑】任务类型:{task_old_info['task_type']} -> {task_type} ")

    task_class = check_param(params, 'task_class')
    if task_class:
        class_no = int(task_class)
        if class_no != task_old_info['task_class']:
            assignments.append(f"task_class = {class_no}")
            change_log.append(f"【编辑】任务等级:{task_old_info['task_class']} -> {task_class} ")

    data_type = check_param(params, 'data_type')
    if data_type:
        data_type_no = int(data_type)
        if data_type_no != task_old_info['data_type']:
            assignments.append(f"data_type = {data_type_no}")
            # The linked id list is only written together with a data_type
            # change: crossids for type 1, arteryids for type 2.
            if data_type_no == 1:
                crossids = check_param(params, 'crossids')
                if crossids and crossids != task_old_info['crossids']:
                    assignments.append(f"crossids = '{crossids}'")
            elif data_type_no == 2:
                arteryids = check_param(params, 'arteryids')
                if arteryids and arteryids != task_old_info['arteryids']:
                    assignments.append(f"arteryids = '{arteryids}'")
            change_log.append(f"【编辑】关联路网信息:{task_old_info['data_type']} -> {data_type}")

    executor = check_param(params, 'executor')
    if executor and executor != task_old_info['executor']:
        assignments.append(f"executor = '{executor}'")
        change_log.append(f"【编辑】负责人:{task_old_info['executor']} -> {executor} ")

    task_src = check_param(params, 'task_src')
    if task_src and task_src != task_old_info['task_src']:
        assignments.append(f"task_src = '{task_src}'")
        change_log.append(f"【编辑】任务来源:{task_old_info['task_src']} -> {task_src} ")

    description = check_param(params, 'description')
    if description and description != task_old_info['description']:
        assignments.append(f"description = '{description}'")
        change_log.append(f"【编辑】任务描述:{task_old_info['description']} -> {description}")

    comment = check_param(params, 'comment')
    if comment:
        # Comments are appended to the stored text rather than replacing it:
        # existing text + today's date + new comment + a fixed marker.
        today = datetime.now().date().strftime('%Y-%m-%d')
        new_comment = task_old_info['comment'] + "" + today + "" + comment + "_split_suffix_for_query_info"
        assignments.append(f"comment = '{new_comment}'")
        change_log.append(f"【添加】任务备注:{comment}")

    modify_data = ", ".join(assignments)
    modify_item = "prefix_for_split_".join(change_log)
    return modify_data, modify_item