The software-defined asset APIs sit on top of the graph/job/op APIs and enable a novel approach to orchestration that puts assets at the forefront.
In Dagster, an "asset" is a data product: an object produced by a data pipeline, such as a table, a machine learning model, or a report.
Conceptually, software-defined assets invert the typical relationship between assets and computation. Instead of defining a graph of ops and recording which assets those ops end up materializing, you define a set of assets. Each asset knows how to compute its contents from upstream assets.
Taking a software-defined asset approach has a few main benefits; chief among them, each asset knows how to compute its contents from upstream assets, so there's no need to use @graph or @job to wire up dependencies.
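To make that concrete, here's a minimal sketch of two software-defined assets. The names (raw_orders, order_totals) are purely illustrative and aren't part of the example that follows; the point is that the downstream asset declares its upstream dependency just by naming it as an argument, with no graph wiring.

from dagster import asset


@asset
def raw_orders():
    # In a real pipeline, this might pull rows from an API or a warehouse table.
    return [{"id": 1, "amount": 10}, {"id": 2, "amount": 25}]


@asset
def order_totals(raw_orders):
    # Naming the argument "raw_orders" is all it takes for Dagster to infer the
    # dependency on the raw_orders asset above.
    return sum(order["amount"] for order in raw_orders)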
In this example, we'll define some tables with dependencies on each other. We have a table of temperature samples collected in five-minute increments, and we want to compute a table of the highest temperatures for each day.
Here are the asset definitions for the tables we want to materialize.
import pandas as pd
from pandas import DataFrame

from dagster import AssetKey, SourceAsset, asset

sfo_q2_weather_sample = SourceAsset(
    key=AssetKey("sfo_q2_weather_sample"),
    description="Weather samples, taken every five minutes at SFO",
    metadata={"format": "csv"},
)


@asset
def daily_temperature_highs(sfo_q2_weather_sample: DataFrame) -> DataFrame:
    """Computes the temperature high for each day"""
    sfo_q2_weather_sample["valid_date"] = pd.to_datetime(sfo_q2_weather_sample["valid"])
    return sfo_q2_weather_sample.groupby("valid_date").max().rename(columns={"tmpf": "max_tmpf"})


@asset
def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:
    """Computes the 10 hottest dates"""
    return daily_temperature_highs.nlargest(10, "max_tmpf")
sfo_q2_weather_sample represents our base temperature table. It's a SourceAsset, meaning that we rely on it, but don't generate it.
daily_temperature_highs represents a computed asset. It's derived by taking the sfo_q2_weather_sample table and applying the decorated function to it. Notice that it's defined using a pure function: a function with no side effects, just logical data transformation. The code for storing and retrieving the data in persistent storage will be supplied later in an IOManager (a toy sketch follows these descriptions). This allows us to swap in different implementations in different environments. For example, in local development, we might want to store data in a local CSV file for easy testing. However, in production, we would want to store data in a data warehouse.
hottest_dates is a computed asset that depends on another computed asset, daily_temperature_highs.
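The IOManager mentioned above is just a pair of hooks: one that stores an asset's output and one that loads it for downstream assets. As a toy illustration (not the IO manager this example ultimately uses; the real one is defined below), here's a minimal in-memory sketch showing where storing and retrieving happen.

from dagster import IOManager


class InMemoryIOManager(IOManager):
    """A toy IO manager that keeps asset contents in a dict keyed by asset key."""

    def __init__(self):
        self._values = {}

    def handle_output(self, context, obj):
        # Called after an asset's function returns; decides where the value goes.
        self._values[tuple(context.asset_key.path)] = obj

    def load_input(self, context):
        # Called before a downstream asset runs; decides where the value comes from.
        return self._values[tuple(context.asset_key.path)]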
The framework infers asset dependencies by looking at the names of the arguments to the decorated functions. The function that defines the daily_temperature_highs asset has an argument named sfo_q2_weather_sample, which corresponds to the asset definition of the same name.
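To see the inference at work with more than one input, here's a hypothetical additional asset (not part of the group we build below) that depends on both computed assets above simply by naming them as arguments.

from pandas import DataFrame

from dagster import asset


@asset
def hottest_dates_summary(daily_temperature_highs: DataFrame, hottest_dates: DataFrame) -> DataFrame:
    """Compares the mean of the 10 hottest daily highs to the mean of all daily highs"""
    # Both argument names match asset definitions above, so Dagster infers two
    # upstream dependencies for this asset.
    return DataFrame(
        {
            "overall_mean_high": [daily_temperature_highs["max_tmpf"].mean()],
            "hottest_mean_high": [hottest_dates["max_tmpf"].mean()],
        }
    )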
Having defined some assets, we can combine them into an AssetGroup, which allows us to work with them in Dagit. It also lets us combine them with resources and IO managers that determine how they're stored and connect them to external services. It's common to use a utility like AssetGroup.from_modules or AssetGroup.from_package_name to pick up all the assets within a module or package, so you don't need to list them individually.
from dagster import AssetGroup, IOManagerDefinition

# imports the module called "assets" from the package containing the current module
# the "assets" module contains the asset definitions
from . import assets

# LocalFileSystemIOManager is the IO manager defined later in this guide
weather_assets = AssetGroup.from_modules(
    modules=[assets],
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
)
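To actually browse and materialize these assets in Dagit, the group needs to be included in a repository. Here's a minimal sketch; the repository name is up to you.

from dagster import repository


@repository
def weather_repository():
    # Including the AssetGroup makes all of its assets (and their lineage)
    # visible in Dagit.
    return [weather_assets]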
The order in which we supply the assets when constructing an AssetGroup doesn't matter, since the dependencies are determined by each asset definition.
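If you'd rather not pick up assets from a module, you can also construct a group by listing the definitions explicitly. The sketch below assumes the AssetGroup constructor's assets and source_assets arguments; the name explicit_weather_assets is just illustrative, and the listing order is irrelevant for the reason just described.

from dagster import AssetGroup, IOManagerDefinition

explicit_weather_assets = AssetGroup(
    # Deliberately listed "out of order": hottest_dates depends on
    # daily_temperature_highs, but the dependency comes from the definitions,
    # not from this list.
    assets=[hottest_dates, daily_temperature_highs],
    source_assets=[sfo_q2_weather_sample],
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
)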
The functions we used to define our assets describe how to compute their contents, but not how to read and write them to persistent storage. For reading and writing, we define an IOManager. In this case, our LocalFileSystemIOManager stores DataFrames as CSVs on the local filesystem:
import os

import pandas as pd
from pandas import DataFrame

from dagster import AssetKey, IOManager


class LocalFileSystemIOManager(IOManager):
    """Translates between Pandas DataFrames and CSVs on the local filesystem."""

    def _get_fs_path(self, asset_key: AssetKey) -> str:
        rpath = os.path.join(*asset_key.path) + ".csv"
        return os.path.abspath(rpath)

    def handle_output(self, context, obj: DataFrame):
        """This saves the dataframe as a CSV."""
        fpath = self._get_fs_path(context.asset_key)
        obj.to_csv(fpath)

    def load_input(self, context):
        """This reads a dataframe from a CSV."""
        fpath = self._get_fs_path(context.asset_key)
        return pd.read_csv(fpath)
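As a quick, framework-free smoke test of this IO manager, you can call its two methods directly. The SimpleNamespace below just fakes the only attribute they read, context.asset_key; it is not a real Dagster context.

from types import SimpleNamespace

import pandas as pd

from dagster import AssetKey

io_manager = LocalFileSystemIOManager()
fake_context = SimpleNamespace(asset_key=AssetKey("daily_temperature_highs"))

df = pd.DataFrame({"max_tmpf": [61.0, 72.5]})
io_manager.handle_output(fake_context, df)
round_tripped = io_manager.load_input(fake_context)
assert list(round_tripped["max_tmpf"]) == [61.0, 72.5]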
Not all the assets in a group need to have the same Python type. Here's an asset whose computation is defined using Spark DataFrames and that depends on the daily_temperature_highs asset we defined above using Pandas.
from pyspark.sql import DataFrame as SparkDF
from pyspark.sql import Window
from pyspark.sql import functions as f

from dagster import asset


@asset
def daily_temperature_high_diffs(daily_temperature_highs: SparkDF) -> SparkDF:
    """Computes the difference between each day's high and the previous day's high"""
    window = Window.orderBy("valid_date")
    return daily_temperature_highs.select(
        "valid_date",
        (
            daily_temperature_highs["max_tmpf"]
            - f.lag(daily_temperature_highs["max_tmpf"]).over(window)
        ).alias("day_high_diff"),
    )
Here's an extended version of weather_assets that contains the new asset:
from dagster import AssetGroup, IOManagerDefinition

from . import assets, spark_asset

spark_weather_assets = AssetGroup.from_modules(
    modules=[assets, spark_asset],
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
)
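With the extended group in hand, one way to materialize everything locally is to build a job from the group and execute it in process. This is only a sketch: it assumes AssetGroup.build_job is available in the Dagster version you're using, the job name "weather_job" is arbitrary, and the sfo_q2_weather_sample data must already be in a location the IO manager can read.

weather_job = spark_weather_assets.build_job(name="weather_job")

result = weather_job.execute_in_process()
assert result.success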
Because the same assets will be written and read as different Python types in different situations, we need to define an IOManager that can handle both of those types. Here's an extended version of the IOManager we defined before:
import glob
import os
from typing import Union

import pandas as pd
from pandas import DataFrame as PandasDF
from pyspark.sql import DataFrame as SparkDF
from pyspark.sql import SparkSession

from dagster import AssetKey, IOManager, check


class LocalFileSystemIOManager(IOManager):
    """Translates between Pandas/Spark DataFrames and CSVs on the local filesystem."""

    def _get_fs_path(self, asset_key: AssetKey) -> str:
        return os.path.abspath(os.path.join(*asset_key.path))

    def handle_output(self, context, obj: Union[PandasDF, SparkDF]):
        """This saves the dataframe as a CSV using the layout written and expected by Spark/Hadoop.

        E.g. if the given storage maps the asset's path to the filesystem path "/a/b/c", a directory
        will be created with two files inside it:

            /a/b/c/
                part-00000.csv
                _SUCCESS
        """
        if isinstance(obj, PandasDF):
            directory = self._get_fs_path(context.asset_key)
            os.makedirs(directory, exist_ok=True)
            open(os.path.join(directory, "_SUCCESS"), "wb").close()
            csv_path = os.path.join(directory, "part-00000.csv")
            obj.to_csv(csv_path)
        elif isinstance(obj, SparkDF):
            obj.write.format("csv").options(header="true").save(
                self._get_fs_path(context.asset_key), mode="overwrite"
            )
        else:
            raise ValueError("Unexpected output type")

    def load_input(self, context) -> Union[PandasDF, SparkDF]:
        """This reads a dataframe from a CSV using the layout written and expected by Spark/Hadoop.

        E.g. if the given storage maps the asset's path to the filesystem path "/a/b/c", and that
        directory contains:

            /a/b/c/
                part-00000.csv
                part-00001.csv
                _SUCCESS

        then the produced dataframe will contain the concatenated contents of the two CSV files.
        """
        if context.dagster_type.typing_type == PandasDF:
            fs_path = os.path.abspath(self._get_fs_path(context.asset_key))
            paths = glob.glob(os.path.join(fs_path, "*.csv"))
            check.invariant(len(paths) > 0, f"No csv files found under {fs_path}")
            return pd.concat(map(pd.read_csv, paths))
        elif context.dagster_type.typing_type == SparkDF:
            return (
                SparkSession.builder.getOrCreate()
                .read.format("csv")
                .options(header="true")
                .load(self._get_fs_path(context.asset_key))
            )
        else:
            raise ValueError("Unexpected input type")
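The branch taken by load_input is driven by the type annotation on the consuming asset's parameter, exposed as context.dagster_type.typing_type. For example, a further hypothetical downstream asset (not part of the groups above) annotated with a Pandas DataFrame would cause the Spark-written CSV directory for daily_temperature_high_diffs to be read back into Pandas:

from pandas import DataFrame as PandasDF

from dagster import asset


@asset
def largest_day_high_diff(daily_temperature_high_diffs: PandasDF) -> PandasDF:
    """Finds the day with the largest day-over-day jump in the temperature high"""
    # Because this parameter is annotated with a Pandas DataFrame, the IO manager's
    # load_input takes the Pandas branch even though the upstream asset was written
    # with Spark.
    return daily_temperature_high_diffs.nlargest(1, "day_high_diff")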