From 54b3253ff9d6cdc82e7a20c9bd3db20ba7663c1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=B9=E8=88=9F?= <13007110208@163.com> Date: Tue, 4 Mar 2025 10:24:36 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AC=AC=E4=B8=80=E6=AC=A1=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .dockerignore | 4 + .gitignore | 1 + README.md | 12 ++ config/config.ini | 12 ++ deploy.sh | 40 +++++ docker_deploy/docker-compose.yml | 11 ++ dockerfile | 20 +++ job_one_game.py | 41 +++++ job_test.py | 26 +++ one_game_sql/dim/first_user.sql | 28 ++++ one_game_sql/dwd_after/revenue_df.sql | 148 ++++++++++++++++++ one_game_sql/dwd_before/active_account_df.sql | 30 ++++ one_game_sql/dwd_before/active_df.sql | 18 +++ one_game_sql/dwd_before/order_df.sql | 43 +++++ one_game_sql/dwd_before/order_user_df.sql | 29 ++++ .../dws_after/game_user_data_month.sql | 65 ++++++++ one_game_sql/dws_before/game_user_data.sql | 67 ++++++++ requirements.txt | Bin 0 -> 94 bytes sql/ads/data_screen.sql | 34 ++++ sql/ads/game_data.sql | 62 ++++++++ sql/ads/order_ltv.sql | 41 +++++ sql/ads/order_retention.sql | 42 +++++ sql/ads/revenue_data.sql | 64 ++++++++ sql/ads/user_retention.sql | 41 +++++ sql/dim/first_user.sql | 41 +++++ sql/dim/game_product_relation.sql | 50 ++++++ sql/dwd_after/revenue_df.sql | 148 ++++++++++++++++++ sql/dwd_before/active_account_df.sql | 31 ++++ sql/dwd_before/active_df.sql | 17 ++ sql/dwd_before/order_df.sql | 97 ++++++++++++ sql/dwd_before/order_user_df.sql | 49 ++++++ sql/dws_after/game_user_data_month.sql | 65 ++++++++ sql/dws_before/game_user_data.sql | 65 ++++++++ sql_job.py | 44 ++++++ utils/__init__.py | 0 utils/date_time.py | 63 ++++++++ utils/execute_sql.py | 71 +++++++++ utils/loadconfig.py | 27 ++++ utils/log.py | 69 ++++++++ utils/read_sql_files.py | 109 +++++++++++++ 40 files changed, 1825 insertions(+) create mode 100644 .dockerignore create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config/config.ini create mode 100644 deploy.sh create mode 100644 docker_deploy/docker-compose.yml create mode 100644 dockerfile create mode 100644 job_one_game.py create mode 100644 job_test.py create mode 100644 one_game_sql/dim/first_user.sql create mode 100644 one_game_sql/dwd_after/revenue_df.sql create mode 100644 one_game_sql/dwd_before/active_account_df.sql create mode 100644 one_game_sql/dwd_before/active_df.sql create mode 100644 one_game_sql/dwd_before/order_df.sql create mode 100644 one_game_sql/dwd_before/order_user_df.sql create mode 100644 one_game_sql/dws_after/game_user_data_month.sql create mode 100644 one_game_sql/dws_before/game_user_data.sql create mode 100644 requirements.txt create mode 100644 sql/ads/data_screen.sql create mode 100644 sql/ads/game_data.sql create mode 100644 sql/ads/order_ltv.sql create mode 100644 sql/ads/order_retention.sql create mode 100644 sql/ads/revenue_data.sql create mode 100644 sql/ads/user_retention.sql create mode 100644 sql/dim/first_user.sql create mode 100644 sql/dim/game_product_relation.sql create mode 100644 sql/dwd_after/revenue_df.sql create mode 100644 sql/dwd_before/active_account_df.sql create mode 100644 sql/dwd_before/active_df.sql create mode 100644 sql/dwd_before/order_df.sql create mode 100644 sql/dwd_before/order_user_df.sql create mode 100644 sql/dws_after/game_user_data_month.sql create mode 100644 sql/dws_before/game_user_data.sql create mode 100644 sql_job.py create mode 100644 utils/__init__.py create mode 100644 utils/date_time.py create mode 100644 utils/execute_sql.py create mode 100644 utils/loadconfig.py create mode 100644 utils/log.py create mode 100644 utils/read_sql_files.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..e833929 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +.venv/ +.idea/ +.deploy/ +logs/ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6d9594a --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/logs/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..d5625c2 --- /dev/null +++ b/README.md @@ -0,0 +1,12 @@ +# data_job + +自动化配置: +将代码拉取到任意目录 + +修改 +[docker-compose.yml](docker_deploy%2Fdocker-compose.yml) +的volumes /opt/logs 目录指定到本地任意文件夹 + +代码根目录执行[deploy.sh](deploy.sh) + +查看修改的日志目录日志文件有日志输出部署正常 \ No newline at end of file diff --git a/config/config.ini b/config/config.ini new file mode 100644 index 0000000..3e739e9 --- /dev/null +++ b/config/config.ini @@ -0,0 +1,12 @@ +[test] +database=pubdata_center +user=data +password=vpcC~JGr5@qLKO0Y +host=101.37.167.108 +port=5432 +[pro] +database=pubdata_center +user=data +password=vpcC~JGr5@qLKO0Y +host=127.0.0.1 +port=5432 \ No newline at end of file diff --git a/deploy.sh b/deploy.sh new file mode 100644 index 0000000..76a3bf7 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +# 设置颜色代码以改善输出可读性 +GREEN='\033[0;32m' +RED='\033[0;31m' +NC='\033[0m' # No Color + +# 获取脚本所在目录的绝对路径 +SCRIPT_DIR=$(dirname "$(realpath "$0")") + +# 定义函数用于检查命令是否成功执行 +check_command() { + if [ $? -ne 0 ]; then + echo -e "${RED}期间发生错误: $1${NC}" + exit 1 + else + echo -e "${GREEN}$1 完成.${NC}" + fi +} + +# 打印开始信息 +echo -e "${GREEN}开始部署...${NC}" + +# Step 1: 停止并移除现有的 docker-compose 服务 +echo -e "${GREEN}停止并移除现有容器...${NC}" +docker compose -f "${SCRIPT_DIR}/docker_deploy/docker-compose.yml" down +check_command "docker-compose down" + +# Step 2: 构建新的 Docker 镜像 +echo -e "${GREEN}构建新的Docker镜像...${NC}" +docker build --no-cache -f "${SCRIPT_DIR}/Dockerfile" -t data_job "${SCRIPT_DIR}/" +check_command "docker build" + +# Step 3: 使用更新后的镜像启动服务 +echo -e "${GREEN}使用更新的映像启动服务...${NC}" +docker compose -f "${SCRIPT_DIR}/docker_deploy/docker-compose.yml" up -d +check_command "docker-compose up -d" + +# 打印完成信息 +echo -e "${GREEN}部署过程已成功完成.${NC}" \ No newline at end of file diff --git a/docker_deploy/docker-compose.yml b/docker_deploy/docker-compose.yml new file mode 100644 index 0000000..392ada8 --- /dev/null +++ b/docker_deploy/docker-compose.yml @@ -0,0 +1,11 @@ +services: + sql_job: + image: data_job + restart: always + hostname: sql_job + container_name: sql_job + volumes: + - /opt/logs:/opt/job/logs + environment: + ENVIRONMENT_VARIABLE: pro + network_mode: "host" diff --git a/dockerfile b/dockerfile new file mode 100644 index 0000000..6f77291 --- /dev/null +++ b/dockerfile @@ -0,0 +1,20 @@ +# 使用阿里云的 Python 3.11 镜像 +FROM registry.cn-hangzhou.aliyuncs.com/yinzhou_docker_hub/python:3.11-alpine + +# 设置工作目录 +WORKDIR /opt/job + +# 设置时区为 Asia/Shanghai +ENV TZ=Asia/Shanghai + +# 将 requirements.txt 文件复制到容器中 +COPY requirements.txt . + +# 安装依赖 +RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple + +# 将其他文件复制到容器中 +COPY . . + +# 运行应用程序 +ENTRYPOINT ["python3", "sql_job.py"] \ No newline at end of file diff --git a/job_one_game.py b/job_one_game.py new file mode 100644 index 0000000..cccccaa --- /dev/null +++ b/job_one_game.py @@ -0,0 +1,41 @@ +from utils.read_sql_files import read_one_game_sql_files +from utils.execute_sql import select_execute_sql + + + +def dim(game_information): + read_one_game_sql_files('dim',game_information) + + +def dwd(game_information): + read_one_game_sql_files('dwd_before',game_information) + read_one_game_sql_files('dwd_after',game_information) + + +def dws(game_information): + read_one_game_sql_files('dws_before',game_information) + read_one_game_sql_files('dws_after',game_information) + + +def ads(game_information): + read_one_game_sql_files('ads',game_information) + + +if __name__ == '__main__': + + sql_params="49,50,51" + + sql_str=f"select relation_id,game_channel_id,game_identity,game_platform_id from dim.game_product_relation where relation_id IN ({sql_params})" + game_information=select_execute_sql(sql_str) + for game in game_information: + game_information_dic={ + "${relation_id}": str(game[0]), + "${channel_id}": str(game[1]), + "${game_identity}": game[2], + "${platform_id}": str(game[3]) + } + print(game_information_dic) + # dim(game_information_dic) + # dwd(game_information_dic) + # dws(game_information_dic) + # ads(game_information_dic) diff --git a/job_test.py b/job_test.py new file mode 100644 index 0000000..15ea510 --- /dev/null +++ b/job_test.py @@ -0,0 +1,26 @@ +from utils.read_sql_files import read_sql_files + + +def dim(): + read_sql_files('dim') + + +def dwd(): + read_sql_files('dwd_before') + read_sql_files('dwd_after') + + +def dws(): + read_sql_files('dws_before') + read_sql_files('dws_after') + + +def ads(): + read_sql_files('ads') + + +if __name__ == '__main__': + # dim() + # dwd() + # dws() + ads() diff --git a/one_game_sql/dim/first_user.sql b/one_game_sql/dim/first_user.sql new file mode 100644 index 0000000..682d02d --- /dev/null +++ b/one_game_sql/dim/first_user.sql @@ -0,0 +1,28 @@ +-- 取登陆最早时间 逻辑:如果之前没出现过,那插入这条记录做最早用户登陆时间 +-- 首次写入数据sql +INSERT INTO dim.first_user(game_channel_id,game_identity,game_platform_id,relation_id,prodid,user_id,ds) +SELECT t2.game_channel_id + ,t2.game_identity + ,t2.game_platform_id + ,t2.relation_id + ,t2.product_id prodid + ,t1.user_id + ,min(ds) ds +FROM ods.active_account_list t1 +inner join +dim.game_product_relation t2 +ON t1.channel_id = t2.game_channel_id +AND t1.game_identity = t2.game_identity +AND t1.platform_id = t2.game_platform_id +where ds>='${30_days_later}' and t1.channel_id='${channel_id}' AND t1.game_identity='${game_identity}' AND t1.platform_id='${platform_id}' +GROUP BY t2.game_channel_id + ,t2.game_identity + ,t2.game_platform_id + ,t2.relation_id + ,t2.product_id + ,t1.user_id + +ON CONFLICT (relation_id,user_id) +DO UPDATE SET game_channel_id = EXCLUDED.game_channel_id,game_identity = EXCLUDED.game_identity ,game_platform_id = EXCLUDED.game_platform_id +,prodid = EXCLUDED.prodid,user_id = EXCLUDED.user_id,ds = EXCLUDED.ds +; \ No newline at end of file diff --git a/one_game_sql/dwd_after/revenue_df.sql b/one_game_sql/dwd_after/revenue_df.sql new file mode 100644 index 0000000..c838e0e --- /dev/null +++ b/one_game_sql/dwd_after/revenue_df.sql @@ -0,0 +1,148 @@ +-- 由于收入是按照不同的统计方式选择不同的计算方式 +-- 1.divide_type=1: +-- order_amount * divide_scale +-- 2.divide_type=2: +-- 阶梯(累计): +-- 0->10% 50->20% 100->%30 +-- 第一个月20(流水): 20->10% +-- 第二个月90(流水): 30->10% 50->20% 10->30% +-- 3.divide_type=3: +-- 阶梯(按月): +-- 0->10% 50->20% 100->%30 +-- 第一个月20(流水): 20->10% +-- 第二个月90(流水): 50->10% 40->20% +WITH money_divide_scale_tmp AS ( +SELECT + relation_id, + money, + divide_scale, + COALESCE(LEAD(money) OVER (PARTITION BY relation_id ORDER BY money), 999999999999.99) AS next_money + FROM ( + + SELECT + relation_id, + cast((elem->>'money')::text as numeric) money, + cast((elem->>'divide_scale')::text as numeric) divide_scale + FROM ( + SELECT + relation_id, + jsonb_array_elements(divide_scale_step) elem + FROM + dim.game_product_relation + WHERE + divide_type != 1 + ) tt + +) ttt +WHERE +ttt.money IS NOT NULL +AND ttt.divide_scale IS NOT NULL +), +revenue_df_tmp AS +( +-- divide_type=1 money= order_amount * divide_scale +SELECT + t2.relation_id, + SUM(t1.order_amount * t2.divide_scale) + money, + t1.ds, + t1.ds_type, + t1.ds_name +FROM + dwd.order_df t1 + INNER JOIN dim.game_product_relation t2 ON t1.game_channel_id = t2.game_channel_id + AND t1.game_identity = t2.game_identity + AND t1.game_platform_id = t2.game_platform_id + AND t2.divide_type = 1 + where t1.ds_type = 'd' +GROUP BY + t2.relation_id, + t1.ds, + t1.ds_type, + t1.ds_name +UNION ALL +-- 阶梯(累计): +-- 0->10% 50->20% 100->%30 +-- 第一个月20(流水): 20->10% +-- 第二个月90(流水): 30->10% 50->20% 10->30% +SELECT + tt1.relation_id, + tt1.order_amount * tt2.divide_scale money, + tt1.ds, + tt1.ds_type, + tt1.ds_name +FROM ( + SELECT + t2.relation_id, + t1.ds, + t1.ds_type, + t1.ds_name, + t1.order_amount, + SUM(t1.order_amount) OVER (PARTITION BY t2.relation_id ORDER BY ds) AS sum_order_amount + FROM + dwd.order_df t1 + INNER JOIN dim.game_product_relation t2 ON t1.game_channel_id = t2.game_channel_id + AND t1.game_identity = t2.game_identity + AND t1.game_platform_id = t2.game_platform_id + AND t2.divide_type = 2 + WHERE + t1.ds_type = 'd') tt1 + INNER JOIN money_divide_scale_tmp tt2 ON tt1.relation_id = tt2.relation_id + AND tt1.sum_order_amount >= tt2.money + AND tt1.sum_order_amount <= tt2.next_money +-- 阶梯(按月): +-- 0->10% 50->20% 100->%30 +-- 第一个月20(流水): 20->10% +-- 第二个月90(流水): 50->10% 40->20% +-- divide_type=3 +UNION ALL +SELECT + tt1.relation_id, + tt1.order_amount * tt2.divide_scale money, + tt1.ds, + tt1.ds_type, + tt1.ds_name +FROM ( + SELECT + t2.relation_id, + t1.ds, + t1.ds_type, + t1.ds_name, + t1.order_amount, + SUM(t1.order_amount) OVER (PARTITION BY t2.relation_id, SUBSTR(t1.ds, 1, 6) ORDER BY ds) AS sum_order_amount + FROM + dwd.order_df t1 + INNER JOIN dim.game_product_relation t2 ON t1.game_channel_id = t2.game_channel_id + AND t1.game_identity = t2.game_identity + AND t1.game_platform_id = t2.game_platform_id + AND t2.divide_type = 3 +WHERE + t1.ds_type = 'd') tt1 +INNER JOIN money_divide_scale_tmp tt2 ON tt1.relation_id = tt2.relation_id + AND tt1.sum_order_amount >= tt2.money + AND tt1.sum_order_amount <= tt2.next_money + +) +INSERT INTO dwd.revenue_df(relation_id,money,ds,ds_type,ds_name) +SELECT relation_id + ,sum(money) money + ,ds + ,ds_type + ,ds_name +FROM revenue_df_tmp where relation_id='${relation_id}' +GROUP BY relation_id + ,ds + ,ds_type + ,ds_name +union all +SELECT relation_id + ,sum(money) money + ,SUBSTRING(ds, 1, 6) || '00' ds + ,'m' ds_type + ,'月' ds_name +FROM revenue_df_tmp where relation_id='${relation_id}' +GROUP BY relation_id + ,SUBSTRING(ds, 1, 6) || '00' +ON CONFLICT (relation_id,ds,ds_type) +DO UPDATE SET money = EXCLUDED.money,ds_name = EXCLUDED.ds_name +; \ No newline at end of file diff --git a/one_game_sql/dwd_before/active_account_df.sql b/one_game_sql/dwd_before/active_account_df.sql new file mode 100644 index 0000000..a1242b6 --- /dev/null +++ b/one_game_sql/dwd_before/active_account_df.sql @@ -0,0 +1,30 @@ +-- 根据游戏汇总活跃账号数据 天日增 +INSERT INTO dwd.active_account_df(game_channel_id,game_identity,game_platform_id,active_num,ds,ds_type,ds_name) +SELECT channel_id game_channel_id + ,game_identity + ,platform_id game_platform_id + ,count(distinct user_id) active_num + ,ds ds + ,'d' ds_type + ,'天' ds_name +FROM ods.active_account_list +WHERE ds>='${30_days_later}' and channel_id='${channel_id}' AND game_identity='${game_identity}' AND platform_id='${platform_id}' +GROUP BY ds + ,channel_id + ,game_identity + ,platform_id +UNION +SELECT channel_id game_channel_id + ,game_identity + ,platform_id game_platform_id + ,count(distinct user_id) active_num + ,'${t_month}' AS ds + ,'m' ds_type + ,'月' ds_name +FROM ods.active_account_list +WHERE ds>='${30_days_later}' and channel_id='${channel_id}' AND game_identity='${game_identity}' AND platform_id='${platform_id}' +GROUP BY channel_id + ,game_identity + ,platform_id +ON CONFLICT (game_channel_id,game_identity,game_platform_id,ds,ds_type) DO UPDATE SET active_num = EXCLUDED.active_num +; \ No newline at end of file diff --git a/one_game_sql/dwd_before/active_df.sql b/one_game_sql/dwd_before/active_df.sql new file mode 100644 index 0000000..0e6bdb2 --- /dev/null +++ b/one_game_sql/dwd_before/active_df.sql @@ -0,0 +1,18 @@ +INSERT INTO dwd.active_df(relation_id,user_id,ds,ds_date) +SELECT t2.relation_id + ,t1.user_id + ,t1.ds + ,TO_DATE(t1.ds, 'YYYYMMDD') ds_date +FROM pubdata_center.ods.active_account_list t1 +INNER JOIN dim.game_product_relation t2 +ON t1.channel_id = t2.game_channel_id +AND t1.game_identity = t2.game_identity +AND t1.platform_id = t2.game_platform_id +WHERE t1.ds>='${30_days_later}' and t1.channel_id='${channel_id}' AND t1.game_identity='${game_identity}' AND t1.platform_id='${platform_id}' +GROUP BY t2.relation_id + ,t1.user_id + ,t1.ds + ,TO_DATE(t1.ds, 'YYYYMMDD') +ON CONFLICT (relation_id, user_id, ds) + DO UPDATE SET ds_date = EXCLUDED.ds_date +; \ No newline at end of file diff --git a/one_game_sql/dwd_before/order_df.sql b/one_game_sql/dwd_before/order_df.sql new file mode 100644 index 0000000..20af0cf --- /dev/null +++ b/one_game_sql/dwd_before/order_df.sql @@ -0,0 +1,43 @@ +-- 根据游戏汇总订单数据 +-- 增量逻辑 由于数据会存在延迟推送或者新游戏接入数据补数据 +-- 天:取当天推送的订单数据里面看订单日期有哪些天的订单把那几天的数据拉出来重新汇总统计 +-- 月:取当天推送的订单数据里面看订单日期有哪些月的订单把那几天的数据拉出来重新汇总统计 +INSERT INTO dwd.order_df(game_channel_id,game_identity,game_platform_id,order_amount,pay_amount,pay_user_num,ds,ds_type,ds_name) +SELECT t1.channel_id game_channel_id + ,t1.game_identity + ,t1.platform_id game_platform_id + ,SUM(t1.amount) order_amount + ,SUM(t1.pay_amount) pay_amount + ,count(distinct t1.user_id) pay_user_num + ,TO_CHAR(t1.create_time,'YYYYMMDD') ds + ,'d' ds_type + ,'天' ds_name +FROM ods."order" t1 +WHERE t1.channel_id='${channel_id}' AND t1.game_identity='${game_identity}' AND t1.platform_id='${platform_id}' +GROUP BY t1.channel_id + ,t1.game_identity + ,t1.platform_id + ,TO_CHAR(t1.create_time,'YYYYMMDD') +UNION +SELECT t1.channel_id game_channel_id + ,t1.game_identity + ,t1.platform_id game_platform_id + ,SUM(t1.amount) order_amount + ,SUM(t1.pay_amount) pay_amount + ,count(distinct t1.user_id) pay_user_num + ,TO_CHAR(t1.create_time,'YYYYMM') || '00' ds + ,'m' ds_type + ,'月' ds_name +FROM ods."order" t1 +WHERE t1.channel_id='${channel_id}' AND t1.game_identity='${game_identity}' AND t1.platform_id='${platform_id}' +GROUP BY t1.channel_id + ,t1.game_identity + ,t1.platform_id + ,TO_CHAR(t1.create_time,'YYYYMM') || '00' + +ON CONFLICT (game_channel_id,game_identity,game_platform_id,ds,ds_type) +DO UPDATE SET order_amount = EXCLUDED.order_amount,pay_amount = EXCLUDED.pay_amount,pay_user_num = EXCLUDED.pay_user_num +; + + + diff --git a/one_game_sql/dwd_before/order_user_df.sql b/one_game_sql/dwd_before/order_user_df.sql new file mode 100644 index 0000000..c832b2a --- /dev/null +++ b/one_game_sql/dwd_before/order_user_df.sql @@ -0,0 +1,29 @@ +INSERT INTO "dwd"."order_user_df" (relation_id,prodid,game_channel_id,game_identity,game_platform_id,user_id,ds,pay_amount,ds_date) +SELECT t2.relation_id + ,t2.product_id prodid + ,t2.game_channel_id + ,t2.game_identity + ,t2.game_platform_id + ,t1.user_id + ,t1.ds ds + ,sum(pay_amount) pay_amount + ,TO_DATE(t1.ds, 'YYYYMMDD') ds_date +FROM ods.ORDER t1 +inner join +dim.game_product_relation t2 +ON t1.channel_id = t2.game_channel_id +AND t1.game_identity = t2.game_identity +AND t1.platform_id = t2.game_platform_id +WHERE t1.channel_id='${channel_id}' AND t1.game_identity='${game_identity}' AND t1.platform_id='${platform_id}' +group by t2.relation_id + ,t2.product_id + ,t2.game_channel_id + ,t2.game_identity + ,t2.game_platform_id + ,t1.user_id + ,t1.ds + ,TO_DATE(t1.ds, 'YYYYMMDD') +ON CONFLICT (relation_id, user_id, ds) + DO UPDATE SET prodid = EXCLUDED.prodid,game_channel_id = EXCLUDED.game_channel_id,game_identity = EXCLUDED.game_identity,game_platform_id = EXCLUDED.game_platform_id, + ds = EXCLUDED.ds,user_id = EXCLUDED.user_id,pay_amount = EXCLUDED.pay_amount,ds_date = EXCLUDED.ds_date +; \ No newline at end of file diff --git a/one_game_sql/dws_after/game_user_data_month.sql b/one_game_sql/dws_after/game_user_data_month.sql new file mode 100644 index 0000000..732259d --- /dev/null +++ b/one_game_sql/dws_after/game_user_data_month.sql @@ -0,0 +1,65 @@ +-- 按月统计游戏的活跃账号相关数据 +INSERT INTO dws.game_user_data_month(type,type_name,game_channel_id,game_identity,game_platform_id,ds,active_user_cut,play_user_cut,order_amount,money) + +select + 'hyzh' type + ,'活跃账号' type_name + ,COALESCE(t.game_channel_id,t1.game_channel_id) game_channel_id + , COALESCE(t.game_identity,t1.game_identity) game_identity + , COALESCE(t.game_platform_id,t1.game_platform_id) game_platform_id + , COALESCE(t.ds,t1.ds) ds + , COALESCE(sum(t.active_num), 0) active_user_cut + , COALESCE(sum(t1.pay_user_num), 0) play_user_cut + , COALESCE(sum(t1.order_amount), 0) order_amount + , COALESCE(SUM(t1.pay_amount), 0) money + +FROM dwd.active_account_df t +FULL OUTER JOIN dwd.order_df t1 +on t.game_channel_id = t1.game_channel_id +and t.game_identity = t1.game_identity +and t.game_platform_id = t1.game_platform_id +and t.ds_type = t1.ds_type +and t.ds = t1.ds +where t.ds_type = 'm' +group by COALESCE(t.game_channel_id,t1.game_channel_id) + , COALESCE(t.game_identity,t1.game_identity) + , COALESCE(t.game_platform_id,t1.game_platform_id) + , COALESCE(t.ds,t1.ds) + + +ON CONFLICT (type,type_name,game_channel_id,game_identity,game_platform_id,ds) DO +UPDATE SET active_user_cut = EXCLUDED.active_user_cut,play_user_cut = EXCLUDED.play_user_cut,order_amount = EXCLUDED.order_amount +,money = EXCLUDED.money +; + + +-- 按月增量统计游戏的新账号相关数据 + + +INSERT INTO dws.game_user_data_month(type,type_name,game_channel_id,game_identity,game_platform_id,ds,active_user_cut,play_user_cut,order_amount,money) + + +select + 'xzh' type + ,'新账号' type_name + ,t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,substring(t1.ds,1,6) || '00' ds + ,COALESCE(sum(t1.active_user_cut),0) active_user_cut + ,COALESCE(sum(t1.play_user_cut),0) play_user_cut + ,COALESCE(sum(t1.order_amount),0) order_amount + ,COALESCE(sum(t1.money),0) money +from dws.game_user_data t1 +group by t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,substring(t1.ds,1,6) || '00' + + +ON CONFLICT (type,type_name,game_channel_id,game_identity,game_platform_id,ds) DO +UPDATE SET active_user_cut = EXCLUDED.active_user_cut,play_user_cut = EXCLUDED.play_user_cut,order_amount = EXCLUDED.order_amount +,money = EXCLUDED.money +; + + diff --git a/one_game_sql/dws_before/game_user_data.sql b/one_game_sql/dws_before/game_user_data.sql new file mode 100644 index 0000000..0379cf8 --- /dev/null +++ b/one_game_sql/dws_before/game_user_data.sql @@ -0,0 +1,67 @@ +-- 统计游戏的活跃账号相关数据 +INSERT INTO dws.game_user_data(type,type_name,game_channel_id,game_identity,game_platform_id,ds,active_user_cut,play_user_cut,order_amount,money) + +select + 'hyzh' type + ,'活跃账号' type_name + ,COALESCE(t.game_channel_id,t1.game_channel_id) game_channel_id + , COALESCE(t.game_identity,t1.game_identity) game_identity + , COALESCE(t.game_platform_id,t1.game_platform_id) game_platform_id + , COALESCE(t.ds,t1.ds) ds + , COALESCE(sum(t.active_num), 0) active_user_cut + , COALESCE(sum(t1.pay_user_num), 0) play_user_cut + , COALESCE(sum(t1.order_amount), 0) order_amount + , COALESCE(SUM(t1.pay_amount), 0) money + + FROM dwd.order_df t1 + FULL OUTER JOIN + dwd.active_account_df t + on t.game_channel_id = t1.game_channel_id + and t.game_identity = t1.game_identity + and t.game_platform_id = t1.game_platform_id + and t.ds_type = t1.ds_type + and t.ds = t1.ds + where t1.ds_type = 'd' + group by COALESCE(t.game_channel_id,t1.game_channel_id) + , COALESCE(t.game_identity,t1.game_identity) + , COALESCE(t.game_platform_id,t1.game_platform_id) + , COALESCE(t.ds,t1.ds) + +ON CONFLICT (type,type_name,game_channel_id,game_identity,game_platform_id,ds) DO +UPDATE SET active_user_cut = EXCLUDED.active_user_cut,play_user_cut = EXCLUDED.play_user_cut,order_amount = EXCLUDED.order_amount +,money = EXCLUDED.money +; + + + + +-- 增量统计游戏的新账号相关数据 + +INSERT INTO dws.game_user_data(type,type_name,game_channel_id,game_identity,game_platform_id,ds,active_user_cut,play_user_cut,order_amount,money) +select + 'xzh' type + ,'新账号' type_name + ,t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,t1.ds + ,count(distinct t1.user_id) active_user_cut + ,count(distinct t2.user_id) play_user_cut + ,COALESCE(sum(t2.pay_amount),0) order_amount + ,COALESCE(SUM(t2.pay_amount),0) money +from +dim.first_user t1 +left join +"dwd"."order_user_df" t2 +on t1.relation_id=t2.relation_id +and t1.user_id=t2.user_id +and t1.ds=t2.ds +where t1.game_channel_id='${channel_id}' AND t1.game_identity='${game_identity}' AND t1.game_platform_id='${platform_id}' +group by t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,t1.ds +ON CONFLICT (type,type_name,game_channel_id,game_identity,game_platform_id,ds) DO +UPDATE SET active_user_cut = EXCLUDED.active_user_cut,play_user_cut = EXCLUDED.play_user_cut,order_amount = EXCLUDED.order_amount +,money = EXCLUDED.money +; diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..f1cdb6f0036e6048070d031753b7145c661a2b0e GIT binary patch literal 94 zcmXAgF%CdL00gI&c!~?BQYidD;sv-15>uf7A1y4 d7;;h)4laVsU2npYF|e@F%MY=za;sSV#Rp{54}t&y literal 0 HcmV?d00001 diff --git a/sql/ads/data_screen.sql b/sql/ads/data_screen.sql new file mode 100644 index 0000000..53c81bb --- /dev/null +++ b/sql/ads/data_screen.sql @@ -0,0 +1,34 @@ +-- 数据概览 +INSERT INTO ads.data_screen(relation_id,prodid,ds,ds_type,ds_name,order_amount,money,processed_amount,"cost") +SELECT t1.relation_id + ,t1.product_id prodid + ,t.ds + ,t.ds_type + ,t.ds_name + ,SUM(t.order_amount) order_amount + ,SUM(t2.money) money + ,0 processed_amount + ,SUM(COALESCE(t3.cost,0)) AS "cost" +FROM dwd.order_df t +INNER JOIN dim.game_product_relation t1 +ON t.game_channel_id = t1.game_channel_id +AND t.game_identity = t1.game_identity +AND t.game_platform_id = t1.game_platform_id +INNER JOIN dwd.revenue_df t2 on t1.relation_id=t2.relation_id +AND t.ds=t2.ds +and t.ds_type=t2.ds_type +left join "ads"."ad_data" t3 +on t1.relation_id=t3.relation_id +and t.ds=t3.ds +and t.ds_type=t3.ds_type + +GROUP BY t1.relation_id + ,t1.product_id + ,t.ds + ,t.ds_type + ,t.ds_name +ON CONFLICT (relation_id,ds,ds_type) DO +UPDATE SET prodid = EXCLUDED.prodid,order_amount = EXCLUDED.order_amount,money = EXCLUDED.money +,processed_amount = EXCLUDED.processed_amount,"cost" = EXCLUDED."cost" +; + diff --git a/sql/ads/game_data.sql b/sql/ads/game_data.sql new file mode 100644 index 0000000..e4fa54e --- /dev/null +++ b/sql/ads/game_data.sql @@ -0,0 +1,62 @@ +-- 汇总游戏数据 + + +INSERT INTO ads.game_data(relation_id,prodid,ds,ds_type,ds_name,active_user_cut,play_user_cut,order_amount,new_active_user_cut, + new_play_user_cut,new_order_amount) + +SELECT + relation_id, + prodid, + ds, + ds_type, + ds_name, + active_user_cut, + play_user_cut, + order_amount, + new_active_user_cut, + new_play_user_cut, + new_order_amount +FROM ( + SELECT t1.relation_id, + t1.product_id AS prodid, + t.ds, + 'd' AS ds_type, + '天' AS ds_name, + SUM(CASE WHEN t.type = 'hyzh' THEN active_user_cut ELSE 0 END) AS active_user_cut, + SUM(CASE WHEN t.type = 'hyzh' THEN play_user_cut ELSE 0 END) AS play_user_cut, + SUM(CASE WHEN t.type = 'hyzh' THEN order_amount ELSE 0 END) AS order_amount, + SUM(CASE WHEN t.type = 'xzh' THEN active_user_cut ELSE 0 END) AS new_active_user_cut, + SUM(CASE WHEN t.type = 'xzh' THEN play_user_cut ELSE 0 END) AS new_play_user_cut, + SUM(CASE WHEN t.type = 'xzh' THEN order_amount ELSE 0 END) AS new_order_amount + FROM dws.game_user_data t + INNER JOIN dim.game_product_relation t1 ON + t.game_channel_id = t1.game_channel_id AND + t.game_identity = t1.game_identity AND + t.game_platform_id = t1.game_platform_id + where t.ds<='${bizdate}' + GROUP BY t1.relation_id, t1.product_id, t.ds + + UNION ALL + + SELECT t1.relation_id, + t1.product_id AS prodid, + t.ds, + 'm' AS ds_type, + '月' AS ds_name, + SUM(CASE WHEN t.type = 'hyzh' THEN active_user_cut ELSE 0 END) AS active_user_cut, + SUM(CASE WHEN t.type = 'hyzh' THEN play_user_cut ELSE 0 END) AS play_user_cut, + SUM(CASE WHEN t.type = 'hyzh' THEN order_amount ELSE 0 END) AS order_amount, + SUM(CASE WHEN t.type = 'xzh' THEN active_user_cut ELSE 0 END) AS new_active_user_cut, + SUM(CASE WHEN t.type = 'xzh' THEN play_user_cut ELSE 0 END) AS new_play_user_cut, + SUM(CASE WHEN t.type = 'xzh' THEN order_amount ELSE 0 END) AS new_order_amount + FROM dws.game_user_data_month t + INNER JOIN dim.game_product_relation t1 ON + t.game_channel_id = t1.game_channel_id AND + t.game_identity = t1.game_identity AND + t.game_platform_id = t1.game_platform_id + GROUP BY t1.relation_id, t1.product_id, t.ds +) tt + +ON CONFLICT (relation_id,ds,ds_type) DO UPDATE SET prodid=EXCLUDED.prodid,ds_name = EXCLUDED.ds_name, active_user_cut = EXCLUDED.active_user_cut ,play_user_cut = EXCLUDED.play_user_cut + ,order_amount = EXCLUDED.order_amount,new_active_user_cut = EXCLUDED.new_active_user_cut,new_play_user_cut = EXCLUDED.new_play_user_cut,new_order_amount = EXCLUDED.new_order_amount +; \ No newline at end of file diff --git a/sql/ads/order_ltv.sql b/sql/ads/order_ltv.sql new file mode 100644 index 0000000..d8ddecb --- /dev/null +++ b/sql/ads/order_ltv.sql @@ -0,0 +1,41 @@ +-- 处理逻辑说明 +-- 1.first_user是用户首次登陆时间 和ods.order 订单表 +-- 2.用户首次登陆去关联订单表获取用户的再那一天下单多少钱 + +INSERT INTO ads.order_ltv ( relation_id,prodid,ds,num,order_amount,ltv0,ltv1,ltv2,ltv3,ltv4,ltv5,ltv6,ltv7,ltv14,ltv30,ltv60,ltv90,ltv150,ltv300) +SELECT + t1.relation_id, + t1.prodid, + t1.ds, + COUNT(DISTINCT t1.user_id) AS num, + COALESCE(SUM(pay_amount),0) order_amount, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 0 THEN pay_amount ELSE 0 END) AS ltv0, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 1 THEN pay_amount ELSE 0 END) AS ltv1, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 2 THEN pay_amount ELSE 0 END) AS ltv2, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 3 THEN pay_amount ELSE 0 END) AS ltv3, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 4 THEN pay_amount ELSE 0 END) AS ltv4, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 5 THEN pay_amount ELSE 0 END) AS ltv5, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 6 THEN pay_amount ELSE 0 END) AS ltv6, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 7 THEN pay_amount ELSE 0 END) AS ltv7, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 14 THEN pay_amount ELSE 0 END) AS ltv14, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 30 THEN pay_amount ELSE 0 END) AS ltv30, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 60 THEN pay_amount ELSE 0 END) AS ltv60, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 90 THEN pay_amount ELSE 0 END) AS ltv90, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 150 THEN pay_amount ELSE 0 END) AS ltv150, + SUM(CASE WHEN (t2.ds_date-t1.ds_date) <= 300 THEN pay_amount ELSE 0 END) AS ltv300 +FROM + dim.first_user t1 +LEFT JOIN + "dwd"."order_user_df" t2 +ON +t1.relation_id = t2.relation_id +AND t1.user_id = t2.user_id +AND t2.ds_date-t1.ds_date IN (1,2,3,4,5,6,7,14,30,60,90,150,300) +where t1.ds>='${300_days_later}' +GROUP BY + t1.relation_id, t1.prodid, t1.ds +ON CONFLICT (relation_id,prodid,ds) +DO UPDATE SET num = EXCLUDED.num,order_amount = EXCLUDED.order_amount,ltv0 = EXCLUDED.ltv0,ltv1 = EXCLUDED.ltv1,ltv2 = EXCLUDED.ltv2,ltv3 = EXCLUDED.ltv3,ltv4 = EXCLUDED.ltv4,ltv5 = EXCLUDED.ltv5,ltv6 = EXCLUDED.ltv6,ltv7 = EXCLUDED.ltv7 + ,ltv14 = EXCLUDED.ltv14,ltv30 = EXCLUDED.ltv30,ltv60 = EXCLUDED.ltv60,ltv90 = EXCLUDED.ltv90,ltv150 = EXCLUDED.ltv150,ltv300 = EXCLUDED.ltv300 + +; \ No newline at end of file diff --git a/sql/ads/order_retention.sql b/sql/ads/order_retention.sql new file mode 100644 index 0000000..1207e03 --- /dev/null +++ b/sql/ads/order_retention.sql @@ -0,0 +1,42 @@ +-- 处理逻辑说明 +-- 1.用到了first_user 和 lately_order_user 这两个表来计算ltv +-- first_user是用户首次登陆时间 +-- lately_user是用户最后一次登陆时间 +-- 2.用户首次登陆去关联用户最后一次登陆时间他们的时间差就是用户的流失时间 + +INSERT INTO ads.order_retention ( relation_id,prodid,ds,num,ltv1,ltv2,ltv3,ltv4,ltv5,ltv6,ltv7,ltv14,ltv30,ltv60,ltv90,ltv150,ltv300) +select + t1.relation_id + ,t1.prodid + ,t1.ds + ,count(distinct t1.user_id) AS num + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 1 then 1 ELSE 0 END) AS ltv1 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 2 then 1 ELSE 0 END) AS ltv2 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 3 then 1 ELSE 0 END) AS ltv3 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 4 then 1 ELSE 0 END) AS ltv4 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 5 then 1 ELSE 0 END) AS ltv5 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 6 then 1 ELSE 0 END) AS ltv6 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 7 then 1 ELSE 0 END) AS ltv7 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 14 then 1 ELSE 0 END) AS ltv14 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 30 then 1 ELSE 0 END) AS ltv30 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 60 then 1 ELSE 0 END) AS ltv60 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 90 then 1 ELSE 0 END) AS ltv90 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 150 then 1 ELSE 0 END) AS ltv150 + ,SUM(CASE WHEN t2.ds_date-t1.ds_date = 300 then 1 ELSE 0 END) AS ltv300 + +from + "dwd"."order_user_df" t1 +left join + dwd.active_df t2 +on t1.relation_id=t2.relation_id +and t1.user_id=t2.user_id +AND t2.ds_date-t1.ds_date IN (1,2,3,4,5,6,7,14,30,60,90,150,300) +-- AND t2.ds_date >= t1.ds_date - 300 +where t1.ds>='${300_days_later}' +group by t1.relation_id + ,t1.prodid + ,t1.ds +ON CONFLICT (relation_id,prodid,ds) +DO UPDATE SET num = EXCLUDED.num,ltv1 = EXCLUDED.ltv1,ltv2 = EXCLUDED.ltv2,ltv3 = EXCLUDED.ltv3,ltv4 = EXCLUDED.ltv4,ltv5 = EXCLUDED.ltv5,ltv6 = EXCLUDED.ltv6,ltv7 = EXCLUDED.ltv7,ltv14 = EXCLUDED.ltv14,ltv30 = EXCLUDED.ltv30,ltv60 = EXCLUDED.ltv60 + ,ltv90 = EXCLUDED.ltv90,ltv150 = EXCLUDED.ltv150,ltv300 = EXCLUDED.ltv300 +; \ No newline at end of file diff --git a/sql/ads/revenue_data.sql b/sql/ads/revenue_data.sql new file mode 100644 index 0000000..7c07a01 --- /dev/null +++ b/sql/ads/revenue_data.sql @@ -0,0 +1,64 @@ +-- 收入数据汇总 +INSERT INTO ads.revenue_data ( relation_id,prodid,ds,ds_type,ds_name,channel,channel_type_name,merchant_name,platform_name,order_amount,money,active_num) +SELECT t1.relation_id + ,t1.product_id prodid + ,t.ds + ,t.ds_type + ,t.ds_name + ,t1.channel_id channel + ,t1.channel_name channel_type_name + ,t1.merchant_name merchant_name + ,t1.platform_name platform_name + ,SUM(COALESCE(tt4.order_amount,0)) order_amount + ,SUM(COALESCE(tt5.money,0)) money + ,SUM(COALESCE(tt3.active_num,0)) active_num +FROM ( + SELECT game_channel_id + ,game_identity + ,game_platform_id + ,ds + ,ds_type + ,ds_name + FROM dwd.active_account_df + UNION + SELECT game_channel_id + ,game_identity + ,game_platform_id + ,ds + ,ds_type + ,ds_name + FROM dwd.order_df + ) t +INNER JOIN dim.game_product_relation t1 +ON t.game_channel_id = t1.game_channel_id +AND t.game_identity = t1.game_identity +AND t.game_platform_id = t1.game_platform_id +LEFT JOIN dwd.active_account_df tt3 +ON t.game_channel_id = tt3.game_channel_id +AND t.game_identity = tt3.game_identity +AND t.game_platform_id = tt3.game_platform_id +AND t.ds = tt3.ds +AND t.ds_type = tt3.ds_type +LEFT JOIN dwd.order_df tt4 +ON t.game_channel_id = tt4.game_channel_id +AND t.game_identity = tt4.game_identity +AND t.game_platform_id = tt4.game_platform_id +AND t.ds = tt4.ds +AND t.ds_type = tt4.ds_type +left JOIN dwd.revenue_df tt5 on t1.relation_id=tt5.relation_id +AND tt4.ds=tt5.ds +and tt4.ds_type=tt5.ds_type +group by t1.relation_id + ,t1.product_id + ,t.ds + ,t.ds_type + ,t.ds_name + ,t1.channel_id + ,t1.channel_name + ,t1.merchant_name + ,t1.platform_name + +ON CONFLICT (relation_id,channel,merchant_name,platform_name,ds,ds_type) +DO UPDATE SET prodid = EXCLUDED.prodid,order_amount = EXCLUDED.order_amount,money = EXCLUDED.money, +active_num = EXCLUDED.active_num +; diff --git a/sql/ads/user_retention.sql b/sql/ads/user_retention.sql new file mode 100644 index 0000000..4be5186 --- /dev/null +++ b/sql/ads/user_retention.sql @@ -0,0 +1,41 @@ +-- 处理逻辑说明 +-- 1.用到了first_user 和 lately_user 这两个表来计算ltv +-- first_user是用户首次登陆时间 +-- lately_user是用户最后一次登陆时间 +-- 2.用户首次登陆去关联用户最后一次登陆时间他们的时间差就是用户的流失时间 + +INSERT INTO ads.user_retention ( relation_id,prodid,ds,num,ltv1,ltv2,ltv3,ltv4,ltv5,ltv6,ltv7,ltv14,ltv30,ltv60,ltv90,ltv150,ltv300) +select + t1.relation_id + ,t1.prodid + ,t1.ds + ,count(distinct t1.user_id) AS num + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 1 then 1 ELSE 0 END) AS ltv1 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 2 then 1 ELSE 0 END) AS ltv2 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 3 then 1 ELSE 0 END) AS ltv3 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 4 then 1 ELSE 0 END) AS ltv4 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 5 then 1 ELSE 0 END) AS ltv5 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 6 then 1 ELSE 0 END) AS ltv6 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 7 then 1 ELSE 0 END) AS ltv7 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 14 then 1 ELSE 0 END) AS ltv14 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 30 then 1 ELSE 0 END) AS ltv30 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 60 then 1 ELSE 0 END) AS ltv60 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 90 then 1 ELSE 0 END) AS ltv90 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 150 then 1 ELSE 0 END) AS ltv150 + ,SUM(CASE WHEN (t2.ds_date-t1.ds_date) = 300 then 1 ELSE 0 END) AS ltv300 + + from +dim.first_user t1 +left join +dwd.active_df t2 +on t1.relation_id=t2.relation_id +and t1.user_id=t2.user_id +AND t2.ds_date-t1.ds_date IN (1,2,3,4,5,6,7,14,30,60,90,150,300) +where t1.ds>='${300_days_later}' +group by t1.relation_id + ,t1.prodid + ,t1.ds +ON CONFLICT (relation_id,prodid,ds) +DO UPDATE SET num = EXCLUDED.num,ltv1 = EXCLUDED.ltv1,ltv2 = EXCLUDED.ltv2,ltv3 = EXCLUDED.ltv3,ltv4 = EXCLUDED.ltv4,ltv5 = EXCLUDED.ltv5,ltv6 = EXCLUDED.ltv6,ltv7 = EXCLUDED.ltv7,ltv14 = EXCLUDED.ltv14,ltv30 = EXCLUDED.ltv30,ltv60 = EXCLUDED.ltv60 + ,ltv90 = EXCLUDED.ltv90,ltv150 = EXCLUDED.ltv150,ltv300 = EXCLUDED.ltv300 +; \ No newline at end of file diff --git a/sql/dim/first_user.sql b/sql/dim/first_user.sql new file mode 100644 index 0000000..a7f6585 --- /dev/null +++ b/sql/dim/first_user.sql @@ -0,0 +1,41 @@ +-- 取登陆最早时间 逻辑:如果之前没出现过,那插入这条记录做最早用户登陆时间 +-- 首次写入数据sql + +INSERT INTO dim.first_user(game_channel_id,game_identity,game_platform_id,relation_id,prodid,user_id,ds,ds_date) +WITH active_account_tmp AS ( + SELECT distinct + t1.channel_id, + t1.game_identity, + t1.platform_id, + t1.user_id, + '${bizdate}' AS ds -- 直接赋值替代min(ds) + FROM ods.active_account_list t1 + WHERE t1.ds = '${bizdate}' + AND NOT EXISTS ( + SELECT 1 + FROM dim.first_user t3 + WHERE t1.channel_id = t3.game_channel_id + AND t1.game_identity = t3.game_identity + AND t1.platform_id = t3.game_platform_id + AND t1.user_id = t3.user_id + ) +) +SELECT t2.game_channel_id + ,t2.game_identity + ,t2.game_platform_id + ,t2.relation_id + ,t2.product_id prodid + ,t1.user_id + ,t1.ds ds + ,'${biz-date}' ds_date +FROM active_account_tmp t1 +inner join +dim.game_product_relation t2 +ON t1.channel_id = t2.game_channel_id +AND t1.game_identity = t2.game_identity +AND t1.platform_id = t2.game_platform_id + +ON CONFLICT (relation_id,user_id) +DO UPDATE SET game_channel_id = EXCLUDED.game_channel_id,game_identity = EXCLUDED.game_identity ,game_platform_id = EXCLUDED.game_platform_id +,prodid = EXCLUDED.prodid,user_id = EXCLUDED.user_id,ds = EXCLUDED.ds,ds_date = EXCLUDED.ds_date +; \ No newline at end of file diff --git a/sql/dim/game_product_relation.sql b/sql/dim/game_product_relation.sql new file mode 100644 index 0000000..49ac9d3 --- /dev/null +++ b/sql/dim/game_product_relation.sql @@ -0,0 +1,50 @@ +-- 维度扩展 +INSERT INTO dim.game_product_relation (relation_id, game_identity, game_platform_id, game_channel_id, product_id, product_name, project_name, studio_id, studio_name, entity_id, entity_name, system_id, system_name, platform_id, platform_name, merchant_id, merchant_name, channel_id, channel_name, divide_type, divide_scale, divide_scale_step) +SELECT + tt1.relation_id, + tt1.game_identity, + tt1.platform_id game_platform_id, + tt1.channel_id game_channel_id, + tt2.product_id, + tt2.product_name, + tt2.project_id project_name, + tt2.studio_id, + tt2.studio_name, + tt2.entity_id, + tt2.entity_name, + tt2.system_id, + tt2.system_name, + tt2.platform_id, + tt2.platform_name, + tt2.merchant_id, + tt2.merchant_name, + tt2.channel_id, + tt2.channel_name, + tt2.divide_type, + tt2.divide_scale, + tt2.divide_scale_step +FROM + dws.game_external_relation tt1 + INNER JOIN dws.game_product_relation tt2 ON tt1.relation_id = tt2.relation_id +ON CONFLICT (game_identity,game_platform_id,game_channel_id) + DO UPDATE SET + relation_id = EXCLUDED.relation_id, + product_id = EXCLUDED.product_id, + product_name = EXCLUDED.product_name, + project_name = EXCLUDED.project_name, + studio_id = EXCLUDED.studio_id, + studio_name = EXCLUDED.studio_name, + entity_id = EXCLUDED.entity_id, + entity_name = EXCLUDED.entity_name, + system_id = EXCLUDED.system_id, + system_name = EXCLUDED.system_name, + platform_id = EXCLUDED.platform_id, + platform_name = EXCLUDED.platform_name, + merchant_id = EXCLUDED.merchant_id, + merchant_name = EXCLUDED.merchant_name, + channel_id = EXCLUDED.channel_id, + channel_name = EXCLUDED.channel_name, + divide_type = EXCLUDED.divide_type, + divide_scale = EXCLUDED.divide_scale, + divide_scale_step = EXCLUDED.divide_scale_step +; diff --git a/sql/dwd_after/revenue_df.sql b/sql/dwd_after/revenue_df.sql new file mode 100644 index 0000000..670cee3 --- /dev/null +++ b/sql/dwd_after/revenue_df.sql @@ -0,0 +1,148 @@ +-- 由于收入是按照不同的统计方式选择不同的计算方式 +-- 1.divide_type=1: +-- order_amount * divide_scale +-- 2.divide_type=2: +-- 阶梯(累计): +-- 0->10% 50->20% 100->%30 +-- 第一个月20(流水): 20->10% +-- 第二个月90(流水): 30->10% 50->20% 10->30% +-- 3.divide_type=3: +-- 阶梯(按月): +-- 0->10% 50->20% 100->%30 +-- 第一个月20(流水): 20->10% +-- 第二个月90(流水): 50->10% 40->20% +WITH money_divide_scale_tmp AS ( +SELECT + relation_id, + money, + divide_scale, + COALESCE(LEAD(money) OVER (PARTITION BY relation_id ORDER BY money), 999999999999.99) AS next_money + FROM ( + + SELECT + relation_id, + cast((elem->>'money')::text as numeric) money, + cast((elem->>'divide_scale')::text as numeric) divide_scale + FROM ( + SELECT + relation_id, + jsonb_array_elements(divide_scale_step) elem + FROM + dim.game_product_relation + WHERE + divide_type != 1 + ) tt + +) ttt +WHERE +ttt.money IS NOT NULL +AND ttt.divide_scale IS NOT NULL +), +revenue_df_tmp AS +( +-- divide_type=1 money= order_amount * divide_scale +SELECT + t2.relation_id, + SUM(t1.order_amount * t2.divide_scale) + money, + t1.ds, + t1.ds_type, + t1.ds_name +FROM + dwd.order_df t1 + INNER JOIN dim.game_product_relation t2 ON t1.game_channel_id = t2.game_channel_id + AND t1.game_identity = t2.game_identity + AND t1.game_platform_id = t2.game_platform_id + AND t2.divide_type = 1 + where t1.ds_type = 'd' +GROUP BY + t2.relation_id, + t1.ds, + t1.ds_type, + t1.ds_name +UNION ALL +-- 阶梯(累计): +-- 0->10% 50->20% 100->%30 +-- 第一个月20(流水): 20->10% +-- 第二个月90(流水): 30->10% 50->20% 10->30% +SELECT + tt1.relation_id, + tt1.order_amount * tt2.divide_scale money, + tt1.ds, + tt1.ds_type, + tt1.ds_name +FROM ( + SELECT + t2.relation_id, + t1.ds, + t1.ds_type, + t1.ds_name, + t1.order_amount, + SUM(t1.order_amount) OVER (PARTITION BY t2.relation_id ORDER BY ds) AS sum_order_amount + FROM + dwd.order_df t1 + INNER JOIN dim.game_product_relation t2 ON t1.game_channel_id = t2.game_channel_id + AND t1.game_identity = t2.game_identity + AND t1.game_platform_id = t2.game_platform_id + AND t2.divide_type = 2 + WHERE + t1.ds_type = 'd') tt1 + INNER JOIN money_divide_scale_tmp tt2 ON tt1.relation_id = tt2.relation_id + AND tt1.sum_order_amount >= tt2.money + AND tt1.sum_order_amount <= tt2.next_money +-- 阶梯(按月): +-- 0->10% 50->20% 100->%30 +-- 第一个月20(流水): 20->10% +-- 第二个月90(流水): 50->10% 40->20% +-- divide_type=3 +UNION ALL +SELECT + tt1.relation_id, + tt1.order_amount * tt2.divide_scale money, + tt1.ds, + tt1.ds_type, + tt1.ds_name +FROM ( + SELECT + t2.relation_id, + t1.ds, + t1.ds_type, + t1.ds_name, + t1.order_amount, + SUM(t1.order_amount) OVER (PARTITION BY t2.relation_id, SUBSTR(t1.ds, 1, 6) ORDER BY ds) AS sum_order_amount + FROM + dwd.order_df t1 + INNER JOIN dim.game_product_relation t2 ON t1.game_channel_id = t2.game_channel_id + AND t1.game_identity = t2.game_identity + AND t1.game_platform_id = t2.game_platform_id + AND t2.divide_type = 3 +WHERE + t1.ds_type = 'd') tt1 +INNER JOIN money_divide_scale_tmp tt2 ON tt1.relation_id = tt2.relation_id + AND tt1.sum_order_amount >= tt2.money + AND tt1.sum_order_amount <= tt2.next_money + +) +INSERT INTO dwd.revenue_df(relation_id,money,ds,ds_type,ds_name) +SELECT relation_id + ,sum(money) money + ,ds + ,ds_type + ,ds_name +FROM revenue_df_tmp +GROUP BY relation_id + ,ds + ,ds_type + ,ds_name +union all +SELECT relation_id + ,sum(money) money + ,SUBSTRING(ds, 1, 6) || '00' ds + ,'m' ds_type + ,'月' ds_name +FROM revenue_df_tmp +GROUP BY relation_id + ,SUBSTRING(ds, 1, 6) || '00' +ON CONFLICT (relation_id,ds,ds_type) +DO UPDATE SET money = EXCLUDED.money,ds_name = EXCLUDED.ds_name +; \ No newline at end of file diff --git a/sql/dwd_before/active_account_df.sql b/sql/dwd_before/active_account_df.sql new file mode 100644 index 0000000..5c088f0 --- /dev/null +++ b/sql/dwd_before/active_account_df.sql @@ -0,0 +1,31 @@ +-- 根据游戏汇总活跃账号数据 天日增 +INSERT INTO dwd.active_account_df(game_channel_id,game_identity,game_platform_id,active_num,ds,ds_type,ds_name) +SELECT channel_id game_channel_id + ,game_identity + ,platform_id game_platform_id + ,count(distinct user_id) active_num + ,ds ds + ,'d' ds_type + ,'天' ds_name +FROM ods.active_account_list +WHERE ds = '${bizdate}' +GROUP BY ds + ,channel_id + ,game_identity + ,platform_id +UNION +SELECT channel_id game_channel_id + ,game_identity + ,platform_id game_platform_id + ,count(distinct user_id) active_num + ,'${t_month}' AS ds + ,'m' ds_type + ,'月' ds_name +FROM ods.active_account_list +WHERE ds >= '${t_month}' +AND ds <= '${bizdate}' +GROUP BY channel_id + ,game_identity + ,platform_id +ON CONFLICT (game_channel_id,game_identity,game_platform_id,ds,ds_type) DO UPDATE SET active_num = EXCLUDED.active_num +; \ No newline at end of file diff --git a/sql/dwd_before/active_df.sql b/sql/dwd_before/active_df.sql new file mode 100644 index 0000000..6e2cf20 --- /dev/null +++ b/sql/dwd_before/active_df.sql @@ -0,0 +1,17 @@ +INSERT INTO dwd.active_df(relation_id,user_id,ds,ds_date) +SELECT t2.relation_id + ,t1.user_id + ,t1.ds + ,'${biz-date}' ds_date +FROM pubdata_center.ods.active_account_list t1 +INNER JOIN dim.game_product_relation t2 +ON t1.channel_id = t2.game_channel_id +AND t1.game_identity = t2.game_identity +AND t1.platform_id = t2.game_platform_id +where ds='${bizdate}' +GROUP BY t2.relation_id + ,t1.user_id + ,t1.ds +ON CONFLICT (relation_id, user_id, ds) + DO UPDATE SET ds_date = EXCLUDED.ds_date +; \ No newline at end of file diff --git a/sql/dwd_before/order_df.sql b/sql/dwd_before/order_df.sql new file mode 100644 index 0000000..b380347 --- /dev/null +++ b/sql/dwd_before/order_df.sql @@ -0,0 +1,97 @@ +-- 根据游戏汇总订单数据 +-- 增量逻辑 由于数据会存在延迟推送或者新游戏接入数据补数据 +-- 天:取当天推送的订单数据里面看订单日期有哪些天的订单把那几天的数据拉出来重新汇总统计 +-- 月:取当天推送的订单数据里面看订单日期有哪些月的订单把那几天的数据拉出来重新汇总统计 +WITH data_tmp AS +( + SELECT t2.relation_id + ,t2.product_id prodid + ,t2.game_channel_id + ,t2.game_identity + ,t2.game_platform_id + ,t1.ds + FROM ods.ORDER t1 + INNER JOIN dim.game_product_relation t2 + ON t1.channel_id = t2.game_channel_id + AND t1.game_identity = t2.game_identity + AND t1.platform_id = t2.game_platform_id + WHERE t1.receive_time BETWEEN '${biz-date}' AND '${intra-day}' + GROUP BY t2.relation_id + ,t2.product_id + ,t2.game_channel_id + ,t2.game_identity + ,t2.game_platform_id + ,t1.ds +) +INSERT INTO dwd.order_df(game_channel_id,game_identity,game_platform_id,order_amount,pay_amount,pay_user_num,ds,ds_type,ds_name) +SELECT t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,SUM(t2.amount) order_amount + ,SUM(t2.pay_amount) pay_amount + ,count(distinct t2.user_id) pay_user_num + ,t2.ds ds + ,'d' ds_type + ,'天' ds_name +FROM data_tmp t1 +INNER JOIN ods.ORDER t2 +ON t1.game_channel_id = t2.channel_id +AND t1.game_identity = t2.game_identity +AND t1.game_platform_id = t2.platform_id +and t1.ds=t2.ds +GROUP BY t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,t2.ds +ON CONFLICT (game_channel_id,game_identity,game_platform_id,ds,ds_type) +DO UPDATE SET order_amount = EXCLUDED.order_amount,pay_amount = EXCLUDED.pay_amount,pay_user_num = EXCLUDED.pay_user_num +; + + +WITH data_tmp AS +( + SELECT t2.relation_id + ,t2.product_id prodid + ,t2.game_channel_id + ,t2.game_identity + ,t2.game_platform_id + ,substring(t1.ds,1,6) || '00' min_month + ,substring(t1.ds,1,6) || '32' max_month + FROM ods.ORDER t1 + INNER JOIN dim.game_product_relation t2 + ON t1.channel_id = t2.game_channel_id + AND t1.game_identity = t2.game_identity + AND t1.platform_id = t2.game_platform_id + WHERE t1.receive_time BETWEEN '${biz-date}' AND '${intra-day}' + GROUP BY t2.relation_id + ,t2.product_id + ,t2.game_channel_id + ,t2.game_identity + ,t2.game_platform_id + ,substring(t1.ds,1,6) || '00' + ,substring(t1.ds,1,6) || '32' +) +INSERT INTO dwd.order_df(game_channel_id,game_identity,game_platform_id,order_amount,pay_amount,pay_user_num,ds,ds_type,ds_name) +SELECT t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,SUM(t2.amount) order_amount + ,SUM(t2.pay_amount) pay_amount + ,count(distinct t2.user_id) pay_user_num + ,substring(t2.ds,1,6) || '00' ds + ,'m' ds_type + ,'月' ds_name +FROM data_tmp t1 +INNER JOIN ods.ORDER t2 +ON t1.game_channel_id = t2.channel_id +AND t1.game_identity = t2.game_identity +AND t1.game_platform_id = t2.platform_id +AND t1.min_month t2.ds +GROUP BY t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,substring(t2.ds,1,6) || '00' +ON CONFLICT (game_channel_id,game_identity,game_platform_id,ds,ds_type) +DO UPDATE SET order_amount = EXCLUDED.order_amount,pay_amount = EXCLUDED.pay_amount,pay_user_num = EXCLUDED.pay_user_num +; diff --git a/sql/dwd_before/order_user_df.sql b/sql/dwd_before/order_user_df.sql new file mode 100644 index 0000000..bedd45b --- /dev/null +++ b/sql/dwd_before/order_user_df.sql @@ -0,0 +1,49 @@ +-- 取当天推送的订单数据里面看订单日期 重新汇总统计 +WITH data_tmp AS +( + SELECT t2.relation_id + ,t2.product_id prodid + ,t2.game_channel_id + ,t2.game_identity + ,t2.game_platform_id + ,t1.ds + FROM ods.ORDER t1 + INNER JOIN dim.game_product_relation t2 + ON t1.channel_id = t2.game_channel_id + AND t1.game_identity = t2.game_identity + AND t1.platform_id = t2.game_platform_id + WHERE t1.receive_time BETWEEN '${biz-date}' AND '${intra-day}' + GROUP BY t2.relation_id + ,t2.product_id + ,t2.game_channel_id + ,t2.game_identity + ,t2.game_platform_id + ,t1.ds +) +INSERT INTO "dwd"."order_user_df" (relation_id,prodid,game_channel_id,game_identity,game_platform_id,user_id,ds,pay_amount,ds_date) +SELECT t1.relation_id + ,t1.prodid + ,t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,t2.user_id + ,t1.ds ds + ,sum(pay_amount) pay_amount + ,'${biz-date}' ds_date +FROM data_tmp t1 +INNER JOIN ods.ORDER t2 +ON t1.game_channel_id = t2.channel_id +AND t1.game_identity = t2.game_identity +AND t1.game_platform_id = t2.platform_id +and t1.ds=t2.ds +group by t1.relation_id + ,t1.prodid + ,t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,t2.user_id + ,t1.ds +ON CONFLICT (relation_id, user_id, ds) + DO UPDATE SET prodid = EXCLUDED.prodid,game_channel_id = EXCLUDED.game_channel_id,game_identity = EXCLUDED.game_identity,game_platform_id = EXCLUDED.game_platform_id, + ds = EXCLUDED.ds,user_id = EXCLUDED.user_id,pay_amount = EXCLUDED.pay_amount,ds_date = EXCLUDED.ds_date +; \ No newline at end of file diff --git a/sql/dws_after/game_user_data_month.sql b/sql/dws_after/game_user_data_month.sql new file mode 100644 index 0000000..732259d --- /dev/null +++ b/sql/dws_after/game_user_data_month.sql @@ -0,0 +1,65 @@ +-- 按月统计游戏的活跃账号相关数据 +INSERT INTO dws.game_user_data_month(type,type_name,game_channel_id,game_identity,game_platform_id,ds,active_user_cut,play_user_cut,order_amount,money) + +select + 'hyzh' type + ,'活跃账号' type_name + ,COALESCE(t.game_channel_id,t1.game_channel_id) game_channel_id + , COALESCE(t.game_identity,t1.game_identity) game_identity + , COALESCE(t.game_platform_id,t1.game_platform_id) game_platform_id + , COALESCE(t.ds,t1.ds) ds + , COALESCE(sum(t.active_num), 0) active_user_cut + , COALESCE(sum(t1.pay_user_num), 0) play_user_cut + , COALESCE(sum(t1.order_amount), 0) order_amount + , COALESCE(SUM(t1.pay_amount), 0) money + +FROM dwd.active_account_df t +FULL OUTER JOIN dwd.order_df t1 +on t.game_channel_id = t1.game_channel_id +and t.game_identity = t1.game_identity +and t.game_platform_id = t1.game_platform_id +and t.ds_type = t1.ds_type +and t.ds = t1.ds +where t.ds_type = 'm' +group by COALESCE(t.game_channel_id,t1.game_channel_id) + , COALESCE(t.game_identity,t1.game_identity) + , COALESCE(t.game_platform_id,t1.game_platform_id) + , COALESCE(t.ds,t1.ds) + + +ON CONFLICT (type,type_name,game_channel_id,game_identity,game_platform_id,ds) DO +UPDATE SET active_user_cut = EXCLUDED.active_user_cut,play_user_cut = EXCLUDED.play_user_cut,order_amount = EXCLUDED.order_amount +,money = EXCLUDED.money +; + + +-- 按月增量统计游戏的新账号相关数据 + + +INSERT INTO dws.game_user_data_month(type,type_name,game_channel_id,game_identity,game_platform_id,ds,active_user_cut,play_user_cut,order_amount,money) + + +select + 'xzh' type + ,'新账号' type_name + ,t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,substring(t1.ds,1,6) || '00' ds + ,COALESCE(sum(t1.active_user_cut),0) active_user_cut + ,COALESCE(sum(t1.play_user_cut),0) play_user_cut + ,COALESCE(sum(t1.order_amount),0) order_amount + ,COALESCE(sum(t1.money),0) money +from dws.game_user_data t1 +group by t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,substring(t1.ds,1,6) || '00' + + +ON CONFLICT (type,type_name,game_channel_id,game_identity,game_platform_id,ds) DO +UPDATE SET active_user_cut = EXCLUDED.active_user_cut,play_user_cut = EXCLUDED.play_user_cut,order_amount = EXCLUDED.order_amount +,money = EXCLUDED.money +; + + diff --git a/sql/dws_before/game_user_data.sql b/sql/dws_before/game_user_data.sql new file mode 100644 index 0000000..20f22e4 --- /dev/null +++ b/sql/dws_before/game_user_data.sql @@ -0,0 +1,65 @@ +-- 统计游戏的活跃账号相关数据 +INSERT INTO dws.game_user_data(type,type_name,game_channel_id,game_identity,game_platform_id,ds,active_user_cut,play_user_cut,order_amount,money) + +select + 'hyzh' type + ,'活跃账号' type_name + ,COALESCE(t.game_channel_id,t1.game_channel_id) game_channel_id + , COALESCE(t.game_identity,t1.game_identity) game_identity + , COALESCE(t.game_platform_id,t1.game_platform_id) game_platform_id + , COALESCE(t.ds,t1.ds) ds + , COALESCE(sum(t.active_num), 0) active_user_cut + , COALESCE(sum(t1.pay_user_num), 0) play_user_cut + , COALESCE(sum(t1.order_amount), 0) order_amount + , COALESCE(SUM(t1.pay_amount), 0) money +FROM dwd.order_df t1 +FULL OUTER JOIN +dwd.active_account_df t +on t.game_channel_id = t1.game_channel_id +and t.game_identity = t1.game_identity +and t.game_platform_id = t1.game_platform_id +and t.ds_type = t1.ds_type +and t.ds = t1.ds +where t1.ds_type = 'd' +group by COALESCE(t.game_channel_id,t1.game_channel_id) + ,COALESCE(t.game_identity,t1.game_identity) + ,COALESCE(t.game_platform_id,t1.game_platform_id) + ,COALESCE(t.ds,t1.ds) + +ON CONFLICT (type,type_name,game_channel_id,game_identity,game_platform_id,ds) DO +UPDATE SET active_user_cut = EXCLUDED.active_user_cut,play_user_cut = EXCLUDED.play_user_cut,order_amount = EXCLUDED.order_amount +,money = EXCLUDED.money +; + + + +-- 增量统计游戏的新账号相关数据 + +INSERT INTO dws.game_user_data(type,type_name,game_channel_id,game_identity,game_platform_id,ds,active_user_cut,play_user_cut,order_amount,money) +select + 'xzh' type + ,'新账号' type_name + ,t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,t1.ds + ,count(distinct t1.user_id) active_user_cut + ,count(distinct t2.user_id) play_user_cut + ,COALESCE(sum(t2.pay_amount),0) order_amount + ,COALESCE(SUM(t2.pay_amount),0) money +from +dim.first_user t1 +left join +"dwd"."order_user_df" t2 +on t1.relation_id=t2.relation_id +and t1.user_id=t2.user_id +and t1.ds=t2.ds +where t1.ds='${bizdate}' +group by t1.game_channel_id + ,t1.game_identity + ,t1.game_platform_id + ,t1.ds +ON CONFLICT (type,type_name,game_channel_id,game_identity,game_platform_id,ds) DO +UPDATE SET active_user_cut = EXCLUDED.active_user_cut,play_user_cut = EXCLUDED.play_user_cut,order_amount = EXCLUDED.order_amount +,money = EXCLUDED.money +; diff --git a/sql_job.py b/sql_job.py new file mode 100644 index 0000000..2dea04d --- /dev/null +++ b/sql_job.py @@ -0,0 +1,44 @@ +from utils.read_sql_files import read_sql_files +from apscheduler.schedulers.background import BlockingScheduler +from utils.log import Log +import logging + +# 配置logging以控制日志输出 +logger = logging.getLogger('apscheduler') +# 设置logger的级别为WARNING,这样INFO级别的日志就不会被处理了 +logger.setLevel(logging.WARNING) + + +def dim(): + read_sql_files('dim') + + +def dwd(): + read_sql_files('dwd_before') + read_sql_files('dwd_after') + + +def dws(): + read_sql_files('dws_before') + read_sql_files('dws_after') + + +def ads(): + read_sql_files('ads') + + +if __name__ == '__main__': + log = Log().getlog() + log.info(f"---------job开始执行:---------") + + sch = BlockingScheduler(timezone='Asia/Shanghai') + # 每天4点执行dim + sch.add_job(dim, 'cron', hour=4, minute=0) + # 每天5点执行执行dwd + sch.add_job(dwd, 'cron', hour=5, minute=0) + # 每天6点十分执行dws + sch.add_job(dws, 'cron', hour=6, minute=10) + # 每天七点十分执行ads + sch.add_job(ads, 'cron', hour=7, minute=10) + + sch.start() diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/date_time.py b/utils/date_time.py new file mode 100644 index 0000000..84ab401 --- /dev/null +++ b/utils/date_time.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import datetime +import time + + +def str_time(): + return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + + +def bizdate(day): + # 获取当前日期 + current_date = datetime.date.today() + # 计算前一天的日期 + previous_date = current_date - datetime.timedelta(days=day) + # 格式化日期输出 + return previous_date.strftime('%Y%m%d') + +def biz_date(day): + # 获取当前日期 + current_date = datetime.date.today() + # 计算前一天的日期 + previous_date = current_date - datetime.timedelta(days=day) + # 格式化日期输出 + return previous_date.strftime('%Y-%m-%d') + +def t_month(day): + # 获取当前日期 + current_date = datetime.date.today() + # 计算前一天的日期 + previous_date = current_date - datetime.timedelta(days=day) + # 格式化日期输出 + return previous_date.strftime('%Y%m00') + + +def get_ltv_day(): + day=(1, 2, 3, 4, 5, 6, 7, 14, 30, 60, 90, 150, 300) + return ','.join(["'"+bizdate(i)+"'" for i in day]) + +def get_parameter(): + return { + '${bizdate}': bizdate(1), + '${biz-date}': biz_date(1), + '${t_month}': t_month(1), + '${intra_day}': bizdate(0), + '${intra-day}': biz_date(0), + '${30_days_later}': bizdate(30), + '${60_days_later}': bizdate(60), + '${300_days_later}': bizdate(301), + '${ltv_day}': get_ltv_day(), + } + + +if __name__ == "__main__": + # hhh = { + # '${bizdate}': bizdate(1), + # '${t_month}': t_month(1), + # '${intra_day}': bizdate(0), + # '${30_days_later}': bizdate(30), + # '${300_days_later}': bizdate(301) + # } + print(get_ltv_day()) diff --git a/utils/execute_sql.py b/utils/execute_sql.py new file mode 100644 index 0000000..0ac0fc8 --- /dev/null +++ b/utils/execute_sql.py @@ -0,0 +1,71 @@ +import psycopg2 +from utils.log import Log +from utils.loadconfig import get_pg_config +import time + +def execute_sql(sql_statement, params=None): + log = Log().getlog() + + try: + # 先停五秒钟再执行sql + time.sleep(5) + db_config = get_pg_config() + with psycopg2.connect(**db_config) as connection: + status_message = '成功' if connection.status == psycopg2.extensions.STATUS_READY else '失败' + log.info(f"数据库: {db_config['database']} 是否连接成功: {status_message}") + + with connection.cursor() as cursor: + sql_list = [stmt.strip() for stmt in sql_statement.split(';') if stmt.strip()] + + for sql_str in sql_list: + log.info(f"执行中SQL:\n{sql_str}") + start_time = time.time() # 记录开始时间 + if params is not None: + cursor.execute(sql_str, params) + else: + cursor.execute(sql_str) + + elapsed_time = time.time() - start_time # 计算耗时 + log.info(f"SQL语句耗时: {elapsed_time:.4f} 秒") # 打印总耗时 + + connection.commit() + + return '完成' + + except psycopg2.Error as e: + elapsed_time = time.time() - start_time # 计算耗时 + log.error(f"执行SQL语句时出现错误: {e}\nSQL语句: {sql_statement}\n参数: {params}\n耗时: {elapsed_time:.4f} 秒") + return '失败' + except Exception as e: + elapsed_time = time.time() - start_time # 计算耗时 + log.error(f"发生未知错误: {e}\nSQL语句: {sql_statement}\n参数: {params}\n耗时: {elapsed_time:.4f} 秒") + return '失败' + + + +def select_execute_sql(sql_statement, params=None): + log = Log().getlog() + + re='' + try: + db_config = get_pg_config() + with psycopg2.connect(**db_config) as connection: + status_message = '成功' if connection.status == psycopg2.extensions.STATUS_READY else '失败' + log.info(f"数据库: {db_config['database']} 是否连接成功: {status_message}") + # 执行sql查询并打印查询结果 + with connection.cursor() as cursor: + sql_list = [stmt.strip() for stmt in sql_statement.split(';') if stmt.strip()] + for sql_str in sql_list: + log.info(f"执行中SQL:\n{sql_str}") + if params is not None: + # 将单个元素的元组转换为逗号分隔的字符串 + cursor.execute(sql_str, params) + else: + cursor.execute(sql_str) + re=cursor.fetchall() + return re + + except psycopg2.Error as e: + return [] + except Exception as e: + return [] \ No newline at end of file diff --git a/utils/loadconfig.py b/utils/loadconfig.py new file mode 100644 index 0000000..bdaeb21 --- /dev/null +++ b/utils/loadconfig.py @@ -0,0 +1,27 @@ +from configparser import ConfigParser +import os + + +ENVIRONMENT_VARIABLE = os.getenv("ENVIRONMENT_VARIABLE", "test") +def get_path(): + current_directory = os.path.dirname(os.path.abspath(__file__)) + root_path = os.path.abspath(os.path.dirname(current_directory) + os.path.sep + ".") + return root_path + + +def get_pg_config(): + conf = ConfigParser() + conf.read(get_path() + '/config/config.ini') + return { + "database": conf[ENVIRONMENT_VARIABLE]['database'], + "user": conf[ENVIRONMENT_VARIABLE]['user'], + "password": conf[ENVIRONMENT_VARIABLE]['password'], + "host": conf[ENVIRONMENT_VARIABLE]['host'], + "port": conf[ENVIRONMENT_VARIABLE]['port'], + # 设置默认超时时间statement_timeout=五分钟 + "options":"-c statement_timeout=800000 -c idle_in_transaction_session_timeout=900000" + } + + +if __name__ == '__main__': + print(get_path()) diff --git a/utils/log.py b/utils/log.py new file mode 100644 index 0000000..4b64d58 --- /dev/null +++ b/utils/log.py @@ -0,0 +1,69 @@ +import logging +import os +from datetime import datetime + +# 定义全局变量 log_path +cur_path = os.path.dirname(os.path.realpath(__file__)) +log_path = os.path.join(os.path.dirname(cur_path), 'logs') + + +class Log(): + def __init__(self, logger_name='my_logger'): + self.logger = logging.getLogger(logger_name) + if self.logger.hasHandlers(): + self.logger.handlers.clear() + self.logger.setLevel(logging.INFO) + + if not os.path.exists(log_path): + os.makedirs(log_path) + + self.update_log_file() + + def update_log_file(self): + current_date = datetime.now().strftime("%Y_%m_%d") + self.log_name = os.path.join(log_path, f'{current_date}.log') + + for handler in self.logger.handlers[:]: + self.logger.removeHandler(handler) + + fh = logging.FileHandler(self.log_name, 'a', encoding='utf-8') + fh.setLevel(logging.INFO) + + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + + formatter = logging.Formatter( + '[%(asctime)s] %(filename)s line:%(lineno)d [%(levelname)s]%(message)s', + datefmt="%Y-%m-%d %H:%M:%S" + ) + fh.setFormatter(formatter) + ch.setFormatter(formatter) + + self.logger.addHandler(fh) + self.logger.addHandler(ch) + + def getlog(self): + current_date = datetime.now().strftime("%Y_%m_%d") + log_date = os.path.basename(self.log_name).split('.')[0] + if current_date != log_date: + self.update_log_file() + return self.logger + + def info(self, msg, *args, **kwargs): + logger = self.getlog() + logger.info(msg, *args, **kwargs) + + def error(self, msg, *args, **kwargs): + logger = self.getlog() + logger.error(msg, *args, **kwargs) + + def warning(self, msg, *args, **kwargs): + logger = self.getlog() + logger.warning(msg, *args, **kwargs) + + +if __name__ == "__main__": + log = Log() + log.info("---测试开始----") + log.error("操作步骤1,2,3") + log.warning("----测试结束----") \ No newline at end of file diff --git a/utils/read_sql_files.py b/utils/read_sql_files.py new file mode 100644 index 0000000..6de0dff --- /dev/null +++ b/utils/read_sql_files.py @@ -0,0 +1,109 @@ +import os +from utils.execute_sql import execute_sql +from utils.date_time import get_parameter +from utils.log import Log +from utils.loadconfig import get_path + + +def read_sql_files(directory): + log = Log().getlog() + + + directory = get_path() + '/sql/' + directory + """ + 读取指定目录下所有.sql结尾的文件内容并打印 + + 参数: + directory (str): 要读取文件的目录路径 + + 返回: + None + """ + # 检查目录是否存在 + if not os.path.exists(directory): + log.warning(f"目录 {directory} 不存在,请检查路径。") + return + + # 检查目录是否为空 + if not os.listdir(directory): + log.warning(f"目录 {directory} 为空。") + return + + # 遍历目录及其子目录下的所有文件 + for root, dirs, files in os.walk(directory): + for file in files: + if file.endswith('.sql'): + file_path = os.path.join(root, file) + try: + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + # 遍历字典,将字符串中的占位符替换为对应的值 + for key, value in get_parameter().items(): + content = content.replace(key, str(value)) + log.info(f"文件路径: {file_path}") + + try: + returnstr = execute_sql(content) + log.info(f"执行结果: {returnstr}") + log.info("-" * 50) + except Exception as e: + log.error(f"执行 SQL 文件 {file_path} 时出现错误: {e}") + + except UnicodeDecodeError: + log.error(f"文件 {file_path} 编码格式可能不是utf-8,无法正确读取,请检查。") + except Exception as e: + log.error(f"读取文件 {file_path} 时出现错误: {e}") + +def read_one_game_sql_files(directory,game_information): + log = Log().getlog() + + directory = get_path() + '/one_game_sql/' + directory + """ + 读取指定目录下所有.sql结尾的文件内容并打印 + + 参数: + directory (str): 要读取文件的目录路径 + + 返回: + None + """ + # 检查目录是否存在 + if not os.path.exists(directory): + log.warning(f"目录 {directory} 不存在,请检查路径。") + return + + # 检查目录是否为空 + if not os.listdir(directory): + log.warning(f"目录 {directory} 为空。") + return + + # 遍历目录及其子目录下的所有文件 + for root, dirs, files in os.walk(directory): + for file in files: + if file.endswith('.sql'): + file_path = os.path.join(root, file) + try: + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + parameter=get_parameter() + parameter.update(game_information) + # 遍历字典,将字符串中的占位符替换为对应的值 + for key, value in parameter.items(): + content = content.replace(key, str(value)) + log.info(f"文件路径: {file_path}") + + try: + returnstr = execute_sql(content) + log.info(f"执行结果: {returnstr}") + log.info("-" * 50) + except Exception as e: + log.error(f"执行 SQL 文件 {file_path} 时出现错误: {e}") + + except UnicodeDecodeError: + log.error(f"文件 {file_path} 编码格式可能不是utf-8,无法正确读取,请检查。") + except Exception as e: + log.error(f"读取文件 {file_path} 时出现错误: {e}") + + +if __name__ == '__main__': + read_sql_files()