优化路口运行评测页面转向比角度判定的逻辑

This commit is contained in:
xuwang 2026-06-09 16:36:01 +08:00
parent 8d2a1d1533
commit 7c19f1b5ea
1 changed files with 98 additions and 32 deletions

View File

@ -739,16 +739,110 @@ def calc_inroad_imbalance_index(inroad_delay_pb_list):
return tmp_list
def _classify_angles_to_turn_types(angles, is_merge=False):
"""
自适应阈值转向分类
从实际角度数据中提取候选阈值优先填满更多转向类型其次选择最接近默认阈值的方案
"""
if not angles:
return set()
default_straight = 30
default_uturn = 150
abs_angles = sorted(set(round(abs(a)) for a in angles))
straight_candidates = set()
uturn_candidates = set()
for aa in abs_angles:
if 20 <= aa <= 50:
straight_candidates.add(aa)
if 135 <= aa <= 165:
uturn_candidates.add(aa)
straight_candidates.add(default_straight)
uturn_candidates.add(default_uturn)
straight_candidates = sorted(straight_candidates)
uturn_candidates = sorted(uturn_candidates)
def classify(straight_thr, uturn_thr):
types = set()
for a in angles:
if abs(a) <= straight_thr:
types.add(0)
elif abs(a) >= uturn_thr:
types.add(3)
elif a > 0:
types.add(1 if is_merge else 2)
else:
types.add(2 if is_merge else 1)
return types
best_types = None
best_score = (-1, float('inf'), float('inf'))
for s_thr in straight_candidates:
for u_thr in uturn_candidates:
types = classify(s_thr, u_thr)
score = (len(types), -abs(s_thr - default_straight), -abs(u_thr - default_uturn))
if score > best_score:
best_score = score
best_types = types
return best_types
def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
road_delay_infos = avg_cross_delay_info.inroad_delay_infos
outroad_infos = avg_cross_delay_info.outroad_infos
road_delay_dict = {item.inroadid: item for item in road_delay_infos}
outroad_info_dict = {item.outroadid: item for item in outroad_infos}
cross_sum_car_num = sum([item.delay_info.car_num for item in road_delay_infos])
cross_out_sum_car_num = sum([item.turn_info.car_num for item in outroad_infos])
road_flow_turn_rate = {}
inroadid_list = [roads_dir_dict[k]['in'] for k in roads_dir_dict.keys()]
outroadid_list = [roads_dir_dict[k]['out'] for k in roads_dir_dict.keys()]
cross_sum_car_num = sum([item.delay_info.car_num for item in road_delay_infos if item.inroadid in inroadid_list])
cross_out_sum_car_num = sum([item.turn_info.car_num for item in outroad_infos if item.outroadid in outroadid_list])
# ---- 预处理:为每个进口道预计算 split_turns_set ----
split_turns_precomputed = {}
for dir_key in roads_dir_dict:
roadid = roads_dir_dict[dir_key]['in']
if 'udr_' in roadid or roadid == '-' or roadid not in road_delay_dict:
continue
inroad_info = g_roadnet.query_road(roadid)
if not inroad_info:
continue
angles = []
for outroadid in outroadid_list:
if not outroadid or outroadid == '-':
continue
outroad = g_roadnet.query_road(outroadid)
if not outroad:
continue
angle = g_roadnet.calc_road_turn_angle_without_abs_split(inroad_info, outroad)
angles.append(angle)
split_turns_precomputed[roadid] = _classify_angles_to_turn_types(angles, is_merge=False)
# ---- 预处理:为所有出口道预计算 merge_turns_set ----
merge_turns_precomputed = {}
valid_inroad_infos = []
for inroadid in inroadid_list:
if not inroadid or inroadid == '-':
continue
inroad = g_roadnet.query_road(inroadid)
if inroad:
valid_inroad_infos.append(inroad)
for out_road_id in set(outroadid_list):
if 'udr_' in out_road_id or not out_road_id or out_road_id == '-':
continue
out_road_info = g_roadnet.query_road(out_road_id)
if not out_road_info:
continue
angles = []
for inroad in valid_inroad_infos:
angle = g_roadnet.calc_road_turn_angle_without_abs_merge(out_road_info, inroad)
angles.append(angle)
merge_turns_precomputed[out_road_id] = _classify_angles_to_turn_types(angles, is_merge=True)
for dir in roads_dir_dict.keys():
roadid = roads_dir_dict[dir]['in']
if 'udr_' in roadid:
@ -759,21 +853,7 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
l_num, s_num, r_num = 0, 0, 0
out_l_num, out_s_num, out_r_num = 0, 0, 0
if roadid != '-' and roadid in road_delay_dict.keys():
inroad_info = g_roadnet.query_road(roadid)
split_turns_set = set()
for outroadid in outroadid_list:
outroad = g_roadnet.query_road(outroadid)
if not outroad:
continue
angle = g_roadnet.calc_road_turn_angle_without_abs_split(inroad_info, outroad)
if abs(angle) <= 30:
split_turns_set.add(0)
elif abs(angle) >= 150:
split_turns_set.add(3)
elif angle > 0:
split_turns_set.add(2)
else:
split_turns_set.add(1)
split_turns_set = split_turns_precomputed.get(roadid, set())
car_num = road_delay_dict[roadid].delay_info.car_num
in_flow_rate = round(car_num / cross_sum_car_num * 100, 1) if cross_sum_car_num != 0 else 0
l_rate = max(1, round((road_delay_dict[roadid].delay_info.turn_ratio_1 + road_delay_dict[roadid].delay_info.turn_ratio_3)) / car_num * 100) if car_num != 0 else 0
@ -795,21 +875,7 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
l_rate, s_rate, r_rate = rate_list[0] if rate_list[0] == '-' else rate_list[0], rate_list[1] if rate_list[1] == '-' else rate_list[1], rate_list[2] if rate_list[2] == '-' else rate_list[2]
out_road_id = roads_dir_dict[dir]['out']
if out_road_id != '-' and out_road_id in outroad_info_dict.keys():
out_road_info = g_roadnet.query_road(out_road_id)
merge_turns_set = set()
for inroadid in inroadid_list:
inroad = g_roadnet.query_road(inroadid)
if not inroad:
continue
angle = g_roadnet.calc_road_turn_angle_without_abs_merge(out_road_info, inroad)
if abs(angle) <= 30:
merge_turns_set.add(0)
elif abs(angle) >= 150:
merge_turns_set.add(3)
elif angle > 0:
merge_turns_set.add(1)
else:
merge_turns_set.add(2)
merge_turns_set = merge_turns_precomputed.get(out_road_id, set())
out_car_num = outroad_info_dict[out_road_id].turn_info.car_num
out_flow_rate = round(out_car_num / cross_out_sum_car_num * 100, 1) if cross_out_sum_car_num != 0 else 0
# out_l_rate = max(1, round(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / out_car_num * 100)) if out_car_num != 0 else 0