# -*- coding: utf-8 -*- # @Author: Owl # @Date: 2025/11/10 17:59 # @Description: # -*- coding:utf-8 -*- #import logging import pymysql import pymysql.cursors from datetime import datetime from flask import g from app.db_func_base import * class TaskDbHelper(TableDbHelperBase): def __init__(self, pool): self.db_pool = pool self.DB_Name = 'task' def query_task(self, taskno, nodeid, area_id): sql_query = "select * from `task` where nodeid='%s' and taskno='%s' and area_id = %s" % (nodeid, taskno, area_id) tasks = self.do_select(sql_query) if len(tasks) != 1: logging.error('query_ledger error! %s' % (sql_query)) return None return tasks[0] def query_task_executor(self, nodeid, area_id): sql_query = "select user_name from user.user where userno in (select userno from user.area_user where nodeid = '%s' and area_id = %s) and department = '信号调优团队'" % (nodeid, area_id) executors = self.do_select(sql_query) if len(executors) <= 0: logging.error('query_task_executor is null! %s' % (sql_query)) return [] return executors def query_task_src(self, nodeid, area_id): sql_query = "select distinct src from `task_src` where nodeid='%s' and area_id = %s" % (nodeid, area_id) srcs = self.do_select(sql_query) if len(srcs) <= 0: logging.error('query_task_src is null! %s' % (sql_query)) return [] return srcs def query_task_type(self, nodeid, area_id): sql_query = "select * from `task_type` where nodeid='%s' and area_id = %s" % (nodeid, area_id) types = self.do_select(sql_query) if len(types) <= 0: logging.error('query_task_type is null! %s' % (sql_query)) return [] return types def query_all_tak_list(self, nodeid, area_id): """查询路口的全部台账履历""" sql_query = "select * from `task` where nodeid='%s' and area_id = %s and record_state=0 order by task_state asc, progress asc, plan_end_time desc" % (nodeid, area_id) res_list = self.do_select(sql_query) for res in res_list: logging.info(res['update_time']) if res['update_time'] is not None: res['update_time'] = res['update_time'].strftime('%Y-%m-%d %H:%M:%S') return res_list def query_task_list(self, task_name,task_type,data_type, task_class,plan_begin_time,plan_end_time,publish_time,task_state,executor,task_src, nodeid): """查询路口的全部台账履历""" sql_query = "select * from `task` where nodeid='%s' and record_state=0" % (nodeid) if task_name is not None and len(task_name)>0: sql_query += " and task_name like '%%%s%%'" %(task_name) if task_type is not None and len(task_type)>0: sql_query += " and task_type='%s'" %(task_type) if data_type is not None and len(data_type) > 0: data_type += " and task_type='%s'" % (data_type) if task_class is not None and len(task_class)>0: sql_query += " and task_class='%s'" %(task_class) if plan_begin_time is not None and len(plan_begin_time)>0: sql_query += " and plan_begin_time='%s'" %(plan_begin_time) if plan_end_time is not None and len(plan_end_time)>0: sql_query += " and plan_end_time='%s'" %(plan_end_time) if publish_time is not None and len(publish_time)>0: sql_query += " and publish_time='%s'" %(publish_time) if task_state is not None and len(task_state)>0: sql_query += " and task_state='%s'" %(task_state) if executor is not None and len(executor)>0: sql_query += " and executor='%s'" %(executor) if task_src is not None and len(task_src) > 0: sql_query += " and task_src='%s'" % (task_src) res_list = self.do_select(sql_query) if len(res_list) > 0: for res in res_list: logging.info(res['update_time']) if res['update_time'] is not None: res['update_time'] = res['update_time'].strftime('%Y-%m-%d %H:%M:%S') return res_list def query_completed_task_cross_list(self, nodeid, area_id): sql_query = "select crossid,name from tmnet.cross where crossid in (select crossids from task where task_state=4 and nodeid='%s') and nodeid='%s';" % (nodeid, nodeid) return self.do_select(sql_query) def query_completed_task_by_cross(self, crossid, nodeid, area_id): sql_query = "select * from task where task_state=4 and crossids='%s' and nodeid='%s' and area_id = %s;" % (crossid, nodeid, area_id) return self.do_select(sql_query) def get_last_taskno(self, creatorid, task_name, nodeid): sql_query = "select max(taskno) as taskno from task where creatorid='%s' and task_name='%s' and nodeid='%s';" % (creatorid,task_name, nodeid) tasknos = self.do_select(sql_query) if len(tasknos) != 1: logging.error('get_last_taskno error! %s' % (sql_query)) return None return tasknos[0]['taskno'] def get_task_no(self, nodeid, area_id, task_name, task_type, task_class, data_type, plan_begin_time, plan_end_time, executor, task_src, comment): sql = f""" select taskno from task where nodeid = '{nodeid}' and area_id = {area_id} and task_name = '{task_name}' and task_type = {task_type} and task_class = {task_class} and data_type = {data_type} and plan_begin_time = {plan_begin_time} and plan_end_time = {plan_end_time} and executor = '{executor}' and task_src = '{task_src}' and comment = '{comment}' """ res = self.do_select(sql) if len(res) > 0: return res[0]['taskno'] return None def delete_task(self, taskno, nodeid, area_id): taskno_str = ','.join(str(num) for num in taskno) sql = "update `task` set record_state=1 where taskno in (%s) and nodeid='%s' and area_id = %s;" % (taskno_str, nodeid, area_id) # 执行删除操作 return self.do_execute(sql) def query_task_history(self, taskno, nodeid, area_id): """查询路口的全部台账履历""" sql_query = "select * from task_history where taskno='%s' and nodeid='%s' and area_id = %s order by history_date;" % (taskno, nodeid, area_id) return self.do_select(sql_query) def query_task_progress_history(self, yesterday_date, nodeid, area_id): """查询路口的全部台账履历""" sql_query = (f""" select t2.task_name, content from (select * from task.task_history where date(history_date)='{yesterday_date}' and nodeid='{nodeid}' and area_id = {area_id} and content like '%%operation%%任务进度%%content%%' order by history_date) t1 left join (select task_name,taskno from task.task) t2 on t1.taskno = t2.taskno; """) return self.do_select(sql_query) def add_task_history(self, taskno, nodeid, area_id, operator,remark): """查询路口的全部台账履历""" now = datetime.now() time_str = now.strftime('%Y-%m-%d %H:%M:%S') sql_insert = "insert into task_history (taskno,nodeid, area_id, operator,content, history_date) values('%s',%s,'%s','%s','%s','%s')" % (taskno, nodeid, area_id, operator,remark, time_str) count = self.do_execute(sql_insert) if count != 1: logging.error('add_task_history error! %s' % (sql_insert)) return 0 return count def add_task(self, timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time, executor, progress, task_state, description, crossids, sectionids, arteryids, comment, record_state, task_src, task_class, nodeid, area_id): sql_insert = "insert into task (timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time," \ " executor, progress, task_state, description, crossids, sectionids, arteryids, comment," \ " record_state, task_src, task_class, nodeid, area_id) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %s)" % ( timestamp, creatorid, task_name, task_type, data_type, plan_begin_time, plan_end_time,publish_time, executor, progress, task_state, description, crossids, sectionids, arteryids, comment, record_state, task_src, task_class, nodeid, area_id) count = self.do_execute(sql_insert) if count != 1: logging.error('add_task error! %s' % (sql_insert)) return 0 return count def update_task_info(self, taskno, modify_data, nodeid, area_id): update_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') sql = f"update task.task set %s, update_time = '%s' where taskno = '%s' and nodeid = %s and area_id = %s" %(modify_data, update_time, taskno, nodeid, area_id) return self.do_execute(sql) ########################################################################## def insert_upload_file_record(self, taskno, nodeid, download_url, area_id): sql = f""" insert into task.task_upload_file_record (taskno, nodeid, download_url, area_id) values (%d , %d, '%s', %s) """ % (int(taskno), int(nodeid), download_url, area_id) return self.do_execute(sql) def query_task_file(self, nodeid, area_id): sql = f""" select * from task.task_upload_file_record where nodeid = %d and area_id = %s """ % (int(nodeid), area_id) row_list = self.do_select(sql) tmp_dict = {} for row in row_list: if row['taskno'] not in tmp_dict.keys(): tmp_dict[row['taskno']] = [{ 'id': row['id'], 'download_url': row['download_url'] }] else: tmp_dict[row['taskno']].append({ 'id': row['id'], 'download_url': row['download_url'] }) return tmp_dict def query_task_file_by_taskno(self, nodeid, taskno, area_id): sql = f""" select id, download_url from task.task_upload_file_record where nodeid = %d and taskno = %d and area_id = %d """ % (int(nodeid), int(taskno), int(area_id)) row_list = self.do_select(sql) download_url_list = [] for row in row_list: download_url_list.append({ 'id': row['id'], 'download_url': row['download_url'] }) return download_url_list def del_task_file(self, nodeid, taskno, file_name, file_id): download_url = f'/api/download_task_file?nodeid={nodeid}&taskno={taskno}&file_name={file_name}' sql = f""" delete from task.task_upload_file_record where nodeid = %d and taskno = %d and download_url = '%s' and id = %d """ % (int(nodeid), int(taskno), download_url, int(file_id)) return self.do_execute(sql) def update_task_state(self, nodeid, area_id, task_no, state): sql = "update task set task_state='%s' where nodeid='%s' and area_id='%s' and taskno='%s'" % (state, nodeid, area_id, task_no) return self.do_execute(sql) # 私有属性示例 __secret_code = "Private Info" # 私有方法示例 def __private_activity(self): print("This is a private activity.") # # if __name__ == '__main__': # tt_5min = get_latest_5min_timestamp() # print(tt_5min) # print(get_today_str())