import hashlib from PIL import Image import base64 import io from app.db_func_base import * from flask import g, has_app_context # md5加密方法 def md5_hash(text): encode_text = text.encode('utf-8') md5 = hashlib.md5() md5.update(encode_text) return md5.hexdigest() def image_to_base64(image_path): with Image.open(image_path) as img: # 将图片转换为字节流 img_byte_arr = io.BytesIO() img.save(img_byte_arr, format='JPEG') img_byte_arr = img_byte_arr.getvalue() # 将字节流编码为Base64字符串 base64_string = base64.b64encode(img_byte_arr).decode('utf-8') return base64_string class WorkstationDbHelper(TableDbHelperBase): def __init__(self, pool): self.db_pool = pool self.DB_Name = 'workstation' def get_traffic_db_name(self, nodeid=None): db_traffic_name = 'traffic' if nodeid and str(nodeid) != '9660': db_traffic_name = f'traffic_{nodeid}' if has_app_context() and hasattr(g, "nodeid") and str(g.nodeid) != '9660': db_traffic_name = f'traffic_{g.nodeid}' return db_traffic_name # 创建用户 def create_user(self, user_name, password, user_role=2, image=None): pwd_md5 = md5_hash(password) image_base64 = '' if image: image_base64 = image_to_base64(image) sql = ( f""" insert into {self.DB_Name}.users(user_name, pwd, user_role, image) value ({user_name}, {pwd_md5}, {user_role}, {image_base64}); """ ) return self.do_execute(sql) def create_gen_corss_report_task(self, nodeid, crossid, cross_name, report_type, items, tasks, create_user_id, create_user_name): sql = ( f""" insert into {self.DB_Name}.gen_report_task(nodeid, report_type, crossid, cross_name, items, tasks, create_user_id, create_user_name) value({nodeid}, {report_type}, '{crossid}', '{cross_name}', '{items}', '{tasks}', '{create_user_id}', '{create_user_name}'); """ ) return self.do_execute(sql) def query_report_task_list(self, nodeid, create_user_id=None, create_user_name=None): sql = (f""" select * from {self.DB_Name}.gen_report_task where nodeid = {nodeid}; """) if create_user_id and create_user_name: sql = ( f""" select * from {self.DB_Name}.gen_report_task where nodeid = {nodeid} and create_user_id = '{create_user_id}' and create_user_name = '{create_user_name}'; """ ) return self.do_select(sql) def update_report_task_status(self, nodeid, create_user_id, create_user_name, crossid, cross_name, report_type, items, tasks, status, download_url=None, failed_reason=None, file_name=None): sql = ( f"""update {self.DB_Name}.gen_report_task set status = {status},failed_reason = '{failed_reason}',download_url = '{download_url}',file_name = '{file_name}' where nodeid = {nodeid} and report_type = {report_type} and crossid = '{crossid}' and cross_name = '{cross_name}' and items = '{items}' and tasks = '{tasks}' and create_user_id = '{create_user_id}' and create_user_name = '{create_user_name}';""" ) return self.do_execute(sql) def create_favorite_cross_artery(self, nodeid, create_user_id, create_user_name, favorite_type, favorite_id, favorite_name, area_id: int): sql = ( f"""insert into {self.DB_Name}.user_favorite(nodeid, user_id, user_name,favorite_type, favorite_id, favorite_name,area_id) value ({nodeid},'{create_user_id}', '{create_user_name}', {favorite_type}, '{favorite_id}', '{favorite_name}',{area_id});""" ) return self.do_execute(sql) def query_favorite_info_list(self, nodeid, create_user_id, create_user_name, area_id: int): sql = ( f""" select * from {self.DB_Name}.user_favorite where nodeid = {nodeid} and user_id = '{create_user_id}' and user_name = '{create_user_name}' and area_id = {area_id}; """ ) return self.do_select(sql) def query_cross_rt_info(self): sql = ( f""" select * from {self.get_traffic_db_name()}.cross_delay_rt where timestamp = (select value from {self.get_traffic_db_name()}.delay_update_info where param = 'latest_tp'); """ ) return self.do_select(sql) # def query_artery_rt_info(self): # sql = ( # f""" # select * from {self.get_traffic_db_name()}.artery_delay_rt where timestamp = (select value from {self.get_traffic_db_name()}.delay_update_info where param = 'latest_tp') and duration = 1 # """ # ) # return self.do_select(sql) def query_artery_name(self, arteryid): sql = ( f"""select `name` from tmnet.artery where arteryid = '{arteryid}'""" ) return self.do_select(sql) def delete_favorite_ids(self, nodeid, ids, user_id, user_name, area_id): sql = ( f"""delete from workstation.user_favorite where user_id = '{user_id}' and user_name = '{user_name}' and nodeid = '{nodeid}' and area_id = {area_id} and favorite_id in ({ids})""" ) ret = self.do_execute(sql) return ret def query_task_list(self, nodeid, user_id, area_id): sql = ( f""" select * from task.task where executor = '{user_id}' and nodeid = {nodeid} and record_state = 0 and area_id = {area_id}; """ ) return self.do_select(sql) def query_task_yesterday_operator(self, yesterday_date, nodeid): sql = ( f""" select task_name, content from (select taskno, nodeid, history_date, content, operator from task.task_history where date(history_date) = '{yesterday_date}' and nodeid = {nodeid}) t1 left join (select taskno, task_name from task.task) t2 on t1.taskno = t2.taskno """ ) return self.do_select(sql) def check_favorite_info_exists(self, nodeid, user_id, user_name, f_id, f_name, area_id: int): sql = ( f""" select * from workstation.user_favorite where user_id = '{user_id}' and user_name = '{user_name}' and nodeid = '{nodeid}' and favorite_id = '{f_id}' and favorite_name = '{f_name}' and area_id = {area_id}; """ ) return self.do_select(sql) def query_yesterday_task_data(self, yesterday_date, nodeid, user_name, area_id): sql_query = (f""" select t2.task_name, content from (select * from task.task_history where date(history_date)='{yesterday_date}' and nodeid='{nodeid}' and content like '%%operation%%任务进度%%content%%' and operator = '{user_name}' and area_id = {area_id} order by history_date) t1 left join (select task_name,taskno from task.task where record_state = 0 and nodeid = '{nodeid}' and executor = '{user_name}') t2 on t1.taskno = t2.taskno; """) return self.do_select(sql_query) def query_cross_delay_info(self, nodeid, start, cross_list): conn, cursor = self.connect() try: sql = f'''select cd1.* from {self.get_traffic_db_name(nodeid)}.cross_delay as cd1 join (select crossid, max(day) max_day from {self.get_traffic_db_name(nodeid)}.cross_delay where tp_start = %s and crossid in %s group by crossid) as t1 on cd1.crossid = t1.crossid and cd1.day = t1.max_day where cd1.crossid in %s and cd1.tp_start = %s and cd1.day > 20251101''' print(cursor.mogrify(sql, (start, cross_list, cross_list, start))) cursor.execute(sql, (start, cross_list, cross_list, start)) result = cursor.fetchall() self.close(conn, cursor) return result, None except Exception as error: self.close(conn, cursor) return None, error def query_favorite_crosses(self, userid, nodeid, area_id): sql = """ select favorite_id from workstation.user_favorite where user_id = '%s' and nodeid = %s and area_id = %s and favorite_type = 1 """ % (userid, nodeid, area_id) return self.do_select(sql)