# -*- coding: utf-8 -*- # @Author: Owl # @Date: 2025/11/10 17:56 # @Description: import json from app.global_source import db_user, db_task from flask import request, send_file from pypinyin import lazy_pinyin, Style from app.common_worker import * ALLOWED_EXTENSION_TASK = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'doc', 'docx', 'xlsx', 'xls', 'pptx', 'ppt'} def do_query_task_list_parameter(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) res = make_common_res(0, 'ok') res['data'] = {} #任务类型 task_types = db_task.query_task_type(nodeid, area_id) task_types_res = {} for task_type in task_types: task_types_res[task_type['task_type_no']] = task_type['task_type'] res['data']['task_type'] = task_types_res # 路网(数据)类型 res['data']['data_type'] = {0: '不关联', 1: '路口任务', 2: '干线协调任务'} #任务等级 0低(无需审批),1中(可事后审批),2高(审批后执行) res['data']['task_class'] = {0: '低(无需审批)', 1: '中(可事后审批)', 2: '高(审批后执行)'} #任务状态 res['data']['task_state'] = {0:'待下发', 1: '待审批', 2: '进行中未审批', 3: '进行中', 4: '完成'} #负责人 executors = db_task.query_task_executor(nodeid, area_id) if len(executors)<=0: logging.error(' query_task_executor没有数据!') executor_list = [] for executor in executors: executor_list.append(executor['user_name']) res['data']['executor'] = executor_list #需求来源 srcs = db_task.query_task_src(nodeid, area_id) if len(srcs)<=0: logging.error(' query_task_src没有数据!') src_list = [] for src in srcs: src_list.append(src['src']) res['data']['task_src'] = src_list return json.dumps(res) def do_query_task_list(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) #任务名称`task_name` varchar(300) DEFAULT NULL COMMENT '任务名称', #任务类型`task_type` varchar(300) DEFAULT NULL COMMENT '任务类型', #任务等级`task_class` varchar(300) DEFAULT NULL COMMENT '需求等级'; #计划开始时间`plan_begin_time` bigint DEFAULT NULL COMMENT '计划开始时间', #计划结束时间`plan_end_time` bigint DEFAULT NULL COMMENT '计划结束时间', #任务状态`task_state` int NOT NULL COMMENT '任务状态 0未开始,1进行中,2完成,3挂起,4废止', #负责人`executor` varchar(300) DEFAULT NULL COMMENT '负责人', #需求来源 `task_src` varchar(300) DEFAULT NULL COMMENT '需求来源'; all_task_list = db_task.query_all_tak_list(nodeid, area_id) upload_file_dict = db_task.query_task_file(nodeid, area_id) for task_info in all_task_list: if task_info['taskno'] in upload_file_dict.keys(): task_info['download_url_list'] = upload_file_dict[task_info['taskno']] else: task_info['download_url_list'] = [] name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list = [], [], [], [], [], [], [], [] task_name = params.get('task_name') task_type = params.get('task_type') task_class = params.get('task_class') plan_begin_time = params.get('plan_begin_time') plan_end_time = params.get('plan_end_time') task_state = params.get('task_state') executor = params.get('executor') task_src = params.get('task_src') task_no = None if task_name.isdigit(): task_no = task_name else: if task_name and any('\u4e00' <= c <= '\u9fff' for c in task_name): # 判定关键字是否包含中文 如果是 则认为是关键字 进行模糊查询 for task_info in all_task_list: if task_name in task_info['task_name']: name_list.append(task_info) elif task_name and not any('\u4e00' <= c <= '\u9fff' for c in task_name): # 如果不存在中文 则认为是拼音或者首字母 进行模糊查询 for task_info in all_task_list: if any('\u4e00' <= c <= '\u9fff' for c in task_info['task_name']): task_name_pinyin = lazy_pinyin(task_info['task_name'], style=Style.NORMAL) else: task_name_pinyin = task_info['task_name'] if len(task_name) > 1 and task_name in ''.join(task_name_pinyin): name_list.append(task_info) continue for i in range(len(task_name)): if any('\u4e00' <= c <= '\u9fff' for c in task_info['task_name']) and task_name[i] == list(task_name_pinyin)[i][0] and task_info not in name_list: name_list.append(task_info) if task_type and len(task_type) > 0: type_list.extend([task_info for task_info in all_task_list if int(task_type) == task_info['task_type']]) if len(type_list) == 0: # 用于多种筛选条件取交集使用 type_list.append({'arteryids': -1}) if task_class and len(task_class) > 0: class_list.extend([task_info for task_info in all_task_list if int(task_class) == task_info['task_class']]) if len(class_list) == 0: # 用于多种筛选条件取交集使用 class_list.append({'arteryids': -1}) if plan_begin_time and len(plan_begin_time) > 0: begin_list.extend([task_info for task_info in all_task_list if int(plan_begin_time) <= task_info['plan_begin_time']]) if len(begin_list) == 0: # 用于多种筛选条件取交集使用 begin_list.append({'arteryids': -1}) if plan_end_time and len(plan_end_time) > 0: end_list.extend([task_info for task_info in all_task_list if int(plan_end_time) >= task_info['plan_end_time']]) if len(end_list) == 0: # 用于多种筛选条件取交集使用 end_list.append({'arteryids': -1}) if task_state and len(task_state) > 0: state_list.extend([task_info for task_info in all_task_list if int(task_state) == task_info['task_state']]) if len(state_list) == 0: # 用于多种筛选条件取交集使用 state_list.append({'arteryids': -1}) if executor and len(executor) > 0: executor_list.extend([task_info for task_info in all_task_list if executor == task_info['executor']]) if len(executor_list) == 0: # 用于多种筛选条件取交集使用 executor_list.append({'arteryids': -1}) if task_src and len(task_src) > 0: task_src_list.extend([task_info for task_info in all_task_list if task_src == task_info['task_src']]) if len(task_src_list) == 0: # 用于多种筛选条件取交集使用 task_src_list.append({'arteryids': -1}) non_empty_lists = [lst for lst in [name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list] if len(lst) > 0] if non_empty_lists: res_list = intersect_dicts(non_empty_lists) else: res_list = [] if task_no and len(task_no) > 0: res_list = [task_info for task_info in all_task_list if int(task_no) == task_info['taskno']] if not task_name and not task_type and not task_class and not plan_begin_time and not plan_end_time and not task_state and not executor and not task_src and not task_no: res_list = all_task_list # task_list = db_task.query_task_list(task_name,task_type,data_type, task_class,plan_begin_time,plan_end_time,publish_time,task_state,executor,task_src, nodeid) filtered_list = [d for d in res_list if d['arteryids'] != -1] for task_info in filtered_list: comment = task_info['comment'] if '_split_suffix_for_query_info' in comment: comment_list = comment.split('_split_suffix_for_query_info') comment_list.pop() else: comment_list = [comment] task_info['comment'] = comment_list sorted_list = sorted(filtered_list, key=sort_key) res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['area_id'] = area_id res['data'] = sorted_list res['desc'] = '' return json.dumps(res) def sort_key(item): return (item['task_state'], item['progress'], -item['plan_end_time']) def make_hashable(obj): """将对象转换为可哈希的类型""" if isinstance(obj, dict): return tuple(sorted((k, make_hashable(v)) for k, v in obj.items())) elif isinstance(obj, list): return tuple(make_hashable(v) for v in obj) else: return obj def intersect_dicts(list_of_dicts): """计算多个字典列表的交集""" if not list_of_dicts: return [] # 将第一个列表中的字典转换为可哈希的元组 intersection = set(make_hashable(d) for d in list_of_dicts[0]) # 与后续的每个列表进行交集操作 for lst in list_of_dicts[1:]: current_set = set(make_hashable(d) for d in lst) intersection &= current_set # 将交集中的元组转换回字典 return [dict(t) for t in intersection] def do_remove_task(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) tasknos = check_param(params, 'tasknos') if not tasknos or len(tasknos) < 1: return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) count = db_task.delete_task(tasknos, nodeid, area_id) if count != len(tasknos): logging.error(str(params) + ' do_remove_task删除失败!') res = make_common_res(-1, '删除失败') return json.dumps(res) res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['desc'] = '' return json.dumps(res) def do_distribute_task(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) taskno = check_param(params, 'taskno') if not taskno: return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) operator = check_param(params, 'operator') if not operator: return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试')) task_old = db_task.query_task(taskno, nodeid, area_id) if task_old is None: logging.error(str(params) + ' do_distribute_task找不到对应的任务!') return json.dumps(make_common_res(-1, '找不到对应的任务')) #任务等级,0正常,1紧急,2特急 #任务状态 0待下发,1待审批,2进行中未审批,3进行中,4完成 if task_old['task_class'] == 2: new_state = '1' elif task_old['task_class'] == 1: new_state = '2' else: new_state = '3' count = db_task.update_task_state(nodeid, area_id, taskno, new_state) if count!=1: logging.error(str(params) + ' do_distribute_task更新任务状态失败!') res = make_common_res(-1, '更新任务状态失败') return json.dumps(res) basemark = {'operation': '下发任务', 'content': {}} count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) if count!=1: logging.error(str(params) + ' do_distribute_task添加任务履历失败!') res = make_common_res(-1, '添加任务履历失败') return json.dumps(res) task_old_info = db_task.query_task(taskno, nodeid, area_id) # 20241022 补充开发下发任务的同时修改任务基本信息的功能 modify_data, modify_item = gen_update_sql(params, task_old_info) if len(modify_data) > 0: ret = db_task.update_task_info(taskno, modify_data, nodeid, area_id) if ret != 1: return json.dumps(make_common_res(3, '任务基本信息修改失败')) basemark = {'operation': '编辑任务', 'content': {'content': modify_item}} his_ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) if his_ret != 1: return json.dumps(make_common_res(3, '任务履历添加失败')) res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['desc'] = '' return json.dumps(res) def do_approval(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) taskno = check_param(params, 'taskno') if not taskno: return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) operator = check_param(params, 'operator') if not operator: return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试')) #task_old = db_task.query_task(taskno, nodeid) # 任务等级,0正常,1紧急,2特急 # 任务状态 0待下发,1待审批,2进行中未审批,3进行中,4完成 new_state = '3' count = db_task.update_task_state(nodeid, area_id, taskno, new_state) if count!=1: logging.error(str(params) + ' do_approval更新任务状态失败!') res = make_common_res(-1, '更新任务状态失败') return json.dumps(res) basemark = {'operation': '审批任务', 'content': {}} count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) if count!=1: logging.error(str(params) + ' do_approval添加任务履历失败!') res = make_common_res(-1, '添加任务履历失败') return json.dumps(res) # 20241022 新增审批任务的同时修改任务部分基础属性的功能 task_old_info = db_task.query_task(taskno, nodeid, area_id) modify_data, modify_item = gen_update_sql(params, task_old_info) if len(modify_data) > 0: ret = db_task.update_task_info(taskno, modify_data, nodeid, area_id) if ret != 1: return json.dumps(make_common_res(3, '任务基本信息修改失败')) basemark = {'operation': '编辑任务', 'content': {'content': modify_item}} his_ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) if his_ret != 1: return json.dumps(make_common_res(3, '任务履历添加失败')) res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['desc'] = '' return json.dumps(res) def do_complete_task(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) taskno = check_param(params, 'taskno') if not taskno: return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) operator = check_param(params, 'operator') if not operator: return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试')) task_old = db_task.query_task(taskno, nodeid, area_id) if not task_old: return json.dumps(make_common_res(3, '找不到对应的任务')) if task_old['task_state'] == '4': return json.dumps(make_common_res(0, '当前任务已完成')) # 任务等级,0正常,1紧急,2特急 # 任务状态 0待下发,1待审批,2进行中未审批,3进行中,4完成 new_state = '4' count = db_task.update_task_state(nodeid, area_id, taskno, new_state) if count!=1: logging.error(str(params) + ' do_complete_task更新任务状态失败!') res = make_common_res(-1, '更新任务状态失败') return json.dumps(res) basemark = {'operation': '完成任务', 'content': {}} count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) if count!=1: logging.error(str(params) + ' do_complete_task添加任务履历失败!') res = make_common_res(-1, '添加任务履历失败') return json.dumps(res) res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['desc'] = '' return json.dumps(res) def do_add_task(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) task_type = check_param(params, 'task_type') if not task_type: return json.dumps(make_common_res(2, '任务类型缺失,请检查后重试')) # if int(task_type) not in (1, 2, 3, 4, 5): # return json.dumps(make_common_res(2, '任务类型参数异常,请检查后重试')) task_name = check_param(params, 'task_name') if not task_name: return json.dumps(make_common_res(2, '任务名称缺失,请检查后重试')) task_class = check_param(params, 'task_class') # 需求等级,0低(无需审批),1中(可事后审批),2高(审批后执行) if not task_class: return json.dumps(make_common_res(2, '任务等级缺失,请检查后重试')) data_type = check_param(params, 'data_type') if not data_type: return json.dumps(make_common_res(2, '关联路网状态缺失,请检查后重试')) crossids, arteryids, sectionids = '', '', '' if int(data_type) == 1: crossids = check_param(params, 'crossids') if not crossids: return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联路口,但关联路口信息缺失,请检查后重试')) elif int(data_type) == 2: arteryids = check_param(params, 'arteryids') if not arteryids: return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联干线,但关联干线信息缺失,请检查后重试')) add_type = check_param(params, 'add_type') if not add_type or add_type == 'normal': plan_begin_time = check_param(params, 'plan_begin_time') if not plan_begin_time: return json.dumps(make_common_res(2, '计划开始时间缺失,请检查后重试')) plan_end_time = check_param(params, 'plan_end_time') if not plan_end_time: return json.dumps(make_common_res(2, '计划结束时间缺失,请检查后重试')) else: plan_begin_time = 0 plan_end_time = 0 task_src = check_param(params, 'task_src') if not task_src: return json.dumps(make_common_res(2, '需求来源缺失,请检查后重试')) # if task_src not in ('舆情', '交警交办任务', '现场拥堵', '平台优化策略'): # return json.dumps(make_common_res(2, '需求来源异常,可选项为:舆情、交警交办任务、现场拥堵、平台优化策略,请检查后重试')) executor = check_param(params, 'executor') if not executor: return json.dumps(make_common_res(2, '任务负责人缺失,请检查后重试')) publish_time = check_param(params, 'publish_time') if not publish_time: return json.dumps(make_common_res(2, '发布时间缺失,请检查后重试')) publish_time = int(publish_time) timestamp = publish_time description = check_param(params, 'description') if not description: return json.dumps(make_common_res(2, '任务描述缺失,请检查后重试')) comment = check_param(params, 'comment') if not comment: comment = '' operator = check_param(params, 'operator') if not operator: return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试')) creatorid = operator progress, record_task_state, task_state = 0, 0, 0 if int(task_class) == 0: task_state = 3 elif int(task_class) == 1: task_state = 2 elif int(task_class) == 2: task_state = 1 if comment and add_type and add_type != 'normal': comment = "【推荐策略】" + comment + "_split_suffix_for_query_info" if comment and (not add_type or add_type == 'normal'): comment = "【" + datetime.now().date().strftime('%Y-%m-%d') + "】" + comment + "_split_suffix_for_query_info" if add_type and add_type != 'normal': task_state = 0 count = db_task.add_task(timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time, executor, progress, task_state, description, crossids, sectionids, arteryids, comment, record_task_state, task_src, task_class, nodeid, area_id) if count != 1: logging.error(str(params) + ' 添加任务报错!') res = make_common_res(-1, '添加任务报错。') return json.dumps(res) basemark = {'operation': '创建任务', 'content': {}} taskno = db_task.get_task_no(nodeid, area_id, task_name, task_type, task_class, data_type, plan_begin_time, plan_end_time, executor, task_src, comment) if taskno is None: logging.error(str(params) + ' do_add_task更新任务状态失败!') res = make_common_res(-1, '更新任务状态失败') return json.dumps(res) if add_type and add_type != 'normal': basemark = {'operation': '诊断问题确认', 'content': {}} count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) if count != 1: logging.error(str(params) + ' do_add_task添加任务履历失败!') res = make_common_res(-1, '添加任务履历失败') return json.dumps(res) res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['desc'] = '' res['taskno'] = taskno return json.dumps(res) #获取所有具有'已完成'任务的路口 #params #nodeid def do_query_completed_task_cross_list(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) task_list = db_task.query_completed_task_cross_list(nodeid, area_id) res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['data'] = task_list res['desc'] = '' return json.dumps(res) #获取指定路口的所有的'已完成'任务 #params #nodeid #crossid def do_query_completed_task_by_cross(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) crossid = check_param(params, 'crossid') if not crossid: return json.dumps(make_common_res(2, '缺少crossid, 请刷新后重试')) task_list = db_task.query_completed_task_by_cross(crossid, nodeid, area_id) res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['data'] = task_list res['desc'] = '' return json.dumps(res) def do_update_task_progress_field(task_old, params, filed_name, filed_desc, taskno): field = params.get(filed_name) basemarkinfo = '' if field is None: return basemarkinfo old_field = task_old[filed_name] if type(task_old[filed_name]) is int and type(field) is str: old_field = str(old_field) if field != old_field: count = db_task.do_update('`task`', filed_name, field, 'taskno', taskno) if count != 1: logging.error(str(params) + ' do_update_task_progress_field更新任务状态失败!') res = make_common_res(-1, '更新任务状态失败') return basemarkinfo if filed_name == 'progress' and old_field != field: basemarkinfo = '【编辑】任务进度:%s%%->%s%%' %(old_field, field) return basemarkinfo # 无调用 def do_update_task_field(task_old, params, filed_name, filed_desc, taskno): field = params.get(filed_name) basemarkinfo = '' if field is None: return basemarkinfo old_field = task_old[filed_name] if type(task_old[filed_name]) is int and type(field) is str: old_field = str(old_field) if field != old_field: basemarkinfo = '%s ' %(filed_desc) count = db_task.do_update('`task`', filed_name, field, 'taskno', taskno) if count != 1: logging.error(str(params) + ' do_update_task_progress_field更新任务状态失败!') res = make_common_res(-1, '更新任务状态失败') return json.dumps(res) return basemarkinfo def do_update_task(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) taskno = check_param(params, 'taskno') if not taskno: return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) operator = check_param(params, 'operator') if not operator: return json.dumps(make_common_res(3, '操作者信息缺失,请检查后重试')) task_old = db_task.query_task(taskno, nodeid, area_id) if task_old is None: logging.error(str(params) + ' do_update_task 任务在数据库中不存在!') res = make_common_res(-1, '任务在数据库中不存在。') return json.dumps(res) task_old_info = db_task.query_task(taskno, nodeid, area_id) update_sql, basemarkinfo = gen_update_sql(params, task_old_info) # print(update_sql, basemarkinfo) if len(update_sql) > 0: ret = db_task.update_task_info(taskno, update_sql, nodeid, area_id) if ret != 1: return json.dumps(make_common_res(3, '任务基本信息修改失败')) progress_info = do_update_task_progress_field(task_old, params, 'progress', '任务进度', taskno) if len(progress_info) > 0: basemark = { 'operation': '编辑任务', 'content': { 'progress_info': progress_info, 'content': basemarkinfo } } else: basemark = { 'operation': '编辑任务', 'content': { 'content': basemarkinfo } } if (len(basemarkinfo) > 0 and basemarkinfo != '') or len(progress_info) > 0: ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) if ret != 1: return json.dumps(make_common_res(3, '任务履历信息添加失败')) res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['desc'] = '' return json.dumps(res) def allowed_file_for_task(filename): # 检查文件名是否为空,并且文件扩展名是否在允许的列表中 return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSION_TASK def do_task_upload(params): nodeid = check_param({'nodeid': request.form.get('nodeid')}, 'nodeid') if not nodeid: return json.dumps(make_common_res(3, "nodeid检查报错")) area_id = check_param({'area_id': request.form.get('area_id')}, 'area_id') if not area_id: return json.dumps(make_common_res(3, "area_id检查报错")) taskno = check_param({'taskno': request.form.get('taskno')}, 'taskno') if not taskno: return json.dumps(make_common_res(2, '任务id异常,请检查后重试')) userid = check_param({'userid': request.form.get('userid')}, 'userid') if not userid: return json.dumps(make_common_res(4, '用户id缺失,请检查后重试')) if len(request.files.keys()) < 1: return json.dumps(make_common_res(1, '文件不存在')) suc_file_num, suc_mysql_num = 0, 0 for key in request.files.keys(): file = request.files[key] try: if not (file and file.filename and allowed_file_for_task(file.filename)): return json.dumps(make_common_res(1, '文件格式错误')) if not os.path.exists(f'./app/static/task_file/{nodeid}/task_no_{taskno}'): os.makedirs(f'./app/static/task_file/{nodeid}/task_no_{taskno}') os.chmod(f'./app/static/task_file/{nodeid}/task_no_{taskno}', 0o777) file.save(f'./app/static/task_file/{nodeid}/task_no_{taskno}/{file.filename}') suc_file_num += 1 download_url = f'/api/download_task_file?nodeid={nodeid}&taskno={taskno}&file_name={file.filename}&area_id={area_id}&userid={userid}' ret = db_task.insert_upload_file_record(taskno, nodeid, download_url, area_id) if ret == 1: suc_mysql_num += 1 else: break except Exception as e: logging.error(str(e) + '任务文件上传失败') if os.path.exists(f'./app/static/task_file/{nodeid}/task_no_{taskno}/{file.filename}'): os.remove(f'./app/static/task_file/{nodeid}/task_no_{taskno}/{file.filename}') logging.info('回滚上传操作') break if suc_file_num == suc_mysql_num == len(request.files.keys()): res = make_common_res(0, 'ok') res['nodeid'] = '' res['desc'] = '' return json.dumps(res) else: return json.dumps(make_common_res(4, '任务文件上传失败')) def do_task_file_download(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) taskno = check_param(params, 'taskno') if not taskno: return json.dumps(make_common_res(2, '任务id异常,请检查后重试')) file_name = check_param(params, 'file_name') if not file_name: return json.dumps(make_common_res(2, '文件名异常,请检查后重试')) current_dir_path = os.path.dirname(os.path.abspath(__file__)) file_path = f"{current_dir_path}/static/task_file/{nodeid}/task_no_{taskno}/{file_name}" if not os.path.isfile(file_path): logging.error(f"文件不存在: {file_path}") return json.dumps(make_common_res(2, '文件不存在,请检查后重试')) return send_file(file_path, as_attachment=True) def do_query_task_history(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) taskno = check_param(params, 'taskno') if not taskno: return json.dumps(make_common_res(2, '任务id异常,请检查后重试')) task_history_list = db_task.query_task_history(taskno, nodeid, area_id) download_url_list = db_task.query_task_file_by_taskno(nodeid, taskno, area_id) for task_history in task_history_list: task_history['content'] = task_history['content'].replace('_split_suffix_for_query_info', '') res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['desc'] = '' res['data'] = { 'task_history_list': task_history_list, 'download_url_list': download_url_list } return json.dumps(res) def del_task_file_api(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) taskno = check_param(params, 'taskno') if not taskno: return json.dumps(make_common_res(2, '任务id异常,请检查后重试')) file_name = check_param(params, 'file_name') if not file_name: return json.dumps(make_common_res(2, '文件名异常,请检查后重试')) file_id = check_param(params, 'file_id') if not file_id: return json.dumps(make_common_res(2, '文件id异常,请检查后重试')) ret = db_task.del_task_file(nodeid, taskno, file_name, file_id) if ret == 1: res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['desc'] = '' return json.dumps(res) else: return json.dumps(make_common_res(4, '删除失败')) def do_query_task_progress_history(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) # taskno = params.get('taskno') yesterday_date = check_param(params, 'yesterday_date') task_history_list = db_task.query_task_progress_history(yesterday_date, nodeid, area_id) if not yesterday_date or yesterday_date == '': return json.dumps(make_common_res(3, "查询日期缺失")) res = make_common_res(0, 'ok') res['nodeid'] = nodeid res['desc'] = '' res['data'] = task_history_list return json.dumps(res) def do_query_task_detail(params): global db_task nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) res = make_common_res(0, 'ok') taskno = check_param(params, 'taskno') if not taskno: return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) task = db_task.query_task(taskno, nodeid) if task is None: logging.error(str(params) + ' do_query_task_detail 任务在数据库中不存在!') res = make_common_res(-1, '任务在数据库中不存在。') return json.dumps(res) res['desc'] = '' res['data'] = task return json.dumps(res) def gen_update_sql(params, task_old_info): modify_data, modify_item = '', '' task_name = check_param(params, 'task_name') if task_name and task_name != task_old_info['task_name']: modify_data += f"task_name = '{task_name}'" modify_item += f"【编辑】任务名称:{task_old_info['task_name']} -> {task_name}" task_type = check_param(params, 'task_type') if task_type and int(task_type) != task_old_info['task_type']: prefix = '' if modify_data != '': prefix = 'prefix_for_split_' modify_data += f", task_type = {int(task_type)}" else: modify_data += f'task_type = {int(task_type)}' modify_item += prefix modify_item += f"【编辑】任务类型:{task_old_info['task_type']} -> {task_type} " task_class = check_param(params, 'task_class') if task_class and int(task_class) != task_old_info['task_class']: prefix = '' if modify_data != '': prefix = 'prefix_for_split_' modify_data += f', task_class = {int(task_class)}' else: modify_data += f'task_class = {int(task_class)}' modify_item += prefix modify_item += f"【编辑】任务等级:{task_old_info['task_class']} -> {task_class} " data_type = check_param(params, 'data_type') if data_type and int(data_type) != task_old_info['data_type']: prefix = '' if modify_data != '': prefix = 'prefix_for_split_' modify_data += f', data_type = {int(data_type)}' else: modify_data += f'data_type = {int(data_type)}' if int(data_type) == 1: crossids = check_param(params, 'crossids') if crossids and crossids != '' and crossids != task_old_info['crossids']: modify_data += f", crossids = '{crossids}'" elif int(data_type) == 2: arteryids = check_param(params, 'arteryids') if arteryids and arteryids != '' and arteryids != task_old_info['arteryids']: modify_data += f", arteryids = '{arteryids}'" modify_item += prefix modify_item += f"【编辑】关联路网信息:{task_old_info['data_type']} -> {data_type}" # plan_begin_time = check_param(params, 'plan_begin_time') # if plan_begin_time and plan_begin_time != task_old_info['plan_begin_time']: # if modify_data != '': # modify_data += f', plan_begin_time = {int(plan_begin_time)}' # else: # modify_data += f'plan_begin_time = {int(plan_begin_time)}' # modify_item += ' 计划开始时间 ' # plan_end_time = check_param(params, 'plan_end_time') # if plan_end_time and plan_end_time != task_old_info['plan_end_time']: # if modify_data != '': # modify_data += f', plan_end_time = {int(plan_end_time)}' # else: # modify_data += f'plan_end_time = {int(plan_end_time)}' # modify_item += ' 计划结束时间 ' executor = check_param(params, 'executor') if executor and executor != task_old_info['executor']: prefix = '' if modify_data != '': prefix = 'prefix_for_split_' modify_data += f", executor = '{executor}'" else: modify_data += f"executor = '{executor}'" modify_item += prefix modify_item += f"【编辑】负责人:{task_old_info['executor']} -> {executor} " task_src = check_param(params, 'task_src') if task_src and task_src != task_old_info['task_src']: prefix = '' if modify_data != '': prefix = 'prefix_for_split_' modify_data += f", task_src = '{task_src}'" else: modify_data += f"task_src = '{task_src}'" modify_item += prefix modify_item += f"【编辑】任务来源:{task_old_info['task_src']} -> {task_src} " description = check_param(params, 'description') if description and description != task_old_info['description']: prefix = '' if modify_data != '': prefix = 'prefix_for_split_' modify_data += f", description = '{description}'" else: modify_data += f"description = '{description}'" modify_item += prefix modify_item += f"【编辑】任务描述:{task_old_info['description']} -> {description}" comment = check_param(params, 'comment') if comment and comment != '': new_comment = task_old_info['comment'] + "【" + datetime.now().date().strftime('%Y-%m-%d') + "】" + comment + "_split_suffix_for_query_info" prefix = '' if modify_data != '': prefix = 'prefix_for_split_' modify_data += f", comment = '{new_comment}'" else: modify_data += f"comment = '{new_comment}'" modify_item += prefix modify_item += f"【添加】任务备注:{comment}" return modify_data, modify_item