#import logging import time import math import datetime import pymysql import pymysql.cursors import pymysql.connections from dbutils.pooled_db import PooledDB import logging class DatabaseManager: def __init__(self, dbinfo): self.params = dbinfo def init_pool(self, dbinfo, maxcon=300, mincached=1): logging.info("Created a new connection pool.") self.params = dbinfo self.db = PooledDB( creator=pymysql, maxconnections=maxcon, mincached=mincached, blocking=True, host=self.params['host'], port=self.params['port'], user=self.params['user'], password=self.params['password'], database=self.params['db'], charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor, autocommit=True) def close(self, conn, cursor): try: cursor.close() conn.close() except Exception as e: logging.error(f"Error closing connection: {str(e)}") def connect(self, db_name='tmnet'): sql = f"use {db_name}" conn = self.db.connection() cursor = conn.cursor() try: cursor.execute(sql) logging.info(f"Successfully change to MySQL database to {db_name}") except Exception as e: print(f"Error executing SQL: {str(e)}") return conn, cursor, e return conn, cursor, None def transaction_begin(self, db_name): conn, cursor, e = self.connect(db_name) conn.autocommit = False conn.begin() def transaction_commit(self, db_name): conn, cursor, e = self.connect(db_name) conn.commit() conn.autocommit = True def transaction_rollback(self, db_name): conn, cursor, e = self.connect(db_name) conn.rollback() conn.autocommit = True def show_tables(self, db_name): sql = "SHOW TABLES;" conn, cursor, e = self.connect(db_name) if e is not None: self.close(conn, cursor) return e try: cursor.execute(sql) results = cursor.fetchall() for item in results: print(item) except Exception as e: logging.error(f"Error executing SQL: {str(e)}") finally: self.close(conn, cursor) def do_select(self, sql_query, db_name): """ Executes a SELECT SQL query and returns results as a list of dictionaries. """ logging.debug(sql_query) conn, cursor, e = self.connect(db_name) if e is not None: self.close(conn, cursor) logging.error(f"Error executing SQL: {str(e)}") return [] try: cursor.execute(sql_query) result_set = cursor.fetchall() data = [row for row in result_set] except Exception as e: logging.error(f"Error executing SQL: {str(e)}") data = [] finally: self.close(conn, cursor) return data def connect2(self): return self.db.connection()