diff --git a/app/cross_eva_views.py b/app/cross_eva_views.py index e87c9ce..2dba2c4 100644 --- a/app/cross_eva_views.py +++ b/app/cross_eva_views.py @@ -7,15 +7,37 @@ from apscheduler.schedulers.background import BackgroundScheduler from flask import Flask, request, jsonify, redirect from flask_cors import CORS from flask_caching import Cache +from werkzeug.exceptions import UnsupportedMediaType from app.cross_evaluate_worker import * from app.phasetable_worker import phase_cross_list +from app.views_task import * app = Flask(__name__) cache = Cache(app, config={'CACHE_TYPE': 'simple'}) CORS(app, resources={r"/api/*": {"origins": "*"}}) cache_keys = ['userid'] +# @app.before_request +# def middleware_manage(): +# nodeid = request.args.get('nodeid') +# area_id = request.args.get('area_id') +# try: +# if not nodeid: +# nodeid = request.json.get('nodeid') +# if not area_id: +# area_id = request.json.get('area_id') +# except UnsupportedMediaType: +# if not nodeid: +# nodeid = request.form.get('nodeid') +# if not area_id: +# area_id = request.form.get('area_id') +# if (not nodeid or nodeid not in g_config['nodeid_list']) or (not area_id or area_id not in g_config['area_id_list']): +# return json.dumps(make_common_res(400, '辖区id异常'), ensure_ascii=False), 200 +# +# g.nodeid = nodeid +# g.area_id = area_id + @app.route('/', methods=['GET']) def server_info(): diff --git a/app/models.py b/app/models.py index 3abe0e7..21b3254 100644 --- a/app/models.py +++ b/app/models.py @@ -6,7 +6,7 @@ from app.comm import * from matplotlib.path import Path # 车信箭头标记到转向集合的映射关系 -g_turn2str = {'1': 's', '2': 'l', '3': 'r', '4': 'ls', '5': 'lr', '6': 't', '7': 'st', '8': 'lt', '9': 'lr', +g_turn2str = {'1': 's', '2': 'l', '3': 'r', '4': 'ls', '5': 'sr', '6': 't', '7': 'st', '8': 'lt', '9': 'lr', '10': 'lst', '11': '-', '12': 'lsr', '13': 'rt', '14': 'lrt', '15': 'srt', '16': 'lsrt', '17': 'bus', '19': 'reversible'} g_turnflag2type = {'s': 0, 'l': 1, 'r': 2, 't': 3} diff --git a/app/task_db_func.py b/app/task_db_func.py new file mode 100644 index 0000000..dfdb6dc --- /dev/null +++ b/app/task_db_func.py @@ -0,0 +1,258 @@ +# -*- coding: utf-8 -*- +# @Author: Owl +# @Date: 2025/11/10 17:59 +# @Description: +# -*- coding:utf-8 -*- +#import logging + +import pymysql +import pymysql.cursors +from datetime import datetime + +from flask import g +from app.db_func_base import * + +class TaskDbHelper(TableDbHelperBase): + + def __init__(self, pool): + self.db_pool = pool + self.DB_Name = 'task' + + + + def query_task(self, taskno, nodeid, area_id): + sql_query = "select * from `task` where nodeid='%s' and taskno='%s' and area_id = %s" % (nodeid, taskno, area_id) + tasks = self.do_select(sql_query) + if len(tasks) != 1: + logging.error('query_ledger error! %s' % (sql_query)) + return None + return tasks[0] + + def query_task_executor(self, nodeid, area_id): + sql_query = "select user_name from user.user where userno in (select userno from user.area_user where nodeid = '%s' and area_id = %s) and department = '信号调优团队'" % (nodeid, area_id) + executors = self.do_select(sql_query) + if len(executors) <= 0: + logging.error('query_task_executor is null! %s' % (sql_query)) + return [] + return executors + + + def query_task_src(self, nodeid, area_id): + sql_query = "select distinct src from `task_src` where nodeid='%s' and area_id = %s" % (nodeid, area_id) + srcs = self.do_select(sql_query) + if len(srcs) <= 0: + logging.error('query_task_src is null! %s' % (sql_query)) + return [] + return srcs + + def query_task_type(self, nodeid, area_id): + sql_query = "select * from `task_type` where nodeid='%s' and area_id = %s" % (nodeid, area_id) + types = self.do_select(sql_query) + if len(types) <= 0: + logging.error('query_task_type is null! %s' % (sql_query)) + return [] + return types + + def query_all_tak_list(self, nodeid, area_id): + """查询路口的全部台账履历""" + sql_query = "select * from `task` where nodeid='%s' and area_id = %s and record_state=0 order by task_state asc, progress asc, plan_end_time desc" % (nodeid, area_id) + res_list = self.do_select(sql_query) + for res in res_list: + logging.info(res['update_time']) + if res['update_time'] is not None: + res['update_time'] = res['update_time'].strftime('%Y-%m-%d %H:%M:%S') + return res_list + + def query_task_list(self, task_name,task_type,data_type, task_class,plan_begin_time,plan_end_time,publish_time,task_state,executor,task_src, nodeid): + """查询路口的全部台账履历""" + sql_query = "select * from `task` where nodeid='%s' and record_state=0" % (nodeid) + + if task_name is not None and len(task_name)>0: + sql_query += " and task_name like '%%%s%%'" %(task_name) + + if task_type is not None and len(task_type)>0: + sql_query += " and task_type='%s'" %(task_type) + + if data_type is not None and len(data_type) > 0: + data_type += " and task_type='%s'" % (data_type) + + if task_class is not None and len(task_class)>0: + sql_query += " and task_class='%s'" %(task_class) + + if plan_begin_time is not None and len(plan_begin_time)>0: + sql_query += " and plan_begin_time='%s'" %(plan_begin_time) + + if plan_end_time is not None and len(plan_end_time)>0: + sql_query += " and plan_end_time='%s'" %(plan_end_time) + + if publish_time is not None and len(publish_time)>0: + sql_query += " and publish_time='%s'" %(publish_time) + + if task_state is not None and len(task_state)>0: + sql_query += " and task_state='%s'" %(task_state) + + if executor is not None and len(executor)>0: + sql_query += " and executor='%s'" %(executor) + + if task_src is not None and len(task_src) > 0: + sql_query += " and task_src='%s'" % (task_src) + res_list = self.do_select(sql_query) + if len(res_list) > 0: + for res in res_list: + logging.info(res['update_time']) + if res['update_time'] is not None: + res['update_time'] = res['update_time'].strftime('%Y-%m-%d %H:%M:%S') + return res_list + + def query_completed_task_cross_list(self, nodeid, area_id): + sql_query = "select crossid,name from tmnet.cross where crossid in (select crossids from task where task_state=4 and nodeid='%s') and nodeid='%s';" % (nodeid, nodeid) + + return self.do_select(sql_query) + + def query_completed_task_by_cross(self, crossid, nodeid, area_id): + sql_query = "select * from task where task_state=4 and crossids='%s' and nodeid='%s' and area_id = %s;" % (crossid, nodeid, area_id) + + return self.do_select(sql_query) + + def get_last_taskno(self, creatorid, task_name, nodeid): + sql_query = "select max(taskno) as taskno from task where creatorid='%s' and task_name='%s' and nodeid='%s';" % (creatorid,task_name, nodeid) + tasknos = self.do_select(sql_query) + + if len(tasknos) != 1: + logging.error('get_last_taskno error! %s' % (sql_query)) + return None + return tasknos[0]['taskno'] + + def get_task_no(self, nodeid, area_id, task_name, task_type, task_class, data_type, plan_begin_time, plan_end_time, executor, task_src, comment): + sql = f""" + select taskno from task where nodeid = '{nodeid}' and area_id = {area_id} and task_name = '{task_name}' and task_type = {task_type} + and task_class = {task_class} and data_type = {data_type} and plan_begin_time = {plan_begin_time} + and plan_end_time = {plan_end_time} and executor = '{executor}' and task_src = '{task_src}' and comment = '{comment}' + """ + res = self.do_select(sql) + if len(res) > 0: + return res[0]['taskno'] + return None + + def delete_task(self, taskno, nodeid, area_id): + taskno_str = ','.join(str(num) for num in taskno) + sql = "update `task` set record_state=1 where taskno in (%s) and nodeid='%s' and area_id = %s;" % (taskno_str, nodeid, area_id) + # 执行删除操作 + return self.do_execute(sql) + + def query_task_history(self, taskno, nodeid, area_id): + """查询路口的全部台账履历""" + sql_query = "select * from task_history where taskno='%s' and nodeid='%s' and area_id = %s order by history_date;" % (taskno, nodeid, area_id) + return self.do_select(sql_query) + + def query_task_progress_history(self, yesterday_date, nodeid, area_id): + """查询路口的全部台账履历""" + + sql_query = (f""" + select + t2.task_name, + content + from + (select * from task.task_history where date(history_date)='{yesterday_date}' and nodeid='{nodeid}' and area_id = {area_id} and content like '%%operation%%任务进度%%content%%' order by history_date) t1 + left join + (select task_name,taskno from task.task) t2 + on t1.taskno = t2.taskno; + """) + + return self.do_select(sql_query) + + def add_task_history(self, taskno, nodeid, area_id, operator,remark): + """查询路口的全部台账履历""" + now = datetime.now() + time_str = now.strftime('%Y-%m-%d %H:%M:%S') + sql_insert = "insert into task_history (taskno,nodeid, area_id, operator,content, history_date) values('%s',%s,'%s','%s','%s','%s')" % (taskno, nodeid, area_id, operator,remark, time_str) + count = self.do_execute(sql_insert) + if count != 1: + logging.error('add_task_history error! %s' % (sql_insert)) + return 0 + return count + + def add_task(self, timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time, + executor, progress, task_state, description, crossids, sectionids, arteryids, comment, + record_state, task_src, task_class, nodeid, area_id): + sql_insert = "insert into task (timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time," \ + " executor, progress, task_state, description, crossids, sectionids, arteryids, comment," \ + " record_state, task_src, task_class, nodeid) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %s)" % ( + timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time, + executor, progress, task_state, description, crossids, sectionids, arteryids, comment, + record_state, task_src, task_class, nodeid, area_id) + + count = self.do_execute(sql_insert) + if count != 1: + logging.error('add_task error! %s' % (sql_insert)) + return 0 + return count + + def update_task_info(self, taskno, modify_data, nodeid, area_id): + update_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + sql = f"update task.task set %s, update_time = '%s' where taskno = '%s' and nodeid = %s and area_id = %s" %(modify_data, update_time, taskno, nodeid, area_id) + return self.do_execute(sql) + ########################################################################## + + def insert_upload_file_record(self, taskno, nodeid, download_url): + sql = f""" + insert into task.task_upload_file_record (taskno, nodeid, download_url) + values (%d , %d, '%s') + """ % (int(taskno), int(nodeid), download_url) + return self.do_execute(sql) + + def query_task_file(self, nodeid, area_id): + sql = f""" + select * from task.task_upload_file_record where nodeid = %d and area_id = %s + """ % (int(nodeid), area_id) + row_list = self.do_select(sql) + tmp_dict = {} + for row in row_list: + if row['taskno'] not in tmp_dict.keys(): + tmp_dict[row['taskno']] = [{ + 'id': row['id'], + 'download_url': row['download_url'] + }] + else: + tmp_dict[row['taskno']].append({ + 'id': row['id'], + 'download_url': row['download_url'] + }) + return tmp_dict + + def query_task_file_by_taskno(self, nodeid, taskno, area_id): + sql = f""" + select id, download_url from task.task_upload_file_record where nodeid = %d and taskno = %d and area_id = %d + """ % (int(nodeid), int(taskno), int(area_id)) + row_list = self.do_select(sql) + download_url_list = [] + for row in row_list: + download_url_list.append({ + 'id': row['id'], + 'download_url': row['download_url'] + }) + return download_url_list + + def del_task_file(self, nodeid, taskno, file_name, file_id): + download_url = f'/api/download_task_file?nodeid={nodeid}&taskno={taskno}&file_name={file_name}' + sql = f""" + delete from task.task_upload_file_record where nodeid = %d and taskno = %d and download_url = '%s' and id = %d + """ % (int(nodeid), int(taskno), download_url, int(file_id)) + return self.do_execute(sql) + + def update_task_state(self, nodeid, area_id, task_no, state): + sql = "update task set state='%s' where nodeid='%s' and area_id='%s' and task_no='%s'" % (state, nodeid, area_id, task_no) + return self.do_execute(sql) + + # 私有属性示例 + __secret_code = "Private Info" + + # 私有方法示例 + def __private_activity(self): + print("This is a private activity.") + +# +# if __name__ == '__main__': +# tt_5min = get_latest_5min_timestamp() +# print(tt_5min) +# print(get_today_str()) diff --git a/app/task_worker.py b/app/task_worker.py new file mode 100644 index 0000000..cd7fb49 --- /dev/null +++ b/app/task_worker.py @@ -0,0 +1,1041 @@ +# -*- coding: utf-8 -*- +# @Author: Owl +# @Date: 2025/11/10 17:56 +# @Description: + +import json + +from flask import request, send_file +from pypinyin import lazy_pinyin, Style + +from app.common_worker import * + +ALLOWED_EXTENSION_TASK = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'doc', 'docx', 'xlsx', 'xls', 'pptx', 'ppt'} + + +def do_query_task_list_parameter(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + + res = make_common_res(0, 'ok') + res['data'] = {} + + #任务类型 + task_types = db_task.query_task_type(nodeid, area_id) + task_types_res = {} + for task_type in task_types: + task_types_res[task_type['task_type_no']] = task_type['task_type'] + res['data']['task_type'] = task_types_res + # 路网(数据)类型 + res['data']['data_type'] = {0: '不关联', 1: '路口任务', 2: '干线协调任务'} + #任务等级 0低(无需审批),1中(可事后审批),2高(审批后执行) + res['data']['task_class'] = {0: '低(无需审批)', 1: '中(可事后审批)', 2: '高(审批后执行)'} + #任务状态 + res['data']['task_state'] = {0:'待下发', 1: '待审批', 2: '进行中未审批', 3: '进行中', 4: '完成'} + + + #负责人 + executors = db_task.query_task_executor(nodeid, area_id) + if len(executors)<=0: + logging.error(' query_task_executor没有数据!') + + executor_list = [] + for executor in executors: + executor_list.append(executor['user_name']) + res['data']['executor'] = executor_list + + #需求来源 + srcs = db_task.query_task_src(nodeid, area_id) + if len(srcs)<=0: + logging.error(' query_task_src没有数据!') + src_list = [] + for src in srcs: + src_list.append(src['src']) + res['data']['task_src'] = src_list + + return json.dumps(res) + + +def do_query_task_list(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + + #任务名称`task_name` varchar(300) DEFAULT NULL COMMENT '任务名称', + #任务类型`task_type` varchar(300) DEFAULT NULL COMMENT '任务类型', + #任务等级`task_class` varchar(300) DEFAULT NULL COMMENT '需求等级'; + #计划开始时间`plan_begin_time` bigint DEFAULT NULL COMMENT '计划开始时间', + #计划结束时间`plan_end_time` bigint DEFAULT NULL COMMENT '计划结束时间', + #任务状态`task_state` int NOT NULL COMMENT '任务状态 0未开始,1进行中,2完成,3挂起,4废止', + #负责人`executor` varchar(300) DEFAULT NULL COMMENT '负责人', + #需求来源 `task_src` varchar(300) DEFAULT NULL COMMENT '需求来源'; + all_task_list = db_task.query_all_tak_list(nodeid, area_id) + upload_file_dict = db_task.query_task_file(nodeid, area_id) + for task_info in all_task_list: + if task_info['taskno'] in upload_file_dict.keys(): + task_info['download_url_list'] = upload_file_dict[task_info['taskno']] + else: + task_info['download_url_list'] = [] + name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list = [], [], [], [], [], [], [], [] + task_name = params.get('task_name') + task_type = params.get('task_type') + task_class = params.get('task_class') + plan_begin_time = params.get('plan_begin_time') + plan_end_time = params.get('plan_end_time') + task_state = params.get('task_state') + executor = params.get('executor') + task_src = params.get('task_src') + task_no = None + if task_name.isdigit(): + task_no = task_name + else: + if task_name and any('\u4e00' <= c <= '\u9fff' for c in task_name): + # 判定关键字是否包含中文 如果是 则认为是关键字 进行模糊查询 + for task_info in all_task_list: + if task_name in task_info['task_name']: + name_list.append(task_info) + elif task_name and not any('\u4e00' <= c <= '\u9fff' for c in task_name): + # 如果不存在中文 则认为是拼音或者首字母 进行模糊查询 + for task_info in all_task_list: + if any('\u4e00' <= c <= '\u9fff' for c in task_info['task_name']): + task_name_pinyin = lazy_pinyin(task_info['task_name'], style=Style.NORMAL) + else: + task_name_pinyin = task_info['task_name'] + if len(task_name) > 1 and task_name in ''.join(task_name_pinyin): + name_list.append(task_info) + continue + for i in range(len(task_name)): + if any('\u4e00' <= c <= '\u9fff' for c in task_info['task_name']) and task_name[i] == list(task_name_pinyin)[i][0] and task_info not in name_list: + name_list.append(task_info) + if task_type and len(task_type) > 0: + type_list.extend([task_info for task_info in all_task_list if int(task_type) == task_info['task_type']]) + if len(type_list) == 0: + # 用于多种筛选条件取交集使用 + type_list.append({'arteryids': -1}) + if task_class and len(task_class) > 0: + class_list.extend([task_info for task_info in all_task_list if int(task_class) == task_info['task_class']]) + if len(class_list) == 0: + # 用于多种筛选条件取交集使用 + class_list.append({'arteryids': -1}) + if plan_begin_time and len(plan_begin_time) > 0: + begin_list.extend([task_info for task_info in all_task_list if int(plan_begin_time) <= task_info['plan_begin_time']]) + if len(begin_list) == 0: + # 用于多种筛选条件取交集使用 + begin_list.append({'arteryids': -1}) + if plan_end_time and len(plan_end_time) > 0: + end_list.extend([task_info for task_info in all_task_list if int(plan_end_time) >= task_info['plan_end_time']]) + if len(end_list) == 0: + # 用于多种筛选条件取交集使用 + end_list.append({'arteryids': -1}) + if task_state and len(task_state) > 0: + state_list.extend([task_info for task_info in all_task_list if int(task_state) == task_info['task_state']]) + if len(state_list) == 0: + # 用于多种筛选条件取交集使用 + state_list.append({'arteryids': -1}) + if executor and len(executor) > 0: + executor_list.extend([task_info for task_info in all_task_list if executor == task_info['executor']]) + if len(executor_list) == 0: + # 用于多种筛选条件取交集使用 + executor_list.append({'arteryids': -1}) + if task_src and len(task_src) > 0: + task_src_list.extend([task_info for task_info in all_task_list if task_src == task_info['task_src']]) + if len(task_src_list) == 0: + # 用于多种筛选条件取交集使用 + task_src_list.append({'arteryids': -1}) + non_empty_lists = [lst for lst in [name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list] if len(lst) > 0] + if non_empty_lists: + res_list = intersect_dicts(non_empty_lists) + else: + res_list = [] + if task_no and len(task_no) > 0: + res_list = [task_info for task_info in all_task_list if int(task_no) == task_info['taskno']] + if not task_name and not task_type and not task_class and not plan_begin_time and not plan_end_time and not task_state and not executor and not task_src and not task_no: + res_list = all_task_list + # task_list = db_task.query_task_list(task_name,task_type,data_type, task_class,plan_begin_time,plan_end_time,publish_time,task_state,executor,task_src, nodeid) + filtered_list = [d for d in res_list if d['arteryids'] != -1] + for task_info in filtered_list: + comment = task_info['comment'] + if '_split_suffix_for_query_info' in comment: + comment_list = comment.split('_split_suffix_for_query_info') + comment_list.pop() + else: + comment_list = [comment] + task_info['comment'] = comment_list + sorted_list = sorted(filtered_list, key=sort_key) + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['area_id'] = area_id + res['data'] = sorted_list + res['desc'] = '' + + return json.dumps(res) + + +def sort_key(item): + return (item['task_state'], item['progress'], item['plan_end_time']) + + +def make_hashable(obj): + """将对象转换为可哈希的类型""" + if isinstance(obj, dict): + return tuple(sorted((k, make_hashable(v)) for k, v in obj.items())) + elif isinstance(obj, list): + return tuple(make_hashable(v) for v in obj) + else: + return obj + + +def intersect_dicts(list_of_dicts): + """计算多个字典列表的交集""" + if not list_of_dicts: + return [] + + # 将第一个列表中的字典转换为可哈希的元组 + intersection = set(make_hashable(d) for d in list_of_dicts[0]) + + # 与后续的每个列表进行交集操作 + for lst in list_of_dicts[1:]: + current_set = set(make_hashable(d) for d in lst) + intersection &= current_set + + # 将交集中的元组转换回字典 + return [dict(t) for t in intersection] + + +def do_remove_task(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + tasknos = check_param(params, 'tasknos') + if not tasknos or len(tasknos) < 1: + return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) + count = db_task.delete_task(tasknos, nodeid) + if count != len(tasknos): + logging.error(str(params) + ' do_remove_task删除失败!') + res = make_common_res(-1, '删除失败') + return json.dumps(res) + + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['desc'] = '' + return json.dumps(res) + + +def do_distribute_task(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + taskno = check_param(params, 'taskno') + if not taskno: + return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) + operator = check_param(params, 'operator') + if not operator: + return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试')) + + task_old = db_task.query_task(taskno, nodeid, area_id) + if task_old is None: + logging.error(str(params) + ' do_distribute_task找不到对应的任务!') + return json.dumps(make_common_res(-1, '找不到对应的任务')) + + #任务等级,0正常,1紧急,2特急 + #任务状态 0待下发,1待审批,2进行中未审批,3进行中,4完成 + if task_old['task_class'] == 2: + new_state = '1' + elif task_old['task_class'] == 1: + new_state = '2' + else: + new_state = '3' + + count = db_task.update_task_state(nodeid, area_id, taskno, new_state) + if count!=1: + logging.error(str(params) + ' do_distribute_task更新任务状态失败!') + res = make_common_res(-1, '更新任务状态失败') + return json.dumps(res) + + basemark = {'operation': '下发任务', 'content': {}} + count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) + if count!=1: + logging.error(str(params) + ' do_distribute_task添加任务履历失败!') + res = make_common_res(-1, '添加任务履历失败') + return json.dumps(res) + task_old_info = db_task.query_task(taskno, nodeid, area_id) + + # 20241022 补充开发下发任务的同时修改任务基本信息的功能 + modify_data, modify_item = gen_update_sql(params, task_old_info) + if len(modify_data) > 0: + ret = db_task.update_task_info(taskno, modify_data, nodeid, area_id) + if ret != 1: + return json.dumps(make_common_res(3, '任务基本信息修改失败')) + basemark = {'operation': '编辑任务', 'content': {'content': modify_item}} + his_ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) + if his_ret != 1: + return json.dumps(make_common_res(3, '任务履历添加失败')) + + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['desc'] = '' + return json.dumps(res) + + +def do_approval(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + taskno = check_param(params, 'taskno') + if not taskno: + return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) + operator = check_param(params, 'operator') + if not operator: + return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试')) + + #task_old = db_task.query_task(taskno, nodeid) + + # 任务等级,0正常,1紧急,2特急 + # 任务状态 0待下发,1待审批,2进行中未审批,3进行中,4完成 + new_state = '3' + + count = db_task.update_task_state(nodeid, area_id, taskno, new_state) + if count!=1: + logging.error(str(params) + ' do_approval更新任务状态失败!') + res = make_common_res(-1, '更新任务状态失败') + return json.dumps(res) + + basemark = {'operation': '审批任务', 'content': {}} + count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) + if count!=1: + logging.error(str(params) + ' do_approval添加任务履历失败!') + res = make_common_res(-1, '添加任务履历失败') + return json.dumps(res) + # 20241022 新增审批任务的同时修改任务部分基础属性的功能 + task_old_info = db_task.query_task(taskno, nodeid, area_id) + modify_data, modify_item = gen_update_sql(params, task_old_info) + if len(modify_data) > 0: + ret = db_task.update_task_info(taskno, modify_data, nodeid, area_id) + if ret != 1: + return json.dumps(make_common_res(3, '任务基本信息修改失败')) + basemark = {'operation': '编辑任务', 'content': {'content': modify_item}} + his_ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) + if his_ret != 1: + return json.dumps(make_common_res(3, '任务履历添加失败')) + + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['desc'] = '' + return json.dumps(res) + + +def do_complete_task(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + taskno = check_param(params, 'taskno') + if not taskno: + return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) + operator = check_param(params, 'operator') + if not operator: + return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试')) + + task_old = db_task.query_task(taskno, nodeid, area_id) + if not task_old: + return json.dumps(make_common_res(3, '找不到对应的任务')) + if task_old['task_state'] == '4': + return json.dumps(make_common_res(0, '当前任务已完成')) + # 任务等级,0正常,1紧急,2特急 + # 任务状态 0待下发,1待审批,2进行中未审批,3进行中,4完成 + new_state = '4' + + count = db_task.update_task_state(nodeid, area_id, taskno, new_state) + if count!=1: + logging.error(str(params) + ' do_complete_task更新任务状态失败!') + res = make_common_res(-1, '更新任务状态失败') + return json.dumps(res) + + basemark = {'operation': '完成任务', 'content': {}} + count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) + if count!=1: + logging.error(str(params) + ' do_complete_task添加任务履历失败!') + res = make_common_res(-1, '添加任务履历失败') + return json.dumps(res) + + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['desc'] = '' + return json.dumps(res) + + +def do_add_task(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + + task_type = check_param(params, 'task_type') + if not task_type: + return json.dumps(make_common_res(2, '任务类型缺失,请检查后重试')) + # if int(task_type) not in (1, 2, 3, 4, 5): + # return json.dumps(make_common_res(2, '任务类型参数异常,请检查后重试')) + task_name = check_param(params, 'task_name') + if not task_name: + return json.dumps(make_common_res(2, '任务名称缺失,请检查后重试')) + task_class = check_param(params, 'task_class') # 需求等级,0低(无需审批),1中(可事后审批),2高(审批后执行) + if not task_class: + return json.dumps(make_common_res(2, '任务等级缺失,请检查后重试')) + data_type = check_param(params, 'data_type') + if not data_type: + return json.dumps(make_common_res(2, '关联路网状态缺失,请检查后重试')) + crossids, arteryids, sectionids = '', '', '' + if int(data_type) == 1: + crossids = check_param(params, 'crossids') + if not crossids: + return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联路口,但关联路口信息缺失,请检查后重试')) + elif int(data_type) == 2: + arteryids = check_param(params, 'arteryids') + if not arteryids: + return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联干线,但关联干线信息缺失,请检查后重试')) + add_type = check_param(params, 'add_type') + if not add_type or add_type == 'normal': + plan_begin_time = check_param(params, 'plan_begin_time') + if not plan_begin_time: + return json.dumps(make_common_res(2, '计划开始时间缺失,请检查后重试')) + plan_end_time = check_param(params, 'plan_end_time') + if not plan_end_time: + return json.dumps(make_common_res(2, '计划结束时间缺失,请检查后重试')) + else: + plan_begin_time = 0 + plan_end_time = 0 + task_src = check_param(params, 'task_src') + if not task_src: + return json.dumps(make_common_res(2, '需求来源缺失,请检查后重试')) + # if task_src not in ('舆情', '交警交办任务', '现场拥堵', '平台优化策略'): + # return json.dumps(make_common_res(2, '需求来源异常,可选项为:舆情、交警交办任务、现场拥堵、平台优化策略,请检查后重试')) + + executor = check_param(params, 'executor') + if not executor: + return json.dumps(make_common_res(2, '任务负责人缺失,请检查后重试')) + publish_time = check_param(params, 'publish_time') + if not publish_time: + return json.dumps(make_common_res(2, '发布时间缺失,请检查后重试')) + publish_time = int(publish_time) + timestamp = publish_time + description = check_param(params, 'description') + if not description: + return json.dumps(make_common_res(2, '任务描述缺失,请检查后重试')) + comment = check_param(params, 'comment') + if not comment: + comment = '' + operator = check_param(params, 'operator') + if not operator: + return json.dumps(make_common_res(2, '操作员信息缺失,请检查后重试')) + + creatorid = operator + progress, record_task_state, task_state = 0, 0, 0 + if int(task_class) == 0: + task_state = 3 + elif int(task_class) == 1: + task_state = 2 + elif int(task_class) == 2: + task_state = 1 + if comment and add_type and add_type != 'normal': + comment = "【推荐策略】" + comment + "_split_suffix_for_query_info" + if comment and (not add_type or add_type == 'normal'): + comment = "【" + datetime.now().date().strftime('%Y-%m-%d') + "】" + comment + "_split_suffix_for_query_info" + if add_type and add_type != 'normal': + task_state = 0 + count = db_task.add_task(timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time, + executor, progress, task_state, description, crossids, sectionids, arteryids, comment, + record_task_state, task_src, task_class, nodeid, area_id) + if count != 1: + logging.error(str(params) + ' 添加任务报错!') + res = make_common_res(-1, '添加任务报错。') + return json.dumps(res) + + basemark = {'operation': '创建任务', 'content': {}} + taskno = db_task.get_task_no(nodeid, area_id, task_name, task_type, task_class, data_type, plan_begin_time, plan_end_time, executor, task_src, comment) + if taskno is None: + logging.error(str(params) + ' do_add_task更新任务状态失败!') + res = make_common_res(-1, '更新任务状态失败') + return json.dumps(res) + if add_type and add_type != 'normal': + basemark = {'operation': '诊断问题确认', 'content': {}} + count = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) + if count != 1: + logging.error(str(params) + ' do_add_task添加任务履历失败!') + res = make_common_res(-1, '添加任务履历失败') + return json.dumps(res) + + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['desc'] = '' + res['taskno'] = taskno + return json.dumps(res) + + +#获取所有具有'已完成'任务的路口 +#params +#nodeid +def do_query_completed_task_cross_list(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + + task_list = db_task.query_completed_task_cross_list(nodeid, area_id) + + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['data'] = task_list + res['desc'] = '' + + return json.dumps(res) + + +#获取指定路口的所有的'已完成'任务 +#params +#nodeid +#crossid +def do_query_completed_task_by_cross(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(2, '缺少crossid, 请刷新后重试')) + task_list = db_task.query_completed_task_by_cross(crossid, nodeid, area_id) + + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['data'] = task_list + res['desc'] = '' + + return json.dumps(res) + + +def do_update_task_progress_field(task_old, params, filed_name, filed_desc, taskno): + field = params.get(filed_name) + basemarkinfo = '' + if field is None: + return basemarkinfo + + old_field = task_old[filed_name] + if type(task_old[filed_name]) is int and type(field) is str: + old_field = str(old_field) + + + if field != old_field: + count = db_task.do_update('`task`', filed_name, field, 'taskno', taskno) + if count != 1: + logging.error(str(params) + ' do_update_task_progress_field更新任务状态失败!') + res = make_common_res(-1, '更新任务状态失败') + return basemarkinfo + + if filed_name == 'progress' and old_field != field: + basemarkinfo = '【编辑】任务进度:%s%%->%s%%' %(old_field, field) + + return basemarkinfo + + +# 无调用 +def do_update_task_field(task_old, params, filed_name, filed_desc, taskno): + field = params.get(filed_name) + basemarkinfo = '' + if field is None: + return basemarkinfo + + old_field = task_old[filed_name] + if type(task_old[filed_name]) is int and type(field) is str: + old_field = str(old_field) + + if field != old_field: + basemarkinfo = '%s ' %(filed_desc) + count = db_task.do_update('`task`', filed_name, field, 'taskno', taskno) + if count != 1: + logging.error(str(params) + ' do_update_task_progress_field更新任务状态失败!') + res = make_common_res(-1, '更新任务状态失败') + return json.dumps(res) + + return basemarkinfo + + +def do_update_task(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + taskno = check_param(params, 'taskno') + if not taskno: + return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) + operator = check_param(params, 'operator') + if not operator: + return json.dumps(make_common_res(3, '操作者信息缺失,请检查后重试')) + + + task_old = db_task.query_task(taskno, nodeid, area_id) + if task_old is None: + logging.error(str(params) + ' do_update_task 任务在数据库中不存在!') + res = make_common_res(-1, '任务在数据库中不存在。') + return json.dumps(res) + task_old_info = db_task.query_task(taskno, nodeid, area_id) + update_sql, basemarkinfo = gen_update_sql(params, task_old_info) + # print(update_sql, basemarkinfo) + if len(update_sql) > 0: + ret = db_task.update_task_info(taskno, update_sql, nodeid, area_id) + if ret != 1: + return json.dumps(make_common_res(3, '任务基本信息修改失败')) + + progress_info = do_update_task_progress_field(task_old, params, 'progress', '任务进度', taskno) + if len(progress_info) > 0: + basemark = { + 'operation': '编辑任务', + 'content': { + 'progress_info': progress_info, + 'content': basemarkinfo + } + } + else: + basemark = { + 'operation': '编辑任务', + 'content': { + 'content': basemarkinfo + } + } + if (len(basemarkinfo) > 0 and basemarkinfo != '') or len(progress_info) > 0: + ret = db_task.add_task_history(taskno, nodeid, area_id, operator, json.dumps(basemark, ensure_ascii=False)) + if ret != 1: + return json.dumps(make_common_res(3, '任务履历信息添加失败')) + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['desc'] = '' + return json.dumps(res) + + +def allowed_file_for_task(filename): + # 检查文件名是否为空,并且文件扩展名是否在允许的列表中 + return '.' in filename and \ + filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSION_TASK + + +def do_task_upload(params): + nodeid = check_param({'nodeid': request.form.get('nodeid')}, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(3, "nodeid检查报错")) + taskno = check_param({'taskno': request.form.get('taskno')}, 'taskno') + if not taskno: + return json.dumps(make_common_res(2, '任务id异常,请检查后重试')) + if len(request.files.keys()) < 1: + return json.dumps(make_common_res(1, '文件不存在')) + suc_file_num, suc_mysql_num = 0, 0 + for key in request.files.keys(): + file = request.files[key] + try: + if not (file and file.filename and allowed_file_for_task(file.filename)): + return json.dumps(make_common_res(1, '文件格式错误')) + if not os.path.exists(f'./app/static/task_file/{nodeid}/task_no_{taskno}'): + os.makedirs(f'./app/static/task_file/{nodeid}/task_no_{taskno}') + os.chmod(f'./app/static/task_file/{nodeid}/task_no_{taskno}', 0o777) + file.save(f'./app/static/task_file/{nodeid}/task_no_{taskno}/{file.filename}') + suc_file_num += 1 + download_url = f'/api/download_task_file?nodeid={nodeid}&taskno={taskno}&file_name={file.filename}' + ret = db_task.insert_upload_file_record(taskno, nodeid, download_url) + if ret == 1: + suc_mysql_num += 1 + else: + break + except Exception as e: + logging.error(str(e) + '任务文件上传失败') + if os.path.exists(f'./app/static/task_file/{nodeid}/task_no_{taskno}/{file.filename}'): + os.remove(f'./app/static/task_file/{nodeid}/task_no_{taskno}/{file.filename}') + logging.info('回滚上传操作') + break + if suc_file_num == suc_mysql_num == len(request.files.keys()): + res = make_common_res(0, 'ok') + res['nodeid'] = '' + res['desc'] = '' + return json.dumps(res) + else: + return json.dumps(make_common_res(4, '任务文件上传失败')) + + +def do_task_file_download(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + taskno = check_param(params, 'taskno') + if not taskno: + return json.dumps(make_common_res(2, '任务id异常,请检查后重试')) + file_name = check_param(params, 'file_name') + if not file_name: + return json.dumps(make_common_res(2, '文件名异常,请检查后重试')) + current_dir_path = os.path.dirname(os.path.abspath(__file__)) + file_path = f"{current_dir_path}/static/task_file/{nodeid}/task_no_{taskno}/{file_name}" + if not os.path.isfile(file_path): + logging.error(f"文件不存在: {file_path}") + return json.dumps(make_common_res(2, '文件不存在,请检查后重试')) + + return send_file(file_path, as_attachment=True) + + +def do_query_task_history(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + taskno = check_param(params, 'taskno') + if not taskno: + return json.dumps(make_common_res(2, '任务id异常,请检查后重试')) + + task_history_list = db_task.query_task_history(taskno, nodeid, area_id) + download_url_list = db_task.query_task_file_by_taskno(nodeid, taskno, area_id) + for task_history in task_history_list: + task_history['content'] = task_history['content'].replace('_split_suffix_for_query_info', '') + + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['desc'] = '' + res['data'] = { + 'task_history_list': task_history_list, + 'download_url_list': download_url_list + } + return json.dumps(res) + + +def del_task_file_api(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + taskno = check_param(params, 'taskno') + if not taskno: + return json.dumps(make_common_res(2, '任务id异常,请检查后重试')) + file_name = check_param(params, 'file_name') + if not file_name: + return json.dumps(make_common_res(2, '文件名异常,请检查后重试')) + file_id = check_param(params, 'file_id') + if not file_id: + return json.dumps(make_common_res(2, '文件id异常,请检查后重试')) + ret = db_task.del_task_file(nodeid, taskno, file_name, file_id) + if ret == 1: + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['desc'] = '' + return json.dumps(res) + else: + return json.dumps(make_common_res(4, '删除失败')) + + +def do_query_task_progress_history(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + # taskno = params.get('taskno') + yesterday_date = check_param(params, 'yesterday_date') + task_history_list = db_task.query_task_progress_history(yesterday_date, nodeid, area_id) + if not yesterday_date or yesterday_date == '': + return json.dumps(make_common_res(3, "查询日期缺失")) + + res = make_common_res(0, 'ok') + res['nodeid'] = nodeid + res['desc'] = '' + res['data'] = task_history_list + + return json.dumps(res) + + +def do_query_task_detail(params): + global db_task + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + + res = make_common_res(0, 'ok') + taskno = check_param(params, 'taskno') + if not taskno: + return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) + + task = db_task.query_task(taskno, nodeid) + if task is None: + logging.error(str(params) + ' do_query_task_detail 任务在数据库中不存在!') + res = make_common_res(-1, '任务在数据库中不存在。') + return json.dumps(res) + + res['desc'] = '' + res['data'] = task + return json.dumps(res) + + +def gen_update_sql(params, task_old_info): + modify_data, modify_item = '', '' + task_name = check_param(params, 'task_name') + if task_name and task_name != task_old_info['task_name']: + modify_data += f"task_name = '{task_name}'" + modify_item += f"【编辑】任务名称:{task_old_info['task_name']} -> {task_name}" + task_type = check_param(params, 'task_type') + if task_type and int(task_type) != task_old_info['task_type']: + prefix = '' + if modify_data != '': + prefix = 'prefix_for_split_' + modify_data += f", task_type = {int(task_type)}" + else: + modify_data += f'task_type = {int(task_type)}' + modify_item += prefix + modify_item += f"【编辑】任务类型:{task_old_info['task_type']} -> {task_type} " + task_class = check_param(params, 'task_class') + if task_class and int(task_class) != task_old_info['task_class']: + prefix = '' + if modify_data != '': + prefix = 'prefix_for_split_' + modify_data += f', task_class = {int(task_class)}' + else: + modify_data += f'task_class = {int(task_class)}' + modify_item += prefix + modify_item += f"【编辑】任务等级:{task_old_info['task_class']} -> {task_class} " + data_type = check_param(params, 'data_type') + if data_type and int(data_type) != task_old_info['data_type']: + prefix = '' + if modify_data != '': + prefix = 'prefix_for_split_' + modify_data += f', data_type = {int(data_type)}' + else: + modify_data += f'data_type = {int(data_type)}' + if int(data_type) == 1: + crossids = check_param(params, 'crossids') + if crossids and crossids != '' and crossids != task_old_info['crossids']: + modify_data += f", crossids = '{crossids}'" + elif int(data_type) == 2: + arteryids = check_param(params, 'arteryids') + if arteryids and arteryids != '' and arteryids != task_old_info['arteryids']: + modify_data += f", arteryids = '{arteryids}'" + modify_item += prefix + modify_item += f"【编辑】关联路网信息:{task_old_info['data_type']} -> {data_type}" + # plan_begin_time = check_param(params, 'plan_begin_time') + # if plan_begin_time and plan_begin_time != task_old_info['plan_begin_time']: + # if modify_data != '': + # modify_data += f', plan_begin_time = {int(plan_begin_time)}' + # else: + # modify_data += f'plan_begin_time = {int(plan_begin_time)}' + # modify_item += ' 计划开始时间 ' + # plan_end_time = check_param(params, 'plan_end_time') + # if plan_end_time and plan_end_time != task_old_info['plan_end_time']: + # if modify_data != '': + # modify_data += f', plan_end_time = {int(plan_end_time)}' + # else: + # modify_data += f'plan_end_time = {int(plan_end_time)}' + # modify_item += ' 计划结束时间 ' + executor = check_param(params, 'executor') + if executor and executor != task_old_info['executor']: + prefix = '' + if modify_data != '': + prefix = 'prefix_for_split_' + modify_data += f", executor = '{executor}'" + else: + modify_data += f"executor = '{executor}'" + modify_item += prefix + modify_item += f"【编辑】负责人:{task_old_info['executor']} -> {executor} " + task_src = check_param(params, 'task_src') + if task_src and task_src != task_old_info['task_src']: + prefix = '' + if modify_data != '': + prefix = 'prefix_for_split_' + modify_data += f", task_src = '{task_src}'" + else: + modify_data += f"task_src = '{task_src}'" + modify_item += prefix + modify_item += f"【编辑】任务来源:{task_old_info['task_src']} -> {task_src} " + description = check_param(params, 'description') + if description and description != task_old_info['description']: + prefix = '' + if modify_data != '': + prefix = 'prefix_for_split_' + modify_data += f", description = '{description}'" + else: + modify_data += f"description = '{description}'" + modify_item += prefix + modify_item += f"【编辑】任务描述:{task_old_info['description']} -> {description}" + comment = check_param(params, 'comment') + if comment and comment != '': + new_comment = task_old_info['comment'] + "【" + datetime.now().date().strftime('%Y-%m-%d') + "】" + comment + "_split_suffix_for_query_info" + prefix = '' + if modify_data != '': + prefix = 'prefix_for_split_' + modify_data += f", comment = '{new_comment}'" + else: + modify_data += f"comment = '{new_comment}'" + modify_item += prefix + modify_item += f"【添加】任务备注:{comment}" + + return modify_data, modify_item diff --git a/app/views_task.py b/app/views_task.py new file mode 100644 index 0000000..f2b060a --- /dev/null +++ b/app/views_task.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +# @Author: Owl +# @Date: 2025/11/10 18:12 +# @Description: + +import json +from flask import Flask, request, jsonify, redirect +from flask_cors import CORS +from app.models import * +from app.task_worker import * +from app.cross_eva_views import app +#app = Flask(__name__) +#CORS(app, resources={r"/api/*": {"origins": "*"}}) + +#获取路口台账的查询时的基础参数 +@app.route('/api/get_task_list_parameter', methods=['GET']) +def cross_task_list_parameter(): + return do_query_task_list_parameter(dict(request.args)) + +#获取全部符合条件的任务 +#task_name +#task_type +#task_class +#plan_begin_time +#plan_end_time +#state +#executor +#task_src +@app.route('/api/get_task_list', methods=['GET']) +def get_task_list(): + return do_query_task_list(dict(request.args)) + + +#获取所有具有'已完成'任务的路口 +#params +#nodeid +@app.route('/api/get_completed_task_cross_list', methods=['GET']) +def get_completed_task_cross_list(): + return do_query_completed_task_cross_list(dict(request.args)) + +#获取指定路口的所有的'已完成'任务 +#params +#nodeid +#crossid +@app.route('/api/get_completed_task_by_cross', methods=['GET']) +def get_completed_task_by_cross(): + return do_query_completed_task_by_cross(dict(request.args)) + + +#参数: +# "nodeid": "9660" +#creatorid +#task_name +#task_type +#plan_begin_time +#plan_end_time +#executor +#description +#crossids 路口id +#sectionids 路段id,选填 +#comment +#task_src +#task_class +@app.route('/api/add_task', methods=['POST']) +def add_task(): + try: + vle = do_add_task(dict(request.form)) + return vle + except pymysql.Error as e: + # 捕获 PyMySQL 异常 + logging.error(f"add_task except: {e}") + return json.dumps(make_common_res(-1, '发生异常')) + +#删除任务 +#参数: +# "nodeid": "9660" +# "taskno" +# 20250103 更新为post接口以支持同时删除多条记录 +@app.route('/api/remove_task', methods=['POST']) +def remove_task(): + return do_remove_task(request.get_json()) + +#下发任务 +#参数: +# "nodeid": "9660" +# "taskno" +@app.route('/api/distribute_task', methods=['POST']) +def distribute_task(): + return do_distribute_task(dict(request.form)) + + +#审批任务 +#参数: +# "nodeid": "9660" +# "taskno" +@app.route('/api/approval_task', methods=['POST']) +def approval_task(): + return do_approval(request.get_json()) + +#完成任务 +#参数: +# "nodeid": "9660" +# "taskno" +@app.route('/api/complete_task', methods=['GET']) +def complete_task(): + return do_complete_task(dict(request.args)) + + +@app.route('/api/update_task', methods=['POST']) +def update_task(): + return do_update_task(dict(request.form)) + + + +#参数: +# "nodeid": "9660" +# "taskno" +@app.route('/api/get_task_history', methods=['GET']) +def get_task_history(): + return do_query_task_history(dict(request.args)) + + +#参数: +# "nodeid": "9660" +# "cross_id" +@app.route('/api/get_task_detail', methods=['GET']) +def get_task_detail(): + return do_query_task_detail(dict(request.args)) + + +@app.route('/api/upload_task_file', methods=['POST']) +def task_upload(): + return do_task_upload(dict(request.args)) + + +@app.route('/api/download_task_file', methods=['GET']) +def task_download(): + return do_task_file_download(dict(request.args)) + + +@app.route('/api/del_task_file', methods=['GET']) +def task_del(): + return del_task_file_api(dict(request.args)) + + +# if __name__ == '__main__': +# init() +# app.run(debug=True)