diff --git a/app/cross_compare_report.py b/app/cross_compare_report.py index 016cf04..c4ca29f 100644 --- a/app/cross_compare_report.py +++ b/app/cross_compare_report.py @@ -68,7 +68,14 @@ def gen_cross_compare_report(params): cut_images = check_param(params, 'cut_images') if not cut_images: cut_images = 0 - date_list4screen = transition_date_list4screen(date_list) + origin_phase_info = check_param(params, 'origin_phase_info') + if not origin_phase_info: + origin_phase_info = None + final_phase_info = check_param(params, 'final_phase_info') + if not final_phase_info: + final_phase_info = None + + date_list4screen = transition_date_list4screen(compare_date_list) all_date_set = set() for item_list in date_list: for item in item_list: @@ -157,8 +164,8 @@ def gen_cross_compare_report(params): part1_data = gen_compare_report_part1_data(data_range, compare_date_range, time_range, cross_static_info['name'], final_overview) if 'part1' in parts: doc1.tpl_paragraph['part1']['visible'] = 1 - doc1.tpl_data['part1_1'].item1 = part1_data['data_range'] - doc1.tpl_data['part1_1'].item2 = part1_data['compare_date_range'] + doc1.tpl_data['part1_1'].item2 = part1_data['data_range'] + doc1.tpl_data['part1_1'].item1 = part1_data['compare_date_range'] doc1.tpl_data['part1_1'].item3 = part1_data['cross_name'] doc1.tpl_data['part1_1'].item4 = part1_data['time_range'] for item in part1_data['data_list']: @@ -189,7 +196,7 @@ def gen_cross_compare_report(params): doc1.tpl_data['part2_3'].table = [detail1] if 'part3' in parts: doc1.tpl_paragraph['part3']['visible'] = 1 - part3_data = gen_compare_report_part3_data(crossid, nodeid, area_id, time_range, tp_start, date_list, avg_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak, cross_ledger_info, weekdays) + part3_data = gen_compare_report_part3_data(crossid, nodeid, area_id, time_range, tp_start, date_list, avg_comp_cross_delay_info, roads_dir_dict, inroad_static_info_dict, cross_phase, is_peak, cross_ledger_info, weekdays) for problem_key in part3_data.keys(): class_part = PartDetail() class_part.item1 = part3_data[problem_key]['class_name'] @@ -284,7 +291,14 @@ def gen_compare_report_part1_data(data_range, compare_date_range, time_range, cr final_overview[compare_key] = int(final_overview[compare_key].replace('%', '')) if final_overview[compare_key] and final_overview[key] and final_overview[key] != '-' and final_overview[compare_key] != '-' and final_overview[compare_key] > final_overview[key]: rate = round((final_overview[compare_key] - final_overview[key]) / final_overview[compare_key] * 100, 2) if final_overview[compare_key] > 0 else 0 - compare_res_str = f"{index_dict[key]}由{final_overview[compare_key]}下降为{final_overview[key]}, 减少{round(final_overview[compare_key] - final_overview[key], 2)},优化率为{rate}%;" + if key == 'stop_times': + compare_res_str = f"{index_dict[key]}由{final_overview[compare_key]}次下降为{final_overview[key]}次, 减少{round(final_overview[compare_key] - final_overview[key], 2)}次,优化率为{rate}%;" + elif key == 'high_park_percent': + compare_res_str = f"{index_dict[key]}由{final_overview[compare_key]}%下降为{final_overview[key]}%, 减少{round(final_overview[compare_key] - final_overview[key], 2)}%,优化率为{rate}%;" + elif key == 'delay_time': + compare_res_str = f"{index_dict[key]}由{final_overview[compare_key]}s下降为{final_overview[key]}s, 减少{round(final_overview[compare_key] - final_overview[key], 2)}s,优化率为{rate}%;" + else: + compare_res_str = f"{index_dict[key]}由{final_overview[compare_key]}提升为{final_overview[key]}, 提升{round(final_overview[key] - final_overview[compare_key], 2)},优化率为{rate}%;" part1_data['data_list'].append(compare_res_str) else: if final_overview[compare_key] and final_overview[key] and final_overview[key] != '-' and final_overview[compare_key] != '-' and final_overview[compare_key] < final_overview[key]: @@ -323,6 +337,8 @@ def gen_compare_report_part2_2_data(road_flow_delay_infos, roads_dir_dict): if err_dir_list: total_str = ','.join(err_dir_list) total_str += '。' + if total_str == '': + total_str = '根据各进口道车辆延误时间分析,没有服务水平较低的进口道。' part2_2_data['detail'] = total_str return part2_2_data @@ -476,7 +492,12 @@ def gen_compare_report_part5_data(part1_data, compared_inroad_delay_infos): compare_data[key] = int(compare_data[key].replace('%', '')) if compare_data[key] and item_data[key] and compare_data[key] != '-' and item_data[key] != '-' and compare_data[key] > item_data[key]: rate = round((compare_data[key] - item_data[key]) / compare_data[key] * 100, 2) if compare_data[key] > 0 else 0 - compare_res_str = f"{index_dict[key]}由{compare_data[key]}下降为{item_data[key]}, 减少{diff_data[key]},优化率为{rate}%;" + if key == 'stop_times': + compare_res_str = f"{index_dict[key]}由{compare_data[key]}次下降为{item_data[key]}次, 减少{diff_data[key]}次,优化率为{rate}%;" + elif key == 'high_park_percent': + compare_res_str = f"{index_dict[key]}由{compare_data[key]}%下降为{item_data[key]}%, 减少{diff_data[key]}%,优化率为{rate}%;" + else: + compare_res_str = f"{index_dict[key]}由{compare_data[key]}s下降为{item_data[key]}s, 减少{diff_data[key]}s,优化率为{rate}%;" src_dir_data.append(compare_res_str) else: if compare_data[key] and item_data[key] and compare_data[key] != '-' and item_data[key] != '-' and compare_data[key] < item_data[key]: @@ -624,4 +645,44 @@ def transition_date_list4screen(date_list): tmp_list[i] = [datetime.strptime(tmp_list[i][0], '%Y%m%d').strftime('%Y-%m-%d'), datetime.strptime(tmp_list[i][1], '%Y%m%d').strftime('%Y-%m-%d')] elif len(tmp_list[i]) > 2: tmp_list[i] = [datetime.strptime(tmp_list[i][0], '%Y%m%d').strftime('%Y-%m-%d'), datetime.strptime(tmp_list[i][-1], '%Y%m%d').strftime('%Y-%m-%d')] - return tmp_list \ No newline at end of file + return tmp_list + + +def query_cross_optimize_task_list(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(6, '缺少crossid, 请刷新后重试')) + + records = db_task.query_cross_optimize_task_records(nodeid, area_id, crossid) + for item in records: + origin_phase_detail = item['origin_phase_detail'] + if origin_phase_detail: + origin_phase_detail = json.loads(origin_phase_detail) + optimize_phase_detail = item['optimize_phase_detail'] + if optimize_phase_detail: + optimize_phase_detail = json.loads(optimize_phase_detail) + phase_tiny_adjust_detail = item['phase_tiny_adjust_detail'] + if phase_tiny_adjust_detail: + phase_tiny_adjust_detail = json.loads(phase_tiny_adjust_detail) + item['origin_phase_detail'] = origin_phase_detail + item['optimize_phase_detail'] = optimize_phase_detail + item['phase_tiny_adjust_detail'] = phase_tiny_adjust_detail + item['check_result_time'] = item['check_result_time'].strftime('%Y-%m-%d %H:%M:%S') if item['check_result_time'] else '' + res = make_common_res(0, 'ok') + res['data'] = records + return json.dumps(res, ensure_ascii=False) diff --git a/app/eva_common.py b/app/eva_common.py index 4ec58d3..1c89b77 100644 --- a/app/eva_common.py +++ b/app/eva_common.py @@ -739,16 +739,99 @@ def calc_inroad_imbalance_index(inroad_delay_pb_list): return tmp_list + +def _classify_angles_to_turn_types(angles, is_merge=False): + if not angles: + return set() + + default_straight = 30 + default_uturn = 150 + + abs_angles = sorted(set(abs(a) for a in angles)) + straight_candidates = sorted(set(abs_angles + [default_straight])) + uturn_candidates = sorted(set(abs_angles + [default_uturn])) + + best_types = None + best_score = (-1, float('inf'), float('inf')) + + for s_thr in straight_candidates: + for u_thr in uturn_candidates: + if s_thr >= u_thr: + continue + types = set() + for a in angles: + if abs(a) <= s_thr: + types.add(0) + elif abs(a) >= u_thr: + types.add(3) + elif a > 0: + types.add(1 if is_merge else 2) + else: + types.add(2 if is_merge else 1) + score = (len(types), -abs(s_thr - default_straight), -abs(u_thr - default_uturn)) + if score > best_score: + best_score = score + best_types = types + + return best_types + + def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict): road_delay_infos = avg_cross_delay_info.inroad_delay_infos outroad_infos = avg_cross_delay_info.outroad_infos road_delay_dict = {item.inroadid: item for item in road_delay_infos} outroad_info_dict = {item.outroadid: item for item in outroad_infos} - cross_sum_car_num = sum([item.delay_info.car_num for item in road_delay_infos]) - cross_out_sum_car_num = sum([item.turn_info.car_num for item in outroad_infos]) road_flow_turn_rate = {} inroadid_list = [roads_dir_dict[k]['in'] for k in roads_dir_dict.keys()] outroadid_list = [roads_dir_dict[k]['out'] for k in roads_dir_dict.keys()] + cross_sum_car_num = sum([item.delay_info.car_num for item in road_delay_infos if item.inroadid in inroadid_list]) + cross_out_sum_car_num = sum([item.turn_info.car_num for item in outroad_infos if item.outroadid in outroadid_list]) + + # ---- 预处理:为每个进口道预计算 split_turns_set ---- + split_turns_precomputed = {} + for dir_key in roads_dir_dict: + roadid = roads_dir_dict[dir_key]['in'] + if 'udr_' in roadid or roadid == '-' or roadid not in road_delay_dict: + continue + inroad_info = g_roadnet.query_road(roadid) + if not inroad_info: + continue + angles = [] + for outroadid in outroadid_list: + if not outroadid or outroadid == '-': + continue + outroad = g_roadnet.query_road(outroadid) + if not outroad: + continue + angle = g_roadnet.calc_road_turn_angle_without_abs_split(inroad_info, outroad) + angles.append(angle) + turn_types = _classify_angles_to_turn_types(angles, is_merge=False) + if 0 not in turn_types: + logging.warning(f'[DEBUG] crossid={avg_cross_delay_info.crossid} inroad={roadid} dir={dir_key} ' + f'angles={[round(a, 1) for a in angles]} -> turn_types={turn_types} (NO STRAIGHT!)') + split_turns_precomputed[roadid] = turn_types + + # ---- 预处理:为所有出口道预计算 merge_turns_set ---- + merge_turns_precomputed = {} + valid_inroad_infos = [] + for inroadid in inroadid_list: + if not inroadid or inroadid == '-': + continue + inroad = g_roadnet.query_road(inroadid) + if inroad: + valid_inroad_infos.append(inroad) + for out_road_id in set(outroadid_list): + if 'udr_' in out_road_id or not out_road_id or out_road_id == '-': + continue + out_road_info = g_roadnet.query_road(out_road_id) + if not out_road_info: + continue + angles = [] + for inroad in valid_inroad_infos: + angle = g_roadnet.calc_road_turn_angle_without_abs_merge(out_road_info, inroad) + angles.append(angle) + merge_turns_precomputed[out_road_id] = _classify_angles_to_turn_types(angles, is_merge=True) + for dir in roads_dir_dict.keys(): roadid = roads_dir_dict[dir]['in'] if 'udr_' in roadid: @@ -759,21 +842,7 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict): l_num, s_num, r_num = 0, 0, 0 out_l_num, out_s_num, out_r_num = 0, 0, 0 if roadid != '-' and roadid in road_delay_dict.keys(): - inroad_info = g_roadnet.query_road(roadid) - split_turns_set = set() - for outroadid in outroadid_list: - outroad = g_roadnet.query_road(outroadid) - if not outroad: - continue - angle = g_roadnet.calc_road_turn_angle_without_abs_split(inroad_info, outroad) - if abs(angle) <= 30: - split_turns_set.add(0) - elif abs(angle) >= 150: - split_turns_set.add(3) - elif angle > 0: - split_turns_set.add(2) - else: - split_turns_set.add(1) + split_turns_set = split_turns_precomputed.get(roadid, set()) car_num = road_delay_dict[roadid].delay_info.car_num in_flow_rate = round(car_num / cross_sum_car_num * 100, 1) if cross_sum_car_num != 0 else 0 l_rate = max(1, round((road_delay_dict[roadid].delay_info.turn_ratio_1 + road_delay_dict[roadid].delay_info.turn_ratio_3)) / car_num * 100) if car_num != 0 else 0 @@ -795,21 +864,7 @@ def gen_flow_turn_rate_index(avg_cross_delay_info, roads_dir_dict): l_rate, s_rate, r_rate = rate_list[0] if rate_list[0] == '-' else rate_list[0], rate_list[1] if rate_list[1] == '-' else rate_list[1], rate_list[2] if rate_list[2] == '-' else rate_list[2] out_road_id = roads_dir_dict[dir]['out'] if out_road_id != '-' and out_road_id in outroad_info_dict.keys(): - out_road_info = g_roadnet.query_road(out_road_id) - merge_turns_set = set() - for inroadid in inroadid_list: - inroad = g_roadnet.query_road(inroadid) - if not inroad: - continue - angle = g_roadnet.calc_road_turn_angle_without_abs_merge(out_road_info, inroad) - if abs(angle) <= 30: - merge_turns_set.add(0) - elif abs(angle) >= 150: - merge_turns_set.add(3) - elif angle > 0: - merge_turns_set.add(1) - else: - merge_turns_set.add(2) + merge_turns_set = merge_turns_precomputed.get(out_road_id, set()) out_car_num = outroad_info_dict[out_road_id].turn_info.car_num out_flow_rate = round(out_car_num / cross_out_sum_car_num * 100, 1) if cross_out_sum_car_num != 0 else 0 # out_l_rate = max(1, round(outroad_info_dict[out_road_id].turn_info.turn_ratio_1 / out_car_num * 100)) if out_car_num != 0 else 0 diff --git a/app/task_db_func.py b/app/task_db_func.py index f895a05..6f3fd49 100644 --- a/app/task_db_func.py +++ b/app/task_db_func.py @@ -895,10 +895,10 @@ class TaskDbHelper(TableDbHelperBase): """ % recordid return self.do_select(sql) - def query_cross_optimize_task_additional_sql(self, task_no, nodeid, area_id): + def query_cross_optimize_task_additional_sql(self, task_no): sql = """ - select * from task.cross_optimize_task_additional_detail where nodeid = %s and area_id = %s and nodeid = %s - """ % (nodeid, area_id, task_no) + select * from task.cross_optimize_task_additional_detail where task_no = %s + """ % (task_no) return self.do_select(sql) def query_adjust_plan_infos(self, task_no): @@ -907,15 +907,18 @@ class TaskDbHelper(TableDbHelperBase): """ % task_no return self.do_select(sql) - def save_cross_optimize_task_stage1_info(self, stage1_values, task_no, process): + def save_cross_optimize_task_stage1_info(self, stage1_values, task_no, process, stage1_adjust_flag, task_stage): conn, cursor = self.connect() - other_table_name = "your_other_table_name_here" # 替换为您实际的表名 + other_table_name = "task.cross_optimize_task_additional_detail" # 替换为您实际的表名 temp_table_name = "temp_inserted_ids_" + str(int(time.time() * 1000000)) # 生成一个唯一的临时表名 - other_insert_sql = f""" - INSERT INTO {other_table_name} (cross_optimize_task_phase_info_id, task_no, process_info) VALUES (%s, %s, %s) + other_update_sql = f""" + update {other_table_name} set stage1_info_ids = %s, stage1_adjust_flag = %s, task_stage = %s where task_no = %s + """ + update_task_process_sql = """ + update task.task set progress = %s where taskno = %s """ - try: + conn.autocommit = False # 1. 创建临时表来存储ID create_temp_sql = f""" CREATE TEMPORARY TABLE {temp_table_name} ( @@ -928,8 +931,8 @@ class TaskDbHelper(TableDbHelperBase): # 2. 批量执行插入操作到原表 insert_sql = """ INSERT INTO task.cross_optimize_task_phase_info - (task_no, plan_id, plan_name, time_range, weekdays, problem_info) - VALUES (%s, %s, %s, %s, %s, %s) + (task_no, schedule_id, plan_id, plan_name, time_range, weekdays, weekdays_str, problem_info) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """ cursor.executemany(insert_sql, stage1_values) num_inserted = cursor.rowcount @@ -937,45 +940,261 @@ class TaskDbHelper(TableDbHelperBase): # 3. 获取本次插入的第一行ID (注意:lastrowid 是正确的属性名) first_inserted_id = cursor.lastrowid if first_inserted_id is None or num_inserted == 0: - # 如果没有插入任何行,lastrowid 可能是 None - # 如果插入了行,但某种原因没拿到 ID,也应该报错 if num_inserted > 0: - logging.error("Rows were inserted ({num_inserted}), but could not retrieve the starting ID (lastrowid={first_inserted_id}).") + logging.error(f"Rows inserted ({num_inserted}), but lastrowid={first_inserted_id}") return False else: - logging.error("No rows were inserted, cannot determine IDs.") + logging.error("No rows inserted") return False - # 4. 将计算出的ID范围插入到临时表 + + # 4. 【修复】直接内存生成 ID,去掉临时表 if num_inserted > 0: - # 假设 ID 是连续递增的,从 first_inserted_id 开始 - ids_to_insert = [(first_inserted_id + i,) for i in range(num_inserted)] - insert_temp_sql = f"INSERT INTO {temp_table_name} (inserted_id) VALUES (%s)" - cursor.executemany(insert_temp_sql, ids_to_insert) - logging.info(f"Inserted {num_inserted} IDs into temp table using executemany.") - # 5. 从临时表查询ID - select_from_temp_sql = f"SELECT inserted_id FROM {temp_table_name}" - cursor.execute(select_from_temp_sql) - inserted_ids = [row[0] for row in cursor.fetchall()] - logging.info(f"Retrieved IDs from temp table: {inserted_ids}") - if len(inserted_ids) != num_inserted: - logging.error(f"Expected {num_inserted} IDs in temp table, but found {len(inserted_ids)}.") - return False - # 6. 准备并批量插入到另一张表 - other_values_to_insert = [(inserted_id, task_no, process) for inserted_id in inserted_ids] - cursor.executemany(other_insert_sql, other_values_to_insert) - logging.info(f"Successfully inserted {len(other_values_to_insert)} records into {other_table_name}.") + inserted_ids = [str(first_inserted_id + i) for i in range(num_inserted)] + logging.info(f"Generated IDs: {inserted_ids[:3]}... (total: {len(inserted_ids)})") + else: + inserted_ids = [] + # 5. 【修复】无需查询临时表,直接使用 inserted_ids + logging.info(f"Using {len(inserted_ids)} IDs for UPDATE") + + # 6. 执行 other_update_sql + other_update_values = (','.join(inserted_ids), stage1_adjust_flag, task_stage, task_no) + logging.info(f"Executing UPDATE: task_no={task_no}, ids={','.join(inserted_ids)[:50]}...") + cursor.execute(other_update_sql, other_update_values) + logging.info(f"UPDATE rowcount: {cursor.rowcount}") + if cursor.rowcount == 0: + logging.warning(f"UPDATE affected 0 rows for task_no={task_no}") + logging.info(f"Successfully inserted {len(other_update_values)} records into {other_table_name}.") + cursor.execute(update_task_process_sql, (process, task_no)) + logging.info(f"Task process update affected rows: {cursor.rowcount}") # 7. 提交事务 conn.commit() logging.info("Transaction committed successfully. All records saved.") - return inserted_ids + return True except Exception as e: logging.error(f"An error occurred: {e}") conn.rollback() return False + finally: + conn.autocommit = True + cursor.close() + conn.close() + def save_stage2_optimize_info(self, stage2_values, process, task_no, task_stage): + sql = """ + update task.cross_optimize_task_phase_info set origin_phase_detail = %s, optimize_phase_detail = %s where id = %s and task_no = %s + """ + update_process_sql = """ + update task.task set progress = %s where taskno = %s + """ + update_task_stage_sql = """ + update task.cross_optimize_task_additional_detail set task_stage = %s where task_no = %s + """ + conn, cursor = self.connect() + try: + conn.autocommit = False + suc_num = 0 + for item in stage2_values: + ret = cursor.execute(sql, (item['origin_phase_detail'], item['optimize_phase_detail'], item['id'], task_no)) + suc_num += ret + if suc_num == len(stage2_values): + cursor.execute(update_process_sql, (process, task_no)) + cursor.execute(update_task_stage_sql, (task_stage, task_no)) + conn.commit() + return True + else: + logging.error(f"update error! suc_num = {suc_num}, len(stage2_values) = {stage2_values}") + conn.rollback() + return False + except Exception as e: + logging.error(f"An error occurred: {e}") + conn.rollback() + return False + finally: + conn.autocommit = True + cursor.close() + conn.close() + def save_stage3_optimize_info(self, process, task_no, issue_time, task_stage): + sql = """ + update task.cross_optimize_task_additional_detail set stage3_issue_time = %s, task_stage = %s where task_no = %s + """ + update_process_sql = """ + update task.task set progress = %s where taskno = %s + """ + conn, cursor = self.connect() + try: + conn.autocommit = False + ret = cursor.execute(sql, (issue_time, task_stage, task_no)) + if ret == 1: + update_process_ret = cursor.execute(update_process_sql, (process, task_no)) + if update_process_ret == 1: + conn.commit() + return True + else: + logging.error(f"update process error! ret = {ret}, update_process_ret = {update_process_ret}") + conn.rollback() + return False + else: + logging.error(f"update issue_time error! ret = {ret}") + conn.rollback() + return False + except Exception as e: + logging.error(f"An error occurred: {e}") + conn.rollback() + return False + finally: + conn.autocommit = True + cursor.close() + conn.close() + + def save_stage4_optimize_info(self, process, task_no, stage4_info, task_stage, check_result_time): + sql = """ + update task.cross_optimize_task_phase_info set check_result_res = %s, check_result_common = %s where id = %s + """ + update_process_sql = """ + update task.task set progress = %s where taskno = %s + """ + update_task_stage_sql = """ + update task.cross_optimize_task_additional_detail set task_stage = %s, check_result_time = %s where task_no = %s + """ + conn, cursor = self.connect() + try: + conn.autocommit = False + suc_num = 0 + for item in stage4_info: + ret = cursor.execute(sql, (item['check_result_res'], item['check_result_common'], item['record_id'])) + suc_num += ret + if suc_num == len(stage4_info): + cursor.execute(update_process_sql, (process, task_no)) + cursor.execute(update_task_stage_sql, (task_stage, check_result_time, task_no)) + conn.commit() + return True + else: + logging.error(f"update check_result_res error! suc_num = {suc_num}, len(stage4_info) = {stage4_info}") + conn.rollback() + return False + except Exception as e: + logging.error(f"An error occurred: {e}") + conn.rollback() + return False + finally: + conn.autocommit = True + cursor.close() + conn.close() + + def save_stage5_optimize_info(self, process, task_no, task_stage, stage5_info, check_result_time, stage5_all_tp_pass): + sql = """ + update task.cross_optimize_task_phase_info set phase_tiny_adjust_detail = %s where id = %s + """ + update_process_sql = """ + update task.task set progress = %s where taskno = %s + """ + update_task_stage_sql = """ + update task.cross_optimize_task_additional_detail set task_stage = %s, check_result_time = %s, stage5_all_tp_pass = %s where task_no = %s + """ + conn, cursor = self.connect() + try: + conn.autocommit = False + suc_num = 0 + for item in stage5_info: + ret = cursor.execute(sql, (item['phase_tiny_adjust_detail_json'], item['id'])) + suc_num += ret + if suc_num == len(stage5_info): + cursor.execute(update_process_sql, (process, task_no)) + cursor.execute(update_task_stage_sql, (task_stage, check_result_time, stage5_all_tp_pass, task_no)) + conn.commit() + return True + else: + logging.error(f"update phase_tiny_adjust_detail error! suc_num = {suc_num}, len(stage5_info) = {stage5_info}") + conn.rollback() + return False + except Exception as e: + logging.error(f"An error occurred: {e}") + conn.rollback() + return False + finally: + conn.autocommit = True + cursor.close() + conn.close() + + def save_stage7_optimize_info(self, process, task_no, task_stage, open_cross_monitor): + sql = """ + update task.cross_optimize_task_additional_detail set open_cross_monitor = %s where task_no = %s + """ + update_process_sql = """ + update task.task set progress = %s where taskno = %s + """ + update_task_stage_sql = """ + update task.cross_optimize_task_additional_detail set task_stage = %s where task_no = %s + """ + + conn, cursor = self.connect() + try: + conn.autocommit = False + cursor.execute(sql, (open_cross_monitor, task_no)) + cursor.execute(update_process_sql, (process, task_no)) + cursor.execute(update_task_stage_sql, (task_stage, task_no)) + conn.commit() + return True + except Exception as e: + logging.error(f"An error occurred: {e}") + conn.rollback() + return False + finally: + conn.autocommit = True + cursor.close() + conn.close() + + def save_stage6_optimize_info(self, process, task_no, task_stage, download_url, upload_time): + sql = """ + update task.cross_optimize_task_additional_detail set stage6_report_url = %s, stage6_report_upload_time = %s where task_no = %s + """ + update_process_sql = """ + update task.task set progress = %s where taskno = %s + """ + update_task_stage_sql = """ + update task.cross_optimize_task_additional_detail set task_stage = %s where task_no = %s + """ + + conn, cursor = self.connect() + try: + conn.autocommit = False + ret = cursor.execute(sql, (download_url, upload_time, task_no)) + if ret == 1: + cursor.execute(update_process_sql, (process, task_no)) + cursor.execute(update_task_stage_sql, (task_stage, task_no)) + conn.commit() + return True + else: + logging.error(f"update download_url error! ret = {ret}") + conn.rollback() + return False + except Exception as e: + logging.error(f"An error occurred: {e}") + conn.rollback() + return False + finally: + conn.autocommit = True + cursor.close() + conn.close() + + def save_optimize_task_additional_info(self, task_no, task_stage, crossid, link_cross_monitor_task_no, link_cross_monitor_task_executor, link_cross_monitor_task_recordid, open_cross_monitor): + sql = """ + insert into task.cross_optimize_task_additional_detail (task_no, crossid, task_stage, link_cross_monitor_task_no, link_cross_monitor_task_executor, link_cross_monitor_task_recordid, open_cross_monitor) values(%s, '%s', '%s', %s, '%s', %s, %s) + """ % (task_no, crossid, task_stage, link_cross_monitor_task_no, link_cross_monitor_task_executor, link_cross_monitor_task_recordid, open_cross_monitor) + return self.do_execute(sql) + + def query_cross_optimize_task_records(self, nodeid, area_id, crossid): + sql = """ + select t2.task_no, t1.task_name,t1.plan_begin_time, t1.plan_end_time, t1.executor, t2.task_stage,t2.stage1_info_ids, t3.origin_phase_detail, t3.optimize_phase_detail, t3.phase_tiny_adjust_detail, t2.stage3_issue_time, t2.check_result_time from + (select * from task.task where nodeid = %s and area_id = %s and crossids = '%s' and task_type_class = 2 and record_state != 1) t1 inner join + (select * from task.cross_optimize_task_additional_detail where crossid = '%s') t2 inner join + (select * from task.cross_optimize_task_phase_info) t3 + on t1.taskno = t2.task_no and t1.taskno = t3.task_no + """ % (nodeid, area_id, crossid, crossid) + return self.do_select(sql) # # if __name__ == '__main__': # tt_5min = get_latest_5min_timestamp() diff --git a/app/task_worker.py b/app/task_worker.py index 65d7303..f682f58 100644 --- a/app/task_worker.py +++ b/app/task_worker.py @@ -11,6 +11,8 @@ from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.utils import formataddr +import requests + from app.cross_monitor_task_avg_split import cluster_crossings_by_region, assign_to_shuffled_managers from app.eva_common import query_cross_ledger_info, gen_crossids_roads_dir_dict_by_mysql from app.global_source import db_user, db_task @@ -637,6 +639,27 @@ def do_add_task(params): logging.error(str(params) + ' do_add_task添加路口分配信息失败!') res = make_common_res(-1, '路口分配信息保存失败,请反馈该信息至管理员') return json.dumps(res) + elif task_type_class == 2: + link_cross_monitor_task_no = check_param(params, 'link_cross_monior_task_no') + if not link_cross_monitor_task_no: + link_cross_monitor_task_no = 0 + link_cross_monitor_task_executor = check_param(params, 'link_cross_monitor_task_executor') + if not link_cross_monitor_task_executor: + link_cross_monitor_task_executor = '' + link_cross_monitor_task_recordid = check_param(params, 'link_cross_monitor_task_recordid') + if not link_cross_monitor_task_recordid: + link_cross_monitor_task_recordid = 0 + task_stage = '1000000' + if full_review == 1: + task_stage = '1003330' + bound_crosses = db_tmnet.query_area_routing_focus_cross_num(nodeid, area_id) + monitor_crosses = [row['crossid'] for row in bound_crosses if row['is_routing_inspection'] == 1] + open_cross_monitor = 0 if crossids not in monitor_crosses else 1 + insert_optimize_cross_additional_info_ret = db_task.save_optimize_task_additional_info(taskno, task_stage, crossids, link_cross_monitor_task_no, link_cross_monitor_task_executor, link_cross_monitor_task_recordid, open_cross_monitor) + if not insert_optimize_cross_additional_info_ret: + logging.error(str(params) + ' do_add_task添加路口优化信息失败!') + res = make_common_res(-1, '路口优化信息保存失败,请反馈该信息至管理员') + return json.dumps(res) res = make_common_res(0, 'ok') res['nodeid'] = nodeid @@ -1022,6 +1045,7 @@ def do_query_task_detail(params): task = db_task.query_task(taskno, nodeid, area_id) cross_name_info = db_tmnet.query_cross_list_sql(nodeid, area_id) + cross_name_dict = {item['crossid']: item for item in cross_name_info} task_crosses_info = db_task.query_all_ledger_task_crosses_info(nodeid, area_id) cross_monitor_info_dict = query_cross_monitor_info(nodeid, area_id) if task is None: @@ -1051,6 +1075,9 @@ def do_query_task_detail(params): task_add_base_info['process'] = cross_monitor_info_dict[int(taskno)]['process'] if int(taskno) in cross_monitor_info_dict.keys() else 0 task['task_base_info'] = task_add_base_info task['cross_monitor_additional_info'] = cross_monitor_additional_info + elif task['task_type_class'] == 2: + task['cross_name'] = cross_name_dict[task['crossids']]['name'] + task['optimize_cross_task_additional_info'] = query_cross_optimize_additional_info(taskno, userid, nodeid, area_id) task.pop('update_time') res['desc'] = '' res['data'] = task @@ -2593,13 +2620,15 @@ def query_cross_monitor_record_info_api(params): if not executor: return json.dumps(make_common_res(8, '缺少巡查人信息,请刷新后重试')) - has_record = check_param(params, 'has_record') - if not has_record: - has_record = 0 - has_record = int(has_record) day_list, week_list, month_list = db_cross.query_monitor_task_dates(nodeid, area_id) usable_day_list = sorted(day_list)[-7:] usable_week_list = sorted(week_list)[-1:] + query_date = check_param(params, 'query_date') + if not query_date: + query_date = str(usable_week_list[0]) + query_type = check_param(params, 'query_type') + if not query_type: + query_type = 'week' cross_ledger_info = db_tmnet.query_cross_ledger_info(crossid, nodeid, area_id) cross_monitor_record_info = cross_monitor_record_data.copy() if not cross_ledger_info: @@ -2616,7 +2645,31 @@ def query_cross_monitor_record_info_api(params): if not cross_ledger_info: return json.dumps(make_common_res(8, '路口信息异常')) - if has_record != 1: + + recordid = check_param(params, 'recordid') + if not recordid: + return json.dumps(make_common_res(10, '缺少记录编号,请刷新后重试')) + record_info = db_task.query_cross_monitor_record_info(recordid) + if not record_info: + return json.dumps(make_common_res(11, '记录信息异常')) + if record_info['record_detail'] and record_info['record_detail'] != '': + cross_monitor_record_info = json.loads(record_info['record_detail']) + cross_monitor_record_info['base_info'] = { + 'cross_name': cross_ledger_info[0]['name'], + 'crossid': cross_ledger_info[0]['crossid'], + 'cross_no': cross_ledger_info[0]['cross_no'], + 'division': cross_ledger_info[0]['division'], + 'greenwave': cross_ledger_info[0]['wave_cross'], + 'executor': executor, + 'monitor_date': record_info['monitor_date'], + 'special_time_range': record_info['special_time_range'], + 'monitor_type': record_info['monitor_type'] + } + cross_monitor_record_info['plan_start_date'] = cross_monitor_record_info['plan_start_date'].strftime('%Y-%m-%d') if 'plan_start_date' in cross_monitor_record_info.keys() and cross_monitor_record_info['plan_start_date'] else '' + cross_monitor_record_info['plan_end_date'] = cross_monitor_record_info['plan_end_date'].strftime('%Y-%m-%d') if 'plan_end_date' in cross_monitor_record_info.keys() and cross_monitor_record_info['plan_end_date'] else '' + cross_monitor_record_info['create_time'] = cross_monitor_record_info['create_time'].strftime('%Y-%m-%d %H:%M:%S') if 'create_time' in cross_monitor_record_info.keys() and cross_monitor_record_info['create_time'] else '' + cross_monitor_record_info['update_time'] = cross_monitor_record_info['update_time'].strftime('%Y-%m-%d %H:%M:%S') if 'update_time' in cross_monitor_record_info.keys() and cross_monitor_record_info['update_time'] else '' + else: cross_roads_dir_dict = gen_crossids_roads_dir_dict_by_mysql([item['crossid'] for item in cross_ledger_info], nodeid) for row in cross_ledger_info: row['crossno'] = row['cross_no'] @@ -2625,12 +2678,6 @@ def query_cross_monitor_record_info_api(params): day_list, week_list, month_list = db_cross.query_monitor_task_dates(nodeid, area_id) usable_day_list = sorted(day_list)[-7:] usable_week_list = sorted(week_list)[-1:] - query_date = check_param(params, 'query_date') - if not query_date: - query_date = str(usable_week_list[0]) - query_type = check_param(params, 'query_type') - if not query_type: - query_type = 'week' row_list = db_cross.query_monitor_data_sql(nodeid, area_id, query_type, query_date) if row_list: cross_report_pb = pb.xl_cross_report_t() @@ -2686,30 +2733,6 @@ def query_cross_monitor_record_info_api(params): if len(cross_monitor_problem_detail[0]['unreasonable_red']) > 0: cross_monitor_record_info['phase_info']['unreasonable_all_red']['is_error'] = 1 cross_monitor_record_info['phase_info']['unreasonable_all_red']['big_data_monitor'] = '、'.join(cross_monitor_problem_detail[0]['unreasonable_red']) - else: - recordid = check_param(params, 'recordid') - if not recordid: - return json.dumps(make_common_res(10, '缺少记录编号,请刷新后重试')) - record_info = db_task.query_cross_monitor_record_info(recordid) - if not record_info: - return json.dumps(make_common_res(11, '记录信息异常')) - - cross_monitor_record_info = json.loads(record_info['record_detail']) - cross_monitor_record_info['base_info'] = { - 'cross_name': cross_ledger_info[0]['name'], - 'crossid': cross_ledger_info[0]['crossid'], - 'cross_no': cross_ledger_info[0]['cross_no'], - 'division': cross_ledger_info[0]['division'], - 'greenwave': cross_ledger_info[0]['wave_cross'], - 'executor': executor, - 'monitor_date': record_info['monitor_date'], - 'special_time_range': record_info['special_time_range'], - 'monitor_type': record_info['monitor_type'] - } - cross_monitor_record_info['plan_start_date'] = cross_monitor_record_info['plan_start_date'].strftime('%Y-%m-%d') if 'plan_start_date' in cross_monitor_record_info.keys() and cross_monitor_record_info['plan_start_date'] else '' - cross_monitor_record_info['plan_end_date'] = cross_monitor_record_info['plan_end_date'].strftime('%Y-%m-%d') if 'plan_end_date' in cross_monitor_record_info.keys() and cross_monitor_record_info['plan_end_date'] else '' - cross_monitor_record_info['create_time'] = cross_monitor_record_info['create_time'].strftime('%Y-%m-%d %H:%M:%S') if 'create_time' in cross_monitor_record_info.keys() and cross_monitor_record_info['create_time'] else '' - cross_monitor_record_info['update_time'] = cross_monitor_record_info['update_time'].strftime('%Y-%m-%d %H:%M:%S') if 'update_time' in cross_monitor_record_info.keys() and cross_monitor_record_info['update_time'] else '' res = make_common_res(0, 'ok') res['cross_monitor_record_info'] = cross_monitor_record_info @@ -3203,7 +3226,7 @@ def export_cross_monitor_week_report(params): # 以下内容为路口优化任务相关 20260518 -def query_cross_optimize_additional_info(task_no, nodeid, area_id): +def query_cross_optimize_additional_info(task_no, userid, nodeid, area_id): """ 获取路口优化任务补充信息 :param task_no: 任务编码 @@ -3211,31 +3234,103 @@ def query_cross_optimize_additional_info(task_no, nodeid, area_id): :param area_id: 辖区id :return: 路口优化任务补充信息 """ - row_list = db_tmnet.query_cross_optimize_task_additional_sql(task_no, nodeid, area_id) + row_list = db_task.query_cross_optimize_task_additional_sql(task_no) if not row_list: return None - adjust_plan_infos = db_tmnet.query_adjust_plan_infos(task_no) + adjust_plan_infos = db_task.query_adjust_plan_infos(task_no) res = { 'task_no': task_no, 'task_stage': row_list[0]['task_stage'], 'crossid': row_list[0]['crossid'], + 'stage1_adjust_flag': row_list[0]['stage1_adjust_flag'], 'stage3_issue_time': row_list[0]['stage3_issue_time'], - 'check_result_time': row_list[0]['check_result_time'], + 'check_result_time': row_list[0]['check_result_time'].strftime("%Y-%m-%d %H:%M:%S") if row_list[0]['check_result_time'] else None, + 'stage6_report_upload_time': row_list[0]['stage6_report_upload_time'].strftime("%Y-%m-%d %H:%M:%S") if row_list[0]['stage6_report_upload_time'] else None, + 'stage5_all_tp_pass': row_list[0]['stage5_all_tp_pass'], 'stage6_report_url': row_list[0]['stage6_report_url'], + 'open_cross_monitor': row_list[0]['open_cross_monitor'], + 'link_cross_monitor_task_executor': row_list[0]['link_cross_monitor_task_executor'], + 'link_cross_monitor_task_no': row_list[0]['link_cross_monitor_task_no'], + 'link_cross_monitor_task_recordid': row_list[0]['link_cross_monitor_task_recordid'], 'phase_stage_infos': [] } + headers = {"Content-Type": "application/json"} + if row_list[0]['task_stage'][1] == '1': + query_cross_phase_detail_params = { + 'userid': userid, + 'nodeid': nodeid, + 'crossid': row_list[0]['crossid'], + 'area_id': area_id, + 'schedule_infos': {} + } + for item in adjust_plan_infos: + schedule_id = int(item['schedule_id']) + if schedule_id not in query_cross_phase_detail_params['schedule_infos']: + query_cross_phase_detail_params['schedule_infos'][schedule_id] = [] + query_cross_phase_detail_params['schedule_infos'][schedule_id].append(item['plan_id']) + logging.info(f"查询路口相位详情参数:{json.dumps(query_cross_phase_detail_params)}") + query_link = 'http://localhost:7070/api/common/cross_phase_detail' + origin_phase_infos = None + try: + response = requests.post(query_link, json=query_cross_phase_detail_params, headers=headers) + logging.info(response.json()) + if response.status_code == 200: + response_data = response.json() + if response_data.get('status') == 0: + origin_phase_infos = response_data.get('data') + else: + logging.error(f"查询路口相位详情失败,crossid:{row_list[0]['crossid']},响应:{response_data}") + origin_phase_infos = None + else: + logging.error(f"查询路口相位详情HTTP错误,crossid:{row_list[0]['crossid']},状态码:{response.status_code}") + origin_phase_infos = None + except Exception as e: + logging.error(f"查询路口相位详情异常,crossid:{row_list[0]['crossid']},错误:{str(e)}") + query_phase_update_time_link = f"http://localhost:7070/api/common/phasetable?crossid={row_list[0]['crossid']}&nodeid={nodeid}&area_id={area_id}&userid={userid}" + phase_update_time_res = requests.get(query_phase_update_time_link, headers=headers) + if phase_update_time_res.status_code != 200 or phase_update_time_res.json()['status'] != 0: + phase_update_time = None + else: + phase_update_time_infos = phase_update_time_res.json()['modify'] + phase_update_time = '1900-01-01 00:00:00' + for item_key in phase_update_time_infos: + if phase_update_time_infos[item_key] > phase_update_time: + phase_update_time = phase_update_time_infos[item_key] + res['phase_update_time'] = phase_update_time for item in adjust_plan_infos: + origin_phase_detail = item['origin_phase_detail'] + schedule_id = str(item['schedule_id']) + plan_id = item['plan_id'] + if row_list[0]['task_stage'][1] == '1' and origin_phase_infos: + plans_info = origin_phase_infos[schedule_id]['plan_infos'] if schedule_id in origin_phase_infos.keys() else [] + for plan_info in plans_info: + if plan_info['planid'] == plan_id: + origin_phase_detail = plan_info + break + else: + origin_phase_detail = item['origin_phase_detail'] + if origin_phase_detail: + origin_phase_detail = json.loads(origin_phase_detail) + optimize_phase_detail = item['optimize_phase_detail'] + if optimize_phase_detail: + optimize_phase_detail = json.loads(optimize_phase_detail) + phase_tiny_adjust_detail = item['phase_tiny_adjust_detail'] + if phase_tiny_adjust_detail: + phase_tiny_adjust_detail = json.loads(phase_tiny_adjust_detail) res['phase_stage_infos'].append({ - 'plan_id': item['plan_id'], + 'record_id': item['id'], + 'schedule_id': schedule_id, + 'plan_id': plan_id, 'plan_name': item['plan_name'], 'time_range': item['time_range'], 'weekdays': item['weekdays'], + 'weekdays_str': item['weekdays_str'], 'problem_info': item['problem_info'], - 'origin_phase_detail': item['origin_phase_detail'], - 'optimize_phase_detail': item['optimize_phase_detail'], + 'origin_phase_detail': origin_phase_detail, + 'optimize_phase_detail': optimize_phase_detail, 'check_result_res': item['check_result_res'], 'check_result_common': item['check_result_common'], - 'phase_tiny_adjust_detail': item['phase_tiny_adjust_detail'] + 'phase_tiny_adjust_detail': phase_tiny_adjust_detail }) return res @@ -3276,18 +3371,162 @@ def update_cross_optimize_additional_info(params): phase_stage_infos = check_param(params, 'phase_stage_infos') if not phase_stage_infos: return json.dumps(make_common_res(10, '缺少调整阶段信息,请刷新后重试')) + stage1_adjust_flag = check_param(params, 'stage1_adjust_flag') + if stage1_adjust_flag is None: + return json.dumps(make_common_res(11, '缺少调整阶段标识,请刷新后重试')) stage1_values = [] for item in phase_stage_infos: - plan_id = item['plan_id'] - plan_name = item['plan_name'] - time_range = item['time_range'] - weekdays = item['weekdays'] - problem_info = item['problem_info'] + schedule_id = item.get('schedule_id', None) + plan_id = item.get('plan_id', None) + plan_name = item.get('plan_name', None) + time_range = item.get('time_range', None) + weekdays = item.get('weekdays', None) + weekdays_str = item.get('weekdays_str', None) + problem_info = item.get('problem_info', None) + if not plan_id or not plan_name or not time_range or not weekdays or problem_info is None or not weekdays_str: + stage1_values = [] + break stage1_values.append( - (task_no, plan_id, plan_name, time_range, weekdays, problem_info) + (task_no, schedule_id, plan_id, plan_name, time_range, weekdays, weekdays_str, problem_info) ) if len(stage1_values) < 1: return json.dumps(make_common_res(11, '调整阶段信息异常,请刷新后重试')) + ret = db_task.save_cross_optimize_task_stage1_info(stage1_values, task_no, process, stage1_adjust_flag, task_stage) + elif modify_stage == 2: + stage2_info = check_param(params, 'stage2_info') + if not stage2_info or len(stage2_info) < 1: + return json.dumps(make_common_res(12, '缺少调整阶段信息,请刷新后重试')) + stage2_values = [] + for item in stage2_info: + origin_phase_detail = item.get('origin_phase_detail', None) + optimize_phase_detail = item.get('optimize_phase_detail', None) + origin_phase_detail_json, optimize_phase_detail_json = '', '' + if origin_phase_detail: + origin_phase_detail_json = json.dumps(origin_phase_detail, ensure_ascii=False) + if optimize_phase_detail: + optimize_phase_detail_json = json.dumps(optimize_phase_detail, ensure_ascii=False) + record_id = item.get('record_id', None) + if not record_id: + return json.dumps(make_common_res(13, '缺少调整阶段id信息,请刷新后重试')) + stage2_values.append({ + 'id': record_id, + 'task_no': task_no, + 'origin_phase_detail': origin_phase_detail_json, + 'optimize_phase_detail': optimize_phase_detail_json, + }) + + ret = db_task.save_stage2_optimize_info(stage2_values, process, task_no, task_stage) + elif modify_stage == 3: + stage3_issue_time = check_param(params, 'stage3_issue_time') + if not stage3_issue_time: + return json.dumps(make_common_res(14, '缺少下发时间,请刷新后重试')) + ret = db_task.save_stage3_optimize_info(process, task_no, stage3_issue_time, task_stage) + elif modify_stage == 4: + stage4_info = check_param(params, 'stage4_info') + if not stage4_info or len(stage4_info) < 1: + return json.dumps(make_common_res(15, '缺少效果确认信息,请刷新后重试')) + stage4_values = [] + for item in stage4_info: + record_id = item.get('record_id', None) + check_result_res = item.get('check_result_res', None) + check_result_common = item.get('check_result_common', None) + if not record_id or check_result_res not in (0, 1): + continue + stage4_values.append({ + 'id': record_id, + 'task_no': task_no, + 'check_result_res': check_result_res, + 'check_result_common': check_result_common, + }) + if len(stage4_values) != len(stage4_info): + return json.dumps(make_common_res(16, '效果确认信息异常,请刷新后重试')) + check_result_time = check_param(params, 'check_result_time') + ret = db_task.save_stage4_optimize_info(process, task_no, stage4_info, task_stage, check_result_time) + elif modify_stage == 5: + stage5_info = check_param(params, 'stage5_info') + if not stage5_info or len(stage5_info) < 1: + return json.dumps(make_common_res(17, '缺少调整结果信息,请刷新后重试')) + check_result_time = check_param(params, 'check_result_time') + if not check_result_time: + return json.dumps(make_common_res(17, '缺少效果确认时间,请刷新后重试')) + stage5_all_tp_pass = check_param(params, 'stage5_all_tp_pass') + if stage5_all_tp_pass is None: + return json.dumps(make_common_res(18, '缺少是否全通过信息,请刷新后重试')) + stage5_values = [] + for item in stage5_info: + record_id = item.get('record_id', None) + phase_tiny_adjust_detail = item.get('phase_tiny_adjust_detail', None) + if not record_id or not phase_tiny_adjust_detail: + continue + stage5_values.append({ + 'id': record_id, + 'task_no': task_no, + 'phase_tiny_adjust_detail_json': json.dumps(phase_tiny_adjust_detail, ensure_ascii=False), + }) + if len(stage5_values) != len(stage5_info): + return json.dumps(make_common_res(18, '微调信息异常,请刷新后重试')) + ret = db_task.save_stage5_optimize_info(process, task_no, task_stage, stage5_values, check_result_time, stage5_all_tp_pass) + elif modify_stage == 6: + stage6_report_url = check_param(params, 'stage6_report_url') + if not stage6_report_url: + return json.dumps(make_common_res(19, '缺少报告地址,请刷新后重试')) + stage6_report_upload_time = check_param(params, 'stage6_report_upload_time') + if not stage6_report_upload_time: + return json.dumps(make_common_res(19, '缺少报告上传时间,请刷新后重试')) + ret = db_task.save_stage6_optimize_info(process, task_no, task_stage, stage6_report_url, stage6_report_upload_time) + elif modify_stage == 7: + open_cross_monitor = check_param(params, 'open_cross_monitor') + if open_cross_monitor is None: + return json.dumps(make_common_res(19, '缺少是否开启路口信息,请刷新后重试')) + open_cross_monitor = int(open_cross_monitor) + ret = db_task.save_stage7_optimize_info(process, task_no, task_stage, open_cross_monitor) + + else: + return json.dumps(make_common_res(20, '修改阶段异常,请刷新后重试')) + if ret: + return json.dumps(make_common_res(0, '修改成功')) + return json.dumps(make_common_res(21, '修改失败,请检查后重试')) + + +def upload_cross_optimize_task_file(params): + nodeid = check_param({'nodeid': request.form.get('nodeid')}, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param({'area_id': request.form.get('area_id')}, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param({'userid': request.form.get('userid')}, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) + task_no = check_param({'task_no': request.form.get('task_no')}, 'task_no') + if not task_no: + return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试')) + + cos_client = get_client() + folder_manager = CosFolderManager(cos_client, g_cos_bucket) + download_url = '' + for key in request.files.keys(): + file_info = request.files[key] + name = file_info.filename + file_stream = file_info.stream + cos_path = f'user/cross_doctor/task_file/{nodeid}/{area_id}/{task_no}' + folder_manager.ensure_folder(cos_path) + cos_key = f'{cos_path}/{name}' + cos_client.put_object(Bucket=g_cos_bucket, Key=cos_key, Body=file_stream) + download_url = f'{g_cos_root}/{cos_key}' + if download_url: + now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + res = make_common_res(0, 'ok') + res['data'] = { + 'download_url': download_url, + 'upload_time': now_time, + } + return json.dumps(res, ensure_ascii=False) + return json.dumps(make_common_res(10, '上传失败,请检查后重试')) - pass - pass \ No newline at end of file diff --git a/app/views_task.py b/app/views_task.py index 99ecce2..b47a4b1 100644 --- a/app/views_task.py +++ b/app/views_task.py @@ -10,6 +10,7 @@ from flask_cors import CORS from app.models import * from app.task_worker import * from app.cross_eva_views import * +from app.cross_compare_report import query_cross_optimize_task_list #app = Flask(__name__) #CORS(app, resources={r"/api/*": {"origins": "*"}}) @@ -272,6 +273,21 @@ def export_week_report(): return export_cross_monitor_week_report(dict(request.args)) +@app.route('/api/update_cross_optimize_additional', methods=['POST']) +def update_cross_optimize_additional(): + return update_cross_optimize_additional_info(request.get_json()) + + +@app.route('/api/upload_cross_optimize_file', methods=['POST']) +def upload_cross_optimize_file(): + return upload_cross_optimize_task_file(dict(request.form)) + + +@app.route('/api/cross_optimize_task_list', methods=['GET']) +def cross_optimize_task_list_api(): + return query_cross_optimize_task_list(dict(request.args)) + + # if __name__ == '__main__': # init() # app.run(debug=True)