cross_doctor/app/workstation_worker.py

1089 lines
49 KiB
Python
Raw Normal View History

import json
import os
from datetime import timezone
from app.user_worker import do_get_user_info
from app.workstation_db_function import *
from app.work_station_common import *
from app.task_worker import *
from app.phasetable_worker import *
import proto.xlcomm_pb2 as pb
def favorite(params, token):
    """Add a cross or an artery to the current user's favorites.

    Args:
        params: Request parameters. Reads ``nodeid``, ``area_id``, ``userid``,
            ``favorite_type`` (must be 1 or 2), ``favorite_id`` and
            ``favorite_name``.
        token: Login token; the creating user's ``userno`` and ``user_name``
            are read from the ``do_get_user_info`` response.

    Returns:
        A JSON string built from ``make_common_res``: code 0 on success,
        a non-zero code with a message otherwise.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))

    # The requested area must be one of the areas assigned to the user.
    area_list = db_user.query_areaid_list(userid)
    if not area_list:
        return json.dumps(make_common_res(5, '用户信息异常'))
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in map(int, area_list):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    if not token:
        return json.dumps(make_common_res(2, '用户信息异常,请刷新一下后重试'))
    token_res = json.loads(do_get_user_info({'token': token}, token))
    create_user_id = token_res['token']['userno']
    create_user_name = token_res['token']['user_name']

    # Convert defensively: a missing or non-numeric favorite_type must yield
    # the validation error below instead of an uncaught exception.
    try:
        favorite_type = int(check_param(params, 'favorite_type'))
    except (TypeError, ValueError):
        favorite_type = None
    if favorite_type not in (1, 2):
        return json.dumps(make_common_res(3, '缺少收藏类型'))
    favorite_id = check_param(params, 'favorite_id')
    if not favorite_id:
        return json.dumps(make_common_res(4, '缺少收藏路口或干线id'))
    favorite_name = check_param(params, 'favorite_name')
    if not favorite_name:
        return json.dumps(make_common_res(5, '缺少收藏路口或干线名称'))

    row_list = db_workstation.check_favorite_info_exists(
        nodeid, create_user_id, create_user_name, favorite_id, favorite_name, int(area_id))
    if len(row_list) > 0:
        return json.dumps(make_common_res(6, '当前收藏内容已存在,请勿重复收藏'))

    ret = db_workstation.create_favorite_cross_artery(
        nodeid, create_user_id, create_user_name, favorite_type, favorite_id, favorite_name,
        int(area_id))
    if ret is not None and int(ret) == 1:
        return json.dumps(make_common_res(0, '收藏成功'))
    # Always answer with a JSON body; previously a failed insert returned None.
    return json.dumps(make_common_res(7, '收藏失败'))
def get_favorite_list(params, token):
    """Return the ids of the crosses and arteries the user has favorited.

    Args:
        params: Request parameters; reads ``nodeid``, ``area_id`` and
            ``userid``.
        token: Login token used to resolve ``userno`` / ``user_name``.

    Returns:
        A JSON string. On success ``data`` holds ``cross_list`` (rows with
        ``favorite_type`` 1) and ``artery_list`` (rows with
        ``favorite_type`` 2); otherwise a non-zero code and message.
    """
    required = (
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    )
    values = {}
    for key, code, message in required:
        values[key] = check_param(params, key)
        if not values[key]:
            return json.dumps(make_common_res(code, message))
    nodeid, area_id = values['nodeid'], values['area_id']

    # The requested area must be one of the areas assigned to the user.
    permitted = db_user.query_areaid_list(values['userid'])
    if not permitted or len(permitted) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    numeric_area = str(area_id).lstrip('-').isdigit()
    if not numeric_area or int(area_id) not in map(int, permitted):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    if not token:
        return json.dumps(make_common_res(2, '用户信息异常,请刷新一下后重试'))
    user = json.loads(do_get_user_info({'token': token}, token))['token']
    user_id = user['userno']
    user_name = user['user_name']

    crosses = []
    arteries = []
    for record in db_workstation.query_favorite_info_list(nodeid, user_id, user_name, int(area_id)):
        kind = record['favorite_type']
        target_id = record['favorite_id']
        if kind == 1:
            crosses.append(target_id)
        elif kind == 2:
            arteries.append(target_id)

    res = make_common_res(0, 'ok')
    res['data'] = {'cross_list': crosses, 'artery_list': arteries}
    return json.dumps(res, ensure_ascii=False)
def get_favorite_data_list(params, token):
    """Return one page of the user's favorite crosses with their metrics.

    Only ``query_type == 'cross'`` produces data; for ``'artery'`` (or when
    the user has no favorite crosses) an empty page is returned.

    Args:
        params: Request parameters; reads ``nodeid``, ``area_id``, ``userid``,
            ``page`` (1-based), ``page_size`` and ``query_type``
            (``'cross'`` or ``'artery'``).
        token: Login token used to resolve ``userno`` / ``user_name``.

    Returns:
        A JSON string with ``data`` (list of cross records), ``total_pages``
        and ``total_num``; or a non-zero code with a message on failure.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))

    # The requested area must be one of the areas assigned to the user.
    area_list = db_user.query_areaid_list(userid)
    if not area_list:
        return json.dumps(make_common_res(5, '用户信息异常'))
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in map(int, area_list):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    # Missing or non-numeric paging values must produce the validation
    # error, not an uncaught exception from int().
    try:
        page = int(check_param(params, 'page'))
    except (TypeError, ValueError):
        page = 0
    if page < 1:
        return json.dumps(make_common_res(3, '缺少分页页码'))
    try:
        page_size = int(check_param(params, 'page_size'))
    except (TypeError, ValueError):
        page_size = 0
    if page_size < 1:
        return json.dumps(make_common_res(4, '缺少分页条数'))
    query_type = check_param(params, 'query_type')
    if not query_type or query_type not in ('cross', 'artery'):
        return json.dumps(make_common_res(5, '查询类型缺失或查询类型异常'))

    if not token:
        return json.dumps(make_common_res(2, '用户信息异常,请刷新一下后重试'))
    token_res = json.loads(do_get_user_info({'token': token}, token))
    user_id = token_res['token']['userno']
    user_name = token_res['token']['user_name']

    row_list = db_workstation.query_favorite_info_list(nodeid, user_id, user_name, int(area_id))
    cross_id_list = [row['favorite_id'] for row in row_list if row['favorite_type'] == 1]

    res = make_common_res(0, 'ok')
    res['data'] = []
    res['total_pages'] = 0
    res['total_num'] = 0
    if not cross_id_list or query_type != 'cross':
        return json.dumps(res, ensure_ascii=False)

    cross_info, error = get_cross_info(cross_id_list)
    if error:
        logging.error('查询路口信息失败: %s', error)
        return json.dumps(make_common_res(2, '查询路口信息失败'))

    # Every record starts with '-' placeholders so crosses without
    # real-time data still expose the full set of metric keys.
    metric_keys = ('jam_index', 'unbalance_index', 'flow', 'queue_len', 'stop_times', 'delay_time')
    for info in cross_info.values():
        for key in metric_keys:
            info[key] = '-'

    # Overlay the real-time data.
    cross_rt_data, error = read_cross_deday(nodeid, area_id, cross_id_list)
    if error:
        logging.error('查询路口实时数据失败: %s', error)
        return json.dumps(make_common_res(3, '查询路口实时数据失败'))
    for crossid, info in cross_info.items():
        rt_data = cross_rt_data.get(crossid)
        if not rt_data:
            continue
        for key in metric_keys:
            # Copy only metrics the real-time source provides; absent ones
            # (e.g. 'flow') keep their placeholder instead of raising.
            if key in rt_data:
                info[key] = rt_data[key]

    records = list(cross_info.values())
    start_index = (page - 1) * page_size
    res['total_pages'] = (len(records) + page_size - 1) // page_size
    res['total_num'] = len(records)
    res['data'] = records[start_index:start_index + page_size]
    return json.dumps(res, ensure_ascii=False)
def get_cross_info(cross_ids: []):
    """Look up basic information for the given crosses.

    Args:
        cross_ids: Cross ids passed to ``db_tmnet.query_cross_info_all``.

    Returns:
        ``(info_by_crossid, None)`` on success, where each value carries
        ``crossid``, ``name``, ``location``, ``nodeid`` and ``area_id``;
        ``(None, error)`` when the query reports an error.
    """
    rows, error = db_tmnet.query_cross_info_all(cross_ids)
    if error:
        return None, error
    fields = ('crossid', 'name', 'location', 'nodeid', 'area_id')
    info_by_crossid = {}
    for row in rows:
        info_by_crossid[row['crossid']] = {field: row[field] for field in fields}
    return info_by_crossid, None
def get_road_info(road_ids: []):
    """Look up basic information for the given roads.

    Args:
        road_ids: Road ids passed to ``db_tmnet.query_road_info_all``.

    Returns:
        ``(info_by_roadid, None)`` on success, where each value carries
        ``name``, ``from_crossid``, ``to_crossid``, ``src_direct`` and
        ``nodeid``; ``(None, error)`` when the query reports an error.
    """
    roads_info, error = db_tmnet.query_road_info_all(road_ids)
    if error:
        return None, error
    result = {}
    for item_roads_info in roads_info:
        # Key by the current row's id; the original indexed the whole result
        # list with 'roadid', which fails on the first row.
        result[item_roads_info['roadid']] = {
            'name': item_roads_info['name'],
            'from_crossid': item_roads_info['from_crossid'],
            'to_crossid': item_roads_info['to_crossid'],
            'src_direct': item_roads_info['src_direct'],
            'nodeid': item_roads_info['nodeid'],
        }
    return result, None
def read_cross_deday(nodeid, area_id, cross_id_list):
    """Read delay metrics for the given crosses for a time period.

    The time periods come from ``db_tmnet.query_city_tp_info`` (comma
    separated ``HH:MM-HH:MM`` ranges) or, when that returns nothing, from a
    built-in default. Once the period containing the current time is found,
    the period *before* it in the list is used (the first period wraps to
    the last one) — presumably because the current period is still
    incomplete; confirm. If no period contains the current time, the last
    period in the list is used.

    Args:
        nodeid: Node id forwarded to the database helpers.
        area_id: Area id used to look up the period definition.
        cross_id_list: Cross ids to query.

    Returns:
        ``(metrics_by_crossid, None)`` on success, each value holding
        ``crossid``, ``jam_index``, ``unbalance_index``, ``queue_len``,
        ``stop_times`` and ``delay_time``; ``({}, error)`` when the delay
        query reports an error.
    """
    periods = [
        "00:00-07:00",
        "07:00-09:00",
        "09:00-17:00",
        "17:00-19:00",
        "19:00-22:00",
        "22:00-23:59"
    ]
    configured = db_tmnet.query_city_tp_info(nodeid, area_id)
    if configured:
        periods = configured[0]['tp_desc'].split(',')

    current = datetime.now().strftime('%H:%M')
    start, end = '', ''
    for position, period in enumerate(periods):
        start, end = period.split('-')
        if start <= current <= end:
            start, end = periods[position - 1].split('-')
            break

    # The period start is passed as "t" + HHMM with leading zeros dropped.
    period_key = 't' + str(int(start.replace(':', '')))
    rows, error = db_workstation.query_cross_delay_info(nodeid, period_key, cross_id_list)
    metrics_by_crossid = {}
    if error:
        return metrics_by_crossid, error

    for row in rows:
        message = pb.xl_cross_delayinfo_t()
        message.ParseFromString(row['data'])
        delay = message.delay_info
        metrics_by_crossid[message.crossid] = {
            'crossid': message.crossid,
            'jam_index': delay.jam_index,
            'unbalance_index': delay.imbalance_index,
            'queue_len': delay.queue_len,
            'stop_times': delay.stop_times,
            'delay_time': delay.delay_time,
        }
    return metrics_by_crossid, None
def delete_favorite_list(params, token):
    """Delete the given favorites of the current user.

    Args:
        params: Request parameters; reads ``nodeid``, ``area_id``, ``userid``
            and ``ids`` (the favorites to delete).
        token: Login token used to resolve ``userno`` / ``user_name``.

    Returns:
        A JSON string: code 0 when the number of deleted rows equals the
        number of requested ids, a non-zero code with a message otherwise.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))

    # The requested area must be one of the areas assigned to the user.
    area_list = db_user.query_areaid_list(userid)
    if not area_list:
        return json.dumps(make_common_res(5, '用户信息异常'))
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in map(int, area_list):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    ids = check_param(params, 'ids')
    if not ids:
        return json.dumps(make_common_res(2, '缺少需要删除的收藏项'))
    if not token:
        return json.dumps(make_common_res(3, '用户信息异常,请刷新一下后重试'))

    # NOTE(review): the ids come from the request and are spliced into a
    # quoted list by plain string formatting. If delete_favorite_ids embeds
    # this text in SQL, it should be switched to bound parameters.
    ids_str = ", ".join([f"'{fid}'" for fid in ids])
    token_res = json.loads(do_get_user_info({'token': token}, token))
    user_id = token_res['token']['userno']
    user_name = token_res['token']['user_name']

    ret = db_workstation.delete_favorite_ids(nodeid, ids_str, user_id, user_name, area_id)
    if ret == len(ids):
        return json.dumps(make_common_res(0, '操作成功'))
    # Always answer with a JSON body; previously a partial or failed delete
    # returned None.
    return json.dumps(make_common_res(6, '操作失败'))
def get_workstation_task_list(params, token):
    """Summarise the current user's tasks for the workstation page.

    Tasks returned by ``db_workstation.query_task_list`` are grouped by state
    (4 = done; any state other than 0 and 4 = pending) and, for pending
    tasks, by deadline (planned end date before today = timed out, otherwise
    within three days = approaching overdue). Month/week counters are
    forwarded to ``gen_task_statistics_res``. All dates are evaluated in UTC.

    Args:
        params: Request parameters; reads ``nodeid`` and ``area_id``.
        token: Login token used to resolve ``userno`` / ``user_name``.

    Returns:
        A JSON string whose ``data`` holds ``join_task`` (per-group results of
        ``cal_task_type_num``) and ``task_statistics``; or a non-zero code
        with a message on failure.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    if not token:
        return json.dumps(make_common_res(2, '用户信息异常,请刷新重试'))
    token_res = json.loads(do_get_user_info({'token': token}, token))
    user_id = token_res['token']['userno']

    # The requested area must be one of the areas assigned to the user.
    area_list = db_user.query_areaid_list(user_id)
    if not area_list:
        return json.dumps(make_common_res(5, '用户信息异常'))
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in map(int, area_list):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    user_name = token_res['token']['user_name']

    pending_task, timeout_task, approaching_overdue_task, done_task = [], [], [], []
    last_month_should_done, last_month_done, last_month_should_done_but_not, need_todo = 0, [], 0, 0
    last_week_should_done, last_week_done, last_week_should_done_but_not, week_need_todo = 0, [], 0, 0
    row_list = db_workstation.query_task_list(nodeid, user_name, area_id)

    current_date = datetime.now(timezone.utc).date()
    yesterday_date = (current_date - timedelta(days=1)).strftime('%Y-%m-%d')
    first_day_of_this_month = current_date.replace(day=1)
    last_day_of_last_month_obj = first_day_of_this_month - timedelta(days=1)
    # Compare (year, month) numerically; comparing against a '%Y-%m' string
    # never matched for months 1-9 because of the zero padding.
    last_month_key = (last_day_of_last_month_obj.year, last_day_of_last_month_obj.month)
    first_day_of_this_week = current_date - timedelta(days=current_date.weekday())
    last_day_of_last_week_obj = first_day_of_this_week - timedelta(days=1)
    # Compare (ISO year, ISO week): mixing the calendar year with the ISO
    # week number mismatches in the weeks around New Year.
    last_week_key = tuple(last_day_of_last_week_obj.isocalendar())[:2]
    formatted_last_week = f'{last_day_of_last_week_obj.year}年第{last_day_of_last_week_obj.isocalendar()[1]}'

    for row in row_list:
        task_type = int(row['task_type'])
        task_state = int(row['task_state'])
        plan_end_time = int(row['plan_end_time'])
        task_end_time = int(row['task_end_time']) if row['task_end_time'] else None
        summary = {
            'task_name': row['task_name'],
            'task_type': task_type,
            'plan_begin_time': int(row['plan_begin_time']),
            'plan_end_time': plan_end_time,
        }
        unfinished = task_state not in (0, 4)

        if task_state == 4:
            done_task.append(dict(summary))
        elif unfinished:
            pending_task.append(dict(summary))

        # Timestamps are in milliseconds.
        plan_endtime_date = datetime.fromtimestamp(plan_end_time // 1000, tz=timezone.utc).date()
        if unfinished:
            if plan_endtime_date < current_date:
                timeout_task.append(dict(summary))
            elif plan_endtime_date - current_date <= timedelta(days=3):
                # Deadline is today or within the next three days. The
                # original subtracted in the other direction, which is
                # never positive here and so matched every future deadline.
                approaching_overdue_task.append(dict(summary))

        # Month and week statistics.
        if task_end_time:
            end_time = datetime.fromtimestamp(task_end_time // 1000, tz=timezone.utc).date()
            end_week_key = tuple(end_time.isocalendar())[:2]
            if task_state == 4 and (end_time.year, end_time.month) == last_month_key:
                last_month_done.append(task_type)
                if plan_endtime_date < first_day_of_this_month:
                    last_month_should_done += 1
            if task_state == 4 and end_week_key == last_week_key:
                last_week_done.append(task_type)
                if tuple(plan_endtime_date.isocalendar())[:2] == end_week_key:
                    last_week_should_done += 1
        else:
            # NOTE(review): the "not finished" counters are only updated for
            # tasks without task_end_time — confirm this grouping.
            in_progress = task_state in (1, 2, 3)
            if in_progress and plan_endtime_date < first_day_of_this_month:
                last_month_should_done += 1
                last_month_should_done_but_not += 1
            if in_progress and plan_endtime_date < first_day_of_this_week:
                last_week_should_done += 1
                last_week_should_done_but_not += 1
            if in_progress and plan_endtime_date > last_day_of_last_month_obj:
                need_todo += 1
            if in_progress and plan_endtime_date >= first_day_of_this_week:
                week_need_todo += 1

    yesterday_res = db_workstation.query_yesterday_task_data(yesterday_date, nodeid, user_name, area_id)
    task_statistics = gen_task_statistics_res(
        last_month_should_done, last_month_done, last_month_should_done_but_not, need_todo,
        last_week_should_done, last_week_done, last_week_should_done_but_not, week_need_todo,
        yesterday_res, last_day_of_last_month_obj, formatted_last_week, yesterday_date)

    res = make_common_res(0, 'ok')
    res['data'] = {
        'join_task': {
            'pending_res': cal_task_type_num(pending_task),
            'timeout_res': cal_task_type_num(timeout_task),
            'approaching_overdue_res': cal_task_type_num(approaching_overdue_task),
            'done_res': cal_task_type_num(done_task),
        },
        'task_statistics': task_statistics,
    }
    return json.dumps(res, ensure_ascii=False)
# Cross optimization page: get the cross list (API 1)
def get_crosses_list(params):
    """Return all crosses of the node plus the dates that have usable data.

    Args:
        params: Request parameters; validated with ``check_nodeid``.

    Returns:
        A JSON string whose ``data`` holds ``cross_list`` (``crossid`` /
        ``crossName`` pairs) and ``useable_date``; code 1 when the node id
        is invalid.
    """
    if not check_nodeid(params):
        return json.dumps(make_common_res(1, 'nodeid异常'))

    crosses = [
        {
            'crossid': cross_id,
            'crossName': g_info_dict.get(g.nodeid)['g_roadnet'].query_cross(cross_id).name,
        }
        for cross_id in g_info_dict.get(g.nodeid)['g_crossroad_idset']
    ]
    usable_dates = situation_db_pool.query_situation_usable_date()['cross']

    res = make_common_res(0, 'ok')
    res['data'] = {'cross_list': crosses, 'useable_date': usable_dates}
    return json.dumps(res, ensure_ascii=False)
# Cross optimization page: get the timing plans and the symptom list of a
# cross for a given date (API 2)
def get_cross_phase_symptom_info(params):
    """Attach the symptoms of a day to the timing-plan periods they overlap.

    Each plan period returned by ``situation_db_pool.query_phase_data`` gets
    five parallel lists (``symptom_type_list``, ``symptom_detail_list``,
    ``todo_detail_list``, ``tp_desc_list``, ``desc_key_list``); one entry is
    appended for every symptom time range that overlaps the plan period.

    Args:
        params: Request parameters; reads ``crossid``, ``date``
            (``YYYYMMDD``) and ``date_type`` in addition to the node id.

    Returns:
        A JSON string whose ``data`` holds ``nodeid``, ``date``, ``crossid``
        and ``res_list``; or a non-zero code with a message on failure.
    """
    # Clock ranges covered by each named time period; the off-peak entry
    # consists of several comma-separated ranges.
    tp_time_range_dict = {'早高峰': '07:00-09:00', '晚高峰': '17:00-19:00',
                          '平峰': '06:00-07:00,09:00-17:00,19:00-22:00'}
    tp_dict = {'早高峰': 100, '晚高峰': 200, '平峰': 400}
    symptom_dict = {
        '对向失衡': 'opposite_tide',
        '路口溢流': 'inroad_overflow',
        '路口失衡': 'intersection_imbalance',
        '转向失衡': 'straight_left_imbalance',
        '左转低效': 'inefficient_left',
        '路口死锁': 'cross_deadlock'
    }

    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(2, '缺少路口信息,请选择路口'))
    date = check_param(params, 'date')
    if not date:
        return json.dumps(make_common_res(3, '缺少查询日期'))
    date_type = check_param(params, 'date_type')
    weekday = datetime.strptime(date, '%Y%m%d').weekday()
    jj_crossid = db_mapping.query_jjcrossid_by_xlcrossid(nodeid, crossid)
    if not jj_crossid:
        return json.dumps(make_common_res(4, '路口不存在'))

    plan_rows = situation_db_pool.query_phase_data(jj_crossid, weekday + 1, crossid)
    symptoms = situation_db_pool.query_symptom_info_by_crossid(nodeid, crossid, date, date_type=date_type)

    res_list = []
    for plan_row in plan_rows.values():
        plan_time_range = plan_row['tp_start'] + '-' + plan_row['tp_end']
        for list_key in ('symptom_type_list', 'symptom_detail_list', 'todo_detail_list',
                         'tp_desc_list', 'desc_key_list'):
            plan_row[list_key] = []

        for symptom in symptoms:
            # The stored details use single quotes; swap them so the text
            # parses as JSON.
            symptom_detail = json.loads(str(symptom['symptom_detail']).replace("'", "\""))
            todo_detail = json.loads(str(symptom['todo_detail']).replace("'", "\""))
            period_name = symptom['tp']
            # A single-range period yields a one-element list, so peak and
            # off-peak symptoms share the same loop.
            for symptom_time_range in tp_time_range_dict.get(period_name).split(','):
                if not has_overlap(plan_time_range, symptom_time_range):
                    continue
                symptom_desc = symptom_dict.get(symptom['symptom_type'])
                desc_key = situation_db_pool.query_symptom_desckey(
                    crossid, tp_dict.get(period_name), symptom_desc, date_type=date_type)
                plan_row['symptom_type_list'].append({
                    'symptom_type': symptom['symptom_type'],
                    'symptom_type_eng': symptom_desc
                })
                plan_row['symptom_detail_list'].append(symptom_detail)
                plan_row['tp_desc_list'].append(period_name)
                plan_row['desc_key_list'].append(desc_key)
                plan_row['todo_detail_list'].append(todo_detail)
        res_list.append(plan_row)

    res = make_common_res(0, 'ok')
    res['data'] = {
        'nodeid': nodeid,
        'date': date,
        'crossid': crossid,
        'res_list': res_list
    }
    return json.dumps(res, ensure_ascii=False)
def parse_time(time_str):
    """Convert an ``HH:MM`` string into a ``datetime.time`` object."""
    parsed = datetime.strptime(time_str, '%H:%M')
    return parsed.time()
def has_overlap(time_range1, time_range2):
    """Tell whether two ``HH:MM-HH:MM`` time ranges intersect.

    Ranges that merely touch (one ends exactly when the other starts) do
    not count as overlapping.
    """
    def bounds(time_range):
        # Split "start-end" and parse both ends as times of day.
        first, last = time_range.split('-')
        return (datetime.strptime(first, '%H:%M').time(),
                datetime.strptime(last, '%H:%M').time())

    start1, end1 = bounds(time_range1)
    start2, end2 = bounds(time_range2)
    if start1 >= end2:
        return False
    return start2 < end1
# Cross optimization page: get the details of an existing optimization task
# of a cross (API 5)
def get_cross_optimize_plan_detail(params):
    """Return the stored plans and delay matrices of an optimization task.

    The initial plan and initial delay matrix are always loaded. When the
    ``type`` parameter is absent or ``'all'``, the recommended, edited and
    historical plans are loaded too, and a current delay matrix is computed
    for the latest usable date that falls on the same weekday as the
    initial matrix.

    Args:
        params: Request parameters; reads ``crossid``, ``planid``,
            ``tp_start``, ``tp_end``, ``taskid`` and optional ``type``.

    Returns:
        A JSON string whose ``data`` holds the task description, the plans
        and both delay matrices (``None`` / empty where not available); or a
        non-zero code with a message on failure.
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(2, '缺少路口id信息'))
    planid = check_param(params, 'planid')
    if not planid:
        return json.dumps(make_common_res(3, '缺少方案id信息'))
    tp_start = check_param(params, 'tp_start')
    if not tp_start:
        return json.dumps(make_common_res(4, '缺少时段开始时刻'))
    tp_end = check_param(params, 'tp_end')
    if not tp_end:
        return json.dumps(make_common_res(5, '缺少时段结束时刻'))
    taskId = check_param(params, 'taskid')
    if not taskId:
        return json.dumps(make_common_res(6, '缺少任务id信息'))

    crossName = g_info_dict.get(g.nodeid)['g_roadnet'].query_cross(crossid).name
    init_plan_res = target_plan_res = edit_plan_res = None
    delay_matrix_res = delay_matrix_now_res = None
    his_plan_list = {}
    # None means "unknown". weekday() is 0 for Monday, so a truthiness test
    # would wrongly treat Monday as missing.
    init_weekday = None

    init_plan_str, delay_input_str = situation_db_pool.query_init_info(nodeid, taskId)
    if init_plan_str:
        init_plan_res = Plan.parse_from_json(init_plan_str).to_dict()
    if delay_input_str:
        delay_matrix_res = DelayMatrix.parse_from_json(delay_input_str).to_dict()
        init_weekday = datetime.strptime(delay_matrix_res['date'], '%Y%m%d').weekday()

    query_type = check_param(params, 'type')
    if not query_type or query_type == 'all':
        target_plan_str, edit_plan_str, his_plan_dict = situation_db_pool.query_optimize_records(nodeid, taskId)
        if target_plan_str:
            target_plan_res = Plan.parse_from_json(target_plan_str).to_dict()
        if edit_plan_str:
            edit_plan_res = Plan.parse_from_json(edit_plan_str).to_dict()
        for plan_json in his_plan_dict.values():
            item = Plan.parse_from_json(plan_json).to_dict()
            # Historical plans are keyed by their timestamp at minute
            # resolution.
            timestamp = datetime.fromtimestamp(item['timestamp']).strftime('%Y%m%d %H:%M')
            his_plan_list[timestamp] = item
        # NOTE(review): the current delay matrix is computed only for the
        # full ('all') query — confirm.
        if init_weekday is not None:
            max_date = get_max_usable_date(init_weekday)
            # get_max_usable_date returns 0 when no usable date matches;
            # there is nothing to compute in that case.
            if max_date:
                dm_now = get_delay_matrix(str(max_date), tp_start, tp_end, crossid)
                delay_matrix_now_res = dm_now.to_dict()

    res = make_common_res(0, 'ok')
    res['data'] = {
        'nodeid': nodeid,
        'crossid': crossid,
        'crossName': crossName,
        'planid': planid,
        'tp_start': tp_start,
        'tp_end': tp_end,
        'optimize_taskid': taskId,
        'init_plan': init_plan_res,  # initial plan
        'target_plan': target_plan_res,  # recommended plan
        'his_plan': his_plan_list,  # historical plans
        'edit_plan': edit_plan_res,  # edited plan
        'delay_matrix': delay_matrix_res,  # initial delay matrix
        'delay_matrix_now': delay_matrix_now_res,  # latest delay matrix
    }
    return json.dumps(res, ensure_ascii=False)
def get_max_usable_date(init_weekday):
    """Return the latest usable history date that falls on ``init_weekday``.

    Args:
        init_weekday: Weekday number as produced by ``date.weekday()``
            (Monday is 0).

    Returns:
        The matching entry of the ``hisday`` list with the largest numeric
        value, or ``0`` when no entry matches.
    """
    usable_dates = situation_db_pool.query_situation_usable_date()['cross']
    same_weekday = (
        day for day in usable_dates['hisday']
        if datetime.strptime(str(day), '%Y%m%d').weekday() == init_weekday
    )
    return max(same_weekday, key=int, default=0)
def get_delay_matrix(date_str, tp_start, tp_end, crossid):
    """Build the delay matrix of a cross for one time period of one day.

    Args:
        date_str: Day in ``YYYYMMDD`` form.
        tp_start: Period start in ``HH:MM`` form.
        tp_end: Period end in ``HH:MM`` form.
        crossid: Cross to build the matrix for.

    Returns:
        A ``DelayMatrix`` filled with the queried flow/delay rows and tagged
        with ``date``, ``tp_start`` and ``tp_end``.
    """
    roadnet = g_info_dict.get(g.nodeid)['g_roadnet']
    crossform = CrossForm.extract_cross_form(roadnet, crossid)

    stamp_format = '%Y%m%d %H:%M'
    period_begin = datetime.strptime(f'{date_str} {tp_start}', stamp_format).timestamp()
    period_end = datetime.strptime(f'{date_str} {tp_end}', stamp_format).timestamp()

    flow_delay_rows = db.query_delay_matrix(period_begin, period_end, crossid)
    for flow_row in flow_delay_rows:
        # Label every row with the direction name of its inbound road.
        flow_row['dirname'] = crossform.inroad_dirname_map[flow_row['inroadid']]

    matrix = DelayMatrix()
    matrix.set_inroad_names(list(crossform.dirname_turns_map.keys()))
    matrix.add_flow_delay_data(flow_delay_rows)
    matrix.disable_all_badturns(crossform.dirname_turns_map)
    matrix.date = date_str
    matrix.tp_start = tp_start
    matrix.tp_end = tp_end
    return matrix
# Cross optimization page: get the plan and the delay matrix of a cross
# (API 3)
def get_cross_init_info(params):
    """Return the initial timing plan and delay matrix for a plan period.

    When ``symptom_list`` is supplied, each entry is turned into a
    ``SymptomRecord`` and ``OptimizePlanTool.judge_plan_adjust_type`` is
    applied to the plan before it is returned.

    Args:
        params: Request parameters; reads ``crossid``, ``date``, ``planid``,
            ``tp_start``, ``tp_end`` and optional ``symptom_list`` (entries
            with ``desc_key`` and ``symptom_type_eng``).

    Returns:
        A JSON string whose ``data`` holds the request keys plus
        ``init_plan`` and ``delay_matrix``; or a non-zero code with a
        message on failure.
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))

    required = (
        ('crossid', 2, '缺少路口id信息'),
        ('date', 3, '缺少日期信息'),
        ('planid', 4, '缺少方案id信息'),
        ('tp_start', 5, '缺少时段开始时刻'),
        ('tp_end', 6, '缺少时段结束时刻'),
    )
    given = {}
    for key, code, message in required:
        given[key] = check_param(params, key)
        if not given[key]:
            return json.dumps(make_common_res(code, message))
    crossid, date, planid = given['crossid'], given['date'], given['planid']
    tp_start, tp_end = given['tp_start'], given['tp_end']

    delay_matrix = get_delay_matrix(str(date), tp_start, tp_end, crossid)
    plan: Plan = get_cross_init_plan(nodeid, crossid, planid)

    symptom_list = check_param(params, 'symptom_list')
    if not symptom_list or len(symptom_list) == 0:
        logging.info('异常类型为空')
    else:
        records = [
            SymptomRecord.make_from_dict({
                'daytype': 'hisday',
                'crossid': crossid,
                'day': date,
                'tp': 0,
                'duration': 0,
                'desc_key': entry['desc_key'],
                'type': entry['symptom_type_eng'],
            })
            for entry in symptom_list
        ]
        OptimizePlanTool.judge_plan_adjust_type(plan, records)

    res = make_common_res(0, 'ok')
    res['data'] = {
        'nodeid': nodeid,
        'crossid': crossid,
        'planid': planid,
        'tp_start': tp_start,
        'tp_end': tp_end,
        'date': date,
        'init_plan': plan.to_dict(),
        'delay_matrix': delay_matrix.to_dict()
    }
    return json.dumps(res, ensure_ascii=False)
def get_cross_init_plan(nodeid, crossid, planid):
    """Build a ``Plan`` object from the stored phase table of a cross.

    Args:
        nodeid: Node whose cross mapping and phase table are used.
        crossid: Cross id; mapped through the node's ``g_jj_cross`` table
            before the phase table is queried.
        planid: Plan number; converted with ``int`` to index ``plans``.

    Returns:
        The ``Plan`` with its stages and all phases of the phase table
        added, after ``update_stages_max_min_green`` has been applied.
    """
    jj_crossid = g_info_dict.get(nodeid)['g_jj_cross'][crossid]
    phasetable = db_phasetable.query_phasetable(nodeid, jj_crossid)
    plan_record = phasetable['plans'][int(planid)]
    phase_records: dict = phasetable['phases']

    plan = Plan()
    plan.init_from_dict(plan_record)
    for stage_record in plan_record['stages']:
        stage = PhaseStage()
        stage.init_from_dict(stage_record)
        plan.add_stage(stage)
    for phase_record in phase_records.values():
        phase = PhaseInfo()
        phase.init_from_dict(phase_record)
        plan.add_phase(phase)
    plan.update_stages_max_min_green()
    return plan
# Cross optimization page: generate a plan, i.e. create the optimization
# task for a cross timing plan (API 4)
def gen_cross_optimize(params):
    """Run the plan optimizer and persist the resulting optimization task.

    The initial plan, the initial delay matrix and the generated target
    plan are saved as three records under a fresh task id; the request only
    succeeds when all three saves succeed.

    Args:
        params: Request parameters; reads ``crossid``, ``planid``,
            ``tp_start``, ``tp_end``, ``init_plan`` (JSON), ``delay_matrix``
            (JSON), ``date`` and optional ``symptom_list`` (JSON list whose
            entries carry ``tp_desc``).

    Returns:
        A JSON string whose ``data`` describes the new task; code 10 when no
        plan could be generated, code 11 when saving failed, other non-zero
        codes for missing parameters.
    """
    tp_dict = {'早高峰': 100, '晚高峰': 200, '平峰': 400}
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(2, '缺少路口id信息'))
    planid = check_param(params, 'planid')
    if not planid:
        return json.dumps(make_common_res(3, '缺少方案id信息'))
    tp_start = check_param(params, 'tp_start')
    if not tp_start:
        return json.dumps(make_common_res(4, '缺少时段开始时刻'))
    tp_end = check_param(params, 'tp_end')
    if not tp_end:
        return json.dumps(make_common_res(5, '缺少时段结束时刻'))
    init_plan_str = check_param(params, 'init_plan')
    if not init_plan_str:
        return json.dumps(make_common_res(6, '缺少初始方案信息'))
    init_plan = Plan.parse_from_json(init_plan_str)
    delay_matrix_str = check_param(params, 'delay_matrix')
    if not delay_matrix_str:
        return json.dumps(make_common_res(7, '缺少初始延误数据矩阵信息'))
    date = check_param(params, 'date')
    if not date:
        return json.dumps(make_common_res(8, '缺少日期信息'))
    delay_matrix = DelayMatrix.parse_from_json(delay_matrix_str)

    roadnet = g_info_dict.get(g.nodeid)['g_roadnet']
    crossName = roadnet.query_cross(crossid).name
    cross_form = CrossForm.extract_cross_form(roadnet, crossid)
    symptom_list = check_param(params, 'symptom_list')
    init_plan_id = init_plan.planid
    init_plan_name = init_plan.name

    if not symptom_list or len(symptom_list) == 0:
        logging.info('异常类型为空')
        target_plan, diff_list = OptimizePlanTool.do_plan_optimize(cross_form, init_plan, delay_matrix)
    else:
        records = []
        for entry in json.loads(symptom_list):
            # Complete each entry with the fields SymptomRecord expects.
            entry.update({
                'daytype': 'hisday',
                'crossid': crossid,
                'day': date,
                'tp': tp_dict[entry['tp_desc']],
                'duration': 0,
            })
            records.append(SymptomRecord.make_from_dict(entry))
        target_plan, diff_list = OptimizePlanTool.do_plan_optimize(cross_form, init_plan, delay_matrix, records)
    if not target_plan:
        return json.dumps(make_common_res(10, '未能生成方案'))

    created_at = datetime.now().timestamp()
    init_plan.timestamp = created_at
    target_plan.timestamp = created_at
    optimize_taskid = gen_optimize_taskid(nodeid, crossid, planid, tp_start, tp_end)

    def save_record(tag, payload):
        # Store one record of the task under the given tag with status 0.
        return situation_db_pool.save_optimize_task_info(
            optimize_taskid, crossid, planid, tp_start, tp_end, tag, payload, nodeid, 0,
            init_plan_id, init_plan_name)

    # All three saves are attempted, in this order, even if one fails.
    outcomes = [
        save_record('init_plan', init_plan_str),
        save_record('delay_matrix', delay_matrix_str),
        save_record('target_plan', json.dumps(target_plan.to_dict(), ensure_ascii=False)),
    ]
    if sum(1 for outcome in outcomes if outcome) != 3:
        return json.dumps(make_common_res(11, '保存方案优化信息失败'))

    res = make_common_res(0, 'ok')
    res['data'] = {
        'nodeid': nodeid,
        'crossid': crossid,
        'planid': planid,
        'tp_start': tp_start,
        'tp_end': tp_end,
        'crossName': crossName,
        'optimize_taskid': optimize_taskid,
        'init_plan': init_plan.to_dict(),
        'target_plan': target_plan.to_dict(),
        'his_plans': [],
        'edit_plan': None,
        'delay_matrix': delay_matrix.to_dict(),
        'delay_matrix_now': None
    }
    return json.dumps(res, ensure_ascii=False)
def gen_optimize_taskid(nodeid, crossid, planid, tp_start, tp_end):
    """Derive an optimization task id from the task key and the current time.

    The id is ``md5_hash`` of the five arguments and ``time.time()`` joined
    with dashes, so repeated calls with the same arguments give new ids.
    """
    created_at = time.time()
    seed = f"{nodeid}-{crossid}-{planid}-{tp_start}-{tp_end}-{created_at}"
    return md5_hash(seed)
def save_edit_plan(params):
    """Apply an edited plan to the phase table and store it with the task.

    The edited plan is written through ``modify_plan_stage`` when its plan
    id equals ``planid`` and through ``add_plan_stage`` otherwise. It is then
    stored as the task's ``edit_plan`` record (updated if one already
    exists).

    Args:
        params: Request parameters; reads ``taskid``, ``crossid``,
            ``planid``, ``tp_start``, ``tp_end``, ``edit_plan`` (JSON),
            ``init_planid`` and ``init_planname``.

    Returns:
        A JSON string: code 0 when both the phase-table change and the save
        succeeded; the phase-table response when that step failed; code 10
        when only the save failed.
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    optimize_taskid = check_param(params, 'taskid')
    if not optimize_taskid:
        return json.dumps(make_common_res(2, '缺少优化任务id'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(3, '缺少路口id信息'))
    planid = check_param(params, 'planid')
    if not planid:
        return json.dumps(make_common_res(4, '缺少方案id信息'))
    tp_start = check_param(params, 'tp_start')
    if not tp_start:
        return json.dumps(make_common_res(5, '缺少时段开始时刻'))
    tp_end = check_param(params, 'tp_end')
    if not tp_end:
        return json.dumps(make_common_res(6, '缺少时段结束时刻'))
    edit_plan = check_param(params, 'edit_plan')
    if not edit_plan:
        return json.dumps(make_common_res(7, '缺少编辑方案信息'))
    init_plan_id = check_param(params, 'init_planid')
    if not init_plan_id:
        return json.dumps(make_common_res(8, '缺少初始方案id'))
    init_plan_name = check_param(params, 'init_planname')
    if not init_plan_name:
        return json.dumps(make_common_res(9, '缺少初始方案名称'))

    edited = Plan.parse_from_json(edit_plan)
    new_planid = edited.planid
    modify_params = gen_modify_params(planid, crossid, nodeid, edit_plan)
    if new_planid == planid:
        # Same plan number: update the existing plan.
        modify_res = json.loads(modify_plan_stage(modify_params))
    else:
        # The plan number does not exist yet: create a new plan.
        modify_res = json.loads(add_plan_stage(modify_params))

    flag, status = situation_db_pool.check_exists_edit_plan(optimize_taskid)
    edited.timestamp = datetime.now().timestamp()
    edit_plan = edited.to_dict()
    if flag:
        res = situation_db_pool.update_optimize_edit_plan(optimize_taskid, edit_plan, new_planid)
    else:
        res = situation_db_pool.save_optimize_task_info(
            optimize_taskid, crossid, planid, tp_start, tp_end, 'edit_plan',
            json.dumps(edit_plan, ensure_ascii=False), nodeid, status,
            init_plan_id, init_plan_name)

    if modify_res['status'] != 0:
        return json.dumps(modify_res)
    if not res:
        # Previously the status-0 phase-table response was returned here,
        # reporting success although the edit plan had not been saved.
        return json.dumps(make_common_res(10, '保存编辑方案失败'))
    return json.dumps(make_common_res(0, '操作成功'))
def issued_edit_plan(params):
    """Issue a plan for an optimize task and record it as a new iteration.

    After validating the request, the plan is applied through
    ``modify_plan_stage`` or ``add_plan_stage``, stored as the task's
    ``'edit_plan'`` record, and appended to the history as
    ``iter_plan_<n+1>`` where ``n`` is the largest sequence number among the
    existing history tags (0 when there is no history).

    Returns:
        A JSON string: an error for missing parameters (codes 1-9) or failed
        persistence (codes 10-13); the plan-update response when that step did
        not report status 0; otherwise status 0 with ``data`` holding the
        stored ``edit_plan`` and the list of ``his_plans``.
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))

    # (request key, error code, error message) in the order they are validated.
    required = (
        ('taskid', 2, '缺少优化任务id'),
        ('issued_plan', 3, '缺少下发方案信息'),
        ('crossid', 4, '缺少路口id信息'),
        ('planid', 5, '缺少方案id信息'),
        ('tp_start', 6, '缺少时段开始时刻'),
        ('tp_end', 7, '缺少时段结束时刻'),
        ('init_planid', 8, '缺少初始方案id'),
        ('init_planname', 9, '缺少初始方案名称'),
    )
    fields = {}
    for key, code, message in required:
        fields[key] = check_param(params, key)
        if not fields[key]:
            return json.dumps(make_common_res(code, message))
    optimize_taskid = fields['taskid']
    issued_plan = fields['issued_plan']
    crossid = fields['crossid']
    planid = fields['planid']
    tp_start = fields['tp_start']
    tp_end = fields['tp_end']
    init_plan_id = fields['init_planid']
    init_plan_name = fields['init_planname']

    history_rows = situation_db_pool.query_optimize_his_plans(optimize_taskid)
    new_planid = Plan.parse_from_json(issued_plan).planid
    modify_params = gen_modify_params(planid, crossid, nodeid, issued_plan)
    # Same plan id -> update the existing plan; otherwise create a new one.
    apply_plan = modify_plan_stage if planid == new_planid else add_plan_stage
    modify_res = json.loads(apply_plan(modify_params))

    # Largest x among the existing 'iter_plan_x' history tags, 0 when there are none.
    plan_seq = 0
    for history_row in history_rows:
        plan_seq = max(plan_seq, int(str(history_row['tag']).split('iter_plan_')[1]))

    stamped_plan = Plan.parse_from_json(issued_plan)
    stamped_plan.timestamp = datetime.now().timestamp()
    issued_plan = stamped_plan.to_dict()

    def _store(tag, record_status):
        # Insert the issued plan into the task store under the given tag.
        return situation_db_pool.save_optimize_task_info(
            optimize_taskid, crossid, new_planid, tp_start, tp_end, tag,
            json.dumps(issued_plan, ensure_ascii=False), nodeid, record_status,
            init_plan_id, init_plan_name)

    if plan_seq == 0:
        # No history yet: this is the first issue, so the task status is also
        # switched to 1 (iterating).
        if not situation_db_pool.update_optimize_records_status(optimize_taskid, 1):
            return json.dumps(make_common_res(10, '更新状态失败'))
        flag, status = situation_db_pool.check_exists_edit_plan(optimize_taskid)
        if flag:
            saved = situation_db_pool.update_optimize_edit_plan(optimize_taskid, issued_plan, new_planid)
        else:
            saved = _store('edit_plan', status)
        if not saved:
            return json.dumps(make_common_res(11, '保存方案失败'))
        if not _store(f'iter_plan_{plan_seq + 1}', status):
            return json.dumps(make_common_res(12, '保存方案失败'))
    else:
        # Already issued before (status 1, iterating): refresh the edit plan
        # and append this plan to the history. Both writes are always attempted.
        edit_saved = situation_db_pool.update_optimize_edit_plan(optimize_taskid, issued_plan, new_planid)
        iter_saved = _store(f'iter_plan_{plan_seq + 1}', 1)
        if not edit_saved or not iter_saved:
            return json.dumps(make_common_res(13, '保存方案失败'))

    edit_plan, his_plans = None, []
    for row in situation_db_pool.query_optimize_issued_plan(optimize_taskid):
        tag = row['tag']
        plan_dict = Plan.parse_from_json(row['content']).to_dict()
        if tag == 'edit_plan':
            edit_plan = plan_dict
        else:
            his_plans.append(plan_dict)

    if modify_res['status'] != 0:
        return json.dumps(modify_res)
    res = make_common_res(0, 'ok')
    res['data'] = {
        'edit_plan': edit_plan,
        'his_plans': his_plans
    }
    return json.dumps(res, ensure_ascii=False)
def update_optimize_plan_status(params):
    """Set an optimize task's status to ``'finish'`` or ``'abort'``.

    Reads ``optimize_taskid`` and ``status`` from ``params``. Any status other
    than the two accepted values is rejected with code 3.

    Returns:
        A JSON string with status 0 on success, 1-3 for invalid input, or 4
        when the database update reports failure.
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))

    optimize_taskid = check_param(params, 'optimize_taskid')
    if not optimize_taskid:
        return json.dumps(make_common_res(2, '缺少优化任务id'))

    status = check_param(params, 'status')
    if not (status and status in ('finish', 'abort')):
        return json.dumps(make_common_res(3, '缺少状态信息'))

    updated = situation_db_pool.update_optimize_plan_status_byid(nodeid, optimize_taskid, status)
    outcome = make_common_res(0, 'ok') if updated else make_common_res(4, '更新失败')
    return json.dumps(outcome)
def gen_modify_params(planid, crossid, nodeid, edit_plan):
    """Build the parameter dict passed to ``modify_plan_stage`` / ``add_plan_stage``.

    ``edit_plan`` is parsed with ``Plan.parse_from_json`` and flattened into
    one entry per stage. Each entry carries the stage timings, the
    comma-joined names of the stage's phases (under both ``phaseid_names``
    and ``phases``), and the ``max_green`` / ``min_green`` of the first phase
    listed in the stage.

    Returns:
        A dict with ``planid``, ``plan_name``, ``offset``, ``cycle``,
        ``crossid``, ``stages`` and ``nodeid``.
    """
    plan = Plan.parse_from_json(edit_plan).to_dict()
    stage_rows = plan['stages']
    phase_by_id = {phase['phaseid']: phase for phase in plan['phases']}

    stages_payload = []
    for stage_row in stage_rows:
        id_tokens = stage_row['phaseids'].split(',')
        # Green limits come from the first phase of the stage only.
        lead_phase = phase_by_id[int(id_tokens[0])]
        names = lead_phase['name']
        max_green = lead_phase['max_green']
        min_green = lead_phase['min_green']
        for token in id_tokens[1:]:
            names += f",{phase_by_id[int(token)]['name']}"
        stages_payload.append({
            'stageid': stage_row['stageid'],
            'stage_name': stage_row['name'],
            'stage_duration': stage_row['duration'],
            'green': stage_row['green'],
            'redyellow': stage_row['redyellow'],
            'yellow': stage_row['yellow'],
            'allred': stage_row['allred'],
            'phaseids': stage_row['phaseids'],
            'phaseid_names': names,
            'max_green': max_green,
            'min_green': min_green,
            'phases': names
        })

    return {
        'planid': planid,
        'plan_name': plan['name'],
        'offset': plan['offset'],
        'cycle': plan['cycle'],
        'crossid': crossid,
        'stages': stages_payload,
        'nodeid': nodeid
    }
def calc_dm_delay_change_detail(dm, dm_now):
    """Compare two three-level nested sequences element by element.

    For every position ``[i][j][k]`` of ``dm`` the result holds ``1`` when the
    value in ``dm_now`` is larger, ``-1`` when it is smaller and ``0`` when
    the two are equal. The placeholder ``'-'`` counts as 0, and values are
    compared after conversion with ``float``.

    Returns:
        A nested list with the same layout as ``dm``, or an empty list when
        either argument is ``None``.
    """
    if dm is None or dm_now is None:
        return []

    def _trend(before, after):
        # '-' marks a missing value and is treated as zero.
        before = before if before != '-' else 0
        after = after if after != '-' else 0
        after_val, before_val = float(after), float(before)
        return (after_val > before_val) - (after_val < before_val)

    changes = []
    for i, plane in enumerate(dm):
        plane_flags = []
        for j, cells in enumerate(plane):
            # dm drives the iteration; dm_now is indexed at the same position.
            plane_flags.append([_trend(before, dm_now[i][j][k]) for k, before in enumerate(cells)])
        changes.append(plane_flags)
    return changes
def download_optimize_plan(params):
    """Export a plan as an Excel workbook and return it as a file download.

    The workbook has three sheets, each starting with the header row from
    ``t_sheet_cols``: one row per stage on the plan sheet, one row per stage
    on the stage sheet, and one row per phase on the phase sheet.

    Returns:
        A JSON error string when ``nodeid``, ``crossid`` or ``download_plan``
        is missing; otherwise the ``send_file`` response carrying the
        in-memory ``.xlsx`` file, named from the cross name and the plan name.
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(2, '缺少路口id'))
    # NOTE(review): the lookup uses g.nodeid rather than the validated nodeid; confirm this is intended.
    cross_info = g_info_dict.get(g.nodeid)['g_roadnet'].query_cross(crossid)
    download_plan = check_param(params, 'download_plan')
    if not download_plan:
        return json.dumps(make_common_res(3, '缺少下载方案'))

    plan_obj = Plan.parse_from_json(download_plan)
    plan = plan_obj.to_dict()

    workbook = Workbook()
    plan_sheet = workbook.active
    plan_sheet.title = "配时方案表"
    plan_sheet.append(t_sheet_cols['配时方案表']['head'])
    stage_sheet = workbook.create_sheet("阶段表")
    stage_sheet.append(t_sheet_cols['阶段表']['head'])
    for seq, stage in enumerate(plan['stages'], start=1):
        # The 1-based sequence number is used both as row number and stage order.
        plan_row = [seq, plan['planid'], plan['name'], plan['cycle'], seq,
                    stage['stageid'], stage['duration']]
        plan_sheet.append(plan_row)
        stage_row = [seq, stage['stageid'], stage['name'], stage['yellow'], stage['allred'], stage['phaseids']]
        stage_sheet.append(stage_row)

    phase_sheet = workbook.create_sheet("相位表")
    phase_sheet.append(t_sheet_cols['相位表']['head'])
    for seq, phase in enumerate(plan['phases'], start=1):
        phase_sheet.append([seq, phase['phaseid'], phase['name'], phase['min_green'], phase['max_green']])

    # Serialise into memory and rewind so the response reads from the start.
    buffer = BytesIO()
    workbook.save(buffer)
    buffer.seek(0)
    filename = f"{cross_info.name}_{plan_obj.name}_配时方案.xlsx"
    return send_file(
        buffer,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=filename)
def get_road_net_detail(params):
    """Return the ``src_direct`` value of a road after checking the caller's area.

    Requires ``nodeid``, ``area_id``, ``roadid`` and ``userid`` in ``params``.
    ``area_id`` must be an integer string (optionally with a leading ``-``)
    that appears in the area ids returned by ``db_user.query_areaid_list``
    for the user.

    Returns:
        A JSON string: an error for missing or invalid input, the message
        from ``get_road_info`` when it reports an error, or status 0 with the
        road's ``src_direct`` as the message.
    """
    # (request key, error code, error message) in the order they are validated.
    required = (
        ('nodeid', 2, '缺少nodeid 请刷新后重试'),
        ('area_id', 3, '缺少area_id 请刷新后重试'),
        ('roadid', 3, '缺少roadid 请刷新后重试'),
        ('userid', 4, '缺少userid 请刷新后重试'),
    )
    fields = {}
    for key, code, message in required:
        fields[key] = check_param(params, key)
        if not fields[key]:
            return json.dumps(make_common_res(code, message))
    area_id = fields['area_id']
    roadid = fields['roadid']

    area_list = db_user.query_areaid_list(fields['userid'])
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))

    area_is_numeric = str(area_id).lstrip('-').isdigit()
    # Lazy scan: stops at the first matching area id, like a membership test
    # on an iterator.
    if not area_is_numeric or not any(int(area_id) == int(owned) for owned in area_list):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))

    road_info, error = get_road_info([roadid])
    if error:
        return json.dumps(make_common_res(2, f"{error}"))
    return json.dumps(make_common_res(0, road_info[roadid]['src_direct']))