提交未编辑完成的问题诊断代码

This commit is contained in:
wangxu 2025-10-28 10:53:40 +08:00
parent 7a635dfdf9
commit 6b70ef10b8
4 changed files with 448 additions and 69 deletions

View File

@ -404,3 +404,25 @@ def count_lsr(turn_str):
# 返回格式化的结果 # 返回格式化的结果
return f"{left_count}/{straight_count}/{right_count}" return f"{left_count}/{straight_count}/{right_count}"
def time_overlap(a: str, b: List[str]) -> bool:
def to_minutes(t: str) -> int:
h, m = map(int, t.split(':'))
return h * 60 + m
a_start, a_end = map(to_minutes, a.split('-'))
for interval in b:
b_start, b_end = map(to_minutes, interval.split('-'))
# 检查是否有重叠
if not (a_end <= b_start or a_start >= b_end):
return True
return False
def parse_week_str(s: str):
start, end = [datetime.strptime(d, "%Y%m%d") for d in s.split('-')]
date_list = [(start + timedelta(days=i)).strftime("%Y%m%d")
for i in range((end - start).days + 1)]
return date_list

View File

@ -43,12 +43,18 @@ def query_cross_list(params):
"19:00-22:00", "19:00-22:00",
"22:00-00:00" "22:00-00:00"
] ]
peak_tp = [
"07:00-09:00",
"17:00-19:00"
]
else: else:
tp_info = tp_desc[0]['tp_desc'].split(',') tp_info = tp_desc[0]['tp_desc'].split(',')
peak_tp = tp_desc[0]['peak_tp'].split(',')
res = make_common_res(0, 'ok') res = make_common_res(0, 'ok')
res['data'] = { res['data'] = {
'cross_list': cross_list, 'cross_list': cross_list,
'tp_info': tp_info, 'tp_info': tp_info,
'peak_tp': peak_tp
} }
return json.dumps(res, ensure_ascii=False) return json.dumps(res, ensure_ascii=False)
@ -126,6 +132,8 @@ def query_cross_delay_info_controller(params):
inroad_static_info_dict = {item['roadid']: item for item in cross_inroads} inroad_static_info_dict = {item['roadid']: item for item in cross_inroads}
# 路口静态信息及台账信息 # 路口静态信息及台账信息
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid) cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
if not cross_ledger_info_dict:
return json.dumps(make_common_res(10, '查询路口信息失败'))
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict) cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info) roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
# 路口指标数据概览 # 路口指标数据概览
@ -188,13 +196,18 @@ def query_cross_problems(params):
return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据')) return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据'))
cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid) cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid)
inroad_static_info_dict = {item['roadid']: item for item in cross_inroads} inroad_static_info_dict = {item['roadid']: item for item in cross_inroads}
# 验证是否为高峰时段
is_peak = check_is_peak_tp(time_range, area_id, nodeid)
# 路口静态信息及台账信息 # 路口静态信息及台账信息
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid) cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
if not cross_ledger_info_dict:
return json.dumps(make_common_res(10, '查询路口信息失败'))
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict) cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info) roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
gen_cross_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict) cross_phase, err = QueryCrossRunningPhase(nodeid, [crossid], date_list, time_range)
# todo 与子鉴确认逻辑 if err or not cross_phase or cross_phase.code != 0:
cross_phase, err = QueryCrossRunningPhase(nodeid, [crossid]) logging.warning("路口未录入配时方案")
gen_cross_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak)
# 指标变化趋势接口 # 指标变化趋势接口
@ -242,6 +255,8 @@ def query_cross_index_trend_controller(params):
# 查询台账信息 获取路网渠化关系 # 查询台账信息 获取路网渠化关系
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid) cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
if not cross_ledger_info_dict:
return json.dumps(make_common_res(9, '查询路口信息失败'))
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict) cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info) roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

View File

@ -330,7 +330,7 @@ def query_cross_ledger_info(crossid, nodeid, area_id, userid):
cross_ledger_info = requests.get(ledger_url, headers=headers) cross_ledger_info = requests.get(ledger_url, headers=headers)
if cross_ledger_info.status_code != 200 or cross_ledger_info.json()['status'] != 0: if cross_ledger_info.status_code != 200 or cross_ledger_info.json()['status'] != 0:
logging.error(f"查询路口台账信息失败,crossid:{crossid},nodeid:{nodeid},area_id:{area_id},userid:{userid}") logging.error(f"查询路口台账信息失败,crossid:{crossid},nodeid:{nodeid},area_id:{area_id},userid:{userid}")
return None, None return None
cross_ledger_info_dict = json.loads(cross_ledger_info.text) cross_ledger_info_dict = json.loads(cross_ledger_info.text)
return cross_ledger_info_dict return cross_ledger_info_dict
@ -570,10 +570,12 @@ def calc_service_level(delay_time):
return service_level return service_level
def calc_tide_index(crossid, nodeid, date_list, roads_dir_dict): def calc_tide_index(crossid, nodeid, date_list, roads_dir_dict, date_type=None):
tide_index_list = [] tide_index_list = []
am_tp_start, am_tp_end = 't700', '900' am_tp_start, am_tp_end = 't700', '900'
pm_tp_start, pm_tp_end = 't1700', '1900' pm_tp_start, pm_tp_end = 't1700', '1900'
if date_type == 'week':
date_list = parse_week_str(date_list[0])
am_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start) am_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start)
pm_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start) pm_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start)
date_am_delay_info_dict = {item['day']: item['data'] for item in am_cross_delay_data_list} date_am_delay_info_dict = {item['day']: item['data'] for item in am_cross_delay_data_list}
@ -679,7 +681,7 @@ def parse_single_cross_delay_info(crossid, nodeid, data_list, data_type, roads_d
(x for x in pb_data_list if x is not None), (x for x in pb_data_list if x is not None),
key=lambda x: x.delay_info.car_num, key=lambda x: x.delay_info.car_num,
default=None default=None
).delay_info.car_num ).delay_info.car_num if pb_data_list else 0
for item in data_list: for item in data_list:
tp_start = item['tp_start'] tp_start = item['tp_start']
tp_end = item['tp_end'] tp_end = item['tp_end']
@ -696,7 +698,7 @@ def parse_single_cross_delay_info(crossid, nodeid, data_list, data_type, roads_d
overview_data = ['-', '-', '-', '-', '-', '-', '-', '-', '-', '-', overview_data = ['-', '-', '-', '-', '-', '-', '-', '-', '-', '-',
# 下方为上述指标变化率颜色展示flag # 下方为上述指标变化率颜色展示flag
0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
tide_index = calc_tide_index(crossid, nodeid, [day], roads_dir_dict) if data_type == 'day' or data_type == 'week' else '-' tide_index = calc_tide_index(crossid, nodeid, [day], roads_dir_dict, date_type='week' if data_type == 'week' else None) if data_type != 'hour' else '-'
road_data_dict = {} road_data_dict = {}
cross_car_num = 0 cross_car_num = 0
if item_cross_delay_info: if item_cross_delay_info:
@ -907,25 +909,25 @@ def parse_data2pb(data_list):
return res_list return res_list
def gen_cross_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict): def gen_cross_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak):
# 运行效率、均衡调控、配时方案、路口渠化 # 运行效率、均衡调控、配时方案、路口渠化
operating_efficiency_problems = gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict) operating_efficiency_problems = gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict, cross_phase, is_peak)
balanced_control_problems = gen_balanced_control_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict) balanced_control_problems = gen_balanced_control_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase)
phase_problems = gen_phase_problems() phase_problems = gen_phase_problems()
cross_channelized_problems = gen_cross_channelized_problems() cross_channelized_problems = gen_cross_channelized_problems()
problems = [operating_efficiency_problems, balanced_control_problems, phase_problems, cross_channelized_problems] problems = [operating_efficiency_problems, balanced_control_problems, phase_problems, cross_channelized_problems]
return problems return problems
def gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict): def gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict, cross_phase, is_peak):
if not avg_cross_delay_info: if not avg_cross_delay_info:
return [{ return [{
'item': '运行效率', 'item': '运行效率',
'values': [] 'values': []
}] }]
road_delay_infos = avg_cross_delay_info.inroad_delay_infos road_delay_infos = avg_cross_delay_info.inroad_delay_infos
high_park_problems, high_park_suggestions = gen_high_park_problems(road_delay_infos, roads_dir_dict) high_park_problems, high_park_suggestions = gen_high_park_problems(road_delay_infos, roads_dir_dict, cross_phase)
high_stop_times_problems, high_stop_times_suggestions = gen_high_stop_time_problems(avg_cross_delay_info, is_peak)
operating_efficiency_problems = { operating_efficiency_problems = {
'item': '运行效率', 'item': '运行效率',
'values': [ 'values': [
@ -935,14 +937,24 @@ def gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict):
'reason': '某一进口道转向的多次停车率大于15%', 'reason': '某一进口道转向的多次停车率大于15%',
'suggestions': high_park_suggestions 'suggestions': high_park_suggestions
}, },
{
'item': '停车较多',
'detail': high_stop_times_problems,
'reason': '高峰时段路口停车次数大于2次非高峰时段停车次数大于1次',
'suggestions': high_stop_times_suggestions
}
] ]
} }
pass return operating_efficiency_problems
def gen_high_park_problems(road_delay_infos, roads_dir_dict): def gen_high_park_problems(road_delay_infos, roads_dir_dict, cross_phase):
detail = [{ detail = [{
'text': '未见异常' 'child_detail': [
{
'text': '未见异常'
}
]
}] }]
suggestion = [] suggestion = []
err_src_dict = {} err_src_dict = {}
@ -966,12 +978,12 @@ def gen_high_park_problems(road_delay_infos, roads_dir_dict):
if err_src_dict: if err_src_dict:
detail = [] detail = []
for src_dir in err_src_dict.keys(): for src_dir in err_src_dict.keys():
src_detail = [] src_detail, src_suggestions, turn_type_list = [], [], []
for turn_type in err_src_dict[src_dir]: for turn_type in err_src_dict[src_dir]:
turn_type_list.append(int(turn_type.split(':')[0]))
flow_detail = { flow_detail = {
'turn_type': int_turn_type2str[int(turn_type.split(':')[0])], 'turn_type': int_turn_type2str[int(turn_type.split(':')[0])],
'text': '车辆多次停车率过大(', 'text': '车辆多次停车率过大(' + turn_type.split(':')[1] + '%'
'percent': turn_type.split(':')[1] + '%'
} }
src_detail.append(flow_detail) src_detail.append(flow_detail)
src_detail.append({ src_detail.append({
@ -979,24 +991,68 @@ def gen_high_park_problems(road_delay_infos, roads_dir_dict):
}) })
detail.append({ detail.append({
'src_dir': dir_str_dict[src_dir] + '进口', 'src_dir': dir_str_dict[src_dir] + '进口',
'src_detail': src_detail 'child_detail': src_detail
}) })
# todo 补充生成建议的逻辑 turn_type_str = ''.join([int_turn_type2str[int(item.split(':')[0])] for item in err_src_dict[src_dir]])
src_suggestions.append(
{
'text': '增加'
}
)
src_suggestions.append({
'src_dir': dir_str_dict[src_dir] + '进口' + turn_type_str,
'text': '所属相位的绿灯时间'
})
phase_suggestions = get_flow_phase_detail(src_dir, turn_type_list, cross_phase)
suggestion.append({
'child_detail': src_suggestions
})
suggestion.extend(phase_suggestions)
return detail, suggestion return detail, suggestion
def gen_high_stop_time_problems(road_delay_infos, roads_dir_dict): def gen_high_stop_time_problems(avg_cross_delay_info, is_peak):
detail = [{ detail = [{
'text': '未见异常' 'child_detail': [
{
'text': '未见异常'
}
]
}] }]
suggestion = [] suggestion = []
# todo 需与产品再次确认逻辑 desc, max_stop_times = '非高峰时段', 1
pass if is_peak:
# 表示为高峰时段,否则为平峰时段
desc, max_stop_times = '高峰时段', 2
if avg_cross_delay_info:
if avg_cross_delay_info.delay_info.stop_times > max_stop_times:
detail = [{
'child_detail': [
{
'desc': desc,
'text': '车辆多次停车时长过长(' + str(avg_cross_delay_info.delay_info.stop_times) + '),整体运行效率不高',
}
]
}]
suggestion = [
{
'child_detail': [
{
'text': '对路口',
'desc': desc
},
{
'text': '配时方案进行优化'
}
]
}
]
return detail, suggestion
def gen_balanced_control_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict): def gen_balanced_control_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase):
road_delay_infos = avg_cross_delay_info.inroad_delay_infos road_delay_infos = avg_cross_delay_info.inroad_delay_infos
cross_imbalance_detail, cross_imbalance_suggestions = gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict) cross_imbalance_detail, cross_imbalance_suggestions = gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict, cross_phase)
turn_imbalance_detail, turn_imbalance_suggestions = gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict) turn_imbalance_detail, turn_imbalance_suggestions = gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict)
# todo 需要与产品确认路口潮汐问题诊断的逻辑 # todo 需要与产品确认路口潮汐问题诊断的逻辑
balanced_control_problems = { balanced_control_problems = {
@ -1020,9 +1076,13 @@ def gen_balanced_control_problems(avg_cross_delay_info, roads_dir_dict, inroad_s
# 路口失衡问题诊断 # 路口失衡问题诊断
def gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict): def gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict, cross_phase):
detail = [{ detail = [{
'text': '未见异常' 'child_detail': [
{
'text': '未见异常'
}
]
}] }]
suggestion = [] suggestion = []
road_src_dict = {item['in']: item for item in roads_dir_dict} road_src_dict = {item['in']: item for item in roads_dir_dict}
@ -1032,47 +1092,66 @@ def gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict):
and max_stop_times_road.delay_info.stop_times - min_stop_times_road.delay_info.stop_times > 0.5: and max_stop_times_road.delay_info.stop_times - min_stop_times_road.delay_info.stop_times > 0.5:
max_roadid = max_stop_times_road.inroadid max_roadid = max_stop_times_road.inroadid
min_roadid = min_stop_times_road.inroadid min_roadid = min_stop_times_road.inroadid
max_src = road_src_dict[max_roadid] max_src = road_src_dict[max_roadid] if max_roadid in road_src_dict else None
min_src = road_src_dict[min_roadid] min_src = road_src_dict[min_roadid] if min_roadid in road_src_dict else None
if max_src is None or min_src is None:
return detail, suggestion
detail = [ detail = [
{ {
'src_dir': dir_str_dict[max_src] + '进口', 'child_detail': [
'text': '的停车次数(' + str(round(max_stop_times_road.delay_info.stop_times, 2)) + ')与' {
}, 'src_dir': dir_str_dict[max_src] + '进口',
{ 'text': '的停车次数(' + str(round(max_stop_times_road.delay_info.stop_times, 2)) + ')与'
'src_dir': dir_str_dict[min_src] + '进口', },
'text': f"""的停车次数({str(round(max_stop_times_road.delay_info.stop_times, 2))})相差过大,两者之比为{int(round(max_stop_times_road.delay_info.stop_times, 2) / round(min_stop_times_road.delay_info.stop_times, 2) * 100)}%,分配的绿灯时长不匹配""" {
'src_dir': dir_str_dict[min_src] + '进口',
'text': f"""的停车次数({str(round(max_stop_times_road.delay_info.stop_times, 2))})相差过大,两者之比为{int(round(max_stop_times_road.delay_info.stop_times, 2) / round(min_stop_times_road.delay_info.stop_times, 2) * 100)}%,分配的绿灯时长不匹配"""
}
]
} }
] ]
suggestion = [ suggestion = [
{ {
'text': '调整', 'child_detail': [
'max_src_dir': dir_str_dict[max_src] + '进口', {
}, 'text': '调整',
{ 'max_src_dir': dir_str_dict[max_src] + '进口',
'text': '', },
'min_src_dir': dir_str_dict[min_src] + '进口', {
}, 'text': '',
{ 'min_src_dir': dir_str_dict[min_src] + '进口',
'text': '的绿灯时长的分配情况' },
{
'text': '的绿灯时长的分配情况'
}
]
} }
] ]
is_tide = False
if max_src == src_reverse[min_src]: if max_src == src_reverse[min_src]:
# todo 补充对向情况下额外建议的情况 is_tide = True
pass phase_suggestions = gen_src_dir_phase_detail(max_src, min_src, cross_phase, is_tide=is_tide)
suggestion.extend(phase_suggestions)
return detail, suggestion return detail, suggestion
# 转向失衡问题诊断 # 转向失衡问题诊断
def gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict): def gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict, cross_phase):
detail = [{ detail = [{
'text': '未见异常' 'child_detail': [
{
'text': '未见异常'
}
]
}] }]
suggestion = [] suggestion = []
err_road_dict = {} err_road_dict = {}
road_src_dict = {item['in']: item for item in roads_dir_dict} road_src_dict = {item['in']: item for item in roads_dir_dict}
for road_delay_info in road_delay_infos: for road_delay_info in road_delay_infos:
inroadid = road_delay_info.inroadid inroadid = road_delay_info.inroadid
src_dir = road_src_dict[inroadid] if inroadid in road_src_dict.keys() else None
if src_dir is None:
continue
flow_delay_infos = road_delay_info.flow_delay_infos flow_delay_infos = road_delay_info.flow_delay_infos
turn_type_flow_delay_info_dict = {item.turn_type: item for item in flow_delay_infos} turn_type_flow_delay_info_dict = {item.turn_type: item for item in flow_delay_infos}
lane_num_info = count_lsr(inroad_static_info_dict[inroadid]['lane_turn_info']) if inroadid in inroad_static_info_dict.keys() else None lane_num_info = count_lsr(inroad_static_info_dict[inroadid]['lane_turn_info']) if inroadid in inroad_static_info_dict.keys() else None
@ -1109,27 +1188,40 @@ def gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_
detail.append(item_detail) detail.append(item_detail)
item_suggestion = [ item_suggestion = [
{ {
'src_dir': dir_str_dict[src_dir] + '进口', 'child_detail': [
'text': '信号灯直左分控,调整执行和左转车辆绿灯时长分配情况' {
}, 'src_dir': dir_str_dict[src_dir] + '进口',
# todo 补充配时相关建议 'text': '信号灯直左分控,调整直行和左转车辆绿灯时长分配情况'
}
]
}
] ]
suggestion.extend(item_suggestion) suggestion.extend(item_suggestion)
phase_suggestion = get_flow_phase_detail(src_dir, ['直行', '左转'], cross_phase)
suggestion.extend(phase_suggestion)
if err_road_dict[src_dir][2]: if err_road_dict[src_dir][2]:
road_num_suggestion.append( road_num_suggestion.append(
{ {
'src_dir': dir_str_dict[src_dir] + '进口', 'child_detail': [
'text': '左转/直行/右转现有车道数分别为' + err_road_dict[src_dir][2] {
'src_dir': dir_str_dict[src_dir] + '进口',
'text': '左转/直行/右转现有车道数分别为' + err_road_dict[src_dir][2]
}
]
} }
) )
lane_num_suggestion = [ lane_num_suggestion = [
{ {
'text': '调整', 'child_detail': [
'src_dir': ''.join([dir_str_dict[item] + '进口' for item in err_road_dict.keys()]) {
}, 'text': '调整',
{ 'src_dir': ''.join([dir_str_dict[item] + '进口' for item in err_road_dict.keys()])
'text': '车道分配情况' },
{
'text': '车道分配情况'
}
]
} }
] ]
lane_num_suggestion.extend(road_num_suggestion) lane_num_suggestion.extend(road_num_suggestion)
@ -1142,14 +1234,264 @@ def gen_phase_problems():
pass pass
def gen_cross_channelized_problems(): def gen_cross_channelized_problems(inroad_static_info_dict):
pass pass
def query_cross_phase(nodeid, crossid): def gen_inroad_num_problems(road_delay_infos, inroad_static_info_dict, roads_dir_dict):
cross_phases, err = QueryCrossRunningPhase(nodeid, [crossid]) detail = [{
if err or not cross_phases or cross_phases.code != 0: 'child_detail': [
return None {
cross_phase_info = cross_phases.data[0] 'text': '未见异常'
}
]
}]
suggestions, err_src_dirs = [], {}
road_src_dict = {item['in']: item for item in roads_dir_dict}
max_flow_car_num = 0
for road_delay in road_delay_infos:
inroadid = road_delay.inroadid
src_dir = road_src_dict[inroadid]
if src_dir is None:
continue
lane_info = inroad_static_info_dict[inroadid]['lane_turn_info']
flow_delay_infos = road_delay.flow_delay_infos
max_car_num = max([item.delay_info.car_num for item in flow_delay_infos if item.turn_type in [0, 1]]).delay_info.car_num
if max_car_num > max_flow_car_num:
max_flow_car_num = max_car_num
road_car_num = road_delay.delay_info.car_num
turn_delay_dict = {item.turn_type: item for item in flow_delay_infos}
left_lane_num_half, straight_lane_num_half, right_lane_num_half = count_lsr_half(lane_info)
sum_half_num = left_lane_num_half + straight_lane_num_half + right_lane_num_half
left_lane_num_rate = round(left_lane_num_half / sum_half_num) if sum_half_num > 0 else 0
straight_lane_num_rate = round(straight_lane_num_half / sum_half_num) if sum_half_num > 0 else 0
for turn_type in [0, 1]:
if turn_type not in turn_delay_dict.keys():
continue
if turn_type == 0:
turn_type_str = '直行'
lane_num_rate = straight_lane_num_rate
else:
turn_type_str = '左转'
lane_num_rate = left_lane_num_rate
flow_car_num = turn_delay_dict[turn_type].delay_info.car_num
relative_rate = flow_car_num / max_flow_car_num
flow_rate = flow_car_num / road_car_num
if relative_rate > 0.6 and lane_num_rate < 0.5:
if src_dir in err_src_dirs.keys():
err_src_dirs[src_dir].append({
'turn_type': turn_type_str,
'flow_rate': flow_rate,
'lane_num_rate': lane_num_rate
})
else:
err_src_dirs[src_dir] = [{
'turn_type': turn_type_str,
'flow_rate': flow_rate,
'lane_num_rate': lane_num_rate
}]
if len(err_src_dirs.keys()) > 0:
detail = []
for src_dir in err_src_dirs.keys():
err_road_dict = err_src_dirs[src_dir]
src_detail = [
{
'src_dir': dir_str_dict[src_dir] + '进口'
}
]
for turn_type in err_road_dict:
src_detail.append({
'child_detail': [
{
'turn_type': turn_type['turn_type'],
'text': '分流转向比过大(' + str(int(round(turn_type['flow_rate'], 2) * 100)) + '%),但分配的转向车道占比不足(' + str(int(round(turn_type['lane_num_rate'], 2) * 100)) + '%),分配车道资源不匹配'
}
]
})
detail.append(
{
'child_detail': src_detail
}
)
pass
pass pass
def check_is_peak_tp(time_range, area_id, nodeid):
tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id)
peak_tp = ['07:00-09:00', '17:00-19:00']
if tp_desc and len(tp_desc) > 0:
peak_tp = tp_desc[0]['peak_tp'].split(',')
return time_overlap(time_range, peak_tp)
def get_flow_phase_detail(src_dir, turn_type_list, cross_phase):
phase_suggestion = []
for item in cross_phase:
# 单个元素的含义为一个周计划
schedule_suggestion = []
need_src_flow = {dir_str_dict[src_dir] + int_turn_type2str[item]: 0 for item in turn_type_list}
schedule_name = item.schedule_name
tp_info = item.tps[0]
planid = tp_info.planid
plan_name = tp_info.plan_name
stage_list = tp_info.stages
for stage in stage_list:
phases_name = stage.phases_name
phases_name_list = []
if phases_name != '':
phases_name_list = phases_name.split(',')
green = stage.green
for key in need_src_flow.keys():
if key in phases_name_list:
need_src_flow[key] += green
need_src_flow = {k: v for k, v in need_src_flow.items() if v != 0}
if len(need_src_flow.keys()) > 0:
schedule_suggestion.append(
{
'phase': f"{schedule_name}{str(planid)}-{plan_name}",
'text': '现有的'
}
)
keys = list(need_src_flow.keys())
for i in range(len(keys)):
if i != len(keys) - 1:
schedule_suggestion.append(
{
'src_dir': keys[i],
'text': '绿灯时长为' + str(keys[i]) + 's'
}
)
else:
schedule_suggestion.append(
{
'src_dir': keys[i],
'text': '绿灯时长为' + str(keys[i]) + 's'
}
)
phase_suggestion.append({
'child_detail': schedule_suggestion
})
return phase_suggestion
def gen_src_dir_phase_detail(max_src_dir, min_src_dir, cross_phase, is_tide=False):
phase_suggestion = []
for item in cross_phase:
# 单个元素的含义为一个周计划
schedule_suggestion = []
schedule_name = item.schedule_name
src_green = {
dir_str_dict[max_src_dir]: 0,
dir_str_dict[min_src_dir]: 0
}
tp_info = item.tps[0]
planid = tp_info.planid
plan_name = tp_info.plan_name
stage_list = tp_info.stages
for stage in stage_list:
phases_name = stage.phases_name
src_dir_set = set()
if phases_name != '':
tmp_list = phases_name.split(',')
for phase in tmp_list:
if '行人' in phase:
continue
src_dir_set.add(phase[:-2])
if dir_str_dict[max_src_dir] in src_dir_set:
src_green[dir_str_dict[max_src_dir]] += stage.green
if dir_str_dict[min_src_dir] in src_dir_set:
src_green[dir_str_dict[min_src_dir]] += stage.green
if src_green[dir_str_dict[max_src_dir]] != 0 and src_green[dir_str_dict[min_src_dir]] != 0:
schedule_suggestion = [
{
'phase': f"{schedule_name}{str(planid)}-{plan_name}",
'text': '现有的'
},
{
'src_dir': dir_str_dict[max_src_dir] + '进口',
'text': '绿灯时长为' + str(src_green[dir_str_dict[max_src_dir]]) + 's'
},
{
'src_dir': dir_str_dict[min_src_dir] + '进口',
'text': '绿灯时长为' + str(src_green[dir_str_dict[min_src_dir]]) + 's'
}
]
if schedule_suggestion:
phase_suggestion.append({
'child_detail': schedule_suggestion
})
need2sug = False
if is_tide:
for item in cross_phase:
stages_info = item.tps[0].stages
for stage in stages_info:
phases_name = stage.phases_name
if dir_str_dict[max_src_dir] + '直行' in phases_name\
and dir_str_dict[min_src_dir] + '直行' in phases_name\
and dir_str_dict[max_src_dir] + '左转' in phases_name\
and dir_str_dict[min_src_dir] + '左转' in phases_name:
need2sug = True
if need2sug:
phase_suggestion.append({
'text': '相位放行方式可以考虑从对称放行调整为单路口放行'
})
return phase_suggestion
def count_lsr_half(turn_str):
"""
统计车道转向中左转(l)直行(s)右转(r)的数量
当一个车道有多个转向时每个转向计0.5
Args:
turn_str: 形如 "2|1|1|1|5" 的字符串每个数字对应 g_turn2str 中的键
Returns:
形如 "1.0/3.0/1.0" 的字符串顺序为左转直行右转的数量
"""
# 初始化计数器
left_count = 0.0
straight_count = 0.0
right_count = 0.0
# 分割输入字符串
turn_keys = turn_str.split('|')
# 遍历每个车道
for key in turn_keys:
if key in g_turn2str:
turn_value = g_turn2str[key]
# 忽略掉头和特殊车道
if turn_value in ['t', 'bus', 'reversible', '-']:
continue
# 计算该车道的转向数量
turn_count = 0
if 'l' in turn_value:
turn_count += 1
if 's' in turn_value:
turn_count += 1
if 'r' in turn_value:
turn_count += 1
# 如果该车道有转向,则按转向数量平分权重
if turn_count > 0:
weight = 0.5 if turn_count > 1 else 1.0
if 'l' in turn_value:
left_count += weight
if 's' in turn_value:
straight_count += weight
if 'r' in turn_value:
right_count += weight
# 返回格式化的结果,保留一位小数
return round(left_count, 1), round(straight_count, 1), round(right_count, 1)

View File

@ -91,5 +91,5 @@ class TmnetDbHelper(TableDbHelperBase):
return self.do_select(sql) return self.do_select(sql)
def query_city_tp_info(self, nodeid, area_id): def query_city_tp_info(self, nodeid, area_id):
sql = f"select tp_desc from cross_doctor_config.area_tp_config where nodeid = {nodeid} and area_id = {area_id}" sql = f"select tp_desc, peak_tp from cross_doctor_config.area_tp_config where nodeid = {nodeid} and area_id = {area_id}"
return self.do_select(sql) return self.do_select(sql)