cross_doctor/app/cross_evaluate_worker.py

372 lines
18 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 14:33
# @Description:
import json
from app.common_worker import *
from app.eva_common import *
# 查询可用路口列表
from proto.phase_grpc import QueryCrossRunningPhase
def query_cross_list(params):
    """Return the cross list and time-period presets for an area as a JSON string.

    Reads ``nodeid``, ``area_id`` and ``userid`` from ``params`` and verifies
    that ``area_id`` is among the areas returned for the user.

    Response codes:
        0   success; ``data`` holds ``cross_list``, ``tp_info`` and ``peak_tp``
        1-3 a required parameter is missing (nodeid, area_id, userid)
        4   no areas were returned for the user
        5   the requested area is not one of the user's areas
    """
    required = (
        ('nodeid', 1, '缺少nodeid 请刷新后重试'),
        ('area_id', 2, '缺少area_id 请刷新后重试'),
        ('userid', 3, '缺少userid 请刷新后重试'),
    )
    given = {}
    for key, code, message in required:
        given[key] = check_param(params, key)
        if not given[key]:
            return json.dumps(make_common_res(code, message))
    nodeid, area_id, userid = given['nodeid'], given['area_id'], given['userid']

    # The requested area must be one the user is attached to.
    user_areas = db_user.query_user_areas(userid)
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(4, '用户信息异常'))
    permitted_ids = [int(row['area_id']) for row in user_areas]
    if int(area_id) not in permitted_ids:
        return json.dumps(make_common_res(5, '用户信息异常'))

    # Query the cross list.
    cross_list = db_tmnet.query_cross_list_sql(nodeid, area_id)

    tp_rows = db_tmnet.query_city_tp_info(nodeid, area_id)
    if tp_rows:
        # Both fields are stored as comma-separated "HH:MM-HH:MM" strings.
        first_row = tp_rows[0]
        tp_info = first_row['tp_desc'].split(',')
        peak_tp = first_row['peak_tp'].split(',')
    else:
        # Built-in defaults used when no time-period row is configured.
        tp_info = [
            "00:00-07:00",
            "07:00-09:00",
            "09:00-17:00",
            "17:00-19:00",
            "19:00-22:00",
            "22:00-00:00",
        ]
        peak_tp = [
            "07:00-09:00",
            "17:00-19:00",
        ]

    payload = make_common_res(0, 'ok')
    payload['data'] = {
        'cross_list': cross_list,
        'tp_info': tp_info,
        'peak_tp': peak_tp,
    }
    return json.dumps(payload, ensure_ascii=False)
# 查询路口可用时段
def query_cross_usable_date(params):
    """Return the dates for which a cross has usable data, as a JSON string.

    Reads ``crossid``, ``nodeid``, ``area_id`` and ``userid`` from ``params``
    and verifies that ``area_id`` is among the areas returned for the user.

    Response codes:
        0   success; ``data`` is the list of ``day`` values from the query rows
        1-4 a required parameter is missing (crossid, nodeid, area_id, userid)
        5   no areas were returned for the user
        6   the requested area is not one of the user's areas
    """
    required = (
        ('crossid', 1, '缺少crossid 请刷新后重试'),
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    )
    given = {}
    for key, code, message in required:
        given[key] = check_param(params, key)
        if not given[key]:
            return json.dumps(make_common_res(code, message))

    # The requested area must be one the user is attached to.
    user_areas = db_user.query_user_areas(given['userid'])
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_ids = [int(row['area_id']) for row in user_areas]
    if int(given['area_id']) not in permitted_ids:
        return json.dumps(make_common_res(6, '用户信息异常'))

    usable_rows = db_cross.query_cross_usable_date_sql(given['crossid'], given['nodeid'])
    payload = make_common_res(0, 'ok')
    payload['data'] = [row['day'] for row in usable_rows]
    return json.dumps(payload, ensure_ascii=False)
# 获取路口延迟信息
def query_cross_delay_info_controller(params):
    """Return averaged delay indices for one cross over the selected dates.

    Reads ``crossid``, ``nodeid``, ``area_id``, ``userid``, ``date_list``,
    ``time_range`` ("HH:MM-HH:MM") and the optional ``query_type`` from
    ``params``. Only the start of ``time_range`` is used here.

    Response codes:
        0   success; ``data`` holds ``overview``, ``road_flow_delay_infos``,
            ``road_flow_turn_rate``, ``cross_static_info``, ``ledger_info``
            and ``next_cross_info``
        1-4 a required parameter is missing (crossid, nodeid, area_id, userid)
        5-6 the user has no areas / the area is not one of the user's areas
        7   ``date_list`` is missing or empty
        8   ``time_range`` is missing
        9   no delay data for the selected dates and period
        10  the cross ledger lookup returned nothing
    """
    required = (
        ('crossid', 1, '缺少crossid 请刷新后重试'),
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    )
    given = {}
    for key, code, message in required:
        given[key] = check_param(params, key)
        if not given[key]:
            return json.dumps(make_common_res(code, message))
    crossid, nodeid = given['crossid'], given['nodeid']
    area_id, userid = given['area_id'], given['userid']

    # The requested area must be one the user is attached to.
    user_areas = db_user.query_user_areas(userid)
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_ids = [int(row['area_id']) for row in user_areas]
    if int(area_id) not in permitted_ids:
        return json.dumps(make_common_res(6, '用户信息异常'))

    date_list = check_param(params, 'date_list')
    if not (date_list and len(date_list) >= 1):
        return json.dumps(make_common_res(7, '缺少日期参数,请最少选择一天作为查询日期'))
    query_type = check_param(params, 'query_type') or 0
    time_range = check_param(params, 'time_range')
    if not time_range:
        return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))

    # "07:30-09:00" -> 730: the start time encoded as HHMM.
    start_fields = str(time_range.split('-')[0]).split(':')
    tp_start = int(start_fields[0]) * 100 + int(start_fields[1])
    # query_type 1 / 2 select 't'- / 'h'-prefixed period keys; any other
    # value leaves the plain integer.
    if query_type == 1:
        tp_start = f't{tp_start}'
    elif query_type == 2:
        tp_start = f'h{tp_start}'

    delay_rows = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
    avg_cross_delay_info = gen_avg_cross_delay_pb(delay_rows)
    if not avg_cross_delay_info:
        return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据'))

    inroad_static_info_dict = {
        inroad['roadid']: inroad
        for inroad in db_tmnet.query_cross_inroads(crossid, nodeid)
    }

    # Static cross information and ledger information.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return json.dumps(make_common_res(10, '查询路口信息失败'))
    cross_static_info, cross_ledger_info = gen_cross_static_info(
        crossid, nodeid, area_id, cross_ledger_info_dict
    )
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

    # Cross-level index overview.
    overview_res = gen_overview_index(
        avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict
    )
    # Road-level and flow-level index overview.
    road_flow_delay_infos = gen_road_delay_index(avg_cross_delay_info, roads_dir_dict)
    # Turn ratios per road entry/exit direction.
    road_flow_turn_rate = gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict)
    next_cross_info = get_next_cross(nodeid, area_id, roads_dir_dict)

    payload = make_common_res(0, 'ok')
    payload['data'] = {
        'overview': overview_res,
        'road_flow_delay_infos': road_flow_delay_infos,
        'road_flow_turn_rate': road_flow_turn_rate,
        'cross_static_info': cross_static_info,
        'ledger_info': cross_ledger_info,
        'next_cross_info': next_cross_info,
    }
    return json.dumps(payload, ensure_ascii=False)
# 问题诊断接口
def query_cross_problems(params):
    """Diagnose problems for one cross over the selected dates and period.

    Reads ``crossid``, ``nodeid``, ``area_id``, ``userid``, ``date_list``,
    ``time_range`` ("HH:MM-HH:MM") and the optional ``query_type`` from
    ``params``.

    Response codes:
        0   success; ``data`` is the result of ``gen_cross_problems``
        1-4 a required parameter is missing (crossid, nodeid, area_id, userid)
        5-6 the user has no areas / the area is not one of the user's areas
        7   ``date_list`` is missing or empty
        8   ``time_range`` is missing
        9   no delay data for the selected dates and period
        10  the cross ledger lookup returned nothing
    """
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))

    # The requested area must be one the user is attached to.
    area_list = db_user.query_user_areas(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = [int(row['area_id']) for row in area_list]
    if int(area_id) not in area_list:
        return json.dumps(make_common_res(6, '用户信息异常'))

    date_list = check_param(params, 'date_list')
    if not date_list or len(date_list) < 1:
        return json.dumps(make_common_res(7, '缺少日期参数,请最少选择一天作为查询日期'))
    query_type = check_param(params, 'query_type')
    if not query_type:
        query_type = 0
    time_range = check_param(params, 'time_range')
    if not time_range:
        return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))

    # "07:30-09:00" -> 730: the start time encoded as HHMM.
    start_fields = str(time_range.split('-')[0]).split(':')
    tp_start_value = int(start_fields[0]) * 100 + int(start_fields[1])
    # The database key is 't'- or 'h'-prefixed for query_type 1 / 2 and the
    # plain integer otherwise.
    if query_type == 1:
        tp_start = 't' + str(tp_start_value)
    elif query_type == 2:
        tp_start = 'h' + str(tp_start_value)
    else:
        tp_start = tp_start_value

    cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
    avg_cross_delay_info = gen_avg_cross_delay_pb(cross_delay_data_list)
    if not avg_cross_delay_info:
        return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据'))

    cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid)
    inroad_static_info_dict = {item['roadid']: item for item in cross_inroads}

    # Check whether the requested period is a peak period.
    is_peak = check_is_peak_tp(time_range, area_id, nodeid)

    # Static cross information and ledger information.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return json.dumps(make_common_res(10, '查询路口信息失败'))
    cross_static_info, cross_ledger_info = gen_cross_static_info(
        crossid, nodeid, area_id, cross_ledger_info_dict
    )
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

    cross_phase, err = QueryCrossRunningPhase(
        int(nodeid), [crossid], [str(item) for item in date_list], time_range
    )
    if err or not cross_phase or cross_phase.code != 0:
        # Best effort: diagnosis continues with whatever cross_phase holds.
        logging.warning("路口未录入配时方案")

    # gen_cross_problems receives the start time as a digits-only string.
    # Stripping the prefix with tp_start.replace(...) raised AttributeError
    # whenever query_type was neither 1 nor 2, because tp_start was still an
    # int; str(tp_start_value) gives the same string for every query_type.
    problems = gen_cross_problems(
        crossid, nodeid, area_id, time_range, str(tp_start_value), date_list,
        avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict,
        cross_phase, is_peak, cross_ledger_info
    )
    res = make_common_res(0, 'ok')
    res['data'] = problems
    return json.dumps(res, ensure_ascii=False)
# 指标变化趋势接口
def query_cross_index_trend_controller(params):
    """Return hourly, daily and weekly index trends for one cross.

    Reads ``crossid``, ``nodeid``, ``area_id``, ``userid``, ``query_date``
    (``YYYYMMDD``), ``time_range`` ("HH:MM-HH:MM") and the optional
    ``query_type`` from ``params``.

    Response codes:
        0   success; ``data`` holds ``hours_data``, ``days_data``, ``weeks_data``
        1-4 a required parameter is missing (crossid, nodeid, area_id, userid)
        5-6 the user has no areas / the area is not one of the user's areas
        7   ``query_date`` is missing
        8   ``time_range`` is missing
        9   the cross ledger lookup returned nothing
    """
    required = (
        ('crossid', 1, '缺少crossid 请刷新后重试'),
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    )
    given = {}
    for key, code, message in required:
        given[key] = check_param(params, key)
        if not given[key]:
            return json.dumps(make_common_res(code, message))
    crossid, nodeid = given['crossid'], given['nodeid']
    area_id, userid = given['area_id'], given['userid']

    # The requested area must be one the user is attached to.
    user_areas = db_user.query_user_areas(userid)
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_ids = [int(row['area_id']) for row in user_areas]
    if int(area_id) not in permitted_ids:
        return json.dumps(make_common_res(6, '用户信息异常'))

    query_date = check_param(params, 'query_date')
    if not query_date:
        return json.dumps(make_common_res(7, '缺少查询日期,请选择查询日期'))
    time_range = check_param(params, 'time_range')
    if not time_range:
        return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))
    query_type = check_param(params, 'query_type') or 0

    # "07:30-09:00" -> tp_start 730, tp_end 900 (HHMM-encoded integers).
    start_fields = str(time_range.split('-')[0]).split(':')
    tp_start = int(start_fields[0]) * 100 + int(start_fields[1])
    end_fields = str(time_range.split('-')[1]).split(':')
    tp_end = int(end_fields[0]) * 100 + int(end_fields[1])
    # query_type 1 / 2 select 't'- / 'h'-prefixed period keys.
    if query_type == 1:
        tp_start = f't{tp_start}'
    elif query_type == 2:
        tp_start = f'h{tp_start}'

    one_day = timedelta(days=1)
    # prev_date is not read below; computing it still rejects a query_date
    # that is not in YYYYMMDD form.
    prev_date = (datetime.strptime(query_date, '%Y%m%d') - one_day).strftime('%Y%m%d')
    month_ago_date = (datetime.now().date() - timedelta(days=30)).strftime('%Y%m%d')
    now_prev_date = (datetime.now().date() - one_day).strftime('%Y%m%d')
    month_date_list = generate_date_range(month_ago_date, now_prev_date)
    ten_weeks_date_list = gen_ten_weeks_ago_data_list()

    # Ledger information provides the road/direction relations of the cross.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return json.dumps(make_common_res(9, '查询路口信息失败'))
    cross_static_info, cross_ledger_info = gen_cross_static_info(
        crossid, nodeid, area_id, cross_ledger_info_dict
    )
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

    # Hour-level data for query_date.
    hour_rows = db_cross.query_cross_delay_whole_day_hours(crossid, nodeid, query_date)
    hours_data_dict = parse_single_cross_delay_info(
        crossid, nodeid, parse_data2pb(hour_rows), 'hour', roads_dir_dict
    )
    hours_data_with_change_rate = calc_single_day_delay_info_change_rate(hours_data_dict)

    # Day-level data for the last 30 days.
    day_rows = db_cross.query_cross_delay_info(crossid, nodeid, month_date_list, tp_start)
    days_data_dict = parse_single_cross_delay_info(
        crossid, nodeid, parse_data2pb(day_rows), 'day', roads_dir_dict
    )
    days_data_with_change_rate = calc_single_day_delay_info_change_rate(days_data_dict)

    # Week-level data: each week's rows are averaged into a single record.
    weekly_records = []
    for week_dates in ten_weeks_date_list:
        week_rows = db_cross.query_cross_delay_info(crossid, nodeid, week_dates, tp_start)
        week_average = gen_avg_cross_delay_pb(week_rows)
        weekly_records.append({
            'day': week_dates[0] + '-' + week_dates[-1],
            'tp_start': tp_start,
            'tp_end': tp_end,
            'data': week_average,
        })
    weeks_data_dict = parse_single_cross_delay_info(
        crossid, nodeid, weekly_records, 'week', roads_dir_dict
    )
    weeks_data_with_change_rate = calc_single_day_delay_info_change_rate(weeks_data_dict)

    payload = make_common_res(0, 'ok')
    payload['data'] = {
        'hours_data': hours_data_with_change_rate,
        'days_data': days_data_with_change_rate,
        'weeks_data': weeks_data_with_change_rate,
    }
    return json.dumps(payload, ensure_ascii=False)
def query_cross_examine_records_detail(params):
    """Return per-day examine-record details for one cross as a JSON string.

    Reads ``crossid``, ``nodeid``, ``area_id``, ``userid``, ``date_list`` and
    ``time_range`` ("HH:MM-HH:MM") from ``params``. On success ``data`` maps
    each date to ``{phase type (int): [detail string, ...]}``.

    Response codes:
        0   success
        1-4 a required parameter is missing (crossid, nodeid, area_id, userid)
        5-6 the user has no areas / the area is not one of the user's areas
        7   ``date_list`` is missing
        8   ``time_range`` is missing
        9   no examine records for the selected period
    """
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))

    # The requested area must be one the user is attached to.
    area_list = db_user.query_user_areas(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = [int(row['area_id']) for row in area_list]
    if int(area_id) not in area_list:
        return json.dumps(make_common_res(6, '用户信息异常'))

    date_list = check_param(params, 'date_list')
    if not date_list:
        return json.dumps(make_common_res(7, '缺少查询日期,请选择查询日期'))
    time_range = check_param(params, 'time_range')
    if not time_range:
        return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))

    max_date = max(date_list)
    min_date = min(date_list)
    # "07:30-09:00" -> 730: the start time encoded as HHMM.
    start_fields = str(time_range.split('-')[0]).split(':')
    tp_start = int(start_fields[0]) * 100 + int(start_fields[1])
    cross_examine_records = db_cross.query_cross_examine_records(tp_start, max_date, crossid, min_date)
    prev_day = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
    if not cross_examine_records:
        return json.dumps(make_common_res(9, '当前路口在所选时段无异常情况'))

    # Direction code -> Chinese direction name used as the key of each detail
    # entry. The single-letter codes must map to the single-character names:
    # an empty string can never match a cleaned detail key.
    dir_str_dict = {
        'E': '东', 'S': '南', 'W': '西', 'N': '北',
        'NE': '东北', 'SE': '东南', 'SW': '西南', 'NW': '西北'
    }
    # Strips the '正' marker and any "(n)" suffix from a detail's direction.
    direction_noise = re.compile(r'正|\(\d+\)')

    days_records = {}
    for row in cross_examine_records:
        final_state = row['final_state']
        level_color = row['level_color']
        first_date = row['first_date']
        if final_state == 6 or level_color == 4:
            continue
        # Kept separate from the date_list request parameter.
        record_dates = generate_date_range(first_date, prev_day)
        # '^' separates days; ';' separates entries within one day.
        phase_type_list = row['phase_type'].split('^')
        phase_detail_list = row['phase_detail'].split('^')
        # zip stops at the shortest sequence, so a record holding fewer days
        # than the generated range no longer raises IndexError.
        for date, phase_type, phase_detail in zip(record_dates, phase_type_list, phase_detail_list):
            dir_phase_detail_dict = {}
            for item in phase_detail.split(';'):
                src_dir_str = item.split(':')[0]
                dir_phase_detail_dict[direction_noise.sub('', src_dir_str)] = item

            phase_type_dir_dict = {}
            for item in phase_type.split(';'):
                if item == '':
                    continue
                fields = item.split(':')
                src_dir = fields[0]
                item_type = int(fields[1])
                item_detail = dir_phase_detail_dict[dir_str_dict[src_dir]]
                phase_type_dir_dict.setdefault(item_type, []).append(item_detail)
            days_records[date] = phase_type_dir_dict

    res = make_common_res(0, 'ok')
    res['data'] = days_records
    return json.dumps(res, ensure_ascii=False)