使用jinja模板中的气流连接

我正在尝试使用环境变量将 DB 参数传递给 BashOperator,但我找不到任何文档/示例如何使用 Jinja 模板中的连接。

所以我正在寻找类似于变量的东西

echo {{ var.value.<variable_name> }}

回答

对于气流 >= 2.2.0

假设您有 conn id,test_conn您可以通过以下方式直接使用宏:

{{ conn.test_conn }} 所以你得到任何连接属性,如:

{{ conn.test_conn.host }}, {{ conn.test_conn.login }},{{ conn.test_conn.password }}等等。

对于气流 < 2.2.0

没有现成的宏,但是您可以创建自定义宏来解决这个问题。

连接示例:

创建宏:

def get_host(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.host

def get_schema(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.schema

def get_login(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.login

在 DAG 中使用它们:

def print_function(**context):
    print(f"host={context['host']} schema={context['schema']} login={context['login']}")

user_macros = {
    'get_host': get_host,
    'get_schema': get_schema,
    'get_login': get_login,
}

with DAG(
    dag_id='connection',
    default_args=default_args,
    schedule_interval=None,
    user_defined_macros=user_macros,
) as dag:

# Example how to use as function
python_op = PythonOperator( 
    task_id='python_task',
    provide_context=True,
    python_callable=print_function,
    op_kwargs={
        'host': get_host("test_conn"),
        'schema': get_schema("test_conn"),
        'login': get_login("test_conn"),
    }
)

# Example how to use as Jinja string
bash_op = BashOperator( 
    task_id='bash_task',
    bash_command='echo {{ get_host("test_conn") }} {{ get_schema("test_conn") }} {{ get_login("test_conn") }} ',
)

渲染PythonOperator示例:

渲染BashOperator示例:

一般说明:此代码的作用是创建一个自定义函数func()以供使用,user_defined_macros从而提供使用它的能力,就像 Airflow 本身定义的这个宏一样。您可以通过以下方式访问模板:{{ func() }}如示例中所示,该函数允许接受参数。

请注意,您可以为连接对象中的所有字段创建此类函数。

请谨慎使用它,将密码作为文本传递可能不是一个好主意。


以上是使用jinja模板中的气流连接的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>