From 1622eb59288c3fc8cb130532eeb9a9128f656fe7 Mon Sep 17 00:00:00 2001 From: wangxu <1318272526@qq.com> Date: Thu, 12 Mar 2026 18:01:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E5=8F=B0=E8=B4=A6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=B7=A5=E4=BD=9C=E6=B5=81=E5=8F=8A=E5=8F=B0=E8=B4=A6?= =?UTF-8?q?=E9=9C=80=E6=B1=82=E8=BF=AD=E4=BB=A3=E5=8A=9F=E8=83=BD=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E5=B7=B2=E7=BB=8F=E6=B5=8B=E8=AF=95=E7=9A=84=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/task_db_func.py | 124 +++++++++++++++- app/task_worker.py | 341 ++++++++++++++++++++++++++++++++++++++++--- app/tmnet_db_func.py | 84 ++++++++++- app/views_task.py | 19 +++ cross_doctor.ini | 4 +- test.py | 4 +- tool/qcos_func.py | 65 +++++++++ 7 files changed, 604 insertions(+), 37 deletions(-) diff --git a/app/task_db_func.py b/app/task_db_func.py index d5f9fa7..1d6dc94 100644 --- a/app/task_db_func.py +++ b/app/task_db_func.py @@ -4,6 +4,7 @@ # @Description: # -*- coding:utf-8 -*- #import logging +import logging import pymysql import pymysql.cursors @@ -12,6 +13,7 @@ from datetime import datetime from flask import g from app.db_func_base import * + class TaskDbHelper(TableDbHelperBase): def __init__(self, pool): @@ -174,13 +176,13 @@ class TaskDbHelper(TableDbHelperBase): def add_task(self, timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time, executor, progress, task_state, description, crossids, sectionids, arteryids, comment, - record_state, task_src, task_class, nodeid, area_id): + record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review): sql_insert = "insert into task (timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time," \ " executor, progress, task_state, description, crossids, sectionids, arteryids, comment," \ - " record_state, task_src, task_class, nodeid, area_id) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %s)" % ( + " record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %s, %s, %s)" % ( timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time, executor, progress, task_state, description, crossids, sectionids, arteryids, comment, - record_state, task_src, task_class, nodeid, area_id) + record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) count = self.do_execute(sql_insert) if count != 1: @@ -243,12 +245,118 @@ class TaskDbHelper(TableDbHelperBase): sql = "update task set task_state='%s' where nodeid='%s' and area_id='%s' and taskno='%s'" % (state, nodeid, area_id, task_no) return self.do_execute(sql) - # 私有属性示例 - __secret_code = "Private Info" + def query_ledger_task_list(self, nodeid, area_id): + sql = f""" + select distinct crossid from ledger_task_detail where task_no in (select taskno from task.task where nodeid = %d and area_id = %d and task_type_class = 1 and record_state != 1 and task_state != 4) and (submit_status is null or submit_status != 2) + """ % (int(nodeid), int(area_id)) + return self.do_select(sql) - # 私有方法示例 - def __private_activity(self): - print("This is a private activity.") + def query_ledger_task_crosses_info(self, task_no): + sql = """ + select * from ledger_task_detail where task_no = %s + """ % task_no + return self.do_select(sql) + + def query_ledger_task_crosses_pics(self, crossid_list): + crossids = "'" + "', '".join(item for item in crossid_list) + "'" + sql = """ + select * from tmnet.user_upload_cross_pics where crossid in (%s) + """ % crossids + return self.do_select(sql) + + def drop_old_task_cross(self, crossid_list, old_task_no, nodeid, area_id): + crossids = "'" + "', '".join(item for item in crossid_list) + "'" + sql = """ + delete from ledger_task_detail where task_no = %s and crossid in (%s) + """ % (old_task_no, crossids) + old_task_info = self.query_task(old_task_no, nodeid, area_id) + old_crossids = old_task_info['crossids'].split(',') + drop_crossids = list(set(old_crossids) - set(crossid_list)) + update_task_info_sql = """ + update task set crossids = '%s' where taskno = %s and nodeid = %s and area_id = %s + """ % (','.join(drop_crossids), old_task_no, nodeid, area_id) + conn, cursor = self.connect() + try: + conn.begin() + logging.info('drop_old_task_cross: %s' % (sql)) + logging.info('drop_old_task_cross: %s' % (update_task_info_sql)) + update_ret = cursor.execute(update_task_info_sql) + del_ret = cursor.execute(sql) + if update_ret == 1 and del_ret == len(crossid_list): + conn.commit() + return True + else: + conn.rollback() + return False + except Exception as e: + logging.error(e) + conn.rollback() + return False + + def insert_ledger_task_cross(self, insert_list, crossid_list, task_no, nodeid): + crossids = "'" + "', '".join(item for item in crossid_list) + "'" + sql = """ + insert into ledger_task_detail (task_no, crossid, nodeid, area_id) values(%s, %s, %s, %s) + """ + query_cross_ledger_status_sql = """ + select crossid, nodeid, status from tmnet.ledger_entering_status where crossid in (%s) + """ % crossids + cross_ledger_status_values = set() + ledger_status_info = self.do_select(query_cross_ledger_status_sql) + cross_ledger_info_dict = {row['crossid']: row['status'] for row in ledger_status_info} + for crossid in crossid_list: + if crossid in cross_ledger_info_dict.keys(): + cross_ledger_status_values.add(f"when '{crossid}' then {cross_ledger_info_dict[crossid]} ") + else: + cross_ledger_status_values.add(f"when '{crossid}' then 0 ") + # logging.error(cross_ledger_status_values) + update_ledger_task_cross_status_sql = f""" + update ledger_task_detail set ledger_status = case crossid {' '.join(list(cross_ledger_status_values))} end where crossid in ({crossids}) and nodeid = {nodeid} and task_no = {task_no} + """ + logging.info(update_ledger_task_cross_status_sql) + conn, cursor = self.connect() + try: + conn.begin() + ret = cursor.executemany(sql, insert_list) + if ret == len(insert_list): + update_ledger_status_ret = cursor.execute(update_ledger_task_cross_status_sql) + if update_ledger_status_ret == len(cross_ledger_status_values): + conn.commit() + return True + else: + conn.rollback() + logging.error("update ledger_entering_status fail") + return False + else: + conn.rollback() + return False + except Exception as e: + logging.error(e) + conn.rollback() + return False + + def query_ledger_task_cross_record(self, task_no, crossid): + sql = "select * from ledger_task_detail where task_no = %s and crossid = '%s'" % (task_no, crossid) + res = self.do_select(sql) + if res: + return res[0] + return None + + def update_ledger_task_cross_record(self, task_no, crossid, field_name, value): + sql = "update ledger_task_detail set %s = %s where task_no = '%s' and crossid = '%s'" % (field_name, value, task_no, crossid) + return self.do_execute(sql) + + def approval_ledger_task_cross(self, task_no, crossid, status, approver, approver_id, now_time): + sql = """ + update ledger_task_detail set submit_status = %s, approver = '%s', approver_id = '%s', approver_time = '%s' where task_no = '%s' and crossid = '%s' + """ % (status, approver, approver_id, now_time, task_no, crossid) + return self.do_execute(sql) + + def query_ledger_task_crosses(self, taskno): + sql = """ + select * from ledger_task_detail where task_no = %s + """ % taskno + return self.do_select(sql) # # if __name__ == '__main__': diff --git a/app/task_worker.py b/app/task_worker.py index 64756e0..55abf18 100644 --- a/app/task_worker.py +++ b/app/task_worker.py @@ -9,9 +9,13 @@ from flask import request, send_file from pypinyin import lazy_pinyin, Style from app.common_worker import * -from tool.qcos_func import get_client +from proto.phase_grpc import LedgerTaskDetailPhaseState +from tool.qcos_func import get_client, CosFolderManager ALLOWED_EXTENSION_TASK = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'doc', 'docx', 'xlsx', 'xls', 'pptx', 'ppt'} +ALLOWED_CROSS_PIC_FILE_TYPE = {'png', 'jpg', 'jpeg'} +g_cos_root = 'https://xinglu-1324629296.cos.ap-beijing.myqcloud.com' +g_cos_bucket = 'xinglu-1324629296' def do_query_task_list_parameter(params): @@ -187,6 +191,11 @@ def do_query_task_list(params): else: comment_list = [comment] task_info['comment'] = comment_list + if task_info['task_type_class'] == 1: + item_task_no = task_info['taskno'] + ledger_task_add_info = query_ledger_task_additional_info(item_task_no, nodeid, area_id) + task_info['entered_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['entered_percent'] + task_info['approve_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['approve_percent'] sorted_list = sorted(filtered_list, key=sort_key) res = make_common_res(0, 'ok') res['nodeid'] = nodeid @@ -457,8 +466,14 @@ def do_add_task(params): task_type = check_param(params, 'task_type') if not task_type: return json.dumps(make_common_res(2, '任务类型缺失,请检查后重试')) - # if int(task_type) not in (1, 2, 3, 4, 5): - # return json.dumps(make_common_res(2, '任务类型参数异常,请检查后重试')) + task_type_class = check_param(params, 'task_type_class') + if not task_type_class: + task_type_class = 0 + task_type_class = int(task_type_class) + full_review = check_param(params, 'full_review') + if not full_review: + full_review = 0 + full_review = int(full_review) task_name = check_param(params, 'task_name') if not task_name: return json.dumps(make_common_res(2, '任务名称缺失,请检查后重试')) @@ -491,8 +506,6 @@ def do_add_task(params): task_src = check_param(params, 'task_src') if not task_src: return json.dumps(make_common_res(2, '需求来源缺失,请检查后重试')) - # if task_src not in ('舆情', '交警交办任务', '现场拥堵', '平台优化策略'): - # return json.dumps(make_common_res(2, '需求来源异常,可选项为:舆情、交警交办任务、现场拥堵、平台优化策略,请检查后重试')) executor = check_param(params, 'executor') if not executor: @@ -528,7 +541,7 @@ def do_add_task(params): task_state = 0 count = db_task.add_task(timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time, executor, progress, task_state, description, crossids, sectionids, arteryids, comment, - record_task_state, task_src, task_class, nodeid, area_id) + record_task_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) if count != 1: logging.error(str(params) + ' 添加任务报错!') res = make_common_res(-1, '添加任务报错。') @@ -547,6 +560,21 @@ def do_add_task(params): logging.error(str(params) + ' do_add_task添加任务履历失败!') res = make_common_res(-1, '添加任务履历失败') return json.dumps(res) + if task_type_class == 1: + values = [] + crossid_list = crossids.split(',') + for crossid in crossid_list: + values.append((taskno, crossid, nodeid, area_id)) + ret = db_task.insert_ledger_task_cross(values, crossid_list, taskno, nodeid) + if not ret: + logging.error(str(params) + ' do_add_task添加任务关联路口失败!') + res = make_common_res(-1, '添加任务关联路口失败, 请反馈该情况至管理员') + return json.dumps(res) + # 更新配时方案状态信息函数调用 + ledger_update_res, e = LedgerTaskDetailPhaseState(int(nodeid), crossid_list) + if e: + logging.error(e) + return json.dumps(make_common_res(2, '台账任务创建成功,但路口列表配时方案状态更新失败,请反馈该情况至管理员!')) res = make_common_res(0, 'ok') res['nodeid'] = nodeid @@ -750,8 +778,6 @@ def do_task_upload(params): if len(request.files.keys()) < 1: return json.dumps(make_common_res(1, '文件不存在')) suc_file_num, suc_mysql_num = 0, 0 - g_cos_root = 'https://xinglu-1324629296.cos.ap-beijing.myqcloud.com' - g_cos_bucket = 'xinglu-1324629296' for key in request.files.keys(): file = request.files[key] try: @@ -932,12 +958,18 @@ def do_query_task_detail(params): if not taskno: return json.dumps(make_common_res(2, '任务id缺失,请检查后重试')) - task = db_task.query_task(taskno, nodeid) + task = db_task.query_task(taskno, nodeid, area_id) if task is None: logging.error(str(params) + ' do_query_task_detail 任务在数据库中不存在!') res = make_common_res(-1, '任务在数据库中不存在。') return json.dumps(res) - + if task['task_type_class'] == 1: + ledger_task_additional_info = query_ledger_task_additional_info(taskno, nodeid, area_id) + task['ledger_task_additional_info'] = ledger_task_additional_info + if task['task_state'] == 4: + task['ledger_task_additional_info']['entered_percent'] = 100 + task['ledger_task_additional_info']['approve_percent'] = 100 + task.pop('update_time') res['desc'] = '' res['data'] = task return json.dumps(res) @@ -987,20 +1019,6 @@ def gen_update_sql(params, task_old_info): modify_data += f", arteryids = '{arteryids}'" modify_item += prefix modify_item += f"【编辑】关联路网信息:{task_old_info['data_type']} -> {data_type}" - # plan_begin_time = check_param(params, 'plan_begin_time') - # if plan_begin_time and plan_begin_time != task_old_info['plan_begin_time']: - # if modify_data != '': - # modify_data += f', plan_begin_time = {int(plan_begin_time)}' - # else: - # modify_data += f'plan_begin_time = {int(plan_begin_time)}' - # modify_item += ' 计划开始时间 ' - # plan_end_time = check_param(params, 'plan_end_time') - # if plan_end_time and plan_end_time != task_old_info['plan_end_time']: - # if modify_data != '': - # modify_data += f', plan_end_time = {int(plan_end_time)}' - # else: - # modify_data += f'plan_end_time = {int(plan_end_time)}' - # modify_item += ' 计划结束时间 ' executor = check_param(params, 'executor') if executor and executor != task_old_info['executor']: prefix = '' @@ -1044,3 +1062,278 @@ def gen_update_sql(params, task_old_info): modify_item += f"【添加】任务备注:{comment}" return modify_data, modify_item + + +# 新增台账管理路口类别任务 圈选路口 返回所有可用路口列表 +def get_undistributed_cross_list(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) + + all_area_cross_list = db_tmnet.query_cross_list4task(nodeid, area_id) + # 查询出所有已经分配过的路口列表 + task_crosses = db_task.query_ledger_task_list(nodeid, area_id) + distributed_cross_list = [row['crossid'] for row in task_crosses] + + row_list = [cross for cross in all_area_cross_list if cross['crossid'] not in distributed_cross_list] + distributed_cross_info_list = [cross for cross in all_area_cross_list if cross['crossid'] in distributed_cross_list] + res = make_common_res(0, 'ok') + res['data'] = { + 'cross_list': row_list, + 'distributed_cross_list': distributed_cross_info_list + } + return json.dumps(res) + + +# 查询台账路口任务详情-> 路口详细列表、完成情况占比 +def query_ledger_task_additional_info(task_no, nodeid, area_id): + task_crosses_info = db_task.query_ledger_task_crosses_info(task_no) + task_cross_num = len(task_crosses_info) + entered_cross_num = len([cross for cross in task_crosses_info if cross['ledger_status'] == 2 and cross['phase_status'] in (1, 3)]) + entered_percent = int(entered_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0 + approve_cross_num = len([cross for cross in task_crosses_info if cross['submit_status'] == 2]) + approve_percent = int(approve_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0 + cross_name_info = db_tmnet.query_cross_list4task(nodeid, area_id) + cross_name_dict = {row['crossid']: row['name'] for row in cross_name_info} + cross_info_dict = {} + this_week_entering_cross_num, this_week_approve_cross_num = 0, 0 + for row in task_crosses_info: + row['name'] = cross_name_dict[row['crossid']] + ledger_time_info, phase_time_info, approve_time_info = get_year_week(row['ledger_status_update_time']), get_year_week(row['phase_status_update_time']), get_year_week(row['approver_time']) + now_time_info = get_year_week(datetime.now()) + if row['ledger_status'] == 2 and row['phase_status'] == 1 and ledger_time_info == now_time_info and phase_time_info == now_time_info: + this_week_entering_cross_num += 1 + if row['submit_status'] == 2 and approve_time_info == now_time_info: + this_week_approve_cross_num += 1 + for item in ['ledger_status_update_time', 'phase_status_update_time', 'approver_time', 'create_time', 'update_time']: + if row[item]: + row[item] = row[item].strftime('%Y-%m-%d %H:%M:%S') + cross_info_dict[row['crossid']] = row + cross_info_dict[row['crossid']]['high_pic_num'] = 0 + cross_info_dict[row['crossid']]['ground_pic_num'] = 0 + cross_pics = db_task.query_ledger_task_crosses_pics(list(cross_info_dict.keys())) + for row in cross_pics: + crossid = row['crossid'] + if row['pic_type'] == 1: + # 高空图最大数量为2 + cross_info_dict[crossid]['high_pic_num'] += 1 + else: + # 地面图最大数量为10 + cross_info_dict[crossid]['ground_pic_num'] += 1 + + ledger_task_additional_info = { + 'entered_percent': entered_percent, + 'approve_percent': approve_percent, + 'this_week_entering_cross_num': this_week_entering_cross_num, + 'this_week_approve_cross_num': this_week_approve_cross_num, + 'cross_list': list(cross_info_dict.values()) + } + return ledger_task_additional_info + + +# 重新分配台账任务路口 +def do_redistributed_cross_list(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) + old_task_no = check_param(params, 'old_task_no') + if not old_task_no: + return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试')) + crossid_list = check_param(params, 'crossid_list') + if not crossid_list or len(crossid_list) < 1: + return json.dumps(make_common_res(7, '缺少路口列表, 请刷新后重试')) + + ret = db_task.drop_old_task_cross(crossid_list, old_task_no, nodeid, area_id) + if not ret: + return json.dumps(make_common_res(8, '删除旧任务路口失败,请检查后重试')) + return json.dumps(make_common_res(0, 'ok')) + + +# 操作台账任务路口状态 +def do_op_ledger_task_cross_status(params): + nodeid = check_param(params, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param(params, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param(params, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) + task_no = check_param(params, 'task_no') + if not task_no: + return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试')) + crossid = check_param(params, 'crossid') + if not crossid: + return json.dumps(make_common_res(7, '缺少路口编号, 请刷新后重试')) + status = check_param(params, 'status') + if not status: + return json.dumps(make_common_res(8, '缺少状态信息')) + status = int(status) + record = db_task.query_ledger_task_cross_record(task_no, crossid) + if not record: + return json.dumps(make_common_res(8, '路口信息异常,请检查后重试')) + if record['phase_status'] != 1 and record['ledger_status'] != 2: + return json.dumps(make_common_res(9, '路口当前未完成信息录入,请确认路口状态信息')) + if status in (2, 3): + approver = check_param(params, 'approver') + if not approver: + return json.dumps(make_common_res(10, '缺少审核人信息')) + approver_id = check_param(params, 'approver_id') + if not approver_id: + return json.dumps(make_common_res(11, '缺少审核人id信息')) + now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + ret = db_task.approval_ledger_task_cross(task_no, crossid, status, approver, approver_id, now_time) + else: + # 提交状态字段 submit 0未提交 1待审核 2审核通过 3审核未通过 + ret = db_task.update_ledger_task_cross_record(task_no, crossid, 'submit_status', status) + if ret: + return json.dumps(make_common_res(0, 'ok')) + return json.dumps(make_common_res(10, '更新路口提交状态失败,请检查后重试')) + + +# 批量上传路口图片 +def do_batch_upload_cross_pics(params): + nodeid = check_param({'nodeid': request.form.get('nodeid')}, 'nodeid') + if not nodeid: + return json.dumps(make_common_res(2, '缺少nodeid, 请刷新后重试')) + area_id = check_param({'area_id': request.form.get('area_id')}, 'area_id') + if not area_id: + return json.dumps(make_common_res(3, '缺少area_id, 请刷新后重试')) + userid = check_param({'userid': request.form.get('userid')}, 'userid') + if not userid: + return json.dumps(make_common_res(4, '缺少userid, 请刷新后重试')) + area_list = db_user.query_areaid_list(userid) + if not area_list or len(area_list) < 1: + return json.dumps(make_common_res(5, '用户信息异常')) + area_list = map(int, area_list) + if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list: + return json.dumps(make_common_res(5, '辖区id异常,请检查后重试')) + task_no = check_param({'task_no': request.form.get('task_no')}, 'task_no') + if not task_no: + return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试')) + file_type = check_param({'file_type': request.form.get('file_type')}, 'file_type') + if not file_type: + file_type = 1 + # file_type 1: 高空图片 2:地面图片 + file_type = int(file_type) + if len(request.files.keys()) < 1: + return json.dumps(make_common_res(7, '缺少文件列表, 请刷新后重试')) + task_cross_list = db_task.query_ledger_task_crosses(task_no) + crossid_list = [row['crossid'] for row in task_cross_list] + cross_info_list = db_tmnet.query_cross_infos(crossid_list) + cross_info_dict = {row['name']: row['crossid'] for row in cross_info_list} + checker = RegexSingleMatcher(cross_info_dict) + fail_list, suc_dict, records = [], {}, [] + cross_file_dict = {} + cos_client = get_client() + folder_manager = CosFolderManager(cos_client, g_cos_bucket) + for key in request.files.keys(): + file_info = request.files[key] + name = file_info.filename + if not '.' in name and name.rsplit('.', 1)[1].lower() in ALLOWED_CROSS_PIC_FILE_TYPE: + continue + find_res = checker.find(name) + if find_res: + # 如果找到了路口id,说明文件名称匹配到了路口,将该文件按照需求上传到cos目录上的路口文件夹下,同时写表记录文件路径 + file_stream = file_info.stream + crossid = cross_info_dict[find_res] + cos_path = f'user/ledger/{nodeid}/{area_id}/{crossid}' + folder_manager.ensure_folder(cos_path) + cos_key = f'{cos_path}/{name}' + cos_client.put_object(Bucket=g_cos_bucket, Key=cos_key, Body=file_stream) + suc_dict[name] = find_res + download_url = f'{g_cos_root}/{cos_key}' + if crossid not in cross_file_dict.keys(): + cross_file_dict[crossid] = [] + cross_file_dict[crossid].append((crossid, file_type, download_url)) + else: + fail_list.append(name) + continue + for crossid in cross_file_dict.keys(): + his_pics = db_tmnet.query_cross_pics(crossid, file_type) + if file_type == 2: + if len(his_pics) + len(cross_file_dict[crossid]) > 10: + db_tmnet.del_cross_pic(crossid, file_type, len(his_pics) + len(cross_file_dict[crossid]) - 10) + elif file_type == 1: + if len(his_pics) + len(cross_file_dict[crossid]) > 2: + db_tmnet.del_cross_pic(crossid, file_type, len(his_pics) + len(cross_file_dict[crossid]) - 2) + records.extend(cross_file_dict[crossid]) + # 批量写入路口图片记录 + if records: + ret = db_tmnet.batch_insert_cross_pic(records) + else: + ret = 0 + if ret == len(records): + res = make_common_res(0, 'ok') + res['data'] = { + 'total': len(request.files.keys()), + 'suc_info': suc_dict, + 'fail_list': fail_list + } + return json.dumps(res) + else: + return json.dumps(make_common_res(8, '批量上传路口图片失败')) + + +class RegexSingleMatcher: + def __init__(self, key_dict): + self.key_dict = key_dict + # 转义并编译正则 + keys = [re.escape(k) for k in key_dict.keys() if k] + if keys: + # 正则 search 默认就是找到第一个就停止 + self.pattern = re.compile("|".join(keys)) + else: + self.pattern = None + + def find(self, target_string): + if not self.pattern: + return None + match = self.pattern.search(target_string) + if match: + return match.group() # 返回匹配到的具体字符串 + return None + + +def get_year_week(time_input): + """ + 支持 datetime 对象、时间戳整数、时间字符串 + 返回格式:'2023-40' + """ + if not time_input: + return None + + # 使用 ISO 标准获取年周 + year, week, _ = time_input.isocalendar() + return f"{year}-{week:02d}" diff --git a/app/tmnet_db_func.py b/app/tmnet_db_func.py index d0c123e..ccd0d42 100644 --- a/app/tmnet_db_func.py +++ b/app/tmnet_db_func.py @@ -78,6 +78,43 @@ class TmnetDbHelper(TableDbHelperBase): cross_list = [cross for cross in cross_list if cross['crossid'] in area_cross_list] return cross_list + def query_cross_list4task(self, nodeid, area_id): + sql = """ + select + if(t2.name is not null, t2.name, t1.name) as name, + t1.crossid, + if (t2.location is not null, t2.location, t1.location) as location, + t1.nodeid, + t1.area_id + from (select name,crossid, location,nodeid, area_id from `cross` where nodeid = %s and area_id = %s and at_edge=0 and isdeleted=0 ) as t1 + left join (select name,crossid, location,nodeid, area_id from `cross_ledger_update_info` where nodeid = %s and area_id = %s and at_edge=0 and isdeleted=0 ) as t2 on t1.crossid=t2.crossid + """ % (nodeid, area_id, nodeid, area_id) + cross_list = self.do_select(sql) + virtual_cross_sql = f'select name, crossid, location, nodeid, area_id from user_defined_cross where nodeid = {nodeid} and area_id = {area_id}' + virtual_cross_list = self.do_select(virtual_cross_sql) + cross_list.extend(virtual_cross_list) + if area_id != 0: + area_cross_sql = """ + select bound_crosses.crossid, bound_crosses.location, c.name, bound_crosses.nodeid, bound_crosses.area_id + from tmnet.bound_crosses + left join (select if(t2.name is not null, t2.name, t1.name) as name, t1.crossid + from tmnet.`cross` t1 + left join tmnet.`cross_ledger_update_info` t2 on t1.crossid = t2.crossid + where t1.nodeid = %s + and t1.area_id = %s + union + select name, crossid + from tmnet.user_defined_cross + where nodeid = %s + and area_id = %s) c on bound_crosses.crossid = c.crossid + where bound_crosses.nodeid = %s + and bound_crosses.area_id = %s + """ % (nodeid, area_id, nodeid, area_id, nodeid, area_id) + area_crosses = self.do_select(area_cross_sql) + area_cross_list = [cross['crossid'] for cross in area_crosses] + cross_list = [cross for cross in cross_list if cross['crossid'] in area_cross_list] + return cross_list + def query_cross_inroads(self, crossid, nodeid): sql = """ select @@ -628,4 +665,49 @@ class TmnetDbHelper(TableDbHelperBase): def query_slc_company_dict(self): sql = "select mapping_code, web_tag from tags.cross_tag_meta where field_eng_name = 'slc_company'" - return self.do_select(sql) \ No newline at end of file + return self.do_select(sql) + + def query_cross_infos(self, crossid_list): + crossids = "'" + "', '".join(item for item in crossid_list) + "'" + sql = """ + select + if(t2.name is not null, t2.name, t1.name) as name, + t1.crossid, + if (t2.location is not null, t2.location, t1.location) as location, + t1.nodeid, + t1.area_id + from (select name,crossid, location,nodeid, area_id from `cross` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t1 + left join (select name,crossid, location,nodeid, area_id from `cross_ledger_update_info` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t2 on t1.crossid=t2.crossid + """ % (crossids, crossids) + cross_list = self.do_select(sql) + virtual_cross_sql = f'select name, crossid, location, nodeid, area_id from user_defined_cross where crossid in ({crossids})' + virtual_cross_list = self.do_select(virtual_cross_sql) + cross_list.extend(virtual_cross_list) + return cross_list + + def query_cross_pics(self, crossid, pic_type): + sql = """ + select * from user_upload_cross_pics where crossid = '%s' and pic_type = %s + """ % (crossid, pic_type) + return self.do_select(sql) + + def del_cross_pic(self, crossid, pic_type, del_num): + sql = """ + DELETE FROM user_upload_cross_pics + WHERE id in ( + SELECT id FROM ( + SELECT id FROM user_upload_cross_pics + WHERE crossid = '%s' + AND pic_type = %s + ORDER BY create_time ASC + LIMIT %d + ) AS tmp + ); + """ % (crossid, pic_type, del_num) + return self.do_execute(sql) + + def batch_insert_cross_pic(self, values): + sql = f""" + insert into user_upload_cross_pics (crossid, pic_type, pic_path) values (%s, %s, %s) + """ + return self.do_executemany(sql, values) diff --git a/app/views_task.py b/app/views_task.py index 32ebc1f..c711fd7 100644 --- a/app/views_task.py +++ b/app/views_task.py @@ -143,6 +143,25 @@ def task_del(): return del_task_file_api(dict(request.args)) +@app.route('/api/undistributed_cross_list', methods=['GET']) +def undistributed_cross_list(): + return get_undistributed_cross_list(dict(request.args)) + + +@app.route('/api/redistributed_cross_list', methods=['POST']) +def redistributed_cross_list(): + return do_redistributed_cross_list(request.get_json()) + + +@app.route('/api/op_ledger_task_cross_status', methods=['GET']) +def op_ledger_task_cross_status(): + return do_op_ledger_task_cross_status(dict(request.args)) + + +@app.route('/api/batch_upload_cross_pics', methods=['POST']) +def batch_upload_cross_pics(): + return do_batch_upload_cross_pics(dict(request.form)) + # if __name__ == '__main__': # init() # app.run(debug=True) diff --git a/cross_doctor.ini b/cross_doctor.ini index aa48703..cbc4e51 100644 --- a/cross_doctor.ini +++ b/cross_doctor.ini @@ -1,9 +1,9 @@ [roadnet] ;citylist = 110000,130100,130200,140400,150100,320500,321000,330800,330900,341700,350300,350400,350800,370100,440600,440800,441800,445100,450200,450600,420100 ;citylist = 110000,130100,130200,150100,350300,370100,440600,450200 -citylist = 350100,350300,130200 +citylist = 350300 ;citylist = 110000,130100,130200,150100,350300,370100,440600,450200,421000,370500,532600,140700,140400,430100,440100,370300,320500,140200 -hot_citylist = 350100,350300,130200 +hot_citylist = 350300 [db] host = 120.53.125.169 diff --git a/test.py b/test.py index 5bbd0d8..4e2b1a1 100644 --- a/test.py +++ b/test.py @@ -55,7 +55,7 @@ def init(): def test_get_cross_delay_data(): - row_list = db_cross.query_cross_delay_info('CR_11987179_2632645', 350100, '20251013', 't830') + row_list = db_cross.query_cross_delay_info('CR_11902719_2542847', 350300, ['20260101'], 't700') data = row_list[0]['data'] cross_delay = pb.xl_cross_delayinfo_t() cross_delay.ParseFromString(data) @@ -150,4 +150,4 @@ def gen_monitor_cross_ledger_info(): if __name__ == '__main__': init() - gen_monitor_cross_ledger_info() \ No newline at end of file + test_get_cross_delay_data() \ No newline at end of file diff --git a/tool/qcos_func.py b/tool/qcos_func.py index 66709b2..1954cb9 100644 --- a/tool/qcos_func.py +++ b/tool/qcos_func.py @@ -1,6 +1,7 @@ # -*- coding=utf-8 from qcloud_cos import CosConfig from qcloud_cos import CosS3Client +from qcloud_cos.cos_exception import CosServiceError import sys import os import logging @@ -78,6 +79,70 @@ def upload_order_images_to_cos(idx_to_localfile: dict, orderid: str): return idx_to_cosfile +class CosFolderManager: + def __init__(self, client, bucket): + self.client = client + self.bucket = bucket + + def _normalize_folder(self, folder_path): + """确保文件夹路径以 / 结尾""" + if not folder_path: + return '' + # 去除开头的 / + if folder_path.startswith('/'): + folder_path = folder_path[1:] + # 确保结尾有 / + if not folder_path.endswith('/'): + folder_path += '/' + return folder_path + + def folder_exists(self, folder_path): + """ + 判断文件夹是否存在 + :param folder_path: 文件夹路径,如 'data/logs/' 或 'data/logs' + :return: True/False + """ + key = self._normalize_folder(folder_path) + if not key: + return True # 根目录默认存在 + + try: + self.client.head_object(Bucket=self.bucket, Key=key) + return True + except CosServiceError as e: + if e.get_status_code() == 404: + return False + raise # 其他错误向上抛出 + + def create_folder(self, folder_path): + """ + 创建文件夹(上传一个以 / 结尾的空对象) + :param folder_path: 文件夹路径 + :return: None + """ + key = self._normalize_folder(folder_path) + if not key: + return # 根目录无需创建 + + self.client.put_object( + Bucket=self.bucket, + Key=key, + Body=b'' # 空内容 + ) + + def ensure_folder(self, folder_path): + """ + 确保文件夹存在,不存在则创建 + :param folder_path: 文件夹路径 + :return: True(已存在) / False(刚创建) + """ + if self.folder_exists(folder_path): + return True + else: + self.create_folder(folder_path) + return False + + if __name__ == '__main__': idx_to_localfile = {0:'D:/slgwork/slgcode/wave_survey/1739430848000.jpg', 2:'D:/slgwork/slgcode/wave_survey/1739430851000.jpg'}