修复路口诊断页面部分产品走查bug

This commit is contained in:
wangxu 2025-11-03 11:25:21 +08:00
parent dfa4dcb760
commit fcb6fd6117
3 changed files with 59 additions and 30 deletions

View File

@ -426,3 +426,17 @@ def parse_week_str(s: str):
date_list = [(start + timedelta(days=i)).strftime("%Y%m%d")
for i in range((end - start).days + 1)]
return date_list
def sort_dict_by_clockwise(input_dict: Dict[str, any]) -> dict:
"""
将字典按照顺时针方向从北开始排序键的顺序
参数:
input_dict: 键为方向字母的字典
返回:
按照顺时针顺序排列的有序字典
"""
clockwise_order = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
return {key: input_dict[key] for key in clockwise_order if key in input_dict}

View File

@ -253,9 +253,8 @@ def query_cross_index_trend_controller(params):
prev_date = (datetime.strptime(query_date, '%Y%m%d') - timedelta(days=1)).strftime('%Y%m%d')
month_ago_date = (datetime.now().date() - timedelta(days=30)).strftime('%Y%m%d')
month_date_list = generate_date_range(month_ago_date, query_date)
# ten_week_ago_date = (datetime.now().date() - timedelta(days=70)).strftime('%Y%m%d')
# week_date_list = generate_date_range(ten_week_ago_date, query_date)
now_prev_date = (datetime.now().date() - timedelta(days=1)).strftime('%Y%m%d')
month_date_list = generate_date_range(month_ago_date, now_prev_date)
ten_weeks_date_list = gen_ten_weeks_ago_data_list()
# 查询台账信息 获取路网渠化关系

View File

@ -172,7 +172,8 @@ def gen_avg_cross_delay_pb(cross_delay_data_list):
inroad_delay_info.delay_info.turn_ratio_2 = turn_ratio_2
inroad_delay_info.delay_info.turn_ratio_3 = turn_ratio_3
inroad_delay_pb_list.append(inroad_delay_info)
cross_imbalance_index = round((max_stop_times - min_stop_times) / avg_cross_delay.delay_info.stop_times, 2) if avg_cross_delay.delay_info.stop_times != 0 else 0
avg_road_stop_times = sum(item.delay_info.stop_times for item in inroad_delay_pb_list) / len(inroad_delay_pb_list)
cross_imbalance_index = round((round(max_stop_times, 2) - round(min_stop_times, 2)) / round(avg_road_stop_times, 2), 2) if avg_cross_delay.delay_info.stop_times != 0 else 0
avg_cross_delay.delay_info.imbalance_index = cross_imbalance_index
for flow_delay_info in flow_delay_list:
@ -358,7 +359,7 @@ def gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict):
src_light_dict = gen_dir_light_info_dict(light_infos)
roads = cross_ledger_info_dict['data']['roads']
dir_list = roads.keys()
length_info, lane_nums_list, road_special_info = {}, [], {}
length_info, lane_nums_dict, road_special_info = {}, {}, {}
sum_length = 0
for dir in dir_list:
dir_length = roads[dir]['topology']['entry']['road_length']
@ -377,7 +378,7 @@ def gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict):
lane_turn_info = roads[dir]['topology']['entry']['lane_turn_info']
if check_outside_left(lane_turn_info):
outside_left = 1
road_special_info[dir_str_dict[dir]] = {
road_special_info[dir] = {
'straight_left_split': straight_left_split,
'reverse_turn': reverse_turn,
'hold_left': left_turn_waiting_area,
@ -385,11 +386,14 @@ def gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict):
'reversible_lanes_info': reversible_lanes_info,
'outside_left': outside_left
}
lane_nums_list.append([dir_str_dict[dir] + '进口', roads[dir]['entry_lane_num'], dir_str_dict[src_reverse[dir]] + '出口', roads[dir]['exit_lane_num']])
lane_nums_dict[dir] = [[dir_str_dict[dir] + '进口', roads[dir]['entry_lane_num'], dir_str_dict[src_reverse[dir]] + '出口', roads[src_reverse[dir]]['exit_lane_num']] if src_reverse[dir] in roads.keys() else '-']
if dir_length != '-':
sum_length += dir_length
length_info[dir] = dir_length
avg_length = round(sum_length / len(length_info.keys()), 0) if length_info else 0
sorted_length_info = sort_dict_by_clockwise(length_info)
sorted_road_special_info = sort_dict_by_clockwise(road_special_info)
sort_lane_num_info = sort_dict_by_clockwise(lane_nums_dict)
cross_static_info = {
'crossid': crossid,
'nodeid': nodeid,
@ -398,9 +402,9 @@ def gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict):
'slc_company': slc_company_name,
'internet': internet_str,
'cross_model': cross_model,
'lane_nums_list': lane_nums_list,
'road_special_info': road_special_info,
'length_info': length_info,
'lane_nums_list': list(sort_lane_num_info.values()),
'road_special_info': sorted_road_special_info,
'length_info': sorted_length_info,
'avg_length': avg_length
}
return cross_static_info, cross_ledger_info_dict['data']
@ -439,9 +443,10 @@ def gen_dir_light_info_dict(light_infos):
def check_outside_left(lane_turn_info):
seen_s = False # 是否已出现直行车道
for func in lane_turn_info:
if 's' in func: # 当前是直行车道
turn_type_str = g_turn2str[str(func)]
if 's' in turn_type_str: # 当前是直行车道
seen_s = True
if seen_s and ('l' in func): # 直行车道右侧出现左转
if seen_s and ('l' in turn_type_str): # 直行车道右侧出现左转
return True
return False
@ -484,7 +489,11 @@ def gen_road_delay_index(avg_cross_delay_info, roads_dir_dict):
'service_level': flow_service_level,
'move_speed': flow_delay_info.delay_info.move_speed / 100
}
return road_flow_index
tmp_dict = {v['src_dir']: k for k, v in road_flow_index.items()}
sorted_tmp_dict = sort_dict_by_clockwise(tmp_dict)
sorted_key = list(sorted_tmp_dict.values())
res = {k: road_flow_index[k] for k in sorted_key}
return res
def gen_road_dir_dict(cross_ledger_info_dict):
@ -503,15 +512,15 @@ def gen_road_dir_dict(cross_ledger_info_dict):
def calc_inroad_imbalance_index(inroad_delay_pb_list):
tmp_list = copy.deepcopy(inroad_delay_pb_list)
for item in tmp_list:
avg_stop_times = item.delay_info.stop_times
max_stop_times, min_stop_times = 0, 99999
flow_delay_infos = item.flow_delay_infos
avg_stop_times = sum(flow_delay_info.delay_info.stop_times for flow_delay_info in flow_delay_infos) / len(flow_delay_infos)
for flow_delay_info in flow_delay_infos:
if flow_delay_info.delay_info.stop_times > max_stop_times:
max_stop_times = flow_delay_info.delay_info.stop_times
if flow_delay_info.delay_info.stop_times < min_stop_times:
min_stop_times = flow_delay_info.delay_info.stop_times
imbalance_index = round((max_stop_times - min_stop_times) / avg_stop_times, 2) if avg_stop_times > 0 and max_stop_times > 0 and min_stop_times != 99999 else 0
imbalance_index = round((round(max_stop_times, 2) - round(min_stop_times, 2)) / round(avg_stop_times, 2), 2) if avg_stop_times > 0 and max_stop_times > 0 and min_stop_times != 99999 else 0
item.delay_info.imbalance_index = imbalance_index
return tmp_list
@ -553,7 +562,11 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
'out_s_rate': out_s_rate,
'out_r_rate': out_r_rate
}
return road_flow_turn_rate
tmp_dict = {v['src_dir']: k for k, v in road_flow_turn_rate.items()}
sorted_tmp_dict = sort_dict_by_clockwise(tmp_dict)
sorted_key = list(sorted_tmp_dict.values())
res = {k: road_flow_turn_rate[k] for k in sorted_key}
return res
def calc_service_level(delay_time):
@ -707,9 +720,12 @@ def parse_single_cross_delay_info(crossid, nodeid, data_list, data_type, roads_d
if item_cross_delay_info:
cross_car_num = item_cross_delay_info.delay_info.car_num
road_delay_infos = item_cross_delay_info.inroad_delay_infos
max_stop_times = max(road_delay_infos, key=lambda x: x.delay_info.stop_times).delay_info.stop_times
min_stop_times = min(road_delay_infos, key=lambda x: x.delay_info.stop_times).delay_info.stop_times
cross_imbalance_index = round((max_stop_times - min_stop_times) / item_cross_delay_info.delay_info.stop_times, 2) if item_cross_delay_info.delay_info.stop_times != 0 else 0
if data_type != 'week':
cross_imbalance_index = item_cross_delay_info.delay_info.imbalance_index
else:
max_stop_times = max(road_delay_infos, key=lambda x: x.delay_info.stop_times).delay_info.stop_times
min_stop_times = min(road_delay_infos, key=lambda x: x.delay_info.stop_times).delay_info.stop_times
cross_imbalance_index = round((max_stop_times - min_stop_times) / item_cross_delay_info.delay_info.stop_times, 2) if item_cross_delay_info.delay_info.stop_times != 0 else 0
overview_data = [
# 停车次数
round(item_cross_delay_info.delay_info.stop_times, 2),
@ -829,27 +845,27 @@ def calc_overview_change_rate(overview_data, prev_overview_data):
continue
rate = (overview_data[i] - prev_overview_data[i]) / prev_overview_data[i] * 100 if prev_overview_data[i] > 0 else 0
if rate < -20:
res_data[i + 10] = 1
elif rate > 20:
res_data[i + 10] = 2
elif rate > 20:
res_data[i + 10] = 1
else:
# 指标下降代表恶化的有平均速度、不停车速度
if overview_data[i] == '-' or prev_overview_data[i] == '-':
continue
rate = (overview_data[i] - prev_overview_data[i]) / prev_overview_data[i] * 100 if prev_overview_data[i] > 0 else 0
if rate > 20:
res_data[i + 10] = 1
elif rate < -20:
res_data[i + 10] = 2
elif rate < -20:
res_data[i + 10] = 1
return res_data
def calc_roads_data_change_rate(roads_data, prev_roads_data):
res_data = copy.deepcopy(roads_data)
src_dir_list = list(roads_data.keys())
src_dir_list = list(res_data.keys())
for src_dir in src_dir_list:
src_dir_road_data = roads_data[src_dir]['road']
src_dir_road_data = res_data[src_dir]['road']
if src_dir not in prev_roads_data.keys():
continue
# 指标提升代表恶化的有停车次数、多次停车率、停车时间、延误时间、相对流量、分流转向占比
@ -862,17 +878,17 @@ def calc_roads_data_change_rate(roads_data, prev_roads_data):
if i in (0, 1, 2, 3, 4, 7, 8):
rate = (src_dir_road_data[i] - prev_src_dir_road_data[i]) / prev_src_dir_road_data[i] * 100 if prev_src_dir_road_data[i] > 0 else 0
if rate < -20:
src_dir_road_data[i + 10] = 1
elif rate > 20:
src_dir_road_data[i + 10] = 2
elif rate > 20:
src_dir_road_data[i + 10] = 1
else:
# 指标下降代表恶化的有平均速度、不停车速度
rate = (src_dir_road_data[i] - prev_src_dir_road_data[i]) / prev_src_dir_road_data[i] * 100 if prev_src_dir_road_data[i] > 0 else 0
if rate > 20:
src_dir_road_data[i + 10] = 1
elif rate < -20:
src_dir_road_data[i + 10] = 2
flow_datas = roads_data[src_dir]['flow']
elif rate < -20:
src_dir_road_data[i + 10] = 1
flow_datas = res_data[src_dir]['flow']
prev_flow_datas = prev_roads_data[src_dir]['flow']
for turn_type in flow_datas.keys():
flow_data = flow_datas[turn_type]