From da8be17f13656d9be5297bd04a48e897a3ccf1f6 Mon Sep 17 00:00:00 2001 From: wangxu <1318272526@qq.com> Date: Sun, 24 May 2026 15:37:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E6=9C=AC=E5=9C=B0=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=EF=BC=8C=E6=9C=AA=E5=AE=8C=E6=88=90=E8=B7=AF=E5=8F=A3?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=BB=E5=8A=A1=E7=9B=B8=E5=85=B3=E5=86=85?= =?UTF-8?q?=E5=AE=B9=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/task_db_func.py | 82 +++++++++++++++++++++++++++++++++++++++++++++ app/task_worker.py | 80 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 161 insertions(+), 1 deletion(-) diff --git a/app/task_db_func.py b/app/task_db_func.py index 5eb16fc..f895a05 100644 --- a/app/task_db_func.py +++ b/app/task_db_func.py @@ -894,6 +894,88 @@ class TaskDbHelper(TableDbHelperBase): select * from task.cross_monitor_task_schedule_detail where id = %s """ % recordid return self.do_select(sql) + + def query_cross_optimize_task_additional_sql(self, task_no, nodeid, area_id): + sql = """ + select * from task.cross_optimize_task_additional_detail where nodeid = %s and area_id = %s and nodeid = %s + """ % (nodeid, area_id, task_no) + return self.do_select(sql) + + def query_adjust_plan_infos(self, task_no): + sql = """ + select * from task.cross_optimize_task_phase_info where task_no = %s + """ % task_no + return self.do_select(sql) + + def save_cross_optimize_task_stage1_info(self, stage1_values, task_no, process): + conn, cursor = self.connect() + other_table_name = "your_other_table_name_here" # 替换为您实际的表名 + temp_table_name = "temp_inserted_ids_" + str(int(time.time() * 1000000)) # 生成一个唯一的临时表名 + other_insert_sql = f""" + INSERT INTO {other_table_name} (cross_optimize_task_phase_info_id, task_no, process_info) VALUES (%s, %s, %s) + """ + + try: + # 1. 创建临时表来存储ID + create_temp_sql = f""" + CREATE TEMPORARY TABLE {temp_table_name} ( + id INT AUTO_INCREMENT PRIMARY KEY, + inserted_id INT NOT NULL + ) + """ + cursor.execute(create_temp_sql) + + # 2. 批量执行插入操作到原表 + insert_sql = """ + INSERT INTO task.cross_optimize_task_phase_info + (task_no, plan_id, plan_name, time_range, weekdays, problem_info) + VALUES (%s, %s, %s, %s, %s, %s) + """ + cursor.executemany(insert_sql, stage1_values) + num_inserted = cursor.rowcount + print(f"Successfully inserted {num_inserted} records into main table.") + # 3. 获取本次插入的第一行ID (注意:lastrowid 是正确的属性名) + first_inserted_id = cursor.lastrowid + if first_inserted_id is None or num_inserted == 0: + # 如果没有插入任何行,lastrowid 可能是 None + # 如果插入了行,但某种原因没拿到 ID,也应该报错 + if num_inserted > 0: + logging.error("Rows were inserted ({num_inserted}), but could not retrieve the starting ID (lastrowid={first_inserted_id}).") + return False + else: + logging.error("No rows were inserted, cannot determine IDs.") + return False + # 4. 将计算出的ID范围插入到临时表 + if num_inserted > 0: + # 假设 ID 是连续递增的,从 first_inserted_id 开始 + ids_to_insert = [(first_inserted_id + i,) for i in range(num_inserted)] + insert_temp_sql = f"INSERT INTO {temp_table_name} (inserted_id) VALUES (%s)" + cursor.executemany(insert_temp_sql, ids_to_insert) + logging.info(f"Inserted {num_inserted} IDs into temp table using executemany.") + # 5. 从临时表查询ID + select_from_temp_sql = f"SELECT inserted_id FROM {temp_table_name}" + cursor.execute(select_from_temp_sql) + inserted_ids = [row[0] for row in cursor.fetchall()] + logging.info(f"Retrieved IDs from temp table: {inserted_ids}") + if len(inserted_ids) != num_inserted: + logging.error(f"Expected {num_inserted} IDs in temp table, but found {len(inserted_ids)}.") + return False + # 6. 准备并批量插入到另一张表 + other_values_to_insert = [(inserted_id, task_no, process) for inserted_id in inserted_ids] + cursor.executemany(other_insert_sql, other_values_to_insert) + logging.info(f"Successfully inserted {len(other_values_to_insert)} records into {other_table_name}.") + + # 7. 提交事务 + conn.commit() + logging.info("Transaction committed successfully. All records saved.") + + return inserted_ids + except Exception as e: + logging.error(f"An error occurred: {e}") + conn.rollback() + return False + + # # if __name__ == '__main__': # tt_5min = get_latest_5min_timestamp() diff --git a/app/task_worker.py b/app/task_worker.py index 145a074..65d7303 100644 --- a/app/task_worker.py +++ b/app/task_worker.py @@ -3211,5 +3211,83 @@ def query_cross_optimize_additional_info(task_no, nodeid, area_id): :param area_id: 辖区id :return: 路口优化任务补充信息 """ + row_list = db_tmnet.query_cross_optimize_task_additional_sql(task_no, nodeid, area_id) + if not row_list: + return None + adjust_plan_infos = db_tmnet.query_adjust_plan_infos(task_no) + res = { + 'task_no': task_no, + 'task_stage': row_list[0]['task_stage'], + 'crossid': row_list[0]['crossid'], + 'stage3_issue_time': row_list[0]['stage3_issue_time'], + 'check_result_time': row_list[0]['check_result_time'], + 'stage6_report_url': row_list[0]['stage6_report_url'], + 'phase_stage_infos': [] + } + for item in adjust_plan_infos: + res['phase_stage_infos'].append({ + 'plan_id': item['plan_id'], + 'plan_name': item['plan_name'], + 'time_range': item['time_range'], + 'weekdays': item['weekdays'], + 'problem_info': item['problem_info'], + 'origin_phase_detail': item['origin_phase_detail'], + 'optimize_phase_detail': item['optimize_phase_detail'], + 'check_result_res': item['check_result_res'], + 'check_result_common': item['check_result_common'], + 'phase_tiny_adjust_detail': item['phase_tiny_adjust_detail'] + }) + return res - pass + +def update_cross_optimize_additional_info(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, 'area_id异常,请检查后重试')) + task_no = check_param(params, 'task_no') + if not task_no: + return json.dumps(make_common_res(6, '缺少任务编号,请刷新后重试')) + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(7, '缺少路口编号,请刷新后重试')) + task_stage = check_param(params, 'task_stage') + if not task_stage: + return json.dumps(make_common_res(8, '缺少任务阶段,请刷新后重试')) + modify_stage = check_param(params, 'modify_stage') + if not modify_stage: + return json.dumps(make_common_res(9, '缺少修改阶段,请刷新后重试')) + modify_stage = int(modify_stage) + process = check_param(params, 'process') + if not process: + return json.dumps(make_common_res(10, '缺少处理进度,请刷新后重试')) + if modify_stage == 1: + phase_stage_infos = check_param(params, 'phase_stage_infos') + if not phase_stage_infos: + return json.dumps(make_common_res(10, '缺少调整阶段信息,请刷新后重试')) + stage1_values = [] + for item in phase_stage_infos: + plan_id = item['plan_id'] + plan_name = item['plan_name'] + time_range = item['time_range'] + weekdays = item['weekdays'] + problem_info = item['problem_info'] + stage1_values.append( + (task_no, plan_id, plan_name, time_range, weekdays, problem_info) + ) + if len(stage1_values) < 1: + return json.dumps(make_common_res(11, '调整阶段信息异常,请刷新后重试')) + + pass + pass \ No newline at end of file