在Airflow2.0中使用TaskflowAPI传递争论
我正在使用 REST API 将参数传递给基于任务流的 Dag。看看这个论坛上提出的类似问题,下面似乎是访问传递参数的常用方法。
#From inside a template field or file:
{{ dag_run.conf['key'] }}
#Or when context is available, e.g. within a python callable of the PythonOperator:
context['dag_run'].conf['key']
我试图获取上下文字典
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), params=None)
def classic_xgb(**context):
"""
### TaskFlow API Tutorial Documentation
[here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html)
"""
@task()
def extract():
print("context is ", context)
输出是 <airflow.models.dagparam.DagParam object at 0x7f735c634510> 现在我如何获得作为输入参数传递给 Dag 的 conf 字典?我需要在我的 python 代码中使用参数,因此模板选项对我来说似乎不可行。
非常感激任何的帮助。
谢谢
此致,
阿迪尔
回答
get_current_context()在 Airflow 2.0 中有一个新函数来获取上下文。获得上下文字典后,“params”键包含通过 REST API 发送到 Dag 的参数。下面的代码解决了这个问题。
from airflow.operators.python import task, get_current_context
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
def classic_xgb(**kwargs):
"""
@task()
def extract():
context = get_current_context()