From 6e1220ea80cfb3af85105b97ec34f1b671aad701 Mon Sep 17 00:00:00 2001 From: wangxu <1318272526@qq.com> Date: Thu, 19 Mar 2026 14:26:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E7=BB=BF=E6=B3=A2=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E4=BB=BB=E5=8A=A1=E5=B7=A5=E4=BD=9C=E6=B5=81=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E4=BB=A3=E7=A0=81=EF=BC=8C=E6=9C=AA=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/common_worker.py | 20 +++ app/cross_monitor_worker.py | 13 +- app/db_cross_delay.py | 8 +- app/global_source.py | 11 ++ app/monitor_common.py | 6 +- app/task_db_func.py | 50 ++++++- app/task_worker.py | 256 +++++++++++++++++++++++++++++------- app/tmnet_db_func.py | 7 +- app/wave_db_func.py | 100 ++++++++++++++ cross_doctor.ini | 7 + 10 files changed, 414 insertions(+), 64 deletions(-) create mode 100644 app/wave_db_func.py diff --git a/app/common_worker.py b/app/common_worker.py index 9caf19d..b472183 100644 --- a/app/common_worker.py +++ b/app/common_worker.py @@ -241,6 +241,24 @@ def init_with_config(): if config.has_option('task_db', 'password'): password = config.get('task_db', 'password') g_task_db['password'] = password + + # 新增绿波相关mysql链接信息 + if config.has_option('wave_db', 'host'): + host = config.get('wave_db', 'host') + g_wave_db['host'] = host + if config.has_option('wave_db', 'port'): + port = int(config.get('wave_db', 'port')) + g_wave_db['port'] = port + if config.has_option('wave_db', 'dbname'): + dbname = config.get('wave_db', 'dbname') + g_wave_db['db'] = dbname + if config.has_option('wave_db', 'user'): + user = config.get('wave_db', 'user') + g_wave_db['user'] = user + if config.has_option('wave_db', 'password'): + password = config.get('wave_db', 'password') + g_wave_db['password'] = password + # redis if config.has_section('redis'): if config.has_option('redis', 'ip'): @@ -279,6 +297,8 @@ def init_with_config(): g_cross_delay_db['host'] = config.get('cross_delay_db', 'host_inner') if 
config.has_option('task_db', 'host_inner'): g_task_db['host'] = config.get('task_db', 'host_inner') + if config.has_option('wave_db', 'host_inner'): + g_wave_db['host'] = config.get('wave_db', 'host_inner') print(g_dbinfo) print(g_roadnet_db) print(g_cloud_db) diff --git a/app/cross_monitor_worker.py b/app/cross_monitor_worker.py index 7110fed..4e2b3b4 100644 --- a/app/cross_monitor_worker.py +++ b/app/cross_monitor_worker.py @@ -26,7 +26,7 @@ def query_monitor_task_usable_date_list(params): if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) - day_list, week_list = db_cross.query_monitor_task_dates(nodeid, area_id) + day_list, week_list, month_list = db_cross.query_monitor_task_dates(nodeid, area_id) tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id) if not tp_desc: tp_info = [ @@ -52,6 +52,7 @@ def query_monitor_task_usable_date_list(params): res['data'] = { 'days': day_list, 'weeks': week_list, + 'months': month_list, 'tp_info': tp_info, 'peak_tp': peak_tp } @@ -78,7 +79,7 @@ def query_monitor_data(params): date_type = check_param(params, 'date_type') if not date_type: return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试')) - if date_type not in ['day', 'week', 'workday', 'weekend']: + if date_type not in ['day', 'week', 'workday', 'weekend', 'month']: return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试')) special_time_range = check_param(params, 'special_time_range') if not special_time_range or special_time_range == '00:00-23:59': @@ -129,7 +130,7 @@ def query_monitor_data_trend(params): date_type = check_param(params, 'date_type') if not date_type: return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试')) - if date_type not in ['day', 'week', 'workday', 'weekend']: + if date_type not in ['day', 'week', 'workday', 'weekend', 'month']: return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试')) query_date = check_param(params, 'query_date') if not query_date: @@ -165,7 +166,7 
@@ def query_cross_tp_data_trend(params): date_type = check_param(params, 'date_type') if not date_type: return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试')) - if date_type not in ['day', 'week', 'workday', 'weekend']: + if date_type not in ['day', 'week', 'workday', 'weekend', 'month']: return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试')) weekdays = check_param(params, 'weekdays') if not weekdays: @@ -192,7 +193,7 @@ def query_cross_tp_data_trend(params): cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict) roads_dir_dict = gen_road_dir_dict(cross_ledger_info) - if date_type == 'day': + if date_type in ('day', 'month'): month_ago_date = (datetime.now().date() - timedelta(days=30)).strftime('%Y%m%d') now_prev_date = (datetime.now().date() - timedelta(days=1)).strftime('%Y%m%d') date_list = generate_date_range(month_ago_date, now_prev_date) @@ -244,7 +245,7 @@ def query_monitor_problems(params): date_type = check_param(params, 'date_type') if not date_type: return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试')) - if date_type not in ['day', 'week', 'workday', 'weekend']: + if date_type not in ['day', 'week', 'workday', 'weekend', 'month']: return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试')) special_time_range = check_param(params, 'special_time_range') if not special_time_range or special_time_range == '00:00-23:59': diff --git a/app/db_cross_delay.py b/app/db_cross_delay.py index b75c28a..8e3a990 100644 --- a/app/db_cross_delay.py +++ b/app/db_cross_delay.py @@ -67,11 +67,14 @@ class CrossDbHelper(TableDbHelperBase): def query_monitor_task_dates(self, nodeid, area_id): day_sql = f"select distinct day from traffic_{nodeid}.cross_inspect where citycode = {nodeid} and area_id = {area_id} and type = 'day'" week_sql = f"select distinct day from traffic_{nodeid}.cross_inspect where citycode = {nodeid} and area_id = {area_id} and type = 'week'" + month_sql = f"select distinct day from 
traffic_{nodeid}.cross_inspect where citycode = {nodeid} and area_id = {area_id} and type = 'month'" day_date_list = self.do_select(day_sql) week_date_list = self.do_select(week_sql) + month_date_list = self.do_select(month_sql) day_list = [item['day'] for item in day_date_list] week_list = [item['day'] for item in week_date_list] - return day_list, week_list + month_list = [item['day'] for item in month_date_list] + return day_list, week_list, month_list def query_monitor_data_sql(self, nodeid, area_id, date_type, query_date): if date_type in ['week', 'workday', 'weekend']: @@ -86,6 +89,9 @@ class CrossDbHelper(TableDbHelperBase): if date_type in ['week', 'workday', 'weekend']: date_type = 'week' limit_num = 10 + elif date_type == 'month': + date_type = 'month' + limit_num = 12 sql = f""" select * from traffic_{nodeid}.cross_inspect where area_id = {area_id} and type = '{date_type}' order by day desc limit {limit_num} """ diff --git a/app/global_source.py b/app/global_source.py index 27f7b45..a6f94c0 100644 --- a/app/global_source.py +++ b/app/global_source.py @@ -6,6 +6,7 @@ from app.models import * from app.models_wave import RoadLinkManager from app.phase_db_func import PhaseTableDbHelper from app.task_db_func import TaskDbHelper +from app.wave_db_func import WaveDBFunction from app.workstation_db_function import WorkstationDbHelper from tool.mysql_common_connector_pool import * from app.user_db_func import * @@ -69,6 +70,14 @@ g_task_db = { 'db': 'task' } +g_wave_db = { + 'host': '43.140.225.219', + 'port': 3306, + 'user': 'root', + 'password': 'pmenJIn7EaK40oThn~~', + 'db': 'greenwave' +} + # 服务配置 g_citycode_set = set() g_config = {} @@ -89,6 +98,7 @@ g_cloud_pool = DatabaseManager(g_cloud_db) g_cross_delay_pool = DatabaseManager(g_cross_delay_db) g_user_pool = DatabaseManager(g_user_db) g_task_pool = DatabaseManager(g_task_db) +g_wave_pool = DatabaseManager(g_wave_db) # 全局的数据库对象 @@ -102,6 +112,7 @@ db_task = TaskDbHelper(g_task_pool) db_phasetable = 
PhaseTableDbHelper(g_db_pool) db_tmnet = TmnetDbHelper(g_roadnet_pool) db_workstation = WorkstationDbHelper(g_db_pool) +db_wave = WaveDBFunction(g_wave_pool) nodeid_list = [] diff --git a/app/monitor_common.py b/app/monitor_common.py index 9273e78..546137c 100644 --- a/app/monitor_common.py +++ b/app/monitor_common.py @@ -1360,8 +1360,10 @@ def phase_tp_check_problems(routing_crosses, special_time_range, first_date, dat 6: '自动结束' } min_date = first_date - if date_type != 'day': + if date_type in ('week', 'workday', 'weekend'): min_date = (datetime.strptime(first_date, "%Y%m%d") - timedelta(days=7)).strftime("%Y%m%d") + elif date_type == 'month': + min_date = datetime.strptime(first_date, '%Y%m%d').replace(day=1).strftime('%Y%m%d') cross_examine_records = db_cross.query_crosses_examine_records(crossid_list, first_date, min_date) for row in cross_examine_records: start_hm = row['start_hm'] @@ -1438,7 +1440,7 @@ def phase_tp_check_problems(routing_crosses, special_time_range, first_date, dat # 配时方案相关异常 def monitor_phase_problems(nodeid, area_id, date_type, query_date, special_time_range, shield_info, routing_crosses, filter_shield): weekdays = str(datetime.strptime(query_date, "%Y%m%d").weekday() + 1) - if date_type == 'week': + if date_type in ('week', 'month'): weekdays = '1,2,3,4,5,6,7' elif date_type == 'workday': weekdays = '1,2,3,4,5' diff --git a/app/task_db_func.py b/app/task_db_func.py index 1d6dc94..7e28068 100644 --- a/app/task_db_func.py +++ b/app/task_db_func.py @@ -175,13 +175,13 @@ class TaskDbHelper(TableDbHelperBase): return count def add_task(self, timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time, - executor, progress, task_state, description, crossids, sectionids, arteryids, comment, + executor, progress, task_state, description, crossids, waveid, wave_name, comment, record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review): sql_insert = "insert into task (timestamp, creatorid, task_name, 
task_type, data_type, plan_begin_time, plan_end_time,publish_time," \ - " executor, progress, task_state, description, crossids, sectionids, arteryids, comment," \ + " executor, progress, task_state, description, crossids, waveid, wave_name, comment," \ " record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %s, %s, %s)" % ( timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time, - executor, progress, task_state, description, crossids, sectionids, arteryids, comment, + executor, progress, task_state, description, crossids, waveid, wave_name, comment, record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) count = self.do_execute(sql_insert) @@ -257,6 +257,34 @@ class TaskDbHelper(TableDbHelperBase): """ % task_no return self.do_select(sql) + def query_ledger_task_crosses_info_by_area_id(self, nodeid, area_id): + sql = """ + select * from ledger_task_detail where nodeid = %s and area_id = %s + """ % (nodeid, area_id) + res = self.do_select(sql) + task_info_dict = {} + for row in res: + task_no = row['task_no'] + if task_no not in task_info_dict.keys(): + task_info_dict[task_no] = [] + task_info_dict[task_no].append(row) + task_res_dict = {} + for task_no in task_info_dict.keys(): + task_cross_num = len(task_info_dict[task_no]) + entered_cross_num = len([cross for cross in task_info_dict[task_no] if cross['ledger_status'] == 2 and cross['phase_status'] in (1, 3)]) + approve_cross_num = len([cross for cross in task_info_dict[task_no] if cross['submit_status'] == 2]) + entered_percent = int(entered_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0 + approve_percent = int(approve_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0 + task_res_dict[task_no] = { + 'task_no': task_no, + 'task_cross_num': task_cross_num, + 'entered_cross_num': 
entered_cross_num, + 'entered_percent': entered_percent, + 'approve_cross_num': approve_cross_num, + 'approve_percent': approve_percent + } + return task_res_dict + def query_ledger_task_crosses_pics(self, crossid_list): crossids = "'" + "', '".join(item for item in crossid_list) + "'" sql = """ @@ -358,6 +386,22 @@ class TaskDbHelper(TableDbHelperBase): """ % taskno return self.do_select(sql) + def query_cross_entering_status(self, crossid_list): + crossids = "'" + "', '".join(item for item in crossid_list) + "'" + sql = """ + select crossid, ledger_status, phase_status, update_time from ledger_task_detail where crossid in (%s) order by update_time asc + """ % crossids + return self.do_select(sql) + + def query_greenwave_task_requirement_validation_info_sql(self, task_no, nodeid, area_id): + sql = """ + select * from greenwave_task_detail where task_no = %s and nodeid = %s and area_id = %s + """ % (task_no, nodeid, area_id) + res = self.do_select(sql) + if res: + return res[0]['requirement_validation'] + return None + # # if __name__ == '__main__': # tt_5min = get_latest_5min_timestamp() diff --git a/app/task_worker.py b/app/task_worker.py index 55abf18..c9213a2 100644 --- a/app/task_worker.py +++ b/app/task_worker.py @@ -91,14 +91,7 @@ def do_query_task_list(params): if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) - #任务名称`task_name` varchar(300) DEFAULT NULL COMMENT '任务名称', - #任务类型`task_type` varchar(300) DEFAULT NULL COMMENT '任务类型', - #任务等级`task_class` varchar(300) DEFAULT NULL COMMENT '需求等级'; - #计划开始时间`plan_begin_time` bigint DEFAULT NULL COMMENT '计划开始时间', - #计划结束时间`plan_end_time` bigint DEFAULT NULL COMMENT '计划结束时间', - #任务状态`task_state` int NOT NULL COMMENT '任务状态 0未开始,1进行中,2完成,3挂起,4废止', - #负责人`executor` varchar(300) DEFAULT NULL COMMENT '负责人', - #需求来源 `task_src` varchar(300) DEFAULT NULL COMMENT '需求来源'; + all_task_list = db_task.query_all_tak_list(nodeid, area_id) 
upload_file_dict = db_task.query_task_file(nodeid, area_id) for task_info in all_task_list: @@ -137,52 +130,47 @@ def do_query_task_list(params): for i in range(len(task_name)): if any('\u4e00' <= c <= '\u9fff' for c in task_info['task_name']) and task_name[i] == list(task_name_pinyin)[i][0] and task_info not in name_list: name_list.append(task_info) + filter_conditions = [] if task_type and len(task_type) > 0: type_list.extend([task_info for task_info in all_task_list if int(task_type) == task_info['task_type']]) - if len(type_list) == 0: - # 用于多种筛选条件取交集使用 - type_list.append({'arteryids': -1}) + filter_conditions.append((True, type_list)) if task_class and len(task_class) > 0: class_list.extend([task_info for task_info in all_task_list if int(task_class) == task_info['task_class']]) - if len(class_list) == 0: - # 用于多种筛选条件取交集使用 - class_list.append({'arteryids': -1}) + filter_conditions.append((True, class_list)) if plan_begin_time and len(plan_begin_time) > 0: begin_list.extend([task_info for task_info in all_task_list if int(plan_begin_time) <= task_info['plan_begin_time']]) - if len(begin_list) == 0: - # 用于多种筛选条件取交集使用 - begin_list.append({'arteryids': -1}) + filter_conditions.append((True, begin_list)) if plan_end_time and len(plan_end_time) > 0: end_list.extend([task_info for task_info in all_task_list if int(plan_end_time) >= task_info['plan_end_time']]) - if len(end_list) == 0: - # 用于多种筛选条件取交集使用 - end_list.append({'arteryids': -1}) + filter_conditions.append((True, end_list)) if task_state and len(task_state) > 0: state_list.extend([task_info for task_info in all_task_list if int(task_state) == task_info['task_state']]) - if len(state_list) == 0: - # 用于多种筛选条件取交集使用 - state_list.append({'arteryids': -1}) + filter_conditions.append((True, state_list)) if executor and len(executor) > 0: executor_list.extend([task_info for task_info in all_task_list if executor == task_info['executor']]) - if len(executor_list) == 0: - # 用于多种筛选条件取交集使用 - 
executor_list.append({'arteryids': -1}) + filter_conditions.append((True, executor_list)) if task_src and len(task_src) > 0: task_src_list.extend([task_info for task_info in all_task_list if task_src == task_info['task_src']]) - if len(task_src_list) == 0: - # 用于多种筛选条件取交集使用 - task_src_list.append({'arteryids': -1}) - non_empty_lists = [lst for lst in [name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list] if len(lst) > 0] - if non_empty_lists: - res_list = intersect_dicts(non_empty_lists) - else: + filter_conditions.append((True, task_src_list)) + + has_empty_filter = any(has_condition and len(lst) == 0 for has_condition, lst in filter_conditions) + + if has_empty_filter: res_list = [] + else: + non_empty_lists = [lst for lst in [name_list, type_list, class_list, begin_list, end_list, state_list, executor_list, task_src_list] if len(lst) > 0] + if non_empty_lists: + res_list = intersect_dicts(non_empty_lists) + else: + res_list = [] + if task_no and len(task_no) > 0: res_list = [task_info for task_info in all_task_list if int(task_no) == task_info['taskno']] if not task_name and not task_type and not task_class and not plan_begin_time and not plan_end_time and not task_state and not executor and not task_src and not task_no: res_list = all_task_list - # task_list = db_task.query_task_list(task_name,task_type,data_type, task_class,plan_begin_time,plan_end_time,publish_time,task_state,executor,task_src, nodeid) - filtered_list = [d for d in res_list if d['arteryids'] != -1] + task_entering_info_dict = db_task.query_ledger_task_crosses_info_by_area_id(nodeid, area_id) + filtered_list = res_list + for task_info in filtered_list: comment = task_info['comment'] if '_split_suffix_for_query_info' in comment: @@ -193,7 +181,7 @@ def do_query_task_list(params): task_info['comment'] = comment_list if task_info['task_type_class'] == 1: item_task_no = task_info['taskno'] - ledger_task_add_info = 
query_ledger_task_additional_info(item_task_no, nodeid, area_id) + ledger_task_add_info = task_entering_info_dict[item_task_no] task_info['entered_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['entered_percent'] task_info['approve_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['approve_percent'] sorted_list = sorted(filtered_list, key=sort_key) @@ -483,15 +471,16 @@ def do_add_task(params): data_type = check_param(params, 'data_type') if not data_type: return json.dumps(make_common_res(2, '关联路网状态缺失,请检查后重试')) - crossids, arteryids, sectionids = '', '', '' + crossids, waveid, wave_name = '', '', '' if int(data_type) == 1: crossids = check_param(params, 'crossids') if not crossids: return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联路口,但关联路口信息缺失,请检查后重试')) - elif int(data_type) == 2: - arteryids = check_param(params, 'arteryids') - if not arteryids: - return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联干线,但关联干线信息缺失,请检查后重试')) + elif int(data_type) == 3: + waveid = check_param(params, 'waveid') + wave_name = check_param(params, 'wave_name') + if not waveid or not wave_name: + return json.dumps(make_common_res(2, '当前所选关联路网状态为:关联绿波,但关联绿波信息缺失,请检查后重试')) add_type = check_param(params, 'add_type') if not add_type or add_type == 'normal': plan_begin_time = check_param(params, 'plan_begin_time') @@ -540,7 +529,7 @@ def do_add_task(params): if add_type and add_type != 'normal': task_state = 0 count = db_task.add_task(timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time, - executor, progress, task_state, description, crossids, sectionids, arteryids, comment, + executor, progress, task_state, description, crossids, waveid, wave_name, comment, record_task_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) if count != 1: logging.error(str(params) + ' 添加任务报错!') @@ -575,6 +564,16 @@ def do_add_task(params): if e: logging.error(e) return 
json.dumps(make_common_res(2, '台账任务创建成功,但路口列表配时方案状态更新失败,请反馈该情况至管理员!')) + elif task_type_class == 3: + if full_review != 1: + requirement_validation_json = check_param(params, 'requirement_validation_json') + if not requirement_validation_json: + return json.dumps(make_common_res(2, '缺少需求核验单信息,请检查后重试')) + predict_issue_time = check_param(params, 'predict_issue_time') + if not predict_issue_time: + return json.dumps(make_common_res(2, '计划下发时间信息,请检查后重试')) + + pass res = make_common_res(0, 'ok') res['nodeid'] = nodeid @@ -966,9 +965,13 @@ def do_query_task_detail(params): if task['task_type_class'] == 1: ledger_task_additional_info = query_ledger_task_additional_info(taskno, nodeid, area_id) task['ledger_task_additional_info'] = ledger_task_additional_info - if task['task_state'] == 4: - task['ledger_task_additional_info']['entered_percent'] = 100 - task['ledger_task_additional_info']['approve_percent'] = 100 + if task['task_state'] == 4: + task['ledger_task_additional_info']['entered_percent'] = 100 + task['ledger_task_additional_info']['approve_percent'] = 100 + elif task['task_type_class'] == 3: + # 绿波优化任务 + greenwave_task_additional_info = gen_greenwave_task_additional_info(taskno, nodeid, area_id) + pass task.pop('update_time') res['desc'] = '' res['data'] = task @@ -1013,10 +1016,11 @@ def gen_update_sql(params, task_old_info): crossids = check_param(params, 'crossids') if crossids and crossids != '' and crossids != task_old_info['crossids']: modify_data += f", crossids = '{crossids}'" - elif int(data_type) == 2: - arteryids = check_param(params, 'arteryids') - if arteryids and arteryids != '' and arteryids != task_old_info['arteryids']: - modify_data += f", arteryids = '{arteryids}'" + elif int(data_type) == 3: + waveid = check_param(params, 'waveid') + wave_name = check_param(params, 'wave_name') + if (waveid and waveid != '' and waveid != task_old_info['waveid']) or (wave_name and wave_name != '' and wave_name != task_old_info['wave_name']): + modify_data += 
f", waveid = '{waveid}', wave_name = '{wave_name}'" modify_item += prefix modify_item += f"【编辑】关联路网信息:{task_old_info['data_type']} -> {data_type}" executor = check_param(params, 'executor') @@ -1337,3 +1341,157 @@ def get_year_week(time_input): # 使用 ISO 标准获取年周 year, week, _ = time_input.isocalendar() return f"{year}-{week:02d}" + + +# 下方为绿波优化任务工作流相关代码 20260317 +def verify_cross_entering_status(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, 'area_id异常,请检查后重试')) + crossid_list = check_param(params, 'crossid_list') + if not crossid_list or len(crossid_list) < 1: + return json.dumps(make_common_res(6, '缺少crossid_list, 请刷新后重试')) + + cross_enter_status = db_task.query_cross_entering_status(crossid_list) + cross_status_dict = {} + for row in cross_enter_status: + cross_status_dict[row['crossid']] = { + 'ledger_status': row['ledger_status'], + 'phase_status': row['phase_status'] + } + verify_res = True + for crossid in crossid_list: + if crossid not in cross_status_dict.keys(): + verify_res = False + break + if cross_status_dict[crossid]['ledger_status'] != 1 or cross_status_dict[crossid]['phase_status'] not in [1, 3]: + verify_res = False + break + if verify_res: + return json.dumps(make_common_res(0, 'ok')) + else: + return json.dumps(make_common_res(1, '验证未通过')) + + +def query_usable_wave_list(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return 
json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, 'area_id异常,请检查后重试')) + + area_cross_list = db_tmnet.query_cross_list_sql(nodeid, area_id) + area_crossid_list = [row['crossid'] for row in area_cross_list] + node_wave_cross_dict = db_wave.query_node_wave(nodeid) + usable_wave_list = [] + for waveid in node_wave_cross_dict.keys(): + wave_info = node_wave_cross_dict[waveid] + cross_list = wave_info['cross_list'] + if set(cross_list) & set(area_crossid_list): + usable_wave_list.append(wave_info) + + res = make_common_res(0, 'ok') + res['data'] = { + 'wave_list': usable_wave_list, + 'cross_list': area_cross_list + } + return json.dumps(res) + + +def query_wave_task_requirement_validation_detail(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, 'area_id异常,请检查后重试')) + task_no = check_param(params, 'task_no') + if not task_no: + return 
json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试')) + requirement_validation_res = db_task.query_greenwave_task_requirement_validation_info_sql(task_no, nodeid, area_id) + if not requirement_validation_res: + return json.dumps(make_common_res(7, '需求确认单信息异常')) + + res = make_common_res(0, 'ok') + res['data'] = requirement_validation_res + return json.dumps(res) + + +def query_wave_task_params(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, 'area_id异常,请检查后重试')) + waveid = check_param(params, 'waveid') + if not waveid: + waveid = '' + wave_name = check_param(params, 'wave_name') + if not wave_name: + wave_name = '' + wave_crosses = check_param(params, 'wave_crosses') + if not wave_crosses or len(wave_crosses) < 1: + return json.dumps(make_common_res(8, '缺少wave_crosses, 请刷新后重试')) + srcDir = check_param(params, 'srcDir') + if not srcDir: + srcDir = '' + # 需要查明 路口名称、信号机品牌、 最大相位数量、绿波时段信息、配时方案信息 + wave_tp_info = [] + if waveid != '' and srcDir != '': + wave_tp_info = db_wave.query_wave_tp_infos(waveid, srcDir) + wave_crosses_infos = db_tmnet.query_cross_infos(wave_crosses) + wave_cross_info_dict = {row['crossid']: row for row in wave_crosses_infos} + phase_infos, e = None, None + for crossid in wave_cross_info_dict.keys(): + pass + + + + + + +def gen_greenwave_task_additional_info(taskno, nodeid, area_id): + # additional_info = + + pass diff --git a/app/tmnet_db_func.py 
b/app/tmnet_db_func.py index ccd0d42..6da7e60 100644 --- a/app/tmnet_db_func.py +++ b/app/tmnet_db_func.py @@ -675,9 +675,10 @@ class TmnetDbHelper(TableDbHelperBase): t1.crossid, if (t2.location is not null, t2.location, t1.location) as location, t1.nodeid, - t1.area_id - from (select name,crossid, location,nodeid, area_id from `cross` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t1 - left join (select name,crossid, location,nodeid, area_id from `cross_ledger_update_info` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t2 on t1.crossid=t2.crossid + t1.area_id, + if(t2.slc_company is not null, t2.slc_company, t1.slc_company) as slc_company + from (select name,crossid, location,nodeid, area_id, slc_company from `cross` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t1 + left join (select name,crossid, location,nodeid, area_id, slc_company from `cross_ledger_update_info` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t2 on t1.crossid=t2.crossid """ % (crossids, crossids) cross_list = self.do_select(sql) virtual_cross_sql = f'select name, crossid, location, nodeid, area_id from user_defined_cross where crossid in ({crossids})' diff --git a/app/wave_db_func.py b/app/wave_db_func.py new file mode 100644 index 0000000..cda4618 --- /dev/null +++ b/app/wave_db_func.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# @Author: Owl +# @Date: 2026/3/17 12:01 +# @Description: +from app.db_func_base import * + + +class WaveDBFunction(TableDbHelperBase): + def __init__(self, pool): + self.db_pool = pool + self.DB_Name = 'greenwave' + + def query_node_wave(self, nodeid): + sql = """ + select t1.name as wave_name, t1.waveid, t2.crossid, t1.service_status, t1.srcDir, t1.first_left, t1.last_left from wave_cross t2 + inner join + (select name, waveid, service_status, srcDir, first_left, last_left from greenwave where orgid = %s and service_status = 1 and status = 0) t1 + on t1.waveid = t2.waveid + """ % nodeid + res = self.do_select(sql) + 
wave_cross_dict = {} + for wave_name, waveid, crossid, service_status, srcDir, first_left, last_left in res: + left_coor = 0 if first_left == 0 and last_left == 0 else 1 + if waveid not in wave_cross_dict: + wave_cross_dict[waveid] = { + 'wave_name': wave_name, + 'waveid': waveid, + 'src_dir': srcDir, + 'left_coor': left_coor, + 'service_status': service_status, + 'cross_list': [crossid] + } + else: + wave_cross_dict[waveid]['cross_list'].append(crossid) + return wave_cross_dict + + def query_wave_tp_infos(self, waveid, srcDir): + sql = """ + select type, weekday, tp_start, tp_end from wave_tps where waveid = '%s' + """ % waveid + res = self.do_select(sql) + wave_tp_infos = [] + for type, weekday, tp_start, tp_end in res: + type_str = gen_type_str(type, srcDir) + weekday_str = gen_weekday_str(weekday) + wave_tp_infos.append({ + 'type_str': type_str, + 'weekday_str': weekday_str, + 'type': type, + 'weekday': weekday, + 'tp_start': tp_start, + 'tp_end': tp_end + }) + return wave_tp_infos + + +sir2Str = { + 'S': ['南向北', '北向南'], + 'E': ['东向西', '西向东'], + 'W': ['西向东', '东向西'], + 'N': ['北向南', '南向北'], + 'NE': ['东北向西南', '西南向东北'], + 'NW': ['西北向东南', '东南向西北'], + 'SE': ['东南向西北', '西北向东南'], + 'SW': ['西南向东北', '东北向西南'] +} + +weekday2Str = { + 1: '周一', + 2: '周二', + 3: '周三', + 4: '周四', + 5: '周五', + 6: '周六', + 7: '周日' +} + + +def gen_type_str(type_int, srcDir): + type_str = '双向' + if type_int == 0: + type_str = sir2Str[srcDir][0] + elif type_int == 1: + type_str = sir2Str[srcDir][1] + return type_str + + +def gen_weekday_str(weekday): + if weekday == '1,2,3,4,5': + weekday_str = '工作日' + elif weekday == '6,7': + weekday_str = '节假日' + elif weekday == '1,2,3,4,5,6,7': + weekday_str = '全周' + else: + weekday_list = weekday.split(',') + weekday_str = weekday2Str[int(weekday_list[0])] + for i in range(1, len(weekday_list)): + weekday_str += '、' + weekday2Str[int(weekday_list[i])] + return weekday_str diff --git a/cross_doctor.ini b/cross_doctor.ini index cbc4e51..e1f60ec 100644 --- 
a/cross_doctor.ini +++ b/cross_doctor.ini @@ -67,6 +67,13 @@ user = root password = pmenJIn7EaK40oThn~~~ dbname = tmnet +[wave_db] +host = 43.140.225.219 +host_inner = 172.21.32.10 +user = root +password = pmenJIn7EaK40oThn~~ +dbname = greenwave + [executive] userid = 15836903493,18518481331 \ No newline at end of file