cross_doctor/app/cross_compare_worker.py

605 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/27 10:15
# @Description: 路口优化对比页面相关接口函数
from app.eva_common import *
from app.cross_compare_common import *
import proto.greenwave_type_pb2 as survey_pb
def query_compare_data(params):
    """
    Comparison-data query endpoint for the intersection optimisation page.

    Validates the request, loads averaged delay data for the "after" dates
    (``date_list``) and the "before" dates (``compare_date_list``), and
    combines overview indicators, per-road / per-flow comparison, ledger
    information and static intersection information into one response.

    :param params: request parameters; the keys read here are crossid,
        nodeid, area_id, userid, date_list, compare_date_list, query_type,
        time_range, weekdays and export
    :return: a JSON string, or the result of
        query_compare_data_export_excel when export equals 1
    """
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))

    # The requested area must be one of the areas attached to the user.
    user_areas = db_user.query_areaid_list(userid)
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_area_ids = map(int, user_areas)
    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not (area_is_numeric and int(area_id) in permitted_area_ids):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    # Dates after the optimisation.
    date_list = check_param(params, 'date_list')
    if not (date_list and len(date_list) >= 1):
        return json.dumps(make_common_res(7, '缺少日期参数,请最少选择一天作为查询日期'))
    # Dates before the optimisation.
    compare_date_list = check_param(params, 'compare_date_list')
    if not (compare_date_list and len(compare_date_list) >= 1):
        return json.dumps(make_common_res(6, '缺少对比日期参数,请最少选择一天作为对比日期'))

    query_type = check_param(params, 'query_type') or 0
    time_range = check_param(params, 'time_range')
    if not time_range:
        return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围'))
    weekdays = check_param(params, 'weekdays') or '1,2,3,4,5,6,7'
    wants_export = bool(check_param(params, 'export')) and params['export'] == 1

    # Only the start of "HH:MM-HH:MM" is used; it becomes the integer HHMM.
    range_start = str(time_range.split('-')[0])
    clock_fields = range_start.split(':')
    tp_start = int(clock_fields[0]) * 100 + int(clock_fields[1])
    # query_type 1 and 2 select prefixed period keys ('t' / 'h').
    if query_type == 1:
        tp_start = 't' + str(tp_start)
    elif query_type == 2:
        tp_start = 'h' + str(tp_start)

    after_rows = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
    avg_cross_delay_info = gen_avg_cross_delay_pb(after_rows, weekdays)
    if not avg_cross_delay_info:
        return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据'))
    before_rows = db_cross.query_cross_delay_info(crossid, nodeid, compare_date_list, tp_start)
    avg_comp_cross_delay_info = gen_avg_cross_delay_pb(before_rows, weekdays)
    if not avg_comp_cross_delay_info:
        return json.dumps(make_common_res(10, '当前所选对比日期范围内该评测时段无可用数据'))

    inroad_static_info_dict = {}
    for inroad in db_tmnet.query_cross_inroads(crossid, nodeid):
        inroad_static_info_dict[inroad['roadid']] = inroad

    # Static and ledger information of the intersection.
    # NOTE(review): this failure reuses code 10, same as the branch above.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return json.dumps(make_common_res(10, '查询路口信息失败'))
    cross_static_info, cross_ledger_info = gen_cross_static_info(
        crossid, nodeid, area_id, cross_ledger_info_dict)
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

    # Intersection-level indicator overview, after vs. before.
    overview_res = gen_overview_index(
        avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict)
    comp_overview_res = gen_overview_index(
        avg_comp_cross_delay_info, inroad_static_info_dict, nodeid, compare_date_list, roads_dir_dict)
    final_overview = gen_compare_overview_res(overview_res, comp_overview_res)

    # Road-level and flow-level indicators, after vs. before.
    road_flow_delay_infos = gen_road_delay_index(avg_cross_delay_info, roads_dir_dict)
    comp_road_flow_delay_infos = gen_road_delay_index(avg_comp_cross_delay_info, roads_dir_dict)
    compared_inroad_delay_infos = parse_comp_inroad_delay_infos(
        road_flow_delay_infos, comp_road_flow_delay_infos)

    prev_cross_info = get_prev_cross(nodeid, area_id, roads_dir_dict)

    res = make_common_res(0, 'ok')
    res['data'] = {
        'cross_static_info': cross_static_info,
        'overview': final_overview,
        'ledger_info': cross_ledger_info,
        'next_cross_info': prev_cross_info,
        'road_flow_delay_infos': compared_inroad_delay_infos
    }
    if wants_export:
        return query_compare_data_export_excel(
            compared_inroad_delay_infos, cross_static_info['name'],
            compare_date_list, date_list, time_range)
    return json.dumps(clean_dict_nan(res, '-'), ensure_ascii=False)
def do_add_cross_survey_job(params):
    """
    Create on-site survey jobs for a batch of intersections.

    Every job covers the 30 days starting at ``start_date``. An intersection
    is rejected when its ledger information cannot be loaded, when it already
    has a job with status 1, or when the new window overlaps the window of an
    existing job with status 0.

    :param params: request parameters; the keys read here are nodeid,
        area_id, userid, start_date (``YYYYMMDD``) and crossids (iterable of
        intersection ids)
    :return: JSON string; ``data.fail_num`` is the number of rejected
        intersections and ``data.err_str`` describes them (empty when none)
    :raises ValueError: if start_date (or a stored job date) is not in
        ``YYYYMMDD`` form, as before
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    # map() yields a one-shot iterator; it is consumed by the single
    # membership test below and not used again.
    area_list = map(int, area_list)
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    start_date = check_param(params, 'start_date')
    if not start_date:
        return json.dumps(make_common_res(6, '缺少开始日期,请选择开始日期'))
    crossids = check_param(params, 'crossids')
    if not crossids or len(crossids) < 1:
        return json.dumps(make_common_res(7, '缺少路口id请选择路口'))

    date_fmt = '%Y%m%d'
    # The new window is the same for every intersection, so compute it once.
    new_job_start_date = datetime.strptime(start_date, date_fmt)
    new_job_end_date = new_job_start_date + timedelta(days=30)
    new_job_end_day = new_job_end_date.strftime(date_fmt)

    fail_num, fail_msgs, values = 0, [], []
    for crossid in crossids:
        existed_jobs_list = db_tmnet.query_cross_survey_job(crossid, area_id) or []
        cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
        if not cross_ledger_info_dict:
            fail_msgs.append("路口【%s】信息查询失败" % crossid)
            fail_num += 1
            continue
        cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
        roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

        # One conflict is enough to reject the intersection, so stop at the
        # first one and report a single message per intersection.
        conflict_msg = None
        for job in existed_jobs_list:
            if job['status'] == 1:
                conflict_msg = "路口【%s】已存在进行中任务" % g_roadnet.query_cross(crossid).name
                break
            if job['status'] == 0:
                job_start = datetime.strptime(str(job['start_day']), date_fmt)
                job_end = datetime.strptime(str(job['end_day']), date_fmt)
                # Two windows overlap when each one starts before the other
                # ends. This also catches an identical or enclosing window,
                # which a "new start/end lies strictly inside the old
                # window" test misses. Windows that merely touch at a
                # boundary are still accepted.
                if job_start < new_job_end_date and new_job_start_date < job_end:
                    conflict_msg = "路口【%s】已存在未开始任务" % g_roadnet.query_cross(crossid).name
                    break
        if conflict_msg is not None:
            fail_msgs.append(conflict_msg)
            fail_num += 1
            continue

        # Only directions that have an inbound road ('in' != '-') are surveyed.
        surveyed_dirs = [src_dir for src_dir in roads_dir_dict if roads_dir_dict[src_dir]['in'] != '-']
        done_inroads = '|'.join([src_dir + ':0' for src_dir in surveyed_dirs])
        inroads_dir = '|'.join([src_dir + ':' + roads_dir_dict[src_dir]['in'] for src_dir in surveyed_dirs])
        values.append((crossid, start_date, new_job_end_day, 0, done_inroads, inroads_dir, nodeid, area_id))

    # Success requires at least one job and every row to be inserted.
    if values and db_tmnet.insert_cross_survey_job(values) == len(values):
        res = make_common_res(0, 'ok')
    else:
        res = make_common_res(1, '创建任务失败')
    res['data'] = {
        'fail_num': fail_num,
        'err_str': '创建实景勘察任务失败的路口为:' + ','.join(fail_msgs) if fail_msgs else ''
    }
    return json.dumps(res)
def do_del_cross_survey_job(params):
    """
    Delete one survey job.

    Jobs whose status is 1 or 2 are refused.

    :param params: request parameters; the keys read here are nodeid,
        area_id, userid and jobid
    :return: JSON string with code 0 on success, non-zero otherwise
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))

    # The requested area must be one of the areas attached to the user.
    user_areas = db_user.query_areaid_list(userid)
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_area_ids = map(int, user_areas)
    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not (area_is_numeric and int(area_id) in permitted_area_ids):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    jobid = check_param(params, 'jobid')
    if not jobid:
        return json.dumps(make_common_res(6, '缺少任务ID请刷新后重试'))
    job_rows = db_tmnet.query_survey_job_info_by_id(jobid)
    if not job_rows:
        return json.dumps(make_common_res(7, '任务不存在,请刷新后重试'))
    if job_rows[0]['status'] in (1, 2):
        return json.dumps(make_common_res(8, '任务正在执行中,请勿删除'))

    # Exactly one affected row counts as success.
    if db_tmnet.del_cross_survey_job(jobid) == 1:
        outcome = make_common_res(0, 'ok')
    else:
        outcome = make_common_res(9, '删除任务失败')
    return json.dumps(outcome)
def query_cross_survey_job_list(params):
    """
    Return one page of the survey jobs of an area.

    Each row gets a cross_name, a derived status and a complete_day, and its
    update_time / create_time are rendered as ``YYYYMMDD`` strings. The list
    can be narrowed by keyword, by a date range and by job state, and is
    ordered by update time, newest first.

    Derived status: 3 when end_day is before today and the stored status is
    not 2; 0 when start_day is after today; otherwise the stored status.

    :param params: request parameters; the keys read here are nodeid,
        area_id, userid, keyword, start_date, end_date, page (default 1),
        page_size (default 10) and job_state (default -1, meaning no filter)
    :return: JSON string; ``data.total_num`` is the size of the filtered
        list, ``data.job_list`` the requested page
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    # One-shot iterator, consumed by the single membership test below.
    area_list = map(int, area_list)
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    keyword = check_param(params, 'keyword') or ''
    start_date = check_param(params, 'start_date') or ''
    end_date = check_param(params, 'end_date') or ''
    # Clamp paging so that page <= 0 or page_size <= 0 cannot produce
    # negative slice bounds (which would slice from the end of the list).
    page = max(int(check_param(params, 'page') or 1), 1)
    page_size = max(int(check_param(params, 'page_size') or 10), 1)
    # A falsy job_state (missing, or numeric 0) falls back to -1 = no filter.
    # NOTE(review): a numeric 0 therefore cannot select status 0; only a
    # truthy spelling such as '0' can. Confirm what the front end sends.
    job_state = int(check_param(params, 'job_state') or -1)
    start_index = (page - 1) * page_size
    end_index = start_index + page_size

    today = int(datetime.now().strftime('%Y%m%d'))
    job_list = db_tmnet.query_survey_job_info_by_area_id(area_id) or []
    # Order by the real timestamp while it is still available. The final
    # sort below works on day strings only and is stable, so this decides
    # the order of jobs updated on the same day.
    job_list = sorted(job_list, key=lambda row: row['update_time'], reverse=True)
    for row in job_list:
        row['cross_name'] = g_roadnet.query_cross(row['crossid']).name
        status = row['status']
        if row['end_day'] < today and status != 2:
            status = 3
        if row['start_day'] > today:
            status = 0
        # A finished job (status 2) reports its last update day as completion day.
        row['complete_day'] = row['update_time'].strftime('%Y%m%d') if status == 2 else '-'
        row['status'] = status
        row['update_time'] = row['update_time'].strftime('%Y%m%d')
        row['create_time'] = row['create_time'].strftime('%Y%m%d')

    if keyword:
        job_list = find_job_info(keyword, job_list)
    if start_date and end_date:
        range_start, range_end = str(start_date), str(end_date)
        # NOTE(review): the three conditions are OR-ed, so a job passes if it
        # merely ends before range_end OR starts after range_start. That
        # keeps almost every job; confirm whether AND / overlap was intended.
        job_list = [
            item for item in job_list
            if str(item['end_day']) <= range_end
            or str(item['start_day']) >= range_start
            or (item['complete_day'] != '-' and range_start <= str(item['complete_day']) <= range_end)
        ]
    # -1 is the only "no filter" value. Testing truthiness as well would
    # silently ignore an explicit request for status 0.
    if job_state != -1:
        job_list = [item for item in job_list if item['status'] == job_state]

    job_list = sorted(job_list, key=lambda item: item['update_time'], reverse=True)
    total_num = len(job_list)
    res = make_common_res(0, 'ok')
    res['data'] = {
        'total_num': total_num,
        'job_list': job_list[start_index:end_index]
    }
    return json.dumps(res)
def rerun_cross_survey_job(params):
    """
    Request a re-run of a whole survey job.

    Jobs whose status is 1 or 2 are refused.

    :param params: request parameters; the keys read here are nodeid,
        area_id, userid and jobid
    :return: JSON string with code 0 on success, non-zero otherwise
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))

    # The requested area must be one of the areas attached to the user.
    user_areas = db_user.query_areaid_list(userid)
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_area_ids = map(int, user_areas)
    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not (area_is_numeric and int(area_id) in permitted_area_ids):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    jobid = check_param(params, 'jobid')
    if not jobid:
        return json.dumps(make_common_res(6, '缺少任务ID请刷新后重试'))
    job_info = db_tmnet.query_survey_job_info_by_id(jobid)
    if not job_info:
        return json.dumps(make_common_res(7, '任务不存在,请刷新后重试'))
    if job_info[0]['status'] in (1, 2):
        return json.dumps(make_common_res(8, '任务正在执行中,无法执行重跑操作'))

    # Exactly one affected row counts as success.
    if db_tmnet.update_cross_survey_job_status(job_info, jobid) == 1:
        outcome = make_common_res(0, 'ok')
    else:
        outcome = make_common_res(9, '重跑任务失败')
    return json.dumps(outcome)
def query_usable_survey_crosses(params):
    """
    List the intersections of an area that have at least one survey job.

    :param params: request parameters; the keys read here are nodeid,
        area_id and userid
    :return: JSON string whose ``data`` is a list of
        ``{'crossid', 'cross_name'}`` entries, one per distinct intersection
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))

    # The requested area must be one of the areas attached to the user.
    user_areas = db_user.query_areaid_list(userid)
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_area_ids = map(int, user_areas)
    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not (area_is_numeric and int(area_id) in permitted_area_ids):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    survey_jobs = db_tmnet.query_survey_job_info_by_area_id(area_id)
    # De-duplicate through a set; the resulting order is not defined.
    distinct_crossids = list({job['crossid'] for job in survey_jobs})
    res = make_common_res(0, 'ok')
    res['data'] = [
        {'crossid': cid, 'cross_name': g_roadnet.query_cross(cid).name}
        for cid in distinct_crossids
    ]
    return json.dumps(res)
def query_cross_survey_usable_dates(params):
    """
    List the queryable date range of every survey job of one intersection.

    :param params: request parameters; the keys read here are nodeid,
        area_id, userid and crossid
    :return: JSON string whose ``data`` holds the crossid and a ``job_info``
        list of ``{'jobid', 'time_range'}`` entries, ``time_range`` being
        ``"<start_day>-<end>"``
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))

    # The requested area must be one of the areas attached to the user.
    user_areas = db_user.query_areaid_list(userid)
    if not (user_areas and len(user_areas) >= 1):
        return json.dumps(make_common_res(5, '用户信息异常'))
    permitted_area_ids = map(int, user_areas)
    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not (area_is_numeric and int(area_id) in permitted_area_ids):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(6, '缺少crossid 请刷新后重试'))

    job_entries = []
    for row in db_tmnet.query_cross_survey_job(crossid, area_id):
        first_day = row['start_day']
        if row['status'] == 2:
            # A job with status 2 ends at its latest update_time. (The
            # original note also mentions capping at today's date; the code
            # uses update_time only.)
            last_day = row['update_time'].strftime('%Y%m%d')
        else:
            # Any other status is open-ended ("until now").
            last_day = '至今'
        job_entries.append({
            'jobid': row['id'],
            'time_range': '-'.join((str(first_day), str(last_day))),
        })

    res = make_common_res(0, 'ok')
    res['data'] = {
        'crossid': crossid,
        'job_info': job_entries
    }
    return json.dumps(res)
def query_cross_survey_result(params):
    """
    Return the survey images of one job, grouped by approach direction.

    The job's ``done_inroads`` field is a ``|``-separated list of
    ``<src_dir>:<state>`` entries. For every listed direction the stored
    survey result is loaded and turned into a time-ordered image list.

    :param params: request parameters; the keys read here are nodeid,
        area_id, userid, crossid and jobid
    :return: JSON string; ``data`` holds crossid, name, location,
        ``src_images`` (direction -> image list), ``ledger_info`` and
        ``can_rerun_dir`` (direction -> 1 when its state is ``'2'``, else 0)
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    # One-shot iterator, consumed by the single membership test below.
    area_list = map(int, area_list)
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(6, '缺少路口id请刷新后重试'))
    jobid = check_param(params, 'jobid')
    if not jobid:
        return json.dumps(make_common_res(7, '缺少任务id请刷新后重试'))

    job_info = db_tmnet.query_survey_job_info_by_id(jobid)
    # An unknown job id must yield an error response instead of failing on
    # job_info[0]; same message as the delete / re-run endpoints.
    if not job_info:
        return json.dumps(make_common_res(8, '任务不存在,请刷新后重试'))

    # Parse "<src_dir>:<state>|..." once; entries without a state (for
    # example an empty done_inroads field) are skipped instead of raising.
    done_entries = []
    for item in (job_info[0]['done_inroads'] or '').split('|'):
        fields = item.split(':')
        if len(fields) < 2:
            continue
        done_entries.append((fields[0], fields[1]))

    # Static and ledger information of the intersection.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return json.dumps(make_common_res(10, '查询路口信息失败'))
    cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
    # Reverse lookup: inbound road id -> approach direction.
    road_src_dict = {v['in']: k for k, v in roads_dir_dict.items()}

    src_images = {}
    for done_dir, _state in done_entries:
        item_key = 'csr_' + crossid + '_' + str(jobid) + '_' + done_dir
        item_data = db_cross.query_csr_data(nodeid, item_key, crossid)
        if not item_data:
            logging.error('路口id: %s, 任务id: %s, 源方向: %s, 数据不存在' % (crossid, jobid, done_dir))
            continue
        item_csr_data = survey_pb.xl_cross_survey_result_t()
        item_csr_data.ParseFromString(item_data[0]['data'])
        # Patch of 2025-12-30: after a road-network update, stored results
        # may carry a road id that g_road_mapping translates to the current
        # one; translate it so the lookup below matches.
        inroadid = item_csr_data.inroadid if item_csr_data.inroadid not in g_road_mapping.keys() else g_road_mapping[item_csr_data.inroadid]
        item_csr_data.inroadid = inroadid
        if inroadid not in road_src_dict:
            logging.error('路口id: %s, 源方向: %s, 数据不存在' % (crossid, inroadid))
            continue
        src_dir = road_src_dict[inroadid]
        dir_label = srcDir_toStr(src_dir) + '进口'
        image_list = [
            {
                'src_dir': dir_label,
                'speed': pos.pos.speed,
                'image_time': datetime.fromtimestamp(pos.pos.timestamp).strftime('%Y-%m-%d %H:%M:%S'),
                'image_url': pos.image_url,
                # lon / lat are stored multiplied by 1e7.
                'image_location': str(pos.pos.lon / 10000000) + ',' + str(pos.pos.lat / 10000000),
                'dist': pos.pos.dist
            }
            for pos in item_csr_data.pos_list
        ]
        src_images[src_dir] = sorted(image_list, key=lambda x: x['image_time'])

    # A direction can be re-run only when its state is '2'.
    dir_info = {done_dir: (1 if state == '2' else 0) for done_dir, state in done_entries}

    res = make_common_res(0, 'ok')
    res['data'] = {
        'crossid': crossid,
        'name': cross_static_info['name'],
        'location': cross_static_info['location'],
        'src_images': src_images,
        'ledger_info': cross_ledger_info,
        'can_rerun_dir': dir_info
    }
    return json.dumps(clean_dict_nan(res, '-'), ensure_ascii=False)
def rerun_cross_survey_dir(params):
    """
    Request a re-run of selected approach directions of a survey job.

    The job's ``done_inroads`` field is a ``|``-separated list of
    ``<src_dir>:<state>`` entries. A requested direction is reset to state
    ``'0'`` only when it exists and its current state is ``'2'``; every other
    requested direction is reported as failed. The rebuilt field is then
    written back through update_cross_survey_job_status.

    :param params: request parameters; the keys read here are nodeid,
        area_id, userid, crossid, jobid and rerun_list (iterable of
        directions)
    :return: JSON string; ``data.fail_dir_list`` lists the rejected
        directions and ``data.fail_desc`` describes them
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    # One-shot iterator, consumed by the single membership test below.
    area_list = map(int, area_list)
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(6, '缺少路口id请刷新后重试'))
    jobid = check_param(params, 'jobid')
    if not jobid:
        return json.dumps(make_common_res(7, '缺少任务id请刷新后重试'))
    rerun_list = check_param(params, 'rerun_list')
    if not rerun_list or len(rerun_list) < 1:
        return json.dumps(make_common_res(8, '缺少重跑方向,请刷新后重试'))

    job_info = db_tmnet.query_survey_job_info_by_id(jobid)
    # An unknown job id must yield an error response instead of failing on
    # job_info[0]; same message as the delete / re-run endpoints.
    if not job_info:
        return json.dumps(make_common_res(10, '任务不存在,请刷新后重试'))

    # direction -> state, in stored order. Entries without a state (for
    # example an empty done_inroads field) are skipped instead of raising.
    done_src_dir_dict = {}
    for item in (job_info[0]['done_inroads'] or '').split('|'):
        fields = item.split(':')
        if len(fields) < 2:
            continue
        done_src_dir_dict[fields[0]] = fields[1]

    fail_dir_list, fail_msgs = [], []
    for src_dir in rerun_list:
        if src_dir not in done_src_dir_dict:
            fail_dir_list.append(src_dir)
            fail_msgs.append(f'{srcDir_toStr(src_dir)}' + '进口: 方向信息异常')
            continue
        if done_src_dir_dict[src_dir] != '2':
            fail_dir_list.append(src_dir)
            fail_msgs.append(f'{srcDir_toStr(src_dir)}' + '进口: 当前方向任务状态未完成')
            continue
        done_src_dir_dict[src_dir] = '0'

    done_inroads_str = '|'.join([key + ':' + state for key, state in done_src_dir_dict.items()])
    ret = db_tmnet.update_cross_survey_job_status(job_info, jobid, done_inroads_str)
    if not ret:
        return json.dumps(make_common_res(9, '更新任务状态失败'))
    res = make_common_res(0, 'ok')
    res['data'] = {
        'fail_dir_list': fail_dir_list,
        # Separate individual failures so several of them stay readable;
        # with zero or one failure the text is unchanged.
        'fail_desc': '失败的方向有:' + ','.join(fail_msgs)
    }
    return json.dumps(res)
def query_cross_survey_pngs(params):
    """
    Return one picture per approach direction of an intersection.

    Every direction starts with the value found in
    query_wave_cross_survey_result (or ``''``). If the intersection has
    survey jobs, directions covered by the newest job (only those in state
    ``'2'``, and only when that job's status is not 0) and then by the
    second-newest job are overwritten with get_cross_png of the stored
    survey images.

    :param params: request parameters; the keys read here are nodeid,
        area_id, userid and crossid
    :return: JSON string whose ``data`` maps direction to picture
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    # One-shot iterator, consumed by the single membership test below.
    area_list = map(int, area_list)
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(6, '缺少路口id请刷新后重试'))

    cross_roads_dir_dict = gen_crossids_roads_dir_dict_by_mysql([crossid], nodeid)
    # NOTE(review): this mapping is iterated as if keyed by direction when
    # the result is initialised, but looked up by inbound road id further
    # down. Confirm the key type returned by the helper.
    road_src_dict = cross_roads_dir_dict[crossid]
    wave_cross_survey_result = query_wave_cross_survey_result(userid, crossid)
    existed_jobs_list = db_tmnet.query_cross_survey_job(crossid, area_id)

    cross_png_res = {}
    for src_dir in road_src_dict:
        cross_png_res[src_dir] = wave_cross_survey_result[src_dir] if src_dir in wave_cross_survey_result else ''
    if not existed_jobs_list:
        res = make_common_res(0, 'ok')
        res['data'] = cross_png_res
        return json.dumps(res)

    job_list = sorted(existed_jobs_list, key=lambda x: x['create_time'], reverse=True)
    # direction -> (job id, storage key). The job id is kept per entry so
    # that a failed lookup can be logged against the job it belongs to.
    usable_src_keys = {}
    newest_job = job_list[0]
    if newest_job['status'] != 0:
        for item in newest_job['done_inroads'].split('|'):
            fields = item.split(':')
            # Entries without a state cannot be in state '2'; skip them
            # instead of raising IndexError.
            if len(fields) < 2 or fields[1] != '2':
                continue
            usable_src_keys[fields[0]] = (
                newest_job['id'],
                'csr_' + crossid + '_' + str(newest_job['id']) + '_' + fields[0])
    if len(job_list) > 1:
        # Fall back to the second-newest job for directions still missing.
        previous_job = job_list[1]
        for item in previous_job['done_inroads'].split('|'):
            done_dir = item.split(':')[0]
            if done_dir in usable_src_keys:
                continue
            usable_src_keys[done_dir] = (
                previous_job['id'],
                'csr_' + crossid + '_' + str(previous_job['id']) + '_' + done_dir)

    for done_dir, (jobid, item_key) in usable_src_keys.items():
        item_data = db_cross.query_csr_data(nodeid, item_key, crossid)
        if not item_data:
            # Report the direction and job actually queried, not leftovers
            # of the loops above.
            logging.error('路口id: %s, 任务id: %s, 源方向: %s, 数据不存在' % (crossid, jobid, done_dir))
            continue
        item_csr_data = survey_pb.xl_cross_survey_result_t()
        item_csr_data.ParseFromString(item_data[0]['data'])
        # Patch of 2025-12-30: after a road-network update, stored results
        # may carry a road id that g_road_mapping translates to the current
        # one; translate it so the lookup below matches.
        inroadid = item_csr_data.inroadid if item_csr_data.inroadid not in g_road_mapping.keys() else g_road_mapping[item_csr_data.inroadid]
        item_csr_data.inroadid = inroadid
        if inroadid not in road_src_dict:
            logging.error('路口id: %s, 源方向: %s, 数据不存在' % (crossid, inroadid))
            continue
        src_dir = road_src_dict[inroadid]
        dir_label = srcDir_toStr(src_dir) + '进口'
        image_list = [
            {
                'src_dir': dir_label,
                'speed': pos.pos.speed,
                'image_time': datetime.fromtimestamp(pos.pos.timestamp).strftime('%Y-%m-%d %H:%M:%S'),
                'image_url': pos.image_url,
                # lon / lat are stored multiplied by 1e7.
                'image_location': str(pos.pos.lon / 10000000) + ',' + str(pos.pos.lat / 10000000),
                'dist': pos.pos.dist
            }
            for pos in item_csr_data.pos_list
        ]
        cross_png_res[src_dir] = get_cross_png(image_list)

    res = make_common_res(0, 'ok')
    res['data'] = cross_png_res
    return json.dumps(res)