cross_doctor/app/workstation_worker.py

1088 lines
49 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
from datetime import timezone
from app.user_worker import do_get_user_info
from app.workstation_db_function import *
from app.work_station_common import *
from app.task_worker import *
from app.phasetable_worker import *
import proto.xlcomm_pb2 as pb
def favorite(params, token):
    """Add a favorite cross (favorite_type 1) or artery (favorite_type 2).

    Validates the request params and the user's area permission, rejects
    duplicates, then inserts the favorite row. Returns a JSON string with a
    common result body.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = list(map(int, area_list))
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    if not token:
        return json.dumps(make_common_res(2, '用户信息异常,请刷新一下后重试'))
    token_params = {'token': token}
    token_res = json.loads(do_get_user_info(token_params, token))
    create_user_id = token_res['token']['userno']
    create_user_name = token_res['token']['user_name']
    # BUG FIX: check_param may return None and int(None) raised TypeError;
    # validate before converting.
    favorite_type = check_param(params, 'favorite_type')
    if not favorite_type or int(favorite_type) not in (1, 2):
        return json.dumps(make_common_res(3, '缺少收藏类型'))
    favorite_type = int(favorite_type)
    favorite_id = check_param(params, 'favorite_id')
    if not favorite_id:
        return json.dumps(make_common_res(4, '缺少收藏路口或干线id'))
    favorite_name = check_param(params, 'favorite_name')
    if not favorite_name:
        return json.dumps(make_common_res(5, '缺少收藏路口或干线名称'))
    row_list = db_workstation.check_favorite_info_exists(nodeid, create_user_id, create_user_name, favorite_id,
                                                         favorite_name, int(area_id))
    if len(row_list) > 0:
        return json.dumps(make_common_res(6, '当前收藏内容已存在,请勿重复收藏'))
    ret = db_workstation.create_favorite_cross_artery(nodeid, create_user_id, create_user_name, favorite_type,
                                                      favorite_id, favorite_name, int(area_id))
    if int(ret) == 1:
        return json.dumps(make_common_res(0, '收藏成功'))
    # BUG FIX: previously fell through returning None when the insert failed;
    # report the failure explicitly.
    return json.dumps(make_common_res(7, '收藏失败'))
def get_favorite_list(params, token):
    """Return the current user's favorite cross ids and artery ids as JSON."""
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    allowed_areas = list(map(int, area_list))
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in allowed_areas:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    if not token:
        return json.dumps(make_common_res(2, '用户信息异常,请刷新一下后重试'))
    token_res = json.loads(do_get_user_info({'token': token}, token))
    user_id = token_res['token']['userno']
    user_name = token_res['token']['user_name']
    rows = db_workstation.query_favorite_info_list(nodeid, user_id, user_name, int(area_id))
    # Split favorites by type: 1 = cross, 2 = artery.
    cross_ids = [r['favorite_id'] for r in rows if r['favorite_type'] == 1]
    artery_ids = [r['favorite_id'] for r in rows if r['favorite_type'] == 2]
    res = make_common_res(0, 'ok')
    res['data'] = {
        'cross_list': cross_ids,
        'artery_list': artery_ids
    }
    return json.dumps(res, ensure_ascii=False)
def get_favorite_data_list(params, token):
    """Return a paginated list of real-time delay data for favorite crosses.

    Only query_type == 'cross' currently yields data; other types return an
    empty page. Returns a JSON string with data / total_pages / total_num.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = list(map(int, area_list))
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    # BUG FIX: check_param may return None and int(None) raised TypeError;
    # validate pagination params before converting.
    page = check_param(params, 'page')
    if not page:
        return json.dumps(make_common_res(3, '缺少分页页码'))
    page = int(page)
    page_size = check_param(params, 'page_size')
    if not page_size or int(page_size) == 0:
        return json.dumps(make_common_res(4, '缺少分页条数'))
    page_size = int(page_size)
    query_type = check_param(params, 'query_type')
    if not query_type or query_type not in ('cross', 'artery'):
        return json.dumps(make_common_res(5, '查询类型缺失或查询类型异常'))
    token_params = {'token': token}
    token_res = json.loads(do_get_user_info(token_params, token))
    user_id = token_res['token']['userno']
    user_name = token_res['token']['user_name']
    row_list = db_workstation.query_favorite_info_list(nodeid, user_id, user_name, int(area_id))
    cross_list, cross_id_list = [], []
    for row in row_list:
        if row['favorite_type'] == 1:
            cross_list.append({'id': row['favorite_id'], 'name': row['favorite_name']})
            cross_id_list.append(row['favorite_id'])
    res = make_common_res(0, 'ok')
    res['data'] = []
    res['total_pages'] = 0
    res['total_num'] = 0
    if len(cross_id_list) == 0 or query_type != 'cross':
        return json.dumps(res, ensure_ascii=False)
    # Fetch real-time delay data for the favorite crosses. (Debug print and
    # the dead `if len(cross_id_list) > 0` guard removed — the list is
    # guaranteed non-empty here.)
    cross_rt_data_list, error = read_cross_deday(nodeid, area_id, cross_id_list)
    if error:
        return json.dumps(make_common_res(2, error))
    if len(cross_rt_data_list) == 0:
        return json.dumps(res, ensure_ascii=False)
    # O(1) membership test instead of scanning cross_list per record.
    favorite_cross_ids = {d['id'] for d in cross_list}
    cross_info_list = [rt for rt in cross_rt_data_list if rt['crossid'] in favorite_cross_ids]
    start_index = (page - 1) * page_size
    end_index = start_index + page_size
    cross_info, error = get_cross_info(cross_id_list)
    if error:
        return json.dumps(make_common_res(2, error))
    paginated_data = cross_info_list[start_index:end_index]
    # Ceiling division for the page count.
    res['total_pages'] = (len(cross_info_list) + page_size - 1) // page_size
    res['total_num'] = len(cross_info_list)
    res['data'] = gen_work_station_cross_data_list(paginated_data, cross_info)
    return json.dumps(res, ensure_ascii=False)
def get_cross_info(cross_ids: list):
    """Map crossid -> {name, location, nodeid, area_id} for the given ids.

    Returns (result_dict, None) on success, or (None, error) when the DB
    lookup reports an error.
    """
    cross_info, error = db_tmnet.query_cross_info_all(cross_ids)
    if error:
        return None, error
    result = {}
    for item_cross_info in cross_info:
        result[item_cross_info['crossid']] = {
            'name': item_cross_info['name'],
            'location': item_cross_info['location'],
            'nodeid': item_cross_info['nodeid'],
            'area_id': item_cross_info['area_id']
        }
    return result, None
def get_road_info(road_ids: list):
    """Map roadid -> road attributes for the given ids.

    Returns (result_dict, None) on success, or (None, error) when the DB
    lookup reports an error.
    """
    roads_info, error = db_tmnet.query_road_info_all(road_ids)
    if error:
        return None, error
    result = {}
    for item_roads_info in roads_info:
        # BUG FIX: the original keyed on `roads_info['roadid']` — indexing the
        # whole result list with a string, which raises TypeError. Use the
        # current row instead.
        result[item_roads_info['roadid']] = {
            'name': item_roads_info['name'],
            'from_crossid': item_roads_info['from_crossid'],
            'to_crossid': item_roads_info['to_crossid'],
            'src_direct': item_roads_info['src_direct'],
            'nodeid': item_roads_info['nodeid']
        }
    return result, None
def read_cross_deday(nodeid, area_id, cross_id_list):
    """Read real-time delay metrics for the given crosses.

    Determines which configured traffic period (tp) covers the current
    wall-clock time, then queries the delay table suffixed with the *previous*
    period's start time and decodes each protobuf row.

    Returns (result_list, error): result_list is a list of dicts with crossid
    and delay metrics; error is None on success.
    """
    # Default period table, used when the city has no configured tp_desc.
    tp_info = [
        "00:00-07:00",
        "07:00-09:00",
        "09:00-17:00",
        "17:00-19:00",
        "19:00-22:00",
        "22:00-23:59"
    ]
    tp_desc = db_tmnet.query_city_tp_info(nodeid, area_id)
    if tp_desc:
        tp_info = tp_desc[0]['tp_desc'].split(',')
    # Local server time; 'HH:MM' strings compare correctly lexicographically.
    now_time = datetime.now().strftime('%H:%M')
    start, end = '', ''
    for index, item_tp_info in enumerate(tp_info):
        start, end = map(str, item_tp_info.split('-'))
        if now_time >= start and end >= now_time:
            # Once the current period is found, switch to the previous one.
            # NOTE(review): for index 0 this wraps to tp_info[-1] (the last
            # period) — confirm that wrap-around is intended.
            start, end = map(str, tp_info[index - 1].split('-'))
            break
    # Table suffix is the period start with the colon stripped and leading
    # zeros dropped, e.g. '07:00' -> 't700'.
    cross_data_list, error = db_workstation.query_cross_delay_info(nodeid, f"t{int(start.replace(':', ''))}",
                                                                   cross_id_list)
    if error:
        return [], error
    if len(cross_data_list) <= 0:
        return [], None
    result = []
    for item_cross_data_list in cross_data_list:
        # Each row's 'data' column holds a serialized xl_cross_delayinfo_t.
        item_delay_pb = pb.xl_cross_delayinfo_t()
        item_delay_pb.ParseFromString(item_cross_data_list['data'])
        result.append({
            'crossid': item_delay_pb.crossid,
            'jam_index': item_delay_pb.delay_info.jam_index,
            'unbalance_index': item_delay_pb.delay_info.imbalance_index,
            'queue_len': item_delay_pb.delay_info.queue_len,
            'stop_times': item_delay_pb.delay_info.stop_times,
            'delay_time': item_delay_pb.delay_info.delay_time,
        })
    return result, None
def delete_favorite_list(params, token):
    """Delete the given favorite entries for the current user.

    params must contain nodeid, area_id, userid and ids (iterable of favorite
    row ids). Returns a JSON string with a common result body.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    area_list = db_user.query_areaid_list(userid)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = list(map(int, area_list))
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    ids = check_param(params, 'ids')
    if not ids or len(ids) < 1:
        return json.dumps(make_common_res(2, '缺少需要删除的收藏项'))
    if not token:
        return json.dumps(make_common_res(3, '用户信息异常,请刷新一下后重试'))
    # NOTE(review): ids are string-interpolated into an SQL IN clause; prefer a
    # parameterized query inside delete_favorite_ids to rule out injection.
    ids_str = ", ".join([f"'{fid}'" for fid in ids])
    token_params = {'token': token}
    token_res = json.loads(do_get_user_info(token_params, token))
    user_id = token_res['token']['userno']
    user_name = token_res['token']['user_name']
    ret = db_workstation.delete_favorite_ids(nodeid, ids_str, user_id, user_name)
    if ret == len(ids):
        return json.dumps(make_common_res(0, '操作成功'))
    # BUG FIX: previously fell through returning None when the deleted row
    # count mismatched; report the failure explicitly.
    return json.dumps(make_common_res(6, '操作失败'))
def get_workstation_task_list(params, token):
    """Build the workstation task overview for the current user.

    Returns a JSON string whose data contains:
      - join_task: pending / overdue / approaching-overdue / done task
        breakdowns by task type (via cal_task_type_num);
      - task_statistics: last-month and last-week completion statistics plus
        yesterday's task data.
    All date arithmetic is done on UTC dates; plan/end times are epoch millis.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    if not token:
        return json.dumps(make_common_res(2, '用户信息异常,请刷新重试'))
    token_params = {'token': token}
    token_res = json.loads(do_get_user_info(token_params, token))
    user_id = token_res['token']['userno']
    area_list = db_user.query_areaid_list(user_id)
    if not area_list or len(area_list) < 1:
        return json.dumps(make_common_res(5, '用户信息异常'))
    area_list = list(map(int, area_list))
    if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    user_name = token_res['token']['user_name']
    pending_task, timeout_task, approaching_overdue_task, done_task = [], [], [], []
    last_month_should_done, last_month_done, last_month_should_done_but_not, need_todo = 0, [], 0, 0
    last_week_should_done, last_week_done, last_week_should_done_but_not, week_need_todo = 0, [], 0, 0
    row_list = db_workstation.query_task_list(nodeid, user_name, area_id)
    current_date = datetime.now(timezone.utc).date()
    # current_date = datetime.strptime('2024-07-30', "%Y-%m-%d")
    yesterday_obj = current_date - timedelta(days=1)
    yesterday_date = yesterday_obj.strftime('%Y-%m-%d')
    first_day_of_this_month = current_date.replace(day=1)
    last_day_of_last_month_obj = first_day_of_this_month - timedelta(days=1)
    last_month = last_day_of_last_month_obj.strftime('%Y-%m')
    first_day_of_this_week = current_date - timedelta(days=current_date.weekday())
    last_day_of_last_week_obj = first_day_of_this_week - timedelta(days=1)
    formatted_last_week = f'{last_day_of_last_week_obj.year}年第{last_day_of_last_week_obj.isocalendar()[1]}'
    for row in row_list:
        task_name = row['task_name']
        task_type = int(row['task_type'])
        plan_begin_time = int(row['plan_begin_time'])
        plan_end_time = int(row['plan_end_time'])
        task_state = int(row['task_state'])
        task_end_time = None
        if row['task_end_time']:
            task_end_time = int(row['task_end_time'])
        # task_state: 4 = done, 0 = inactive, 1/2/3 = in progress.
        if task_state == 4:
            done_task.append({
                'task_name': task_name,
                'task_type': task_type,
                'plan_begin_time': plan_begin_time,
                'plan_end_time': plan_end_time
            })
        elif task_state != 0 and task_state != 4:
            pending_task.append({
                'task_name': task_name,
                'task_type': task_type,
                'plan_begin_time': plan_begin_time,
                'plan_end_time': plan_end_time
            })
        plan_endtime_date = datetime.fromtimestamp(plan_end_time // 1000, tz=timezone.utc).date()
        if plan_endtime_date < current_date and task_state != 0 and task_state != 4:
            timeout_task.append({
                'task_name': task_name,
                'task_type': task_type,
                'plan_begin_time': plan_begin_time,
                'plan_end_time': plan_end_time
            })
        # BUG FIX: the original tested `current_date - plan_endtime_date <= 3d`,
        # which is negative for any future deadline, so EVERY non-overdue
        # pending task was flagged as approaching overdue. Test the days
        # remaining until the deadline instead.
        elif plan_endtime_date - current_date <= timedelta(days=3) and task_state != 0 and task_state != 4:
            approaching_overdue_task.append({
                'task_name': task_name,
                'task_type': task_type,
                'plan_begin_time': plan_begin_time,
                'plan_end_time': plan_end_time
            })
        # Task statistics: month & week buckets.
        if task_end_time:
            end_time = datetime.fromtimestamp(task_end_time // 1000, tz=timezone.utc).date()
            end_time_year = end_time.year
            end_time_month = end_time.month
            end_time_week = end_time.isocalendar()[1]
            # BUG FIX: last_month is zero-padded ('%Y-%m' -> '2024-06') while
            # the original f-string produced '2024-6', so the comparison never
            # matched for months 1-9. Pad the month to two digits.
            if task_state == 4 and f'{end_time_year}-{end_time_month:02d}' == last_month:
                last_month_done.append(task_type)
                if plan_endtime_date < first_day_of_this_month:
                    last_month_should_done += 1
            if task_state == 4 and f'{end_time_year}年第{end_time_week}' == formatted_last_week:
                last_week_done.append(task_type)
                if plan_endtime_date.year == end_time_year and plan_endtime_date.isocalendar()[1] == end_time_week:
                    last_week_should_done += 1
        else:
            # No end time yet: count unfinished (state 1/2/3) tasks against the
            # month/week buckets by their planned deadline.
            if task_state in [1, 2, 3] and plan_endtime_date < first_day_of_this_month:
                last_month_should_done += 1
                last_month_should_done_but_not += 1
            if task_state in [1, 2, 3] and plan_endtime_date < first_day_of_this_week:
                last_week_should_done += 1
                last_week_should_done_but_not += 1
            if task_state in [1, 2, 3] and plan_endtime_date > last_day_of_last_month_obj:
                need_todo += 1
            if task_state in [1, 2, 3] and plan_endtime_date >= first_day_of_this_week:
                week_need_todo += 1
    yesterday_res = db_workstation.query_yesterday_task_data(yesterday_date, nodeid, user_name, area_id)
    task_statistics = gen_task_statistics_res(last_month_should_done, last_month_done, last_month_should_done_but_not,
                                              need_todo,
                                              last_week_should_done, last_week_done, last_week_should_done_but_not,
                                              week_need_todo,
                                              yesterday_res, last_day_of_last_month_obj, formatted_last_week,
                                              yesterday_date)
    pending_res = cal_task_type_num(pending_task)
    timeout_res = cal_task_type_num(timeout_task)
    approaching_overdue_res = cal_task_type_num(approaching_overdue_task)
    done_res = cal_task_type_num(done_task)
    res = make_common_res(0, 'ok')
    data = {
        'join_task': {
            'pending_res': pending_res,
            'timeout_res': timeout_res,
            'approaching_overdue_res': approaching_overdue_res,
            'done_res': done_res
        },
        'task_statistics': task_statistics
    }
    res['data'] = data
    return json.dumps(res, ensure_ascii=False)
# Cross optimization page: list all crosses. (API 1)
def get_crosses_list(params):
    """Return every cross (id + name) in the road net plus the usable data dates."""
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    node_info = g_info_dict.get(g.nodeid)
    roadnet = node_info['g_roadnet']
    cross_list = [
        {'crossid': cid, 'crossName': roadnet.query_cross(cid).name}
        for cid in node_info['g_crossroad_idset']
    ]
    cross_useable_date = situation_db_pool.query_situation_usable_date()['cross']
    res = make_common_res(0, 'ok')
    res['data'] = {
        'cross_list': cross_list,
        'useable_date': cross_useable_date
    }
    return json.dumps(res, ensure_ascii=False)
# Cross optimization page: get the timing plans and the diagnosed-symptom list
# for one cross on one date. (API 2)
def get_cross_phase_symptom_info(params):
    """Attach diagnosed traffic symptoms to each timing-plan period of a cross.

    Loads the plan periods for the weekday of `date` and the symptom records
    for the cross, then appends every symptom whose traffic-period time range
    overlaps the plan period. Returns a JSON string.
    """
    # Period display name -> covered time range; '平峰' (off-peak) is a
    # comma-separated list of sub-ranges.
    tp_time_range_dict = {'早高峰': '07:00-09:00', '晚高峰': '17:00-19:00',
                          '平峰': '06:00-07:00,09:00-17:00,19:00-22:00'}
    # Period display name -> numeric period code used by the desc-key lookup.
    tp_dict = {'早高峰': 100, '晚高峰': 200, '平峰': 400}
    # Chinese symptom label -> internal English symptom key.
    symptom_dict = {
        '对向失衡': 'opposite_tide',
        '路口溢流': 'inroad_overflow',
        '路口失衡': 'intersection_imbalance',
        '转向失衡': 'straight_left_imbalance',
        '左转低效': 'inefficient_left',
        '路口死锁': 'cross_deadlock'
    }
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(2, '缺少路口信息,请选择路口'))
    date = check_param(params, 'date')
    if not date:
        return json.dumps(make_common_res(3, '缺少查询日期'))
    date_type = check_param(params, 'date_type')
    weekday = datetime.strptime(date, '%Y%m%d').weekday()
    jj_crossid = db_mapping.query_jjcrossid_by_xlcrossid(nodeid, crossid)
    if not jj_crossid:
        return json.dumps(make_common_res(4, '路口不存在'))
    # Plan data keyed by plan id; the DB uses 1-based weekdays.
    row_list = situation_db_pool.query_phase_data(jj_crossid, weekday + 1, crossid)
    symptom_list = situation_db_pool.query_symptom_info_by_crossid(nodeid, crossid, date, date_type=date_type)
    res_list = []
    for item in row_list.keys():
        plan_time_range = row_list[item]['tp_start'] + '-' + row_list[item]['tp_end']
        row_list[item]['symptom_type_list'] = []
        row_list[item]['symptom_detail_list'] = []
        row_list[item]['todo_detail_list'] = []
        row_list[item]['tp_desc_list'] = []
        row_list[item]['desc_key_list'] = []
        for symptom in symptom_list:
            # Detail fields are stored with single quotes; normalize to valid JSON.
            symptom_detail = json.loads(str(symptom['symptom_detail']).replace("'", "\""))
            todo_detail = json.loads(str(symptom['todo_detail']).replace("'", "\""))
            if symptom['tp'] != '平峰':
                # Peak periods map to a single time range.
                symptom_time_range = tp_time_range_dict.get(symptom['tp'])
                if has_overlap(plan_time_range, symptom_time_range):
                    tp_num = tp_dict.get(symptom['tp'])
                    symptom_desc = symptom_dict.get(symptom['symptom_type'])
                    desc_key = situation_db_pool.query_symptom_desckey(crossid, tp_num, symptom_desc,
                                                                       date_type=date_type)
                    row_list[item]['symptom_type_list'].append({
                        'symptom_type': symptom['symptom_type'],
                        'symptom_type_eng': symptom_desc
                    })
                    row_list[item]['symptom_detail_list'].append(symptom_detail)
                    row_list[item]['tp_desc_list'].append(symptom['tp'])
                    row_list[item]['desc_key_list'].append(desc_key)
                    row_list[item]['todo_detail_list'].append(todo_detail)
            else:
                # Off-peak spans several sub-ranges; test each one.
                symptom_time_range_list = tp_time_range_dict.get(symptom['tp']).split(',')
                for symptom_time_range in symptom_time_range_list:
                    if has_overlap(plan_time_range, symptom_time_range):
                        tp_num = tp_dict.get(symptom['tp'])
                        symptom_desc = symptom_dict.get(symptom['symptom_type'])
                        desc_key = situation_db_pool.query_symptom_desckey(crossid, tp_num, symptom_desc,
                                                                           date_type=date_type)
                        row_list[item]['symptom_type_list'].append({
                            'symptom_type': symptom['symptom_type'],
                            'symptom_type_eng': symptom_desc
                        })
                        row_list[item]['symptom_detail_list'].append(symptom_detail)
                        row_list[item]['tp_desc_list'].append(symptom['tp'])
                        row_list[item]['desc_key_list'].append(desc_key)
                        row_list[item]['todo_detail_list'].append(todo_detail)
                        # NOTE(review): `continue` proceeds to the next
                        # sub-range, so a symptom overlapping several
                        # sub-ranges is appended once per sub-range — confirm
                        # `break` was not intended here.
                        continue
        res_list.append(row_list[item])
    res = make_common_res(0, 'ok')
    res['data'] = {
        'nodeid': nodeid,
        'date': date,
        'crossid': crossid,
        'res_list': res_list
    }
    return json.dumps(res, ensure_ascii=False)
def parse_time(time_str):
    """Parse an 'HH:MM' string into a datetime.time value."""
    moment = datetime.strptime(time_str, '%H:%M')
    return moment.time()
def has_overlap(time_range1, time_range2):
    """Return True when two 'HH:MM-HH:MM' ranges intersect (shared endpoints
    alone do not count as an overlap)."""
    first_start, first_end = (parse_time(t) for t in time_range1.split('-'))
    second_start, second_end = (parse_time(t) for t in time_range2.split('-'))
    # Open-interval intersection test.
    return first_start < second_end and second_start < first_end
# Cross optimization page: get an existing optimize task's detail. (API 5)
def get_cross_optimize_plan_detail(params):
    """Return the stored optimize-task detail for one cross/period.

    Collects the initial, recommended (target), edited and historical plans
    plus the initial delay matrix, and recomputes the latest delay matrix for
    the same weekday. Returns a JSON string.
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(2, '缺少路口id信息'))
    planid = check_param(params, 'planid')
    if not planid:
        return json.dumps(make_common_res(3, '缺少方案id信息'))
    tp_start = check_param(params, 'tp_start')
    if not tp_start:
        return json.dumps(make_common_res(4, '缺少时段开始时刻'))
    tp_end = check_param(params, 'tp_end')
    if not tp_end:
        return json.dumps(make_common_res(5, '缺少时段结束时刻'))
    taskId = check_param(params, 'taskid')
    if not taskId:
        return json.dumps(make_common_res(6, '缺少任务id信息'))
    crossName = g_info_dict.get(g.nodeid)['g_roadnet'].query_cross(crossid).name
    init_plan_res, target_plan_res, his_plan_list, edit_plan_res, delay_matrix_res, delay_matrix_now_res = None, None, {}, None, None, None
    # BUG FIX: use None as the "not set" sentinel. date.weekday() returns 0 for
    # Monday, so the original `if init_weekday:` (with an '' sentinel) skipped
    # the latest-delay-matrix computation whenever the init date was a Monday.
    init_weekday = None
    init_plan_str, delay_input_str = situation_db_pool.query_init_info(nodeid, taskId)
    if init_plan_str:
        init_plan_res = Plan.parse_from_json(init_plan_str).to_dict()
    if delay_input_str:
        delay_matrix_res = DelayMatrix.parse_from_json(delay_input_str).to_dict()
        init_weekday = datetime.strptime(delay_matrix_res['date'], '%Y%m%d').weekday()
    query_type = check_param(params, 'type')
    if not query_type or query_type == 'all':
        target_plan_str, edit_plan_str, his_plan_dict = situation_db_pool.query_optimize_records(nodeid, taskId)
        if target_plan_str:
            target_plan_res = Plan.parse_from_json(target_plan_str).to_dict()
        if edit_plan_str:
            edit_plan_res = Plan.parse_from_json(edit_plan_str).to_dict()
        # Historical plans are re-keyed by their formatted save timestamp.
        for seq in his_plan_dict.keys():
            item = Plan.parse_from_json(his_plan_dict[seq]).to_dict()
            timestamp = str(datetime.fromtimestamp(item['timestamp']).__format__('%Y%m%d %H:%M'))
            his_plan_list[timestamp] = item
    if init_weekday is not None:
        # Recompute the delay matrix on the most recent usable same-weekday date.
        max_date = get_max_usable_date(init_weekday)
        dm_now = get_delay_matrix(str(max_date), tp_start, tp_end, crossid)
        delay_matrix_now_res = dm_now.to_dict()
        # i_list = calc_dm_delay_change_detail(delay_matrix_res['delay_matrix'], delay_matrix_now_res['delay_matrix'])
        # delay_matrix_now_res['change_flag'] = i_list
    res = make_common_res(0, 'ok')
    res['data'] = {
        'nodeid': nodeid,
        'crossid': crossid,
        'crossName': crossName,
        'planid': planid,
        'tp_start': tp_start,
        'tp_end': tp_end,
        'optimize_taskid': taskId,
        'init_plan': init_plan_res,  # initial plan
        'target_plan': target_plan_res,  # recommended plan
        'his_plan': his_plan_list,  # historical plans
        'edit_plan': edit_plan_res,  # edited plan
        'delay_matrix': delay_matrix_res,  # initial key-metric delay matrix
        'delay_matrix_now': delay_matrix_now_res,  # latest key-metric delay matrix
    }
    return json.dumps(res, ensure_ascii=False)
def get_max_usable_date(init_weekday):
    """Return the latest usable historical date (YYYYMMDD) falling on init_weekday."""
    usable = situation_db_pool.query_situation_usable_date()['cross']
    best = 0
    for day in usable['hisday']:
        same_weekday = datetime.strptime(str(day), '%Y%m%d').weekday() == init_weekday
        if same_weekday and int(day) > int(best):
            best = day
    return best
def get_delay_matrix(date_str, tp_start, tp_end, crossid):
    """Build the DelayMatrix for one cross over [tp_start, tp_end] on date_str."""
    net = g_info_dict.get(g.nodeid)['g_roadnet']
    form = CrossForm.extract_cross_form(net, crossid)
    begin_ts = datetime.strptime(f"{date_str} {tp_start}", '%Y%m%d %H:%M').timestamp()
    finish_ts = datetime.strptime(f"{date_str} {tp_end}", '%Y%m%d %H:%M').timestamp()
    flow_delay_list = db.query_delay_matrix(begin_ts, finish_ts, crossid)
    # Tag every record with its approach direction name.
    for record in flow_delay_list:
        record['dirname'] = form.inroad_dirname_map[record['inroadid']]
    matrix = DelayMatrix()
    matrix.set_inroad_names(list(form.dirname_turns_map.keys()))
    matrix.add_flow_delay_data(flow_delay_list)
    matrix.disable_all_badturns(form.dirname_turns_map)
    matrix.date = date_str
    matrix.tp_start = tp_start
    matrix.tp_end = tp_end
    return matrix
# Cross optimization page: get the cross plan and delay-matrix info. (API 3)
def get_cross_init_info(params):
    """Return the initial timing plan and delay matrix for one cross/period.

    When a symptom_list is supplied, each entry is converted into a
    SymptomRecord and passed to OptimizePlanTool.judge_plan_adjust_type
    (presumably annotating `plan` in place — TODO confirm). Returns a JSON
    string with the plan and matrix serialized via to_dict().
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(2, '缺少路口id信息'))
    date = check_param(params, 'date')
    if not date:
        return json.dumps(make_common_res(3, '缺少日期信息'))
    planid = check_param(params, 'planid')
    if not planid:
        return json.dumps(make_common_res(4, '缺少方案id信息'))
    tp_start = check_param(params, 'tp_start')
    if not tp_start:
        return json.dumps(make_common_res(5, '缺少时段开始时刻'))
    tp_end = check_param(params, 'tp_end')
    if not tp_end:
        return json.dumps(make_common_res(6, '缺少时段结束时刻'))
    delay_matrix = get_delay_matrix(str(date), tp_start, tp_end, crossid)
    plan: Plan = get_cross_init_plan(nodeid, crossid, planid)
    symptom_list = check_param(params, 'symptom_list')
    if not symptom_list or len(symptom_list) == 0:
        logging.info('异常类型为空')
    else:
        # NOTE(review): symptom_list is consumed directly as a list of dicts
        # here, while gen_cross_optimize json.loads() the same parameter —
        # confirm this endpoint receives it pre-parsed.
        s_list = []
        for item in symptom_list:
            info = {}
            info['daytype'] = 'hisday'  # historical-day record
            info['crossid'] = crossid
            info['day'] = date
            info['tp'] = 0
            info['duration'] = 0
            info['desc_key'] = item['desc_key']
            info['type'] = item['symptom_type_eng']
            s = SymptomRecord.make_from_dict(info)
            s_list.append(s)
        OptimizePlanTool.judge_plan_adjust_type(plan, s_list)
    res = make_common_res(0, 'ok')
    res['data'] = {
        'nodeid': nodeid,
        'crossid': crossid,
        'planid': planid,
        'tp_start': tp_start,
        'tp_end': tp_end,
        'date': date,
        'init_plan': plan.to_dict(),
        'delay_matrix': delay_matrix.to_dict()
    }
    return json.dumps(res, ensure_ascii=False)
def get_cross_init_plan(nodeid, crossid, planid):
    """Load timing plan `planid` of a cross from the phase table as a Plan."""
    jj_crossid = g_info_dict.get(nodeid)['g_jj_cross'][crossid]
    table = db_phasetable.query_phasetable(nodeid, jj_crossid)
    plan_info = table['plans'][int(planid)]
    plan = Plan()
    plan.init_from_dict(plan_info)
    # Attach the plan's stages, then every phase defined for the cross.
    for stage_info in plan_info['stages']:
        stage = PhaseStage()
        stage.init_from_dict(stage_info)
        plan.add_stage(stage)
    for phase_dict in table['phases'].values():
        phase = PhaseInfo()
        phase.init_from_dict(phase_dict)
        plan.add_phase(phase)
    plan.update_stages_max_min_green()
    return plan
# Cross optimization page: generate a plan — create a timing-plan optimize
# task. (API 4)
def gen_cross_optimize(params):
    """Generate an optimized timing plan for one cross/period and persist it.

    Runs OptimizePlanTool.do_plan_optimize on the submitted initial plan and
    delay matrix (optionally constrained by a submitted symptom list), then
    stores the init_plan / delay_matrix / target_plan artifacts under a fresh
    optimize task id. Returns a JSON string.
    """
    tp_dict = {'早高峰': 100, '晚高峰': 200, '平峰': 400}  # period name -> numeric code
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(2, '缺少路口id信息'))
    planid = check_param(params, 'planid')
    if not planid:
        return json.dumps(make_common_res(3, '缺少方案id信息'))
    tp_start = check_param(params, 'tp_start')
    if not tp_start:
        return json.dumps(make_common_res(4, '缺少时段开始时刻'))
    tp_end = check_param(params, 'tp_end')
    if not tp_end:
        return json.dumps(make_common_res(5, '缺少时段结束时刻'))
    init_plan_str = check_param(params, 'init_plan')
    if not init_plan_str:
        return json.dumps(make_common_res(6, '缺少初始方案信息'))
    init_plan = Plan.parse_from_json(init_plan_str)
    delay_matrix_str = check_param(params, 'delay_matrix')
    if not delay_matrix_str:
        return json.dumps(make_common_res(7, '缺少初始延误数据矩阵信息'))
    date = check_param(params, 'date')
    if not date:
        return json.dumps(make_common_res(8, '缺少日期信息'))
    delay_matrix = DelayMatrix.parse_from_json(delay_matrix_str)
    roadnet = g_info_dict.get(g.nodeid)['g_roadnet']
    crossName = roadnet.query_cross(crossid).name
    cross_form = CrossForm.extract_cross_form(roadnet, crossid)
    symptom_list = check_param(params, 'symptom_list')
    init_plan_id = init_plan.planid
    init_plan_name = init_plan.name
    if not symptom_list or len(symptom_list) == 0:
        logging.info('异常类型为空')
        # No symptoms supplied: optimize from the delay matrix alone.
        target_plan, diff_list = OptimizePlanTool.do_plan_optimize(cross_form, init_plan, delay_matrix)
    else:
        # symptom_list arrives as a JSON string on this endpoint.
        symptom_list = json.loads(symptom_list)
        s_list = []
        for item in symptom_list:
            item['daytype'] = 'hisday'  # historical-day record
            item['crossid'] = crossid
            item['day'] = date
            item['tp'] = tp_dict[item['tp_desc']]
            item['duration'] = 0
            s = SymptomRecord.make_from_dict(item)
            s_list.append(s)
        target_plan, diff_list = OptimizePlanTool.do_plan_optimize(cross_form, init_plan, delay_matrix, s_list)
    if not target_plan:
        return json.dumps(make_common_res(10, '未能生成方案'))
    now_timestamp = datetime.now().timestamp()
    # NOTE(review): the timestamp is set after init_plan_str was captured, so
    # the 'init_plan' JSON persisted below does not carry it — confirm intended.
    init_plan.timestamp = now_timestamp
    target_plan.timestamp = now_timestamp
    optimize_taskid = gen_optimize_taskid(nodeid, crossid, planid, tp_start, tp_end)
    # Persist the three artifacts; all three inserts must succeed.
    sum_success_ret = 0
    ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 'init_plan',
                                                    init_plan_str, nodeid, 0, init_plan_id, init_plan_name)
    if ret:
        sum_success_ret += 1
    ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 'delay_matrix',
                                                    delay_matrix_str, nodeid, 0, init_plan_id, init_plan_name)
    if ret:
        sum_success_ret += 1
    ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 'target_plan',
                                                    json.dumps(target_plan.to_dict(), ensure_ascii=False), nodeid, 0,
                                                    init_plan_id, init_plan_name)
    if ret:
        sum_success_ret += 1
    if sum_success_ret == 3:
        res = make_common_res(0, 'ok')
        res['data'] = {
            'nodeid': nodeid,
            'crossid': crossid,
            'planid': planid,
            'tp_start': tp_start,
            'tp_end': tp_end,
            'crossName': crossName,
            'optimize_taskid': optimize_taskid,
            'init_plan': init_plan.to_dict(),
            'target_plan': target_plan.to_dict(),
            'his_plans': [],
            'edit_plan': None,
            'delay_matrix': delay_matrix.to_dict(),
            'delay_matrix_now': None
        }
        return json.dumps(res, ensure_ascii=False)
    else:
        return json.dumps(make_common_res(11, '保存方案优化信息失败'))
def gen_optimize_taskid(nodeid, crossid, planid, tp_start, tp_end):
    """Derive a unique optimize-task id: md5 of the key fields plus the current time."""
    seed = f"{nodeid}-{crossid}-{planid}-{tp_start}-{tp_end}-{time.time()}"
    return md5_hash(seed)
def save_edit_plan(params):
    """Save a user-edited timing plan for an optimize task.

    Pushes the edited plan to the signal system (update when the planid is
    unchanged, create when it changed), then inserts or updates the task's
    'edit_plan' record. Returns a JSON string: a common success body, or the
    raw modify/add response when either step fails.
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    optimize_taskid = check_param(params, 'taskid')
    if not optimize_taskid:
        return json.dumps(make_common_res(2, '缺少优化任务id'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(3, '缺少路口id信息'))
    planid = check_param(params, 'planid')
    if not planid:
        return json.dumps(make_common_res(4, '缺少方案id信息'))
    tp_start = check_param(params, 'tp_start')
    if not tp_start:
        return json.dumps(make_common_res(5, '缺少时段开始时刻'))
    tp_end = check_param(params, 'tp_end')
    if not tp_end:
        return json.dumps(make_common_res(6, '缺少时段结束时刻'))
    edit_plan = check_param(params, 'edit_plan')
    if not edit_plan:
        return json.dumps(make_common_res(7, '缺少编辑方案信息'))
    init_plan_id = check_param(params, 'init_planid')
    if not init_plan_id:
        return json.dumps(make_common_res(8, '缺少初始方案id'))
    init_plan_name = check_param(params, 'init_planname')
    if not init_plan_name:
        return json.dumps(make_common_res(9, '缺少初始方案名称'))
    new_planid = Plan.parse_from_json(edit_plan).planid
    modify_params = gen_modify_params(planid, crossid, nodeid, edit_plan)
    if new_planid == planid:
        # planid unchanged: this is an update of the existing plan.
        modify_res = json.loads(modify_plan_stage(modify_params))
    else:
        # planid changed (does not exist yet): create a new plan.
        modify_res = json.loads(add_plan_stage(modify_params))
    flag, status = situation_db_pool.check_exists_edit_plan(optimize_taskid)
    # Stamp the edited plan with the save time before persisting.
    tmp_plan = Plan.parse_from_json(edit_plan)
    tmp_plan.timestamp = datetime.now().timestamp()
    edit_plan = tmp_plan.to_dict()
    if flag:
        res = situation_db_pool.update_optimize_edit_plan(optimize_taskid, edit_plan, new_planid)
    else:
        res = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, planid, tp_start, tp_end, 'edit_plan',
                                                        json.dumps(edit_plan, ensure_ascii=False), nodeid, status,
                                                        init_plan_id, init_plan_name)
    if res and modify_res['status'] == 0:
        return json.dumps(make_common_res(0, '操作成功'))
    else:
        return json.dumps(modify_res)
def issued_edit_plan(params):
    """Issue an edited plan to the platform and archive it for the task.

    Validates the request, pushes the plan to the signal platform (modify
    when the plan id is unchanged, add when it is new), stores the plan both
    as the task's current ``edit_plan`` and as the next ``iter_plan_<n>``
    history entry, flips the task status to 1 (iterating) on the first
    issue, then returns the refreshed edit plan plus the full issue history.

    :param params: request dict; must contain nodeid, taskid, issued_plan,
        crossid, planid, tp_start, tp_end, init_planid, init_planname.
    :return: JSON string; on success ``data`` holds ``edit_plan`` and
        ``his_plans``, otherwise the platform/validation error result.
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    optimize_taskid = check_param(params, 'taskid')
    if not optimize_taskid:
        return json.dumps(make_common_res(2, '缺少优化任务id'))
    issued_plan = check_param(params, 'issued_plan')
    if not issued_plan:
        return json.dumps(make_common_res(3, '缺少下发方案信息'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(4, '缺少路口id信息'))
    planid = check_param(params, 'planid')
    if not planid:
        return json.dumps(make_common_res(5, '缺少方案id信息'))
    tp_start = check_param(params, 'tp_start')
    if not tp_start:
        return json.dumps(make_common_res(6, '缺少时段开始时刻'))
    tp_end = check_param(params, 'tp_end')
    if not tp_end:
        return json.dumps(make_common_res(7, '缺少时段结束时刻'))
    init_plan_id = check_param(params, 'init_planid')
    if not init_plan_id:
        return json.dumps(make_common_res(8, '缺少初始方案id'))
    init_plan_name = check_param(params, 'init_planname')
    if not init_plan_name:
        return json.dumps(make_common_res(9, '缺少初始方案名称'))
    # If history rows (his_plan) exist for this task id, take each row's tag,
    # extract the current maximum x from 'iter_plan_x' and use x + 1;
    # otherwise the next tag is 'iter_plan_1'.
    plan_seq = 0
    his_plan_list = situation_db_pool.query_optimize_his_plans(optimize_taskid)
    new_planid = Plan.parse_from_json(issued_plan).planid
    modify_params = gen_modify_params(planid, crossid, nodeid, issued_plan)
    if planid == new_planid:
        modify_res = json.loads(modify_plan_stage(modify_params))
    else:
        # The plan id does not exist yet -> this issue creates a new plan.
        modify_res = json.loads(add_plan_stage(modify_params))
    for his_plan in his_plan_list:
        item_seq = int(str(his_plan['tag']).split('iter_plan_')[1])
        if item_seq > plan_seq:
            plan_seq = item_seq
    tmp_plan = Plan.parse_from_json(issued_plan)
    tmp_plan.timestamp = datetime.now().timestamp()
    issued_plan = tmp_plan.to_dict()
    # No his_plan yet: this is the first issue, so also switch the task's
    # status field to "iterating", i.e. 1.
    if plan_seq == 0:
        ret = situation_db_pool.update_optimize_records_status(optimize_taskid, 1)
        if not ret:
            return json.dumps(make_common_res(10, '更新状态失败'))
        flag, status = situation_db_pool.check_exists_edit_plan(optimize_taskid)
        if flag:
            ret = situation_db_pool.update_optimize_edit_plan(optimize_taskid, issued_plan, new_planid)
        else:
            ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, new_planid, tp_start, tp_end,
                                                            'edit_plan',
                                                            json.dumps(issued_plan, ensure_ascii=False), nodeid, status,
                                                            init_plan_id, init_plan_name)
        if not ret:
            return json.dumps(make_common_res(11, '保存方案失败'))
        ret = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, new_planid, tp_start, tp_end,
                                                        f'iter_plan_{plan_seq + 1}',
                                                        json.dumps(issued_plan, ensure_ascii=False), nodeid, status,
                                                        init_plan_id, init_plan_name)
        if not ret:
            return json.dumps(make_common_res(12, '保存方案失败'))
    else:
        # The plan has been issued before (task status 1, iterating): archive
        # the current plan as another his_plan entry and also save it as the
        # currently edited plan.
        ret1 = situation_db_pool.update_optimize_edit_plan(optimize_taskid, issued_plan, new_planid)
        ret2 = situation_db_pool.save_optimize_task_info(optimize_taskid, crossid, new_planid, tp_start, tp_end,
                                                         f'iter_plan_{plan_seq + 1}',
                                                         json.dumps(issued_plan, ensure_ascii=False), nodeid, 1,
                                                         init_plan_id,
                                                         init_plan_name)
        if not ret1 or not ret2:
            return json.dumps(make_common_res(13, '保存方案失败'))
    row_list = situation_db_pool.query_optimize_issued_plan(optimize_taskid)
    edit_plan, his_plans = None, []
    for row in row_list:
        tag = row['tag']
        if tag == 'edit_plan':
            edit_plan = Plan.parse_from_json(row['content']).to_dict()
        else:
            his_plans.append(Plan.parse_from_json(row['content']).to_dict())
    if modify_res['status'] == 0:
        res = make_common_res(0, 'ok')
        res['data'] = {
            'edit_plan': edit_plan,
            'his_plans': his_plans
        }
        return json.dumps(res, ensure_ascii=False)
    else:
        return json.dumps(modify_res)
def update_optimize_plan_status(params):
    """Mark an optimize task as finished or aborted.

    :param params: request dict with nodeid, optimize_taskid and status
        (one of ``'finish'`` / ``'abort'``).
    :return: JSON common-result string; status 0 on success.
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    task_id = check_param(params, 'optimize_taskid')
    if not task_id:
        return json.dumps(make_common_res(2, '缺少优化任务id'))
    status = check_param(params, 'status')
    # Missing or unrecognized status values are rejected together.
    if status not in ('finish', 'abort'):
        return json.dumps(make_common_res(3, '缺少状态信息'))
    updated = situation_db_pool.update_optimize_plan_status_byid(nodeid, task_id, status)
    if updated:
        return json.dumps(make_common_res(0, 'ok'))
    return json.dumps(make_common_res(4, '更新失败'))
def gen_modify_params(planid, crossid, nodeid, edit_plan):
    """Build the request payload for modify_plan_stage / add_plan_stage.

    Expands each stage's comma-separated phase ids into the matching phase
    names and copies min/max green from the stage's first phase, then wraps
    the stage rows with the plan header fields.

    :param planid: target plan id on the signal platform.
    :param crossid: id of the cross the plan belongs to.
    :param nodeid: node id of the current deployment.
    :param edit_plan: plan JSON accepted by ``Plan.parse_from_json``.
    :return: dict payload understood by the plan-stage APIs.
    """
    plan = Plan.parse_from_json(edit_plan).to_dict()
    phase_by_id = {p['phaseid']: p for p in plan['phases']}
    stage_rows = []
    for stage in plan['stages']:
        ids = [int(pid) for pid in stage['phaseids'].split(',')]
        first_phase = phase_by_id[ids[0]]
        names = ','.join(phase_by_id[pid]['name'] for pid in ids)
        stage_rows.append({
            'stageid': stage['stageid'],
            'stage_name': stage['name'],
            'stage_duration': stage['duration'],
            'green': stage['green'],
            'redyellow': stage['redyellow'],
            'yellow': stage['yellow'],
            'allred': stage['allred'],
            'phaseids': stage['phaseids'],
            'phaseid_names': names,
            # min/max green are taken from the first phase of the stage only.
            'max_green': first_phase['max_green'],
            'min_green': first_phase['min_green'],
            'phases': names
        })
    return {
        'planid': planid,
        'plan_name': plan['name'],
        'offset': plan['offset'],
        'cycle': plan['cycle'],
        'crossid': crossid,
        'stages': stage_rows,
        'nodeid': nodeid
    }
def calc_dm_delay_change_detail(dm, dm_now):
    """Compare two 3-D delay matrices element-wise.

    For every position produce 1 when the new value is greater, -1 when it
    is smaller and 0 when equal. A ``'-'`` placeholder counts as 0.

    :param dm: baseline 3-D matrix (numbers or ``'-'``), or None.
    :param dm_now: current 3-D matrix with the same layout, or None.
    :return: 3-D list of flags; empty list when either input is None.
    """
    if dm is None or dm_now is None:
        return []

    def _flag(old, new):
        # Treat the '-' placeholder as zero before comparing numerically.
        old_f = float(0 if old == '-' else old)
        new_f = float(0 if new == '-' else new)
        if new_f > old_f:
            return 1
        if new_f < old_f:
            return -1
        return 0

    return [
        [
            [_flag(dm[i][j][k], dm_now[i][j][k]) for k in range(len(dm[i][j]))]
            for j in range(len(dm[i]))
        ]
        for i in range(len(dm))
    ]
def download_optimize_plan(params):
    """Export an optimized timing plan as an .xlsx attachment.

    Builds a three-sheet workbook (配时方案表 / 阶段表 / 相位表) from the
    plan JSON in the request and streams it back, named after the cross and
    the plan.

    :param params: request dict with nodeid, crossid and download_plan.
    :return: Flask file response on success, JSON common result on error.
    """
    nodeid = check_nodeid(params)
    if not nodeid:
        return json.dumps(make_common_res(1, 'nodeid异常'))
    crossid = check_param(params, 'crossid')
    if not crossid:
        return json.dumps(make_common_res(2, '缺少路口id'))
    # Fix: look the cross up under the *validated* request nodeid instead of
    # the flask `g.nodeid` global — every sibling handler in this module keys
    # g_info_dict by the nodeid taken from params.
    cross_info = g_info_dict.get(nodeid)['g_roadnet'].query_cross(crossid)
    if cross_info is None:
        # Previously an unknown crossid crashed later at cross_info.name.
        return json.dumps(make_common_res(2, '路口信息异常'))
    download_plan = check_param(params, 'download_plan')
    if not download_plan:
        return json.dumps(make_common_res(3, '缺少下载方案'))
    download_plan_obj = Plan.parse_from_json(download_plan)
    plan_dict = download_plan_obj.to_dict()
    wb = Workbook()
    sheet1 = wb.active
    sheet1.title = "配时方案表"
    sheet1.append(t_sheet_cols['配时方案表']['head'])
    sheet2 = wb.create_sheet("阶段表")
    sheet2.append(t_sheet_cols['阶段表']['head'])
    for num, stage in enumerate(plan_dict['stages'], start=1):
        sheet1.append([num, plan_dict['planid'], plan_dict['name'], plan_dict['cycle'], num,
                       stage['stageid'], stage['duration']])
        sheet2.append([num, stage['stageid'], stage['name'], stage['yellow'], stage['allred'], stage['phaseids']])
    sheet3 = wb.create_sheet("相位表")
    sheet3.append(t_sheet_cols['相位表']['head'])
    for num, phase in enumerate(plan_dict['phases'], start=1):
        sheet3.append([num, phase['phaseid'], phase['name'], phase['min_green'], phase['max_green']])
    excel_data = BytesIO()
    wb.save(excel_data)
    excel_data.seek(0)
    return send_file(
        excel_data,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=f"{cross_info.name}_{download_plan_obj.name}_配时方案.xlsx")
def get_road_net_detail(params):
    """Return the source-direction detail of one road for an authorized user.

    Validates the request and checks that the caller's user record grants
    access to the requested area before looking the road up.

    :param params: request dict with nodeid, area_id, roadid and userid.
    :return: JSON common-result string; on success the message field carries
        the road's ``src_direct`` data.
    """
    nodeid = check_param(params, 'nodeid')
    if not nodeid:
        return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
    area_id = check_param(params, 'area_id')
    if not area_id:
        return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
    roadid = check_param(params, 'roadid')
    if not roadid:
        return json.dumps(make_common_res(3, '缺少roadid 请刷新后重试'))
    userid = check_param(params, 'userid')
    if not userid:
        return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
    user_areas = db_user.query_areaid_list(userid)
    if not user_areas:
        return json.dumps(make_common_res(5, '用户信息异常'))
    # area_id must be an (optionally negative) integer belonging to the user.
    is_numeric = str(area_id).lstrip('-').isdigit()
    if not is_numeric or int(area_id) not in (int(a) for a in user_areas):
        return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
    road_info, error = get_road_info([roadid])
    if error:
        return json.dumps(make_common_res(2, f"{error}"))
    return json.dumps(make_common_res(0, road_info[roadid]['src_direct']))