import json import os from datetime import timezone from app.user_worker import do_get_user_info from app.workstation_db_function import * from app.work_station_common import * from app.task_worker import * from app.phasetable_worker import * import proto.xlcomm_pb2 as pb def favorite(params, token): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) if not token: return json.dumps(make_common_res(2, '用户信息异常,请刷新一下后重试')) token_params = {'token': token} token_res = json.loads(do_get_user_info(token_params, token)) create_user_id = token_res['token']['userno'] create_user_name = token_res['token']['user_name'] favorite_type = int(check_param(params, 'favorite_type')) if not favorite_type or favorite_type not in (1, 2): return json.dumps(make_common_res(3, '缺少收藏类型')) favorite_id = check_param(params, 'favorite_id') if not favorite_id: return json.dumps(make_common_res(4, '缺少收藏路口或干线id')) favorite_name = check_param(params, 'favorite_name') if not favorite_name: return json.dumps(make_common_res(5, '缺少收藏路口或干线名称')) row_list = db_workstation.check_favorite_info_exists(nodeid, create_user_id, create_user_name, favorite_id, favorite_name, int(area_id)) if len(row_list) > 0: return json.dumps(make_common_res(6, '当前收藏内容已存在,请勿重复收藏')) ret = db_workstation.create_favorite_cross_artery(nodeid, create_user_id, create_user_name, favorite_type, favorite_id, favorite_name, int(area_id)) if int(ret) == 1: return json.dumps(make_common_res(0, '收藏成功')) def get_favorite_list(params, token): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) if not token: return json.dumps(make_common_res(2, '用户信息异常,请刷新一下后重试')) token_params = {'token': token} token_res = json.loads(do_get_user_info(token_params, token)) user_id = token_res['token']['userno'] user_name = token_res['token']['user_name'] row_list = db_workstation.query_favorite_info_list(nodeid, user_id, user_name, int(area_id)) cross_list, artery_list = [], [] for row in row_list: favorite_type = row['favorite_type'] favorite_id = row['favorite_id'] if favorite_type == 1: cross_list.append(favorite_id) elif favorite_type == 2: artery_list.append(favorite_id) res = make_common_res(0, 'ok') res['data'] = { 'cross_list': cross_list, 'artery_list': artery_list } return json.dumps(res, ensure_ascii=False) def get_favorite_data_list(params, token): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) page = int(check_param(params, 'page')) if not page: return json.dumps(make_common_res(3, '缺少分页页码')) page_size = int(check_param(params, 'page_size')) if not page_size or page_size == 0: return json.dumps(make_common_res(4, '缺少分页条数')) query_type = check_param(params, 'query_type') if not query_type or query_type not in ('cross', 'artery'): return json.dumps(make_common_res(5, '查询类型缺失或查询类型异常')) token_params = {'token': token} token_res = json.loads(do_get_user_info(token_params, token)) user_id = token_res['token']['userno'] user_name = token_res['token']['user_name'] row_list = db_workstation.query_favorite_info_list(nodeid, user_id, user_name, int(area_id)) cross_list, artery_list, cross_id_list = [], [], [] for row in row_list: favorite_type = row['favorite_type'] favorite_id = row['favorite_id'] favorite_name = row['favorite_name'] if favorite_type == 1: cross_list.append({'id': favorite_id, 'name': favorite_name}) cross_id_list.append(favorite_id) res = make_common_res(0, 'ok') res['data'] = [] res['total_pages'] = 0 res['total_num'] = 0 if len(cross_id_list) == 0 or query_type != 'cross': return json.dumps(res, ensure_ascii=False) # 插入实时数据 cross_info_list, cross_rt_data_list = [], [] if len(cross_id_list) > 0: print(cross_rt_data_list) cross_rt_data_list, error = read_cross_deday(nodeid, area_id, cross_id_list) if error: return json.dumps(make_common_res(2, error)) if len(cross_rt_data_list) == 0: return json.dumps(res, ensure_ascii=False) for cross_rt_data in cross_rt_data_list: cross_id = cross_rt_data['crossid'] if any(d['id'] == cross_id for d in cross_list): cross_info_list.append(cross_rt_data) start_index = (page - 1) * page_size end_index = start_index + page_size cross_info, error = get_cross_info(cross_id_list) if error: return json.dumps(make_common_res(2, error)) paginated_data = cross_info_list[start_index:end_index] res['total_pages'] = (len(cross_info_list) + page_size - 1) // page_size res['total_num'] = len(cross_info_list) res['data'] = gen_work_station_cross_data_list(paginated_data, cross_info) return json.dumps(res, ensure_ascii=False) def get_cross_info(cross_ids: []): cross_info, error = db_tmnet.query_cross_info_all(cross_ids) if error: return None, error result = {} for item_cross_info in cross_info: result[item_cross_info['crossid']] = { 'name': item_cross_info['name'], 'location': item_cross_info['location'], 'nodeid': item_cross_info['nodeid'], 'area_id': item_cross_info['area_id'] } return result, None def get_road_info(road_ids: []): roads_info, error = db_tmnet.query_road_info_all(road_ids) if error: return None, error result = {} for item_roads_info in roads_info: result[roads_info['roadid']] = { 'name': item_roads_info['name'], 'from_crossid': item_roads_info['from_crossid'], 'to_crossid': item_roads_info['to_crossid'], 'src_direct': item_roads_info['src_direct'], 'nodeid': item_roads_info['nodeid'] } return result, None def read_cross_deday(nodeid, area_id, cross_id_list): tp_info = [ "00:00-07:00", "07:00-09:00", "09:00-17:00", "17:00-19:00", "19:00-22:00", "22:00-23:59" ] tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id) if tp_desc: tp_info = tp_desc[0]['tp_desc'].split(',') now_time = datetime.now().strftime('%H:%M') start, end = '', '' for index, item_tp_info in enumerate(tp_info): start, end = map(str, item_tp_info.split('-')) if now_time >= start and end >= now_time: start, end = map(str, tp_info[index - 1].split('-')) break cross_data_list, error = db_workstation.query_cross_delay_info(nodeid, f"t{int(start.replace(':', ''))}", cross_id_list) if error: return [], error if len(cross_data_list) <= 0: return [], None result = [] for item_cross_data_list in cross_data_list: item_delay_pb = pb.xl_cross_delayinfo_t() item_delay_pb.ParseFromString(item_cross_data_list['data']) result.append({ 'crossid': item_delay_pb.crossid, 'jam_index': item_delay_pb.delay_info.jam_index, 'unbalance_index': item_delay_pb.delay_info.imbalance_index, 'queue_len': item_delay_pb.delay_info.queue_len, 'stop_times': item_delay_pb.delay_info.stop_times, 'delay_time': item_delay_pb.delay_info.delay_time, }) return result, None def delete_favorite_list(params, token): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) ids = check_param(params, 'ids') if not ids or len(ids) < 1: return json.dumps(make_common_res(2, '缺少需要删除的收藏项')) if not token: return json.dumps(make_common_res(3, '用户信息异常,请刷新一下后重试')) ids_str = ", ".join([f"'{fid}'" for fid in ids]) token_params = {'token': token} token_res = json.loads(do_get_user_info(token_params, token)) user_id = token_res['token']['userno'] user_name = token_res['token']['user_name'] ret = db_workstation.delete_favorite_ids(nodeid, ids_str, user_id, user_name) if ret == len(ids): return json.dumps(make_common_res(0, '操作成功')) def get_workstation_task_list(params, token): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) if not token: return json.dumps(make_common_res(2, '用户信息异常,请刷新重试')) token_params = {'token': token} token_res = json.loads(do_get_user_info(token_params, token)) user_id = token_res['token']['userno'] area_list = db_user.query_areaid_list(user_id) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) user_name = token_res['token']['user_name'] pending_task, timeout_task, approaching_overdue_task, done_task = [], [], [], [] last_month_should_done, last_month_done, last_month_should_done_but_not, need_todo = 0, [], 0, 0 last_week_should_done, last_week_done, last_week_should_done_but_not, week_need_todo = 0, [], 0, 0 row_list = db_workstation.query_task_list(nodeid, user_name, area_id) current_date = datetime.now(timezone.utc).date() # current_date = datetime.strptime('2024-07-30', "%Y-%m-%d") yesterday_obj = current_date - timedelta(days=1) yesterday_date = yesterday_obj.strftime('%Y-%m-%d') first_day_of_this_month = current_date.replace(day=1) last_day_of_last_month_obj = first_day_of_this_month - timedelta(days=1) last_month = last_day_of_last_month_obj.strftime('%Y-%m') first_day_of_this_week = current_date - timedelta(days=current_date.weekday()) last_day_of_last_week_obj = first_day_of_this_week - timedelta(days=1) formatted_last_week = f'{last_day_of_last_week_obj.year}年第{last_day_of_last_week_obj.isocalendar()[1]}周' for row in row_list: task_name = row['task_name'] task_type = int(row['task_type']) plan_begin_time = int(row['plan_begin_time']) plan_end_time = int(row['plan_end_time']) task_state = int(row['task_state']) task_end_time = None if row['task_end_time']: task_end_time = int(row['task_end_time']) if task_state == 4: done_task.append({ 'task_name': task_name, 'task_type': task_type, 'plan_begin_time': plan_begin_time, 'plan_end_time': plan_end_time }) elif task_state != 0 and task_state != 4: pending_task.append({ 'task_name': task_name, 'task_type': task_type, 'plan_begin_time': plan_begin_time, 'plan_end_time': plan_end_time }) plan_endtime_date = datetime.fromtimestamp(plan_end_time // 1000, tz=timezone.utc).date() if plan_endtime_date < current_date and task_state != 0 and task_state != 4: timeout_task.append({ 'task_name': task_name, 'task_type': task_type, 'plan_begin_time': plan_begin_time, 'plan_end_time': plan_end_time }) elif current_date - plan_endtime_date <= timedelta(days=3) and task_state != 0 and task_state != 4: approaching_overdue_task.append({ 'task_name': task_name, 'task_type': task_type, 'plan_begin_time': plan_begin_time, 'plan_end_time': plan_end_time }) # task_statistics # month & week if task_end_time: end_time = datetime.fromtimestamp(task_end_time // 1000, tz=timezone.utc).date() end_time_year = end_time.year end_time_month = end_time.month end_time_week = end_time.isocalendar()[1] if task_state == 4 and f'{end_time_year}-{end_time_month}' == last_month: last_month_done.append(task_type) if plan_endtime_date < first_day_of_this_month: last_month_should_done += 1 if task_state == 4 and f'{end_time_year}年第{end_time_week}周' == formatted_last_week: last_week_done.append(task_type) if plan_endtime_date.year == end_time_year and plan_endtime_date.isocalendar()[1] == end_time_week: last_week_should_done += 1 else: if task_state in [1, 2, 3] and plan_endtime_date < first_day_of_this_month: last_month_should_done += 1 last_month_should_done_but_not += 1 if task_state in [1, 2, 3] and plan_endtime_date < first_day_of_this_week: last_week_should_done += 1 last_week_should_done_but_not += 1 if task_state in [1, 2, 3] and plan_endtime_date > last_day_of_last_month_obj: need_todo += 1 if task_state in [1, 2, 3] and plan_endtime_date >= first_day_of_this_week: week_need_todo += 1 yesterday_res = db_workstation.query_yesterday_task_data(yesterday_date, nodeid, user_name, area_id) task_statistics = gen_task_statistics_res(last_month_should_done, last_month_done, last_month_should_done_but_not, need_todo, last_week_should_done, last_week_done, last_week_should_done_but_not, week_need_todo, yesterday_res, last_day_of_last_month_obj, formatted_last_week, yesterday_date) pending_res = cal_task_type_num(pending_task) timeout_res = cal_task_type_num(timeout_task) approaching_overdue_res = cal_task_type_num(approaching_overdue_task) done_res = cal_task_type_num(done_task) res = make_common_res(0, 'ok') data = { 'join_task': { 'pending_res': pending_res, 'timeout_res': timeout_res, 'approaching_overdue_res': approaching_overdue_res, 'done_res': done_res }, 'task_statistics': task_statistics } res['data'] = data return json.dumps(res, ensure_ascii=False) # 路口优化页面 获取路口列表 接口1 def get_crosses_list(params): nodeid = check_nodeid(params) if not nodeid: return json.dumps(make_common_res(1, 'nodeid异常')) cross_list = [] for crossid in g_info_dict.get(g.nodeid)['g_crossroad_idset']: crossName = g_info_dict.get(g.nodeid)['g_roadnet'].query_cross(crossid).name cross_list.append({'crossid': crossid, 'crossName': crossName}) cross_useable_date = situation_db_pool.query_situation_usable_date()['cross'] data = { 'cross_list': cross_list, 'useable_date': cross_useable_date } res = make_common_res(0, 'ok') res['data'] = data return json.dumps(res, ensure_ascii=False) # 路口优化页面 获取指定路口指定日期的配时方案与异常情况列表 接口2 def get_cross_phase_symptom_info(params): tp_time_range_dict = {'早高峰': '07:00-09:00', '晚高峰': '17:00-19:00', '平峰': '06:00-07:00,09:00-17:00,19:00-22:00'} tp_dict = {'早高峰': 100, '晚高峰': 200, '平峰': 400} symptom_dict = { '对向失衡': 'opposite_tide', '路口溢流': 'inroad_overflow', '路口失衡': 'intersection_imbalance', '转向失衡': 'straight_left_imbalance', '左转低效': 'inefficient_left', '路口死锁': 'cross_deadlock' } nodeid = check_nodeid(params) if not nodeid: return json.dumps(make_common_res(1, 'nodeid异常')) crossid = check_param(params, 'crossid') if not crossid: return json.dumps(make_common_res(2, '缺少路口信息,请选择路口')) date = check_param(params, 'date') if not date: return json.dumps(make_common_res(3, '缺少查询日期')) date_type = check_param(params, 'date_type') weekday = datetime.strptime(date, '%Y%m%d').weekday() jj_crossid = db_mapping.query_jjcrossid_by_xlcrossid(nodeid, crossid) if not jj_crossid: return json.dumps(make_common_res(4, '路口不存在')) row_list = situation_db_pool.query_phase_data(jj_crossid, weekday + 1, crossid) symptom_list = situation_db_pool.query_symptom_info_by_crossid(nodeid, crossid, date, date_type=date_type) res_list = [] for item in row_list.keys(): plan_time_range = row_list[item]['tp_start'] + '-' + row_list[item]['tp_end'] row_list[item]['symptom_type_list'] = [] row_list[item]['symptom_detail_list'] = [] row_list[item]['todo_detail_list'] = [] row_list[item]['tp_desc_list'] = [] row_list[item]['desc_key_list'] = [] for symptom in symptom_list: symptom_detail = json.loads(str(symptom['symptom_detail']).replace("'", "\"")) todo_detail = json.loads(str(symptom['todo_detail']).replace("'", "\"")) if symptom['tp'] != '平峰': symptom_time_range = tp_time_range_dict.get(symptom['tp']) if has_overlap(plan_time_range, symptom_time_range): tp_num = tp_dict.get(symptom['tp']) symptom_desc = symptom_dict.get(symptom['symptom_type']) desc_key = situation_db_pool.query_symptom_desckey(crossid, tp_num, symptom_desc, date_type=date_type) row_list[item]['symptom_type_list'].append({ 'symptom_type': symptom['symptom_type'], 'symptom_type_eng': symptom_desc }) row_list[item]['symptom_detail_list'].append(symptom_detail) row_list[item]['tp_desc_list'].append(symptom['tp']) row_list[item]['desc_key_list'].append(desc_key) row_list[item]['todo_detail_list'].append(todo_detail) else: symptom_time_range_list = tp_time_range_dict.get(symptom['tp']).split(',') for symptom_time_range in symptom_time_range_list: if has_overlap(plan_time_range, symptom_time_range): tp_num = tp_dict.get(symptom['tp']) symptom_desc = symptom_dict.get(symptom['symptom_type']) desc_key = situation_db_pool.query_symptom_desckey(crossid, tp_num, symptom_desc, date_type=date_type) row_list[item]['symptom_type_list'].append({ 'symptom_type': symptom['symptom_type'], 'symptom_type_eng': symptom_desc }) row_list[item]['symptom_detail_list'].append(symptom_detail) row_list[item]['tp_desc_list'].append(symptom['tp']) row_list[item]['desc_key_list'].append(desc_key) row_list[item]['todo_detail_list'].append(todo_detail) continue res_list.append(row_list[item]) res = make_common_res(0, 'ok') res['data'] = { 'nodeid': nodeid, 'date': date, 'crossid': crossid, 'res_list': res_list } return json.dumps(res, ensure_ascii=False) def parse_time(time_str): """将时间字符串转换为datetime.time对象""" return datetime.strptime(time_str, '%H:%M').time() def has_overlap(time_range1, time_range2): """判断两个时间段是否有交集""" # 解析时间段 start1, end1 = map(parse_time, time_range1.split('-')) start2, end2 = map(parse_time, time_range2.split('-')) # 检查交集条件 return start1 < end2 and start2 < end1 # 路口优化页面 获取路口已存在的优化任务信息 接口5 def get_cross_optimize_plan_detail(params): nodeid = check_nodeid(params) if not nodeid: return json.dumps(make_common_res(1, 'nodeid异常')) crossid = check_param(params, 'crossid') if not crossid: return json.dumps(make_common_res(2, '缺少路口id信息')) planid = check_param(params, 'planid') if not planid: return json.dumps(make_common_res(3, '缺少方案id信息')) tp_start = check_param(params, 'tp_start') if not tp_start: return json.dumps(make_common_res(4, '缺少时段开始时刻')) tp_end = check_param(params, 'tp_end') if not tp_end: return json.dumps(make_common_res(5, '缺少时段结束时刻')) taskId = check_param(params, 'taskid') if not taskId: return json.dumps(make_common_res(6, '缺少任务id信息')) crossName = g_info_dict.get(g.nodeid)['g_roadnet'].query_cross(crossid).name init_plan_res, target_plan_res, his_plan_list, edit_plan_res, delay_matrix_res, delay_matrix_now_res = None, None, {}, None, None, None init_weekday = '' init_plan_str, delay_input_str = situation_db_pool.query_init_info(nodeid, taskId) if init_plan_str: init_plan_res = Plan.parse_from_json(init_plan_str).to_dict() if delay_input_str: delay_matrix_res = DelayMatrix.parse_from_json(delay_input_str).to_dict() init_weekday = datetime.strptime(delay_matrix_res['date'], '%Y%m%d').weekday() query_type = check_param(params, 'type') if not query_type or query_type == 'all': target_plan_str, edit_plan_str, his_plan_dict = situation_db_pool.query_optimize_records(nodeid, taskId) if target_plan_str: target_plan_res = Plan.parse_from_json(target_plan_str).to_dict() if edit_plan_str: edit_plan_res = Plan.parse_from_json(edit_plan_str).to_dict() for seq in his_plan_dict.keys(): item = Plan.parse_from_json(his_plan_dict[seq]).to_dict() timestamp = str(datetime.fromtimestamp(item['timestamp']).__format__('%Y%m%d %H:%M')) his_plan_list[timestamp] = item if init_weekday: max_date = get_max_usable_date(init_weekday) dm_now = get_delay_matrix(str(max_date), tp_start, tp_end, crossid) delay_matrix_now_res = dm_now.to_dict() # i_list = calc_dm_delay_change_detail(delay_matrix_res['delay_matrix'], delay_matrix_now_res['delay_matrix']) # delay_matrix_now_res['change_flag'] = i_list res = make_common_res(0, 'ok') res['data'] = { 'nodeid': nodeid, 'crossid': crossid, 'crossName': crossName, 'planid': planid, 'tp_start': tp_start, 'tp_end': tp_end, 'optimize_taskid': taskId, 'init_plan': init_plan_res, # 初始方案 'target_plan': target_plan_res, # 推荐方案 'his_plan': his_plan_list, # 历史方案 'edit_plan': edit_plan_res, # 编辑方案 'delay_matrix': delay_matrix_res, # 初始关键指标延误矩阵 'delay_matrix_now': delay_matrix_now_res, # 当前最新的关键指标延误矩阵 } return json.dumps(res, ensure_ascii=False) def get_max_usable_date(init_weekday): cross_useable_date = situation_db_pool.query_situation_usable_date()['cross'] max_date = 00000000 for date in cross_useable_date['hisday']: tmp_weekday = datetime.strptime(str(date), '%Y%m%d').weekday() if tmp_weekday == init_weekday and int(date) > int(max_date): max_date = date return max_date def get_delay_matrix(date_str, tp_start, tp_end, crossid): roadnet = g_info_dict.get(g.nodeid)['g_roadnet'] crossform = CrossForm.extract_cross_form(roadnet, crossid) start_time = str(date_str) + ' ' + str(tp_start) end_time = str(date_str) + ' ' + str(tp_end) start_timestamp = datetime.strptime(start_time, '%Y%m%d %H:%M').timestamp() end_timestamp = datetime.strptime(end_time, '%Y%m%d %H:%M').timestamp() flow_delay_list = db.query_delay_matrix(start_timestamp, end_timestamp, crossid) for item in flow_delay_list: inroadid = item['inroadid'] dirname = crossform.inroad_dirname_map[inroadid] item['dirname'] = dirname dm = DelayMatrix() dirname_list = list(crossform.dirname_turns_map.keys()) dm.set_inroad_names(dirname_list) dm.add_flow_delay_data(flow_delay_list) dm.disable_all_badturns(crossform.dirname_turns_map) dm.date = date_str dm.tp_start = tp_start dm.tp_end = tp_end return dm # 路口优化页面 获取路口方案与延误矩阵信息 接口3 def get_cross_init_info(params): nodeid = check_nodeid(params) if not nodeid: return json.dumps(make_common_res(1, 'nodeid异常')) crossid = check_param(params, 'crossid') if not crossid: return json.dumps(make_common_res(2, '缺少路口id信息')) date = check_param(params, 'date') if not date: return json.dumps(make_common_res(3, '缺少日期信息')) planid = check_param(params, 'planid') if not planid: return json.dumps(make_common_res(4, '缺少方案id信息')) tp_start = check_param(params, 'tp_start') if not tp_start: return json.dumps(make_common_res(5, '缺少时段开始时刻')) tp_end = check_param(params, 'tp_end') if not tp_end: return json.dumps(make_common_res(6, '缺少时段结束时刻')) delay_matrix = get_delay_matrix(str(date), tp_start, tp_end, crossid) plan: Plan = get_cross_init_plan(nodeid, crossid, planid) symptom_list = check_param(params, 'symptom_list') if not symptom_list or len(symptom_list) == 0: logging.info('异常类型为空') else: s_list = [] for item in symptom_list: info = {} info['daytype'] = 'hisday' info['crossid'] = crossid info['day'] = date info['tp'] = 0 info['duration'] = 0 info['desc_key'] = item['desc_key'] info['type'] = item['symptom_type_eng'] s = SymptomRecord.make_from_dict(info) s_list.append(s) OptimizePlanTool.judge_plan_adjust_type(plan, s_list) res = make_common_res(0, 'ok') res['data'] = { 'nodeid': nodeid, 'crossid': crossid, 'planid': planid, 'tp_start': tp_start, 'tp_end': tp_end, 'date': date, 'init_plan': plan.to_dict(), 'delay_matrix': delay_matrix.to_dict() } return json.dumps(res, ensure_ascii=False) def get_cross_init_plan(nodeid, crossid, planid): crossid_mapping = g_info_dict.get(nodeid)['g_jj_cross'] jj_crossid = crossid_mapping[crossid] phasetable_data = db_phasetable.query_phasetable(nodeid, jj_crossid) plan_data = phasetable_data['plans'][int(planid)] phase_data: dict = phasetable_data['phases'] plan = Plan() plan.init_from_dict(plan_data) for info in plan_data['stages']: stage = PhaseStage() stage.init_from_dict(info) plan.add_stage(stage) for phase_info in phase_data.values(): phase = PhaseInfo() phase.init_from_dict(phase_info) plan.add_phase(phase) plan.update_stages_max_min_green() return plan # 路口优化页面 生成方案,创建路口配时方案优化方案 接口4 def gen_cross_optimize(params): tp_dict = {'早高峰': 100, '晚高峰': 200, '平峰': 400} nodeid = check_nodeid(params) if not nodeid: return json.dumps(make_common_res(1, 'nodeid异常')) crossid = check_param(params, 'crossid') if not crossid: return json.dumps(make_common_res(2, '缺少路口id信息')) planid = check_param(params, 'planid') if not planid: return json.dumps(make_common_res(3, '缺少方案id信息')) tp_start = check_param(params, 'tp_start') if not tp_start: return json.dumps(make_common_res(4, '缺少时段开始时刻')) tp_end = check_param(params, 'tp_end') if not tp_end: return json.dumps(make_common_res(5, '缺少时段结束时刻')) init_plan_str = check_param(params, 'init_plan') if not init_plan_str: return json.dumps(make_common_res(6, '缺少初始方案信息')) init_plan = Plan.parse_from_json(init_plan_str) delay_matrix_str = check_param(params, 'delay_matrix') if not delay_matrix_str: return json.dumps(make_common_res(7, '缺少初始延误数据矩阵信息')) date = check_param(params, 'date') if not date: return json.dumps(make_common_res(8, '缺少日期信息')) delay_matrix = DelayMatrix.parse_from_json(delay_matrix_str) roadnet = g_info_dict.get(g.nodeid)['g_roadnet'] crossName = roadnet.query_cross(crossid).name cross_form = CrossForm.extract_cross_form(roadnet, crossid) symptom_list = check_param(params, 'symptom_list') init_plan_id = init_plan.planid init_plan_name = init_plan.name if not symptom_list or len(symptom_list) == 0: logging.info('异常类型为空') target_plan, diff_list = OptimizePlanTool.do_plan_optimize(cross_form, init_plan, delay_matrix) else: symptom_list = json.loads(symptom_list) s_list = [] for item in symptom_list: item['daytype'] = 'hisday' item['crossid'] = crossid item['day'] = date item['tp'] = tp_dict[item['tp_desc']] item['duration'] = 0 s = SymptomRecord.make_from_dict(item) s_list.append(s) target_plan, diff_list = OptimizePlanTool.do_plan_optimize(cross_form, init_plan, delay_matrix, s_list) if not target_plan: return json.dumps(make_common_res(10, '未能生成方案')) now_timestamp = datetime.now().timestamp() init_plan.timestamp = now_timestamp target_plan.timestamp = now_timestamp optimize_taskid = gen_optimize_taskid(nodeid, crossid, planid, tp_start, tp_end) sum_success_ret = 0 ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 'init_plan', init_plan_str, nodeid, 0, init_plan_id, init_plan_name) if ret: sum_success_ret += 1 ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 'delay_matrix', delay_matrix_str, nodeid, 0, init_plan_id, init_plan_name) if ret: sum_success_ret += 1 ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 'target_plan', json.dumps(target_plan.to_dict(), ensure_ascii=False), nodeid, 0, init_plan_id, init_plan_name) if ret: sum_success_ret += 1 if sum_success_ret == 3: res = make_common_res(0, 'ok') res['data'] = { 'nodeid': nodeid, 'crossid': crossid, 'planid': planid, 'tp_start': tp_start, 'tp_end': tp_end, 'crossName': crossName, 'optimize_taskid': optimize_taskid, 'init_plan': init_plan.to_dict(), 'target_plan': target_plan.to_dict(), 'his_plans': [], 'edit_plan': None, 'delay_matrix': delay_matrix.to_dict(), 'delay_matrix_now': None } return json.dumps(res, ensure_ascii=False) else: return json.dumps(make_common_res(11, '保存方案优化信息失败')) def gen_optimize_taskid(nodeid, crossid, planid, tp_start, tp_end): current_timestamp = time.time() optimize_taskid = md5_hash(f"""{nodeid}-{crossid}-{planid}-{tp_start}-{tp_end}-{current_timestamp}""") return optimize_taskid def save_edit_plan(params): nodeid = check_nodeid(params) if not nodeid: return json.dumps(make_common_res(1, 'nodeid异常')) optimize_taskid = check_param(params, 'taskid') if not optimize_taskid: return json.dumps(make_common_res(2, '缺少优化任务id')) crossid = check_param(params, 'crossid') if not crossid: return json.dumps(make_common_res(3, '缺少路口id信息')) planid = check_param(params, 'planid') if not planid: return json.dumps(make_common_res(4, '缺少方案id信息')) tp_start = check_param(params, 'tp_start') if not tp_start: return json.dumps(make_common_res(5, '缺少时段开始时刻')) tp_end = check_param(params, 'tp_end') if not tp_end: return json.dumps(make_common_res(6, '缺少时段结束时刻')) edit_plan = check_param(params, 'edit_plan') if not edit_plan: return json.dumps(make_common_res(7, '缺少编辑方案信息')) init_plan_id = check_param(params, 'init_planid') if not init_plan_id: return json.dumps(make_common_res(8, '缺少初始方案id')) init_plan_name = check_param(params, 'init_planname') if not init_plan_name: return json.dumps(make_common_res(9, '缺少初始方案名称')) new_planid = Plan.parse_from_json(edit_plan).planid # 表示当前的操作是更新操作 modify_params = gen_modify_params(planid, crossid, nodeid, edit_plan) if new_planid == planid: modify_res = json.loads(modify_plan_stage(modify_params)) else: # 当前的方案号不存在 当前的操作是创建新的方案 modify_res = json.loads(add_plan_stage(modify_params)) flag, status = situation_db_pool.check_exists_edit_plan(optimize_taskid) tmp_plan = Plan.parse_from_json(edit_plan) tmp_plan.timestamp = datetime.now().timestamp() edit_plan = tmp_plan.to_dict() if flag: res = situation_db_pool.update_optimize_edit_plan(optimize_taskid, edit_plan, new_planid) else: res = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 'edit_plan', json.dumps(edit_plan, ensure_ascii=False), nodeid, status, init_plan_id, init_plan_name) if res and modify_res['status'] == 0: return json.dumps(make_common_res(0, '操作成功')) else: return json.dumps(modify_res) def issued_edit_plan(params): nodeid = check_nodeid(params) if not nodeid: return json.dumps(make_common_res(1, 'nodeid异常')) optimize_taskid = check_param(params, 'taskid') if not optimize_taskid: return json.dumps(make_common_res(2, '缺少优化任务id')) issued_plan = check_param(params, 'issued_plan') if not issued_plan: return json.dumps(make_common_res(3, '缺少下发方案信息')) crossid = check_param(params, 'crossid') if not crossid: return json.dumps(make_common_res(4, '缺少路口id信息')) planid = check_param(params, 'planid') if not planid: return json.dumps(make_common_res(5, '缺少方案id信息')) tp_start = check_param(params, 'tp_start') if not tp_start: return json.dumps(make_common_res(6, '缺少时段开始时刻')) tp_end = check_param(params, 'tp_end') if not tp_end: return json.dumps(make_common_res(7, '缺少时段结束时刻')) init_plan_id = check_param(params, 'init_planid') if not init_plan_id: return json.dumps(make_common_res(8, '缺少初始方案id')) init_plan_name = check_param(params, 'init_planname') if not init_plan_name: return json.dumps(make_common_res(9, '缺少初始方案名称')) # 判定当前id是否存在his_plan,如果存在则取出tag字段 截取出当前最大的iter_plan_x中x值,然后+1,否则tag='iter_plan_1' plan_seq = 0 his_plan_list = situation_db_pool.query_optimize_his_plans(optimize_taskid) new_planid = Plan.parse_from_json(issued_plan).planid modify_params = gen_modify_params(planid, crossid, nodeid, issued_plan) if planid == new_planid: modify_res = json.loads(modify_plan_stage(modify_params)) else: # 当前的方案号不存在 当前的操作是创建新的方案 modify_res = json.loads(add_plan_stage(modify_params)) for his_plan in his_plan_list: item_seq = int(str(his_plan['tag']).split('iter_plan_')[1]) if item_seq > plan_seq: plan_seq = item_seq tmp_plan = Plan.parse_from_json(issued_plan) tmp_plan.timestamp = datetime.now().timestamp() issued_plan = tmp_plan.to_dict() # 当his_plan不存在时说明是第一次下发,还需要将该taskid的状态字段status更新为迭代中 也就是 1 if plan_seq == 0: ret = situation_db_pool.update_optimize_records_status(optimize_taskid, 1) if not ret: return json.dumps(make_common_res(10, '更新状态失败')) flag, status = situation_db_pool.check_exists_edit_plan(optimize_taskid) if flag: ret = situation_db_pool.update_optimize_edit_plan(optimize_taskid, issued_plan, new_planid) else: ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, new_planid, tp_start, tp_end, 'edit_plan', json.dumps(issued_plan, ensure_ascii=False), nodeid, status, init_plan_id, init_plan_name) if not ret: return json.dumps(make_common_res(11, '保存方案失败')) ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, new_planid, tp_start, tp_end, f'iter_plan_{plan_seq + 1}', json.dumps(issued_plan, ensure_ascii=False), nodeid, status, init_plan_id, init_plan_name) if not ret: return json.dumps(make_common_res(12, '保存方案失败')) else: # 说明方案已经下发过 当前任务状态为1 :迭代中,需要将当前方案保存为his_plan中的一个 且将当前编辑的方案保存为编辑方案 ret1 = situation_db_pool.update_optimize_edit_plan(optimize_taskid, issued_plan, new_planid) ret2 = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, new_planid, tp_start, tp_end, f'iter_plan_{plan_seq + 1}', json.dumps(issued_plan, ensure_ascii=False), nodeid, 1, init_plan_id, init_plan_name) if not ret1 or not ret2: return json.dumps(make_common_res(13, '保存方案失败')) row_list = situation_db_pool.query_optimize_issued_plan(optimize_taskid) edit_plan, his_plans = None, [] for row in row_list: tag = row['tag'] if tag == 'edit_plan': edit_plan = Plan.parse_from_json(row['content']).to_dict() else: his_plans.append(Plan.parse_from_json(row['content']).to_dict()) if modify_res['status'] == 0: res = make_common_res(0, 'ok') res['data'] = { 'edit_plan': edit_plan, 'his_plans': his_plans } return json.dumps(res, ensure_ascii=False) else: return json.dumps(modify_res) def update_optimize_plan_status(params): nodeid = check_nodeid(params) if not nodeid: return json.dumps(make_common_res(1, 'nodeid异常')) optimize_taskid = check_param(params, 'optimize_taskid') if not optimize_taskid: return json.dumps(make_common_res(2, '缺少优化任务id')) status = check_param(params, 'status') if not status or status not in ('finish', 'abort'): return json.dumps(make_common_res(3, '缺少状态信息')) ret = situation_db_pool.update_optimize_plan_status_byid(nodeid, optimize_taskid, status) if ret: return json.dumps(make_common_res(0, 'ok')) else: return json.dumps(make_common_res(4, '更新失败')) def gen_modify_params(planid, crossid, nodeid, edit_plan): plan_dict = Plan.parse_from_json(edit_plan).to_dict() new_plan_stages = [] plan_stages = plan_dict['stages'] plan_phases = plan_dict['phases'] plan_phase_dict = {} for phase in plan_phases: plan_phase_dict[phase['phaseid']] = phase for stage in plan_stages: phaseids = stage['phaseids'].split(',') phaseid_names = plan_phase_dict[int(phaseids[0])]['name'] max_green, min_green = plan_phase_dict[int(phaseids[0])]['max_green'], plan_phase_dict[int(phaseids[0])][ 'min_green'] if len(phaseids) > 1: for item in phaseids[1:]: phaseid_names += f",{plan_phase_dict[int(item)]['name']}" new_plan_stages.append({ 'stageid': stage['stageid'], 'stage_name': stage['name'], 'stage_duration': stage['duration'], 'green': stage['green'], 'redyellow': stage['redyellow'], 'yellow': stage['yellow'], 'allred': stage['allred'], 'phaseids': stage['phaseids'], 'phaseid_names': phaseid_names, 'max_green': max_green, 'min_green': min_green, 'phases': phaseid_names }) modify_params = { 'planid': planid, 'plan_name': plan_dict['name'], 'offset': plan_dict['offset'], 'cycle': plan_dict['cycle'], 'crossid': crossid, 'stages': new_plan_stages, 'nodeid': nodeid } return modify_params def calc_dm_delay_change_detail(dm, dm_now): i_list = [] if dm is not None and dm_now is not None: # 同时遍历两个三维数组 for i in range(len(dm)): j_list = [] for j in range(len(dm[i])): flag_list = [] for k in range(len(dm[i][j])): # 比较相同位置的元素 value1 = dm[i][j][k] if dm[i][j][k] != '-' else 0 value2 = dm_now[i][j][k] if dm_now[i][j][k] != '-' else 0 if float(value2) > float(value1): flag = 1 elif float(value2) < float(value1): flag = -1 else: flag = 0 flag_list.append(flag) j_list.append(flag_list) i_list.append(j_list) return i_list def download_optimize_plan(params): nodeid = check_nodeid(params) if not nodeid: return json.dumps(make_common_res(1, 'nodeid异常')) crossid = check_param(params, 'crossid') if not crossid: return json.dumps(make_common_res(2, '缺少路口id')) cross_info = g_info_dict.get(g.nodeid)['g_roadnet'].query_cross(crossid) download_plan = check_param(params, 'download_plan') if not download_plan: return json.dumps(make_common_res(3, '缺少下载方案')) download_plan_obj = Plan.parse_from_json(download_plan) download_plan_dict = download_plan_obj.to_dict() wb = Workbook() sheet1 = wb.active sheet1.title = "配时方案表" sheet1.append(t_sheet_cols['配时方案表']['head']) sheet2 = wb.create_sheet("阶段表") sheet2.append(t_sheet_cols['阶段表']['head']) stages = download_plan_dict['stages'] for index, stage in enumerate(stages): num = index + 1 sheet1.append([num, download_plan_dict['planid'], download_plan_dict['name'], download_plan_dict['cycle'], num, stage['stageid'], stage['duration']]) sheet2.append([num, stage['stageid'], stage['name'], stage['yellow'], stage['allred'], stage['phaseids']]) sheet3 = wb.create_sheet("相位表") sheet3.append(t_sheet_cols['相位表']['head']) phases = download_plan_dict['phases'] for index, phase in enumerate(phases): num = index + 1 sheet3.append([num, phase['phaseid'], phase['name'], phase['min_green'], phase['max_green']]) excel_data = BytesIO() wb.save(excel_data) excel_data.seek(0) return send_file( excel_data, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f"{cross_info.name}_{download_plan_obj.name}_配时方案.xlsx") def get_road_net_detail(params): nodeid = check_param(params, 'nodeid') if not nodeid: return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) area_id = check_param(params, 'area_id') if not area_id: return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) roadid = check_param(params, 'roadid') if not roadid: return json.dumps(make_common_res(3, '缺少roadid, 请刷新后重试')) userid = check_param(params, 'userid') if not userid: return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) area_list = db_user.query_areaid_list(userid) if not area_list or len(area_list) < 1: return json.dumps(make_common_res(5, '用户信息异常')) area_list = map(int, area_list) if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) road_info, error = get_road_info([roadid]) if error: return json.dumps(make_common_res(2, f"{error}")) return json.dumps(make_common_res(0, road_info[roadid]['src_direct']))