Airflow: Passing a Dynamic Value to Sub Dag Operator

Dynamically passing parameters to an Airflow operator through a dictionary

This is a standard Python feature (the ** operator).

Use **arguments to expand a dictionary into keyword arguments; see Converting Python dict to kwargs?
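
A minimal sketch of that pattern (the operator choice and argument values are illustrative, not from the original question; assumes an Airflow 1.x import path and an existing dag object):

from airflow.operators.bash_operator import BashOperator

op_kwargs = {
    'task_id': 'my_task',           # hypothetical task id
    'bash_command': 'echo hello',   # hypothetical command
}

# ** expands the dictionary into keyword arguments, i.e. this is equivalent to
# BashOperator(task_id='my_task', bash_command='echo hello', dag=dag)
task = BashOperator(dag=dag, **op_kwargs)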

Is it possible to pass user input from a DAG to a SubDAG in Airflow?

Using Airflow Variables is a reasonable solution to this problem, BUT users may overwrite each other's changes, so some issues can occur.
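
For reference, a minimal sketch of the Variable approach (the variable name db_name is an assumption); keep in mind that a Variable is global state, which is why concurrent users can clobber each other:

from airflow.models import Variable

# every DAG run reads the *current* value, so two users editing the
# Variable around the same time will overwrite each other's input
db_name = Variable.get('db_name', default_var='default_db')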

Alternative 1:

Airflow exposes a REST API that supports DAG-triggering functionality.

Request example:

curl -X POST \
  'http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs' \
  --header 'Cache-Control: no-cache' \
  --header 'Content-Type: application/json' \
  --data '{"conf":"{\"key\":\"value\"}"}'

The data section can store some user input which will later be accessed in Airflow operators.

More documentation: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

Alternative 2:

Airflow also provides a CLI that can be used to trigger DAGs. You can specify extra configuration via the -c option, and that configuration can carry user input.

Command format:

airflow trigger_dag [-h] [-sd SUBDIR] [-r RUN_ID] [-c CONF] [-e EXEC_DATE]
                    dag_id
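
For example, a run that passes user input through -c (the DAG id and payload are placeholders):

airflow trigger_dag -c '{"db_name": "sales"}' my_dag_id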

More documentation: http://airflow.apache.org/docs/apache-airflow/1.10.5/cli.html#trigger_dag

This StackOverflow question shows how configuration parameters can be accessed in Airflow operators: Accessing configuration parameters passed to Airflow through CLI
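
A minimal sketch of that pattern, assuming an Airflow 1.x PythonOperator with provide_context=True and an existing dag object (the conf key 'key' is just a placeholder):

from airflow.operators.python_operator import PythonOperator

def process_user_input(**context):
    # dag_run.conf holds whatever was sent via the REST API or the CLI -c option
    conf = context['dag_run'].conf or {}
    value = conf.get('key', 'default_value')
    print('received user input: %s' % value)

read_conf = PythonOperator(
    task_id='read_conf',
    python_callable=process_user_input,
    provide_context=True,  # required in Airflow 1.x to receive **context
    dag=dag,
)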

How to dynamically create subdags in Airflow

I tried creating subdags dynamically as follows

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.helpers import chain

# default_args, dag_id_parent, db_names, check_sync_enabled and spark_submit
# are assumed to be defined elsewhere in this file
tid_prefix_subdag = 'subdag_'

# create and return a DAG
def create_subdag(dag_parent, dag_id_child_prefix, db_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix + db_name)
    default_args_copy = default_args.copy()

    # dag
    dag = DAG(dag_id=dag_id_child,
              default_args=default_args_copy,
              schedule_interval='@once')

    # operators
    tid_check = 'check2_db_' + db_name
    py_op_check = PythonOperator(task_id=tid_check, dag=dag,
                                 python_callable=check_sync_enabled,
                                 op_args=[db_name])

    tid_spark = 'spark2_submit_' + db_name
    py_op_spark = PythonOperator(task_id=tid_spark, dag=dag,
                                 python_callable=spark_submit,
                                 op_args=[db_name])

    py_op_check >> py_op_spark
    return dag

# wrap DAG into SubDagOperator
def create_subdag_operator(dag_parent, db_name):
    # the SubDagOperator task_id must match the suffix of the subdag's dag_id
    tid_subdag = tid_prefix_subdag + db_name
    subdag = create_subdag(dag_parent, tid_prefix_subdag, db_name)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

# create a SubDagOperator for each db in db_names
def create_all_subdag_operators(dag_parent, db_names):
    subdags = [create_subdag_operator(dag_parent, db_name) for db_name in db_names]
    # chain subdag-operators together
    chain(*subdags)
    return subdags


# (top-level) DAG & operators
dag = DAG(dag_id=dag_id_parent,
          default_args=default_args,
          schedule_interval=None)

subdag_ops = create_all_subdag_operators(dag, db_names)

Note that the list of inputs for which subdags are created, here db_names, can either be declared statically in the Python file or read from an external source.
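
For instance, a hedged sketch that reads the list from an Airflow Variable instead of hard-coding it (the variable name db_names is an assumption; the Variable is re-read on every DAG-file parse):

from airflow.models import Variable

# statically declared ...
db_names = ['db_a', 'db_b', 'db_c']

# ... or read from an external source, e.g. a JSON-valued Airflow Variable
db_names = Variable.get('db_names', default_var=['db_a', 'db_b'], deserialize_json=True)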

The resulting DAG looks like this:

[screenshots of the resulting parent DAG]

Diving into SubDAG(s):

[screenshots of an individual SubDAG]

How to pass dynamic arguments to an Airflow operator?

Not sure what you mean by 'dynamic', but when the YAML file is updated, as long as the file is read in the DAG file body, the DAG will be refreshed and pick up the new arguments from the YAML file. So you don't actually need XCom to get the arguments.
Just create a params dictionary and pass it to default_args:

import os

import yaml

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

CONFIGFILE = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), 'your_yaml_file')

with open(CONFIGFILE, 'r') as ymlfile:
    CFG = yaml.safe_load(ymlfile)

default_args = {
    'cluster_name': CFG['section_A']['cluster_name'],  # edit here according to the structure of your yaml file.
    'project_id': CFG['section_A']['project_id'],
    'zone': CFG['section_A']['zone'],
    'image_version': CFG['section_A']['image_version'],
    'num_workers': CFG['section_A']['num_workers'],
    'worker_machine_type': CFG['section_A']['worker_machine_type'],
    # you can add all needed params here.
}

dag = DAG(
    dag_id=DAG_NAME,                      # DAG_NAME and SCHEDULE_INTERVAL assumed to be defined above
    schedule_interval=SCHEDULE_INTERVAL,
    default_args=default_args,            # pass the params to the DAG environment
)

Task1 = DataprocClusterCreateOperator(
    task_id='your_task_id',
    dag=dag,
)

But if you want dynamic DAGs rather than dynamic arguments, you may need another strategy, like this.

So you first need to figure out the basic idea:
at which level does the dynamic behaviour live? The task level or the DAG level?

Or you can create your own operator to do the job and accept the parameters.
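
A minimal sketch of that last option, assuming Airflow 1.x (the operator name and parameter are illustrative):

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyParamOperator(BaseOperator):
    # listing the field in template_fields lets Airflow render Jinja in it,
    # e.g. my_param="{{ dag_run.conf['key'] }}"
    template_fields = ('my_param',)

    @apply_defaults
    def __init__(self, my_param, *args, **kwargs):
        super(MyParamOperator, self).__init__(*args, **kwargs)
        self.my_param = my_param

    def execute(self, context):
        self.log.info('my_param resolved to: %s', self.my_param)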

Is there a way to pass a parameter to an Airflow DAG when triggering it manually?

You cannot change task dependencies with a runtime parameter.

However, you can pass a runtime parameter (via dag_run.conf) and, depending on its value, have tasks executed or skipped. For that you need to place operators in your workflow that can handle this logic, for example ShortCircuitOperator or BranchPythonOperator.
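
A hedged sketch of the BranchPythonOperator variant (the task ids and the conf key run_heavy are assumptions; assumes Airflow 1.x imports and an existing dag object):

from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

def choose_branch(**context):
    conf = context['dag_run'].conf or {}
    # return the task_id that should run; the other branch gets skipped
    return 'run_heavy_job' if conf.get('run_heavy') else 'skip_heavy_job'

branch = BranchPythonOperator(
    task_id='branch_on_conf',
    python_callable=choose_branch,
    provide_context=True,
    dag=dag,
)

run_heavy_job = DummyOperator(task_id='run_heavy_job', dag=dag)
skip_heavy_job = DummyOperator(task_id='skip_heavy_job', dag=dag)

branch >> [run_heavy_job, skip_heavy_job]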


