import copy
import io
import json
from io import BytesIO
from app.common_worker import *
from app.eva_common import *
from openpyxl import Workbook
from openpyxl.styles import Alignment
# NOTE: kept after the star imports on purpose, so that a `datetime` name
# exported by them cannot shadow the class used below.
from datetime import datetime, timedelta
# Query the list of usable crosses
from proto.phase_grpc import QueryCrossRunningPhase
from flask import send_file

# Number of quarter-hour slots in one day
QUARTER_COUNT = 96


def _check_cross_request(params):
    """Validate the request fields shared by every handler in this module.

    Checks that crossid, nodeid, area_id and userid are present, that the
    user has at least one area, and that area_id is one of the user's areas.

    :param params: request parameters, read through check_param
    :return: (crossid, nodeid, error_json); error_json is None on success,
             otherwise it is the JSON string to send back and the other two
             values are None
    """
    crossid = check_param(params, 'crossid')
    if not crossid:
        return None, None, json.dumps(make_common_res(1, '缺少crossid, 请刷新后重试'))
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return None, None, json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return None, None, json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return None, None, json.dumps(make_common_res(4, '缺少userid, 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list:
        return None, None, json.dumps(make_common_res(5, '用户信息异常'))
    # The digit check must come first: int() is only applied once area_id is
    # known to look like an integer.
    if (not str(area_id).lstrip('-').isdigit()
            or not any(int(item) == int(area_id) for item in area_list)):
        return None, None, json.dumps(make_common_res(5, '辖区id异常,请检查后重试'))
    return crossid, nodeid, None


# Query the dates for which a cross has usable flow data
def cross_flow_usable_date(params):
    """Return the list of days that have flow data for the given cross.

    :param params: request parameters (crossid, nodeid, area_id, userid)
    :return: JSON string; on success `data` is the list of `day` values
    """
    crossid, nodeid, error_json = _check_cross_request(params)
    if error_json:
        return error_json

    row_list, error = db_cross.query_cross_flow_usable_date_sql(nodeid, crossid)
    if error:
        return json.dumps(make_common_res(5, f"{error}"))

    res = make_common_res(0, 'ok')
    res['data'] = [row['day'] for row in row_list]
    return json.dumps(res, ensure_ascii=False)


def cross_flow_tp_divide(params):
    """Compute recommended time periods for a cross from its flow data.

    The dates in `date_list` are filtered by `date_type` ('workday',
    'weekend', anything else means the whole week), their flow records are
    merged and divided into time periods, and the existing timing schedules
    that apply to the selected weekdays are attached.

    :param params: request parameters (crossid, nodeid, area_id, userid,
                   date_list with dates formatted as YYYYMMDD, date_type)
    :return: JSON string; on success `data` holds `flow_data`,
             `recommend_tp` and `schedule_info`
    """
    crossid, nodeid, error_json = _check_cross_request(params)
    if error_json:
        return error_json

    date_list = check_param(params, 'date_list')
    if not date_list:
        return json.dumps(make_common_res(2, '缺少date_list, 请刷新后重试'))
    date_type = check_param(params, 'date_type')
    if not date_type:
        return json.dumps(make_common_res(2, '缺少date_type, 请刷新后重试'))

    week_days = [1, 2, 3, 4, 5, 6, 7]
    if date_type == 'workday':
        week_days = [1, 2, 3, 4, 5]
    if date_type == 'weekend':
        week_days = [6, 7]

    search_date_list = []
    search_week_day_map = {}
    for item_date in date_list:
        try:
            date_time = datetime.strptime(str(item_date), '%Y%m%d')
        except ValueError:
            # A malformed date comes from the request; answer with an error
            # instead of letting the exception escape the handler.
            return json.dumps(make_common_res(2, 'date_list格式异常, 请检查后重试'))
        date_weekday = date_time.weekday() + 1  # 1 = Monday ... 7 = Sunday
        if date_weekday in week_days:
            search_week_day_map[date_weekday] = True
            search_date_list.append(item_date)

    res = make_common_res(0, 'ok')
    res['data'] = {
        'flow_data': [],
        'recommend_tp': [],
        'schedule_info': []
    }
    if not search_date_list:
        return json.dumps(res, ensure_ascii=False)

    flow_data, error = db_cross.query_cross_flowdata(str(nodeid), crossid, search_date_list)
    if error:
        return json.dumps(make_common_res(2, f"{error}"))
    if not flow_data:
        return json.dumps(res, ensure_ascii=False)

    flow_data_pb_list = []
    for item_flow_data in flow_data:
        item_flow_pb = pb.xl_cross_flowdata_t()
        item_flow_pb.ParseFromString(item_flow_data['data'])
        flow_data_pb_list.append(item_flow_pb)

    merged_flow = merge_cross_flow(flow_data_pb_list)
    if not merged_flow:
        return json.dumps(res, ensure_ascii=False)

    hm_idx_list, flow_list = do_cross_tp_divide(merged_flow.flows)
    if not flow_list:
        return json.dumps(res, ensure_ascii=False)

    # Start time of every recommended period
    recommend_starts = set()
    for item_tp in trans_to_tp_list(hm_idx_list):
        res['data']['recommend_tp'].append(item_tp[0])
        recommend_starts.add(item_tp[0])

    current_time = datetime.strptime("00:00", "%H:%M")
    for flow in flow_list:
        time_str = current_time.strftime("%H:%M")
        res['data']['flow_data'].append({
            "flow": round(flow, 2),
            "tp": time_str,
            # 00:00 is always a boundary, so it is never flagged
            "recommend": 1 if time_str in recommend_starts and time_str != '00:00' else 0
        })
        current_time += timedelta(minutes=15)

    # Query the timing schedules
    tps, error = db_phasetable.query_cross_tps_by_crossid(int(nodeid), crossid)
    if error:
        return json.dumps(make_common_res(2, f"{error}"), ensure_ascii=False)
    if len(tps) <= 0:
        # Fixed: this path used to return the raw dict instead of JSON text.
        return json.dumps(res, ensure_ascii=False)

    # weekday string (e.g. '1,2,3,4,5') -> its period start times and schedule id
    tps_map = {}
    for item_tps in tps:
        if item_tps['weekday'] not in tps_map:
            tps_map[item_tps['weekday']] = {
                'tp': [],
                'scheduleid': item_tps['scheduleid'],
            }
        tps_map[item_tps['weekday']]['tp'].append(item_tps['tp_start'])

    search_week_day = set(search_week_day_map)
    for weeks, tp_value in tps_map.items():
        weeks_list = list(map(int, weeks.split(',')))
        # Skip schedules that cover none of the selected weekdays
        if not search_week_day & set(weeks_list):
            continue

        plans, error = db_phasetable.day_schedule_by_xlcrossid(int(nodeid), crossid, tp_value['scheduleid'])
        if error:
            return json.dumps(make_common_res(2, f"{error}"), ensure_ascii=False)

        week_name = []
        if weeks == '1,2,3,4,5,6,7':
            week_name = ['全周']
        if weeks == '1,2,3,4,5':
            week_name = ['工作日']
        if weeks == '6,7':
            week_name = ['周末']
        if not week_name:
            week_name = [g_week_day[int(item_week)] for item_week in weeks.split(',')]

        item_data = {
            'scheduleid': tp_value['scheduleid'],
            'schedule_week': weeks,
            'schedule_name': ','.join(week_name),
            'tps': [],
        }
        plans_map = {item_plan['tp_start']: item_plan for item_plan in plans}
        if tp_value['tp']:
            for item_tp_interval in tplist_2_tpinterval(tp_value['tp']):
                plan = plans_map.get(item_tp_interval['tp_start'])
                if plan is None:
                    continue
                item_data['tps'].append({
                    'tp_start': item_tp_interval['tp_start'],
                    'tp_end': item_tp_interval['tp_end'],
                    'plan_name': plan['name'],
                    'plan_id': plan['planid'],
                })
        res['data']['schedule_info'].append(item_data)

    return json.dumps(res, ensure_ascii=False)


def merge_cross_flow(crossflow_list: list[pb.xl_cross_flowdata_t]):
    """Merge several xl_cross_flowdata_t messages into one.

    The merged message holds, per quarter-hour slot, the SUM of the inputs
    (not the mean). A single input is returned as-is.

    :param crossflow_list: list of pb messages
    :return: the merged pb message, or None when the list is empty
    """
    if len(crossflow_list) == 0:
        return None
    if len(crossflow_list) == 1:
        return crossflow_list[0]

    merged = pb.xl_cross_flowdata_t()
    merged.crossid = crossflow_list[0].crossid
    for _ in range(QUARTER_COUNT):
        merged.flows.append(0)
    for item in crossflow_list:
        flows = item.flows
        # Ignore anything beyond one day of slots rather than indexing past
        # the end of the merged array.
        for i in range(min(len(flows), QUARTER_COUNT)):
            merged.flows[i] += flows[i]
    return merged


def get_cross_tp_divide(orig_flow_list: list[float]) -> list[list[str]]:
    """Divide a cross's flow data into time periods.

    :param orig_flow_list: raw flow array, expected to hold 96 elements
    :return: list of [start, end] pairs, e.g.
             [["00:00","06:30"],["06:30","07:30"],["07:30","08:30"]]
    """
    # (1) run the time-period division
    hm_idx_list, _ = do_cross_tp_divide(orig_flow_list)
    # (2) turn the boundary indexes into a period sequence
    return trans_to_tp_list(hm_idx_list)


def do_cross_tp_divide(orig_flow_list: list[float]):
    """Run the time-period division for one cross's flow data.

    :param orig_flow_list: raw flow array, expected to hold 96 elements
    :return: (hm_idx_list, std_flow_list) - the boundary slot indexes and the
             normalized flow sequence; two empty lists for empty input
    """
    if len(orig_flow_list) == 0:
        return [], []
    # Smooth the raw flow
    smooth_flow_list = get_smoothed_flow(orig_flow_list)
    # Compute the relative flow
    std_flow_list = get_normlize_flow(smooth_flow_list)
    # Run the division
    idx_list = _split_tps_by_fallhead(std_flow_list)
    return idx_list, std_flow_list


def get_smoothed_flow(flow_list: list[float]) -> list[float]:
    """Smooth a raw flow array with a weighted neighbourhood average.

    Each value is averaged with its left and right neighbours; at the two
    ends the missing neighbour is left out and the weights renormalized.

    :param flow_list: [float]
    :return: [float], same length as the input (empty for empty input)
    """
    if len(flow_list) == 0:
        return []

    # Low average flow gets a stronger smoothing (more weight on neighbours)
    avg_flow = sum(flow_list) / len(flow_list)
    if avg_flow < 10:
        center_w, left_w, right_w = 0.4, 0.3, 0.3
    else:
        center_w, left_w, right_w = 0.6, 0.2, 0.2

    max_idx = len(flow_list) - 1
    smoothed_flow_list = []
    for i in range(len(flow_list)):
        sum_w = center_w
        sum_value = flow_list[i] * center_w
        if i < max_idx:
            sum_w += right_w
            sum_value += flow_list[i + 1] * right_w
        if i > 0:
            sum_w += left_w
            sum_value += flow_list[i - 1] * left_w
        smoothed_flow_list.append(sum_value / sum_w)
    return smoothed_flow_list


def get_normlize_flow(flow_list: list[float]) -> list[float]:
    """Normalize a flow array so that its maximum becomes 100.

    :param flow_list: [float]
    :return: [float]; all zeros when no value is greater than 0
    """
    max_flow = 0
    for flow in flow_list:
        if flow > max_flow:
            max_flow = flow
    if max_flow == 0:
        return [0 for _ in flow_list]
    return [flow / max_flow * 100 for flow in flow_list]


def _split_tps_by_fallhead(std_flow_list, max_tp_num=8):
    """Divide the day into periods by the flow drop between adjacent slots.

    :param std_flow_list: normalized flow sequence; indexed for every slot
                          0..QUARTER_COUNT-1
    :param max_tp_num: maximum number of periods
    :return: [hm_idx], the slot indexes used as period boundaries
    """
    min_tp_num = 5  # minimum number of periods
    # low-flow threshold
    low_flow_limit = 40

    # (1) compute the relative drop between every pair of adjacent slots.
    # Holds QUARTER_COUNT-1 entries {idx => drop}: the drop between idx-1 and idx
    fallhead_map = {}
    for i in range(1, QUARTER_COUNT):
        drop = _calc_drop(std_flow_list, i)
        # When both sides are below the threshold the drop is ignored
        if std_flow_list[i] <= low_flow_limit and std_flow_list[i - 1] <= low_flow_limit:
            drop = 0
        fallhead_map[i] = drop

    # (2) sort by drop, largest first
    drop_info_list = sorted(fallhead_map.items(), key=lambda item: item[1], reverse=True)

    # (3) take the boundaries with the largest drops
    top_drop_idx_list = []
    for idx, drop in drop_info_list:
        if drop < 5:  # a drop below 5% no longer splits
            break
        top_drop_idx_list.append(idx)
        # Stop once max_tp_num is reached, unless the drop is still large
        if drop < 15 and len(top_drop_idx_list) >= max_tp_num - 1:
            break

    # Boundaries in chronological order, including both ends of the day
    boundary_list = [0] + sorted(top_drop_idx_list) + [QUARTER_COUNT]

    # Merge periods made of a single quarter-hour slot.
    # A drop above this threshold is never merged, even for a single slot.
    drop_limit_cannot_merge = 50
    tp_num = len(boundary_list) - 1  # current number of periods
    last = len(boundary_list) - 1
    for i in range(1, len(boundary_list)):
        if tp_num <= min_tp_num:
            break
        half_num = boundary_list[i] - boundary_list[i - 1]
        if half_num != 1:
            continue
        # Only one slot: consider merging it to the left or to the right.
        # A removed boundary is marked with -1.
        if i == 1:
            # can only merge to the right
            right_drop = fallhead_map[boundary_list[i]]
            if right_drop <= drop_limit_cannot_merge:
                boundary_list[i] = -1
                tp_num -= 1
        elif i == last:
            # can only merge to the left
            left_drop = fallhead_map[boundary_list[i - 1]]
            if left_drop <= drop_limit_cannot_merge:
                boundary_list[i - 1] = -1
                # Fixed: this branch removed a boundary without updating the
                # period count, unlike every other merge branch.
                tp_num -= 1
        else:
            left_drop = fallhead_map[boundary_list[i - 1]]
            right_drop = fallhead_map[boundary_list[i]]
            if left_drop <= right_drop and left_drop <= drop_limit_cannot_merge:
                # merge to the left
                boundary_list[i - 1] = -1
                tp_num -= 1
            elif right_drop < left_drop and right_drop <= drop_limit_cannot_merge:
                # merge to the right
                boundary_list[i] = -1
                tp_num -= 1

    # Merge the neighbours with the smallest drops to honour max_tp_num
    if tp_num > max_tp_num:
        need_reduce_tp_num = tp_num - max_tp_num
        idx_to_index = {}
        for i, boundary in enumerate(boundary_list):
            if boundary > 0:
                idx_to_index[boundary] = i
        top_drop_idx_list.reverse()  # smallest drop first
        kill_tp_num = 0
        for idx in top_drop_idx_list:
            if idx in idx_to_index:
                boundary_list[idx_to_index[idx]] = -1
                kill_tp_num += 1
                if kill_tp_num >= need_reduce_tp_num:
                    break

    # Final boundaries: everything that was not removed
    return [boundary for boundary in boundary_list if boundary >= 0]


def _calc_drop(flow_list, i):
    """Compute the drop between the i-th flow and the one before it.

    :param flow_list: flow array
    :param i: index of the later slot; i - 1 must be valid as well
    :return: absolute difference, reduced when the two flows are high
    """
    drop = abs(flow_list[i] - flow_list[i - 1])
    # Reduction factor: applied only when the two flows add up to more than 100
    factor = max((flow_list[i] + flow_list[i - 1]) / 100, 1.0)
    return drop / factor


def trans_to_tp_list(hm_idx_list: list[int]) -> list[list[str]]:
    """Turn an array of quarter-hour indexes into an array of time periods.

    :param hm_idx_list: quarter-hour slot indexes marking the period boundaries
    :return: list of [start, end] pairs formatted as HH:MM
    """
    hm_list = ['%02d:%02d' % divmod(hm_idx * 15, 60) for hm_idx in hm_idx_list]
    return [[start, end] for start, end in zip(hm_list, hm_list[1:])]


def auto_match_phase(params):
    """Match each recommended period to the existing plan it overlaps most.

    For every interval built from `recommend_tp`, the plan of the given
    schedule with the largest overlap (in minutes) is copied and given the
    interval's start and end. Consecutive intervals that end up with the same
    plan are merged into one entry.

    :param params: request parameters (crossid, nodeid, area_id, userid,
                   scheduleid, recommend_tp)
    :return: JSON string; on success `data` holds `scheduleid` and `plans`
    """
    crossid, nodeid, error_json = _check_cross_request(params)
    if error_json:
        return error_json

    scheduleid = check_param(params, 'scheduleid')
    if not scheduleid:
        return json.dumps(make_common_res(2, '缺少scheduleid, 请刷新后重试'))
    recommend_tp = check_param(params, 'recommend_tp')
    if not recommend_tp:
        return json.dumps(make_common_res(2, '缺少recommend_tp, 请刷新后重试'))

    cross_phase_info, error = db_phasetable.query_cross_runing_phasetable(int(nodeid), crossid)
    if error:
        return json.dumps(make_common_res(2, f"{error}"))

    scheduleid = int(scheduleid)
    res = make_common_res(0, 'ok')
    res['data'] = {
        'scheduleid': scheduleid,
        'plans': []
    }
    if len(cross_phase_info) == 0:
        return json.dumps(res, ensure_ascii=False)

    # tp_start -> plan description with its stages, for the requested schedule.
    # Each input row carries one stage; the plan fields come from the first
    # row seen for a given tp_start.
    cross_phase_info_map = {}
    for row in cross_phase_info:
        if scheduleid != row['scheduleid']:
            continue
        if row['tp_start'] not in cross_phase_info_map:
            cross_phase_info_map[row['tp_start']] = {
                'tp_start': row['tp_start'],
                'control_mode': row['control_mode'],
                'coord_phaseid': row['coord_phaseid'],
                'cycle': row['cycle'],
                'name': row['name'],
                'offset': row['offset'],
                'planid': row['planid'],
                'stages': [],
            }
        cross_phase_info_map[row['tp_start']]['stages'].append({
            'stageid': row['stageid'],
            'stage_name': row['stage_name'],
            'allred': row['allred'],
            'green': row['green'],
            'max_green': row['max_green'],
            'min_green': row['min_green'],
            'phaseids': row['phases'],
            'phaseid_names': row['phase_name'],
            'redyellow': row['redyellow'],
            'stage_duration': row['stage_duration'],
            'yellow': row['yellow'],
        })

    # Attach the end time of every existing plan period
    for item_phase_tps in tplist_2_tpinterval(list(cross_phase_info_map.keys())):
        cross_phase_info_map[item_phase_tps['tp_start']]['tp_end'] = item_phase_tps['tp_end']

    plans = res['data']['plans']
    for recommend in tplist_2_tpinterval(recommend_tp):
        best_overlap = 0
        best_plan = None
        for plan in cross_phase_info_map.values():
            # Times are compared as strings here, as in the rest of the file
            if recommend['tp_end'] > plan['tp_start'] and plan['tp_end'] > recommend['tp_start']:
                overlap = time_intersection_minutes(
                    f"{recommend['tp_start']}-{recommend['tp_end']}",
                    f"{plan['tp_start']}-{plan['tp_end']}")
                if overlap > best_overlap:
                    best_overlap = overlap
                    best_plan = plan
        if best_plan is None:
            continue

        # Copy so that the shared plan description is never modified
        matched = copy.deepcopy(best_plan)
        matched['tp_start'] = recommend['tp_start']
        matched['tp_end'] = recommend['tp_end']

        if plans and plans[-1]['tp_end'] == matched['tp_start'] and plans[-1]['planid'] == matched['planid']:
            # Same plan as the previous, adjacent period: extend it
            plans[-1]['tp_end'] = matched['tp_end']
            continue
        plans.append(matched)

    return json.dumps(res, ensure_ascii=False)