data_job/utils/execute_sql.py
2025-03-04 10:24:36 +08:00

71 lines
2.8 KiB
Python

import psycopg2
from utils.log import Log
from utils.loadconfig import get_pg_config
import time
def execute_sql(sql_statement, params=None):
log = Log().getlog()
try:
# 先停五秒钟再执行sql
time.sleep(5)
db_config = get_pg_config()
with psycopg2.connect(**db_config) as connection:
status_message = '成功' if connection.status == psycopg2.extensions.STATUS_READY else '失败'
log.info(f"数据库: {db_config['database']} 是否连接成功: {status_message}")
with connection.cursor() as cursor:
sql_list = [stmt.strip() for stmt in sql_statement.split(';') if stmt.strip()]
for sql_str in sql_list:
log.info(f"执行中SQL:\n{sql_str}")
start_time = time.time() # 记录开始时间
if params is not None:
cursor.execute(sql_str, params)
else:
cursor.execute(sql_str)
elapsed_time = time.time() - start_time # 计算耗时
log.info(f"SQL语句耗时: {elapsed_time:.4f}") # 打印总耗时
connection.commit()
return '完成'
except psycopg2.Error as e:
elapsed_time = time.time() - start_time # 计算耗时
log.error(f"执行SQL语句时出现错误: {e}\nSQL语句: {sql_statement}\n参数: {params}\n耗时: {elapsed_time:.4f}")
return '失败'
except Exception as e:
elapsed_time = time.time() - start_time # 计算耗时
log.error(f"发生未知错误: {e}\nSQL语句: {sql_statement}\n参数: {params}\n耗时: {elapsed_time:.4f}")
return '失败'
def select_execute_sql(sql_statement, params=None):
log = Log().getlog()
re=''
try:
db_config = get_pg_config()
with psycopg2.connect(**db_config) as connection:
status_message = '成功' if connection.status == psycopg2.extensions.STATUS_READY else '失败'
log.info(f"数据库: {db_config['database']} 是否连接成功: {status_message}")
# 执行sql查询并打印查询结果
with connection.cursor() as cursor:
sql_list = [stmt.strip() for stmt in sql_statement.split(';') if stmt.strip()]
for sql_str in sql_list:
log.info(f"执行中SQL:\n{sql_str}")
if params is not None:
# 将单个元素的元组转换为逗号分隔的字符串
cursor.execute(sql_str, params)
else:
cursor.execute(sql_str)
re=cursor.fetchall()
return re
except psycopg2.Error as e:
return []
except Exception as e:
return []