路口诊断导出诊断指标excel

Signed-off-by: yinzijian <yinzijian@haomozhixing.onaliyun.com>
This commit is contained in:
yinzijian 2025-11-04 17:30:20 +08:00
parent 22c54bdf8d
commit 17259bef5a
1 changed files with 103 additions and 1 deletions

View File

@ -2,14 +2,18 @@
# @Author: Owl
# @Date: 2025/10/10 14:33
# @Description:
import io
import json
from io import BytesIO
from app.common_worker import *
from app.eva_common import *
from openpyxl import Workbook
from openpyxl.styles import Alignment
# 查询可用路口列表
from proto.phase_grpc import QueryCrossRunningPhase
from flask import send_file
def query_cross_list(params):
@ -155,8 +159,106 @@ def query_cross_delay_info_controller(params):
'ledger_info': cross_ledger_info,
'next_cross_info': next_cross_info
}
if params.get('excel') == 1:
return query_cross_delay_info_controller_export_excel(road_flow_delay_infos, road_flow_turn_rate)
return json.dumps(res, ensure_ascii=False)
def query_cross_delay_info_controller_export_excel(road_flow_delay_infos, road_flow_turn_rate):
wb = Workbook()
sheet1 = wb.active
sheet1.title = "调度计划表" # 给第一个 sheet 命名
table_head = [
{"range": "A1:A2", "value": "进口道"},
{"range": "B1:B2", "value": "服务水平"},
{"range": "C1:D1", "value": "转向服务水平"},
{"range": "C2", "value": "左转"},
{"range": "D2", "value": "直行"},
{"range": "E1:E2", "value": "停车次数"},
{"range": "F1:G1", "value": "转向停车次数"},
{"range": "F2", "value": "左转"},
{"range": "G2", "value": "直行"},
{"range": "H1:H2", "value": "多次停车率", "width": 20},
{"range": "I1:J1", "value": "转向多次停车率"},
{"range": "I2", "value": "左转"},
{"range": "J2", "value": "直行"},
{"range": "K1:K2", "value": "延误时间(s)", "width": 20},
{"range": "L1:M1", "value": "转向延误时间(s)"},
{"range": "L2", "value": "左转"},
{"range": "M2", "value": "直行"},
{"range": "N1:N2", "value": "平均速度(km/h)", "width": 20},
{"range": "O1:P1", "value": "转向平均速度(km/h)"},
{"range": "O2", "value": "左转"},
{"range": "P2", "value": "直行"},
{"range": "Q1:Q2", "value": "不停车速度(km/h)", "width": 20},
{"range": "R1:S1", "value": "转向不停车速度"},
{"range": "R2", "value": "左转"},
{"range": "S2", "value": "直行"},
{"range": "T1:T2", "value": "进口道流量占比","width": 20},
{"range": "U1:W1", "value": "分流转向占比"},
{"range": "U2", "value": "左转"},
{"range": "V2", "value": "直行"},
{"range": "W2", "value": "右转"},
{"range": "X1:X2", "value": "出口道流量占比","width": 20},
{"range": "Y1:AA1", "value": "汇入转向占比"},
{"range": "Y2", "value": "左转"},
{"range": "Z2", "value": "直行"},
{"range": "AA2", "value": "右转"},
{"range": "AB1:AB2", "value": "转向失衡系数","width": 20},
]
for item_head in table_head:
sheet1.merge_cells(item_head['range'])
cell = item_head['range'].split(":")[0]
sheet1[cell] = item_head['value']
sheet1[cell].alignment = Alignment(horizontal='center', vertical='center')
if item_head.get('width'):
sheet1.column_dimensions[cell[:-1]].width = item_head['width']
src_dir_map = {}
for road, item_delay_infos in road_flow_delay_infos.items():
src_dir = item_delay_infos['src_dir']
if src_dir not in src_dir_map:
src_dir_map[src_dir] = []
src_dir_map[src_dir].append(srcDir_toStr(item_delay_infos['src_dir']))
src_dir_map[src_dir].append(item_delay_infos['service_level'])
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['service_level'] if 1 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['service_level'] if 0 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(item_delay_infos['stop_times'])
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['stop_times'] if 1 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['stop_times'] if 0 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(item_delay_infos['high_park_percent'])
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['high_park_percent'] if 1 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['high_park_percent'] if 0 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(item_delay_infos['delay_time'])
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['delay_time'] if 1 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['delay_time'] if 0 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(item_delay_infos['speed'])
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['speed'] if 1 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['speed'] if 0 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(item_delay_infos['move_speed'])
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][1]['move_speed'] if 1 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(item_delay_infos['flow_delays'][0]['move_speed'] if 0 in item_delay_infos['flow_delays'] else '-')
src_dir_map[src_dir].append(road_flow_turn_rate[road]['in_flow_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['in_flow_rate'] else '-')
src_dir_map[src_dir].append(road_flow_turn_rate[road]['l_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['l_rate'] else '-')
src_dir_map[src_dir].append(road_flow_turn_rate[road]['s_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['s_rate'] else '-')
src_dir_map[src_dir].append(road_flow_turn_rate[road]['r_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['r_rate'] else '-')
src_dir_map[src_dir].append(road_flow_turn_rate[road]['out_flow_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['out_flow_rate'] else '-')
src_dir_map[src_dir].append(road_flow_turn_rate[road]['out_l_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['out_l_rate'] else '-')
src_dir_map[src_dir].append(road_flow_turn_rate[road]['out_s_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['out_s_rate'] else '-')
src_dir_map[src_dir].append(road_flow_turn_rate[road]['out_r_rate'] if road in road_flow_turn_rate and road_flow_turn_rate[road]['out_r_rate'] else '-')
src_dir_map[src_dir].append(item_delay_infos['imbalance_index'])
src_dir_data = list(src_dir_map.values())
for item_src_dir_data in src_dir_data:
sheet1.append(item_src_dir_data)
file_stream = io.BytesIO()
wb.save(file_stream)
file_stream.seek(0) # 将指针移到文件开头
return send_file(
file_stream,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=f"路口诊断指标.xlsx")
# 问题诊断接口
def query_cross_problems(params):