# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/4 15:13
# @Description: API handlers for the intersection (cross) inspection page

import json

from app.monitor_common import *

# Accepted values of the ``date_type`` request parameter.
_VALID_DATE_TYPES = ('day', 'week', 'workday', 'weekend')

# Time range meaning "the whole day"; handlers normalise it to ''.
_FULL_DAY_TIME_RANGE = '00:00-23:59'

# Fallback period lists returned when no period configuration is found
# for the city/area.
_DEFAULT_TP_INFO = (
    "00:00-07:00",
    "07:00-09:00",
    "09:00-17:00",
    "17:00-19:00",
    "19:00-22:00",
    "22:00-23:59",
    "00:00-23:59",
)
_DEFAULT_PEAK_TP = (
    "07:00-09:00",
    "17:00-19:00",
)


def _reply(code, msg):
    """Serialise a data-less common response (error or plain 'ok') to JSON."""
    return json.dumps(make_common_res(code, msg))


def _reply_ok(data):
    """Serialise a successful response carrying ``data`` to JSON."""
    res = make_common_res(0, 'ok')
    res['data'] = data
    return json.dumps(res, ensure_ascii=False)


def _parse_area_id(area_id):
    """Return ``area_id`` as an int, or None when it is not a plain integer.

    Accepts an optional single leading '-' followed by digits only.  The
    previous ``lstrip('-')`` check let inputs such as '--5' through and
    then crashed inside ``int()``.
    """
    text = str(area_id)
    digits = text[1:] if text.startswith('-') else text
    if not digits.isdigit():
        return None
    try:
        return int(text)
    except ValueError:
        # isdigit() accepts some characters (e.g. superscripts) int() rejects.
        return None


def _check_common_params(params):
    """Validate the nodeid / area_id / userid parameters shared by all handlers.

    Returns:
        ``(nodeid, area_id, userid, None)`` on success, otherwise
        ``(None, None, None, error_json)`` where ``error_json`` is the JSON
        string the handler should return as-is.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return None, None, None, _reply(2, '缺少城市信息, 请刷新后重试')
    area_id = check_param(params, 'area_id')
    if not area_id:
        return None, None, None, _reply(3, '缺少辖区信息, 请刷新后重试')
    userid = check_param(params, 'userid')
    if not userid:
        return None, None, None, _reply(4, '缺少用户信息, 请刷新后重试')

    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return None, None, None, _reply(5, '用户信息异常')

    # The requested area must be one of the areas returned for this user.
    area_no = _parse_area_id(area_id)
    if area_no is None or area_no not in map(int, area_list):
        return None, None, None, _reply(5, '辖区id异常,请检查后重试')
    return nodeid, area_id, userid, None


def _check_date_type(params):
    """Read and validate ``date_type``; return ``(date_type, error_json)``."""
    date_type = check_param(params, 'date_type')
    if not date_type:
        return None, _reply(6, '缺少查询时间类型, 请刷新后重试')
    if date_type not in _VALID_DATE_TYPES:
        return None, _reply(7, '查询时间类型异常, 请刷新后重试')
    return date_type, None


def _read_special_time_range(params):
    """Read ``special_time_range``; missing or whole-day values become ''."""
    special_time_range = check_param(params, 'special_time_range')
    if not special_time_range or special_time_range == _FULL_DAY_TIME_RANGE:
        return ''
    return special_time_range


def _hhmm_to_int(text):
    """Convert 'HH:MM' to the integer HH * 100 + MM."""
    parts = text.split(':')
    return int(parts[0]) * 100 + int(parts[1])


def _parse_time_range(time_range):
    """Convert 'HH:MM-HH:MM' to ``(start, end)`` HHMM ints; None if malformed."""
    try:
        bounds = str(time_range).split('-')
        return _hhmm_to_int(bounds[0]), _hhmm_to_int(bounds[1])
    except (IndexError, ValueError):
        return None


def _parse_cross_report(row):
    """Decode the serialized report stored under ``row['data']``."""
    report = pb.xl_cross_report_t()
    report.ParseFromString(row['data'])
    return report


# Query the dates for which inspection tasks are available in the current city
def query_monitor_task_usable_date_list(params):
    """Return available inspection days/weeks plus the period lists.

    Args:
        params: request parameters, read through ``check_param``
            (keys: nodeid, area_id, userid).

    Returns:
        JSON string.  On success ``data`` holds ``days``, ``weeks``,
        ``tp_info`` and ``peak_tp``; otherwise an error code and message.
    """
    nodeid, area_id, userid, err = _check_common_params(params)
    if err is not None:
        return err

    day_list, week_list = db_cross.query_monitor_task_dates(nodeid, area_id)
    tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id)
    if not tp_desc:
        tp_info = list(_DEFAULT_TP_INFO)
        peak_tp = list(_DEFAULT_PEAK_TP)
    else:
        tp_info = tp_desc[0]['tp_desc'].split(',')
        # The whole-day range is always offered in addition to the stored ones.
        tp_info.append(_FULL_DAY_TIME_RANGE)
        peak_tp = tp_desc[0]['peak_tp'].split(',')

    return _reply_ok({
        'days': day_list,
        'weeks': week_list,
        'tp_info': tp_info,
        'peak_tp': peak_tp,
    })


# Query inspection data details
def query_monitor_data(params):
    """Return overview, ledger and per-cross delay data for one query date.

    Args:
        params: request parameters, read through ``check_param``
            (keys: nodeid, area_id, userid, date_type, query_date and the
            optional special_time_range).

    Returns:
        JSON string.  On success ``data`` holds ``overview_data``,
        ``monitor_crosses_ledger_info`` and ``cross_delay_info_list``;
        otherwise an error code and message.
    """
    nodeid, area_id, userid, err = _check_common_params(params)
    if err is not None:
        return err
    date_type, err = _check_date_type(params)
    if err is not None:
        return err
    special_time_range = _read_special_time_range(params)
    query_date = check_param(params, 'query_date')
    if not query_date:
        return _reply(8, '缺少查询日期, 请刷新后重试')

    row_list = db_cross.query_monitor_data_sql(nodeid, area_id, date_type, query_date)
    if not row_list:
        return _reply(9, '没有查询到数据')

    routing_crosses = db_tmnet.query_routing_crosses(nodeid, area_id)
    routing_crosses_dict = {item['crossid']: item for item in routing_crosses}
    cross_report_pb = _parse_cross_report(row_list[0])

    overview_data = gen_monitor_overview_data(
        cross_report_pb, date_type, routing_crosses, special_time_range,
        routing_crosses_dict)
    # Only the signal-controller company mapping is needed from the base info.
    _, _, slc_company_dict, _, _ = gen_ledger_base_info(nodeid, area_id)
    monitor_crosses_ledger_info = gen_monitor_cross_ledger_info(
        routing_crosses, nodeid, area_id, slc_company_dict, date_type, query_date)
    cross_delay_info_list = gen_cross_delay_info_list(
        userid, area_id, nodeid, date_type, cross_report_pb,
        special_time_range, routing_crosses_dict, slc_company_dict)

    return _reply_ok({
        'overview_data': overview_data,
        'monitor_crosses_ledger_info': monitor_crosses_ledger_info,
        'cross_delay_info_list': cross_delay_info_list,
    })


# Query the trend of inspection data
def query_monitor_data_trend(params):
    """Return the inspection data trend for the given date type.

    Args:
        params: request parameters, read through ``check_param``
            (keys: nodeid, area_id, userid, date_type, query_date and the
            optional special_time_range).  ``query_date`` is required but
            is not used by the trend query itself.

    Returns:
        JSON string with the parsed trend in ``data``, or an error code
        and message.
    """
    nodeid, area_id, userid, err = _check_common_params(params)
    if err is not None:
        return err
    date_type, err = _check_date_type(params)
    if err is not None:
        return err
    query_date = check_param(params, 'query_date')
    if not query_date:
        return _reply(8, '缺少查询日期, 请刷新后重试')
    special_time_range = _read_special_time_range(params)

    monitor_datas = db_cross.query_monitor_data_trend_sql(nodeid, area_id, date_type)
    return _reply_ok(parse_monitor_trend_data(monitor_datas, date_type, special_time_range))


# Query the trend of one cross's data for a given time period
def query_cross_tp_data_trend(params):
    """Return the delay trend of one cross for a given time range.

    For ``date_type == 'day'`` the 30 days ending yesterday are queried;
    for the other date types the date groups produced by
    ``gen_ten_weeks_ago_data_list_with_weekdays`` are queried and averaged
    one group at a time.

    Args:
        params: request parameters, read through ``check_param``
            (keys: nodeid, area_id, userid, date_type, weekdays,
            time_range as 'HH:MM-HH:MM', crossid and the optional
            query_type).

    Returns:
        JSON string.  On success ``data`` holds ``name``, ``weekdays``,
        ``time_range`` and ``delay_infos``; otherwise an error code and
        message.
    """
    nodeid, area_id, userid, err = _check_common_params(params)
    if err is not None:
        return err
    date_type, err = _check_date_type(params)
    if err is not None:
        return err

    weekdays = check_param(params, 'weekdays')
    if not weekdays:
        return _reply(8, '缺少查询星期, 请刷新后重试')
    query_type = check_param(params, 'query_type') or 0
    time_range = check_param(params, 'time_range')
    if not time_range:
        return _reply(10, '缺少查询时段, 请刷新后重试')
    bounds = _parse_time_range(time_range)
    if bounds is None:
        # Malformed input used to raise IndexError/ValueError here.
        return _reply(10, '查询时段格式异常, 请检查后重试')
    tp_start, tp_end = bounds

    # query_type 1 / 2 switch tp_start to a 't' / 'h' prefixed key; what the
    # prefixes select is decided by db_cross.query_cross_delay_info.  String
    # forms are accepted too, in case the parameter arrives untyped.
    if query_type in (1, '1'):
        tp_start = 't' + str(tp_start)
    elif query_type in (2, '2'):
        tp_start = 'h' + str(tp_start)

    crossid = check_param(params, 'crossid')
    if not crossid:
        return _reply(11, '缺少路口id, 请刷新后重试')

    # Load the ledger record to derive the cross's road-direction mapping.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return _reply(9, '查询路口信息失败')
    cross_static_info, cross_ledger_info = gen_cross_static_info(
        crossid, nodeid, area_id, cross_ledger_info_dict)
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

    if date_type == 'day':
        # Take "today" once so both bounds refer to the same day.
        today = datetime.now().date()
        month_ago_date = (today - timedelta(days=30)).strftime('%Y%m%d')
        now_prev_date = (today - timedelta(days=1)).strftime('%Y%m%d')
        date_list = generate_date_range(month_ago_date, now_prev_date)
        # Data of the last 30 days.
        days_data = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
        data_list = parse_data2pb4monitor(days_data, weekdays)
    else:
        data_list = []
        for week_dates in gen_ten_weeks_ago_data_list_with_weekdays(date_type):
            week_data = db_cross.query_cross_delay_info(crossid, nodeid, week_dates, tp_start)
            data_list.append({
                'day': week_dates[0] + '-' + week_dates[-1],
                'tp_start': tp_start,
                'tp_end': tp_end,
                'data': gen_avg_cross_delay_pb(week_data),
                'week_dates': week_dates,
            })

    cross_delay_dict = parse_single_cross_delay_info4monitor(
        crossid, nodeid, data_list, date_type, roads_dir_dict)
    res_data = calc_single_day_delay_info_change_rate4monitor(cross_delay_dict)

    return _reply_ok({
        'name': cross_static_info['name'],
        'weekdays': gen_week_str({'weekday': weekdays}),
        'time_range': time_range,
        'delay_infos': res_data,
    })


def _attach_cross_static_info(routing_crosses_dict, nodeid, area_id, userid):
    """Add ledger, direction and in-road static info to every cross entry.

    Entries whose ledger lookup fails are logged and left unchanged.
    """
    for crossid, cross_entry in routing_crosses_dict.items():
        ledger_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
        if not ledger_dict:
            # The message needs a placeholder; without one logging reports a
            # formatting error and the cross id is lost.
            logging.error('查询路口信息失败: %s', crossid)
            continue
        cross_static_info, cross_ledger_info = gen_cross_static_info(
            crossid, nodeid, area_id, ledger_dict)
        cross_entry['roads_dir_dict'] = gen_road_dir_dict(cross_ledger_info)
        cross_entry['cross_static_info'] = cross_static_info
        cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid)
        cross_entry['inroad_static_info_dict'] = {
            item['roadid']: item for item in cross_inroads
        }
        cross_entry['cross_ledger_info'] = cross_ledger_info


def _group_problem_records(problem_rows):
    """Split problem rows into 'tide' (by crossid) and 'normal' (by key)."""
    records = {
        'normal': {},
        'tide': {},
    }
    for row in problem_rows:
        if row['tide_cross']:
            records['tide'][row['crossid']] = row['tide_cross']
            continue
        key = '^'.join([row['crossid'], row['weekday'], row['time_range'], row['date_type']])
        records['normal'][key] = row
    return records


def _group_shield_records(shield_rows):
    """Group shield rows by problem family.

    'tide', 'sample' and 'normal' map a key to the list of shielded
    problem keys; 'phase' is a flat list of 'crossid^time_range' strings.
    """
    shield_info = {
        'normal': {},
        'tide': {},
        'sample': {},
        'phase': [],
    }
    for row in shield_rows:
        problem_key = row['problem_key']
        if problem_key in ('tide_cross', 'unmatched_lane_num'):
            shield_info['tide'].setdefault(row['crossid'], []).append(problem_key)
        elif problem_key == 'phase_error':
            shield_info['phase'].append(row['crossid'] + '^' + row['time_range'])
        elif problem_key in ('sample_sch', 'sample_tp'):
            key = row['crossid'] + '^' + row['weekdays']
            shield_info['sample'].setdefault(key, []).append(problem_key)
        else:
            key = '^'.join([row['crossid'], row['weekdays'], row['time_range'], row['date_type']])
            shield_info['normal'].setdefault(key, []).append(problem_key)
    return shield_info


def query_monitor_problems(params):
    """Return the inspection problems detected for one query date.

    Args:
        params: request parameters, read through ``check_param``
            (keys: nodeid, area_id, userid, date_type, query_date in
            YYYYMMDD form, and the optional special_time_range and
            filter_shield).

    Returns:
        JSON string with the result of ``gen_monitor_problem_info`` in
        ``data``, or an error code and message.
    """
    nodeid, area_id, userid, err = _check_common_params(params)
    if err is not None:
        return err
    date_type, err = _check_date_type(params)
    if err is not None:
        return err
    special_time_range = _read_special_time_range(params)
    query_date = check_param(params, 'query_date')
    if not query_date:
        return _reply(8, '缺少查询日期, 请刷新后重试')
    try:
        # Validate up front: a malformed date used to raise only after
        # several database round-trips.
        record_date = datetime.strptime(query_date, '%Y%m%d').strftime('%Y-%m-%d')
    except (TypeError, ValueError):
        return _reply(8, '查询日期格式异常, 请检查后重试')
    filter_shield = check_param(params, 'filter_shield') or 0

    row_list = db_cross.query_monitor_data_sql(nodeid, area_id, date_type, query_date)
    if not row_list:
        return _reply(9, '没有查询到数据')

    routing_crosses = db_tmnet.query_routing_crosses(nodeid, area_id)
    routing_crosses_dict = {item['crossid']: item for item in routing_crosses}
    _attach_cross_static_info(routing_crosses_dict, nodeid, area_id, userid)

    cross_report_pb = _parse_cross_report(row_list[0])

    # The grouping helpers use their own locals, so the validated request
    # date_type is no longer overwritten by values read from record rows.
    records = _group_problem_records(
        db_cross.query_cross_problem_record(area_id, record_date, date_type))
    shield_info = _group_shield_records(db_cross.query_shield_records(area_id))

    monitor_problems = gen_monitor_problem_info(
        cross_report_pb, routing_crosses_dict, special_time_range, records,
        date_type, shield_info, query_date, nodeid, area_id, filter_shield)
    return _reply_ok(monitor_problems)


def update_cross_problem_shield_state(params):
    """Add or delete a shield record for one problem of a cross.

    Args:
        params: request parameters, read through ``check_param``
            (keys: nodeid, area_id, userid, crossid, problem_key,
            op_type as 'add' or 'del', date_type, and the optional
            weekdays and time_range, which default to '').

    Returns:
        JSON string: code 0 when the database call returns 1, otherwise
        an error code and message.
    """
    nodeid, area_id, userid, err = _check_common_params(params)
    if err is not None:
        return err

    weekdays = check_param(params, 'weekdays') or ''
    time_range = check_param(params, 'time_range') or ''
    crossid = check_param(params, 'crossid')
    if not crossid:
        return _reply(6, '缺少路口信息, 请刷新后重试')
    problem_key = check_param(params, 'problem_key')
    if not problem_key:
        return _reply(7, '缺少问题类型, 请刷新后重试')
    op_type = check_param(params, 'op_type')
    if not op_type:
        return _reply(8, '缺少操作类型, 请刷新后重试')
    date_type = check_param(params, 'date_type')
    if not date_type:
        return _reply(9, '缺少查询日期类型, 请刷新后重试')

    if op_type == 'add':
        ret = db_cross.add_shield_records(
            crossid, area_id, weekdays, time_range, problem_key, date_type)
        if ret == 1:
            return _reply(0, 'ok')
        return _reply(10, '添加屏蔽信息失败')
    if op_type == 'del':
        ret = db_cross.delete_shield_records(
            crossid, area_id, weekdays, time_range, problem_key, date_type)
        if ret == 1:
            return _reply(0, 'ok')
        return _reply(11, '删除屏蔽信息失败')
    return _reply(9, '操作类型异常')