cross_doctor/app/cross_monitor_worker.py

387 lines
18 KiB
Python
Raw Normal View History

2025-11-04 15:34:41 +08:00
# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/4 15:13
# @Description: 路口巡检页面相关接口
import json
from app.monitor_common import *
# 查询当前城市巡检任务可用日期列表
def query_monitor_task_usable_date_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少城市信息, 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少辖区信息, 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少用户信息, 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
day_list, week_list = db_cross.query_monitor_task_dates(nodeid, area_id)
tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id)
if not tp_desc:
tp_info = [
2025-12-03 15:13:31 +08:00
"00:00-23:59",
"00:00-07:00",
"07:00-09:00",
"09:00-17:00",
"17:00-19:00",
"19:00-22:00",
2025-12-03 15:13:31 +08:00
"22:00-23:59"
]
peak_tp = [
"07:00-09:00",
"17:00-19:00"
]
else:
2025-12-03 15:13:31 +08:00
tp_info = ["00:00-23:59"]
tps = tp_desc[0]['tp_desc'].split(',')
for item in tps:
tp_info.append(item)
peak_tp = tp_desc[0]['peak_tp'].split(',')
res = make_common_res(0, 'ok')
res['data'] = {
'days': day_list,
'weeks': week_list,
'tp_info': tp_info,
'peak_tp': peak_tp
}
return json.dumps(res, ensure_ascii=False)
# 查询巡检数据详情
def query_monitor_data(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少城市信息, 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少辖区信息, 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少用户信息, 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
special_time_range = check_param(params, 'special_time_range')
if not special_time_range or special_time_range == '00:00-23:59':
special_time_range = ''
query_date = check_param(params, 'query_date')
if not query_date:
return json.dumps(make_common_res(8, '缺少查询日期, 请刷新后重试'))
row_list = db_cross.query_monitor_data_sql(nodeid, area_id, date_type, query_date)
if len(row_list) < 1:
return json.dumps(make_common_res(9, '没有查询到数据'))
routing_crosses = db_tmnet.query_routing_crosses(nodeid, area_id)
routing_crosses_dict = {item['crossid']: item for item in routing_crosses}
cross_report_pb = pb.xl_cross_report_t()
cross_report_pb.ParseFromString(row_list[0]['data'])
# print(MessageToJson(cross_report_pb, indent=None, always_print_fields_with_no_presence=True))
overview_data = gen_monitor_overview_data(cross_report_pb, date_type, routing_crosses, special_time_range, routing_crosses_dict)
division_list, company_list, slc_company_dict, slckind_list, detector_type_dict = gen_ledger_base_info(nodeid, area_id)
monitor_crosses_ledger_info = gen_monitor_cross_ledger_info(routing_crosses, nodeid, area_id, slc_company_dict, date_type, query_date)
cross_delay_info_list = gen_cross_delay_info_list(userid, area_id, nodeid, date_type, cross_report_pb, special_time_range, routing_crosses_dict, slc_company_dict)
res = make_common_res(0, 'ok')
res['data'] = {
'overview_data': overview_data,
'monitor_crosses_ledger_info': monitor_crosses_ledger_info,
'cross_delay_info_list': cross_delay_info_list
}
return json.dumps(res, ensure_ascii=False)
# 查询巡检数据变化趋势
def query_monitor_data_trend(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少城市信息, 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少辖区信息, 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少用户信息, 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
query_date = check_param(params, 'query_date')
if not query_date:
return json.dumps(make_common_res(8, '缺少查询日期, 请刷新后重试'))
special_time_range = check_param(params, 'special_time_range')
if not special_time_range or special_time_range == '00:00-23:59':
special_time_range = ''
monitor_datas = db_cross.query_monitor_data_trend_sql(nodeid, area_id, date_type)
data = parse_monitor_trend_data(monitor_datas, date_type, special_time_range)
res = make_common_res(0, 'ok')
res['data'] = data
return json.dumps(res, ensure_ascii=False)
# 查询路口时段数据变化趋势
def query_cross_tp_data_trend(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少城市信息, 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少辖区信息, 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少用户信息, 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
weekdays = check_param(params, 'weekdays')
if not weekdays:
return json.dumps(make_common_res(8, '缺少查询星期, 请刷新后重试'))
query_type= check_param(params, 'query_type')
if not query_type:
query_type = 0
time_range = check_param(params, 'time_range')
if not time_range:
return json.dumps(make_common_res(10, '缺少查询时段, 请刷新后重试'))
tp_start = int(str(time_range.split('-')[0]).split(':')[0]) * 100 + int(str(time_range.split('-')[0]).split(':')[1])
tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1])
if query_type == 1:
tp_start = 't' + str(tp_start)
elif query_type == 2:
tp_start = 'h' + str(tp_start)
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(11, '缺少路口id 请刷新后重试'))
# 查询台账信息 获取路网渠化关系
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
if not cross_ledger_info_dict:
return json.dumps(make_common_res(9, '查询路口信息失败'))
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
if date_type == 'day':
month_ago_date = (datetime.now().date() - timedelta(days=30)).strftime('%Y%m%d')
now_prev_date = (datetime.now().date() - timedelta(days=1)).strftime('%Y%m%d')
date_list = generate_date_range(month_ago_date, now_prev_date)
# 查询近30天的数据
days_data = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
data_list = parse_data2pb4monitor(days_data, weekdays)
else:
date_list = gen_ten_weeks_ago_data_list_with_weekdays(date_type)
data_list = []
for week_dates in date_list:
week_data = db_cross.query_cross_delay_info(crossid, nodeid, week_dates, tp_start)
week_cross_delay_info = gen_avg_cross_delay_pb(week_data)
data_list.append({
'day': week_dates[0] + '-' + week_dates[-1],
'tp_start': tp_start,
'tp_end': tp_end,
'data': week_cross_delay_info,
'week_dates': week_dates
})
cross_delay_dict = parse_single_cross_delay_info4monitor(crossid, nodeid, data_list, date_type, roads_dir_dict)
res_data = calc_single_day_delay_info_change_rate4monitor(cross_delay_dict)
res = make_common_res(0, 'ok')
weekdays_str = gen_week_str({'weekday': weekdays})
res['data'] = {
'name': cross_static_info['name'],
'weekdays': weekdays_str,
'time_range': time_range,
'delay_infos': res_data
}
return json.dumps(res, ensure_ascii=False)
def query_monitor_problems(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少城市信息, 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少辖区信息, 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少用户信息, 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
if date_type not in ['day', 'week', 'workday', 'weekend']:
return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
special_time_range = check_param(params, 'special_time_range')
if not special_time_range or special_time_range == '00:00-23:59':
special_time_range = ''
query_date = check_param(params, 'query_date')
if not query_date:
return json.dumps(make_common_res(8, '缺少查询日期, 请刷新后重试'))
filter_shield = check_param(params, 'filter_shield')
if not filter_shield:
filter_shield = 0
shield_info = {
'normal': {},
'tide': {},
'sample': {},
'phase': []
}
row_list = db_cross.query_monitor_data_sql(nodeid, area_id, date_type, query_date)
if len(row_list) < 1:
return json.dumps(make_common_res(9, '没有查询到数据'))
routing_crosses = db_tmnet.query_routing_crosses(nodeid, area_id)
routing_crosses_dict = {item['crossid']: item for item in routing_crosses}
for crossid in routing_crosses_dict.keys():
cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
if not cross_ledger_info_dict:
logging.error('查询路口信息失败', crossid)
continue
cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
roads_dir_dict = gen_road_dir_dict(cross_ledger_info)
routing_crosses_dict[crossid]['roads_dir_dict'] = roads_dir_dict
routing_crosses_dict[crossid]['cross_static_info'] = cross_static_info
cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid)
inroad_static_info_dict = {item['roadid']: item for item in cross_inroads}
routing_crosses_dict[crossid]['inroad_static_info_dict'] = inroad_static_info_dict
routing_crosses_dict[crossid]['cross_ledger_info'] = cross_ledger_info
cross_report_pb = pb.xl_cross_report_t()
cross_report_pb.ParseFromString(row_list[0]['data'])
2025-12-03 17:15:24 +08:00
cross_problem_records = db_cross.query_cross_problem_record(area_id, query_date, date_type)
records = {
'normal': {},
'tide': {}
}
for row in cross_problem_records:
if row['tide_cross'] and row['tide_cross'] != 0:
records['tide'][row['crossid']] = row['tide_cross']
continue
crossid = row['crossid']
weekdays = row['weekday']
time_range = row['time_range']
date_type = row['date_type']
key = crossid + '^' + weekdays + '^' + time_range + '^' + date_type
records['normal'][key] = row
# if filter_shield == 1:
shield_row_list = db_cross.query_shield_records(area_id)
for row in shield_row_list:
problem_key = row['problem_key']
if problem_key in ('tide_cross', 'unmatched_lane_num'):
if row['crossid'] not in shield_info['tide'].keys():
shield_info['tide'][row['crossid']] = [problem_key]
else:
shield_info['tide'][row['crossid']].append(problem_key)
elif problem_key == 'phase_error':
first_date = row['first_date']
if not first_date:
continue
shield_info['phase'].append(row['crossid'] + '^' + row['time_range'] + '^' + row['first_date'])
elif problem_key in ('sample_sch', 'sample_tp'):
key = row['crossid'] + '^' + row['weekdays']
if key not in shield_info['sample'].keys():
shield_info['sample'][key] = [problem_key]
else:
shield_info['sample'][key].append(problem_key)
else:
key = row['crossid'] + '^' + row['weekdays'] + '^' + row['time_range'] + '^' + row['date_type']
if key not in shield_info['normal'].keys():
shield_info['normal'][key] = [problem_key]
else:
shield_info['normal'][key].append(problem_key)
monitor_problems = gen_monitor_problem_info(cross_report_pb, routing_crosses_dict, special_time_range, records, date_type, shield_info, query_date, nodeid, area_id, filter_shield)
res = make_common_res(0, 'ok')
res['data'] = monitor_problems
return json.dumps(res, ensure_ascii=False)
def update_cross_problem_shield_state(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少城市信息, 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少辖区信息, 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少用户信息, 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
weekdays = check_param(params, 'weekdays')
if not weekdays:
weekdays = ''
time_range = check_param(params, 'time_range')
if not time_range:
time_range = ''
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(6, '缺少路口信息, 请刷新后重试'))
problem_key = check_param(params, 'problem_key')
if not problem_key:
return json.dumps(make_common_res(7, '缺少问题类型, 请刷新后重试'))
op_type = check_param(params, 'op_type')
if not op_type:
return json.dumps(make_common_res(8, '缺少操作类型, 请刷新后重试'))
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(9, '缺少查询日期类型, 请刷新后重试'))
first_date = check_param(params, 'first_date')
if not first_date:
first_date = ''
if problem_key == 'phase_error':
if not first_date or first_date == '':
return json.dumps(make_common_res(10, '缺少开始时间, 请选择开始时间'))
if op_type == 'add':
ret = db_cross.add_shield_records(crossid ,area_id, weekdays, time_range, problem_key, date_type, first_date)
if ret == 1:
return json.dumps(make_common_res(0, 'ok'))
else:
return json.dumps(make_common_res(10, '添加屏蔽信息失败'))
elif op_type == 'del':
ret = db_cross.delete_shield_records(crossid, area_id, weekdays, time_range, problem_key, date_type, first_date)
if ret == 1:
return json.dumps(make_common_res(0, 'ok'))
else:
return json.dumps(make_common_res(11, '删除屏蔽信息失败'))
else:
return json.dumps(make_common_res(9, '操作类型异常'))