# -*- coding: utf-8 -*- # @Author: Owl # @Date: 2026/5/11 14:37 # @Description: 路口对比报告文件必须数据产出代码 import os from app.cross_compare_common import * from app.report_common import * from tool.qcos_func import get_client, CosFolderManager g_cos_root = 'https://xinglu-1324629296.cos.ap-beijing.myqcloud.com' g_cos_bucket = 'xinglu-1324629296' weekday2Str = { 1: '周一', 2: '周二', 3: '周三', 4: '周四', 5: '周五', 6: '周六', 7: '周日' } def gen_cross_compare_report(params): crossid = check_param(params, 'crossid') if not crossid: return json.dumps(make_common_res(1, '缺少crossid, 请刷新后重试')) nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) date_list = check_param(params, 'date_list') # 优化后 if not date_list or len(date_list) < 1: return json.dumps(make_common_res(7, '缺少日期参数,请最少选择一天作为查询日期')) compare_date_list = check_param(params, 'compare_date_list') # 优化前 if not compare_date_list or len(compare_date_list) < 1: return json.dumps(make_common_res(6, '缺少对比日期参数,请最少选择一天作为对比日期')) query_type = check_param(params, 'query_type') if not query_type: query_type = 0 time_range = check_param(params, 'time_range') if not time_range: return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围')) weekdays = check_param(params, 'weekdays') if not weekdays: weekdays = '1,2,3,4,5,6,7' tp_start = int(str(time_range.split('-')[0]).split(':')[0]) * 100 + int(str(time_range.split('-')[0]).split(':')[1]) if query_type == 1: tp_start = 't' + str(tp_start) elif query_type == 2: tp_start = 'h' + str(tp_start) parts = check_param(params, 'parts') if not parts or len(parts) < 1: return json.dumps(make_common_res(11, '缺少章节信息,请选择输出章节内容信息')) cut_images = check_param(params, 'cut_images') if not cut_images: cut_images = 0 # 路口静态信息及台账信息 cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid) if not cross_ledger_info_dict: return json.dumps(make_common_res(10, '查询路口信息失败')) cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict) roads_dir_dict = gen_road_dir_dict(cross_ledger_info) gen_report_date = datetime.datetime.now().strftime('%Y年%m月%d日') file_name = gen_report_file_name(cross_static_info['name'] + '路口优化对比报告' + str(cut_images), date_list + compare_date_list, time_range, parts, gen_report_date) # 优化后 data_range = gen_date_range_str(date_list) # 优化前 compare_date_range = gen_date_range_str(compare_date_list) # 验证文件是否已经存在 cos_client = get_client() folder_manager = CosFolderManager(cos_client, g_cos_bucket) download_url = g_cos_root + "/user/cross_doctor/export_files/cross_compare_files/" + file_name if folder_manager.file_exists(download_url): res = make_common_res(0, 'ok') res['data'] = { 'crossid': crossid, 'nodeid': nodeid, 'area_id': area_id, 'name': cross_static_info['name'], 'file_name': file_name, 'gen_report_date': gen_report_date, 'date_range': data_range, 'compare_date_range': compare_date_range, 'weekday_str': gen_weekday_str(weekdays), 'download_url': download_url } return json.dumps(res, ensure_ascii=False) dir = os.path.dirname(os.path.abspath(__file__)) localfile_path = f'{dir}/../temp/{file_name}.lock' with open(localfile_path, 'w') as f: pass if os.path.exists(localfile_path): return json.dumps(make_common_res(12, '正在生成中,请勿重复提交')) cut_image_params = { 'crossid': crossid, 'nodeid': nodeid, 'area_id': area_id, 'date_list': date_list, 'compare_date_list': compare_date_list, 'query_type': query_type, 'time_range': time_range, 'weekdays': weekdays, 'userid': userid, 'cross_name': cross_static_info['name'], } # 必要数据获取 cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start) avg_cross_delay_info = gen_avg_cross_delay_pb(cross_delay_data_list, weekdays) if not avg_cross_delay_info: return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据')) comp_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, compare_date_list, tp_start) avg_comp_cross_delay_info = gen_avg_cross_delay_pb(comp_cross_delay_data_list, weekdays) if not avg_comp_cross_delay_info: return json.dumps(make_common_res(10, '当前所选对比日期范围内该评测时段无可用数据')) cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid) inroad_static_info_dict = {item['roadid']: item for item in cross_inroads} overview_res = gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict) comp_overview_res = gen_overview_index(avg_comp_cross_delay_info, inroad_static_info_dict, nodeid, compare_date_list, roads_dir_dict) road_flow_turn_rate = gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict) is_peak = check_is_peak_tp(time_range, area_id, nodeid) cross_phase, err = QueryCrossRunningPhase(int(nodeid), [crossid], [str(item) for item in date_list], time_range) # 整体指标 final_overview = gen_compare_overview_res(overview_res, comp_overview_res) # 路段指标 road_flow_delay_infos = gen_road_delay_index(avg_cross_delay_info, roads_dir_dict) comp_road_flow_delay_infos = gen_road_delay_index(avg_comp_cross_delay_info, roads_dir_dict) compared_inroad_delay_infos = parse_comp_inroad_delay_infos(road_flow_delay_infos, comp_road_flow_delay_infos) # 装配数据 title = f"{cross_static_info['name']}路口优化对比报告" part1_data = gen_compare_report_part1_data(data_range, compare_date_range, time_range, cross_static_info['name'], final_overview) if 'part1' in parts: if cut_images == 1: part1_data['cut_images'] = True part1_data['cut_image_params'] = cut_image_params part1_data['cut_image_params']['image_part'] = 'part1' # todo 补充自动截图逻辑 if 'part2' in parts: if 'part2_1' in parts: part2_1_data = gen_compare_report_part2_1_data(cross_static_info, cross_ledger_info) if cut_images == 1: part2_1_data['cut_images'] = True part2_1_data['cut_image_params'] = cut_image_params part2_1_data['cut_image_params']['image_part'] = 'part2_1' # todo 补充自动截图逻辑 if 'part2_2' in parts: part2_2_data = gen_compare_report_part2_2_data(road_flow_delay_infos, roads_dir_dict) if cut_images == 1: part2_2_data['cut_images'] = True part2_2_data['cut_image_params'] = cut_image_params part2_2_data['cut_image_params']['image_part'] = 'part2_2' # todo 补充自动截图逻辑 if 'part2_3' in parts: part2_3_data = gen_compare_report_part2_3_data(road_flow_turn_rate, avg_cross_delay_info, inroad_static_info_dict) if cut_images == 1: part2_3_data['cut_images'] = True part2_3_data['cut_image_params'] = cut_image_params part2_3_data['cut_image_params']['image_part'] = 'part2_3' # todo 补充自动截图逻辑 if 'part3' in parts: part3_data = gen_compare_report_part3_data(crossid, nodeid, area_id, time_range, tp_start, date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak, cross_ledger_info, weekdays) if 'part4' in parts: pass if 'part5' in parts: part5_data = gen_compare_report_part5_data(part1_data, compared_inroad_delay_infos) # 清除临时本地文件 os.remove(localfile_path) def gen_compare_report_part1_data(data_range, compare_date_range, time_range, cross_name, final_overview): part1_data = { 'data_range': data_range, 'compare_date_range': compare_date_range, 'cross_name': cross_name, 'time_range': time_range, 'data_list': [] } index_dict = { 'service_level': '路口服务水平', 'jam_index': '路口拥堵指数', 'stop_times': '路口停车次数', 'high_park_percent': '路口多次停车率', 'speed': '路口平均速度', 'delay_time': '路口延误时间' } for key in ('service_level', 'jam_index', 'stop_times', 'high_park_percent', 'speed', 'delay_time'): compare_key = 'comp' + key if key == 'service_level': service_level_compare_res = compare_service_level(final_overview[key], final_overview[compare_key]) if service_level_compare_res and service_level_compare_res > 0: service_level_compare_str = f"{index_dict[key]}由{final_overview[compare_key]}提升为{final_overview[key]}, 提升{service_level_compare_res}个等级;" part1_data['data_list'].append(service_level_compare_str) elif key in ('jam_index', 'stop_times', 'high_park_percent', 'delay_time'): if final_overview[compare_key] and final_overview[key] and final_overview[compare_key] > final_overview[key]: rate = round(final_overview[key] / final_overview[compare_key] * 100, 2) if final_overview[compare_key] > 0 else 0 compare_res_str = f"{index_dict[key]}由{final_overview[compare_key]}下降为{final_overview[key]}, 减少{final_overview[key] - final_overview[compare_key]},优化率为{rate}%;" part1_data['data_list'].append(compare_res_str) else: if final_overview[compare_key] and final_overview[key] and final_overview[compare_key] < final_overview[key]: rate = round(final_overview[key] / final_overview[compare_key] * 100, 2) if final_overview[compare_key] > 0 else 0 compare_res_str = f"{index_dict[key]}由{final_overview[compare_key]}Km/h提升为{final_overview[key]}km/h, 提升{final_overview[key] - final_overview[compare_key]}km/h,提升率为{rate}%;" part1_data['data_list'].append(compare_res_str) return part1_data def gen_compare_report_part2_1_data(cross_static_info, cross_ledger_info): cross_type_dict = db_tmnet.gen_cross_type_dict() cross_type = cross_type_dict[int(cross_ledger_info['data']['ledger']['cross_type'])] if cross_ledger_info['data']['ledger']['cross_type'] and int(cross_ledger_info['data']['ledger']['cross_type']) in cross_type_dict.keys() else '' part2_1_data = { 'cross_name': cross_static_info['name'], 'division': cross_ledger_info['data']['ledger']['division'], 'cross_model': cross_static_info['cross_model'], 'cross_type': cross_type, 'slc_company': cross_static_info['slc_company'] } return part2_1_data def gen_compare_report_part2_2_data(road_flow_delay_infos, roads_dir_dict): road_dir = {v['in']: k for k, v in roads_dir_dict.items()} part2_2_data = { 'total_num': 0, 'detail': '' } err_dir_list = [] for roadid in road_flow_delay_infos.keys(): if road_flow_delay_infos[roadid]['service_level'] in ('E', 'F'): item_str = f"{road_dir[roadid]}进口的延误时间为{road_flow_delay_infos[roadid]['delay_time']}s" part2_2_data['total_num'] += 1 err_dir_list.append(item_str) total_str = '' if err_dir_list: total_str = ','.join(err_dir_list) total_str += '。' part2_2_data['detail'] = total_str return part2_2_data def gen_compare_report_part2_3_data(road_flow_turn_rate, avg_cross_delay_info, inroad_static_info_dict): flow_rate_dict = { 'max': { 'roadid': '', 'flow_rate': 0, 'src_dir': '' }, 'min': { 'roadid': '', 'flow_rate': 101, 'src_dir': '' } } main_flow_src_list, main_flow_str_list = [], [] car_num = avg_cross_delay_info.delay_info.car_num for inroad_info in avg_cross_delay_info.inroad_delay_infos: inroadid = inroad_info.inroadid if inroadid not in inroad_static_info_dict.keys(): continue inroad_car_num = inroad_info.delay_info.car_num src_dir = inroad_static_info_dict[inroadid]['src_direct'] if inroad_car_num / car_num > 0.25: check_res = found_max_car_num_flow(inroad_car_num, inroad_info) if check_res != -1: main_flow_src_list.append(inroadid + ' ' + src_dir + ' ' +str(check_res)) main_flow_str_list.append(dir_str_dict[src_dir] + int_turn_type2str[check_res]) for roadid in road_flow_turn_rate.keys(): if road_flow_turn_rate[roadid]['in_flow_rate'] > flow_rate_dict['max']['flow_rate']: flow_rate_dict['max']['roadid'] = roadid flow_rate_dict['max']['flow_rate'] = road_flow_turn_rate[roadid]['in_flow_rate'] flow_rate_dict['max']['src_dir'] = dir_str_dict[road_flow_turn_rate[roadid]['src_dir']] if road_flow_turn_rate[roadid]['in_flow_rate'] < flow_rate_dict['min']['flow_rate']: flow_rate_dict['min']['roadid'] = roadid flow_rate_dict['min']['flow_rate'] = road_flow_turn_rate[roadid]['in_flow_rate'] flow_rate_dict['min']['src_dir'] = dir_str_dict[road_flow_turn_rate[roadid]['src_dir']] main_flow_str = '、'.join(main_flow_str_list) main_flow_detail = [] for item in main_flow_src_list: roadid = item.split(' ')[0] src_dir = item.split(' ')[1] turn_type = int(item.split(' ')[2]) if turn_type == 0: turn_type_str = 's_rate' elif turn_type == 1: turn_type_str = 'l_rate' else: turn_type_str = 'r_rate' main_flow_detail.append(f"{dir_str_dict[src_dir]}进口道以{int_turn_type2str[turn_type]}车流为主,占比为{road_flow_turn_rate[roadid][turn_type_str]}%") part2_3_data = { 'max_src_dir': flow_rate_dict['max']['src_dir'], 'max_flow_rate': str(flow_rate_dict['max']['flow_rate']) + '%', 'mi_src_dirn': flow_rate_dict['min']['src_dir'], 'min_flow_rate': str(flow_rate_dict['min']['flow_rate']) + '%', 'main_flow_str': main_flow_str, 'main_flow_detail': ';'.join(main_flow_detail) } return part2_3_data def gen_compare_report_part3_data(crossid, nodeid, area_id, time_range, tp_start, date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak, cross_ledger_info, weekdays): cross_problems = gen_cross_problems(crossid, nodeid, area_id, time_range, str(tp_start).replace('h', '').replace('t', ''), date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak, cross_ledger_info, weekdays) problem_key_dict = { 'operating_efficiency_problems': '运行效果', 'balanced_control_problems': '均衡调控', 'phase_problems': '配时方案', 'cross_channelized_problems': '路口渠化' } problem_detail_dict = {} for problem_key in cross_problems.keys(): if problem_key != 'phase_problems': if 'total_num' in cross_problems[problem_key].keys() and cross_problems[problem_key]['total_num'] > 0: if problem_key not in problem_key_dict.keys(): problem_detail_dict[problem_key] = { 'class_name': problem_key_dict[problem_key], 'detail': [] } problem_infos = cross_problems[problem_key]['values']['problem_infos'] problem_detail_dict[problem_key]['detail'].append({ 'item_name': problem_infos['item_name'], 'item_detail': problem_infos['item_detail_str'] }) else: if 'total_num' in cross_problems[problem_key].keys() and cross_problems[problem_key]['total_num'] > 0: if problem_key not in problem_key_dict.keys(): problem_detail_dict[problem_key] = { 'class_name': problem_key_dict[problem_key], 'detail': [] } for phase_item in cross_problems[problem_key]['values']: item_list = [] if len(phase_item['detail']) > 0: if phase_item['item'] == '路口方案异常': for item in phase_item['detail']: item_detail = item['child_detail'] item_str = f"{item_detail[0]['text']}{item_detail[1]['level_color']}{item_detail[1]['text']}{item_detail[1]['final_state']}" item_list.append(item_str) elif phase_item['item'] == '调度计划单一': item_detail = phase_item['detail'][0]['child_detail'] item_str = f"{item_detail[0]['text']}{item_detail[1]['scheduleid']}{item_detail[2]['text']}" item_list.append(item_str) elif phase_item['item'] == '时段划分单一': for item in phase_item['detail']: item_detail = item['child_detail'] item_str = f"{item_detail[0]['text']}{item_detail[1]['scheduleid']}{item_detail[2]['text']}" item_list.append(item_str) elif phase_item['item'] in ('方案运行时段过短', '方案周期过大', '方案周期过小', '黄灯时间过短', '全红时间不合理', '行人过街时间不足'): for item in phase_item['detail']: item_detail = item['child_detail'] item_str = f"{item_detail[0]['plan_name']}{item_detail[1]['text']}" item_list.append(item_str) problem_detail_dict[problem_key]['detail'].append({ 'item_name': phase_item['item'], 'item_detail': ';'.join(item_list) }) return problem_detail_dict def gen_compare_report_part5_data(part1_data, compared_inroad_delay_infos): part5_data = { 'cross_data': part1_data, 'src_dir_data_dict': {} } index_dict = { 'service_level': '路口服务水平', 'jam_index': '路口拥堵指数', 'stop_times': '路口停车次数', 'high_park_percent': '路口多次停车率', 'speed': '路口平均速度', 'delay_time': '路口延误时间' } for roadid in compared_inroad_delay_infos.keys(): src_dir = compared_inroad_delay_infos[roadid]['src_dir'] src_dir_data = [] compare_data = compared_inroad_delay_infos[roadid]['comp_data'] item_data = compared_inroad_delay_infos[roadid]['item_data'] for key in ('service_level', 'jam_index', 'stop_times', 'high_park_percent', 'speed', 'delay_time'): if key == 'service_level': service_level_compare_res = compare_service_level(item_data[key], compare_data[key]) if service_level_compare_res and service_level_compare_res > 0: service_level_compare_str = f"{index_dict[key]}由{compare_data[key]}提升为{item_data[key]}, 提升{service_level_compare_res}个等级;" src_dir_data.append(service_level_compare_str) elif key in ('jam_index', 'stop_times', 'high_park_percent', 'delay_time'): if compare_data[key] and item_data[key] and compare_data[key] > item_data[key]: rate = round(item_data[key] / compare_data[key] * 100, 2) if compare_data[key] > 0 else 0 compare_res_str = f"{index_dict[key]}由{compare_data[key]}下降为{item_data[key]}, 减少{item_data[key] - compare_data[key]},优化率为{rate}%;" src_dir_data.append(compare_res_str) else: if compare_data[key] and item_data[key] and compare_data[key] < item_data[key]: rate = round(item_data[key] / compare_data[key] * 100, 2) if compare_data[key] > 0 else 0 compare_res_str = f"{index_dict[key]}由{compare_data[key]}Km/h提升为{item_data[key]}km/h, 提升{item_data[key] - compare_data[key]}km/h,提升率为{rate}%;" src_dir_data.append(compare_res_str) if len(src_dir_data) > 0: part5_data['src_dir_data_dict'][srcDir_toStr(src_dir)] = { 'src_dir': src_dir, 'src_dir_str': srcDir_toStr(src_dir), 'data_list': src_dir_data } return part5_data def gen_weekday_str(weekday): if weekday == '1,2,3,4,5': weekday_str = '工作日' elif weekday == '6,7': weekday_str = '节假日' elif weekday == '1,2,3,4,5,6,7': weekday_str = '全周' else: weekday_list = weekday.split(',') weekday_str = weekday2Str[int(weekday_list[0])] for i in range(1, len(weekday_list)): weekday_str += '、' + weekday2Str[int(weekday_list[i])] return weekday_str def collect_report(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) crossid = check_param(params, 'crossid') if not crossid: return json.dumps(make_common_res(6, '缺少路口id,请刷新后重试')) cross_name = check_param(params, 'cross_name') if not cross_name: return json.dumps(make_common_res(7, '缺少路口名称,请刷新后重试')) download_url = check_param(params, 'download_url') if not download_url: return json.dumps(make_common_res(8, '缺少下载地址,请刷新后重试')) report_type = check_param(params, 'report_type') if not report_type or report_type not in(0, 1, 2, 3): # 0 对比报告 ,其余为预留 return json.dumps(make_common_res(7, '缺少报告类型,请刷新后重试')) report_type = int(report_type) ret = None if report_type == 0: date_range = check_param(params, 'date_range') if not date_range: return json.dumps(make_common_res(8, '缺少时间范围, 请刷新后重试')) compare_date_range = check_param(params, 'compare_date_range') if not compare_date_range: return json.dumps(make_common_res(9, '缺少对比时间范围, 请刷新后重试')) time_range = check_param(params, 'time_range') if not time_range: return json.dumps(make_common_res(10, '缺少时间范围, 请刷新后重试')) weekday_str = check_param(params, 'weekday_str') if not weekday_str: return json.dumps(make_common_res(11, '缺少周信息, 请刷新后重试')) gen_report_date = check_param(params, 'gen_report_date') if not gen_report_date: return json.dumps(make_common_res(12, '缺少生成时间, 请刷新后重试')) ret = db_tmnet.save_collect_report(report_type, params) if ret: return json.dumps(make_common_res(0, 'ok')) def query_collect_report_record(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) report_type = check_param(params, 'report_type') if not report_type or report_type not in(0, 1, 2, 3): # 0 对比报告 ,其余为预留 return json.dumps(make_common_res(7, '缺少报告类型,请刷新后重试')) report_type = int(report_type) records = db_tmnet.query_collect_report_records(nodeid, area_id, report_type) res = make_common_res(0, 'ok') res['data'] = records return json.dumps(res, ensure_ascii=False)