cross_doctor/tool/mysql_common_connector_pool.py

108 lines
3.1 KiB
Python
Raw Normal View History

2025-10-10 14:38:22 +08:00
#import logging
import datetime
import logging
import math
import threading
import time

import pymysql
import pymysql.connections
import pymysql.cursors
from dbutils.pooled_db import PooledDB
class DatabaseManager:
    """Manage a DBUtils ``PooledDB`` pool of PyMySQL connections.

    ``init_pool`` must be called before any method that borrows a connection.
    Every connection obtained through ``connect`` or ``connect2`` has to be
    handed back (``close`` below, or the connection's own ``close()``);
    otherwise the pool, which is created with ``blocking=True``, eventually
    blocks new callers.
    """

    def __init__(self, dbinfo):
        """Store the connection settings; the pool is built by ``init_pool``.

        Args:
            dbinfo: Mapping with the keys ``host``, ``port``, ``user``,
                ``password`` and ``db`` (read by ``init_pool``).
        """
        self.params = dbinfo
        # Open transactions: (thread id, db name) -> (connection, cursor).
        self._transactions = {}

    def init_pool(self, dbinfo, maxcon=300, mincached=1):
        """Create the connection pool and store it on ``self.db``.

        Args:
            dbinfo: Mapping with the keys ``host``, ``port``, ``user``,
                ``password`` and ``db``; replaces ``self.params``.
            maxcon: Passed to ``PooledDB`` as ``maxconnections``.
            mincached: Passed to ``PooledDB`` as ``mincached``.
        """
        self.params = dbinfo
        self.db = PooledDB(
            creator=pymysql,
            maxconnections=maxcon,
            mincached=mincached,
            blocking=True,
            host=self.params['host'],
            port=self.params['port'],
            user=self.params['user'],
            password=self.params['password'],
            database=self.params['db'],
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
            autocommit=True)
        # Logged only once the pool really exists.
        logging.info("Created a new connection pool.")

    def _require_pool(self):
        """Return the pool, or raise a clear error if ``init_pool`` never ran."""
        pool = getattr(self, "db", None)
        if pool is None:
            # AttributeError is what the missing attribute raised before, so
            # callers see the same exception type with a usable message.
            raise AttributeError(
                "Connection pool is not initialised; call init_pool() first.")
        return pool

    def close(self, conn, cursor):
        """Close ``cursor`` and hand ``conn`` back to the pool.

        Failures are logged, never raised.  The two closes are attempted
        independently so a failing cursor cannot keep the connection checked
        out.  ``None`` is accepted for either argument.
        """
        for resource in (cursor, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as exc:
                logging.error("Error closing connection: %s", exc)

    def connect(self, db_name='tmnet'):
        """Borrow a connection and switch it to ``db_name``.

        Args:
            db_name: Schema to select.  NOTE(review): the name is interpolated
                into the ``use`` statement unescaped, so it must come from
                trusted code, never from user input.

        Returns:
            ``(conn, cursor, error)``.  ``error`` is ``None`` on success, or
            the exception raised by the ``use`` statement.  In both cases the
            caller owns ``conn`` and ``cursor`` and must release them.

        Raises:
            AttributeError: If ``init_pool`` has not been called.
        """
        conn = self._require_pool().connection()
        try:
            cursor = conn.cursor()
        except Exception:
            # No cursor means the caller gets nothing to close: release here.
            conn.close()
            raise
        try:
            cursor.execute(f"use {db_name}")
            logging.info(f"Successfully change to MySQL database to {db_name}")
        except Exception as exc:
            logging.error("Error executing SQL: %s", exc)
            return conn, cursor, exc
        return conn, cursor, None

    def _transaction_key(self, db_name):
        """Build the key under which the calling thread's transaction is kept."""
        return threading.get_ident(), db_name

    def transaction_cursor(self, db_name):
        """Return the cursor of the calling thread's open transaction.

        Statements that belong to the transaction must run on this cursor;
        ``do_select`` borrows a separate connection.

        Returns:
            The cursor, or ``None`` when no transaction is open for
            ``db_name`` in this thread.
        """
        entry = self._transactions.get(self._transaction_key(db_name))
        return None if entry is None else entry[1]

    def transaction_begin(self, db_name):
        """Begin a transaction on ``db_name`` and keep its connection.

        The connection stays checked out until ``transaction_commit`` or
        ``transaction_rollback`` is called for the same ``db_name`` from the
        same thread.  If selecting the schema fails, no transaction is
        started and the connection is released.
        """
        key = self._transaction_key(db_name)
        if key in self._transactions:
            logging.warning(
                "Transaction already open for %s; keeping the existing one.",
                db_name)
            return
        conn, cursor, err = self.connect(db_name)
        if err is not None:
            self.close(conn, cursor)
            logging.error("Transaction on %s was not started.", db_name)
            return
        try:
            # The former ``conn.autocommit = False`` is dropped on purpose:
            # PyMySQL exposes autocommit as a method, so that assignment never
            # reached the server.  begin() is what opens the transaction.
            conn.begin()
        except Exception:
            self.close(conn, cursor)
            raise
        self._transactions[key] = (conn, cursor)

    def _finish_transaction(self, db_name, action):
        """Run ``commit`` or ``rollback`` on the held connection, then release it."""
        entry = self._transactions.pop(self._transaction_key(db_name), None)
        if entry is None:
            logging.warning(
                "No open transaction for %s; %s skipped.", db_name, action)
            return
        conn, cursor = entry
        try:
            getattr(conn, action)()
        finally:
            # Released even when the commit/rollback itself fails.
            self.close(conn, cursor)

    def transaction_commit(self, db_name):
        """Commit the transaction opened by ``transaction_begin`` and release it."""
        self._finish_transaction(db_name, "commit")

    def transaction_rollback(self, db_name):
        """Roll back the transaction opened by ``transaction_begin`` and release it."""
        self._finish_transaction(db_name, "rollback")

    def show_tables(self, db_name):
        """Print every row returned by ``SHOW TABLES`` for ``db_name``.

        Returns:
            The exception from ``connect`` when the schema could not be
            selected, otherwise ``None``.  Query errors are logged.
        """
        conn, cursor, err = self.connect(db_name)
        if err is not None:
            self.close(conn, cursor)
            return err
        try:
            cursor.execute("SHOW TABLES;")
            for item in cursor.fetchall():
                print(item)
        except Exception as exc:
            logging.error("Error executing SQL: %s", exc)
        finally:
            self.close(conn, cursor)

    def do_select(self, sql_query, db_name):
        """Run a SELECT and return the fetched rows as a list.

        The pool is configured with ``DictCursor``, so rows are dictionaries.

        Args:
            sql_query: Complete SQL text, executed as-is.
            db_name: Schema to select before running the query.

        Returns:
            List of rows; an empty list when selecting the schema or running
            the query fails (the error is logged).
        """
        logging.debug(sql_query)
        conn, cursor, err = self.connect(db_name)
        if err is not None:
            # connect() has already logged the failure.
            self.close(conn, cursor)
            return []
        try:
            cursor.execute(sql_query)
            data = list(cursor.fetchall())
        except Exception as exc:
            logging.error("Error executing SQL: %s", exc)
            data = []
        finally:
            self.close(conn, cursor)
        return data

    def connect2(self):
        """Borrow a raw pooled connection; the caller must ``close()`` it.

        Raises:
            AttributeError: If ``init_pool`` has not been called.
        """
        return self._require_pool().connection()