From 298d340018aa993762ddc747b71d337586c367cd Mon Sep 17 00:00:00 2001 From: yinzijian Date: Sat, 15 Nov 2025 16:22:35 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=81=E7=A7=BB=E5=B7=A5=E4=BD=9C=E5=8F=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: yinzijian --- app/cross_eva_views.py | 1 + app/global_source.py | 2 + app/tmnet_db_func.py | 59 ++ app/views_workstation.py | 77 +++ app/work_station_common.py | 174 +++++ app/workstation_db_function.py | 196 ++++++ app/workstation_worker.py | 1087 ++++++++++++++++++++++++++++++++ 7 files changed, 1596 insertions(+) create mode 100644 app/views_workstation.py create mode 100644 app/work_station_common.py create mode 100644 app/workstation_db_function.py create mode 100644 app/workstation_worker.py diff --git a/app/cross_eva_views.py b/app/cross_eva_views.py index dc68582..a5f36f9 100644 --- a/app/cross_eva_views.py +++ b/app/cross_eva_views.py @@ -85,6 +85,7 @@ def update_cross_examine_record_state_api(): from app.user_views import * from app.views_task import * +from app.views_workstation import * if __name__ == '__main__': pass \ No newline at end of file diff --git a/app/global_source.py b/app/global_source.py index 746a584..418df11 100644 --- a/app/global_source.py +++ b/app/global_source.py @@ -6,6 +6,7 @@ from app.models import * from app.models_wave import RoadLinkManager from app.phase_db_func import PhaseTableDbHelper from app.task_db_func import TaskDbHelper +from app.workstation_db_function import WorkstationDbHelper from tool.mysql_common_connector_pool import * from app.user_db_func import * from app.tmnet_db_func import * @@ -90,6 +91,7 @@ db_cross = CrossDbHelper(g_cross_delay_pool) db_task = TaskDbHelper(g_db_pool) db_phasetable = PhaseTableDbHelper(g_db_pool) db_tmnet = TmnetDbHelper(g_roadnet_pool) +db_workstation = WorkstationDbHelper(g_db_pool) nodeid_list = [] diff --git a/app/tmnet_db_func.py b/app/tmnet_db_func.py index 60ee62f..9719f5d 100644 
# --- Methods of TmnetDbHelper (app/tmnet_db_func.py): bulk cross/road lookups ---

def query_cross_info_all(self, crossids: list):
    """Return name/crossid/location/nodeid/area_id rows for every id in *crossids*.

    Prefers ledger-updated name/location from `cross_ledger_update_info` when
    present, and unions in `user_defined_cross`.
    Returns (rows, None) on success or (None, error) on failure.
    """
    conn, cursor = self.connect()
    try:
        sql = '''select IFNULL(clui.name, c2.name) as name,
                        c2.crossid,
                        IFNULL(clui.location, c2.location) as location,
                        c2.nodeid,
                        c2.area_id
                 from `cross` as c2
                          left join `cross_ledger_update_info` as clui
                                    on clui.crossid = c2.crossid
                 where c2.crossid in %s
                 union all
                 select udc.name,
                        udc.crossid,
                        udc.location,
                        udc.nodeid,
                        udc.area_id
                 from `user_defined_cross` as udc
                 where udc.crossid in %s'''
        cursor.execute(sql, (crossids, crossids))
        result = cursor.fetchall()
        self.close(conn, cursor)
        return result, None
    except Exception as e:
        self.close(conn, cursor)
        return None, e

def query_road_info_all(self, roadids: list):
    """Return roadid/name/from_crossid/to_crossid/src_direct/nodeid rows for *roadids*.

    Prefers ledger-updated columns from `road_ledger_update_info` and unions in
    `user_defined_roads`.  Returns (rows, None) or (None, error).
    """
    conn, cursor = self.connect()
    try:
        # BUG FIX 1: the user_defined_roads SELECT was missing the comma after
        # `name` ("select name from_crossid, ..."), which aliased `name` as
        # `from_crossid` and shifted every remaining column.
        # BUG FIX 2: neither SELECT returned `roadid`, although the consumer
        # (get_road_info) keys its result dict by row['roadid'].
        sql = '''select r.roadid,
                        IFNULL(rlui.name, r.name) as name,
                        IFNULL(rlui.from_crossid, r.from_crossid) as from_crossid,
                        IFNULL(rlui.to_crossid, r.to_crossid) as to_crossid,
                        IFNULL(rlui.src_direct, r.src_direct) as src_direct,
                        r.nodeid
                 from `road` as r
                          left join `road_ledger_update_info` as rlui
                                    on rlui.roadid = r.roadid
                 where r.roadid in %s
                 union all
                 select roadid,
                        name,
                        from_crossid,
                        to_crossid,
                        src_direct,
                        nodeid
                 from `user_defined_roads`
                 where roadid in %s'''
        cursor.execute(sql, (roadids, roadids))
        result = cursor.fetchall()
        self.close(conn, cursor)
        return result, None
    except Exception as e:
        self.close(conn, cursor)
        return None, e
from app.cross_eva_views import app
from app.workstation_worker import *


def _request_token():
    """Return the request's 'token' header, normalising missing/empty to None."""
    return request.headers.get('token') or None


@app.route('/api/favorite')
def add_favorite():
    """GET /api/favorite — add a cross/artery to the user's favourites."""
    return favorite(dict(request.args), _request_token())


@app.route('/api/favorite_list')
def favorite_list():
    """GET /api/favorite_list — ids of the user's favourite crosses/arteries."""
    return get_favorite_list(dict(request.args), _request_token())


@app.route('/api/favorite_data_list')
def favorite_data_list():
    """GET /api/favorite_data_list — paged realtime data for the favourites."""
    return get_favorite_data_list(dict(request.args), _request_token())


@app.route('/api/workstation_task_list')
def workstation_task_list():
    """GET /api/workstation_task_list — task summary and statistics widgets."""
    return get_workstation_task_list(dict(request.args), _request_token())


@app.route('/api/del_favorite_list', methods=['POST'])
def del_favorite_list():
    """POST /api/del_favorite_list — remove favourites listed in the JSON body."""
    return delete_favorite_list(request.get_json(), _request_token())
# ---------------------------------------------------------------------------
# app/work_station_common.py — pure helpers for the workstation dashboard
# ---------------------------------------------------------------------------
import json
from collections import Counter

# Task-type code -> display label; shared by several summary helpers.
_TASK_TYPE_LABELS = {1: '方案优化', 2: '交办任务', 3: '交通舆情', 4: '档案优化', 5: '效果巡检'}


def _num(row, key, cast=float, default=0.0):
    """Return cast(row[key]) when the key exists and the value is truthy,
    else *default* (mirrors the original `'k' in d and d['k']` pattern)."""
    value = row.get(key)
    return cast(value) if value else default


def gen_work_station_cross_data_list(cross_data_list, cross_info):
    """Build the per-cross rows shown on the workstation favourites table.

    cross_data_list: dicts with 'crossid' plus optional numeric metric fields.
    cross_info: crossid -> {'name': ...} mapping; unknown ids get name ''.
    """
    res_list = []
    for cross_data in cross_data_list:
        cross_id = cross_data['crossid']
        queue_len = _num(cross_data, 'queue_len', default=-1)
        res_list.append({
            'id': cross_id,
            'name': cross_info[cross_id]['name'] if cross_info.get(cross_id) else '',
            'jam_index': _num(cross_data, 'jam_index'),
            'unbalance_index': _num(cross_data, 'unbalance_index'),
            'flow': _num(cross_data, 'flow', cast=int, default=0),
            # a missing / -1 queue length is rendered as a dash
            'queue_len': '-' if queue_len == -1 else queue_len,
            'stop_times': _num(cross_data, 'stop_times'),
            'delay_time': _num(cross_data, 'delay_time'),
        })
    return res_list


def gen_work_station_artery_data_list(artery_data_list):
    """Build the per-artery rows for the workstation favourites table.

    FIXME(review): `workstation_db_pool` is not defined or imported anywhere
    in this module, so calling this function raises NameError as written.
    It presumably should be the shared workstation DB helper — confirm and
    import the real object.
    """
    res_list = []
    for artery_data in artery_data_list:
        artery_id = artery_data['arteryid']
        arteryname = (workstation_db_pool.query_artery_name(artery_id))[0]['name']  # noqa: F821
        res_list.append({
            'id': artery_id,
            'name': arteryname,
            'jam_index': _num(artery_data, 'jam_index'),
            'speed': _num(artery_data, 'speed'),
            'stop_times': _num(artery_data, 'stop_times'),
            'un_stop_pass': _num(artery_data, 'un_stop_pass'),
            'travel_time': _num(artery_data, 'travel_time'),
            'delay_time': _num(artery_data, 'delay_time'),
        })
    return res_list


def cal_task_type_num(data_list):
    """Summarise tasks by task_type (codes 1-5) for the dashboard widgets.

    Returns {'list_len': n, 'detail': one entry per known type in fixed
    order, 'task_list': the input list, unmodified}.
    """
    counts = Counter(item['task_type'] for item in data_list)
    return {
        'list_len': len(data_list),
        'detail': [{'label': label, 'value': counts.get(code, 0)}
                   for code, label in _TASK_TYPE_LABELS.items()],
        'task_list': data_list,
    }


def gen_task_done_detail(data_list):
    """Count finished tasks per task_type code -> [{'type': label, 'count': n}]."""
    return [{'type': _TASK_TYPE_LABELS.get(code), 'count': count}
            for code, count in Counter(data_list).items()]


def gen_task_statistics_res(last_month_should_done, last_month_done, last_month_should_done_but_not, need_todo,
                            last_week_should_done, last_week_done, last_week_should_done_but_not, week_need_todo,
                            yesterday_res, last_day_of_last_month_obj, formatted_last_week, yesterday_date):
    """Assemble the month / week / yesterday task-statistics payload.

    The *_done arguments are lists of task_type codes; the *_should_done* and
    *need_todo* arguments are plain counts.  yesterday_res rows carry
    'task_name' and a JSON 'content' column describing a task-history edit.
    """
    yesterday_data = []
    for row in yesterday_res:
        op_info = json.loads(row['content'])
        progress_info, content = '', ''
        # Only "编辑任务" (task-edited) operations carry progress/content details.
        if op_info['operation'] == '编辑任务':
            progress_info = op_info['content'].get('progress_info', '')
            content = op_info['content'].get('content', '')
        yesterday_data.append({
            'task_name': row['task_name'],
            'progress_info': progress_info,
            'content': content,
        })

    def _period(date_label, done, overdue, todo, should):
        # One month/week card: done list is expanded into a per-type detail.
        return {
            'date': date_label,
            'data_list': [
                {'label': '已完成', 'value': len(done), 'detail_list': gen_task_done_detail(done)},
                {'label': '应完成逾期', 'value': overdue},
                {'label': '后续待完成', 'value': todo},
                {'label': '应完成', 'value': should},
            ],
        }

    return {
        'month': _period('上月-' + last_day_of_last_month_obj.strftime('%Y年%m月'),
                         last_month_done, last_month_should_done_but_not,
                         need_todo, last_month_should_done),
        'week': _period('上周-' + formatted_last_week,
                        last_week_done, last_week_should_done_but_not,
                        week_need_todo, last_week_should_done),
        'yesterday': {
            'date': '昨天-' + yesterday_date,
            'data_list': yesterday_data,
        },
    }


# ---------------------------------------------------------------------------
# app/workstation_db_function.py — file-header imports (file continues below)
# ---------------------------------------------------------------------------
import hashlib
import base64
try:
    from PIL import Image
except ImportError:  # Pillow is only required by image_to_base64
    Image = None
import io

from app.db_func_base import *
from flask import g, has_app_context


def md5_hash(text):
    """Return the hex MD5 digest of *text* (UTF-8 encoded)."""
    encode_text = text.encode('utf-8')
    md5 = hashlib.md5()
    md5.update(encode_text)
    return md5.hexdigest()


def image_to_base64(image_path):
    """Re-encode the image at *image_path* as JPEG and return it base64-encoded."""
    with Image.open(image_path) as img:
        img_byte_arr = io.BytesIO()
        img.save(img_byte_arr, format='JPEG')
        return base64.b64encode(img_byte_arr.getvalue()).decode('utf-8')


class WorkstationDbHelper(TableDbHelperBase):
    """Database access for the workstation module (favourites, tasks, reports).

    SECURITY NOTE(review): most statements below interpolate caller-supplied
    values straight into SQL f-strings.  The values originate from
    authenticated APIs, but these should be migrated to parameterised queries
    (see query_cross_delay_info for the pattern).
    """

    def __init__(self, pool):
        self.db_pool = pool
        self.DB_Name = 'workstation'

    def get_traffic_db_name(self, nodeid=None):
        """Resolve the per-node traffic schema name.

        Node 9660 uses the plain 'traffic' schema; every other node gets
        'traffic_<nodeid>'.  Inside a Flask request context g.nodeid
        overrides the explicit argument.
        """
        db_traffic_name = 'traffic'
        if nodeid and str(nodeid) != '9660':
            db_traffic_name = f'traffic_{nodeid}'
        if has_app_context() and hasattr(g, "nodeid") and str(g.nodeid) != '9660':
            db_traffic_name = f'traffic_{g.nodeid}'
        return db_traffic_name

    def create_user(self, user_name, password, user_role=2, image=None):
        """Insert a user row; the password is stored as its MD5 hex digest."""
        pwd_md5 = md5_hash(password)
        image_base64 = image_to_base64(image) if image else ''
        # BUG FIX: the string columns were interpolated without quotes, which
        # made this INSERT a guaranteed SQL syntax error.
        sql = (
            f"""
            insert into {self.DB_Name}.users(user_name, pwd, user_role, image)
            value ('{user_name}', '{pwd_md5}', {user_role}, '{image_base64}');
            """
        )
        return self.do_execute(sql)

    def create_gen_corss_report_task(self, nodeid, crossid, cross_name, report_type, items, tasks, create_user_id,
                                     create_user_name):
        """Queue a cross-report generation task.

        (The name keeps the historical 'corss' typo because callers use it.)
        """
        sql = (
            f"""
            insert into {self.DB_Name}.gen_report_task(nodeid, report_type, crossid, cross_name, items, tasks, create_user_id, create_user_name)
            value({nodeid}, {report_type}, '{crossid}', '{cross_name}', '{items}', '{tasks}', '{create_user_id}', '{create_user_name}');
            """
        )
        return self.do_execute(sql)

    def query_report_task_list(self, nodeid, create_user_id=None, create_user_name=None):
        """List report tasks for a node, optionally filtered to one creator."""
        if create_user_id and create_user_name:
            sql = (
                f"""
                select * from {self.DB_Name}.gen_report_task where nodeid = {nodeid} and create_user_id = '{create_user_id}' and create_user_name = '{create_user_name}';
                """
            )
        else:
            sql = (
                f"""
                select * from {self.DB_Name}.gen_report_task where nodeid = {nodeid};
                """
            )
        return self.do_select(sql)

    def update_report_task_status(self, nodeid, create_user_id, create_user_name, crossid, cross_name, report_type,
                                  items, tasks, status, download_url=None, failed_reason=None, file_name=None):
        """Update a report task's status / result columns.

        NOTE(review): when download_url / failed_reason / file_name are None,
        the columns are written as the literal string 'None', not SQL NULL —
        confirm whether readers rely on that before changing it.
        """
        sql = (
            f"""update {self.DB_Name}.gen_report_task set status = {status},failed_reason = '{failed_reason}',download_url = '{download_url}',file_name = '{file_name}'
            where nodeid = {nodeid} and report_type = {report_type} and crossid = '{crossid}' and cross_name = '{cross_name}'
            and items = '{items}' and tasks = '{tasks}' and create_user_id = '{create_user_id}'
            and create_user_name = '{create_user_name}';"""
        )
        return self.do_execute(sql)

    def create_favorite_cross_artery(self, nodeid, create_user_id, create_user_name, favorite_type, favorite_id,
                                     favorite_name, area_id: int):
        """Insert one favourite (favorite_type 1 = cross, 2 = artery)."""
        sql = (
            f"""insert into {self.DB_Name}.user_favorite(nodeid, user_id, user_name,favorite_type, favorite_id, favorite_name,area_id)
            value ({nodeid},'{create_user_id}', '{create_user_name}', {favorite_type}, '{favorite_id}', '{favorite_name}',{area_id});"""
        )
        return self.do_execute(sql)

    def query_favorite_info_list(self, nodeid, create_user_id, create_user_name, area_id: int):
        """All favourites of one user within a node/area."""
        sql = (
            f"""
            select * from {self.DB_Name}.user_favorite where nodeid = {nodeid} and user_id = '{create_user_id}' and user_name = '{create_user_name}' and area_id = {area_id};
            """
        )
        return self.do_select(sql)

    def query_cross_rt_info(self):
        """Realtime cross-delay rows at the latest published timestamp."""
        sql = (
            f"""
            select * from {self.get_traffic_db_name()}.cross_delay_rt where timestamp = (select value from {self.get_traffic_db_name()}.delay_update_info where param = 'latest_tp');
            """
        )
        return self.do_select(sql)

    def query_artery_name(self, arteryid):
        """Look up an artery's display name from the tmnet schema."""
        sql = (
            f"""select `name` from tmnet.artery where arteryid = '{arteryid}'"""
        )
        return self.do_select(sql)

    def delete_favorite_ids(self, nodeid, ids, user_id, user_name, area_id):
        """Delete the given favourite ids for one user/node/area.

        *ids* is a pre-quoted, comma-joined string; returns the affected-row
        count.
        """
        # Consistency: use self.DB_Name instead of a hard-coded schema name.
        sql = (
            f"""delete from {self.DB_Name}.user_favorite where user_id = '{user_id}' and user_name = '{user_name}' and nodeid = '{nodeid}' and area_id = {area_id} and favorite_id in ({ids})"""
        )
        return self.do_execute(sql)

    def query_task_list(self, nodeid, user_id, area_id):
        """Open (record_state = 0) tasks executed by *user_id* in a node/area.

        NOTE(review): at least one caller passes the user *name* here while
        the column compared is `executor` — confirm which identifier
        `executor` actually stores.
        """
        sql = (
            f"""
            select * from task.task where executor = '{user_id}' and nodeid = {nodeid} and record_state = 0 and area_id = {area_id};
            """
        )
        return self.do_select(sql)

    def query_task_yesterday_operator(self, yesterday_date, nodeid):
        """Task-history rows from *yesterday_date*, joined to task names."""
        sql = (
            f"""
            select task_name, content
            from (select taskno, nodeid, history_date, content, operator
                  from task.task_history
                  where date(history_date) = '{yesterday_date}'
                    and nodeid = {nodeid}) t1
                     left join
                 (select taskno, task_name from task.task) t2
                 on t1.taskno = t2.taskno
            """
        )
        return self.do_select(sql)

    def check_favorite_info_exists(self, nodeid, user_id, user_name, f_id, f_name, area_id: int):
        """Return any existing favourite row matching the full identity tuple."""
        # Consistency: use self.DB_Name instead of a hard-coded schema name.
        sql = (
            f"""
            select * from {self.DB_Name}.user_favorite where user_id = '{user_id}' and user_name = '{user_name}' and nodeid = '{nodeid}' and favorite_id = '{f_id}' and favorite_name = '{f_name}' and area_id = {area_id};
            """
        )
        return self.do_select(sql)

    def query_yesterday_task_data(self, yesterday_date, nodeid, user_name, area_id):
        """Yesterday's '任务进度' edit operations for tasks executed by *user_name*."""
        sql_query = (f"""
        select
        t2.task_name,
        content
        from
        (select * from task.task_history where date(history_date)='{yesterday_date}' and nodeid='{nodeid}' and content like '%%operation%%任务进度%%content%%' and operator = '{user_name}' and area_id = {area_id} order by history_date) t1
        left join
        (select task_name,taskno from task.task where record_state = 0 and nodeid = '{nodeid}' and executor = '{user_name}') t2
        on t1.taskno = t2.taskno;
        """)
        return self.do_select(sql_query)

    def query_cross_delay_info(self, nodeid, start, cross_list):
        """Latest cross_delay rows for *cross_list* at time-period *start*
        (e.g. 't700'), using parameterised SQL.

        Returns (rows, None) on success or (None, error).
        """
        conn, cursor = self.connect()
        try:
            # BUG FIX: the inner sub-query hard-coded `traffic_350100`, so
            # every other node read node 350100's data; use the per-node
            # schema for both references instead.
            db_name = self.get_traffic_db_name(nodeid)
            sql = f'''select cd1.*
                      from {db_name}.cross_delay as cd1
                               join (select crossid, max(day) max_day
                                     from {db_name}.cross_delay
                                     where tp_start = %s
                                       and crossid in %s
                                     group by crossid) as t1
                                    on cd1.crossid = t1.crossid and cd1.day = t1.max_day
                      where cd1.crossid in %s
                        and cd1.tp_start = %s'''
            cursor.execute(sql, (start, cross_list, cross_list, start))
            result = cursor.fetchall()
            self.close(conn, cursor)
            return result, None
        except Exception as error:
            self.close(conn, cursor)
            return None, error


# ---------------------------------------------------------------------------
# app/workstation_worker.py — file-header imports (file continues below)
# ---------------------------------------------------------------------------
import json
import os
from datetime import timezone

from app.user_worker import do_get_user_info
from app.workstation_db_function import *
from app.work_station_common import *
from app.task_worker import *
from app.phasetable_worker import *
import proto.xlcomm_pb2 as pb
def get_favorite_list(params, token):
    """Return the current user's favourite cross and artery ids.

    Requires nodeid / area_id / userid query params plus a token header, and
    the area must belong to the user.  On success the payload is
    {'cross_list': [...], 'artery_list': [...]}.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    # A set instead of a one-shot map() iterator: same membership semantics,
    # reusable and O(1) lookups.
    valid_areas = {int(a) for a in area_list}
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in valid_areas:
        return json.dumps(make_common_res(5, '辖区id异常,请检查后重试'))
    if not token:
        return json.dumps(make_common_res(2, '用户信息异常,请刷新一下后重试'))
    token_res = json.loads(do_get_user_info({'token': token}, token))
    user_id = token_res['token']['userno']
    user_name = token_res['token']['user_name']
    rows = db_workstation.query_favorite_info_list(nodeid, user_id, user_name, int(area_id))
    cross_list = [r['favorite_id'] for r in rows if r['favorite_type'] == 1]
    artery_list = [r['favorite_id'] for r in rows if r['favorite_type'] == 2]
    res = make_common_res(0, 'ok')
    res['data'] = {
        'cross_list': cross_list,
        'artery_list': artery_list
    }
    return json.dumps(res, ensure_ascii=False)


def get_favorite_data_list(params, token):
    """Return one page of realtime delay data for the user's favourite crosses.

    NOTE(review): query_type == 'artery' passes validation but always returns
    an empty page — the artery branch is not implemented yet.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    valid_areas = {int(a) for a in area_list}
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in valid_areas:
        return json.dumps(make_common_res(5, '辖区id异常,请检查后重试'))
    # BUG FIX: the original called int() on the raw parameter *before* the
    # presence check, so a missing or non-numeric 'page'/'page_size' raised
    # TypeError/ValueError instead of returning the error response.
    page_raw = check_param(params, 'page')
    page = int(page_raw) if str(page_raw or '').isdigit() else 0
    if not page:
        return json.dumps(make_common_res(3, '缺少分页页码'))
    page_size_raw = check_param(params, 'page_size')
    page_size = int(page_size_raw) if str(page_size_raw or '').isdigit() else 0
    if not page_size:
        return json.dumps(make_common_res(4, '缺少分页条数'))
    query_type = check_param(params, 'query_type')
    if not query_type or query_type not in ('cross', 'artery'):
        return json.dumps(make_common_res(5, '查询类型缺失或查询类型异常'))
    token_res = json.loads(do_get_user_info({'token': token}, token))
    user_id = token_res['token']['userno']
    user_name = token_res['token']['user_name']
    rows = db_workstation.query_favorite_info_list(nodeid, user_id, user_name, int(area_id))
    cross_list, cross_id_list = [], []
    for row in rows:
        if row['favorite_type'] == 1:
            cross_list.append({'id': row['favorite_id'], 'name': row['favorite_name']})
            cross_id_list.append(row['favorite_id'])

    res = make_common_res(0, 'ok')
    res['data'] = []
    res['total_pages'] = 0
    res['total_num'] = 0
    if not cross_id_list or query_type != 'cross':
        return json.dumps(res, ensure_ascii=False)

    cross_rt_data_list, error = read_cross_deday(nodeid, area_id, cross_id_list)
    if error:
        return json.dumps(make_common_res(2, error))
    if not cross_rt_data_list:
        return json.dumps(res, ensure_ascii=False)

    # Set membership instead of any(...) per row: same result, O(n) not O(n^2).
    favourite_ids = {d['id'] for d in cross_list}
    cross_info_list = [d for d in cross_rt_data_list if d['crossid'] in favourite_ids]

    cross_info, error = get_cross_info(cross_id_list)
    if error:
        return json.dumps(make_common_res(2, error))

    start_index = (page - 1) * page_size
    paginated_data = cross_info_list[start_index:start_index + page_size]
    res['total_pages'] = (len(cross_info_list) + page_size - 1) // page_size
    res['total_num'] = len(cross_info_list)
    res['data'] = gen_work_station_cross_data_list(paginated_data, cross_info)
    return json.dumps(res, ensure_ascii=False)
def get_cross_info(cross_ids: list):
    """Map crossid -> {name, location, nodeid, area_id} via the tmnet lookup.

    Returns (mapping, None) on success or (None, error).
    """
    cross_info, error = db_tmnet.query_cross_info_all(cross_ids)
    if error:
        return None, error
    result = {}
    for row in cross_info:
        result[row['crossid']] = {
            'name': row['name'],
            'location': row['location'],
            'nodeid': row['nodeid'],
            'area_id': row['area_id'],
        }
    return result, None


def get_road_info(road_ids: list):
    """Map roadid -> {name, from_crossid, to_crossid, src_direct, nodeid}.

    Returns (mapping, None) on success or (None, error).
    NOTE(review): requires query_road_info_all to return a 'roadid' column.
    """
    roads_info, error = db_tmnet.query_road_info_all(road_ids)
    if error:
        return None, error
    result = {}
    for row in roads_info:
        # BUG FIX: the original indexed the result *list* with a string key
        # (roads_info['roadid']), raising TypeError on the first row; the
        # individual row must be indexed instead.
        result[row['roadid']] = {
            'name': row['name'],
            'from_crossid': row['from_crossid'],
            'to_crossid': row['to_crossid'],
            'src_direct': row['src_direct'],
            'nodeid': row['nodeid'],
        }
    return result, None


def read_cross_deday(nodeid, area_id, cross_id_list):
    """Fetch the latest per-cross delay metrics for the *previous* time period.

    (The name keeps the historical 'deday' typo because callers use it.)
    The day is split into time periods (tp); we locate the period containing
    the current wall-clock time and query data for the period before it.
    NOTE(review): when the current time falls inside the first period,
    index -1 wraps around to the *last* period of the day — confirm intended.
    Returns (rows, None) on success or ([], error) on failure.
    """
    tp_info = [
        "00:00-07:00",
        "07:00-09:00",
        "09:00-17:00",
        "17:00-19:00",
        "19:00-22:00",
        "22:00-23:59"
    ]
    # Per-city period definitions override the defaults when configured.
    tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id)
    if tp_desc:
        tp_info = tp_desc[0]['tp_desc'].split(',')
    now_time = datetime.now().strftime('%H:%M')
    start, end = '', ''
    for index, period in enumerate(tp_info):
        start, end = period.split('-')
        if start <= now_time <= end:
            start, end = tp_info[index - 1].split('-')
            break
    rows, error = db_workstation.query_cross_delay_info(
        nodeid, f"t{int(start.replace(':', ''))}", cross_id_list)
    if error:
        return [], error
    if not rows:
        return [], None

    result = []
    for row in rows:
        delay_pb = pb.xl_cross_delayinfo_t()
        delay_pb.ParseFromString(row['data'])
        info = delay_pb.delay_info
        result.append({
            'crossid': delay_pb.crossid,
            'jam_index': info.jam_index,
            'unbalance_index': info.imbalance_index,
            'queue_len': info.queue_len,
            'stop_times': info.stop_times,
            'delay_time': info.delay_time,
        })
    return result, None


def delete_favorite_list(params, token):
    """Delete the favourites whose ids are listed in params['ids'] (POST body)."""
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    valid_areas = {int(a) for a in area_list}
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in valid_areas:
        return json.dumps(make_common_res(5, '辖区id异常,请检查后重试'))
    ids = check_param(params, 'ids')
    if not ids or len(ids) < 1:
        return json.dumps(make_common_res(2, '缺少需要删除的收藏项'))
    if not token:
        return json.dumps(make_common_res(3, '用户信息异常,请刷新一下后重试'))
    ids_str = ", ".join(f"'{fid}'" for fid in ids)
    token_res = json.loads(do_get_user_info({'token': token}, token))
    user_id = token_res['token']['userno']
    user_name = token_res['token']['user_name']
    # BUG FIX: delete_favorite_ids() takes area_id as its last parameter; the
    # original call omitted it and raised TypeError.
    ret = db_workstation.delete_favorite_ids(nodeid, ids_str, user_id, user_name, int(area_id))
    if ret == len(ids):
        return json.dumps(make_common_res(0, '操作成功'))
    # BUG FIX: the original fell off the end (returning None) when the delete
    # count did not match, which Flask turns into a 500.
    return json.dumps(make_common_res(6, '操作失败,请刷新后重试'))
+ return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) + + user_name = token_res['token']['user_name'] + pending_task, timeout_task, approaching_overdue_task, done_task = [], [], [], [] + last_month_should_done, last_month_done, last_month_should_done_but_not, need_todo = 0, [], 0, 0 + last_week_should_done, last_week_done, last_week_should_done_but_not, week_need_todo = 0, [], 0, 0 + row_list = db_workstation.query_task_list(nodeid, user_name, area_id) + current_date = datetime.now(timezone.utc).date() + # current_date = datetime.strptime('2024-07-30', "%Y-%m-%d") + yesterday_obj = current_date - timedelta(days=1) + yesterday_date = yesterday_obj.strftime('%Y-%m-%d') + first_day_of_this_month = current_date.replace(day=1) + last_day_of_last_month_obj = first_day_of_this_month - timedelta(days=1) + last_month = last_day_of_last_month_obj.strftime('%Y-%m') + first_day_of_this_week = current_date - timedelta(days=current_date.weekday()) + last_day_of_last_week_obj = first_day_of_this_week - timedelta(days=1) + formatted_last_week = f'{last_day_of_last_week_obj.year}年第{last_day_of_last_week_obj.isocalendar()[1]}周' + for row in row_list: + task_name = row['task_name'] + task_type = int(row['task_type']) + plan_begin_time = int(row['plan_begin_time']) + plan_end_time = int(row['plan_end_time']) + task_state = int(row['task_state']) + task_end_time = None + if row['task_end_time']: + task_end_time = int(row['task_end_time']) + if task_state == 4: + done_task.append({ + 'task_name': task_name, + 'task_type': task_type, + 'plan_begin_time': plan_begin_time, + 'plan_end_time': plan_end_time + }) + elif task_state != 0 and task_state != 4: + pending_task.append({ + 'task_name': task_name, + 'task_type': task_type, + 'plan_begin_time': plan_begin_time, + 'plan_end_time': plan_end_time + }) + plan_endtime_date = datetime.fromtimestamp(plan_end_time // 1000, tz=timezone.utc).date() + if plan_endtime_date < current_date and task_state != 0 and task_state != 4: + 
timeout_task.append({ + 'task_name': task_name, + 'task_type': task_type, + 'plan_begin_time': plan_begin_time, + 'plan_end_time': plan_end_time + }) + elif current_date - plan_endtime_date <= timedelta(days=3) and task_state != 0 and task_state != 4: + approaching_overdue_task.append({ + 'task_name': task_name, + 'task_type': task_type, + 'plan_begin_time': plan_begin_time, + 'plan_end_time': plan_end_time + }) + # task_statistics + # month & week + if task_end_time: + end_time = datetime.fromtimestamp(task_end_time // 1000, tz=timezone.utc).date() + end_time_year = end_time.year + end_time_month = end_time.month + end_time_week = end_time.isocalendar()[1] + if task_state == 4 and f'{end_time_year}-{end_time_month}' == last_month: + last_month_done.append(task_type) + if plan_endtime_date < first_day_of_this_month: + last_month_should_done += 1 + if task_state == 4 and f'{end_time_year}年第{end_time_week}周' == formatted_last_week: + last_week_done.append(task_type) + if plan_endtime_date.year == end_time_year and plan_endtime_date.isocalendar()[1] == end_time_week: + last_week_should_done += 1 + else: + if task_state in [1, 2, 3] and plan_endtime_date < first_day_of_this_month: + last_month_should_done += 1 + last_month_should_done_but_not += 1 + if task_state in [1, 2, 3] and plan_endtime_date < first_day_of_this_week: + last_week_should_done += 1 + last_week_should_done_but_not += 1 + if task_state in [1, 2, 3] and plan_endtime_date > last_day_of_last_month_obj: + need_todo += 1 + if task_state in [1, 2, 3] and plan_endtime_date >= first_day_of_this_week: + week_need_todo += 1 + yesterday_res = db_workstation.query_yesterday_task_data(yesterday_date, nodeid, user_name, area_id) + task_statistics = gen_task_statistics_res(last_month_should_done, last_month_done, last_month_should_done_but_not, + need_todo, + last_week_should_done, last_week_done, last_week_should_done_but_not, + week_need_todo, + yesterday_res, last_day_of_last_month_obj, formatted_last_week, + 
yesterday_date) + pending_res = cal_task_type_num(pending_task) + timeout_res = cal_task_type_num(timeout_task) + approaching_overdue_res = cal_task_type_num(approaching_overdue_task) + done_res = cal_task_type_num(done_task) + res = make_common_res(0, 'ok') + data = { + 'join_task': { + 'pending_res': pending_res, + 'timeout_res': timeout_res, + 'approaching_overdue_res': approaching_overdue_res, + 'done_res': done_res + }, + 'task_statistics': task_statistics + } + res['data'] = data + return json.dumps(res, ensure_ascii=False) + + +# 路口优化页面 获取路口列表 接口1 +def get_crosses_list(params): + nodeid = check_nodeid(params) + if not nodeid: + return json.dumps(make_common_res(1, 'nodeid异常')) + cross_list = [] + for crossid in g_info_dict.get(g.nodeid)['g_crossroad_idset']: + crossName = g_info_dict.get(g.nodeid)['g_roadnet'].query_cross(crossid).name + cross_list.append({'crossid': crossid, 'crossName': crossName}) + cross_useable_date = situation_db_pool.query_situation_usable_date()['cross'] + data = { + 'cross_list': cross_list, + 'useable_date': cross_useable_date + } + res = make_common_res(0, 'ok') + res['data'] = data + return json.dumps(res, ensure_ascii=False) + + +# 路口优化页面 获取指定路口指定日期的配时方案与异常情况列表 接口2 +def get_cross_phase_symptom_info(params): + tp_time_range_dict = {'早高峰': '07:00-09:00', '晚高峰': '17:00-19:00', + '平峰': '06:00-07:00,09:00-17:00,19:00-22:00'} + tp_dict = {'早高峰': 100, '晚高峰': 200, '平峰': 400} + symptom_dict = { + '对向失衡': 'opposite_tide', + '路口溢流': 'inroad_overflow', + '路口失衡': 'intersection_imbalance', + '转向失衡': 'straight_left_imbalance', + '左转低效': 'inefficient_left', + '路口死锁': 'cross_deadlock' + } + nodeid = check_nodeid(params) + if not nodeid: + return json.dumps(make_common_res(1, 'nodeid异常')) + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(2, '缺少路口信息,请选择路口')) + date = check_param(params, 'date') + if not date: + return json.dumps(make_common_res(3, '缺少查询日期')) + date_type = check_param(params, 
'date_type') + weekday = datetime.strptime(date, '%Y%m%d').weekday() + jj_crossid = db_mapping.query_jjcrossid_by_xlcrossid(nodeid, crossid) + if not jj_crossid: + return json.dumps(make_common_res(4, '路口不存在')) + row_list = situation_db_pool.query_phase_data(jj_crossid, weekday + 1, crossid) + symptom_list = situation_db_pool.query_symptom_info_by_crossid(nodeid, crossid, date, date_type=date_type) + res_list = [] + for item in row_list.keys(): + plan_time_range = row_list[item]['tp_start'] + '-' + row_list[item]['tp_end'] + row_list[item]['symptom_type_list'] = [] + row_list[item]['symptom_detail_list'] = [] + row_list[item]['todo_detail_list'] = [] + row_list[item]['tp_desc_list'] = [] + row_list[item]['desc_key_list'] = [] + for symptom in symptom_list: + symptom_detail = json.loads(str(symptom['symptom_detail']).replace("'", "\"")) + todo_detail = json.loads(str(symptom['todo_detail']).replace("'", "\"")) + if symptom['tp'] != '平峰': + symptom_time_range = tp_time_range_dict.get(symptom['tp']) + if has_overlap(plan_time_range, symptom_time_range): + tp_num = tp_dict.get(symptom['tp']) + symptom_desc = symptom_dict.get(symptom['symptom_type']) + desc_key = situation_db_pool.query_symptom_desckey(crossid, tp_num, symptom_desc, + date_type=date_type) + row_list[item]['symptom_type_list'].append({ + 'symptom_type': symptom['symptom_type'], + 'symptom_type_eng': symptom_desc + }) + row_list[item]['symptom_detail_list'].append(symptom_detail) + row_list[item]['tp_desc_list'].append(symptom['tp']) + row_list[item]['desc_key_list'].append(desc_key) + row_list[item]['todo_detail_list'].append(todo_detail) + else: + symptom_time_range_list = tp_time_range_dict.get(symptom['tp']).split(',') + for symptom_time_range in symptom_time_range_list: + if has_overlap(plan_time_range, symptom_time_range): + tp_num = tp_dict.get(symptom['tp']) + symptom_desc = symptom_dict.get(symptom['symptom_type']) + desc_key = situation_db_pool.query_symptom_desckey(crossid, tp_num, 
symptom_desc, + date_type=date_type) + row_list[item]['symptom_type_list'].append({ + 'symptom_type': symptom['symptom_type'], + 'symptom_type_eng': symptom_desc + }) + row_list[item]['symptom_detail_list'].append(symptom_detail) + row_list[item]['tp_desc_list'].append(symptom['tp']) + row_list[item]['desc_key_list'].append(desc_key) + row_list[item]['todo_detail_list'].append(todo_detail) + continue + res_list.append(row_list[item]) + res = make_common_res(0, 'ok') + res['data'] = { + 'nodeid': nodeid, + 'date': date, + 'crossid': crossid, + 'res_list': res_list + } + return json.dumps(res, ensure_ascii=False) + + +def parse_time(time_str): + """将时间字符串转换为datetime.time对象""" + return datetime.strptime(time_str, '%H:%M').time() + + +def has_overlap(time_range1, time_range2): + """判断两个时间段是否有交集""" + # 解析时间段 + start1, end1 = map(parse_time, time_range1.split('-')) + start2, end2 = map(parse_time, time_range2.split('-')) + + # 检查交集条件 + return start1 < end2 and start2 < end1 + + +# 路口优化页面 获取路口已存在的优化任务信息 接口5 +def get_cross_optimize_plan_detail(params): + nodeid = check_nodeid(params) + if not nodeid: + return json.dumps(make_common_res(1, 'nodeid异常')) + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(2, '缺少路口id信息')) + planid = check_param(params, 'planid') + if not planid: + return json.dumps(make_common_res(3, '缺少方案id信息')) + tp_start = check_param(params, 'tp_start') + if not tp_start: + return json.dumps(make_common_res(4, '缺少时段开始时刻')) + tp_end = check_param(params, 'tp_end') + if not tp_end: + return json.dumps(make_common_res(5, '缺少时段结束时刻')) + taskId = check_param(params, 'taskid') + if not taskId: + return json.dumps(make_common_res(6, '缺少任务id信息')) + crossName = g_info_dict.get(g.nodeid)['g_roadnet'].query_cross(crossid).name + init_plan_res, target_plan_res, his_plan_list, edit_plan_res, delay_matrix_res, delay_matrix_now_res = None, None, {}, None, None, None + init_weekday = '' + init_plan_str, delay_input_str = 
situation_db_pool.query_init_info(nodeid, taskId) + if init_plan_str: + init_plan_res = Plan.parse_from_json(init_plan_str).to_dict() + if delay_input_str: + delay_matrix_res = DelayMatrix.parse_from_json(delay_input_str).to_dict() + init_weekday = datetime.strptime(delay_matrix_res['date'], '%Y%m%d').weekday() + query_type = check_param(params, 'type') + if not query_type or query_type == 'all': + target_plan_str, edit_plan_str, his_plan_dict = situation_db_pool.query_optimize_records(nodeid, taskId) + if target_plan_str: + target_plan_res = Plan.parse_from_json(target_plan_str).to_dict() + if edit_plan_str: + edit_plan_res = Plan.parse_from_json(edit_plan_str).to_dict() + for seq in his_plan_dict.keys(): + item = Plan.parse_from_json(his_plan_dict[seq]).to_dict() + timestamp = str(datetime.fromtimestamp(item['timestamp']).__format__('%Y%m%d %H:%M')) + his_plan_list[timestamp] = item + if init_weekday: + max_date = get_max_usable_date(init_weekday) + dm_now = get_delay_matrix(str(max_date), tp_start, tp_end, crossid) + delay_matrix_now_res = dm_now.to_dict() + # i_list = calc_dm_delay_change_detail(delay_matrix_res['delay_matrix'], delay_matrix_now_res['delay_matrix']) + # delay_matrix_now_res['change_flag'] = i_list + res = make_common_res(0, 'ok') + res['data'] = { + 'nodeid': nodeid, + 'crossid': crossid, + 'crossName': crossName, + 'planid': planid, + 'tp_start': tp_start, + 'tp_end': tp_end, + 'optimize_taskid': taskId, + 'init_plan': init_plan_res, # 初始方案 + 'target_plan': target_plan_res, # 推荐方案 + 'his_plan': his_plan_list, # 历史方案 + 'edit_plan': edit_plan_res, # 编辑方案 + 'delay_matrix': delay_matrix_res, # 初始关键指标延误矩阵 + 'delay_matrix_now': delay_matrix_now_res, # 当前最新的关键指标延误矩阵 + } + return json.dumps(res, ensure_ascii=False) + + +def get_max_usable_date(init_weekday): + cross_useable_date = situation_db_pool.query_situation_usable_date()['cross'] + max_date = 00000000 + for date in cross_useable_date['hisday']: + tmp_weekday = datetime.strptime(str(date), 
'%Y%m%d').weekday() + if tmp_weekday == init_weekday and int(date) > int(max_date): + max_date = date + return max_date + + +def get_delay_matrix(date_str, tp_start, tp_end, crossid): + roadnet = g_info_dict.get(g.nodeid)['g_roadnet'] + crossform = CrossForm.extract_cross_form(roadnet, crossid) + start_time = str(date_str) + ' ' + str(tp_start) + end_time = str(date_str) + ' ' + str(tp_end) + start_timestamp = datetime.strptime(start_time, '%Y%m%d %H:%M').timestamp() + end_timestamp = datetime.strptime(end_time, '%Y%m%d %H:%M').timestamp() + flow_delay_list = db.query_delay_matrix(start_timestamp, end_timestamp, crossid) + for item in flow_delay_list: + inroadid = item['inroadid'] + dirname = crossform.inroad_dirname_map[inroadid] + item['dirname'] = dirname + dm = DelayMatrix() + dirname_list = list(crossform.dirname_turns_map.keys()) + dm.set_inroad_names(dirname_list) + dm.add_flow_delay_data(flow_delay_list) + dm.disable_all_badturns(crossform.dirname_turns_map) + dm.date = date_str + dm.tp_start = tp_start + dm.tp_end = tp_end + return dm + + +# 路口优化页面 获取路口方案与延误矩阵信息 接口3 +def get_cross_init_info(params): + nodeid = check_nodeid(params) + if not nodeid: + return json.dumps(make_common_res(1, 'nodeid异常')) + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(2, '缺少路口id信息')) + date = check_param(params, 'date') + if not date: + return json.dumps(make_common_res(3, '缺少日期信息')) + planid = check_param(params, 'planid') + if not planid: + return json.dumps(make_common_res(4, '缺少方案id信息')) + tp_start = check_param(params, 'tp_start') + if not tp_start: + return json.dumps(make_common_res(5, '缺少时段开始时刻')) + tp_end = check_param(params, 'tp_end') + if not tp_end: + return json.dumps(make_common_res(6, '缺少时段结束时刻')) + delay_matrix = get_delay_matrix(str(date), tp_start, tp_end, crossid) + plan: Plan = get_cross_init_plan(nodeid, crossid, planid) + symptom_list = check_param(params, 'symptom_list') + if not symptom_list or 
len(symptom_list) == 0: + logging.info('异常类型为空') + else: + s_list = [] + for item in symptom_list: + info = {} + info['daytype'] = 'hisday' + info['crossid'] = crossid + info['day'] = date + info['tp'] = 0 + info['duration'] = 0 + info['desc_key'] = item['desc_key'] + info['type'] = item['symptom_type_eng'] + s = SymptomRecord.make_from_dict(info) + s_list.append(s) + OptimizePlanTool.judge_plan_adjust_type(plan, s_list) + res = make_common_res(0, 'ok') + res['data'] = { + 'nodeid': nodeid, + 'crossid': crossid, + 'planid': planid, + 'tp_start': tp_start, + 'tp_end': tp_end, + 'date': date, + 'init_plan': plan.to_dict(), + 'delay_matrix': delay_matrix.to_dict() + } + return json.dumps(res, ensure_ascii=False) + + +def get_cross_init_plan(nodeid, crossid, planid): + crossid_mapping = g_info_dict.get(nodeid)['g_jj_cross'] + jj_crossid = crossid_mapping[crossid] + phasetable_data = db_phasetable.query_phasetable(nodeid, jj_crossid) + plan_data = phasetable_data['plans'][int(planid)] + phase_data: dict = phasetable_data['phases'] + plan = Plan() + plan.init_from_dict(plan_data) + for info in plan_data['stages']: + stage = PhaseStage() + stage.init_from_dict(info) + plan.add_stage(stage) + for phase_info in phase_data.values(): + phase = PhaseInfo() + phase.init_from_dict(phase_info) + plan.add_phase(phase) + plan.update_stages_max_min_green() + return plan + + +# 路口优化页面 生成方案,创建路口配时方案优化方案 接口4 +def gen_cross_optimize(params): + tp_dict = {'早高峰': 100, '晚高峰': 200, '平峰': 400} + nodeid = check_nodeid(params) + if not nodeid: + return json.dumps(make_common_res(1, 'nodeid异常')) + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(2, '缺少路口id信息')) + planid = check_param(params, 'planid') + if not planid: + return json.dumps(make_common_res(3, '缺少方案id信息')) + tp_start = check_param(params, 'tp_start') + if not tp_start: + return json.dumps(make_common_res(4, '缺少时段开始时刻')) + tp_end = check_param(params, 'tp_end') + if not tp_end: + return 
json.dumps(make_common_res(5, '缺少时段结束时刻')) + init_plan_str = check_param(params, 'init_plan') + if not init_plan_str: + return json.dumps(make_common_res(6, '缺少初始方案信息')) + init_plan = Plan.parse_from_json(init_plan_str) + delay_matrix_str = check_param(params, 'delay_matrix') + if not delay_matrix_str: + return json.dumps(make_common_res(7, '缺少初始延误数据矩阵信息')) + date = check_param(params, 'date') + if not date: + return json.dumps(make_common_res(8, '缺少日期信息')) + delay_matrix = DelayMatrix.parse_from_json(delay_matrix_str) + roadnet = g_info_dict.get(g.nodeid)['g_roadnet'] + crossName = roadnet.query_cross(crossid).name + cross_form = CrossForm.extract_cross_form(roadnet, crossid) + symptom_list = check_param(params, 'symptom_list') + init_plan_id = init_plan.planid + init_plan_name = init_plan.name + if not symptom_list or len(symptom_list) == 0: + logging.info('异常类型为空') + target_plan, diff_list = OptimizePlanTool.do_plan_optimize(cross_form, init_plan, delay_matrix) + else: + symptom_list = json.loads(symptom_list) + s_list = [] + for item in symptom_list: + item['daytype'] = 'hisday' + item['crossid'] = crossid + item['day'] = date + item['tp'] = tp_dict[item['tp_desc']] + item['duration'] = 0 + s = SymptomRecord.make_from_dict(item) + s_list.append(s) + target_plan, diff_list = OptimizePlanTool.do_plan_optimize(cross_form, init_plan, delay_matrix, s_list) + if not target_plan: + return json.dumps(make_common_res(10, '未能生成方案')) + now_timestamp = datetime.now().timestamp() + init_plan.timestamp = now_timestamp + target_plan.timestamp = now_timestamp + optimize_taskid = gen_optimize_taskid(nodeid, crossid, planid, tp_start, tp_end) + sum_success_ret = 0 + ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 'init_plan', + init_plan_str, nodeid, 0, init_plan_id, init_plan_name) + if ret: + sum_success_ret += 1 + ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 
'delay_matrix', + delay_matrix_str, nodeid, 0, init_plan_id, init_plan_name) + if ret: + sum_success_ret += 1 + ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 'target_plan', + json.dumps(target_plan.to_dict(), ensure_ascii=False), nodeid, 0, + init_plan_id, init_plan_name) + if ret: + sum_success_ret += 1 + if sum_success_ret == 3: + res = make_common_res(0, 'ok') + res['data'] = { + 'nodeid': nodeid, + 'crossid': crossid, + 'planid': planid, + 'tp_start': tp_start, + 'tp_end': tp_end, + 'crossName': crossName, + 'optimize_taskid': optimize_taskid, + 'init_plan': init_plan.to_dict(), + 'target_plan': target_plan.to_dict(), + 'his_plans': [], + 'edit_plan': None, + 'delay_matrix': delay_matrix.to_dict(), + 'delay_matrix_now': None + } + return json.dumps(res, ensure_ascii=False) + else: + return json.dumps(make_common_res(11, '保存方案优化信息失败')) + + +def gen_optimize_taskid(nodeid, crossid, planid, tp_start, tp_end): + current_timestamp = time.time() + optimize_taskid = md5_hash(f"""{nodeid}-{crossid}-{planid}-{tp_start}-{tp_end}-{current_timestamp}""") + return optimize_taskid + + +def save_edit_plan(params): + nodeid = check_nodeid(params) + if not nodeid: + return json.dumps(make_common_res(1, 'nodeid异常')) + optimize_taskid = check_param(params, 'taskid') + if not optimize_taskid: + return json.dumps(make_common_res(2, '缺少优化任务id')) + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(3, '缺少路口id信息')) + planid = check_param(params, 'planid') + if not planid: + return json.dumps(make_common_res(4, '缺少方案id信息')) + tp_start = check_param(params, 'tp_start') + if not tp_start: + return json.dumps(make_common_res(5, '缺少时段开始时刻')) + tp_end = check_param(params, 'tp_end') + if not tp_end: + return json.dumps(make_common_res(6, '缺少时段结束时刻')) + edit_plan = check_param(params, 'edit_plan') + if not edit_plan: + return json.dumps(make_common_res(7, '缺少编辑方案信息')) + init_plan_id = 
check_param(params, 'init_planid') + if not init_plan_id: + return json.dumps(make_common_res(8, '缺少初始方案id')) + init_plan_name = check_param(params, 'init_planname') + if not init_plan_name: + return json.dumps(make_common_res(9, '缺少初始方案名称')) + new_planid = Plan.parse_from_json(edit_plan).planid + # 表示当前的操作是更新操作 + modify_params = gen_modify_params(planid, crossid, nodeid, edit_plan) + if new_planid == planid: + modify_res = json.loads(modify_plan_stage(modify_params)) + else: + # 当前的方案号不存在 当前的操作是创建新的方案 + modify_res = json.loads(add_plan_stage(modify_params)) + flag, status = situation_db_pool.check_exists_edit_plan(optimize_taskid) + tmp_plan = Plan.parse_from_json(edit_plan) + tmp_plan.timestamp = datetime.now().timestamp() + edit_plan = tmp_plan.to_dict() + if flag: + res = situation_db_pool.update_optimize_edit_plan(optimize_taskid, edit_plan, new_planid) + else: + res = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 'edit_plan', + json.dumps(edit_plan, ensure_ascii=False), nodeid, status, + init_plan_id, init_plan_name) + if res and modify_res['status'] == 0: + return json.dumps(make_common_res(0, '操作成功')) + else: + return json.dumps(modify_res) + + +def issued_edit_plan(params): + nodeid = check_nodeid(params) + if not nodeid: + return json.dumps(make_common_res(1, 'nodeid异常')) + optimize_taskid = check_param(params, 'taskid') + if not optimize_taskid: + return json.dumps(make_common_res(2, '缺少优化任务id')) + issued_plan = check_param(params, 'issued_plan') + if not issued_plan: + return json.dumps(make_common_res(3, '缺少下发方案信息')) + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(4, '缺少路口id信息')) + planid = check_param(params, 'planid') + if not planid: + return json.dumps(make_common_res(5, '缺少方案id信息')) + tp_start = check_param(params, 'tp_start') + if not tp_start: + return json.dumps(make_common_res(6, '缺少时段开始时刻')) + tp_end = check_param(params, 'tp_end') + if not 
tp_end: + return json.dumps(make_common_res(7, '缺少时段结束时刻')) + init_plan_id = check_param(params, 'init_planid') + if not init_plan_id: + return json.dumps(make_common_res(8, '缺少初始方案id')) + init_plan_name = check_param(params, 'init_planname') + if not init_plan_name: + return json.dumps(make_common_res(9, '缺少初始方案名称')) + # 判定当前id是否存在his_plan,如果存在则取出tag字段 截取出当前最大的iter_plan_x中x值,然后+1,否则tag='iter_plan_1' + plan_seq = 0 + his_plan_list = situation_db_pool.query_optimize_his_plans(optimize_taskid) + new_planid = Plan.parse_from_json(issued_plan).planid + modify_params = gen_modify_params(planid, crossid, nodeid, issued_plan) + if planid == new_planid: + modify_res = json.loads(modify_plan_stage(modify_params)) + else: + # 当前的方案号不存在 当前的操作是创建新的方案 + modify_res = json.loads(add_plan_stage(modify_params)) + for his_plan in his_plan_list: + item_seq = int(str(his_plan['tag']).split('iter_plan_')[1]) + if item_seq > plan_seq: + plan_seq = item_seq + tmp_plan = Plan.parse_from_json(issued_plan) + tmp_plan.timestamp = datetime.now().timestamp() + issued_plan = tmp_plan.to_dict() + # 当his_plan不存在时说明是第一次下发,还需要将该taskid的状态字段status更新为迭代中 也就是 1 + if plan_seq == 0: + ret = situation_db_pool.update_optimize_records_status(optimize_taskid, 1) + if not ret: + return json.dumps(make_common_res(10, '更新状态失败')) + flag, status = situation_db_pool.check_exists_edit_plan(optimize_taskid) + if flag: + ret = situation_db_pool.update_optimize_edit_plan(optimize_taskid, issued_plan, new_planid) + else: + ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, new_planid, tp_start, tp_end, + 'edit_plan', + json.dumps(issued_plan, ensure_ascii=False), nodeid, status, + init_plan_id, init_plan_name) + if not ret: + return json.dumps(make_common_res(11, '保存方案失败')) + ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, new_planid, tp_start, tp_end, + f'iter_plan_{plan_seq + 1}', + json.dumps(issued_plan, ensure_ascii=False), nodeid, status, + init_plan_id, 
init_plan_name) + if not ret: + return json.dumps(make_common_res(12, '保存方案失败')) + else: + # 说明方案已经下发过 当前任务状态为1 :迭代中,需要将当前方案保存为his_plan中的一个 且将当前编辑的方案保存为编辑方案 + ret1 = situation_db_pool.update_optimize_edit_plan(optimize_taskid, issued_plan, new_planid) + ret2 = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, new_planid, tp_start, tp_end, + f'iter_plan_{plan_seq + 1}', + json.dumps(issued_plan, ensure_ascii=False), nodeid, 1, + init_plan_id, + init_plan_name) + if not ret1 or not ret2: + return json.dumps(make_common_res(13, '保存方案失败')) + row_list = situation_db_pool.query_optimize_issued_plan(optimize_taskid) + edit_plan, his_plans = None, [] + for row in row_list: + tag = row['tag'] + if tag == 'edit_plan': + edit_plan = Plan.parse_from_json(row['content']).to_dict() + else: + his_plans.append(Plan.parse_from_json(row['content']).to_dict()) + if modify_res['status'] == 0: + res = make_common_res(0, 'ok') + res['data'] = { + 'edit_plan': edit_plan, + 'his_plans': his_plans + } + return json.dumps(res, ensure_ascii=False) + else: + return json.dumps(modify_res) + + +def update_optimize_plan_status(params): + nodeid = check_nodeid(params) + if not nodeid: + return json.dumps(make_common_res(1, 'nodeid异常')) + optimize_taskid = check_param(params, 'optimize_taskid') + if not optimize_taskid: + return json.dumps(make_common_res(2, '缺少优化任务id')) + status = check_param(params, 'status') + if not status or status not in ('finish', 'abort'): + return json.dumps(make_common_res(3, '缺少状态信息')) + ret = situation_db_pool.update_optimize_plan_status_byid(nodeid, optimize_taskid, status) + if ret: + return json.dumps(make_common_res(0, 'ok')) + else: + return json.dumps(make_common_res(4, '更新失败')) + + +def gen_modify_params(planid, crossid, nodeid, edit_plan): + plan_dict = Plan.parse_from_json(edit_plan).to_dict() + new_plan_stages = [] + plan_stages = plan_dict['stages'] + plan_phases = plan_dict['phases'] + plan_phase_dict = {} + for phase in plan_phases: + 
plan_phase_dict[phase['phaseid']] = phase + for stage in plan_stages: + phaseids = stage['phaseids'].split(',') + phaseid_names = plan_phase_dict[int(phaseids[0])]['name'] + max_green, min_green = plan_phase_dict[int(phaseids[0])]['max_green'], plan_phase_dict[int(phaseids[0])][ + 'min_green'] + if len(phaseids) > 1: + for item in phaseids[1:]: + phaseid_names += f",{plan_phase_dict[int(item)]['name']}" + new_plan_stages.append({ + 'stageid': stage['stageid'], + 'stage_name': stage['name'], + 'stage_duration': stage['duration'], + 'green': stage['green'], + 'redyellow': stage['redyellow'], + 'yellow': stage['yellow'], + 'allred': stage['allred'], + 'phaseids': stage['phaseids'], + 'phaseid_names': phaseid_names, + 'max_green': max_green, + 'min_green': min_green, + 'phases': phaseid_names + }) + modify_params = { + 'planid': planid, + 'plan_name': plan_dict['name'], + 'offset': plan_dict['offset'], + 'cycle': plan_dict['cycle'], + 'crossid': crossid, + 'stages': new_plan_stages, + 'nodeid': nodeid + } + return modify_params + + +def calc_dm_delay_change_detail(dm, dm_now): + i_list = [] + if dm is not None and dm_now is not None: + # 同时遍历两个三维数组 + for i in range(len(dm)): + j_list = [] + for j in range(len(dm[i])): + flag_list = [] + for k in range(len(dm[i][j])): + # 比较相同位置的元素 + value1 = dm[i][j][k] if dm[i][j][k] != '-' else 0 + value2 = dm_now[i][j][k] if dm_now[i][j][k] != '-' else 0 + if float(value2) > float(value1): + flag = 1 + elif float(value2) < float(value1): + flag = -1 + else: + flag = 0 + flag_list.append(flag) + j_list.append(flag_list) + i_list.append(j_list) + return i_list + + +def download_optimize_plan(params): + nodeid = check_nodeid(params) + if not nodeid: + return json.dumps(make_common_res(1, 'nodeid异常')) + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(2, '缺少路口id')) + cross_info = g_info_dict.get(g.nodeid)['g_roadnet'].query_cross(crossid) + download_plan = check_param(params, 
'download_plan') + if not download_plan: + return json.dumps(make_common_res(3, '缺少下载方案')) + + download_plan_obj = Plan.parse_from_json(download_plan) + download_plan_dict = download_plan_obj.to_dict() + wb = Workbook() + sheet1 = wb.active + sheet1.title = "配时方案表" + sheet1.append(t_sheet_cols['配时方案表']['head']) + sheet2 = wb.create_sheet("阶段表") + sheet2.append(t_sheet_cols['阶段表']['head']) + stages = download_plan_dict['stages'] + for index, stage in enumerate(stages): + num = index + 1 + sheet1.append([num, download_plan_dict['planid'], download_plan_dict['name'], download_plan_dict['cycle'], num, + stage['stageid'], stage['duration']]) + sheet2.append([num, stage['stageid'], stage['name'], stage['yellow'], stage['allred'], stage['phaseids']]) + + sheet3 = wb.create_sheet("相位表") + sheet3.append(t_sheet_cols['相位表']['head']) + phases = download_plan_dict['phases'] + for index, phase in enumerate(phases): + num = index + 1 + sheet3.append([num, phase['phaseid'], phase['name'], phase['min_green'], phase['max_green']]) + + excel_data = BytesIO() + wb.save(excel_data) + excel_data.seek(0) + return send_file( + excel_data, + mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', + as_attachment=True, + download_name=f"{cross_info.name}_{download_plan_obj.name}_配时方案.xlsx") + + +def get_road_net_detail(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + roadid = check_param(params, 'roadid') + if not roadid: + return json.dumps(make_common_res(3, '缺少roadid, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + 
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) + road_info, error = get_road_info([roadid]) + if error: + return json.dumps(make_common_res(2, f"{error}")) + return json.dumps(make_common_res(0, road_info[roadid]['src_direct']))