  • Aldo Sula

Orchestrating Pipelines with Dagster

A complete guide on how to integrate dbt with Dagster and an automated CI/CD pipeline to deploy on an AWS Kubernetes cluster


This blog was written together with Megi Menalla and Arbin Bici.

An ETL project has many components, each of which might use a different technology, so orchestrating everything to work together is a challenge. The main goal is an automated infrastructure that is reproducible, scalable and reliable. The data that is extracted, loaded and transformed should also be accurate and of high quality.

Several open-source data orchestration tools attempt to achieve this goal: Airflow, Luigi, Prefect, Argo and, more recently, Dagster, which is the main subject of this blog post.


Why Dagster?


Dagster is a tool designed to manage every step of a data-oriented project's lifecycle.

With this cloud-native orchestrator, we can define a pipeline with a few lines of code and observe its state from the Dagit UI. Dagit is a web-based interface for Dagster that simplifies pipeline execution and debugging.


With Dagster, users can parameterize execution and directly inspect outcomes using functions that model inputs and outputs. In addition, Dagster supports controlling execution from various environments, such as your local machine, a CI/CD pipeline or production.


Pipelines can also have different modes, such as "test" and "prod", that define resources at runtime. For example, a pipeline in "test" mode might persist data to a local Postgres instance, whereas in "prod" mode the data can be stored in a cloud database service such as Amazon RDS or Google BigQuery.


Compared with Airflow, pipelines are faster and easier to define in Dagster, as it can execute operations completely in memory, without a database.

One of the key features that makes Dagster a great orchestration tool is its effortless integration with dbt.


With Airflow, the scheduling of executors is instance-wide, meaning that they need to continuously load all tasks from the DAGs included in the process. This limits flexibility and creates scalability bottlenecks. Dagster instead runs a daemon that is responsible for scheduling runs, not tasks. This allows the operations inside a repository to be grouped into jobs that can have different schedules.


This blog post gives an overview of how to orchestrate pipelines with Dagster and transform data with dbt. The infrastructure will be provisioned in AWS with IaC on Terraform.


In this blog post we are going to:

  1. Read weather data from an API

  2. Store the JSON response on S3 (AWS Storage Service)

  3. Fetch data from S3 and store it on a Postgres database

  4. Use dbt to extract relational data from JSON records

  5. Create dimensions and facts with dbt

  6. Integrate dbt with Dagster

  7. Orchestrate tasks 1-5 with Dagster

  8. Provision infrastructure on cloud with IaC on Terraform

  9. Deploy the Dagster project on EKS


Prerequisites

  • An AWS account

  • API key for weather data

  • A programmatic IAM user to authenticate Terraform

  • An S3 bucket to store the Terraform state

  • AWS CLI

  • kubectl


API key

Get a free API key from OpenWeather and export it as an environment variable named API_KEY.


Authenticate on AWS

After downloading AWS CLI and creating an IAM user, execute the following command:

aws configure

You will be prompted to provide the access key ID and the secret access key of the newly created user. You are now authenticated with AWS.



Getting started


You can find the resources and code used in this post in our Dagster GitHub repository for the pipeline and dbt project, and in our Infrastructure GitHub repository for the Terraform project.



1. Read weather data from an API


The following code snippet is a Dagster op (operation), which is a node of the DAG.

This function makes a request to the Weather API to retrieve the current weather data for the cities in the cities list, which is declared outside the function.
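A minimal sketch of such a request function, using only the Python standard library. The endpoint URL, the `CITIES` values, the `build_url` helper and the injectable `fetch` parameter are assumptions made for illustration; in the Dagster project the function would additionally be decorated with `@op`.

```python
import json
import os
from urllib.parse import urlencode
from urllib.request import urlopen

# Assumed endpoint of OpenWeather's "current weather" API.
BASE_URL = "https://api.openweathermap.org/data/2.5/weather"

# Hypothetical city list, declared outside the function as described above.
CITIES = ["Tirana", "Berlin"]


def build_url(city, api_key):
    """Build the request URL for one city."""
    return f"{BASE_URL}?{urlencode({'q': city, 'appid': api_key})}"


def _http_get(url):
    """Default fetcher: perform the HTTP GET and return the body as text."""
    with urlopen(url, timeout=10) as response:
        return response.read().decode("utf-8")


def fetch_weather(cities=CITIES, api_key=None, fetch=None):
    """Return one parsed JSON response per city.

    `fetch` can be swapped out so the function is testable without network access.
    """
    api_key = api_key or os.environ["API_KEY"]
    fetch = fetch or _http_get
    return [json.loads(fetch(build_url(city, api_key))) for city in cities]
```

Keeping the HTTP call behind a parameter means the parsing logic can be exercised locally with a stub before the op is wired into a job.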


2. Store the JSON response on S3 (AWS Storage Service)


Using the credentials of the IAM user, we first authenticate to S3 and then upload the JSON response to an S3 bucket. The bucket will be created by Terraform. The cloud infrastructure is covered in more detail later in this post.
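The upload step could look roughly like the sketch below. It assumes a boto3 S3 client is passed in (for example one created with `boto3.client("s3")`), that each record carries the city under a `name` field, and a hypothetical `weather/<timestamp>/<city>.json` key layout; none of these details come from the original project.

```python
import json
from datetime import datetime, timezone


def upload_weather(s3_client, bucket, records, now=None):
    """Upload each record as its own JSON object and return the keys written.

    `s3_client` is expected to expose boto3's `put_object` method.
    """
    now = now or datetime.now(timezone.utc)
    keys = []
    for record in records:
        # Hypothetical key layout: one folder per run, one file per city.
        key = f"weather/{now:%Y-%m-%dT%H-%M-%S}/{record['name']}.json"
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps(record).encode("utf-8"),
            ContentType="application/json",
        )
        keys.append(key)
    return keys
```

Returning the written keys gives the next step something concrete to consume, which is also how a dependency between two ops is usually expressed.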




3. Fetch data from S3 and store it on a Postgres database


To transform and use the fetched data, we are going to save it in a Postgres database. First, we open a connection to the database, then read the JSON files from S3 and write them one by one as records in a table. Bulk importing the data is also an option, but it is outside the scope of this blog post. The following code snippet shows the described logic:
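A minimal sketch of that logic, assuming a boto3 S3 client and a DB-API connection such as one returned by `psycopg2.connect(...)`. The `json_file` table name is the one used in this post; the `data` column and the `weather/` prefix are hypothetical.

```python
def load_json_files(s3_client, connection, bucket, prefix="weather/"):
    """Copy every JSON object under `prefix` into the json_file table.

    Returns the number of rows inserted. A single list_objects_v2 call
    returns at most 1000 keys, so larger buckets would need pagination.
    """
    listing = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    cursor = connection.cursor()
    inserted = 0
    for item in listing.get("Contents", []):
        body = s3_client.get_object(Bucket=bucket, Key=item["Key"])["Body"].read()
        # %s is the psycopg2 placeholder style; the driver handles escaping.
        cursor.execute(
            "INSERT INTO json_file (data) VALUES (%s)",
            (body.decode("utf-8"),),
        )
        inserted += 1
    connection.commit()
    return inserted
```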



4. Use dbt to extract relational data from JSON records


Now that the data is loaded into the database, we can work with it using SQL. At this point every JSON file is stored as a JSON record in the json_file table. With dbt, we extract the data so that every attribute, including the nested ones, is saved as a separate column in the main weather_data table. In this form we can work with the data to extract actual information from it.
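The dbt model itself is written in SQL. Purely to illustrate what "every nested attribute becomes its own column" means, here is a small Python sketch of the same flattening idea. The `flatten` helper and the underscore-joined column names are assumptions, not the naming used in the dbt project.

```python
def flatten(record, parent_key="", sep="_"):
    """Flatten a nested dict into a single level of column-like keys."""
    flat = {}
    for key, value in record.items():
        column = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested objects, prefixing with the parent key.
            flat.update(flatten(value, column, sep))
        else:
            flat[column] = value
    return flat


# Made-up record shaped like a weather API response.
sample = {"name": "Oslo", "main": {"temp": 280.1, "humidity": 70}}
print(flatten(sample))
```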



5. Create dimensions and facts with dbt


In practice, we can easily transform the data using dbt. Below we will provide an example of how a fact table is created with dbt.


dbt has its own way of examining data lineage. Find more on this topic here. As far as dbt is concerned, the following DAG is the order in which the queries are going to be executed.



6. Integrate dbt with Dagster


One of Dagster's main integration advantages is the ability to orchestrate dbt alongside other technologies, in our case Python and PostgreSQL. Integrating dbt is as simple as defining a dbt resource with the project directory and profiles directory of your dbt project. To connect to our PostgreSQL instance, we also need to define a Dagster resource that opens the database connection. Below we will see how these resources are attached to a Dagster job.



7. Orchestrate tasks 1-5 with Dagster


The execution of our operations is sequential, meaning that each function waits until the previous one has finished its task. With Dagster, this is done by passing the output of one operation as an input to another operation, which creates a dependency between them. The code snippet provided below demonstrates this operation dependency:


A pipeline in Dagster can have multiple jobs and schedules, all of which are grouped in a repository: a Python function that returns a list of jobs and schedules, as provided below:



As mentioned above, Dagster integrates easily with dbt. Using the functions provided by the dagster-dbt library, we can run dbt commands. In our case, we use dbt_run_op, an operation that executes the dbt run command.



Visual representation of the Dagster job operations in Dagit UI

8. Provision infrastructure on cloud with IaC on Terraform


We mentioned previously that the data will be stored in a bucket. This bucket, along with the other cloud resources, is provisioned by a separate Terraform project. The Dagster project will be built as a Docker image and pushed to ECR. From there, the image is deployed on a Kubernetes cluster, which is also provisioned by the Terraform project.

The cluster infrastructure is based on this open-source GitHub repo.


Architectural diagram




The diagram above gives a general overview of what is going to be built in your AWS account. Terraform will create three secrets, for the AWS credentials and the API key, whose values you have to provide manually.


Finally, after the cluster is up and running, you can configure kubectl to connect to the new cluster by running the following command.


aws eks --region us-east-1 update-kubeconfig \
        --name <cluster-name>

9. Deploy the Dagster project on EKS


The Dagster repository already has a GitHub Action that automatically deploys the Dagster project to an EKS cluster. Check the GitHub Action workflow in this link. However, you can also deploy the project manually by following the instructions below.

We have provided a Makefile that logs in to ECR, then builds the Docker image and pushes it to the repository. After that, we use kubectl to replace the default image with the newly created one. Apply the steps described above by executing:


make ecr-push

At this point, both applications, Dagster and Postgres, should be deployed in the Kubernetes cluster.

In order to see the Dagit UI and manually run the pipeline, we should make it accessible by executing the following command on the terminal.


kubectl port-forward service/dagster 3000:3000

We have now demonstrated in practice that Dagster is easy to set up and offers many integrations. It makes implementing, scheduling and monitoring an ETL pipeline more straightforward and easier to debug.


For any questions, feel free to reach out to us at hello@data-max.io. We would love to hear from you.