Software-Defined Assets (Experimental)

Software-defined assets sit on top of the graph/job/op APIs and enable a novel way of constructing Dagster jobs that puts assets at the forefront.

Conceptually, software-defined assets invert the typical relationship between assets and computation. Instead of defining a graph of ops and recording which assets those ops end up materializing, you define a set of assets, each of which knows how to compute its contents from upstream assets.

A software-defined asset combines:

  • An asset key, e.g. the name of a table.

  • A function, which can be run to compute the contents of the asset.

  • A set of upstream assets that are provided as inputs to the function when computing the asset.

@dagster.asset(name=None, namespace=None, ins=None, non_argument_deps=None, metadata=None, description=None, required_resource_keys=None, resource_defs=None, io_manager_key=None, compute_kind=None, dagster_type=None, partitions_def=None, partition_mappings=None, op_tags=None)

Create a definition for how to compute an asset.

A software-defined asset is the combination of:

  1. An asset key, e.g. the name of a table.

  2. A function, which can be run to compute the contents of the asset.

  3. A set of upstream assets that are provided as inputs to the function when computing the asset.

Unlike an op, whose dependencies are determined by the graph it lives inside, an asset knows about the upstream assets it depends on. The upstream assets are inferred from the arguments to the decorated function. The name of the argument designates the name of the upstream asset.
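The argument-name convention described above can be illustrated without Dagster at all. The following is a minimal sketch, not Dagster's implementation: the helper `infer_upstream_names` is hypothetical, and skipping a leading `context` parameter is an assumption made for this illustration.

```python
import inspect


def infer_upstream_names(fn):
    """Return the parameter names of fn, treated here as upstream asset names.

    Assumption for this sketch: a leading parameter called `context`
    is not an upstream asset and is skipped.
    """
    params = list(inspect.signature(fn).parameters)
    if params and params[0] == "context":
        params = params[1:]
    return params


def my_asset(my_upstream_asset: int) -> int:
    return my_upstream_asset + 1


def start_asset(context, source):
    ...


print(infer_upstream_names(my_asset))     # ['my_upstream_asset']
print(infer_upstream_names(start_asset))  # ['source']
```

Because the names come from the function signature, renaming an argument changes which upstream asset the function is taken to depend on.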

Parameters
  • name (Optional[str]) – The name of the asset. If not provided, defaults to the name of the decorated function.

  • namespace (Optional[Sequence[str]]) – The namespace that the asset resides in. The namespace + the name forms the asset key.

  • ins (Optional[Mapping[str, AssetIn]]) – A dictionary that maps input names to their metadata and namespaces.

  • non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]) – Set of asset keys that are upstream dependencies, but do not pass an input to the asset.

  • metadata (Optional[Dict[str, Any]]) – A dict of metadata entries for the asset.

  • required_resource_keys (Optional[Set[str]]) – Set of resource handles required by the op.

  • io_manager_key (Optional[str]) – The resource key of the IOManager used for storing the output of the op as an asset, and for loading it in downstream ops (default: “io_manager”).

  • compute_kind (Optional[str]) – A string to represent the kind of computation that produces the asset, e.g. “dbt” or “spark”. It will be displayed in Dagit as a badge on the asset.

  • dagster_type (Optional[DagsterType]) – Allows specifying type validation functions that will be executed on the output of the decorated function after it runs.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the asset.

  • partition_mappings (Optional[Mapping[str, PartitionMapping]]) – Defines how to map partition keys for this asset to partition keys of upstream assets. Each key in the dictionary corresponds to one of the input assets, and each value is a PartitionMapping. If no entry is provided for a particular asset dependency, the partition mapping defaults to the default partition mapping for the partitions definition, which typically maps partition keys to the same partition keys in upstream assets.

  • op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
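The round-trip criterion on op_tags values can be checked with the standard library alone. This is a small sketch; the helper name `survives_json_round_trip` is hypothetical and only restates the condition json.loads(json.dumps(value)) == value.

```python
import json


def survives_json_round_trip(value):
    """Return True if value is unchanged after JSON encoding and decoding."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # Values that cannot be serialized at all also fail the criterion.
        return False


print(survives_json_round_trip({"retries": 3, "owners": ["a", "b"]}))  # True
print(survives_json_round_trip((1, 2)))    # False: the tuple comes back as a list
print(survives_json_round_trip({1: "a"}))  # False: the int key becomes the string "1"
print(survives_json_round_trip({1, 2}))    # False: sets are not JSON serializable
```

In practice this means tag values should be built from dicts with string keys, lists, strings, numbers, booleans, and None.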

Examples

@asset
def my_asset(my_upstream_asset: int) -> int:
    return my_upstream_asset + 1

class dagster.AssetGroup(assets, source_assets=None, resource_defs=None, executor_def=None)

Defines a group of assets, along with environment information in the form of resources and an executor.

An AssetGroup can be provided to a RepositoryDefinition. When provided to a repository, the constituent assets can be materialized from Dagit. The AssetGroup also provides an interface for creating jobs from subselections of assets, which can then be provided to a ScheduleDefinition or SensorDefinition.

There can only be one AssetGroup per repository.

Parameters
  • assets (Sequence[AssetsDefinition]) – The set of software-defined assets to group.

  • source_assets (Optional[Sequence[SourceAsset]]) – The set of source assets that the software-defined assets may depend on.

  • resource_defs (Optional[Mapping[str, ResourceDefinition]]) – A dictionary of resource definitions. When the AssetGroup is constructed, if there are any unsatisfied resource requirements from the assets, it will result in an error. Note that the root_manager key is a reserved resource key, and will result in an error if provided by the user.

  • executor_def (Optional[ExecutorDefinition]) – The executor definition to use when re-materializing assets in this group.

Examples

from dagster import AssetGroup, asset, AssetIn, AssetKey, SourceAsset, resource

source_asset = SourceAsset("source")

@asset(required_resource_keys={"foo"})
def start_asset(context, source):
    ...

@asset
def next_asset(start_asset):
    ...

@resource
def foo_resource():
    ...

asset_group = AssetGroup(
    assets=[start_asset, next_asset],
    source_assets=[source_asset],
    resource_defs={"foo": foo_resource},
)
...

build_job(name, selection=None, executor_def=None, tags=None, description=None, _asset_selection_data=None)

Defines an executable job from the provided assets, resources, and executor.

Parameters
  • name (str) – The name to give the job.

  • selection (Union[str, List[str]]) –

    A single selection query or list of selection queries to execute. For example:

    • ['some_asset_key'] select some_asset_key itself.

    • ['*some_asset_key'] select some_asset_key and all its ancestors (upstream dependencies).

    • ['*some_asset_key+++'] select some_asset_key, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_asset_key', 'other_asset_key_a', 'other_asset_key_b+'] select some_asset_key and all its ancestors, other_asset_key_a itself, and other_asset_key_b and its direct child asset keys. When subselecting into a multi-asset, all of the asset keys in that multi-asset must be selected.

  • executor_def (Optional[ExecutorDefinition]) – The executor definition to use when executing the job. Defaults to the executor on the AssetGroup. If no executor was provided on the AssetGroup, then it defaults to multi_or_in_process_executor.

  • tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution of the job. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.

  • description (Optional[str]) – A description of the job.

Examples

from dagster import AssetGroup

the_asset_group = AssetGroup(...)

# build_job requires a name; the job names below are illustrative.
job_with_all_assets = the_asset_group.build_job(name="all_assets_job")

job_with_one_selection = the_asset_group.build_job(name="one_selection_job", selection="some_asset")

job_with_multiple_selections = the_asset_group.build_job(
    name="multiple_selections_job", selection=["*some_asset", "other_asset++"]
)

static from_current_module(resource_defs=None, executor_def=None, extra_source_assets=None)

Constructs an AssetGroup that includes all asset definitions and source assets in the module where this is called from.

Parameters
  • resource_defs (Optional[Mapping[str, ResourceDefinition]]) – A dictionary of resource definitions to include on the returned asset group.

  • executor_def (Optional[ExecutorDefinition]) – An executor to include on the returned asset group.

  • extra_source_assets (Optional[Sequence[SourceAsset]]) – Source assets to include in the group in addition to the source assets found in the module.

Returns

An asset group with all the assets defined in the module.

Return type

AssetGroup

static from_modules(modules, resource_defs=None, executor_def=None, extra_source_assets=None)

Constructs an AssetGroup that includes all asset definitions and source assets in the given modules.

Parameters
  • modules (Iterable[ModuleType]) – The Python modules to look for assets inside.

  • resource_defs (Optional[Mapping[str, ResourceDefinition]]) – A dictionary of resource definitions to include on the returned asset group.

  • executor_def (Optional[ExecutorDefinition]) – An executor to include on the returned asset group.

  • extra_source_assets (Optional[Sequence[SourceAsset]]) – Source assets to include in the group in addition to the source assets found in the modules.

Returns

An asset group with all the assets defined in the given modules.

Return type

AssetGroup

static from_package_module(package_module, resource_defs=None, executor_def=None, extra_source_assets=None)

Constructs an AssetGroup that includes all asset definitions and source assets in all sub-modules of the given package module.

A package module is the result of importing a package.

Parameters
  • package_module (ModuleType) – The package module to look for assets inside.

  • resource_defs (Optional[Mapping[str, ResourceDefinition]]) – A dictionary of resource definitions to include on the returned asset group.

  • executor_def (Optional[ExecutorDefinition]) – An executor to include on the returned asset group.

  • extra_source_assets (Optional[Sequence[SourceAsset]]) – Source assets to include in the group in addition to the source assets found in the package.

Returns

An asset group with all the assets in the package.

Return type

AssetGroup

static from_package_name(package_name, resource_defs=None, executor_def=None, extra_source_assets=None)

Constructs an AssetGroup that includes all asset definitions and source assets in all sub-modules of the given package.

Parameters
  • package_name (str) – The name of a Python package to look for assets inside.

  • resource_defs (Optional[Mapping[str, ResourceDefinition]]) – A dictionary of resource definitions to include on the returned asset group.

  • executor_def (Optional[ExecutorDefinition]) – An executor to include on the returned asset group.

  • extra_source_assets (Optional[Sequence[SourceAsset]]) – Source assets to include in the group in addition to the source assets found in the package.

Returns

An asset group with all the assets in the package.

Return type

AssetGroup

get_base_jobs()

For internal use only.

materialize(selection=None)

Executes an in-process run that materializes all assets in the group.

The execution proceeds serially, in a single thread. Only supported by AssetGroups that have no executor_def or that use the in-process executor.

Parameters

selection (Union[str, List[str]]) –

A single selection query or list of selection queries for assets in the group. For example:

  • ['some_asset_key'] select some_asset_key itself.

  • ['*some_asset_key'] select some_asset_key and all its ancestors (upstream dependencies).

  • ['*some_asset_key+++'] select some_asset_key, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

  • ['*some_asset_key', 'other_asset_key_a', 'other_asset_key_b+'] select some_asset_key and all its ancestors, other_asset_key_a itself, and other_asset_key_b and its direct child asset keys. When subselecting into a multi-asset, all of the asset keys in that multi-asset must be selected.
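The selection queries listed above can be read as small graph traversals. The sketch below is not Dagster's resolver: `resolve_selection` and the example asset names are hypothetical, and it handles only the two forms described here, a leading `*` for all ancestors and trailing `+` characters for a bounded number of descendant levels.

```python
def resolve_selection(queries, upstream):
    """Resolve selection queries against a dependency map.

    upstream maps each asset key to the set of keys it depends on.
    """
    if isinstance(queries, str):
        queries = [queries]

    # Invert the dependency map to find the direct children of each key.
    downstream = {key: set() for key in upstream}
    for key, parents in upstream.items():
        for parent in parents:
            downstream.setdefault(parent, set()).add(key)

    def walk(start, edges, max_depth):
        # Breadth-first walk; max_depth=None means "as far as the graph goes".
        seen, frontier, depth = set(), {start}, 0
        while frontier and (max_depth is None or depth < max_depth):
            frontier = {n for f in frontier for n in edges.get(f, ())} - seen
            seen |= frontier
            depth += 1
        return seen

    selected = set()
    for query in queries:
        all_ancestors = query.startswith("*")
        body = query.lstrip("*")
        key = body.rstrip("+")
        levels_down = len(body) - len(key)
        selected.add(key)
        if all_ancestors:
            selected |= walk(key, upstream, None)
        if levels_down:
            selected |= walk(key, downstream, levels_down)
    return selected


# A hypothetical linear chain: raw -> cleaned -> features -> model -> report
graph = {
    "raw": set(),
    "cleaned": {"raw"},
    "features": {"cleaned"},
    "model": {"features"},
    "report": {"model"},
}

print(sorted(resolve_selection("*features+", graph)))
# ['cleaned', 'features', 'model', 'raw']
print(sorted(resolve_selection(["cleaned", "model+"], graph)))
# ['cleaned', 'model', 'report']
```

The sketch does not enforce the multi-asset rule stated above, under which all asset keys of a multi-asset must be selected together.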

Returns

The result of the execution.

Return type

ExecuteInProcessResult

prefixed(key_prefix)

Returns an AssetGroup that’s identical to this AssetGroup, but with prefixes on all the asset keys. The prefix is not added to source assets.

Input asset keys that reference other assets within the group are “brought along” - i.e. prefixed as well.

Example with a single asset:

@asset
def asset1():
    ...

result = AssetGroup([asset1]).prefixed("my_prefix")
assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"])

Example with dependencies within the list of assets:

@asset
def asset1():
    ...

@asset
def asset2(asset1):
    ...

result = AssetGroup([asset1, asset2]).prefixed("my_prefix")
assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"])
assert result.assets[1].asset_key == AssetKey(["my_prefix", "asset2"])
assert result.assets[1].dependency_asset_keys == {AssetKey(["my_prefix", "asset1"])}

Example with input prefixes provided by source assets:

asset1 = SourceAsset(AssetKey(["upstream_prefix", "asset1"]))

@asset
def asset2(asset1):
    ...

result = AssetGroup([asset2], source_assets=[asset1]).prefixed("my_prefix")
assert len(result.assets) == 1
assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset2"])
assert result.assets[0].dependency_asset_keys == {AssetKey(["upstream_prefix", "asset1"])}
assert result.source_assets[0].key == AssetKey(["upstream_prefix", "asset1"])

to_source_assets()

Returns a list of source assets corresponding to all the non-source assets in this group.

@dagster.multi_asset(outs, name=None, ins=None, non_argument_deps=None, description=None, required_resource_keys=None, compute_kind=None, internal_asset_deps=None, partitions_def=None, partition_mappings=None, op_tags=None, can_subset=False)

Create a combined definition of multiple assets that are computed using the same op and same upstream assets.

Each argument to the decorated function references an upstream asset that this asset depends on. The name of the argument designates the name of the upstream asset.

Parameters
  • name (Optional[str]) – The name of the op.

  • outs (Optional[Dict[str, Out]]) – The Outs representing the produced assets.

  • ins (Optional[Mapping[str, AssetIn]]) – A dictionary that maps input names to their metadata and namespaces.

  • non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]) – Set of asset keys that are upstream dependencies, but do not pass an input to the multi_asset.

  • required_resource_keys (Optional[Set[str]]) – Set of resource handles required by the op.

  • io_manager_key (Optional[str]) – The resource key of the IOManager used for storing the output of the op as an asset, and for loading it in downstream ops (default: “io_manager”).

  • compute_kind (Optional[str]) – A string to represent the kind of computation that produces the asset, e.g. “dbt” or “spark”. It will be displayed in Dagit as a badge on the asset.

  • internal_asset_deps (Optional[Mapping[str, Set[AssetKey]]]) – By default, it is assumed that all assets produced by a multi_asset depend on all assets that are consumed by that multi_asset. If this default is not correct, you can pass in a map of output names to a corrected set of AssetKeys that they depend on. Any AssetKeys in these sets must be either used as input to the asset or produced within the op.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the assets.

  • partition_mappings (Optional[Mapping[str, PartitionMapping]]) – Defines how to map partition keys for this asset to partition keys of upstream assets. Each key in the dictionary corresponds to one of the input assets, and each value is a PartitionMapping. If no entry is provided for a particular asset dependency, the partition mapping defaults to the default partition mapping for the partitions definition, which typically maps partition keys to the same partition keys in upstream assets.

  • op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.

  • can_subset (bool) – Whether this asset’s computation can emit a subset of the asset keys based on the context.selected_assets argument. Defaults to False.
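The default and the override described for internal_asset_deps can be modelled with plain dictionaries. This is a simplified sketch rather than Dagster's code: `resolve_output_deps` is a hypothetical helper, and plain strings stand in for both output names and AssetKeys.

```python
def resolve_output_deps(input_keys, output_keys, internal_asset_deps=None):
    """Return a dict mapping each output key to the set of keys it depends on."""
    overrides = internal_asset_deps or {}
    allowed = set(input_keys) | set(output_keys)
    resolved = {}
    for out in output_keys:
        # Default: an output depends on every input of the multi-asset.
        deps = set(overrides.get(out, input_keys))
        # Overrides may only name inputs or other outputs of the same op.
        unknown = deps - allowed
        if unknown:
            raise ValueError(f"{out} depends on unknown keys: {sorted(unknown)}")
        resolved[out] = deps
    return resolved


default = resolve_output_deps(["a", "b"], ["x", "y"])
corrected = resolve_output_deps(["a", "b"], ["x", "y"], {"y": {"b", "x"}})

print(default == {"x": {"a", "b"}, "y": {"a", "b"}})    # True
print(corrected == {"x": {"a", "b"}, "y": {"b", "x"}})  # True
```

In the corrected case, output y depends on input b and on sibling output x, which the default assumption cannot express.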

dagster.build_assets_job(name, assets, source_assets=None, resource_defs=None, description=None, config=None, tags=None, executor_def=None, _asset_selection_data=None)

Builds a job that materializes the given assets.

The dependencies between the ops in the job are determined by the asset dependencies defined in the metadata on the provided asset nodes.
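To see how asset dependencies alone can fix an execution order, consider the sketch below. It uses the standard library's graphlib as a stand-in and is not how Dagster schedules ops; the asset names and the `deps` mapping are illustrative.

```python
from graphlib import TopologicalSorter

# Each asset maps to the set of upstream assets it depends on.
deps = {
    "asset1": set(),
    "asset2": {"asset1"},
    "asset3": {"asset1", "asset2"},
}

# static_order() yields every node after all of its predecessors.
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['asset1', 'asset2', 'asset3']
```

No explicit wiring between steps is needed: the order follows entirely from what each asset declares as its upstream.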

Parameters
  • name (str) – The name of the job.

  • assets (List[AssetsDefinition]) – A list of assets or multi-assets - usually constructed using the @asset() or @multi_asset() decorator.

  • source_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]) – A list of assets that are not materialized by this job, but that assets in this job depend on.

  • resource_defs (Optional[Dict[str, ResourceDefinition]]) – Resource defs to be included in this job.

  • description (Optional[str]) – A description of the job.

Examples

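from dagster import asset, build_assets_job

@asset
def asset1():
    return 5

@asset
def asset2(asset1):
    # The argument name refers to the upstream asset, so use asset1 here.
    return asset1 + 1

my_assets_job = build_assets_job("my_assets_job", assets=[asset1, asset2])

Returns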

A job that materializes the given assets.

Return type

JobDefinition

class dagster.AssetIn(asset_key=None, metadata=None, namespace=None)

class dagster.SourceAsset(key, metadata=None, io_manager_key='io_manager', description=None, partitions_def=None)

A SourceAsset represents an asset that will be loaded by (but not updated by) Dagster.

key

The key of the asset.

Type

Union[AssetKey, Sequence[str], str]

metadata_entries

Metadata associated with the asset.

Type

List[MetadataEntry]

io_manager_key

The key for the IOManager that will be used to load the contents of the asset when it’s used as an input to other assets inside a job.

Type

str

description

The description of the asset.

Type

Optional[str]

partitions_def

Defines the set of partition keys that compose the asset.

Type

Optional[PartitionsDefinition]

dagster.fs_asset_io_manager IOManagerDefinition

Config Schema:
base_dir (dagster.StringSource, optional)

IO manager that stores values on the local filesystem, serializing them with pickle.

Each asset is assigned to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.

Subsequent materializations of an asset will overwrite previous materializations of that asset.

If not provided via configuration, the base dir is the local_artifact_storage in your dagster.yaml file. That will be a temporary directory if not explicitly set.

So, with a base directory of “/my/base/path”, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called “three” in a directory with path “/my/base/path/one/two/”.
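The path layout described above can be reproduced with pathlib. This is only a sketch of the mapping from key components to a file path, not the IO manager's own code; `asset_path` is a hypothetical helper.

```python
from pathlib import PurePosixPath


def asset_path(base_dir, asset_key_components):
    """Join the base directory with each component of the asset key."""
    return PurePosixPath(base_dir).joinpath(*asset_key_components)


path = asset_path("/my/base/path", ["one", "two", "three"])
print(path)         # /my/base/path/one/two/three
print(path.parent)  # /my/base/path/one/two
print(path.name)    # three
```

A single-component key such as ["three"] would land directly under the base directory.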

Example usage:

1. Specify a collection-level IO manager using the reserved resource key "io_manager", which will set the given IO manager on all assets in the collection.

from dagster import AssetGroup, asset, fs_asset_io_manager

@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return asset1[:5]

asset_group = AssetGroup(
    [asset1, asset2],
    resource_defs={
        "io_manager": fs_asset_io_manager.configured({"base_dir": "/my/base/path"})
    },
)

2. Specify an IO manager on the asset, which allows the user to set different IO managers on different assets.

from dagster import AssetGroup, asset, fs_asset_io_manager

@asset(io_manager_key="my_io_manager")
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return asset1[:5]

asset_group = AssetGroup(
    [asset1, asset2],
    resource_defs={
        "my_io_manager": fs_asset_io_manager.configured({"base_dir": "/my/base/path"})
    },
)