Advanced: Dagster Types#

You can find the code for this example on Github

Verifying Op Outputs and Inputs#

Dagster lets developers express what they expect their op inputs and outputs to look like through Dagster Types.

The dagster type system is gradual and optional - jobs can run without types specified explicitly, and specifying types in some places doesn't require that types be specified everywhere.

Dagster type-checking happens at op execution time - each type defines a type_check_fn that knows how to check whether values match what it expects.

  • When a type is specified for an op's input, then the type check occurs immediately before the op is executed.
  • When a type is specified for an op's output, then the type check occurs immediately after the op is executed.

Let's look back at our simple download_csv op.

@op
def download_csv():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    get_dagster_logger().info(f"Read {len(lines)} lines")
    return [row for row in csv.DictReader(lines)]

The object returned by Python's built-in csv.DictReader is a list of collections.OrderedDict, each of which represents one row of the dataset:

[
    OrderedDict([
        ('name', '100% Bran'), ('mfr', 'N'), ('type', 'C'), ('calories', '70'), ('protein', '4'),
        ('fat', '1'), ('sodium', '130'), ('carbo', '5'), ('sugars', '6'), ('potass', '280'),
        ('vitamins', '25'), ('shelf', '3'), ('weight', '1'), ('cups', '0.33'),
        ('rating', '68.402973')
    ]),
    OrderedDict([
        ('name', '100% Natural Bran'), ('mfr', 'Q'), ('type', 'C'), ('calories', '120'),
        ('protein', '3'), ('fat', '5'), ('sodium', '15'), ('fiber', '2'), ('carbo', '8'),
        ('sugars', '8'), ('potass', '135'), ('vitamins', '0'), ('shelf', '3'), ('weight', '1'),
        ('cups', '1'), ('rating', '33.983679')
    ]),
    ...
]

This is a simple representation of a "data frame", or a table of data. We'd like to be able to use Dagster's type system to type the output of download_csv, so that we can do type checking when we construct the job, ensuring that any op consuming the output of download_csv expects to receive data in this format.

Constructing a Dagster Type#

To do this, we'll construct a DagsterType that verifies an object is a list of dictionaries.

def is_list_of_dicts(_, value):
    return isinstance(value, list) and all(
        isinstance(element, dict) for element in value
    )


SimpleDataFrame = DagsterType(
    name="SimpleDataFrame",
    type_check_fn=is_list_of_dicts,
    description="A naive representation of a data frame, e.g., as returned by csv.DictReader.",
)

Now we can annotate the rest of our job with our new type:

@op(out=Out(SimpleDataFrame))
def download_csv():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    get_dagster_logger().info(f"Read {len(lines)} lines")
    return [row for row in csv.DictReader(lines)]


@op(ins={"cereals": In(SimpleDataFrame)})
def sort_by_calories(cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    get_dagster_logger().info(f'Most caloric cereal: {sorted_cereals[-1]["name"]}')

The type metadata now appears in Dagit and the system will ensure the input and output to this op indeed match the criteria for SimpleDataFrame. As usual, run:

dagit -f custom_types.py
custom_types_figure_one.png

You can see that the output of download_csv (which by default has the name result) is marked to be of type SimpleDataFrame.


When Type Checks Fail#

Now, if our op logic fails to return the right type, we'll see a type check failure, which will fail the job. Let's replace our download_csv op with the following bad logic:

@op(out=Out(SimpleDataFrame))
def bad_download_csv():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    get_dagster_logger().info(f"Read {len(lines)} lines")
    return ["not_a_dict"]

When we run the job with this op, we'll see an error in your terminal like:

2021-10-18 13:15:37 - dagster - ERROR - custom_type_job - 66d26360-84bc-41a3-8848-fba271354673 - 16200 - bad_download_csv - STEP_FAILURE - Execution of step "bad_download_csv" failed.

dagster.core.errors.DagsterTypeCheckDidNotPass: Type check failed for step output "result" - expected type "SimpleDataFrame".

We will also see the error message in Dagit:

custom_types_2_dagit_error_message.png

Metadata and Custom Type Checks#

Custom types can also yield metadata about the type check. For example, in the case of our data frame, we might want to record the number of rows and columns in the dataset when our type checks succeed, and provide more information about why type checks failed when they fail. User-defined type check functions can optionally return a TypeCheck object that contains metadata about the success or failure of the type check. Let's see how to use this to emit some summary statistics about our DataFrame type:

def less_simple_data_frame_type_check(_, value):
    if not isinstance(value, list):
        return TypeCheck(
            success=False,
            description=f"LessSimpleDataFrame should be a list of dicts, got {type(value)}",
        )

    fields = [field for field in value[0].keys()]

    for i in range(len(value)):
        row = value[i]
        idx = i + 1
        if not isinstance(row, dict):
            return TypeCheck(
                success=False,
                description=(
                    f"LessSimpleDataFrame should be a list of dicts, got {type(row)} for row {idx}"
                ),
            )
        row_fields = [field for field in row.keys()]
        if fields != row_fields:
            return TypeCheck(
                success=False,
                description=(
                    f"Rows in LessSimpleDataFrame should have the same fields, got {row_fields} "
                    f"for row {idx}, expected {fields}"
                ),
            )

    return TypeCheck(
        success=True,
        description="LessSimpleDataFrame summary statistics",
        metadata={
            "n_rows": len(value),
            "n_cols": len(value[0].keys()) if len(value) > 0 else 0,
            "column_names": str(list(value[0].keys()) if len(value) > 0 else []),
        },
    )

A TypeCheck must include a success argument describing whether the check passed or failed, and may include a description and/or a list of EventMetadataEntry objects. You should use the static constructors on EventMetadataEntry to construct these objects, which are flexible enough to support arbitrary metadata in JSON or Markdown format.

Dagit knows how to display and archive structured metadata of this kind for future review:

custom_types_figure_two.png