One - One Code All

Blog Content

airflow给给DAG实例传递参数

Python 并行计算   2014-01-22 22:16:10


在创建dag实例时传递参数,每个任务都可以从任务实例中获取需要的参数。

创建一个DAG实例


$ airflow trigger_dag -h

{__init__.py:57} INFO - Using executor CeleryExecutor
usage: airflow trigger_dag [-h] [-sd SUBDIR] [-r RUN_ID] [-c CONF]
                           [-e EXEC_DATE]
                           dag_id

positional arguments:
  dag_id                The id of the dag

optional arguments:
  -h, --help            show this help message and exit
  -sd SUBDIR, --subdir SUBDIR
                        File location or directory from which to look for the
                        dag
  -r RUN_ID, --run_id RUN_ID
                        Helps to identify this run
  -c CONF, --conf CONF  JSON string that gets pickled into the DagRun's conf
                        attribute
  -e EXEC_DATE, --exec_date EXEC_DATE
                        The execution date of the DAG



我们把json格式的字符串参数 '{"foo":"bar"}' 传递给DAG实例,如下

airflow trigger_dag example_passing_params_via_test_command -c '{"foo":"bar"}'

任务获取实例参数

def my_py_command(ds, **kwargs):
    logging.info(kwargs)
    logging.info(kwargs.get('dag_run').conf.get('foo'))
    # Print out the "foo" param passed in via
    # `airflow test example_passing_params_via_test_command run_this 
    # -tp '{"foo":"bar"}'`
    if kwargs["test_mode"]:
        print(" 'foo' was passed in via test={} command : kwargs[params][foo] \
               = {}".format(kwargs["test_mode"], kwargs["params"]["foo"]))
        logging.info(" 'foo' was passed in via test={} command : kwargs[params][foo] \
               = {}".format(kwargs["test_mode"], kwargs["params"]["foo"]))
    # Print out the value of "miff", passed in below via the Python Operator
    print(" 'miff' was passed in via task params = {}".format(kwargs["params"]["miff"]))
    logging.info(" 'miff' was passed in via task params = {}".format(kwargs["params"]["miff"]))
    return 1

my_templated_command = """
    echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
    echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
"""

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=my_py_command,
    params={"miff":"agg"},
    dag=dag)

包含logging的代码部分就是获取参数的地方

每个DAG 实例都有一个上下文的概念,以context参数的形式会透传给所有的任务,以及所有任务的回调函数。

实例参数使用pickle序列化存储在dag_run表中

字段类型如下

conf = Column(PickleType)

在执行PythonOperator时,会将上下文context参数,传递给回调函数中的self.op_kwargs

源码在 class PythonOperator(BaseOperator)

在执行Operator时,就可以从上下文实例中获取DagRun实例

kwargs.get('dag_run')

再从DagRun实例中获取conf参数,值为json对象类型

dag_run_conf = kwargs.get('dag_run').conf





上一篇:Python扫描指定目录下的文件,或者匹配指定后缀和前缀
下一篇:airflow给task传参数-tp json

The minute you think of giving up, think of the reason why you held on so long.