diff --git a/app/common_worker.py b/app/common_worker.py index 741209e..da36cf8 100644 --- a/app/common_worker.py +++ b/app/common_worker.py @@ -340,4 +340,67 @@ def convert_time(time_int): minutes = time_int % 100 time_str = f"{hours:02d}:{minutes:02d}" - return time_str \ No newline at end of file + return time_str + + +def generate_date_range(start_date_str, end_date_str): + """生成两个日期之间的所有日期(包括开始和结束日期)""" + start_date = datetime.strptime(start_date_str, "%Y%m%d") + end_date = datetime.strptime(end_date_str, "%Y%m%d") + date_list = [] + current_date = start_date + while current_date <= end_date: + date_list.append(current_date.strftime("%Y%m%d")) + current_date += timedelta(days=1) + return date_list + + +def gen_ten_weeks_ago_data_list(): + fmt = "%Y%m%d" + end = datetime.now() - timedelta(days=1) # 昨天 + start = end - timedelta(days=69) # 70 天跨度 = 10 个完整周 + + week_buckets = {} + for d in range((end - start).days + 1): + day = start + timedelta(days=d) + monday = day - timedelta(days=day.weekday()) # 本周一 + week_buckets.setdefault(monday, []).append(day.strftime(fmt)) + + # 大 list:从远到近(最早一周在最前面) + result = [week_buckets[k] for k in sorted(week_buckets, reverse=False)] + return result + + +def count_lsr(turn_str): + """ + 统计车道转向中左转(l)、直行(s)、右转(r)的数量。 + + Args: + turn_str: 形如 "2|1|1|1|5" 的字符串,每个数字对应 g_turn2str 中的键。 + + Returns: + 形如 "1/3/1" 的字符串,顺序为左转、直行、右转的数量。 + """ + # 初始化计数器 + left_count = 0 + straight_count = 0 + right_count = 0 + # 分割输入字符串 + turn_keys = turn_str.split('|') + # 遍历每个车道 + for key in turn_keys: + if key in g_turn2str: + turn_value = g_turn2str[key] + # 忽略掉头和特殊车道 + if turn_value in ['t', 'bus', 'reversible', '-']: + continue + # 统计每个转向 + if 'l' in turn_value: + left_count += 1 + if 's' in turn_value: + straight_count += 1 + if 'r' in turn_value: + right_count += 1 + + # 返回格式化的结果 + return f"{left_count}/{straight_count}/{right_count}" diff --git a/app/cross_eva_views.py b/app/cross_eva_views.py index 5be0767..b1a6c15 100644 --- a/app/cross_eva_views.py +++ b/app/cross_eva_views.py @@ -34,7 +34,11 @@ def query_cross_usable_date_api(): @app.route('/api/query_cross_delay_info', methods=['POST']) def query_cross_delay_info_api(): - return query_cross_delay_info(request.json) + return query_cross_delay_info_controller(request.json) + +@app.route('/api/query_cross_index_trend', methods=['POST']) +def query_cross_index_trend_api(): + return query_cross_index_trend_controller(request.json) if __name__ == '__main__': diff --git a/app/cross_evaluate_worker.py b/app/cross_evaluate_worker.py index 7e35ef3..624c1fb 100644 --- a/app/cross_evaluate_worker.py +++ b/app/cross_evaluate_worker.py @@ -9,6 +9,9 @@ from app.eva_common import * # 查询可用路口列表 +from proto.phase_grpc import QueryCrossRunningPhase + + def query_cross_list(params): nodeid = check_param(params, 'nodeid') if not nodeid: @@ -28,8 +31,25 @@ def query_cross_list(params): # 查询路口列表 cross_list = db_tmnet.query_cross_list_sql(nodeid, area_id) + tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id) + if not tp_desc: + tp_info = [ + "00:00-07:00", + "07:00-09:00", + "09:00-12:00", + "12:00-14:00", + "14:00-17:00", + "17:00-19:00", + "19:00-22:00", + "22:00-00:00" + ] + else: + tp_info = tp_desc[0]['tp_desc'].split(',') res = make_common_res(0, 'ok') - res['data'] = cross_list + res['data'] = { + 'cross_list': cross_list, + 'tp_info': tp_info, + } return json.dumps(res, ensure_ascii=False) @@ -62,7 +82,7 @@ def query_cross_usable_date(params): # 获取路口延迟信息 -def query_cross_delay_info(params): +def query_cross_delay_info_controller(params): crossid = check_param(params, 'crossid') if not crossid: return json.dumps(make_common_res(1, '缺少crossid, 请刷新后重试')) @@ -91,20 +111,23 @@ def query_cross_delay_info(params): if not time_range: return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围')) tp_start = int(str(time_range.split('-')[0]).split(':')[0]) * 100 + int(str(time_range.split('-')[0]).split(':')[1]) - tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1]) + # tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1]) + # if tp_end == 0: + # tp_end = 2400 if query_type == 1: tp_start = 't' + str(tp_start) elif query_type == 2: tp_start = 'h' + str(tp_start) - cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start, tp_end) + cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start) avg_cross_delay_info = gen_avg_cross_delay_pb(cross_delay_data_list) if not avg_cross_delay_info: return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据')) cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid) inroad_static_info_dict = {item['roadid']: item for item in cross_inroads} # 路口静态信息及台账信息 - cross_static_info, cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid) - roads_dir_dict = gen_road_dir_dict(cross_ledger_info_dict) + cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid) + cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict) + roads_dir_dict = gen_road_dir_dict(cross_ledger_info) # 路口指标数据概览 overview_res = gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, date_list, roads_dir_dict) # 路段及流向数据概览 @@ -117,8 +140,141 @@ def query_cross_delay_info(params): 'road_flow_delay_infos': road_flow_delay_infos, 'road_flow_turn_rate': road_flow_turn_rate, 'cross_static_info': cross_static_info, - 'ledger_info': cross_ledger_info_dict + 'ledger_info': cross_ledger_info } return json.dumps(res, ensure_ascii=False) +# 问题诊断接口 +def query_cross_problems(params): + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(1, '缺少crossid, 请刷新后重试')) + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + date_list = check_param(params, 'date_list') + if not date_list or len(date_list) < 1: + return json.dumps(make_common_res(7, '缺少日期参数,请最少选择一天作为查询日期')) + query_type = check_param(params, 'query_type') + if not query_type: + query_type = 0 + time_range = check_param(params, 'time_range') + if not time_range: + return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围')) + tp_start = int(str(time_range.split('-')[0]).split(':')[0]) * 100 + int(str(time_range.split('-')[0]).split(':')[1]) + # tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1]) + # if tp_end == 0: + # tp_end = 2400 + if query_type == 1: + tp_start = 't' + str(tp_start) + elif query_type == 2: + tp_start = 'h' + str(tp_start) + cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, tp_start) + avg_cross_delay_info = gen_avg_cross_delay_pb(cross_delay_data_list) + if not avg_cross_delay_info: + return json.dumps(make_common_res(9, '当前所选日期范围内该评测时段无可用数据')) + cross_inroads = db_tmnet.query_cross_inroads(crossid, nodeid) + inroad_static_info_dict = {item['roadid']: item for item in cross_inroads} + # 路口静态信息及台账信息 + cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid) + cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict) + roads_dir_dict = gen_road_dir_dict(cross_ledger_info) + gen_cross_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict) + # todo 与子鉴确认逻辑 + cross_phase, err = QueryCrossRunningPhase(nodeid, [crossid]) + + +# 指标变化趋势接口 +def query_cross_index_trend_controller(params): + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(1, '缺少crossid, 请刷新后重试')) + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_user_areas(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = [int(row['area_id']) for row in area_list] + if int(area_id) not in area_list: + return json.dumps(make_common_res(6, '用户信息异常')) + query_date = check_param(params, 'query_date') + if not query_date: + return json.dumps(make_common_res(7, '缺少查询日期,请选择查询日期')) + time_range = check_param(params, 'time_range') + if not time_range: + return json.dumps(make_common_res(8, '缺少时段范围,请选择时段范围')) + query_type= check_param(params, 'query_type') + if not query_type: + query_type = 0 + tp_start = int(str(time_range.split('-')[0]).split(':')[0]) * 100 + int(str(time_range.split('-')[0]).split(':')[1]) + tp_end = int(str(time_range.split('-')[1]).split(':')[0]) * 100 + int(str(time_range.split('-')[1]).split(':')[1]) + if query_type == 1: + tp_start = 't' + str(tp_start) + elif query_type == 2: + tp_start = 'h' + str(tp_start) + + prev_date = (datetime.strptime(query_date, '%Y%m%d') - timedelta(days=1)).strftime('%Y%m%d') + month_ago_date = (datetime.now().date() - timedelta(days=30)).strftime('%Y%m%d') + month_date_list = generate_date_range(month_ago_date, query_date) + # ten_week_ago_date = (datetime.now().date() - timedelta(days=70)).strftime('%Y%m%d') + # week_date_list = generate_date_range(ten_week_ago_date, query_date) + ten_weeks_date_list = gen_ten_weeks_ago_data_list() + + # 查询台账信息 获取路网渠化关系 + cross_ledger_info_dict = query_cross_ledger_info(crossid, nodeid, area_id, userid) + cross_static_info, cross_ledger_info = gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict) + roads_dir_dict = gen_road_dir_dict(cross_ledger_info) + + # 查询前一天的小时级别数据 + hours_data = db_cross.query_cross_delay_whole_day_hours(crossid, nodeid, prev_date) + hour_pb_list = parse_data2pb(hours_data) + hours_data_dict = parse_single_cross_delay_info(crossid, nodeid, hour_pb_list, 'hour', roads_dir_dict) + hours_data_with_change_rate = calc_single_day_delay_info_change_rate(hours_data_dict) + # 查询近30天的数据 + days_data = db_cross.query_cross_delay_info(crossid, nodeid, month_date_list, tp_start) + days_pb_list = parse_data2pb(days_data) + days_data_dict = parse_single_cross_delay_info(crossid, nodeid, days_pb_list, 'day', roads_dir_dict) + days_data_with_change_rate = calc_single_day_delay_info_change_rate(days_data_dict) + # 查询近10周的数据 需要聚合 可能不能使用下述方式查询 + ten_week_datas = [] + for week_dates in ten_weeks_date_list: + weeks_data = db_cross.query_cross_delay_info(crossid, nodeid, week_dates, tp_start) + week_cross_delay_info = gen_avg_cross_delay_pb(weeks_data) + ten_week_datas.append({ + 'day': week_dates[0] + '-' + week_dates[-1], + 'tp_start': tp_start, + 'tp_end': tp_end, + 'data': week_cross_delay_info + }) + weeks_data_dict = parse_single_cross_delay_info(crossid, nodeid, ten_week_datas, 'week', roads_dir_dict) + weeks_data_with_change_rate = calc_single_day_delay_info_change_rate(weeks_data_dict) + res = make_common_res(0, 'ok') + res['data'] = { + 'hours_data': hours_data_with_change_rate, + 'days_data': days_data_with_change_rate, + 'weeks_data': weeks_data_with_change_rate + } + return json.dumps(res, ensure_ascii=False) + + + diff --git a/app/db_cross_delay.py b/app/db_cross_delay.py index 2c93281..dfd7ae6 100644 --- a/app/db_cross_delay.py +++ b/app/db_cross_delay.py @@ -18,9 +18,16 @@ class CrossDbHelper(TableDbHelperBase): """ return self.do_select(sql) - def query_cross_delay_info(self, crossid, nodeid, date_list, tp_start, tp_end): + def query_cross_delay_info(self, crossid, nodeid, date_list, tp_start): date_list = ','.join(["'" + str(item) + "'" for item in date_list]) sql = f""" - select * from traffic_{nodeid}.cross_delay where crossid = '{crossid}' and day in ({date_list}) and tp_start = '{tp_start}' and tp_end = '{tp_end}' + select * from traffic_{nodeid}.cross_delay where crossid = '{crossid}' and day in ({date_list}) and tp_start = '{tp_start}' order by day """ return self.do_select(sql) + + def query_cross_delay_whole_day_hours(self, crossid, nodeid, query_date): + sql = f""" + select * from traffic_{nodeid}.cross_delay where crossid = '{crossid}' and day = '{query_date}' and tp_start like 'h%' order by CAST(REGEXP_SUBSTR(tp_start, '[0-9]+') AS UNSIGNED) + """ + return self.do_select(sql) + diff --git a/app/eva_common.py b/app/eva_common.py index 352ae58..87c06e4 100644 --- a/app/eva_common.py +++ b/app/eva_common.py @@ -10,7 +10,8 @@ import requests from google.protobuf.json_format import MessageToJson import proto.xlcomm_pb2 as pb -from app.global_source import db_tmnet, db_cross +from app.common_worker import * +from proto.phase_grpc import QueryCrossRunningPhase src_reverse = {'N': 'S', 'S': 'N', 'W': 'E', 'E': 'W', 'NE': 'SW', 'SW': 'NE', 'SE': 'NW', 'NW': 'SE'} @@ -167,7 +168,7 @@ def gen_avg_cross_delay_pb(cross_delay_data_list): inroad_delay_info.delay_info.turn_ratio_2 = turn_ratio_2 inroad_delay_info.delay_info.turn_ratio_3 = turn_ratio_3 inroad_delay_pb_list.append(inroad_delay_info) - cross_imbalance_index = round((max_stop_times - min_stop_times) / avg_cross_delay.delay_info.stop_times, 2) + cross_imbalance_index = round((max_stop_times - min_stop_times) / avg_cross_delay.delay_info.stop_times, 2) if avg_cross_delay.delay_info.stop_times != 0 else 0 avg_cross_delay.delay_info.imbalance_index = cross_imbalance_index for flow_delay_info in flow_delay_list: @@ -262,7 +263,7 @@ def gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, da jam_index = round(avg_cross_delay_info.delay_info.jam_index, 2) stop_times = round(avg_cross_delay_info.delay_info.stop_times, 2) high_park_percent = str(avg_cross_delay_info.delay_info.high_park_percent) + '%' - imbanlance_index = round(avg_cross_delay_info.delay_info.imbalance_index, 2) + imbalance_index = round(avg_cross_delay_info.delay_info.imbalance_index, 2) speed = avg_cross_delay_info.delay_info.speed / 100 move_speed = avg_cross_delay_info.delay_info.move_speed / 100 park_time = avg_cross_delay_info.delay_info.park_time @@ -277,7 +278,7 @@ def gen_overview_index(avg_cross_delay_info, inroad_static_info_dict, nodeid, da 'jam_index': jam_index, 'stop_times': stop_times, 'high_park_percent': high_park_percent, - 'imbanlance_index': imbanlance_index, + 'imbalance_index': imbalance_index, 'speed': speed, 'move_speed': move_speed, 'park_time': park_time, @@ -330,8 +331,13 @@ def query_cross_ledger_info(crossid, nodeid, area_id, userid): if cross_ledger_info.status_code != 200 or cross_ledger_info.json()['status'] != 0: logging.error(f"查询路口台账信息失败,crossid:{crossid},nodeid:{nodeid},area_id:{area_id},userid:{userid}") return None, None - division_list, company_list, slc_company_dict, slckind_list, detector_type_dict = gen_ledger_base_info(nodeid, area_id) + cross_ledger_info_dict = json.loads(cross_ledger_info.text) + return cross_ledger_info_dict + + +def gen_cross_static_info(crossid, nodeid, area_id, cross_ledger_info_dict): + division_list, company_list, slc_company_dict, slckind_list, detector_type_dict = gen_ledger_base_info(nodeid, area_id) slc_company = cross_ledger_info_dict['data']['ledger']['slc_company'] location = cross_ledger_info_dict['data']['ledger']['location'] slc_company_name = slc_company_dict[slc_company] if slc_company in slc_company_dict.keys() else '-' @@ -388,7 +394,7 @@ def query_cross_ledger_info(crossid, nodeid, area_id, userid): 'length_info': length_info, 'avg_length': avg_length } - return cross_static_info, cross_ledger_info_dict + return cross_static_info, cross_ledger_info_dict['data'] def gen_ledger_base_info(nodeid, area_id): @@ -444,6 +450,7 @@ def gen_road_delay_index(avg_cross_delay_info, roads_dir_dict): continue service_level = calc_service_level(road_index.delay_info.delay_time) road_flow_index[roadid] = { + 'src_dir': road_dir[roadid], 'stop_times': round(road_index.delay_info.stop_times, 2), 'high_park_percent': str(road_index.delay_info.high_park_percent) + '%', 'imbalance_index': round(road_index.delay_info.imbalance_index, 2), @@ -475,7 +482,7 @@ def gen_road_dir_dict(cross_ledger_info_dict): roads_dir_dict = {} if not cross_ledger_info_dict: return roads_dir_dict - roads = cross_ledger_info_dict['data']['roads'] + roads = cross_ledger_info_dict['roads'] if 'roads' in cross_ledger_info_dict.keys() else None for dir in roads.keys(): roads_dir_dict[dir] = { 'in': roads[dir]['roadid'], @@ -494,7 +501,7 @@ def calc_inroad_imbalance_index(inroad_delay_pb_list): max_stop_times = flow_delay_info.delay_info.stop_times if flow_delay_info.delay_info.stop_times < min_stop_times: min_stop_times = flow_delay_info.delay_info.stop_times - imbalance_index = round((max_stop_times - min_stop_times) / avg_stop_times, 2) + imbalance_index = round((max_stop_times - min_stop_times) / avg_stop_times, 2) if avg_stop_times > 0 and max_stop_times > 0 and min_stop_times != 99999 else 0 item.delay_info.imbalance_index = imbalance_index return inroad_delay_pb_list @@ -530,10 +537,11 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict): if out_road_id != '-' and out_road_id in outroad_info_dict.keys(): out_car_num = outroad_info_dict[out_road_id].turn_info.car_num out_flow_rate = round(out_car_num / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-' - out_l_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-' - out_s_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_0 / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-' - out_r_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_2 / cross_out_sum_car_num, 2) if cross_out_sum_car_num != 0 else '-' + out_l_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / out_car_num, 2) if out_car_num != 0 else '-' + out_s_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_0 / out_car_num, 2) if out_car_num != 0 else '-' + out_r_rate = round(outroad_info_dict[out_road_id].turn_info.turn_ratio_2 / out_car_num, 2) if out_car_num != 0 else '-' road_flow_turn_rate[roadid] = { + 'src_dir': dir, 'in_flow_rate': in_flow_rate, 'out_flow_rate': out_flow_rate, 'l_rate': l_rate, @@ -566,8 +574,8 @@ def calc_tide_index(crossid, nodeid, date_list, roads_dir_dict): tide_index_list = [] am_tp_start, am_tp_end = 't700', '900' pm_tp_start, pm_tp_end = 't1700', '1900' - am_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start, am_tp_end) - pm_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start, pm_tp_end) + am_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, am_tp_start) + pm_cross_delay_data_list = db_cross.query_cross_delay_info(crossid, nodeid, date_list, pm_tp_start) date_am_delay_info_dict = {item['day']: item['data'] for item in am_cross_delay_data_list} date_pm_delay_info_dict = {item['day']: item['data'] for item in pm_cross_delay_data_list} @@ -599,7 +607,7 @@ def calc_tide_index(crossid, nodeid, date_list, roads_dir_dict): if max_am_flow_road == '' or max_am_flow_road not in subtend_road_pair.keys(): tide_index_list.append(tide_index) continue - subtend_road_am_flow = am_inroad_info_dict[subtend_road_pair[max_am_flow_road]].delay_info.car_num + subtend_road_am_flow = am_inroad_info_dict[subtend_road_pair[max_am_flow_road]].delay_info.car_num if subtend_road_pair[max_am_flow_road] in am_inroad_info_dict.keys() else 0 # 如果进口道流量小于20 则无意义 if min(subtend_road_am_flow, max_am_flow) < 20: tide_index_list.append(tide_index) @@ -650,3 +658,498 @@ def gen_subtend_road_pair(roads_dir_dict): tmp_road_dir.pop(subtend_dir) return subtend_road_pair + + +def parse_single_cross_delay_info(crossid, nodeid, data_list, data_type, roads_dir_dict): + data_dict = {} + max_cross_car_num, max_road_car_num, max_flow_car_num = 0, 0, 0 + pb_data_list = [] + for item in data_list: + if item: + pb_data_list.append(item['data']) + road_delay_infos = item['data'].inroad_delay_infos if item['data'] and item['data'].inroad_delay_infos else [] + for road_delay_info in road_delay_infos: + if road_delay_info.delay_info.car_num > max_road_car_num: + max_road_car_num = road_delay_info.delay_info.car_num + flow_delay_infos = road_delay_info.flow_delay_infos + for flow_delay_info in flow_delay_infos: + if flow_delay_info.delay_info.car_num > max_flow_car_num: + max_flow_car_num = flow_delay_info.delay_info.car_num + max_cross_car_num = max( + (x for x in pb_data_list if x is not None), + key=lambda x: x.delay_info.car_num, + default=None + ).delay_info.car_num + for item in data_list: + tp_start = item['tp_start'] + tp_end = item['tp_end'] + day = item['day'] + if data_type == 'day': + key = day + elif data_type == 'hour': + key = convert_time(int(tp_start.replace('h', ''))) + '-' + convert_time(int(tp_end.replace('h', ''))) + else: + key = day + + item_cross_delay_info = item['data'] + # 指标内容依次是 停车次数 多次停车率 停车时间 延误时间 平均速度 不停车速度 相对流量 路口失衡系数 潮汐指数 服务水平 + overview_data = ['-', '-', '-', '-', '-', '-', '-', '-', '-', '-', + # 下方为上述指标变化率颜色展示flag + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + tide_index = calc_tide_index(crossid, nodeid, [day], roads_dir_dict) if data_type == 'day' or data_type == 'week' else '-' + road_data_dict = {} + cross_car_num = 0 + if item_cross_delay_info: + cross_car_num = item_cross_delay_info.delay_info.car_num + road_delay_infos = item_cross_delay_info.inroad_delay_infos + max_stop_times = max(road_delay_infos, key=lambda x: x.delay_info.stop_times).delay_info.stop_times + min_stop_times = min(road_delay_infos, key=lambda x: x.delay_info.stop_times).delay_info.stop_times + cross_imbalance_index = round((max_stop_times - min_stop_times) / item_cross_delay_info.delay_info.stop_times, 2) if item_cross_delay_info.delay_info.stop_times != 0 else 0 + overview_data = [ + # 停车次数 + round(item_cross_delay_info.delay_info.stop_times, 2), + # 多次停车率 停车时间 + item_cross_delay_info.delay_info.high_park_percent, item_cross_delay_info.delay_info.park_time, + # 延误时间 平均速度 + item_cross_delay_info.delay_info.delay_time, item_cross_delay_info.delay_info.speed / 100, + # 不停车速度 相对流量占比 + item_cross_delay_info.delay_info.move_speed / 100, round(cross_car_num / max_cross_car_num * 100, 2) if max_cross_car_num != 0 else 0, + # 路口失衡系数 潮汐指数(当为小时级指标时,不计算该值) 服务水平 + cross_imbalance_index, tide_index[0], calc_service_level(item_cross_delay_info.delay_info.delay_time), + # 指标变化率颜色展示flag, 不含服务水平 + 0, 0, 0, 0, 0, 0, 0, 0, 0 + ] + inroad_delay_infos_with_imbalance = calc_inroad_imbalance_index(road_delay_infos) + road_data_dict = {item.inroadid: item for item in inroad_delay_infos_with_imbalance} + data_dict[key] = { + 'overview': overview_data, + 'roads_data': {} + } + src_dir_list = list(roads_dir_dict.keys()) + for src_dir in src_dir_list: + if roads_dir_dict[src_dir]['in'] != '-' and roads_dir_dict[src_dir]['in'] in road_data_dict.keys(): + road_car_num = road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.car_num + src_data = [ + # 停车次数 + round(road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.stop_times, 2), + # 多次停车率 + road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.high_park_percent, + # 转向失衡指数 + round(road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.imbalance_index, 2), + # 停车时间 + road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.park_time, + # 延误时间 + road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.delay_time, + # 平均速度 + road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.speed / 100, + # 不停车速度 + road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.move_speed / 100, + # 相对流量占比 + round(road_car_num / max_road_car_num * 100, 2) if max_road_car_num > 0 else '-', + # 进口道流量占比 + round(road_car_num / cross_car_num * 100, 2) if cross_car_num > 0 else '-', + # 服务水平 + calc_service_level(road_data_dict[roads_dir_dict[src_dir]['in']].delay_info.delay_time), + # 指标变化率颜色展示flag, 顺序为上述指标顺序, 不含服务水平 + 0, 0, 0, 0, 0, 0, 0, 0, 0 + ] + flow_delay_infos = road_data_dict[roads_dir_dict[src_dir]['in']].flow_delay_infos + flow_data_dict = {} + for flow_delay_info in flow_delay_infos: + turn_type = flow_delay_info.turn_type + if turn_type not in (0, 1): + continue + flow_data = [ + # 停车次数 + round(flow_delay_info.delay_info.stop_times, 2), + # 多次停车率 + flow_delay_info.delay_info.high_park_percent, + # 停车时间 + flow_delay_info.delay_info.park_time, + # 延误时间 + flow_delay_info.delay_info.delay_time, + # 平均速度 + flow_delay_info.delay_info.speed / 100, + # 不停车速度 + flow_delay_info.delay_info.move_speed / 100, + # 相对流量 + round(flow_delay_info.delay_info.car_num / max_flow_car_num * 100, 2) if max_flow_car_num > 0 else '-', + # 分流转向占比 + round(flow_delay_info.delay_info.car_num / road_car_num * 100, 2) if road_car_num > 0 else '-', + # 服务水平 + calc_service_level(flow_delay_info.delay_info.delay_time), + # 指标变化率颜色展示flag 不含服务水平 + 0, 0, 0, 0, 0, 0, 0, 0 + ] + flow_data_dict[turn_type] = flow_data + data_dict[key]['roads_data'][src_dir] = { + 'road': src_data, + 'flow': flow_data_dict + } + return data_dict + + +def calc_single_day_delay_info_change_rate(data_dict): + res_data_dict = copy.deepcopy(data_dict) + key_list = list(res_data_dict.keys()) + empty_overview = ['-', '-', '-', '-', '-', '-', '-', '-', '-', '-', + # 指标变化率颜色展示flag + 0, 0, 0, 0, 0, 0, 0, 0, 0] + for i in range(len(key_list)-1, -1, -1): + overview_data = res_data_dict[key_list[i]]['overview'] # list + roads_data = res_data_dict[key_list[i]]['roads_data'] # dict 内部包含多个方向路段和各个方向路段的流向级别的数据 + if i != 0: + prev_item_index = i - 1 + prev_overview_data = res_data_dict[key_list[prev_item_index]]['overview'] + prev_roads_data = res_data_dict[key_list[prev_item_index]]['roads_data'] + if overview_data != empty_overview and prev_overview_data != empty_overview: + overview_data_with_change_rate = calc_overview_change_rate(overview_data, prev_overview_data) + res_data_dict[key_list[i]]['overview'] = overview_data_with_change_rate + roads_data_wit_change_rate = calc_roads_data_change_rate(roads_data, prev_roads_data) + res_data_dict[key_list[i]]['roads_data'] = roads_data_wit_change_rate + return res_data_dict + + +def calc_overview_change_rate(overview_data, prev_overview_data): + res_data = copy.copy(overview_data) + for i in range(len(overview_data)): + if i >= 9: + continue + if overview_data[i] == '-' or prev_overview_data[i] == '-': + res_data[i + 10] = 0 + # 变化率 0表示正常 1表示恶化 2表示优化 + if i in (0, 1, 2, 3, 6, 7, 8): + # 指标提升代表恶化的有停车次数、多次停车率、转向失衡指数、停车时间、延误时间、相对流量、进口道流量占比 + rate = (overview_data[i] - prev_overview_data[i]) / prev_overview_data[i] * 100 if prev_overview_data[i] != '-' and prev_overview_data[i] > 0 else 0 + if rate < -20: + res_data[i + 10] = 1 + elif rate > 20: + res_data[i + 10] = 2 + else: + # 指标下降代表恶化的有平均速度、不停车速度 + rate = (overview_data[i] - prev_overview_data[i]) / prev_overview_data[i] * 100 if prev_overview_data[i] != '-' and prev_overview_data[i] > 0 else 0 + if rate > 20: + res_data[i + 10] = 1 + elif rate < -20: + res_data[i + 10] = 2 + return res_data + + +def calc_roads_data_change_rate(roads_data, prev_roads_data): + res_data = copy.deepcopy(roads_data) + src_dir_list = list(roads_data.keys()) + + for src_dir in src_dir_list: + src_dir_road_data = roads_data[src_dir]['road'] + if src_dir not in prev_roads_data.keys(): + continue + # 指标提升代表恶化的有停车次数、多次停车率、停车时间、延误时间、相对流量、分流转向占比 + prev_src_dir_road_data = prev_roads_data[src_dir]['road'] + for i in range(len(src_dir_road_data)): + if i >= 9: + continue + if src_dir_road_data[i] == '-' or prev_src_dir_road_data[i] == '-': + src_dir_road_data[i + 10] = 0 + if i in (0, 1, 2, 3, 4, 7, 8): + rate = (src_dir_road_data[i] - prev_src_dir_road_data[i]) / prev_src_dir_road_data[i] * 100 if prev_src_dir_road_data[i] > 0 else 0 + if rate < -20: + src_dir_road_data[i + 10] = 1 + elif rate > 20: + src_dir_road_data[i + 10] = 2 + else: + # 指标下降代表恶化的有平均速度、不停车速度 + rate = (src_dir_road_data[i] - prev_src_dir_road_data[i]) / prev_src_dir_road_data[i] * 100 if prev_src_dir_road_data[i] > 0 else 0 + if rate > 20: + src_dir_road_data[i + 10] = 1 + elif rate < -20: + src_dir_road_data[i + 10] = 2 + flow_datas = roads_data[src_dir]['flow'] + prev_flow_datas = prev_roads_data[src_dir]['flow'] + for turn_type in flow_datas.keys(): + flow_data = flow_datas[turn_type] + if turn_type not in prev_flow_datas.keys(): + continue + prev_flow_data = prev_flow_datas[turn_type] + for i in range(len(flow_data)): + if i >= 8: + continue + if flow_data[i] == '-' or prev_flow_data[i] == '-': + flow_data[i + 9] = 0 + if i in (0, 1, 2, 3, 6, 7): + rate = (flow_data[i] - prev_flow_data[i]) / prev_flow_data[i] * 100 if prev_flow_data[i] > 0 else 0 + if rate < -20: + flow_data[i + 9] = 1 + elif rate > 20: + flow_data[i + 9] = 2 + else: + rate = (flow_data[i] - prev_flow_data[i]) / prev_flow_data[i] * 100 if prev_flow_data[i] > 0 else 0 + if rate > 20: + flow_data[i + 9] = 1 + elif rate < -20: + flow_data[i + 9] = 2 + return res_data + + +def parse_data2pb(data_list): + res_list = [] + for row in data_list: + day = row['day'] + tp_start = row['tp_start'] + tp_end = row['tp_end'] + item_cross_delay_info = pb.xl_cross_delayinfo_t() + item_cross_delay_info.ParseFromString(row['data']) + res_list.append({ + 'day': day, + 'tp_start': tp_start, + 'tp_end': tp_end, + 'data': item_cross_delay_info + }) + return res_list + + +def gen_cross_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict): + # 运行效率、均衡调控、配时方案、路口渠化 + operating_efficiency_problems = gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict) + balanced_control_problems = gen_balanced_control_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict) + phase_problems = gen_phase_problems() + cross_channelized_problems = gen_cross_channelized_problems() + problems = [operating_efficiency_problems, balanced_control_problems, phase_problems, cross_channelized_problems] + return problems + + +def gen_operating_efficiency_problems(avg_cross_delay_info, roads_dir_dict): + if not avg_cross_delay_info: + return [{ + 'item': '运行效率', + 'values': [] + }] + road_delay_infos = avg_cross_delay_info.inroad_delay_infos + high_park_problems, high_park_suggestions = gen_high_park_problems(road_delay_infos, roads_dir_dict) + + operating_efficiency_problems = { + 'item': '运行效率', + 'values': [ + { + 'item': '多次排队', + 'detail': high_park_problems, + 'reason': '某一进口道转向的多次停车率大于15%', + 'suggestions': high_park_suggestions + }, + ] + } + pass + + +def gen_high_park_problems(road_delay_infos, roads_dir_dict): + detail = [{ + 'text': '未见异常' + }] + suggestion = [] + err_src_dict = {} + src_list = list(roads_dir_dict.keys()) + road_delays_dict = {item.inroadid: item for item in road_delay_infos} + for src_dir in src_list: + if roads_dir_dict[src_dir]['in'] == '-' or roads_dir_dict[src_dir]['in'] not in road_delays_dict.keys(): + continue + road_info = road_delays_dict[roads_dir_dict[src_dir]['in']] + flow_delay_infos = road_info.flow_delay_infos + turn_type_flow_delay_info_dict = {item.turn_type: item for item in flow_delay_infos} + for turn_type in turn_type_flow_delay_info_dict.keys(): + flow_delay_info = turn_type_flow_delay_info_dict[turn_type] + if turn_type not in (0, 1): + continue + if flow_delay_info.delay_info.high_park_percent > 15: + if src_dir not in err_src_dict.keys(): + err_src_dict[src_dir] = [str(turn_type) + ':' + str(flow_delay_info.delay_info.high_park_percent)] + else: + err_src_dict[src_dir].append(str(turn_type) + ':' + str(flow_delay_info.delay_info.high_park_percent)) + if err_src_dict: + detail = [] + for src_dir in err_src_dict.keys(): + src_detail = [] + for turn_type in err_src_dict[src_dir]: + flow_detail = { + 'turn_type': int_turn_type2str[int(turn_type.split(':')[0])], + 'text': '车辆多次停车率过大(', + 'percent': turn_type.split(':')[1] + '%),' + } + src_detail.append(flow_detail) + src_detail.append({ + 'text': '车辆需要多次排队才能通过' + }) + detail.append({ + 'src_dir': dir_str_dict[src_dir] + '进口', + 'src_detail': src_detail + }) + # todo 补充生成建议的逻辑 + return detail, suggestion + + +def gen_high_stop_time_problems(road_delay_infos, roads_dir_dict): + detail = [{ + 'text': '未见异常' + }] + suggestion = [] + # todo 需与产品再次确认逻辑 + pass + + +def gen_balanced_control_problems(avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict): + road_delay_infos = avg_cross_delay_info.inroad_delay_infos + cross_imbalance_detail, cross_imbalance_suggestions = gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict) + turn_imbalance_detail, turn_imbalance_suggestions = gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict) + # todo 需要与产品确认路口潮汐问题诊断的逻辑 + balanced_control_problems = { + 'item': '均衡调控', + 'values': [ + { + 'item': '路口失衡', + 'detail': cross_imbalance_detail, + 'reason': '路口存在某个流向绿灯时长不足而另一个方向绿灯存在空放的现象', + 'suggestions': cross_imbalance_suggestions + }, + { + 'item': '转向失衡', + 'detail': turn_imbalance_detail, + 'reason': '同一进口道,直行与左转停车次数之差的绝对值大于0.5,且转向停车次数的最大值大于1', + 'suggestions': turn_imbalance_suggestions + }, + ] + } + return balanced_control_problems + + +# 路口失衡问题诊断 +def gen_cross_imbalance_problems(road_delay_infos, roads_dir_dict): + detail = [{ + 'text': '未见异常' + }] + suggestion = [] + road_src_dict = {item['in']: item for item in roads_dir_dict} + max_stop_times_road = max(road_delay_infos, key=lambda x: x.delay_info.stop_times) + min_stop_times_road = min(road_delay_infos, key=lambda x: x.delay_info.stop_times) + if max_stop_times_road.delay_info.stop_times > 1 \ + and max_stop_times_road.delay_info.stop_times - min_stop_times_road.delay_info.stop_times > 0.5: + max_roadid = max_stop_times_road.inroadid + min_roadid = min_stop_times_road.inroadid + max_src = road_src_dict[max_roadid] + min_src = road_src_dict[min_roadid] + detail = [ + { + 'src_dir': dir_str_dict[max_src] + '进口', + 'text': '的停车次数(' + str(round(max_stop_times_road.delay_info.stop_times, 2)) + ')与' + }, + { + 'src_dir': dir_str_dict[min_src] + '进口', + 'text': f"""的停车次数({str(round(max_stop_times_road.delay_info.stop_times, 2))})相差过大,两者之比为{int(round(max_stop_times_road.delay_info.stop_times, 2) / round(min_stop_times_road.delay_info.stop_times, 2) * 100)}%,分配的绿灯时长不匹配""" + } + ] + suggestion = [ + { + 'text': '调整', + 'max_src_dir': dir_str_dict[max_src] + '进口', + }, + { + 'text': '和', + 'min_src_dir': dir_str_dict[min_src] + '进口', + }, + { + 'text': '的绿灯时长的分配情况' + } + ] + if max_src == src_reverse[min_src]: + # todo 补充对向情况下额外建议的情况 + pass + return detail, suggestion + + +# 转向失衡问题诊断 +def gen_turn_imbalance_problems(road_delay_infos, roads_dir_dict, inroad_static_info_dict): + detail = [{ + 'text': '未见异常' + }] + suggestion = [] + err_road_dict = {} + road_src_dict = {item['in']: item for item in roads_dir_dict} + for road_delay_info in road_delay_infos: + inroadid = road_delay_info.inroadid + flow_delay_infos = road_delay_info.flow_delay_infos + turn_type_flow_delay_info_dict = {item.turn_type: item for item in flow_delay_infos} + lane_num_info = count_lsr(inroad_static_info_dict[inroadid]['lane_turn_info']) if inroadid in inroad_static_info_dict.keys() else None + if 0 in turn_type_flow_delay_info_dict.keys() and 1 in turn_type_flow_delay_info_dict.keys(): + s_stop_times = turn_type_flow_delay_info_dict[0].delay_info.stop_times + l_stop_times = turn_type_flow_delay_info_dict[1].delay_info.stop_times + if s_stop_times > 1 or l_stop_times > 1 and abs(l_stop_times - s_stop_times) > 0.5: + if s_stop_times > l_stop_times: + err_road_dict[road_src_dict[inroadid]] = ['直行' + ':' + str(round(s_stop_times, 2)), '左转' + ':' + str(round(l_stop_times, 2)), lane_num_info] + else: + err_road_dict[road_src_dict[inroadid]] = ['左转' + ':' + str(round(s_stop_times, 2)), '直行' + ':' + str(round(l_stop_times, 2)), lane_num_info] + if len(err_road_dict.keys()) > 0: + detail, suggestion, road_num_suggestion = [], [], [] + for src_dir in err_road_dict.keys(): + item_detail = [ + { + 'src_dir': dir_str_dict[src_dir] + '进口', + 'child_detail': [ + { + 'turn_type': err_road_dict[src_dir][0].split(':')[0], + }, + { + 'text': '的停车次数(' + str(err_road_dict[src_dir][0].split(':')[1]) + ')与' + }, + { + 'turn_type': err_road_dict[src_dir][1].split(':')[0], + }, + { + 'text': '的停车次数(' + str(err_road_dict[src_dir][1].split(':')[1]) + ')相差过大,两者之比为' + str(int(float(err_road_dict[src_dir][0].split(':')[1]) / float(err_road_dict[src_dir][1].split(':')[1]) * 100)) + '%,分配的绿灯时长不匹配' + } + ] + } + ] + detail.append(item_detail) + item_suggestion = [ + { + 'src_dir': dir_str_dict[src_dir] + '进口', + 'text': '信号灯直左分控,调整执行和左转车辆绿灯时长分配情况' + }, + # todo 补充配时相关建议 + ] + suggestion.extend(item_suggestion) + if err_road_dict[src_dir][2]: + road_num_suggestion.append( + { + 'src_dir': dir_str_dict[src_dir] + '进口', + 'text': '左转/直行/右转现有车道数分别为' + err_road_dict[src_dir][2] + } + ) + + lane_num_suggestion = [ + { + 'text': '调整', + 'src_dir': '、'.join([dir_str_dict[item] + '进口' for item in err_road_dict.keys()]) + }, + { + 'text': '车道分配情况' + } + ] + lane_num_suggestion.extend(road_num_suggestion) + suggestion = suggestion + lane_num_suggestion + + return detail, suggestion + + +def gen_phase_problems(): + pass + + +def gen_cross_channelized_problems(): + pass + + +def query_cross_phase(nodeid, crossid): + cross_phases, err = QueryCrossRunningPhase(nodeid, [crossid]) + if err or not cross_phases or cross_phases.code != 0: + return None + cross_phase_info = cross_phases.data[0] + + pass \ No newline at end of file diff --git a/app/tmnet_db_func.py b/app/tmnet_db_func.py index e856ab7..5503215 100644 --- a/app/tmnet_db_func.py +++ b/app/tmnet_db_func.py @@ -88,4 +88,8 @@ class TmnetDbHelper(TableDbHelperBase): def query_base_info(self, nodeid, area_id): sql = "select * from ledger.leger_base_info where nodeid= %d and area_id = %s" % (int(nodeid), int(area_id)) + return self.do_select(sql) + + def query_city_tp_info(self, nodeid, area_id): + sql = f"select tp_desc from cross_doctor_config.area_tp_config where nodeid = {nodeid} and area_id = {area_id}" return self.do_select(sql) \ No newline at end of file diff --git a/test.py b/test.py index b15d6be..6902306 100644 --- a/test.py +++ b/test.py @@ -40,7 +40,7 @@ def init(): def test_get_cross_delay_data(): - row_list = db_cross.query_cross_delay_info('CR_11987179_2632645', 350100, '20251013', 't830', '1200') + row_list = db_cross.query_cross_delay_info('CR_11987179_2632645', 350100, '20251013', 't830') data = row_list[0]['data'] cross_delay = pb.xl_cross_delayinfo_t() cross_delay.ParseFromString(data)