# -*- coding: utf-8 -*- # @Author: Owl # @Date: 2025/11/4 15:14 # @Description: 路口巡检页面接口的公共函数类 import logging from datetime import timezone from app.eva_common import * from proto.phase_grpc import QueryCrossPhaseTpStatistics, GetCrossPhaseDiagnosis weekday2Str = { 1: '周一', 2: '周二', 3: '周三', 4: '周四', 5: '周五', 6: '周六', 7: '周日' } def gen_monitor_overview_data(cross_report_pb, date_type, routing_crosses, special_time_range, routing_crosses_dict): overview = { 'stop_times': '-', 'high_park_percent': '-', 'park_time': '-', 'delay_time': '-', 'speed': '-', 'move_speed': '-', 'jam_index': '-', 'cross_num': len(routing_crosses), 'tp_num': 0, 'stop_times_color': 0, 'high_park_percent_color': 0, 'park_time_color': 0, 'delay_time_color': 0, 'speed_color': 0, 'move_speed_color': 0, 'jam_index_color': 0, 'cross_service_levels': { 'A/B': 0, 'C/D': 0, 'E': 0, 'F': 0, 'total': 0 } } if not cross_report_pb: return overview cross_delay_info_list = cross_report_pb.all_crosstp_indexes if date_type == 'day': stop_times = round(cross_report_pb.job_index.day_index.stop_times, 2) high_park_percent = cross_report_pb.job_index.day_index.high_park_percent park_time = cross_report_pb.job_index.day_index.park_time delay_time = cross_report_pb.job_index.day_index.delay_time speed = round(cross_report_pb.job_index.day_index.speed / 100, 2) move_speed = round(cross_report_pb.job_index.day_index.move_speed / 100, 2) jam_index = round(cross_report_pb.job_index.day_index.jam_index, 2) prev_stop_times = round(cross_report_pb.job_index_prev.day_index.stop_times, 2) prev_high_park_percent = cross_report_pb.job_index_prev.day_index.high_park_percent prev_park_time = cross_report_pb.job_index_prev.day_index.park_time prev_delay_time = cross_report_pb.job_index_prev.day_index.delay_time prev_speed = round(cross_report_pb.job_index_prev.day_index.speed / 100, 2) prev_move_speed = round(cross_report_pb.job_index_prev.day_index.move_speed / 100, 2) prev_jam_index = 
def _overview_period_index(job_index, date_type):
    """Pick the aggregate index of ``job_index`` that matches ``date_type``.

    Any value other than 'day', 'week' or 'weekend' falls back to the
    workday index, mirroring the original if/elif/else chain.
    """
    if date_type in ('day', 'week', 'weekend'):
        return getattr(job_index, date_type + '_index')
    return job_index.workday_index


def _overview_metrics(period_index):
    """Read the seven headline metrics from one aggregate index message."""
    return {
        'stop_times': round(period_index.stop_times, 2),
        'high_park_percent': period_index.high_park_percent,
        'park_time': period_index.park_time,
        'delay_time': period_index.delay_time,
        # speed fields are divided by 100 before being reported
        'speed': round(period_index.speed / 100, 2),
        'move_speed': round(period_index.move_speed / 100, 2),
        'jam_index': round(period_index.jam_index, 2),
    }


def gen_monitor_overview_data(cross_report_pb, date_type, routing_crosses, special_time_range, routing_crosses_dict):
    """Build the overview card of the inspection page.

    Args:
        cross_report_pb: report message exposing ``job_index``,
            ``job_index_prev`` and ``all_crosstp_indexes``. When falsy, the
            placeholder overview (all metrics ``'-'``) is returned.
        date_type: 'day', 'week', 'weekend'; anything else is handled as
            'workday'.
        routing_crosses: sized collection of crosses; only its length is used.
        special_time_range: ``'HH:MM-HH:MM'`` filter, or an empty value for
            "no filter".
        routing_crosses_dict: container of cross ids; only membership is used.

    Returns:
        dict with the headline metrics, their ``*_color`` flags (computed by
        ``calc_index_color`` against the previous period) and the
        ``cross_service_levels`` histogram.
    """
    overview = {
        'stop_times': '-',
        'high_park_percent': '-',
        'park_time': '-',
        'delay_time': '-',
        'speed': '-',
        'move_speed': '-',
        'jam_index': '-',
        'cross_num': len(routing_crosses),
        'tp_num': 0,
        'stop_times_color': 0,
        'high_park_percent_color': 0,
        'park_time_color': 0,
        'delay_time_color': 0,
        'speed_color': 0,
        'move_speed_color': 0,
        'jam_index_color': 0,
        'cross_service_levels': {'A/B': 0, 'C/D': 0, 'E': 0, 'F': 0, 'total': 0},
    }
    if not cross_report_pb:
        return overview

    current = _overview_metrics(_overview_period_index(cross_report_pb.job_index, date_type))
    previous = _overview_metrics(_overview_period_index(cross_report_pb.job_index_prev, date_type))

    # 'day' and 'week' keep every period; the other types keep only periods
    # whose weekday bitmap touches the relevant days.
    if date_type in ('day', 'week'):
        wanted_days = None
    elif date_type == 'weekend':
        wanted_days = '67'
    else:
        wanted_days = '12345'

    level_bucket = {'A': 'A/B', 'B': 'A/B', 'C': 'C/D', 'D': 'C/D', 'E': 'E', 'F': 'F'}
    histogram = overview['cross_service_levels']
    for cross_delay_info in cross_report_pb.all_crosstp_indexes:
        if cross_delay_info.crossid not in routing_crosses_dict:
            continue
        if wanted_days is not None:
            # parse the bitmap once instead of once per candidate day
            tp_days = parse_wd_bitmap(cross_delay_info.tp.weekday)
            if not any(day in tp_days for day in wanted_days):
                continue
        # truthiness (not `!= ''`) so that None also means "no filter",
        # consistent with the problem-list builders in this module
        if special_time_range and not is_overlap_greater_than_one_hour(
                cross_delay_info.tp.start_hm, cross_delay_info.tp.end_hm, special_time_range):
            continue
        service_level = calc_service_level(cross_delay_info.delay_info.delay_time)
        if service_level == '-':
            continue
        histogram['total'] += 1
        bucket = level_bucket.get(service_level)
        if bucket is not None:
            histogram[bucket] += 1

    overview.update(current)
    for metric, value in current.items():
        overview[metric + '_color'] = calc_index_color(value, previous[metric], metric)
    return overview
def parse_wd_bitmap(wd_bitmap: int):
    """Decode a weekday bitmap into a comma-separated string of day numbers.

    Bit ``i`` (0-based, least significant first) being set means day
    ``i + 1`` is included, so ``0b0000011`` becomes ``'1,2'``. Only the
    lowest seven bits are inspected; a bitmap of 0 yields ``''``.
    """
    return ','.join(str(bit + 1) for bit in range(7) if (wd_bitmap >> bit) & 0x1)


def calc_index_color(index, prev_index, key):
    """Classify the change of a metric against its previous value.

    Returns:
        0 for normal, 1 for deteriorated, 2 for improved. A relative change
        of more than 20% is needed to leave 0. For 'speed' and 'move_speed'
        an increase is an improvement; for 'stop_times', 'high_park_percent',
        'park_time', 'delay_time' and 'jam_index' an increase is a
        deterioration. Unknown keys and falsy inputs (0, None) return 0.
    """
    if key in ('stop_times', 'high_park_percent', 'park_time', 'delay_time', 'jam_index'):
        worse_sign = 1
    elif key in ('speed', 'move_speed'):
        worse_sign = -1
    else:
        return 0
    # a falsy previous value also covers the division-by-zero case
    if not index or not prev_index:
        return 0
    change = worse_sign * (index - prev_index) / prev_index
    if change > 0.2:
        return 1
    if change < -0.2:
        return 2
    return 0


def get_week_dates(sunday, type_='day'):
    """Return the dates of the week ending on ``sunday`` for a given type.

    Args:
        sunday: last day of the week, as a ``'YYYYMMDD'`` or ``'YYYY-MM-DD'``
            string, or a datetime/date object. The value is not checked to
            actually be a Sunday.
        type_: 'day' (the given day only), 'week' (all seven days),
            'weekend' (last two days) or 'workday' (first five days).

    Returns:
        List of ``'YYYYMMDD'`` strings in ascending order.

    Raises:
        KeyError: if ``type_`` is not one of the four supported values.
        ValueError: if a string ``sunday`` matches neither accepted format.
    """
    # normalise to a date object
    if isinstance(sunday, str):
        try:
            sunday = datetime.strptime(sunday, '%Y%m%d').date()
        except ValueError:
            # the documented dashed form used to be rejected
            sunday = datetime.strptime(sunday, '%Y-%m-%d').date()
    elif isinstance(sunday, datetime):
        sunday = sunday.date()

    # the seven days ending on the given day, oldest first
    week = [sunday - timedelta(days=6 - offset) for offset in range(7)]

    slices = {
        'day': slice(6, 7),      # the given day
        'week': slice(7),        # all seven days
        'weekend': slice(5, 7),  # last two days
        'workday': slice(5),     # first five days
    }
    return [day.strftime('%Y%m%d') for day in week[slices[type_]]]
def gen_monitor_cross_ledger_info(routing_crosses, nodeid, area_id, slc_company_dict, date_type, query_date):
    """Collect the ledger summary shown next to the inspection overview.

    Combines the phase/time-period statistics returned by
    ``QueryCrossPhaseTpStatistics`` with per-vendor, per-network-state and
    special-lane counts read through ``db_tmnet``.

    Returns:
        dict with the keys 'slc_company_info', 'internet_info',
        'special_info' and 'phase_info'. 'phase_info' is an empty dict when
        the statistics call reports an error or a non-zero code.
    """
    cross_ids = [cross['crossid'] for cross in routing_crosses]
    dates = get_week_dates(query_date, type_=date_type)

    fixed_weekdays = {'weekend': '6,7', 'workday': '1,2,3,4,5', 'week': '1,2,3,4,5,6,7'}
    weekdays = fixed_weekdays.get(date_type)
    if weekdays is None:
        # single day: ISO-style weekday number (Monday = 1) of the query date
        weekdays = str(datetime.strptime(query_date, '%Y%m%d').weekday() + 1)

    phase_info, err = QueryCrossPhaseTpStatistics(int(nodeid), int(area_id), weekdays, dates)
    phase_res = {}
    if not err and phase_info.code == 0:
        stats = phase_info.data
        tp_buckets = [{'name': bucket.range, 'num': bucket.value} for bucket in stats.tp_num_detail]
        phase_res = {
            'phase_cross_num': stats.cross_phase_num,
            'update_cross_num': stats.cross_update_num,
            'avg_num': round(stats.avg_tp_num, 2),
            'tp_num': tp_buckets,
        }

    company_rows = db_tmnet.query_crosses_slc_company_info(cross_ids)
    internet_rows = db_tmnet.query_crosses_internet_info(cross_ids)

    # vendors missing from slc_company_dict are left out of the summary
    company_stats = [
        {'name': slc_company_dict[row['slc_company']], 'num': row['num']}
        for row in company_rows
        if row['slc_company'] in slc_company_dict
    ]

    internet_labels = {0: '未配置', 1: '联网', 2: '脱机'}
    internet_stats = [
        {'name': internet_labels.get(row['internet'], '未联网'), 'num': row['num']}
        for row in internet_rows
    ]

    reverse_turn_crosses = 0
    for cross_id in cross_ids:
        inroad_ids = [road['roadid'] for road in db_tmnet.query_cross_inroads(cross_id, nodeid)]
        if db_tmnet.check_reverse_turn(inroad_ids):
            reverse_turn_crosses += 1
    reversible_lane_crosses = db_tmnet.calc_has_reversible_lane_crosses(cross_ids)[0]['num']

    return {
        'slc_company_info': company_stats,
        'internet_info': internet_stats,
        'special_info': {
            'reverse_turn': reverse_turn_crosses,
            'reversible_lane_num': reversible_lane_crosses,
        },
        'phase_info': phase_res,
    }
def _mark_tp_change(tp_info, tp_info_prev):
    """Fill the ``*_rate`` / ``*_color`` fields of ``tp_info`` from the same period one cycle earlier."""
    level, level_prev = tp_info['service_level'], tp_info_prev['service_level']
    # only compare service levels when both are known ('-' means unknown)
    if level != '-' and level_prev != '-':
        if level < level_prev:
            tp_info['service_level_color'] = 2
        elif level > level_prev:
            tp_info['service_level_color'] = 1

    for key in ('delay_time', 'stop_times', 'high_park_percent', 'park_time', 'relative_flow_rate',
                'flow', 'jam_index', 'imbalance_index', 'speed', 'move_speed'):
        if tp_info_prev[key] == 0:
            continue
        rate = round((tp_info[key] - tp_info_prev[key]) / tp_info_prev[key] * 100, 2)
        tp_info[key + '_rate'] = rate
        # for the two speed metrics an increase is coloured 2, for the rest 1
        rising, falling = (2, 1) if key in ('speed', 'move_speed') else (1, 2)
        if rate > 20:
            tp_info[key + '_color'] = rising
        elif rate < -20:
            tp_info[key + '_color'] = falling


def gen_cross_delay_info_list(userid, area_id, nodeid, date_type, cross_report_pb, special_time_range, routing_crosses_dict, slc_company_dict):
    """Build the per-cross delay table, annotated with ledger data and period-over-period changes.

    The current and previous index lists of ``cross_report_pb`` are parsed
    with ``parse_cross_index_dict``. Each current cross is then enriched with
    'cross_model', 'internet', 'slc_company' and 'worst_service_level', and
    every time period that also exists in the previous cycle (same weekdays,
    start and end time) gets its ``*_rate`` / ``*_color`` fields filled.

    'worst_service_level' is the highest letter among the cross's current
    periods ('A' when none is known). It used to be updated only for periods
    that had a previous-cycle twin, so crosses without previous data were
    always reported as 'A'.

    Returns:
        dict keyed by cross id, as produced by ``parse_cross_index_dict``
        for the current list, with the extra fields described above.
    """
    current = parse_cross_index_dict(cross_report_pb.all_crosstp_indexes, special_time_range,
                                     routing_crosses_dict, date_type, area_id, nodeid, userid)
    previous = parse_cross_index_dict(cross_report_pb.all_crosstp_indexes_prev, special_time_range,
                                      routing_crosses_dict, date_type, area_id, nodeid, userid)

    internet_names = {0: '未配置', 1: '联网', 2: '脱机'}
    cross_model_names = {'1': '快', '2': '主', '3': '次', '4': '支'}

    for crossid, cross_info in current.items():
        if 'udc_' in crossid:
            ledger_rows = db_tmnet.query_cross_ledger_info(crossid, nodeid, area_id)
        else:
            ledger_rows = db_tmnet.query_virtual_cross_info(crossid, nodeid, area_id)

        internet_code = ledger_rows[0]['internet'] if ledger_rows else 0
        slc_company = ledger_rows[0]['slc_company'] if ledger_rows else 0
        raw_model = ledger_rows[0]['cross_model'] if ledger_rows else None
        # a missing, empty or '-' model falls back to '3,3'
        model_codes = raw_model.split(',') if raw_model and raw_model != '-' else ['3', '3']

        # unknown codes render as '-' instead of raising KeyError
        cross_info['cross_model'] = '-'.join(cross_model_names.get(code.strip(), '-') for code in model_codes)
        cross_info['internet'] = internet_names.get(internet_code, '未联网')
        cross_info['slc_company'] = slc_company_dict.get(slc_company, '-')

        delay_infos = cross_info['delay_infos']
        known_levels = [tp_info['service_level']
                        for tp_infos in delay_infos.values()
                        for tp_info in tp_infos
                        if tp_info['service_level'] != '-']
        cross_info['worst_service_level'] = max(['A'] + known_levels)

        if crossid not in previous:
            continue
        delay_infos_prev = previous[crossid]['delay_infos']
        for weekdays, tp_infos in delay_infos.items():
            if weekdays not in delay_infos_prev:
                continue
            for tp_info in tp_infos:
                for tp_info_prev in delay_infos_prev[weekdays]:
                    if (tp_info['start_time'] == tp_info_prev['start_time']
                            and tp_info['end_time'] == tp_info_prev['end_time']):
                        _mark_tp_change(tp_info, tp_info_prev)
    return current
def parse_cross_index_dict(delay_index_list, special_time_range, routing_crosses_dict, date_type, area_id, nodeid, userid):
    """Group per-period delay indexes by cross and weekday set.

    Args:
        delay_index_list: iterable of index messages exposing ``crossid``,
            ``tp`` (weekday bitmap, start_hm, end_hm, type) and ``delay_info``.
        special_time_range: ``'HH:MM-HH:MM'`` filter, or an empty value for
            "no filter".
        routing_crosses_dict: cross id -> dict with 'name' and 'location';
            crosses not present here are ignored.
        date_type: 'weekend' / 'workday' additionally drop periods whose
            weekday set has no weekend / working day; other values keep all.
        area_id, nodeid, userid: forwarded to the peak-period and
            favourite-cross lookups.

    Returns:
        dict: cross id -> {'crossid', 'name', 'location', 'favorite',
        'delay_infos'}, where 'delay_infos' maps a weekday string such as
        '1,2,3,4,5' to a list of period dicts sorted by 'start_time'.
    """
    tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id)
    peak_tp = tp_desc[0]['peak_tp'].split(',') if tp_desc else ['07:00-09:00', '17:00-19:00']

    # a set gives O(1) membership tests in the main loop
    favorite_ids = {row['favorite_id'] for row in db_workstation.query_favorite_crosses(userid, nodeid, area_id)}

    # Pass 1: largest car_num per (weekdays, cross). The pass runs before any
    # time/date filtering so the reference flow covers every routed period.
    routed = []
    max_flow = {}
    for delay_index in delay_index_list:
        crossid = delay_index.crossid
        if crossid not in routing_crosses_dict:
            continue
        weekdays = parse_wd_bitmap(delay_index.tp.weekday)
        flow_key = (weekdays, crossid)
        car_num = delay_index.delay_info.car_num
        if flow_key not in max_flow or car_num > max_flow[flow_key]:
            max_flow[flow_key] = car_num
        routed.append((delay_index, weekdays))

    metric_names = ('stop_times', 'high_park_percent', 'park_time', 'delay_time', 'speed', 'move_speed',
                    'relative_flow_rate', 'flow', 'jam_index', 'imbalance_index')

    # Pass 2: build the per-cross structure.
    res = {}
    for delay_index, weekdays in routed:
        crossid = delay_index.crossid
        tp = delay_index.tp
        # truthiness so that None also means "no filter"
        if special_time_range and not is_overlap_greater_than_one_hour(tp.start_hm, tp.end_hm, special_time_range):
            continue
        if date_type == 'weekend' and not any(day in weekdays for day in '67'):
            continue
        if date_type == 'workday' and not any(day in weekdays for day in '12345'):
            continue

        start_time, end_time = convert_time(tp.start_hm), convert_time(tp.end_hm)
        weekdays_str = gen_week_str({'weekday': weekdays})

        if crossid not in res:
            res[crossid] = {
                'crossid': crossid,
                'name': routing_crosses_dict[crossid]['name'],
                'location': routing_crosses_dict[crossid]['location'],
                'delay_infos': {},
                'favorite': 1 if crossid in favorite_ids else 0,
            }

        info = delay_index.delay_info
        peak_flow = max_flow[(weekdays, crossid)]
        relative_flow_rate = round(info.car_num / peak_flow, 2) if peak_flow > 0 else 0

        delay_info = {
            'start_time': start_time,
            'end_time': end_time if end_time != '24:00' else '23:59',
            'weekdays': weekdays,
            # period type: 0 = ordinary period (default), 1 = hour-level
            # period, 2 = single-day typical period
            'tp_type': tp.type,
            'weekdays_str': weekdays_str,
            'service_level': calc_service_level(info.delay_time),
            'stop_times': round(info.stop_times, 2),
            'high_park_percent': info.high_park_percent,
            'park_time': info.park_time,
            'delay_time': info.delay_time,
            'speed': round(info.speed / 100, 2),
            'move_speed': round(info.move_speed / 100, 2),
            'relative_flow_rate': relative_flow_rate,
            'flow': info.car_num,
            'jam_index': round(info.jam_index, 2),
            'imbalance_index': round(info.imbalance_index, 2),
            'service_level_color': 0,
        }
        # change markers start neutral; a later comparison step fills them
        for metric in metric_names:
            delay_info[metric + '_color'] = 0
        for metric in metric_names:
            delay_info[metric + '_rate'] = 0
        # the peak test uses the unclamped end time
        delay_info['is_peak'] = 1 if time_overlap(start_time + '-' + end_time, peak_tp) else 0

        res[crossid]['delay_infos'].setdefault(weekdays, []).append(delay_info)

    for cross_info in res.values():
        for tp_infos in cross_info['delay_infos'].values():
            tp_infos.sort(key=lambda tp_info: tp_info['start_time'])
    return res
def is_overlap_greater_than_one_hour(startHm, endHm, time_range):
    """Tell whether a period overlaps a clock range by at least 60 minutes.

    Args:
        startHm: period start encoded as ``hour * 100 + minute`` (e.g. 730).
        endHm: period end in the same encoding.
        time_range: ``'HH:MM-HH:MM'`` string.

    Returns:
        True when the shared span is 60 minutes or longer, otherwise False.
    """
    def hm_to_minutes(hm):
        hours, minutes = divmod(hm, 100)
        return hours * 60 + minutes

    def clock_to_minutes(clock):
        hours, minutes = map(int, clock.split(':'))
        return hours * 60 + minutes

    range_begin_text, range_end_text = time_range.split('-')
    shared_start = max(hm_to_minutes(startHm), clock_to_minutes(range_begin_text))
    shared_end = min(hm_to_minutes(endHm), clock_to_minutes(range_end_text))
    # a non-positive difference means the two spans do not intersect
    return shared_end - shared_start >= 60
def gen_week_str(tp_info):
    """Render the weekday set of a time period as display text.

    Args:
        tp_info: dict whose 'weekday' entry is a comma-separated string of
            day numbers (1 = Monday ... 7 = Sunday), e.g. '1,2,3'.

    Returns:
        '全周' for all seven days, '工作日' for Monday to Friday, otherwise
        the day names from ``weekday2Str`` joined with '、'. An empty weekday
        string yields '' (it used to raise ValueError on ``int('')``).
    """
    weekdays = str(tp_info['weekday'])
    if weekdays == '1,2,3,4,5,6,7':
        return '全周'
    if weekdays == '1,2,3,4,5':
        return '工作日'
    # skip empty fragments so that '' (bitmap 0) does not hit int('')
    return '、'.join(weekday2Str[int(day)] for day in weekdays.split(',') if day.strip())
def _trend_period_index(job_index, date_type):
    """Pick the aggregate index of ``job_index`` for ``date_type`` (workday is the fallback)."""
    if date_type in ('day', 'week', 'weekend'):
        return getattr(job_index, date_type + '_index')
    return job_index.workday_index


def parse_monitor_trend_data(monitor_datas, date_type, special_time_range):
    """Turn stored report rows into the trend series of the inspection page.

    Args:
        monitor_datas: iterable of rows with 'day' (YYYYMMDD) and 'data'
            (serialized ``xl_cross_report_t``).
        date_type: 'day', 'week', 'weekend'; anything else is handled as
            'workday' for metric selection and weekday filtering.
        special_time_range: ``'HH:MM-HH:MM'`` filter, or an empty value for
            "no filter".

    Returns:
        The per-day (or per-week ``'start-end'``) metric dicts after being
        passed through ``calc_monitor_data_trend_color``.
    """
    if date_type in ('day', 'week'):
        wanted_days = None
    elif date_type == 'weekend':
        wanted_days = '67'
    else:
        wanted_days = '12345'

    res = {}
    for monitor_data in monitor_datas:
        item_res = {
            'stop_times': '-',
            'high_park_percent': '-',
            'park_time': '-',
            'delay_time': '-',
            'speed': '-',
            'move_speed': '-',
            'jam_index': '-',
            'EF_rate': '-',
            'stop_times_color': 0,
            'high_park_percent_color': 0,
            'park_time_color': 0,
            'delay_time_color': 0,
            'speed_color': 0,
            'move_speed_color': 0,
            'jam_index_color': 0,
            'imbalance_index_color': 0,
            'EF_rate_color': 0,
        }
        day_str = str(monitor_data['day'])
        if date_type in ('week', 'weekend', 'workday'):
            # weekly rows are labelled with the 7-day span starting at 'day'
            first_day = datetime.strptime(day_str, '%Y%m%d')
            last_day = first_day + timedelta(days=6)
            day_str = first_day.strftime('%Y%m%d') + '-' + last_day.strftime('%Y%m%d')

        cross_report_pb = pb.xl_cross_report_t()
        cross_report_pb.ParseFromString(monitor_data['data'])
        # NOTE(review): kept from the original; confirm whether a freshly
        # parsed message can ever be falsy.
        if not cross_report_pb:
            res[day_str] = item_res
            continue

        period_index = _trend_period_index(cross_report_pb.job_index, date_type)
        item_res['stop_times'] = round(period_index.stop_times, 2)
        item_res['high_park_percent'] = period_index.high_park_percent
        item_res['park_time'] = period_index.park_time
        item_res['delay_time'] = period_index.delay_time
        item_res['speed'] = round(period_index.speed / 100, 2)
        item_res['move_speed'] = round(period_index.move_speed / 100, 2)
        item_res['jam_index'] = round(period_index.jam_index, 2)

        total, ef_num = 0, 0
        for cross_delay_info in cross_report_pb.all_crosstp_indexes:
            if wanted_days is not None:
                # parse the bitmap once instead of once per candidate day
                tp_days = parse_wd_bitmap(cross_delay_info.tp.weekday)
                if not any(day in tp_days for day in wanted_days):
                    continue
            if special_time_range and not is_overlap_greater_than_one_hour(
                    cross_delay_info.tp.start_hm, cross_delay_info.tp.end_hm, special_time_range):
                continue
            service_level = calc_service_level(cross_delay_info.delay_info.delay_time)
            if service_level == '-':
                continue
            total += 1
            if service_level in ('E', 'F'):
                ef_num += 1
        # share of rated periods at level E or F, in percent
        item_res['EF_rate'] = round(ef_num / total * 100, 2) if total > 0 else 0
        res[day_str] = item_res

    return calc_monitor_data_trend_color(res)
def calc_monitor_data_trend_color(days_data):
    """Colour every entry of a trend series against the entry before it.

    Args:
        days_data: dict mapping a sortable date label to a metric dict. The
            metric dicts are updated in place.

    Returns:
        New dict ordered from the newest label to the oldest. The oldest
        label is used only as a comparison baseline and is not included.
    """
    metric_keys = ('stop_times', 'high_park_percent', 'park_time', 'delay_time',
                   'speed', 'move_speed', 'jam_index', 'EF_rate')
    ordered_labels = sorted(days_data, reverse=True)
    coloured = {}
    for label, prev_label in zip(ordered_labels, ordered_labels[1:]):
        today_data = days_data[label]
        prev_day_data = days_data[prev_label]
        # Iterate the fixed metric tuple rather than the live dict: adding a
        # missing '<key>_color' entry while iterating the dict itself raises
        # RuntimeError.
        for key in metric_keys:
            if key in today_data:
                today_data[key + '_color'] = calc_color(key, today_data, prev_day_data)
        coloured[label] = today_data
    return coloured


def calc_color(key, data, prev_data):
    """Return 0 (normal), 1 (worse) or 2 (better) for one metric.

    The change is the percentage difference to ``prev_data[key]`` rounded to
    two decimals; it counts as 0 when the previous value is 0 or '-' or the
    current value is '-'. More than 20% is needed to leave 0. For 'speed'
    and 'move_speed' a rise is better, for every other key a rise is worse.
    """
    baseline = prev_data[key]
    if baseline in (0, '-') or data[key] == '-':
        rate = 0
    else:
        rate = round((data[key] - baseline) / baseline * 100, 2)
    if key in ('speed', 'move_speed'):
        rate = -rate  # flip so that "positive" always means "worse"
    if rate > 20:
        return 1
    if rate < -20:
        return 2
    return 0
def parse_data2pb4monitor(data_list, weekdays):
    """Deserialize stored delay rows and blank out those for other weekday sets.

    Args:
        data_list: iterable of rows with 'day', 'tp_start', 'tp_end' and
            'data' (serialized ``xl_cross_delayinfo_t``).
        weekdays: weekday string, in the format produced by
            ``parse_wd_bitmap``, that a row must match exactly.

    Returns:
        One dict per input row, in input order, with 'day', 'tp_start',
        'tp_end' and 'data'. 'data' is the parsed message, or None when the
        row's weekday set differs from ``weekdays``.
    """
    parsed_rows = []
    for record in data_list:
        entry = {
            'day': record['day'],
            'tp_start': record['tp_start'],
            'tp_end': record['tp_end'],
        }
        delay_pb = pb.xl_cross_delayinfo_t()
        delay_pb.ParseFromString(record['data'])
        same_weekdays = parse_wd_bitmap(delay_pb.tp.weekday) == weekdays
        entry['data'] = delay_pb if same_weekdays else None
        parsed_rows.append(entry)
    return parsed_rows
def parse_single_cross_delay_info4monitor(crossid, nodeid, data_list, date_type, roads_dir_dict):
    """Build the per-day metric rows of a single cross.

    Args:
        crossid, nodeid, roads_dir_dict: forwarded to ``calc_tide_index``.
        data_list: rows with 'day' and 'data' (a parsed delay message or
            None); rows must also carry 'week_dates' when ``date_type`` is
            not 'day'.
        date_type: 'day' uses the tide index of that day; any other value
            averages the non-zero tide indexes over 'week_dates'.

    Returns:
        dict mapping each row's 'day' to its metric dict. Rows without data
        keep the placeholder values ('-' for metrics, 0 for colours/rates).
    """
    metric_names = ('stop_times', 'high_park_percent', 'park_time', 'delay_time', 'speed', 'move_speed',
                    'relative_flow_rate', 'flow', 'jam_index', 'imbalance_index', 'tide_index')

    # reference flow: the largest car_num among rows that carry data
    max_cross_car_num = max(
        (row['data'].delay_info.car_num for row in data_list if row['data']),
        default=0,
    )

    data_dict = {}
    for row in data_list:
        day = row['day']
        delay_pb = row['data']

        item_res = {'day': day, 'service_level': '-'}
        for metric in metric_names:
            item_res[metric] = '-'
        for metric in metric_names:
            item_res[metric + '_color'] = 0
        for metric in metric_names:
            item_res[metric + '_rate'] = 0

        if not delay_pb:
            data_dict[day] = item_res
            continue

        info = delay_pb.delay_info
        # Guard against a zero reference flow, which used to raise
        # ZeroDivisionError when every row reported car_num == 0.
        if max_cross_car_num > 0:
            relative_flow_rate = round(info.car_num / max_cross_car_num * 100, 2)
        else:
            relative_flow_rate = 0

        if date_type != 'day':
            tide_values = calc_tide_index(crossid, nodeid, row['week_dates'], roads_dir_dict)
            usable_tide_values = [value for value in tide_values if value != 0]
            tide_index = (round(sum(usable_tide_values) / len(usable_tide_values), 2)
                          if usable_tide_values else 0)
        else:
            tide_values = calc_tide_index(crossid, nodeid, [day], roads_dir_dict)
            tide_index = tide_values[0] if len(tide_values) != 0 else '-'

        item_res['stop_times'] = round(info.stop_times, 2)
        item_res['high_park_percent'] = info.high_park_percent
        item_res['park_time'] = info.park_time
        item_res['delay_time'] = info.delay_time
        item_res['service_level'] = calc_service_level(info.delay_time)
        item_res['speed'] = round(info.speed / 100, 2)
        item_res['move_speed'] = round(info.move_speed / 100, 2)
        item_res['relative_flow_rate'] = relative_flow_rate
        item_res['flow'] = info.car_num
        item_res['jam_index'] = round(info.jam_index, 2)
        item_res['imbalance_index'] = round(info.imbalance_index, 2)
        item_res['tide_index'] = tide_index
        data_dict[day] = item_res
    return data_dict
def calc_single_day_delay_info_change_rate4monitor(data_dict):
    """Fill the ``*_rate`` / ``*_color`` fields of each row from the row before it.

    Rows are compared in the dict's insertion order; the first row has no
    predecessor and is left untouched. A metric is skipped when either value
    is '-'. The rate is the percentage change (0 when the previous value is
    not positive). A rise of more than 20% is coloured 1 and a drop of more
    than 20% is coloured 2, with the colours swapped for 'speed' and
    'move_speed'.

    Returns:
        The same ``data_dict`` object, modified in place.
    """
    base_metrics = ('stop_times', 'high_park_percent', 'park_time', 'delay_time', 'speed', 'move_speed',
                    'relative_flow_rate', 'flow', 'jam_index', 'imbalance_index', 'tide_index')
    # everything that is not itself a comparable metric
    skipped = {'day', 'service_level'}
    skipped.update(metric + suffix for metric in base_metrics for suffix in ('_color', '_rate'))

    labels = list(data_dict)
    for position in reversed(range(1, len(labels))):
        later = data_dict[labels[position]]
        earlier = data_dict[labels[position - 1]]
        for metric in later:
            if metric in skipped:
                continue
            value, baseline = later[metric], earlier[metric]
            if value == '-' or baseline == '-':
                continue
            if baseline > 0:
                rate = (value - baseline) / baseline * 100
            else:
                rate = 0
            later[metric + '_rate'] = rate
            if metric in ('speed', 'move_speed'):
                worse, better = rate < -20, rate > 20
            else:
                worse, better = rate > 20, rate < -20
            if worse:
                later[metric + '_color'] = 1
            elif better:
                later[metric + '_color'] = 2
    return data_dict
# Periods with too many multi-queue turning movements
def gen_high_stop_times_problems(multi_park_crosses, routing_crosses, special_time_range, records, date_type, shield_info, filter_shield):
    """Collect the "too many multi-queue turns" problem list.

    Args:
        multi_park_crosses: iterable of items exposing ``cross_delay``
            (crossid, tp) and ``target_flows`` (inroadid, turn_type).
        routing_crosses: cross id -> dict with 'name', 'crossno', 'location'
            and 'roads_dir_dict'; other crosses are ignored.
        special_time_range: ``'HH:MM-HH:MM'`` filter, or an empty value.
        records: ``records['normal'][key]['high_park_problem']`` is read as
            the number of consecutive occurrences; 1 marks the entry as new.
        date_type: last component of the record/shield key.
        shield_info: ``shield_info['normal'][key]`` lists shielded problem
            kinds for that key.
        filter_shield: 1 drops shielded entries; otherwise they are kept
            with ``shield_state`` 1.

    Returns:
        dict with 'name', 'problems' (cross id -> list of entries),
        'total_num' and 'key'.
    """
    problem_key = 'high_park_problem'
    problems, total_num = {}, 0
    for item in multi_park_crosses:
        cross_delay = item.cross_delay
        crossid = cross_delay.crossid
        if crossid not in routing_crosses:
            continue
        cross = routing_crosses[crossid]
        tp = cross_delay.tp
        if special_time_range and not is_overlap_greater_than_one_hour(tp.start_hm, tp.end_hm, special_time_range):
            continue

        # inbound road id -> direction code
        road_dir = {roads['in']: direction for direction, roads in cross['roads_dir_dict'].items()}
        dir_flow_dict = {}
        for flow in item.target_flows:
            if flow.inroadid not in road_dir:
                continue
            dir_flow_dict.setdefault(road_dir[flow.inroadid], []).append(int_turn_type2str[flow.turn_type])
        problem_src_dir = '、'.join(
            dir_str_dict[direction] + '/'.join(turns) for direction, turns in dir_flow_dict.items()
        ) if dir_flow_dict else '-'
        # NOTE(review): entries without any resolvable direction are dropped
        # entirely — confirm against the original indentation, which is
        # ambiguous in the collapsed source.
        if problem_src_dir == '-':
            continue

        time_range = convert_time(tp.start_hm) + '-' + convert_time(tp.end_hm)
        weekdays = parse_wd_bitmap(tp.weekday)
        key = crossid + '^' + weekdays + '^' + time_range + '^' + date_type

        is_new, cont_times = 0, 0
        if key in records['normal']:
            cont_times = records['normal'][key][problem_key]
            if cont_times == 1:
                is_new = 1

        shield_state = 0
        if key in shield_info['normal'] and problem_key in shield_info['normal'][key]:
            if filter_shield == 1:
                continue
            shield_state = 1

        total_num += 1
        problems.setdefault(crossid, []).append({
            'crossid': crossid,
            'crossno': cross['crossno'],
            'cross_name': cross['name'],
            'time_range': time_range,
            'weekdays': weekdays,
            'weekdays_str': gen_week_str({'weekday': weekdays}),
            'flow_num': len(item.target_flows),
            'problem_src_dir': problem_src_dir,
            'is_new': is_new,
            'cont_times': cont_times,
            'shield_state': shield_state,
            'tp_type': tp.type,
            'location': cross['location'],
        })

    return {
        'name': '多次排队转向过多时段',
        'problems': problems,
        'total_num': total_num,
        'key': problem_key,
    }
# Periods with imbalanced approaches
def imbalance_inroad_problems(inroad_imbalance_crosses, routing_crosses, special_time_range, records, date_type,
                              shield_info, filter_shield):
    """Build the approach-imbalance problem list, grouped by intersection.

    Only items with exactly two target flows, both resolvable to a direction
    through the intersection's 'roads_dir_dict', are reported.

    Args:
        inroad_imbalance_crosses: Iterable of report items; each exposes
            ``cross_delay`` (``crossid``, ``tp``, ``delay_info.imbalance_index``)
            and ``target_flows``.
        routing_crosses: Dict keyed by crossid whose values provide 'name',
            'crossno', 'roads_dir_dict' and 'location'.
        special_time_range: When truthy, an item is kept only if
            ``is_overlap_greater_than_one_hour`` accepts its start/end time.
        records: Dict whose 'normal' entry maps a problem key to a dict holding
            the 'imbalance_inroad' occurrence count.
        date_type: String used as the last component of the problem key.
        shield_info: Dict whose 'normal' entry maps a problem key to the
            collection of shielded problem names.
        filter_shield: 1 drops shielded problems; any other value keeps them
            with ``shield_state`` set to 1.

    Returns:
        Dict with 'name', 'problems' (crossid -> list of problem dicts),
        'total_num' and 'key'.
    """
    problems, total_num = {}, 0
    for item in inroad_imbalance_crosses:
        cross_delay = item.cross_delay
        crossid = cross_delay.crossid
        if crossid not in routing_crosses:
            continue
        cross_info = routing_crosses[crossid]
        cross_name = cross_info['name']
        crossno = cross_info['crossno']

        tp = cross_delay.tp
        start_hm, end_hm = tp.start_hm, tp.end_hm
        if special_time_range and not is_overlap_greater_than_one_hour(start_hm, end_hm, special_time_range):
            continue
        time_range = convert_time(start_hm) + '-' + convert_time(end_hm)
        # One parse serves both the key and the display text (the value is a string).
        weekdays = parse_wd_bitmap(tp.weekday)
        weekdays_str = gen_week_str({'weekday': weekdays})
        imbalance_index = round(cross_delay.delay_info.imbalance_index, 2)

        # Invert the direction -> roads mapping so an incoming road id yields its direction.
        road_dir = {roads['in']: direction for direction, roads in cross_info['roads_dir_dict'].items()}
        dir_list = []
        if len(item.target_flows) == 2:
            dir_list = [road_dir[flow.inroadid] for flow in item.target_flows if flow.inroadid in road_dir]

        key = crossid + '^' + weekdays + '^' + time_range + '^' + date_type
        cont_times = records['normal'][key]['imbalance_inroad'] if key in records['normal'] else 0
        is_new = 1 if cont_times == 1 else 0

        # A pair of approaches is required to describe the imbalance.
        if len(dir_list) != 2:
            continue
        shield_state = 0
        if key in shield_info['normal'] and 'imbalance_inroad' in shield_info['normal'][key]:
            if filter_shield == 1:
                continue
            shield_state = 1
        total_num += 1
        problems.setdefault(crossid, []).append({
            'crossid': crossid,
            'crossno': crossno,
            'cross_name': cross_name,
            'time_range': time_range,
            'weekdays_str': weekdays_str,
            'weekdays': weekdays,
            'imbalance_index': imbalance_index,
            'dir_src': dir_str_dict[dir_list[0]] + '进口 - ' + dir_str_dict[dir_list[1]] + '进口',
            'is_new': is_new,
            'cont_times': cont_times,
            'shield_state': shield_state,
            'tp_type': tp.type,
            'location': cross_info['location']
        })
    res = {
        'name': '进口道失衡时段',
        'problems': problems,
        'total_num': total_num,
        'key': 'imbalance_inroad'
    }
    return res
# Periods with imbalanced turning movements
def turn_imbalance_problems(turn_imbalance_crosses, routing_crosses, special_time_range, records, date_type,
                            shield_info, filter_shield):
    """Build the turning-movement imbalance problem list, grouped by intersection.

    Args:
        turn_imbalance_crosses: Iterable of report items; each exposes
            ``cross_delay`` (``crossid``, ``tp``, ``inroad_delay_infos``) and
            ``target_flows``.
        routing_crosses: Dict keyed by crossid whose values provide 'name',
            'crossno', 'roads_dir_dict' and 'location'.
        special_time_range: When truthy, an item is kept only if
            ``is_overlap_greater_than_one_hour`` accepts its start/end time.
        records: Dict whose 'normal' entry maps a problem key to a dict holding
            the 'turn_imbalance' occurrence count.
        date_type: String used as the last component of the problem key.
        shield_info: Dict whose 'normal' entry maps a problem key to the
            collection of shielded problem names.
        filter_shield: 1 drops shielded problems; any other value keeps them
            with ``shield_state`` set to 1.

    Returns:
        Dict with 'name', 'problems' (crossid -> list of problem dicts),
        'total_num' and 'key'.

    Raises:
        KeyError: If a target flow's inroad id is known to the intersection but
            has no entry in ``cross_delay.inroad_delay_infos``.
    """
    problems, total_num = {}, 0
    for item in turn_imbalance_crosses:
        cross_delay = item.cross_delay
        crossid = cross_delay.crossid
        if crossid not in routing_crosses:
            continue
        cross_info = routing_crosses[crossid]
        cross_name = cross_info['name']
        crossno = cross_info['crossno']

        tp = cross_delay.tp
        start_hm, end_hm = tp.start_hm, tp.end_hm
        if special_time_range and not is_overlap_greater_than_one_hour(start_hm, end_hm, special_time_range):
            continue
        time_range = convert_time(start_hm) + '-' + convert_time(end_hm)
        # One parse serves both the key and the display text (the value is a string).
        weekdays = parse_wd_bitmap(tp.weekday)
        weekdays_str = gen_week_str({'weekday': weekdays})

        road_delay_dict = {delay.inroadid: delay for delay in cross_delay.inroad_delay_infos}
        # Invert the direction -> roads mapping so an incoming road id yields its direction.
        road_dir = {roads['in']: direction for direction, roads in cross_info['roads_dir_dict'].items()}

        turns_by_dir = {}
        inroad_imbalance_index_list = []
        for target_flow in item.target_flows:
            inroadid = target_flow.inroadid
            if inroadid not in road_dir:
                continue
            # One index entry per flagged flow, so an approach with several flagged
            # turns contributes its index several times.
            inroad_imbalance_index_list.append(
                str(round(road_delay_dict[inroadid].delay_info.imbalance_index, 2)))
            turns_by_dir.setdefault(road_dir[inroadid], []).append(int_turn_type2str[target_flow.turn_type])
        detail = '、'.join(
            dir_str_dict[direction] + '进口(' + '-'.join(turns) + ')' for direction, turns in turns_by_dir.items())

        key = crossid + '^' + weekdays + '^' + time_range + '^' + date_type
        shield_state = 0
        if key in shield_info['normal'] and 'turn_imbalance' in shield_info['normal'][key]:
            if filter_shield == 1:
                continue
            shield_state = 1
        total_num += 1

        cont_times = records['normal'][key]['turn_imbalance'] if key in records['normal'] else 0
        is_new = 1 if cont_times == 1 else 0
        problems.setdefault(crossid, []).append({
            'crossid': crossid,
            'crossno': crossno,
            'cross_name': cross_name,
            'time_range': time_range,
            'weekdays_str': weekdays_str,
            'weekdays': weekdays,
            'imbalance_index': '、'.join(inroad_imbalance_index_list),
            'detail': detail,
            'is_new': is_new,
            'cont_times': cont_times,
            'shield_state': shield_state,
            'tp_type': tp.type,
            'location': cross_info['location']
        })
    res = {
        'name': '转向失衡时段',
        'problems': problems,
        'total_num': total_num,
        'key': 'turn_imbalance'
    }
    return res
# Tidal intersections
def cross_tide_problems(cross_tide_crosses, routing_crosses, date_type, records, shield_info, filter_shield):
    """Build the tidal-intersection problem list, grouped by intersection.

    Args:
        cross_tide_crosses: Iterable of report items exposing ``crossid``,
            ``is_tide``, ``daynum`` (formatted as %Y%m%d once stringified),
            ``tide_index`` and ``detail.inroadid_pair1``.
        routing_crosses: Dict keyed by crossid whose values provide 'name',
            'crossno', 'roads_dir_dict' and 'location'.
        date_type: 'weekend' keeps only Saturday/Sunday dates, 'workday' keeps
            only Monday-Friday dates; any other value applies no date filter.
        records: Dict whose 'tide' entry maps crossid to an occurrence count.
        shield_info: Dict whose 'tide' entry maps crossid to the collection of
            shielded problem names.
        filter_shield: 1 drops shielded problems; any other value keeps them
            with ``shield_state`` set to 1.

    Returns:
        Dict with 'name', 'problems' (crossid -> list of problem dicts),
        'total_num' and 'key'.
    """
    problems, total_num = {}, 0
    for item in cross_tide_crosses:
        crossid = item.crossid
        if crossid not in routing_crosses:
            continue
        if not item.is_tide:
            continue
        cross_info = routing_crosses[crossid]
        cross_name = cross_info['name']
        crossno = cross_info['crossno']
        daynum = item.daynum

        if date_type in ('weekend', 'workday'):
            # weekday() is 0-based from Monday, so +1 gives 6/7 for Saturday/Sunday.
            falls_on_weekend = datetime.strptime(str(daynum), "%Y%m%d").weekday() + 1 in (6, 7)
            if falls_on_weekend != (date_type == 'weekend'):
                continue

        shield_state = 0
        if crossid in shield_info['tide'] and 'tide_cross' in shield_info['tide'][crossid]:
            if filter_shield == 1:
                continue
            shield_state = 1

        # Invert the direction -> roads mapping so an incoming road id yields its direction.
        road_dir = {roads['in']: direction for direction, roads in cross_info['roads_dir_dict'].items()}
        pair = item.detail.inroadid_pair1
        # Skip items whose approach pair cannot be resolved instead of raising, matching
        # how the other problem builders treat inroad ids missing from road_dir.
        if len(pair) < 2 or pair[0] not in road_dir or pair[1] not in road_dir:
            continue

        total_num += 1
        cont_times = records['tide'][crossid] if crossid in records['tide'] else 0
        is_new = 1 if cont_times == 1 else 0
        problems.setdefault(crossid, []).append({
            'crossid': crossid,
            'crossno': crossno,
            'cross_name': cross_name,
            'date': daynum,
            'tide_index': round(item.tide_index, 2),
            'src_dir': dir_str_dict[road_dir[pair[0]]] + '进口 - ' + dir_str_dict[road_dir[pair[1]]] + '进口',
            'is_new': is_new,
            'cont_times': cont_times,
            'shield_state': shield_state,
            'location': cross_info['location']
        })
    res = {
        'name': '潮汐路口',
        'problems': problems,
        'total_num': total_num,
        'key': 'tide_cross'
    }
    return res
# Lane resources do not match demand
def inroad_turnlane_mismatch_problems(inroad_turnlane_mismatchs, routing_crosses, special_time_range, date_type,
                                      records, shield_info, filter_shield):
    """Build the lane-resource mismatch problem list, grouped by intersection.

    Items sharing the same intersection, time range and weekdays are merged
    into one problem whose 'flow_rate' and 'src_info' are '、'-joined strings.

    Args:
        inroad_turnlane_mismatchs: Iterable of report items exposing ``crossid``,
            ``tp``, ``inroadid``, ``flow_percent0`` and ``flow_percent1``.
        routing_crosses: Dict keyed by crossid whose values provide 'name',
            'crossno', 'roads_dir_dict' and 'location'.
        special_time_range: When truthy, an item is kept only if
            ``is_overlap_greater_than_one_hour`` accepts its start/end time.
        date_type: String used as the last component of the record/shield key.
        records: Dict whose 'normal' entry maps a record key to a dict holding
            the 'inroad_turnlane_mismatch' occurrence count.
        shield_info: Dict whose 'normal' entry maps a record key to the
            collection of shielded problem names.
        filter_shield: 1 drops shielded problems; any other value keeps them
            with ``shield_state`` set to 1.

    Returns:
        Dict with 'name', 'key', 'problems' (crossid -> list of problem dicts)
        and 'total_num' (number of merged problems).
    """
    grouped = {}
    for item in inroad_turnlane_mismatchs:
        crossid = item.crossid
        if crossid not in routing_crosses:
            continue
        cross_info = routing_crosses[crossid]
        cross_name = cross_info['name']
        crossno = cross_info['crossno']

        tp = item.tp
        start_hm, end_hm = tp.start_hm, tp.end_hm
        if special_time_range and not is_overlap_greater_than_one_hour(start_hm, end_hm, special_time_range):
            continue
        time_range = convert_time(start_hm) + '-' + convert_time(end_hm)
        # One parse serves both the keys and the display text (the value is a string).
        weekdays = parse_wd_bitmap(tp.weekday)
        weekdays_str = gen_week_str({'weekday': weekdays})

        # Invert the direction -> roads mapping so an incoming road id yields its direction.
        road_dir = {roads['in']: direction for direction, roads in cross_info['roads_dir_dict'].items()}
        roadid = item.inroadid
        if roadid not in road_dir:
            continue
        if item.flow_percent0 == 0 and item.flow_percent1 == 0:
            continue
        # The larger share decides which movement is reported; ties report flow_percent1.
        if item.flow_percent0 > item.flow_percent1:
            flow_rate_item = str(item.flow_percent0) + '%'
            src_info = srcDir_toStr(road_dir[roadid]) + '直行'
        else:
            flow_rate_item = str(item.flow_percent1) + '%'
            src_info = srcDir_toStr(road_dir[roadid]) + '左转'

        # `key` merges entries inside this function; `record_key` matches the
        # layout used by the records and shield lookups.
        key = crossid + '^' + time_range + '^' + weekdays
        record_key = crossid + '^' + weekdays + '^' + time_range + '^' + date_type
        shield_state = 0
        if record_key in shield_info['normal'] and 'inroad_turnlane_mismatch' in shield_info['normal'][record_key]:
            if filter_shield == 1:
                continue
            shield_state = 1
        if record_key in records['normal']:
            cont_times = records['normal'][record_key]['inroad_turnlane_mismatch']
        else:
            cont_times = 0
        is_new = 1 if cont_times == 1 else 0

        cross_group = grouped.setdefault(crossid, {})
        if key in cross_group:
            cross_group[key]['flow_rate'].append(flow_rate_item)
            cross_group[key]['src_info'].append(src_info)
        else:
            cross_group[key] = {
                'crossid': crossid,
                'crossno': crossno,
                'cross_name': cross_name,
                'time_range': time_range,
                'weekdays_str': weekdays_str,
                'weekdays': weekdays,
                'flow_rate': [flow_rate_item],
                'src_info': [src_info],
                'is_new': is_new,
                'cont_times': cont_times,
                'shield_state': shield_state,
                'tp_type': tp.type,
                'location': cross_info['location']
            }

    # Flatten each intersection's merged problems and turn the collected lists into text.
    parsed_res, total_num = {}, 0
    for crossid, cross_group in grouped.items():
        entries = []
        for entry in cross_group.values():
            entry['flow_rate'] = '、'.join(entry['flow_rate'])
            entry['src_info'] = '、'.join(entry['src_info'])
            entries.append(entry)
        parsed_res[crossid] = entries
        total_num += len(entries)
    res = {
        'name': '车道资源不匹配',
        'key': 'inroad_turnlane_mismatch',
        'problems': parsed_res,
        'total_num': total_num
    }
    return res
# Entry/exit lane counts do not match
def unmatched_lane_num_problems(routing_crosses, shield_info, filter_shield):
    """Report intersections where an approach has over two more entry lanes than the opposite exit.

    Args:
        routing_crosses: Dict keyed by crossid whose values provide 'name',
            'crossno', 'location' and 'cross_ledger_info' (with a 'roads' dict
            keyed by direction holding 'entry_lane_num' / 'exit_lane_num').
        shield_info: Dict whose 'tide' entry maps crossid to the collection of
            shielded problem names.
        filter_shield: 1 drops shielded intersections; any other value keeps
            them with ``shield_state`` set to 1.

    Returns:
        Dict with 'name', 'key', 'problems' (crossid -> single-element list)
        and 'total_num' (number of reported intersections).
    """
    cross_lane_num_problems = {}
    for crossid, cross_info in routing_crosses.items():
        cross_name = cross_info['name']
        crossno = cross_info['crossno']
        road_infos = cross_info['cross_ledger_info']['roads']

        mismatched = []
        for src_dir, road in road_infos.items():
            entry_lane_num = road['entry_lane_num']
            opposite_dir = src_reverse[src_dir]
            # '-' marks an unknown lane count; the opposite road may be absent altogether.
            exit_lane_num = road_infos[opposite_dir]['exit_lane_num'] if opposite_dir in road_infos else '-'
            if exit_lane_num == '-' or entry_lane_num == '-':
                continue
            if entry_lane_num - exit_lane_num > 2:
                mismatched.append(srcDir_toStr(src_dir) + '进口 - ' + srcDir_toStr(src_reverse[src_dir]) + '出口')

        shielded_by_cross = shield_info['tide']
        shield_state = 0
        if crossid in shielded_by_cross and 'unmatched_lane_num' in shielded_by_cross[crossid]:
            if filter_shield == 1:
                continue
            shield_state = 1

        if mismatched:
            cross_lane_num_problems[crossid] = [{
                'crossid': crossid,
                'cross_name': cross_name,
                'crossno': crossno,
                'bad_num': len(mismatched),
                'src_dir': '、'.join(mismatched),
                'shield_state': shield_state,
                'location': routing_crosses[crossid]['location']
            }]
    return {
        'name': '进出口车道不匹配',
        'key': 'unmatched_lane_num',
        'problems': cross_lane_num_problems,
        'total_num': len(cross_lane_num_problems)
    }
# Periods with abnormal signal timing plans
def phase_tp_check_problems(routing_crosses, special_time_range, first_date, date_type, shield_info, filter_shield):
    """Build the abnormal timing-plan period list from the stored examine records.

    Args:
        routing_crosses: Dict keyed by crossid whose values provide 'name',
            'crossno' and 'location'.
        special_time_range: When truthy, a row is kept only if
            ``is_overlap_greater_than_one_hour`` accepts its start/end time.
        first_date: Query date as a %Y%m%d string.
        date_type: For any value other than 'day' the lower date bound passed
            to the query is moved back by seven days.
        shield_info: Dict whose 'phase' entry contains the shielded
            'crossid^time_range' keys.
        filter_shield: 1 drops shielded rows; any other value keeps them with
            ``shield_state`` set to 1.

    Returns:
        Dict with 'name', 'key', 'problems' (crossid -> list of problem dicts)
        and 'total_num' (number of problems actually returned).
    """
    phase_tp_problems, total_num = {}, 0
    crossid_list = list(routing_crosses.keys())
    color_dict = {
        1: '红色',
        2: '橙色',
        3: '黄色',
        4: '绿色'
    }
    state_dict = {
        1: '需核查逾期',
        2: '需核查',
        3: '监测中',
        4: '核查有异常',
        5: '核查无异常',
        6: '自动结束'
    }
    min_date = first_date
    if date_type != 'day':
        min_date = (datetime.strptime(first_date, "%Y%m%d") - timedelta(days=7)).strftime("%Y%m%d")
    cross_examine_records = db_cross.query_crosses_examine_records(crossid_list, first_date, min_date)
    for row in cross_examine_records:
        start_hm, end_hm = row['start_hm'], row['end_hm']
        if special_time_range and not is_overlap_greater_than_one_hour(start_hm, end_hm, special_time_range):
            continue
        # Level 4 is the green level in color_dict; such rows are not reported.
        if row['level_color'] == 4:
            continue
        crossid = row['crossid']
        if crossid not in routing_crosses:
            continue
        cross_info = routing_crosses[crossid]
        time_range = convert_time(start_hm) + '-' + convert_time(end_hm)

        shield_state = 0
        if crossid + '^' + time_range in shield_info['phase']:
            if filter_shield == 1:
                continue
            shield_state = 1

        # Count only after every filter has passed so total_num matches the number of
        # returned problems (previously rows skipped below were still counted).
        total_num += 1
        cont_times = row['cont_times']
        phase_tp_problems.setdefault(crossid, []).append({
            'crossid': crossid,
            'cross_name': cross_info['name'],
            'cross_no': cross_info['crossno'],
            'time_range': time_range,
            'cont_times': cont_times,
            # The row's own first_date, not the query date passed in as a parameter.
            'first_date': row['first_date'],
            'final_state': state_dict[row['final_state']],
            'final_state_int': row['final_state'],
            'level_color': color_dict[row['level_color']],
            'level_color_int': row['level_color'],
            'is_new': 1 if cont_times == 1 else 0,
            'shield_state': shield_state,
            'location': cross_info['location']
        })
    res = {
        'name': '配时方案异常时段',
        'key': 'phase_error',
        'problems': phase_tp_problems,
        'total_num': total_num
    }
    return res
def _phase_problem_entry(item_key, problem, cross_info, weekdays, time_range):
    """Build the response dict for one diagnosed problem, or None for an unrecognized item key."""
    entry = {
        'crossid': problem.crossid,
        'cross_name': cross_info['name'],
        'cross_no': cross_info['crossno']
    }
    if item_key == 'sample_sch':
        entry['weekdays'] = weekdays
        entry['schedule_name'] = problem.schedule_name + '-' + str(problem.scheduleid)
    elif item_key == 'sample_tp':
        entry['cross_model'] = problem.shaped_mode
        entry['weekdays_str'] = problem.schedule_name
        entry['scheduleid'] = problem.scheduleid
        entry['tp_num'] = problem.tp_num
    elif item_key in ('short_tp', 'insufficient_yellow', 'unreasonable_red'):
        entry['weekdays'] = weekdays
        entry['weekdays_str'] = problem.schedule_name
        entry['time_range'] = time_range
        entry['plan_info'] = str(problem.planid) + '-' + problem.plan_name
    elif item_key in ('big_cycle', 'small_cycle'):
        entry['weekdays_str'] = problem.schedule_name
        entry['weekdays'] = weekdays
        entry['time_range'] = time_range
        entry['cross_model'] = problem.shaped_mode
        entry['plan_info'] = str(problem.planid) + '-' + problem.plan_name
        entry['stage_num'] = problem.stage_num
        entry['cycle_time'] = problem.cycle
    elif item_key == 'insufficient_ped_time':
        entry['weekdays_str'] = problem.schedule_name
        entry['weekdays'] = weekdays
        entry['time_range'] = time_range
        entry['plan_info'] = str(problem.planid) + '-' + problem.plan_name
        entry['ped_stage'] = problem.crosswalk_phase
        entry['ped_time'] = problem.crosswalk_green
    else:
        return None
    entry['shield_state'] = problem.shield_state
    entry['location'] = cross_info['location']
    return entry


# Signal-timing-plan related anomalies
def monitor_phase_problems(nodeid, area_id, date_type, query_date, special_time_range, shield_info, routing_crosses,
                           filter_shield):
    """Fetch the timing-plan diagnosis via GetCrossPhaseDiagnosis and reshape it per intersection.

    Args:
        nodeid: Value converted with ``int()`` and passed to the diagnosis call.
        area_id: Value converted with ``int()`` and passed to the diagnosis call.
        date_type: 'week', 'workday' or 'weekend' select fixed weekday lists;
            any other value uses the weekday of ``query_date``.
        query_date: Date as a %Y%m%d string.
        special_time_range: Forwarded unchanged to the diagnosis call.
        shield_info: Dict with 'sample' and 'normal' entries mapping a problem
            key to the collection of shielded item keys.
        routing_crosses: Dict keyed by crossid whose values provide 'name',
            'crossno' and 'location'; problems for other crossids are skipped.
        filter_shield: 1 drops shielded problems (and decrements the item's
            total); any other value keeps them with ``shield_state`` set to 1.

    Returns:
        List of dicts with 'name', 'key', 'problems' (crossid -> list of
        problem dicts) and 'total_num'; empty when the call reports an error
        or returns nothing.
    """
    weekdays = str(datetime.strptime(query_date, "%Y%m%d").weekday() + 1)
    weekdays = {
        'week': '1,2,3,4,5,6,7',
        'workday': '1,2,3,4,5',
        'weekend': '6,7'
    }.get(date_type, weekdays)

    phase_problems = []
    res, e = GetCrossPhaseDiagnosis(int(nodeid), int(area_id), weekdays, special_time_range)
    if e or len(res) == 0:
        # The message needs a placeholder for `e`; passing it as a bare extra argument
        # makes the logging module report a formatting error instead of the message.
        logging.error('phase check res is None: %s', e)
        return phase_problems

    for item in res:
        item_res = {
            'name': item.name,
            'key': item.key,
            'problems': {},
            'total_num': item.total_num
        }
        for problem in item.problems:
            crossid = problem.crossid
            # NOTE(review): problems skipped here are not subtracted from total_num,
            # unlike shield-filtered ones - confirm this is intended.
            if crossid not in routing_crosses:
                continue
            problem_weekdays = problem.schedule_week
            time_range = problem.tp
            # Schedule-level checks are shielded per crossid+weekdays, the rest per period.
            if item.key in ('sample_sch', 'sample_tp'):
                shield_group = shield_info['sample']
                shield_key = crossid + '^' + problem_weekdays
            else:
                shield_group = shield_info['normal']
                shield_key = crossid + '^' + problem_weekdays + '^' + time_range + '^' + date_type
            if shield_key in shield_group and item.key in shield_group[shield_key]:
                if filter_shield == 1:
                    item_res['total_num'] -= 1
                    continue
                problem.shield_state = 1

            entry = _phase_problem_entry(item.key, problem, routing_crosses[crossid], problem_weekdays, time_range)
            if entry is None:
                continue
            if item.key == 'sample_sch':
                # Only the most recent entry is kept per intersection for this key.
                item_res['problems'][crossid] = [entry]
            else:
                item_res['problems'].setdefault(crossid, []).append(entry)
        phase_problems.append(item_res)
    return phase_problems
def gen_monitor_problem_info(cross_report_pb, routing_crosses, special_time_range, records, date_type, shield_info,
                             query_date, nodeid, area_id, filter_shield):
    """Assemble all problem categories plus the per-intersection detail list.

    Runs every problem builder in this module, groups their results into four
    named categories and attaches the output of ``gen_cross_problem_detail``.

    Returns:
        ``{}`` when ``cross_report_pb`` is falsy; otherwise a dict with 'items'
        (list of categories, each with 'name', 'num' and 'child_items') and
        'cross_problem_detail'.
    """
    if not cross_report_pb:
        return {}

    def _category(name, children):
        # A category's count is the sum of its children's totals.
        return {
            'name': name,
            'num': sum(child['total_num'] for child in children),
            'child_items': children
        }

    # Argument tail shared by the four time-period based builders.
    period_args = (routing_crosses, special_time_range, records, date_type, shield_info, filter_shield)
    high_park_problems = gen_high_stop_times_problems(cross_report_pb.multi_park_crosses, *period_args)
    too_many_stop_times = too_many_stop_times_problems(cross_report_pb.bad_park_crosses, *period_args)
    imbalance_inroad = imbalance_inroad_problems(cross_report_pb.inroad_imbalance_crosses, *period_args)
    turn_imbalance = turn_imbalance_problems(cross_report_pb.turn_imbalance_crosses, *period_args)
    tide_cross = cross_tide_problems(
        cross_report_pb.tide_crosses, routing_crosses, date_type, records, shield_info, filter_shield)
    inroad_turnlane_mismatch = inroad_turnlane_mismatch_problems(
        cross_report_pb.inroad_turnlane_mismatchs, routing_crosses, special_time_range, date_type, records,
        shield_info, filter_shield)
    unmatch_lane_num = unmatched_lane_num_problems(routing_crosses, shield_info, filter_shield)
    phase_tp = phase_tp_check_problems(
        routing_crosses, special_time_range, query_date, date_type, shield_info, filter_shield)
    monitor_phase = monitor_phase_problems(
        nodeid, area_id, date_type, query_date, special_time_range, shield_info, routing_crosses, filter_shield)

    cross_problem_detail = gen_cross_problem_detail(
        routing_crosses, high_park_problems, too_many_stop_times, imbalance_inroad, turn_imbalance, tide_cross,
        phase_tp, monitor_phase, inroad_turnlane_mismatch, unmatch_lane_num)

    return {
        'items': [
            _category('运行效率', [high_park_problems, too_many_stop_times]),
            _category('均衡调控', [imbalance_inroad, turn_imbalance, tide_cross]),
            _category('路口渠化', [inroad_turnlane_mismatch, unmatch_lane_num]),
            _category('配时方案', [phase_tp] + monitor_phase)
        ],
        'cross_problem_detail': cross_problem_detail
    }
def gen_cross_problem_detail(routing_crosses, high_park_problems, too_many_stop_times, imbalance_inroad,
                             turn_imbalance, tide_cross, phase_tp, monitor_phase, inroad_turnlane_mismatch,
                             unmatch_lane_num_problem):
    """Produce one summary row per intersection listing every problem found for it.

    Args:
        routing_crosses: Dict keyed by crossid whose values provide 'crossno',
            'name', 'company' and 'location'.
        high_park_problems, too_many_stop_times, imbalance_inroad,
        turn_imbalance, tide_cross, phase_tp: Result dicts whose 'problems'
            entry maps crossid to a list of problem dicts.
        monitor_phase: List of result dicts, each with 'key' and 'problems'.
        inroad_turnlane_mismatch, unmatch_lane_num_problem: Result dicts with a
            'problems' entry; a bare crossid -> list mapping is also accepted.

    Returns:
        List of dicts, one per crossid in ``routing_crosses``, holding the
        intersection's identity fields and, per problem key, a list of
        description strings (for 'phase_error' a list of dicts).
        'phase_color' / 'phase_final_state' carry the smallest level/state
        found, or 0 when the intersection has no timing-plan problem.
    """
    def tp_label(problem):
        return '【' + problem['weekdays_str'] + ' ' + problem['time_range'] + '】'

    plan_info_keys = ('short_tp', 'insufficient_yellow', 'unreasonable_red', 'big_cycle', 'small_cycle',
                      'insufficient_ped_time')
    # These two arguments arrive as full result dicts; the per-intersection data lives
    # under 'problems'. Looking crossids up on the result dict itself never matched.
    turnlane_by_cross = inroad_turnlane_mismatch.get('problems', inroad_turnlane_mismatch)
    unmatched_by_cross = unmatch_lane_num_problem.get('problems', unmatch_lane_num_problem)

    cross_problem_detail = []
    for crossid in routing_crosses:
        cross_info = routing_crosses[crossid]
        crossno = cross_info['crossno']
        cross_name = cross_info['name']
        company = cross_info['company']
        location = cross_info['location']

        high_park = [tp_label(p) + p['problem_src_dir']
                     for p in high_park_problems['problems'].get(crossid, [])]
        stop_times_problem = [tp_label(p) + '停车' + str(p['stop_times']) + '次'
                              for p in too_many_stop_times['problems'].get(crossid, [])]
        inroad_problem = [tp_label(p) + p['dir_src'] for p in imbalance_inroad['problems'].get(crossid, [])]
        turn_problem = [tp_label(p) + p['detail'] for p in turn_imbalance['problems'].get(crossid, [])]
        tide = ['【' + str(p['date']) + '】' + p['src_dir'] for p in tide_cross['problems'].get(crossid, [])]

        # 9 is a sentinel above every defined level/state; it is mapped to 0 on output.
        phase, phase_color, phase_final_state = [], 9, 9
        for p in phase_tp['problems'].get(crossid, []):
            phase_color = min(phase_color, p['level_color_int'])
            phase_final_state = min(phase_final_state, p['final_state_int'])
            phase.append({
                'desc': '【' + p['first_date'] + ' ' + p['time_range'] + '】' + p['level_color'] + '异常',
                'level_color': p['level_color'],
                'level_color_int': p['level_color_int'],
                'final_state': p['final_state'],
                'final_state_int': p['final_state_int'],
                'first_date': p['first_date'],
                'time_range': p['time_range']
            })

        phase_checks = {check_key: [] for check_key in ('sample_sch', 'sample_tp') + plan_info_keys}
        for item_problem in monitor_phase:
            check_key = item_problem['key']
            if check_key not in phase_checks or crossid not in item_problem['problems']:
                continue
            entries = item_problem['problems'][crossid]
            if check_key == 'sample_sch':
                phase_checks[check_key] = [entries[0]['schedule_name']]
            elif check_key == 'sample_tp':
                phase_checks[check_key] = ['【' + p['weekdays_str'] + '】' + str(p['scheduleid']) for p in entries]
            else:
                phase_checks[check_key] = ['【' + p['weekdays_str'] + '】' + p['plan_info'] for p in entries]

        inroad_turnlane = [tp_label(p) + p['src_info'] for p in turnlane_by_cross.get(crossid, [])]
        unmatched_entries = unmatched_by_cross.get(crossid)
        unmatched_lane_num = [unmatched_entries[0]['src_dir']] if unmatched_entries else []

        detail = {
            'crossid': crossid,
            'crossno': crossno,
            'cross_name': cross_name,
            'location': location,
            'company': company,
            'high_park_problem': high_park,
            'too_many_stop_times': stop_times_problem,
            'imbalance_inroad': inroad_problem,
            'turn_imbalance': turn_problem,
            'tide_cross': tide,
            'phase_error': phase,
            'phase_color': phase_color if phase_color != 9 else 0,
            'phase_final_state': phase_final_state if phase_final_state != 9 else 0,
            'inroad_turnlane_mismatch': inroad_turnlane,
            'unmatched_lane_num': unmatched_lane_num
        }
        detail.update(phase_checks)
        cross_problem_detail.append(detail)
    return cross_problem_detail