One - One Code All

Blog Content

airflow中dag上下文context传参数

Python 并行计算   2014-02-15 23:41:32

dag

def my_py_command(ds, **kwargs):
    print(kwargs)
    print(kwargs.get('dag_run').conf.get('foo'))
    
    print(" 'miff' was passed in via task params = {}".format(kwargs["params"]["miff"]))
    logging.info(" 'miff' was passed in via task params = {}".format(kwargs["params"]["miff"]))
    return 1

my_templated_command = """
    echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
    echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
"""

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=my_py_command,
    params={"miff":"agg"},
    dag=dag)


每个DAG 实例都有一个上下文的概念,以context参数的形式会透传给所有的任务,以及所有任务的回调函数。

context的内容----print(kwargs)    如下:

{
    u'next_execution_date': None,
    u'dag_run': ,
    u'tomorrow_ds_nodash': u'20140215',
    u'run_id': 'manual__2014-04-14T18:28:07',
    u'dag': ,
    u'prev_execution_date': None,
   
 u'conf': ,
    u'tables': None,
    u'task_instance_key_str': u'example_passing_params_via_test_command__run_this__20140214',
    u'END_DATE': '2014-04-14',
    u'execution_date': datetime.datetime(2014, 2, 14, 18, 28, 7),
    u'ts': '2014-02-14T18:28:07',
   
 u'macros': ,
    u'params': {'miff': 'agg'},
    u'ti': ,
    u'var': {u'json': None, u'value': None},
    u'ds_nodash': u'20140214',
    u'test_mode': False,
    u'end_date': '2014-04-14',
    'templates_dict': None,
    u'task': ,
    u'task_instance': ,
    u'yesterday_ds_nodash': u'20140213',
    u'latest_date': '2014-04-14',
    u'yesterday_ds': '2014-04-13',
    u'ts_nodash': u'20140214T182807',
    u'tomorrow_ds': '2014-04-15'
}

实例参数使用pickle序列化存储在dag_run表中

字段类型如下

conf = Column(PickleType)
在执行PythonOperator时,会将上下文context参数,传递给回调函数中的self.op_kwargs

在执行Operator时,就可以从上下文实例中获取DagRun实例

kwargs.get('dag_run')
再从DagRun实例中获取conf参数,值为json对象类型

dag_run_conf = kwargs.get('dag_run').conf


上一篇:airflow给task传参数-tp json
下一篇:Airbnb开源项目airflow

The minute you think of giving up, think of the reason why you held on so long.