import psycopg2 from utils.log import Log from utils.loadconfig import get_pg_config import time def execute_sql(sql_statement, params=None): log = Log().getlog() try: # 先停五秒钟再执行sql time.sleep(5) db_config = get_pg_config() with psycopg2.connect(**db_config) as connection: status_message = '成功' if connection.status == psycopg2.extensions.STATUS_READY else '失败' log.info(f"数据库: {db_config['database']} 是否连接成功: {status_message}") with connection.cursor() as cursor: sql_list = [stmt.strip() for stmt in sql_statement.split(';') if stmt.strip()] for sql_str in sql_list: log.info(f"执行中SQL:\n{sql_str}") start_time = time.time() # 记录开始时间 if params is not None: cursor.execute(sql_str, params) else: cursor.execute(sql_str) elapsed_time = time.time() - start_time # 计算耗时 log.info(f"SQL语句耗时: {elapsed_time:.4f} 秒") # 打印总耗时 connection.commit() return '完成' except psycopg2.Error as e: elapsed_time = time.time() - start_time # 计算耗时 log.error(f"执行SQL语句时出现错误: {e}\nSQL语句: {sql_statement}\n参数: {params}\n耗时: {elapsed_time:.4f} 秒") return '失败' except Exception as e: elapsed_time = time.time() - start_time # 计算耗时 log.error(f"发生未知错误: {e}\nSQL语句: {sql_statement}\n参数: {params}\n耗时: {elapsed_time:.4f} 秒") return '失败' def select_execute_sql(sql_statement, params=None): log = Log().getlog() re='' try: db_config = get_pg_config() with psycopg2.connect(**db_config) as connection: status_message = '成功' if connection.status == psycopg2.extensions.STATUS_READY else '失败' log.info(f"数据库: {db_config['database']} 是否连接成功: {status_message}") # 执行sql查询并打印查询结果 with connection.cursor() as cursor: sql_list = [stmt.strip() for stmt in sql_statement.split(';') if stmt.strip()] for sql_str in sql_list: log.info(f"执行中SQL:\n{sql_str}") if params is not None: # 将单个元素的元组转换为逗号分隔的字符串 cursor.execute(sql_str, params) else: cursor.execute(sql_str) re=cursor.fetchall() return re except psycopg2.Error as e: return [] except Exception as e: return []