Dynamically passing parameters to an airflow operator through a dictionary
This is a standard Python feature (the ** operator). Use **arguments to expand a dictionary into keyword arguments. See: Converting Python dict to kwargs?
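A minimal plain-Python sketch of the idea (the function and dictionary here are made up for illustration; an Airflow operator's constructor accepts its keyword arguments the same way):

```python
def greet(name, greeting="Hello"):
    """Stand-in for any callable that takes keyword arguments."""
    return f"{greeting}, {name}!"

# Build the arguments dynamically as a dictionary...
params = {"name": "Ada", "greeting": "Hi"}

# ...then expand it with **; equivalent to greet(name="Ada", greeting="Hi")
print(greet(**params))  # prints "Hi, Ada!"
```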
Is it possible to pass user input from dag to sub dag in airflow?
Using Airflow Variables could work for this problem, BUT users may overwrite each other's changes, so race conditions can occur.
Alternative 1:
Airflow has a REST API on top which supports dag triggering functionality.
Request example:
curl -X POST \
'http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs' \
--header 'Cache-Control: no-cache' \
--header 'Content-Type: application/json' \
--data '{"conf":"{\"key\":\"value\"}"}'
The data section can store some user input which will later be accessed in Airflow operators.
More documentation: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
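Inside an operator, the payload from the request's conf field is available on the dag_run object in the task context. A sketch of a helper that reads it (the "key" name matches the curl example above; the fallback string is made up):

```python
def read_trigger_conf(context):
    """Pull a value from the conf dict attached to the triggering dag_run.

    `context` is the keyword-argument dict Airflow passes to a
    PythonOperator callable; `dag_run.conf` holds the JSON payload from
    the trigger request (or None when the DAG was triggered without
    extra configuration).
    """
    dag_run = context.get("dag_run")
    conf = getattr(dag_run, "conf", None) or {}
    return conf.get("key", "no value passed")
```

A PythonOperator would then be created with python_callable=read_trigger_conf (and provide_context=True on Airflow 1.10.x) so the context dict is passed in.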
Alternative 2:
Airflow also supports a CLI that can be used to trigger DAGs. You can pass extra configuration as a JSON string via the -c option; that configuration can carry the user input.
Command format:
airflow trigger_dag [-h] [-sd SUBDIR] [-r RUN_ID] [-c CONF] [-e EXEC_DATE]
dag_id
More documentation: http://airflow.apache.org/docs/apache-airflow/1.10.5/cli.html#trigger_dag
StackOverflow question which shows how configuration parameters can be accessed in Airflow operators: Accessing configuration parameters passed to Airflow through CLI
How to dynamically create subdags in Airflow
I tried creating subdags dynamically as follows:
# imports assumed for this snippet (Airflow 1.10.x module paths)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.helpers import chain

# create and return a DAG
def create_subdag(dag_parent, dag_id_child_prefix, db_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix + db_name)
    default_args_copy = default_args.copy()

    # dag
    dag = DAG(dag_id=dag_id_child,
              default_args=default_args_copy,
              schedule_interval='@once')

    # operators
    tid_check = 'check2_db_' + db_name
    py_op_check = PythonOperator(task_id=tid_check, dag=dag,
                                 python_callable=check_sync_enabled,
                                 op_args=[db_name])

    tid_spark = 'spark2_submit_' + db_name
    py_op_spark = PythonOperator(task_id=tid_spark, dag=dag,
                                 python_callable=spark_submit,
                                 op_args=[db_name])

    py_op_check >> py_op_spark
    return dag

# wrap DAG into SubDagOperator
def create_subdag_operator(dag_parent, db_name):
    tid_subdag = 'subdag_' + db_name
    subdag = create_subdag(dag_parent, tid_prefix_subdag, db_name)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

# create a SubDagOperator for each db in db_names
def create_all_subdag_operators(dag_parent, db_names):
    subdags = [create_subdag_operator(dag_parent, db_name) for db_name in db_names]
    # chain subdag-operators together
    chain(*subdags)
    return subdags

# (top-level) DAG & operators
dag = DAG(dag_id=dag_id_parent,
          default_args=default_args,
          schedule_interval=None)

subdag_ops = create_all_subdag_operators(dag, db_names)
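The chain helper used above wires the sub-DAG operators sequentially (a >> b >> c). A toy pure-Python sketch of that behavior, with a made-up Task class standing in for Airflow operators (which overload >> in the same way):

```python
class Task:
    """Stand-in for an Airflow operator; real operators also overload >>."""
    def __init__(self, name):
        self.name = name
        self.downstream = []

    def __rshift__(self, other):
        # a >> b records b as downstream of a and returns b,
        # so a >> b >> c chains naturally
        self.downstream.append(other)
        return other

def chain(*tasks):
    """Set each task downstream of the previous one,
    mimicking airflow.utils.helpers.chain."""
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream

a, b, c = Task("a"), Task("b"), Task("c")
chain(a, b, c)  # equivalent to a >> b >> c
```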
Note that the list of inputs for which subdags are created (here db_names) can either be declared statically in the Python file or be read from an external source.
The resulting DAG looks like this: [screenshot of the top-level DAG omitted]
Diving into SubDAG(s): [screenshot omitted]
How to pass dynamic arguments to an Airflow operator?
Not sure what you mean by 'dynamic', but if the YAML file is read in the DAG-file body, the DAG will be refreshed with the new arguments whenever the YAML file is updated. So you don't actually need XCom to get the arguments.
Simply create a params dictionary and pass it to default_args:
import os
import yaml

CONFIGFILE = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), 'your_yaml_file')

with open(CONFIGFILE, 'r') as ymlfile:
    CFG = yaml.safe_load(ymlfile)

default_args = {
    'cluster_name': CFG['section_A']['cluster_name'],  # edit here according to the structure of your yaml file
    'project_id': CFG['section_A']['project_id'],
    'zone': CFG['section_A']['zone'],
    'image_version': CFG['section_A']['image_version'],
    'num_workers': CFG['section_A']['num_workers'],
    'worker_machine_type': CFG['section_A']['worker_machine_type'],
    # you can add all needed params here
}

dag = DAG(
    dag_id=DAG_NAME,
    schedule_interval=SCHEDULE_INTERVAL,
    default_args=default_args,  # pass the params to the DAG environment
)

Task1 = DataprocClusterCreateOperator(
    task_id='your_task_id',
    dag=dag,
)
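For reference, a YAML file matching the keys read above might look like this (the section name comes from the snippet; all values here are placeholders):

```yaml
section_A:
  cluster_name: my-cluster          # placeholder values throughout
  project_id: my-gcp-project
  zone: europe-west1-b
  image_version: "1.4"
  num_workers: 2
  worker_machine_type: n1-standard-4
```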
But if you want dynamic DAGs rather than dynamic arguments, you may need another strategy, like this. So you first need to figure out the basic question: at which level is the dynamism? Task level? DAG level?
Alternatively, you can create your own operator that does the job and takes the parameters.
Is there a way to pass a parameter to an airflow dag when triggering it manually
You cannot change task dependencies with a runtime parameter. However, you can pass a runtime parameter (via dag_run.conf) so that, depending on its value, tasks are executed or skipped. For that you need operators in your workflow that can handle this logic, for example ShortCircuitOperator or BranchPythonOperator.
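A sketch of the branching idea: a pure function decides which task id to follow based on the triggering dag_run.conf (the task ids and the "mode" key are made up for illustration), and a BranchPythonOperator would call it with the task context:

```python
def choose_branch(conf):
    """Return the task_id to follow, based on the trigger's conf payload.

    Task ids "full_refresh" / "skip_refresh" and the "mode" key are
    placeholders; conf is dag_run.conf, which may be None when the DAG
    was triggered without extra configuration.
    """
    conf = conf or {}
    return "full_refresh" if conf.get("mode") == "full" else "skip_refresh"

# Wiring (Airflow 1.10.x style), shown as a comment since it needs a live DAG:
# branch = BranchPythonOperator(
#     task_id="branch_on_conf",
#     python_callable=lambda **ctx: choose_branch(ctx["dag_run"].conf),
#     provide_context=True,
#     dag=dag,
# )
```

Downstream tasks whose task_id is not returned by the callable are skipped, which is how the runtime parameter effectively controls which path runs.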