Dagster includes a scheduler that allows you to run jobs at regular intervals, e.g. daily or hourly.
As before, we've defined an op and a job.
import csv
from datetime import datetime

import requests
from dagster import get_dagster_logger, job, op, repository, schedule


@op
def hello_cereal(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    cereals = [row for row in csv.DictReader(lines)]
    date = context.op_config["date"]
    get_dagster_logger().info(f"Today is {date}. Found {len(cereals)} cereals.")


@job
def hello_cereal_job():
    hello_cereal()
Suppose that we need to run our simple cereal job every morning before breakfast, at 6:45 AM. To do this, we'll write a ScheduleDefinition to define our schedule. We can either construct a ScheduleDefinition directly or use one of the included schedule decorators.
In this example, we'll use the @schedule decorator, which runs a job on the cron schedule we give it. Here, the cron string "45 6 * * *" means once a day at 6:45 AM.
The decorated function should return the run_config needed to run the job at the specified execution time. The function is passed a context object whose scheduled_execution_time attribute holds the datetime for which the schedule is running.
@schedule(
    cron_schedule="45 6 * * *",
    job=hello_cereal_job,
    execution_timezone="US/Central",
)
def good_morning_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"ops": {"hello_cereal": {"config": {"date": date}}}}
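To make the shape of the returned run_config concrete, here is a minimal sketch that mirrors the body of good_morning_schedule as a plain function, so it can be tried without Dagster installed. The helper name build_run_config and the sample date are hypothetical and used only for illustration.

```python
from datetime import datetime


def build_run_config(scheduled_time: datetime) -> dict:
    """Mirror the body of good_morning_schedule without needing Dagster."""
    # Format the scheduled time the same way the schedule function does.
    date = scheduled_time.strftime("%Y-%m-%d")
    # The "config" entry is what the op reads through context.op_config.
    return {"ops": {"hello_cereal": {"config": {"date": date}}}}


# Hypothetical tick: 6:45 AM on 5 November 2021.
sample_config = build_run_config(datetime(2021, 11, 5, 6, 45))
print(sample_config)
```

The nested "config" dictionary is what hello_cereal receives as context.op_config, so the "date" key here must match the key the op looks up.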
To complete the picture, we'll need to add the schedule definition to the list of definitions returned from our repository.
@repository
def hello_cereal_repository():
    return [hello_cereal_job, good_morning_schedule]
Now, we can load Dagit to view the schedule, start and stop it, and monitor the runs it creates:
dagit -f scheduler.py
The left sidebar of Dagit displays a clock icon next to each job with a schedule.
Clicking on the clock icon will take us to the Schedules view. From here, we can turn on the schedule by pressing the toggle button, at which point Dagit will show us that the schedule is running and will next execute tomorrow at 6:45 AM.
Scheduled runs are launched from a long-running dagster-daemon process. To start the dagster-daemon process, run the following command in the same folder as your workspace.yaml file:
dagster-daemon run
This process will periodically check for any running schedules and launch their associated runs. If you leave this process running, it will launch a new run for your schedule each day at the expected time.
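As a rough illustration of "each day at the expected time", the sketch below computes when a once-a-day 6:45 schedule would next fire relative to a given moment. It is a simplification that ignores time zones and daylight saving, and it is not how dagster-daemon is implemented. The function name next_daily_run is hypothetical.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int = 6, minute: int = 45) -> datetime:
    """Return the first daily tick strictly after `now` (naive datetimes only)."""
    # Start from today's tick at the requested hour and minute.
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # If today's tick has already passed (or is exactly now), use tomorrow's.
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


# Turned on at 6:00 AM: the next tick is later the same day.
before_breakfast = next_daily_run(datetime(2021, 11, 5, 6, 0))
# Turned on at 9:30 AM: the next tick is tomorrow morning.
after_breakfast = next_daily_run(datetime(2021, 11, 5, 9, 30))
print(before_breakfast, after_breakfast)
```

This is why the next tick can fall either later today or tomorrow, depending on when the schedule is turned on.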
If you need to customize the times at which the schedule should execute, you can pass a function as the should_execute argument to ScheduleDefinition or to the @schedule decorator.
For example, we can define a filter that only returns True on weekdays:
def weekday_filter(_context):
    weekno = datetime.today().weekday()
    # Returns True if the current day is a weekday (Monday is 0, Sunday is 6)
    return weekno < 5
If we combine this should_execute filter with a schedule that runs at 6:45 AM every day, then we'll have a schedule that runs at 6:45 AM only on weekdays.
@schedule(
    cron_schedule="45 6 * * *",
    job=hello_cereal_job,
    execution_timezone="US/Central",
    should_execute=weekday_filter,
)
def good_weekday_morning_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    # hello_cereal reads the date from its op config, not from an input
    return {"ops": {"hello_cereal": {"config": {"date": date}}}}
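The weekday check itself is easy to try in isolation. The sketch below pulls the comparison into a plain function that takes the datetime as an argument instead of reading the clock, which makes it straightforward to test against known dates. The name is_weekday and the sample dates are hypothetical. Whether you would rather pass it context.scheduled_execution_time than datetime.today() depends on your use case and is an assumption here, not something the tutorial prescribes.

```python
from datetime import datetime


def is_weekday(moment: datetime) -> bool:
    """Return True for Monday through Friday."""
    # datetime.weekday() numbers Monday as 0 and Sunday as 6.
    return moment.weekday() < 5


friday = datetime(2021, 11, 5, 6, 45)    # a Friday
saturday = datetime(2021, 11, 6, 6, 45)  # a Saturday
print(is_weekday(friday), is_weekday(saturday))
```

With a helper like this, weekday_filter could simply return is_weekday(datetime.today()), keeping the same behavior as the version above.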