cross_doctor/app/cross_evaluate_worker.py

296 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 14:33
# @Description:
import json
from app.common_worker import *
from app.eva_common import *
# 查询可用路口列表
from proto.phase_grpc import QueryCrossRunningPhase
def query_cross_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(1, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(2, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(3, '缺少userid 请刷新后重试'))
area_list = db_user.query_user_areas(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(4, '用户信息异常'))
area_list = [int(row['area_id']) for row in area_list]
if int(area_id) not in area_list:
return json.dumps(make_common_res(5, '用户信息异常'))
# 查询路口列表
cross_list = db_tmnet.query_cross_list_sql(nodeid, area_id)
tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id)
if not tp_desc:
tp_info = [
"00:00-07:00",
"07:00-09:00",
"09:00-12:00",
"12:00-14:00",
"14:00-17:00",
"17:00-19:00",
"19:00-22:00",
"22:00-00:00"
]
peak_tp = [
"07:00-09:00",
"17:00-19:00"
]
else:
tp_info = tp_desc[0]['tp_desc'].split(',')
peak_tp = tp_desc[0]['peak_tp'].split(',')
res = make_common_res(0, 'ok')
res['data'] = {
'cross_list': cross_list,
'tp_info': tp_info,
'peak_tp': peak_tp
}
return json.dumps(res, ensure_ascii=False)
# 查询路口可用时段
def query_cross_usable_date(params):
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_user_areas(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = [int(row['area_id']) for row in area_list]
if int(area_id) not in area_list:
return json.dumps(make_common_res(6, '用户信息异常'))
row_list = db_cross.query_cross_usable_date_sql(crossid, nodeid)
date_list = [row['day'] for row in row_list]
res = make_common_res(0, 'ok')
res['data'] = date_list
return json.dumps(res, ensure_ascii=False)
# 获取路口延迟信息
def query_cross_delay_info_controller(params):
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_user_areas(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = [int(row['area_id']) for row in area_list]
if int(area_id) not in area_list:
return json.dumps(make_common_res(6, '用户信息异常'))
date_list = check_param(params, 'date_list')
if not date_list or len(date_list) < 1:
return json.dumps(make_common_res(7, '缺少日期参数,请最少选择一天作为查询日期'))
query_type = check_param(params, 'query_type')
if not query_type:
query_type = 0
time_range = check_param(params, 'time_range')
if not time_range:
return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))
tp_start = int(str(time_range.split('-')[0]).split(':')[0]) * 100 + int(str(time_range.split('-')[0]).split(':')[1])
# tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1])
# if tp_end == 0:
# tp_end = 2400
if query_type == 1:
tp_start = 't' + str(tp_start)
elif query_type == 2:
tp_start = 'h' + str(tp_start)
cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
avg_cross_delay_info = gen_avg_cross_delay_pb(cross_delay_data_list)
if not avg_cross_delay_info:
return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据'))
cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid)
inroad_static_info_dict = {item['roadid']: item for item in cross_inroads}
# 路口静态信息及台账信息
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
if not cross_ledger_info_dict:
return json.dumps(make_common_res(10, '查询路口信息失败'))
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
# 路口指标数据概览
overview_res = gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict)
# 路段及流向数据概览
road_flow_delay_infos = gen_road_delay_index(avg_cross_delay_info, roads_dir_dict)
# 路段进出口方向分流比
road_flow_turn_rate = gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict)
res = make_common_res(0, 'ok')
res['data'] = {
'overview': overview_res,
'road_flow_delay_infos': road_flow_delay_infos,
'road_flow_turn_rate': road_flow_turn_rate,
'cross_static_info': cross_static_info,
'ledger_info': cross_ledger_info
}
return json.dumps(res, ensure_ascii=False)
# 问题诊断接口
def query_cross_problems(params):
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_user_areas(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = [int(row['area_id']) for row in area_list]
if int(area_id) not in area_list:
return json.dumps(make_common_res(6, '用户信息异常'))
date_list = check_param(params, 'date_list')
if not date_list or len(date_list) < 1:
return json.dumps(make_common_res(7, '缺少日期参数,请最少选择一天作为查询日期'))
query_type = check_param(params, 'query_type')
if not query_type:
query_type = 0
time_range = check_param(params, 'time_range')
if not time_range:
return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))
tp_start = int(str(time_range.split('-')[0]).split(':')[0]) * 100 + int(str(time_range.split('-')[0]).split(':')[1])
# tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1])
# if tp_end == 0:
# tp_end = 2400
if query_type == 1:
tp_start = 't' + str(tp_start)
elif query_type == 2:
tp_start = 'h' + str(tp_start)
cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
avg_cross_delay_info = gen_avg_cross_delay_pb(cross_delay_data_list)
if not avg_cross_delay_info:
return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据'))
cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid)
inroad_static_info_dict = {item['roadid']: item for item in cross_inroads}
# 验证是否为高峰时段
is_peak = check_is_peak_tp(time_range, area_id, nodeid)
# 路口静态信息及台账信息
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
if not cross_ledger_info_dict:
return json.dumps(make_common_res(10, '查询路口信息失败'))
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
cross_phase, err = QueryCrossRunningPhase(nodeid, [crossid], date_list, time_range)
if err or not cross_phase or cross_phase.code != 0:
logging.warning("路口未录入配时方案")
gen_cross_problems(crossid, nodeid, date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak, cross_ledger_info)
# 指标变化趋势接口
def query_cross_index_trend_controller(params):
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_user_areas(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = [int(row['area_id']) for row in area_list]
if int(area_id) not in area_list:
return json.dumps(make_common_res(6, '用户信息异常'))
query_date = check_param(params, 'query_date')
if not query_date:
return json.dumps(make_common_res(7, '缺少查询日期,请选择查询日期'))
time_range = check_param(params, 'time_range')
if not time_range:
return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))
query_type= check_param(params, 'query_type')
if not query_type:
query_type = 0
tp_start = int(str(time_range.split('-')[0]).split(':')[0]) * 100 + int(str(time_range.split('-')[0]).split(':')[1])
tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1])
if query_type == 1:
tp_start = 't' + str(tp_start)
elif query_type == 2:
tp_start = 'h' + str(tp_start)
prev_date = (datetime.strptime(query_date, '%Y%m%d') - timedelta(days=1)).strftime('%Y%m%d')
month_ago_date = (datetime.now().date() - timedelta(days=30)).strftime('%Y%m%d')
month_date_list = generate_date_range(month_ago_date, query_date)
# ten_week_ago_date = (datetime.now().date() - timedelta(days=70)).strftime('%Y%m%d')
# week_date_list = generate_date_range(ten_week_ago_date, query_date)
ten_weeks_date_list = gen_ten_weeks_ago_data_list()
# 查询台账信息 获取路网渠化关系
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
if not cross_ledger_info_dict:
return json.dumps(make_common_res(9, '查询路口信息失败'))
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
# 查询前一天的小时级别数据
hours_data = db_cross.query_cross_delay_whole_day_hours(crossid, nodeid, prev_date)
hour_pb_list = parse_data2pb(hours_data)
hours_data_dict = parse_single_cross_delay_info(crossid, nodeid, hour_pb_list, 'hour', roads_dir_dict)
hours_data_with_change_rate = calc_single_day_delay_info_change_rate(hours_data_dict)
# 查询近30天的数据
days_data = db_cross.query_cross_delay_info(crossid, nodeid, month_date_list, tp_start)
days_pb_list = parse_data2pb(days_data)
days_data_dict = parse_single_cross_delay_info(crossid, nodeid, days_pb_list, 'day', roads_dir_dict)
days_data_with_change_rate = calc_single_day_delay_info_change_rate(days_data_dict)
# 查询近10周的数据 需要聚合 可能不能使用下述方式查询
ten_week_datas = []
for week_dates in ten_weeks_date_list:
weeks_data = db_cross.query_cross_delay_info(crossid, nodeid, week_dates, tp_start)
week_cross_delay_info = gen_avg_cross_delay_pb(weeks_data)
ten_week_datas.append({
'day': week_dates[0] + '-' + week_dates[-1],
'tp_start': tp_start,
'tp_end': tp_end,
'data': week_cross_delay_info
})
weeks_data_dict = parse_single_cross_delay_info(crossid, nodeid, ten_week_datas, 'week', roads_dir_dict)
weeks_data_with_change_rate = calc_single_day_delay_info_change_rate(weeks_data_dict)
res = make_common_res(0, 'ok')
res['data'] = {
'hours_data': hours_data_with_change_rate,
'days_data': days_data_with_change_rate,
'weeks_data': weeks_data_with_change_rate
}
return json.dumps(res, ensure_ascii=False)