cross_doctor/app/db_func_base.py

100 lines
2.9 KiB
Python
Raw Normal View History

# -*- coding:utf-8 -*-
import pymysql
import pymysql.cursors
import logging
from app.comm import *
class TableDbHelperBase:
    """Base helper that runs SQL statements through a shared connection pool.

    The pool object is supplied by the caller. This class only relies on two
    of its methods: ``connect(db_name)``, which returns a 3-tuple whose first
    two items are used as connection and cursor, and
    ``do_select(sql, db_name)``.
    """

    def __init__(self, pool):
        """Store the pool; the database name starts empty (see set_dbname)."""
        self.db_pool = pool
        self.DB_Name = ''

    def set_dbname(self, dbname: str):
        """Reset the database name that is handed to the pool on each call."""
        self.DB_Name = dbname

    def close(self, conn, cursor):
        """Close the cursor, then the connection."""
        try:
            cursor.close()
        finally:
            # Still close the connection when closing the cursor fails.
            conn.close()

    def connect(self):
        """Ask the pool for a ``(conn, cursor)`` pair on the current database.

        :return: the first two items of the pool's 3-tuple, unchanged.
        """
        conn, cursor, err = self.db_pool.connect(self.DB_Name)
        if err:
            # NOTE(review): the third item is presumably the pool's error
            # report (None/empty on success) - confirm against the pool.
            # It used to be dropped silently; surface it in the log.
            logging.error("connect to '%s' reported: %s", self.DB_Name, err)
        return conn, cursor

    def show_tables(self):
        """Print every row returned by ``show tables;``."""
        conn, cursor = self.connect()
        try:
            cursor.execute("show tables;")
            for item in cursor.fetchall():
                print(item)
        finally:
            # Close the pair even when the query fails.
            self.close(conn, cursor)

    def transaction_begin(self):
        """Open a connection, start a transaction and return ``(conn, cursor)``.

        The caller must finish with transaction_commit or
        transaction_rollback, which also close the pair.
        """
        conn, cursor = self.connect()
        # NOTE(review): on a PyMySQL connection ``autocommit`` is a method, so
        # this assignment likely only shadows it; the explicit ``begin()`` is
        # what starts the transaction. Kept as-is because the connection type
        # handed out by the pool is not visible here - confirm before changing.
        conn.autocommit = False
        conn.begin()
        return conn, cursor

    def transaction_commit(self, conn, cursor):
        """Commit the transaction, then close the cursor and connection."""
        conn.commit()
        conn.autocommit = True  # see NOTE(review) in transaction_begin
        self.close(conn, cursor)

    def transaction_rollback(self, conn, cursor):
        """Roll the transaction back, then close the cursor and connection."""
        conn.rollback()
        conn.autocommit = True  # see NOTE(review) in transaction_begin
        self.close(conn, cursor)

    def do_select(self, sql_query):
        """Run a SELECT statement through the pool.

        :param sql_query: SQL text
        :return: whatever ``db_pool.do_select`` returns (the original
            docstring describes it as a list of dicts)
        """
        logging.info(sql_query)
        return self.db_pool.do_select(sql_query, self.DB_Name)

    @staticmethod
    def _report_failure(label, sql, error):
        """Print and log a failed statement in the ``<label> '<sql>' Error`` format."""
        message = f"{label} '{sql}' Error: {error}"
        print(message)
        logging.error(message)

    def _run_statement(self, label, sql, run, cursor):
        """Run ``run(cursor)`` on the caller's cursor or on a short-lived connection.

        :param label: method name used in error messages
        :param sql: SQL text, used only for error messages
        :param run: callable taking a cursor and returning the affected rows
        :param cursor: caller-owned cursor, or None to use a private connection
        :return: result of ``run``; 0 when a ``pymysql.Error`` is raised
        """
        if cursor is not None:
            # Caller owns the transaction: never commit or close here.
            try:
                return run(cursor)
            except pymysql.Error as e:
                self._report_failure(label, sql, e)
                return 0

        conn = own_cursor = None
        try:
            conn, own_cursor = self.connect()
            affected = run(own_cursor)
            conn.commit()
            return affected
        except pymysql.Error as e:
            self._report_failure(label, sql, e)
            if conn is not None:
                # Do not leave a partly applied statement pending on a
                # connection that may be reused.
                try:
                    conn.rollback()
                except pymysql.Error as rollback_error:
                    logging.error("%s rollback failed: %s", label, rollback_error)
            return 0
        finally:
            # The connection opened here used to be leaked on every call.
            if conn is not None and own_cursor is not None:
                try:
                    self.close(conn, own_cursor)
                except pymysql.Error as close_error:
                    logging.warning("%s close failed: %s", label, close_error)

    def do_execute(self, sql_update, cursor=None, params=None):
        """Run an update/delete statement and return the affected row count.

        :param sql_update: SQL text, optionally with ``%s`` placeholders
        :param cursor: when given, the statement runs on it and nothing is
            committed or closed (the caller owns the transaction); otherwise
            a connection is opened, committed and closed here
        :param params: optional values bound to the placeholders by the driver
        :return: affected row count; 0 when a ``pymysql.Error`` is raised
        """
        logging.info(sql_update)
        if params is not None:
            logging.info("params = %s", params)
        return self._run_statement(
            "do_execute",
            sql_update,
            lambda cur: cur.execute(sql_update, params),
            cursor,
        )

    def do_executemany(self, sql, data_list, cursor=None):
        """Run one statement for every parameter row in ``data_list``.

        :param sql: SQL text with placeholders
        :param data_list: sequence of parameter rows
        :param cursor: same ownership rule as in do_execute
        :return: value returned by ``cursor.executemany``; 0 when a
            ``pymysql.Error`` is raised
        """
        logging.info("sql = %s, data_list = %s", sql, data_list)
        return self._run_statement(
            "do_executemany",
            sql,
            lambda cur: cur.executemany(sql, data_list),
            cursor,
        )

    def do_update(self, table, field, value, where_field, where_value):
        """Set ``field`` to ``value`` on the rows where ``where_field`` equals ``where_value``.

        Both values are sent as string parameters, which matches the earlier
        ``'%s'`` quoting for ordinary values while letting the driver escape
        quotes and backslashes. ``table``, ``field`` and ``where_field`` are
        still interpolated into the SQL text and must come from trusted code.

        :return: affected row count as reported by do_execute
        """
        sql_update = "update %s set %s=%%s where %s=%%s" % (table, field, where_field)
        return self.do_execute(sql_update, params=(str(value), str(where_value)))