cross_doctor/app/eva_common.py

653 lines
34 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 19:51
# @Description: 路口评价相关的公共函数
import copy
import json
import logging
import requests
from google.protobuf.json_format import MessageToJson
import proto.xlcomm_pb2 as pb
from app.global_source import db_tmnet, db_cross
src_reverse = {'N': 'S', 'S': 'N', 'W': 'E', 'E': 'W', 'NE': 'SW', 'SW': 'NE', 'SE': 'NW', 'NW': 'SE'}
dir_str_dict = {
'E': '', 'S': '', 'W': '西', 'N': '', 'NE': '东北', 'SE': '东南', 'SW': '西南', 'NW': '西北'
}
int_turn_type2str = {
0: '直行',
1: '左转',
2: '右转',
3: '掉头'
}
int_internet_code2str = {
0: '',
1: '联网',
2: '脱机',
3: '未接入'
}
int_cross_model2str = {
'1': '',
'2': '',
'3': '',
'4': ''
}
def gen_avg_cross_delay_pb(cross_delay_data_list):
"""
路口评价指标数据聚合
:param cross_delay_data_list:
:return:
"""
cross_delay_pb_list = []
for item in cross_delay_data_list:
item_delay_pb = pb.xl_cross_delayinfo_t()
item_delay_pb.ParseFromString(item['data'])
cross_delay_pb_list.append(item_delay_pb)
avg_cross_delay = None
if len(cross_delay_pb_list) == 0:
return avg_cross_delay
elif len(cross_delay_pb_list) == 1:
avg_cross_delay = cross_delay_pb_list[0]
return avg_cross_delay
avg_cross_delay = pb.xl_cross_delayinfo_t()
avg_cross_delay.crossid = cross_delay_pb_list[0].crossid
avg_cross_delay.tp.start_hm = cross_delay_pb_list[0].tp.start_hm
avg_cross_delay.tp.end_hm = cross_delay_pb_list[0].tp.end_hm
avg_cross_delay.tp.weekday = cross_delay_pb_list[0].tp.weekday
avg_cross_delay.tp.type = cross_delay_pb_list[0].tp.type
avg_cross_delay.daynum = 0
sum_car_num, delay_time, stop_times, queue_len, speed, jam_index, park_time, std_flow, high_park_percent, truck_percent, park_percent, move_speed, imbalance_index\
= 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
# 路口延误数据指标聚合逻辑
for item in cross_delay_pb_list:
item_delay_info = item.delay_info
item_car_num = item_delay_info.car_num
if item_car_num != 0:
sum_car_num += item_car_num
delay_time += item_delay_info.delay_time * item_car_num
stop_times += item_delay_info.stop_times * item_car_num
queue_len += item_delay_info.queue_len * item_car_num
speed += item_delay_info.speed * item_car_num
jam_index += item_delay_info.jam_index * item_car_num
park_time += item_delay_info.park_time * item_car_num
high_park_percent += item_delay_info.high_park_percent * item_car_num
truck_percent += item_delay_info.truck_percent * item_car_num
park_percent += item_delay_info.park_percent * item_car_num
move_speed += item_delay_info.move_speed * item_car_num
# imbalance_index += item_delay_info.imbalance_index * item_car_num
std_flow += item_delay_info.std_flow
avg_cross_delay.delay_info.car_num = sum_car_num
avg_cross_delay.delay_info.delay_time = int(delay_time / sum_car_num)
avg_cross_delay.delay_info.stop_times = stop_times / sum_car_num
avg_cross_delay.delay_info.queue_len = int(queue_len / sum_car_num)
avg_cross_delay.delay_info.speed = int(speed / sum_car_num)
avg_cross_delay.delay_info.jam_index = jam_index / sum_car_num
avg_cross_delay.delay_info.park_time = int(park_time / sum_car_num)
avg_cross_delay.delay_info.high_park_percent = int(high_park_percent / sum_car_num)
avg_cross_delay.delay_info.truck_percent = int(truck_percent / sum_car_num)
avg_cross_delay.delay_info.park_percent = int(park_percent / sum_car_num)
avg_cross_delay.delay_info.move_speed = int(move_speed / sum_car_num)
# avg_cross_delay.delay_info.imbalance_index = imbalance_index / sum_car_num
avg_cross_delay.delay_info.std_flow = int(std_flow / len(cross_delay_pb_list))
# 进口道延误数据聚合处理逻辑
inroad_delay_info_dict = {}
for item in cross_delay_pb_list:
inroad_delay_infos = item.inroad_delay_infos
for inroad_item in inroad_delay_infos:
inroadid = inroad_item.inroadid
if inroadid not in inroad_delay_info_dict:
inroad_delay_info_dict[inroadid] = [inroad_item]
else:
inroad_delay_info_dict[inroadid].append(inroad_item)
flow_delay_list, inroad_delay_pb_list = [], []
flow_delay_info_dict = {}
max_stop_times, min_stop_times = 0, 99999
for inroadid in inroad_delay_info_dict.keys():
road_delay_infos = inroad_delay_info_dict[inroadid]
inroad_sum_car_num, inroad_delay_time, inroad_stop_times, inroad_queue_len, inroad_speed, inroad_jam_index, inroad_park_time, inroad_high_park_percent, inroad_truck_percent, inroad_park_percent, inroad_move_speed, inroad_imbalance_index, inroad_std_flow, inroad_travel_time\
= 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
ffs, capacity = road_delay_infos[0].delay_info.ffs, road_delay_infos[0].delay_info.capacity
turn_ratio_0, turn_ratio_1, turn_ratio_2, turn_ratio_3 = 0, 0, 0, 0
for road_delay_info in road_delay_infos:
inroad_sum_car_num += road_delay_info.delay_info.car_num
inroad_delay_time += road_delay_info.delay_info.delay_time * road_delay_info.delay_info.car_num
inroad_stop_times += road_delay_info.delay_info.stop_times * road_delay_info.delay_info.car_num
inroad_queue_len += road_delay_info.delay_info.queue_len * road_delay_info.delay_info.car_num
inroad_speed += road_delay_info.delay_info.speed * road_delay_info.delay_info.car_num
inroad_jam_index += road_delay_info.delay_info.jam_index * road_delay_info.delay_info.car_num
inroad_park_time += road_delay_info.delay_info.park_time * road_delay_info.delay_info.car_num
inroad_high_park_percent += road_delay_info.delay_info.high_park_percent * road_delay_info.delay_info.car_num
inroad_truck_percent += road_delay_info.delay_info.truck_percent * road_delay_info.delay_info.car_num
inroad_park_percent += road_delay_info.delay_info.park_percent * road_delay_info.delay_info.car_num
inroad_move_speed += road_delay_info.delay_info.move_speed * road_delay_info.delay_info.car_num
inroad_travel_time += road_delay_info.delay_info.travel_time * road_delay_info.delay_info.car_num
# inroad_imbalance_index += delay_info.imbalance_index * delay_info.car_num
inroad_std_flow += road_delay_info.delay_info.std_flow
turn_ratio_0 += road_delay_info.delay_info.turn_ratio_0
turn_ratio_1 += road_delay_info.delay_info.turn_ratio_1
turn_ratio_2 += road_delay_info.delay_info.turn_ratio_2
turn_ratio_3 += road_delay_info.delay_info.turn_ratio_3
# 流向级别数据指标聚合处理逻辑
flow_delay_infos = road_delay_info.flow_delay_infos
flow_delay_list.extend(flow_delay_infos)
inroad_delay_info = pb.xl_inroad_delayinfo_t()
inroad_delay_info.inroadid = inroadid
inroad_delay_info.delay_info.ffs = ffs
inroad_delay_info.delay_info.capacity = capacity
inroad_delay_info.delay_info.car_num = inroad_sum_car_num
inroad_delay_info.delay_info.delay_time = int(inroad_delay_time / inroad_sum_car_num)
inroad_delay_info.delay_info.stop_times = inroad_stop_times / inroad_sum_car_num
if stop_times > max_stop_times:
max_stop_times = stop_times
if stop_times < min_stop_times:
min_stop_times = stop_times
inroad_delay_info.delay_info.queue_len = int(inroad_queue_len / inroad_sum_car_num)
inroad_delay_info.delay_info.speed = int(inroad_speed / inroad_sum_car_num)
inroad_delay_info.delay_info.jam_index = inroad_jam_index / inroad_sum_car_num
inroad_delay_info.delay_info.park_time = int(inroad_park_time / inroad_sum_car_num)
inroad_delay_info.delay_info.high_park_percent = int(inroad_high_park_percent / inroad_sum_car_num)
inroad_delay_info.delay_info.truck_percent = int(inroad_truck_percent / inroad_sum_car_num)
inroad_delay_info.delay_info.park_percent = int(inroad_park_percent / inroad_sum_car_num)
inroad_delay_info.delay_info.move_speed = int(inroad_move_speed / inroad_sum_car_num)
inroad_delay_info.delay_info.travel_time = int(inroad_travel_time / inroad_sum_car_num)
# inroad_delay_info.delay_info.imbalance_index = inroad_imbalance_index / inroad_sum_car_num
inroad_delay_info.delay_info.std_flow = int(inroad_std_flow / len(road_delay_infos))
inroad_delay_info.delay_info.turn_ratio_0 = turn_ratio_0
inroad_delay_info.delay_info.turn_ratio_1 = turn_ratio_1
inroad_delay_info.delay_info.turn_ratio_2 = turn_ratio_2
inroad_delay_info.delay_info.turn_ratio_3 = turn_ratio_3
inroad_delay_pb_list.append(inroad_delay_info)
cross_imbalance_index = round((max_stop_times - min_stop_times) / avg_cross_delay.delay_info.stop_times, 2)
avg_cross_delay.delay_info.imbalance_index = cross_imbalance_index
for flow_delay_info in flow_delay_list:
xlink_id = flow_delay_info.xlink_id
if xlink_id not in flow_delay_info_dict:
flow_delay_info_dict[xlink_id] = [flow_delay_info]
else:
flow_delay_info_dict[xlink_id].append(flow_delay_info)
for xlink_id in flow_delay_info_dict.keys():
flow_delay_infos = flow_delay_info_dict[xlink_id]
inroadid = flow_delay_infos[0].inroadid
turn_type = flow_delay_infos[0].turn_type
flow_sum_car_num, flow_delay_time, flow_stop_times, flow_queue_len, flow_speed, flow_jam_index, flow_park_time, flow_high_park_percent, flow_truck_percent, flow_park_percent, flow_move_speed, flow_imbalance_index, flow_std_flow, flow_travel_time \
= 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
for flow_delay_info in flow_delay_infos:
flow_sum_car_num += flow_delay_info.delay_info.car_num
flow_delay_time += flow_delay_info.delay_info.delay_time * flow_delay_info.delay_info.car_num
flow_stop_times += flow_delay_info.delay_info.stop_times * flow_delay_info.delay_info.car_num
flow_queue_len += flow_delay_info.delay_info.queue_len * flow_delay_info.delay_info.car_num
flow_travel_time += flow_delay_info.delay_info.travel_time * flow_delay_info.delay_info.car_num
flow_speed += flow_delay_info.delay_info.speed * flow_delay_info.delay_info.car_num
flow_jam_index += flow_delay_info.delay_info.jam_index * flow_delay_info.delay_info.car_num
flow_park_time += flow_delay_info.delay_info.park_time * flow_delay_info.delay_info.car_num
flow_high_park_percent += flow_delay_info.delay_info.high_park_percent * flow_delay_info.delay_info.car_num
flow_truck_percent += flow_delay_info.delay_info.truck_percent * flow_delay_info.delay_info.car_num
flow_park_percent += flow_delay_info.delay_info.park_percent * flow_delay_info.delay_info.car_num
flow_move_speed += flow_delay_info.delay_info.move_speed * flow_delay_info.delay_info.car_num
# flow_imbalance_index += flow_delay_info.delay_info.imbalance_index * flow_delay_info.delay_info.car_num
flow_std_flow += flow_delay_info.delay_info.std_flow
flow_info = pb.xl_flow_delayinfo_t()
flow_info.xlink_id = xlink_id
flow_info.inroadid = inroadid
flow_info.turn_type = turn_type
flow_info.delay_info.car_num = flow_sum_car_num
flow_info.delay_info.delay_time = int(flow_delay_time / flow_sum_car_num)
flow_info.delay_info.stop_times = flow_stop_times / flow_sum_car_num
flow_info.delay_info.queue_len = int(flow_queue_len / flow_sum_car_num)
flow_info.delay_info.speed = int(flow_speed / flow_sum_car_num)
flow_info.delay_info.travel_time = int(flow_travel_time / flow_sum_car_num)
flow_info.delay_info.jam_index = flow_jam_index / flow_sum_car_num
flow_info.delay_info.park_time = int(flow_park_time / flow_sum_car_num)
flow_info.delay_info.high_park_percent = int(flow_high_park_percent / flow_sum_car_num)
flow_info.delay_info.truck_percent = int(flow_truck_percent / flow_sum_car_num)
flow_info.delay_info.park_percent = int(flow_park_percent / flow_sum_car_num)
flow_info.delay_info.move_speed = int(flow_move_speed / flow_sum_car_num)
# flow_info.delay_info.imbalance_index = flow_imbalance_index / flow_sum_car_num
flow_info.delay_info.std_flow = int(flow_std_flow / len(flow_delay_infos))
for inroad_info in inroad_delay_pb_list:
if inroad_info.inroadid == inroadid:
inroad_info.flow_delay_infos.append(flow_info)
inroad_delay_infos_with_imbalance = calc_inroad_imbalance_index(inroad_delay_pb_list)
avg_cross_delay.inroad_delay_infos.extend(inroad_delay_infos_with_imbalance)
# 出口道流量数据聚合逻辑
outroad_delay_info_dict = {}
for item in cross_delay_pb_list:
outroadid_infos = item.outroad_infos
for outroadid_info in outroadid_infos:
outroadid = outroadid_info.outroadid
if outroadid not in outroad_delay_info_dict:
outroad_delay_info_dict[outroadid] = [outroadid_info]
else:
outroad_delay_info_dict[outroadid].append(outroadid_info)
for outroadid in outroad_delay_info_dict.keys():
outroadid_infos = outroad_delay_info_dict[outroadid]
car_num, turn_ratio_0, turn_ratio_1, turn_ratio_2, turn_ratio_3 = 0, 0, 0, 0, 0
for outroadid_info in outroadid_infos:
turn_info = outroadid_info.turn_info
car_num += turn_info.car_num
turn_ratio_0 += turn_info.turn_ratio_0
turn_ratio_1 += turn_info.turn_ratio_1
turn_ratio_2 += turn_info.turn_ratio_2
turn_ratio_3 += turn_info.turn_ratio_3
outroadid_info = pb.xl_outroad_info_t()
outroadid_info.outroadid = outroadid
outroadid_info.turn_info.car_num = car_num
outroadid_info.turn_info.turn_ratio_0 = turn_ratio_0
outroadid_info.turn_info.turn_ratio_1 = turn_ratio_1
outroadid_info.turn_info.turn_ratio_2 = turn_ratio_2
outroadid_info.turn_info.turn_ratio_3 = turn_ratio_3
avg_cross_delay.outroad_infos.append(outroadid_info)
return avg_cross_delay
# 生成路口诊断页面路口概览数据指标的函数
def gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict):
car_num = avg_cross_delay_info.delay_info.car_num
crossid = avg_cross_delay_info.crossid
jam_index = round(avg_cross_delay_info.delay_info.jam_index, 2)
stop_times = round(avg_cross_delay_info.delay_info.stop_times, 2)
high_park_percent = str(avg_cross_delay_info.delay_info.high_park_percent) + '%'
imbanlance_index = round(avg_cross_delay_info.delay_info.imbalance_index, 2)
speed = avg_cross_delay_info.delay_info.speed / 100
move_speed = avg_cross_delay_info.delay_info.move_speed / 100
park_time = avg_cross_delay_info.delay_info.park_time
delay_time = avg_cross_delay_info.delay_info.delay_time
high_stop_turn_ratio_desc, main_flow_src_desc = gen_high_stop_turn_ratio_desc(avg_cross_delay_info.inroad_delay_infos, inroad_static_info_dict, car_num)
tide_index_list = calc_tide_index(crossid, nodeid, date_list, roads_dir_dict)
usable_tide_list = [item for item in tide_index_list if item != 0]
tide_index = round(sum(usable_tide_list) / len(usable_tide_list), 2) if len(usable_tide_list) > 0 else 0
service_level = calc_service_level(delay_time)
overview_res = {
'jam_index': jam_index,
'stop_times': stop_times,
'high_park_percent': high_park_percent,
'imbanlance_index': imbanlance_index,
'speed': speed,
'move_speed': move_speed,
'park_time': park_time,
'delay_time': delay_time,
'service_level': service_level,
'high_stop_turn_ratio_desc': high_stop_turn_ratio_desc,
'main_flow_src_desc': main_flow_src_desc,
'tide_index': tide_index
}
return overview_res
def gen_high_stop_turn_ratio_desc(inroad_delay_infos, inroad_static_info_dict, car_num):
src_dict, src_desc_list, main_flow_src_list = {}, [], []
for inroad_info in inroad_delay_infos:
inroadid = inroad_info.inroadid
if inroadid not in inroad_static_info_dict.keys():
continue
inroad_car_num = inroad_info.delay_info.car_num
src_dir = dir_str_dict[inroad_static_info_dict[inroadid]['src_direct']]
if inroad_car_num / car_num > 0.25:
check_res = found_max_car_num_flow(inroad_car_num, inroad_info)
if check_res != -1:
main_flow_src_list.append(src_dir + int_turn_type2str[check_res])
flow_delay_infos = inroad_info.flow_delay_infos
for item in flow_delay_infos:
if item.stop_times >= 1:
turn_type = int_turn_type2str[item.turn_type]
if src_dir not in src_dict.keys():
src_dict[src_dir] = [turn_type]
else:
src_dict[src_dir].append(turn_type)
for item in src_dict.keys():
src_desc_list.append(item + '/'.join(src_dict[item]))
return ''.join(src_desc_list) if src_desc_list else '', ''.join(main_flow_src_list) if main_flow_src_list else ''
def found_max_car_num_flow(inroad_car_num, inroad_info):
data_list = [inroad_info.turn_ratio_0, inroad_info.turn_ratio_1, inroad_info.turn_ratio_2, inroad_info.turn_ratio_3]
for i in range(len(data_list)):
if data_list[i] / inroad_car_num > 0.5:
return i
return -1
def query_cross_ledger_info(crossid, nodeid, area_id, userid):
ledger_url = f"http://120.53.125.169:7070/api/common/cross_ledger_detail?crossid={crossid}&crossno=&nodeid={nodeid}&area_id={area_id}&userid={userid}"
headers = {"Content-Type": "application/json"}
cross_ledger_info = requests.get(ledger_url, headers=headers)
if cross_ledger_info.status_code != 200 or cross_ledger_info.json()['status'] != 0:
logging.error(f"查询路口台账信息失败,crossid:{crossid},nodeid:{nodeid},area_id:{area_id},userid:{userid}")
return None, None
division_list, company_list, slc_company_dict, slckind_list, detector_type_dict = gen_ledger_base_info(nodeid, area_id)
cross_ledger_info_dict = json.loads(cross_ledger_info.text)
slc_company = cross_ledger_info_dict['data']['ledger']['slc_company']
location = cross_ledger_info_dict['data']['ledger']['location']
slc_company_name = slc_company_dict[slc_company] if slc_company in slc_company_dict.keys() else '-'
internet = cross_ledger_info_dict['data']['ledger']['internet']
internet_str = int_internet_code2str[internet]
primary = cross_ledger_info_dict['data']['ledger']['primary']
secondary = cross_ledger_info_dict['data']['ledger']['secondary']
cross_model = int_cross_model2str[primary] + '-' + int_cross_model2str[secondary]
light_infos = cross_ledger_info_dict['data']['light_infos']
src_light_dict = gen_dir_light_info_dict(light_infos)
roads = cross_ledger_info_dict['data']['roads']
dir_list = roads.keys()
length_info, lane_nums_list, road_special_info = {}, [], {}
sum_length = 0
for dir in dir_list:
dir_length = roads[dir]['topology']['entry']['road_length']
straight_left_split, reverse_turn, left_turn_waiting_area, straight_turn_waiting_area, reversible_lanes_info, outside_left = 0, 0, 0, 0, 0, 0
if 2 in src_light_dict[dir] and (1 in src_light_dict[dir] or 3 in src_light_dict[dir]):
straight_left_split = 1
if roads[dir]['canaliza']['reverse_turn'] == 1:
reverse_turn = 1
if roads[dir]['canaliza']['hold_left'] == 1:
left_turn_waiting_area = 1
if roads[dir]['canaliza']['hold_straight'] == 1:
straight_turn_waiting_area = 1
if len(roads[dir]['canaliza']['reversible_lanes_info']) > 0:
reversible_lanes_info = 1
lane_turn_info = roads[dir]['topology']['entry']['lane_turn_info']
if check_outside_left(lane_turn_info):
outside_left = 1
road_special_info[dir_str_dict[dir]] = {
'straight_left_split': straight_left_split,
'reverse_turn': reverse_turn,
'hold_left': left_turn_waiting_area,
'hold_straight': straight_turn_waiting_area,
'reversible_lanes_info': reversible_lanes_info,
'outside_left': outside_left
}
lane_nums_list.append([dir_str_dict[dir] + '进口', roads[dir]['entry_lane_num'], dir_str_dict[src_reverse[dir]] + '出口', roads[dir]['exit_lane_num']])
if dir_length != '-':
sum_length += dir_length
length_info[dir] = dir_length
avg_length = round(sum_length / len(length_info.keys()), 0) if length_info else 0
cross_static_info = {
'crossid': crossid,
'nodeid': nodeid,
'area_id': area_id,
'location': location,
'slc_company': slc_company_name,
'internet': internet_str,
'cross_model': cross_model,
'lane_nums_list': lane_nums_list,
'road_special_info': road_special_info,
'length_info': length_info,
'avg_length': avg_length
}
return cross_static_info, cross_ledger_info_dict
def gen_ledger_base_info(nodeid, area_id):
node_base_params_info = db_tmnet.query_base_info(nodeid, area_id)
# 下列字段当为空时需在前端展示尚未针对该城市进行配置的同义提示语句
division_list, company_list, slc_company_dict, slckind_list, detector_type_dict = [], [], {}, [], {}
for row in node_base_params_info:
if row['param_type'] == 'division':
division_list.append(row['param_value'])
elif row['param_type'] == 'company':
company_list.append(row['param_value'])
elif row['param_type'] == 'slc_company':
slc_company_dict[row['param_code']] = row['param_value']
elif row['param_type'] == 'slckind':
slckind_list.append(row['param_value'])
elif row['param_type'] == 'detector_type':
detector_type_dict[row['param_code']] = row['param_value']
return division_list, company_list, slc_company_dict, slckind_list, detector_type_dict
def gen_dir_light_info_dict(light_infos):
dir_light_dict = {}
for row in light_infos:
srcDir = row['srcDir']
light_type = row['light_type']
if srcDir not in dir_light_dict.keys():
dir_light_dict[srcDir] = [light_type]
else:
dir_light_dict[srcDir].append(light_type)
return dir_light_dict
def check_outside_left(lane_turn_info):
seen_s = False # 是否已出现直行车道
for func in lane_turn_info:
if 's' in func: # 当前是直行车道
seen_s = True
if seen_s and ('l' in func): # 直行车道右侧出现左转
return True
return False
def gen_road_delay_index(avg_cross_delay_info, roads_dir_dict):
road_delay_infos = []
if not avg_cross_delay_info:
return road_delay_infos
road_delay_infos = avg_cross_delay_info.inroad_delay_infos
road_dir = {v['in']: k for k, v in roads_dir_dict.items()}
road_flow_index = {}
for road_index in road_delay_infos:
roadid = road_index.inroadid
if roadid not in road_dir.keys():
continue
service_level = calc_service_level(road_index.delay_info.delay_time)
road_flow_index[roadid] = {
'stop_times': round(road_index.delay_info.stop_times, 2),
'high_park_percent': str(road_index.delay_info.high_park_percent) + '%',
'imbalance_index': round(road_index.delay_info.imbalance_index, 2),
'park_time': road_index.delay_info.park_time,
'delay_time': road_index.delay_info.delay_time,
'speed': road_index.delay_info.speed / 100,
'move_speed': road_index.delay_info.move_speed / 100,
'service_level': service_level,
'flow_delays': {}
}
flow_delay_infos = road_index.flow_delay_infos
for flow_delay_info in flow_delay_infos:
if flow_delay_info.turn_type not in (0, 1):
continue
flow_service_level = calc_service_level(flow_delay_info.delay_info.delay_time)
road_flow_index[roadid]['flow_delays'][flow_delay_info.turn_type] = {
'stop_times': round(flow_delay_info.delay_info.stop_times, 2),
'high_park_percent': str(flow_delay_info.delay_info.high_park_percent) + '%',
'park_time': flow_delay_info.delay_info.park_time,
'delay_time': flow_delay_info.delay_info.delay_time,
'speed': flow_delay_info.delay_info.speed / 100,
'service_level': flow_service_level,
'move_speed': flow_delay_info.delay_info.move_speed / 100
}
return road_flow_index
def gen_road_dir_dict(cross_ledger_info_dict):
roads_dir_dict = {}
if not cross_ledger_info_dict:
return roads_dir_dict
roads = cross_ledger_info_dict['data']['roads']
for dir in roads.keys():
roads_dir_dict[dir] = {
'in': roads[dir]['roadid'],
'out': roads[dir]['out_roadid']
}
return roads_dir_dict
def calc_inroad_imbalance_index(inroad_delay_pb_list):
for item in inroad_delay_pb_list:
avg_stop_times = item.delay_info.stop_times
max_stop_times, min_stop_times = 0, 99999
flow_delay_infos = item.flow_delay_infos
for flow_delay_info in flow_delay_infos:
if flow_delay_info.delay_info.stop_times > max_stop_times:
max_stop_times = flow_delay_info.delay_info.stop_times
if flow_delay_info.delay_info.stop_times < min_stop_times:
min_stop_times = flow_delay_info.delay_info.stop_times
imbalance_index = round((max_stop_times - min_stop_times) / avg_stop_times, 2)
item.delay_info.imbalance_index = imbalance_index
return inroad_delay_pb_list
def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
road_delay_infos = avg_cross_delay_info.inroad_delay_infos
outroad_infos = avg_cross_delay_info.outroad_infos
road_delay_dict = {item.inroadid: item for item in road_delay_infos}
outroad_info_dict = {item.outroadid: item for item in outroad_infos}
cross_sum_car_num = sum([item.delay_info.car_num for item in road_delay_infos])
cross_out_sum_car_num = sum([item.turn_info.car_num for item in outroad_infos])
road_flow_turn_rate = {}
for dir in roads_dir_dict.keys():
roadid = roads_dir_dict[dir]['in']
l_rate, s_rate, r_rate = '-', '-', '-'
out_l_rate, out_s_rate, out_r_rate = '-', '-', '-'
in_flow_rate, out_flow_rate = '-', '-'
if roadid != '-' and roadid in road_delay_dict.keys():
car_num = road_delay_dict[roadid].delay_info.car_num
in_flow_rate = round(car_num / cross_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-'
flow_delay_infos = road_delay_dict[roadid].flow_delay_infos
for flow_delay_info in flow_delay_infos:
if flow_delay_info.turn_type not in (0, 1, 2):
continue
if flow_delay_info.turn_type == 0:
l_rate = round(flow_delay_info.delay_info.car_num / car_num, 2) if car_num != 0 else '-'
elif flow_delay_info.turn_type == 1:
s_rate = round(flow_delay_info.delay_info.car_num / car_num, 2) if car_num != 0 else '-'
elif flow_delay_info.turn_type == 2:
r_rate = round(flow_delay_info.delay_info.car_num / car_num, 2) if car_num != 0 else '-'
out_road_id = roads_dir_dict[dir]['out']
if out_road_id != '-' and out_road_id in outroad_info_dict.keys():
out_car_num = outroad_info_dict[out_road_id].turn_info.car_num
out_flow_rate = round(out_car_num / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-'
out_l_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-'
out_s_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_0 / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-'
out_r_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_2 / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-'
road_flow_turn_rate[roadid] = {
'in_flow_rate': in_flow_rate,
'out_flow_rate': out_flow_rate,
'l_rate': l_rate,
's_rate': s_rate,
'r_rate': r_rate,
'out_l_rate': out_l_rate,
'out_s_rate': out_s_rate,
'out_r_rate': out_r_rate
}
return road_flow_turn_rate
def calc_service_level(delay_time):
if delay_time <= 10:
service_level = 'A'
elif 10 < delay_time <= 20:
service_level = 'B'
elif 20 < delay_time <= 35:
service_level = 'C'
elif 35 < delay_time <= 55:
service_level = 'D'
elif 55 < delay_time <= 80:
service_level = 'E'
else:
service_level = 'F'
return service_level
def calc_tide_index(crossid, nodeid, date_list, roads_dir_dict):
tide_index_list = []
am_tp_start, am_tp_end = 't700', '900'
pm_tp_start, pm_tp_end = 't1700', '1900'
am_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start, am_tp_end)
pm_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start, pm_tp_end)
date_am_delay_info_dict = {item['day']: item['data'] for item in am_cross_delay_data_list}
date_pm_delay_info_dict = {item['day']: item['data'] for item in pm_cross_delay_data_list}
# 构建对向进口道对
subtend_road_pair = gen_subtend_road_pair(roads_dir_dict)
for date in date_list:
max_am_flow, max_pm_flow, tide_index, max_am_flow_road, max_pm_flow_road = 0, 99999999999, 0, '', ''
if date not in date_am_delay_info_dict.keys() or date not in date_pm_delay_info_dict.keys():
tide_index_list.append(tide_index)
continue
item_am_cross_delay_info = pb.xl_cross_delayinfo_t()
item_pm_cross_delay_info = pb.xl_cross_delayinfo_t()
item_am_cross_delay_info.ParseFromString(date_am_delay_info_dict[date])
item_pm_cross_delay_info.ParseFromString(date_pm_delay_info_dict[date])
if not item_am_cross_delay_info or not item_pm_cross_delay_info:
tide_index_list.append(tide_index)
continue
am_inroad_infos = item_am_cross_delay_info.inroad_delay_infos
pm_inroad_infos = item_pm_cross_delay_info.inroad_delay_infos
am_inroad_info_dict = {item.inroadid: item for item in am_inroad_infos}
pm_inroad_info_dict = {item.inroadid: item for item in pm_inroad_infos}
# 找出早高峰最高流量的进口道
for am_inroad_info in am_inroad_infos:
am_flow = am_inroad_info.delay_info.car_num
if am_flow > max_am_flow:
max_am_flow = am_flow
max_am_flow_road = am_inroad_info.inroadid
if max_am_flow_road == '' or max_am_flow_road not in subtend_road_pair.keys():
tide_index_list.append(tide_index)
continue
subtend_road_am_flow = am_inroad_info_dict[subtend_road_pair[max_am_flow_road]].delay_info.car_num
# 如果进口道流量小于20 则无意义
if min(subtend_road_am_flow, max_am_flow) < 20:
tide_index_list.append(tide_index)
continue
# 找出晚高峰最高流量的进口道
for pm_inroad_info in pm_inroad_infos:
pm_flow = pm_inroad_info.delay_info.car_num
if pm_flow > max_pm_flow:
max_pm_flow = pm_flow
max_pm_flow_road = pm_inroad_info.inroadid
# 如果路段id为空或不在对向对里则无意义
if max_pm_flow_road == '' or max_pm_flow_road not in subtend_road_pair.keys():
tide_index_list.append(tide_index)
continue
subtend_road_pm_flow = pm_inroad_info_dict[subtend_road_pair[max_pm_flow_road]].delay_info.car_num
if min(subtend_road_pm_flow, max_pm_flow) < 20:
tide_index_list.append(tide_index)
continue
# 如果早晚高峰的最大流量进口道不处于同一个对向对中,则无意义
if max_pm_flow_road != max_am_flow_road and max_pm_flow_road != subtend_road_pair[max_am_flow_road]:
tide_index_list.append(tide_index)
continue
Fam = max(max_am_flow_road, subtend_road_am_flow) / min(max_am_flow_road, subtend_road_am_flow)
Fpm = max(max_pm_flow_road, subtend_road_pm_flow) / min(max_pm_flow_road, subtend_road_pm_flow)
if Fam >= 1.5 and Fpm >= 1.5 and max_pm_flow_road == subtend_road_pair[max_am_flow_road]:
tide_index = min(Fam, Fpm) / max(Fam, Fpm)
elif min(Fam, Fpm) <= 0:
tide_index = 0
elif max_pm_flow_road == max_am_flow_road:
tide_index = Fam * Fpm
tide_index_list.append(tide_index)
return tide_index_list
def gen_subtend_road_pair(roads_dir_dict):
subtend_road_pair = {}
tmp_road_dir = copy.deepcopy(roads_dir_dict)
keys_to_process = list(tmp_road_dir.keys())
for dir in keys_to_process:
if dir in tmp_road_dir.keys() and tmp_road_dir[dir]['in'] == '-':
continue
subtend_dir = src_reverse[dir]
if subtend_dir in tmp_road_dir.keys() and tmp_road_dir[subtend_dir]['in'] != '-':
subtend_road_pair[tmp_road_dir[dir]['in']] = tmp_road_dir[subtend_dir]['in']
tmp_road_dir.pop(dir)
tmp_road_dir.pop(subtend_dir)
return subtend_road_pair