data_job/utils/read_sql_files.py
2025-03-04 10:24:36 +08:00

110 lines
3.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from utils.execute_sql import execute_sql
from utils.date_time import get_parameter
from utils.log import Log
from utils.loadconfig import get_path
def read_sql_files(directory):
log = Log().getlog()
directory = get_path() + '/sql/' + directory
"""
读取指定目录下所有.sql结尾的文件内容并打印
参数:
directory (str): 要读取文件的目录路径
返回:
None
"""
# 检查目录是否存在
if not os.path.exists(directory):
log.warning(f"目录 {directory} 不存在,请检查路径。")
return
# 检查目录是否为空
if not os.listdir(directory):
log.warning(f"目录 {directory} 为空。")
return
# 遍历目录及其子目录下的所有文件
for root, dirs, files in os.walk(directory):
for file in files:
if file.endswith('.sql'):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 遍历字典,将字符串中的占位符替换为对应的值
for key, value in get_parameter().items():
content = content.replace(key, str(value))
log.info(f"文件路径: {file_path}")
try:
returnstr = execute_sql(content)
log.info(f"执行结果: {returnstr}")
log.info("-" * 50)
except Exception as e:
log.error(f"执行 SQL 文件 {file_path} 时出现错误: {e}")
except UnicodeDecodeError:
log.error(f"文件 {file_path} 编码格式可能不是utf-8无法正确读取请检查。")
except Exception as e:
log.error(f"读取文件 {file_path} 时出现错误: {e}")
def read_one_game_sql_files(directory,game_information):
log = Log().getlog()
directory = get_path() + '/one_game_sql/' + directory
"""
读取指定目录下所有.sql结尾的文件内容并打印
参数:
directory (str): 要读取文件的目录路径
返回:
None
"""
# 检查目录是否存在
if not os.path.exists(directory):
log.warning(f"目录 {directory} 不存在,请检查路径。")
return
# 检查目录是否为空
if not os.listdir(directory):
log.warning(f"目录 {directory} 为空。")
return
# 遍历目录及其子目录下的所有文件
for root, dirs, files in os.walk(directory):
for file in files:
if file.endswith('.sql'):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
parameter=get_parameter()
parameter.update(game_information)
# 遍历字典,将字符串中的占位符替换为对应的值
for key, value in parameter.items():
content = content.replace(key, str(value))
log.info(f"文件路径: {file_path}")
try:
returnstr = execute_sql(content)
log.info(f"执行结果: {returnstr}")
log.info("-" * 50)
except Exception as e:
log.error(f"执行 SQL 文件 {file_path} 时出现错误: {e}")
except UnicodeDecodeError:
log.error(f"文件 {file_path} 编码格式可能不是utf-8无法正确读取请检查。")
except Exception as e:
log.error(f"读取文件 {file_path} 时出现错误: {e}")
if __name__ == '__main__':
read_sql_files()