cross_doctor/app/cross_evaluate_worker.py

451 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/10/10 14:33
# @Description:
import io
import json
from io import BytesIO
from app.common_worker import *
from app.eva_common import *
from openpyxl import Workbook
from openpyxl.styles import Alignment
# 查询可用路口列表
from proto.phase_grpc import QueryCrossRunningPhase
from flask import send_file
def query_cross_list(params):
    """Return the crossings of an area plus its time-period presets as JSON.

    ``nodeid``, ``area_id`` and ``userid`` are read from ``params`` through
    ``check_param``. ``area_id`` must be numeric and must appear in the list
    returned by ``db_user.query_areaid_list(userid)``.

    On success the response ``data`` carries ``cross_list``, ``tp_info`` and
    ``peak_tp``. When ``db_tmnet.query_city_tp_info`` returns nothing, the
    built-in default periods below are used instead.

    Failure codes: 1 missing nodeid, 2 missing area_id, 3 missing userid,
    4 no area list for the user, 5 area_id not numeric / not in that list.
    """
    def _reject(code, msg):
        # Error payloads are serialized with json.dumps defaults (ASCII-escaped).
        return json.dumps(make_common_res(code, msg))

    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return _reject(1, '缺少nodeid 请刷新后重试')

    area_id = check_param(params, 'area_id')
    if not area_id:
        return _reject(2, '缺少area_id 请刷新后重试')

    userid = check_param(params, 'userid')
    if not userid:
        return _reject(3, '缺少userid 请刷新后重试')

    user_areas = db_user.query_areaid_list(userid)
    if not user_areas or len(user_areas) < 1:
        return _reject(4, '用户信息异常')

    # The numeric test runs first so int(area_id) is only attempted on digits.
    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not (area_is_numeric and int(area_id) in map(int, user_areas)):
        return _reject(5, '辖区id异常请检查后重试')

    # Crossings of the requested area.
    cross_list = db_tmnet.query_cross_list_sql(nodeid, area_id)

    tp_rows = db_tmnet.query_city_tp_info(nodeid, area_id)
    if tp_rows:
        # Only the first row is used; both fields are comma-separated strings.
        first_row = tp_rows[0]
        tp_info = first_row['tp_desc'].split(',')
        peak_tp = first_row['peak_tp'].split(',')
    else:
        tp_info = [
            "00:00-07:00",
            "07:00-09:00",
            "09:00-17:00",
            "17:00-19:00",
            "19:00-22:00",
            "22:00-23:59",
        ]
        peak_tp = [
            "07:00-09:00",
            "17:00-19:00",
        ]

    payload = make_common_res(0, 'ok')
    payload['data'] = {
        'cross_list': cross_list,
        'tp_info': tp_info,
        'peak_tp': peak_tp,
    }
    return json.dumps(payload, ensure_ascii=False)
# 查询路口可用时段
def query_cross_usable_date(params):
    """Return, as JSON, the days for which a crossing has usable data.

    ``crossid``, ``nodeid``, ``area_id`` and ``userid`` are read from
    ``params`` through ``check_param``. ``area_id`` must be numeric and must
    appear in ``db_user.query_areaid_list(userid)``.

    On success ``data`` is the list of ``day`` values from the rows returned
    by ``db_cross.query_cross_usable_date_sql(crossid, nodeid)``.

    Failure codes: 1 missing crossid, 2 missing nodeid, 3 missing area_id,
    4 missing userid, 5 no area list for the user *or* area_id not numeric /
    not in that list (both cases share code 5).
    """
    def _reject(code, msg):
        return json.dumps(make_common_res(code, msg))

    crossid = check_param(params, 'crossid')
    if not crossid:
        return _reject(1, '缺少crossid 请刷新后重试')

    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return _reject(2, '缺少nodeid 请刷新后重试')

    area_id = check_param(params, 'area_id')
    if not area_id:
        return _reject(3, '缺少area_id 请刷新后重试')

    userid = check_param(params, 'userid')
    if not userid:
        return _reject(4, '缺少userid 请刷新后重试')

    user_areas = db_user.query_areaid_list(userid)
    if not user_areas or len(user_areas) < 1:
        return _reject(5, '用户信息异常')

    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not (area_is_numeric and int(area_id) in map(int, user_areas)):
        return _reject(5, '辖区id异常请检查后重试')

    usable_days = []
    for record in db_cross.query_cross_usable_date_sql(crossid, nodeid):
        usable_days.append(record['day'])

    payload = make_common_res(0, 'ok')
    payload['data'] = usable_days
    return json.dumps(payload, ensure_ascii=False)
# 获取路口延迟信息
def query_cross_delay_info_controller(params):
    """Build the delay-indicator report of one crossing for a period.

    Parameters read from ``params`` (via ``check_param``): ``crossid``,
    ``nodeid``, ``area_id``, ``userid``, ``date_list``, ``time_range``
    (``"HH:MM-HH:MM"``; only the start part is used here), optional
    ``query_type`` and optional ``excel``.

    The period key handed to ``db_cross.query_cross_delay_info`` is the start
    time as ``HH * 100 + MM``; it is prefixed with ``'t'`` when
    ``query_type == 1`` and with ``'h'`` when ``query_type == 2``.

    Returns a JSON string whose ``data`` holds the overview, per-road delay
    figures, turn rates, static/ledger information and the result of
    ``get_prev_cross`` (published under the key ``next_cross_info``). When
    ``excel`` equals 1, the result of
    ``query_cross_delay_info_controller_export_excel`` is returned instead.

    Failure codes: 1-4 missing crossid/nodeid/area_id/userid, 5 user area
    list empty or area_id invalid, 7 missing date_list, 8 missing time_range,
    9 no delay data for the selection, 10 ledger lookup failed.
    """
    def _reject(code, msg):
        return json.dumps(make_common_res(code, msg))

    crossid = check_param(params, 'crossid')
    if not crossid:
        return _reject(1, '缺少crossid 请刷新后重试')

    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return _reject(2, '缺少nodeid 请刷新后重试')

    area_id = check_param(params, 'area_id')
    if not area_id:
        return _reject(3, '缺少area_id 请刷新后重试')

    userid = check_param(params, 'userid')
    if not userid:
        return _reject(4, '缺少userid 请刷新后重试')

    user_areas = db_user.query_areaid_list(userid)
    if not user_areas or len(user_areas) < 1:
        return _reject(5, '用户信息异常')

    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not (area_is_numeric and int(area_id) in map(int, user_areas)):
        return _reject(5, '辖区id异常请检查后重试')

    date_list = check_param(params, 'date_list')
    if not date_list or len(date_list) < 1:
        return _reject(7, '缺少日期参数,请最少选择一天作为查询日期')

    # A missing/falsy query_type falls back to 0 (no prefix on the period key).
    query_type = check_param(params, 'query_type') or 0

    time_range = check_param(params, 'time_range')
    if not time_range:
        return _reject(8, '缺少时段范围,请选择时段范围')

    # "07:30-09:00" -> 730
    start_hm = str(time_range.split('-')[0]).split(':')
    tp_start = int(start_hm[0]) * 100 + int(start_hm[1])
    # NOTE(review): the comparison is against ints; a query_type delivered as
    # the string "1"/"2" would not match - confirm what check_param returns.
    if query_type == 1:
        tp_start = f't{tp_start}'
    elif query_type == 2:
        tp_start = f'h{tp_start}'

    delay_rows = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
    avg_cross_delay_info = gen_avg_cross_delay_pb(delay_rows)
    if not avg_cross_delay_info:
        return _reject(9, '当前所选日期范围内该评测时段无可用数据')

    # Index the inbound roads by road id.
    inroad_static_info_dict = {}
    for inroad in db_tmnet.query_cross_inroads(crossid, nodeid):
        inroad_static_info_dict[inroad['roadid']] = inroad

    # Static and ledger information of the crossing.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return _reject(10, '查询路口信息失败')
    cross_static_info, cross_ledger_info = gen_cross_static_info(
        crossid, nodeid, area_id, cross_ledger_info_dict
    )
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

    # Crossing-level indicator overview.
    overview_res = gen_overview_index(
        avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict
    )
    # Per-road / per-flow delay figures.
    road_flow_delay_infos = gen_road_delay_index(avg_cross_delay_info, roads_dir_dict)
    # Turn-rate split per approach.
    road_flow_turn_rate = gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict)
    prev_cross_info = get_prev_cross(nodeid, area_id, roads_dir_dict)

    payload = make_common_res(0, 'ok')
    payload['data'] = {
        'overview': overview_res,
        'road_flow_delay_infos': road_flow_delay_infos,
        'road_flow_turn_rate': road_flow_turn_rate,
        'cross_static_info': cross_static_info,
        'ledger_info': cross_ledger_info,
        'next_cross_info': prev_cross_info,
    }

    # excel == 1 switches the response to the spreadsheet export.
    excel_flag = int(check_param(params, 'excel') or 0)
    if excel_flag == 1:
        return query_cross_delay_info_controller_export_excel(
            road_flow_delay_infos, road_flow_turn_rate
        )
    return json.dumps(payload, ensure_ascii=False)
# 问题诊断接口
def query_cross_problems(params):
    """Run the problem diagnosis of one crossing and return it as JSON.

    Parameters read from ``params`` (via ``check_param``): ``crossid``,
    ``nodeid``, ``area_id``, ``userid``, ``date_list``, ``time_range``
    (``"HH:MM-HH:MM"``) and optional ``query_type`` (1 -> ``'t'`` prefix,
    2 -> ``'h'`` prefix on the period key used for the delay query).

    The running phase plan is fetched with ``QueryCrossRunningPhase``; when
    that call reports an error, returns nothing or a non-zero ``code``, a
    warning is logged and diagnosis continues with whatever was returned.

    On success ``data`` is the value produced by ``gen_cross_problems``.

    Failure codes: 1-4 missing crossid/nodeid/area_id/userid, 5 user area
    list empty or area_id invalid, 7 missing date_list, 8 missing time_range,
    9 no delay data for the selection, 10 ledger lookup failed.
    """
    def _reject(code, msg):
        return json.dumps(make_common_res(code, msg))

    crossid = check_param(params, 'crossid')
    if not crossid:
        return _reject(1, '缺少crossid 请刷新后重试')

    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return _reject(2, '缺少nodeid 请刷新后重试')

    area_id = check_param(params, 'area_id')
    if not area_id:
        return _reject(3, '缺少area_id 请刷新后重试')

    userid = check_param(params, 'userid')
    if not userid:
        return _reject(4, '缺少userid 请刷新后重试')

    user_areas = db_user.query_areaid_list(userid)
    if not user_areas or len(user_areas) < 1:
        return _reject(5, '用户信息异常')

    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not (area_is_numeric and int(area_id) in map(int, user_areas)):
        return _reject(5, '辖区id异常请检查后重试')

    date_list = check_param(params, 'date_list')
    if not date_list or len(date_list) < 1:
        return _reject(7, '缺少日期参数,请最少选择一天作为查询日期')

    query_type = check_param(params, 'query_type') or 0

    time_range = check_param(params, 'time_range')
    if not time_range:
        return _reject(8, '缺少时段范围,请选择时段范围')

    # "07:30-09:00" -> 730
    start_hm = str(time_range.split('-')[0]).split(':')
    tp_start = int(start_hm[0]) * 100 + int(start_hm[1])
    # Digits-only form of the period, taken before any 't'/'h' prefix is
    # added; this equals stripping the prefix from the final key.
    tp_start_digits = str(tp_start)
    if query_type == 1:
        tp_start = f't{tp_start}'
    elif query_type == 2:
        tp_start = f'h{tp_start}'

    delay_rows = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
    avg_cross_delay_info = gen_avg_cross_delay_pb(delay_rows)
    if not avg_cross_delay_info:
        return _reject(9, '当前所选日期范围内该评测时段无可用数据')

    # Index the inbound roads by road id.
    inroad_static_info_dict = {}
    for inroad in db_tmnet.query_cross_inroads(crossid, nodeid):
        inroad_static_info_dict[inroad['roadid']] = inroad

    # Is the requested range one of the configured peak periods?
    is_peak = check_is_peak_tp(time_range, area_id, nodeid)

    # Static and ledger information of the crossing.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return _reject(10, '查询路口信息失败')
    cross_static_info, cross_ledger_info = gen_cross_static_info(
        crossid, nodeid, area_id, cross_ledger_info_dict
    )
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

    cross_phase, err = QueryCrossRunningPhase(
        int(nodeid), [crossid], list(map(str, date_list)), time_range
    )
    phase_available = not err and cross_phase and cross_phase.code == 0
    if not phase_available:
        # Best effort: diagnosis still runs without a usable phase plan.
        logging.warning("路口未录入配时方案")

    problems = gen_cross_problems(
        crossid, nodeid, area_id, time_range, tp_start_digits, date_list,
        avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict,
        cross_phase, is_peak, cross_ledger_info,
    )

    payload = make_common_res(0, 'ok')
    payload['data'] = problems
    return json.dumps(payload, ensure_ascii=False)
# 指标变化趋势接口
def query_cross_index_trend_controller(params):
    """Return hourly, daily and weekly indicator trends of one crossing.

    Parameters read from ``params`` (via ``check_param``): ``crossid``,
    ``nodeid``, ``area_id``, ``userid``, ``date_list``, ``time_range``
    (``"HH:MM-HH:MM"``) and optional ``query_type`` (1 -> ``'t'`` prefix,
    2 -> ``'h'`` prefix on the period key).

    Three series are produced, each passed through
    ``parse_single_cross_delay_info`` and
    ``calc_single_day_delay_info_change_rate``:

    * ``hours_data`` - one averaged entry per hourly key ``h0`` .. ``h2300``
      over ``date_list``;
    * ``days_data`` - the requested period over the 30 days ending yesterday;
    * ``weeks_data`` - the requested period averaged over each date group
      returned by ``gen_ten_weeks_ago_data_list``.

    Failure codes: 1-4 missing crossid/nodeid/area_id/userid, 5 user area
    list empty or area_id invalid, 7 missing date_list, 8 missing time_range,
    9 ledger lookup failed.
    """
    def _reject(code, msg):
        return json.dumps(make_common_res(code, msg))

    crossid = check_param(params, 'crossid')
    if not crossid:
        return _reject(1, '缺少crossid 请刷新后重试')

    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return _reject(2, '缺少nodeid 请刷新后重试')

    area_id = check_param(params, 'area_id')
    if not area_id:
        return _reject(3, '缺少area_id 请刷新后重试')

    userid = check_param(params, 'userid')
    if not userid:
        return _reject(4, '缺少userid 请刷新后重试')

    user_areas = db_user.query_areaid_list(userid)
    if not user_areas or len(user_areas) < 1:
        return _reject(5, '用户信息异常')

    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not (area_is_numeric and int(area_id) in map(int, user_areas)):
        return _reject(5, '辖区id异常请检查后重试')

    date_list = check_param(params, 'date_list')
    if not date_list or len(date_list) < 1:
        return _reject(7, '缺少日期参数,请最少选择一天作为查询日期')

    time_range = check_param(params, 'time_range')
    if not time_range:
        return _reject(8, '缺少时段范围,请选择时段范围')

    query_type = check_param(params, 'query_type') or 0

    # "07:30-09:00" -> tp_start 730, tp_end 900
    range_parts = time_range.split('-')
    start_hm = str(range_parts[0]).split(':')
    tp_start = int(start_hm[0]) * 100 + int(start_hm[1])
    end_hm = str(range_parts[1]).split(':')
    tp_end = int(end_hm[0]) * 100 + int(end_hm[1])
    if query_type == 1:
        tp_start = f't{tp_start}'
    elif query_type == 2:
        tp_start = f'h{tp_start}'

    # Date windows: the 30 days ending yesterday, and the weekly groups.
    month_ago_date = (datetime.now().date() - timedelta(days=30)).strftime('%Y%m%d')
    now_prev_date = (datetime.now().date() - timedelta(days=1)).strftime('%Y%m%d')
    month_date_list = generate_date_range(month_ago_date, now_prev_date)
    ten_weeks_date_list = gen_ten_weeks_ago_data_list()

    # Ledger lookup supplies the road/direction layout of the crossing.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return _reject(9, '查询路口信息失败')
    cross_static_info, cross_ledger_info = gen_cross_static_info(
        crossid, nodeid, area_id, cross_ledger_info_dict
    )
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

    # Hourly series: keys h0, h100, ..., h2300 averaged over date_list.
    hours_data_list = []
    for hour in range(24):
        hour_start = hour * 100
        hour_key = f'h{hour_start}'
        hour_rows = db_cross.query_cross_delay_info(crossid, nodeid, date_list, hour_key)
        hour_cross_delay_info = gen_avg_cross_delay_pb(hour_rows)
        hours_data_list.append({
            'day': '-'.join((str(date_list[0]), str(date_list[-1]))),
            'tp_start': hour_key,
            'tp_end': hour_start + 100,
            'data': hour_cross_delay_info,
        })
    hours_data_dict = parse_single_cross_delay_info(
        crossid, nodeid, hours_data_list, 'hour', roads_dir_dict
    )
    hours_data_with_change_rate = calc_single_day_delay_info_change_rate(hours_data_dict)

    # Daily series over the last 30 days for the requested period.
    days_rows = db_cross.query_cross_delay_info(crossid, nodeid, month_date_list, tp_start)
    days_pb_list = parse_data2pb(days_rows)
    days_data_dict = parse_single_cross_delay_info(
        crossid, nodeid, days_pb_list, 'day', roads_dir_dict
    )
    days_data_with_change_rate = calc_single_day_delay_info_change_rate(days_data_dict)

    # Weekly series: each group of dates is queried and averaged separately.
    ten_week_datas = []
    for week_dates in ten_weeks_date_list:
        week_rows = db_cross.query_cross_delay_info(crossid, nodeid, week_dates, tp_start)
        week_cross_delay_info = gen_avg_cross_delay_pb(week_rows)
        ten_week_datas.append({
            'day': '-'.join((week_dates[0], week_dates[-1])),
            'tp_start': tp_start,
            'tp_end': tp_end,
            'data': week_cross_delay_info,
        })
    weeks_data_dict = parse_single_cross_delay_info(
        crossid, nodeid, ten_week_datas, 'week', roads_dir_dict
    )
    weeks_data_with_change_rate = calc_single_day_delay_info_change_rate(weeks_data_dict)

    payload = make_common_res(0, 'ok')
    payload['data'] = {
        'hours_data': hours_data_with_change_rate,
        'days_data': days_data_with_change_rate,
        'weeks_data': weeks_data_with_change_rate,
    }
    return json.dumps(payload, ensure_ascii=False)
def query_cross_examine_records_detail(params):
    """Return the per-day detail of a crossing's examine records as JSON.

    Parameters read from ``params`` (via ``check_param``): ``crossid``,
    ``nodeid``, ``area_id``, ``userid``, ``date_list``, ``time_range`` and
    ``start_time`` (``"HH:MM"``). ``time_range`` is required but only
    ``start_time`` and the min/max of ``date_list`` feed the query.

    Each record row carries ``phase_type`` and ``phase_detail`` strings in
    which ``'^'`` separates days and ``';'`` separates entries; a type entry
    is ``"<direction code>:<type>"`` and a detail entry starts with a
    direction label followed by ``':'``. For every day the details are
    grouped by integer type. Rows with ``final_state == 6`` or
    ``level_color == 4`` are ignored. ``final_state``, ``level_color``,
    ``first_date`` and ``time_range`` in the response come from the last row
    that was not ignored.

    Failure codes: 1-4 missing crossid/nodeid/area_id/userid, 5 user area
    list empty or area_id invalid, 7 missing date_list, 8 missing time_range,
    9 missing start_time *or* no records found (both share code 9).

    Raises:
        ValueError, IndexError: if ``start_time`` is not ``"HH:MM"`` or a
            type entry is not ``"<code>:<int>"``.
    """
    # Local import keeps this block self-contained.
    import logging

    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = map(int, area_list)
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    date_list = check_param(params, 'date_list')
    if not date_list:
        return json.dumps(make_common_res(7, '缺少查询日期,请选择查询日期'))
    time_range = check_param(params, 'time_range')
    if not time_range:
        return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))
    start_time = check_param(params, 'start_time')
    if not start_time:
        return json.dumps(make_common_res(9, '缺少开始时间,请选择开始时间'))

    max_date = max(date_list)
    min_date = min(date_list)
    # "07:30" -> 730; kept under its own name so the response's tp_start
    # below does not shadow the query key.
    start_hm = str(start_time).split(':')
    query_tp_start = int(start_hm[0]) * 100 + int(start_hm[1])
    cross_examine_records = db_cross.query_cross_examine_records(
        query_tp_start, max_date, crossid, min_date
    )
    if not cross_examine_records:
        return json.dumps(make_common_res(9, '当前路口在所选时段无异常情况'))

    # Open-ended records (no end_date) are expanded up to yesterday.
    prev_day = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')

    # Direction code -> label used as key of the detail entries.
    # NOTE(review): the 'E', 'S' and 'N' labels were empty in the reviewed
    # source while 'W' and the compound directions were populated; they are
    # restored here to match - confirm against the stored phase_detail text.
    dir_str_dict = {
        'E': '东', 'S': '南', 'W': '西', 'N': '北',
        'NE': '东北', 'SE': '东南', 'SW': '西南', 'NW': '西北'
    }

    days_records = {}
    final_state, level_color, first_date = 0, 0, ''
    tp_start, tp_end = '', ''
    for row in cross_examine_records:
        if row['final_state'] == 6 or row['level_color'] == 4:
            continue
        final_state = row['final_state']
        first_date = row['first_date']
        level_color = row['level_color']
        record_dates = generate_date_range(first_date, row['end_date'] or prev_day)
        tp_start = convert_time(row['start_hm'])
        tp_end = convert_time(row['end_hm'])
        phase_type_list = row['phase_type'].split('^')
        phase_detail_list = row['phase_detail'].split('^')

        # zip stops at the shortest sequence, so a row whose per-day lists
        # and date range disagree in length no longer raises IndexError.
        for date, phase_type, phase_detail in zip(
                record_dates, phase_type_list, phase_detail_list):
            # Detail entries keyed by direction label with the '正' marker
            # and any "(digits)" suffix removed.
            dir_phase_detail_dict = {}
            for item in phase_detail.split(';'):
                clean_key = re.sub(r'正|\(\d+\)', '', item.split(':')[0])
                dir_phase_detail_dict[clean_key] = item

            phase_type_dir_dict = {}
            for item in phase_type.split(';'):
                if item == '':
                    continue
                parts = item.split(':')
                src_dir, item_type = parts[0], parts[1]
                item_detail = dir_phase_detail_dict.get(dir_str_dict.get(src_dir))
                if item_detail is None:
                    # Unknown direction code or no matching detail entry:
                    # skip this entry instead of failing the whole request.
                    logging.warning(
                        "examine record %s: no phase detail for direction %r",
                        date, src_dir,
                    )
                    continue
                phase_type_dir_dict.setdefault(int(item_type), []).append(item_detail)
            days_records[date] = phase_type_dir_dict

    res = make_common_res(0, 'ok')
    res['data'] = {
        'days_records': days_records,
        'final_state': final_state,
        'level_color': level_color,
        'first_date': first_date,
        'time_range': tp_start + '-' + tp_end if tp_start != '' else '',
    }
    return json.dumps(res, ensure_ascii=False)
def update_cross_examine_record_state(params):
    """Set the final state of a crossing's examine record; return JSON.

    Parameters read from ``params`` (via ``check_param``): ``crossid``,
    ``nodeid``, ``area_id``, ``userid``, ``final_state`` (must convert to 4
    or 5), ``first_date`` and ``time_range`` (``"HH:MM-HH:MM"``; only the
    start part is used, as ``HH * 100 + MM``).

    The update is delegated to
    ``db_cross.update_cross_examine_record_state_sql`` with today's date as
    the end date; a return value of 1 is reported as success.

    Failure codes: 1-4 missing crossid/nodeid/area_id/userid, 5 user area
    list empty or area_id invalid, 7 missing final_state, 8 final_state not
    4/5 (including non-numeric values), 9 missing first_date, 10 missing or
    malformed time_range, 11 update not applied.
    """
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = map(int, area_list)
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    final_state = check_param(params, 'final_state')
    if not final_state:
        return json.dumps(make_common_res(7, '缺少需要修改的状态, 请刷新后重试'))
    # A non-numeric state is a client error, not a server crash.
    try:
        final_state_code = int(final_state)
    except (TypeError, ValueError):
        final_state_code = None
    if final_state_code not in (4, 5):
        return json.dumps(make_common_res(8, '最终状态信息异常,请检查后重试'))

    first_date = check_param(params, 'first_date')
    if not first_date:
        return json.dumps(make_common_res(9, '缺少开始时间, 请刷新后重试'))
    time_range = check_param(params, 'time_range')
    if not time_range:
        return json.dumps(make_common_res(10, '缺少时段范围, 请刷新后重试'))
    # "07:30-09:00" -> 730. A value that is not in that shape is answered
    # with the same code-10 response as a missing one.
    try:
        start_hm = str(time_range.split('-')[0]).split(':')
        tp_start = int(start_hm[0]) * 100 + int(start_hm[1])
    except (AttributeError, IndexError, ValueError):
        return json.dumps(make_common_res(10, '缺少时段范围, 请刷新后重试'))

    end_date = datetime.now().strftime("%Y%m%d")
    # The caller-supplied final_state value is forwarded unchanged.
    ret = db_cross.update_cross_examine_record_state_sql(
        crossid, first_date, end_date, tp_start, final_state
    )
    if ret == 1:
        return json.dumps(make_common_res(0, 'ok'))
    return json.dumps(make_common_res(11, '修改失败,请检查后重试'))