Scheduler Module
π― Purpose
Section titled βπ― PurposeβThe Scheduler Module provides functionality for scheduling and executing time-based triggers in a distributed environment. It allows for the registration of trigger handlers that can be executed either directly or through Google Cloud Pub/Sub, depending on the configuration.
This module is designed to be flexible and extensible, allowing for different trigger types and execution strategies. It provides a facade pattern implementation for easy integration with other modules.
β¨ Features
Section titled ββ¨ Featuresβ- β±οΈ Time-based trigger scheduling and execution
- π Support for different trigger types (regular and scheduled)
- π Support for different runner types (direct execution or Pub/Sub)
- π Optimistic locking to prevent duplicate processing in distributed environments
- β οΈ Error handling
- βοΈ Configurable execution strategies
- π Graceful shutdown support for clean application termination
π Usage
Section titled βπ Usageβπ Trigger Types
Section titled βπ Trigger TypesβThe Scheduler Module supports two types of triggers:
π Regular Triggers (TimeBasedTrigger)
Section titled βπ Regular Triggers (TimeBasedTrigger)βRegular triggers are one-time triggers that run once at a specified time. After successful execution, they are marked as completed and will not run again unless manually rescheduled.
π Creating a Regular Trigger Entity
Section titled βπ Creating a Regular Trigger Entityβimport { Entity } from 'typeorm';import { TimeBasedTrigger } from './common/scheduler/shared/entities/time-based-trigger.entity';
@Entity('my_triggers')export class MyTrigger extends TimeBasedTrigger { // Add additional properties specific to your trigger}π Scheduled Triggers (ScheduledTrigger)
Section titled βπ Scheduled Triggers (ScheduledTrigger)βScheduled triggers run repeatedly according to a cron expression. After each execution (whether successful or not), they automatically schedule their next run based on the cron expression and timezone.
π Creating a Scheduled Trigger Entity
Section titled βπ Creating a Scheduled Trigger Entityβimport { Entity } from 'typeorm';import { ScheduledTrigger } from './common/scheduler/shared/entities/scheduled-trigger.entity';
@Entity('my_scheduled_triggers')export class MyScheduledTrigger extends ScheduledTrigger { // Add additional properties specific to your trigger}π§© Scheduled System Triggers
Section titled βπ§© Scheduled System TriggersβSystem triggers are periodic system tasks based on cron and executed by a shared handler.
Basic approach
- Extend the base class
BaseSystemTaskProcessorand implement three methods:getType(): SystemTriggerTypeβ a unique task typegetDefaultCron(): stringβ the default scheduleprocess(trigger: SystemTrigger, options?: { signal?: AbortSignal })β task execution
- Add your class to your moduleβs
providers. SystemTriggerHandlerServiceincommon/schedulerwill automatically discover such processors (viaDiscoveryModule), create/update records insystem_triggers(based ongetDefaultCron()), and execute them on schedule.
Example: creating your own processor
import { Injectable, Logger } from '@nestjs/common';import { BaseSystemTaskProcessor } from './system-tasks/base-system-task.processor';import { SystemTriggerType } from './system-tasks/system-trigger-type';import { SystemTrigger } from './shared/entities/system-trigger.entity';
@Injectable()export class MyCustomTaskProcessor extends BaseSystemTaskProcessor { private readonly logger = new Logger(MyCustomTaskProcessor.name);
getType() { return SystemTriggerType.MY_CUSTOM_TASK; }
// for example, every 5 minutes at second 0 getDefaultCron() { return '0 */5 * * * *'; }
async process(_trigger: SystemTrigger, options?: { signal?: AbortSignal }): Promise<void> { if (options?.signal?.aborted) return; this.logger.debug('Executing MY_CUSTOM_TASK'); // Your business logic here }}Module wiring
import { Module } from '@nestjs/common';import { MyCustomTaskProcessor } from './system-tasks/processors/my-custom-task.processor';
@Module({ providers: [MyCustomTaskProcessor],})export class MyFeatureModule {}π¦ Module Registration
Section titled βπ¦ Module RegistrationβTo use the Scheduler Module, import it into your application module:
import { Module } from '@nestjs/common';import { SchedulerModule } from './common/scheduler/scheduler.module';
@Module({ imports: [SchedulerModule],})export class MyModule {}π Self-Registering Trigger Handler
Section titled βπ Self-Registering Trigger HandlerβImplement a trigger handler that self-registers with the scheduler facade during initialization:
import { Injectable, Inject, OnModuleInit } from '@nestjs/common';import { InjectRepository } from '@nestjs/typeorm';import { Repository } from 'typeorm';import { SCHEDULER_FACADE, SchedulerFacade } from './common/scheduler/shared/scheduler.facade';import { TimeBasedTriggerHandler } from './common/scheduler/shared/time-based-trigger-handler.interface';import { MyTrigger } from './my-trigger.entity';
@Injectable()export class MyTriggerHandler implements TimeBasedTriggerHandler<MyTrigger>, OnModuleInit { constructor( @InjectRepository(MyTrigger) private readonly repository: Repository<MyTrigger>, @Inject(SCHEDULER_FACADE) private readonly schedulerFacade: SchedulerFacade ) {}
getTriggerRepository(): Repository<MyTrigger> { return this.repository; }
async handleTrigger(trigger: MyTrigger): Promise<void> { // Implement your trigger processing logic here }
processingCronExpression(): string { // Define how frequently triggers should be checked for processing return '0 */5 * * * *'; // Every 5 minutes }
async onModuleInit() { // Self-register with the scheduler facade await this.schedulerFacade.registerTimeBasedTriggerHandler(this); }}Note: The registerTimeBasedTriggerHandler method automatically adapts its behavior based on the SCHEDULER_EXECUTION_ENABLED configuration. When set to false, it will only log the registration without creating cron jobs. When set to true, it will create and start the actual cron jobs for trigger execution.
βοΈ Configuration
Section titled ββοΈ ConfigurationβThe Scheduler Module can be configured using environment variables:
| Environment Variable | Description | Default Value |
|---|---|---|
SCHEDULER_EXECUTION_ENABLED | Enables/disables trigger execution. When false, only registration occurs without creating cron jobs | true |
SCHEDULER_TRIGGER_RUNNER_TYPE | The type of trigger runner to use (direct or pubsub) | direct |
SCHEDULER_PUBSUB_PROJECT_ID | The Google Cloud project ID to use for Pub/Sub (required when using pubsub runner) | - |
SCHEDULER_TIMEZONE | The timezone to use for cron expressions | UTC |
SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT_MINUTES | The timeout in minutes for graceful shutdown | 15 |
π Execution Modes
Section titled βπ Execution ModesβThe Scheduler Module supports two execution modes controlled by the SCHEDULER_EXECUTION_ENABLED environment variable:
π΄ Registration Mode (SCHEDULER_EXECUTION_ENABLED=false)
Section titled βπ΄ Registration Mode (SCHEDULER_EXECUTION_ENABLED=false)βIn registration mode, the scheduler facade only logs trigger handler registrations without creating actual cron jobs or executing triggers. This mode is suitable for:
- API instances: Where you want to register handlers for completeness but not execute scheduled tasks
- Development environments: Where you want to test handler registration without running actual schedules
- Service separation: When you want to separate registration logic from execution logic
SCHEDULER_EXECUTION_ENABLED=falseWhen a trigger handler is registered in this mode:
Handler 'MyTriggerHandler' registered but execution disabled.π’ Worker Mode (SCHEDULER_EXECUTION_ENABLED=true)
Section titled βπ’ Worker Mode (SCHEDULER_EXECUTION_ENABLED=true)βIn worker mode, the scheduler facade creates actual cron jobs and executes triggers according to their schedules. This mode is suitable for:
- Worker instances: Dedicated instances that handle scheduled task execution
- Production environments: Where actual trigger processing should occur
- Single-instance deployments: Where both API and scheduling functionality are needed
SCHEDULER_EXECUTION_ENABLED=trueWhen a trigger handler is registered in this mode:
Time-based trigger handler 'MyTriggerHandler' initialized with cron '0 */5 * * * *' in timezone UTCThis adaptive behavior allows the same codebase to serve both API instances (registration only) and Worker instances (full execution) without requiring separate implementations.
πββοΈ Runner Types
Section titled βπββοΈ Runner Typesββ‘ Direct Runner
Section titled ββ‘ Direct Runnerββββββββββββββββ βββββββββββββββ ββββββββββββββββ Fetch ββββββΊβ Process ββββββΊβ Update ββ Triggers β β In Current β β Status ββββββββββββββββ β Process β βββββββββββββββ βββββββββββββββThe direct runner processes triggers immediately in the current process. Itβs suitable for scenarios where triggers need to be processed quickly and donβt require distributed processing.
To use the direct runner, set:
SCHEDULER_TRIGGER_RUNNER_TYPE=directπ Google PubSub Runner
Section titled βπ Google PubSub Runnerββββββββββββββββ βββββββββββββββ βββββββββββββββ ββββββββββββββββ Fetch ββββββΊβ Publish to ββββββΊβ Subscribe & ββββββΊβ Process ββ Triggers β β PubSub β β Receive β β & Update ββββββββββββββββ βββββββββββββββ βββββββββββββββ βββββββββββββββThe PubSub runner publishes trigger IDs to a Google Cloud PubSub topic and processes them asynchronously when received from the subscription. Itβs suitable for distributed processing scenarios where triggers need to be processed across multiple instances or services.
To use the PubSub runner, set:
SCHEDULER_TRIGGER_RUNNER_TYPE=pubsubSCHEDULER_PUBSUB_PROJECT_ID=your-google-cloud-project-idImportant Note: When using the Pub/Sub runner, Googleβs default credential logic is used for authentication. Credentials are automatically obtained from the environment where the application is running, following Google Cloudβs standard credential resolution process. This means that the application will use the credentials available in the environment (e.g., service account credentials, application default credentials, etc.). For more information, see Google Cloud Authentication documentation.
β οΈ Error Handling
Section titled ββ οΈ Error HandlingβThe Scheduler Module includes built-in error handling for trigger processing. If an error occurs during trigger processing, the trigger is marked with an error status.
π Optimistic Locking
Section titled βπ Optimistic LockingβThe Scheduler Module uses optimistic locking to prevent duplicate processing of triggers in distributed environments. This ensures that each trigger is processed exactly once, even when multiple instances of the application are running.
π Graceful Shutdown
Section titled βπ Graceful ShutdownβThe Scheduler Module includes graceful shutdown support to ensure that in-progress trigger processing can complete before the application terminates. This prevents data loss or inconsistency that could occur if processing is abruptly terminated.
When the application receives a shutdown signal, the graceful shutdown service:
- Prevents new triggers from being processed
- Tracks active trigger processing
- Signals OS processes when available (
SIGTERM;SIGKILLafter timeout) - Waits for active processing to complete or until the configurable timeout
Internally, the service exposes helpers (isInShutdownMode, registerActiveProcess/unregisterActiveProcess, updateProcessPid) and initiateShutdown(signal), which is invoked by the module lifecycle (onModuleDestroy).
The timeout for graceful shutdown can be configured using the
SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT_MINUTES environment variable. If active
processes are still running when the timeout is reached, the application will
log details about these processes, send SIGKILL to known PIDs, and then terminate.