from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_dataworks_public20240518.client import Client as dataworks_public20240518Client from utils.loadconfig import get_ali_config from alibabacloud_dataworks_public20240518 import models as dataworks_public_20240518_models from alibabacloud_tea_util import models as util_models import json from alibabacloud_tea_util.client import Client as UtilClient from utils.log import Log def create_client(env): # 创建一个新的Log实例,确保每天创建一个新的日志文件 log = Log().getlog() db_config = get_ali_config(env) log.info(db_config) config = open_api_models.Config( # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。, access_key_id=db_config.get('access_key_id'), access_key_secret=db_config.get('access_key_secret') # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。, ) # Endpoint 请参考 https://api.aliyun.com/product/dataworks-public config.endpoint =db_config.get('endpoint') return dataworks_public20240518Client(config) # 获取任务列表 def get_listtask(client,project_id): list_tasks_request = dataworks_public_20240518_models.ListTasksRequest( project_id=project_id ) runtime = util_models.RuntimeOptions() try: # 复制代码运行请自行打印 API 的返回值 response = client.list_tasks_with_options(list_tasks_request, runtime) # 打印 API 的返回值 return json.dumps(response.to_map(), indent=2, ensure_ascii=False) except Exception as error: # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 # 错误 message print(error.message) # 诊断地址 print(error.data.get("Recommend")) UtilClient.assert_as_string(error.message) def get_task_detail(client,task_id,project_env): get_task_request = dataworks_public_20240518_models.GetTaskRequest( id=1023194720, project_env=project_env ) runtime = util_models.RuntimeOptions() try: # 调用 API 并获取返回值 response = client.get_task_with_options(get_task_request, runtime) # 打印 API 的返回值 return json.dumps(response.to_map(), indent=2, ensure_ascii=False) except Exception as error: print(f"Error occurred: {error.message}") if hasattr(error, 'data') and error.data: print(f"Recommend: {error.data.get('Recommend')}")