提交路口诊断页面指标变化趋势相关接口内容和路口问题诊断接口未完成代码

This commit is contained in:
wangxu 2025-10-27 10:35:25 +08:00
parent febd66aaed
commit 1ac4c64b4a
7 changed files with 763 additions and 26 deletions

View File

@ -340,4 +340,67 @@ def convert_time(time_int):
minutes = time_int % 100
time_str = f"{hours:02d}:{minutes:02d}"
return time_str
return time_str
def generate_date_range(start_date_str, end_date_str):
"""生成两个日期之间的所有日期(包括开始和结束日期)"""
start_date = datetime.strptime(start_date_str, "%Y%m%d")
end_date = datetime.strptime(end_date_str, "%Y%m%d")
date_list = []
current_date = start_date
while current_date <= end_date:
date_list.append(current_date.strftime("%Y%m%d"))
current_date += timedelta(days=1)
return date_list
def gen_ten_weeks_ago_data_list():
fmt = "%Y%m%d"
end = datetime.now() - timedelta(days=1) # 昨天
start = end - timedelta(days=69) # 70 天跨度 = 10 个完整周
week_buckets = {}
for d in range((end - start).days + 1):
day = start + timedelta(days=d)
monday = day - timedelta(days=day.weekday()) # 本周一
week_buckets.setdefault(monday, []).append(day.strftime(fmt))
# 大 list从远到近最早一周在最前面
result = [week_buckets[k] for k in sorted(week_buckets, reverse=False)]
return result
def count_lsr(turn_str):
"""
统计车道转向中左转(l)直行(s)右转(r)的数量
Args:
turn_str: 形如 "2|1|1|1|5" 的字符串每个数字对应 g_turn2str 中的键
Returns:
形如 "1/3/1" 的字符串顺序为左转直行右转的数量
"""
# 初始化计数器
left_count = 0
straight_count = 0
right_count = 0
# 分割输入字符串
turn_keys = turn_str.split('|')
# 遍历每个车道
for key in turn_keys:
if key in g_turn2str:
turn_value = g_turn2str[key]
# 忽略掉头和特殊车道
if turn_value in ['t', 'bus', 'reversible', '-']:
continue
# 统计每个转向
if 'l' in turn_value:
left_count += 1
if 's' in turn_value:
straight_count += 1
if 'r' in turn_value:
right_count += 1
# 返回格式化的结果
return f"{left_count}/{straight_count}/{right_count}"

View File

@ -34,7 +34,11 @@ def query_cross_usable_date_api():
@app.route('/api/query_cross_delay_info', methods=['POST'])
def query_cross_delay_info_api():
return query_cross_delay_info(request.json)
return query_cross_delay_info_controller(request.json)
@app.route('/api/query_cross_index_trend', methods=['POST'])
def query_cross_index_trend_api():
return query_cross_index_trend_controller(request.json)
if __name__ == '__main__':

View File

@ -9,6 +9,9 @@ from app.eva_common import *
# 查询可用路口列表
from proto.phase_grpc import QueryCrossRunningPhase
def query_cross_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
@ -28,8 +31,25 @@ def query_cross_list(params):
# 查询路口列表
cross_list = db_tmnet.query_cross_list_sql(nodeid, area_id)
tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id)
if not tp_desc:
tp_info = [
"00:00-07:00",
"07:00-09:00",
"09:00-12:00",
"12:00-14:00",
"14:00-17:00",
"17:00-19:00",
"19:00-22:00",
"22:00-00:00"
]
else:
tp_info = tp_desc[0]['tp_desc'].split(',')
res = make_common_res(0, 'ok')
res['data'] = cross_list
res['data'] = {
'cross_list': cross_list,
'tp_info': tp_info,
}
return json.dumps(res, ensure_ascii=False)
@ -62,7 +82,7 @@ def query_cross_usable_date(params):
# 获取路口延迟信息
def query_cross_delay_info(params):
def query_cross_delay_info_controller(params):
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
@ -91,20 +111,23 @@ def query_cross_delay_info(params):
if not time_range:
return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))
tp_start = int(str(time_range.split('-')[0]).split(':')[0]) * 100 + int(str(time_range.split('-')[0]).split(':')[1])
tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1])
# tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1])
# if tp_end == 0:
# tp_end = 2400
if query_type == 1:
tp_start = 't' + str(tp_start)
elif query_type == 2:
tp_start = 'h' + str(tp_start)
cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start, tp_end)
cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
avg_cross_delay_info = gen_avg_cross_delay_pb(cross_delay_data_list)
if not avg_cross_delay_info:
return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据'))
cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid)
inroad_static_info_dict = {item['roadid']: item for item in cross_inroads}
# 路口静态信息及台账信息
cross_static_info, cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info_dict)
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
# 路口指标数据概览
overview_res = gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict)
# 路段及流向数据概览
@ -117,8 +140,141 @@ def query_cross_delay_info(params):
'road_flow_delay_infos': road_flow_delay_infos,
'road_flow_turn_rate': road_flow_turn_rate,
'cross_static_info': cross_static_info,
'ledger_info': cross_ledger_info_dict
'ledger_info': cross_ledger_info
}
return json.dumps(res, ensure_ascii=False)
# 问题诊断接口
def query_cross_problems(params):
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_user_areas(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = [int(row['area_id']) for row in area_list]
if int(area_id) not in area_list:
return json.dumps(make_common_res(6, '用户信息异常'))
date_list = check_param(params, 'date_list')
if not date_list or len(date_list) < 1:
return json.dumps(make_common_res(7, '缺少日期参数,请最少选择一天作为查询日期'))
query_type = check_param(params, 'query_type')
if not query_type:
query_type = 0
time_range = check_param(params, 'time_range')
if not time_range:
return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))
tp_start = int(str(time_range.split('-')[0]).split(':')[0]) * 100 + int(str(time_range.split('-')[0]).split(':')[1])
# tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1])
# if tp_end == 0:
# tp_end = 2400
if query_type == 1:
tp_start = 't' + str(tp_start)
elif query_type == 2:
tp_start = 'h' + str(tp_start)
cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
avg_cross_delay_info = gen_avg_cross_delay_pb(cross_delay_data_list)
if not avg_cross_delay_info:
return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据'))
cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid)
inroad_static_info_dict = {item['roadid']: item for item in cross_inroads}
# 路口静态信息及台账信息
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
gen_cross_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict)
# todo 与子鉴确认逻辑
cross_phase, err = QueryCrossRunningPhase(nodeid, [crossid])
# 指标变化趋势接口
def query_cross_index_trend_controller(params):
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_user_areas(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = [int(row['area_id']) for row in area_list]
if int(area_id) not in area_list:
return json.dumps(make_common_res(6, '用户信息异常'))
query_date = check_param(params, 'query_date')
if not query_date:
return json.dumps(make_common_res(7, '缺少查询日期,请选择查询日期'))
time_range = check_param(params, 'time_range')
if not time_range:
return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))
query_type= check_param(params, 'query_type')
if not query_type:
query_type = 0
tp_start = int(str(time_range.split('-')[0]).split(':')[0]) * 100 + int(str(time_range.split('-')[0]).split(':')[1])
tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1])
if query_type == 1:
tp_start = 't' + str(tp_start)
elif query_type == 2:
tp_start = 'h' + str(tp_start)
prev_date = (datetime.strptime(query_date, '%Y%m%d') - timedelta(days=1)).strftime('%Y%m%d')
month_ago_date = (datetime.now().date() - timedelta(days=30)).strftime('%Y%m%d')
month_date_list = generate_date_range(month_ago_date, query_date)
# ten_week_ago_date = (datetime.now().date() - timedelta(days=70)).strftime('%Y%m%d')
# week_date_list = generate_date_range(ten_week_ago_date, query_date)
ten_weeks_date_list = gen_ten_weeks_ago_data_list()
# 查询台账信息 获取路网渠化关系
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
# 查询前一天的小时级别数据
hours_data = db_cross.query_cross_delay_whole_day_hours(crossid, nodeid, prev_date)
hour_pb_list = parse_data2pb(hours_data)
hours_data_dict = parse_single_cross_delay_info(crossid, nodeid, hour_pb_list, 'hour', roads_dir_dict)
hours_data_with_change_rate = calc_single_day_delay_info_change_rate(hours_data_dict)
# 查询近30天的数据
days_data = db_cross.query_cross_delay_info(crossid, nodeid, month_date_list, tp_start)
days_pb_list = parse_data2pb(days_data)
days_data_dict = parse_single_cross_delay_info(crossid, nodeid, days_pb_list, 'day', roads_dir_dict)
days_data_with_change_rate = calc_single_day_delay_info_change_rate(days_data_dict)
# 查询近10周的数据 需要聚合 可能不能使用下述方式查询
ten_week_datas = []
for week_dates in ten_weeks_date_list:
weeks_data = db_cross.query_cross_delay_info(crossid, nodeid, week_dates, tp_start)
week_cross_delay_info = gen_avg_cross_delay_pb(weeks_data)
ten_week_datas.append({
'day': week_dates[0] + '-' + week_dates[-1],
'tp_start': tp_start,
'tp_end': tp_end,
'data': week_cross_delay_info
})
weeks_data_dict = parse_single_cross_delay_info(crossid, nodeid, ten_week_datas, 'week', roads_dir_dict)
weeks_data_with_change_rate = calc_single_day_delay_info_change_rate(weeks_data_dict)
res = make_common_res(0, 'ok')
res['data'] = {
'hours_data': hours_data_with_change_rate,
'days_data': days_data_with_change_rate,
'weeks_data': weeks_data_with_change_rate
}
return json.dumps(res, ensure_ascii=False)

View File

@ -18,9 +18,16 @@ class CrossDbHelper(TableDbHelperBase):
"""
return self.do_select(sql)
def query_cross_delay_info(self, crossid, nodeid, date_list, tp_start, tp_end):
def query_cross_delay_info(self, crossid, nodeid, date_list, tp_start):
date_list = ','.join(["'" + str(item) + "'" for item in date_list])
sql = f"""
select * from traffic_{nodeid}.cross_delay where crossid = '{crossid}' and day in ({date_list}) and tp_start = '{tp_start}' and tp_end = '{tp_end}'
select * from traffic_{nodeid}.cross_delay where crossid = '{crossid}' and day in ({date_list}) and tp_start = '{tp_start}' order by day
"""
return self.do_select(sql)
def query_cross_delay_whole_day_hours(self, crossid, nodeid, query_date):
sql = f"""
select * from traffic_{nodeid}.cross_delay where crossid = '{crossid}' and day = '{query_date}' and tp_start like 'h%' order by CAST(REGEXP_SUBSTR(tp_start, '[0-9]+') AS UNSIGNED)
"""
return self.do_select(sql)

View File

@ -10,7 +10,8 @@ import requests
from google.protobuf.json_format import MessageToJson
import proto.xlcomm_pb2 as pb
from app.global_source import db_tmnet, db_cross
from app.common_worker import *
from proto.phase_grpc import QueryCrossRunningPhase
src_reverse = {'N': 'S', 'S': 'N', 'W': 'E', 'E': 'W', 'NE': 'SW', 'SW': 'NE', 'SE': 'NW', 'NW': 'SE'}
@ -167,7 +168,7 @@ def gen_avg_cross_delay_pb(cross_delay_data_list):
inroad_delay_info.delay_info.turn_ratio_2 = turn_ratio_2
inroad_delay_info.delay_info.turn_ratio_3 = turn_ratio_3
inroad_delay_pb_list.append(inroad_delay_info)
cross_imbalance_index = round((max_stop_times - min_stop_times) / avg_cross_delay.delay_info.stop_times, 2)
cross_imbalance_index = round((max_stop_times - min_stop_times) / avg_cross_delay.delay_info.stop_times, 2) if avg_cross_delay.delay_info.stop_times != 0 else 0
avg_cross_delay.delay_info.imbalance_index = cross_imbalance_index
for flow_delay_info in flow_delay_list:
@ -262,7 +263,7 @@ def gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, da
jam_index = round(avg_cross_delay_info.delay_info.jam_index, 2)
stop_times = round(avg_cross_delay_info.delay_info.stop_times, 2)
high_park_percent = str(avg_cross_delay_info.delay_info.high_park_percent) + '%'
imbanlance_index = round(avg_cross_delay_info.delay_info.imbalance_index, 2)
imbalance_index = round(avg_cross_delay_info.delay_info.imbalance_index, 2)
speed = avg_cross_delay_info.delay_info.speed / 100
move_speed = avg_cross_delay_info.delay_info.move_speed / 100
park_time = avg_cross_delay_info.delay_info.park_time
@ -277,7 +278,7 @@ def gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, da
'jam_index': jam_index,
'stop_times': stop_times,
'high_park_percent': high_park_percent,
'imbanlance_index': imbanlance_index,
'imbalance_index': imbalance_index,
'speed': speed,
'move_speed': move_speed,
'park_time': park_time,
@ -330,8 +331,13 @@ def query_cross_ledger_info(crossid, nodeid, area_id, userid):
if cross_ledger_info.status_code != 200 or cross_ledger_info.json()['status'] != 0:
logging.error(f"查询路口台账信息失败,crossid:{crossid},nodeid:{nodeid},area_id:{area_id},userid:{userid}")
return None, None
division_list, company_list, slc_company_dict, slckind_list, detector_type_dict = gen_ledger_base_info(nodeid, area_id)
cross_ledger_info_dict = json.loads(cross_ledger_info.text)
return cross_ledger_info_dict
def gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict):
division_list, company_list, slc_company_dict, slckind_list, detector_type_dict = gen_ledger_base_info(nodeid, area_id)
slc_company = cross_ledger_info_dict['data']['ledger']['slc_company']
location = cross_ledger_info_dict['data']['ledger']['location']
slc_company_name = slc_company_dict[slc_company] if slc_company in slc_company_dict.keys() else '-'
@ -388,7 +394,7 @@ def query_cross_ledger_info(crossid, nodeid, area_id, userid):
'length_info': length_info,
'avg_length': avg_length
}
return cross_static_info, cross_ledger_info_dict
return cross_static_info, cross_ledger_info_dict['data']
def gen_ledger_base_info(nodeid, area_id):
@ -444,6 +450,7 @@ def gen_road_delay_index(avg_cross_delay_info, roads_dir_dict):
continue
service_level = calc_service_level(road_index.delay_info.delay_time)
road_flow_index[roadid] = {
'src_dir': road_dir[roadid],
'stop_times': round(road_index.delay_info.stop_times, 2),
'high_park_percent': str(road_index.delay_info.high_park_percent) + '%',
'imbalance_index': round(road_index.delay_info.imbalance_index, 2),
@ -475,7 +482,7 @@ def gen_road_dir_dict(cross_ledger_info_dict):
roads_dir_dict = {}
if not cross_ledger_info_dict:
return roads_dir_dict
roads = cross_ledger_info_dict['data']['roads']
roads = cross_ledger_info_dict['roads'] if 'roads' in cross_ledger_info_dict.keys() else None
for dir in roads.keys():
roads_dir_dict[dir] = {
'in': roads[dir]['roadid'],
@ -494,7 +501,7 @@ def calc_inroad_imbalance_index(inroad_delay_pb_list):
max_stop_times = flow_delay_info.delay_info.stop_times
if flow_delay_info.delay_info.stop_times < min_stop_times:
min_stop_times = flow_delay_info.delay_info.stop_times
imbalance_index = round((max_stop_times - min_stop_times) / avg_stop_times, 2)
imbalance_index = round((max_stop_times - min_stop_times) / avg_stop_times, 2) if avg_stop_times > 0 and max_stop_times > 0 and min_stop_times != 99999 else 0
item.delay_info.imbalance_index = imbalance_index
return inroad_delay_pb_list
@ -530,10 +537,11 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
if out_road_id != '-' and out_road_id in outroad_info_dict.keys():
out_car_num = outroad_info_dict[out_road_id].turn_info.car_num
out_flow_rate = round(out_car_num / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-'
out_l_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-'
out_s_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_0 / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-'
out_r_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_2 / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-'
out_l_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / out_car_num, 2) if out_car_num != 0 else '-'
out_s_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_0 / out_car_num, 2) if out_car_num != 0 else '-'
out_r_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_2 / out_car_num, 2) if out_car_num != 0 else '-'
road_flow_turn_rate[roadid] = {
'src_dir': dir,
'in_flow_rate': in_flow_rate,
'out_flow_rate': out_flow_rate,
'l_rate': l_rate,
@ -566,8 +574,8 @@ def calc_tide_index(crossid, nodeid, date_list, roads_dir_dict):
tide_index_list = []
am_tp_start, am_tp_end = 't700', '900'
pm_tp_start, pm_tp_end = 't1700', '1900'
am_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start, am_tp_end)
pm_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start, pm_tp_end)
am_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start)
pm_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start)
date_am_delay_info_dict = {item['day']: item['data'] for item in am_cross_delay_data_list}
date_pm_delay_info_dict = {item['day']: item['data'] for item in pm_cross_delay_data_list}
@ -599,7 +607,7 @@ def calc_tide_index(crossid, nodeid, date_list, roads_dir_dict):
if max_am_flow_road == '' or max_am_flow_road not in subtend_road_pair.keys():
tide_index_list.append(tide_index)
continue
subtend_road_am_flow = am_inroad_info_dict[subtend_road_pair[max_am_flow_road]].delay_info.car_num
subtend_road_am_flow = am_inroad_info_dict[subtend_road_pair[max_am_flow_road]].delay_info.car_num if subtend_road_pair[max_am_flow_road] in am_inroad_info_dict.keys() else 0
# 如果进口道流量小于20 则无意义
if min(subtend_road_am_flow, max_am_flow) < 20:
tide_index_list.append(tide_index)
@ -650,3 +658,498 @@ def gen_subtend_road_pair(roads_dir_dict):
tmp_road_dir.pop(subtend_dir)
return subtend_road_pair
def parse_single_cross_delay_info(crossid, nodeid, data_list, data_type, roads_dir_dict):
data_dict = {}
max_cross_car_num, max_road_car_num, max_flow_car_num = 0, 0, 0
pb_data_list = []
for item in data_list:
if item:
pb_data_list.append(item['data'])
road_delay_infos = item['data'].inroad_delay_infos if item['data'] and item['data'].inroad_delay_infos else []
for road_delay_info in road_delay_infos:
if road_delay_info.delay_info.car_num > max_road_car_num:
max_road_car_num = road_delay_info.delay_info.car_num
flow_delay_infos = road_delay_info.flow_delay_infos
for flow_delay_info in flow_delay_infos:
if flow_delay_info.delay_info.car_num > max_flow_car_num:
max_flow_car_num = flow_delay_info.delay_info.car_num
max_cross_car_num = max(
(x for x in pb_data_list if x is not None),
key=lambda x: x.delay_info.car_num,
default=None
).delay_info.car_num
for item in data_list:
tp_start = item['tp_start']
tp_end = item['tp_end']
day = item['day']
if data_type == 'day':
key = day
elif data_type == 'hour':
key = convert_time(int(tp_start.replace('h', ''))) + '-' + convert_time(int(tp_end.replace('h', '')))
else:
key = day
item_cross_delay_info = item['data']
# 指标内容依次是 停车次数 多次停车率 停车时间 延误时间 平均速度 不停车速度 相对流量 路口失衡系数 潮汐指数 服务水平
overview_data = ['-', '-', '-', '-', '-', '-', '-', '-', '-', '-',
# 下方为上述指标变化率颜色展示flag
0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
tide_index = calc_tide_index(crossid, nodeid, [day], roads_dir_dict) if data_type == 'day' or data_type == 'week' else '-'
road_data_dict = {}
cross_car_num = 0
if item_cross_delay_info:
cross_car_num = item_cross_delay_info.delay_info.car_num
road_delay_infos = item_cross_delay_info.inroad_delay_infos
max_stop_times = max(road_delay_infos, key=lambda x: x.delay_info.stop_times).delay_info.stop_times
min_stop_times = min(road_delay_infos, key=lambda x: x.delay_info.stop_times).delay_info.stop_times
cross_imbalance_index = round((max_stop_times - min_stop_times) / item_cross_delay_info.delay_info.stop_times, 2) if item_cross_delay_info.delay_info.stop_times != 0 else 0
overview_data = [
# 停车次数
round(item_cross_delay_info.delay_info.stop_times, 2),
# 多次停车率 停车时间
item_cross_delay_info.delay_info.high_park_percent, item_cross_delay_info.delay_info.park_time,
# 延误时间 平均速度
item_cross_delay_info.delay_info.delay_time, item_cross_delay_info.delay_info.speed / 100,
# 不停车速度 相对流量占比
item_cross_delay_info.delay_info.move_speed / 100, round(cross_car_num / max_cross_car_num * 100, 2) if max_cross_car_num != 0 else 0,
# 路口失衡系数 潮汐指数(当为小时级指标时,不计算该值) 服务水平
cross_imbalance_index, tide_index[0], calc_service_level(item_cross_delay_info.delay_info.delay_time),
# 指标变化率颜色展示flag, 不含服务水平
0, 0, 0, 0, 0, 0, 0, 0, 0
]
inroad_delay_infos_with_imbalance = calc_inroad_imbalance_index(road_delay_infos)
road_data_dict = {item.inroadid: item for item in inroad_delay_infos_with_imbalance}
data_dict[key] = {
'overview': overview_data,
'roads_data': {}
}
src_dir_list = list(roads_dir_dict.keys())
for src_dir in src_dir_list:
if roads_dir_dict[src_dir]['in'] != '-' and roads_dir_dict[src_dir]['in'] in road_data_dict.keys():
road_car_num = road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.car_num
src_data = [
# 停车次数
round(road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.stop_times, 2),
# 多次停车率
road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.high_park_percent,
# 转向失衡指数
round(road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.imbalance_index, 2),
# 停车时间
road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.park_time,
# 延误时间
road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.delay_time,
# 平均速度
road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.speed / 100,
# 不停车速度
road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.move_speed / 100,
# 相对流量占比
round(road_car_num / max_road_car_num * 100, 2) if max_road_car_num > 0 else '-',
# 进口道流量占比
round(road_car_num / cross_car_num * 100, 2) if cross_car_num > 0 else '-',
# 服务水平
calc_service_level(road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.delay_time),
# 指标变化率颜色展示flag, 顺序为上述指标顺序, 不含服务水平
0, 0, 0, 0, 0, 0, 0, 0, 0
]
flow_delay_infos = road_data_dict[roads_dir_dict[src_dir]['in']].flow_delay_infos
flow_data_dict = {}
for flow_delay_info in flow_delay_infos:
turn_type = flow_delay_info.turn_type
if turn_type not in (0, 1):
continue
flow_data = [
# 停车次数
round(flow_delay_info.delay_info.stop_times, 2),
# 多次停车率
flow_delay_info.delay_info.high_park_percent,
# 停车时间
flow_delay_info.delay_info.park_time,
# 延误时间
flow_delay_info.delay_info.delay_time,
# 平均速度
flow_delay_info.delay_info.speed / 100,
# 不停车速度
flow_delay_info.delay_info.move_speed / 100,
# 相对流量
round(flow_delay_info.delay_info.car_num / max_flow_car_num * 100, 2) if max_flow_car_num > 0 else '-',
# 分流转向占比
round(flow_delay_info.delay_info.car_num / road_car_num * 100, 2) if road_car_num > 0 else '-',
# 服务水平
calc_service_level(flow_delay_info.delay_info.delay_time),
# 指标变化率颜色展示flag 不含服务水平
0, 0, 0, 0, 0, 0, 0, 0
]
flow_data_dict[turn_type] = flow_data
data_dict[key]['roads_data'][src_dir] = {
'road': src_data,
'flow': flow_data_dict
}
return data_dict
def calc_single_day_delay_info_change_rate(data_dict):
res_data_dict = copy.deepcopy(data_dict)
key_list = list(res_data_dict.keys())
empty_overview = ['-', '-', '-', '-', '-', '-', '-', '-', '-', '-',
# 指标变化率颜色展示flag
0, 0, 0, 0, 0, 0, 0, 0, 0]
for i in range(len(key_list)-1, -1, -1):
overview_data = res_data_dict[key_list[i]]['overview'] # list
roads_data = res_data_dict[key_list[i]]['roads_data'] # dict 内部包含多个方向路段和各个方向路段的流向级别的数据
if i != 0:
prev_item_index = i - 1
prev_overview_data = res_data_dict[key_list[prev_item_index]]['overview']
prev_roads_data = res_data_dict[key_list[prev_item_index]]['roads_data']
if overview_data != empty_overview and prev_overview_data != empty_overview:
overview_data_with_change_rate = calc_overview_change_rate(overview_data, prev_overview_data)
res_data_dict[key_list[i]]['overview'] = overview_data_with_change_rate
roads_data_wit_change_rate = calc_roads_data_change_rate(roads_data, prev_roads_data)
res_data_dict[key_list[i]]['roads_data'] = roads_data_wit_change_rate
return res_data_dict
def calc_overview_change_rate(overview_data, prev_overview_data):
res_data = copy.copy(overview_data)
for i in range(len(overview_data)):
if i >= 9:
continue
if overview_data[i] == '-' or prev_overview_data[i] == '-':
res_data[i + 10] = 0
# 变化率 0表示正常 1表示恶化 2表示优化
if i in (0, 1, 2, 3, 6, 7, 8):
# 指标提升代表恶化的有停车次数、多次停车率、转向失衡指数、停车时间、延误时间、相对流量、进口道流量占比
rate = (overview_data[i] - prev_overview_data[i]) / prev_overview_data[i] * 100 if prev_overview_data[i] != '-' and prev_overview_data[i] > 0 else 0
if rate < -20:
res_data[i + 10] = 1
elif rate > 20:
res_data[i + 10] = 2
else:
# 指标下降代表恶化的有平均速度、不停车速度
rate = (overview_data[i] - prev_overview_data[i]) / prev_overview_data[i] * 100 if prev_overview_data[i] != '-' and prev_overview_data[i] > 0 else 0
if rate > 20:
res_data[i + 10] = 1
elif rate < -20:
res_data[i + 10] = 2
return res_data
def calc_roads_data_change_rate(roads_data, prev_roads_data):
res_data = copy.deepcopy(roads_data)
src_dir_list = list(roads_data.keys())
for src_dir in src_dir_list:
src_dir_road_data = roads_data[src_dir]['road']
if src_dir not in prev_roads_data.keys():
continue
# 指标提升代表恶化的有停车次数、多次停车率、停车时间、延误时间、相对流量、分流转向占比
prev_src_dir_road_data = prev_roads_data[src_dir]['road']
for i in range(len(src_dir_road_data)):
if i >= 9:
continue
if src_dir_road_data[i] == '-' or prev_src_dir_road_data[i] == '-':
src_dir_road_data[i + 10] = 0
if i in (0, 1, 2, 3, 4, 7, 8):
rate = (src_dir_road_data[i] - prev_src_dir_road_data[i]) / prev_src_dir_road_data[i] * 100 if prev_src_dir_road_data[i] > 0 else 0
if rate < -20:
src_dir_road_data[i + 10] = 1
elif rate > 20:
src_dir_road_data[i + 10] = 2
else:
# 指标下降代表恶化的有平均速度、不停车速度
rate = (src_dir_road_data[i] - prev_src_dir_road_data[i]) / prev_src_dir_road_data[i] * 100 if prev_src_dir_road_data[i] > 0 else 0
if rate > 20:
src_dir_road_data[i + 10] = 1
elif rate < -20:
src_dir_road_data[i + 10] = 2
flow_datas = roads_data[src_dir]['flow']
prev_flow_datas = prev_roads_data[src_dir]['flow']
for turn_type in flow_datas.keys():
flow_data = flow_datas[turn_type]
if turn_type not in prev_flow_datas.keys():
continue
prev_flow_data = prev_flow_datas[turn_type]
for i in range(len(flow_data)):
if i >= 8:
continue
if flow_data[i] == '-' or prev_flow_data[i] == '-':
flow_data[i + 9] = 0
if i in (0, 1, 2, 3, 6, 7):
rate = (flow_data[i] - prev_flow_data[i]) / prev_flow_data[i] * 100 if prev_flow_data[i] > 0 else 0
if rate < -20:
flow_data[i + 9] = 1
elif rate > 20:
flow_data[i + 9] = 2
else:
rate = (flow_data[i] - prev_flow_data[i]) / prev_flow_data[i] * 100 if prev_flow_data[i] > 0 else 0
if rate > 20:
flow_data[i + 9] = 1
elif rate < -20:
flow_data[i + 9] = 2
return res_data
def parse_data2pb(data_list):
res_list = []
for row in data_list:
day = row['day']
tp_start = row['tp_start']
tp_end = row['tp_end']
item_cross_delay_info = pb.xl_cross_delayinfo_t()
item_cross_delay_info.ParseFromString(row['data'])
res_list.append({
'day': day,
'tp_start': tp_start,
'tp_end': tp_end,
'data': item_cross_delay_info
})
return res_list
def gen_cross_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict):
# 运行效率、均衡调控、配时方案、路口渠化
operating_efficiency_problems = gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict)
balanced_control_problems = gen_balanced_control_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict)
phase_problems = gen_phase_problems()
cross_channelized_problems = gen_cross_channelized_problems()
problems = [operating_efficiency_problems, balanced_control_problems, phase_problems, cross_channelized_problems]
return problems
def gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict):
if not avg_cross_delay_info:
return [{
'item': '运行效率',
'values': []
}]
road_delay_infos = avg_cross_delay_info.inroad_delay_infos
high_park_problems, high_park_suggestions = gen_high_park_problems(road_delay_infos, roads_dir_dict)
operating_efficiency_problems = {
'item': '运行效率',
'values': [
{
'item': '多次排队',
'detail': high_park_problems,
'reason': '某一进口道转向的多次停车率大于15%',
'suggestions': high_park_suggestions
},
]
}
pass
def gen_high_park_problems(road_delay_infos, roads_dir_dict):
detail = [{
'text': '未见异常'
}]
suggestion = []
err_src_dict = {}
src_list = list(roads_dir_dict.keys())
road_delays_dict = {item.inroadid: item for item in road_delay_infos}
for src_dir in src_list:
if roads_dir_dict[src_dir]['in'] == '-' or roads_dir_dict[src_dir]['in'] not in road_delays_dict.keys():
continue
road_info = road_delays_dict[roads_dir_dict[src_dir]['in']]
flow_delay_infos = road_info.flow_delay_infos
turn_type_flow_delay_info_dict = {item.turn_type: item for item in flow_delay_infos}
for turn_type in turn_type_flow_delay_info_dict.keys():
flow_delay_info = turn_type_flow_delay_info_dict[turn_type]
if turn_type not in (0, 1):
continue
if flow_delay_info.delay_info.high_park_percent > 15:
if src_dir not in err_src_dict.keys():
err_src_dict[src_dir] = [str(turn_type) + ':' + str(flow_delay_info.delay_info.high_park_percent)]
else:
err_src_dict[src_dir].append(str(turn_type) + ':' + str(flow_delay_info.delay_info.high_park_percent))
if err_src_dict:
detail = []
for src_dir in err_src_dict.keys():
src_detail = []
for turn_type in err_src_dict[src_dir]:
flow_detail = {
'turn_type': int_turn_type2str[int(turn_type.split(':')[0])],
'text': '车辆多次停车率过大(',
'percent': turn_type.split(':')[1] + '%'
}
src_detail.append(flow_detail)
src_detail.append({
'text': '车辆需要多次排队才能通过'
})
detail.append({
'src_dir': dir_str_dict[src_dir] + '进口',
'src_detail': src_detail
})
# todo 补充生成建议的逻辑
return detail, suggestion
def gen_high_stop_time_problems(road_delay_infos, roads_dir_dict):
detail = [{
'text': '未见异常'
}]
suggestion = []
# todo 需与产品再次确认逻辑
pass
def gen_balanced_control_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict):
road_delay_infos = avg_cross_delay_info.inroad_delay_infos
cross_imbalance_detail, cross_imbalance_suggestions = gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict)
turn_imbalance_detail, turn_imbalance_suggestions = gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict)
# todo 需要与产品确认路口潮汐问题诊断的逻辑
balanced_control_problems = {
'item': '均衡调控',
'values': [
{
'item': '路口失衡',
'detail': cross_imbalance_detail,
'reason': '路口存在某个流向绿灯时长不足而另一个方向绿灯存在空放的现象',
'suggestions': cross_imbalance_suggestions
},
{
'item': '转向失衡',
'detail': turn_imbalance_detail,
'reason': '同一进口道直行与左转停车次数之差的绝对值大于0.5且转向停车次数的最大值大于1',
'suggestions': turn_imbalance_suggestions
},
]
}
return balanced_control_problems
# 路口失衡问题诊断
def gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict):
detail = [{
'text': '未见异常'
}]
suggestion = []
road_src_dict = {item['in']: item for item in roads_dir_dict}
max_stop_times_road = max(road_delay_infos, key=lambda x: x.delay_info.stop_times)
min_stop_times_road = min(road_delay_infos, key=lambda x: x.delay_info.stop_times)
if max_stop_times_road.delay_info.stop_times > 1 \
and max_stop_times_road.delay_info.stop_times - min_stop_times_road.delay_info.stop_times > 0.5:
max_roadid = max_stop_times_road.inroadid
min_roadid = min_stop_times_road.inroadid
max_src = road_src_dict[max_roadid]
min_src = road_src_dict[min_roadid]
detail = [
{
'src_dir': dir_str_dict[max_src] + '进口',
'text': '的停车次数(' + str(round(max_stop_times_road.delay_info.stop_times, 2)) + ')与'
},
{
'src_dir': dir_str_dict[min_src] + '进口',
'text': f"""的停车次数({str(round(max_stop_times_road.delay_info.stop_times, 2))})相差过大,两者之比为{int(round(max_stop_times_road.delay_info.stop_times, 2) / round(min_stop_times_road.delay_info.stop_times, 2) * 100)}%,分配的绿灯时长不匹配"""
}
]
suggestion = [
{
'text': '调整',
'max_src_dir': dir_str_dict[max_src] + '进口',
},
{
'text': '',
'min_src_dir': dir_str_dict[min_src] + '进口',
},
{
'text': '的绿灯时长的分配情况'
}
]
if max_src == src_reverse[min_src]:
# todo 补充对向情况下额外建议的情况
pass
return detail, suggestion
# 转向失衡问题诊断
def gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict):
detail = [{
'text': '未见异常'
}]
suggestion = []
err_road_dict = {}
road_src_dict = {item['in']: item for item in roads_dir_dict}
for road_delay_info in road_delay_infos:
inroadid = road_delay_info.inroadid
flow_delay_infos = road_delay_info.flow_delay_infos
turn_type_flow_delay_info_dict = {item.turn_type: item for item in flow_delay_infos}
lane_num_info = count_lsr(inroad_static_info_dict[inroadid]['lane_turn_info']) if inroadid in inroad_static_info_dict.keys() else None
if 0 in turn_type_flow_delay_info_dict.keys() and 1 in turn_type_flow_delay_info_dict.keys():
s_stop_times = turn_type_flow_delay_info_dict[0].delay_info.stop_times
l_stop_times = turn_type_flow_delay_info_dict[1].delay_info.stop_times
if s_stop_times > 1 or l_stop_times > 1 and abs(l_stop_times - s_stop_times) > 0.5:
if s_stop_times > l_stop_times:
err_road_dict[road_src_dict[inroadid]] = ['直行' + ':' + str(round(s_stop_times, 2)), '左转' + ':' + str(round(l_stop_times, 2)), lane_num_info]
else:
err_road_dict[road_src_dict[inroadid]] = ['左转' + ':' + str(round(s_stop_times, 2)), '直行' + ':' + str(round(l_stop_times, 2)), lane_num_info]
if len(err_road_dict.keys()) > 0:
detail, suggestion, road_num_suggestion = [], [], []
for src_dir in err_road_dict.keys():
item_detail = [
{
'src_dir': dir_str_dict[src_dir] + '进口',
'child_detail': [
{
'turn_type': err_road_dict[src_dir][0].split(':')[0],
},
{
'text': '的停车次数(' + str(err_road_dict[src_dir][0].split(':')[1]) + ')与'
},
{
'turn_type': err_road_dict[src_dir][1].split(':')[0],
},
{
'text': '的停车次数(' + str(err_road_dict[src_dir][1].split(':')[1]) + ')相差过大,两者之比为' + str(int(float(err_road_dict[src_dir][0].split(':')[1]) / float(err_road_dict[src_dir][1].split(':')[1]) * 100)) + '%,分配的绿灯时长不匹配'
}
]
}
]
detail.append(item_detail)
item_suggestion = [
{
'src_dir': dir_str_dict[src_dir] + '进口',
'text': '信号灯直左分控,调整执行和左转车辆绿灯时长分配情况'
},
# todo 补充配时相关建议
]
suggestion.extend(item_suggestion)
if err_road_dict[src_dir][2]:
road_num_suggestion.append(
{
'src_dir': dir_str_dict[src_dir] + '进口',
'text': '左转/直行/右转现有车道数分别为' + err_road_dict[src_dir][2]
}
)
lane_num_suggestion = [
{
'text': '调整',
'src_dir': ''.join([dir_str_dict[item] + '进口' for item in err_road_dict.keys()])
},
{
'text': '车道分配情况'
}
]
lane_num_suggestion.extend(road_num_suggestion)
suggestion = suggestion + lane_num_suggestion
return detail, suggestion
def gen_phase_problems():
pass
def gen_cross_channelized_problems():
pass
def query_cross_phase(nodeid, crossid):
cross_phases, err = QueryCrossRunningPhase(nodeid, [crossid])
if err or not cross_phases or cross_phases.code != 0:
return None
cross_phase_info = cross_phases.data[0]
pass

View File

@ -88,4 +88,8 @@ class TmnetDbHelper(TableDbHelperBase):
def query_base_info(self, nodeid, area_id):
sql = "select * from ledger.leger_base_info where nodeid= %d and area_id = %s" % (int(nodeid), int(area_id))
return self.do_select(sql)
def query_city_tp_info(self, nodeid, area_id):
sql = f"select tp_desc from cross_doctor_config.area_tp_config where nodeid = {nodeid} and area_id = {area_id}"
return self.do_select(sql)

View File

@ -40,7 +40,7 @@ def init():
def test_get_cross_delay_data():
row_list = db_cross.query_cross_delay_info('CR_11987179_2632645', 350100, '20251013', 't830', '1200')
row_list = db_cross.query_cross_delay_info('CR_11987179_2632645', 350100, '20251013', 't830')
data = row_list[0]['data']
cross_delay = pb.xl_cross_delayinfo_t()
cross_delay.ParseFromString(data)