# -*- coding: utf-8 -*- # @Author: Owl # @Date: 2025/10/10 19:51 # @Description: 路口评价相关的公共函数 import copy import io import json import logging import requests from flask import send_file from google.protobuf.json_format import MessageToJson import random from openpyxl import Workbook from openpyxl.styles import Alignment import proto.xlcomm_pb2 as pb from app.common_worker import * from proto.phase_grpc import QueryCrossRunningPhase, QueryCrossPhaseDiagnosis src_reverse = {'N': 'S', 'S': 'N', 'W': 'E', 'E': 'W', 'NE': 'SW', 'SW': 'NE', 'SE': 'NW', 'NW': 'SE'} dir_str_dict = { 'E': '东', 'S': '南', 'W': '西', 'N': '北', 'NE': '东北', 'SE': '东南', 'SW': '西南', 'NW': '西北' } int_turn_type2str = { 0: '直行', 1: '左转', 2: '右转', 3: '掉头' } int_internet_code2str = { 0: '无', 1: '联网', 2: '脱机', 3: '未接入' } int_cross_model2str = { '1': '快', '2': '主', '3': '次', '4': '支' } def gen_avg_cross_delay_pb(cross_delay_data_list): """ 路口评价指标数据聚合 :param cross_delay_data_list: :return: """ cross_delay_pb_list = [] for item in cross_delay_data_list: item_delay_pb = pb.xl_cross_delayinfo_t() item_delay_pb.ParseFromString(item['data']) cross_delay_pb_list.append(item_delay_pb) avg_cross_delay = None if len(cross_delay_pb_list) == 0: return avg_cross_delay elif len(cross_delay_pb_list) == 1: avg_cross_delay = cross_delay_pb_list[0] inroad_delay_pb_list = avg_cross_delay.inroad_delay_infos inroad_delay_infos_with_imbalance = calc_inroad_imbalance_index(inroad_delay_pb_list) del avg_cross_delay.inroad_delay_infos[:] avg_cross_delay.inroad_delay_infos.extend(inroad_delay_infos_with_imbalance) return avg_cross_delay avg_cross_delay = pb.xl_cross_delayinfo_t() avg_cross_delay.crossid = cross_delay_pb_list[0].crossid avg_cross_delay.tp.start_hm = cross_delay_pb_list[0].tp.start_hm avg_cross_delay.tp.end_hm = cross_delay_pb_list[0].tp.end_hm avg_cross_delay.tp.weekday = cross_delay_pb_list[0].tp.weekday avg_cross_delay.tp.type = cross_delay_pb_list[0].tp.type avg_cross_delay.daynum = 0 sum_car_num, delay_time, stop_times, queue_len, speed, jam_index, park_time, std_flow, high_park_percent, truck_percent, park_percent, move_speed, imbalance_index\ = 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 # 路口延误数据指标聚合逻辑 for item in cross_delay_pb_list: item_delay_info = item.delay_info item_car_num = item_delay_info.car_num if item_car_num != 0: sum_car_num += item_car_num delay_time += item_delay_info.delay_time * item_car_num stop_times += item_delay_info.stop_times * item_car_num queue_len += item_delay_info.queue_len * item_car_num speed += item_delay_info.speed * item_car_num jam_index += item_delay_info.jam_index * item_car_num park_time += item_delay_info.park_time * item_car_num high_park_percent += item_delay_info.high_park_percent * item_car_num truck_percent += item_delay_info.truck_percent * item_car_num park_percent += item_delay_info.park_percent * item_car_num move_speed += item_delay_info.move_speed * item_car_num # imbalance_index += item_delay_info.imbalance_index * item_car_num std_flow += item_delay_info.std_flow avg_cross_delay.delay_info.car_num = sum_car_num avg_cross_delay.delay_info.delay_time = int(delay_time / sum_car_num) avg_cross_delay.delay_info.stop_times = stop_times / sum_car_num avg_cross_delay.delay_info.queue_len = int(queue_len / sum_car_num) avg_cross_delay.delay_info.speed = int(speed / sum_car_num) avg_cross_delay.delay_info.jam_index = jam_index / sum_car_num avg_cross_delay.delay_info.park_time = int(park_time / sum_car_num) avg_cross_delay.delay_info.high_park_percent = int(high_park_percent / sum_car_num) avg_cross_delay.delay_info.truck_percent = int(truck_percent / sum_car_num) avg_cross_delay.delay_info.park_percent = int(park_percent / sum_car_num) avg_cross_delay.delay_info.move_speed = int(move_speed / sum_car_num) # avg_cross_delay.delay_info.imbalance_index = imbalance_index / sum_car_num avg_cross_delay.delay_info.std_flow = int(std_flow / len(cross_delay_pb_list)) # 进口道延误数据聚合处理逻辑 inroad_delay_info_dict = {} for item in cross_delay_pb_list: inroad_delay_infos = item.inroad_delay_infos for inroad_item in inroad_delay_infos: inroadid = inroad_item.inroadid if inroadid not in inroad_delay_info_dict: inroad_delay_info_dict[inroadid] = [inroad_item] else: inroad_delay_info_dict[inroadid].append(inroad_item) flow_delay_list, inroad_delay_pb_list = [], [] flow_delay_info_dict = {} max_stop_times, min_stop_times = 0, 999999999 for inroadid in inroad_delay_info_dict.keys(): road_delay_infos = inroad_delay_info_dict[inroadid] inroad_sum_car_num, inroad_delay_time, inroad_stop_times, inroad_queue_len, inroad_speed, inroad_jam_index, inroad_park_time, inroad_high_park_percent, inroad_truck_percent, inroad_park_percent, inroad_move_speed, inroad_imbalance_index, inroad_std_flow, inroad_travel_time\ = 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ffs, capacity = road_delay_infos[0].delay_info.ffs, road_delay_infos[0].delay_info.capacity turn_ratio_0, turn_ratio_1, turn_ratio_2, turn_ratio_3 = 0, 0, 0, 0 for road_delay_info in road_delay_infos: inroad_sum_car_num += road_delay_info.delay_info.car_num inroad_delay_time += road_delay_info.delay_info.delay_time * road_delay_info.delay_info.car_num inroad_stop_times += road_delay_info.delay_info.stop_times * road_delay_info.delay_info.car_num inroad_queue_len += road_delay_info.delay_info.queue_len * road_delay_info.delay_info.car_num inroad_speed += road_delay_info.delay_info.speed * road_delay_info.delay_info.car_num inroad_jam_index += road_delay_info.delay_info.jam_index * road_delay_info.delay_info.car_num inroad_park_time += road_delay_info.delay_info.park_time * road_delay_info.delay_info.car_num inroad_high_park_percent += road_delay_info.delay_info.high_park_percent * road_delay_info.delay_info.car_num inroad_truck_percent += road_delay_info.delay_info.truck_percent * road_delay_info.delay_info.car_num inroad_park_percent += road_delay_info.delay_info.park_percent * road_delay_info.delay_info.car_num inroad_move_speed += road_delay_info.delay_info.move_speed * road_delay_info.delay_info.car_num inroad_travel_time += road_delay_info.delay_info.travel_time * road_delay_info.delay_info.car_num # inroad_imbalance_index += delay_info.imbalance_index * delay_info.car_num inroad_std_flow += road_delay_info.delay_info.std_flow turn_ratio_0 += road_delay_info.delay_info.turn_ratio_0 turn_ratio_1 += road_delay_info.delay_info.turn_ratio_1 turn_ratio_2 += road_delay_info.delay_info.turn_ratio_2 turn_ratio_3 += road_delay_info.delay_info.turn_ratio_3 # 流向级别数据指标聚合处理逻辑 flow_delay_infos = road_delay_info.flow_delay_infos flow_delay_list.extend(flow_delay_infos) inroad_delay_info = pb.xl_inroad_delayinfo_t() inroad_delay_info.inroadid = inroadid inroad_delay_info.delay_info.ffs = ffs inroad_delay_info.delay_info.capacity = capacity inroad_delay_info.delay_info.car_num = inroad_sum_car_num inroad_delay_info.delay_info.delay_time = int(inroad_delay_time / inroad_sum_car_num) inroad_delay_info.delay_info.stop_times = inroad_stop_times / inroad_sum_car_num if inroad_delay_info.delay_info.stop_times > max_stop_times: max_stop_times = inroad_delay_info.delay_info.stop_times if inroad_delay_info.delay_info.stop_times < min_stop_times: min_stop_times = inroad_delay_info.delay_info.stop_times inroad_delay_info.delay_info.queue_len = int(inroad_queue_len / inroad_sum_car_num) inroad_delay_info.delay_info.speed = int(inroad_speed / inroad_sum_car_num) inroad_delay_info.delay_info.jam_index = inroad_jam_index / inroad_sum_car_num inroad_delay_info.delay_info.park_time = int(inroad_park_time / inroad_sum_car_num) inroad_delay_info.delay_info.high_park_percent = int(inroad_high_park_percent / inroad_sum_car_num) inroad_delay_info.delay_info.truck_percent = int(inroad_truck_percent / inroad_sum_car_num) inroad_delay_info.delay_info.park_percent = int(inroad_park_percent / inroad_sum_car_num) inroad_delay_info.delay_info.move_speed = int(inroad_move_speed / inroad_sum_car_num) inroad_delay_info.delay_info.travel_time = int(inroad_travel_time / inroad_sum_car_num) # inroad_delay_info.delay_info.imbalance_index = inroad_imbalance_index / inroad_sum_car_num inroad_delay_info.delay_info.std_flow = int(inroad_std_flow / len(road_delay_infos)) inroad_delay_info.delay_info.turn_ratio_0 = turn_ratio_0 inroad_delay_info.delay_info.turn_ratio_1 = turn_ratio_1 inroad_delay_info.delay_info.turn_ratio_2 = turn_ratio_2 inroad_delay_info.delay_info.turn_ratio_3 = turn_ratio_3 inroad_delay_pb_list.append(inroad_delay_info) avg_road_stop_times = sum(item.delay_info.stop_times for item in inroad_delay_pb_list) / len(inroad_delay_pb_list) if len(inroad_delay_pb_list) > 0 else 0 cross_imbalance_index = round((round(max_stop_times, 2) - round(min_stop_times, 2)) / round(avg_road_stop_times, 2), 2) if round(avg_road_stop_times, 2) != 0 else 0 avg_cross_delay.delay_info.imbalance_index = cross_imbalance_index for flow_delay_info in flow_delay_list: xlink_id = flow_delay_info.xlink_id turn_type = flow_delay_info.turn_type key = str(xlink_id) + '-' + str(turn_type) if key not in flow_delay_info_dict: flow_delay_info_dict[key] = [flow_delay_info] else: flow_delay_info_dict[key].append(flow_delay_info) for key in flow_delay_info_dict.keys(): flow_delay_infos = flow_delay_info_dict[key] inroadid = flow_delay_infos[0].inroadid turn_type = flow_delay_infos[0].turn_type flow_sum_car_num, flow_delay_time, flow_stop_times, flow_queue_len, flow_speed, flow_jam_index, flow_park_time, flow_high_park_percent, flow_truck_percent, flow_park_percent, flow_move_speed, flow_imbalance_index, flow_std_flow, flow_travel_time \ = 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 for flow_delay_info in flow_delay_infos: flow_sum_car_num += flow_delay_info.delay_info.car_num flow_delay_time += flow_delay_info.delay_info.delay_time * flow_delay_info.delay_info.car_num flow_stop_times += flow_delay_info.delay_info.stop_times * flow_delay_info.delay_info.car_num flow_queue_len += flow_delay_info.delay_info.queue_len * flow_delay_info.delay_info.car_num flow_travel_time += flow_delay_info.delay_info.travel_time * flow_delay_info.delay_info.car_num flow_speed += flow_delay_info.delay_info.speed * flow_delay_info.delay_info.car_num flow_jam_index += flow_delay_info.delay_info.jam_index * flow_delay_info.delay_info.car_num flow_park_time += flow_delay_info.delay_info.park_time * flow_delay_info.delay_info.car_num flow_high_park_percent += flow_delay_info.delay_info.high_park_percent * flow_delay_info.delay_info.car_num flow_truck_percent += flow_delay_info.delay_info.truck_percent * flow_delay_info.delay_info.car_num flow_park_percent += flow_delay_info.delay_info.park_percent * flow_delay_info.delay_info.car_num flow_move_speed += flow_delay_info.delay_info.move_speed * flow_delay_info.delay_info.car_num # flow_imbalance_index += flow_delay_info.delay_info.imbalance_index * flow_delay_info.delay_info.car_num flow_std_flow += flow_delay_info.delay_info.std_flow flow_info = pb.xl_flow_delayinfo_t() flow_info.xlink_id = int(key.split('-')[0]) flow_info.inroadid = inroadid flow_info.turn_type = turn_type flow_info.delay_info.car_num = flow_sum_car_num flow_info.delay_info.delay_time = int(flow_delay_time / flow_sum_car_num) flow_info.delay_info.stop_times = flow_stop_times / flow_sum_car_num flow_info.delay_info.queue_len = int(flow_queue_len / flow_sum_car_num) flow_info.delay_info.speed = int(flow_speed / flow_sum_car_num) flow_info.delay_info.travel_time = int(flow_travel_time / flow_sum_car_num) flow_info.delay_info.jam_index = flow_jam_index / flow_sum_car_num flow_info.delay_info.park_time = int(flow_park_time / flow_sum_car_num) flow_info.delay_info.high_park_percent = int(flow_high_park_percent / flow_sum_car_num) flow_info.delay_info.truck_percent = int(flow_truck_percent / flow_sum_car_num) flow_info.delay_info.park_percent = int(flow_park_percent / flow_sum_car_num) flow_info.delay_info.move_speed = int(flow_move_speed / flow_sum_car_num) # flow_info.delay_info.imbalance_index = flow_imbalance_index / flow_sum_car_num flow_info.delay_info.std_flow = int(flow_std_flow / len(flow_delay_infos)) for inroad_info in inroad_delay_pb_list: if inroad_info.inroadid == inroadid: inroad_info.flow_delay_infos.append(flow_info) inroad_delay_infos_with_imbalance = calc_inroad_imbalance_index(inroad_delay_pb_list) avg_cross_delay.inroad_delay_infos.extend(inroad_delay_infos_with_imbalance) # 出口道流量数据聚合逻辑 outroad_delay_info_dict = {} for item in cross_delay_pb_list: outroadid_infos = item.outroad_infos for outroadid_info in outroadid_infos: outroadid = outroadid_info.outroadid if outroadid not in outroad_delay_info_dict: outroad_delay_info_dict[outroadid] = [outroadid_info] else: outroad_delay_info_dict[outroadid].append(outroadid_info) for outroadid in outroad_delay_info_dict.keys(): outroadid_infos = outroad_delay_info_dict[outroadid] car_num, turn_ratio_0, turn_ratio_1, turn_ratio_2, turn_ratio_3 = 0, 0, 0, 0, 0 for outroadid_info in outroadid_infos: turn_info = outroadid_info.turn_info car_num += turn_info.car_num turn_ratio_0 += turn_info.turn_ratio_0 turn_ratio_1 += turn_info.turn_ratio_1 turn_ratio_2 += turn_info.turn_ratio_2 turn_ratio_3 += turn_info.turn_ratio_3 outroadid_info = pb.xl_outroad_info_t() outroadid_info.outroadid = outroadid outroadid_info.turn_info.car_num = car_num outroadid_info.turn_info.turn_ratio_0 = turn_ratio_0 outroadid_info.turn_info.turn_ratio_1 = turn_ratio_1 outroadid_info.turn_info.turn_ratio_2 = turn_ratio_2 outroadid_info.turn_info.turn_ratio_3 = turn_ratio_3 avg_cross_delay.outroad_infos.append(outroadid_info) return avg_cross_delay # 生成路口诊断页面路口概览数据指标的函数 def gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict): car_num = avg_cross_delay_info.delay_info.car_num crossid = avg_cross_delay_info.crossid jam_index = round(avg_cross_delay_info.delay_info.jam_index, 2) if car_num >= 10 else '-' stop_times = round(avg_cross_delay_info.delay_info.stop_times, 2) if car_num >= 10 else '-' high_park_percent = str(avg_cross_delay_info.delay_info.high_park_percent) + '%' if car_num >= 10 else '-' imbalance_index = round(avg_cross_delay_info.delay_info.imbalance_index, 2) if car_num >= 10 else '-' speed = avg_cross_delay_info.delay_info.speed / 100 if car_num >= 10 else '-' move_speed = avg_cross_delay_info.delay_info.move_speed / 100 if car_num >= 10 else '-' park_time = avg_cross_delay_info.delay_info.park_time if car_num >= 10 else '-' delay_time = avg_cross_delay_info.delay_info.delay_time if car_num >= 10 else '-' high_stop_turn_ratio_desc = gen_high_stop_turn_ratio_desc(avg_cross_delay_info.inroad_delay_infos, inroad_static_info_dict, car_num) if car_num >= 10 else [] tide_index_list = calc_tide_index(crossid, nodeid, date_list, roads_dir_dict) usable_tide_list = [item for item in tide_index_list if item != 0] tide_index = round(sum(usable_tide_list) / len(usable_tide_list), 2) if len(usable_tide_list) > 0 else 0 service_level = calc_service_level(delay_time) overview_res = { 'jam_index': jam_index, 'stop_times': stop_times, 'high_park_percent': high_park_percent, 'imbalance_index': imbalance_index, 'speed': speed, 'move_speed': move_speed, 'park_time': park_time, 'delay_time': delay_time, 'service_level': service_level, 'high_stop_turn_ratio_desc': high_stop_turn_ratio_desc[0] if len(high_stop_turn_ratio_desc) > 0 else '', 'main_flow_src_desc': high_stop_turn_ratio_desc[1] if len(high_stop_turn_ratio_desc) > 1 else '', 'tide_index': tide_index } return overview_res def gen_high_stop_turn_ratio_desc(inroad_delay_infos, inroad_static_info_dict, car_num): src_dict, src_desc_list, main_flow_src_list = {}, [], [] for inroad_info in inroad_delay_infos: inroadid = inroad_info.inroadid if inroadid not in inroad_static_info_dict.keys(): continue inroad_car_num = inroad_info.delay_info.car_num src_dir = dir_str_dict[inroad_static_info_dict[inroadid]['src_direct']] if inroad_car_num / car_num > 0.25: check_res = found_max_car_num_flow(inroad_car_num, inroad_info) if check_res != -1: main_flow_src_list.append(src_dir + int_turn_type2str[check_res]) flow_delay_infos = inroad_info.flow_delay_infos for item in flow_delay_infos: if item.delay_info.stop_times >= 1: turn_type = int_turn_type2str[item.turn_type] if src_dir not in src_dict.keys(): src_dict[src_dir] = [turn_type] else: src_dict[src_dir].append(turn_type) for item in src_dict.keys(): src_desc_list.append(item + '/'.join(src_dict[item])) return ['、'.join(src_desc_list) if src_desc_list else '无', '、'.join(main_flow_src_list) if main_flow_src_list else '无'] def found_max_car_num_flow(inroad_car_num, inroad_info): if not inroad_info: return -1 data_list = [inroad_info.delay_info.turn_ratio_0, inroad_info.delay_info.turn_ratio_1, inroad_info.delay_info.turn_ratio_2, inroad_info.delay_info.turn_ratio_3] for i in range(len(data_list)): if data_list[i] / inroad_car_num > 0.5: return i return -1 def query_cross_ledger_info(crossid, nodeid, area_id, userid): # ledger_url = f"http://82.157.173.20:7070/api/common/cross_ledger_detail?crossid={crossid}&crossno=&nodeid={nodeid}&area_id={area_id}&userid={userid}" ledger_url = f"http://120.53.125.169:7070/api/common/cross_ledger_detail?crossid={crossid}&crossno=&nodeid={nodeid}&area_id={area_id}&userid={userid}" headers = {"Content-Type": "application/json"} cross_ledger_info = requests.get(ledger_url, headers=headers) if cross_ledger_info.status_code != 200 or cross_ledger_info.json()['status'] != 0: logging.error(f"查询路口台账信息失败,crossid:{crossid},nodeid:{nodeid},area_id:{area_id},userid:{userid}") return None cross_ledger_info_dict = json.loads(cross_ledger_info.text) return cross_ledger_info_dict def gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict): division_list, company_list, slc_company_dict, slckind_list, detector_type_dict = gen_ledger_base_info(nodeid, area_id) slc_company = cross_ledger_info_dict['data']['ledger']['slc_company'] location = cross_ledger_info_dict['data']['ledger']['location'] slc_company_name = slc_company_dict[slc_company] if slc_company in slc_company_dict.keys() else '-' internet = cross_ledger_info_dict['data']['ledger']['internet'] internet_str = int_internet_code2str[internet] primary = cross_ledger_info_dict['data']['ledger']['primary'] secondary = cross_ledger_info_dict['data']['ledger']['secondary'] cross_model = int_cross_model2str[primary] + '-' + int_cross_model2str[secondary] light_infos = cross_ledger_info_dict['data']['light_infos'] src_light_dict = gen_dir_light_info_dict(light_infos) roads = cross_ledger_info_dict['data']['roads'] dir_list = roads.keys() length_info, lane_nums_dict, road_special_info = {}, {}, {} sum_length = 0 for dir in dir_list: dir_length = roads[dir]['topology']['entry']['road_length'] straight_left_split, reverse_turn, left_turn_waiting_area, straight_turn_waiting_area, reversible_lanes_info, outside_left = 0, 0, 0, 0, 0, 0 if dir in src_light_dict.keys(): if 2 in src_light_dict[dir] and (1 in src_light_dict[dir] or 3 in src_light_dict[dir]): straight_left_split = 1 if roads[dir]['canaliza']['reverse_turn'] == 1: reverse_turn = 1 if roads[dir]['canaliza']['hold_left'] == 1: left_turn_waiting_area = 1 if roads[dir]['canaliza']['hold_straight'] == 1: straight_turn_waiting_area = 1 if len(roads[dir]['canaliza']['reversible_lanes_info']) > 0: reversible_lanes_info = 1 lane_turn_info = roads[dir]['topology']['entry']['lane_turn_info'] if check_outside_left(lane_turn_info): outside_left = 1 road_special_info[dir] = { 'straight_left_split': straight_left_split, 'reverse_turn': reverse_turn, 'hold_left': left_turn_waiting_area, 'hold_straight': straight_turn_waiting_area, 'reversible_lanes_info': reversible_lanes_info, 'outside_left': outside_left } lane_nums_dict[dir] = [[dir_str_dict[dir] + '进口', roads[dir]['entry_lane_num'], dir_str_dict[src_reverse[dir]] + '出口', roads[src_reverse[dir]]['exit_lane_num'] if src_reverse[dir] in roads.keys() else '-']] if dir_length != '-': sum_length += dir_length length_info[dir] = dir_length avg_length = round(sum_length / len(length_info.keys()), 0) if length_info else 0 sorted_length_info = sort_dict_by_clockwise(length_info) sorted_road_special_info = sort_dict_by_clockwise(road_special_info) sort_lane_num_info = sort_dict_by_clockwise(lane_nums_dict) cross_static_info = { 'crossid': crossid, 'nodeid': nodeid, 'area_id': area_id, 'location': location, 'slc_company': slc_company_name, 'internet': internet_str, 'cross_model': cross_model, 'lane_nums_list': [item[0] for item in list(sort_lane_num_info.values())], 'road_special_info': sorted_road_special_info, 'length_info': sorted_length_info, 'avg_length': avg_length } return cross_static_info, cross_ledger_info_dict['data'] def gen_ledger_base_info(nodeid, area_id): node_base_params_info = db_tmnet.query_base_info(nodeid, area_id) # 下列字段当为空时需在前端展示尚未针对该城市进行配置的同义提示语句 division_list, company_list, slc_company_dict, slckind_list, detector_type_dict = [], [], {}, [], {} for row in node_base_params_info: if row['param_type'] == 'division': division_list.append(row['param_value']) elif row['param_type'] == 'company': company_list.append(row['param_value']) elif row['param_type'] == 'slc_company': slc_company_dict[row['param_code']] = row['param_value'] elif row['param_type'] == 'slckind': slckind_list.append(row['param_value']) elif row['param_type'] == 'detector_type': detector_type_dict[row['param_code']] = row['param_value'] return division_list, company_list, slc_company_dict, slckind_list, detector_type_dict def gen_dir_light_info_dict(light_infos): dir_light_dict = {} for row in light_infos: srcDir = row['srcDir'] light_type = row['light_type'] if srcDir not in dir_light_dict.keys(): dir_light_dict[srcDir] = [light_type] else: dir_light_dict[srcDir].append(light_type) return dir_light_dict def check_outside_left(lane_turn_info): seen_s = False # 是否已出现直行车道 for func in lane_turn_info: if func == '-' or func in ('17', '18', '19', '20'): continue turn_type_str = g_turn2str[str(func)] if seen_s and ('l' in turn_type_str): # 直行车道右侧出现左转 return True if 's' in turn_type_str: # 当前是直行车道 seen_s = True return False def gen_road_delay_index(avg_cross_delay_info, roads_dir_dict): road_delay_infos = [] if not avg_cross_delay_info: return road_delay_infos road_delay_infos = avg_cross_delay_info.inroad_delay_infos road_dir = {v['in']: k for k, v in roads_dir_dict.items()} road_flow_index = {} for road_index in road_delay_infos: roadid = road_index.inroadid if roadid not in road_dir.keys(): continue service_level = calc_service_level(road_index.delay_info.delay_time) road_flow_index[roadid] = { 'src_dir': road_dir[roadid], 'stop_times': round(road_index.delay_info.stop_times, 2) if road_index.delay_info.car_num >= 5 else '-', 'high_park_percent': str(road_index.delay_info.high_park_percent) + '%' if road_index.delay_info.car_num >= 5 else '-', 'imbalance_index': round(road_index.delay_info.imbalance_index, 2) if road_index.delay_info.car_num >= 5 else '-', 'park_time': road_index.delay_info.park_time if road_index.delay_info.car_num >= 5 else '-', 'delay_time': road_index.delay_info.delay_time if road_index.delay_info.car_num >= 5 else '-', 'speed': road_index.delay_info.speed / 100 if road_index.delay_info.car_num >= 5 else '-', 'move_speed': road_index.delay_info.move_speed / 100 if road_index.delay_info.car_num >= 5 else '-', 'service_level': service_level if road_index.delay_info.car_num >= 5 else '-', 'flow_delays': {} } flow_delay_infos = road_index.flow_delay_infos for flow_delay_info in flow_delay_infos: if flow_delay_info.turn_type not in (0, 1): continue flow_service_level = calc_service_level(flow_delay_info.delay_info.delay_time) road_flow_index[roadid]['flow_delays'][flow_delay_info.turn_type] = { 'stop_times': round(flow_delay_info.delay_info.stop_times, 2) if flow_delay_info.delay_info.car_num >= 5 else '-', 'high_park_percent': str(flow_delay_info.delay_info.high_park_percent) + '%' if flow_delay_info.delay_info.car_num >= 5 else '-', 'park_time': flow_delay_info.delay_info.park_time if flow_delay_info.delay_info.car_num >= 5 else '-', 'delay_time': flow_delay_info.delay_info.delay_time if flow_delay_info.delay_info.car_num >= 5 else '-', 'speed': flow_delay_info.delay_info.speed / 100 if flow_delay_info.delay_info.car_num >= 5 else '-', 'service_level': flow_service_level if flow_delay_info.delay_info.car_num >= 5 else '-', 'move_speed': flow_delay_info.delay_info.move_speed / 100 if flow_delay_info.delay_info.car_num >= 5 else '-' } tmp_dict = {v['src_dir']: k for k, v in road_flow_index.items()} sorted_tmp_dict = sort_dict_by_clockwise(tmp_dict) sorted_key = list(sorted_tmp_dict.values()) res = {k: road_flow_index[k] for k in sorted_key} return res def gen_road_dir_dict(cross_ledger_info_dict): roads_dir_dict = {} if not cross_ledger_info_dict: return roads_dir_dict roads = cross_ledger_info_dict['roads'] if 'roads' in cross_ledger_info_dict.keys() else None for dir in roads.keys(): roads_dir_dict[dir] = { 'in': roads[dir]['roadid'], 'out': roads[dir]['out_roadid'] } return roads_dir_dict def calc_inroad_imbalance_index(inroad_delay_pb_list): tmp_list = copy.deepcopy(inroad_delay_pb_list) for item in tmp_list: max_stop_times, min_stop_times = 0, 99999 flow_delay_infos = item.flow_delay_infos avg_stop_times = sum(flow_delay_info.delay_info.stop_times for flow_delay_info in flow_delay_infos) / len(flow_delay_infos) if len(flow_delay_infos) > 0 else 0 for flow_delay_info in flow_delay_infos: if flow_delay_info.delay_info.stop_times > max_stop_times: max_stop_times = flow_delay_info.delay_info.stop_times if flow_delay_info.delay_info.stop_times < min_stop_times: min_stop_times = flow_delay_info.delay_info.stop_times imbalance_index = round((round(max_stop_times, 2) - round(min_stop_times, 2)) / round(avg_stop_times, 2), 2) if avg_stop_times > 0 and max_stop_times > 0 and min_stop_times != 99999 else 0 item.delay_info.imbalance_index = imbalance_index return tmp_list def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict): road_delay_infos = avg_cross_delay_info.inroad_delay_infos outroad_infos = avg_cross_delay_info.outroad_infos road_delay_dict = {item.inroadid: item for item in road_delay_infos} outroad_info_dict = {item.outroadid: item for item in outroad_infos} cross_sum_car_num = sum([item.delay_info.car_num for item in road_delay_infos]) cross_out_sum_car_num = sum([item.turn_info.car_num for item in outroad_infos]) road_flow_turn_rate = {} inroadid_list = [roads_dir_dict[k]['in'] for k in roads_dir_dict.keys()] outroadid_list = [roads_dir_dict[k]['out'] for k in roads_dir_dict.keys()] for dir in roads_dir_dict.keys(): roadid = roads_dir_dict[dir]['in'] l_rate, s_rate, r_rate = 0, 0, 0 out_l_rate, out_s_rate, out_r_rate = 0, 0, 0 in_flow_rate, out_flow_rate = 0, 0 l_num, s_num, r_num = 0, 0, 0 out_l_num, out_s_num, out_r_num = 0, 0, 0 if roadid != '-' and roadid in road_delay_dict.keys(): inroad_info = g_roadnet.query_road(roadid) split_turns_set = set() for outroadid in outroadid_list: outroad = g_roadnet.query_road(outroadid) if not outroad: continue angle = g_roadnet.calc_road_turn_angle_without_abs(inroad_info, outroad) if abs(angle) <= 30: split_turns_set.add(0) elif abs(angle) >= 150: split_turns_set.add(3) elif angle > 0: split_turns_set.add(2) else: split_turns_set.add(1) car_num = road_delay_dict[roadid].delay_info.car_num in_flow_rate = int(car_num / cross_sum_car_num * 100) if cross_out_sum_car_num != 0 else 0 l_rate = int(road_delay_dict[roadid].delay_info.turn_ratio_1 / car_num * 100) if car_num != 0 else 0 s_rate = int(road_delay_dict[roadid].delay_info.turn_ratio_0 / car_num * 100) if car_num != 0 else 0 r_rate = int(road_delay_dict[roadid].delay_info.turn_ratio_2 / car_num * 100) if car_num != 0 else 0 if 0 not in split_turns_set: s_rate = '-' if 1 not in split_turns_set: l_rate = '-' if 2 not in split_turns_set: r_rate = '-' l_num = road_delay_dict[roadid].delay_info.turn_ratio_1 s_num = road_delay_dict[roadid].delay_info.turn_ratio_0 r_num = road_delay_dict[roadid].delay_info.turn_ratio_2 rate_list = fix_to_100(l_rate, s_rate, r_rate) l_rate, s_rate, r_rate = rate_list[0] if rate_list[0] == '-' else int(rate_list[0]), rate_list[1] if rate_list[1] == '-' else int(rate_list[1]), rate_list[2] if rate_list[2] == '-' else int(rate_list[2]) out_road_id = roads_dir_dict[dir]['out'] if out_road_id != '-' and out_road_id in outroad_info_dict.keys(): out_road_info = g_roadnet.query_road(out_road_id) merge_turns_set = set() for inroadid in inroadid_list: inroad = g_roadnet.query_road(inroadid) if not inroad: continue angle = g_roadnet.calc_road_turn_angle_without_abs(out_road_info, inroad) if abs(angle) <= 30: merge_turns_set.add(0) elif abs(angle) >= 150: merge_turns_set.add(3) elif angle > 0: merge_turns_set.add(2) else: merge_turns_set.add(1) out_car_num = outroad_info_dict[out_road_id].turn_info.car_num out_flow_rate = int(out_car_num / cross_out_sum_car_num * 100) if cross_out_sum_car_num != 0 else 0 out_l_rate = int(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / out_car_num * 100) if out_car_num != 0 else 0 out_s_rate = int(outroad_info_dict[out_road_id].turn_info.turn_ratio_0 / out_car_num * 100) if out_car_num != 0 else 0 out_r_rate = int(outroad_info_dict[out_road_id].turn_info.turn_ratio_2 / out_car_num * 100) if out_car_num != 0 else 0 if 0 not in merge_turns_set: out_s_rate = '-' if 1 not in merge_turns_set: out_l_rate = '-' if 2 not in merge_turns_set: out_r_rate = '-' out_l_num, out_s_num, out_r_num = outroad_info_dict[out_road_id].turn_info.turn_ratio_1, outroad_info_dict[out_road_id].turn_info.turn_ratio_0, outroad_info_dict[out_road_id].turn_info.turn_ratio_2 rate_list = fix_to_100(out_l_rate, out_s_rate, out_r_rate) out_l_rate, out_s_rate, out_r_rate = rate_list[0] if rate_list[0] == '-' else int(rate_list[0]), rate_list[1] if rate_list[1] == '-' else int(rate_list[1]), rate_list[2] if rate_list[2] == '-' else int(rate_list[2]) road_flow_turn_rate[roadid] = { 'src_dir': dir, 'in_flow_rate': in_flow_rate, 'out_flow_rate': out_flow_rate, 'l_rate': l_rate, 's_rate': s_rate, 'r_rate': r_rate, 'out_l_rate': out_l_rate, 'out_s_rate': out_s_rate, 'out_r_rate': out_r_rate, 'l_num': l_num, 's_num': s_num, 'r_num': r_num, 'out_l_num': out_l_num, 'out_s_num': out_s_num, 'out_r_num': out_r_num } tmp_dict = {v['src_dir']: k for k, v in road_flow_turn_rate.items()} sorted_tmp_dict = sort_dict_by_clockwise(tmp_dict) sorted_key = list(sorted_tmp_dict.values()) res = {k: road_flow_turn_rate[k] for k in sorted_key} return res def calc_service_level(delay_time): if delay_time == '-' or delay_time == '': return '-' if delay_time <= 10: service_level = 'A' elif 10 < delay_time <= 20: service_level = 'B' elif 20 < delay_time <= 35: service_level = 'C' elif 35 < delay_time <= 55: service_level = 'D' elif 55 < delay_time <= 80: service_level = 'E' else: service_level = 'F' return service_level def calc_tide_index(crossid, nodeid, date_list, roads_dir_dict, date_type=None): tide_index_list = [] am_tp_start, am_tp_end = 't700', '900' pm_tp_start, pm_tp_end = 't1700', '1900' if date_type == 'week': date_list = parse_week_str(date_list[0]) am_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start) pm_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start) date_am_delay_info_dict = {item['day']: item['data'] for item in am_cross_delay_data_list} date_pm_delay_info_dict = {item['day']: item['data'] for item in pm_cross_delay_data_list} # 构建对向进口道对 subtend_road_pair = gen_subtend_road_pair(roads_dir_dict) for date in date_list: max_am_flow, max_pm_flow, tide_index, max_am_flow_road, max_pm_flow_road = 0, 0, 0, '', '' if date not in date_am_delay_info_dict.keys() or date not in date_pm_delay_info_dict.keys(): tide_index_list.append(tide_index) continue item_am_cross_delay_info = pb.xl_cross_delayinfo_t() item_pm_cross_delay_info = pb.xl_cross_delayinfo_t() item_am_cross_delay_info.ParseFromString(date_am_delay_info_dict[date]) item_pm_cross_delay_info.ParseFromString(date_pm_delay_info_dict[date]) if not item_am_cross_delay_info or not item_pm_cross_delay_info: tide_index_list.append(tide_index) continue am_inroad_infos = item_am_cross_delay_info.inroad_delay_infos pm_inroad_infos = item_pm_cross_delay_info.inroad_delay_infos am_inroad_info_dict = {item.inroadid: item for item in am_inroad_infos} pm_inroad_info_dict = {item.inroadid: item for item in pm_inroad_infos} # 找出早高峰最高流量的进口道 for am_inroad_info in am_inroad_infos: am_flow = am_inroad_info.delay_info.car_num if am_flow > max_am_flow: max_am_flow = am_flow max_am_flow_road = am_inroad_info.inroadid if max_am_flow_road == '' or max_am_flow_road not in subtend_road_pair.keys(): tide_index_list.append(tide_index) continue subtend_road_am_flow = am_inroad_info_dict[subtend_road_pair[max_am_flow_road]].delay_info.car_num if subtend_road_pair[max_am_flow_road] in am_inroad_info_dict.keys() else 0 # 如果进口道流量小于20 则无意义 if min(subtend_road_am_flow, max_am_flow) < 10: tide_index_list.append(tide_index) continue # 找出晚高峰最高流量的进口道 for pm_inroad_info in pm_inroad_infos: pm_flow = pm_inroad_info.delay_info.car_num if pm_flow > max_pm_flow: max_pm_flow = pm_flow max_pm_flow_road = pm_inroad_info.inroadid # 如果路段id为空或不在对向对里,则无意义 if max_pm_flow_road == '' or max_pm_flow_road not in subtend_road_pair.keys(): tide_index_list.append(tide_index) continue subtend_road_pm_flow = pm_inroad_info_dict[subtend_road_pair[max_pm_flow_road]].delay_info.car_num if min(subtend_road_pm_flow, max_pm_flow) < 10: tide_index_list.append(tide_index) continue # 如果早晚高峰的最大流量进口道不处于同一个对向对中,则无意义 if max_pm_flow_road != max_am_flow_road and max_pm_flow_road != subtend_road_pair[max_am_flow_road]: tide_index_list.append(tide_index) continue Fam = max(max_am_flow, subtend_road_am_flow) / min(max_am_flow, subtend_road_am_flow) Fpm = max(max_pm_flow, subtend_road_pm_flow) / min(max_pm_flow, subtend_road_pm_flow) if Fam >= 1.5 and Fpm >= 1.5 and max_pm_flow_road == subtend_road_pair[max_am_flow_road]: tide_index = round(min(Fam, Fpm) / max(Fam, Fpm), 2) if min(Fam, Fpm) <= 0: tide_index = 0 if max_pm_flow_road == max_am_flow_road: tide_index = round(Fam * Fpm, 2) tide_index_list.append(tide_index) return tide_index_list def gen_subtend_road_pair(roads_dir_dict): subtend_road_pair = {} tmp_road_dir = copy.deepcopy(roads_dir_dict) keys_to_process = list(tmp_road_dir.keys()) for dir in keys_to_process: if dir in tmp_road_dir.keys() and tmp_road_dir[dir]['in'] == '-': continue subtend_dir = src_reverse[dir] if subtend_dir in tmp_road_dir.keys() and tmp_road_dir[subtend_dir]['in'] != '-': subtend_road_pair[tmp_road_dir[dir]['in']] = tmp_road_dir[subtend_dir]['in'] return subtend_road_pair def parse_single_cross_delay_info(crossid, nodeid, data_list, data_type, roads_dir_dict): data_dict = {} max_cross_car_num, max_road_car_num, max_flow_car_num = 0, 0, 0 pb_data_list = [] for item in data_list: if item: pb_data_list.append(item['data']) road_delay_infos = item['data'].inroad_delay_infos if item['data'] and item['data'].inroad_delay_infos else [] for road_delay_info in road_delay_infos: if road_delay_info.delay_info.car_num > max_road_car_num: max_road_car_num = road_delay_info.delay_info.car_num flow_delay_infos = road_delay_info.flow_delay_infos for flow_delay_info in flow_delay_infos: if flow_delay_info.delay_info.car_num > max_flow_car_num: max_flow_car_num = flow_delay_info.delay_info.car_num max_cross_car_num_pb = max( (x for x in pb_data_list if x is not None), key=lambda x: x.delay_info.car_num, default=None ) max_cross_car_num = max_cross_car_num_pb.delay_info.car_num if max_cross_car_num_pb else 0 for item in data_list: tp_start = item['tp_start'] tp_end = item['tp_end'] day = item['day'] if data_type == 'day': key = day elif data_type == 'hour': key = convert_time(int(str(tp_start).replace('h', ''))) + '-' + convert_time(int(str(tp_end).replace('h', ''))) else: key = day item_cross_delay_info = item['data'] # 指标内容依次是 停车次数 多次停车率 停车时间 延误时间 平均速度 不停车速度 相对流量 (20251104新增流量真实数值需求) 路口失衡系数 潮汐指数 服务水平 overview_data = ['-', '-', '-', '-', '-', '-', '-', '-', '-', '-', '-', # 下方为上述指标变化率颜色展示flag 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] tide_index = calc_tide_index(crossid, nodeid, [day], roads_dir_dict, date_type='week' if data_type == 'week' else None) if data_type != 'hour' else '-' road_data_dict = {} cross_car_num = 0 if item_cross_delay_info and item_cross_delay_info.delay_info.car_num >= 10: cross_car_num = item_cross_delay_info.delay_info.car_num road_delay_infos = item_cross_delay_info.inroad_delay_infos if data_type != 'week': cross_imbalance_index = round(item_cross_delay_info.delay_info.imbalance_index, 2) else: max_stop_times = max(road_delay_infos, key=lambda x: x.delay_info.stop_times).delay_info.stop_times min_stop_times = min(road_delay_infos, key=lambda x: x.delay_info.stop_times).delay_info.stop_times cross_imbalance_index = round((max_stop_times - min_stop_times) / item_cross_delay_info.delay_info.stop_times, 2) if item_cross_delay_info.delay_info.stop_times != 0 else 0 overview_data = [ # 停车次数 round(item_cross_delay_info.delay_info.stop_times, 2), # 多次停车率 停车时间 item_cross_delay_info.delay_info.high_park_percent, item_cross_delay_info.delay_info.park_time, # 延误时间 平均速度 item_cross_delay_info.delay_info.delay_time, item_cross_delay_info.delay_info.speed / 100, # 不停车速度 相对流量占比 item_cross_delay_info.delay_info.move_speed / 100, round(cross_car_num / max_cross_car_num * 100, 2) if max_cross_car_num != 0 else 0, # 20251104新增流量真实数值需求 cross_car_num, # 路口失衡系数 潮汐指数(当为小时级指标时,不计算该值) 服务水平 cross_imbalance_index, tide_index[0], calc_service_level(item_cross_delay_info.delay_info.delay_time), # 指标变化率颜色展示flag, 不含服务水平 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ] inroad_delay_infos_with_imbalance = calc_inroad_imbalance_index(road_delay_infos) road_data_dict = {item.inroadid: item for item in inroad_delay_infos_with_imbalance} data_dict[key] = { 'overview': overview_data, 'roads_data': {} } src_dir_list = list(roads_dir_dict.keys()) for src_dir in src_dir_list: src_data = ['-', '-', '-', '-', '-', '-', '-', '-', '-', '-', '-', # 下方为上述指标变化率颜色展示flag 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] flow_data_dict = { '0': [ '-', '-', '-', '-', '-', '-', '-', '-', '-', '-', # 下方为上述指标变化率颜色展示flag 0, 0, 0, 0, 0, 0, 0, 0, 0 ], '1': [ '-', '-', '-', '-', '-', '-', '-', '-', '-', '-', # 下方为上述指标变化率颜色展示flag 0, 0, 0, 0, 0, 0, 0, 0, 0 ] } if roads_dir_dict[src_dir]['in'] != '-' and roads_dir_dict[src_dir]['in'] in road_data_dict.keys() and road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.car_num >= 5: road_car_num = road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.car_num src_data = [ # 停车次数 round(road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.stop_times, 2), # 多次停车率 road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.high_park_percent, # 转向失衡指数 round(road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.imbalance_index, 2), # 停车时间 road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.park_time, # 延误时间 road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.delay_time, # 平均速度 road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.speed / 100, # 不停车速度 road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.move_speed / 100, # 相对流量占比 round(road_car_num / max_road_car_num * 100, 2) if max_road_car_num > 0 else '-', # 20251104新增流量真实数值需求 road_car_num, # 进口道流量占比 round(road_car_num / cross_car_num * 100, 2) if cross_car_num > 0 else '-', # 服务水平 calc_service_level(road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.delay_time), # 指标变化率颜色展示flag, 顺序为上述指标顺序, 不含服务水平 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ] flow_delay_infos = road_data_dict[roads_dir_dict[src_dir]['in']].flow_delay_infos flow_data_dict = {} for flow_delay_info in flow_delay_infos: turn_type = flow_delay_info.turn_type if turn_type not in (0, 1): continue if flow_delay_info.delay_info.car_num >= 3: flow_data = [ # 停车次数 round(flow_delay_info.delay_info.stop_times, 2), # 多次停车率 flow_delay_info.delay_info.high_park_percent, # 停车时间 flow_delay_info.delay_info.park_time, # 延误时间 flow_delay_info.delay_info.delay_time, # 平均速度 flow_delay_info.delay_info.speed / 100, # 不停车速度 flow_delay_info.delay_info.move_speed / 100, # 相对流量 round(flow_delay_info.delay_info.car_num / max_flow_car_num * 100, 2) if max_flow_car_num > 0 else '-', # 20251104新增流量真实数值需求 flow_delay_info.delay_info.car_num, # 分流转向占比 round(flow_delay_info.delay_info.car_num / road_car_num * 100, 2) if road_car_num > 0 else '-', # 服务水平 calc_service_level(flow_delay_info.delay_info.delay_time), # 指标变化率颜色展示flag 不含服务水平 0, 0, 0, 0, 0, 0, 0, 0, 0 ] flow_data_dict[turn_type] = flow_data data_dict[key]['roads_data'][src_dir] = { 'road': src_data, 'flow': flow_data_dict } return data_dict def calc_single_day_delay_info_change_rate(data_dict): res_data_dict = copy.deepcopy(data_dict) key_list = list(res_data_dict.keys()) empty_overview = ['-', '-', '-', '-', '-', '-', '-', '-', '-', '-', '-', # 指标变化率颜色展示flag 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] for i in range(len(key_list)-1, -1, -1): overview_data = res_data_dict[key_list[i]]['overview'] # list roads_data = res_data_dict[key_list[i]]['roads_data'] # dict 内部包含多个方向路段和各个方向路段的流向级别的数据 if i != 0: prev_item_index = i - 1 prev_overview_data = res_data_dict[key_list[prev_item_index]]['overview'] prev_roads_data = res_data_dict[key_list[prev_item_index]]['roads_data'] if overview_data != empty_overview and prev_overview_data != empty_overview: overview_data_with_change_rate = calc_overview_change_rate(overview_data, prev_overview_data) res_data_dict[key_list[i]]['overview'] = overview_data_with_change_rate roads_data_wit_change_rate = calc_roads_data_change_rate(roads_data, prev_roads_data) res_data_dict[key_list[i]]['roads_data'] = roads_data_wit_change_rate return res_data_dict def calc_overview_change_rate(overview_data, prev_overview_data): res_data = copy.copy(overview_data) for i in range(len(overview_data)): if i >= 10: continue if overview_data[i] == '-' or prev_overview_data[i] == '-': res_data[i + 11] = 0 # 变化率 0表示正常 1表示恶化 2表示优化 if i in (0, 1, 2, 3, 6, 7, 8, 9): # 指标提升代表恶化的有停车次数、多次停车率、转向失衡指数、停车时间、延误时间、相对流量、20251104新增流量真实数值需求、进口道流量占比 if overview_data[i] == '-' or prev_overview_data[i] == '-': continue rate = (overview_data[i] - prev_overview_data[i]) / prev_overview_data[i] * 100 if prev_overview_data[i] > 0 else 0 if rate < -20: res_data[i + 11] = 2 elif rate > 20: res_data[i + 11] = 1 else: # 指标下降代表恶化的有平均速度、不停车速度 if overview_data[i] == '-' or prev_overview_data[i] == '-': continue rate = (overview_data[i] - prev_overview_data[i]) / prev_overview_data[i] * 100 if prev_overview_data[i] > 0 else 0 if rate > 20: res_data[i + 11] = 2 elif rate < -20: res_data[i + 11] = 1 return res_data def calc_roads_data_change_rate(roads_data, prev_roads_data): res_data = copy.deepcopy(roads_data) src_dir_list = list(res_data.keys()) for src_dir in src_dir_list: src_dir_road_data = res_data[src_dir]['road'] if src_dir not in prev_roads_data.keys(): continue # 指标提升代表恶化的有停车次数、多次停车率、停车时间、延误时间、相对流量、20251104新增流量真实数值需求、分流转向占比 prev_src_dir_road_data = prev_roads_data[src_dir]['road'] for i in range(len(src_dir_road_data)): if i >= 10: continue if src_dir_road_data[i] == '-' or prev_src_dir_road_data[i] == '-': continue if i in (0, 1, 2, 3, 4, 7, 8, 9): rate = (src_dir_road_data[i] - prev_src_dir_road_data[i]) / prev_src_dir_road_data[i] * 100 if prev_src_dir_road_data[i] > 0 else 0 if rate < -20: src_dir_road_data[i + 11] = 2 elif rate > 20: src_dir_road_data[i + 11] = 1 else: # 指标下降代表恶化的有平均速度、不停车速度 rate = (src_dir_road_data[i] - prev_src_dir_road_data[i]) / prev_src_dir_road_data[i] * 100 if prev_src_dir_road_data[i] > 0 else 0 if rate > 20: src_dir_road_data[i + 11] = 2 elif rate < -20: src_dir_road_data[i + 11] = 1 flow_datas = res_data[src_dir]['flow'] prev_flow_datas = prev_roads_data[src_dir]['flow'] for turn_type in flow_datas.keys(): flow_data = flow_datas[turn_type] if turn_type not in prev_flow_datas.keys(): continue prev_flow_data = prev_flow_datas[turn_type] for i in range(len(flow_data)): if i >= 9: continue if flow_data[i] == '-' or prev_flow_data[i] == '-': continue if i in (0, 1, 2, 3, 6, 7, 8): rate = (flow_data[i] - prev_flow_data[i]) / prev_flow_data[i] * 100 if prev_flow_data[i] > 0 else 0 if rate < -20: flow_data[i + 10] = 2 elif rate > 20: flow_data[i + 10] = 1 else: rate = (flow_data[i] - prev_flow_data[i]) / prev_flow_data[i] * 100 if prev_flow_data[i] > 0 else 0 if rate > 20: flow_data[i + 10] = 2 elif rate < -20: flow_data[i + 10] = 1 return res_data def parse_data2pb(data_list): res_list = [] for row in data_list: day = row['day'] tp_start = row['tp_start'] tp_end = row['tp_end'] item_cross_delay_info = pb.xl_cross_delayinfo_t() item_cross_delay_info.ParseFromString(row['data']) res_list.append({ 'day': day, 'tp_start': tp_start, 'tp_end': tp_end, 'data': item_cross_delay_info }) return res_list # 生成路口诊断问题 def gen_cross_problems(crossid, nodeid, area_id, time_range, start_hm, date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak, cross_ledger_info): # 运行效率、均衡调控、配时方案、路口渠化 operating_efficiency_problems = gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict, cross_phase, is_peak) balanced_control_problems = gen_balanced_control_problems(crossid, nodeid, date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase) phase_problems = gen_phase_problems(nodeid, area_id, crossid, time_range, date_list, min(date_list), max(date_list), start_hm) cross_channelized_problems = gen_cross_channelized_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_ledger_info) problems = { 'operating_efficiency_problems': operating_efficiency_problems, 'balanced_control_problems': balanced_control_problems, 'phase_problems': phase_problems, 'cross_channelized_problems': cross_channelized_problems } return problems # 生成运行效率诊断问题结果 def gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict, cross_phase, is_peak): if not avg_cross_delay_info: return [{ 'item': '运行效率', 'values': [] }] operating_efficiency_problems = { 'item': '运行效率', 'values': [], 'total_num': 0 } road_delay_infos = avg_cross_delay_info.inroad_delay_infos high_park_problems, high_park_suggestions, high_park_total_num = gen_high_park_problems(road_delay_infos, roads_dir_dict, cross_phase) high_stop_times_problems, high_stop_times_suggestions, high_stop_times_total_num = gen_high_stop_time_problems(avg_cross_delay_info, is_peak) sum_num = 0 if high_park_total_num > 0: sum_num += high_park_total_num operating_efficiency_problems['values'].append({ 'item': '多次排队', 'detail': high_park_problems, 'reason': '某一进口道转向的多次停车率大于15%', 'suggestions': high_park_suggestions }) if high_stop_times_total_num > 0: sum_num += high_stop_times_total_num operating_efficiency_problems['values'].append({ 'item': '停车较多', 'detail': high_stop_times_problems, 'reason': '高峰时段路口停车次数大于2次,非高峰时段停车次数大于1次', 'suggestions': high_stop_times_suggestions }) operating_efficiency_problems['total_num'] = sum_num return operating_efficiency_problems # 运行效率-多次排队 def gen_high_park_problems(road_delay_infos, roads_dir_dict, cross_phase): detail = [] suggestion, total_num = [], 0 err_src_dict = {} src_list = list(roads_dir_dict.keys()) road_delays_dict = {item.inroadid: item for item in road_delay_infos} for src_dir in src_list: if roads_dir_dict[src_dir]['in'] == '-' or roads_dir_dict[src_dir]['in'] not in road_delays_dict.keys(): continue road_info = road_delays_dict[roads_dir_dict[src_dir]['in']] flow_delay_infos = road_info.flow_delay_infos turn_type_flow_delay_info_dict = {item.turn_type: item for item in flow_delay_infos} for turn_type in turn_type_flow_delay_info_dict.keys(): flow_delay_info = turn_type_flow_delay_info_dict[turn_type] if turn_type not in (0, 1): continue if flow_delay_info.delay_info.high_park_percent > 15 and flow_delay_info.delay_info.car_num >= 5: if src_dir not in err_src_dict.keys(): err_src_dict[src_dir] = [str(turn_type) + ':' + str(flow_delay_info.delay_info.high_park_percent)] else: err_src_dict[src_dir].append(str(turn_type) + ':' + str(flow_delay_info.delay_info.high_park_percent)) if err_src_dict: detail = [] for src_dir in err_src_dict.keys(): total_num = 1 src_detail, src_suggestions, turn_type_list = [], [], [] for turn_type in err_src_dict[src_dir]: turn_type_list.append(int(turn_type.split(':')[0])) flow_detail = { 'turn_type': int_turn_type2str[int(turn_type.split(':')[0])], 'text': '车辆多次停车率过大(' + turn_type.split(':')[1] + '%),' } src_detail.append(flow_detail) src_detail.append({ 'text': '车辆需要多次排队才能通过' }) detail.append([ { 'src_dir': dir_str_dict[src_dir] + '进口', 'child_detail': src_detail } ]) turn_type_str = '、'.join([int_turn_type2str[int(item.split(':')[0])] for item in err_src_dict[src_dir]]) src_suggestions.append( { 'text': '增加' } ) phase_suggestions = get_flow_phase_detail(src_dir, turn_type_list, cross_phase) src_suggestions.append({ 'src_dir': dir_str_dict[src_dir] + '进口' + turn_type_str, 'text': '所属相位的绿灯时间' }) suggestion.append({ 'child_detail': src_suggestions, 'next_line': phase_suggestions }) # suggestion.extend(phase_suggestions) return detail, suggestion, total_num # 运行效率-停车较多 def gen_high_stop_time_problems(avg_cross_delay_info, is_peak): detail = [] suggestion, total_num = [], 0 desc, max_stop_times = '非高峰时段', 1 if is_peak: # 表示为高峰时段,否则为平峰时段 desc, max_stop_times = '高峰时段', 2 if avg_cross_delay_info: if avg_cross_delay_info.delay_info.stop_times > max_stop_times and avg_cross_delay_info.delay_info.car_num >= 10: total_num = 1 detail = [{ 'child_detail': [ { 'desc': desc, 'text': '车辆多次停车时长过长(' + str(avg_cross_delay_info.delay_info.stop_times) + '),整体运行效率不高', } ] }] suggestion = [ { 'child_detail': [ { 'text': '对路口', 'desc': desc }, { 'text': '配时方案进行优化' } ] } ] return detail, suggestion, total_num # 均衡调控 def gen_balanced_control_problems(crossid, nodeid, date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase): road_delay_infos = avg_cross_delay_info.inroad_delay_infos cross_imbalance_detail, cross_imbalance_suggestions, cross_imbalance_total_num = gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict, cross_phase) turn_imbalance_detail, turn_imbalance_suggestions, turn_imbalance_total_num = gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict, cross_phase) cross_tide_problems, cross_tide_suggestions, cross_tide_total_num = gen_cross_tide_problems(crossid, nodeid, date_list, roads_dir_dict) balanced_control_problems = { 'item': '均衡调控', 'values': [], 'total_num': 0 } sum_num = 0 if cross_imbalance_total_num > 0: sum_num += cross_imbalance_total_num balanced_control_problems['values'].append( { 'item': '路口失衡', 'detail': cross_imbalance_detail, 'reason': '路口存在某个流向绿灯时长不足而另一个方向绿灯存在空放的现象', 'suggestions': cross_imbalance_suggestions } ) if turn_imbalance_total_num > 0: sum_num += turn_imbalance_total_num balanced_control_problems['values'].append( { 'item': '转向失衡', 'detail': turn_imbalance_detail, 'reason': '同一进口道,直行与左转停车次数之差的绝对值大于0.5,且转向停车次数的最大值大于1', 'suggestions': turn_imbalance_suggestions } ) if cross_tide_total_num > 0: sum_num += cross_tide_total_num balanced_control_problems['values'].append( { 'item': '路口潮汐', 'detail': cross_tide_problems, 'reason': '对向进口道,早高峰其中一个方向的进口道与出口道流量比大于150%,晚高峰另一个方向进口道与出口道流量比大于150%', 'suggestions': cross_tide_suggestions } ) balanced_control_problems['total_num'] = sum_num return balanced_control_problems # 均衡调控-路口失衡问题诊断 def gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict, cross_phase): detail = [] suggestion, total_num = [], 0 road_src_dict = {v['in']: k for k, v in roads_dir_dict.items()} max_stop_times_road = max(road_delay_infos, key=lambda x: x.delay_info.stop_times) min_stop_times_road = min(road_delay_infos, key=lambda x: x.delay_info.stop_times) if max_stop_times_road.delay_info.stop_times > 1 \ and max_stop_times_road.delay_info.stop_times - min_stop_times_road.delay_info.stop_times > 0.5 and max_stop_times_road.delay_info.car_num >= 5 and min_stop_times_road.delay_info.car_num >= 5: max_roadid = max_stop_times_road.inroadid min_roadid = min_stop_times_road.inroadid max_src = road_src_dict[max_roadid] if max_roadid in road_src_dict else None min_src = road_src_dict[min_roadid] if min_roadid in road_src_dict else None if max_src is None or min_src is None: return detail, suggestion, total_num total_num = 1 detail = [ [ { 'child_detail': [ { 'max_src_dir': dir_str_dict[max_src] + '进口', 'text': '的停车次数(' + str(round(max_stop_times_road.delay_info.stop_times, 2)) + ')与' }, { 'min_src_dir': dir_str_dict[min_src] + '进口', 'text': f"""的停车次数({str(round(min_stop_times_road.delay_info.stop_times, 2))})相差过大,两者之比为{int(round(max_stop_times_road.delay_info.stop_times, 2) / round(min_stop_times_road.delay_info.stop_times, 2) * 100)}%,分配的绿灯时长不匹配""" } ] } ] ] is_tide = False if max_src == src_reverse[min_src]: is_tide = True phase_suggestions = gen_src_dir_phase_detail(max_src, min_src, cross_phase, is_tide=is_tide) suggestion = [ { 'child_detail': [ { 'text': '调整', 'max_src_dir': dir_str_dict[max_src] + '进口', }, { 'text': '和', 'min_src_dir': dir_str_dict[min_src] + '进口', }, { 'text': '的绿灯时长的分配情况' } ], 'next_line': phase_suggestions } ] # suggestion.extend(phase_suggestions) return detail, suggestion, total_num # 均衡调控-转向失衡问题诊断 def gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict, cross_phase): detail = [] suggestion, total_num = [], 0 err_road_dict = {} road_src_dict = {v['in']: k for k, v in roads_dir_dict.items()} for road_delay_info in road_delay_infos: inroadid = road_delay_info.inroadid src_dir = road_src_dict[inroadid] if inroadid in road_src_dict.keys() else None if src_dir is None: continue flow_delay_infos = road_delay_info.flow_delay_infos turn_type_flow_delay_info_dict = {item.turn_type: item for item in flow_delay_infos} lane_num_info, count_num = count_lsr(inroad_static_info_dict[inroadid]['lane_turn_info']) if inroadid in inroad_static_info_dict.keys() else None if 0 in turn_type_flow_delay_info_dict.keys() and 1 in turn_type_flow_delay_info_dict.keys(): s_stop_times = turn_type_flow_delay_info_dict[0].delay_info.stop_times l_stop_times = turn_type_flow_delay_info_dict[1].delay_info.stop_times if (s_stop_times > 1 or l_stop_times > 1) and abs(l_stop_times - s_stop_times) > 0.5 and turn_type_flow_delay_info_dict[0].delay_info.car_num >= 5 and turn_type_flow_delay_info_dict[1].delay_info.car_num >= 5: if s_stop_times > l_stop_times: err_road_dict[road_src_dict[inroadid]] = ['直行' + ':' + str(round(s_stop_times, 2)), '左转' + ':' + str(round(l_stop_times, 2)), lane_num_info, count_num] else: err_road_dict[road_src_dict[inroadid]] = ['左转' + ':' + str(round(l_stop_times, 2)), '直行' + ':' + str(round(s_stop_times, 2)), lane_num_info, count_num] if len(err_road_dict.keys()) > 0: detail, suggestion, road_num_suggestion = [], [], [] for src_dir in err_road_dict.keys(): total_num = 1 item_detail = [ { 'src_dir': dir_str_dict[src_dir] + '进口', 'child_detail': [ { 'turn_type': err_road_dict[src_dir][0].split(':')[0], }, { 'text': '的停车次数(' + str(err_road_dict[src_dir][0].split(':')[1]) + ')与' }, { 'turn_type': err_road_dict[src_dir][1].split(':')[0], }, { 'text': '的停车次数(' + str(err_road_dict[src_dir][1].split(':')[1]) + ')相差过大,两者之比为' + str(int(float(err_road_dict[src_dir][0].split(':')[1]) / float(err_road_dict[src_dir][1].split(':')[1]) * 100)) + '%,分配的绿灯时长不匹配' } ] } ] detail.append(item_detail) phase_suggestion = get_flow_phase_detail(src_dir, [0, 1], cross_phase) item_suggestion = [ { 'child_detail': [ { 'src_dir': dir_str_dict[src_dir] + '进口', 'text': '信号灯直左分控,调整直行和左转车辆绿灯时长分配情况' } ], 'next_line': phase_suggestion } ] suggestion.extend(item_suggestion) if err_road_dict[src_dir][2]: road_num_suggestion.append( { 'child_detail': [ { 'src_dir': dir_str_dict[src_dir] + '进口', 'text': err_road_dict[src_dir][2] + '现有车道数分别为' + err_road_dict[src_dir][3] } ] } ) lane_num_suggestion = [ { 'child_detail': [ { 'text': '调整', 'src_dir': '、'.join([dir_str_dict[item] + '进口' for item in err_road_dict.keys()]) }, { 'text': '车道分配情况' } ], 'next_line': road_num_suggestion } ] suggestion = suggestion + lane_num_suggestion return detail, suggestion, total_num # 均衡调控-路口潮汐问题诊断 def gen_cross_tide_problems(crossid, nodeid, date_list, roads_dir_dict): detail = [] suggestions, total_num = [], 0 am_tp_start, am_tp_end = 't700', '900' pm_tp_start, pm_tp_end = 't1700', '1900' am_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start) pm_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start) am_avg_cross_delay_info = gen_avg_cross_delay_pb(am_cross_delay_data_list) pm_avg_cross_delay_info = gen_avg_cross_delay_pb(pm_cross_delay_data_list) # 构建对向进口道对 subtend_road_pair = gen_subtend_road_pair(roads_dir_dict) road_src_dict = {v['in']: k for k, v in roads_dir_dict.items()} max_am_flow, max_pm_flow, tide_index, max_am_flow_road, max_pm_flow_road = 0, 99999999999, 0, '', '' am_inroad_infos = am_avg_cross_delay_info.inroad_delay_infos pm_inroad_infos = pm_avg_cross_delay_info.inroad_delay_infos am_inroad_info_dict = {item.inroadid: item for item in am_inroad_infos} pm_inroad_info_dict = {item.inroadid: item for item in pm_inroad_infos} # 找出早高峰最高流量的进口道 for am_inroad_info in am_inroad_infos: if am_inroad_info.delay_info.car_num < 5: continue am_flow = am_inroad_info.delay_info.car_num if am_flow > max_am_flow: max_am_flow = am_flow max_am_flow_road = am_inroad_info.inroadid subtend_road_am_flow = 0 if max_am_flow_road in subtend_road_pair.keys() and subtend_road_pair[max_am_flow_road] in am_inroad_info_dict.keys(): subtend_road_am_flow = am_inroad_info_dict[subtend_road_pair[max_am_flow_road]].delay_info.car_num # 如果进口道流量小于20 则无意义 if min(subtend_road_am_flow, max_am_flow) < 20: return detail, suggestions, total_num # 找出晚高峰最高流量的进口道 for pm_inroad_info in pm_inroad_infos: if pm_inroad_info.delay_info.car_num < 5: continue pm_flow = pm_inroad_info.delay_info.car_num if pm_flow > max_pm_flow: max_pm_flow = pm_flow max_pm_flow_road = pm_inroad_info.inroadid # 如果路段id为空或不在对向对里,则无意义 if max_pm_flow_road == '' or max_pm_flow_road not in subtend_road_pair.keys(): return detail, suggestions, total_num subtend_road_pm_flow = pm_inroad_info_dict[subtend_road_pair[max_pm_flow_road]].delay_info.car_num if min(subtend_road_pm_flow, max_pm_flow) < 20: return detail, suggestions, total_num Fam = max(max_am_flow_road, subtend_road_am_flow) / min(max_am_flow_road, subtend_road_am_flow) Fpm = max(max_pm_flow_road, subtend_road_pm_flow) / min(max_pm_flow_road, subtend_road_pm_flow) if Fam >= 1.5 and Fpm >= 1.5 and max_pm_flow_road == subtend_road_pair[max_am_flow_road]: total_num = 1 src_dir = road_src_dict[max_am_flow_road] full_src_dir = dir_str_dict[src_dir] + dir_str_dict[src_reverse[src_dir]] + '方向' am_src_dir = dir_str_dict[src_dir] + '向' + dir_str_dict[src_reverse[src_dir]] pm_src_dir = dir_str_dict[src_reverse[src_dir]] + '向' + dir_str_dict[src_dir] detail = [{ 'src_dir': full_src_dir, 'child_detail': [ { 'text': '早高峰' }, { 'src_dir': am_src_dir, 'text': '进口道与出口道流量比为' + str(int(Fam * 100)) + '%,晚高峰' }, { 'src_dir': pm_src_dir, 'text': '进口道与出口道流量比为' + str(int(Fpm * 100)) + '%,存在潮汐现象' } ] }] suggestions = [ { 'child_detail': [ { 'text': '根据主车流方向调整绿灯时长分配' } ] }, { 'child_detail': [ { 'src_dir': full_src_dir, 'text': '机动车车道数双向均为3车道及以上,可以考虑设置潮汐车道' } ] } ] return detail, suggestions, total_num # 配时方案 def gen_phase_problems(nodeid, area_id, crossid, time_range, date_list, min_date, max_date, start_hm): phase_problems = { 'item': '配时方案', 'values': [], 'total_num': 0 } phase_problems_detail, err = QueryCrossPhaseDiagnosis(int(nodeid), crossid, [str(item) for item in date_list], time_range, int(area_id)) if not err and phase_problems_detail: phase_problems = phase_problems_detail phase_err_detail, phase_err_suggestions, phase_err_total_num = gen_err_phase_problems(max_date, crossid, min_date, time_range) if len(phase_err_suggestions) > 0: phase_problems['values'].append({ 'item': '路口方案异常', 'detail': phase_err_detail, 'reason': '路口停车次数徒增或大数据计算方案与录入方案不一致', 'suggestions': phase_err_suggestions }) total = phase_problems_detail['total_num'] if phase_problems_detail and len(phase_problems_detail) > 0 else 0 phase_problems['total_num'] = total + phase_err_total_num return phase_problems def gen_err_phase_problems(max_date, crossid, min_date, time_range): cross_examine_records = db_cross.query_cross_examine_records_nostarthm(max_date, crossid, min_date) detail = [] suggestions, total_num = [], 0 color_dict = { 1: '红色', 2: '橙色', 3: '黄色', 4: '绿色' } state_dict = { 1: '需核查逾期', 2: '需核查', 3: '监测中', 4: '核查有异常', 5: '核查无异常', 6: '自动结束' } if len(cross_examine_records) > 0: total_num = 1 detail = [] suggestions = [ { 'child_detail': [ { 'text': '核查该时段配时方案是否正常运行' } ] } ] for cross_examine_record in cross_examine_records: start_hm = convert_time(cross_examine_record['start_hm']) end_hm = convert_time(cross_examine_record['end_hm']) if not time_overlap(time_range, [start_hm + '-' + end_hm]): continue if cross_examine_record['level_color'] == 4 or cross_examine_record['final_state'] == 6: continue first_date = cross_examine_record['first_date'] color_level = color_dict[cross_examine_record['level_color']] final_state = state_dict[cross_examine_record['final_state']] item_detail = [ { 'text': f'从{first_date}起在{start_hm}-{end_hm}时段运行的配时方案有异常变动风险,异常等级为', 'start_hm': start_hm, 'first_date': first_date }, { 'level_color': color_level, 'text': f',处理状态为', 'final_state': final_state } ] detail.append({ 'child_detail': item_detail }) return detail, suggestions, total_num # 路口渠化 def gen_cross_channelized_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_ledger_info): road_delay_infos = avg_cross_delay_info.inroad_delay_infos inroad_num_detail, inroad_num_suggestion, inroad_num_err_total_num = gen_inroad_num_problems(road_delay_infos, inroad_static_info_dict, roads_dir_dict) inout_lane_num_gap_problems, inout_lane_num_gap_suggestions, inout_lane_num_gap_total_num = gen_in_out_lane_num_gap_problems(cross_ledger_info) cross_channelized_problems = { 'item': '路口渠化', 'values': [], 'total_num': 0 } sum_num = 0 if inroad_num_err_total_num > 0: sum_num += inroad_num_err_total_num cross_channelized_problems['values'].append( { 'item': '车道资源不匹配', 'detail': inroad_num_detail, 'reason': '同一进口道,直行或左转的流量占比大于50%,且相对流量占比大于60%,且车道数量占比小于流量占比的一半(多方向放行车道每个方向各算0.5)', 'suggestions': inroad_num_suggestion } ) if inout_lane_num_gap_total_num > 0: sum_num += inout_lane_num_gap_total_num cross_channelized_problems['values'].append( { 'item': '进出口车道数不匹配', 'detail': inout_lane_num_gap_problems, 'reason': '进口道车道数比对向的出口道车道数多2', 'suggestions': inout_lane_num_gap_suggestions } ) cross_channelized_problems['total_num'] = sum_num return cross_channelized_problems # 路口渠化-车道资源不匹配 def gen_inroad_num_problems(road_delay_infos, inroad_static_info_dict, roads_dir_dict): detail = [] suggestions, err_src_dirs, total_num = [], {}, 0 road_src_dict = {v['in']: k for k, v in roads_dir_dict.items()} max_flow_car_num = 0 for road_delay in road_delay_infos: inroadid = road_delay.inroadid src_dir = road_src_dict[inroadid] if inroadid in road_src_dict else None if src_dir is None: continue lane_info = inroad_static_info_dict[inroadid]['lane_turn_info'] flow_delay_infos = road_delay.flow_delay_infos max_car_num = max([item.delay_info.car_num for item in flow_delay_infos if item.turn_type in [0, 1]]) if flow_delay_infos else 0 if max_car_num > max_flow_car_num: max_flow_car_num = max_car_num road_car_num = road_delay.delay_info.car_num turn_delay_dict = {item.turn_type: item for item in flow_delay_infos} left_lane_num_half, straight_lane_num_half, right_lane_num_half = count_lsr_half(lane_info) lane_num_info, count_num = count_lsr(lane_info) sum_half_num = left_lane_num_half + straight_lane_num_half + right_lane_num_half left_lane_num_rate = int(round(left_lane_num_half / sum_half_num, 2) * 100) if sum_half_num > 0 else 0 straight_lane_num_rate = int(round(straight_lane_num_half / sum_half_num, 2) * 100) if sum_half_num > 0 else 0 for turn_type in [0, 1]: if turn_type not in turn_delay_dict.keys(): continue if turn_type == 0: turn_type_str = '直行' lane_num_rate = straight_lane_num_rate else: turn_type_str = '左转' lane_num_rate = left_lane_num_rate flow_car_num = turn_delay_dict[turn_type].delay_info.car_num relative_rate = flow_car_num / max_flow_car_num flow_rate = flow_car_num / road_car_num if relative_rate > 0.6 and lane_num_rate < 0.5: if src_dir in err_src_dirs.keys(): err_src_dirs[src_dir].append({ 'turn_type': turn_type_str, 'flow_rate': flow_rate, 'lane_num_rate': lane_num_rate, 'lane_num_info': lane_num_info, 'count_num': count_num }) else: err_src_dirs[src_dir] = [{ 'turn_type': turn_type_str, 'flow_rate': flow_rate, 'lane_num_rate': lane_num_rate, 'lane_num_info': lane_num_info, 'count_num': count_num }] if len(err_src_dirs.keys()) > 0: detail = [] total_num = 1 for src_dir in err_src_dirs.keys(): err_road_dict = err_src_dirs[src_dir] src_detail = { 'src_dir': dir_str_dict[src_dir] + '进口', 'child_detail': [] } for turn_type in err_road_dict: src_detail['child_detail'].append( { 'turn_type': turn_type['turn_type'], 'text': '分流转向比过大(' + str(int(round(turn_type['flow_rate'], 2) * 100)) + '%),但分配的转向车道占比不足(' + str(int(round(turn_type['lane_num_rate'], 2) * 100)) + '%),分配车道资源不匹配' } ) detail.append( [src_detail] ) suggestions.append({ 'child_detail': [ { 'text': '调整', 'src_dir': dir_str_dict[src_dir] + '进口' }, { 'text': '车道分配情况' }, { 'next_line': f"{err_src_dirs[src_dir][0]['lane_num_info']}现有车道数分别为{err_src_dirs[src_dir][0]['count_num']}" } ] }) # suggestions.append( # { # 'child_detail': [ # { # 'src_dir': dir_str_dict[src_dir] + '进口', # 'text': f"{err_src_dirs[src_dir][0]['lane_num_info']}现有车道数分别为{err_src_dirs[src_dir][0]['count_num']}" # } # ] # } # ) return detail, suggestions, total_num # 路口渠化-进出口道数不匹配 def gen_in_out_lane_num_gap_problems(cross_ledger_info): detail = [] suggestions, err_src_dict, total_num = [], {}, 0 road_infos = cross_ledger_info['roads'] for src_dir in road_infos.keys(): entry_lane_num = road_infos[src_dir]['entry_lane_num'] exit_lane_num = '-' if src_reverse[src_dir] in road_infos.keys(): exit_lane_num = road_infos[src_reverse[src_dir]]['exit_lane_num'] if exit_lane_num != '-' and entry_lane_num != '-' and entry_lane_num - exit_lane_num > 2: err_src_dict[src_dir] = { 'in': entry_lane_num, 'out': exit_lane_num } if len(err_src_dict.keys()) > 0: detail = [] total_num = 1 suggestions = { 'child_detail': [ { 'text': '预留全红时间清空积压车辆' } ] } for src_dir in err_src_dict.keys(): detail.append([ { 'src_dir': dir_str_dict[src_dir] + '进口', 'child_detail': [ { 'text': '进口道车道数(' + str(err_src_dict[src_dir]['in']) + ')比对向出口道车道数(' + str(err_src_dict[src_dir]['out']) + ')多,车辆冰岛减速可能会造成路口车辆积压' } ] } ]) return detail, suggestions, total_num def check_is_peak_tp(time_range, area_id, nodeid): tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id) peak_tp = ['07:00-09:00', '17:00-19:00'] if tp_desc and len(tp_desc) > 0: peak_tp = tp_desc[0]['peak_tp'].split(',') return time_overlap(time_range, peak_tp) def get_flow_phase_detail(src_dir, turn_type_list, cross_phase): phase_suggestion = [] if not cross_phase: return phase_suggestion for item in cross_phase.data: # 单个元素的含义为一个周计划 schedule_suggestion = [] need_src_flow = {dir_str_dict[src_dir] + int_turn_type2str[item]: 0 for item in turn_type_list} schedule_name = item.schedule_name schedule_week = item.schedule_week scheduleid = item.scheduleid tp_info = item.tps[0] tp_start = tp_info.tp_start planid = tp_info.planid plan_name = tp_info.plan_name stage_list = tp_info.stage_list for stage in stage_list: phases_name = stage.phases_name phases_name_list = [] if phases_name != '': phases_name_list = phases_name.split(',') green = stage.green for key in need_src_flow.keys(): if key in phases_name_list: need_src_flow[key] += green need_src_flow = {k: v for k, v in need_src_flow.items() if v != 0} if len(need_src_flow.keys()) > 0: schedule_suggestion.append( { 'phase': f"【{schedule_name}】{str(planid)}-{plan_name}", 'text': '现有的', 'schedule_week': schedule_week, 'scheduleid': scheduleid, 'tp_start': tp_start } ) keys = list(need_src_flow.keys()) for i in range(len(keys)): if i != len(keys) - 1: schedule_suggestion.append( { 'src_dir': keys[i], 'text': '绿灯时长为' + str(need_src_flow[keys[i]]) + 's,' } ) else: schedule_suggestion.append( { 'src_dir': keys[i], 'text': '绿灯时长为' + str(need_src_flow[keys[i]]) + 's' } ) phase_suggestion.append({ 'child_detail': schedule_suggestion }) return phase_suggestion def gen_src_dir_phase_detail(max_src_dir, min_src_dir, cross_phase, is_tide=False): phase_suggestion = [] if not cross_phase: return phase_suggestion for item in cross_phase.data: # 单个元素的含义为一个周计划 schedule_suggestion = [] schedule_name = item.schedule_name src_green = { dir_str_dict[max_src_dir]: 0, dir_str_dict[min_src_dir]: 0 } schedule_week = item.schedule_week scheduleid = item.scheduleid tp_info = item.tps[0] tp_start = tp_info.tp_start planid = tp_info.planid plan_name = tp_info.plan_name stage_list = tp_info.stage_list for stage in stage_list: phases_name = stage.phases_name src_dir_set = set() if phases_name != '': tmp_list = phases_name.split(',') for phase in tmp_list: if '行人' in phase: continue src_dir_set.add(phase[:-2]) if dir_str_dict[max_src_dir] in src_dir_set: src_green[dir_str_dict[max_src_dir]] += stage.green if dir_str_dict[min_src_dir] in src_dir_set: src_green[dir_str_dict[min_src_dir]] += stage.green if src_green[dir_str_dict[max_src_dir]] != 0 and src_green[dir_str_dict[min_src_dir]] != 0: schedule_suggestion = [ { 'phase': f"【{schedule_name}】{str(planid)}-{plan_name}", 'text': '现有的', 'schedule_week': schedule_week, 'scheduleid': scheduleid, 'tp_start': tp_start }, { 'src_dir': dir_str_dict[max_src_dir] + '进口', 'text': '绿灯时长为' + str(src_green[dir_str_dict[max_src_dir]]) + 's,' }, { 'src_dir': dir_str_dict[min_src_dir] + '进口', 'text': '绿灯时长为' + str(src_green[dir_str_dict[min_src_dir]]) + 's' } ] if schedule_suggestion: phase_suggestion.append({ 'child_detail': schedule_suggestion }) need2sug = False if is_tide: for item in cross_phase.data: stages_info = item.tps[0].stage_list for stage in stages_info: phases_name = stage.phases_name if dir_str_dict[max_src_dir] + '直行' in phases_name\ and dir_str_dict[min_src_dir] + '直行' in phases_name\ and dir_str_dict[max_src_dir] + '左转' in phases_name\ and dir_str_dict[min_src_dir] + '左转' in phases_name: need2sug = True if need2sug: phase_suggestion.append({ 'text': '相位放行方式可以考虑从对称放行调整为单路口放行' }) return phase_suggestion def count_lsr_half(turn_str): """ 统计车道转向中左转(l)、直行(s)、右转(r)的数量。 当一个车道有多个转向时,每个转向计0.5。 Args: turn_str: 形如 "2|1|1|1|5" 的字符串,每个数字对应 g_turn2str 中的键。 Returns: 形如 "1.0/3.0/1.0" 的字符串,顺序为左转、直行、右转的数量。 """ # 初始化计数器 left_count = 0.0 straight_count = 0.0 right_count = 0.0 # 分割输入字符串 turn_keys = turn_str.split('|') # 遍历每个车道 for key in turn_keys: if key in g_turn2str: turn_value = g_turn2str[key] # 忽略掉头和特殊车道 if turn_value in ['t', 'bus', 'reversible', '-']: continue # 计算该车道的转向数量 turn_count = 0 if 'l' in turn_value: turn_count += 1 if 's' in turn_value: turn_count += 1 if 'r' in turn_value: turn_count += 1 # 如果该车道有转向,则按转向数量平分权重 if turn_count > 0: weight = 0.5 if turn_count > 1 else 1.0 if 'l' in turn_value: left_count += weight if 's' in turn_value: straight_count += weight if 'r' in turn_value: right_count += weight # 返回格式化的结果,保留一位小数 return round(left_count, 1), round(straight_count, 1), round(right_count, 1) def get_prev_cross(nodeid, area_id, roads_dir_dict): src_cross = {} for src_dir in roads_dir_dict.keys(): inroadid = roads_dir_dict[src_dir]['in'] if inroadid and inroadid != '-': row_list = db_tmnet.query_out_cross(inroadid) if len(row_list) > 0: crossid = row_list[0]['from_crossid'] length = row_list[0]['from_road_length'] cross_info = db_tmnet.query_next_cross_info(nodeid, area_id, crossid) if cross_info: src_cross[src_dir] = cross_info[0] src_cross[src_dir]['length'] = length return src_cross def _to_int_list(nums, mask): """把非'-'位转成int,再微调±1使总和=100,只动最大位。""" # 先全部转int nums = [int(v) if not mask[i] else v for i, v in enumerate(nums)] # 有效位下标 valid = [i for i, m in enumerate(mask) if not m] if len(valid) < 2: # 单有效或全'-'无需调 return nums s = sum(nums[i] for i in valid) if s == 100: return nums # 差±1 delta = s - 100 idx_big = max(valid, key=nums.__getitem__) # 仍只动最大位 nums[idx_big] -= delta return nums def fix_to_100(a, b, c): nums = [a, b, c] mask = [v == '-' for v in nums] valid_idx = [i for i, m in enumerate(mask) if not m] # ----- 全 '-' ----- if not valid_idx: return nums[:] # ----- 仅 1 个有效 ----- if len(valid_idx) == 1: nums[valid_idx[0]] = 100 return _to_int_list(nums, mask) # ----- 2 个有效 ----- if len(valid_idx) == 2: i0, i1 = valid_idx v0, v1 = nums[i0], nums[i1] if 100 in (v0, v1) or 0 in (v0, v1): r1 = random.randint(1, 3) r2 = 100 - r1 # 原100位放较大值,原0位放较小值 if v0 == 100 or v1 == 100: idx_100 = i0 if v0 == 100 else i1 idx_zro = i1 if v0 == 100 else i0 nums[idx_100] = max(r1, r2) nums[idx_zro] = min(r1, r2) else: idx_zro = i0 if v0 == 0 else i1 idx_nz = i1 if v0 == 0 else i0 nums[idx_zro] = min(r1, r2) nums[idx_nz] = max(r1, r2) return _to_int_list(nums, mask) # 这里已int,但保险再调一次 # 正常值:等比缩放 s = v0 + v1 nums[i0] = v0 * 100 / s nums[i1] = v1 * 100 / s return _to_int_list(nums, mask) # ----- 3 个有效 ----- if 100 in nums or 0 in nums: if 100 in nums: idx_100 = nums.index(100) zeros = [i for i in range(3) if nums[i] == 0] r1 = random.randint(1, 3) r2 = 100 - r1 nums[zeros[0]] = r1 nums[zeros[1]] = r2 nums[idx_100] = 0 # 先清零,再走下面统一处理 zeros = [i for i in range(3) if nums[i] == 0] if len(zeros) == 1: nums[zeros[0]] = random.randint(1, 3) elif len(zeros) == 2: r1 = random.randint(1, 3) r2 = 100 - r1 nums[zeros[0]] = r1 nums[zeros[1]] = r2 return _to_int_list(nums, mask) # 正常缩放 s = sum(nums) if s == 100: return [int(v) for v in nums] for i in range(3): nums[i] = max(1, round(nums[i] * 100 / s)) return _to_int_list(nums, mask) def query_cross_delay_info_controller_export_excel(road_flow_delay_infos, road_flow_turn_rate): wb = Workbook() sheet1 = wb.active sheet1.title = "调度计划表" # 给第一个 sheet 命名 table_head = [ {"range": "A1:A2", "value": "进口道"}, {"range": "B1:B2", "value": "服务水平"}, {"range": "C1:D1", "value": "转向服务水平"}, {"range": "C2", "value": "左转"}, {"range": "D2", "value": "直行"}, {"range": "E1:E2", "value": "停车次数"}, {"range": "F1:G1", "value": "转向停车次数"}, {"range": "F2", "value": "左转"}, {"range": "G2", "value": "直行"}, {"range": "H1:H2", "value": "多次停车率", "width": 20}, {"range": "I1:J1", "value": "转向多次停车率"}, {"range": "I2", "value": "左转"}, {"range": "J2", "value": "直行"}, {"range": "K1:K2", "value": "延误时间(s)", "width": 20}, {"range": "L1:M1", "value": "转向延误时间(s)"}, {"range": "L2", "value": "左转"}, {"range": "M2", "value": "直行"}, {"range": "N1:N2", "value": "平均速度(km/h)", "width": 20}, {"range": "O1:P1", "value": "转向平均速度(km/h)"}, {"range": "O2", "value": "左转"}, {"range": "P2", "value": "直行"}, {"range": "Q1:Q2", "value": "不停车速度(km/h)", "width": 20}, {"range": "R1:S1", "value": "转向不停车速度"}, {"range": "R2", "value": "左转"}, {"range": "S2", "value": "直行"}, {"range": "T1:T2", "value": "进口道流量占比","width": 20}, {"range": "U1:W1", "value": "分流转向占比"}, {"range": "U2", "value": "左转"}, {"range": "V2", "value": "直行"}, {"range": "W2", "value": "右转"}, {"range": "X1:X2", "value": "出口道流量占比","width": 20}, {"range": "Y1:AA1", "value": "汇入转向占比"}, {"range": "Y2", "value": "左转"}, {"range": "Z2", "value": "直行"}, {"range": "AA2", "value": "右转"}, {"range": "AB1:AB2", "value": "转向失衡系数","width": 20}, ] for item_head in table_head: sheet1.merge_cells(item_head['range']) cell = item_head['range'].split(":")[0] sheet1[cell] = item_head['value'] sheet1[cell].alignment = Alignment(horizontal='center', vertical='center') if item_head.get('width'): sheet1.column_dimensions[cell[:-1]].width = item_head['width'] src_dir_map = {} for road, item_delay_infos in road_flow_delay_infos.items(): src_dir = item_delay_infos['src_dir'] if src_dir not in src_dir_map: src_dir_map[src_dir] = [] src_dir_map[src_dir].append(srcDir_toStr(item_delay_infos['src_dir'])) src_dir_map[src_dir].append(item_delay_infos['service_level']) src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['service_level'] if 1 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['service_level'] if 0 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(item_delay_infos['stop_times']) src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['stop_times'] if 1 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['stop_times'] if 0 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(item_delay_infos['high_park_percent']) src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['high_park_percent'] if 1 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['high_park_percent'] if 0 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(item_delay_infos['delay_time']) src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['delay_time'] if 1 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['delay_time'] if 0 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(item_delay_infos['speed']) src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['speed'] if 1 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['speed'] if 0 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(item_delay_infos['move_speed']) src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['move_speed'] if 1 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['move_speed'] if 0 in item_delay_infos['flow_delays'] else '-') src_dir_map[src_dir].append(road_flow_turn_rate[road]['in_flow_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['in_flow_rate'] else '-') src_dir_map[src_dir].append(road_flow_turn_rate[road]['l_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['l_rate'] else '-') src_dir_map[src_dir].append(road_flow_turn_rate[road]['s_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['s_rate'] else '-') src_dir_map[src_dir].append(road_flow_turn_rate[road]['r_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['r_rate'] else '-') src_dir_map[src_dir].append(road_flow_turn_rate[road]['out_flow_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['out_flow_rate'] else '-') src_dir_map[src_dir].append(road_flow_turn_rate[road]['out_l_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['out_l_rate'] else '-') src_dir_map[src_dir].append(road_flow_turn_rate[road]['out_s_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['out_s_rate'] else '-') src_dir_map[src_dir].append(road_flow_turn_rate[road]['out_r_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['out_r_rate'] else '-') src_dir_map[src_dir].append(item_delay_infos['imbalance_index']) src_dir_data = list(src_dir_map.values()) for item_src_dir_data in src_dir_data: sheet1.append(item_src_dir_data) file_stream = io.BytesIO() wb.save(file_stream) file_stream.seek(0) # 将指针移到文件开头 return send_file( file_stream, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f"路口诊断指标.xlsx")