cross_doctor/app/cross_monitor_worker.py

386 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/4 15:13
# @Description: 路口巡检页面相关接口
import json
import logging
from app.monitor_common import *
# Query the list of dates usable for the current city's inspection tasks.
def query_monitor_task_usable_date_list(params):
    """Return the available inspection dates and time-period definitions.

    Reads ``nodeid``, ``area_id`` and ``userid`` from ``params`` through
    ``check_param`` and checks that ``area_id`` is one of the area ids
    returned by ``db_user.query_areaid_list(userid)``.

    Returns a JSON string. On failure it carries one of these codes:
    2 (missing nodeid), 3 (missing area_id), 4 (missing userid),
    5 (no areas for the user, or area_id not numeric / not allowed).
    On success the code is 0 and ``data`` holds ``days``, ``weeks``,
    ``tp_info`` and ``peak_tp``.
    """
    required_fields = (
        ('nodeid', 2, '缺少城市信息, 请刷新后重试'),
        ('area_id', 3, '缺少辖区信息, 请刷新后重试'),
        ('userid', 4, '缺少用户信息, 请刷新后重试'),
    )
    supplied = {}
    for field, code, message in required_fields:
        supplied[field] = check_param(params, field)
        if not supplied[field]:
            return json.dumps(make_common_res(code, message))
    nodeid = supplied['nodeid']
    area_id = supplied['area_id']
    userid = supplied['userid']

    allowed_areas = db_user.query_areaid_list(userid)
    if not allowed_areas or len(allowed_areas) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    # A leading '-' is tolerated so negative area ids pass the digit check.
    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not area_is_numeric or int(area_id) not in map(int, allowed_areas):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    day_list, week_list = db_cross.query_monitor_task_dates(nodeid, area_id)
    city_tp_rows = db_tmnet.query_city_tp_info(nodeid, area_id)
    if city_tp_rows:
        # City-specific periods: the whole-day range always comes first.
        city_tp = city_tp_rows[0]
        tp_info = ["00:00-23:59"] + city_tp['tp_desc'].split(',')
        peak_tp = city_tp['peak_tp'].split(',')
    else:
        # No city configuration found: fall back to the built-in periods.
        tp_info = [
            "00:00-23:59",
            "00:00-07:00",
            "07:00-09:00",
            "09:00-17:00",
            "17:00-19:00",
            "19:00-22:00",
            "22:00-23:59",
        ]
        peak_tp = ["07:00-09:00", "17:00-19:00"]

    res = make_common_res(0, 'ok')
    res['data'] = {
        'days': day_list,
        'weeks': week_list,
        'tp_info': tp_info,
        'peak_tp': peak_tp,
    }
    return json.dumps(res, ensure_ascii=False)
# Query the detailed inspection (monitor) data.
def query_monitor_data(params):
    """Return overview, ledger and per-cross delay data for one report.

    Reads ``nodeid``, ``area_id``, ``userid``, ``date_type``,
    ``special_time_range`` and ``query_date`` from ``params`` through
    ``check_param``. A missing ``special_time_range`` or the whole-day value
    ``'00:00-23:59'`` is normalised to an empty string.

    Returns a JSON string. Failure codes: 2/3/4 (missing nodeid / area_id /
    userid), 5 (no areas for the user, or area_id invalid), 6 (missing
    date_type), 7 (unknown date_type), 8 (missing query_date), 9 (no report
    row found). On success the code is 0 and ``data`` holds
    ``overview_data``, ``monitor_crosses_ledger_info`` and
    ``cross_delay_info_list``; the response is passed through
    ``clean_dict_nan(res, '-')`` before serialisation.
    """
    required_fields = (
        ('nodeid', 2, '缺少城市信息, 请刷新后重试'),
        ('area_id', 3, '缺少辖区信息, 请刷新后重试'),
        ('userid', 4, '缺少用户信息, 请刷新后重试'),
    )
    supplied = {}
    for field, code, message in required_fields:
        supplied[field] = check_param(params, field)
        if not supplied[field]:
            return json.dumps(make_common_res(code, message))
    nodeid = supplied['nodeid']
    area_id = supplied['area_id']
    userid = supplied['userid']

    allowed_areas = db_user.query_areaid_list(userid)
    if not allowed_areas or len(allowed_areas) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not area_is_numeric or int(area_id) not in map(int, allowed_areas):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    date_type = check_param(params, 'date_type')
    if not date_type:
        return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
    if date_type not in ('day', 'week', 'workday', 'weekend'):
        return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))

    special_time_range = check_param(params, 'special_time_range')
    if not (special_time_range and special_time_range != '00:00-23:59'):
        # The whole-day range is treated the same as "no special range".
        special_time_range = ''

    query_date = check_param(params, 'query_date')
    if not query_date:
        return json.dumps(make_common_res(8, '缺少查询日期, 请刷新后重试'))

    report_rows = db_cross.query_monitor_data_sql(nodeid, area_id, date_type, query_date)
    if len(report_rows) == 0:
        return json.dumps(make_common_res(9, '没有查询到数据'))

    routing_crosses = db_tmnet.query_routing_crosses(nodeid, area_id)
    routing_crosses_dict = {}
    for cross in routing_crosses:
        routing_crosses_dict[cross['crossid']] = cross

    # Only the first report row is decoded.
    cross_report_pb = pb.xl_cross_report_t()
    cross_report_pb.ParseFromString(report_rows[0]['data'])

    overview_data = gen_monitor_overview_data(
        cross_report_pb, date_type, routing_crosses, special_time_range, routing_crosses_dict
    )
    # Only the signal-controller company mapping is needed from the ledger base info.
    _divisions, _companies, slc_company_dict, _slc_kinds, _detector_types = gen_ledger_base_info(nodeid, area_id)
    monitor_crosses_ledger_info = gen_monitor_cross_ledger_info(
        routing_crosses, nodeid, area_id, slc_company_dict, date_type, query_date
    )
    cross_delay_info_list = gen_cross_delay_info_list(
        userid, area_id, nodeid, date_type, cross_report_pb,
        special_time_range, routing_crosses_dict, slc_company_dict
    )

    res = make_common_res(0, 'ok')
    res['data'] = {
        'overview_data': overview_data,
        'monitor_crosses_ledger_info': monitor_crosses_ledger_info,
        'cross_delay_info_list': cross_delay_info_list,
    }
    return json.dumps(clean_dict_nan(res, '-'), ensure_ascii=False)
# Query how the inspection (monitor) data changes over time.
def query_monitor_data_trend(params):
    """Return the inspection data trend for an area.

    Reads ``nodeid``, ``area_id``, ``userid``, ``date_type``, ``query_date``
    and ``special_time_range`` from ``params`` through ``check_param``.
    ``query_date`` is required but is not passed to the trend query. A
    missing ``special_time_range`` or ``'00:00-23:59'`` becomes ``''``.

    Returns a JSON string. Failure codes: 2/3/4 (missing nodeid / area_id /
    userid), 5 (no areas for the user, or area_id invalid), 6 (missing
    date_type), 7 (unknown date_type), 8 (missing query_date). On success
    the code is 0 and ``data`` is the result of ``parse_monitor_trend_data``.
    """
    required_fields = (
        ('nodeid', 2, '缺少城市信息, 请刷新后重试'),
        ('area_id', 3, '缺少辖区信息, 请刷新后重试'),
        ('userid', 4, '缺少用户信息, 请刷新后重试'),
    )
    supplied = {}
    for field, code, message in required_fields:
        supplied[field] = check_param(params, field)
        if not supplied[field]:
            return json.dumps(make_common_res(code, message))
    nodeid = supplied['nodeid']
    area_id = supplied['area_id']
    userid = supplied['userid']

    allowed_areas = db_user.query_areaid_list(userid)
    if not allowed_areas or len(allowed_areas) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not area_is_numeric or int(area_id) not in map(int, allowed_areas):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    date_type = check_param(params, 'date_type')
    if not date_type:
        return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
    if date_type not in ('day', 'week', 'workday', 'weekend'):
        return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))

    query_date = check_param(params, 'query_date')
    if not query_date:
        return json.dumps(make_common_res(8, '缺少查询日期, 请刷新后重试'))

    special_time_range = check_param(params, 'special_time_range')
    if not (special_time_range and special_time_range != '00:00-23:59'):
        # The whole-day range is treated the same as "no special range".
        special_time_range = ''

    trend_rows = db_cross.query_monitor_data_trend_sql(nodeid, area_id, date_type)
    res = make_common_res(0, 'ok')
    res['data'] = parse_monitor_trend_data(trend_rows, date_type, special_time_range)
    return json.dumps(clean_dict_nan(res, '-'), ensure_ascii=False)
def _parse_hhmm_range(time_range):
    """Convert ``'HH:MM-HH:MM'`` text into ``(start, end)`` HHMM integers.

    For example ``'07:00-09:00'`` yields ``(700, 900)``. Returns ``None``
    when the text does not contain two ``HH:MM`` parts with numeric fields.
    """
    try:
        start_text, end_text = str(time_range).split('-')[:2]
        start_hour, start_minute = start_text.split(':')[:2]
        end_hour, end_minute = end_text.split(':')[:2]
        return (
            int(start_hour) * 100 + int(start_minute),
            int(end_hour) * 100 + int(end_minute),
        )
    except ValueError:
        # Too few parts to unpack, or a non-numeric hour/minute field.
        return None


# Query the trend of one cross's delay data for a given time period.
def query_cross_tp_data_trend(params):
    """Return the delay trend of a single cross for one time period.

    Reads ``nodeid``, ``area_id``, ``userid``, ``date_type``, ``weekdays``,
    ``query_type`` (defaults to 0), ``time_range`` and ``crossid`` from
    ``params`` through ``check_param``.

    For ``date_type == 'day'`` the 30 days ending yesterday are queried;
    otherwise the date groups from
    ``gen_ten_weeks_ago_data_list_with_weekdays(date_type)`` are queried one
    by one and averaged with ``gen_avg_cross_delay_pb``.

    Returns a JSON string. Failure codes: 2/3/4 (missing nodeid / area_id /
    userid), 5 (no areas for the user, or area_id invalid), 6 (missing
    date_type), 7 (unknown date_type), 8 (missing weekdays), 10 (missing or
    malformed time_range), 11 (missing crossid), 9 (cross ledger lookup
    failed). On success the code is 0 and ``data`` holds ``name``,
    ``weekdays``, ``time_range`` and ``delay_infos``.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少城市信息, 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少辖区信息, 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少用户信息, 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = map(int, area_list)
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    date_type = check_param(params, 'date_type')
    if not date_type:
        return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
    if date_type not in ['day', 'week', 'workday', 'weekend']:
        return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
    weekdays = check_param(params, 'weekdays')
    if not weekdays:
        return json.dumps(make_common_res(8, '缺少查询星期, 请刷新后重试'))
    query_type = check_param(params, 'query_type')
    if not query_type:
        query_type = 0
    time_range = check_param(params, 'time_range')
    if not time_range:
        return json.dumps(make_common_res(10, '缺少查询时段, 请刷新后重试'))

    # Reject a malformed range with a normal error response instead of
    # letting split()/int() raise out of the handler.
    parsed_range = _parse_hhmm_range(time_range)
    if parsed_range is None:
        return json.dumps(make_common_res(10, '查询时段格式异常, 请刷新后重试'))
    tp_start, tp_end = parsed_range

    # NOTE(review): query_type is compared against the ints 1 and 2; if
    # check_param returns strings these branches never fire -- confirm.
    if query_type == 1:
        tp_start = 't' + str(tp_start)
    elif query_type == 2:
        tp_start = 'h' + str(tp_start)

    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(11, '缺少路口id 请刷新后重试'))

    # Look up the ledger info to obtain the road-network channelization relations.
    cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid)
    if not cross_ledger_info_dict:
        return json.dumps(make_common_res(9, '查询路口信息失败'))
    cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict)
    roads_dir_dict = gen_road_dir_dict(cross_ledger_info)

    if date_type == 'day':
        # Read the clock once so both window edges refer to the same day.
        today = datetime.now().date()
        month_ago_date = (today - timedelta(days=30)).strftime('%Y%m%d')
        now_prev_date = (today - timedelta(days=1)).strftime('%Y%m%d')
        date_list = generate_date_range(month_ago_date, now_prev_date)
        # Query the data of the last 30 days.
        days_data = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start)
        data_list = parse_data2pb4monitor(days_data, weekdays)
    else:
        date_list = gen_ten_weeks_ago_data_list_with_weekdays(date_type)
        data_list = []
        for week_dates in date_list:
            week_data = db_cross.query_cross_delay_info(crossid, nodeid, week_dates, tp_start)
            week_cross_delay_info = gen_avg_cross_delay_pb(week_data, weekdays)
            data_list.append({
                'day': week_dates[0] + '-' + week_dates[-1],
                'tp_start': tp_start,
                'tp_end': tp_end,
                'data': week_cross_delay_info,
                'week_dates': week_dates
            })

    cross_delay_dict = parse_single_cross_delay_info4monitor(crossid, nodeid, data_list, date_type, roads_dir_dict)
    res_data = calc_single_day_delay_info_change_rate4monitor(cross_delay_dict)
    res = make_common_res(0, 'ok')
    weekdays_str = gen_week_str({'weekday': weekdays})
    res['data'] = {
        'name': cross_static_info['name'],
        'weekdays': weekdays_str,
        'time_range': time_range,
        'delay_infos': res_data
    }
    return json.dumps(clean_dict_nan(res, '-'), ensure_ascii=False)
def query_monitor_problems(params):
    """Return the problems detected by the inspection report of an area.

    Reads ``nodeid``, ``area_id``, ``userid``, ``date_type``,
    ``special_time_range``, ``query_date`` and ``filter_shield`` (defaults
    to 0) from ``params`` through ``check_param``. A missing
    ``special_time_range`` or ``'00:00-23:59'`` becomes ``''``.

    The stored problem records and shield records of the area are grouped
    into lookup dictionaries and handed, together with the decoded report,
    to ``gen_monitor_problem_info``.

    Returns a JSON string. Failure codes: 2/3/4 (missing nodeid / area_id /
    userid), 5 (no areas for the user, or area_id invalid), 6 (missing
    date_type), 7 (unknown date_type), 8 (missing query_date), 9 (no report
    row found). On success the code is 0 and ``data`` is the result of
    ``gen_monitor_problem_info``.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少城市信息, 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少辖区信息, 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少用户信息, 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = map(int, area_list)
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    date_type = check_param(params, 'date_type')
    if not date_type:
        return json.dumps(make_common_res(6, '缺少查询时间类型, 请刷新后重试'))
    if date_type not in ['day', 'week', 'workday', 'weekend']:
        return json.dumps(make_common_res(7, '查询时间类型异常, 请刷新后重试'))
    special_time_range = check_param(params, 'special_time_range')
    if not special_time_range or special_time_range == '00:00-23:59':
        special_time_range = ''
    query_date = check_param(params, 'query_date')
    if not query_date:
        return json.dumps(make_common_res(8, '缺少查询日期, 请刷新后重试'))
    filter_shield = check_param(params, 'filter_shield')
    if not filter_shield:
        filter_shield = 0

    row_list = db_cross.query_monitor_data_sql(nodeid, area_id, date_type, query_date)
    if len(row_list) < 1:
        return json.dumps(make_common_res(9, '没有查询到数据'))

    routing_crosses = db_tmnet.query_routing_crosses(nodeid, area_id)
    routing_crosses_dict = {item['crossid']: item for item in routing_crosses}
    cross_roads_dir_dict = gen_crossids_roads_dir_dict_by_mysql(
        [item['crossid'] for item in routing_crosses], nodeid
    )
    # Attach the road-direction mapping to every routed cross.
    for routed_crossid, cross_info in routing_crosses_dict.items():
        cross_info['roads_dir_dict'] = cross_roads_dir_dict[routed_crossid]

    # Only the first report row is decoded.
    cross_report_pb = pb.xl_cross_report_t()
    cross_report_pb.ParseFromString(row_list[0]['data'])

    # Group the stored problem records: tide records are keyed by crossid,
    # everything else by "crossid^weekday^time_range^date_type".
    cross_problem_records = db_cross.query_cross_problem_record(area_id, query_date, date_type)
    records = {
        'normal': {},
        'tide': {}
    }
    for row in cross_problem_records:
        if row['tide_cross']:
            records['tide'][row['crossid']] = row['tide_cross']
            continue
        # Row-local names only: the request's ``date_type`` must stay intact
        # because it is passed to gen_monitor_problem_info below.
        record_key = '^'.join(
            (row['crossid'], row['weekday'], row['time_range'], row['date_type'])
        )
        records['normal'][record_key] = row

    # Shield records are always loaded; ``filter_shield`` is forwarded to
    # gen_monitor_problem_info rather than applied here.
    shield_info = {
        'normal': {},
        'tide': {},
        'sample': {},
        'phase': []
    }
    for row in db_cross.query_shield_records(area_id):
        problem_key = row['problem_key']
        if problem_key in ('tide_cross', 'unmatched_lane_num'):
            shield_info['tide'].setdefault(row['crossid'], []).append(problem_key)
        elif problem_key == 'phase_error':
            # Phase shields without a start date are skipped.
            if not row['first_date']:
                continue
            shield_info['phase'].append(
                row['crossid'] + '^' + row['time_range'] + '^' + row['first_date']
            )
        elif problem_key in ('sample_sch', 'sample_tp'):
            sample_key = row['crossid'] + '^' + row['weekdays']
            shield_info['sample'].setdefault(sample_key, []).append(problem_key)
        else:
            normal_key = (
                row['crossid'] + '^' + row['weekdays'] + '^'
                + row['time_range'] + '^' + row['date_type']
            )
            shield_info['normal'].setdefault(normal_key, []).append(problem_key)

    monitor_problems = gen_monitor_problem_info(
        cross_report_pb, routing_crosses_dict, special_time_range, records,
        date_type, shield_info, query_date, nodeid, area_id, filter_shield
    )
    res = make_common_res(0, 'ok')
    res['data'] = monitor_problems
    return json.dumps(clean_dict_nan(res, '-'), ensure_ascii=False)
def update_cross_problem_shield_state(params):
    """Add or delete a shield record for a cross problem.

    Reads ``nodeid``, ``area_id``, ``userid``, ``weekdays``, ``time_range``,
    ``crossid``, ``problem_key``, ``op_type``, ``date_type`` and
    ``first_date`` from ``params`` through ``check_param``. ``weekdays``,
    ``time_range`` and ``first_date`` default to ``''``; ``first_date`` is
    mandatory when ``problem_key`` is ``'phase_error'``.

    ``op_type`` ``'add'`` calls ``db_cross.add_shield_records`` and ``'del'``
    calls ``db_cross.delete_shield_records``; a return value of 1 from either
    is reported as success.

    Returns a JSON string. Failure codes: 2/3/4 (missing nodeid / area_id /
    userid), 5 (no areas for the user, or area_id invalid), 6 (missing
    crossid), 7 (missing problem_key), 8 (missing op_type), 9 (missing
    date_type, or unknown op_type), 10 (missing first_date for a phase
    error, or add failed), 11 (delete failed). Success is code 0.
    """
    identity_fields = (
        ('nodeid', 2, '缺少城市信息, 请刷新后重试'),
        ('area_id', 3, '缺少辖区信息, 请刷新后重试'),
        ('userid', 4, '缺少用户信息, 请刷新后重试'),
    )
    supplied = {}
    for field, code, message in identity_fields:
        supplied[field] = check_param(params, field)
        if not supplied[field]:
            return json.dumps(make_common_res(code, message))
    area_id = supplied['area_id']

    allowed_areas = db_user.query_areaid_list(supplied['userid'])
    if not allowed_areas or len(allowed_areas) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_is_numeric = str(area_id).lstrip('-').isdigit()
    if not area_is_numeric or int(area_id) not in map(int, allowed_areas):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    # Optional filters fall back to an empty string.
    weekdays = check_param(params, 'weekdays') or ''
    time_range = check_param(params, 'time_range') or ''

    record_fields = (
        ('crossid', 6, '缺少路口信息, 请刷新后重试'),
        ('problem_key', 7, '缺少问题类型, 请刷新后重试'),
        ('op_type', 8, '缺少操作类型, 请刷新后重试'),
        ('date_type', 9, '缺少查询日期类型, 请刷新后重试'),
    )
    for field, code, message in record_fields:
        supplied[field] = check_param(params, field)
        if not supplied[field]:
            return json.dumps(make_common_res(code, message))
    crossid = supplied['crossid']
    problem_key = supplied['problem_key']
    op_type = supplied['op_type']
    date_type = supplied['date_type']

    first_date = check_param(params, 'first_date') or ''
    if problem_key == 'phase_error' and not first_date:
        return json.dumps(make_common_res(10, '缺少开始时间, 请选择开始时间'))

    shield_args = (crossid, area_id, weekdays, time_range, problem_key, date_type, first_date)
    if op_type == 'add':
        affected = db_cross.add_shield_records(*shield_args)
        failure = (10, '添加屏蔽信息失败')
    elif op_type == 'del':
        affected = db_cross.delete_shield_records(*shield_args)
        failure = (11, '删除屏蔽信息失败')
    else:
        return json.dumps(make_common_res(9, '操作类型异常'))

    if affected == 1:
        return json.dumps(make_common_res(0, 'ok'))
    return json.dumps(make_common_res(*failure))