A Single-Solid Pipeline

You can find the code for this example on GitHub.

Solids and Pipelines

Dagster's core abstractions are solids and pipelines.

Solids are individual units of computation that we wire together to form pipelines. By default, all solids in a pipeline execute in the same process. In production environments, Dagster is usually configured so that each solid executes in its own process.

In this section, we'll cover how to define a simple pipeline with a single solid, and then execute it.

The Cereal Dataset

Our pipeline will operate on a simple but scary CSV dataset, cereal.csv, which contains nutritional facts about 77 breakfast cereals.

If you haven't already, grab the dataset for this tutorial from GitHub:

curl -O https://raw.githubusercontent.com/dagster-io/dagster/master/examples/docs_snippets/docs_snippets/intro_tutorial/cereal.csv

Hello, Solid!

Let's write our first Dagster solid and save it as hello_cereal.py.

A solid is a unit of computation in a data pipeline. Typically, you'll define solids by annotating ordinary Python functions with the @solid decorator.

The logic in our first solid is straightforward: it reads the CSV from a hardcoded path and logs the number of rows it finds.

import csv
import os

from dagster import pipeline, solid


@solid
def hello_cereal(context):
    # Assumes the dataset is in the same directory as this Python file
    dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(dataset_path, "r") as fd:
        # Read the rows in using the standard csv library
        cereals = [row for row in csv.DictReader(fd)]

    context.log.info(f"Found {len(cereals)} cereals")

In this simple case, our solid takes no arguments except for the context in which it executes (provided by the Dagster framework as the first argument to every solid), and returns no outputs. Don't worry, we'll soon encounter solids that are much more dynamic.
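
As a quick preview (this snippet is illustrative and reuses the imports from the snippet above; it is not part of this tutorial's code), a solid produces an output simply by returning a value from its function body:

@solid
def load_cereals(context):
    # Hypothetical example: returning a value makes it available as
    # this solid's output, which downstream solids can consume
    return [{"name": "Cheerios"}, {"name": "Corn Flakes"}]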

Hello, Pipeline!

To execute our solid, we'll embed it in an equally simple pipeline. A pipeline is a set of solids arranged into a DAG of computation. You'll typically define pipelines by annotating ordinary Python functions with the @pipeline decorator.

@pipeline
def hello_cereal_pipeline():
    hello_cereal()

Here you'll see that we call hello_cereal(). This call doesn't actually execute the solid. Within the bodies of functions decorated with @pipeline, we use function calls to indicate the dependency structure of the solids making up the pipeline. Here, we indicate that the execution of hello_cereal doesn't depend on any other solids by calling it with no arguments.
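
To make the wiring concrete, here's a minimal hypothetical sketch (the solid and pipeline names are invented for illustration): passing the result of calling one solid into another inside the @pipeline body tells Dagster that the second solid depends on the first solid's output.

from dagster import pipeline, solid


@solid
def load_cereals(context):
    # Stand-in for real loading logic
    return [{"name": "Cheerios"}, {"name": "Corn Flakes"}]


@solid
def count_cereals(context, cereals):
    # Receives the output of load_cereals as its "cereals" input
    context.log.info(f"Found {len(cereals)} cereals")


@pipeline
def serial_sketch_pipeline():
    # Wiring: count_cereals runs after, and depends on, load_cereals
    count_cereals(load_cereals())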

Executing Our First Pipeline

Assuming you’ve saved this pipeline as hello_cereal.py, you can execute it via any of three different mechanisms:

Dagit

To visualize your pipeline in Dagit, just run the following from the directory in which you've saved the pipeline file:

dagit -f hello_cereal.py

You'll see output like:

Loading repository...
Serving on http://127.0.0.1:3000

You should be able to navigate to http://127.0.0.1:3000/pipeline/hello_cereal_pipeline/ in your web browser and view your pipeline. It isn't very interesting yet, because it only has one node.

[Screenshot: the hello_cereal_pipeline graph in Dagit]

Click on the "Playground" tab and you'll see the view below.

[Screenshot: the Dagit Playground tab]

The large upper left pane is empty here, but, in pipelines with parameters, this is where you'll be able to edit pipeline configuration on the fly.

Click the "Launch Execution" button on the bottom right to execute this plan directly from Dagit. A new window should open, and you'll see a much more structured view of the stream of Dagster events start to appear in the left-hand pane.

If you have pop-up blocking enabled, you may need to tell your browser to allow pop-ups from 127.0.0.1. Alternatively, navigate to the "Runs" tab, which lists this run and every other run of your pipeline.

[Screenshot: the run view showing the structured event log]

In this view, you can filter and search through the logs corresponding to your pipeline run.

Dagster CLI

From the directory in which you've saved the pipeline file, just run:

dagster pipeline execute -f hello_cereal.py

You'll see the full stream of events emitted by Dagster appear in the console, including our call to the logging machinery, which will look like:

2021-02-05 08:50:25 - dagster - INFO - system - ce5d4576-2569-44ff-a14a-51010eea5329 - hello_cereal - Found 77 cereals

Success!

Python API

If you'd rather execute your pipelines as a script, you can do that without using the Dagster CLI at all. Just add a few lines to hello_cereal.py:

from dagster import execute_pipeline

if __name__ == "__main__":
    result = execute_pipeline(hello_cereal_pipeline)

Now you can just run:

python hello_cereal.py

The execute_pipeline() function called here is the core Python API for executing Dagster pipelines from code.
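
If you want the script to fail loudly when a run fails, you can inspect the result object that execute_pipeline() returns. A minimal sketch:

if __name__ == "__main__":
    result = execute_pipeline(hello_cereal_pipeline)
    # The result object reports whether the run as a whole succeeded
    assert result.success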