修复特定值下计算分流转向比返回结果顺序出现异常bug

This commit is contained in:
wangxu 2026-01-19 16:52:02 +08:00
parent 35ac27fbc7
commit 09fd320ff4
1 changed files with 117 additions and 52 deletions

View File

@ -785,9 +785,9 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
split_turns_set.add(1)
car_num = road_delay_dict[roadid].delay_info.turn_ratio_1 + road_delay_dict[roadid].delay_info.turn_ratio_0 + road_delay_dict[roadid].delay_info.turn_ratio_2
in_flow_rate = int(car_num / cross_sum_car_num * 100) if cross_out_sum_car_num != 0 else 0
l_rate = int(road_delay_dict[roadid].delay_info.turn_ratio_1 / car_num * 100) if car_num != 0 else 0
s_rate = int(road_delay_dict[roadid].delay_info.turn_ratio_0 / car_num * 100) if car_num != 0 else 0
r_rate = int(road_delay_dict[roadid].delay_info.turn_ratio_2 / car_num * 100) if car_num != 0 else 0
l_rate = max(1, round(road_delay_dict[roadid].delay_info.turn_ratio_1 / car_num * 100)) if car_num != 0 else 0
s_rate = max(1, round(road_delay_dict[roadid].delay_info.turn_ratio_0 / car_num * 100)) if car_num != 0 else 0
r_rate = max(1, round(road_delay_dict[roadid].delay_info.turn_ratio_2 / car_num * 100)) if car_num != 0 else 0
if 0 not in split_turns_set:
s_rate = '-'
if 1 not in split_turns_set:
@ -818,9 +818,9 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
merge_turns_set.add(1)
out_car_num = outroad_info_dict[out_road_id].turn_info.car_num
out_flow_rate = int(out_car_num / cross_out_sum_car_num * 100) if cross_out_sum_car_num != 0 else 0
out_l_rate = int(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / out_car_num * 100) if out_car_num != 0 else 0
out_s_rate = int(outroad_info_dict[out_road_id].turn_info.turn_ratio_0 / out_car_num * 100) if out_car_num != 0 else 0
out_r_rate = int(outroad_info_dict[out_road_id].turn_info.turn_ratio_2 / out_car_num * 100) if out_car_num != 0 else 0
out_l_rate = max(1, round(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / out_car_num * 100)) if out_car_num != 0 else 0
out_s_rate = max(1, round(outroad_info_dict[out_road_id].turn_info.turn_ratio_0 / out_car_num * 100)) if out_car_num != 0 else 0
out_r_rate = max(1, round(outroad_info_dict[out_road_id].turn_info.turn_ratio_2 / out_car_num * 100)) if out_car_num != 0 else 0
if 0 not in merge_turns_set:
out_s_rate = '-'
if 1 not in merge_turns_set:
@ -2254,22 +2254,93 @@ def get_prev_cross(nodeid, area_id, roads_dir_dict):
return src_cross
def _to_int_list(nums, mask):
"""把非'-'位转成int再微调±1使总和=100只动最大位。"""
# 先全部转int
nums = [int(v) if not mask[i] else v for i, v in enumerate(nums)]
# 有效位下标
valid = [i for i, m in enumerate(mask) if not m]
if len(valid) < 2: # 单有效或全'-'无需调
return nums
s = sum(nums[i] for i in valid)
if s == 100:
return nums
# 差±1
delta = s - 100
idx_big = max(valid, key=nums.__getitem__) # 仍只动最大位
nums[idx_big] -= delta
return nums
import random
def _to_int_list(float_nums, mask):
"""
将非'-'的浮点数转为整数满足
- 总和 = 100
- 原始值 > 0 的位置结果至少为 1
- 尽量保持比例
"""
n = len(float_nums)
result = [0] * n
valid_indices = [i for i in range(n) if not mask[i]]
if len(valid_indices) == 0:
return ['-' if m else 0 for m in mask]
if len(valid_indices) == 1:
result[valid_indices[0]] = 100
for i in range(n):
if mask[i]:
result[i] = '-'
return result
# Step 1: 初始 round
rounded = []
remainders = []
total = 0
for i in valid_indices:
val = float_nums[i]
r = int(round(val))
# 保证正数至少为1
if r == 0 and val > 0:
r = 1
rounded.append(r)
remainders.append(val - int(val)) # 小数部分,用于后续调整
total += r
# 构建初始结果
for idx, i in enumerate(valid_indices):
result[i] = rounded[idx]
delta = total - 100
if delta == 0:
# 补充 '-'
for i in range(n):
if mask[i]:
result[i] = '-'
return result
# Step 2: 调整 delta
# 如果 delta > 0需要减掉 delta从“最不重要”的位置减
# 如果 delta < 0需要加上 |delta|
# 策略:按小数部分排序(或按值大小),优先调整“对比例影响最小”的项
# 创建可调整项列表:(index_in_valid, original_value, remainder)
adjust_list = []
for idx, i in enumerate(valid_indices):
val = float_nums[i]
# 只允许调整到 >=1
can_decrease = result[i] > 1
can_increase = True # 总是可以加
adjust_list.append((idx, val, remainders[idx], can_decrease))
if delta > 0:
# 需要减少 delta
# 按小数部分升序(小数越小,越该减),或按值升序
adjust_list.sort(key=lambda x: x[2]) # 按 remainder 升序
for _ in range(delta):
for item in adjust_list:
idx_in_valid = item[0]
if result[valid_indices[idx_in_valid]] > 1:
result[valid_indices[idx_in_valid]] -= 1
break
else:
# delta < 0需要增加 |delta|
# 按小数部分降序(小数越大,越该加)
adjust_list.sort(key=lambda x: x[2], reverse=True)
for _ in range(-delta):
for item in adjust_list:
idx_in_valid = item[0]
result[valid_indices[idx_in_valid]] += 1
break
# 补充 '-'
for i in range(n):
if mask[i]:
result[i] = '-'
return result
def fix_to_100(a, b, c):
@ -2277,63 +2348,57 @@ def fix_to_100(a, b, c):
mask = [v == '-' for v in nums]
valid_idx = [i for i, m in enumerate(mask) if not m]
# ----- 全 '-' -----
if not valid_idx:
return nums[:]
return ['-', '-', '-']
# ----- 仅 1 个有效 -----
if len(valid_idx) == 1:
nums[valid_idx[0]] = 100
return _to_int_list(nums, mask)
res = ['-', '-', '-']
res[valid_idx[0]] = 100
return res
# ----- 2 个有效 -----
if len(valid_idx) == 2:
i0, i1 = valid_idx
v0, v1 = nums[i0], nums[i1]
# 只有当两个值恰好是 {0, 100} 时才用特殊处理
if {v0, v1} == {0, 100}:
r1 = random.randint(1, 3)
r2 = 100 - r1
res = ['-', '-', '-']
if v0 == 100:
nums[i0] = max(r1, r2)
nums[i1] = min(r1, r2)
res[i0] = max(r1, r2)
res[i1] = min(r1, r2)
else:
nums[i1] = max(r1, r2)
nums[i0] = min(r1, r2)
return _to_int_list(nums, mask)
res[i1] = max(r1, r2)
res[i0] = min(r1, r2)
return res
else:
# 其他情况:正常归一化(包括 100+50, 0+50 等)
s = v0 + v1
if s == 0:
nums[i0] = 50
nums[i1] = 50
scaled = [50.0, 50.0]
else:
nums[i0] = v0 * 100 / s
nums[i1] = v1 * 100 / s
return _to_int_list(nums, mask)
scaled = [v0 * 100 / s, v1 * 100 / s]
float_result = ['-', '-', '-']
float_result[i0] = scaled[0]
float_result[i1] = scaled[1]
return _to_int_list(float_result, mask)
# ----- 3 个有效 -----
# 特殊处理:仅当存在一个 100 且其余两个为 0 时
if 100 in nums:
zeros = [i for i in range(3) if nums[i] == 0]
if len(zeros) == 2:
# 确认为 [100, 0, 0] 的某种排列
r1 = random.randint(1, 3)
r2 = 100 - r1
nums[zeros[0]] = r1
nums[zeros[1]] = r2
# 100 位置保持不变
return _to_int_list(nums, mask)
res = list(nums)
res[zeros[0]] = r1
res[zeros[1]] = r2
return _to_int_list(res, mask)
# 所有其他情况(包括含 0 但无 100如 [0, 98, 0])→ 正常归一化
# 正常缩放(包括 [1,99,1]
s = sum(nums)
if s == 0:
# 全为 0平均分配或按需调整
nums = [34, 33, 33]
float_result = [33.33, 33.33, 33.33]
else:
for i in range(3):
nums[i] = nums[i] * 100 / s
return _to_int_list(nums, mask)
float_result = [x * 100 / s for x in nums]
return _to_int_list(float_result, mask)
def query_cross_delay_info_controller_export_excel(road_flow_delay_infos, road_flow_turn_rate):