cross_doctor/app/flow_worker.py

543 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import copy
import io
import json
from io import BytesIO
from app.common_worker import *
from app.eva_common import *
from openpyxl import Workbook
from openpyxl.styles import Alignment
from datetime import datetime
# 查询可用路口列表
from proto.phase_grpc import QueryCrossRunningPhase
from flask import send_file
# Number of quarter-hour (15-minute) slots in one day
QUARTER_COUNT = 96
def cross_flow_usable_date(params):
    """Return the days for which a cross has flow data available.

    The required request parameters are checked one by one, then ``area_id``
    is checked against the area ids returned for the user, and finally the
    available days are looked up through ``db_cross``.

    :param params: request parameters; reads ``crossid``, ``nodeid``,
        ``area_id`` and ``userid``
    :return: JSON string built from ``make_common_res``; on success ``data``
        is the list of ``day`` values of the returned rows (empty when there
        are no rows)
    """
    # (parameter name, error code, error message), checked in this order.
    required_fields = (
        ('crossid', 1, '缺少crossid 请刷新后重试'),
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    )
    values = {}
    for field, code, message in required_fields:
        values[field] = check_param(params, field)
        if not values[field]:
            return json.dumps(make_common_res(code, message))
    area_id = values['area_id']

    user_areas = db_user.query_areaid_list(values['userid'])
    if not user_areas or len(user_areas) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))

    # area_id must look like an integer and be one of the user's areas.
    area_allowed = (str(area_id).lstrip('-').isdigit()
                    and int(area_id) in map(int, user_areas))
    if not area_allowed:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    rows, error = db_cross.query_cross_flow_usable_date_sql(values['nodeid'], values['crossid'])
    if error:
        return json.dumps(make_common_res(5, f"{error}"))

    res = make_common_res(0, 'ok')
    res['data'] = [row['day'] for row in rows] if len(rows) > 0 else []
    return json.dumps(res, ensure_ascii=False)
def cross_flow_tp_divide(params):
    """Recommend time-period split points for a cross from its stored flow data.

    Steps: validate the request; keep only the requested dates whose weekday
    matches ``date_type``; merge the flow records of those dates; run the
    time-period division; finally attach the signal schedules whose weekdays
    overlap the weekdays of the selected dates.

    :param params: request parameters; reads ``crossid``, ``nodeid``,
        ``area_id``, ``userid``, ``date_list`` (each date formatted
        ``YYYYMMDD``) and ``date_type`` (``'workday'``, ``'weekend'``, or any
        other value for the whole week)
    :return: JSON string built from ``make_common_res``; on success ``data``
        holds ``flow_data``, ``recommend_tp`` and ``schedule_info``
    :raises ValueError: if an entry of ``date_list`` is not ``YYYYMMDD``
    """
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))

    # area_id must look like an integer and be one of the user's areas.
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = map(int, area_list)
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    date_list = check_param(params, 'date_list')
    if not date_list or len(date_list) == 0:
        return json.dumps(make_common_res(2, '缺少date_list 请刷新后重试'))
    date_type = check_param(params, 'date_type')
    if not date_type:
        return json.dumps(make_common_res(2, '缺少date_type 请刷新后重试'))

    # Weekday numbers with Monday=1 ... Sunday=7.
    week_days = [1, 2, 3, 4, 5, 6, 7]
    if date_type == 'workday':
        week_days = [1, 2, 3, 4, 5]
    if date_type == 'weekend':
        week_days = [6, 7]

    # Keep only the dates whose weekday is selected and remember which
    # weekdays actually occur among them.
    search_date_list = []
    search_week_day_map = {}
    for item_date in date_list:
        date_time = datetime.strptime(str(item_date), '%Y%m%d')
        date_weekday = date_time.weekday() + 1
        if date_weekday in week_days:
            search_week_day_map[date_weekday] = True
            search_date_list.append(item_date)

    res = make_common_res(0, 'ok')
    res['data'] = {
        'flow_data': [],
        'recommend_tp': [],
        'schedule_info': []
    }
    if len(search_date_list) == 0:
        return json.dumps(res, ensure_ascii=False)

    flow_data, error = db_cross.query_cross_flowdata(str(nodeid), crossid, search_date_list)
    if error:
        return json.dumps(make_common_res(2, f"{error}"))
    if len(flow_data) <= 0:
        return json.dumps(res, ensure_ascii=False)

    # Decode every stored record and merge them into one flow series.
    flow_data_pb_list = []
    for item_flow_data in flow_data:
        item_flow_pb = pb.xl_cross_flowdata_t()
        item_flow_pb.ParseFromString(item_flow_data['data'])
        flow_data_pb_list.append(item_flow_pb)
    merged_flow = merge_cross_flow(flow_data_pb_list)
    if not merged_flow:
        return json.dumps(res, ensure_ascii=False)

    hm_idx_list, flow_list = do_cross_tp_divide(merged_flow.flows)
    if not flow_list or len(flow_list) == 0:
        return json.dumps(res, ensure_ascii=False)

    # The start of every period is a recommended split point.
    tp_list = trans_to_tp_list(hm_idx_list)
    recommend_tp = set()
    if tp_list and len(tp_list) > 0:
        for item_tp in tp_list:
            res['data']['recommend_tp'].append(item_tp[0])
            recommend_tp.add(item_tp[0])

    # One entry per flow value, labelled in 15-minute steps from 00:00.
    current_time = datetime.strptime("00:00", "%H:%M")
    for flow in flow_list:
        time_str = current_time.strftime("%H:%M")
        res['data']['flow_data'].append({
            "flow": round(flow, 2),
            "tp": time_str,
            "recommend": 1 if time_str in recommend_tp and time_str != '00:00' else 0
        })
        current_time += timedelta(minutes=15)

    # Look up the configured timing schedules of the cross.
    tps, error = db_phasetable.query_cross_tps_by_crossid(int(nodeid), crossid)
    if error:
        return json.dumps(make_common_res(2, f"{error}"), ensure_ascii=False)
    if len(tps) <= 0:
        # Serialise like every other exit so callers always get a JSON string.
        return json.dumps(res, ensure_ascii=False)

    # Group the period start times by weekday string; the schedule id of the
    # first row seen for a weekday string is kept.
    tps_map = {}
    for item_tps in tps:
        if item_tps['weekday'] not in tps_map:
            tps_map[item_tps['weekday']] = {
                'tp': [],
                'scheduleid': item_tps['scheduleid'],
            }
        tps_map[item_tps['weekday']]['tp'].append(item_tps['tp_start'])

    week_presets = {
        '1,2,3,4,5,6,7': '全周',
        '1,2,3,4,5': '工作日',
        '6,7': '周末',
    }
    search_week_day = set(search_week_day_map)
    for weeks, tp_value in tps_map.items():
        weeks_list = list(map(int, weeks.split(',')))
        # Skip schedules that apply to none of the selected weekdays.
        if not search_week_day & set(weeks_list):
            continue
        plans, error = db_phasetable.day_schedule_by_xlcrossid(int(nodeid), crossid, tp_value['scheduleid'])
        if error:
            return json.dumps(make_common_res(2, f"{error}"), ensure_ascii=False)

        if weeks in week_presets:
            week_name = [week_presets[weeks]]
        else:
            week_name = [g_week_day[int(item_week)] for item_week in weeks.split(',')]

        item_data = {
            'scheduleid': tp_value['scheduleid'],
            'schedule_week': weeks,
            'schedule_name': ','.join(week_name),
            'tps': [],
        }
        plans_map = {item_plan['tp_start']: item_plan for item_plan in plans}
        if len(tp_value['tp']) > 0:
            # Only intervals whose start has a matching plan are reported.
            for item_tp_interval in tplist_2_tpinterval(tp_value['tp']):
                plan = plans_map.get(item_tp_interval['tp_start'])
                if item_tp_interval['tp_start'] in plans_map:
                    item_data['tps'].append({
                        'tp_start': item_tp_interval['tp_start'],
                        'tp_end': item_tp_interval['tp_end'],
                        'plan_name': plan['name'],
                        'plan_id': plan['planid'],
                    })
        res['data']['schedule_info'].append(item_data)
    return json.dumps(res, ensure_ascii=False)
def merge_cross_flow(crossflow_list: list[pb.xl_cross_flowdata_t]):
    """Combine several xl_cross_flowdata_t messages into one.

    An empty list yields ``None`` and a one-element list yields that element
    itself. Otherwise a new message is returned whose ``crossid`` comes from
    the first input and whose ``flows`` holds, slot by slot, the sum of the
    inputs' values (a plain sum; nothing is divided by the number of inputs).

    :param crossflow_list: list of xl_cross_flowdata_t messages
    :return: the merged message, or None when the list is empty
    """
    merged = pb.xl_cross_flowdata_t()
    count = len(crossflow_list)
    if count == 0:
        return None
    if count == 1:
        return crossflow_list[0]

    merged.crossid = crossflow_list[0].crossid
    # Start from QUARTER_COUNT zeroed slots, then accumulate every input.
    for _ in range(QUARTER_COUNT):
        merged.flows.append(0)
    for message in crossflow_list:
        for slot, value in enumerate(message.flows):
            merged.flows[slot] += value
    return merged
def get_cross_tp_divide(orig_flow_list: list[float]) -> list[list[str]]:
    """Divide a cross's flow series into time periods.

    :param orig_flow_list: raw flow values; by convention 96 entries, one per
        quarter-hour of the day
    :return: list of ``[start, end]`` pairs of ``HH:MM`` strings, e.g.
        ``[["00:00", "06:30"], ["06:30", "07:30"], ["07:30", "08:30"]]``
    """
    # (1) Run the division; only the boundary indices are needed here.
    hm_idx_list, _ = do_cross_tp_divide(orig_flow_list)
    # (2) Turn the boundary indices into [start, end] time strings.
    return trans_to_tp_list(hm_idx_list)
def do_cross_tp_divide(orig_flow_list: list[float]):
    """Run the time-period division on one cross's flow series.

    The raw flows are smoothed, rescaled relative to their peak, and the
    rescaled series is then split into periods.

    :param orig_flow_list: raw flow values
    :return: tuple ``(hm_idx_list, std_flow_list)``: the quarter indices that
        delimit the periods, and the rescaled flow series
    """
    std_flow_list = get_normlize_flow(get_smoothed_flow(orig_flow_list))
    return _split_tps_by_fallhead(std_flow_list), std_flow_list
def get_smoothed_flow(flow_list: list[float]) -> list[float]:
    """Smooth a flow series with a weighted average over each value's neighbours.

    Every output value is the weighted mean of the value itself and its left
    and right neighbours. At either end of the series the missing neighbour
    is left out and the remaining weights are renormalised.

    :param flow_list: raw flow values
    :return: smoothed values, same length as the input; ``[]`` for empty input
    """
    if len(flow_list) == 0:
        # Nothing to smooth; also avoids dividing by zero for the mean below.
        return []

    avg_flow = sum(flow_list) / len(flow_list)
    # Series with a low mean flow get a flatter kernel (more weight on the
    # neighbours).
    if avg_flow < 10:
        center_w, left_w, right_w = 0.4, 0.3, 0.3
    else:
        center_w, left_w, right_w = 0.6, 0.2, 0.2

    last_idx = len(flow_list) - 1
    smoothed_flow_list = []
    for i in range(len(flow_list)):
        weight_sum = center_w
        value_sum = flow_list[i] * center_w
        if i < last_idx:
            weight_sum += right_w
            value_sum += flow_list[i + 1] * right_w
        if i > 0:
            weight_sum += left_w
            value_sum += flow_list[i - 1] * left_w
        smoothed_flow_list.append(value_sum / weight_sum)
    return smoothed_flow_list
def get_normlize_flow(flow_list: list[float]) -> list[float]:
    """Rescale a flow series so that its largest value becomes 100.

    The peak is searched starting from 0, so when no value is greater than 0
    every output entry is 0.

    :param flow_list: flow values
    :return: rescaled values, same length as the input
    """
    peak = 0
    for value in flow_list:
        peak = value if value > peak else peak

    if peak != 0:
        return [value / peak * 100 for value in flow_list]
    return [0 for _ in flow_list]
def _split_tps_by_fallhead(std_flow_list, max_tp_num=8):
    """Split the day into time periods at the largest drops between adjacent quarters.

    :param std_flow_list: rescaled flow series; indices ``0`` to
        ``QUARTER_COUNT - 1`` are read
    :param max_tp_num: maximum number of time periods
    :return: ascending list of quarter indices that delimit the periods; it
        always starts with 0 and ends with ``QUARTER_COUNT``
    """
    min_tp_num = 5  # single-quarter periods stop being merged at this count
    # When both neighbouring quarters are at or below this level, their drop
    # is ignored.
    low_flow_limit = 40

    # (1) Drop between every pair of adjacent quarters.
    # fallhead_map has QUARTER_COUNT-1 entries {idx: drop between idx-1 and idx}.
    fallhead_map = {}
    for i in range(1, QUARTER_COUNT):
        drop = _calc_drop(std_flow_list, i)
        if std_flow_list[i] <= low_flow_limit and std_flow_list[i - 1] <= low_flow_limit:
            drop = 0
        fallhead_map[i] = drop

    # (2) Order the drops from largest to smallest.
    drop_info_list = sorted(fallhead_map.items(), key=lambda item: item[1], reverse=True)

    # (3) Take the boundaries with the largest drops.
    top_drop_idx_list = []
    for idx, drop in drop_info_list:
        if drop < 5:  # drops below 5% never split
            break
        top_drop_idx_list.append(idx)
        # Once enough boundaries exist, only keep adding for drops of 15% or more.
        if drop < 15 and len(top_drop_idx_list) >= max_tp_num - 1:
            break

    # Boundaries in chronological order, framed by the start and end of day.
    boundary_list = [0] + sorted(top_drop_idx_list) + [QUARTER_COUNT]

    # Merge periods that consist of a single quarter into a neighbour.
    # A removed boundary is marked with -1.
    # A drop above this limit keeps even a single-quarter period separate.
    drop_limit_cannot_merge = 50
    tp_num = len(boundary_list) - 1  # current number of periods
    for i in range(1, len(boundary_list)):
        if tp_num <= min_tp_num:
            break
        half_num = boundary_list[i] - boundary_list[i - 1]
        if half_num != 1:
            continue
        if i == 1:
            # First period: it can only be merged to the right.
            right_drop = fallhead_map[boundary_list[i]]
            if right_drop <= drop_limit_cannot_merge:
                boundary_list[i] = -1
                tp_num -= 1
        elif i == len(boundary_list) - 1:
            # Last period: it can only be merged to the left.
            left_drop = fallhead_map[boundary_list[i - 1]]
            if left_drop <= drop_limit_cannot_merge:
                boundary_list[i - 1] = -1
                # Keep the period count in step with the removed boundary,
                # as the other merge branches do; otherwise the reduction
                # below would remove one boundary too many.
                tp_num -= 1
        else:
            # Merge towards the side with the smaller drop.
            left_drop = fallhead_map[boundary_list[i - 1]]
            right_drop = fallhead_map[boundary_list[i]]
            if left_drop <= right_drop and left_drop <= drop_limit_cannot_merge:
                boundary_list[i - 1] = -1
                tp_num -= 1
            elif right_drop < left_drop and right_drop <= drop_limit_cannot_merge:
                boundary_list[i] = -1
                tp_num -= 1

    # If there are still too many periods, remove the remaining boundaries
    # with the smallest drops first.
    if tp_num > max_tp_num:
        need_reduce_tp_num = tp_num - max_tp_num
        idx_to_index = {
            boundary: pos
            for pos, boundary in enumerate(boundary_list)
            if boundary > 0
        }
        kill_tp_num = 0
        for idx in reversed(top_drop_idx_list):
            if idx in idx_to_index:
                boundary_list[idx_to_index[idx]] = -1
                kill_tp_num += 1
                if kill_tp_num >= need_reduce_tp_num:
                    break

    # Final boundaries: everything that was not removed.
    return [boundary for boundary in boundary_list if boundary >= 0]
def _calc_drop(flow_list, i):
"""
计算流量序列中第i个流量与前一个流量的落差
:param flow_list: 流量数组
:param i:
:return:
"""
drop = abs(flow_list[i] - flow_list[i - 1])
# 计算落差折减系数,当相对流量整体偏高,进行折减
factor = (flow_list[i] + flow_list[i - 1]) / 100
if factor < 1.0:
factor = 1.0
# 适当折减
drop = drop / factor
return drop
def trans_to_tp_list(hm_idx_list: list[int]) -> list[list[str]]:
    """Convert quarter-hour boundary indices into a list of time periods.

    Each index is turned into an ``HH:MM`` string (index * 15 minutes), and
    every pair of consecutive boundaries becomes one ``[start, end]`` period.

    :param hm_idx_list: quarter indices that delimit the periods
    :return: list of ``[start, end]`` pairs; empty when fewer than two
        boundaries are given
    """
    hm_list = ['%02d:%02d' % divmod(hm_idx * 15, 60) for hm_idx in hm_idx_list]
    return [[start, end] for start, end in zip(hm_list, hm_list[1:])]
def auto_match_phase(params):
    """Assign an existing plan of a schedule to each recommended time period.

    The running phase table of the cross is grouped into plans by their start
    time (only rows of the requested schedule are used). Every recommended
    period then receives a copy of the plan whose time range overlaps it the
    most; consecutive periods that end up with the same plan are merged.

    :param params: request parameters; reads ``crossid``, ``nodeid``,
        ``area_id``, ``userid``, ``scheduleid`` and ``recommend_tp``
    :return: JSON string built from ``make_common_res``; on success ``data``
        holds ``scheduleid`` and the list ``plans``
    """
    # (parameter name, error code, error message), checked in this order.
    required_fields = (
        ('crossid', 1, '缺少crossid 请刷新后重试'),
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    )
    values = {}
    for field, code, message in required_fields:
        values[field] = check_param(params, field)
        if not values[field]:
            return json.dumps(make_common_res(code, message))
    crossid = values['crossid']
    nodeid = values['nodeid']
    area_id = values['area_id']

    user_areas = db_user.query_areaid_list(values['userid'])
    if not user_areas or len(user_areas) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    # area_id must look like an integer and be one of the user's areas.
    area_allowed = (str(area_id).lstrip('-').isdigit()
                    and int(area_id) in map(int, user_areas))
    if not area_allowed:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    scheduleid = check_param(params, 'scheduleid')
    if not scheduleid:
        return json.dumps(make_common_res(2, '缺少scheduleid 请刷新后重试'))
    recommend_tp = check_param(params, 'recommend_tp')
    if not recommend_tp or len(recommend_tp) == 0:
        return json.dumps(make_common_res(2, '缺少recommend_tp 请刷新后重试'))

    phase_rows, error = db_phasetable.query_cross_runing_phasetable(int(nodeid), crossid)
    if error:
        return json.dumps(make_common_res(2, f"{error}"))

    scheduleid = int(scheduleid)
    matched_plans = []
    res = make_common_res(0, 'ok')
    res['data'] = {
        'scheduleid': scheduleid,
        'plans': matched_plans
    }
    if len(phase_rows) == 0:
        return json.dumps(res, ensure_ascii=False)

    # Plan-level columns, copied once per plan.
    plan_fields = ('tp_start', 'control_mode', 'coord_phaseid', 'cycle',
                   'name', 'offset', 'planid')
    # (output key, row column) for the stage-level values.
    stage_fields = (
        ('stageid', 'stageid'),
        ('stage_name', 'stage_name'),
        ('allred', 'allred'),
        ('green', 'green'),
        ('max_green', 'max_green'),
        ('min_green', 'min_green'),
        ('phaseids', 'phases'),
        ('phaseid_names', 'phase_name'),
        ('redyellow', 'redyellow'),
        ('stage_duration', 'stage_duration'),
        ('yellow', 'yellow'),
    )

    # Group the rows of the requested schedule into plans keyed by start time.
    plans_by_start = {}
    for row in phase_rows:
        if scheduleid != row['scheduleid']:
            continue
        start = row['tp_start']
        if start not in plans_by_start:
            plan = {field: row[field] for field in plan_fields}
            plan['stages'] = []
            plans_by_start[start] = plan
        plans_by_start[start]['stages'].append(
            {out_key: row[column] for out_key, column in stage_fields})

    # Give every plan its end time.
    for interval in tplist_2_tpinterval(list(plans_by_start.keys())):
        plans_by_start[interval['tp_start']]['tp_end'] = interval['tp_end']

    for target in tplist_2_tpinterval(recommend_tp):
        # Find the plan with the longest overlap with this recommended period.
        best_minutes = 0
        best_plan = None
        for candidate in plans_by_start.values():
            overlapping = (target['tp_end'] > candidate['tp_start']
                           and candidate['tp_end'] > target['tp_start'])
            if not overlapping:
                continue
            minutes = time_intersection_minutes(
                f"{target['tp_start']}-{target['tp_end']}",
                f"{candidate['tp_start']}-{candidate['tp_end']}")
            if minutes > best_minutes:
                best_minutes = minutes
                best_plan = copy.deepcopy(candidate)
        if not best_plan:
            continue

        # The copied plan takes over the recommended period's time range.
        best_plan['tp_start'] = target['tp_start']
        best_plan['tp_end'] = target['tp_end']

        # Extend the previous entry when it is adjacent and uses the same plan.
        previous = matched_plans[-1] if len(matched_plans) > 0 else None
        if (previous is not None
                and previous['tp_end'] == best_plan['tp_start']
                and previous['planid'] == best_plan['planid']):
            previous['tp_end'] = best_plan['tp_end']
        else:
            matched_plans.append(best_plan)
    return json.dumps(res, ensure_ascii=False)