From 812877f45d3032b747ed7fa1c8a1ec10bd61fdf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=B9=E8=88=9F?= <13007110208@163.com> Date: Wed, 5 Feb 2025 14:18:02 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AC=AC=E4=B8=80=E6=AC=A1=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .dockerignore | 4 + .idea/.gitignore | 8 ++ demo.py | 169 ++++++++++++++++++++++++++++++++++++++++++ docker-compose.yaml | 14 ++++ dockerfile | 20 +++++ requirements.txt | Bin 0 -> 98 bytes sqllineage.py | 34 +++++++++ templates/index.html | 116 +++++++++++++++++++++++++++++ templates/index1.html | 108 +++++++++++++++++++++++++++ utils/log.py | 58 +++++++++++++++ utils/sql_parse.py | 85 +++++++++++++++++++++ 11 files changed, 616 insertions(+) create mode 100644 .dockerignore create mode 100644 .idea/.gitignore create mode 100644 demo.py create mode 100644 docker-compose.yaml create mode 100644 dockerfile create mode 100644 requirements.txt create mode 100644 sqllineage.py create mode 100644 templates/index.html create mode 100644 templates/index1.html create mode 100644 utils/log.py create mode 100644 utils/sql_parse.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..e833929 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +.venv/ +.idea/ +.deploy/ +logs/ diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/demo.py b/demo.py new file mode 100644 index 0000000..8b67213 --- /dev/null +++ b/demo.py @@ -0,0 +1,169 @@ +import sqlparse +import sqlglot +from sqlglot.expressions import ColumnDef + + +def extract_create_table(sql_script): + # 解析 SQL 脚本 + parsed = sqlparse.parse(sql_script) + + create_table_statements = [] + + for statement in parsed: + # 关闭格式化选项保持原样 + stripped = sqlparse.format( + statement.value, + strip_comments=True, + reindent=False, + keyword_case="lower" + ) + + # 跳过空语句 + if not stripped.strip(): + continue + + # 可修改条件来匹配其他语句类型 + if stripped.upper().strip().startswith(("CREATE TABLE")): + create_table_statements.append(stripped) + + return "\n".join(create_table_statements) + + +# 原始 SQL 脚本 +sql_script = """ +BEGIN; + +/* +DROP TABLE ods.track_log_002; +*/ + +-- Type: TABLE ; Name: track_log_002; Owner: sdk_statis_developer + +CREATE TABLE ods.track_log_002 ( + appid bigint NOT NULL, + app_ver text, + sdk_ver text, + channel text, + country text, + province text, + city text, + isp text, + ip text, + device_width integer, + device_height integer, + device_id text NOT NULL, + device_lang text, + device_model text, + device_brand text, + device_os text, + device_type text, + event_name text NOT NULL, + event_type text, + event_time bigint NOT NULL, + net_type text, + user_id text, + order_id text, + amount bigint, + platform text, + status integer, + servid text, + server_name text, + role_id text, + role_name text, + role_level text, + job_id text, + job_name text, + var1 text, + var2 text, + var3 text, + var4 text, + var5 text, + var6 text, + var7 text, + var8 text, + var9 text, + var10 text, + var11 text, + var12 text, + var13 text, + var14 text, + var15 text, + var16 text, + var17 text, + var18 text, + var19 text, + var20 text, + var21 text, + var22 text, + var23 text, + var24 text, + var25 text, + var26 text, + var27 text, + var28 text, + var29 text, + var30 text, + ds text NOT NULL, + prodid text, + prod_name text, + sub_servid text, + sub_server_name text +) + PARTITION BY LIST (ds)with ( +orientation = 'column', +storage_format = 'orc', +auto_partitioning_enable = 'true', +auto_partitioning_num_hot = '90', +auto_partitioning_num_precreate = '2', +auto_partitioning_num_retention = '191', +auto_partitioning_schd_start_time = '1970-01-01 00:00:00', +auto_partitioning_time_format = '', +auto_partitioning_time_unit = 'day', +auto_partitioning_time_zone = 'PRC', +bitmap_columns = 'appid,event_name,ds,role_id,device_id,servid,user_id,country,channel,province,status,city,device_width,var4,var3,var2,var1,amount,device_height,var12,var13,var14,var15,var10,var11,var9,var8,var7,var6,var5,event_time', +clustering_key = 'appid:asc', +dictionary_encoding_columns = '', +segment_key = 'event_time', +table_group = 'sdk_statis_tg_s80', +table_storage_mode = 'hot', +time_to_live_in_seconds = '16416000' +); + + + +COMMENT ON TABLE ods.track_log_002 IS NULL; +ALTER TABLE ods.track_log_002 OWNER TO sdk_statis_developer; + + +END; +""" + +# 执行解析 +result = extract_create_table(sql_script) + +re_create_table_sql = sqlglot.transpile(result, read="postgres", write="hive")[0] + +parsed = sqlglot.parse_one(re_create_table_sql, read='hive') + +# 获取表名 +table_name = parsed.this.this + +columns = [] +# 遍历所有可能包含列定义的子表达式 +for expression in parsed.walk(): + if isinstance(expression[0], ColumnDef): + # 获取列名 + column_name = expression[0].this.this + # 获取数据类型 + column_type = expression[0].args['kind'].this.name.upper() + # 如果是TEXT类型,则转换为STRING + if column_type == 'TEXT': + column_type = 'STRING' + columns.append({'name': column_name, 'type': column_type}) + +# 输出表名和字段信息 +print(f"表名称: {table_name}") + +# 输出结果 +for column in columns: + print(f"字段名称: {column['name']}, 字段类型: {column['type']}") diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..9ef109a --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,14 @@ +version: '3.4' +services: + sql-runner: + build: + context: . + dockerfile: Dockerfile + restart: always + container_name: sqllineage + image: sqllineage:latest + ports: + - "8778:8778" + + +# docker-compose up --build \ No newline at end of file diff --git a/dockerfile b/dockerfile new file mode 100644 index 0000000..85e5c88 --- /dev/null +++ b/dockerfile @@ -0,0 +1,20 @@ +# 使用阿里云的 Python 3.11 镜像 +FROM registry.cn-hangzhou.aliyuncs.com/yinzhou_docker_hub/python:3.11-alpine + +# 设置工作目录 +WORKDIR /opt/sqllineage + +# 设置时区为 Asia/Shanghai +ENV TZ=Asia/Shanghai + +# 将 requirements.txt 文件复制到容器中 +COPY requirements.txt . + +# 安装依赖 +RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple + +# 将其他文件复制到容器中 +COPY . . + +# 运行应用程序 +ENTRYPOINT ["python3", "sqllineage.py"] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..2dbdf802f083cbed606b127296cf66bd698102b1 GIT binary patch literal 98 zcmezWub82bA%~%WA(5eop_n0+!4?P&81xuSf!LUVmw^i?3Q~~{#Q6**V0l9ZL!hiV Z5F3J3qygm;f$Flsvc_O>J)kKd-2j!D4?O?? literal 0 HcmV?d00001 diff --git a/sqllineage.py b/sqllineage.py new file mode 100644 index 0000000..12dde21 --- /dev/null +++ b/sqllineage.py @@ -0,0 +1,34 @@ +from flask import Flask, render_template, request, jsonify +from utils.sql_parse import parse_create_table_sql +from utils.log import Log + + +app = Flask(__name__) + +@app.route('/') +def index(): + return render_template('index.html') + +@app.route('/convert', methods=['POST']) +def convert_sql(): + # 创建一个新的Log实例,确保每天创建一个新的日志文件 + log = Log().getlog() + sql_input = request.form['sql'] + hologres_connection = request.form['hologresConnection'] + log.info("SQL Input: %s", sql_input) + log.info("SQL hologres_connection: %s", hologres_connection) + try: + parsed_result=parse_create_table_sql(sql_input,hologres_connection) + + result = { + 'target_tables': parsed_result, + 'message': 'SQL processed successfully.' + } + except Exception as e: + result = {'error': str(e)} + log.info("SQL result: %s", result) + return jsonify(result) + +if __name__ == '__main__': + # 指定host和port,这里使用0.0.0.0可以让服务器被外部访问 + app.run(host='0.0.0.0', port=8778, debug=True) \ No newline at end of file diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..3985e3d --- /dev/null +++ b/templates/index.html @@ -0,0 +1,116 @@ + + + + + SQL Processor + + + +

SQL Processor

+
+ + + + +
+
+ +
+ + + + \ No newline at end of file diff --git a/templates/index1.html b/templates/index1.html new file mode 100644 index 0000000..289e150 --- /dev/null +++ b/templates/index1.html @@ -0,0 +1,108 @@ + + + + + SQL Processor + + + +

SQL Processor

+
+ + +
+
+ +
+ + + + \ No newline at end of file diff --git a/utils/log.py b/utils/log.py new file mode 100644 index 0000000..3650a39 --- /dev/null +++ b/utils/log.py @@ -0,0 +1,58 @@ +import logging +import os +from datetime import datetime + +# 定义全局变量 log_path +cur_path = os.path.dirname(os.path.realpath(__file__)) +log_path = os.path.join(os.path.dirname(cur_path), 'logs') + + +class Log(): + def __init__(self, logger_name='my_logger'): + self.logger = logging.getLogger(logger_name) + if self.logger.hasHandlers(): + self.logger.handlers.clear() + self.logger.setLevel(logging.INFO) + + if not os.path.exists(log_path): + os.makedirs(log_path) + + self.update_log_file() + + def update_log_file(self): + current_date = datetime.now().strftime("%Y_%m_%d") + self.log_name = os.path.join(log_path, f'{current_date}.log') + + for handler in self.logger.handlers[:]: + self.logger.removeHandler(handler) + + fh = logging.FileHandler(self.log_name, 'a', encoding='utf-8') + fh.setLevel(logging.INFO) + + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + + formatter = logging.Formatter('[%(asctime)s] %(filename)s line:%(lineno)d [%(levelname)s]%(message)s') + fh.setFormatter(formatter) + ch.setFormatter(formatter) + + self.logger.addHandler(fh) + self.logger.addHandler(ch) + + def getlog(self): + today = datetime.now().strftime("%Y_%m_%d") + log_date = os.path.basename(self.log_name).split('.')[0] + if today != log_date: + self.update_log_file() + return self.logger + + def info(self, msg, *args, **kwargs): + logger = self.getlog() + logger.info(msg, *args, **kwargs) + + +if __name__ == "__main__": + log = Log().getlog() + log.info("---测试开始----") + log.error("操作步骤1,2,3") + log.warning("----测试结束----") diff --git a/utils/sql_parse.py b/utils/sql_parse.py new file mode 100644 index 0000000..c032677 --- /dev/null +++ b/utils/sql_parse.py @@ -0,0 +1,85 @@ +import sqlparse +import sqlglot +from sqlglot.expressions import ColumnDef +from utils.log import Log + +def odps(schema,table_name,columns,colmapping,hologres_connection): + + odps_sql=f''' +CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} +( +{columns} +) +STORED BY 'com.aliyun.odps.jdbc.JdbcStorageHandler' +-- ip设置成经典网络ip 库 加Schema 加表名 +location 'jdbc:postgresql://{hologres_connection}/{schema}?ApplicationName=MaxCompute¤tSchema={schema}&preferQueryMode=simple&useSSL=false&table={table_name}/' +TBLPROPERTIES ( +'mcfed.mapreduce.jdbc.driver.class'='org.postgresql.Driver', +'odps.federation.jdbc.target.db.type'='holo', +-- 格式为:MaxCompute字段1 : "Hologres字段1",MaxCompute字段2 : "Hologres字段2" +'odps.federation.jdbc.colmapping'='{colmapping}' +); +''' + return odps_sql + +def extract_create_table(sql_script): + # 创建一个新的Log实例,确保每天创建一个新的日志文件 + log = Log().getlog() + # 解析 SQL 脚本 + parsed = sqlparse.parse(sql_script) + + create_table_statements = [] + + for statement in parsed: + # 关闭格式化选项保持原样 + stripped = sqlparse.format( + statement.value, + strip_comments=True, + reindent=False, + keyword_case="lower" + ) + + # 跳过空语句 + if not stripped.strip(): + continue + + # 可修改条件来匹配其他语句类型 + if stripped.upper().strip().startswith(("CREATE TABLE")): + create_table_statements.append(stripped) + + return "\n".join(create_table_statements) + +def parse_create_table_sql(create_table_sql,hologres_connection): + # 创建一个新的Log实例,确保每天创建一个新的日志文件 + log = Log().getlog() + + result = extract_create_table(create_table_sql) + + re_create_table_sql = sqlglot.transpile(result, read="postgres", write="hive")[0] + + parsed = sqlglot.parse_one(re_create_table_sql, read='hive') + + # 获取表名 + table_name = parsed.this.this + + columns = [] + colmapping = [] + # 遍历所有可能包含列定义的子表达式 + for expression in parsed.walk(): + if isinstance(expression[0], ColumnDef): + # 获取列名 + column_name = expression[0].this.this + # 获取数据类型 + column_type = expression[0].args['kind'].this.name.upper() + # 如果是TEXT类型,则转换为STRING + if column_type == 'TEXT': + column_type = 'STRING' + columns.append(column_name+" "+column_type) + colmapping.append(column_name+":"+column_name) + # 将columns,colmapping转换成字符串用,分割 + columns_str = ",\n".join(columns) + colmapping_str = ",".join(colmapping) + table_name_str=str(table_name).split('.')[-1] + schema=str(table_name).split('.')[0] + + return odps(schema,table_name_str,columns_str,colmapping_str,hologres_connection) \ No newline at end of file