Dynamic tasks in airflow based on an external file
Depending on your requirements, if what you want is to avoid reading a file many times, and you don't mind reading from the metadata database that often instead, you could change your approach and use Variables as the source of iteration to dynamically create tasks.
A basic example: perform the file reading inside a PythonOperator and, in the same callable, set the Variable you will iterate over later on:
sample_file.json:
{
    "cities": ["London", "Paris", "BA", "NY"]
}
Task definition:
import json

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup


def _read_file():
    with open('dags/sample_file.json') as f:
        data = json.load(f)
        Variable.set(key='list_of_cities',
                     value=data['cities'], serialize_json=True)
        print('Loading Variable from file...')


def _say_hello(city_name):
    print('hello from ' + city_name)


with DAG('dynamic_tasks_from_var', schedule_interval='@once',
         start_date=days_ago(2),
         catchup=False) as dag:

    read_file = PythonOperator(
        task_id='read_file',
        python_callable=_read_file
    )
Then you can read from that Variable and create the dynamic tasks (it's important to set a default_var for the first parse, before the Variable exists). The TaskGroup is optional.
    # Top-level code
    updated_list = Variable.get('list_of_cities',
                                default_var=['default_city'],
                                deserialize_json=True)
    print(f'Updated LIST: {updated_list}')

    with TaskGroup('dynamic_tasks_group',
                   prefix_group_id=False,
                   ) as dynamic_tasks_group:

        for index, city in enumerate(updated_list):
            say_hello = PythonOperator(
                task_id=f'say_hello_from_{city}',
                python_callable=_say_hello,
                op_kwargs={'city_name': city}
            )

    # DAG level dependencies
    read_file >> dynamic_tasks_group
In the Scheduler logs, you will only find:
INFO - Updated LIST: ['London', 'Paris', 'BA', 'NY']
DAG Graph View (image not included).
With this approach, the only top-level code, and hence the only code executed by the Scheduler continuously, is the call to the Variable.get() method. If you need to read many values, remember that it is recommended to store them in one single JSON Variable, to avoid constantly creating connections to the metadata database; a minimal sketch of that follows.
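For example, a minimal sketch of the single-JSON-Variable idea, assuming a hypothetical Variable named dag_config that bundles several settings, so one Variable.get() call covers everything:

from airflow.models import Variable

# One call retrieves every setting in a single metadata-database round trip.
# The Variable name 'dag_config' and its keys are illustrative.
config = Variable.get('dag_config',
                      default_var={'cities': ['default_city'],
                                   'greeting': 'hello'},
                      deserialize_json=True)

cities = config['cities']
greeting = config['greeting']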
Update:
- As of 11-2021 this approach is considered a "quick and dirty" kind of solution.
- Does it work? Yes, totally. Is it production quality code? No.
- What's wrong with it? The DB is accessed every time the Scheduler parses the file, by default every 30 seconds, and that has nothing to do with your DAG execution. Full details in the Airflow best practices on top-level code.
- How can this be improved? Consider whether any of the recommended ways of dynamic DAG generation applies to your needs (see the file-based sketch after this list).
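As a hedged example of one of those recommended ways, here is a minimal sketch of file-based dynamic DAG generation: the task list is built at parse time from a small local JSON file shipped with the DAG, so the Scheduler never touches the metadata database for it (it does re-read the file on every parse, so keep the file small and local). The DAG id and file path are illustrative:

import json

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def _say_hello(city_name):
    print('hello from ' + city_name)


# Read a small local file at parse time instead of the metadata database.
with open('dags/sample_file.json') as f:
    cities = json.load(f)['cities']

with DAG('dynamic_tasks_from_file', schedule_interval='@once',
         start_date=days_ago(2), catchup=False) as dag:
    for city in cities:
        PythonOperator(
            task_id=f'say_hello_from_{city}',
            python_callable=_say_hello,
            op_kwargs={'city_name': city},
        )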
Airflow dynamic tasks at runtime
It is not possible to modify the DAG during its execution (without a lot more work).
The dag = DAG(... is picked up in a loop by the scheduler. It will have the task instance 'python_operator' in it. That task instance gets scheduled in a DAG run and executed by a worker or executor. Since DAG models in the Airflow DB are only updated by the scheduler, the dummy tasks added at runtime will not be persisted to the DAG nor scheduled to run. They will be forgotten when the worker exits. Unless you copy all the code from the scheduler regarding persisting & updating the model… but that will be undone the next time the scheduler visits the DAG file for parsing, which could happen once a minute, once a second or faster, depending on how many other DAG files there are to parse.
Airflow actually wants each DAG to keep approximately the same layout between runs. It also wants to reload/parse DAG files constantly. So although you could make a DAG file that determines its tasks dynamically on each parse based on some external data (preferably cached in a file or pyc module, not network I/O like a DB lookup, which would slow down the whole scheduling loop for all the DAGs), it's not a good plan: your graph and tree views will get confusing, and scheduler parsing will be more taxed by that lookup.
You could make the callable run each task…
def make_tasks(**context):
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1.execute(context)
    du2.execute(context)
    du3.execute(context)

p = PythonOperator(
    task_id='make_tasks',
    python_callable=make_tasks,
    provide_context=True,
    dag=dag,
)
But that's sequential: you have to work out how to make them run in parallel in Python (use futures?), and if any of them raises an exception the whole task fails. It is also bound to one executor or worker, so it doesn't use Airflow's task distribution (Kubernetes, Mesos, Celery). A rough sketch of the futures idea is below.
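A rough sketch of that futures idea, with the same caveats; it assumes the module-level dag object from the snippet above, the helper name and pool size are illustrative, and the DummyOperator import path depends on your Airflow version:

from concurrent.futures import ThreadPoolExecutor

from airflow.operators.dummy import DummyOperator


def make_tasks_parallel(**context):
    # 'dag' is the DAG defined at module level, as in the snippet above.
    dummies = [DummyOperator(task_id=f'dummy{i}', dag=dag) for i in range(1, 4)]
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(op.execute, context) for op in dummies]
        for future in futures:
            # result() re-raises any exception from the thread,
            # so one failure still fails the whole PythonOperator task.
            future.result()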
The other way to work with this is to add a fixed number of tasks (the maximal number) and use the callable(s) to short-circuit the unneeded tasks or push arguments with XCom for each of them, changing their behavior at run time but not changing the DAG. A hedged sketch of that fixed-slot pattern follows.
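A minimal sketch of that fixed-slot pattern, assuming Airflow 2.x-style imports; the DAG id, MAX_SLOTS value, and helper names are illustrative:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

MAX_SLOTS = 4  # illustrative fixed upper bound on work items


def _plan(**context):
    # Hypothetical: read the external data here and push one argument per slot.
    cities = ['London', 'Paris', 'BA', 'NY']
    for i in range(MAX_SLOTS):
        value = cities[i] if i < len(cities) else None
        context['ti'].xcom_push(key=f'slot_{i}', value=value)


def _work(slot, **context):
    # Each fixed task pulls its argument; unused slots simply return early.
    city = context['ti'].xcom_pull(task_ids='plan', key=f'slot_{slot}')
    if city is None:
        print(f'slot {slot}: nothing to do, skipping')
        return
    print('hello from ' + city)


with DAG('fixed_slots_example', schedule_interval='@once',
         start_date=days_ago(2), catchup=False) as dag:
    plan = PythonOperator(task_id='plan', python_callable=_plan)
    for i in range(MAX_SLOTS):
        worker = PythonOperator(
            task_id=f'work_{i}',
            python_callable=_work,
            op_kwargs={'slot': i},
        )
        plan >> worker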