修复路口诊断页面部分bug

This commit is contained in:
wangxu 2025-11-10 16:47:13 +08:00
parent d9c9d0e6ad
commit a6fbacc73e
2 changed files with 154 additions and 128 deletions

View File

@ -371,39 +371,38 @@ def gen_ten_weeks_ago_data_list():
return result
def count_lsr(turn_str):
def count_lsr(turn_str: str):
"""
统计车道转向中左转(l)直行(s)右转(r)的数量
Args:
turn_str: 形如 "2|1|1|1|5" 的字符串每个数字对应 g_turn2str 中的键
Returns:
形如 "1/3/1" 的字符串顺序为左转直行右转的数量
按车道顺序统计转向组合种类及数量跳过'-'其余均保留
返回: (中文组合列表, 数量列表) 两个字符串
"""
# 初始化计数器
left_count = 0
straight_count = 0
right_count = 0
# 分割输入字符串
turn_keys = turn_str.split('|')
# 遍历每个车道
for key in turn_keys:
if key in g_turn2str:
turn_value = g_turn2str[key]
# 忽略掉头和特殊车道
if turn_value in ['t', 'bus', 'reversible', '-']:
continue
# 统计每个转向
if 'l' in turn_value:
left_count += 1
if 's' in turn_value:
straight_count += 1
if 'r' in turn_value:
right_count += 1
if not turn_str.strip():
return '-', '0'
# 返回格式化的结果
return f"{left_count}/{straight_count}/{right_count}"
# 代码 -> 中文
g_turn2cn = {
'1': '直行', '2': '左转', '3': '右转', '4': '直左', '5': '直右',
'6': '调头', '7': '直调', '8': '左调', '9': '直左',
'10': '左直调', '11': '-', '12': '直左右', '13': '右转调头',
'14': '左右调', '15': '直右调', '16': '左直右调', '17': '公交车道',
'19': '可变车道'
}
seen = {} # 顺序去重计数
for key in turn_str.split('|'):
if key == '11': # 只跳过 '-'
continue
if key in g_turn2cn:
name = g_turn2cn[key]
seen.setdefault(name, 0)
seen[name] += 1
if not seen:
return '-', '0'
name_lst = list(seen.keys())
count_lst = list(seen.values())
return '/'.join(name_lst), '/'.join(map(str, count_lst))
def time_overlap(a: str, b: List[str]) -> bool:

View File

@ -273,15 +273,15 @@ def gen_avg_cross_delay_pb(cross_delay_data_list):
def gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict):
car_num = avg_cross_delay_info.delay_info.car_num
crossid = avg_cross_delay_info.crossid
jam_index = round(avg_cross_delay_info.delay_info.jam_index, 2)
stop_times = round(avg_cross_delay_info.delay_info.stop_times, 2)
high_park_percent = str(avg_cross_delay_info.delay_info.high_park_percent) + '%'
imbalance_index = round(avg_cross_delay_info.delay_info.imbalance_index, 2)
speed = avg_cross_delay_info.delay_info.speed / 100
move_speed = avg_cross_delay_info.delay_info.move_speed / 100
park_time = avg_cross_delay_info.delay_info.park_time
delay_time = avg_cross_delay_info.delay_info.delay_time
high_stop_turn_ratio_desc, main_flow_src_desc = gen_high_stop_turn_ratio_desc(avg_cross_delay_info.inroad_delay_infos, inroad_static_info_dict, car_num)
jam_index = round(avg_cross_delay_info.delay_info.jam_index, 2) if car_num >= 10 else '-'
stop_times = round(avg_cross_delay_info.delay_info.stop_times, 2) if car_num >= 10 else '-'
high_park_percent = str(avg_cross_delay_info.delay_info.high_park_percent) + '%' if car_num >= 10 else '-'
imbalance_index = round(avg_cross_delay_info.delay_info.imbalance_index, 2) if car_num >= 10 else '-'
speed = avg_cross_delay_info.delay_info.speed / 100 if car_num >= 10 else '-'
move_speed = avg_cross_delay_info.delay_info.move_speed / 100 if car_num >= 10 else '-'
park_time = avg_cross_delay_info.delay_info.park_time if car_num >= 10 else '-'
delay_time = avg_cross_delay_info.delay_info.delay_time if car_num >= 10 else '-'
high_stop_turn_ratio_desc, main_flow_src_desc = gen_high_stop_turn_ratio_desc(avg_cross_delay_info.inroad_delay_infos, inroad_static_info_dict, car_num) if car_num >= 10 else '', ''
tide_index_list = calc_tide_index(crossid, nodeid, date_list, roads_dir_dict)
usable_tide_list = [item for item in tide_index_list if item != 0]
@ -453,10 +453,10 @@ def check_outside_left(lane_turn_info):
if func == '-':
continue
turn_type_str = g_turn2str[str(func)]
if 's' in turn_type_str: # 当前是直行车道
seen_s = True
if seen_s and ('l' in turn_type_str): # 直行车道右侧出现左转
return True
if 's' in turn_type_str: # 当前是直行车道
seen_s = True
return False
@ -474,14 +474,14 @@ def gen_road_delay_index(avg_cross_delay_info, roads_dir_dict):
service_level = calc_service_level(road_index.delay_info.delay_time)
road_flow_index[roadid] = {
'src_dir': road_dir[roadid],
'stop_times': round(road_index.delay_info.stop_times, 2),
'high_park_percent': str(road_index.delay_info.high_park_percent) + '%',
'imbalance_index': round(road_index.delay_info.imbalance_index, 2),
'park_time': road_index.delay_info.park_time,
'delay_time': road_index.delay_info.delay_time,
'speed': road_index.delay_info.speed / 100,
'move_speed': road_index.delay_info.move_speed / 100,
'service_level': service_level,
'stop_times': round(road_index.delay_info.stop_times, 2) if road_index.delay_info.car_num >= 5 else '-',
'high_park_percent': str(road_index.delay_info.high_park_percent) + '%' if road_index.delay_info.car_num >= 5 else '-',
'imbalance_index': round(road_index.delay_info.imbalance_index, 2) if road_index.delay_info.car_num >= 5 else '-',
'park_time': road_index.delay_info.park_time if road_index.delay_info.car_num >= 5 else '-',
'delay_time': road_index.delay_info.delay_time if road_index.delay_info.car_num >= 5 else '-',
'speed': road_index.delay_info.speed / 100 if road_index.delay_info.car_num >= 5 else '-',
'move_speed': road_index.delay_info.move_speed / 100 if road_index.delay_info.car_num >= 5 else '-',
'service_level': service_level if road_index.delay_info.car_num >= 5 else '-',
'flow_delays': {}
}
flow_delay_infos = road_index.flow_delay_infos
@ -490,13 +490,13 @@ def gen_road_delay_index(avg_cross_delay_info, roads_dir_dict):
continue
flow_service_level = calc_service_level(flow_delay_info.delay_info.delay_time)
road_flow_index[roadid]['flow_delays'][flow_delay_info.turn_type] = {
'stop_times': round(flow_delay_info.delay_info.stop_times, 2),
'high_park_percent': str(flow_delay_info.delay_info.high_park_percent) + '%',
'park_time': flow_delay_info.delay_info.park_time,
'delay_time': flow_delay_info.delay_info.delay_time,
'speed': flow_delay_info.delay_info.speed / 100,
'service_level': flow_service_level,
'move_speed': flow_delay_info.delay_info.move_speed / 100
'stop_times': round(flow_delay_info.delay_info.stop_times, 2) if flow_delay_info.delay_info.car_num >= 5 else '-',
'high_park_percent': str(flow_delay_info.delay_info.high_park_percent) + '%' if flow_delay_info.delay_info.car_num >= 5 else '-',
'park_time': flow_delay_info.delay_info.park_time if flow_delay_info.delay_info.car_num >= 5 else '-',
'delay_time': flow_delay_info.delay_info.delay_time if flow_delay_info.delay_info.car_num >= 5 else '-',
'speed': flow_delay_info.delay_info.speed / 100 if flow_delay_info.delay_info.car_num >= 5 else '-',
'service_level': flow_service_level if flow_delay_info.delay_info.car_num >= 5 else '-',
'move_speed': flow_delay_info.delay_info.move_speed / 100 if flow_delay_info.delay_info.car_num >= 5 else '-'
}
tmp_dict = {v['src_dir']: k for k, v in road_flow_index.items()}
sorted_tmp_dict = sort_dict_by_clockwise(tmp_dict)
@ -786,7 +786,7 @@ def parse_single_cross_delay_info(crossid, nodeid, data_list, data_type, roads_d
tide_index = calc_tide_index(crossid, nodeid, [day], roads_dir_dict, date_type='week' if data_type == 'week' else None) if data_type != 'hour' else '-'
road_data_dict = {}
cross_car_num = 0
if item_cross_delay_info:
if item_cross_delay_info and item_cross_delay_info.delay_info.car_num >= 10:
cross_car_num = item_cross_delay_info.delay_info.car_num
road_delay_infos = item_cross_delay_info.inroad_delay_infos
if data_type != 'week':
@ -819,7 +819,22 @@ def parse_single_cross_delay_info(crossid, nodeid, data_list, data_type, roads_d
}
src_dir_list = list(roads_dir_dict.keys())
for src_dir in src_dir_list:
if roads_dir_dict[src_dir]['in'] != '-' and roads_dir_dict[src_dir]['in'] in road_data_dict.keys():
src_data = ['-', '-', '-', '-', '-', '-', '-', '-', '-', '-', '-',
# 下方为上述指标变化率颜色展示flag
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
flow_data_dict = {
'0': [
'-', '-', '-', '-', '-', '-', '-', '-', '-', '-',
# 下方为上述指标变化率颜色展示flag
0, 0, 0, 0, 0, 0, 0, 0, 0
],
'1': [
'-', '-', '-', '-', '-', '-', '-', '-', '-', '-',
# 下方为上述指标变化率颜色展示flag
0, 0, 0, 0, 0, 0, 0, 0, 0
]
}
if roads_dir_dict[src_dir]['in'] != '-' and roads_dir_dict[src_dir]['in'] in road_data_dict.keys() and road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.car_num >= 5:
road_car_num = road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.car_num
src_data = [
# 停车次数
@ -853,35 +868,36 @@ def parse_single_cross_delay_info(crossid, nodeid, data_list, data_type, roads_d
turn_type = flow_delay_info.turn_type
if turn_type not in (0, 1):
continue
flow_data = [
# 停车次数
round(flow_delay_info.delay_info.stop_times, 2),
# 多次停车率
flow_delay_info.delay_info.high_park_percent,
# 停车时间
flow_delay_info.delay_info.park_time,
# 延误时间
flow_delay_info.delay_info.delay_time,
# 平均速度
flow_delay_info.delay_info.speed / 100,
# 不停车速度
flow_delay_info.delay_info.move_speed / 100,
# 相对流量
round(flow_delay_info.delay_info.car_num / max_flow_car_num * 100, 2) if max_flow_car_num > 0 else '-',
# 20251104新增流量真实数值需求
flow_delay_info.delay_info.car_num,
# 分流转向占比
round(flow_delay_info.delay_info.car_num / road_car_num * 100, 2) if road_car_num > 0 else '-',
# 服务水平
calc_service_level(flow_delay_info.delay_info.delay_time),
# 指标变化率颜色展示flag 不含服务水平
0, 0, 0, 0, 0, 0, 0, 0, 0
]
flow_data_dict[turn_type] = flow_data
data_dict[key]['roads_data'][src_dir] = {
'road': src_data,
'flow': flow_data_dict
}
if flow_delay_info.delay_info.car_num >= 5:
flow_data = [
# 停车次数
round(flow_delay_info.delay_info.stop_times, 2),
# 多次停车率
flow_delay_info.delay_info.high_park_percent,
# 停车时间
flow_delay_info.delay_info.park_time,
# 延误时间
flow_delay_info.delay_info.delay_time,
# 平均速度
flow_delay_info.delay_info.speed / 100,
# 不停车速度
flow_delay_info.delay_info.move_speed / 100,
# 相对流量
round(flow_delay_info.delay_info.car_num / max_flow_car_num * 100, 2) if max_flow_car_num > 0 else '-',
# 20251104新增流量真实数值需求
flow_delay_info.delay_info.car_num,
# 分流转向占比
round(flow_delay_info.delay_info.car_num / road_car_num * 100, 2) if road_car_num > 0 else '-',
# 服务水平
calc_service_level(flow_delay_info.delay_info.delay_time),
# 指标变化率颜色展示flag 不含服务水平
0, 0, 0, 0, 0, 0, 0, 0, 0
]
flow_data_dict[turn_type] = flow_data
data_dict[key]['roads_data'][src_dir] = {
'road': src_data,
'flow': flow_data_dict
}
return data_dict
@ -949,7 +965,7 @@ def calc_roads_data_change_rate(roads_data, prev_roads_data):
if i >= 10:
continue
if src_dir_road_data[i] == '-' or prev_src_dir_road_data[i] == '-':
src_dir_road_data[i + 11] = 0
continue
if i in (0, 1, 2, 3, 4, 7, 8, 9):
rate = (src_dir_road_data[i] - prev_src_dir_road_data[i]) / prev_src_dir_road_data[i] * 100 if prev_src_dir_road_data[i] > 0 else 0
if rate < -20:
@ -974,7 +990,7 @@ def calc_roads_data_change_rate(roads_data, prev_roads_data):
if i >= 9:
continue
if flow_data[i] == '-' or prev_flow_data[i] == '-':
flow_data[i + 9] = 0
continue
if i in (0, 1, 2, 3, 6, 7, 8):
rate = (flow_data[i] - prev_flow_data[i]) / prev_flow_data[i] * 100 if prev_flow_data[i] > 0 else 0
if rate < -20:
@ -1076,7 +1092,7 @@ def gen_high_park_problems(road_delay_infos, roads_dir_dict, cross_phase):
flow_delay_info = turn_type_flow_delay_info_dict[turn_type]
if turn_type not in (0, 1):
continue
if flow_delay_info.delay_info.high_park_percent > 15:
if flow_delay_info.delay_info.high_park_percent > 15 and flow_delay_info.delay_info.car_num >= 5:
if src_dir not in err_src_dict.keys():
err_src_dict[src_dir] = [str(turn_type) + ':' + str(flow_delay_info.delay_info.high_park_percent)]
else:
@ -1129,7 +1145,7 @@ def gen_high_stop_time_problems(avg_cross_delay_info, is_peak):
# 表示为高峰时段,否则为平峰时段
desc, max_stop_times = '高峰时段', 2
if avg_cross_delay_info:
if avg_cross_delay_info.delay_info.stop_times > max_stop_times:
if avg_cross_delay_info.delay_info.stop_times > max_stop_times and avg_cross_delay_info.delay_info.car_num >= 10:
total_num = 1
detail = [{
'child_detail': [
@ -1209,7 +1225,7 @@ def gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict, cross_phase):
max_stop_times_road = max(road_delay_infos, key=lambda x: x.delay_info.stop_times)
min_stop_times_road = min(road_delay_infos, key=lambda x: x.delay_info.stop_times)
if max_stop_times_road.delay_info.stop_times > 1 \
and max_stop_times_road.delay_info.stop_times - min_stop_times_road.delay_info.stop_times > 0.5:
and max_stop_times_road.delay_info.stop_times - min_stop_times_road.delay_info.stop_times > 0.5 and max_stop_times_road.delay_info.car_num >= 5 and min_stop_times_road.delay_info.car_num >= 5:
max_roadid = max_stop_times_road.inroadid
min_roadid = min_stop_times_road.inroadid
max_src = road_src_dict[max_roadid] if max_roadid in road_src_dict else None
@ -1218,18 +1234,20 @@ def gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict, cross_phase):
return detail, suggestion, total_num
total_num = 1
detail = [
{
'child_detail': [
{
'max_src_dir': dir_str_dict[max_src] + '进口',
'text': '的停车次数(' + str(round(max_stop_times_road.delay_info.stop_times, 2)) + ')与'
},
{
'min_src_dir': dir_str_dict[min_src] + '进口',
'text': f"""的停车次数({str(round(max_stop_times_road.delay_info.stop_times, 2))})相差过大,两者之比为{int(round(max_stop_times_road.delay_info.stop_times, 2) / round(min_stop_times_road.delay_info.stop_times, 2) * 100)}%,分配的绿灯时长不匹配"""
}
]
}
[
{
'child_detail': [
{
'max_src_dir': dir_str_dict[max_src] + '进口',
'text': '的停车次数(' + str(round(max_stop_times_road.delay_info.stop_times, 2)) + ')与'
},
{
'min_src_dir': dir_str_dict[min_src] + '进口',
'text': f"""的停车次数({str(round(min_stop_times_road.delay_info.stop_times, 2))})相差过大,两者之比为{int(round(max_stop_times_road.delay_info.stop_times, 2) / round(min_stop_times_road.delay_info.stop_times, 2) * 100)}%,分配的绿灯时长不匹配"""
}
]
}
]
]
suggestion = [
{
@ -1269,15 +1287,15 @@ def gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_
continue
flow_delay_infos = road_delay_info.flow_delay_infos
turn_type_flow_delay_info_dict = {item.turn_type: item for item in flow_delay_infos}
lane_num_info = count_lsr(inroad_static_info_dict[inroadid]['lane_turn_info']) if inroadid in inroad_static_info_dict.keys() else None
lane_num_info, count_num = count_lsr(inroad_static_info_dict[inroadid]['lane_turn_info']) if inroadid in inroad_static_info_dict.keys() else None
if 0 in turn_type_flow_delay_info_dict.keys() and 1 in turn_type_flow_delay_info_dict.keys():
s_stop_times = turn_type_flow_delay_info_dict[0].delay_info.stop_times
l_stop_times = turn_type_flow_delay_info_dict[1].delay_info.stop_times
if s_stop_times > 1 or l_stop_times > 1 and abs(l_stop_times - s_stop_times) > 0.5:
if (s_stop_times > 1 or l_stop_times > 1) and abs(l_stop_times - s_stop_times) > 0.5 and turn_type_flow_delay_info_dict[0].delay_info.car_num >= 5 and turn_type_flow_delay_info_dict[1].delay_info.car_num >= 5:
if s_stop_times > l_stop_times:
err_road_dict[road_src_dict[inroadid]] = ['直行' + ':' + str(round(s_stop_times, 2)), '左转' + ':' + str(round(l_stop_times, 2)), lane_num_info]
err_road_dict[road_src_dict[inroadid]] = ['直行' + ':' + str(round(s_stop_times, 2)), '左转' + ':' + str(round(l_stop_times, 2)), lane_num_info, count_num]
else:
err_road_dict[road_src_dict[inroadid]] = ['左转' + ':' + str(round(s_stop_times, 2)), '直行' + ':' + str(round(l_stop_times, 2)), lane_num_info]
err_road_dict[road_src_dict[inroadid]] = ['左转' + ':' + str(round(s_stop_times, 2)), '直行' + ':' + str(round(l_stop_times, 2)), lane_num_info, count_num]
if len(err_road_dict.keys()) > 0:
detail, suggestion, road_num_suggestion = [], [], []
for src_dir in err_road_dict.keys():
@ -1321,7 +1339,7 @@ def gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_
'child_detail': [
{
'src_dir': dir_str_dict[src_dir] + '进口',
'text': '左转/直行/右转现有车道数分别为' + err_road_dict[src_dir][2]
'text': err_road_dict[src_dir][2] + '现有车道数分别为' + err_road_dict[src_dir][3]
}
]
}
@ -1337,10 +1355,10 @@ def gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_
{
'text': '车道分配情况'
}
]
],
'next_line': road_num_suggestion
}
]
lane_num_suggestion.extend(road_num_suggestion)
suggestion = suggestion + lane_num_suggestion
return detail, suggestion, total_num
@ -1366,6 +1384,8 @@ def gen_cross_tide_problems(crossid, nodeid, date_list, roads_dir_dict):
pm_inroad_info_dict = {item.inroadid: item for item in pm_inroad_infos}
# 找出早高峰最高流量的进口道
for am_inroad_info in am_inroad_infos:
if am_inroad_info.delay_info.car_num < 5:
continue
am_flow = am_inroad_info.delay_info.car_num
if am_flow > max_am_flow:
max_am_flow = am_flow
@ -1378,6 +1398,8 @@ def gen_cross_tide_problems(crossid, nodeid, date_list, roads_dir_dict):
return detail, suggestions, total_num
# 找出晚高峰最高流量的进口道
for pm_inroad_info in pm_inroad_infos:
if pm_inroad_info.delay_info.car_num < 5:
continue
pm_flow = pm_inroad_info.delay_info.car_num
if pm_flow > max_pm_flow:
max_pm_flow = pm_flow
@ -1566,10 +1588,10 @@ def gen_inroad_num_problems(road_delay_infos, inroad_static_info_dict, roads_dir
road_car_num = road_delay.delay_info.car_num
turn_delay_dict = {item.turn_type: item for item in flow_delay_infos}
left_lane_num_half, straight_lane_num_half, right_lane_num_half = count_lsr_half(lane_info)
lane_num_info = count_lsr(lane_info)
lane_num_info, count_num = count_lsr(lane_info)
sum_half_num = left_lane_num_half + straight_lane_num_half + right_lane_num_half
left_lane_num_rate = round(left_lane_num_half / sum_half_num) if sum_half_num > 0 else 0
straight_lane_num_rate = round(straight_lane_num_half / sum_half_num) if sum_half_num > 0 else 0
left_lane_num_rate = int(round(left_lane_num_half / sum_half_num, 2) * 100) if sum_half_num > 0 else 0
straight_lane_num_rate = int(round(straight_lane_num_half / sum_half_num, 2) * 100) if sum_half_num > 0 else 0
for turn_type in [0, 1]:
if turn_type not in turn_delay_dict.keys():
continue
@ -1588,14 +1610,16 @@ def gen_inroad_num_problems(road_delay_infos, inroad_static_info_dict, roads_dir
'turn_type': turn_type_str,
'flow_rate': flow_rate,
'lane_num_rate': lane_num_rate,
'lane_num_info': lane_num_info
'lane_num_info': lane_num_info,
'count_num': count_num
})
else:
err_src_dirs[src_dir] = [{
'turn_type': turn_type_str,
'flow_rate': flow_rate,
'lane_num_rate': lane_num_rate,
'lane_num_info': lane_num_info
'lane_num_info': lane_num_info,
'count_num': count_num
}]
if len(err_src_dirs.keys()) > 0:
detail = []
@ -1624,19 +1648,22 @@ def gen_inroad_num_problems(road_delay_infos, inroad_static_info_dict, roads_dir
},
{
'text': '车道分配情况'
},
{
'next_line': f"{err_src_dirs[src_dir][0]['lane_num_info']}现有车道数分别为{err_src_dirs[src_dir][0]['count_num']}"
}
]
})
suggestions.append(
{
'child_detail': [
{
'src_dir': dir_str_dict[src_dir] + '进口',
'text': f"左转/直行/右转现有车道数分别为{err_src_dirs[src_dir][0]['lane_num_info']}"
}
]
}
)
# suggestions.append(
# {
# 'child_detail': [
# {
# 'src_dir': dir_str_dict[src_dir] + '进口',
# 'text': f"{err_src_dirs[src_dir][0]['lane_num_info']}现有车道数分别为{err_src_dirs[src_dir][0]['count_num']}"
# }
# ]
# }
# )
return detail, suggestions, total_num
@ -1729,14 +1756,14 @@ def get_flow_phase_detail(src_dir, turn_type_list, cross_phase):
schedule_suggestion.append(
{
'src_dir': keys[i],
'text': '绿灯时长为' + str(keys[i]) + 's'
'text': '绿灯时长为' + str(need_src_flow[keys[i]]) + 's'
}
)
else:
schedule_suggestion.append(
{
'src_dir': keys[i],
'text': '绿灯时长为' + str(keys[i]) + 's'
'text': '绿灯时长为' + str(need_src_flow[keys[i]]) + 's'
}
)
phase_suggestion.append({