Triggers¶
In simple terms, a trigger contains the logic the scheduler uses to decide when a task runs. Every task has its own trigger, which determines when the task should run next.
Asyncz provides several built-in triggers.
All the triggers subclass the BaseTrigger and any custom trigger should do the same.
DateTrigger¶
The DateTrigger is the simplest way of scheduling a task. It schedules a task to be executed once at a given time.
alias - date
Parameters¶
- run_at - The date/time to run the task at. Default: None
- timezone - The time zone for the run_at if it does not have one already. Default: None
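As a rough illustration of how run_at and timezone interact, the sketch below parses a text date and attaches a time zone only when the value does not already carry one. It uses only the standard library; resolve_run_at is a hypothetical helper, not part of Asyncz, and Asyncz's own parsing may differ.

```python
from datetime import datetime, timedelta, timezone, tzinfo
from typing import Union


def resolve_run_at(run_at: Union[str, datetime], tz: tzinfo) -> datetime:
    """Hypothetical helper: parse run_at and fall back to tz if it is naive."""
    if isinstance(run_at, str):
        run_at = datetime.fromisoformat(run_at)
    if run_at.tzinfo is None:
        # Naive value: interpret it in the fallback time zone.
        return run_at.replace(tzinfo=tz)
    # Aware value: keep the time zone it already carries.
    return run_at


utc_plus_one = timezone(timedelta(hours=1))
naive = resolve_run_at("2022-12-25 00:01:00", utc_plus_one)
aware = resolve_run_at(datetime(2022, 12, 25, 0, 1, tzinfo=timezone.utc), utc_plus_one)
```

Here naive picks up the fallback time zone, while aware keeps UTC because it already had one.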
Examples¶
from datetime import date
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
def my_task(number):
    logger.info(number)
# Execute the task on December 25th, 2022
scheduler.add_task(my_task, run_at=date(2022, 12, 25), args=[25])
scheduler.start()
You can also pass the date as text.
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
def my_task(number):
    logger.info(number)
# Execute the task on December 25th, 2022
scheduler.add_task(my_task, run_at="2022-12-25 00:01:00", args=[25])
scheduler.start()
Do you want the task to run immediately after adding it? Then don't specify any date.
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
def my_task(number):
    logger.info(number)
# Execute the task immediately
scheduler.add_task(my_task, args=[25])
scheduler.start()
IntervalTrigger¶
If you want to run a task periodically, this is the trigger for you. Its advantage is that you can also specify start_at and end_at, tailoring it to your needs.
alias - interval
Parameters¶
- weeks - Number of weeks to wait. Default: 0
- days - Number of days to wait. Default: 0
- hours - Number of hours to wait. Default: 0
- minutes - Number of minutes to wait. Default: 0
- seconds - Number of seconds to wait. Default: 0
- start_at - Starting point for the interval calculation. Default: None
- end_at - Latest possible date/time to trigger on. Default: None
- timezone - Time zone to use for the date/time calculations. Default: None
- jitter - Delay the task execution by jitter seconds at most. Default: None
Examples¶
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
def my_task():
    logger.info("My task working")
# Run every 5 hours
scheduler.add_task(my_task, "interval", hours=5)
scheduler.start()
Use start_at and end_at to limit the window in which the task runs.
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
def my_task():
    logger.info("My task working")
# Run every 5 hours, but only within the given window
scheduler.add_task(
    my_task, "interval", hours=5, start_at="2022-12-24 00:00:00", end_at="2022-12-25 23:59:59"
)
scheduler.start()
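To picture how an interval combined with start_at and end_at might resolve to a concrete run time, here is a minimal standard-library sketch. The function next_interval_time is hypothetical and is not Asyncz's implementation; it simply assumes that ticks fall on start_at plus whole multiples of the interval and that nothing fires after end_at.

```python
from datetime import datetime, timedelta
from typing import Optional


def next_interval_time(
    now: datetime,
    interval: timedelta,
    start_at: datetime,
    end_at: Optional[datetime] = None,
) -> Optional[datetime]:
    """Return the first tick of start_at + k * interval that is >= now."""
    if now <= start_at:
        candidate = start_at
    else:
        # Whole intervals elapsed since start_at, rounded up to the next tick.
        steps = -((start_at - now) // interval)
        candidate = start_at + steps * interval
    if end_at is not None and candidate > end_at:
        return None  # the window is closed, nothing left to schedule
    return candidate


start = datetime(2022, 12, 24, 0, 0, 0)
end = datetime(2022, 12, 25, 23, 59, 59)
inside = next_interval_time(datetime(2022, 12, 24, 7, 30), timedelta(hours=5), start, end)
outside = next_interval_time(datetime(2022, 12, 25, 21, 0, 1), timedelta(hours=5), start, end)
```

With the window from the example above, a call at 07:30 on December 24th resolves to the 10:00 tick, while a call just after the last tick inside the window yields None.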
What about using the scheduled_task decorator?
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
# Run every 5 hours
@scheduler.scheduled_task("interval", hours=5, id="my_task_id")
def my_task():
    logger.info("My task working")
scheduler.start()
What is the jitter? It is a simple random component that can be added to the execution time. This is useful when multiple servers run tasks and you don't want them to run the same task at exactly the same time, or when you want to prevent the same task from running concurrently.
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
def my_task():
    logger.info("My task working")
# Run every 5 hours, delayed by a random jitter of at most 200 seconds
scheduler.add_task(my_task, "interval", hours=5, jitter=200)
scheduler.start()
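The effect of jitter can be sketched with the standard library alone. This is only an illustration of the idea described above (a random delay of at most jitter seconds); apply_jitter is a hypothetical helper and the exact distribution Asyncz uses is not stated here.

```python
import random
from datetime import datetime, timedelta
from typing import Optional


def apply_jitter(fire_time: datetime, jitter: Optional[int], rng: random.Random) -> datetime:
    """Hypothetical helper: delay fire_time by a random 0..jitter seconds."""
    if not jitter:
        return fire_time
    return fire_time + timedelta(seconds=rng.uniform(0, jitter))


rng = random.Random(42)  # seeded only to make the sketch repeatable
base = datetime(2022, 12, 24, 5, 0, 0)
delayed = [apply_jitter(base, 200, rng) for _ in range(3)]
```

Each value in delayed lands somewhere between the base time and 200 seconds after it, so several servers sharing the same schedule would rarely fire at the same instant.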
CronTrigger¶
This trigger works like a UNIX cron and is the most powerful trigger in Asyncz. The possibilities are endless, which can also be frightening if you are not familiar with how UNIX crons operate.
Tip
Get familiar with UNIX cron to take advantage of the CronTrigger with ease.
A visual explanation of the UNIX cron format might also help.
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of the month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
│ │ │ │ │ 7 is also Sunday on some systems)
│ │ │ │ │
│ │ │ │ │
* * * * * <command to execute>
Like the IntervalTrigger, you can also specify the start_at and end_at parameters.
Since Asyncz is a revamp of APScheduler, it inherits APScheduler's CronTrigger design, which means you can omit fields that you don't need.
alias - cron
Parameters¶
- year - 4-digit value. Default: None
- month - Month (1-12). Default: None
- day - Day of the month (1-31). Default: None
- week - ISO week (1-53). Default: None
- day_of_week - Number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun). Default: None
- hour - Hour (0-23). Default: None
- minute - Minute (0-59). Default: None
- second - Second (0-59). Default: None
- start_at - Earliest possible date/time to trigger on (inclusive). Default: None
- end_at - Latest possible date/time to trigger on (inclusive). Default: None
- timezone - Time zone to use for the date/time calculations (defaults to scheduler timezone). Default: None
- jitter - Delay the task execution by jitter seconds at most. Default: None
Expressions¶
The table below lists all the expressions that can be used in the fields from year to second.
Multiple expressions can be given in a single field (comma separated).
Expression | Field | Description |
---|---|---|
* | any | Fire on every value |
*/n | any | Fire every n values, starting from the minimum |
a-n | any | Fire on any value within the a-n range (a must be smaller than n) |
a-b/n | any | Fire every n values within the a-b range |
xth y | day | Fire on the x-th occurrence of weekday y within the month |
last x | day | Fire on the last occurrence of weekday x within the month |
last | day | Fire on the last day within the month |
x,y,z | any | Fire on any matching expression; can combine any number of any of the above expressions |
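To make the numeric expressions in the table concrete, the sketch below expands a field expression into the list of values it would match. It is a simplified standard-library illustration, not the parser Asyncz uses: expand_field is a hypothetical name, and the weekday and month names as well as the "xth y", "last x" and "last" forms are deliberately left out.

```python
from typing import List, Set


def expand_field(expression: str, minimum: int, maximum: int) -> List[int]:
    """Expand a numeric cron field expression into the sorted values it matches."""
    values: Set[int] = set()
    for part in expression.split(","):
        part = part.strip()
        # Split off an optional "/n" step suffix.
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            first, last = minimum, maximum
        elif "-" in base:
            low, high = base.split("-")
            first, last = int(low), int(high)
        else:
            first = last = int(base)
        if not (minimum <= first <= last <= maximum) or step < 1:
            raise ValueError(f"invalid expression: {part!r}")
        values.update(range(first, last + 1, step))
    return sorted(values)


every_quarter = expand_field("*/15", 0, 59)
summer_and_winter = expand_field("6-8,11-12", 1, 12)
stepped_range = expand_field("10-20/5", 0, 59)
```

The second call uses the same month expression as the CronTrigger example in this section and expands to June, July, August, November and December.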
Info
The month and day_of_week fields accept abbreviated English month and weekday names.
Example: jan - dec and mon - sun respectively.
Daylight saving time behaviour¶
As mentioned, Asyncz derives from APScheduler and implements similar behaviour, which also means the cron trigger works with so-called "wall clock" time. If the selected time zone observes DST (daylight saving time), be aware that this may cause unexpected behaviour with the cron trigger when entering or leaving DST.
Example of a problematic schedule:
# In the Europe/London timezone, this will not execute at all on the last Sunday morning of March
# (the clocks jump from 01:00 to 02:00). Likewise, it will execute twice on the last Sunday
# morning of October (the hour from 01:00 to 02:00 is repeated)
scheduler.add_task(my_task, "cron", hour=1, minute=25)
Examples¶
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
def my_task():
    logger.info("Hello, world!")
# Schedules my_task to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
scheduler.add_task(my_task, "cron", month="6-8,11-12", day="3rd fri", hour="0-3")
scheduler.start()
Use start_at and end_at to limit the window in which the task runs.
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
def my_task():
    logger.info("Hello, world!")
# Runs from Monday to Friday at 1:00 (am) until 2022-12-25 00:00:00
scheduler.add_task(my_task, "cron", day_of_week="mon-fri", hour=1, end_at="2022-12-25")
scheduler.start()
What about using the scheduled_task decorator?
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
# Run at 01:00 on the last Saturday of the month (the cron field is named hour, not hours)
@scheduler.scheduled_task("cron", hour=1, id="my_task_id", day="last sat")
def my_task():
    logger.info("My task working")
scheduler.start()
What about some jitter?
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
def my_task():
    logger.info("My task working")
# Run every hour, delayed by a random jitter of at most 200 seconds
scheduler.add_task(my_task, "cron", hour="*", jitter=200)
scheduler.start()
Combination¶
A combinator is a special trigger that joins several triggers into one. Think of the AND and OR operators in SQL; combinators work in a similar fashion.
There are two combinators available, the OrTrigger and the AndTrigger.
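The two combinators can be pictured with a small standard-library sketch: OR picks the earliest time that any trigger proposes, while AND looks for the earliest time on which all triggers agree. Each toy trigger here is just a function returning its next fire time at or after a given moment. The names every, or_next and and_next are hypothetical and are not Asyncz APIs, and the AND search (jump to the latest candidate and ask again) is one plausible strategy, not necessarily what Asyncz does internally.

```python
from datetime import datetime, timedelta
from typing import Callable, List, Optional

NextTime = Callable[[datetime], Optional[datetime]]


def every(interval: timedelta, anchor: datetime) -> NextTime:
    """Build a toy trigger: first tick of anchor + k * interval at or after now."""
    def next_time(now: datetime) -> Optional[datetime]:
        if now <= anchor:
            return anchor
        steps = -((anchor - now) // interval)  # ceiling division
        return anchor + steps * interval
    return next_time


def or_next(triggers: List[NextTime], now: datetime) -> Optional[datetime]:
    """Earliest time proposed by any trigger; None once all are finished."""
    times = [t for t in (trigger(now) for trigger in triggers) if t is not None]
    return min(times) if times else None


def and_next(triggers: List[NextTime], now: datetime, max_rounds: int = 1000) -> Optional[datetime]:
    """Earliest time all triggers agree on; None if any trigger is finished."""
    for _ in range(max_rounds):
        times = [trigger(now) for trigger in triggers]
        if any(t is None for t in times):
            return None
        if min(times) == max(times):
            return times[0]
        now = max(times)  # nobody can agree earlier than the latest candidate
    return None  # no agreement found within the search budget


anchor = datetime(2022, 12, 24, 0, 0)
two_hourly = every(timedelta(hours=2), anchor)
three_hourly = every(timedelta(hours=3), anchor)
now = datetime(2022, 12, 24, 0, 30)
earliest_any = or_next([two_hourly, three_hourly], now)
earliest_all = and_next([two_hourly, three_hourly], now)
```

With a two-hourly and a three-hourly toy trigger sharing the same anchor, OR resolves to the next two-hour tick and AND resolves to the next six-hour tick.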
AndTrigger¶
Always returns the earliest next trigger time that all the given triggers can agree on. The trigger is considered finished when any of the given triggers has finished its schedule.
alias - and
Parameters¶
- triggers - List of triggers to combine.
- jitter - Delay the task execution by jitter seconds at most. Default: None
Examples¶
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers import AndTrigger, CronTrigger, IntervalTrigger
scheduler = AsyncIOScheduler()
def my_task():
    logger.info("Hello, world!")
# Combine the triggers
trigger = AndTrigger(triggers=[IntervalTrigger(hours=5), CronTrigger(day_of_week="mon, tue")])
# Add the trigger to the task
scheduler.add_task(my_task, trigger)
OrTrigger¶
Always returns the earliest next trigger time produced by any of the given triggers. The trigger is considered finished when all of the given triggers have finished their schedule.
alias - or
Parameters¶
- triggers - List of triggers to combine.
- jitter - Delay the task execution by jitter seconds at most. Default: None
Examples¶
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers import CronTrigger, OrTrigger
scheduler = AsyncIOScheduler()
def my_task():
    logger.info("Hello, world!")
# Combine the triggers
trigger = OrTrigger(
    triggers=[CronTrigger(day_of_week="sat", hour=5), CronTrigger(day_of_week="sun", hour=10)]
)
# Add the trigger to the task
scheduler.add_task(my_task, trigger)
ShutdownTrigger¶
The ShutdownTrigger is a special trigger: it is executed once when the scheduler shuts down and uses only the function, its arguments and the store of a task.
It is useful for implementing a lifecycle pattern.
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
def my_task():
    logger.info("Goodbye!")
# Run on shutdown in its own thread
scheduler.add_task(my_task, "shutdown")
scheduler.start()
To ease integration with lifecycle generators, the exceptions StopIteration and StopAsyncIteration are ignored.
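The lifecycle idea can be sketched without Asyncz. In the standard-library example below, a generator performs its startup step, pauses at yield, and performs its cleanup when resumed. Resuming an exhausted generator raises StopIteration, which is why ignoring that exception is convenient. The run_ignoring_stop function is a hypothetical stand-in for the behaviour described above, not Asyncz code, and passing gen.__next__ as the task function of a "shutdown" task is an assumption about how this pattern would be wired up.

```python
from typing import Callable, Iterator, List

events: List[str] = []


def lifecycle() -> Iterator[None]:
    events.append("startup")  # runs when the generator is first advanced
    yield
    events.append("cleanup")  # runs when the generator is resumed


def run_ignoring_stop(func: Callable[[], object]) -> None:
    """Stand-in for the described behaviour: StopIteration is swallowed."""
    try:
        func()
    except (StopIteration, StopAsyncIteration):
        pass


gen = lifecycle()
next(gen)  # startup phase
# Assumption: a "shutdown" task would be given gen.__next__ as its function.
run_ignoring_stop(gen.__next__)
```

After both steps, events holds "startup" followed by "cleanup", and no exception escapes even though the generator is exhausted.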
BaseTrigger¶
All the triggers subclass the BaseTrigger and any custom trigger must do the same.
from asyncz.triggers.base import BaseTrigger
Custom trigger¶
If the triggers provided do not cover your use case, you can always create your own. A custom trigger must implement get_next_trigger_time().
from datetime import datetime, tzinfo
from typing import Optional, Union
from loguru import logger
from asyncz.schedulers import AsyncIOScheduler
from asyncz.triggers.base import BaseTrigger
class CustomTrigger(BaseTrigger):
    alias: str = "custom"
    def get_next_trigger_time(
        self, timezone: tzinfo, previous_time: datetime, now: Optional[datetime] = None
    ) -> Union[datetime, None]:
        # Add logic for the next trigger time of the custom trigger
        ...
def get_info():
    logger.info("info...")
# Create an instance
trigger = CustomTrigger(...)
# Create a scheduler
scheduler = AsyncIOScheduler()
# Add custom trigger
scheduler.add_task(get_info, trigger)
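As one hedged illustration of the kind of logic that could go inside get_next_trigger_time(), the standalone function below returns the next 09:00 that falls on a weekday. It mirrors the method's parameters but is written as a plain function so that it runs without Asyncz. The function name and the weekday rule are invented for this sketch.

```python
from datetime import datetime, time, timedelta, tzinfo
from datetime import timezone as dt_timezone
from typing import Optional


def next_weekday_morning(
    timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None
) -> Optional[datetime]:
    """Hypothetical rule: fire at 09:00 on the next Monday-to-Friday day."""
    now = now or datetime.now(timezone)
    # Never schedule at or before the previous run or the current moment.
    reference = max(previous_time, now) if previous_time else now
    reference = reference.astimezone(timezone)
    candidate = datetime.combine(reference.date(), time(9, 0), tzinfo=timezone)
    while candidate <= reference or candidate.weekday() >= 5:
        candidate += timedelta(days=1)
    return candidate


UTC = dt_timezone.utc
friday_late = datetime(2022, 12, 23, 10, 0, tzinfo=UTC)   # a Friday, after 09:00
monday_early = datetime(2022, 12, 26, 8, 0, tzinfo=UTC)   # a Monday, before 09:00
after_friday = next_weekday_morning(UTC, None, friday_late)
same_day = next_weekday_morning(UTC, None, monday_early)
```

Called late on a Friday, the function skips the weekend and returns Monday at 09:00. Called early on a Monday, it returns 09:00 the same day.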
Alias¶
Every trigger has an alias which, as the examples above show, can be passed in place of a trigger instance.
- DateTrigger - date
- IntervalTrigger - interval
- CronTrigger - cron
- AndTrigger - and
- OrTrigger - or
- ShutdownTrigger - shutdown