Dagster's core abstractions are solids and pipelines.
Solids are individual units of computation that we wire together to form pipelines. By default, all solids in a pipeline execute in the same process. In production environments, Dagster is usually configured so that each solid executes in its own process.
In this section, we'll cover how to define a simple pipeline with a single solid, and then execute it.
Our pipeline will operate on a simple but scary CSV dataset, `cereal.csv`, which contains nutritional facts about 77 breakfast cereals.
If you haven't already, grab the dataset for this tutorial from GitHub:
```shell
curl -O https://raw.githubusercontent.com/dagster-io/dagster/master/examples/docs_snippets/docs_snippets/intro_tutorial/cereal.csv
```
Let's write our first Dagster solid and save it as `hello_cereal.py`.
A solid is a unit of computation in a data pipeline. Typically, you'll define solids by annotating ordinary Python functions with the `@solid` decorator.
The logic in our first solid is very straightforward: it just reads in the CSV from a hardcoded path and logs the number of rows it finds.
```python
import csv
import os

from dagster import pipeline, solid


@solid
def hello_cereal(context):
    # Assumes the dataset is in the same directory as this Python file
    dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(dataset_path, "r") as fd:
        # Read the rows in using the standard csv library
        cereals = [row for row in csv.DictReader(fd)]
    context.log.info(f"Found {len(cereals)} cereals")
```
In this simple case, our solid takes no arguments except for the `context` in which it executes (provided by the Dagster framework as the first argument to every solid), and it returns no outputs. Don't worry, we'll soon encounter solids that are much more dynamic.
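Because the solid's body is ordinary Python that only touches `context.log`, its logic can be exercised without Dagster at all. The following is a minimal sketch under that assumption: `StubContext`, `RecordingLog`, and `hello_cereal_logic` are hypothetical names invented for illustration, not part of Dagster's API, and the body reads from a file object instead of the hardcoded `cereal.csv` path so it can run against a small in-memory sample.

```python
import csv
import io
from typing import List


class RecordingLog:
    """Hypothetical stand-in for context.log that records messages instead of emitting them."""

    def __init__(self) -> None:
        self.messages: List[str] = []

    def info(self, msg: str) -> None:
        self.messages.append(msg)


class StubContext:
    """Hypothetical stand-in for the context Dagster passes as a solid's first argument."""

    def __init__(self) -> None:
        self.log = RecordingLog()


def hello_cereal_logic(context, fd) -> None:
    # Same steps as the solid body: parse rows with csv.DictReader, log the count, return nothing
    cereals = [row for row in csv.DictReader(fd)]
    context.log.info(f"Found {len(cereals)} cereals")


if __name__ == "__main__":
    sample = io.StringIO("name,calories\nCereal A,100\nCereal B,110\n")
    ctx = StubContext()
    hello_cereal_logic(ctx, sample)
    print(ctx.log.messages)  # ['Found 2 cereals']
```

Passing the file object in, rather than opening a fixed path inside the body, is a design choice made here purely for testability; the tutorial's solid reads the file itself, which is fine for a first example.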
To execute our solid, we'll embed it in an equally simple pipeline. A pipeline is a set of solids arranged into a DAG (directed acyclic graph) of computation. You'll typically define pipelines by annotating ordinary Python functions with the `@pipeline` decorator.
```python
@pipeline
def hello_cereal_pipeline():
    hello_cereal()
```
Here you'll see that we call `hello_cereal()`. This call doesn't actually execute the solid. Within the bodies of functions decorated with `@pipeline`, we use function calls to indicate the dependency structure of the solids making up the pipeline. Here, we indicate that the execution of `hello_cereal` doesn't depend on any other solids by calling it with no arguments.
Assuming you've saved this pipeline as `hello_cereal.py`, you can execute it via any of three different mechanisms: Dagit, the Dagster CLI, or the Python API.
To visualize your pipeline (which only has one node) in Dagit, Dagster's web UI, just run the following from the directory in which you've saved the pipeline file:
```shell
dagit -f hello_cereal.py
```
You'll see output like:
```
Loading repository...
Serving on http://127.0.0.1:3000
```
You should be able to navigate to http://127.0.0.1:3000/pipeline/hello_cereal_pipeline/ in your web browser and view your pipeline. It isn't very interesting yet, because it only has one node.
Click on the "Playground" tab and you'll see the view below.
The large upper-left pane is empty here, but in pipelines with parameters, this is where you'll be able to edit pipeline configuration on the fly.
Click the "Launch Execution" button on the bottom right to execute this plan directly from Dagit. A new window should open, and you'll see a much more structured view of the stream of Dagster events start to appear in the left-hand pane.
If you have pop-up blocking enabled, you may need to tell your browser to allow pop-ups from 127.0.0.1. Alternatively, just navigate to the "Runs" tab to see this run and every other run of your pipeline.
In this view, you can filter and search through the logs corresponding to your pipeline run.
To use the Dagster CLI instead, just run the following from the directory in which you've saved the pipeline file:
```shell
dagster pipeline execute -f hello_cereal.py
```
You'll see the full stream of events emitted by Dagster appear in the console, including our call to the logging machinery, which will look like:
```
2021-02-05 08:50:25 - dagster - INFO - system - ce5d4576-2569-44ff-a14a-51010eea5329 - hello_cereal - Found 77 cereals
```
Success!
If you'd rather execute your pipelines as a script, you can do that without using the Dagster CLI at all. Just add a few lines to `hello_cereal.py`:
```python
from dagster import execute_pipeline

if __name__ == "__main__":
    # Execute the pipeline in-process when this file is run directly
    result = execute_pipeline(hello_cereal_pipeline)
```
Now you can just run:
```shell
python hello_cereal.py
```
The `execute_pipeline()` function called here is the core Python API for executing Dagster pipelines from code.
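Whatever entry point you use, executing a pipeline has to run each solid only after the solids it depends on. As a conceptual sketch (not how `execute_pipeline()` is implemented), the standard library's `graphlib.TopologicalSorter` (Python 3.9+) can produce such an order from a dependency mapping; `run_in_dependency_order` and the step names below are hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, Set


def run_in_dependency_order(
    steps: Dict[str, Callable[[], None]], deps: Dict[str, Set[str]]
) -> List[str]:
    """Call each step after all of its upstream steps and return the order used.

    deps maps a step name to the names it depends on; steps absent from deps have none.
    Every dependency name must also be a key of steps.
    """
    graph = {name: deps.get(name, set()) for name in steps}
    order = list(TopologicalSorter(graph).static_order())  # raises CycleError on a cycle
    for name in order:
        steps[name]()  # runs in this process, one step at a time
    return order


if __name__ == "__main__":
    calls: List[str] = []
    steps = {
        "load_cereals": lambda: calls.append("load_cereals"),
        "summarize": lambda: calls.append("summarize"),
    }
    print(run_in_dependency_order(steps, {"summarize": {"load_cereals"}}))
    # ['load_cereals', 'summarize']
```

A single-solid pipeline like `hello_cereal_pipeline` is the trivial case: its graph has one node and no edges, so there is only one possible order.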