# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/27 10:15
# @Description: API handlers for the intersection-optimization comparison page

from app.eva_common import *
from app.cross_compare_common import *
import proto.greenwave_type_pb2 as survey_pb


# Prefix of the per-intersection failure summary returned by
# do_add_cross_survey_job.
_ADD_JOB_ERR_PREFIX = '创建实景勘察任务失败的路口为:'


def _check_user_area(params):
    """Validate the nodeid / area_id / userid parameters shared by all handlers.

    The user's jurisdiction list is loaded through ``db_user.query_areaid_list``
    and ``area_id`` must be an integer contained in that list.

    :param params: request parameter mapping, read through ``check_param``
    :return: ``(nodeid, area_id, userid, None)`` on success, otherwise
        ``(None, None, None, error_json)`` where ``error_json`` is the
        serialized error response (codes 2-5) to hand back to the caller
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return None, None, None, json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return None, None, None, json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return None, None, None, json.dumps(make_common_res(4, '缺少userid, 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list:
        return None, None, None, json.dumps(make_common_res(5, '用户信息异常'))
    # A set (rather than a one-shot ``map`` iterator) keeps the membership
    # test correct no matter how many times it is evaluated.
    if not str(area_id).lstrip('-').isdigit() \
            or int(area_id) not in set(map(int, area_list)):
        return None, None, None, json.dumps(make_common_res(5, '辖区id异常,请检查后重试'))
    return nodeid, area_id, userid, None


def _parse_done_inroads(done_inroads):
    """Parse a ``'dir:state|dir:state'`` string into an ordered dict.

    Entries without a ``:`` separator (including the single empty entry
    produced by an empty string) are skipped instead of raising.

    :param done_inroads: raw ``done_inroads`` column value, may be empty/None
    :return: dict mapping source direction -> state string
    """
    states = {}
    for entry in (done_inroads or '').split('|'):
        parts = entry.split(':')
        if len(parts) < 2:
            continue
        states[parts[0]] = parts[1]
    return states


def query_compare_data(params):
    """Query before/after optimization comparison data for one intersection.

    Reads ``crossid``, ``nodeid``, ``area_id``, ``userid``, ``date_list``
    (dates after optimization), ``compare_date_list`` (dates before
    optimization), ``query_type``, ``time_range`` and ``export`` from
    ``params``.

    :param params: request parameter mapping
    :return: JSON string of the common response; when ``export`` equals 1 the
        result of ``query_compare_data_export_excel`` is returned instead
    """
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(1, '缺少crossid, 请刷新后重试'))
    nodeid, area_id, userid, err = _check_user_area(params)
    if err:
        return err

    date_list = check_param(params, 'date_list')  # after optimization
    if not date_list:
        return json.dumps(make_common_res(7, '缺少日期参数,请最少选择一天作为查询日期'))
    compare_date_list = check_param(params, 'compare_date_list')  # before optimization
    if not compare_date_list:
        return json.dumps(make_common_res(6, '缺少对比日期参数,请最少选择一天作为对比日期'))
    query_type = check_param(params, 'query_type') or 0
    time_range = check_param(params, 'time_range')
    if not time_range:
        return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))
    export = bool(check_param(params, 'export')) and params['export'] == 1

    # Only the start of the period ("HH:MM-HH:MM" -> HHMM) is used as the
    # lookup key; the end of the period is not parsed.
    try:
        start_hour, start_minute = str(time_range).split('-')[0].split(':')[:2]
        tp_start = int(start_hour) * 100 + int(start_minute)
    except ValueError:
        return json.dumps(make_common_res(8, '时段范围格式错误,请重新选择时段范围'))
    if query_type == 1:
        tp_start = 't' + str(tp_start)
    elif query_type == 2:
        tp_start = 'h' + str(tp_start)

    cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
    avg_cross_delay_info = gen_avg_cross_delay_pb(cross_delay_data_list)
    if not avg_cross_delay_info:
        return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据'))
    comp_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, compare_date_list, tp_start)
    avg_comp_cross_delay_info = gen_avg_cross_delay_pb(comp_cross_delay_data_list)
    if not avg_comp_cross_delay_info:
        return json.dumps(make_common_res(10, '当前所选对比日期范围内该评测时段无可用数据'))

    cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid)
    inroad_static_info_dict = {item['roadid']: item for item in cross_inroads}

    # Static information and ledger information of the intersection.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return json.dumps(make_common_res(10, '查询路口信息失败'))
    cross_static_info, cross_ledger_info = gen_cross_static_info(
        crossid, nodeid, area_id, cross_ledger_info_dict)
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

    # Intersection-level indicator overview.
    overview_res = gen_overview_index(
        avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict)
    comp_overview_res = gen_overview_index(
        avg_comp_cross_delay_info, inroad_static_info_dict, nodeid, compare_date_list, roads_dir_dict)
    final_overview = gen_compare_overview_res(overview_res, comp_overview_res)

    # Road-segment and turning-flow overview.
    road_flow_delay_infos = gen_road_delay_index(avg_cross_delay_info, roads_dir_dict)
    comp_road_flow_delay_infos = gen_road_delay_index(avg_comp_cross_delay_info, roads_dir_dict)
    compared_inroad_delay_infos = parse_comp_inroad_delay_infos(
        road_flow_delay_infos, comp_road_flow_delay_infos)

    prev_cross_info = get_prev_cross(nodeid, area_id, roads_dir_dict)

    if export:
        return query_compare_data_export_excel(
            compared_inroad_delay_infos, cross_static_info['name'],
            compare_date_list, date_list, time_range)

    res = make_common_res(0, 'ok')
    res['data'] = {
        'cross_static_info': cross_static_info,
        'overview': final_overview,
        'ledger_info': cross_ledger_info,
        'next_cross_info': prev_cross_info,
        'road_flow_delay_infos': compared_inroad_delay_infos
    }
    return json.dumps(res, ensure_ascii=False)


def do_add_cross_survey_job(params):
    """Create 30-day survey jobs for the intersections listed in ``crossids``.

    An intersection is skipped (and counted in ``fail_num``) when its ledger
    information cannot be loaded, when it already has a job with status 1, or
    when a job with status 0 overlaps the new date range.

    :param params: request parameter mapping; reads ``nodeid``, ``area_id``,
        ``userid``, ``start_date`` (``%Y%m%d``) and ``crossids``
    :return: JSON string; ``data`` holds ``fail_num`` and ``err_str``
    """
    nodeid, area_id, userid, err = _check_user_area(params)
    if err:
        return err
    start_date = check_param(params, 'start_date')
    if not start_date:
        return json.dumps(make_common_res(6, '缺少开始日期,请选择开始日期'))
    crossids = check_param(params, 'crossids')
    if not crossids:
        return json.dumps(make_common_res(7, '缺少路口id,请选择路口'))

    # The date range is the same for every intersection, so parse it once and
    # reject a malformed date with an error response instead of raising.
    try:
        new_job_start = datetime.strptime(str(start_date), '%Y%m%d')
    except ValueError:
        return json.dumps(make_common_res(6, '开始日期格式错误,请重新选择开始日期'))
    new_job_end_date = new_job_start + timedelta(days=30)
    new_job_end_day = new_job_end_date.strftime('%Y%m%d')

    fail_num, err_msgs, values = 0, [], []
    for crossid in crossids:
        cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
        if not cross_ledger_info_dict:
            err_msgs.append("路口【%s】信息查询失败" % crossid)
            fail_num += 1
            continue
        _, cross_ledger_info = gen_cross_static_info(
            crossid, nodeid, area_id, cross_ledger_info_dict)
        roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

        conflict_msg = None
        for job in db_tmnet.query_cross_survey_job(crossid, area_id) or []:
            if job['status'] == 1:
                conflict_msg = "路口【%s】已存在进行中任务" % g_roadnet.query_cross(crossid).name
                break
            if job['status'] == 0:
                job_start = datetime.strptime(str(job['start_day']), '%Y%m%d')
                job_end = datetime.strptime(str(job['end_day']), '%Y%m%d')
                # Strict interval overlap: ranges that merely touch at a
                # boundary day are still allowed, but identical or enclosing
                # ranges are now detected as conflicts as well.
                if new_job_start < job_end and job_start < new_job_end_date:
                    conflict_msg = "路口【%s】已存在未开始任务" % g_roadnet.query_cross(crossid).name
                    break
        if conflict_msg is not None:
            err_msgs.append(conflict_msg)
            fail_num += 1
            continue

        # Only directions that have an inbound road ('in' != '-') take part.
        in_dirs = [src_dir for src_dir in roads_dir_dict
                   if roads_dir_dict[src_dir]['in'] != '-']
        done_inroads = '|'.join(src_dir + ':0' for src_dir in in_dirs)
        inroads_dir = '|'.join(src_dir + ':' + roads_dir_dict[src_dir]['in']
                               for src_dir in in_dirs)
        values.append((crossid, start_date, new_job_end_day, 0,
                       done_inroads, inroads_dir, nodeid, area_id))

    if values and db_tmnet.insert_cross_survey_job(values) == len(values):
        res = make_common_res(0, 'ok')
    else:
        res = make_common_res(1, '创建任务失败')
    res['data'] = {
        'fail_num': fail_num,
        'err_str': _ADD_JOB_ERR_PREFIX + ','.join(err_msgs) if err_msgs else '无'
    }
    return json.dumps(res)


def do_del_cross_survey_job(params):
    """Delete a survey job unless its status is 1 or 2.

    :param params: request parameter mapping; reads ``nodeid``, ``area_id``,
        ``userid`` and ``jobid``
    :return: JSON string of the common response
    """
    nodeid, area_id, userid, err = _check_user_area(params)
    if err:
        return err
    jobid = check_param(params, 'jobid')
    if not jobid:
        return json.dumps(make_common_res(6, '缺少任务ID,请刷新后重试'))
    job_info = db_tmnet.query_survey_job_info_by_id(jobid)
    if not job_info:
        return json.dumps(make_common_res(7, '任务不存在,请刷新后重试'))
    if job_info[0]['status'] in [1, 2]:
        return json.dumps(make_common_res(8, '任务正在执行中,请勿删除'))
    if db_tmnet.del_cross_survey_job(jobid) == 1:
        return json.dumps(make_common_res(0, 'ok'))
    return json.dumps(make_common_res(9, '删除任务失败'))


def query_cross_survey_job_list(params):
    """Return a filtered, paginated list of the survey jobs of an area.

    Each row gets ``cross_name`` and ``complete_day`` added, its ``status``
    recomputed against today's date, and its ``update_time`` /
    ``create_time`` formatted as ``%Y%m%d`` strings.

    :param params: request parameter mapping; reads ``nodeid``, ``area_id``,
        ``userid`` and the optional ``keyword``, ``start_date``, ``end_date``,
        ``page`` (default 1), ``page_size`` (default 10), ``job_state``
    :return: JSON string; ``data`` holds ``total_num`` and ``job_list``
    """
    nodeid, area_id, userid, err = _check_user_area(params)
    if err:
        return err
    keyword = check_param(params, 'keyword') or ''
    start_date = check_param(params, 'start_date') or ''
    end_date = check_param(params, 'end_date') or ''
    page = check_param(params, 'page') or 1
    page_size = check_param(params, 'page_size') or 10
    # NOTE(review): a falsy job_state (e.g. 0) is treated as "no filter", so
    # status 0 cannot be selected here - confirm this is intended.
    job_state = check_param(params, 'job_state') or -1
    start_index = (int(page) - 1) * int(page_size)
    end_index = start_index + int(page_size)

    job_list = db_tmnet.query_survey_job_info_by_area_id(area_id) or []
    today = int(datetime.now().strftime('%Y%m%d'))  # same value for every row
    for row in job_list:
        row['cross_name'] = g_roadnet.query_cross(row['crossid']).name
        status = row['status']
        if row['end_day'] < today and status != 2:
            status = 3
        if row['start_day'] > today:
            status = 0
        updated_day = row['update_time'].strftime('%Y%m%d')
        row['status'] = status
        row['complete_day'] = updated_day if status == 2 else '-'
        row['update_time'] = updated_day
        row['create_time'] = row['create_time'].strftime('%Y%m%d')

    if keyword:
        job_list = find_job_info(keyword, job_list)
    if start_date and end_date:
        # NOTE(review): this keeps only jobs whose range strictly encloses
        # [start_date, end_date]; confirm that is the intended filter.
        job_list = [item for item in job_list
                    if str(item['end_day']) > str(end_date)
                    and str(item['start_day']) < str(start_date)]
    if job_state != -1:
        job_list = [item for item in job_list if item['status'] == job_state]
    job_list = sorted(job_list, key=lambda item: item['update_time'], reverse=True)

    res = make_common_res(0, 'ok')
    res['data'] = {
        'total_num': len(job_list),
        'job_list': job_list[start_index:end_index]
    }
    return json.dumps(res)


def rerun_cross_survey_job(params):
    """Reset a survey job so that it runs again, unless its status is 1 or 2.

    :param params: request parameter mapping; reads ``nodeid``, ``area_id``,
        ``userid`` and ``jobid``
    :return: JSON string of the common response
    """
    nodeid, area_id, userid, err = _check_user_area(params)
    if err:
        return err
    jobid = check_param(params, 'jobid')
    if not jobid:
        return json.dumps(make_common_res(6, '缺少任务ID,请刷新后重试'))
    job_info = db_tmnet.query_survey_job_info_by_id(jobid)
    if not job_info:
        return json.dumps(make_common_res(7, '任务不存在,请刷新后重试'))
    if job_info[0]['status'] in [1, 2]:
        return json.dumps(make_common_res(8, '任务正在执行中,无法执行重跑操作'))
    if db_tmnet.update_cross_survey_job_status(job_info, jobid) == 1:
        return json.dumps(make_common_res(0, 'ok'))
    return json.dumps(make_common_res(9, '重跑任务失败'))


def query_usable_survey_crosses(params):
    """List the distinct intersections that have survey jobs in an area.

    :param params: request parameter mapping; reads ``nodeid``, ``area_id``
        and ``userid``
    :return: JSON string; ``data`` is a list of ``{'crossid', 'cross_name'}``
    """
    nodeid, area_id, userid, err = _check_user_area(params)
    if err:
        return err
    area_survey_jobs = db_tmnet.query_survey_job_info_by_area_id(area_id) or []
    # query_cross_survey_job_list consumes the same query result as a list of
    # rows, so calling ``.values()`` on it fails; a mapping is still accepted.
    if isinstance(area_survey_jobs, dict):
        area_survey_jobs = area_survey_jobs.values()
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # response order is deterministic.
    usable_crossids = dict.fromkeys(job['crossid'] for job in area_survey_jobs)
    res = make_common_res(0, 'ok')
    res['data'] = [
        {'crossid': crossid, 'cross_name': g_roadnet.query_cross(crossid).name}
        for crossid in usable_crossids
    ]
    return json.dumps(res)


def query_cross_survey_usable_dates(params):
    """Return the queryable date range of every survey job of an intersection.

    :param params: request parameter mapping; reads ``nodeid``, ``area_id``,
        ``userid`` and ``crossid``
    :return: JSON string; ``data`` holds ``crossid`` and ``job_info``, a list
        of ``{'jobid', 'time_range'}`` entries
    """
    nodeid, area_id, userid, err = _check_user_area(params)
    if err:
        return err
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(6, '缺少crossid, 请刷新后重试'))

    job_ranges = []
    for row in db_tmnet.query_cross_survey_job(crossid, area_id) or []:
        # For a job with status 2 the range ends at its latest update_time;
        # any other job is reported as open-ended.
        # NOTE(review): the original comment says the end should be the
        # smaller of today and update_time, but only update_time is used.
        if row['status'] == 2:
            end_day = row['update_time'].strftime('%Y%m%d')
        else:
            end_day = '至今'
        job_ranges.append({
            'jobid': row['id'],
            # start_day is handled as an integer by the other handlers, so
            # convert explicitly before concatenating.
            'time_range': str(row['start_day']) + '-' + end_day,
        })

    res = make_common_res(0, 'ok')
    res['data'] = {
        'crossid': crossid,
        'job_info': job_ranges
    }
    return json.dumps(res)


def query_cross_survey_result(params):
    """Return the survey images of every finished direction of a job.

    A direction counts as finished when its state in the job's
    ``done_inroads`` string is ``'2'``.

    :param params: request parameter mapping; reads ``nodeid``, ``area_id``,
        ``userid``, ``crossid`` and ``jobid``
    :return: JSON string; ``data`` holds ``crossid``, ``name``, ``location``,
        ``src_images``, ``ledger_info`` and ``can_rerun_dir``
    """
    nodeid, area_id, userid, err = _check_user_area(params)
    if err:
        return err
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(6, '缺少路口id,请刷新后重试'))
    jobid = check_param(params, 'jobid')
    if not jobid:
        return json.dumps(make_common_res(7, '缺少任务id,请刷新后重试'))
    job_info = db_tmnet.query_survey_job_info_by_id(jobid)
    if not job_info:
        # Same guard as the delete / rerun handlers; avoids indexing an empty
        # result.
        return json.dumps(make_common_res(8, '任务不存在,请刷新后重试'))
    dir_states = _parse_done_inroads(job_info[0]['done_inroads'])
    done_src_dir_list = [src_dir for src_dir, state in dir_states.items() if state == '2']

    # Static information and ledger information of the intersection.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return json.dumps(make_common_res(10, '查询路口信息失败'))
    cross_static_info, cross_ledger_info = gen_cross_static_info(
        crossid, nodeid, area_id, cross_ledger_info_dict)
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
    # Reverse lookup: inbound road id -> source direction.
    road_src_dict = {v['in']: k for k, v in roads_dir_dict.items()}

    src_images = {}
    for done_dir in done_src_dir_list:
        item_key = 'csr_' + crossid + '_' + str(jobid) + '_' + done_dir
        item_data = db_cross.query_csr_data(nodeid, item_key, crossid)
        if not item_data:
            logging.error('路口id: %s, 任务id: %s, 源方向: %s, 数据不存在', crossid, jobid, done_dir)
            continue
        item_csr_data = survey_pb.xl_cross_survey_result_t()
        item_csr_data.ParseFromString(item_data[0]['data'])
        inroadid = item_csr_data.inroadid
        if inroadid not in road_src_dict:
            logging.error('路口id: %s, 源方向: %s, 数据不存在', crossid, inroadid)
            continue
        src_dir = road_src_dict[inroadid]
        src_dir_label = srcDir_toStr(src_dir) + '进口'
        # lon / lat are divided by 1e7 before being rendered as "lon,lat".
        src_images[src_dir] = [
            {
                'src_dir': src_dir_label,
                'speed': pos.speed,
                'image_time': datetime.fromtimestamp(pos.timestamp).strftime('%Y-%m-%d %H:%M:%S'),
                'image_url': pos.image_url,
                'image_location': str(pos.lon / 10000000) + ',' + str(pos.lat / 10000000),
                'dist': pos.dist
            }
            for pos in item_csr_data.pos_list
        ]

    res = make_common_res(0, 'ok')
    res['data'] = {
        'crossid': crossid,
        'name': cross_static_info['name'],
        'location': cross_static_info['location'],
        'src_images': src_images,
        'ledger_info': cross_ledger_info,
        # 1 when the direction is finished (state '2'), otherwise 0.
        'can_rerun_dir': {src_dir: 1 if state == '2' else 0
                          for src_dir, state in dir_states.items()}
    }
    return json.dumps(res, ensure_ascii=False)


def rerun_cross_survey_dir(params):
    """Reset selected finished directions of a survey job to state ``'0'``.

    A requested direction is rejected when it is not part of the job's
    ``done_inroads`` string or when its state is not ``'2'``.

    :param params: request parameter mapping; reads ``nodeid``, ``area_id``,
        ``userid``, ``crossid``, ``jobid`` and ``rerun_list``
    :return: JSON string; ``data`` holds ``fail_dir_list`` and ``fail_desc``
    """
    nodeid, area_id, userid, err = _check_user_area(params)
    if err:
        return err
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(6, '缺少路口id,请刷新后重试'))
    jobid = check_param(params, 'jobid')
    if not jobid:
        return json.dumps(make_common_res(7, '缺少任务id,请刷新后重试'))
    rerun_list = check_param(params, 'rerun_list')
    if not rerun_list:
        return json.dumps(make_common_res(8, '缺少重跑方向,请刷新后重试'))
    job_info = db_tmnet.query_survey_job_info_by_id(jobid)
    if not job_info:
        # Same guard as the delete / rerun handlers; avoids indexing an empty
        # result.
        return json.dumps(make_common_res(10, '任务不存在,请刷新后重试'))
    done_src_dir_dict = _parse_done_inroads(job_info[0]['done_inroads'])

    fail_dir_list, fail_msgs = [], []
    for src_dir in rerun_list:
        if src_dir not in done_src_dir_dict:
            fail_dir_list.append(src_dir)
            fail_msgs.append(f'{srcDir_toStr(src_dir)}' + '进口: 方向信息异常')
            continue
        if done_src_dir_dict[src_dir] != '2':
            fail_dir_list.append(src_dir)
            fail_msgs.append(f'{srcDir_toStr(src_dir)}' + '进口: 当前方向任务状态未完成')
            continue
        done_src_dir_dict[src_dir] = '0'

    done_inroads_str = '|'.join(key + ':' + state for key, state in done_src_dir_dict.items())
    ret = db_tmnet.update_cross_survey_job_status(job_info, jobid, done_inroads_str)
    if not ret:
        return json.dumps(make_common_res(9, '更新任务状态失败'))
    res = make_common_res(0, 'ok')
    res['data'] = {
        'fail_dir_list': fail_dir_list,
        # Individual failures are comma-separated so they stay readable.
        'fail_desc': '失败的方向有:' + ','.join(fail_msgs)
    }
    return json.dumps(res)