# -*- coding:utf-8 -*- import pymysql import pymysql.cursors import logging from app.comm import * class TableDbHelperBase: def __init__(self, pool): self.db_pool = pool self.DB_Name = '' return def set_dbname(self, dbname: str): """重新设定数据库名称""" self.DB_Name = dbname def close(self, conn, cursor): cursor.close() conn.close() def connect(self): conn, cursor, e = self.db_pool.connect(self.DB_Name) return conn, cursor def show_tables(self): sql = "show tables;" conn, cursor = self.connect() cursor.execute(sql) results = cursor.fetchall() for item in results: print(item) self.close(conn, cursor) def transaction_begin(self): conn, cursor = self.connect() conn.autocommit = False conn.begin() return conn, cursor def transaction_commit(self, conn, cursor): conn.commit() conn.autocommit = True self.close(conn, cursor) def transaction_rollback(self, conn, cursor): conn.rollback() conn.autocommit = True self.close(conn, cursor) def do_select(self, sql_query): """ 通用的执行select语句的函数 :param sql_query: SQL语句 :return: 返回Dict数组 """ logging.info(sql_query) return self.db_pool.do_select(sql_query, self.DB_Name) def do_execute(self, sql_update, cursor=None): """执行更新或删除操作,返回是否操作成功""" try: logging.info(sql_update) if cursor: ret = cursor.execute(sql_update) # 影响行数 return ret conn, cursor = self.connect() ret = cursor.execute(sql_update) # 影响行数 conn.commit() except pymysql.Error as e: # 捕获 PyMySQL 异常 print(f"do_execute '{sql_update}' Error: {e}") logging.error(f"do_execute '{sql_update}' Error: {e}") return 0 return ret def do_executemany(self, sql, data_list, cursor=None): try: logging.info(f"sql = {sql}, data_list = {data_list}") if cursor: ret = cursor.executemany(sql, data_list) return ret conn, cursor = self.connect() ret = cursor.executemany(sql, data_list) conn.commit() except pymysql.Error as e: print(f"do_execute '{sql}' Error: {e}") logging.error(f"do_execute '{sql}' Error: {e}") return 0 return ret def do_update(self, table, field, value, where_field, where_value): sql_update = "update %s set %s='%s' where %s='%s'" % (table, field, value, where_field, where_value) return self.do_execute(sql_update)