A Job binds a Graph and the resources it needs to be executable.
Jobs are created by calling GraphDefinition.to_job() on a graph instance, or using the job decorator.
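For instance, a minimal sketch of the first approach, assuming a trivial op and graph (the names do_something, my_graph, and do_stuff_job are illustrative, not part of the API):

from dagster import graph, op

@op
def do_something():
    ...

@graph
def my_graph():
    do_something()

# Bind the graph (and its default resources) into an executable job; the name is optional.
do_stuff_job = my_graph.to_job(name="do_stuff_job")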
@dagster.job(name=None, description=None, resource_defs=None, config=None, tags=None, logger_defs=None, executor_def=None, hooks=None, op_retry_policy=None, version_strategy=None, partitions_def=None, input_values=None)
Creates a job with the specified parameters from the decorated graph/op invocation function.
Using this decorator allows you to build an executable job by writing a function that invokes ops (or graphs).
name (Optional[str]) – The name for the Job. Defaults to the name of the graph.
resource_defs (Optional[Dict[str, ResourceDefinition]]) – Resources that are required by this graph for execution. If not defined, io_manager will default to filesystem.
config – Describes how the job is parameterized at runtime.
If no value is provided, then the schema for the job’s run config is a standard format based on its ops and resources.
If a dictionary is provided, then it must conform to the standard config schema, and it will be used as the job’s run config whenever the job is executed. The values provided will be viewable and editable in the Dagit playground, so be careful with secrets.
If a ConfigMapping object is provided, then the schema for the job’s run config is determined by the config mapping. The ConfigMapping should return configuration in the standard format to configure the job.
If a PartitionedConfig object is provided, then it defines a discrete set of config values that can parameterize the job, as well as a function for mapping those values to the base config. The values provided will be viewable and editable in the Dagit playground, so be careful with secrets.
tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution of the Job. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.
logger_defs (Optional[Dict[str, LoggerDefinition]]) – A dictionary of string logger identifiers to their implementations.
executor_def (Optional[ExecutorDefinition]) – How this Job will be executed. Defaults to multiprocess_executor.
op_retry_policy (Optional[RetryPolicy]) – The default retry policy for all ops in this job. Only used if retry policy is not defined on the op definition or op invocation.
version_strategy (Optional[VersionStrategy]) – Defines how each op (and optionally, resource) in the job can be versioned. If provided, memoization will be enabled for this job.
partitions_def (Optional[PartitionsDefinition]) – Defines a discrete set of partition keys that can parameterize the job. If this argument is supplied, the config argument can’t also be supplied.
input_values (Optional[Mapping[str, Any]]) – A dictionary that maps Python objects to the top-level inputs of a job.
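As a sketch of the decorator in use, assuming two trivial ops (the op and job names below are illustrative):

from dagster import job, op

@op
def return_five():
    return 5

@op
def add_one(arg):
    return arg + 1

# Illustrative description and tags; any of the parameters listed above can be passed here.
@job(description="Adds one to five.", tags={"team": "example"})
def do_stuff():
    add_one(return_five())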
class dagster.JobDefinition(graph_def, resource_defs=None, executor_def=None, logger_defs=None, config_mapping=None, partitioned_config=None, name=None, description=None, preset_defs=None, tags=None, hook_defs=None, op_retry_policy=None, version_strategy=None, _subset_selection_data=None, asset_layer=None, _input_values=None)

execute_in_process(run_config=None, instance=None, partition_key=None, raise_on_error=True, op_selection=None, asset_selection=None, run_id=None, input_values=None)
Execute the Job in-process, gathering results in-memory.
The executor_def on the Job will be ignored, and replaced with the in-process executor. If using the default io_manager, it will switch from filesystem to in-memory.
run_config (Optional[Dict[str, Any]]) – The configuration for the run.
instance (Optional[DagsterInstance]) – The instance to execute against. If none is provided, an ephemeral one will be used.
partition_key (Optional[str]) – The string partition key that specifies the run config to execute. Can only be used to select run config for jobs with partitioned config.
raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True.
op_selection (Optional[List[str]]) – A list of op selection queries (including single op names) to execute. For example:
* ['some_op']: selects some_op itself.
* ['*some_op']: selects some_op and all its ancestors (upstream dependencies).
* ['*some_op+++']: selects some_op, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.
* ['*some_op', 'other_op_a', 'other_op_b+']: selects some_op and all its ancestors, other_op_a itself, and other_op_b and its direct child ops.
input_values (Optional[Mapping[str, Any]]) – A dictionary that maps python objects to the top-level inputs of the job. Input values provided here will override input values that have been provided to the job directly.
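A minimal in-process execution might look like the sketch below, assuming a job with a single configurable op (the names greet and greeting_job are illustrative):

from dagster import job, op

@op(config_schema={"name": str})
def greet(context):
    return f"hello {context.op_config['name']}"

@job
def greeting_job():
    greet()

# Runs with the in-process executor and in-memory IO, regardless of the job's executor_def.
result = greeting_job.execute_in_process(
    run_config={"ops": {"greet": {"config": {"name": "dagster"}}}}
)
assert result.success
assert result.output_for_node("greet") == "hello dagster"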
dagster.reconstructable(target)
Create a ReconstructablePipeline from a function that returns a PipelineDefinition/JobDefinition, or a function decorated with @pipeline/@job.
When your pipeline/job must cross process boundaries, e.g., for execution on multiple nodes or in different systems (like dagstermill), Dagster must know how to reconstruct the pipeline/job on the other side of the process boundary.
Passing a job created with GraphDefinition.to_job() to reconstructable() requires you to wrap that job’s definition in a module-scoped function, and pass that function instead:
from dagster import graph, reconstructable

@graph
def my_graph():
    ...

def define_my_job():
    return my_graph.to_job()

reconstructable(define_my_job)
This function implements a very conservative strategy for reconstruction, so that its behavior is easy to predict, but as a consequence it is not able to reconstruct certain kinds of pipelines or jobs, such as those defined by lambdas, in nested scopes (e.g., dynamically within a method call), or in interactive environments such as the Python REPL or Jupyter notebooks.
If you need to reconstruct objects constructed in these ways, you should use build_reconstructable_job() instead, which allows you to specify your own reconstruction strategy.
Examples:
from dagster import graph, job, reconstructable

@job
def foo_job():
    ...

reconstructable_foo_job = reconstructable(foo_job)

@graph
def foo():
    ...

def make_bar_job():
    return foo.to_job()

reconstructable_bar_job = reconstructable(make_bar_job)
dagster.build_reconstructable_job(reconstructor_module_name, reconstructor_function_name, reconstructable_args=None, reconstructable_kwargs=None, reconstructor_working_directory=None)
Create a dagster.core.definitions.reconstructable.ReconstructablePipeline.
When your job must cross process boundaries, e.g., for execution on multiple nodes or in different systems (like dagstermill), Dagster must know how to reconstruct the job on the other side of the process boundary.
This function allows you to use the strategy of your choice for reconstructing jobs, so that you can reconstruct certain kinds of jobs that are not supported by reconstructable(), such as those defined by lambdas, in nested scopes (e.g., dynamically within a method call), or in interactive environments such as the Python REPL or Jupyter notebooks.
If you need to reconstruct jobs constructed in these ways, use this function instead of reconstructable().
reconstructor_module_name (str) – The name of the module containing the function to use to reconstruct the job.
reconstructor_function_name (str) – The name of the function to use to reconstruct the job.
reconstructable_args (Tuple) – Args to the function to use to reconstruct the job. Values of the tuple must be JSON serializable.
reconstructable_kwargs (Dict[str, Any]) – Kwargs to the function to use to reconstruct the job. Values of the dict must be JSON serializable.
Examples:
# module: mymodule

from dagster import JobDefinition, job, build_reconstructable_job

class JobFactory:
    def make_job(*args, **kwargs):

        @job
        def _job(...):
            ...

        return _job

def reconstruct_job(*args):
    factory = JobFactory()
    return factory.make_job(*args)

factory = JobFactory()

foo_job_args = (..., ...)

foo_job_kwargs = {...: ...}

foo_job = factory.make_job(*foo_job_args, **foo_job_kwargs)

reconstructable_foo_job = build_reconstructable_job(
    'mymodule',
    'reconstruct_job',
    foo_job_args,
    foo_job_kwargs,
)