cross_doctor/app/eva_common.py

1759 lines
84 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 19:51
# @Description: 路口评价相关的公共函数
import copy
import json
import logging
import requests
from google.protobuf.json_format import MessageToJson
import proto.xlcomm_pb2 as pb
from app.common_worker import *
from proto.phase_grpc import QueryCrossRunningPhase, QueryCrossPhaseDiagnosis
# Opposite compass direction for every supported approach direction.
src_reverse = {'N': 'S', 'S': 'N', 'W': 'E', 'E': 'W', 'NE': 'SW', 'SW': 'NE', 'SE': 'NW', 'NW': 'SE'}
# Display label for each compass direction code.
# NOTE(review): 'E', 'S' and 'N' map to empty strings while the other
# directions carry labels -- confirm that no characters were lost.
dir_str_dict = {
    'E': '', 'S': '', 'W': '西', 'N': '', 'NE': '东北', 'SE': '东南', 'SW': '西南', 'NW': '西北'
}
# Display label for each integer turn type code.
int_turn_type2str = {
    0: '直行',
    1: '左转',
    2: '右转',
    3: '掉头'
}
# Display label for each integer network-connection code.
# NOTE(review): code 0 maps to an empty string -- confirm this is intended.
int_internet_code2str = {
    0: '',
    1: '联网',
    2: '脱机',
    3: '未接入'
}
# Display label for each cross model code (keys are strings).
# NOTE(review): every label is an empty string here -- confirm that no
# characters were lost.
int_cross_model2str = {
    '1': '',
    '2': '',
    '3': '',
    '4': ''
}
# Integer-valued delay fields that are averaged with car_num as the weight.
_WEIGHTED_INT_FIELDS = (
    'delay_time', 'queue_len', 'speed', 'park_time', 'high_park_percent',
    'truck_percent', 'park_percent', 'move_speed',
)
# Float-valued delay fields that are averaged with car_num as the weight.
_WEIGHTED_FLOAT_FIELDS = ('stop_times', 'jam_index')
# Per-turn vehicle counters that are summed (not averaged) across records.
_TURN_RATIO_FIELDS = ('turn_ratio_0', 'turn_ratio_1', 'turn_ratio_2', 'turn_ratio_3')


def _fill_weighted_delay_info(target, sources, sample_count, include_travel_time=False):
    """
    Write the car-count-weighted averages of several delay_info records into one.
    :param target: delay_info message that receives the aggregated values
    :param sources: delay_info messages to aggregate
    :param sample_count: divisor for the plain (unweighted) std_flow mean
    :param include_travel_time: also aggregate ``travel_time`` when True
    :return: None, ``target`` is updated in place
    """
    total_cars = sum(src.car_num for src in sources)
    target.car_num = total_cars
    int_fields = _WEIGHTED_INT_FIELDS
    if include_travel_time:
        int_fields = int_fields + ('travel_time',)
    for name in int_fields + _WEIGHTED_FLOAT_FIELDS:
        weighted_sum = sum(getattr(src, name) * src.car_num for src in sources)
        # Without any vehicle there is nothing to average: report 0 instead of
        # raising ZeroDivisionError.
        mean = weighted_sum / total_cars if total_cars else 0
        setattr(target, name, mean if name in _WEIGHTED_FLOAT_FIELDS else int(mean))
    std_flow_sum = sum(src.std_flow for src in sources)
    target.std_flow = int(std_flow_sum / sample_count) if sample_count else 0


def gen_avg_cross_delay_pb(cross_delay_data_list):
    """
    Aggregate several cross delay records into one averaged record.

    A single record is returned as parsed, with only its inroad imbalance
    indexes recomputed. Several records are merged at cross, inroad, flow and
    outroad level; delay metrics are weighted by vehicle count.
    :param cross_delay_data_list: rows whose ``'data'`` entry holds a
        serialized ``xl_cross_delayinfo_t`` message
    :return: aggregated ``xl_cross_delayinfo_t`` message, or None when the
        input is empty
    """
    cross_delay_pb_list = []
    for item in cross_delay_data_list:
        item_delay_pb = pb.xl_cross_delayinfo_t()
        item_delay_pb.ParseFromString(item['data'])
        cross_delay_pb_list.append(item_delay_pb)
    if not cross_delay_pb_list:
        return None
    if len(cross_delay_pb_list) == 1:
        avg_cross_delay = cross_delay_pb_list[0]
        inroad_delay_infos_with_imbalance = calc_inroad_imbalance_index(avg_cross_delay.inroad_delay_infos)
        del avg_cross_delay.inroad_delay_infos[:]
        avg_cross_delay.inroad_delay_infos.extend(inroad_delay_infos_with_imbalance)
        return avg_cross_delay

    first_pb = cross_delay_pb_list[0]
    avg_cross_delay = pb.xl_cross_delayinfo_t()
    avg_cross_delay.crossid = first_pb.crossid
    avg_cross_delay.tp.start_hm = first_pb.tp.start_hm
    avg_cross_delay.tp.end_hm = first_pb.tp.end_hm
    avg_cross_delay.tp.weekday = first_pb.tp.weekday
    avg_cross_delay.tp.type = first_pb.tp.type
    avg_cross_delay.daynum = 0

    # Cross level: records without vehicles are left out of the sums, while
    # std_flow is still divided by the total number of records.
    _fill_weighted_delay_info(
        avg_cross_delay.delay_info,
        [item.delay_info for item in cross_delay_pb_list if item.delay_info.car_num != 0],
        len(cross_delay_pb_list),
    )

    # Inroad level: group the records of every inroad, then merge each group.
    inroad_delay_info_dict = {}
    for item in cross_delay_pb_list:
        for inroad_item in item.inroad_delay_infos:
            inroad_delay_info_dict.setdefault(inroad_item.inroadid, []).append(inroad_item)

    flow_delay_list, inroad_delay_pb_list = [], []
    for inroadid, road_delay_infos in inroad_delay_info_dict.items():
        source_infos = [road.delay_info for road in road_delay_infos]
        inroad_delay_info = pb.xl_inroad_delayinfo_t()
        inroad_delay_info.inroadid = inroadid
        # ffs and capacity are copied from the first record of the group.
        inroad_delay_info.delay_info.ffs = source_infos[0].ffs
        inroad_delay_info.delay_info.capacity = source_infos[0].capacity
        _fill_weighted_delay_info(
            inroad_delay_info.delay_info, source_infos, len(source_infos), include_travel_time=True
        )
        for name in _TURN_RATIO_FIELDS:
            setattr(inroad_delay_info.delay_info, name, sum(getattr(src, name) for src in source_infos))
        # Collect the flow records; they are merged after all inroads exist.
        for road in road_delay_infos:
            flow_delay_list.extend(road.flow_delay_infos)
        inroad_delay_pb_list.append(inroad_delay_info)

    # Cross imbalance: spread of the inroad stop counts relative to the cross
    # average; 0 when there is no inroad or no stop at all.
    inroad_stop_times = [info.delay_info.stop_times for info in inroad_delay_pb_list]
    cross_stop_times = avg_cross_delay.delay_info.stop_times
    if inroad_stop_times and cross_stop_times != 0:
        cross_imbalance_index = round((max(inroad_stop_times) - min(inroad_stop_times)) / cross_stop_times, 2)
    else:
        cross_imbalance_index = 0
    avg_cross_delay.delay_info.imbalance_index = cross_imbalance_index

    # Flow level: group by (xlink_id, turn_type). Testing membership with the
    # same key that is stored keeps every record of a group, not just the last.
    flow_delay_info_dict = {}
    for flow_delay_info in flow_delay_list:
        flow_key = (flow_delay_info.xlink_id, flow_delay_info.turn_type)
        flow_delay_info_dict.setdefault(flow_key, []).append(flow_delay_info)

    inroad_pb_by_id = {info.inroadid: info for info in inroad_delay_pb_list}
    for (xlink_id, turn_type), flow_delay_infos in flow_delay_info_dict.items():
        flow_info = pb.xl_flow_delayinfo_t()
        flow_info.xlink_id = xlink_id
        flow_info.inroadid = flow_delay_infos[0].inroadid
        flow_info.turn_type = turn_type
        _fill_weighted_delay_info(
            flow_info.delay_info,
            [flow.delay_info for flow in flow_delay_infos],
            len(flow_delay_infos),
            include_travel_time=True,
        )
        owner = inroad_pb_by_id.get(flow_info.inroadid)
        if owner is not None:
            owner.flow_delay_infos.append(flow_info)

    inroad_delay_infos_with_imbalance = calc_inroad_imbalance_index(inroad_delay_pb_list)
    avg_cross_delay.inroad_delay_infos.extend(inroad_delay_infos_with_imbalance)

    # Outroad level: vehicle and per-turn counters are summed per outroad.
    outroad_delay_info_dict = {}
    for item in cross_delay_pb_list:
        for outroad_item in item.outroad_infos:
            outroad_delay_info_dict.setdefault(outroad_item.outroadid, []).append(outroad_item)
    for outroadid, outroad_items in outroad_delay_info_dict.items():
        outroad_info = pb.xl_outroad_info_t()
        outroad_info.outroadid = outroadid
        for name in ('car_num',) + _TURN_RATIO_FIELDS:
            setattr(outroad_info.turn_info, name, sum(getattr(entry.turn_info, name) for entry in outroad_items))
        avg_cross_delay.outroad_infos.append(outroad_info)
    return avg_cross_delay
# Builds the overview indicators shown on the intersection diagnosis page.
def gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict):
    """
    Assemble the overview indicators of one intersection.
    :param avg_cross_delay_info: aggregated cross delay message
    :param inroad_static_info_dict: static inroad info keyed by inroad id
    :param nodeid: node id forwarded to the tide index query
    :param date_list: dates forwarded to the tide index query
    :param roads_dir_dict: direction -> {'in', 'out'} road ids
    :return: dict of overview indicators
    """
    cross_delay = avg_cross_delay_info.delay_info
    high_stop_desc, main_flow_desc = gen_high_stop_turn_ratio_desc(
        avg_cross_delay_info.inroad_delay_infos, inroad_static_info_dict, cross_delay.car_num
    )
    # Days without a usable tide index are reported as 0 and are excluded
    # from the mean.
    nonzero_tides = [
        value
        for value in calc_tide_index(avg_cross_delay_info.crossid, nodeid, date_list, roads_dir_dict)
        if value != 0
    ]
    if len(nonzero_tides) > 0:
        mean_tide = round(sum(nonzero_tides) / len(nonzero_tides), 2)
    else:
        mean_tide = 0
    return {
        'jam_index': round(cross_delay.jam_index, 2),
        'stop_times': round(cross_delay.stop_times, 2),
        'high_park_percent': str(cross_delay.high_park_percent) + '%',
        'imbalance_index': round(cross_delay.imbalance_index, 2),
        'speed': cross_delay.speed / 100,
        'move_speed': cross_delay.move_speed / 100,
        'park_time': cross_delay.park_time,
        'delay_time': cross_delay.delay_time,
        'service_level': calc_service_level(cross_delay.delay_time),
        'high_stop_turn_ratio_desc': high_stop_desc,
        'main_flow_src_desc': main_flow_desc,
        'tide_index': mean_tide
    }
def gen_high_stop_turn_ratio_desc(inroad_delay_infos, inroad_static_info_dict, car_num):
    """
    Describe the flows with many stops and the dominant flows of a cross.
    :param inroad_delay_infos: inroad delay messages of the cross
    :param inroad_static_info_dict: static inroad info keyed by inroad id;
        each entry provides ``'src_direct'``
    :param car_num: total vehicle count of the cross
    :return: tuple ``(high_stop_desc, main_flow_desc)``; both are strings and
        are empty when nothing qualifies
    """
    src_dict, main_flow_src_list = {}, []
    for inroad_info in inroad_delay_infos:
        inroadid = inroad_info.inroadid
        if inroadid not in inroad_static_info_dict:
            continue
        inroad_car_num = inroad_info.delay_info.car_num
        src_dir = dir_str_dict[inroad_static_info_dict[inroadid]['src_direct']]
        # An inroad is a main source when it carries more than 25% of the
        # cross traffic. A cross without vehicles has no main source, and the
        # guard avoids dividing by zero.
        if car_num and inroad_car_num / car_num > 0.25:
            check_res = found_max_car_num_flow(inroad_car_num, inroad_info)
            if check_res != -1:
                main_flow_src_list.append(src_dir + int_turn_type2str[check_res])
        for item in inroad_info.flow_delay_infos:
            # Flows averaging at least one stop are listed per direction.
            if item.delay_info.stop_times >= 1:
                src_dict.setdefault(src_dir, []).append(int_turn_type2str[item.turn_type])
    src_desc_list = [src_dir + '/'.join(turn_names) for src_dir, turn_names in src_dict.items()]
    return ''.join(src_desc_list), ''.join(main_flow_src_list)
def found_max_car_num_flow(inroad_car_num, inroad_info):
    """
    Find the turn type that carries more than half of an inroad's vehicles.
    :param inroad_car_num: total vehicle count of the inroad
    :param inroad_info: inroad message whose ``delay_info`` holds
        ``turn_ratio_0`` .. ``turn_ratio_3``
    :return: index (0-3) of the first turn counter exceeding 50% of
        ``inroad_car_num``, or -1 when there is none, when ``inroad_info`` is
        missing or when the inroad has no vehicles
    """
    # Without vehicles no share can be computed (and the division would fail).
    if not inroad_info or not inroad_car_num:
        return -1
    delay_info = inroad_info.delay_info
    turn_counts = (
        delay_info.turn_ratio_0,
        delay_info.turn_ratio_1,
        delay_info.turn_ratio_2,
        delay_info.turn_ratio_3,
    )
    for turn_type, turn_count in enumerate(turn_counts):
        if turn_count / inroad_car_num > 0.5:
            return turn_type
    return -1
def query_cross_ledger_info(crossid, nodeid, area_id, userid, timeout=10):
    """
    Fetch the ledger detail of a cross from the ledger HTTP service.
    :param crossid: cross id
    :param nodeid: node id
    :param area_id: area id
    :param userid: user id
    :param timeout: seconds to wait for the service before giving up
    :return: decoded response dict, or None when the request fails, the
        response is not valid JSON, or its ``status`` is not 0
    """
    ledger_url = f"http://82.157.173.20:7070/api/common/cross_ledger_detail?crossid={crossid}&crossno=&nodeid={nodeid}&area_id={area_id}&userid={userid}"
    # ledger_url = f"http://120.53.125.169:7070/api/common/cross_ledger_detail?crossid={crossid}&crossno=&nodeid={nodeid}&area_id={area_id}&userid={userid}"
    headers = {"Content-Type": "application/json"}
    cross_ledger_info_dict = None
    try:
        # A timeout keeps an unresponsive ledger service from blocking the
        # caller forever.
        response = requests.get(ledger_url, headers=headers, timeout=timeout)
        if response.status_code == 200:
            cross_ledger_info_dict = json.loads(response.text)
    except (requests.RequestException, ValueError):
        # Connection problems and malformed JSON are reported like any other
        # failed lookup below.
        cross_ledger_info_dict = None
    if not isinstance(cross_ledger_info_dict, dict) or cross_ledger_info_dict.get('status') != 0:
        logging.error(f"查询路口台账信息失败,crossid:{crossid},nodeid:{nodeid},area_id:{area_id},userid:{userid}")
        return None
    return cross_ledger_info_dict
def gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict):
    """
    Build the static description of a cross from its ledger record.
    :param crossid: cross id
    :param nodeid: node id
    :param area_id: area id
    :param cross_ledger_info_dict: ledger response; its ``'data'`` entry must
        provide ``'ledger'``, ``'light_infos'`` and ``'roads'``
    :return: tuple ``(cross_static_info, ledger_data)`` where ``ledger_data``
        is ``cross_ledger_info_dict['data']``
    """
    # Only the signal controller company mapping is needed here.
    slc_company_dict = gen_ledger_base_info(nodeid, area_id)[2]
    ledger_data = cross_ledger_info_dict['data']
    ledger = ledger_data['ledger']
    slc_company_name = slc_company_dict.get(ledger['slc_company'], '-')
    location = ledger['location']
    # Unknown codes fall back to '-' (like the company name) instead of
    # raising KeyError.
    internet_str = int_internet_code2str.get(ledger['internet'], '-')
    cross_model = int_cross_model2str.get(ledger['primary'], '-') + '-' + int_cross_model2str.get(ledger['secondary'], '-')
    src_light_dict = gen_dir_light_info_dict(ledger_data['light_infos'])
    roads = ledger_data['roads']

    length_info, lane_nums_list, road_special_info = {}, [], {}
    sum_length = 0
    for direction, road in roads.items():
        entry = road['topology']['entry']
        canaliza = road['canaliza']
        dir_length = entry['road_length']
        light_types = src_light_dict.get(direction, ())
        # The flag is set when light type 2 coexists with light type 1 or 3.
        straight_left_split = int(2 in light_types and (1 in light_types or 3 in light_types))
        direction_label = dir_str_dict[direction]
        road_special_info[direction_label] = {
            'straight_left_split': straight_left_split,
            'reverse_turn': int(canaliza['reverse_turn'] == 1),
            'hold_left': int(canaliza['hold_left'] == 1),
            'hold_straight': int(canaliza['hold_straight'] == 1),
            'reversible_lanes_info': int(len(canaliza['reversible_lanes_info']) > 0),
            'outside_left': int(bool(check_outside_left(entry['lane_turn_info'])))
        }
        lane_nums_list.append([
            direction_label + '进口', road['entry_lane_num'],
            dir_str_dict[src_reverse[direction]] + '出口', road['exit_lane_num']
        ])
        # '-' marks an unknown length; such roads do not enter the average.
        if dir_length != '-':
            sum_length += dir_length
            length_info[direction] = dir_length
    avg_length = round(sum_length / len(length_info), 0) if length_info else 0
    cross_static_info = {
        'crossid': crossid,
        'nodeid': nodeid,
        'area_id': area_id,
        'location': location,
        'slc_company': slc_company_name,
        'internet': internet_str,
        'cross_model': cross_model,
        'lane_nums_list': lane_nums_list,
        'road_special_info': road_special_info,
        'length_info': length_info,
        'avg_length': avg_length
    }
    return cross_static_info, ledger_data
def gen_ledger_base_info(nodeid, area_id):
    """
    Load the configurable ledger parameters of a node/area, grouped by type.
    :param nodeid: node id
    :param area_id: area id
    :return: tuple ``(division_list, company_list, slc_company_dict,
        slckind_list, detector_type_dict)``
    """
    # When any of the collections below stays empty, the front end is expected
    # to show a hint that the city has not been configured yet.
    value_lists = {'division': [], 'company': [], 'slckind': []}
    code_maps = {'slc_company': {}, 'detector_type': {}}
    for row in db_tmnet.query_base_info(nodeid, area_id):
        param_type = row['param_type']
        if param_type in value_lists:
            value_lists[param_type].append(row['param_value'])
        elif param_type in code_maps:
            code_maps[param_type][row['param_code']] = row['param_value']
    return (
        value_lists['division'],
        value_lists['company'],
        code_maps['slc_company'],
        value_lists['slckind'],
        code_maps['detector_type'],
    )
def gen_dir_light_info_dict(light_infos):
    """
    Group light types by source direction.
    :param light_infos: rows providing ``'srcDir'`` and ``'light_type'``
    :return: dict mapping each source direction to its light types, in row order
    """
    lights_by_dir = {}
    for light in light_infos:
        direction, kind = light['srcDir'], light['light_type']
        lights_by_dir.setdefault(direction, []).append(kind)
    return lights_by_dir
def check_outside_left(lane_turn_info):
    """
    Tell whether a left-turn lane appears at or after the first straight lane.
    :param lane_turn_info: iterable of lane function strings, where ``'s'``
        marks straight and ``'l'`` marks left turn
    :return: True when an entry containing ``'l'`` is found once an entry
        containing ``'s'`` has been seen (the same entry counts), else False
    """
    straight_found = False
    for lane_func in lane_turn_info:
        # Remember that a straight lane has been reached.
        straight_found = straight_found or 's' in lane_func
        # A left turn from here on lies at/after the straight lane.
        if straight_found and 'l' in lane_func:
            return True
    return False
def gen_road_delay_index(avg_cross_delay_info, roads_dir_dict):
    """
    Collect the delay indicators of every inroad and of its flows.
    :param avg_cross_delay_info: aggregated cross delay message
    :param roads_dir_dict: direction -> {'in', 'out'} road ids
    :return: dict keyed by inroad id; an empty list when no delay info is given
    """
    if not avg_cross_delay_info:
        return []
    inroad_infos = avg_cross_delay_info.inroad_delay_infos
    direction_by_inroad = {road_ids['in']: direction for direction, road_ids in roads_dir_dict.items()}
    indicators = {}
    for inroad in inroad_infos:
        inroad_id = inroad.inroadid
        # Inroads that are not part of the ledger are not reported.
        if inroad_id not in direction_by_inroad:
            continue
        road_delay = inroad.delay_info
        road_level = calc_service_level(road_delay.delay_time)
        flow_delays = {}
        indicators[inroad_id] = {
            'src_dir': direction_by_inroad[inroad_id],
            'stop_times': round(road_delay.stop_times, 2),
            'high_park_percent': str(road_delay.high_park_percent) + '%',
            'imbalance_index': round(road_delay.imbalance_index, 2),
            'park_time': road_delay.park_time,
            'delay_time': road_delay.delay_time,
            'speed': road_delay.speed / 100,
            'move_speed': road_delay.move_speed / 100,
            'service_level': road_level,
            'flow_delays': flow_delays
        }
        for flow in inroad.flow_delay_infos:
            # Only turn types 0 and 1 are reported at flow level.
            if flow.turn_type in (0, 1):
                flow_delay = flow.delay_info
                flow_level = calc_service_level(flow_delay.delay_time)
                flow_delays[flow.turn_type] = {
                    'stop_times': round(flow_delay.stop_times, 2),
                    'high_park_percent': str(flow_delay.high_park_percent) + '%',
                    'park_time': flow_delay.park_time,
                    'delay_time': flow_delay.delay_time,
                    'speed': flow_delay.speed / 100,
                    'service_level': flow_level,
                    'move_speed': flow_delay.move_speed / 100
                }
    return indicators
def gen_road_dir_dict(cross_ledger_info_dict):
    """
    Map every direction of a cross to its inroad and outroad ids.
    :param cross_ledger_info_dict: ledger data whose ``'roads'`` entry maps a
        direction to a record with ``'roadid'`` and ``'out_roadid'``
    :return: dict ``{direction: {'in': roadid, 'out': out_roadid}}``; empty
        when the ledger data is missing or has no roads
    """
    if not cross_ledger_info_dict:
        return {}
    # A missing or empty 'roads' entry yields an empty result instead of
    # failing on ``None.keys()``.
    roads = cross_ledger_info_dict.get('roads') or {}
    return {
        direction: {'in': road['roadid'], 'out': road['out_roadid']}
        for direction, road in roads.items()
    }
def calc_inroad_imbalance_index(inroad_delay_pb_list):
    """
    Compute the imbalance index of every inroad from its flow stop counts.

    The index is ``(max - min) / average`` of stop_times, where max and min
    are taken over the inroad's flows and the average is the inroad's own
    stop_times. It is 0 when the inroad has no flows or no stops.
    :param inroad_delay_pb_list: inroad delay messages; left unmodified
    :return: deep copy of the input with ``delay_info.imbalance_index`` filled in
    """
    inroads = copy.deepcopy(inroad_delay_pb_list)
    for inroad in inroads:
        mean_stops = inroad.delay_info.stop_times
        flow_stops = [flow.delay_info.stop_times for flow in inroad.flow_delay_infos]
        # 0 and 99999 act as the starting bounds of the search.
        peak = max([0] + flow_stops)
        trough = min([99999] + flow_stops)
        if mean_stops > 0 and peak > 0 and trough != 99999:
            inroad.delay_info.imbalance_index = round((peak - trough) / mean_stops, 2)
        else:
            inroad.delay_info.imbalance_index = 0
    return inroads
def _ratio_or_dash(part, whole):
    """Return ``part / whole`` rounded to 2 digits, or '-' when ``whole`` is 0."""
    return round(part / whole, 2) if whole != 0 else '-'


def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
    """
    Compute the traffic share and the turn ratios of every approach.
    :param avg_cross_delay_info: aggregated cross delay message providing
        ``inroad_delay_infos`` and ``outroad_infos``
    :param roads_dir_dict: direction -> {'in', 'out'} road ids; '-' marks a
        missing road
    :return: dict keyed by inroad id; every rate is a rounded ratio, or '-'
        when it cannot be computed
    """
    road_delay_infos = avg_cross_delay_info.inroad_delay_infos
    outroad_infos = avg_cross_delay_info.outroad_infos
    road_delay_dict = {item.inroadid: item for item in road_delay_infos}
    outroad_info_dict = {item.outroadid: item for item in outroad_infos}
    cross_sum_car_num = sum(item.delay_info.car_num for item in road_delay_infos)
    cross_out_sum_car_num = sum(item.turn_info.car_num for item in outroad_infos)
    road_flow_turn_rate = {}
    for direction, road_ids in roads_dir_dict.items():
        roadid = road_ids['in']
        l_rate, s_rate, r_rate = '-', '-', '-'
        out_l_rate, out_s_rate, out_r_rate = '-', '-', '-'
        in_flow_rate, out_flow_rate = '-', '-'
        if roadid != '-' and roadid in road_delay_dict:
            in_info = road_delay_dict[roadid].delay_info
            car_num = in_info.car_num
            # The inbound share is relative to the inbound total (not to the
            # outbound total).
            in_flow_rate = _ratio_or_dash(car_num, cross_sum_car_num)
            l_rate = _ratio_or_dash(in_info.turn_ratio_1, car_num)
            s_rate = _ratio_or_dash(in_info.turn_ratio_0, car_num)
            r_rate = _ratio_or_dash(in_info.turn_ratio_2, car_num)
        out_road_id = road_ids['out']
        if out_road_id != '-' and out_road_id in outroad_info_dict:
            out_info = outroad_info_dict[out_road_id].turn_info
            out_car_num = out_info.car_num
            out_flow_rate = _ratio_or_dash(out_car_num, cross_out_sum_car_num)
            out_l_rate = _ratio_or_dash(out_info.turn_ratio_1, out_car_num)
            out_s_rate = _ratio_or_dash(out_info.turn_ratio_0, out_car_num)
            out_r_rate = _ratio_or_dash(out_info.turn_ratio_2, out_car_num)
        road_flow_turn_rate[roadid] = {
            'src_dir': direction,
            'in_flow_rate': in_flow_rate,
            'out_flow_rate': out_flow_rate,
            'l_rate': l_rate,
            's_rate': s_rate,
            'r_rate': r_rate,
            'out_l_rate': out_l_rate,
            'out_s_rate': out_s_rate,
            'out_r_rate': out_r_rate
        }
    return road_flow_turn_rate
def calc_service_level(delay_time):
    """
    Grade a delay time on the A-F service level scale.
    :param delay_time: delay time to grade
    :return: 'A' (<= 10), 'B' (<= 20), 'C' (<= 35), 'D' (<= 55), 'E' (<= 80),
        otherwise 'F'
    """
    # Upper bound (inclusive) of every grade, in ascending order.
    grade_limits = ((10, 'A'), (20, 'B'), (35, 'C'), (55, 'D'), (80, 'E'))
    for upper_bound, grade in grade_limits:
        if delay_time <= upper_bound:
            return grade
    return 'F'
def _busiest_inroad(inroad_infos):
    """Return ``(inroadid, car_num)`` of the inroad with the most vehicles, or ``('', 0)``."""
    busiest_road, busiest_flow = '', 0
    for inroad_info in inroad_infos:
        flow = inroad_info.delay_info.car_num
        if flow > busiest_flow:
            busiest_road, busiest_flow = inroad_info.inroadid, flow
    return busiest_road, busiest_flow


def _calc_day_tide_index(am_data, pm_data, opposite_road):
    """
    Compute the tide index of one day from its two peak-period records.
    :param am_data: serialized ``xl_cross_delayinfo_t`` of the morning peak
    :param pm_data: serialized ``xl_cross_delayinfo_t`` of the evening peak
    :param opposite_road: inroad id -> inroad id of the opposite approach
    :return: tide index, 0 when it cannot be computed
    """
    am_cross_delay_info = pb.xl_cross_delayinfo_t()
    pm_cross_delay_info = pb.xl_cross_delayinfo_t()
    am_cross_delay_info.ParseFromString(am_data)
    pm_cross_delay_info.ParseFromString(pm_data)
    am_inroad_infos = am_cross_delay_info.inroad_delay_infos
    pm_inroad_infos = pm_cross_delay_info.inroad_delay_infos
    am_flow_by_road = {item.inroadid: item.delay_info.car_num for item in am_inroad_infos}
    pm_flow_by_road = {item.inroadid: item.delay_info.car_num for item in pm_inroad_infos}

    # Busiest inroad of the morning peak and the flow of its opposite inroad.
    am_road, am_flow = _busiest_inroad(am_inroad_infos)
    if am_road == '' or am_road not in opposite_road:
        return 0
    am_opposite_flow = am_flow_by_road.get(opposite_road[am_road], 0)
    # Flows below 20 vehicles are too small to be meaningful.
    if min(am_opposite_flow, am_flow) < 20:
        return 0

    # Same for the evening peak.
    pm_road, pm_flow = _busiest_inroad(pm_inroad_infos)
    if pm_road == '' or pm_road not in opposite_road:
        return 0
    pm_opposite_flow = pm_flow_by_road.get(opposite_road[pm_road], 0)
    if min(pm_opposite_flow, pm_flow) < 20:
        return 0

    # Both peaks must be dominated by the same pair of opposite inroads.
    if pm_road != am_road and pm_road != opposite_road[am_road]:
        return 0

    # Directional ratios of both peaks. They are computed from the flows; the
    # divisors are at least 20 thanks to the checks above.
    fam = max(am_flow, am_opposite_flow) / min(am_flow, am_opposite_flow)
    fpm = max(pm_flow, pm_opposite_flow) / min(pm_flow, pm_opposite_flow)
    tide_index = 0
    if fam >= 1.5 and fpm >= 1.5 and pm_road == opposite_road[am_road]:
        tide_index = min(fam, fpm) / max(fam, fpm)
    elif pm_road == am_road:
        # NOTE(review): the product is kept as in the original formula --
        # confirm that this is the intended value when both peaks are
        # dominated by the same inroad.
        tide_index = fam * fpm
    return tide_index


def calc_tide_index(crossid, nodeid, date_list, roads_dir_dict, date_type=None):
    """
    Compute the tide index of a cross for every requested day.
    :param crossid: cross id
    :param nodeid: node id
    :param date_list: days to evaluate; with ``date_type == 'week'`` its first
        entry is expanded through ``parse_week_str``
    :param roads_dir_dict: direction -> {'in', 'out'} road ids
    :param date_type: 'week' to treat ``date_list[0]`` as a week string
    :return: list with one tide index per day, 0 for days without a usable value
    """
    am_tp_start, pm_tp_start = 't700', 't1700'
    if date_type == 'week':
        date_list = parse_week_str(date_list[0])
    am_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start)
    pm_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start)
    date_am_delay_info_dict = {item['day']: item['data'] for item in am_cross_delay_data_list}
    date_pm_delay_info_dict = {item['day']: item['data'] for item in pm_cross_delay_data_list}
    # Build the opposite-inroad lookup. The pair mapping is completed in both
    # directions here so that either inroad of a pair can be looked up.
    subtend_road_pair = gen_subtend_road_pair(roads_dir_dict)
    opposite_road = dict(subtend_road_pair)
    for road, subtend_road in subtend_road_pair.items():
        opposite_road.setdefault(subtend_road, road)

    tide_index_list = []
    for date in date_list:
        if date not in date_am_delay_info_dict or date not in date_pm_delay_info_dict:
            tide_index_list.append(0)
            continue
        tide_index_list.append(
            _calc_day_tide_index(date_am_delay_info_dict[date], date_pm_delay_info_dict[date], opposite_road)
        )
    return tide_index_list
def gen_subtend_road_pair(roads_dir_dict):
    """
    Pair the inroads of opposite directions.

    Every pair is recorded once: the inroad of the direction visited first is
    the key and the inroad of the opposite direction is the value. Directions
    whose inroad is '-' are not paired.
    :param roads_dir_dict: direction -> {'in', 'out'} road ids; left unmodified
    :return: dict mapping an inroad id to the opposite inroad id
    """
    pairs = {}
    paired_dirs = set()
    for direction in list(roads_dir_dict.keys()):
        already_paired = direction in paired_dirs
        if not already_paired and roads_dir_dict[direction]['in'] == '-':
            continue
        opposite = src_reverse[direction]
        # A direction is used in at most one pair.
        if already_paired or opposite in paired_dirs or opposite not in roads_dir_dict:
            continue
        if roads_dir_dict[opposite]['in'] == '-':
            continue
        pairs[roads_dir_dict[direction]['in']] = roads_dir_dict[opposite]['in']
        paired_dirs.update((direction, opposite))
    return pairs
def parse_single_cross_delay_info(crossid, nodeid, data_list, data_type, roads_dir_dict):
    """
    Convert a series of cross delay records into the nested structure used for display.

    :param crossid: cross id, forwarded to calc_tide_index
    :param nodeid: node id, forwarded to calc_tide_index
    :param data_list: list of dicts with the keys 'day', 'tp_start', 'tp_end' and 'data';
        'data' is a cross delay message, or a falsy value when the period has no data.
        Falsy list entries are skipped.
    :param data_type: 'hour' keys the result by "start-end" and skips the tide index;
        any other value keys it by 'day'; 'week' is passed to calc_tide_index as date_type
    :param roads_dir_dict: dict keyed by direction; the 'in' entry is the inbound road id or '-'
    :return: dict keyed per record with
        {'overview': [...], 'roads_data': {src_dir: {'road': [...], 'flow': {turn_type: [...]}}}}
    """
    data_dict = {}
    max_road_car_num, max_flow_car_num = 0, 0
    pb_data_list = []
    # First pass: collect the maxima used as denominators of the relative flow ratios.
    for item in data_list:
        if not item:
            continue
        pb_data_list.append(item['data'])
        road_delay_infos = item['data'].inroad_delay_infos if item['data'] and item['data'].inroad_delay_infos else []
        for road_delay_info in road_delay_infos:
            max_road_car_num = max(max_road_car_num, road_delay_info.delay_info.car_num)
            for flow_delay_info in road_delay_info.flow_delay_infos:
                max_flow_car_num = max(max_flow_car_num, flow_delay_info.delay_info.car_num)
    max_cross_car_num = max(
        (x.delay_info.car_num for x in pb_data_list if x is not None),
        default=0
    )
    # Second pass: build one entry per record.
    for item in data_list:
        if not item:
            # Same tolerance as the first pass; indexing a falsy item would fail.
            continue
        tp_start = item['tp_start']
        tp_end = item['tp_end']
        day = item['day']
        if data_type == 'hour':
            key = convert_time(int(tp_start.replace('h', ''))) + '-' + convert_time(int(tp_end.replace('h', '')))
        else:
            key = day
        item_cross_delay_info = item['data']
        # Metric order: stop times, multi-stop rate, park time, delay time, average speed,
        # non-stop speed, relative flow, cross imbalance index, tide index, service level
        overview_data = ['-', '-', '-', '-', '-', '-', '-', '-', '-', '-',
                         # colour flags of the change rate of the metrics above
                         0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        # The tide index is not computed for hour-level data.
        tide_index = calc_tide_index(crossid, nodeid, [day], roads_dir_dict, date_type='week' if data_type == 'week' else None) if data_type != 'hour' else '-'
        road_data_dict = {}
        cross_car_num = 0
        if item_cross_delay_info:
            cross_delay = item_cross_delay_info.delay_info
            cross_car_num = cross_delay.car_num
            road_delay_infos = item_cross_delay_info.inroad_delay_infos
            road_stop_times = [road.delay_info.stop_times for road in road_delay_infos]
            # max()/min() raise on an empty sequence, so a cross without inbound
            # road records gets an imbalance index of 0.
            if road_stop_times and cross_delay.stop_times != 0:
                cross_imbalance_index = round((max(road_stop_times) - min(road_stop_times)) / cross_delay.stop_times, 2)
            else:
                cross_imbalance_index = 0
            # '-' (hour data) and an empty result both display as '-'.
            tide_value = tide_index[0] if tide_index else '-'
            overview_data = [
                # stop times
                round(cross_delay.stop_times, 2),
                # multi-stop rate, park time
                cross_delay.high_park_percent, cross_delay.park_time,
                # delay time, average speed
                cross_delay.delay_time, cross_delay.speed / 100,
                # non-stop speed, relative flow share
                cross_delay.move_speed / 100, round(cross_car_num / max_cross_car_num * 100, 2) if max_cross_car_num != 0 else 0,
                # cross imbalance index, tide index, service level
                cross_imbalance_index, tide_value, calc_service_level(cross_delay.delay_time),
                # colour flags of the change rate, service level excluded
                0, 0, 0, 0, 0, 0, 0, 0, 0
            ]
            inroad_delay_infos_with_imbalance = calc_inroad_imbalance_index(road_delay_infos)
            road_data_dict = {road.inroadid: road for road in inroad_delay_infos_with_imbalance}
        data_dict[key] = {
            'overview': overview_data,
            'roads_data': {}
        }
        for src_dir in list(roads_dir_dict.keys()):
            in_road = roads_dir_dict[src_dir]['in']
            if in_road == '-' or in_road not in road_data_dict:
                continue
            road = road_data_dict[in_road]
            road_delay = road.delay_info
            road_car_num = road_delay.car_num
            src_data = [
                # stop times
                round(road_delay.stop_times, 2),
                # multi-stop rate
                road_delay.high_park_percent,
                # turn imbalance index
                round(road_delay.imbalance_index, 2),
                # park time
                road_delay.park_time,
                # delay time
                road_delay.delay_time,
                # average speed
                road_delay.speed / 100,
                # non-stop speed
                road_delay.move_speed / 100,
                # relative flow share
                round(road_car_num / max_road_car_num * 100, 2) if max_road_car_num > 0 else '-',
                # share of the inbound road in the cross flow
                round(road_car_num / cross_car_num * 100, 2) if cross_car_num > 0 else '-',
                # service level
                calc_service_level(road_delay.delay_time),
                # colour flags of the change rate, same order, service level excluded
                0, 0, 0, 0, 0, 0, 0, 0, 0
            ]
            flow_data_dict = {}
            for flow_delay_info in road.flow_delay_infos:
                turn_type = flow_delay_info.turn_type
                # Only turn types 0 and 1 are reported.
                if turn_type not in (0, 1):
                    continue
                flow_delay = flow_delay_info.delay_info
                flow_data_dict[turn_type] = [
                    # stop times
                    round(flow_delay.stop_times, 2),
                    # multi-stop rate
                    flow_delay.high_park_percent,
                    # park time
                    flow_delay.park_time,
                    # delay time
                    flow_delay.delay_time,
                    # average speed
                    flow_delay.speed / 100,
                    # non-stop speed
                    flow_delay.move_speed / 100,
                    # relative flow
                    round(flow_delay.car_num / max_flow_car_num * 100, 2) if max_flow_car_num > 0 else '-',
                    # share of the turn in the road flow
                    round(flow_delay.car_num / road_car_num * 100, 2) if road_car_num > 0 else '-',
                    # service level
                    calc_service_level(flow_delay.delay_time),
                    # colour flags of the change rate, service level excluded
                    0, 0, 0, 0, 0, 0, 0, 0
                ]
            data_dict[key]['roads_data'][src_dir] = {
                'road': src_data,
                'flow': flow_data_dict
            }
    return data_dict
def calc_single_day_delay_info_change_rate(data_dict):
    """
    Fill the change-rate flags of every entry by comparing it with the entry before it.

    :param data_dict: dict whose values have the keys 'overview' (list) and
        'roads_data' (dict per direction with road-level and flow-level lists);
        the key order defines which entry is the "previous" one
    :return: deep copy of data_dict with the flags filled in; the first entry has
        no predecessor and is returned unchanged
    """
    res_data_dict = copy.deepcopy(data_dict)
    key_list = list(res_data_dict.keys())
    # Metric part of an entry without data. Only the ten metric slots are compared:
    # the number of trailing flag slots is not the same for every producer of
    # placeholders, so comparing whole lists would never recognise them.
    empty_metrics = ['-'] * 10
    for i in range(len(key_list) - 1, 0, -1):
        current = res_data_dict[key_list[i]]
        previous = res_data_dict[key_list[i - 1]]
        if current['overview'][:10] == empty_metrics or previous['overview'][:10] == empty_metrics:
            # Nothing to compare when either side carries no data.
            continue
        current['overview'] = calc_overview_change_rate(current['overview'], previous['overview'])
        current['roads_data'] = calc_roads_data_change_rate(current['roads_data'], previous['roads_data'])
    return res_data_dict
def calc_overview_change_rate(overview_data, prev_overview_data):
    """
    Return a copy of overview_data whose change-rate flags are filled in.

    The first nine slots are metrics; the flag of metric ``i`` lives at ``i + 10``.
    Flag legend of the original author: 0 normal, 1 deterioration, 2 optimisation.

    :param overview_data: metric list of the current entry
    :param prev_overview_data: metric list of the entry it is compared with
    :return: shallow copy of overview_data with updated flags
    """
    flagged = list(overview_data)
    # Slots flagged 1 on a drop of more than 20% and 2 on a rise of more than 20%.
    # The remaining metric slots (4 and 5, the two speeds) use the mirrored mapping.
    direct_slots = {0, 1, 2, 3, 6, 7, 8}
    for idx in range(min(len(overview_data), 9)):
        cur_value, prev_value = overview_data[idx], prev_overview_data[idx]
        flag_pos = idx + 10
        if cur_value == '-' or prev_value == '-':
            # A missing value on either side resets the flag.
            flagged[flag_pos] = 0
            continue
        if prev_value > 0:
            rate = (cur_value - prev_value) / prev_value * 100
        else:
            rate = 0
        if idx not in direct_slots:
            rate = -rate
        if rate < -20:
            flagged[flag_pos] = 1
        elif rate > 20:
            flagged[flag_pos] = 2
    return flagged
def calc_roads_data_change_rate(roads_data, prev_roads_data):
    """
    Return a copy of roads_data whose road-level and flow-level change flags are filled in.

    Road lists hold nine compared metrics with the flag of metric ``i`` at ``i + 10``;
    flow lists hold eight compared metrics with the flag at ``i + 9``. Directions or
    turn types missing from prev_roads_data are left untouched.

    :param roads_data: {src_dir: {'road': [...], 'flow': {turn_type: [...]}}} of the current entry
    :param prev_roads_data: same structure for the entry it is compared with
    :return: deep copy of roads_data with updated flags; roads_data itself is not modified
    """
    res_data = copy.deepcopy(roads_data)
    # The flags are written to the copy that is returned. Writing them to the input
    # and returning the untouched copy would drop every flag.
    for src_dir, src_data in res_data.items():
        if src_dir not in prev_roads_data:
            continue
        prev_src_data = prev_roads_data[src_dir]
        # Road metrics 5 and 6 are the speeds and use the mirrored mapping.
        _apply_change_flags(src_data['road'], prev_src_data['road'], 9, 10, (5, 6))
        prev_flow_datas = prev_src_data['flow']
        for turn_type, flow_data in src_data['flow'].items():
            if turn_type not in prev_flow_datas:
                continue
            # Flow metrics 4 and 5 are the speeds.
            _apply_change_flags(flow_data, prev_flow_datas[turn_type], 8, 9, (4, 5))
    return res_data


def _apply_change_flags(current, previous, metric_count, flag_offset, mirrored_indexes):
    """Set the flag of each of the first metric_count metrics of ``current`` in place."""
    for idx in range(min(len(current), metric_count)):
        cur_value, prev_value = current[idx], previous[idx]
        flag_pos = idx + flag_offset
        if cur_value == '-' or prev_value == '-':
            # '-' marks a missing value; it must not reach the arithmetic below.
            current[flag_pos] = 0
            continue
        rate = (cur_value - prev_value) / prev_value * 100 if prev_value > 0 else 0
        if idx in mirrored_indexes:
            rate = -rate
        if rate < -20:
            current[flag_pos] = 1
        elif rate > 20:
            current[flag_pos] = 2
def parse_data2pb(data_list):
    """
    Decode the serialized delay message of every row.

    :param data_list: rows with the keys 'day', 'tp_start', 'tp_end' and 'data',
        where 'data' is a serialized pb.xl_cross_delayinfo_t
    :return: list of dicts with the same keys, 'data' replaced by the parsed message
    """
    def _decode(row):
        day, tp_start, tp_end = row['day'], row['tp_start'], row['tp_end']
        delay_message = pb.xl_cross_delayinfo_t()
        delay_message.ParseFromString(row['data'])
        return {
            'day': day,
            'tp_start': tp_start,
            'tp_end': tp_end,
            'data': delay_message
        }

    return [_decode(row) for row in data_list]
# Build the diagnosis problems of a cross
def gen_cross_problems(crossid, nodeid, area_id, time_range, start_hm, date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak, cross_ledger_info):
    """
    Run the four diagnosis categories and collect their results.

    :return: dict with the keys 'operating_efficiency_problems',
        'balanced_control_problems', 'phase_problems' and 'cross_channelized_problems'
    """
    # Categories in evaluation order: operating efficiency, balanced control,
    # signal timing plan, channelization. The phase diagnosis receives the
    # earliest and the latest date of date_list.
    return {
        'operating_efficiency_problems': gen_operating_efficiency_problems(
            avg_cross_delay_info, roads_dir_dict, cross_phase, is_peak),
        'balanced_control_problems': gen_balanced_control_problems(
            crossid, nodeid, date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase),
        'phase_problems': gen_phase_problems(
            nodeid, area_id, crossid, time_range, date_list, min(date_list), max(date_list), start_hm),
        'cross_channelized_problems': gen_cross_channelized_problems(
            avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_ledger_info)
    }
# Build the operating-efficiency diagnosis result
def gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict, cross_phase, is_peak):
    """
    Diagnose repeated queuing and excessive stops of a cross.

    :param avg_cross_delay_info: averaged cross delay message, or a falsy value
    :param roads_dir_dict: dict keyed by direction with the inbound road id under 'in'
    :param cross_phase: phase schedules, forwarded to gen_high_park_problems
    :param is_peak: whether the evaluated period is a peak period
    :return: dict with 'item', 'values' and 'total_num'. Without delay data the
        result is a one-element list holding a dict with 'item' and an empty 'values'.
    """
    category = '运行效率'
    if not avg_cross_delay_info:
        return [{
            'item': category,
            'values': []
        }]
    queue_detail, queue_suggestions, queue_total = gen_high_park_problems(
        avg_cross_delay_info.inroad_delay_infos, roads_dir_dict, cross_phase)
    stop_detail, stop_suggestions, stop_total = gen_high_stop_time_problems(avg_cross_delay_info, is_peak)
    queue_entry = {
        'item': '多次排队',
        'detail': queue_detail,
        'reason': '某一进口道转向的多次停车率大于15%',
        'suggestions': queue_suggestions
    }
    stop_entry = {
        'item': '停车较多',
        'detail': stop_detail,
        'reason': '高峰时段路口停车次数大于2次非高峰时段停车次数大于1次',
        'suggestions': stop_suggestions
    }
    return {
        'item': category,
        'values': [queue_entry, stop_entry],
        'total_num': queue_total + stop_total
    }
# Operating efficiency - repeated queuing
def gen_high_park_problems(road_delay_infos, roads_dir_dict, cross_phase):
    """
    Report approaches whose turn type 0 or 1 flow has a multi-stop rate above 15.

    :param road_delay_infos: inbound road delay records of the cross
    :param roads_dir_dict: dict keyed by direction with the inbound road id under 'in'
    :param cross_phase: phase schedules, forwarded to get_flow_phase_detail
    :return: (detail, suggestion, total_num); total_num is 1 when anything is reported, else 0
    """
    problems, advice, problem_count = [], [], 0
    roads_by_id = {road.inroadid: road for road in road_delay_infos}
    # direction -> [(turn_type, multi-stop rate as text), ...]
    flagged = {}
    for src_dir in list(roads_dir_dict.keys()):
        in_road = roads_dir_dict[src_dir]['in']
        if in_road == '-' or in_road not in roads_by_id:
            continue
        flows_by_turn = {flow.turn_type: flow for flow in roads_by_id[in_road].flow_delay_infos}
        for turn_type, flow in flows_by_turn.items():
            if turn_type in (0, 1) and flow.delay_info.high_park_percent > 15:
                flagged.setdefault(src_dir, []).append((turn_type, str(flow.delay_info.high_park_percent)))
    for src_dir, turns in flagged.items():
        problem_count = 1
        child_detail = []
        for turn_type, percent_text in turns:
            child_detail.append({
                'turn_type': int_turn_type2str[turn_type],
                'text': '车辆多次停车率过大(' + percent_text + '%'
            })
        child_detail.append({
            'text': '车辆需要多次排队才能通过'
        })
        problems.append({
            'src_dir': dir_str_dict[src_dir] + '进口',
            'child_detail': child_detail
        })
        turn_names = ''.join(int_turn_type2str[turn_type] for turn_type, _ in turns)
        approach_advice = [
            {
                'text': '增加'
            },
            {
                'src_dir': dir_str_dict[src_dir] + '进口' + turn_names,
                'text': '所属相位的绿灯时间'
            }
        ]
        # Current green times of the affected flows follow the generic advice.
        phase_advice = get_flow_phase_detail(src_dir, [turn_type for turn_type, _ in turns], cross_phase)
        advice.append({
            'child_detail': approach_advice
        })
        advice.extend(phase_advice)
    return problems, advice, problem_count
# Operating efficiency - too many stops
def gen_high_stop_time_problems(avg_cross_delay_info, is_peak):
    """
    Report the cross when its stop count exceeds the limit of the period.

    The limit is 2 for a peak period and 1 otherwise.

    :param avg_cross_delay_info: averaged cross delay message, or a falsy value
    :param is_peak: whether the evaluated period is a peak period
    :return: (detail, suggestion, total_num); all empty / 0 when nothing is reported
    """
    period_desc, stop_limit = ('高峰时段', 2) if is_peak else ('非高峰时段', 1)
    if not avg_cross_delay_info:
        return [], [], 0
    stop_times = avg_cross_delay_info.delay_info.stop_times
    if not (stop_times > stop_limit):
        return [], [], 0
    problem = {
        'desc': period_desc,
        'text': '车辆多次停车时长过长(' + str(stop_times) + '),整体运行效率不高',
    }
    advice = [
        {
            'text': '对路口',
            'desc': period_desc
        },
        {
            'text': '配时方案进行优化'
        }
    ]
    return [{'child_detail': [problem]}], [{'child_detail': advice}], 1
# Balanced control
def gen_balanced_control_problems(crossid, nodeid, date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase):
    """
    Run the three balanced-control diagnoses: cross imbalance, turn imbalance and tide.

    :param avg_cross_delay_info: averaged cross delay message; its inroad_delay_infos are analysed
    :return: dict with 'item', 'values' (one entry per diagnosis) and 'total_num'
        (sum of the three per-diagnosis counts)
    """
    inroad_infos = avg_cross_delay_info.inroad_delay_infos
    imbalance_result = gen_cross_imbalance_problems(inroad_infos, roads_dir_dict, cross_phase)
    turn_result = gen_turn_imbalance_problems(inroad_infos, roads_dir_dict, inroad_static_info_dict, cross_phase)
    tide_result = gen_cross_tide_problems(crossid, nodeid, date_list, roads_dir_dict)
    # (title, reason shown to the user, (detail, suggestions, total_num))
    sections = (
        ('路口失衡', '路口存在某个流向绿灯时长不足而另一个方向绿灯存在空放的现象', imbalance_result),
        ('转向失衡', '同一进口道直行与左转停车次数之差的绝对值大于0.5且转向停车次数的最大值大于1', turn_result),
        ('路口潮汐', '对向进口道早高峰其中一个方向的进口道与出口道流量比大于150%晚高峰另一个方向进口道与出口道流量比大于150%', tide_result),
    )
    values = []
    total_num = 0
    for title, reason, (item_detail, item_suggestions, item_total) in sections:
        values.append({
            'item': title,
            'detail': item_detail,
            'reason': reason,
            'suggestions': item_suggestions
        })
        total_num = total_num + item_total
    return {
        'item': '均衡调控',
        'values': values,
        'total_num': total_num
    }
# Balanced control - cross imbalance diagnosis
def gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict, cross_phase):
    """
    Report the cross when the stop counts of its approaches differ too much.

    A problem is raised when the largest stop count exceeds 1 and is more than 0.5
    above the smallest one, provided both roads map to a direction.

    :param road_delay_infos: inbound road delay records of the cross
    :param roads_dir_dict: dict keyed by direction with the inbound road id under 'in'
    :param cross_phase: phase schedules used for the green-time advice; may be None
    :return: (detail, suggestion, total_num); all empty / 0 when nothing is reported
    """
    detail, suggestion, total_num = [], [], 0
    road_list = list(road_delay_infos)
    if not road_list:
        # max()/min() raise on an empty sequence.
        return detail, suggestion, total_num
    road_src_dict = {v['in']: k for k, v in roads_dir_dict.items()}
    max_stop_times_road = max(road_list, key=lambda x: x.delay_info.stop_times)
    min_stop_times_road = min(road_list, key=lambda x: x.delay_info.stop_times)
    max_stop_times = max_stop_times_road.delay_info.stop_times
    min_stop_times = min_stop_times_road.delay_info.stop_times
    if not (max_stop_times > 1 and max_stop_times - min_stop_times > 0.5):
        return detail, suggestion, total_num
    max_src = road_src_dict.get(max_stop_times_road.inroadid)
    min_src = road_src_dict.get(min_stop_times_road.inroadid)
    if max_src is None or min_src is None:
        return detail, suggestion, total_num
    total_num = 1
    max_shown = round(max_stop_times, 2)
    min_shown = round(min_stop_times, 2)
    # The smaller stop count may round to 0; show '-' rather than dividing by zero.
    ratio_text = f"{int(max_shown / min_shown * 100)}%" if min_shown > 0 else '-'
    detail = [
        {
            'child_detail': [
                {
                    'src_dir': dir_str_dict[max_src] + '进口',
                    'text': '的停车次数(' + str(max_shown) + ')与'
                },
                {
                    'src_dir': dir_str_dict[min_src] + '进口',
                    # The value shown for this approach is its own (smallest) stop count.
                    'text': f"""的停车次数({str(min_shown)})相差过大,两者之比为{ratio_text},分配的绿灯时长不匹配"""
                }
            ]
        }
    ]
    suggestion = [
        {
            'child_detail': [
                {
                    'text': '调整',
                    'max_src_dir': dir_str_dict[max_src] + '进口',
                },
                {
                    'text': '',
                    'min_src_dir': dir_str_dict[min_src] + '进口',
                },
                {
                    'text': '的绿灯时长的分配情况'
                }
            ]
        }
    ]
    # Opposite approaches get the extra advice about the release mode.
    is_tide = max_src == src_reverse.get(min_src)
    phase_suggestions = gen_src_dir_phase_detail(max_src, min_src, cross_phase or [], is_tide=is_tide)
    suggestion.extend(phase_suggestions)
    return detail, suggestion, total_num
# Balanced control - turn imbalance diagnosis
def gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict, cross_phase):
    """
    Report approaches whose straight (turn type 0) and left-turn (turn type 1) stop counts differ too much.

    An approach is reported when the larger of the two stop counts exceeds 1 and
    the two differ by more than 0.5.

    :param road_delay_infos: inbound road delay records of the cross
    :param roads_dir_dict: dict keyed by direction with the inbound road id under 'in'
    :param inroad_static_info_dict: dict keyed by inbound road id; 'lane_turn_info' is passed to count_lsr
    :param cross_phase: phase schedules, forwarded to get_flow_phase_detail
    :return: (detail, suggestion, total_num); all empty / 0 when nothing is reported
    """
    detail = []
    suggestion, total_num = [], 0
    # direction -> (larger name, larger value text, smaller name, smaller value text, lane info)
    err_road_dict = {}
    road_src_dict = {v['in']: k for k, v in roads_dir_dict.items()}
    for road_delay_info in road_delay_infos:
        inroadid = road_delay_info.inroadid
        src_dir = road_src_dict.get(inroadid)
        if src_dir is None:
            continue
        turn_type_flow_delay_info_dict = {item.turn_type: item for item in road_delay_info.flow_delay_infos}
        if 0 not in turn_type_flow_delay_info_dict or 1 not in turn_type_flow_delay_info_dict:
            continue
        s_stop_times = turn_type_flow_delay_info_dict[0].delay_info.stop_times
        l_stop_times = turn_type_flow_delay_info_dict[1].delay_info.stop_times
        # Both conditions must hold. Without the parentheses `and` binds tighter than
        # `or`, and a straight stop count above 1 alone would trigger the report.
        if not ((s_stop_times > 1 or l_stop_times > 1) and abs(l_stop_times - s_stop_times) > 0.5):
            continue
        lane_num_info = count_lsr(inroad_static_info_dict[inroadid]['lane_turn_info']) if inroadid in inroad_static_info_dict else None
        s_text, l_text = str(round(s_stop_times, 2)), str(round(l_stop_times, 2))
        # The larger movement comes first; each name keeps its own value.
        if s_stop_times > l_stop_times:
            err_road_dict[src_dir] = ('直行', s_text, '左转', l_text, lane_num_info)
        else:
            err_road_dict[src_dir] = ('左转', l_text, '直行', s_text, lane_num_info)
    if not err_road_dict:
        return detail, suggestion, total_num
    total_num = 1
    road_num_suggestion = []
    for src_dir, (first_name, first_text, second_name, second_text, lane_num_info) in err_road_dict.items():
        # The smaller stop count may be 0; show '-' rather than dividing by zero.
        if float(second_text) > 0:
            ratio_text = str(int(float(first_text) / float(second_text) * 100)) + '%'
        else:
            ratio_text = '-'
        item_detail = [
            {
                'src_dir': dir_str_dict[src_dir] + '进口',
                'child_detail': [
                    {
                        'turn_type': first_name,
                    },
                    {
                        'text': '的停车次数(' + first_text + ')与'
                    },
                    {
                        'turn_type': second_name,
                    },
                    {
                        'text': '的停车次数(' + second_text + ')相差过大,两者之比为' + ratio_text + ',分配的绿灯时长不匹配'
                    }
                ]
            }
        ]
        detail.append(item_detail)
        suggestion.append({
            'child_detail': [
                {
                    'src_dir': dir_str_dict[src_dir] + '进口',
                    'text': '信号灯直左分控,调整直行和左转车辆绿灯时长分配情况'
                }
            ]
        })
        # get_flow_phase_detail looks turn types up in int_turn_type2str, whose keys
        # are the integer codes: 0 is straight, 1 is left turn.
        suggestion.extend(get_flow_phase_detail(src_dir, [0, 1], cross_phase))
        if lane_num_info:
            road_num_suggestion.append(
                {
                    'child_detail': [
                        {
                            'src_dir': dir_str_dict[src_dir] + '进口',
                            'text': '左转/直行/右转现有车道数分别为' + str(lane_num_info)
                        }
                    ]
                }
            )
    lane_num_suggestion = [
        {
            'child_detail': [
                {
                    'text': '调整',
                    'src_dir': ''.join([dir_str_dict[item] + '进口' for item in err_road_dict.keys()])
                },
                {
                    'text': '车道分配情况'
                }
            ]
        }
    ]
    lane_num_suggestion.extend(road_num_suggestion)
    suggestion = suggestion + lane_num_suggestion
    return detail, suggestion, total_num
# Balanced control - tide diagnosis
def gen_cross_tide_problems(crossid, nodeid, date_list, roads_dir_dict):
    """
    Report a tide pattern between two opposite approaches of a cross.

    Delay data of the periods starting at 't700' and 't1700' are averaged. A tide
    is reported when the busiest inbound road of the first period and the busiest
    inbound road of the second period are opposite roads, and in each period the
    busiest road carries at least 1.5 times the flow of its opposite road. Periods
    where either road of the pair has fewer than 20 vehicles are ignored.

    :param crossid: cross id used to query the delay data
    :param nodeid: node id used to query the delay data
    :param date_list: dates to query
    :param roads_dir_dict: dict keyed by direction with the inbound road id under 'in'
    :return: (detail, suggestions, total_num); all empty / 0 when nothing is reported
    """
    detail = []
    suggestions, total_num = [], 0
    am_tp_start, pm_tp_start = 't700', 't1700'
    am_avg_cross_delay_info = gen_avg_cross_delay_pb(db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start))
    pm_avg_cross_delay_info = gen_avg_cross_delay_pb(db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start))
    # gen_avg_cross_delay_pb returns None when a period has no records.
    if am_avg_cross_delay_info is None or pm_avg_cross_delay_info is None:
        return detail, suggestions, total_num
    # Opposite-road lookup that works from either member of a pair.
    subtend_road_pair = gen_subtend_road_pair(roads_dir_dict)
    opposite_road = dict(subtend_road_pair)
    for road_id, other_road_id in subtend_road_pair.items():
        opposite_road.setdefault(other_road_id, road_id)
    road_src_dict = {v['in']: k for k, v in roads_dir_dict.items()}
    am_flows = {item.inroadid: item.delay_info.car_num for item in am_avg_cross_delay_info.inroad_delay_infos}
    pm_flows = {item.inroadid: item.delay_info.car_num for item in pm_avg_cross_delay_info.inroad_delay_infos}
    if not am_flows or not pm_flows:
        return detail, suggestions, total_num
    # Busiest inbound road of the first period and the flow of its opposite road.
    max_am_flow_road = max(am_flows, key=am_flows.get)
    max_am_flow = am_flows[max_am_flow_road]
    subtend_road_am_flow = am_flows.get(opposite_road.get(max_am_flow_road), 0)
    # Flows below 20 vehicles are not meaningful.
    if min(subtend_road_am_flow, max_am_flow) < 20:
        return detail, suggestions, total_num
    # Busiest inbound road of the second period.
    max_pm_flow_road = max(pm_flows, key=pm_flows.get)
    max_pm_flow = pm_flows[max_pm_flow_road]
    if max_pm_flow_road not in opposite_road:
        return detail, suggestions, total_num
    subtend_road_pm_flow = pm_flows.get(opposite_road[max_pm_flow_road], 0)
    if min(subtend_road_pm_flow, max_pm_flow) < 20:
        return detail, suggestions, total_num
    # Flow ratio between the busiest road and its opposite road; both flows are
    # at least 20 here, so the divisions are safe.
    Fam = max(max_am_flow, subtend_road_am_flow) / min(max_am_flow, subtend_road_am_flow)
    Fpm = max(max_pm_flow, subtend_road_pm_flow) / min(max_pm_flow, subtend_road_pm_flow)
    if Fam >= 1.5 and Fpm >= 1.5 and max_pm_flow_road == opposite_road.get(max_am_flow_road):
        total_num = 1
        src_dir = road_src_dict[max_am_flow_road]
        full_src_dir = dir_str_dict[src_dir] + dir_str_dict[src_reverse[src_dir]] + '方向'
        am_src_dir = dir_str_dict[src_dir] + '' + dir_str_dict[src_reverse[src_dir]]
        pm_src_dir = dir_str_dict[src_reverse[src_dir]] + '' + dir_str_dict[src_dir]
        detail = [{
            'src_dir': full_src_dir,
            'child_detail': [
                {
                    'text': '早高峰'
                },
                {
                    'src_dir': am_src_dir,
                    'text': '进口道与出口道流量比为' + str(int(Fam * 100)) + '%,晚高峰'
                },
                {
                    'src_dir': pm_src_dir,
                    'text': '进口道与出口道流量比为' + str(int(Fpm * 100)) + '%,存在潮汐现象'
                }
            ]
        }]
        suggestions = [
            {
                'child_detail': [
                    {
                        'text': '根据主车流方向调整绿灯时长分配'
                    }
                ]
            },
            {
                'child_detail': [
                    {
                        'src_dir': full_src_dir,
                        'text': '机动车车道数双向均为3车道及以上可以考虑设置潮汐车道'
                    }
                ]
            }
        ]
    return detail, suggestions, total_num
# Signal timing plan
def gen_phase_problems(nodeid, area_id, crossid, time_range, date_list, min_date, max_date, start_hm):
    """
    Collect the timing-plan diagnosis of a cross and append the abnormal-plan check.

    :param nodeid: node id, passed to QueryCrossPhaseDiagnosis as int
    :param area_id: area id, passed to QueryCrossPhaseDiagnosis as int
    :param crossid: cross id
    :param time_range: evaluated time range, passed to QueryCrossPhaseDiagnosis
    :param date_list: evaluated dates, passed on as strings
    :param min_date: earliest date, forwarded to gen_err_phase_problems
    :param max_date: latest date, forwarded to gen_err_phase_problems
    :param start_hm: start time, forwarded to gen_err_phase_problems
    :return: dict with 'item', 'values' and 'total_num'
    """
    phase_problems = {
        'item': '配时方案',
        'values': [],
        'total_num': 0
    }
    phase_problems_detail, err = QueryCrossPhaseDiagnosis(int(nodeid), crossid, [str(item) for item in date_list], time_range, int(area_id))
    # Keep the skeleton above unless the query returned an actual result: an empty
    # or missing result has no 'values' list to append to.
    if not err and phase_problems_detail:
        phase_problems = phase_problems_detail
    phase_err_detail, phase_err_suggestions, phase_err_total_num = gen_err_phase_problems(start_hm, max_date, crossid, min_date)
    if len(phase_err_suggestions) > 0:
        phase_problems['values'].append({
            'item': '路口方案异常',
            'detail': phase_err_detail,
            'reason': '路口停车次数徒增或大数据计算方案与录入方案不一致',
            'suggestions': phase_err_suggestions
        })
    total = phase_problems_detail['total'] if phase_problems_detail else 0
    phase_problems['total_num'] = total + phase_err_total_num
    return phase_problems
def gen_err_phase_problems(start_hm, max_date, crossid, min_date):
    """
    Report timing plans flagged by the cross examine records.

    Records with level colour 4 or final state 6 are skipped. A problem is
    reported only when at least one record remains after that filter.

    :param start_hm: start time used to query the examine records
    :param max_date: latest date used to query the examine records
    :param crossid: cross id used to query the examine records
    :param min_date: earliest date used to query the examine records
    :return: (detail, suggestions, total_num); all empty / 0 when nothing is reported
    """
    cross_examine_records = db_cross.query_cross_examine_records(start_hm, max_date, crossid, min_date)
    detail = []
    suggestions, total_num = [], 0
    color_dict = {
        1: '红色',
        2: '橙色',
        3: '黄色',
        4: '绿色'
    }
    state_dict = {
        1: '需核查逾期',
        2: '需核查',
        3: '监测中',
        4: '核查有异常',
        5: '核查无异常',
        6: '自动结束'
    }
    for cross_examine_record in cross_examine_records or []:
        level_color = cross_examine_record['level_color']
        state_code = cross_examine_record['final_state']
        if level_color == 4 or state_code == 6:
            continue
        first_date = cross_examine_record['first_date']
        record_start = convert_time(cross_examine_record['start_hm'])
        record_end = convert_time(cross_examine_record['end_hm'])
        detail.append({
            'child_detail': [
                {
                    'text': f'{first_date}起在{record_start}-{record_end}时段运行的配时方案有异常变动风险,异常等级为'
                },
                {
                    # Codes without a label are shown as-is instead of raising.
                    'color': color_dict.get(level_color, str(level_color)),
                    'text': ',处理状态为',
                    'final_state': state_dict.get(state_code, str(state_code))
                }
            ]
        })
    if detail:
        # Count and advise only when something is actually listed.
        total_num = 1
        suggestions = [
            {
                'child_detail': [
                    {
                        'text': '核查该时段配时方案是否正常运行'
                    }
                ]
            }
        ]
    return detail, suggestions, total_num
# Channelization
def gen_cross_channelized_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_ledger_info):
    """
    Run the two channelization diagnoses: lane resource mismatch and entry/exit lane gap.

    :param avg_cross_delay_info: averaged cross delay message; its inroad_delay_infos are analysed
    :param roads_dir_dict: dict keyed by direction with the inbound road id under 'in'
    :param inroad_static_info_dict: static info per inbound road id
    :param cross_ledger_info: ledger info of the cross, forwarded to the lane gap diagnosis
    :return: dict with 'item', 'values' (one entry per diagnosis) and 'total_num'
    """
    lane_result = gen_inroad_num_problems(avg_cross_delay_info.inroad_delay_infos, inroad_static_info_dict, roads_dir_dict)
    gap_result = gen_in_out_lane_num_gap_problems(cross_ledger_info)
    # (title, reason shown to the user, (detail, suggestions, total_num))
    sections = (
        ('车道资源不匹配', '同一进口道直行或左转的流量占比大于50%且相对流量占比大于60%且车道数量占比小于流量占比的一半多方向放行车道每个方向各算0.5', lane_result),
        ('进出口车道数不匹配', '进口道车道数比对向的出口道车道数多2', gap_result),
    )
    values = []
    total_num = 0
    for title, reason, (item_detail, item_suggestions, item_total) in sections:
        values.append({
            'item': title,
            'detail': item_detail,
            'reason': reason,
            'suggestions': item_suggestions
        })
        total_num = total_num + item_total
    return {
        'item': '路口渠化',
        'values': values,
        'total_num': total_num
    }
# Channelization - lane resource mismatch
def gen_inroad_num_problems(road_delay_infos, inroad_static_info_dict, roads_dir_dict):
    """
    Report approaches whose straight or left-turn flow is large while its lane share is small.

    A movement is reported when its flow exceeds 60% of the largest straight/left
    flow of the cross and its share of the lanes (from count_lsr_half) is below 50%.

    :param road_delay_infos: inbound road delay records of the cross
    :param inroad_static_info_dict: dict keyed by inbound road id; 'lane_turn_info' describes the lanes
    :param roads_dir_dict: dict keyed by direction with the inbound road id under 'in'
    :return: (detail, suggestions, total_num); all empty / 0 when nothing is reported
    """
    # NOTE(review): the reason text shown for this diagnosis also mentions a flow
    # share above 50% and a lane share below half of the flow share; neither is
    # checked here - confirm which rule is intended.
    detail = []
    suggestions, err_src_dirs, total_num = [], {}, 0
    road_src_dict = {v['in']: k for k, v in roads_dir_dict.items()}
    # First pass: the largest straight/left flow of the whole cross. It has to be
    # known before any relative rate is computed, otherwise the result would
    # depend on the order of the roads.
    mapped_roads = []
    max_flow_car_num = 0
    for road_delay in road_delay_infos:
        src_dir = road_src_dict.get(road_delay.inroadid)
        if src_dir is None:
            continue
        mapped_roads.append((src_dir, road_delay))
        for flow in road_delay.flow_delay_infos:
            if flow.turn_type in (0, 1):
                max_flow_car_num = max(max_flow_car_num, flow.delay_info.car_num)
    # Second pass: compare flow and lane share per movement.
    for src_dir, road_delay in mapped_roads:
        inroadid = road_delay.inroadid
        if inroadid not in inroad_static_info_dict:
            # No lane description for this road, nothing to compare against.
            continue
        road_car_num = road_delay.delay_info.car_num
        if max_flow_car_num <= 0 or road_car_num <= 0:
            continue
        lane_info = inroad_static_info_dict[inroadid]['lane_turn_info']
        turn_delay_dict = {item.turn_type: item for item in road_delay.flow_delay_infos}
        left_lane_num_half, straight_lane_num_half, right_lane_num_half = count_lsr_half(lane_info)
        lane_num_info = count_lsr(lane_info)
        sum_half_num = left_lane_num_half + straight_lane_num_half + right_lane_num_half
        # Keep the fractions: rounding to an integer would collapse every share to 0 or 1.
        left_lane_num_rate = left_lane_num_half / sum_half_num if sum_half_num > 0 else 0
        straight_lane_num_rate = straight_lane_num_half / sum_half_num if sum_half_num > 0 else 0
        for turn_type, turn_type_str, lane_num_rate in ((0, '直行', straight_lane_num_rate), (1, '左转', left_lane_num_rate)):
            if turn_type not in turn_delay_dict:
                continue
            flow_car_num = turn_delay_dict[turn_type].delay_info.car_num
            relative_rate = flow_car_num / max_flow_car_num
            flow_rate = flow_car_num / road_car_num
            if relative_rate > 0.6 and lane_num_rate < 0.5:
                err_src_dirs.setdefault(src_dir, []).append({
                    'turn_type': turn_type_str,
                    'flow_rate': flow_rate,
                    'lane_num_rate': lane_num_rate,
                    'lane_num_info': lane_num_info
                })
    if not err_src_dirs:
        return detail, suggestions, total_num
    total_num = 1
    for src_dir, err_turns in err_src_dirs.items():
        src_detail = [
            {
                'src_dir': dir_str_dict[src_dir] + '进口'
            }
        ]
        for err_turn in err_turns:
            # Round after scaling so that e.g. 0.57 is shown as 57 and not 56.
            flow_percent = str(int(round(err_turn['flow_rate'] * 100)))
            lane_percent = str(int(round(err_turn['lane_num_rate'] * 100)))
            src_detail.append({
                'child_detail': [
                    {
                        'turn_type': err_turn['turn_type'],
                        'text': '分流转向比过大(' + flow_percent + '%),但分配的转向车道占比不足(' + lane_percent + '%),分配车道资源不匹配'
                    }
                ]
            })
        detail.append(
            {
                'child_detail': src_detail
            }
        )
        suggestions.append({
            'child_detail': [
                {
                    'text': '调整',
                    'src_dir': dir_str_dict[src_dir] + '进口'
                },
                {
                    'text': '车道分配情况'
                }
            ]
        })
        suggestions.append(
            {
                'child_detail': [
                    {
                        'src_dir': dir_str_dict[src_dir] + '进口',
                        'text': f"左转/直行/右转现有车道数分别为{err_turns[0]['lane_num_info']}"
                    }
                ]
            }
        )
    return detail, suggestions, total_num
# Channelization - entry/exit lane count gap
def gen_in_out_lane_num_gap_problems(cross_ledger_info):
    """
    Report approaches whose entry lanes outnumber the exit lanes of the opposite approach by more than 2.

    :param cross_ledger_info: dict whose 'roads' entry maps a direction to a dict with
        'entry_lane_num' and 'exit_lane_num' (a number, or '-' when unknown)
    :return: (detail, suggestions, total_num); all empty / 0 when nothing is reported.
        suggestions is a list of entries, like in the other diagnoses.
    """
    detail = []
    suggestions, err_src_dict, total_num = [], {}, 0
    road_infos = cross_ledger_info['roads']
    for src_dir, road_info in road_infos.items():
        entry_lane_num = road_info['entry_lane_num']
        # Directions unknown to src_reverse have no opposite approach.
        opposite_info = road_infos.get(src_reverse.get(src_dir))
        exit_lane_num = opposite_info['exit_lane_num'] if opposite_info is not None else '-'
        if exit_lane_num != '-' and entry_lane_num != '-' and entry_lane_num - exit_lane_num > 2:
            err_src_dict[src_dir] = {
                'in': entry_lane_num,
                'out': exit_lane_num
            }
    if not err_src_dict:
        return detail, suggestions, total_num
    total_num = 1
    suggestions = [
        {
            'child_detail': [
                {
                    'text': '预留全红时间清空积压车辆'
                }
            ]
        }
    ]
    for src_dir, lane_nums in err_src_dict.items():
        detail.append([
            {
                'src_dir': dir_str_dict[src_dir] + '进口'
            },
            {
                'child_detail': [
                    {
                        'text': '进口道车道数(' + str(lane_nums['in']) + ')比对向出口道车道数(' + str(lane_nums['out']) + ')多,车辆并道减速可能会造成路口车辆积压'
                    }
                ]
            }
        ])
    return detail, suggestions, total_num
def check_is_peak_tp(time_range, area_id, nodeid):
    """
    Tell whether time_range overlaps a peak period.

    The peak periods come from the 'peak_tp' field (comma separated) of the first
    row returned by db_tmnet.query_city_tp_info; without such a row the defaults
    07:00-09:00 and 17:00-19:00 are used.

    :return: result of time_overlap(time_range, peak periods)
    """
    city_tp_rows = db_tmnet.query_city_tp_info(nodeid, area_id)
    if city_tp_rows and len(city_tp_rows) > 0:
        peak_periods = city_tp_rows[0]['peak_tp'].split(',')
    else:
        peak_periods = ['07:00-09:00', '17:00-19:00']
    return time_overlap(time_range, peak_periods)
def get_flow_phase_detail(src_dir, turn_type_list, cross_phase):
    """
    Describe the green time currently given to the listed flows of one approach.

    For every schedule the stages of its first time period are scanned; the green
    time of a stage is added to each requested flow named in the stage's
    comma-separated phases_name.

    :param src_dir: direction code of the approach (key of dir_str_dict)
    :param turn_type_list: integer turn type codes (keys of int_turn_type2str)
    :param cross_phase: iterable of schedules, or a falsy value
    :return: list of {'child_detail': [...]} entries, one per schedule that gives
        green time to at least one requested flow
    """
    phase_suggestion = []
    if not cross_phase:
        return phase_suggestion
    flow_names = [dir_str_dict[src_dir] + int_turn_type2str[turn_type] for turn_type in turn_type_list]
    for schedule in cross_phase:
        # Each element is one weekly schedule; only its first time period is used.
        if not schedule.tps:
            continue
        tp_info = schedule.tps[0]
        green_by_flow = dict.fromkeys(flow_names, 0)
        for stage in tp_info.stages:
            phases_name_list = stage.phases_name.split(',') if stage.phases_name != '' else []
            for flow_name in green_by_flow:
                if flow_name in phases_name_list:
                    green_by_flow[flow_name] += stage.green
        green_by_flow = {name: green for name, green in green_by_flow.items() if green != 0}
        if not green_by_flow:
            continue
        schedule_suggestion = [
            {
                'phase': f"{schedule.schedule_name}{str(tp_info.planid)}-{tp_info.plan_name}",
                'text': '现有的'
            }
        ]
        for flow_name, green in green_by_flow.items():
            schedule_suggestion.append(
                {
                    'src_dir': flow_name,
                    # Show the accumulated green time, not the flow name.
                    'text': '绿灯时长为' + str(green) + 's'
                }
            )
        phase_suggestion.append({
            'child_detail': schedule_suggestion
        })
    return phase_suggestion
def gen_src_dir_phase_detail(max_src_dir, min_src_dir, cross_phase, is_tide=False):
    """
    Describe the green time currently given to two approaches.

    For every schedule the stages of its first time period are scanned; a stage
    counts for an approach when one of its non-pedestrian phase names, with the
    last two characters removed, equals the approach name.

    :param max_src_dir: direction code of the approach with the most stops
    :param min_src_dir: direction code of the approach with the fewest stops
    :param cross_phase: iterable of schedules, or a falsy value
    :param is_tide: when True, also advise on the release mode if some stage
        releases straight and left turn of both approaches together
    :return: list of suggestion entries; empty when cross_phase is falsy
    """
    phase_suggestion = []
    if not cross_phase:
        # Same tolerance as get_flow_phase_detail: no schedules, no advice.
        return phase_suggestion
    max_name = dir_str_dict[max_src_dir]
    min_name = dir_str_dict[min_src_dir]
    for item in cross_phase:
        # Each element is one weekly schedule; only its first time period is used.
        if not item.tps:
            continue
        tp_info = item.tps[0]
        src_green = {
            max_name: 0,
            min_name: 0
        }
        for stage in tp_info.stages:
            phases_name = stage.phases_name
            src_dir_set = set()
            if phases_name != '':
                for phase in phases_name.split(','):
                    # Pedestrian phases do not count.
                    if '行人' in phase:
                        continue
                    src_dir_set.add(phase[:-2])
            if max_name in src_dir_set:
                src_green[max_name] += stage.green
            if min_name in src_dir_set:
                src_green[min_name] += stage.green
        if src_green[max_name] != 0 and src_green[min_name] != 0:
            phase_suggestion.append({
                'child_detail': [
                    {
                        'phase': f"{item.schedule_name}{str(tp_info.planid)}-{tp_info.plan_name}",
                        'text': '现有的'
                    },
                    {
                        'src_dir': max_name + '进口',
                        'text': '绿灯时长为' + str(src_green[max_name]) + 's'
                    },
                    {
                        'src_dir': min_name + '进口',
                        'text': '绿灯时长为' + str(src_green[min_name]) + 's'
                    }
                ]
            })
    need2sug = False
    if is_tide:
        required_flows = (max_name + '直行', min_name + '直行', max_name + '左转', min_name + '左转')
        for item in cross_phase:
            if not item.tps:
                continue
            for stage in item.tps[0].stages:
                if all(flow_name in stage.phases_name for flow_name in required_flows):
                    need2sug = True
    if need2sug:
        phase_suggestion.append({
            'text': '相位放行方式可以考虑从对称放行调整为单路口放行'
        })
    return phase_suggestion
def count_lsr_half(turn_str):
    """
    Count the left-turn (l), straight (s) and right-turn (r) lanes of a lane description.

    A lane serving a single movement counts 1.0 for it; a lane serving several
    movements counts 0.5 for each of them.

    Args:
        turn_str: '|'-separated lane codes such as "2|1|1|1|5"; every code is looked
            up in g_turn2str and codes missing from it are ignored.
    Returns:
        Tuple (left, straight, right) of the counts, each rounded to one decimal.
    """
    totals = {'l': 0.0, 's': 0.0, 'r': 0.0}
    for lane_code in turn_str.split('|'):
        if lane_code not in g_turn2str:
            continue
        movements = g_turn2str[lane_code]
        # U-turn and special lanes are not counted.
        if movements in ['t', 'bus', 'reversible', '-']:
            continue
        served = [movement for movement in ('l', 's', 'r') if movement in movements]
        share = 0.5 if len(served) > 1 else 1.0
        for movement in served:
            totals[movement] += share
    return round(totals['l'], 1), round(totals['s'], 1), round(totals['r'], 1)
def get_next_cross(nodeid, area_id, roads_dir_dict):
    """
    Look up the cross reached through the outbound road of every direction.

    :param nodeid: node id, passed to db_tmnet.query_next_cross_info
    :param area_id: area id, passed to db_tmnet.query_next_cross_info
    :param roads_dir_dict: dict keyed by direction; 'out' holds the outbound road id,
        or an empty value / '-' when there is none
    :return: dict mapping a direction to the first row returned for its next cross;
        directions without a result are left out
    """
    next_cross_by_dir = {}
    for src_dir, road_info in roads_dir_dict.items():
        out_road = road_info['out']
        if not out_road or out_road == '-':
            continue
        out_rows = db_tmnet.query_out_cross(out_road)
        if len(out_rows) == 0:
            continue
        cross_rows = db_tmnet.query_next_cross_info(nodeid, area_id, out_rows[0]['to_crossid'])
        if cross_rows:
            next_cross_by_dir[src_dir] = cross_rows[0]
    return next_cross_by_dir