From 7c19f1b5eafc135e9d379e84dfc68ad7ef7ba00c Mon Sep 17 00:00:00 2001 From: xuwang Date: Tue, 9 Jun 2026 16:36:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=B7=AF=E5=8F=A3=E8=BF=90?= =?UTF-8?q?=E8=A1=8C=E8=AF=84=E6=B5=8B=E9=A1=B5=E9=9D=A2=E8=BD=AC=E5=90=91?= =?UTF-8?q?=E6=AF=94=E8=A7=92=E5=BA=A6=E5=88=A4=E5=AE=9A=E7=9A=84=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/eva_common.py | 130 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 98 insertions(+), 32 deletions(-) diff --git a/app/eva_common.py b/app/eva_common.py index cb3bfe3..bf1cc73 100644 --- a/app/eva_common.py +++ b/app/eva_common.py @@ -739,16 +739,110 @@ def calc_inroad_imbalance_index(inroad_delay_pb_list): return tmp_list +def _classify_angles_to_turn_types(angles, is_merge=False): + """ + 自适应阈值转向分类。 + 从实际角度数据中提取候选阈值,优先填满更多转向类型,其次选择最接近默认阈值的方案。 + """ + if not angles: + return set() + + default_straight = 30 + default_uturn = 150 + + abs_angles = sorted(set(round(abs(a)) for a in angles)) + + straight_candidates = set() + uturn_candidates = set() + for aa in abs_angles: + if 20 <= aa <= 50: + straight_candidates.add(aa) + if 135 <= aa <= 165: + uturn_candidates.add(aa) + straight_candidates.add(default_straight) + uturn_candidates.add(default_uturn) + straight_candidates = sorted(straight_candidates) + uturn_candidates = sorted(uturn_candidates) + + def classify(straight_thr, uturn_thr): + types = set() + for a in angles: + if abs(a) <= straight_thr: + types.add(0) + elif abs(a) >= uturn_thr: + types.add(3) + elif a > 0: + types.add(1 if is_merge else 2) + else: + types.add(2 if is_merge else 1) + return types + + best_types = None + best_score = (-1, float('inf'), float('inf')) + + for s_thr in straight_candidates: + for u_thr in uturn_candidates: + types = classify(s_thr, u_thr) + score = (len(types), -abs(s_thr - default_straight), -abs(u_thr - default_uturn)) + if score > best_score: + best_score = score + best_types = types + + return best_types + + def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict): road_delay_infos = avg_cross_delay_info.inroad_delay_infos outroad_infos = avg_cross_delay_info.outroad_infos road_delay_dict = {item.inroadid: item for item in road_delay_infos} outroad_info_dict = {item.outroadid: item for item in outroad_infos} + cross_sum_car_num = sum([item.delay_info.car_num for item in road_delay_infos]) + cross_out_sum_car_num = sum([item.turn_info.car_num for item in outroad_infos]) road_flow_turn_rate = {} inroadid_list = [roads_dir_dict[k]['in'] for k in roads_dir_dict.keys()] outroadid_list = [roads_dir_dict[k]['out'] for k in roads_dir_dict.keys()] - cross_sum_car_num = sum([item.delay_info.car_num for item in road_delay_infos if item.inroadid in inroadid_list]) - cross_out_sum_car_num = sum([item.turn_info.car_num for item in outroad_infos if item.outroadid in outroadid_list]) + + # ---- 预处理:为每个进口道预计算 split_turns_set ---- + split_turns_precomputed = {} + for dir_key in roads_dir_dict: + roadid = roads_dir_dict[dir_key]['in'] + if 'udr_' in roadid or roadid == '-' or roadid not in road_delay_dict: + continue + inroad_info = g_roadnet.query_road(roadid) + if not inroad_info: + continue + angles = [] + for outroadid in outroadid_list: + if not outroadid or outroadid == '-': + continue + outroad = g_roadnet.query_road(outroadid) + if not outroad: + continue + angle = g_roadnet.calc_road_turn_angle_without_abs_split(inroad_info, outroad) + angles.append(angle) + split_turns_precomputed[roadid] = _classify_angles_to_turn_types(angles, is_merge=False) + + # ---- 预处理:为所有出口道预计算 merge_turns_set ---- + merge_turns_precomputed = {} + valid_inroad_infos = [] + for inroadid in inroadid_list: + if not inroadid or inroadid == '-': + continue + inroad = g_roadnet.query_road(inroadid) + if inroad: + valid_inroad_infos.append(inroad) + for out_road_id in set(outroadid_list): + if 'udr_' in out_road_id or not out_road_id or out_road_id == '-': + continue + out_road_info = g_roadnet.query_road(out_road_id) + if not out_road_info: + continue + angles = [] + for inroad in valid_inroad_infos: + angle = g_roadnet.calc_road_turn_angle_without_abs_merge(out_road_info, inroad) + angles.append(angle) + merge_turns_precomputed[out_road_id] = _classify_angles_to_turn_types(angles, is_merge=True) + for dir in roads_dir_dict.keys(): roadid = roads_dir_dict[dir]['in'] if 'udr_' in roadid: @@ -759,21 +853,7 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict): l_num, s_num, r_num = 0, 0, 0 out_l_num, out_s_num, out_r_num = 0, 0, 0 if roadid != '-' and roadid in road_delay_dict.keys(): - inroad_info = g_roadnet.query_road(roadid) - split_turns_set = set() - for outroadid in outroadid_list: - outroad = g_roadnet.query_road(outroadid) - if not outroad: - continue - angle = g_roadnet.calc_road_turn_angle_without_abs_split(inroad_info, outroad) - if abs(angle) <= 30: - split_turns_set.add(0) - elif abs(angle) >= 150: - split_turns_set.add(3) - elif angle > 0: - split_turns_set.add(2) - else: - split_turns_set.add(1) + split_turns_set = split_turns_precomputed.get(roadid, set()) car_num = road_delay_dict[roadid].delay_info.car_num in_flow_rate = round(car_num / cross_sum_car_num * 100, 1) if cross_sum_car_num != 0 else 0 l_rate = max(1, round((road_delay_dict[roadid].delay_info.turn_ratio_1 + road_delay_dict[roadid].delay_info.turn_ratio_3)) / car_num * 100) if car_num != 0 else 0 @@ -795,21 +875,7 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict): l_rate, s_rate, r_rate = rate_list[0] if rate_list[0] == '-' else rate_list[0], rate_list[1] if rate_list[1] == '-' else rate_list[1], rate_list[2] if rate_list[2] == '-' else rate_list[2] out_road_id = roads_dir_dict[dir]['out'] if out_road_id != '-' and out_road_id in outroad_info_dict.keys(): - out_road_info = g_roadnet.query_road(out_road_id) - merge_turns_set = set() - for inroadid in inroadid_list: - inroad = g_roadnet.query_road(inroadid) - if not inroad: - continue - angle = g_roadnet.calc_road_turn_angle_without_abs_merge(out_road_info, inroad) - if abs(angle) <= 30: - merge_turns_set.add(0) - elif abs(angle) >= 150: - merge_turns_set.add(3) - elif angle > 0: - merge_turns_set.add(1) - else: - merge_turns_set.add(2) + merge_turns_set = merge_turns_precomputed.get(out_road_id, set()) out_car_num = outroad_info_dict[out_road_id].turn_info.car_num out_flow_rate = round(out_car_num / cross_out_sum_car_num * 100, 1) if cross_out_sum_car_num != 0 else 0 # out_l_rate = max(1, round(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / out_car_num * 100)) if out_car_num != 0 else 0