提交路口优化任务相关代码

This commit is contained in:
xuwang 2026-06-10 10:20:04 +08:00
parent da8be17f13
commit 2739f5a6b5
5 changed files with 713 additions and 123 deletions

View File

@ -68,7 +68,14 @@ def gen_cross_compare_report(params):
cut_images = check_param(params, 'cut_images')
if not cut_images:
cut_images = 0
date_list4screen = transition_date_list4screen(date_list)
origin_phase_info = check_param(params, 'origin_phase_info')
if not origin_phase_info:
origin_phase_info = None
final_phase_info = check_param(params, 'final_phase_info')
if not final_phase_info:
final_phase_info = None
date_list4screen = transition_date_list4screen(compare_date_list)
all_date_set = set()
for item_list in date_list:
for item in item_list:
@ -157,8 +164,8 @@ def gen_cross_compare_report(params):
part1_data = gen_compare_report_part1_data(data_range, compare_date_range, time_range, cross_static_info['name'], final_overview)
if 'part1' in parts:
doc1.tpl_paragraph['part1']['visible'] = 1
doc1.tpl_data['part1_1'].item1 = part1_data['data_range']
doc1.tpl_data['part1_1'].item2 = part1_data['compare_date_range']
doc1.tpl_data['part1_1'].item2 = part1_data['data_range']
doc1.tpl_data['part1_1'].item1 = part1_data['compare_date_range']
doc1.tpl_data['part1_1'].item3 = part1_data['cross_name']
doc1.tpl_data['part1_1'].item4 = part1_data['time_range']
for item in part1_data['data_list']:
@ -189,7 +196,7 @@ def gen_cross_compare_report(params):
doc1.tpl_data['part2_3'].table = [detail1]
if 'part3' in parts:
doc1.tpl_paragraph['part3']['visible'] = 1
part3_data = gen_compare_report_part3_data(crossid, nodeid, area_id, time_range, tp_start, date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak, cross_ledger_info, weekdays)
part3_data = gen_compare_report_part3_data(crossid, nodeid, area_id, time_range, tp_start, date_list, avg_comp_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak, cross_ledger_info, weekdays)
for problem_key in part3_data.keys():
class_part = PartDetail()
class_part.item1 = part3_data[problem_key]['class_name']
@ -284,7 +291,14 @@ def gen_compare_report_part1_data(data_range, compare_date_range, time_range, cr
final_overview[compare_key] = int(final_overview[compare_key].replace('%', ''))
if final_overview[compare_key] and final_overview[key] and final_overview[key] != '-' and final_overview[compare_key] != '-' and final_overview[compare_key] > final_overview[key]:
rate = round((final_overview[compare_key] - final_overview[key]) / final_overview[compare_key] * 100, 2) if final_overview[compare_key] > 0 else 0
compare_res_str = f"{index_dict[key]}{final_overview[compare_key]}下降为{final_overview[key]} 减少{round(final_overview[compare_key] - final_overview[key], 2)},优化率为{rate}%"
if key == 'stop_times':
compare_res_str = f"{index_dict[key]}{final_overview[compare_key]}次下降为{final_overview[key]}次, 减少{round(final_overview[compare_key] - final_overview[key], 2)}次,优化率为{rate}%"
elif key == 'high_park_percent':
compare_res_str = f"{index_dict[key]}{final_overview[compare_key]}%下降为{final_overview[key]}% 减少{round(final_overview[compare_key] - final_overview[key], 2)}%,优化率为{rate}%"
elif key == 'delay_time':
compare_res_str = f"{index_dict[key]}{final_overview[compare_key]}s下降为{final_overview[key]}s 减少{round(final_overview[compare_key] - final_overview[key], 2)}s优化率为{rate}%"
else:
compare_res_str = f"{index_dict[key]}{final_overview[compare_key]}提升为{final_overview[key]} 提升{round(final_overview[key] - final_overview[compare_key], 2)},优化率为{rate}%"
part1_data['data_list'].append(compare_res_str)
else:
if final_overview[compare_key] and final_overview[key] and final_overview[key] != '-' and final_overview[compare_key] != '-' and final_overview[compare_key] < final_overview[key]:
@ -323,6 +337,8 @@ def gen_compare_report_part2_2_data(road_flow_delay_infos, roads_dir_dict):
if err_dir_list:
total_str = ''.join(err_dir_list)
total_str += ''
if total_str == '':
total_str = '根据各进口道车辆延误时间分析,没有服务水平较低的进口道。'
part2_2_data['detail'] = total_str
return part2_2_data
@ -476,7 +492,12 @@ def gen_compare_report_part5_data(part1_data, compared_inroad_delay_infos):
compare_data[key] = int(compare_data[key].replace('%', ''))
if compare_data[key] and item_data[key] and compare_data[key] != '-' and item_data[key] != '-' and compare_data[key] > item_data[key]:
rate = round((compare_data[key] - item_data[key]) / compare_data[key] * 100, 2) if compare_data[key] > 0 else 0
compare_res_str = f"{index_dict[key]}{compare_data[key]}下降为{item_data[key]} 减少{diff_data[key]},优化率为{rate}%"
if key == 'stop_times':
compare_res_str = f"{index_dict[key]}{compare_data[key]}次下降为{item_data[key]}次, 减少{diff_data[key]}次,优化率为{rate}%"
elif key == 'high_park_percent':
compare_res_str = f"{index_dict[key]}{compare_data[key]}%下降为{item_data[key]}% 减少{diff_data[key]}%,优化率为{rate}%"
else:
compare_res_str = f"{index_dict[key]}{compare_data[key]}s下降为{item_data[key]}s 减少{diff_data[key]}s优化率为{rate}%"
src_dir_data.append(compare_res_str)
else:
if compare_data[key] and item_data[key] and compare_data[key] != '-' and item_data[key] != '-' and compare_data[key] < item_data[key]:
@ -624,4 +645,44 @@ def transition_date_list4screen(date_list):
tmp_list[i] = [datetime.strptime(tmp_list[i][0], '%Y%m%d').strftime('%Y-%m-%d'), datetime.strptime(tmp_list[i][1], '%Y%m%d').strftime('%Y-%m-%d')]
elif len(tmp_list[i]) > 2:
tmp_list[i] = [datetime.strptime(tmp_list[i][0], '%Y%m%d').strftime('%Y-%m-%d'), datetime.strptime(tmp_list[i][-1], '%Y%m%d').strftime('%Y-%m-%d')]
return tmp_list
return tmp_list
def query_cross_optimize_task_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(6, '缺少crossid 请刷新后重试'))
records = db_task.query_cross_optimize_task_records(nodeid, area_id, crossid)
for item in records:
origin_phase_detail = item['origin_phase_detail']
if origin_phase_detail:
origin_phase_detail = json.loads(origin_phase_detail)
optimize_phase_detail = item['optimize_phase_detail']
if optimize_phase_detail:
optimize_phase_detail = json.loads(optimize_phase_detail)
phase_tiny_adjust_detail = item['phase_tiny_adjust_detail']
if phase_tiny_adjust_detail:
phase_tiny_adjust_detail = json.loads(phase_tiny_adjust_detail)
item['origin_phase_detail'] = origin_phase_detail
item['optimize_phase_detail'] = optimize_phase_detail
item['phase_tiny_adjust_detail'] = phase_tiny_adjust_detail
item['check_result_time'] = item['check_result_time'].strftime('%Y-%m-%d %H:%M:%S') if item['check_result_time'] else ''
res = make_common_res(0, 'ok')
res['data'] = records
return json.dumps(res, ensure_ascii=False)

View File

@ -739,16 +739,99 @@ def calc_inroad_imbalance_index(inroad_delay_pb_list):
return tmp_list
def _classify_angles_to_turn_types(angles, is_merge=False):
if not angles:
return set()
default_straight = 30
default_uturn = 150
abs_angles = sorted(set(abs(a) for a in angles))
straight_candidates = sorted(set(abs_angles + [default_straight]))
uturn_candidates = sorted(set(abs_angles + [default_uturn]))
best_types = None
best_score = (-1, float('inf'), float('inf'))
for s_thr in straight_candidates:
for u_thr in uturn_candidates:
if s_thr >= u_thr:
continue
types = set()
for a in angles:
if abs(a) <= s_thr:
types.add(0)
elif abs(a) >= u_thr:
types.add(3)
elif a > 0:
types.add(1 if is_merge else 2)
else:
types.add(2 if is_merge else 1)
score = (len(types), -abs(s_thr - default_straight), -abs(u_thr - default_uturn))
if score > best_score:
best_score = score
best_types = types
return best_types
def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
road_delay_infos = avg_cross_delay_info.inroad_delay_infos
outroad_infos = avg_cross_delay_info.outroad_infos
road_delay_dict = {item.inroadid: item for item in road_delay_infos}
outroad_info_dict = {item.outroadid: item for item in outroad_infos}
cross_sum_car_num = sum([item.delay_info.car_num for item in road_delay_infos])
cross_out_sum_car_num = sum([item.turn_info.car_num for item in outroad_infos])
road_flow_turn_rate = {}
inroadid_list = [roads_dir_dict[k]['in'] for k in roads_dir_dict.keys()]
outroadid_list = [roads_dir_dict[k]['out'] for k in roads_dir_dict.keys()]
cross_sum_car_num = sum([item.delay_info.car_num for item in road_delay_infos if item.inroadid in inroadid_list])
cross_out_sum_car_num = sum([item.turn_info.car_num for item in outroad_infos if item.outroadid in outroadid_list])
# ---- 预处理:为每个进口道预计算 split_turns_set ----
split_turns_precomputed = {}
for dir_key in roads_dir_dict:
roadid = roads_dir_dict[dir_key]['in']
if 'udr_' in roadid or roadid == '-' or roadid not in road_delay_dict:
continue
inroad_info = g_roadnet.query_road(roadid)
if not inroad_info:
continue
angles = []
for outroadid in outroadid_list:
if not outroadid or outroadid == '-':
continue
outroad = g_roadnet.query_road(outroadid)
if not outroad:
continue
angle = g_roadnet.calc_road_turn_angle_without_abs_split(inroad_info, outroad)
angles.append(angle)
turn_types = _classify_angles_to_turn_types(angles, is_merge=False)
if 0 not in turn_types:
logging.warning(f'[DEBUG] crossid={avg_cross_delay_info.crossid} inroad={roadid} dir={dir_key} '
f'angles={[round(a, 1) for a in angles]} -> turn_types={turn_types} (NO STRAIGHT!)')
split_turns_precomputed[roadid] = turn_types
# ---- 预处理:为所有出口道预计算 merge_turns_set ----
merge_turns_precomputed = {}
valid_inroad_infos = []
for inroadid in inroadid_list:
if not inroadid or inroadid == '-':
continue
inroad = g_roadnet.query_road(inroadid)
if inroad:
valid_inroad_infos.append(inroad)
for out_road_id in set(outroadid_list):
if 'udr_' in out_road_id or not out_road_id or out_road_id == '-':
continue
out_road_info = g_roadnet.query_road(out_road_id)
if not out_road_info:
continue
angles = []
for inroad in valid_inroad_infos:
angle = g_roadnet.calc_road_turn_angle_without_abs_merge(out_road_info, inroad)
angles.append(angle)
merge_turns_precomputed[out_road_id] = _classify_angles_to_turn_types(angles, is_merge=True)
for dir in roads_dir_dict.keys():
roadid = roads_dir_dict[dir]['in']
if 'udr_' in roadid:
@ -759,21 +842,7 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
l_num, s_num, r_num = 0, 0, 0
out_l_num, out_s_num, out_r_num = 0, 0, 0
if roadid != '-' and roadid in road_delay_dict.keys():
inroad_info = g_roadnet.query_road(roadid)
split_turns_set = set()
for outroadid in outroadid_list:
outroad = g_roadnet.query_road(outroadid)
if not outroad:
continue
angle = g_roadnet.calc_road_turn_angle_without_abs_split(inroad_info, outroad)
if abs(angle) <= 30:
split_turns_set.add(0)
elif abs(angle) >= 150:
split_turns_set.add(3)
elif angle > 0:
split_turns_set.add(2)
else:
split_turns_set.add(1)
split_turns_set = split_turns_precomputed.get(roadid, set())
car_num = road_delay_dict[roadid].delay_info.car_num
in_flow_rate = round(car_num / cross_sum_car_num * 100, 1) if cross_sum_car_num != 0 else 0
l_rate = max(1, round((road_delay_dict[roadid].delay_info.turn_ratio_1 + road_delay_dict[roadid].delay_info.turn_ratio_3)) / car_num * 100) if car_num != 0 else 0
@ -795,21 +864,7 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict):
l_rate, s_rate, r_rate = rate_list[0] if rate_list[0] == '-' else rate_list[0], rate_list[1] if rate_list[1] == '-' else rate_list[1], rate_list[2] if rate_list[2] == '-' else rate_list[2]
out_road_id = roads_dir_dict[dir]['out']
if out_road_id != '-' and out_road_id in outroad_info_dict.keys():
out_road_info = g_roadnet.query_road(out_road_id)
merge_turns_set = set()
for inroadid in inroadid_list:
inroad = g_roadnet.query_road(inroadid)
if not inroad:
continue
angle = g_roadnet.calc_road_turn_angle_without_abs_merge(out_road_info, inroad)
if abs(angle) <= 30:
merge_turns_set.add(0)
elif abs(angle) >= 150:
merge_turns_set.add(3)
elif angle > 0:
merge_turns_set.add(1)
else:
merge_turns_set.add(2)
merge_turns_set = merge_turns_precomputed.get(out_road_id, set())
out_car_num = outroad_info_dict[out_road_id].turn_info.car_num
out_flow_rate = round(out_car_num / cross_out_sum_car_num * 100, 1) if cross_out_sum_car_num != 0 else 0
# out_l_rate = max(1, round(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / out_car_num * 100)) if out_car_num != 0 else 0

View File

@ -895,10 +895,10 @@ class TaskDbHelper(TableDbHelperBase):
""" % recordid
return self.do_select(sql)
def query_cross_optimize_task_additional_sql(self, task_no, nodeid, area_id):
def query_cross_optimize_task_additional_sql(self, task_no):
sql = """
select * from task.cross_optimize_task_additional_detail where nodeid = %s and area_id = %s and nodeid = %s
""" % (nodeid, area_id, task_no)
select * from task.cross_optimize_task_additional_detail where task_no = %s
""" % (task_no)
return self.do_select(sql)
def query_adjust_plan_infos(self, task_no):
@ -907,15 +907,18 @@ class TaskDbHelper(TableDbHelperBase):
""" % task_no
return self.do_select(sql)
def save_cross_optimize_task_stage1_info(self, stage1_values, task_no, process):
def save_cross_optimize_task_stage1_info(self, stage1_values, task_no, process, stage1_adjust_flag, task_stage):
conn, cursor = self.connect()
other_table_name = "your_other_table_name_here" # 替换为您实际的表名
other_table_name = "task.cross_optimize_task_additional_detail" # 替换为您实际的表名
temp_table_name = "temp_inserted_ids_" + str(int(time.time() * 1000000)) # 生成一个唯一的临时表名
other_insert_sql = f"""
INSERT INTO {other_table_name} (cross_optimize_task_phase_info_id, task_no, process_info) VALUES (%s, %s, %s)
other_update_sql = f"""
update {other_table_name} set stage1_info_ids = %s, stage1_adjust_flag = %s, task_stage = %s where task_no = %s
"""
update_task_process_sql = """
update task.task set progress = %s where taskno = %s
"""
try:
conn.autocommit = False
# 1. 创建临时表来存储ID
create_temp_sql = f"""
CREATE TEMPORARY TABLE {temp_table_name} (
@ -928,8 +931,8 @@ class TaskDbHelper(TableDbHelperBase):
# 2. 批量执行插入操作到原表
insert_sql = """
INSERT INTO task.cross_optimize_task_phase_info
(task_no, plan_id, plan_name, time_range, weekdays, problem_info)
VALUES (%s, %s, %s, %s, %s, %s)
(task_no, schedule_id, plan_id, plan_name, time_range, weekdays, weekdays_str, problem_info)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.executemany(insert_sql, stage1_values)
num_inserted = cursor.rowcount
@ -937,45 +940,261 @@ class TaskDbHelper(TableDbHelperBase):
# 3. 获取本次插入的第一行ID (注意lastrowid 是正确的属性名)
first_inserted_id = cursor.lastrowid
if first_inserted_id is None or num_inserted == 0:
# 如果没有插入任何行lastrowid 可能是 None
# 如果插入了行,但某种原因没拿到 ID也应该报错
if num_inserted > 0:
logging.error("Rows were inserted ({num_inserted}), but could not retrieve the starting ID (lastrowid={first_inserted_id}).")
logging.error(f"Rows inserted ({num_inserted}), but lastrowid={first_inserted_id}")
return False
else:
logging.error("No rows were inserted, cannot determine IDs.")
logging.error("No rows inserted")
return False
# 4. 将计算出的ID范围插入到临时表
# 4. 【修复】直接内存生成 ID去掉临时表
if num_inserted > 0:
# 假设 ID 是连续递增的,从 first_inserted_id 开始
ids_to_insert = [(first_inserted_id + i,) for i in range(num_inserted)]
insert_temp_sql = f"INSERT INTO {temp_table_name} (inserted_id) VALUES (%s)"
cursor.executemany(insert_temp_sql, ids_to_insert)
logging.info(f"Inserted {num_inserted} IDs into temp table using executemany.")
# 5. 从临时表查询ID
select_from_temp_sql = f"SELECT inserted_id FROM {temp_table_name}"
cursor.execute(select_from_temp_sql)
inserted_ids = [row[0] for row in cursor.fetchall()]
logging.info(f"Retrieved IDs from temp table: {inserted_ids}")
if len(inserted_ids) != num_inserted:
logging.error(f"Expected {num_inserted} IDs in temp table, but found {len(inserted_ids)}.")
return False
# 6. 准备并批量插入到另一张表
other_values_to_insert = [(inserted_id, task_no, process) for inserted_id in inserted_ids]
cursor.executemany(other_insert_sql, other_values_to_insert)
logging.info(f"Successfully inserted {len(other_values_to_insert)} records into {other_table_name}.")
inserted_ids = [str(first_inserted_id + i) for i in range(num_inserted)]
logging.info(f"Generated IDs: {inserted_ids[:3]}... (total: {len(inserted_ids)})")
else:
inserted_ids = []
# 5. 【修复】无需查询临时表,直接使用 inserted_ids
logging.info(f"Using {len(inserted_ids)} IDs for UPDATE")
# 6. 执行 other_update_sql
other_update_values = (','.join(inserted_ids), stage1_adjust_flag, task_stage, task_no)
logging.info(f"Executing UPDATE: task_no={task_no}, ids={','.join(inserted_ids)[:50]}...")
cursor.execute(other_update_sql, other_update_values)
logging.info(f"UPDATE rowcount: {cursor.rowcount}")
if cursor.rowcount == 0:
logging.warning(f"UPDATE affected 0 rows for task_no={task_no}")
logging.info(f"Successfully inserted {len(other_update_values)} records into {other_table_name}.")
cursor.execute(update_task_process_sql, (process, task_no))
logging.info(f"Task process update affected rows: {cursor.rowcount}")
# 7. 提交事务
conn.commit()
logging.info("Transaction committed successfully. All records saved.")
return inserted_ids
return True
except Exception as e:
logging.error(f"An error occurred: {e}")
conn.rollback()
return False
finally:
conn.autocommit = True
cursor.close()
conn.close()
def save_stage2_optimize_info(self, stage2_values, process, task_no, task_stage):
sql = """
update task.cross_optimize_task_phase_info set origin_phase_detail = %s, optimize_phase_detail = %s where id = %s and task_no = %s
"""
update_process_sql = """
update task.task set progress = %s where taskno = %s
"""
update_task_stage_sql = """
update task.cross_optimize_task_additional_detail set task_stage = %s where task_no = %s
"""
conn, cursor = self.connect()
try:
conn.autocommit = False
suc_num = 0
for item in stage2_values:
ret = cursor.execute(sql, (item['origin_phase_detail'], item['optimize_phase_detail'], item['id'], task_no))
suc_num += ret
if suc_num == len(stage2_values):
cursor.execute(update_process_sql, (process, task_no))
cursor.execute(update_task_stage_sql, (task_stage, task_no))
conn.commit()
return True
else:
logging.error(f"update error! suc_num = {suc_num}, len(stage2_values) = {stage2_values}")
conn.rollback()
return False
except Exception as e:
logging.error(f"An error occurred: {e}")
conn.rollback()
return False
finally:
conn.autocommit = True
cursor.close()
conn.close()
def save_stage3_optimize_info(self, process, task_no, issue_time, task_stage):
sql = """
update task.cross_optimize_task_additional_detail set stage3_issue_time = %s, task_stage = %s where task_no = %s
"""
update_process_sql = """
update task.task set progress = %s where taskno = %s
"""
conn, cursor = self.connect()
try:
conn.autocommit = False
ret = cursor.execute(sql, (issue_time, task_stage, task_no))
if ret == 1:
update_process_ret = cursor.execute(update_process_sql, (process, task_no))
if update_process_ret == 1:
conn.commit()
return True
else:
logging.error(f"update process error! ret = {ret}, update_process_ret = {update_process_ret}")
conn.rollback()
return False
else:
logging.error(f"update issue_time error! ret = {ret}")
conn.rollback()
return False
except Exception as e:
logging.error(f"An error occurred: {e}")
conn.rollback()
return False
finally:
conn.autocommit = True
cursor.close()
conn.close()
def save_stage4_optimize_info(self, process, task_no, stage4_info, task_stage, check_result_time):
sql = """
update task.cross_optimize_task_phase_info set check_result_res = %s, check_result_common = %s where id = %s
"""
update_process_sql = """
update task.task set progress = %s where taskno = %s
"""
update_task_stage_sql = """
update task.cross_optimize_task_additional_detail set task_stage = %s, check_result_time = %s where task_no = %s
"""
conn, cursor = self.connect()
try:
conn.autocommit = False
suc_num = 0
for item in stage4_info:
ret = cursor.execute(sql, (item['check_result_res'], item['check_result_common'], item['record_id']))
suc_num += ret
if suc_num == len(stage4_info):
cursor.execute(update_process_sql, (process, task_no))
cursor.execute(update_task_stage_sql, (task_stage, check_result_time, task_no))
conn.commit()
return True
else:
logging.error(f"update check_result_res error! suc_num = {suc_num}, len(stage4_info) = {stage4_info}")
conn.rollback()
return False
except Exception as e:
logging.error(f"An error occurred: {e}")
conn.rollback()
return False
finally:
conn.autocommit = True
cursor.close()
conn.close()
def save_stage5_optimize_info(self, process, task_no, task_stage, stage5_info, check_result_time, stage5_all_tp_pass):
sql = """
update task.cross_optimize_task_phase_info set phase_tiny_adjust_detail = %s where id = %s
"""
update_process_sql = """
update task.task set progress = %s where taskno = %s
"""
update_task_stage_sql = """
update task.cross_optimize_task_additional_detail set task_stage = %s, check_result_time = %s, stage5_all_tp_pass = %s where task_no = %s
"""
conn, cursor = self.connect()
try:
conn.autocommit = False
suc_num = 0
for item in stage5_info:
ret = cursor.execute(sql, (item['phase_tiny_adjust_detail_json'], item['id']))
suc_num += ret
if suc_num == len(stage5_info):
cursor.execute(update_process_sql, (process, task_no))
cursor.execute(update_task_stage_sql, (task_stage, check_result_time, stage5_all_tp_pass, task_no))
conn.commit()
return True
else:
logging.error(f"update phase_tiny_adjust_detail error! suc_num = {suc_num}, len(stage5_info) = {stage5_info}")
conn.rollback()
return False
except Exception as e:
logging.error(f"An error occurred: {e}")
conn.rollback()
return False
finally:
conn.autocommit = True
cursor.close()
conn.close()
def save_stage7_optimize_info(self, process, task_no, task_stage, open_cross_monitor):
sql = """
update task.cross_optimize_task_additional_detail set open_cross_monitor = %s where task_no = %s
"""
update_process_sql = """
update task.task set progress = %s where taskno = %s
"""
update_task_stage_sql = """
update task.cross_optimize_task_additional_detail set task_stage = %s where task_no = %s
"""
conn, cursor = self.connect()
try:
conn.autocommit = False
cursor.execute(sql, (open_cross_monitor, task_no))
cursor.execute(update_process_sql, (process, task_no))
cursor.execute(update_task_stage_sql, (task_stage, task_no))
conn.commit()
return True
except Exception as e:
logging.error(f"An error occurred: {e}")
conn.rollback()
return False
finally:
conn.autocommit = True
cursor.close()
conn.close()
def save_stage6_optimize_info(self, process, task_no, task_stage, download_url, upload_time):
sql = """
update task.cross_optimize_task_additional_detail set stage6_report_url = %s, stage6_report_upload_time = %s where task_no = %s
"""
update_process_sql = """
update task.task set progress = %s where taskno = %s
"""
update_task_stage_sql = """
update task.cross_optimize_task_additional_detail set task_stage = %s where task_no = %s
"""
conn, cursor = self.connect()
try:
conn.autocommit = False
ret = cursor.execute(sql, (download_url, upload_time, task_no))
if ret == 1:
cursor.execute(update_process_sql, (process, task_no))
cursor.execute(update_task_stage_sql, (task_stage, task_no))
conn.commit()
return True
else:
logging.error(f"update download_url error! ret = {ret}")
conn.rollback()
return False
except Exception as e:
logging.error(f"An error occurred: {e}")
conn.rollback()
return False
finally:
conn.autocommit = True
cursor.close()
conn.close()
def save_optimize_task_additional_info(self, task_no, task_stage, crossid, link_cross_monitor_task_no, link_cross_monitor_task_executor, link_cross_monitor_task_recordid, open_cross_monitor):
sql = """
insert into task.cross_optimize_task_additional_detail (task_no, crossid, task_stage, link_cross_monitor_task_no, link_cross_monitor_task_executor, link_cross_monitor_task_recordid, open_cross_monitor) values(%s, '%s', '%s', %s, '%s', %s, %s)
""" % (task_no, crossid, task_stage, link_cross_monitor_task_no, link_cross_monitor_task_executor, link_cross_monitor_task_recordid, open_cross_monitor)
return self.do_execute(sql)
def query_cross_optimize_task_records(self, nodeid, area_id, crossid):
sql = """
select t2.task_no, t1.task_name,t1.plan_begin_time, t1.plan_end_time, t1.executor, t2.task_stage,t2.stage1_info_ids, t3.origin_phase_detail, t3.optimize_phase_detail, t3.phase_tiny_adjust_detail, t2.stage3_issue_time, t2.check_result_time from
(select * from task.task where nodeid = %s and area_id = %s and crossids = '%s' and task_type_class = 2 and record_state != 1) t1 inner join
(select * from task.cross_optimize_task_additional_detail where crossid = '%s') t2 inner join
(select * from task.cross_optimize_task_phase_info) t3
on t1.taskno = t2.task_no and t1.taskno = t3.task_no
""" % (nodeid, area_id, crossid, crossid)
return self.do_select(sql)
#
# if __name__ == '__main__':
# tt_5min = get_latest_5min_timestamp()

View File

@ -11,6 +11,8 @@ from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
import requests
from app.cross_monitor_task_avg_split import cluster_crossings_by_region, assign_to_shuffled_managers
from app.eva_common import query_cross_ledger_info, gen_crossids_roads_dir_dict_by_mysql
from app.global_source import db_user, db_task
@ -637,6 +639,27 @@ def do_add_task(params):
logging.error(str(params) + ' do_add_task添加路口分配信息失败!')
res = make_common_res(-1, '路口分配信息保存失败,请反馈该信息至管理员')
return json.dumps(res)
elif task_type_class == 2:
link_cross_monitor_task_no = check_param(params, 'link_cross_monior_task_no')
if not link_cross_monitor_task_no:
link_cross_monitor_task_no = 0
link_cross_monitor_task_executor = check_param(params, 'link_cross_monitor_task_executor')
if not link_cross_monitor_task_executor:
link_cross_monitor_task_executor = ''
link_cross_monitor_task_recordid = check_param(params, 'link_cross_monitor_task_recordid')
if not link_cross_monitor_task_recordid:
link_cross_monitor_task_recordid = 0
task_stage = '1000000'
if full_review == 1:
task_stage = '1003330'
bound_crosses = db_tmnet.query_area_routing_focus_cross_num(nodeid, area_id)
monitor_crosses = [row['crossid'] for row in bound_crosses if row['is_routing_inspection'] == 1]
open_cross_monitor = 0 if crossids not in monitor_crosses else 1
insert_optimize_cross_additional_info_ret = db_task.save_optimize_task_additional_info(taskno, task_stage, crossids, link_cross_monitor_task_no, link_cross_monitor_task_executor, link_cross_monitor_task_recordid, open_cross_monitor)
if not insert_optimize_cross_additional_info_ret:
logging.error(str(params) + ' do_add_task添加路口优化信息失败!')
res = make_common_res(-1, '路口优化信息保存失败,请反馈该信息至管理员')
return json.dumps(res)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
@ -1022,6 +1045,7 @@ def do_query_task_detail(params):
task = db_task.query_task(taskno, nodeid, area_id)
cross_name_info = db_tmnet.query_cross_list_sql(nodeid, area_id)
cross_name_dict = {item['crossid']: item for item in cross_name_info}
task_crosses_info = db_task.query_all_ledger_task_crosses_info(nodeid, area_id)
cross_monitor_info_dict = query_cross_monitor_info(nodeid, area_id)
if task is None:
@ -1051,6 +1075,9 @@ def do_query_task_detail(params):
task_add_base_info['process'] = cross_monitor_info_dict[int(taskno)]['process'] if int(taskno) in cross_monitor_info_dict.keys() else 0
task['task_base_info'] = task_add_base_info
task['cross_monitor_additional_info'] = cross_monitor_additional_info
elif task['task_type_class'] == 2:
task['cross_name'] = cross_name_dict[task['crossids']]['name']
task['optimize_cross_task_additional_info'] = query_cross_optimize_additional_info(taskno, userid, nodeid, area_id)
task.pop('update_time')
res['desc'] = ''
res['data'] = task
@ -2593,13 +2620,15 @@ def query_cross_monitor_record_info_api(params):
if not executor:
return json.dumps(make_common_res(8, '缺少巡查人信息,请刷新后重试'))
has_record = check_param(params, 'has_record')
if not has_record:
has_record = 0
has_record = int(has_record)
day_list, week_list, month_list = db_cross.query_monitor_task_dates(nodeid, area_id)
usable_day_list = sorted(day_list)[-7:]
usable_week_list = sorted(week_list)[-1:]
query_date = check_param(params, 'query_date')
if not query_date:
query_date = str(usable_week_list[0])
query_type = check_param(params, 'query_type')
if not query_type:
query_type = 'week'
cross_ledger_info = db_tmnet.query_cross_ledger_info(crossid, nodeid, area_id)
cross_monitor_record_info = cross_monitor_record_data.copy()
if not cross_ledger_info:
@ -2616,7 +2645,31 @@ def query_cross_monitor_record_info_api(params):
if not cross_ledger_info:
return json.dumps(make_common_res(8, '路口信息异常'))
if has_record != 1:
recordid = check_param(params, 'recordid')
if not recordid:
return json.dumps(make_common_res(10, '缺少记录编号,请刷新后重试'))
record_info = db_task.query_cross_monitor_record_info(recordid)
if not record_info:
return json.dumps(make_common_res(11, '记录信息异常'))
if record_info['record_detail'] and record_info['record_detail'] != '':
cross_monitor_record_info = json.loads(record_info['record_detail'])
cross_monitor_record_info['base_info'] = {
'cross_name': cross_ledger_info[0]['name'],
'crossid': cross_ledger_info[0]['crossid'],
'cross_no': cross_ledger_info[0]['cross_no'],
'division': cross_ledger_info[0]['division'],
'greenwave': cross_ledger_info[0]['wave_cross'],
'executor': executor,
'monitor_date': record_info['monitor_date'],
'special_time_range': record_info['special_time_range'],
'monitor_type': record_info['monitor_type']
}
cross_monitor_record_info['plan_start_date'] = cross_monitor_record_info['plan_start_date'].strftime('%Y-%m-%d') if 'plan_start_date' in cross_monitor_record_info.keys() and cross_monitor_record_info['plan_start_date'] else ''
cross_monitor_record_info['plan_end_date'] = cross_monitor_record_info['plan_end_date'].strftime('%Y-%m-%d') if 'plan_end_date' in cross_monitor_record_info.keys() and cross_monitor_record_info['plan_end_date'] else ''
cross_monitor_record_info['create_time'] = cross_monitor_record_info['create_time'].strftime('%Y-%m-%d %H:%M:%S') if 'create_time' in cross_monitor_record_info.keys() and cross_monitor_record_info['create_time'] else ''
cross_monitor_record_info['update_time'] = cross_monitor_record_info['update_time'].strftime('%Y-%m-%d %H:%M:%S') if 'update_time' in cross_monitor_record_info.keys() and cross_monitor_record_info['update_time'] else ''
else:
cross_roads_dir_dict = gen_crossids_roads_dir_dict_by_mysql([item['crossid'] for item in cross_ledger_info], nodeid)
for row in cross_ledger_info:
row['crossno'] = row['cross_no']
@ -2625,12 +2678,6 @@ def query_cross_monitor_record_info_api(params):
day_list, week_list, month_list = db_cross.query_monitor_task_dates(nodeid, area_id)
usable_day_list = sorted(day_list)[-7:]
usable_week_list = sorted(week_list)[-1:]
query_date = check_param(params, 'query_date')
if not query_date:
query_date = str(usable_week_list[0])
query_type = check_param(params, 'query_type')
if not query_type:
query_type = 'week'
row_list = db_cross.query_monitor_data_sql(nodeid, area_id, query_type, query_date)
if row_list:
cross_report_pb = pb.xl_cross_report_t()
@ -2686,30 +2733,6 @@ def query_cross_monitor_record_info_api(params):
if len(cross_monitor_problem_detail[0]['unreasonable_red']) > 0:
cross_monitor_record_info['phase_info']['unreasonable_all_red']['is_error'] = 1
cross_monitor_record_info['phase_info']['unreasonable_all_red']['big_data_monitor'] = ''.join(cross_monitor_problem_detail[0]['unreasonable_red'])
else:
recordid = check_param(params, 'recordid')
if not recordid:
return json.dumps(make_common_res(10, '缺少记录编号,请刷新后重试'))
record_info = db_task.query_cross_monitor_record_info(recordid)
if not record_info:
return json.dumps(make_common_res(11, '记录信息异常'))
cross_monitor_record_info = json.loads(record_info['record_detail'])
cross_monitor_record_info['base_info'] = {
'cross_name': cross_ledger_info[0]['name'],
'crossid': cross_ledger_info[0]['crossid'],
'cross_no': cross_ledger_info[0]['cross_no'],
'division': cross_ledger_info[0]['division'],
'greenwave': cross_ledger_info[0]['wave_cross'],
'executor': executor,
'monitor_date': record_info['monitor_date'],
'special_time_range': record_info['special_time_range'],
'monitor_type': record_info['monitor_type']
}
cross_monitor_record_info['plan_start_date'] = cross_monitor_record_info['plan_start_date'].strftime('%Y-%m-%d') if 'plan_start_date' in cross_monitor_record_info.keys() and cross_monitor_record_info['plan_start_date'] else ''
cross_monitor_record_info['plan_end_date'] = cross_monitor_record_info['plan_end_date'].strftime('%Y-%m-%d') if 'plan_end_date' in cross_monitor_record_info.keys() and cross_monitor_record_info['plan_end_date'] else ''
cross_monitor_record_info['create_time'] = cross_monitor_record_info['create_time'].strftime('%Y-%m-%d %H:%M:%S') if 'create_time' in cross_monitor_record_info.keys() and cross_monitor_record_info['create_time'] else ''
cross_monitor_record_info['update_time'] = cross_monitor_record_info['update_time'].strftime('%Y-%m-%d %H:%M:%S') if 'update_time' in cross_monitor_record_info.keys() and cross_monitor_record_info['update_time'] else ''
res = make_common_res(0, 'ok')
res['cross_monitor_record_info'] = cross_monitor_record_info
@ -3203,7 +3226,7 @@ def export_cross_monitor_week_report(params):
# 以下内容为路口优化任务相关 20260518
def query_cross_optimize_additional_info(task_no, nodeid, area_id):
def query_cross_optimize_additional_info(task_no, userid, nodeid, area_id):
"""
获取路口优化任务补充信息
:param task_no: 任务编码
@ -3211,31 +3234,103 @@ def query_cross_optimize_additional_info(task_no, nodeid, area_id):
:param area_id: 辖区id
:return: 路口优化任务补充信息
"""
row_list = db_tmnet.query_cross_optimize_task_additional_sql(task_no, nodeid, area_id)
row_list = db_task.query_cross_optimize_task_additional_sql(task_no)
if not row_list:
return None
adjust_plan_infos = db_tmnet.query_adjust_plan_infos(task_no)
adjust_plan_infos = db_task.query_adjust_plan_infos(task_no)
res = {
'task_no': task_no,
'task_stage': row_list[0]['task_stage'],
'crossid': row_list[0]['crossid'],
'stage1_adjust_flag': row_list[0]['stage1_adjust_flag'],
'stage3_issue_time': row_list[0]['stage3_issue_time'],
'check_result_time': row_list[0]['check_result_time'],
'check_result_time': row_list[0]['check_result_time'].strftime("%Y-%m-%d %H:%M:%S") if row_list[0]['check_result_time'] else None,
'stage6_report_upload_time': row_list[0]['stage6_report_upload_time'].strftime("%Y-%m-%d %H:%M:%S") if row_list[0]['stage6_report_upload_time'] else None,
'stage5_all_tp_pass': row_list[0]['stage5_all_tp_pass'],
'stage6_report_url': row_list[0]['stage6_report_url'],
'open_cross_monitor': row_list[0]['open_cross_monitor'],
'link_cross_monitor_task_executor': row_list[0]['link_cross_monitor_task_executor'],
'link_cross_monitor_task_no': row_list[0]['link_cross_monitor_task_no'],
'link_cross_monitor_task_recordid': row_list[0]['link_cross_monitor_task_recordid'],
'phase_stage_infos': []
}
headers = {"Content-Type": "application/json"}
if row_list[0]['task_stage'][1] == '1':
query_cross_phase_detail_params = {
'userid': userid,
'nodeid': nodeid,
'crossid': row_list[0]['crossid'],
'area_id': area_id,
'schedule_infos': {}
}
for item in adjust_plan_infos:
schedule_id = int(item['schedule_id'])
if schedule_id not in query_cross_phase_detail_params['schedule_infos']:
query_cross_phase_detail_params['schedule_infos'][schedule_id] = []
query_cross_phase_detail_params['schedule_infos'][schedule_id].append(item['plan_id'])
logging.info(f"查询路口相位详情参数:{json.dumps(query_cross_phase_detail_params)}")
query_link = 'http://localhost:7070/api/common/cross_phase_detail'
origin_phase_infos = None
try:
response = requests.post(query_link, json=query_cross_phase_detail_params, headers=headers)
logging.info(response.json())
if response.status_code == 200:
response_data = response.json()
if response_data.get('status') == 0:
origin_phase_infos = response_data.get('data')
else:
logging.error(f"查询路口相位详情失败,crossid:{row_list[0]['crossid']},响应:{response_data}")
origin_phase_infos = None
else:
logging.error(f"查询路口相位详情HTTP错误,crossid:{row_list[0]['crossid']},状态码:{response.status_code}")
origin_phase_infos = None
except Exception as e:
logging.error(f"查询路口相位详情异常,crossid:{row_list[0]['crossid']},错误:{str(e)}")
query_phase_update_time_link = f"http://localhost:7070/api/common/phasetable?crossid={row_list[0]['crossid']}&nodeid={nodeid}&area_id={area_id}&userid={userid}"
phase_update_time_res = requests.get(query_phase_update_time_link, headers=headers)
if phase_update_time_res.status_code != 200 or phase_update_time_res.json()['status'] != 0:
phase_update_time = None
else:
phase_update_time_infos = phase_update_time_res.json()['modify']
phase_update_time = '1900-01-01 00:00:00'
for item_key in phase_update_time_infos:
if phase_update_time_infos[item_key] > phase_update_time:
phase_update_time = phase_update_time_infos[item_key]
res['phase_update_time'] = phase_update_time
for item in adjust_plan_infos:
origin_phase_detail = item['origin_phase_detail']
schedule_id = str(item['schedule_id'])
plan_id = item['plan_id']
if row_list[0]['task_stage'][1] == '1' and origin_phase_infos:
plans_info = origin_phase_infos[schedule_id]['plan_infos'] if schedule_id in origin_phase_infos.keys() else []
for plan_info in plans_info:
if plan_info['planid'] == plan_id:
origin_phase_detail = plan_info
break
else:
origin_phase_detail = item['origin_phase_detail']
if origin_phase_detail:
origin_phase_detail = json.loads(origin_phase_detail)
optimize_phase_detail = item['optimize_phase_detail']
if optimize_phase_detail:
optimize_phase_detail = json.loads(optimize_phase_detail)
phase_tiny_adjust_detail = item['phase_tiny_adjust_detail']
if phase_tiny_adjust_detail:
phase_tiny_adjust_detail = json.loads(phase_tiny_adjust_detail)
res['phase_stage_infos'].append({
'plan_id': item['plan_id'],
'record_id': item['id'],
'schedule_id': schedule_id,
'plan_id': plan_id,
'plan_name': item['plan_name'],
'time_range': item['time_range'],
'weekdays': item['weekdays'],
'weekdays_str': item['weekdays_str'],
'problem_info': item['problem_info'],
'origin_phase_detail': item['origin_phase_detail'],
'optimize_phase_detail': item['optimize_phase_detail'],
'origin_phase_detail': origin_phase_detail,
'optimize_phase_detail': optimize_phase_detail,
'check_result_res': item['check_result_res'],
'check_result_common': item['check_result_common'],
'phase_tiny_adjust_detail': item['phase_tiny_adjust_detail']
'phase_tiny_adjust_detail': phase_tiny_adjust_detail
})
return res
@ -3276,18 +3371,162 @@ def update_cross_optimize_additional_info(params):
phase_stage_infos = check_param(params, 'phase_stage_infos')
if not phase_stage_infos:
return json.dumps(make_common_res(10, '缺少调整阶段信息,请刷新后重试'))
stage1_adjust_flag = check_param(params, 'stage1_adjust_flag')
if stage1_adjust_flag is None:
return json.dumps(make_common_res(11, '缺少调整阶段标识,请刷新后重试'))
stage1_values = []
for item in phase_stage_infos:
plan_id = item['plan_id']
plan_name = item['plan_name']
time_range = item['time_range']
weekdays = item['weekdays']
problem_info = item['problem_info']
schedule_id = item.get('schedule_id', None)
plan_id = item.get('plan_id', None)
plan_name = item.get('plan_name', None)
time_range = item.get('time_range', None)
weekdays = item.get('weekdays', None)
weekdays_str = item.get('weekdays_str', None)
problem_info = item.get('problem_info', None)
if not plan_id or not plan_name or not time_range or not weekdays or problem_info is None or not weekdays_str:
stage1_values = []
break
stage1_values.append(
(task_no, plan_id, plan_name, time_range, weekdays, problem_info)
(task_no, schedule_id, plan_id, plan_name, time_range, weekdays, weekdays_str, problem_info)
)
if len(stage1_values) < 1:
return json.dumps(make_common_res(11, '调整阶段信息异常,请刷新后重试'))
ret = db_task.save_cross_optimize_task_stage1_info(stage1_values, task_no, process, stage1_adjust_flag, task_stage)
elif modify_stage == 2:
stage2_info = check_param(params, 'stage2_info')
if not stage2_info or len(stage2_info) < 1:
return json.dumps(make_common_res(12, '缺少调整阶段信息,请刷新后重试'))
stage2_values = []
for item in stage2_info:
origin_phase_detail = item.get('origin_phase_detail', None)
optimize_phase_detail = item.get('optimize_phase_detail', None)
origin_phase_detail_json, optimize_phase_detail_json = '', ''
if origin_phase_detail:
origin_phase_detail_json = json.dumps(origin_phase_detail, ensure_ascii=False)
if optimize_phase_detail:
optimize_phase_detail_json = json.dumps(optimize_phase_detail, ensure_ascii=False)
record_id = item.get('record_id', None)
if not record_id:
return json.dumps(make_common_res(13, '缺少调整阶段id信息请刷新后重试'))
stage2_values.append({
'id': record_id,
'task_no': task_no,
'origin_phase_detail': origin_phase_detail_json,
'optimize_phase_detail': optimize_phase_detail_json,
})
ret = db_task.save_stage2_optimize_info(stage2_values, process, task_no, task_stage)
elif modify_stage == 3:
stage3_issue_time = check_param(params, 'stage3_issue_time')
if not stage3_issue_time:
return json.dumps(make_common_res(14, '缺少下发时间,请刷新后重试'))
ret = db_task.save_stage3_optimize_info(process, task_no, stage3_issue_time, task_stage)
elif modify_stage == 4:
stage4_info = check_param(params, 'stage4_info')
if not stage4_info or len(stage4_info) < 1:
return json.dumps(make_common_res(15, '缺少效果确认信息,请刷新后重试'))
stage4_values = []
for item in stage4_info:
record_id = item.get('record_id', None)
check_result_res = item.get('check_result_res', None)
check_result_common = item.get('check_result_common', None)
if not record_id or check_result_res not in (0, 1):
continue
stage4_values.append({
'id': record_id,
'task_no': task_no,
'check_result_res': check_result_res,
'check_result_common': check_result_common,
})
if len(stage4_values) != len(stage4_info):
return json.dumps(make_common_res(16, '效果确认信息异常,请刷新后重试'))
check_result_time = check_param(params, 'check_result_time')
ret = db_task.save_stage4_optimize_info(process, task_no, stage4_info, task_stage, check_result_time)
elif modify_stage == 5:
stage5_info = check_param(params, 'stage5_info')
if not stage5_info or len(stage5_info) < 1:
return json.dumps(make_common_res(17, '缺少调整结果信息,请刷新后重试'))
check_result_time = check_param(params, 'check_result_time')
if not check_result_time:
return json.dumps(make_common_res(17, '缺少效果确认时间,请刷新后重试'))
stage5_all_tp_pass = check_param(params, 'stage5_all_tp_pass')
if stage5_all_tp_pass is None:
return json.dumps(make_common_res(18, '缺少是否全通过信息,请刷新后重试'))
stage5_values = []
for item in stage5_info:
record_id = item.get('record_id', None)
phase_tiny_adjust_detail = item.get('phase_tiny_adjust_detail', None)
if not record_id or not phase_tiny_adjust_detail:
continue
stage5_values.append({
'id': record_id,
'task_no': task_no,
'phase_tiny_adjust_detail_json': json.dumps(phase_tiny_adjust_detail, ensure_ascii=False),
})
if len(stage5_values) != len(stage5_info):
return json.dumps(make_common_res(18, '微调信息异常,请刷新后重试'))
ret = db_task.save_stage5_optimize_info(process, task_no, task_stage, stage5_values, check_result_time, stage5_all_tp_pass)
elif modify_stage == 6:
stage6_report_url = check_param(params, 'stage6_report_url')
if not stage6_report_url:
return json.dumps(make_common_res(19, '缺少报告地址,请刷新后重试'))
stage6_report_upload_time = check_param(params, 'stage6_report_upload_time')
if not stage6_report_upload_time:
return json.dumps(make_common_res(19, '缺少报告上传时间,请刷新后重试'))
ret = db_task.save_stage6_optimize_info(process, task_no, task_stage, stage6_report_url, stage6_report_upload_time)
elif modify_stage == 7:
open_cross_monitor = check_param(params, 'open_cross_monitor')
if open_cross_monitor is None:
return json.dumps(make_common_res(19, '缺少是否开启路口信息,请刷新后重试'))
open_cross_monitor = int(open_cross_monitor)
ret = db_task.save_stage7_optimize_info(process, task_no, task_stage, open_cross_monitor)
else:
return json.dumps(make_common_res(20, '修改阶段异常,请刷新后重试'))
if ret:
return json.dumps(make_common_res(0, '修改成功'))
return json.dumps(make_common_res(21, '修改失败,请检查后重试'))
def upload_cross_optimize_task_file(params):
nodeid = check_param({'nodeid': request.form.get('nodeid')}, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param({'area_id': request.form.get('area_id')}, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param({'userid': request.form.get('userid')}, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
task_no = check_param({'task_no': request.form.get('task_no')}, 'task_no')
if not task_no:
return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试'))
cos_client = get_client()
folder_manager = CosFolderManager(cos_client, g_cos_bucket)
download_url = ''
for key in request.files.keys():
file_info = request.files[key]
name = file_info.filename
file_stream = file_info.stream
cos_path = f'user/cross_doctor/task_file/{nodeid}/{area_id}/{task_no}'
folder_manager.ensure_folder(cos_path)
cos_key = f'{cos_path}/{name}'
cos_client.put_object(Bucket=g_cos_bucket, Key=cos_key, Body=file_stream)
download_url = f'{g_cos_root}/{cos_key}'
if download_url:
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
res = make_common_res(0, 'ok')
res['data'] = {
'download_url': download_url,
'upload_time': now_time,
}
return json.dumps(res, ensure_ascii=False)
return json.dumps(make_common_res(10, '上传失败,请检查后重试'))
pass
pass

View File

@ -10,6 +10,7 @@ from flask_cors import CORS
from app.models import *
from app.task_worker import *
from app.cross_eva_views import *
from app.cross_compare_report import query_cross_optimize_task_list
#app = Flask(__name__)
#CORS(app, resources={r"/api/*": {"origins": "*"}})
@ -272,6 +273,21 @@ def export_week_report():
return export_cross_monitor_week_report(dict(request.args))
@app.route('/api/update_cross_optimize_additional', methods=['POST'])
def update_cross_optimize_additional():
return update_cross_optimize_additional_info(request.get_json())
@app.route('/api/upload_cross_optimize_file', methods=['POST'])
def upload_cross_optimize_file():
return upload_cross_optimize_task_file(dict(request.form))
@app.route('/api/cross_optimize_task_list', methods=['GET'])
def cross_optimize_task_list_api():
return query_cross_optimize_task_list(dict(request.args))
# if __name__ == '__main__':
# init()
# app.run(debug=True)