aliapi/utils/ali_client.py
2025-02-05 14:27:23 +08:00

63 lines
2.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dataworks_public20240518.client import Client as dataworks_public20240518Client
from utils.loadconfig import get_ali_config
from alibabacloud_dataworks_public20240518 import models as dataworks_public_20240518_models
from alibabacloud_tea_util import models as util_models
import json
from alibabacloud_tea_util.client import Client as UtilClient
from utils.log import Log
def create_client(env):
    """Build a DataWorks OpenAPI client for the given environment.

    Args:
        env: Environment key handed to ``get_ali_config`` to select which
            Alibaba Cloud settings are loaded.

    Returns:
        A ``dataworks_public20240518Client`` built from the
        ``access_key_id``, ``access_key_secret`` and ``endpoint`` entries of
        the loaded settings.
    """
    # A new Log instance is requested on every call; per the original note
    # this is meant to make sure a fresh log file is started each day.
    log = Log().getlog()
    db_config = get_ali_config(env)
    endpoint = db_config.get('endpoint')
    # Do NOT log the whole config: it contains the access key secret.
    # Only non-sensitive fields are written to the log.
    log.info(f"Loaded Alibaba Cloud config for env={env!r}, endpoint={endpoint!r}")
    config = open_api_models.Config(
        # Both credentials are required; they are read from the loaded
        # config rather than from environment variables.
        access_key_id=db_config.get('access_key_id'),
        access_key_secret=db_config.get('access_key_secret'),
    )
    # For valid endpoints see https://api.aliyun.com/product/dataworks-public
    config.endpoint = endpoint
    return dataworks_public20240518Client(config)
# Fetch the task list
def get_listtask(client, project_id):
    """Return the tasks of a DataWorks project as a pretty-printed JSON string.

    Args:
        client: DataWorks client exposing ``list_tasks_with_options``.
        project_id: Identifier placed in the ``ListTasksRequest``.

    Returns:
        The API response map serialized with ``json.dumps`` (2-space indent,
        non-ASCII characters kept as-is), or ``None`` when the call fails.
        Failures are printed to stdout, not raised.
    """
    list_tasks_request = dataworks_public_20240518_models.ListTasksRequest(
        project_id=project_id
    )
    runtime = util_models.RuntimeOptions()
    try:
        response = client.list_tasks_with_options(list_tasks_request, runtime)
        return json.dumps(response.to_map(), indent=2, ensure_ascii=False)
    except Exception as error:
        # Best-effort reporting only. Not every exception carries the SDK's
        # ``message`` / ``data`` attributes, so read them defensively instead
        # of raising a second error from inside the handler.
        message = getattr(error, 'message', None)
        print(message if message is not None else str(error))
        # ``data`` may hold a diagnostic URL under the "Recommend" key.
        data = getattr(error, 'data', None)
        if data:
            print(data.get("Recommend"))
        return None
def get_task_detail(client, task_id, project_env):
    """Return the details of one DataWorks task as a pretty-printed JSON string.

    Args:
        client: DataWorks client exposing ``get_task_with_options``.
        task_id: Identifier of the task to fetch; sent as the request ``id``.
        project_env: Value sent as the request ``project_env``.

    Returns:
        The API response map serialized with ``json.dumps`` (2-space indent,
        non-ASCII characters kept as-is), or ``None`` when the call fails.
        Failures are printed to stdout, not raised.
    """
    get_task_request = dataworks_public_20240518_models.GetTaskRequest(
        # Use the caller's task id; a hard-coded id here would make every
        # call return the same task regardless of the argument.
        id=task_id,
        project_env=project_env
    )
    runtime = util_models.RuntimeOptions()
    try:
        # Call the API and serialize its response.
        response = client.get_task_with_options(get_task_request, runtime)
        return json.dumps(response.to_map(), indent=2, ensure_ascii=False)
    except Exception as error:
        # Not every exception carries the SDK's ``message`` attribute; fall
        # back to the exception text so the handler itself cannot fail.
        message = getattr(error, 'message', None)
        if message is None:
            message = str(error)
        print(f"Error occurred: {message}")
        data = getattr(error, 'data', None)
        if data:
            print(f"Recommend: {data.get('Recommend')}")
        return None