cross_doctor/app/flow_worker.py

543 lines
21 KiB
Python
Raw Normal View History

import copy
import io
import json
from io import BytesIO
from app.common_worker import *
from app.eva_common import *
from openpyxl import Workbook
from openpyxl.styles import Alignment
from datetime import datetime
# 查询可用路口列表
from proto.phase_grpc import QueryCrossRunningPhase
from flask import send_file
# 一天内的一刻时段个数
QUARTER_COUNT = 96
# 查询路口可用时段
def cross_flow_usable_date(params):
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
row_list, error = db_cross.query_cross_flow_usable_date_sql(nodeid, crossid)
if error:
return json.dumps(make_common_res(5, f"{error}"))
res = make_common_res(0, 'ok')
res['data'] = []
if len(row_list) <= 0:
return json.dumps(res, ensure_ascii=False)
date_list = [row['day'] for row in row_list]
res = make_common_res(0, 'ok')
res['data'] = date_list
return json.dumps(res, ensure_ascii=False)
def cross_flow_tp_divide(params):
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
date_list = check_param(params, 'date_list')
if not date_list or len(date_list) == 0:
return json.dumps(make_common_res(2, '缺少date_list 请刷新后重试'))
date_type = check_param(params, 'date_type')
if not date_type:
return json.dumps(make_common_res(2, '缺少date_type 请刷新后重试'))
week_days = [1, 2, 3, 4, 5, 6, 7]
if date_type == 'workday':
week_days = [1, 2, 3, 4, 5]
if date_type == 'weekend':
week_days = [6, 7]
search_date_list = []
search_week_day_map = {}
for item_date in date_list:
date_time = datetime.strptime(str(item_date), '%Y%m%d')
date_weekday = date_time.weekday() + 1
if date_weekday in week_days:
search_week_day_map[date_weekday] = True
search_date_list.append(item_date)
res = make_common_res(0, 'ok')
res['data'] = {
'flow_data': [],
'recommend_tp': [],
'schedule_info': []
}
if len(search_date_list) == 0:
return json.dumps(res, ensure_ascii=False)
flow_data, error = db_cross.query_cross_flowdata(str(nodeid), crossid, search_date_list)
if error:
return json.dumps(make_common_res(2, f"{error}"))
if len(flow_data) <= 0:
return json.dumps(res, ensure_ascii=False)
flow_data_pb_list = []
for item_flow_data in flow_data:
item_flow_pb = pb.xl_cross_flowdata_t()
item_flow_pb.ParseFromString(item_flow_data['data'])
flow_data_pb_list.append(item_flow_pb)
merged_flow = merge_cross_flow(flow_data_pb_list)
if not merged_flow:
return json.dumps(res, ensure_ascii=False)
hm_idx_list, flow_list = do_cross_tp_divide(merged_flow.flows)
if not flow_list or len(flow_list) == 0:
return json.dumps(res, ensure_ascii=False)
tp_list = trans_to_tp_list(hm_idx_list)
recommend_tp = {}
if tp_list and len(tp_list) > 0:
for item_tp_list in tp_list:
res['data']['recommend_tp'].append(item_tp_list[0])
recommend_tp[item_tp_list[0]] = 1
current_time = datetime.strptime("00:00", "%H:%M")
for i in range(0, len(flow_list)):
time_str = current_time.strftime("%H:%M")
res['data']['flow_data'].append({
"flow": round(flow_list[i], 2),
"tp": time_str,
"recommend": 1 if recommend_tp.get(time_str) and time_str != '00:00' else 0
})
current_time += timedelta(minutes=15)
#查询配时方案
tps, error = db_phasetable.query_cross_tps_by_crossid(int(nodeid), crossid)
if error:
return json.dumps(make_common_res(2, f"{error}"), ensure_ascii=False)
if len(tps) <= 0:
return res
tps_map = {}
for item_tps in tps:
if item_tps['weekday'] not in tps_map:
tps_map[item_tps['weekday']] = {
'tp': [],
'scheduleid': item_tps['scheduleid'],
}
tps_map[item_tps['weekday']]['tp'].append(item_tps['tp_start'])
search_week_day = list(search_week_day_map.keys())
for weeks, tp_value in tps_map.items():
weeks_list = list(map(int, weeks.split(',')))
intersection = list(set(search_week_day) & set(weeks_list))
if len(intersection) == 0:
continue
plans, error = db_phasetable.day_schedule_by_xlcrossid(int(nodeid), crossid, tp_value['scheduleid'])
if error:
return json.dumps(make_common_res(2, f"{error}"), ensure_ascii=False)
week_name = []
if weeks == '1,2,3,4,5,6,7':
week_name = ['全周']
if weeks == '1,2,3,4,5':
week_name = ['工作日']
if weeks == '6,7':
week_name = ['周末']
if len(week_name) == 0:
week_slice = weeks.split(',')
for item_week in week_slice:
week_name.append(g_week_day[int(item_week)])
item_data = {
'scheduleid': tp_value['scheduleid'],
'schedule_week': weeks,
'schedule_name': ','.join(week_name),
'tps': [],
}
plans_map = {}
for item_plan in plans:
plans_map[item_plan['tp_start']] = item_plan
if len(tp_value['tp']) > 0:
tp_interval = tplist_2_tpinterval(tp_value['tp'])
for item_tp_interval in tp_interval:
if item_tp_interval['tp_start'] in plans_map:
item_data['tps'].append({
'tp_start': item_tp_interval['tp_start'],
'tp_end': item_tp_interval['tp_end'],
'plan_name': plans_map[item_tp_interval['tp_start']]['name'],
'plan_id': plans_map[item_tp_interval['tp_start']]['planid'],
})
res['data']['schedule_info'].append(item_data)
return json.dumps(res, ensure_ascii=False)
def merge_cross_flow(crossflow_list: list[pb.xl_cross_flowdata_t]):
"""将多个xl_cross_flowdata_t数据合并为一个
:param crossflow_list: 数组
:return: 合并后的pb消息
"""
avg_flow = pb.xl_cross_flowdata_t()
if len(crossflow_list) == 0:
return None
if len(crossflow_list) == 1:
return crossflow_list[0]
avg_flow.crossid = crossflow_list[0].crossid
for i in range(0, QUARTER_COUNT):
avg_flow.flows.append(0)
for item in crossflow_list:
flows = item.flows
for i in range(0, len(flows)):
avg_flow.flows[i] += flows[i]
return avg_flow
def get_cross_tp_divide(orig_flow_list: list[float]) -> list[str]:
"""输入路口流量数据,执行时段划分
:param orig_flow_list: 原始流量数组约定包含96个元素
:return: list[str], 时段列表
"""
# (1)执行时段划分
hm_idx_list, flow_list = do_cross_tp_divide(orig_flow_list)
# (2)生成时段序列, tp_list的数据结构: [["00:00","06:30"],["06:30","07:30"],["07:30","08:30"]]
tp_list = trans_to_tp_list(hm_idx_list)
return tp_list
def do_cross_tp_divide(orig_flow_list: list[float]):
"""
输入一个路口的flow数据执行时段划分返回1个划分结果 标准流量数据序列
:param crossflow:
:return: hm_idx_list, std_flow_list
"""
# 对原始流量进行平滑处理
smooth_flow_list = get_smoothed_flow(orig_flow_list)
# 计算相对流量
std_flow_list = get_normlize_flow(smooth_flow_list)
# 执行时段划分
idx_list = _split_tps_by_fallhead(std_flow_list)
return idx_list, std_flow_list
def get_smoothed_flow(flow_list: list[float]) -> list[float]:
"""
对原始流量数组进行平滑处理采用邻域平均法
:param flow_list: [float]
:return: [float]
"""
# 计算平均流量
avg_flow = sum(flow_list) / len(flow_list)
# 根据不同的平均流量,设置不同的权重
# print(f'avg_flow: {avg_flow}')
center_w = 0.6
left_w = 0.2
right_w = 0.2
if avg_flow < 10:
center_w = 0.4
left_w = 0.3
right_w = 0.3
# print(f'center_w: {center_w}, left_w: {left_w}, right_w: {right_w}')
smoothed_flow_list = []
max_idx = len(flow_list) - 1
for i in range(0, len(flow_list)):
sum_value = 0
sum_w = 0
sum_w += center_w
sum_value += flow_list[i] * center_w
if i + 1 >= 0 and i + 1 <= max_idx:
sum_w += right_w
sum_value += flow_list[i + 1] * right_w
if i - 1 >= 0 and i - 1 <= max_idx:
sum_w += left_w
sum_value += flow_list[i - 1] * left_w
smoothed_flow = sum_value / sum_w
smoothed_flow_list.append(smoothed_flow)
return smoothed_flow_list
def get_normlize_flow(flow_list: list[float]) -> list[float]:
"""
对原始流量数组进行归一化以最大流量作为100对其他流量进行等比缩放
:param flow_list: [float]
:return: [float]
"""
max_flow = 0
for flow in flow_list:
if flow > max_flow:
max_flow = flow
norm_flow_list = []
for flow in flow_list:
norm_flow = flow / max_flow * 100 if max_flow != 0 else 0
norm_flow_list.append(norm_flow)
return norm_flow_list
def _split_tps_by_fallhead(std_flow_list, max_tp_num=8):
"""
根据相邻时段的流量落差来划分时段, 返回用于切分时段的刻钟时段的序号的数组
:param std_flow_list: 标准流量序列
:param max_tp_num: 最大时段个数
:return: [hm_idx]
"""
min_tp_num = 5 # 最小时段个数
# 低流量阈值
low_flow_limit = 40
# (1)计算所有相邻刻钟的相对流量落差
fallhead_map = {}
"""包含 QUARTER_COUNT-1 个元素 {idx => drop}, 表示idx-1与idx之间的落差"""
for i in range(1, QUARTER_COUNT):
drop = _calc_drop(std_flow_list, i)
# 如果两侧的流量都低于该阈值就不再考虑落差一律设定为0
if std_flow_list[i] <= low_flow_limit and std_flow_list[i - 1] <= low_flow_limit:
drop = 0
fallhead_map[i] = drop
# (2)按落差降序排序
drop_info_list = sorted(fallhead_map.items(), key=lambda item: item[1], reverse=True)
# print(drop_info_list)
# (3) 提取落差最大的部分分界点
top_drop_idx_list = []
for info in drop_info_list:
idx = info[0]
drop = info[1]
if drop < 5: # 落差低于5%不再分割
break
top_drop_idx_list.append(idx)
# 如果落差不太差且已经满足max_tp_num则不再分割
if drop < 15 and len(top_drop_idx_list) >= max_tp_num - 1:
break
# 梳理得到顺序排序的分界点
split_idx_list = sorted(top_drop_idx_list)
boundary_list = [0] #
for idx in split_idx_list:
boundary_list.append(idx)
boundary_list.append(QUARTER_COUNT)
# 刻钟独立时段合并
# drop_limit_cannot_merge = 30 # 落差大于这个阈值,即使是单个刻钟时段,则不能合并
drop_limit_cannot_merge = 50 # 落差大于这个阈值,即使是单个刻钟时段,则不能合并
tp_num = len(boundary_list) - 1 # 当前时段个数
for i in range(1, len(boundary_list)):
if tp_num <= min_tp_num:
break
half_num = boundary_list[i] - boundary_list[i - 1]
if half_num == 1: # 只有1个刻钟则考虑合并到左侧或右侧
if i == 1:
# 只能合并到右侧
right_drop = fallhead_map[boundary_list[i]]
if right_drop <= drop_limit_cannot_merge:
boundary_list[i] = -1
tp_num -= 1
elif i == len(boundary_list) - 1:
# 只能合并到左侧
left_drop = fallhead_map[boundary_list[i - 1]]
if left_drop <= drop_limit_cannot_merge:
boundary_list[i - 1] = -1
else:
left_drop = fallhead_map[boundary_list[i - 1]]
right_drop = fallhead_map[boundary_list[i]]
if left_drop <= right_drop and left_drop <= drop_limit_cannot_merge:
# 合并到左侧
boundary_list[i - 1] = -1
tp_num -= 1
elif right_drop < left_drop and right_drop <= drop_limit_cannot_merge:
boundary_list[i] = -1
tp_num -= 1
# 合并落差小的相邻时段,以满足最大时段要求
if tp_num > max_tp_num:
need_reduce_tp_num = tp_num - max_tp_num
idx_to_index = {}
for i in range(0, len(boundary_list)):
if boundary_list[i] > 0:
idx_to_index[boundary_list[i]] = i
top_drop_idx_list.reverse()
kill_tp_num = 0
for i in range(0, len(top_drop_idx_list)):
idx = top_drop_idx_list[i]
if idx in idx_to_index:
index = idx_to_index[idx]
boundary_list[index] = -1
kill_tp_num += 1
if kill_tp_num >= need_reduce_tp_num:
break
# 梳理最终的分界点
hm_idx_list = []
for i in range(0, len(boundary_list)):
if boundary_list[i] < 0:
continue
hm_idx_list.append(boundary_list[i])
return hm_idx_list
def _calc_drop(flow_list, i):
"""
计算流量序列中第i个流量与前一个流量的落差
:param flow_list: 流量数组
:param i:
:return:
"""
drop = abs(flow_list[i] - flow_list[i - 1])
# 计算落差折减系数,当相对流量整体偏高,进行折减
factor = (flow_list[i] + flow_list[i - 1]) / 100
if factor < 1.0:
factor = 1.0
# 适当折减
drop = drop / factor
return drop
def trans_to_tp_list(hm_idx_list: list[int]) -> list[str]:
"""将刻钟序号数组,转化为时段数组
:param hm_idx_list: 刻钟序号构成的数组用于表示时段划分结果
:return: 时段字符串数组
"""
hm_list = []
for hm_idx in hm_idx_list:
num = hm_idx * 15
h = num // 60
m = num % 60
hm = '%02d:%02d' % (h, m)
hm_list.append(hm)
tp_list = []
for i in range(1, len(hm_list)):
tp_list.append([hm_list[i - 1], hm_list[i]])
return tp_list
def auto_match_phase(params):
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(1, '缺少crossid 请刷新后重试'))
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
scheduleid = check_param(params, 'scheduleid')
if not scheduleid:
return json.dumps(make_common_res(2, '缺少scheduleid 请刷新后重试'))
recommend_tp = check_param(params, 'recommend_tp')
if not recommend_tp or len(recommend_tp) == 0:
return json.dumps(make_common_res(2, '缺少recommend_tp 请刷新后重试'))
cross_phase_info, error = db_phasetable.query_cross_runing_phasetable(int(nodeid), crossid)
if error:
return json.dumps(make_common_res(2, f"{error}"))
scheduleid = int(scheduleid)
res = make_common_res(0, 'ok')
res['data'] = {
'scheduleid': scheduleid,
'plans': []
}
if len(cross_phase_info) == 0:
return json.dumps(res, ensure_ascii=False)
cross_phase_info_map = {}
for item_cross_phase_info in cross_phase_info:
if scheduleid != item_cross_phase_info['scheduleid']:
continue
if item_cross_phase_info['tp_start'] not in cross_phase_info_map:
cross_phase_info_map[item_cross_phase_info['tp_start']] = {
'tp_start': item_cross_phase_info['tp_start'],
'control_mode': item_cross_phase_info['control_mode'],
'coord_phaseid': item_cross_phase_info['coord_phaseid'],
'cycle': item_cross_phase_info['cycle'],
'name': item_cross_phase_info['name'],
'offset': item_cross_phase_info['offset'],
'planid': item_cross_phase_info['planid'],
'stages': [],
}
cross_phase_info_map[item_cross_phase_info['tp_start']]['stages'].append({
'stageid': item_cross_phase_info['stageid'],
'stage_name': item_cross_phase_info['stage_name'],
'allred': item_cross_phase_info['allred'],
'green': item_cross_phase_info['green'],
'max_green': item_cross_phase_info['max_green'],
'min_green': item_cross_phase_info['min_green'],
'phaseids': item_cross_phase_info['phases'],
'phaseid_names': item_cross_phase_info['phase_name'],
'redyellow': item_cross_phase_info['redyellow'],
'stage_duration': item_cross_phase_info['stage_duration'],
'yellow': item_cross_phase_info['yellow'],
})
phase_tps_list = tplist_2_tpinterval(list(cross_phase_info_map.keys()))
for item_phase_tps_list in phase_tps_list:
cross_phase_info_map[item_phase_tps_list['tp_start']]['tp_end'] = item_phase_tps_list['tp_end']
recommend_tp_interval = tplist_2_tpinterval(recommend_tp)
for item_recommend_tp_interval in recommend_tp_interval:
max_calc_diff_minute = 0
max_max_calc_diff_minute = None
for _, item_cross_phase_info_map in cross_phase_info_map.items():
if (item_recommend_tp_interval['tp_end'] > item_cross_phase_info_map['tp_start']
and item_cross_phase_info_map['tp_end'] > item_recommend_tp_interval['tp_start']):
calc_diff_minute = time_intersection_minutes(
f"{item_recommend_tp_interval['tp_start']}-{item_recommend_tp_interval['tp_end']}",
f"{item_cross_phase_info_map['tp_start']}-{item_cross_phase_info_map['tp_end']}")
if calc_diff_minute > max_calc_diff_minute:
max_calc_diff_minute = calc_diff_minute
max_max_calc_diff_minute = copy.deepcopy(item_cross_phase_info_map)
if not max_max_calc_diff_minute:
continue
max_max_calc_diff_minute['tp_start'] = item_recommend_tp_interval['tp_start']
max_max_calc_diff_minute['tp_end'] = item_recommend_tp_interval['tp_end']
if len(res['data']['plans']) == 0:
res['data']['plans'].append(max_max_calc_diff_minute)
continue
if (res['data']['plans'][-1]['tp_end'] == max_max_calc_diff_minute['tp_start']
and res['data']['plans'][-1]['planid'] == max_max_calc_diff_minute['planid']):
res['data']['plans'][-1]['tp_end'] = max_max_calc_diff_minute['tp_end']
continue
res['data']['plans'].append(max_max_calc_diff_minute)
return json.dumps(res, ensure_ascii=False)