cross_doctor/app/task_worker.py

1043 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/10 17:56
# @Description:
import json
from app.global_source import db_user, db_task
from flask import request, send_file
from pypinyin import lazy_pinyin, Style
from app.common_worker import *
ALLOWED_EXTENSION_TASK = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'doc', 'docx', 'xlsx', 'xls', 'pptx', 'ppt'}
def do_query_task_list_parameter(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
res = make_common_res(0, 'ok')
res['data'] = {}
#任务类型
task_types = db_task.query_task_type(nodeid, area_id)
task_types_res = {}
for task_type in task_types:
task_types_res[task_type['task_type_no']] = task_type['task_type']
res['data']['task_type'] = task_types_res
# 路网(数据)类型
res['data']['data_type'] = {0: '不关联', 1: '路口任务', 2: '干线协调任务'}
#任务等级 0低无需审批1中可事后审批2高审批后执行
res['data']['task_class'] = {0: '低(无需审批)', 1: '中(可事后审批)', 2: '高(审批后执行)'}
#任务状态
res['data']['task_state'] = {0:'待下发', 1: '待审批', 2: '进行中未审批', 3: '进行中', 4: '完成'}
#负责人
executors = db_task.query_task_executor(nodeid, area_id)
if len(executors)<=0:
logging.error(' query_task_executor没有数据!')
executor_list = []
for executor in executors:
executor_list.append(executor['user_name'])
res['data']['executor'] = executor_list
#需求来源
srcs = db_task.query_task_src(nodeid, area_id)
if len(srcs)<=0:
logging.error(' query_task_src没有数据!')
src_list = []
for src in srcs:
src_list.append(src['src'])
res['data']['task_src'] = src_list
return json.dumps(res)
def do_query_task_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
#任务名称`task_name` varchar(300) DEFAULT NULL COMMENT '任务名称',
#任务类型`task_type` varchar(300) DEFAULT NULL COMMENT '任务类型',
#任务等级`task_class` varchar(300) DEFAULT NULL COMMENT '需求等级';
#计划开始时间`plan_begin_time` bigint DEFAULT NULL COMMENT '计划开始时间',
#计划结束时间`plan_end_time` bigint DEFAULT NULL COMMENT '计划结束时间',
#任务状态`task_state` int NOT NULL COMMENT '任务状态 0未开始1进行中2完成3挂起4废止',
#负责人`executor` varchar(300) DEFAULT NULL COMMENT '负责人',
#需求来源 `task_src` varchar(300) DEFAULT NULL COMMENT '需求来源';
all_task_list = db_task.query_all_tak_list(nodeid, area_id)
upload_file_dict = db_task.query_task_file(nodeid, area_id)
for task_info in all_task_list:
if task_info['taskno'] in upload_file_dict.keys():
task_info['download_url_list'] = upload_file_dict[task_info['taskno']]
else:
task_info['download_url_list'] = []
name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list = [], [], [], [], [], [], [], []
task_name = params.get('task_name')
task_type = params.get('task_type')
task_class = params.get('task_class')
plan_begin_time = params.get('plan_begin_time')
plan_end_time = params.get('plan_end_time')
task_state = params.get('task_state')
executor = params.get('executor')
task_src = params.get('task_src')
task_no = None
if task_name.isdigit():
task_no = task_name
else:
if task_name and any('\u4e00' <= c <= '\u9fff' for c in task_name):
# 判定关键字是否包含中文 如果是 则认为是关键字 进行模糊查询
for task_info in all_task_list:
if task_name in task_info['task_name']:
name_list.append(task_info)
elif task_name and not any('\u4e00' <= c <= '\u9fff' for c in task_name):
# 如果不存在中文 则认为是拼音或者首字母 进行模糊查询
for task_info in all_task_list:
if any('\u4e00' <= c <= '\u9fff' for c in task_info['task_name']):
task_name_pinyin = lazy_pinyin(task_info['task_name'], style=Style.NORMAL)
else:
task_name_pinyin = task_info['task_name']
if len(task_name) > 1 and task_name in ''.join(task_name_pinyin):
name_list.append(task_info)
continue
for i in range(len(task_name)):
if any('\u4e00' <= c <= '\u9fff' for c in task_info['task_name']) and task_name[i] == list(task_name_pinyin)[i][0] and task_info not in name_list:
name_list.append(task_info)
if task_type and len(task_type) > 0:
type_list.extend([task_info for task_info in all_task_list if int(task_type) == task_info['task_type']])
if len(type_list) == 0:
# 用于多种筛选条件取交集使用
type_list.append({'arteryids': -1})
if task_class and len(task_class) > 0:
class_list.extend([task_info for task_info in all_task_list if int(task_class) == task_info['task_class']])
if len(class_list) == 0:
# 用于多种筛选条件取交集使用
class_list.append({'arteryids': -1})
if plan_begin_time and len(plan_begin_time) > 0:
begin_list.extend([task_info for task_info in all_task_list if int(plan_begin_time) <= task_info['plan_begin_time']])
if len(begin_list) == 0:
# 用于多种筛选条件取交集使用
begin_list.append({'arteryids': -1})
if plan_end_time and len(plan_end_time) > 0:
end_list.extend([task_info for task_info in all_task_list if int(plan_end_time) >= task_info['plan_end_time']])
if len(end_list) == 0:
# 用于多种筛选条件取交集使用
end_list.append({'arteryids': -1})
if task_state and len(task_state) > 0:
state_list.extend([task_info for task_info in all_task_list if int(task_state) == task_info['task_state']])
if len(state_list) == 0:
# 用于多种筛选条件取交集使用
state_list.append({'arteryids': -1})
if executor and len(executor) > 0:
executor_list.extend([task_info for task_info in all_task_list if executor == task_info['executor']])
if len(executor_list) == 0:
# 用于多种筛选条件取交集使用
executor_list.append({'arteryids': -1})
if task_src and len(task_src) > 0:
task_src_list.extend([task_info for task_info in all_task_list if task_src == task_info['task_src']])
if len(task_src_list) == 0:
# 用于多种筛选条件取交集使用
task_src_list.append({'arteryids': -1})
non_empty_lists = [lst for lst in [name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list] if len(lst) > 0]
if non_empty_lists:
res_list = intersect_dicts(non_empty_lists)
else:
res_list = []
if task_no and len(task_no) > 0:
res_list = [task_info for task_info in all_task_list if int(task_no) == task_info['taskno']]
if not task_name and not task_type and not task_class and not plan_begin_time and not plan_end_time and not task_state and not executor and not task_src and not task_no:
res_list = all_task_list
# task_list = db_task.query_task_list(task_name,task_type,data_type, task_class,plan_begin_time,plan_end_time,publish_time,task_state,executor,task_src, nodeid)
filtered_list = [d for d in res_list if d['arteryids'] != -1]
for task_info in filtered_list:
comment = task_info['comment']
if '_split_suffix_for_query_info' in comment:
comment_list = comment.split('_split_suffix_for_query_info')
comment_list.pop()
else:
comment_list = [comment]
task_info['comment'] = comment_list
sorted_list = sorted(filtered_list, key=sort_key)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['area_id'] = area_id
res['data'] = sorted_list
res['desc'] = ''
return json.dumps(res)
def sort_key(item):
return (item['task_state'], item['progress'], item['plan_end_time'])
def make_hashable(obj):
"""将对象转换为可哈希的类型"""
if isinstance(obj, dict):
return tuple(sorted((k, make_hashable(v)) for k, v in obj.items()))
elif isinstance(obj, list):
return tuple(make_hashable(v) for v in obj)
else:
return obj
def intersect_dicts(list_of_dicts):
"""计算多个字典列表的交集"""
if not list_of_dicts:
return []
# 将第一个列表中的字典转换为可哈希的元组
intersection = set(make_hashable(d) for d in list_of_dicts[0])
# 与后续的每个列表进行交集操作
for lst in list_of_dicts[1:]:
current_set = set(make_hashable(d) for d in lst)
intersection &= current_set
# 将交集中的元组转换回字典
return [dict(t) for t in intersection]
def do_remove_task(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
tasknos = check_param(params, 'tasknos')
if not tasknos or len(tasknos) < 1:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
count = db_task.delete_task(tasknos, nodeid)
if count != len(tasknos):
logging.error(str(params) + ' do_remove_task删除失败!')
res = make_common_res(-1, '删除失败')
return json.dumps(res)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
def do_distribute_task(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
operator = check_param(params, 'operator')
if not operator:
return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试'))
task_old = db_task.query_task(taskno, nodeid, area_id)
if task_old is None:
logging.error(str(params) + ' do_distribute_task找不到对应的任务!')
return json.dumps(make_common_res(-1, '找不到对应的任务'))
#任务等级,0正常1紧急2特急
#任务状态 0待下发1待审批2进行中未审批3进行中4完成
if task_old['task_class'] == 2:
new_state = '1'
elif task_old['task_class'] == 1:
new_state = '2'
else:
new_state = '3'
count = db_task.update_task_state(nodeid, area_id, taskno, new_state)
if count!=1:
logging.error(str(params) + ' do_distribute_task更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return json.dumps(res)
basemark = {'operation': '下发任务', 'content': {}}
count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if count!=1:
logging.error(str(params) + ' do_distribute_task添加任务履历失败!')
res = make_common_res(-1, '添加任务履历失败')
return json.dumps(res)
task_old_info = db_task.query_task(taskno, nodeid, area_id)
# 20241022 补充开发下发任务的同时修改任务基本信息的功能
modify_data, modify_item = gen_update_sql(params, task_old_info)
if len(modify_data) > 0:
ret = db_task.update_task_info(taskno, modify_data, nodeid, area_id)
if ret != 1:
return json.dumps(make_common_res(3, '任务基本信息修改失败'))
basemark = {'operation': '编辑任务', 'content': {'content': modify_item}}
his_ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if his_ret != 1:
return json.dumps(make_common_res(3, '任务履历添加失败'))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
def do_approval(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
operator = check_param(params, 'operator')
if not operator:
return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试'))
#task_old = db_task.query_task(taskno, nodeid)
# 任务等级,0正常1紧急2特急
# 任务状态 0待下发1待审批2进行中未审批3进行中4完成
new_state = '3'
count = db_task.update_task_state(nodeid, area_id, taskno, new_state)
if count!=1:
logging.error(str(params) + ' do_approval更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return json.dumps(res)
basemark = {'operation': '审批任务', 'content': {}}
count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if count!=1:
logging.error(str(params) + ' do_approval添加任务履历失败!')
res = make_common_res(-1, '添加任务履历失败')
return json.dumps(res)
# 20241022 新增审批任务的同时修改任务部分基础属性的功能
task_old_info = db_task.query_task(taskno, nodeid, area_id)
modify_data, modify_item = gen_update_sql(params, task_old_info)
if len(modify_data) > 0:
ret = db_task.update_task_info(taskno, modify_data, nodeid, area_id)
if ret != 1:
return json.dumps(make_common_res(3, '任务基本信息修改失败'))
basemark = {'operation': '编辑任务', 'content': {'content': modify_item}}
his_ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if his_ret != 1:
return json.dumps(make_common_res(3, '任务履历添加失败'))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
def do_complete_task(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
operator = check_param(params, 'operator')
if not operator:
return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试'))
task_old = db_task.query_task(taskno, nodeid, area_id)
if not task_old:
return json.dumps(make_common_res(3, '找不到对应的任务'))
if task_old['task_state'] == '4':
return json.dumps(make_common_res(0, '当前任务已完成'))
# 任务等级,0正常1紧急2特急
# 任务状态 0待下发1待审批2进行中未审批3进行中4完成
new_state = '4'
count = db_task.update_task_state(nodeid, area_id, taskno, new_state)
if count!=1:
logging.error(str(params) + ' do_complete_task更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return json.dumps(res)
basemark = {'operation': '完成任务', 'content': {}}
count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if count!=1:
logging.error(str(params) + ' do_complete_task添加任务履历失败!')
res = make_common_res(-1, '添加任务履历失败')
return json.dumps(res)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
def do_add_task(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
task_type = check_param(params, 'task_type')
if not task_type:
return json.dumps(make_common_res(2, '任务类型缺失,请检查后重试'))
# if int(task_type) not in (1, 2, 3, 4, 5):
# return json.dumps(make_common_res(2, '任务类型参数异常,请检查后重试'))
task_name = check_param(params, 'task_name')
if not task_name:
return json.dumps(make_common_res(2, '任务名称缺失,请检查后重试'))
task_class = check_param(params, 'task_class') # 需求等级,0低无需审批1中可事后审批2高审批后执行
if not task_class:
return json.dumps(make_common_res(2, '任务等级缺失,请检查后重试'))
data_type = check_param(params, 'data_type')
if not data_type:
return json.dumps(make_common_res(2, '关联路网状态缺失,请检查后重试'))
crossids, arteryids, sectionids = '', '', ''
if int(data_type) == 1:
crossids = check_param(params, 'crossids')
if not crossids:
return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联路口,但关联路口信息缺失,请检查后重试'))
elif int(data_type) == 2:
arteryids = check_param(params, 'arteryids')
if not arteryids:
return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联干线,但关联干线信息缺失,请检查后重试'))
add_type = check_param(params, 'add_type')
if not add_type or add_type == 'normal':
plan_begin_time = check_param(params, 'plan_begin_time')
if not plan_begin_time:
return json.dumps(make_common_res(2, '计划开始时间缺失,请检查后重试'))
plan_end_time = check_param(params, 'plan_end_time')
if not plan_end_time:
return json.dumps(make_common_res(2, '计划结束时间缺失,请检查后重试'))
else:
plan_begin_time = 0
plan_end_time = 0
task_src = check_param(params, 'task_src')
if not task_src:
return json.dumps(make_common_res(2, '需求来源缺失,请检查后重试'))
# if task_src not in ('舆情', '交警交办任务', '现场拥堵', '平台优化策略'):
# return json.dumps(make_common_res(2, '需求来源异常,可选项为:舆情、交警交办任务、现场拥堵、平台优化策略,请检查后重试'))
executor = check_param(params, 'executor')
if not executor:
return json.dumps(make_common_res(2, '任务负责人缺失,请检查后重试'))
publish_time = check_param(params, 'publish_time')
if not publish_time:
return json.dumps(make_common_res(2, '发布时间缺失,请检查后重试'))
publish_time = int(publish_time)
timestamp = publish_time
description = check_param(params, 'description')
if not description:
return json.dumps(make_common_res(2, '任务描述缺失,请检查后重试'))
comment = check_param(params, 'comment')
if not comment:
comment = ''
operator = check_param(params, 'operator')
if not operator:
return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试'))
creatorid = operator
progress, record_task_state, task_state = 0, 0, 0
if int(task_class) == 0:
task_state = 3
elif int(task_class) == 1:
task_state = 2
elif int(task_class) == 2:
task_state = 1
if comment and add_type and add_type != 'normal':
comment = "【推荐策略】" + comment + "_split_suffix_for_query_info"
if comment and (not add_type or add_type == 'normal'):
comment = "" + datetime.now().date().strftime('%Y-%m-%d') + "" + comment + "_split_suffix_for_query_info"
if add_type and add_type != 'normal':
task_state = 0
count = db_task.add_task(timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time,
executor, progress, task_state, description, crossids, sectionids, arteryids, comment,
record_task_state, task_src, task_class, nodeid, area_id)
if count != 1:
logging.error(str(params) + ' 添加任务报错!')
res = make_common_res(-1, '添加任务报错。')
return json.dumps(res)
basemark = {'operation': '创建任务', 'content': {}}
taskno = db_task.get_task_no(nodeid, area_id, task_name, task_type, task_class, data_type, plan_begin_time, plan_end_time, executor, task_src, comment)
if taskno is None:
logging.error(str(params) + ' do_add_task更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return json.dumps(res)
if add_type and add_type != 'normal':
basemark = {'operation': '诊断问题确认', 'content': {}}
count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if count != 1:
logging.error(str(params) + ' do_add_task添加任务履历失败!')
res = make_common_res(-1, '添加任务履历失败')
return json.dumps(res)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
res['taskno'] = taskno
return json.dumps(res)
#获取所有具有'已完成'任务的路口
#params
#nodeid
def do_query_completed_task_cross_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
task_list = db_task.query_completed_task_cross_list(nodeid, area_id)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['data'] = task_list
res['desc'] = ''
return json.dumps(res)
#获取指定路口的所有的'已完成'任务
#params
#nodeid
#crossid
def do_query_completed_task_by_cross(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(2, '缺少crossid 请刷新后重试'))
task_list = db_task.query_completed_task_by_cross(crossid, nodeid, area_id)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['data'] = task_list
res['desc'] = ''
return json.dumps(res)
def do_update_task_progress_field(task_old, params, filed_name, filed_desc, taskno):
field = params.get(filed_name)
basemarkinfo = ''
if field is None:
return basemarkinfo
old_field = task_old[filed_name]
if type(task_old[filed_name]) is int and type(field) is str:
old_field = str(old_field)
if field != old_field:
count = db_task.do_update('`task`', filed_name, field, 'taskno', taskno)
if count != 1:
logging.error(str(params) + ' do_update_task_progress_field更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return basemarkinfo
if filed_name == 'progress' and old_field != field:
basemarkinfo = '【编辑】任务进度:%s%%->%s%%' %(old_field, field)
return basemarkinfo
# 无调用
def do_update_task_field(task_old, params, filed_name, filed_desc, taskno):
field = params.get(filed_name)
basemarkinfo = ''
if field is None:
return basemarkinfo
old_field = task_old[filed_name]
if type(task_old[filed_name]) is int and type(field) is str:
old_field = str(old_field)
if field != old_field:
basemarkinfo = '%s ' %(filed_desc)
count = db_task.do_update('`task`', filed_name, field, 'taskno', taskno)
if count != 1:
logging.error(str(params) + ' do_update_task_progress_field更新任务状态失败!')
res = make_common_res(-1, '更新任务状态失败')
return json.dumps(res)
return basemarkinfo
def do_update_task(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
operator = check_param(params, 'operator')
if not operator:
return json.dumps(make_common_res(3, '操作者信息缺失,请检查后重试'))
task_old = db_task.query_task(taskno, nodeid, area_id)
if task_old is None:
logging.error(str(params) + ' do_update_task 任务在数据库中不存在!')
res = make_common_res(-1, '任务在数据库中不存在。')
return json.dumps(res)
task_old_info = db_task.query_task(taskno, nodeid, area_id)
update_sql, basemarkinfo = gen_update_sql(params, task_old_info)
# print(update_sql, basemarkinfo)
if len(update_sql) > 0:
ret = db_task.update_task_info(taskno, update_sql, nodeid, area_id)
if ret != 1:
return json.dumps(make_common_res(3, '任务基本信息修改失败'))
progress_info = do_update_task_progress_field(task_old, params, 'progress', '任务进度', taskno)
if len(progress_info) > 0:
basemark = {
'operation': '编辑任务',
'content': {
'progress_info': progress_info,
'content': basemarkinfo
}
}
else:
basemark = {
'operation': '编辑任务',
'content': {
'content': basemarkinfo
}
}
if (len(basemarkinfo) > 0 and basemarkinfo != '') or len(progress_info) > 0:
ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False))
if ret != 1:
return json.dumps(make_common_res(3, '任务履历信息添加失败'))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
def allowed_file_for_task(filename):
# 检查文件名是否为空,并且文件扩展名是否在允许的列表中
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSION_TASK
def do_task_upload(params):
nodeid = check_param({'nodeid': request.form.get('nodeid')}, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(3, "nodeid检查报错"))
taskno = check_param({'taskno': request.form.get('taskno')}, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id异常请检查后重试'))
if len(request.files.keys()) < 1:
return json.dumps(make_common_res(1, '文件不存在'))
suc_file_num, suc_mysql_num = 0, 0
for key in request.files.keys():
file = request.files[key]
try:
if not (file and file.filename and allowed_file_for_task(file.filename)):
return json.dumps(make_common_res(1, '文件格式错误'))
if not os.path.exists(f'./app/static/task_file/{nodeid}/task_no_{taskno}'):
os.makedirs(f'./app/static/task_file/{nodeid}/task_no_{taskno}')
os.chmod(f'./app/static/task_file/{nodeid}/task_no_{taskno}', 0o777)
file.save(f'./app/static/task_file/{nodeid}/task_no_{taskno}/{file.filename}')
suc_file_num += 1
download_url = f'/api/download_task_file?nodeid={nodeid}&taskno={taskno}&file_name={file.filename}'
ret = db_task.insert_upload_file_record(taskno, nodeid, download_url)
if ret == 1:
suc_mysql_num += 1
else:
break
except Exception as e:
logging.error(str(e) + '任务文件上传失败')
if os.path.exists(f'./app/static/task_file/{nodeid}/task_no_{taskno}/{file.filename}'):
os.remove(f'./app/static/task_file/{nodeid}/task_no_{taskno}/{file.filename}')
logging.info('回滚上传操作')
break
if suc_file_num == suc_mysql_num == len(request.files.keys()):
res = make_common_res(0, 'ok')
res['nodeid'] = ''
res['desc'] = ''
return json.dumps(res)
else:
return json.dumps(make_common_res(4, '任务文件上传失败'))
def do_task_file_download(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id异常请检查后重试'))
file_name = check_param(params, 'file_name')
if not file_name:
return json.dumps(make_common_res(2, '文件名异常,请检查后重试'))
current_dir_path = os.path.dirname(os.path.abspath(__file__))
file_path = f"{current_dir_path}/static/task_file/{nodeid}/task_no_{taskno}/{file_name}"
if not os.path.isfile(file_path):
logging.error(f"文件不存在: {file_path}")
return json.dumps(make_common_res(2, '文件不存在,请检查后重试'))
return send_file(file_path, as_attachment=True)
def do_query_task_history(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id异常请检查后重试'))
task_history_list = db_task.query_task_history(taskno, nodeid, area_id)
download_url_list = db_task.query_task_file_by_taskno(nodeid, taskno, area_id)
for task_history in task_history_list:
task_history['content'] = task_history['content'].replace('_split_suffix_for_query_info', '')
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
res['data'] = {
'task_history_list': task_history_list,
'download_url_list': download_url_list
}
return json.dumps(res)
def del_task_file_api(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id异常请检查后重试'))
file_name = check_param(params, 'file_name')
if not file_name:
return json.dumps(make_common_res(2, '文件名异常,请检查后重试'))
file_id = check_param(params, 'file_id')
if not file_id:
return json.dumps(make_common_res(2, '文件id异常请检查后重试'))
ret = db_task.del_task_file(nodeid, taskno, file_name, file_id)
if ret == 1:
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
return json.dumps(res)
else:
return json.dumps(make_common_res(4, '删除失败'))
def do_query_task_progress_history(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
# taskno = params.get('taskno')
yesterday_date = check_param(params, 'yesterday_date')
task_history_list = db_task.query_task_progress_history(yesterday_date, nodeid, area_id)
if not yesterday_date or yesterday_date == '':
return json.dumps(make_common_res(3, "查询日期缺失"))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['desc'] = ''
res['data'] = task_history_list
return json.dumps(res)
def do_query_task_detail(params):
global db_task
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not area_id.lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
res = make_common_res(0, 'ok')
taskno = check_param(params, 'taskno')
if not taskno:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
task = db_task.query_task(taskno, nodeid)
if task is None:
logging.error(str(params) + ' do_query_task_detail 任务在数据库中不存在!')
res = make_common_res(-1, '任务在数据库中不存在。')
return json.dumps(res)
res['desc'] = ''
res['data'] = task
return json.dumps(res)
def gen_update_sql(params, task_old_info):
modify_data, modify_item = '', ''
task_name = check_param(params, 'task_name')
if task_name and task_name != task_old_info['task_name']:
modify_data += f"task_name = '{task_name}'"
modify_item += f"【编辑】任务名称:{task_old_info['task_name']} -> {task_name}"
task_type = check_param(params, 'task_type')
if task_type and int(task_type) != task_old_info['task_type']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f", task_type = {int(task_type)}"
else:
modify_data += f'task_type = {int(task_type)}'
modify_item += prefix
modify_item += f"【编辑】任务类型:{task_old_info['task_type']} -> {task_type} "
task_class = check_param(params, 'task_class')
if task_class and int(task_class) != task_old_info['task_class']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f', task_class = {int(task_class)}'
else:
modify_data += f'task_class = {int(task_class)}'
modify_item += prefix
modify_item += f"【编辑】任务等级:{task_old_info['task_class']} -> {task_class} "
data_type = check_param(params, 'data_type')
if data_type and int(data_type) != task_old_info['data_type']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f', data_type = {int(data_type)}'
else:
modify_data += f'data_type = {int(data_type)}'
if int(data_type) == 1:
crossids = check_param(params, 'crossids')
if crossids and crossids != '' and crossids != task_old_info['crossids']:
modify_data += f", crossids = '{crossids}'"
elif int(data_type) == 2:
arteryids = check_param(params, 'arteryids')
if arteryids and arteryids != '' and arteryids != task_old_info['arteryids']:
modify_data += f", arteryids = '{arteryids}'"
modify_item += prefix
modify_item += f"【编辑】关联路网信息:{task_old_info['data_type']} -> {data_type}"
# plan_begin_time = check_param(params, 'plan_begin_time')
# if plan_begin_time and plan_begin_time != task_old_info['plan_begin_time']:
# if modify_data != '':
# modify_data += f', plan_begin_time = {int(plan_begin_time)}'
# else:
# modify_data += f'plan_begin_time = {int(plan_begin_time)}'
# modify_item += ' 计划开始时间 '
# plan_end_time = check_param(params, 'plan_end_time')
# if plan_end_time and plan_end_time != task_old_info['plan_end_time']:
# if modify_data != '':
# modify_data += f', plan_end_time = {int(plan_end_time)}'
# else:
# modify_data += f'plan_end_time = {int(plan_end_time)}'
# modify_item += ' 计划结束时间 '
executor = check_param(params, 'executor')
if executor and executor != task_old_info['executor']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f", executor = '{executor}'"
else:
modify_data += f"executor = '{executor}'"
modify_item += prefix
modify_item += f"【编辑】负责人:{task_old_info['executor']} -> {executor} "
task_src = check_param(params, 'task_src')
if task_src and task_src != task_old_info['task_src']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f", task_src = '{task_src}'"
else:
modify_data += f"task_src = '{task_src}'"
modify_item += prefix
modify_item += f"【编辑】任务来源:{task_old_info['task_src']} -> {task_src} "
description = check_param(params, 'description')
if description and description != task_old_info['description']:
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f", description = '{description}'"
else:
modify_data += f"description = '{description}'"
modify_item += prefix
modify_item += f"【编辑】任务描述:{task_old_info['description']} -> {description}"
comment = check_param(params, 'comment')
if comment and comment != '':
new_comment = task_old_info['comment'] + "" + datetime.now().date().strftime('%Y-%m-%d') + "" + comment + "_split_suffix_for_query_info"
prefix = ''
if modify_data != '':
prefix = 'prefix_for_split_'
modify_data += f", comment = '{new_comment}'"
else:
modify_data += f"comment = '{new_comment}'"
modify_item += prefix
modify_item += f"【添加】任务备注:{comment}"
return modify_data, modify_item