Software-defined assets provide a declarative way to define which assets should exist and how to create them. But when you don't know which asset an op will materialize until the op actually runs, you can still create assets using `AssetMaterialization` events.
"Asset" is Dagster's word for an entity, external to ops, that is mutated or created by an op. An asset might be a table in a database that an op appends to, an ML model in a model store that an op overwrites, or even a slack channel that an op writes messages to.
Op outputs often correspond to assets. For example, an op might be responsible for recreating a table, and one of its outputs might be a dataframe containing the contents of that table.
Assets can also have partitions, which refer to slices of the overall asset. The simplest example would be a table that has a partition for each day. A given op execution may simply write a single day's worth of data to that table, rather than dropping the entire table and replacing it with new data.
Dagster lets you track the interactions between ops, outputs, and assets over time and view them in the Dagit Asset Catalog. Every asset has a "key", which serves as a unique identifier for that particular entity. The act of creating or updating the contents of an asset is called a "materialization", and Dagster tracks these materializations using `AssetMaterialization` events. These events can either be logged by the user at runtime, or automatically created by Dagster in cases where an `AssetKey` has been referenced by an op output.
| Name | Description |
|---|---|
| `AssetMaterialization` | Dagster event indicating that an op has materialized an asset. |
| `AssetKey` | A unique identifier for a particular external asset. |
There are two general patterns for dealing with assets when using Dagster:

- Explicitly logging `AssetMaterialization` events at runtime, either from within an op body or from an `IOManager`.
- Declaring (via an experimental interface) that a given op output corresponds to a given `AssetKey`, so that Dagster generates the events automatically.

Regardless of which pattern you use, `AssetMaterialization` events are what communicate to Dagster that a materialization has occurred.
One way of recording materialization events is to log `AssetMaterialization` events at runtime. These events should be co-located with your materialization logic: if you store your object within your op body, log the event from within that op; if you store your object using an `IOManager`, log the event from your manager.
To make Dagster aware that we materialized an asset in our op, we can log an `AssetMaterialization` event using the `OpExecutionContext.log_event` method. This would involve changing the following op:
```python
from dagster import op


@op
def my_simple_op():
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    return remote_storage_path
```
into something like this:
```python
from dagster import AssetMaterialization, op


@op
def my_materialization_op(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetMaterialization(
            asset_key="my_dataset", description="Persisted result to storage"
        )
    )
    return remote_storage_path
```
We should now see a materialization event in the event log when we execute a job with this op.
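For example, a minimal sketch of wiring this op into a job and executing it in process might look like the following (the job name here is hypothetical):

```python
from dagster import job


@job
def my_materialization_job():
    my_materialization_op()


# Running the job logs the AssetMaterialization, which then shows up in the
# run's event log and in the Dagit Asset Catalog under the "my_dataset" key.
result = my_materialization_job.execute_in_process()
```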
To record that an `IOManager` has mutated or created an asset, we can log an `AssetMaterialization` event from its `handle_output` method via `OutputContext.log_event`.
```python
import os

from pandas import read_csv

from dagster import AssetKey, AssetMaterialization, IOManager


class PandasCsvIOManager(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        obj.to_csv(file_path)

        context.log_event(
            AssetMaterialization(
                asset_key=AssetKey(file_path),
                description="Persisted result to storage.",
            )
        )
```
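To exercise this manager, you would typically bind it to a job's resources. A rough sketch, where the `@io_manager` factory, the op, and the job name are illustrative and `read_df` is an assumed helper:

```python
from dagster import io_manager, job, op


@io_manager
def pandas_csv_io_manager(_):
    return PandasCsvIOManager()


@op
def produce_dataframe():
    # Hypothetical helper that returns a pandas DataFrame.
    return read_df()


@job(resource_defs={"io_manager": pandas_csv_io_manager})
def my_csv_job():
    produce_dataframe()
```

Each run that stores an output through this manager will log the `AssetMaterialization` from `handle_output`.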
A variety of metadata can be attached to a materialization event through the `MetadataEntry` class. Each materialization event optionally takes metadata entries that are then displayed in the event log and the Asset Catalog.
```python
from dagster import AssetMaterialization, MetadataValue, op


@op
def my_metadata_materialization_op(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetMaterialization(
            asset_key="my_dataset",
            description="Persisted result to storage",
            metadata={
                "text_metadata": "Text-based metadata for this event",
                "path": MetadataValue.path(remote_storage_path),
                "dashboard_url": MetadataValue.url(
                    "http://mycoolsite.com/url_for_my_data"
                ),
                "size (bytes)": calculate_bytes(df),
            },
        )
    )
    return remote_storage_path
```
```python
import os

from pandas import read_csv

from dagster import AssetKey, AssetMaterialization, IOManager


class PandasCsvIOManagerWithAsset(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        obj.to_csv(file_path)

        context.log_event(
            AssetMaterialization(
                asset_key=AssetKey(file_path),
                description="Persisted result to storage.",
                metadata={
                    "number of rows": obj.shape[0],
                    "some_column mean": obj["some_column"].mean(),
                },
            )
        )
```
Check our API docs for `MetadataEntry` for more details on the types of event metadata available.
If you are materializing a single slice of an asset (e.g., a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the `partition` argument on the `AssetMaterialization` object.
```python
from dagster import AssetMaterialization, op


@op(config_schema={"date": str})
def my_partitioned_asset_op(context):
    partition_date = context.op_config["date"]
    df = read_df_for_date(partition_date)
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetMaterialization(asset_key="my_dataset", partition=partition_date)
    )
    return remote_storage_path
```
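The partition to materialize can then be supplied through run config when the job executes. A sketch, with a hypothetical job name and an illustrative date value:

```python
# Hypothetical job wrapping the op above.
result = my_partitioned_asset_job.execute_in_process(
    run_config={
        "ops": {"my_partitioned_asset_op": {"config": {"date": "2022-01-01"}}}
    }
)
```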
It is fairly common for an asset to correspond to an op output. In the following simplified example, our op produces a dataframe, persists it to storage, and then passes the dataframe along as an output:
```python
from dagster import AssetMaterialization, op


@op
def my_asset_op(context):
    df = read_df()
    persist_to_storage(df)
    context.log_event(AssetMaterialization(asset_key="my_dataset"))
    return df
```
In this case, the `AssetMaterialization` and the `Output` events both correspond to the same data: the dataframe that we have created. With this in mind, we can simplify the above code, and provide useful information to the Dagster framework, by making the link between the `my_dataset` asset and the output of this op explicit.
Just as there are two places in which you can log runtime `AssetMaterialization` events (within an op body and within an `IOManager`), we provide two different interfaces for linking an op output to an asset. Regardless of which you choose, every time the op runs and logs that output, an `AssetMaterialization` event will automatically be created to record this information.
If you use an `Output` event to yield your output and specify any metadata entries on it (see the Op Event docs), these entries will automatically be attached to the materialization event for this asset.
For cases where you are storing your asset within the body of an op, the easiest way of linking an asset to an op output is with the `asset_key` parameter on the relevant `Out` for your op. To do this, you can define a constant `AssetKey` that identifies the asset you are linking.
```python
from dagster import AssetKey, Out, op


@op(out=Out(asset_key=AssetKey("my_dataset")))
def my_constant_asset_op(context):
    df = read_df()
    persist_to_storage(df)
    return df
```
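As noted above, any metadata specified on the `Output` is copied onto the automatically generated `AssetMaterialization`. A sketch combining the two (the op name and row-count metadata here are illustrative):

```python
from dagster import AssetKey, Out, Output, op


@op(out=Out(asset_key=AssetKey("my_dataset")))
def my_constant_asset_op_with_metadata():
    df = read_df()
    persist_to_storage(df)
    # This metadata is attached to both the Output and the generated
    # AssetMaterialization for "my_dataset".
    return Output(df, metadata={"num_rows": len(df)})
```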
If you've defined a custom `IOManager` to handle storing your op's outputs, the `IOManager` will likely be the most natural place to define which asset a particular output will be written to. To do this, you can implement the `get_output_asset_key` method on your `IOManager`.
Similar to the above interface, this method takes an `OutputContext` and returns an `AssetKey`. The following example functions nearly identically to the `PandasCsvIOManagerWithAsset` from the runtime example above.
```python
import os

from pandas import read_csv

from dagster import AssetKey, IOManager, MetadataEntry


class PandasCsvIOManagerWithOutputAsset(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        obj.to_csv(file_path)

        yield MetadataEntry.int(obj.shape[0], label="number of rows")
        yield MetadataEntry.float(obj["some_column"].mean(), "some_column mean")

    def get_output_asset_key(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return AssetKey(file_path)
```
When an output is linked to an asset in this way, the generated `AssetMaterialization` event will contain any `MetadataEntry` information yielded from the `handle_output` function (in addition to all of the `metadata` specified on the corresponding `Output` event).
See the IOManager docs for more information on yielding these entries from an IOManager.
If you are already specifying a `get_output_asset_key` function on your `IOManager`, you can optionally specify the set of partitions that the manager will be updating or creating by also defining a `get_output_asset_partitions` function. If you do this, an `AssetMaterialization` will be created for each of the specified partitions. One useful pattern for passing this partition information (which will likely vary from run to run) to the manager is to specify the set of partitions in the output's configuration, by providing per-output configuration on the `IOManager`. You can then calculate the asset partitions that a particular output corresponds to by reading this output configuration in `get_output_asset_partitions`:
```python
import os

from pandas import read_csv

from dagster import AssetKey, IOManager, MetadataEntry


class PandasCsvIOManagerWithOutputAssetPartitions(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        obj.to_csv(file_path)

        yield MetadataEntry.int(obj.shape[0], label="number of rows")
        yield MetadataEntry.float(obj["some_column"].mean(), "some_column mean")

    def get_output_asset_key(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return AssetKey(file_path)

    def get_output_asset_partitions(self, context):
        return set(context.config["partitions"])
```
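One way to provide that per-output configuration is to build the manager with an `output_config_schema`. A minimal sketch, where the factory name is hypothetical and the exact run-config layout depends on your job:

```python
from dagster import io_manager


@io_manager(output_config_schema={"partitions": [str]})
def partitioned_csv_io_manager(_):
    return PandasCsvIOManagerWithOutputAssetPartitions()
```

Run config for the relevant output would then supply the `partitions` list, which `get_output_asset_partitions` reads via `context.config["partitions"]`.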
When an op output is linked to an `AssetKey`, Dagster can automatically generate lineage information that describes how this asset relates to other output-linked assets.
As a simplified example, imagine a two-op job that first scrapes some user data from an API, storing it to a table, then trains an ML model on that data, persisting it to a model store:
```python
from dagster import AssetKey, Out, job, op


@op(out=Out(asset_key=AssetKey("my_db.users")))
def scrape_users():
    users_df = some_api_call()
    persist_to_db(users_df)
    return users_df


@op(out=Out(asset_key=AssetKey("ml_models.user_prediction")))
def get_prediction_model(users_df):
    my_ml_model = train_prediction_model(users_df)
    persist_to_model_store(my_ml_model)
    return my_ml_model


@job
def my_user_model_job():
    get_prediction_model(scrape_users())
```
In this case, it's certainly fair to say that this ML model, which we have assigned the key `ml_models.user_prediction`, depends on the table that we created, `my_db.users` (it uses the data in the table to train the model).
Why is that? By specifying the structure of your job, you have already defined data dependencies between these ops. By linking the output of `scrape_users` to the input of `get_prediction_model`, we can infer that whatever this second op outputs will be some function of its input. Furthermore, since we have linked each of these outputs to external assets, we can use this knowledge to say that the asset associated with the output of `get_prediction_model` depends on the asset associated with the output of `scrape_users`.
This feature is still in its early stages, but for now, this lineage information is surfaced on the Asset Catalog page for each asset, under "Parent assets".
When working with software-defined assets, the assets and their dependencies must be known at definition time. When you look at software-defined assets in Dagit, you can see exactly what assets are going to be materialized before any code runs.
Asset materializations, on the other hand, are logged at run time. When you run an op, you find out which assets were materialized while the op is running. This allows for some flexibility, such as determining which assets should be materialized based on the output of a previous op.
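For example, a sketch of an op whose asset key is only known at runtime (`table_name_for` and `persist_to_storage` are hypothetical helpers):

```python
from dagster import AssetMaterialization, op


@op
def dynamic_materialization_op(context, df):
    # The asset key is derived from the data itself, so it can't be known
    # until the op actually runs.
    table_name = table_name_for(df)
    persist_to_storage(df)
    context.log_event(AssetMaterialization(asset_key=table_name))
    return df
```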