Graphs

The core of a job is a _graph_ of ops - connected via data dependencies.

@dagster.graph(name=None, description=None, input_defs=None, output_defs=None, ins=None, out=None, tags=None, config=None)[source]

Create a graph with the specified parameters from the decorated composition function.

Using this decorator allows you to build up a dependency graph by writing a function that invokes ops (or other graphs) and passes the output to subsequent invocations.

Parameters
  • name (Optional[str]) – The name of the graph. Must be unique within any RepositoryDefinition containing the graph.

  • description (Optional[str]) – A human-readable description of the graph.

  • input_defs (Optional[List[InputDefinition]]) –

    Information about the inputs that this graph maps. Information provided here will be combined with what can be inferred from the function signature, with these explicit InputDefinitions taking precedence.

    Uses of inputs in the body of the decorated composition function will determine the InputMappings passed to the underlying GraphDefinition.

  • output_defs (Optional[List[OutputDefinition]]) –

    Output definitions for the graph. If not provided explicitly, these will be inferred from typehints.

    Uses of these outputs in the body of the decorated composition function, as well as the return value of the decorated function, will be used to infer the appropriate set of OutputMappings for the underlying GraphDefinition.

    To map multiple outputs, return a dictionary from the composition function.

  • ins (Optional[Dict[str, GraphIn]]) – Information about the inputs that this graph maps. Information provided here will be combined with what can be inferred from the function signature, with these explicit GraphIn taking precedence.

  • out

    Information about the outputs that this graph maps. Information provided here will be combined with what can be inferred from the return type signature if the function does not use yield.

    To map multiple outputs, return a dictionary from the composition function.

class dagster.GraphDefinition(name, description=None, node_defs=None, dependencies=None, input_mappings=None, output_mappings=None, config=None, tags=None, **kwargs)[source]

Defines a Dagster graph.

A graph is made up of

  • Nodes, which can either be an op (the functional unit of computation), or another graph.

  • Dependencies, which determine how the values produced by nodes as outputs flow from one node to another. This tells Dagster how to arrange nodes into a directed, acyclic graph (DAG) of compute.

End users should prefer the @graph decorator. GraphDefinition is generally intended to be used by framework authors or for programatically generated graphs.

Parameters
  • name (str) – The name of the graph. Must be unique within any GraphDefinition or JobDefinition containing the graph.

  • description (Optional[str]) – A human-readable description of the pipeline.

  • node_defs (Optional[List[NodeDefinition]]) – The set of ops / graphs used in this graph.

  • dependencies (Optional[Dict[Union[str, NodeInvocation], Dict[str, DependencyDefinition]]]) – A structure that declares the dependencies of each op’s inputs on the outputs of other ops in the graph. Keys of the top level dict are either the string names of ops in the graph or, in the case of aliased ops, NodeInvocations. Values of the top level dict are themselves dicts, which map input names belonging to the op or aliased op to DependencyDefinitions.

  • input_mappings (Optional[List[InputMapping]]) – Defines the inputs to the nested graph, and how they map to the inputs of its constituent ops.

  • output_mappings (Optional[List[OutputMapping]]) – Defines the outputs of the nested graph, and how they map from the outputs of its constituent ops.

  • config (Optional[ConfigMapping]) – Defines the config of the graph, and how its schema maps to the config of its constituent ops.

  • tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution of the graph. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.

Examples

@op
def return_one():
    return 1

@op
def add_one(num):
    return num + 1

graph_def = GraphDefinition(
    name='basic',
    node_defs=[return_one, add_one],
    dependencies={'add_one': {'num': DependencyDefinition('return_one')}},
)
execute_in_process(run_config=None, instance=None, resources=None, raise_on_error=True, op_selection=None, run_id=None, input_values=None)[source]

Execute this graph in-process, collecting results in-memory.

Parameters
  • run_config (Optional[Dict[str, Any]]) – Run config to provide to execution. The configuration for the underlying graph should exist under the “ops” key.

  • instance (Optional[DagsterInstance]) – The instance to execute against, an ephemeral one will be used if none provided.

  • resources (Optional[Dict[str, Any]]) – The resources needed if any are required. Can provide resource instances directly, or resource definitions.

  • raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True.

  • op_selection (Optional[List[str]]) – A list of op selection queries (including single op names) to execute. For example: * ['some_op']: selects some_op itself. * ['*some_op']: select some_op and all its ancestors (upstream dependencies). * ['*some_op+++']: select some_op, all its ancestors, and its descendants (downstream dependencies) within 3 levels down. * ['*some_op', 'other_op_a', 'other_op_b+']: select some_op and all its ancestors, other_op_a itself, and other_op_b and its direct child ops.

  • input_values (Optional[Mapping[str, Any]]) – A dictionary that maps python objects to the top-level inputs of the graph.

Returns

ExecuteInProcessResult

to_job(name=None, description=None, resource_defs=None, config=None, tags=None, logger_defs=None, executor_def=None, hooks=None, op_retry_policy=None, version_strategy=None, op_selection=None, partitions_def=None, asset_layer=None, input_values=None, _asset_selection_data=None)[source]

Make this graph in to an executable Job by providing remaining components required for execution.

Parameters
  • name (Optional[str]) – The name for the Job. Defaults to the name of the this graph.

  • resource_defs (Optional[Dict[str, ResourceDefinition]]) – Resources that are required by this graph for execution. If not defined, io_manager will default to filesystem.

  • config

    Describes how the job is parameterized at runtime.

    If no value is provided, then the schema for the job’s run config is a standard format based on its solids and resources.

    If a dictionary is provided, then it must conform to the standard config schema, and it will be used as the job’s run config for the job whenever the job is executed. The values provided will be viewable and editable in the Dagit playground, so be careful with secrets.

    If a ConfigMapping object is provided, then the schema for the job’s run config is determined by the config mapping, and the ConfigMapping, which should return configuration in the standard format to configure the job.

    If a PartitionedConfig object is provided, then it defines a discrete set of config values that can parameterize the job, as well as a function for mapping those values to the base config. The values provided will be viewable and editable in the Dagit playground, so be careful with secrets.

  • tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution of the Job. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.

  • logger_defs (Optional[Dict[str, LoggerDefinition]]) – A dictionary of string logger identifiers to their implementations.

  • executor_def (Optional[ExecutorDefinition]) – How this Job will be executed. Defaults to multi_or_in_process_executor, which can be switched between multi-process and in-process modes of execution. The default mode of execution is multi-process.

  • op_retry_policy (Optional[RetryPolicy]) – The default retry policy for all ops in this job. Only used if retry policy is not defined on the op definition or op invocation.

  • version_strategy (Optional[VersionStrategy]) – Defines how each solid (and optionally, resource) in the job can be versioned. If provided, memoizaton will be enabled for this job.

  • partitions_def (Optional[PartitionsDefinition]) – Defines a discrete set of partition keys that can parameterize the job. If this argument is supplied, the config argument can’t also be supplied.

  • asset_layer (Optional[AssetLayer]) – Top level information about the assets this job will produce. Generally should not be set manually.

  • input_values (Optional[Mapping[str, Any]]) – A dictionary that maps python objects to the top-level inputs of a job.

Returns

JobDefinition

class dagster.GraphIn(description=None)[source]

Represents information about an input that a graph maps.

Parameters

description (Optional[str]) – Human-readable description of the input.

class dagster.GraphOut(description=None)[source]

Represents information about the outputs that a graph maps.

Parameters

description (Optional[str]) – Human-readable description of the output.

Explicit dependencies

class dagster.DependencyDefinition(solid=None, output='result', description=None, node=None)[source]

Represents an edge in the DAG of nodes (ops or graphs) forming a job.

This object is used at the leaves of a dictionary structure that represents the complete dependency structure of a job whose keys represent the dependent node and dependent input, so this object only contains information about the dependee.

Concretely, if the input named ‘input’ of op_b depends on the output named ‘result’ of op_a, and the output named ‘other_result’ of graph_a, the structure will look as follows:

dependency_structure = {
    'my_downstream_op': {
        'input': DependencyDefinition('my_upstream_op', 'result')
    }
    'my_downstream_op': {
        'input': DependencyDefinition('my_upstream_graph', 'result')
    }
}

In general, users should prefer not to construct this class directly or use the JobDefinition API that requires instances of this class. Instead, use the @job API:

@job
def the_job():
    node_b(node_a())
Parameters
  • solid (str) – (legacy) The name of the solid that is depended on, that is, from which the value passed between the two nodes originates.

  • output (Optional[str]) – The name of the output that is depended on. (default: “result”)

  • description (Optional[str]) – Human-readable description of this dependency.

  • node (str) – The name of the node (op or graph) that is depended on, that is, from which the value passed between the two nodes originates.

class dagster.MultiDependencyDefinition(dependencies)[source]

Represents a fan-in edge in the DAG of op instances forming a job.

This object is used only when an input of type List[T] is assembled by fanning-in multiple upstream outputs of type T.

This object is used at the leaves of a dictionary structure that represents the complete dependency structure of a job or pipeline whose keys represent the dependent ops or graphs and dependent input, so this object only contains information about the dependee.

Concretely, if the input named ‘input’ of op_c depends on the outputs named ‘result’ of op_a and op_b, this structure will look as follows:

dependency_structure = {
    'op_c': {
        'input': MultiDependencyDefinition(
            [
                DependencyDefinition('op_a', 'result'),
                DependencyDefinition('op_b', 'result')
            ]
        )
    }
}

In general, users should prefer not to construct this class directly or use the JobDefinition API that requires instances of this class. Instead, use the @job API:

@job
def the_job():
    op_c(op_a(), op_b())
Parameters

dependencies (List[Union[DependencyDefinition, Type[MappedInputPlaceHolder]]]) – List of upstream dependencies fanned in to this input.

class dagster.NodeInvocation(name, alias=None, tags=None, hook_defs=None, retry_policy=None)[source]

Identifies an instance of a node in a graph dependency structure.

Parameters
  • name (str) – Name of the solid of which this is an instance.

  • alias (Optional[str]) – Name specific to this instance of the solid. Necessary when there are multiple instances of the same solid.

  • tags (Optional[Dict[str, Any]]) – Optional tags values to extend or override those set on the solid definition.

  • hook_defs (Optional[AbstractSet[HookDefinition]]) – A set of hook definitions applied to the solid instance.

Examples:

In general, users should prefer not to construct this class directly or use the JobDefinition API that requires instances of this class. Instead, use the @job API:

from dagster import job

@job
def my_job():
    other_name = some_op.alias('other_name')
    some_graph(other_name(some_op))