优化sql
This commit is contained in:
		
							parent
							
								
									54b3253ff9
								
							
						
					
					
						commit
						e1c1ad75d4
					
				
							
								
								
									
										45
									
								
								job_test.py
									
									
									
									
									
								
							
							
						
						
									
										45
									
								
								job_test.py
									
									
									
									
									
								
							| @ -1,26 +1,35 @@ | ||||
| from utils.read_sql_files import read_sql_files | ||||
| from utils.loadconfig import get_path | ||||
| from utils.log import Log | ||||
| from utils.date_time import get_variable_parameter | ||||
| from utils.execute_sql import execute_sql | ||||
| 
 | ||||
| 
 | ||||
| def dim(): | ||||
|     read_sql_files('dim') | ||||
| def read_sql_files(directory, day): | ||||
|     log = Log().getlog() | ||||
|     directory = get_path() + directory | ||||
| 
 | ||||
|     try: | ||||
|         with open(directory, 'r', encoding='utf-8') as f: | ||||
|             content = f.read() | ||||
|             # 遍历字典,将字符串中的占位符替换为对应的值 | ||||
|             for key, value in get_variable_parameter(day).items(): | ||||
|                 content = content.replace(key, str(value)) | ||||
|             log.info(f"文件路径: {directory}") | ||||
| 
 | ||||
| def dwd(): | ||||
|     read_sql_files('dwd_before') | ||||
|     read_sql_files('dwd_after') | ||||
|             try: | ||||
|                 returnstr = execute_sql(content) | ||||
|                 log.info(f"执行结果: {returnstr}") | ||||
|                 log.info("-" * 50) | ||||
|             except Exception as e: | ||||
|                 log.error(f"执行 SQL 文件 {directory} 时出现错误: {e}") | ||||
| 
 | ||||
| 
 | ||||
| def dws(): | ||||
|     read_sql_files('dws_before') | ||||
|     read_sql_files('dws_after') | ||||
| 
 | ||||
| 
 | ||||
| def ads(): | ||||
|     read_sql_files('ads') | ||||
|     except UnicodeDecodeError: | ||||
|         log.error(f"文件 {directory} 编码格式可能不是utf-8,无法正确读取,请检查。") | ||||
|     except Exception as e: | ||||
|         log.error(f"读取文件 {directory} 时出现错误: {e}") | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     # dim() | ||||
|     # dwd() | ||||
|     # dws() | ||||
|     ads() | ||||
|     # 遍历300到0 | ||||
|     for day in range(301): | ||||
|         read_sql_files('/sql/ads/user_retention.sql', day) | ||||
|  | ||||
| @ -30,13 +30,25 @@ left join | ||||
|     dwd.active_df t2 | ||||
| on  t1.relation_id=t2.relation_id | ||||
| and t1.user_id=t2.user_id | ||||
| and t2.ds='${bizdate}' | ||||
| AND t2.ds_date-t1.ds_date IN (1,2,3,4,5,6,7,14,30,60,90,150,300) | ||||
| -- AND t2.ds_date >= t1.ds_date - 300 | ||||
| where t1.ds>='${300_days_later}' | ||||
| where t1.ds>='${300_days_later}' and t1.ds<='${bizdate}' | ||||
| group by  t1.relation_id | ||||
|             ,t1.prodid | ||||
|             ,t1.ds | ||||
| ON CONFLICT (relation_id,prodid,ds) | ||||
| DO UPDATE SET num = EXCLUDED.num,ltv1 = EXCLUDED.ltv1,ltv2 = EXCLUDED.ltv2,ltv3 = EXCLUDED.ltv3,ltv4 = EXCLUDED.ltv4,ltv5 = EXCLUDED.ltv5,ltv6 = EXCLUDED.ltv6,ltv7 = EXCLUDED.ltv7,ltv14 = EXCLUDED.ltv14,ltv30 = EXCLUDED.ltv30,ltv60 = EXCLUDED.ltv60 | ||||
|     ,ltv90 = EXCLUDED.ltv90,ltv150 = EXCLUDED.ltv150,ltv300 = EXCLUDED.ltv300 | ||||
| ON CONFLICT (relation_id,prodid,ds) DO UPDATE SET | ||||
| num = CASE WHEN EXCLUDED.num <> 0 THEN EXCLUDED.num ELSE ads.order_retention.num END, | ||||
| ltv1 = CASE WHEN EXCLUDED.ltv1 <> 0 THEN EXCLUDED.ltv1 ELSE ads.order_retention.ltv1 END, | ||||
| ltv2 = CASE WHEN EXCLUDED.ltv2 <> 0 THEN EXCLUDED.ltv2 ELSE ads.order_retention.ltv2 END, | ||||
| ltv3 = CASE WHEN EXCLUDED.ltv3 <> 0 THEN EXCLUDED.ltv3 ELSE ads.order_retention.ltv3 END, | ||||
| ltv4 = CASE WHEN EXCLUDED.ltv4 <> 0 THEN EXCLUDED.ltv4 ELSE ads.order_retention.ltv4 END, | ||||
| ltv5 = CASE WHEN EXCLUDED.ltv5 <> 0 THEN EXCLUDED.ltv5 ELSE ads.order_retention.ltv5 END, | ||||
| ltv6 = CASE WHEN EXCLUDED.ltv6 <> 0 THEN EXCLUDED.ltv6 ELSE ads.order_retention.ltv6 END, | ||||
| ltv7 = CASE WHEN EXCLUDED.ltv7 <> 0 THEN EXCLUDED.ltv7 ELSE ads.order_retention.ltv7 END, | ||||
| ltv14 = CASE WHEN EXCLUDED.ltv14 <> 0 THEN EXCLUDED.ltv14 ELSE ads.order_retention.ltv14 END, | ||||
| ltv30 = CASE WHEN EXCLUDED.ltv30 <> 0 THEN EXCLUDED.ltv30 ELSE ads.order_retention.ltv30 END, | ||||
| ltv60 = CASE WHEN EXCLUDED.ltv60 <> 0 THEN EXCLUDED.ltv60 ELSE ads.order_retention.ltv60 END, | ||||
| ltv90 = CASE WHEN EXCLUDED.ltv90 <> 0 THEN EXCLUDED.ltv90 ELSE ads.order_retention.ltv90 END, | ||||
| ltv150 = CASE WHEN EXCLUDED.ltv150 <> 0 THEN EXCLUDED.ltv150 ELSE ads.order_retention.ltv150 END, | ||||
| ltv300 = CASE WHEN EXCLUDED.ltv300 <> 0 THEN EXCLUDED.ltv300 ELSE ads.order_retention.ltv300 END | ||||
| ; | ||||
| @ -3,7 +3,6 @@ | ||||
| -- first_user是用户首次登陆时间 | ||||
| -- lately_user是用户最后一次登陆时间 | ||||
| -- 2.用户首次登陆去关联用户最后一次登陆时间他们的时间差就是用户的流失时间 | ||||
| 
 | ||||
| INSERT INTO ads.user_retention ( relation_id,prodid,ds,num,ltv1,ltv2,ltv3,ltv4,ltv5,ltv6,ltv7,ltv14,ltv30,ltv60,ltv90,ltv150,ltv300) | ||||
| select | ||||
|     t1.relation_id | ||||
| @ -30,12 +29,26 @@ left join | ||||
| dwd.active_df t2 | ||||
| on   t1.relation_id=t2.relation_id | ||||
| and t1.user_id=t2.user_id | ||||
| and t2.ds='${bizdate}' | ||||
| AND t2.ds_date-t1.ds_date IN (1,2,3,4,5,6,7,14,30,60,90,150,300) | ||||
| where t1.ds>='${300_days_later}' | ||||
| where t1.ds>='${300_days_later}' and t1.ds<='${bizdate}' | ||||
| group by  t1.relation_id | ||||
|     ,t1.prodid | ||||
|     ,t1.ds | ||||
| ON CONFLICT (relation_id,prodid,ds) | ||||
| DO UPDATE SET num = EXCLUDED.num,ltv1 = EXCLUDED.ltv1,ltv2 = EXCLUDED.ltv2,ltv3 = EXCLUDED.ltv3,ltv4 = EXCLUDED.ltv4,ltv5 = EXCLUDED.ltv5,ltv6 = EXCLUDED.ltv6,ltv7 = EXCLUDED.ltv7,ltv14 = EXCLUDED.ltv14,ltv30 = EXCLUDED.ltv30,ltv60 = EXCLUDED.ltv60 | ||||
|     ,ltv90 = EXCLUDED.ltv90,ltv150 = EXCLUDED.ltv150,ltv300 = EXCLUDED.ltv300 | ||||
| ON CONFLICT (relation_id,prodid,ds) DO UPDATE SET | ||||
| num = CASE WHEN EXCLUDED.num <> 0 THEN EXCLUDED.num ELSE ads.user_retention.num END, | ||||
| ltv1 = CASE WHEN EXCLUDED.ltv1 <> 0 THEN EXCLUDED.ltv1 ELSE ads.user_retention.ltv1 END, | ||||
| ltv2 = CASE WHEN EXCLUDED.ltv2 <> 0 THEN EXCLUDED.ltv2 ELSE ads.user_retention.ltv2 END, | ||||
| ltv3 = CASE WHEN EXCLUDED.ltv3 <> 0 THEN EXCLUDED.ltv3 ELSE ads.user_retention.ltv3 END, | ||||
| ltv4 = CASE WHEN EXCLUDED.ltv4 <> 0 THEN EXCLUDED.ltv4 ELSE ads.user_retention.ltv4 END, | ||||
| ltv5 = CASE WHEN EXCLUDED.ltv5 <> 0 THEN EXCLUDED.ltv5 ELSE ads.user_retention.ltv5 END, | ||||
| ltv6 = CASE WHEN EXCLUDED.ltv6 <> 0 THEN EXCLUDED.ltv6 ELSE ads.user_retention.ltv6 END, | ||||
| ltv7 = CASE WHEN EXCLUDED.ltv7 <> 0 THEN EXCLUDED.ltv7 ELSE ads.user_retention.ltv7 END, | ||||
| ltv14 = CASE WHEN EXCLUDED.ltv14 <> 0 THEN EXCLUDED.ltv14 ELSE ads.user_retention.ltv14 END, | ||||
| ltv30 = CASE WHEN EXCLUDED.ltv30 <> 0 THEN EXCLUDED.ltv30 ELSE ads.user_retention.ltv30 END, | ||||
| ltv60 = CASE WHEN EXCLUDED.ltv60 <> 0 THEN EXCLUDED.ltv60 ELSE ads.user_retention.ltv60 END, | ||||
| ltv90 = CASE WHEN EXCLUDED.ltv90 <> 0 THEN EXCLUDED.ltv90 ELSE ads.user_retention.ltv90 END, | ||||
| ltv150 = CASE WHEN EXCLUDED.ltv150 <> 0 THEN EXCLUDED.ltv150 ELSE ads.user_retention.ltv150 END, | ||||
| ltv300 = CASE WHEN EXCLUDED.ltv300 <> 0 THEN EXCLUDED.ltv300 ELSE ads.user_retention.ltv300 END | ||||
| ; | ||||
| 
 | ||||
|  | ||||
| @ -1,5 +1,32 @@ | ||||
| -- 取登陆最早时间 逻辑:如果之前没出现过,那插入这条记录做最早用户登陆时间 | ||||
| -- 首次写入数据sql | ||||
| -- INSERT INTO dim.first_user(game_channel_id,game_identity,game_platform_id,relation_id,prodid,user_id,ds,ds_date) | ||||
| -- WITH active_account_tmp AS ( | ||||
| --   SELECT distinct | ||||
| --     t1.channel_id, | ||||
| --     t1.game_identity, | ||||
| --     t1.platform_id, | ||||
| --     t1.user_id, | ||||
| --     '${bizdate}' AS ds  -- 直接赋值替代min(ds) | ||||
| --   FROM ods.active_account_list t1 | ||||
| --   WHERE t1.ds = '${bizdate}' | ||||
| -- ) | ||||
| -- SELECT t2.game_channel_id | ||||
| --         ,t2.game_identity | ||||
| --         ,t2.game_platform_id | ||||
| --         ,t2.relation_id | ||||
| --         ,t2.product_id prodid | ||||
| --         ,t1.user_id | ||||
| --         ,t1.ds ds | ||||
| --         ,'${biz-date}' ds_date | ||||
| -- FROM    active_account_tmp t1 | ||||
| -- inner join | ||||
| -- dim.game_product_relation t2 | ||||
| -- ON      t1.channel_id = t2.game_channel_id | ||||
| -- AND     t1.game_identity = t2.game_identity | ||||
| -- AND     t1.platform_id = t2.game_platform_id | ||||
| -- ON CONFLICT (relation_id, user_id) | ||||
| -- DO NOTHING; | ||||
| 
 | ||||
| INSERT INTO dim.first_user(game_channel_id,game_identity,game_platform_id,relation_id,prodid,user_id,ds,ds_date) | ||||
| WITH active_account_tmp AS ( | ||||
|  | ||||
| @ -13,5 +13,4 @@ GROUP BY t2.relation_id | ||||
|         ,t1.user_id | ||||
|         ,t1.ds | ||||
| ON CONFLICT (relation_id, user_id, ds) | ||||
|     DO UPDATE SET ds_date = EXCLUDED.ds_date | ||||
| ; | ||||
| DO NOTHING; | ||||
| @ -51,6 +51,19 @@ def get_parameter(): | ||||
|         '${ltv_day}': get_ltv_day(), | ||||
|     } | ||||
| 
 | ||||
| def get_variable_parameter(day): | ||||
|     return { | ||||
|         '${bizdate}': bizdate(1+day), | ||||
|         '${biz-date}': biz_date(1+day), | ||||
|         '${t_month}': t_month(1+day), | ||||
|         '${intra_day}': bizdate(0+day), | ||||
|         '${intra-day}': biz_date(0+day), | ||||
|         '${30_days_later}': bizdate(30+day), | ||||
|         '${60_days_later}': bizdate(60+day), | ||||
|         '${300_days_later}': bizdate(301+day), | ||||
|         '${ltv_day}': get_ltv_day(), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     # hhh = { | ||||
| @ -60,4 +73,4 @@ if __name__ == "__main__": | ||||
|     #     '${30_days_later}': bizdate(30), | ||||
|     #     '${300_days_later}': bizdate(301) | ||||
|     # } | ||||
|     print(get_ltv_day()) | ||||
|     print(get_variable_parameter(1)) | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user