提交台账任务工作流及台账需求迭代功能相关已经测试的代码

This commit is contained in:
wangxu 2026-03-12 18:01:22 +08:00
parent 765b3bd830
commit 1622eb5928
7 changed files with 604 additions and 37 deletions

View File

@ -4,6 +4,7 @@
# @Description:
# -*- coding:utf-8 -*-
#import logging
import logging
import pymysql
import pymysql.cursors
@ -12,6 +13,7 @@ from datetime import datetime
from flask import g
from app.db_func_base import *
class TaskDbHelper(TableDbHelperBase):
def __init__(self, pool):
@ -174,13 +176,13 @@ class TaskDbHelper(TableDbHelperBase):
def add_task(self, timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time,
executor, progress, task_state, description, crossids, sectionids, arteryids, comment,
record_state, task_src, task_class, nodeid, area_id):
record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review):
sql_insert = "insert into task (timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time," \
" executor, progress, task_state, description, crossids, sectionids, arteryids, comment," \
" record_state, task_src, task_class, nodeid, area_id) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %s)" % (
" record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %s, %s, %s)" % (
timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time,
executor, progress, task_state, description, crossids, sectionids, arteryids, comment,
record_state, task_src, task_class, nodeid, area_id)
record_state, task_src, task_class, nodeid, area_id, task_type_class, full_review)
count = self.do_execute(sql_insert)
if count != 1:
@ -243,12 +245,118 @@ class TaskDbHelper(TableDbHelperBase):
sql = "update task set task_state='%s' where nodeid='%s' and area_id='%s' and taskno='%s'" % (state, nodeid, area_id, task_no)
return self.do_execute(sql)
# 私有属性示例
__secret_code = "Private Info"
def query_ledger_task_list(self, nodeid, area_id):
sql = f"""
select distinct crossid from ledger_task_detail where task_no in (select taskno from task.task where nodeid = %d and area_id = %d and task_type_class = 1 and record_state != 1 and task_state != 4) and (submit_status is null or submit_status != 2)
""" % (int(nodeid), int(area_id))
return self.do_select(sql)
# 私有方法示例
def __private_activity(self):
print("This is a private activity.")
def query_ledger_task_crosses_info(self, task_no):
sql = """
select * from ledger_task_detail where task_no = %s
""" % task_no
return self.do_select(sql)
def query_ledger_task_crosses_pics(self, crossid_list):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
select * from tmnet.user_upload_cross_pics where crossid in (%s)
""" % crossids
return self.do_select(sql)
def drop_old_task_cross(self, crossid_list, old_task_no, nodeid, area_id):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
delete from ledger_task_detail where task_no = %s and crossid in (%s)
""" % (old_task_no, crossids)
old_task_info = self.query_task(old_task_no, nodeid, area_id)
old_crossids = old_task_info['crossids'].split(',')
drop_crossids = list(set(old_crossids) - set(crossid_list))
update_task_info_sql = """
update task set crossids = '%s' where taskno = %s and nodeid = %s and area_id = %s
""" % (','.join(drop_crossids), old_task_no, nodeid, area_id)
conn, cursor = self.connect()
try:
conn.begin()
logging.info('drop_old_task_cross: %s' % (sql))
logging.info('drop_old_task_cross: %s' % (update_task_info_sql))
update_ret = cursor.execute(update_task_info_sql)
del_ret = cursor.execute(sql)
if update_ret == 1 and del_ret == len(crossid_list):
conn.commit()
return True
else:
conn.rollback()
return False
except Exception as e:
logging.error(e)
conn.rollback()
return False
def insert_ledger_task_cross(self, insert_list, crossid_list, task_no, nodeid):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
insert into ledger_task_detail (task_no, crossid, nodeid, area_id) values(%s, %s, %s, %s)
"""
query_cross_ledger_status_sql = """
select crossid, nodeid, status from tmnet.ledger_entering_status where crossid in (%s)
""" % crossids
cross_ledger_status_values = set()
ledger_status_info = self.do_select(query_cross_ledger_status_sql)
cross_ledger_info_dict = {row['crossid']: row['status'] for row in ledger_status_info}
for crossid in crossid_list:
if crossid in cross_ledger_info_dict.keys():
cross_ledger_status_values.add(f"when '{crossid}' then {cross_ledger_info_dict[crossid]} ")
else:
cross_ledger_status_values.add(f"when '{crossid}' then 0 ")
# logging.error(cross_ledger_status_values)
update_ledger_task_cross_status_sql = f"""
update ledger_task_detail set ledger_status = case crossid {' '.join(list(cross_ledger_status_values))} end where crossid in ({crossids}) and nodeid = {nodeid} and task_no = {task_no}
"""
logging.info(update_ledger_task_cross_status_sql)
conn, cursor = self.connect()
try:
conn.begin()
ret = cursor.executemany(sql, insert_list)
if ret == len(insert_list):
update_ledger_status_ret = cursor.execute(update_ledger_task_cross_status_sql)
if update_ledger_status_ret == len(cross_ledger_status_values):
conn.commit()
return True
else:
conn.rollback()
logging.error("update ledger_entering_status fail")
return False
else:
conn.rollback()
return False
except Exception as e:
logging.error(e)
conn.rollback()
return False
def query_ledger_task_cross_record(self, task_no, crossid):
sql = "select * from ledger_task_detail where task_no = %s and crossid = '%s'" % (task_no, crossid)
res = self.do_select(sql)
if res:
return res[0]
return None
def update_ledger_task_cross_record(self, task_no, crossid, field_name, value):
sql = "update ledger_task_detail set %s = %s where task_no = '%s' and crossid = '%s'" % (field_name, value, task_no, crossid)
return self.do_execute(sql)
def approval_ledger_task_cross(self, task_no, crossid, status, approver, approver_id, now_time):
sql = """
update ledger_task_detail set submit_status = %s, approver = '%s', approver_id = '%s', approver_time = '%s' where task_no = '%s' and crossid = '%s'
""" % (status, approver, approver_id, now_time, task_no, crossid)
return self.do_execute(sql)
def query_ledger_task_crosses(self, taskno):
sql = """
select * from ledger_task_detail where task_no = %s
""" % taskno
return self.do_select(sql)
#
# if __name__ == '__main__':

View File

@ -9,9 +9,13 @@ from flask import request, send_file
from pypinyin import lazy_pinyin, Style
from app.common_worker import *
from tool.qcos_func import get_client
from proto.phase_grpc import LedgerTaskDetailPhaseState
from tool.qcos_func import get_client, CosFolderManager
ALLOWED_EXTENSION_TASK = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'doc', 'docx', 'xlsx', 'xls', 'pptx', 'ppt'}
ALLOWED_CROSS_PIC_FILE_TYPE = {'png', 'jpg', 'jpeg'}
g_cos_root = 'https://xinglu-1324629296.cos.ap-beijing.myqcloud.com'
g_cos_bucket = 'xinglu-1324629296'
def do_query_task_list_parameter(params):
@ -187,6 +191,11 @@ def do_query_task_list(params):
else:
comment_list = [comment]
task_info['comment'] = comment_list
if task_info['task_type_class'] == 1:
item_task_no = task_info['taskno']
ledger_task_add_info = query_ledger_task_additional_info(item_task_no, nodeid, area_id)
task_info['entered_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['entered_percent']
task_info['approve_percent'] = 100 if task_info['task_state'] == 4 else ledger_task_add_info['approve_percent']
sorted_list = sorted(filtered_list, key=sort_key)
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
@ -457,8 +466,14 @@ def do_add_task(params):
task_type = check_param(params, 'task_type')
if not task_type:
return json.dumps(make_common_res(2, '任务类型缺失,请检查后重试'))
# if int(task_type) not in (1, 2, 3, 4, 5):
# return json.dumps(make_common_res(2, '任务类型参数异常,请检查后重试'))
task_type_class = check_param(params, 'task_type_class')
if not task_type_class:
task_type_class = 0
task_type_class = int(task_type_class)
full_review = check_param(params, 'full_review')
if not full_review:
full_review = 0
full_review = int(full_review)
task_name = check_param(params, 'task_name')
if not task_name:
return json.dumps(make_common_res(2, '任务名称缺失,请检查后重试'))
@ -491,8 +506,6 @@ def do_add_task(params):
task_src = check_param(params, 'task_src')
if not task_src:
return json.dumps(make_common_res(2, '需求来源缺失,请检查后重试'))
# if task_src not in ('舆情', '交警交办任务', '现场拥堵', '平台优化策略'):
# return json.dumps(make_common_res(2, '需求来源异常,可选项为:舆情、交警交办任务、现场拥堵、平台优化策略,请检查后重试'))
executor = check_param(params, 'executor')
if not executor:
@ -528,7 +541,7 @@ def do_add_task(params):
task_state = 0
count = db_task.add_task(timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time, publish_time,
executor, progress, task_state, description, crossids, sectionids, arteryids, comment,
record_task_state, task_src, task_class, nodeid, area_id)
record_task_state, task_src, task_class, nodeid, area_id, task_type_class, full_review)
if count != 1:
logging.error(str(params) + ' 添加任务报错!')
res = make_common_res(-1, '添加任务报错。')
@ -547,6 +560,21 @@ def do_add_task(params):
logging.error(str(params) + ' do_add_task添加任务履历失败!')
res = make_common_res(-1, '添加任务履历失败')
return json.dumps(res)
if task_type_class == 1:
values = []
crossid_list = crossids.split(',')
for crossid in crossid_list:
values.append((taskno, crossid, nodeid, area_id))
ret = db_task.insert_ledger_task_cross(values, crossid_list, taskno, nodeid)
if not ret:
logging.error(str(params) + ' do_add_task添加任务关联路口失败!')
res = make_common_res(-1, '添加任务关联路口失败, 请反馈该情况至管理员')
return json.dumps(res)
# 更新配时方案状态信息函数调用
ledger_update_res, e = LedgerTaskDetailPhaseState(int(nodeid), crossid_list)
if e:
logging.error(e)
return json.dumps(make_common_res(2, '台账任务创建成功,但路口列表配时方案状态更新失败,请反馈该情况至管理员!'))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
@ -750,8 +778,6 @@ def do_task_upload(params):
if len(request.files.keys()) < 1:
return json.dumps(make_common_res(1, '文件不存在'))
suc_file_num, suc_mysql_num = 0, 0
g_cos_root = 'https://xinglu-1324629296.cos.ap-beijing.myqcloud.com'
g_cos_bucket = 'xinglu-1324629296'
for key in request.files.keys():
file = request.files[key]
try:
@ -932,12 +958,18 @@ def do_query_task_detail(params):
if not taskno:
return json.dumps(make_common_res(2, '任务id缺失请检查后重试'))
task = db_task.query_task(taskno, nodeid)
task = db_task.query_task(taskno, nodeid, area_id)
if task is None:
logging.error(str(params) + ' do_query_task_detail 任务在数据库中不存在!')
res = make_common_res(-1, '任务在数据库中不存在。')
return json.dumps(res)
if task['task_type_class'] == 1:
ledger_task_additional_info = query_ledger_task_additional_info(taskno, nodeid, area_id)
task['ledger_task_additional_info'] = ledger_task_additional_info
if task['task_state'] == 4:
task['ledger_task_additional_info']['entered_percent'] = 100
task['ledger_task_additional_info']['approve_percent'] = 100
task.pop('update_time')
res['desc'] = ''
res['data'] = task
return json.dumps(res)
@ -987,20 +1019,6 @@ def gen_update_sql(params, task_old_info):
modify_data += f", arteryids = '{arteryids}'"
modify_item += prefix
modify_item += f"【编辑】关联路网信息:{task_old_info['data_type']} -> {data_type}"
# plan_begin_time = check_param(params, 'plan_begin_time')
# if plan_begin_time and plan_begin_time != task_old_info['plan_begin_time']:
# if modify_data != '':
# modify_data += f', plan_begin_time = {int(plan_begin_time)}'
# else:
# modify_data += f'plan_begin_time = {int(plan_begin_time)}'
# modify_item += ' 计划开始时间 '
# plan_end_time = check_param(params, 'plan_end_time')
# if plan_end_time and plan_end_time != task_old_info['plan_end_time']:
# if modify_data != '':
# modify_data += f', plan_end_time = {int(plan_end_time)}'
# else:
# modify_data += f'plan_end_time = {int(plan_end_time)}'
# modify_item += ' 计划结束时间 '
executor = check_param(params, 'executor')
if executor and executor != task_old_info['executor']:
prefix = ''
@ -1044,3 +1062,278 @@ def gen_update_sql(params, task_old_info):
modify_item += f"【添加】任务备注:{comment}"
return modify_data, modify_item
# 新增台账管理路口类别任务 圈选路口 返回所有可用路口列表
def get_undistributed_cross_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
all_area_cross_list = db_tmnet.query_cross_list4task(nodeid, area_id)
# 查询出所有已经分配过的路口列表
task_crosses = db_task.query_ledger_task_list(nodeid, area_id)
distributed_cross_list = [row['crossid'] for row in task_crosses]
row_list = [cross for cross in all_area_cross_list if cross['crossid'] not in distributed_cross_list]
distributed_cross_info_list = [cross for cross in all_area_cross_list if cross['crossid'] in distributed_cross_list]
res = make_common_res(0, 'ok')
res['data'] = {
'cross_list': row_list,
'distributed_cross_list': distributed_cross_info_list
}
return json.dumps(res)
# 查询台账路口任务详情-> 路口详细列表、完成情况占比
def query_ledger_task_additional_info(task_no, nodeid, area_id):
task_crosses_info = db_task.query_ledger_task_crosses_info(task_no)
task_cross_num = len(task_crosses_info)
entered_cross_num = len([cross for cross in task_crosses_info if cross['ledger_status'] == 2 and cross['phase_status'] in (1, 3)])
entered_percent = int(entered_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0
approve_cross_num = len([cross for cross in task_crosses_info if cross['submit_status'] == 2])
approve_percent = int(approve_cross_num * 100 / task_cross_num) if task_cross_num > 0 else 0
cross_name_info = db_tmnet.query_cross_list4task(nodeid, area_id)
cross_name_dict = {row['crossid']: row['name'] for row in cross_name_info}
cross_info_dict = {}
this_week_entering_cross_num, this_week_approve_cross_num = 0, 0
for row in task_crosses_info:
row['name'] = cross_name_dict[row['crossid']]
ledger_time_info, phase_time_info, approve_time_info = get_year_week(row['ledger_status_update_time']), get_year_week(row['phase_status_update_time']), get_year_week(row['approver_time'])
now_time_info = get_year_week(datetime.now())
if row['ledger_status'] == 2 and row['phase_status'] == 1 and ledger_time_info == now_time_info and phase_time_info == now_time_info:
this_week_entering_cross_num += 1
if row['submit_status'] == 2 and approve_time_info == now_time_info:
this_week_approve_cross_num += 1
for item in ['ledger_status_update_time', 'phase_status_update_time', 'approver_time', 'create_time', 'update_time']:
if row[item]:
row[item] = row[item].strftime('%Y-%m-%d %H:%M:%S')
cross_info_dict[row['crossid']] = row
cross_info_dict[row['crossid']]['high_pic_num'] = 0
cross_info_dict[row['crossid']]['ground_pic_num'] = 0
cross_pics = db_task.query_ledger_task_crosses_pics(list(cross_info_dict.keys()))
for row in cross_pics:
crossid = row['crossid']
if row['pic_type'] == 1:
# 高空图最大数量为2
cross_info_dict[crossid]['high_pic_num'] += 1
else:
# 地面图最大数量为10
cross_info_dict[crossid]['ground_pic_num'] += 1
ledger_task_additional_info = {
'entered_percent': entered_percent,
'approve_percent': approve_percent,
'this_week_entering_cross_num': this_week_entering_cross_num,
'this_week_approve_cross_num': this_week_approve_cross_num,
'cross_list': list(cross_info_dict.values())
}
return ledger_task_additional_info
# 重新分配台账任务路口
def do_redistributed_cross_list(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
old_task_no = check_param(params, 'old_task_no')
if not old_task_no:
return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试'))
crossid_list = check_param(params, 'crossid_list')
if not crossid_list or len(crossid_list) < 1:
return json.dumps(make_common_res(7, '缺少路口列表, 请刷新后重试'))
ret = db_task.drop_old_task_cross(crossid_list, old_task_no, nodeid, area_id)
if not ret:
return json.dumps(make_common_res(8, '删除旧任务路口失败,请检查后重试'))
return json.dumps(make_common_res(0, 'ok'))
# 操作台账任务路口状态
def do_op_ledger_task_cross_status(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param(params, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
task_no = check_param(params, 'task_no')
if not task_no:
return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试'))
crossid = check_param(params, 'crossid')
if not crossid:
return json.dumps(make_common_res(7, '缺少路口编号, 请刷新后重试'))
status = check_param(params, 'status')
if not status:
return json.dumps(make_common_res(8, '缺少状态信息'))
status = int(status)
record = db_task.query_ledger_task_cross_record(task_no, crossid)
if not record:
return json.dumps(make_common_res(8, '路口信息异常,请检查后重试'))
if record['phase_status'] != 1 and record['ledger_status'] != 2:
return json.dumps(make_common_res(9, '路口当前未完成信息录入,请确认路口状态信息'))
if status in (2, 3):
approver = check_param(params, 'approver')
if not approver:
return json.dumps(make_common_res(10, '缺少审核人信息'))
approver_id = check_param(params, 'approver_id')
if not approver_id:
return json.dumps(make_common_res(11, '缺少审核人id信息'))
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
ret = db_task.approval_ledger_task_cross(task_no, crossid, status, approver, approver_id, now_time)
else:
# 提交状态字段 submit 0未提交 1待审核 2审核通过 3审核未通过
ret = db_task.update_ledger_task_cross_record(task_no, crossid, 'submit_status', status)
if ret:
return json.dumps(make_common_res(0, 'ok'))
return json.dumps(make_common_res(10, '更新路口提交状态失败,请检查后重试'))
# 批量上传路口图片
def do_batch_upload_cross_pics(params):
nodeid = check_param({'nodeid': request.form.get('nodeid')}, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(2, '缺少nodeid 请刷新后重试'))
area_id = check_param({'area_id': request.form.get('area_id')}, 'area_id')
if not area_id:
return json.dumps(make_common_res(3, '缺少area_id 请刷新后重试'))
userid = check_param({'userid': request.form.get('userid')}, 'userid')
if not userid:
return json.dumps(make_common_res(4, '缺少userid 请刷新后重试'))
area_list = db_user.query_areaid_list(userid)
if not area_list or len(area_list) < 1:
return json.dumps(make_common_res(5, '用户信息异常'))
area_list = map(int, area_list)
if not str(area_id).lstrip('-').isdigit() or int(area_id) not in area_list:
return json.dumps(make_common_res(5, '辖区id异常请检查后重试'))
task_no = check_param({'task_no': request.form.get('task_no')}, 'task_no')
if not task_no:
return json.dumps(make_common_res(6, '缺少任务编号, 请刷新后重试'))
file_type = check_param({'file_type': request.form.get('file_type')}, 'file_type')
if not file_type:
file_type = 1
# file_type 1: 高空图片 2地面图片
file_type = int(file_type)
if len(request.files.keys()) < 1:
return json.dumps(make_common_res(7, '缺少文件列表, 请刷新后重试'))
task_cross_list = db_task.query_ledger_task_crosses(task_no)
crossid_list = [row['crossid'] for row in task_cross_list]
cross_info_list = db_tmnet.query_cross_infos(crossid_list)
cross_info_dict = {row['name']: row['crossid'] for row in cross_info_list}
checker = RegexSingleMatcher(cross_info_dict)
fail_list, suc_dict, records = [], {}, []
cross_file_dict = {}
cos_client = get_client()
folder_manager = CosFolderManager(cos_client, g_cos_bucket)
for key in request.files.keys():
file_info = request.files[key]
name = file_info.filename
if not '.' in name and name.rsplit('.', 1)[1].lower() in ALLOWED_CROSS_PIC_FILE_TYPE:
continue
find_res = checker.find(name)
if find_res:
# 如果找到了路口id说明文件名称匹配到了路口将该文件按照需求上传到cos目录上的路口文件夹下,同时写表记录文件路径
file_stream = file_info.stream
crossid = cross_info_dict[find_res]
cos_path = f'user/ledger/{nodeid}/{area_id}/{crossid}'
folder_manager.ensure_folder(cos_path)
cos_key = f'{cos_path}/{name}'
cos_client.put_object(Bucket=g_cos_bucket, Key=cos_key, Body=file_stream)
suc_dict[name] = find_res
download_url = f'{g_cos_root}/{cos_key}'
if crossid not in cross_file_dict.keys():
cross_file_dict[crossid] = []
cross_file_dict[crossid].append((crossid, file_type, download_url))
else:
fail_list.append(name)
continue
for crossid in cross_file_dict.keys():
his_pics = db_tmnet.query_cross_pics(crossid, file_type)
if file_type == 2:
if len(his_pics) + len(cross_file_dict[crossid]) > 10:
db_tmnet.del_cross_pic(crossid, file_type, len(his_pics) + len(cross_file_dict[crossid]) - 10)
elif file_type == 1:
if len(his_pics) + len(cross_file_dict[crossid]) > 2:
db_tmnet.del_cross_pic(crossid, file_type, len(his_pics) + len(cross_file_dict[crossid]) - 2)
records.extend(cross_file_dict[crossid])
# 批量写入路口图片记录
if records:
ret = db_tmnet.batch_insert_cross_pic(records)
else:
ret = 0
if ret == len(records):
res = make_common_res(0, 'ok')
res['data'] = {
'total': len(request.files.keys()),
'suc_info': suc_dict,
'fail_list': fail_list
}
return json.dumps(res)
else:
return json.dumps(make_common_res(8, '批量上传路口图片失败'))
class RegexSingleMatcher:
def __init__(self, key_dict):
self.key_dict = key_dict
# 转义并编译正则
keys = [re.escape(k) for k in key_dict.keys() if k]
if keys:
# 正则 search 默认就是找到第一个就停止
self.pattern = re.compile("|".join(keys))
else:
self.pattern = None
def find(self, target_string):
if not self.pattern:
return None
match = self.pattern.search(target_string)
if match:
return match.group() # 返回匹配到的具体字符串
return None
def get_year_week(time_input):
"""
支持 datetime 对象时间戳整数时间字符串
返回格式'2023-40'
"""
if not time_input:
return None
# 使用 ISO 标准获取年周
year, week, _ = time_input.isocalendar()
return f"{year}-{week:02d}"

View File

@ -78,6 +78,43 @@ class TmnetDbHelper(TableDbHelperBase):
cross_list = [cross for cross in cross_list if cross['crossid'] in area_cross_list]
return cross_list
def query_cross_list4task(self, nodeid, area_id):
sql = """
select
if(t2.name is not null, t2.name, t1.name) as name,
t1.crossid,
if (t2.location is not null, t2.location, t1.location) as location,
t1.nodeid,
t1.area_id
from (select name,crossid, location,nodeid, area_id from `cross` where nodeid = %s and area_id = %s and at_edge=0 and isdeleted=0 ) as t1
left join (select name,crossid, location,nodeid, area_id from `cross_ledger_update_info` where nodeid = %s and area_id = %s and at_edge=0 and isdeleted=0 ) as t2 on t1.crossid=t2.crossid
""" % (nodeid, area_id, nodeid, area_id)
cross_list = self.do_select(sql)
virtual_cross_sql = f'select name, crossid, location, nodeid, area_id from user_defined_cross where nodeid = {nodeid} and area_id = {area_id}'
virtual_cross_list = self.do_select(virtual_cross_sql)
cross_list.extend(virtual_cross_list)
if area_id != 0:
area_cross_sql = """
select bound_crosses.crossid, bound_crosses.location, c.name, bound_crosses.nodeid, bound_crosses.area_id
from tmnet.bound_crosses
left join (select if(t2.name is not null, t2.name, t1.name) as name, t1.crossid
from tmnet.`cross` t1
left join tmnet.`cross_ledger_update_info` t2 on t1.crossid = t2.crossid
where t1.nodeid = %s
and t1.area_id = %s
union
select name, crossid
from tmnet.user_defined_cross
where nodeid = %s
and area_id = %s) c on bound_crosses.crossid = c.crossid
where bound_crosses.nodeid = %s
and bound_crosses.area_id = %s
""" % (nodeid, area_id, nodeid, area_id, nodeid, area_id)
area_crosses = self.do_select(area_cross_sql)
area_cross_list = [cross['crossid'] for cross in area_crosses]
cross_list = [cross for cross in cross_list if cross['crossid'] in area_cross_list]
return cross_list
def query_cross_inroads(self, crossid, nodeid):
sql = """
select
@ -628,4 +665,49 @@ class TmnetDbHelper(TableDbHelperBase):
def query_slc_company_dict(self):
sql = "select mapping_code, web_tag from tags.cross_tag_meta where field_eng_name = 'slc_company'"
return self.do_select(sql)
return self.do_select(sql)
def query_cross_infos(self, crossid_list):
crossids = "'" + "', '".join(item for item in crossid_list) + "'"
sql = """
select
if(t2.name is not null, t2.name, t1.name) as name,
t1.crossid,
if (t2.location is not null, t2.location, t1.location) as location,
t1.nodeid,
t1.area_id
from (select name,crossid, location,nodeid, area_id from `cross` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t1
left join (select name,crossid, location,nodeid, area_id from `cross_ledger_update_info` where crossid in (%s) and at_edge=0 and isdeleted=0 ) as t2 on t1.crossid=t2.crossid
""" % (crossids, crossids)
cross_list = self.do_select(sql)
virtual_cross_sql = f'select name, crossid, location, nodeid, area_id from user_defined_cross where crossid in ({crossids})'
virtual_cross_list = self.do_select(virtual_cross_sql)
cross_list.extend(virtual_cross_list)
return cross_list
def query_cross_pics(self, crossid, pic_type):
sql = """
select * from user_upload_cross_pics where crossid = '%s' and pic_type = %s
""" % (crossid, pic_type)
return self.do_select(sql)
def del_cross_pic(self, crossid, pic_type, del_num):
sql = """
DELETE FROM user_upload_cross_pics
WHERE id in (
SELECT id FROM (
SELECT id FROM user_upload_cross_pics
WHERE crossid = '%s'
AND pic_type = %s
ORDER BY create_time ASC
LIMIT %d
) AS tmp
);
""" % (crossid, pic_type, del_num)
return self.do_execute(sql)
def batch_insert_cross_pic(self, values):
sql = f"""
insert into user_upload_cross_pics (crossid, pic_type, pic_path) values (%s, %s, %s)
"""
return self.do_executemany(sql, values)

View File

@ -143,6 +143,25 @@ def task_del():
return del_task_file_api(dict(request.args))
@app.route('/api/undistributed_cross_list', methods=['GET'])
def undistributed_cross_list():
return get_undistributed_cross_list(dict(request.args))
@app.route('/api/redistributed_cross_list', methods=['POST'])
def redistributed_cross_list():
return do_redistributed_cross_list(request.get_json())
@app.route('/api/op_ledger_task_cross_status', methods=['GET'])
def op_ledger_task_cross_status():
return do_op_ledger_task_cross_status(dict(request.args))
@app.route('/api/batch_upload_cross_pics', methods=['POST'])
def batch_upload_cross_pics():
return do_batch_upload_cross_pics(dict(request.form))
# if __name__ == '__main__':
# init()
# app.run(debug=True)

View File

@ -1,9 +1,9 @@
[roadnet]
;citylist = 110000,130100,130200,140400,150100,320500,321000,330800,330900,341700,350300,350400,350800,370100,440600,440800,441800,445100,450200,450600,420100
;citylist = 110000,130100,130200,150100,350300,370100,440600,450200
citylist = 350100,350300,130200
citylist = 350300
;citylist = 110000,130100,130200,150100,350300,370100,440600,450200,421000,370500,532600,140700,140400,430100,440100,370300,320500,140200
hot_citylist = 350100,350300,130200
hot_citylist = 350300
[db]
host = 120.53.125.169

View File

@ -55,7 +55,7 @@ def init():
def test_get_cross_delay_data():
row_list = db_cross.query_cross_delay_info('CR_11987179_2632645', 350100, '20251013', 't830')
row_list = db_cross.query_cross_delay_info('CR_11902719_2542847', 350300, ['20260101'], 't700')
data = row_list[0]['data']
cross_delay = pb.xl_cross_delayinfo_t()
cross_delay.ParseFromString(data)
@ -150,4 +150,4 @@ def gen_monitor_cross_ledger_info():
if __name__ == '__main__':
init()
gen_monitor_cross_ledger_info()
test_get_cross_delay_data()

View File

@ -1,6 +1,7 @@
# -*- coding=utf-8
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
from qcloud_cos.cos_exception import CosServiceError
import sys
import os
import logging
@ -78,6 +79,70 @@ def upload_order_images_to_cos(idx_to_localfile: dict, orderid: str):
return idx_to_cosfile
class CosFolderManager:
def __init__(self, client, bucket):
self.client = client
self.bucket = bucket
def _normalize_folder(self, folder_path):
"""确保文件夹路径以 / 结尾"""
if not folder_path:
return ''
# 去除开头的 /
if folder_path.startswith('/'):
folder_path = folder_path[1:]
# 确保结尾有 /
if not folder_path.endswith('/'):
folder_path += '/'
return folder_path
def folder_exists(self, folder_path):
"""
判断文件夹是否存在
:param folder_path: 文件夹路径 'data/logs/' 'data/logs'
:return: True/False
"""
key = self._normalize_folder(folder_path)
if not key:
return True # 根目录默认存在
try:
self.client.head_object(Bucket=self.bucket, Key=key)
return True
except CosServiceError as e:
if e.get_status_code() == 404:
return False
raise # 其他错误向上抛出
def create_folder(self, folder_path):
"""
创建文件夹上传一个以 / 结尾的空对象
:param folder_path: 文件夹路径
:return: None
"""
key = self._normalize_folder(folder_path)
if not key:
return # 根目录无需创建
self.client.put_object(
Bucket=self.bucket,
Key=key,
Body=b'' # 空内容
)
def ensure_folder(self, folder_path):
"""
确保文件夹存在不存在则创建
:param folder_path: 文件夹路径
:return: True(已存在) / False(刚创建)
"""
if self.folder_exists(folder_path):
return True
else:
self.create_folder(folder_path)
return False
if __name__ == '__main__':
idx_to_localfile = {0:'D:/slgwork/slgcode/wave_survey/1739430848000.jpg',
2:'D:/slgwork/slgcode/wave_survey/1739430851000.jpg'}