sqllineage/utils/sql_parse.py
2025-02-05 14:18:02 +08:00

85 lines
2.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlparse
import sqlglot
from sqlglot.expressions import ColumnDef
from utils.log import Log
def odps(schema, table_name, columns, colmapping, hologres_connection):
    """Render the MaxCompute external-table DDL that points at a Hologres table.

    Every argument is interpolated into the statement verbatim; nothing is
    quoted or escaped here.

    Args:
        schema: Used both as the database segment of the JDBC URL and as the
            ``currentSchema`` query parameter.
        table_name: Name of the external table and value of the ``table``
            query parameter.
        columns: Column definitions placed between the parentheses.
        colmapping: Value of ``odps.federation.jdbc.colmapping``.
        hologres_connection: Text placed right after ``jdbc:postgresql://``.

    Returns:
        The ``CREATE EXTERNAL TABLE`` statement, with a leading and a
        trailing newline.
    """
    # The two "--" lines inside the template are SQL comments emitted as part
    # of the statement. The first says to use the classic-network IP followed
    # by database, schema and table name; the second describes the colmapping
    # format (MaxCompute field : "Hologres field", comma separated).
    ddl_template = '''
CREATE EXTERNAL TABLE IF NOT EXISTS {table_name}
(
{columns}
)
STORED BY 'com.aliyun.odps.jdbc.JdbcStorageHandler'
-- ip设置成经典网络ip 库 加Schema 加表名
location 'jdbc:postgresql://{hologres_connection}/{schema}?ApplicationName=MaxCompute&currentSchema={schema}&preferQueryMode=simple&useSSL=false&table={table_name}/'
TBLPROPERTIES (
'mcfed.mapreduce.jdbc.driver.class'='org.postgresql.Driver',
'odps.federation.jdbc.target.db.type'='holo',
-- 格式为MaxCompute字段1 : "Hologres字段1",MaxCompute字段2 : "Hologres字段2"
'odps.federation.jdbc.colmapping'='{colmapping}'
);
'''
    return ddl_template.format(
        table_name=table_name,
        columns=columns,
        hologres_connection=hologres_connection,
        schema=schema,
        colmapping=colmapping,
    )
def extract_create_table(sql_script):
    """Return only the ``CREATE TABLE`` statements contained in *sql_script*.

    The script is split into statements with ``sqlparse.parse``. Each
    statement is re-rendered with comments stripped and keywords lower-cased
    (no re-indentation), and kept when its upper-cased, whitespace-trimmed
    text starts with ``CREATE TABLE``.

    Args:
        sql_script: SQL text that may hold any number of statements.

    Returns:
        The kept statements joined with newlines; an empty string when no
        statement matches.
    """
    # Per the original note: a fresh Log instance is created so that a new
    # log file is produced each day. The logger is not used further here.
    _logger = Log().getlog()

    def _render(statement):
        # Formatting options are kept minimal so the statement text stays
        # close to the input.
        return sqlparse.format(
            statement.value,
            strip_comments=True,
            reindent=False,
            keyword_case="lower",
        )

    rendered = (_render(statement) for statement in sqlparse.parse(sql_script))
    # Blank statements can never start with the prefix, so they drop out here
    # too. Adjust the prefix to match other statement types.
    return "\n".join(
        text
        for text in rendered
        if text.upper().strip().startswith("CREATE TABLE")
    )
def parse_create_table_sql(create_table_sql, hologres_connection, default_schema="public"):
    """Turn a PostgreSQL-dialect ``CREATE TABLE`` script into MaxCompute external-table DDL.

    The script is reduced to its CREATE TABLE statements by
    ``extract_create_table``, transpiled from the ``postgres`` dialect to
    ``hive`` with sqlglot, and the first resulting statement is inspected for
    its table name and column definitions. Those are handed to ``odps``.

    Args:
        create_table_sql: SQL text containing the CREATE TABLE statement.
            Only the first CREATE TABLE statement is used.
        hologres_connection: Passed through to ``odps`` unchanged.
        default_schema: Schema used when the table name carries no schema
            qualifier. Previously the table name itself ended up being used
            as the schema in that case.

    Returns:
        The DDL string built by ``odps``.
    """
    # Per the original note: a fresh Log instance is created so that a new
    # log file is produced each day. The logger is not used further here.
    log = Log().getlog()

    result = extract_create_table(create_table_sql)
    re_create_table_sql = sqlglot.transpile(result, read="postgres", write="hive")[0]
    parsed = sqlglot.parse_one(re_create_table_sql, read='hive')

    # Read the name parts through the expression API instead of splitting the
    # rendered SQL on ".": that keeps quote characters out of the result and
    # picks the schema (not the catalog) for catalog.schema.table names.
    table = parsed.this.this
    table_name_str = table.name
    schema = table.text("db") or default_schema

    columns = []
    colmapping = []
    # find_all yields the matching nodes themselves, so this does not depend
    # on whether walk() yields bare nodes or (node, parent, key) tuples in the
    # installed sqlglot version.
    for column_def in parsed.find_all(ColumnDef):
        column_name = column_def.name
        # Only the base type name is kept.
        # NOTE(review): type parameters such as a length or precision are not
        # carried over — confirm this is intended.
        column_type = column_def.args['kind'].this.name.upper()
        # TEXT is emitted as STRING.
        if column_type == 'TEXT':
            column_type = 'STRING'
        columns.append(column_name + " " + column_type)
        # Each column maps to the identically named column.
        colmapping.append(column_name + ":" + column_name)

    columns_str = ",\n".join(columns)
    colmapping_str = ",".join(colmapping)
    return odps(schema, table_name_str, columns_str, colmapping_str, hologres_connection)