cross_doctor/app/eva_common.py

1155 lines
58 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 19:51
# @Description: 路口评价相关的公共函数
import copy
import json
import logging
import requests
from google.protobuf.json_format import MessageToJson
import proto.xlcomm_pb2 as pb
from app.common_worker import *
from proto.phase_grpc import QueryCrossRunningPhase
# Maps a compass direction code to the code of the opposite direction.
src_reverse = {'N': 'S', 'S': 'N', 'W': 'E', 'E': 'W', 'NE': 'SW', 'SW': 'NE', 'SE': 'NW', 'NW': 'SE'}
# Maps a compass direction code to the Chinese label used when building display strings.
# NOTE(review): the labels for 'E', 'S' and 'N' are empty strings while the other five
# directions are populated -- possibly characters lost in an export step; confirm
# against the repository copy before relying on these values.
dir_str_dict = {
'E': '', 'S': '', 'W': '西', 'N': '', 'NE': '东北', 'SE': '东南', 'SW': '西南', 'NW': '西北'
}
# Maps the integer turn type to its Chinese label:
# 0 = straight, 1 = left turn, 2 = right turn, 3 = U-turn.
int_turn_type2str = {
0: '直行',
1: '左转',
2: '右转',
3: '掉头'
}
# Maps the ledger "internet" code to its Chinese label:
# 1 = online, 2 = offline, 3 = not connected.
# NOTE(review): the label for code 0 is an empty string -- confirm whether that is intended.
int_internet_code2str = {
0: '',
1: '联网',
2: '脱机',
3: '未接入'
}
# Maps the ledger 'primary' / 'secondary' codes (string keys) to labels that
# gen_cross_static_info joins with '-' to form the 'cross_model' value.
# NOTE(review): every label is an empty string here, so 'cross_model' always renders
# as '-'; presumably the real labels were lost -- confirm against the repository copy.
int_cross_model2str = {
'1': '',
'2': '',
'3': '',
'4': ''
}
# Indicators averaged with car_num as the weight and truncated to int when stored.
_WEIGHTED_INT_FIELDS = (
    'delay_time', 'queue_len', 'speed', 'park_time', 'high_park_percent',
    'truck_percent', 'park_percent', 'move_speed',
)
# Indicators averaged with car_num as the weight and stored without truncation.
_WEIGHTED_FLOAT_FIELDS = ('stop_times', 'jam_index')
# Per-turn counters that are summed (not averaged) across records.
_TURN_RATIO_FIELDS = ('turn_ratio_0', 'turn_ratio_1', 'turn_ratio_2', 'turn_ratio_3')


def _fill_weighted_delay_index(target, samples, extra_int_fields=()):
    """
    Write the aggregate of several delay-index records into ``target``.

    :param target: delay-index message that receives the aggregated values
    :param samples: non-empty list of delay-index messages to aggregate
    :param extra_int_fields: additional int indicators to average (e.g. 'travel_time')
    :return: None, ``target`` is modified in place
    """
    int_fields = _WEIGHTED_INT_FIELDS + tuple(extra_int_fields)
    weighted_sums = dict.fromkeys(int_fields + _WEIGHTED_FLOAT_FIELDS, 0)
    total_cars, std_flow_sum = 0, 0
    for sample in samples:
        weight = sample.car_num
        # std_flow is a plain mean over all records, so it is summed unweighted.
        std_flow_sum += sample.std_flow
        if weight == 0:
            # A record without vehicles carries no weight in the averages.
            continue
        total_cars += weight
        for field in weighted_sums:
            weighted_sums[field] += getattr(sample, field) * weight
    target.car_num = total_cars
    # With no vehicles at all there is nothing to average; store zeros instead of
    # raising ZeroDivisionError.
    for field in int_fields:
        setattr(target, field, int(weighted_sums[field] / total_cars) if total_cars else 0)
    for field in _WEIGHTED_FLOAT_FIELDS:
        setattr(target, field, weighted_sums[field] / total_cars if total_cars else 0.0)
    target.std_flow = int(std_flow_sum / len(samples)) if samples else 0


def gen_avg_cross_delay_pb(cross_delay_data_list):
    """
    Aggregate several intersection delay records into a single averaged record.

    Each row's ``item['data']`` is parsed into ``pb.xl_cross_delayinfo_t``. The
    indicators are combined at three levels (intersection, in-road, flow) as
    car_num-weighted averages; ``std_flow`` is a plain mean over the records and the
    ``turn_ratio_*`` counters and out-road counters are summed.

    NOTE(review): in the original, the cross-level ``std_flow`` accumulation may have
    sat inside the ``car_num != 0`` guard; here it is summed over all records, which
    matches the in-road/flow levels and the ``len()`` divisor -- confirm.

    :param cross_delay_data_list: rows whose ``'data'`` entry is a serialized
        ``xl_cross_delayinfo_t``
    :return: ``None`` for an empty input, the single parsed message when there is
        exactly one row, otherwise a newly built aggregated message
    """
    cross_delay_pb_list = []
    for item in cross_delay_data_list:
        item_delay_pb = pb.xl_cross_delayinfo_t()
        item_delay_pb.ParseFromString(item['data'])
        cross_delay_pb_list.append(item_delay_pb)
    if not cross_delay_pb_list:
        return None
    if len(cross_delay_pb_list) == 1:
        return cross_delay_pb_list[0]

    # Identity and time-period fields are taken from the first record.
    first = cross_delay_pb_list[0]
    avg_cross_delay = pb.xl_cross_delayinfo_t()
    avg_cross_delay.crossid = first.crossid
    avg_cross_delay.tp.start_hm = first.tp.start_hm
    avg_cross_delay.tp.end_hm = first.tp.end_hm
    avg_cross_delay.tp.weekday = first.tp.weekday
    avg_cross_delay.tp.type = first.tp.type
    avg_cross_delay.daynum = 0

    # Intersection-level indicators.
    _fill_weighted_delay_index(
        avg_cross_delay.delay_info,
        [item.delay_info for item in cross_delay_pb_list],
    )

    # In-road level: group the records of every in-road across all inputs.
    inroad_records = {}
    for item in cross_delay_pb_list:
        for inroad_item in item.inroad_delay_infos:
            inroad_records.setdefault(inroad_item.inroadid, []).append(inroad_item)

    inroad_pb_by_id = {}
    flow_records = {}
    inroad_stop_times = []
    for inroadid, records in inroad_records.items():
        inroad_delay_info = pb.xl_inroad_delayinfo_t()
        inroad_delay_info.inroadid = inroadid
        # ffs / capacity are copied from the first record of the in-road.
        inroad_delay_info.delay_info.ffs = records[0].delay_info.ffs
        inroad_delay_info.delay_info.capacity = records[0].delay_info.capacity
        _fill_weighted_delay_index(
            inroad_delay_info.delay_info,
            [record.delay_info for record in records],
            extra_int_fields=('travel_time',),
        )
        for ratio_field in _TURN_RATIO_FIELDS:
            setattr(
                inroad_delay_info.delay_info,
                ratio_field,
                sum(getattr(record.delay_info, ratio_field) for record in records),
            )
        # The cross imbalance index compares the averaged stop counts of the
        # in-roads. The original compared the cross-level accumulator here, which
        # made max and min meaningless. In-roads without vehicles are left out so an
        # artificial 0 does not become the minimum.
        if inroad_delay_info.delay_info.car_num > 0:
            inroad_stop_times.append(inroad_delay_info.delay_info.stop_times)
        # Collect flow records, grouped by xlink_id, for the flow-level aggregation.
        for record in records:
            for flow_item in record.flow_delay_infos:
                flow_records.setdefault(flow_item.xlink_id, []).append(flow_item)
        inroad_pb_by_id[inroadid] = inroad_delay_info

    avg_stop_times = avg_cross_delay.delay_info.stop_times
    if inroad_stop_times and avg_stop_times != 0:
        cross_imbalance_index = round(
            (max(inroad_stop_times) - min(inroad_stop_times)) / avg_stop_times, 2)
    else:
        cross_imbalance_index = 0
    avg_cross_delay.delay_info.imbalance_index = cross_imbalance_index

    # Flow level: aggregate every xlink and attach it to the in-road named by its
    # first record.
    for xlink_id, flows in flow_records.items():
        flow_info = pb.xl_flow_delayinfo_t()
        flow_info.xlink_id = xlink_id
        flow_info.inroadid = flows[0].inroadid
        flow_info.turn_type = flows[0].turn_type
        _fill_weighted_delay_index(
            flow_info.delay_info,
            [flow.delay_info for flow in flows],
            extra_int_fields=('travel_time',),
        )
        owner = inroad_pb_by_id.get(flow_info.inroadid)
        if owner is not None:
            owner.flow_delay_infos.append(flow_info)

    inroad_delay_infos_with_imbalance = calc_inroad_imbalance_index(list(inroad_pb_by_id.values()))
    avg_cross_delay.inroad_delay_infos.extend(inroad_delay_infos_with_imbalance)

    # Out-road level: the vehicle and per-turn counters are summed.
    outroad_records = {}
    for item in cross_delay_pb_list:
        for outroad_item in item.outroad_infos:
            outroad_records.setdefault(outroad_item.outroadid, []).append(outroad_item)
    for outroadid, records in outroad_records.items():
        outroad_info = pb.xl_outroad_info_t()
        outroad_info.outroadid = outroadid
        for field in ('car_num',) + _TURN_RATIO_FIELDS:
            setattr(
                outroad_info.turn_info,
                field,
                sum(getattr(record.turn_info, field) for record in records),
            )
        avg_cross_delay.outroad_infos.append(outroad_info)
    return avg_cross_delay
def gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict):
    """
    Build the overview indicators shown on the intersection diagnosis page.

    :param avg_cross_delay_info: aggregated intersection delay message
    :param inroad_static_info_dict: static in-road info keyed by in-road id
    :param nodeid: node id forwarded to ``calc_tide_index``
    :param date_list: dates forwarded to ``calc_tide_index``
    :param roads_dir_dict: direction -> in/out road ids, forwarded to ``calc_tide_index``
    :return: dict of overview indicators
    """
    cross_index = avg_cross_delay_info.delay_info

    turn_desc, main_src_desc = gen_high_stop_turn_ratio_desc(
        avg_cross_delay_info.inroad_delay_infos,
        inroad_static_info_dict,
        cross_index.car_num,
    )

    # Only the days that produced a non-zero tide index take part in the mean.
    daily_tides = calc_tide_index(avg_cross_delay_info.crossid, nodeid, date_list, roads_dir_dict)
    valid_tides = [value for value in daily_tides if value != 0]
    if len(valid_tides) > 0:
        tide_index = round(sum(valid_tides) / len(valid_tides), 2)
    else:
        tide_index = 0

    return {
        'jam_index': round(cross_index.jam_index, 2),
        'stop_times': round(cross_index.stop_times, 2),
        'high_park_percent': str(cross_index.high_park_percent) + '%',
        'imbalance_index': round(cross_index.imbalance_index, 2),
        # The two speed fields are divided by 100 before being returned.
        'speed': cross_index.speed / 100,
        'move_speed': cross_index.move_speed / 100,
        'park_time': cross_index.park_time,
        'delay_time': cross_index.delay_time,
        'service_level': calc_service_level(cross_index.delay_time),
        'high_stop_turn_ratio_desc': turn_desc,
        'main_flow_src_desc': main_src_desc,
        'tide_index': tide_index
    }
def gen_high_stop_turn_ratio_desc(inroad_delay_infos, inroad_static_info_dict, car_num):
    """
    Describe the flows that stop often and the dominant inbound flows.

    In-roads missing from ``inroad_static_info_dict`` are ignored. An in-road that
    carries more than 25% of ``car_num`` is handed to ``found_max_car_num_flow``; when
    that returns a turn type, direction label + turn label is recorded as a main flow.
    Independently, every flow whose averaged stop count is >= 1 is listed under its
    direction label.

    :param inroad_delay_infos: in-road delay messages of the intersection
    :param inroad_static_info_dict: in-road id -> dict containing ``'src_direct'``
    :param car_num: total vehicle count of the intersection
    :return: tuple ``(high_stop_desc, main_flow_desc)`` of concatenated strings
    """
    src_dict, main_flow_src_list = {}, []
    for inroad_info in inroad_delay_infos:
        inroadid = inroad_info.inroadid
        if inroadid not in inroad_static_info_dict:
            continue
        inroad_car_num = inroad_info.delay_info.car_num
        src_dir = dir_str_dict[inroad_static_info_dict[inroadid]['src_direct']]
        # Guard the share computation: an intersection without vehicles has no main flow.
        if car_num and inroad_car_num / car_num > 0.25:
            check_res = found_max_car_num_flow(inroad_car_num, inroad_info)
            if check_res != -1:
                main_flow_src_list.append(src_dir + int_turn_type2str[check_res])
        for item in inroad_info.flow_delay_infos:
            # The stop count lives on the nested delay_info, as everywhere else in
            # this module; reading it from the flow record itself was a bug.
            if item.delay_info.stop_times >= 1:
                src_dict.setdefault(src_dir, []).append(int_turn_type2str[item.turn_type])
    src_desc_list = [src_dir + '/'.join(turn_types) for src_dir, turn_types in src_dict.items()]
    return ''.join(src_desc_list) if src_desc_list else '', ''.join(main_flow_src_list) if main_flow_src_list else ''
def found_max_car_num_flow(inroad_car_num, inroad_info):
    """
    Find the turn type that carries more than half of an in-road's vehicles.

    :param inroad_car_num: total vehicle count of the in-road
    :param inroad_info: in-road record; the ``turn_ratio_0`` .. ``turn_ratio_3``
        counters are read from its ``delay_info`` (where this module stores them),
        falling back to the record itself when it has no ``delay_info`` attribute
    :return: the turn type index (0-3) whose counter exceeds 50% of
        ``inroad_car_num``, or -1 when there is none or the in-road has no vehicles
    """
    if not inroad_car_num:
        # No vehicles: no dominant turn, and the division below would fail.
        return -1
    counters = getattr(inroad_info, 'delay_info', inroad_info)
    turn_counts = (
        counters.turn_ratio_0,
        counters.turn_ratio_1,
        counters.turn_ratio_2,
        counters.turn_ratio_3,
    )
    for turn_type, turn_count in enumerate(turn_counts):
        if turn_count / inroad_car_num > 0.5:
            return turn_type
    return -1
def query_cross_ledger_info(crossid, nodeid, area_id, userid):
    """
    Fetch the ledger detail of an intersection from the ledger HTTP service.

    The request is limited to 10 seconds so a stalled service cannot block the
    caller. Transport errors, non-200 responses, bodies that are not valid JSON and
    payloads whose ``status`` is not 0 are all logged and reported as a failure.

    NOTE(review): the failure value is the tuple ``(None, None)`` while success
    returns a single dict; the tuple is truthy, so callers testing
    ``if not result`` will not detect a failure. Kept unchanged for compatibility
    -- confirm against the callers.

    :param crossid: intersection id
    :param nodeid: node id
    :param area_id: area id
    :param userid: user id
    :return: the decoded response dict on success, ``(None, None)`` on failure
    """
    ledger_url = f"http://120.53.125.169:7070/api/common/cross_ledger_detail?crossid={crossid}&crossno=&nodeid={nodeid}&area_id={area_id}&userid={userid}"
    headers = {"Content-Type": "application/json"}
    fail_msg = f"查询路口台账信息失败,crossid:{crossid},nodeid:{nodeid},area_id:{area_id},userid:{userid}"
    try:
        response = requests.get(ledger_url, headers=headers, timeout=10)
    except requests.RequestException:
        logging.exception(fail_msg)
        return None, None
    if response.status_code != 200:
        logging.error(fail_msg)
        return None, None
    try:
        # Decode the body once and reuse it for both the status check and the result.
        cross_ledger_info_dict = json.loads(response.text)
    except ValueError:
        logging.exception(fail_msg)
        return None, None
    if not isinstance(cross_ledger_info_dict, dict) or cross_ledger_info_dict.get('status') != 0:
        logging.error(fail_msg)
        return None, None
    return cross_ledger_info_dict
def gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict):
    """
    Build the static description of an intersection from its ledger record.

    :param crossid: intersection id, copied into the result
    :param nodeid: node id, used to load the base parameters and copied into the result
    :param area_id: area id, used to load the base parameters and copied into the result
    :param cross_ledger_info_dict: ledger response; its ``'data'`` entry must contain
        ``'ledger'``, ``'light_infos'`` and ``'roads'``
    :return: tuple ``(cross_static_info, cross_ledger_info_dict['data'])``
    """
    # Only the signal-controller vendor lookup is needed from the base parameters.
    _, _, slc_company_dict, _, _ = gen_ledger_base_info(nodeid, area_id)
    ledger_data = cross_ledger_info_dict['data']
    ledger = ledger_data['ledger']
    location = ledger['location']
    slc_company_name = slc_company_dict.get(ledger['slc_company'], '-')
    internet_str = int_internet_code2str[ledger['internet']]
    cross_model = int_cross_model2str[ledger['primary']] + '-' + int_cross_model2str[ledger['secondary']]
    src_light_dict = gen_dir_light_info_dict(ledger_data['light_infos'])
    roads = ledger_data['roads']

    length_info, lane_nums_list, road_special_info = {}, [], {}
    sum_length, valid_length_count = 0, 0
    for src_dir, road in roads.items():
        entry = road['topology']['entry']
        canaliza = road['canaliza']
        dir_length = entry['road_length']
        # An approach without any signal head has no entry in src_light_dict;
        # treat it as "no lights" instead of raising KeyError.
        dir_lights = src_light_dict.get(src_dir, [])
        # NOTE(review): light types 1/2/3 are presumably turn-specific heads; the
        # flag is set when type 2 coexists with type 1 or 3 -- confirm the meaning.
        has_straight_left_split = 2 in dir_lights and (1 in dir_lights or 3 in dir_lights)
        road_special_info[dir_str_dict[src_dir]] = {
            'straight_left_split': 1 if has_straight_left_split else 0,
            'reverse_turn': 1 if canaliza['reverse_turn'] == 1 else 0,
            'hold_left': 1 if canaliza['hold_left'] == 1 else 0,
            'hold_straight': 1 if canaliza['hold_straight'] == 1 else 0,
            'reversible_lanes_info': 1 if len(canaliza['reversible_lanes_info']) > 0 else 0,
            'outside_left': 1 if check_outside_left(entry['lane_turn_info']) else 0
        }
        lane_nums_list.append([
            dir_str_dict[src_dir] + '进口', road['entry_lane_num'],
            dir_str_dict[src_reverse[src_dir]] + '出口', road['exit_lane_num'],
        ])
        # '-' marks a missing length; it is kept in length_info but must not count
        # towards the average.
        # NOTE(review): the original's indentation is lost, so it is unclear whether
        # '-' entries were recorded in length_info -- confirm.
        if dir_length != '-':
            sum_length += dir_length
            valid_length_count += 1
        length_info[src_dir] = dir_length
    avg_length = round(sum_length / valid_length_count, 0) if valid_length_count else 0

    cross_static_info = {
        'crossid': crossid,
        'nodeid': nodeid,
        'area_id': area_id,
        'location': location,
        'slc_company': slc_company_name,
        'internet': internet_str,
        'cross_model': cross_model,
        'lane_nums_list': lane_nums_list,
        'road_special_info': road_special_info,
        'length_info': length_info,
        'avg_length': avg_length
    }
    return cross_static_info, ledger_data
def gen_ledger_base_info(nodeid, area_id):
    """
    Load the base ledger parameters of a node/area and split them by parameter type.

    :param nodeid: node id passed to ``db_tmnet.query_base_info``
    :param area_id: area id passed to ``db_tmnet.query_base_info``
    :return: tuple ``(division_list, company_list, slc_company_dict, slckind_list,
        detector_type_dict)``; the lists hold ``param_value`` and the dicts map
        ``param_code`` to ``param_value``
    """
    # When any of these collections is empty, the front end is expected to show a
    # hint that the city has not been configured yet.
    division_list, company_list, slckind_list = [], [], []
    slc_company_dict, detector_type_dict = {}, {}
    value_lists = {
        'division': division_list,
        'company': company_list,
        'slckind': slckind_list,
    }
    code_maps = {
        'slc_company': slc_company_dict,
        'detector_type': detector_type_dict,
    }
    for row in db_tmnet.query_base_info(nodeid, area_id):
        param_type = row['param_type']
        if param_type in value_lists:
            value_lists[param_type].append(row['param_value'])
        elif param_type in code_maps:
            code_maps[param_type][row['param_code']] = row['param_value']
    return division_list, company_list, slc_company_dict, slckind_list, detector_type_dict
def gen_dir_light_info_dict(light_infos):
    """
    Group light types by source direction.

    :param light_infos: iterable of dicts with ``'srcDir'`` and ``'light_type'``
    :return: dict mapping each ``srcDir`` to the list of its ``light_type`` values,
        in input order
    """
    lights_by_dir = {}
    for light in light_infos:
        src_dir, light_type = light['srcDir'], light['light_type']
        lights_by_dir.setdefault(src_dir, []).append(light_type)
    return lights_by_dir
def check_outside_left(lane_turn_info):
    """
    Tell whether a left-turn function appears at or after the first straight lane.

    :param lane_turn_info: iterable of lane function strings, scanned in order
    :return: True as soon as an entry contains 'l' once an entry containing 's' has
        been seen (the same entry counts for both), otherwise False
    """
    straight_seen = False
    for lane_func in lane_turn_info:
        if not straight_seen and 's' in lane_func:
            # First lane that allows going straight.
            straight_seen = True
        if straight_seen and 'l' in lane_func:
            # A left turn at or after a straight lane.
            return True
    return False
def gen_road_delay_index(avg_cross_delay_info, roads_dir_dict):
    """
    Build the per-in-road delay indicators, including the turn type 0 and 1 flows.

    :param avg_cross_delay_info: aggregated intersection delay message (may be falsy)
    :param roads_dir_dict: direction -> ``{'in': inroadid, 'out': outroadid}``
    :return: an empty list when ``avg_cross_delay_info`` is falsy, otherwise a dict
        keyed by in-road id; in-roads that are not an ``'in'`` road of any direction
        are skipped
    """
    if not avg_cross_delay_info:
        return []

    # Reverse lookup: in-road id -> direction code.
    dir_by_inroad = {}
    for src_dir, road_ids in roads_dir_dict.items():
        dir_by_inroad[road_ids['in']] = src_dir

    road_flow_index = {}
    for road_record in avg_cross_delay_info.inroad_delay_infos:
        roadid = road_record.inroadid
        if roadid not in dir_by_inroad:
            continue
        road_delay = road_record.delay_info
        road_flow_index[roadid] = {
            'src_dir': dir_by_inroad[roadid],
            'stop_times': round(road_delay.stop_times, 2),
            'high_park_percent': str(road_delay.high_park_percent) + '%',
            'imbalance_index': round(road_delay.imbalance_index, 2),
            'park_time': road_delay.park_time,
            'delay_time': road_delay.delay_time,
            'speed': road_delay.speed / 100,
            'move_speed': road_delay.move_speed / 100,
            'service_level': calc_service_level(road_delay.delay_time),
            'flow_delays': {}
        }
        flow_delays = road_flow_index[roadid]['flow_delays']
        for flow_record in road_record.flow_delay_infos:
            # Only turn types 0 and 1 are reported at flow level.
            if flow_record.turn_type in (0, 1):
                flow_delay = flow_record.delay_info
                flow_delays[flow_record.turn_type] = {
                    'stop_times': round(flow_delay.stop_times, 2),
                    'high_park_percent': str(flow_delay.high_park_percent) + '%',
                    'park_time': flow_delay.park_time,
                    'delay_time': flow_delay.delay_time,
                    'speed': flow_delay.speed / 100,
                    'service_level': calc_service_level(flow_delay.delay_time),
                    'move_speed': flow_delay.move_speed / 100
                }
    return road_flow_index
def gen_road_dir_dict(cross_ledger_info_dict):
    """
    Map every approach direction to its in-road and out-road ids.

    :param cross_ledger_info_dict: ledger data dict with an optional ``'roads'``
        entry (direction -> dict containing ``'roadid'`` and ``'out_roadid'``)
    :return: ``{direction: {'in': roadid, 'out': out_roadid}}``; an empty dict when
        the input is falsy or has no ``'roads'`` entry
    """
    if not cross_ledger_info_dict:
        return {}
    # A missing 'roads' entry used to become None and then fail on .keys().
    roads = cross_ledger_info_dict.get('roads') or {}
    return {
        src_dir: {'in': road['roadid'], 'out': road['out_roadid']}
        for src_dir, road in roads.items()
    }
def calc_inroad_imbalance_index(inroad_delay_pb_list):
    """
    Compute and store the imbalance index of every in-road.

    The index is ``(max - min) / average`` of the stop counts, where max/min range
    over the in-road's flows and the average is the in-road's own stop count. It is
    0 when the in-road has no flows, a non-positive average or a non-positive maximum.

    :param inroad_delay_pb_list: in-road delay records; each one is updated in place
    :return: the same list that was passed in
    """
    for inroad in inroad_delay_pb_list:
        mean_stops = inroad.delay_info.stop_times
        flow_stops = [flow.delay_info.stop_times for flow in inroad.flow_delay_infos]
        # 0 and 99999 are the sentinels the scan starts from.
        highest = max([0] + flow_stops)
        lowest = min([99999] + flow_stops)
        ratio = 0
        if mean_stops > 0 and highest > 0 and lowest != 99999:
            ratio = round((highest - lowest) / mean_stops, 2)
        inroad.delay_info.imbalance_index = ratio
    return inroad_delay_pb_list
def _safe_rate(part, whole):
    """Return ``part / whole`` rounded to 2 digits, or '-' when ``whole`` is 0."""
    return round(part / whole, 2) if whole != 0 else '-'


def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
    """
    Compute inbound/outbound flow shares and turn shares for every direction.

    Turn types follow ``int_turn_type2str``: 0 = straight, 1 = left, 2 = right. The
    original mapped inbound turn type 0 to ``l_rate`` and 1 to ``s_rate``, which
    contradicted both that table and the out-road rates below; this is corrected.
    The inbound share is now guarded by the inbound total (it used to be guarded by
    the outbound total).

    :param avg_cross_delay_info: aggregated intersection delay message
    :param roads_dir_dict: direction -> ``{'in': inroadid, 'out': outroadid}``
    :return: dict keyed by in-road id; every rate is a float rounded to 2 digits or
        '-' when it cannot be computed
    """
    road_delay_infos = avg_cross_delay_info.inroad_delay_infos
    outroad_infos = avg_cross_delay_info.outroad_infos
    road_delay_dict = {item.inroadid: item for item in road_delay_infos}
    outroad_info_dict = {item.outroadid: item for item in outroad_infos}
    cross_sum_car_num = sum(item.delay_info.car_num for item in road_delay_infos)
    cross_out_sum_car_num = sum(item.turn_info.car_num for item in outroad_infos)
    inbound_rate_key = {0: 's_rate', 1: 'l_rate', 2: 'r_rate'}

    road_flow_turn_rate = {}
    for src_dir, road_ids in roads_dir_dict.items():
        roadid = road_ids['in']
        rates = {
            'src_dir': src_dir,
            'in_flow_rate': '-',
            'out_flow_rate': '-',
            'l_rate': '-',
            's_rate': '-',
            'r_rate': '-',
            'out_l_rate': '-',
            'out_s_rate': '-',
            'out_r_rate': '-'
        }
        if roadid != '-' and roadid in road_delay_dict:
            inroad = road_delay_dict[roadid]
            car_num = inroad.delay_info.car_num
            rates['in_flow_rate'] = _safe_rate(car_num, cross_sum_car_num)
            for flow_delay_info in inroad.flow_delay_infos:
                rate_key = inbound_rate_key.get(flow_delay_info.turn_type)
                if rate_key is not None:
                    rates[rate_key] = _safe_rate(flow_delay_info.delay_info.car_num, car_num)
        out_road_id = road_ids['out']
        if out_road_id != '-' and out_road_id in outroad_info_dict:
            turn_info = outroad_info_dict[out_road_id].turn_info
            out_car_num = turn_info.car_num
            rates['out_flow_rate'] = _safe_rate(out_car_num, cross_out_sum_car_num)
            rates['out_l_rate'] = _safe_rate(turn_info.turn_ratio_1, out_car_num)
            rates['out_s_rate'] = _safe_rate(turn_info.turn_ratio_0, out_car_num)
            rates['out_r_rate'] = _safe_rate(turn_info.turn_ratio_2, out_car_num)
        road_flow_turn_rate[roadid] = rates
    return road_flow_turn_rate
def calc_service_level(delay_time):
    """
    Grade a delay time into a service level letter.

    :param delay_time: delay value to grade
    :return: 'A' (<= 10), 'B' (<= 20), 'C' (<= 35), 'D' (<= 55), 'E' (<= 80),
        otherwise 'F'
    """
    upper_bounds = ((10, 'A'), (20, 'B'), (35, 'C'), (55, 'D'), (80, 'E'))
    for upper_bound, level in upper_bounds:
        if delay_time <= upper_bound:
            return level
    return 'F'
def _busiest_inroad(inroad_infos):
    """Return ``(inroadid, car_num)`` of the in-road with the most vehicles, or ``('', 0)``."""
    busiest_road, busiest_flow = '', 0
    for inroad_info in inroad_infos:
        flow = inroad_info.delay_info.car_num
        if flow > busiest_flow:
            busiest_road, busiest_flow = inroad_info.inroadid, flow
    return busiest_road, busiest_flow


def _calc_day_tide_index(am_data, pm_data, opposing_road):
    """Compute one day's tide index from the serialized AM and PM peak records (0 if not computable)."""
    am_info = pb.xl_cross_delayinfo_t()
    pm_info = pb.xl_cross_delayinfo_t()
    am_info.ParseFromString(am_data)
    pm_info.ParseFromString(pm_data)
    if not am_info or not pm_info:
        return 0
    am_flow_by_road = {item.inroadid: item.delay_info.car_num for item in am_info.inroad_delay_infos}
    pm_flow_by_road = {item.inroadid: item.delay_info.car_num for item in pm_info.inroad_delay_infos}

    # Busiest in-road of the morning peak and the flow of its opposing in-road.
    am_road, am_flow = _busiest_inroad(am_info.inroad_delay_infos)
    if am_road == '' or am_road not in opposing_road:
        return 0
    am_opposing_flow = am_flow_by_road.get(opposing_road[am_road], 0)
    # A flow below 20 vehicles is considered too small to be meaningful.
    if min(am_opposing_flow, am_flow) < 20:
        return 0

    # Same for the evening peak. The original started the search from
    # 99999999999, so no in-road could ever be selected.
    pm_road, pm_flow = _busiest_inroad(pm_info.inroad_delay_infos)
    if pm_road == '' or pm_road not in opposing_road:
        return 0
    pm_opposing_flow = pm_flow_by_road.get(opposing_road[pm_road], 0)
    if min(pm_opposing_flow, pm_flow) < 20:
        return 0

    # Both peaks must lie on the same pair of opposing in-roads.
    if pm_road != am_road and pm_road != opposing_road[am_road]:
        return 0

    # Directional imbalance of each peak. Both operands are flows (the original
    # compared a road id with a flow); they are >= 20 here, so the ratios are >= 1
    # and the original "min(Fam, Fpm) <= 0" branch could never fire.
    fam = max(am_flow, am_opposing_flow) / min(am_flow, am_opposing_flow)
    fpm = max(pm_flow, pm_opposing_flow) / min(pm_flow, pm_opposing_flow)
    tide_index = 0
    if fam >= 1.5 and fpm >= 1.5 and pm_road == opposing_road[am_road]:
        # The heavy direction flips between the peaks.
        tide_index = min(fam, fpm) / max(fam, fpm)
    elif pm_road == am_road:
        # NOTE(review): formula kept from the original; confirm that the product of
        # the two ratios is the intended index when the heavy direction does not flip.
        tide_index = fam * fpm
    return tide_index


def calc_tide_index(crossid, nodeid, date_list, roads_dir_dict):
    """
    Compute the tide index of an intersection for every requested date.

    The delay records are loaded for the period markers 't700' (morning) and
    't1700' (evening). Dates missing either record, or whose data does not allow a
    meaningful comparison, get 0.

    :param crossid: intersection id
    :param nodeid: node id
    :param date_list: dates to evaluate
    :param roads_dir_dict: direction -> ``{'in': inroadid, 'out': outroadid}``
    :return: list with one tide index per entry of ``date_list``, in the same order
    """
    am_tp_start, pm_tp_start = 't700', 't1700'
    am_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start)
    pm_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start)
    date_am_delay_info_dict = {item['day']: item['data'] for item in am_cross_delay_data_list}
    date_pm_delay_info_dict = {item['day']: item['data'] for item in pm_cross_delay_data_list}

    # gen_subtend_road_pair stores every opposing pair under one of its two roads
    # only; the lookup is needed from either side, otherwise the "direction flips"
    # case can never be reached.
    opposing_road = dict(gen_subtend_road_pair(roads_dir_dict))
    for road, opposite in list(opposing_road.items()):
        opposing_road.setdefault(opposite, road)

    tide_index_list = []
    for date in date_list:
        if date not in date_am_delay_info_dict or date not in date_pm_delay_info_dict:
            tide_index_list.append(0)
            continue
        tide_index_list.append(_calc_day_tide_index(
            date_am_delay_info_dict[date], date_pm_delay_info_dict[date], opposing_road))
    return tide_index_list
def gen_subtend_road_pair(roads_dir_dict):
    """
    Pair the in-roads of opposing directions.

    Directions are visited in dict order. A direction whose in-road is '-' is
    skipped; otherwise, when the opposite direction (per ``src_reverse``) exists,
    has an in-road and is not paired yet, the pair is stored once as
    ``{in-road of the visited direction: in-road of the opposite direction}``.

    :param roads_dir_dict: direction -> ``{'in': inroadid, 'out': outroadid}``
    :return: dict mapping one in-road of every opposing pair to the other one
    """
    pairs = {}
    paired_dirs = set()
    for direction in roads_dir_dict:
        if direction in paired_dirs:
            # Already stored as the opposite side of an earlier direction.
            continue
        if roads_dir_dict[direction]['in'] == '-':
            continue
        opposite = src_reverse[direction]
        if opposite not in roads_dir_dict or opposite in paired_dirs:
            continue
        if roads_dir_dict[opposite]['in'] != '-':
            pairs[roads_dir_dict[direction]['in']] = roads_dir_dict[opposite]['in']
            paired_dirs.update((direction, opposite))
    return pairs
def parse_single_cross_delay_info(crossid, nodeid, data_list, data_type, roads_dir_dict):
    """
    Turn a series of cross delay records into display rows keyed by day or hour range.

    Overview row layout (positions 0-9): stop times, multi-stop rate, park time,
    delay time, average speed, non-stop speed, relative flow share, cross imbalance
    index, tide index, service level. The remaining positions are change-rate colour
    flags, all initialised to 0.

    :param crossid: cross id, forwarded to calc_tide_index
    :param nodeid: node id, forwarded to calc_tide_index
    :param data_list: list of dicts with 'day', 'tp_start', 'tp_end' and 'data'
        ('data' is a parsed delay message, or a falsy value when the period has no data)
    :param data_type: 'hour' keys the result by a "<start>-<end>" string built with
        convert_time; any other value keys it by day. The tide index is only
        computed for 'day' and 'week'
    :param roads_dir_dict: direction code -> dict whose 'in' entry is the inbound
        road id, or '-' when the approach has no inbound road
    :return: dict {key: {'overview': list,
        'roads_data': {direction: {'road': list, 'flow': {turn_type: list}}}}}
    """
    data_dict = {}
    max_road_car_num, max_flow_car_num = 0, 0
    pb_data_list = []
    # First pass: the largest volumes of the whole series are the denominators of
    # the "relative flow" percentages below.
    for item in data_list:
        if not item:
            continue
        pb_data_list.append(item['data'])
        road_delay_infos = item['data'].inroad_delay_infos if item['data'] and item['data'].inroad_delay_infos else []
        for road_delay_info in road_delay_infos:
            max_road_car_num = max(max_road_car_num, road_delay_info.delay_info.car_num)
            for flow_delay_info in road_delay_info.flow_delay_infos:
                max_flow_car_num = max(max_flow_car_num, flow_delay_info.delay_info.car_num)
    # default=0 keeps an all-empty series from failing on a missing maximum
    max_cross_car_num = max(
        (x.delay_info.car_num for x in pb_data_list if x is not None),
        default=0
    )

    for item in data_list:
        if not item:
            continue
        tp_start = item['tp_start']
        tp_end = item['tp_end']
        day = item['day']
        if data_type == 'hour':
            key = convert_time(int(tp_start.replace('h', ''))) + '-' + convert_time(int(tp_end.replace('h', '')))
        else:
            key = day
        item_cross_delay_info = item['data']
        # Placeholder row for a period without data.
        # NOTE(review): this row carries 10 flags while a populated row carries 9
        # (service level has no flag) - confirm which length consumers expect.
        overview_data = ['-'] * 10 + [0] * 10
        # The tide index is not computed for hour-level data
        tide_index = calc_tide_index(crossid, nodeid, [day], roads_dir_dict) if data_type == 'day' or data_type == 'week' else '-'
        tide_value = tide_index[0] if tide_index else '-'
        road_data_dict = {}
        cross_car_num = 0
        if item_cross_delay_info:
            cross_info = item_cross_delay_info.delay_info
            cross_car_num = cross_info.car_num
            road_delay_infos = item_cross_delay_info.inroad_delay_infos
            road_stop_times = [road.delay_info.stop_times for road in road_delay_infos]
            # Spread of the per-road stop times relative to the cross-level value;
            # 0 when there is nothing to compare or the denominator is 0
            if road_stop_times and cross_info.stop_times != 0:
                cross_imbalance_index = round((max(road_stop_times) - min(road_stop_times)) / cross_info.stop_times, 2)
            else:
                cross_imbalance_index = 0
            overview_data = [
                # stop times
                round(cross_info.stop_times, 2),
                # multi-stop rate, park time
                cross_info.high_park_percent, cross_info.park_time,
                # delay time, average speed
                cross_info.delay_time, cross_info.speed / 100,
                # non-stop speed, relative flow share
                cross_info.move_speed / 100, round(cross_car_num / max_cross_car_num * 100, 2) if max_cross_car_num != 0 else 0,
                # cross imbalance index, tide index, service level
                cross_imbalance_index, tide_value, calc_service_level(cross_info.delay_time),
                # change-rate colour flags, service level excluded
                0, 0, 0, 0, 0, 0, 0, 0, 0
            ]
            inroad_delay_infos_with_imbalance = calc_inroad_imbalance_index(road_delay_infos)
            road_data_dict = {road.inroadid: road for road in inroad_delay_infos_with_imbalance}
        data_dict[key] = {
            'overview': overview_data,
            'roads_data': {}
        }
        for src_dir in roads_dir_dict:
            inroadid = roads_dir_dict[src_dir]['in']
            if inroadid == '-' or inroadid not in road_data_dict:
                continue
            road = road_data_dict[inroadid]
            road_info = road.delay_info
            road_car_num = road_info.car_num
            src_data = [
                # stop times
                round(road_info.stop_times, 2),
                # multi-stop rate
                road_info.high_park_percent,
                # turn imbalance index
                round(road_info.imbalance_index, 2),
                # park time
                road_info.park_time,
                # delay time
                road_info.delay_time,
                # average speed
                road_info.speed / 100,
                # non-stop speed
                road_info.move_speed / 100,
                # relative flow share
                round(road_car_num / max_road_car_num * 100, 2) if max_road_car_num > 0 else '-',
                # share of the cross volume entering through this road
                round(road_car_num / cross_car_num * 100, 2) if cross_car_num > 0 else '-',
                # service level
                calc_service_level(road_info.delay_time),
                # change-rate colour flags in the order above, service level excluded
                0, 0, 0, 0, 0, 0, 0, 0, 0
            ]
            flow_data_dict = {}
            for flow_delay_info in road.flow_delay_infos:
                turn_type = flow_delay_info.turn_type
                # Only turn types 0 and 1 are reported
                if turn_type not in (0, 1):
                    continue
                flow_info = flow_delay_info.delay_info
                flow_data_dict[turn_type] = [
                    # stop times
                    round(flow_info.stop_times, 2),
                    # multi-stop rate
                    flow_info.high_park_percent,
                    # park time
                    flow_info.park_time,
                    # delay time
                    flow_info.delay_time,
                    # average speed
                    flow_info.speed / 100,
                    # non-stop speed
                    flow_info.move_speed / 100,
                    # relative flow
                    round(flow_info.car_num / max_flow_car_num * 100, 2) if max_flow_car_num > 0 else '-',
                    # share of the road volume taking this turn
                    round(flow_info.car_num / road_car_num * 100, 2) if road_car_num > 0 else '-',
                    # service level
                    calc_service_level(flow_info.delay_time),
                    # change-rate colour flags, service level excluded
                    0, 0, 0, 0, 0, 0, 0, 0
                ]
            data_dict[key]['roads_data'][src_dir] = {
                'road': src_data,
                'flow': flow_data_dict
            }
    return data_dict
def calc_single_day_delay_info_change_rate(data_dict):
    """
    Fill the change-rate flags of every entry by comparing it with the entry before it.

    Entries are compared in the key order of data_dict; the first entry has no
    predecessor and is left untouched. A pair is skipped when either side has no
    data. The input is not modified.

    :param data_dict: dict {key: {'overview': list, 'roads_data': dict}}
    :return: deep copy of data_dict with the flags filled in
    """
    res_data_dict = copy.deepcopy(data_dict)
    key_list = list(res_data_dict.keys())

    def _has_metrics(overview):
        # A row without data holds '-' in each of its 10 metric slots. Only those
        # slots are inspected, so the number of trailing flags does not matter.
        return any(value != '-' for value in overview[:10])

    # Walk backwards so every entry is compared with a predecessor that has not
    # been rewritten yet
    for i in range(len(key_list) - 1, 0, -1):
        current = res_data_dict[key_list[i]]
        previous = res_data_dict[key_list[i - 1]]
        if not (_has_metrics(current['overview']) and _has_metrics(previous['overview'])):
            continue
        current['overview'] = calc_overview_change_rate(current['overview'], previous['overview'])
        current['roads_data'] = calc_roads_data_change_rate(current['roads_data'], previous['roads_data'])
    return res_data_dict
def calc_overview_change_rate(overview_data, prev_overview_data):
    """
    Compute the change-rate flags of a cross overview row against the previous row.

    Metrics sit at positions 0-8 and the flag of metric i is stored at position
    i + 10. A flag is only written when the value moved by more than 20% relative
    to a positive previous value:

    - positions 0, 1, 2, 3, 6, 7, 8: a drop writes 1, a rise writes 2
    - positions 4, 5 (the two speeds): a rise writes 1, a drop writes 2

    NOTE(review): the legend that accompanied this code read "0 normal,
    1 worsened, 2 improved", which does not match the assignments above (a rise in
    stop times writes 2). The assignments are kept as they were - confirm the
    intended meaning with the consumer of the flags.

    :param overview_data: current overview row
    :param prev_overview_data: overview row of the previous period
    :return: shallow copy of overview_data with the flags filled in
    """
    res_data = copy.copy(overview_data)
    for i in range(min(9, len(overview_data))):
        current, previous = overview_data[i], prev_overview_data[i]
        if current == '-' or previous == '-':
            # A missing value cannot be compared; '-' must never reach the arithmetic
            res_data[i + 10] = 0
            continue
        rate = (current - previous) / previous * 100 if previous > 0 else 0
        if i in (0, 1, 2, 3, 6, 7, 8):
            if rate < -20:
                res_data[i + 10] = 1
            elif rate > 20:
                res_data[i + 10] = 2
        else:
            # average speed and non-stop speed
            if rate > 20:
                res_data[i + 10] = 1
            elif rate < -20:
                res_data[i + 10] = 2
    return res_data
def calc_roads_data_change_rate(roads_data, prev_roads_data):
    """
    Compute the change-rate flags of every road row and flow row against the previous period.

    Road rows hold metrics at positions 0-8 with the flag of metric i at i + 10;
    flow rows hold metrics at positions 0-7 with the flag of metric i at i + 9.
    A flag is only written when the value moved by more than 20% relative to a
    positive previous value. For the speed positions (5, 6 in a road row; 4, 5 in
    a flow row) a rise writes 1 and a drop writes 2; for every other metric a drop
    writes 1 and a rise writes 2.

    Directions or turn types that are missing from the previous period are left
    unchanged. The flags are written to the returned copy; the input is not modified.

    :param roads_data: dict {direction: {'road': list, 'flow': {turn_type: list}}}
    :param prev_roads_data: same structure for the previous period
    :return: deep copy of roads_data with the flags filled in
    """
    def _fill_flags(row, prev_row, metric_count, flag_offset, drop_is_one):
        for i in range(min(metric_count, len(row))):
            current, previous = row[i], prev_row[i]
            if current == '-' or previous == '-':
                # A missing value cannot be compared; '-' must never reach the arithmetic
                row[i + flag_offset] = 0
                continue
            rate = (current - previous) / previous * 100 if previous > 0 else 0
            if abs(rate) <= 20:
                continue
            dropped = rate < -20
            if i in drop_is_one:
                row[i + flag_offset] = 1 if dropped else 2
            else:
                row[i + flag_offset] = 2 if dropped else 1

    res_data = copy.deepcopy(roads_data)
    for src_dir, src_entry in res_data.items():
        if src_dir not in prev_roads_data:
            continue
        prev_entry = prev_roads_data[src_dir]
        _fill_flags(src_entry['road'], prev_entry['road'], 9, 10, (0, 1, 2, 3, 4, 7, 8))
        prev_flow_datas = prev_entry['flow']
        for turn_type, flow_data in src_entry['flow'].items():
            if turn_type not in prev_flow_datas:
                continue
            _fill_flags(flow_data, prev_flow_datas[turn_type], 8, 9, (0, 1, 2, 3, 6, 7))
    return res_data
def parse_data2pb(data_list):
    """
    Decode the serialized 'data' field of every row into an xl_cross_delayinfo_t message.

    :param data_list: rows with 'day', 'tp_start', 'tp_end' and serialized 'data'
    :return: list of dicts with the same four keys, 'data' holding the parsed message
    """
    def _decode_row(row):
        entry = {
            'day': row['day'],
            'tp_start': row['tp_start'],
            'tp_end': row['tp_end'],
        }
        delay_message = pb.xl_cross_delayinfo_t()
        delay_message.ParseFromString(row['data'])
        entry['data'] = delay_message
        return entry

    return [_decode_row(row) for row in data_list]
def gen_cross_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict):
    """
    Collect the diagnosis results of the four problem categories of a cross.

    :param avg_cross_delay_info: delay message of the cross, passed to the diagnosers
    :param roads_dir_dict: direction code -> road ids of that approach
    :param inroad_static_info_dict: static info per inbound road id
    :return: list in the order operating efficiency, balanced control,
        signal phase, cross channelization
    """
    return [
        gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict),
        gen_balanced_control_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict),
        gen_phase_problems(),
        gen_cross_channelized_problems(),
    ]
def gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict):
    """
    Build the "operating efficiency" diagnosis of a cross.

    :param avg_cross_delay_info: delay message of the cross, or a falsy value when
        there is no data
    :param roads_dir_dict: direction code -> road ids of that approach
    :return: dict {'item': ..., 'values': [...]} holding the repeated-queuing
        diagnosis. Without data the historical return value is kept: a one-element
        list wrapping a dict with empty 'values'.
        NOTE(review): the two shapes differ - confirm which one consumers expect.
    """
    if not avg_cross_delay_info:
        return [{
            'item': '运行效率',
            'values': []
        }]
    road_delay_infos = avg_cross_delay_info.inroad_delay_infos
    high_park_problems, high_park_suggestions = gen_high_park_problems(road_delay_infos, roads_dir_dict)
    operating_efficiency_problems = {
        'item': '运行效率',
        'values': [
            {
                'item': '多次排队',
                'detail': high_park_problems,
                'reason': '某一进口道转向的多次停车率大于15%',
                'suggestions': high_park_suggestions
            },
        ]
    }
    # The assembled result used to be dropped, leaving the caller with None
    return operating_efficiency_problems
def gen_high_park_problems(road_delay_infos, roads_dir_dict):
    """
    Report the approaches whose turn type 0 or 1 flow has a multi-stop rate above 15%.

    :param road_delay_infos: per-road delay messages (inroadid, flow_delay_infos)
    :param roads_dir_dict: direction code -> dict whose 'in' entry is the inbound
        road id, or '-' when the approach has no inbound road
    :return: (detail, suggestion). detail is a single "nothing abnormal" entry when
        no flow is flagged, otherwise one entry per flagged approach; suggestion is
        always empty for now
    """
    detail = [{
        'text': '未见异常'
    }]
    suggestion = []
    road_delays_dict = {road.inroadid: road for road in road_delay_infos}
    # direction code -> [(turn_type, multi-stop rate), ...]
    flagged_flows = {}
    for src_dir, road_ids in roads_dir_dict.items():
        inroadid = road_ids['in']
        if inroadid == '-' or inroadid not in road_delays_dict:
            continue
        flows_by_turn = {flow.turn_type: flow for flow in road_delays_dict[inroadid].flow_delay_infos}
        for turn_type, flow_delay_info in flows_by_turn.items():
            if turn_type not in (0, 1):
                continue
            high_park_percent = flow_delay_info.delay_info.high_park_percent
            if high_park_percent > 15:
                flagged_flows.setdefault(src_dir, []).append((turn_type, high_park_percent))
    if flagged_flows:
        detail = []
        for src_dir, offenders in flagged_flows.items():
            src_detail = [
                {
                    'turn_type': int_turn_type2str[turn_type],
                    'text': '车辆多次停车率过大(',
                    'percent': str(high_park_percent) + '%'
                }
                for turn_type, high_park_percent in offenders
            ]
            # Closing sentence, added once per approach
            src_detail.append({
                'text': '车辆需要多次排队才能通过'
            })
            detail.append({
                'src_dir': dir_str_dict[src_dir] + '进口',
                'src_detail': src_detail
            })
    # TODO: generate suggestions
    return detail, suggestion
def gen_high_stop_time_problems(road_delay_infos, roads_dir_dict):
    """
    Placeholder for a stop-time diagnosis; not implemented yet.

    Only the default detail and suggestion values are prepared; nothing is
    returned, so callers currently receive None.

    :param road_delay_infos: unused for now
    :param roads_dir_dict: unused for now
    :return: None
    """
    detail = [{
        'text': '未见异常'
    }]
    suggestion = []
    # TODO: the diagnosis rule has to be confirmed with product again
    pass
def gen_balanced_control_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict):
    """
    Build the "balanced control" diagnosis of a cross: cross imbalance and turn imbalance.

    :param avg_cross_delay_info: delay message of the cross, or a falsy value when
        there is no data
    :param roads_dir_dict: direction code -> road ids of that approach
    :param inroad_static_info_dict: static info per inbound road id
    :return: dict {'item': ..., 'values': [...]}; 'values' is empty when there is no data
    """
    # Same no-data guard as gen_operating_efficiency_problems, so a missing
    # message does not fail on attribute access
    if not avg_cross_delay_info:
        return {
            'item': '均衡调控',
            'values': []
        }
    road_delay_infos = avg_cross_delay_info.inroad_delay_infos
    cross_imbalance_detail, cross_imbalance_suggestions = gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict)
    turn_imbalance_detail, turn_imbalance_suggestions = gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict)
    # TODO: the tide-problem diagnosis rule has to be confirmed with product
    balanced_control_problems = {
        'item': '均衡调控',
        'values': [
            {
                'item': '路口失衡',
                'detail': cross_imbalance_detail,
                'reason': '路口存在某个流向绿灯时长不足而另一个方向绿灯存在空放的现象',
                'suggestions': cross_imbalance_suggestions
            },
            {
                'item': '转向失衡',
                'detail': turn_imbalance_detail,
                'reason': '同一进口道直行与左转停车次数之差的绝对值大于0.5且转向停车次数的最大值大于1',
                'suggestions': turn_imbalance_suggestions
            },
        ]
    }
    return balanced_control_problems
# Cross imbalance diagnosis
def gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict):
    """
    Compare the inbound roads with the highest and lowest stop times.

    A problem is reported when the highest stop times exceed 1 and the gap to the
    lowest exceeds 0.5.

    :param road_delay_infos: per-road delay messages (inroadid, delay_info.stop_times)
    :param roads_dir_dict: direction code -> dict whose 'in' entry is the inbound
        road id, or '-' when the approach has no inbound road
    :return: (detail, suggestion); a single "nothing abnormal" detail entry and an
        empty suggestion list when no problem is found
    """
    detail = [{
        'text': '未见异常'
    }]
    suggestion = []
    road_delay_infos = list(road_delay_infos)
    if not road_delay_infos:
        return detail, suggestion
    # Reverse lookup: inbound road id -> direction code
    road_src_dict = {
        road_ids['in']: src_dir
        for src_dir, road_ids in roads_dir_dict.items()
        if road_ids['in'] != '-'
    }
    max_stop_times_road = max(road_delay_infos, key=lambda x: x.delay_info.stop_times)
    min_stop_times_road = min(road_delay_infos, key=lambda x: x.delay_info.stop_times)
    max_stop_times = max_stop_times_road.delay_info.stop_times
    min_stop_times = min_stop_times_road.delay_info.stop_times
    if max_stop_times <= 1 or max_stop_times - min_stop_times <= 0.5:
        return detail, suggestion
    max_src = road_src_dict.get(max_stop_times_road.inroadid)
    min_src = road_src_dict.get(min_stop_times_road.inroadid)
    if max_src is None or min_src is None:
        # The approach cannot be named when the road has no known direction
        return detail, suggestion
    max_rounded = round(max_stop_times, 2)
    min_rounded = round(min_stop_times, 2)
    # The ratio is undefined when the smaller value rounds to 0
    ratio_text = str(int(max_rounded / min_rounded * 100)) if min_rounded > 0 else '-'
    detail = [
        {
            'src_dir': dir_str_dict[max_src] + '进口',
            'text': '的停车次数(' + str(max_rounded) + ')与'
        },
        {
            'src_dir': dir_str_dict[min_src] + '进口',
            'text': f"""的停车次数({min_rounded})相差过大,两者之比为{ratio_text}%,分配的绿灯时长不匹配"""
        }
    ]
    suggestion = [
        {
            'text': '调整',
            'max_src_dir': dir_str_dict[max_src] + '进口',
        },
        {
            'text': '',
            'min_src_dir': dir_str_dict[min_src] + '进口',
        },
        {
            'text': '的绿灯时长的分配情况'
        }
    ]
    # TODO: add the extra suggestion for the case where both approaches face each other
    return detail, suggestion
# Turn imbalance diagnosis
def gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict):
    """
    Find the approaches whose straight (turn type 0) and left-turn (turn type 1)
    stop times differ too much.

    An approach is reported when the larger of the two stop times exceeds 1 and
    their absolute difference exceeds 0.5.

    :param road_delay_infos: per-road delay messages (inroadid, flow_delay_infos)
    :param roads_dir_dict: direction code -> dict whose 'in' entry is the inbound
        road id, or '-' when the approach has no inbound road
    :param inroad_static_info_dict: inbound road id -> static info; its
        'lane_turn_info' entry is passed to count_lsr
    :return: (detail, suggestion); a single "nothing abnormal" detail entry and an
        empty suggestion list when no approach is reported
    """
    detail = [{
        'text': '未见异常'
    }]
    suggestion = []
    # direction code -> ((label, stop times) of the larger flow,
    #                    (label, stop times) of the smaller flow, lane info)
    err_road_dict = {}
    # Reverse lookup: inbound road id -> direction code
    road_src_dict = {
        road_ids['in']: src_dir
        for src_dir, road_ids in roads_dir_dict.items()
        if road_ids['in'] != '-'
    }
    for road_delay_info in road_delay_infos:
        inroadid = road_delay_info.inroadid
        flows_by_turn = {flow.turn_type: flow for flow in road_delay_info.flow_delay_infos}
        lane_num_info = count_lsr(inroad_static_info_dict[inroadid]['lane_turn_info']) if inroadid in inroad_static_info_dict else None
        if 0 not in flows_by_turn or 1 not in flows_by_turn:
            continue
        s_stop_times = flows_by_turn[0].delay_info.stop_times
        l_stop_times = flows_by_turn[1].delay_info.stop_times
        # Both conditions are required: one flow above 1 AND a gap above 0.5
        if max(s_stop_times, l_stop_times) <= 1 or abs(l_stop_times - s_stop_times) <= 0.5:
            continue
        src_dir = road_src_dict.get(inroadid)
        if src_dir is None:
            # The approach cannot be named when the road has no known direction
            continue
        straight = ('直行', round(s_stop_times, 2))
        left = ('左转', round(l_stop_times, 2))
        larger, smaller = (straight, left) if s_stop_times > l_stop_times else (left, straight)
        err_road_dict[src_dir] = (larger, smaller, lane_num_info)
    if err_road_dict:
        detail, suggestion, road_num_suggestion = [], [], []
        for src_dir, (larger, smaller, lane_num_info) in err_road_dict.items():
            # The ratio is undefined when the smaller value rounds to 0
            ratio_text = str(int(larger[1] / smaller[1] * 100)) if smaller[1] > 0 else '-'
            item_detail = [
                {
                    'src_dir': dir_str_dict[src_dir] + '进口',
                    'child_detail': [
                        {
                            'turn_type': larger[0],
                        },
                        {
                            'text': '的停车次数(' + str(larger[1]) + ')与'
                        },
                        {
                            'turn_type': smaller[0],
                        },
                        {
                            'text': '的停车次数(' + str(smaller[1]) + ')相差过大,两者之比为' + ratio_text + '%,分配的绿灯时长不匹配'
                        }
                    ]
                }
            ]
            detail.append(item_detail)
            item_suggestion = [
                {
                    'src_dir': dir_str_dict[src_dir] + '进口',
                    'text': '信号灯直左分控,调整直行和左转车辆绿灯时长分配情况'
                },
                # TODO: add signal-timing suggestions
            ]
            suggestion.extend(item_suggestion)
            if lane_num_info:
                road_num_suggestion.append(
                    {
                        'src_dir': dir_str_dict[src_dir] + '进口',
                        # count_lsr's return type is not visible here; str() keeps
                        # the concatenation safe either way
                        'text': '左转/直行/右转现有车道数分别为' + str(lane_num_info)
                    }
                )
        lane_num_suggestion = [
            {
                'text': '调整',
                'src_dir': ''.join([dir_str_dict[item] + '进口' for item in err_road_dict.keys()])
            },
            {
                'text': '车道分配情况'
            }
        ]
        lane_num_suggestion.extend(road_num_suggestion)
        suggestion = suggestion + lane_num_suggestion
    return detail, suggestion
def gen_phase_problems():
    """
    Placeholder for the signal-phase diagnosis; not implemented yet.

    :return: None
    """
    pass
def gen_cross_channelized_problems():
    """
    Placeholder for the cross-channelization diagnosis; not implemented yet.

    :return: None
    """
    pass
def query_cross_phase(nodeid, crossid):
    """
    Query the running phase data of one cross through QueryCrossRunningPhase.

    Unfinished: the first entry of the response is read but not returned yet, so
    every path visible here yields None.

    :param nodeid: node id passed to QueryCrossRunningPhase
    :param crossid: cross id, sent as a single-element list
    :return: None
    """
    cross_phases, err = QueryCrossRunningPhase(nodeid, [crossid])
    # Give up on a call error, an empty response or a non-zero response code
    if err or not cross_phases or cross_phases.code != 0:
        return None
    cross_phase_info = cross_phases.data[0]
    pass