

With this, we're able to pass params from a parent DAG to a triggered DAG without the need of changing too much logic to use the context params (: We'll get the following result in the sync task logs: Now if we run the Wrapper DAG passing the following config object: Ultimately, our Sync DAG has to be rewritten as follows: # Sync DAG (let's assume we have 2 like this that are pretty similar) from corators import task, dagįrom import 1, 7), catchup=False) def sync_dag(): def sync(): You can click here to visit the official docs and have a deeper insight into it. This function evaluates a string containing a Python literal, for instance, a Python dictionary. Here we're checking if the params object has a configuration property, if so, we spread the value in the first params object level as a python dictionary using the literal_eval function from the ast package. We can create a get_context_params util function: # dags/utils/common.py from ast import literal_eval To handle this, we'll need to modify our sync DAGs a little bit. Furthermore, we're receiving a string with the python dictionary instead of the dictionary. As you can imagine, the 2 Sync DAGs were built using context instead of context. We also can use the Jinja template interpolation feature that Airflow provides out of the box.

To access the params object passed to a DAG using the Trigger DAG w/config Airflow feature, we can use the params key inside the context that we retrieve using the get_current_context function. If 'run-task-a' in params and params:Įlif 'run-task-b' in params and params: # Access to context params in order to perform certain tasks Import 1, 7), catchup=False) def sync_dag(): def sync(): # Sync DAG (let's assume we have 2 like this that are pretty similar) from corators import task, dag Params = context # Access to context paramsĬreate_backup_env() > other_task() Trigger_sync_dag_2_task = def other_task(): Trigger_sync_dag_1_task = TriggerDagRunOperator(

To use the TriggerDagRunOperator, we need to define something like this: # Wrapper DAG from corators import task, dagįrom _dagrun import TriggerDagRunOperatorįrom import get_current_contextįrom datetime import 1, 7), catchup=False) def wrapper_dag(): def create_backup_env(): The SolutionįYI - I simplified the solution a lot but always kept the main components untouched. Also, these DAGs cannot be executed manually or with a scheduled interval anymore but the Wrapper DAG instead, the create-backup-env task has to always be run first for the 2 DAGs to always push data to the same env and don't push to old envs that will not be used anymore.įurthermore, the 2 DAGs can receive quite many config parameters to execute or not certain tasks using the Trigger DAG w/config feature that Airflow provides, so these parameters have to be also available in the Wrapper DAG. The proposed solution was to create a new DAG (which I'll call Wrapper from now on) that first runs this create-backup-env task and then triggers the 2 DAGs using the TriggerDagRunOperator. With this, the 2 DAGs cannot run async anymore, they have to sync the data to the same environment. If anything goes wrong, we can just switch the environment and delete the broken one.
TRIGGERDAGRUNOPERATOR AIRFLOW 2.0 EXAMPLE FREE
The sync process between the 2 data sources is not free of failures so, a new need come up, which was to first create a backup of the env and then sync the data to a new env that is a copy of the old one. Until now, both DAGs were run individually, updating the CMS environment async. Each DAG syncs a specific type of data to the same env. I had 2 DAGs that run at the same time (with the same schedule_interval) and synced data from the ERP to the CMS. I had 2 data sources, an ERP and one content environment (from now on I'll call it 'env') from a CMS (if you don't know what a CMS is, I explain a little bit about it in this post). If you want to go straight to the solution you can skip this section.
TRIGGERDAGRUNOPERATOR AIRFLOW 2.0 EXAMPLE HOW TO
Maybe I was just not experienced enough and I fell into a really easy thing to fix but, today I'll show how to do it, so you don't have to struggle as I did 🙂 let's get into it. So I was in this situation, struggling for like 5 hours yesterday (yes, the last 5 Friday work hours, the best ones to get stuck with some code) trying to pass parameters using the TriggerDagRunOperator, and wanting to die but at the end achieving it.
