Background Jobs
Overview
BookWish uses Bull for background job processing. Bull is a Redis-based queue system for Node.js that handles asynchronous tasks, scheduled jobs, and job retries.
Technology Stack
- Queue: Bull 4.12
- Cache/Store: Redis (IORedis 5.3)
- Scheduler: Cron expressions
Queue Configuration
Queue Factory
// src/jobs/queue.ts
import Queue from 'bull';
import { env } from '../config/env';
const redisConfig = {
redis: {
host: new URL(env.REDIS_URL).hostname,
port: parseInt(new URL(env.REDIS_URL).port || '6379'),
password: new URL(env.REDIS_URL).password || undefined,
},
};
export function createQueue<T = any>(name: string): Queue.Queue<T> {
return new Queue<T>(name, redisConfig);
}
Queue Initialization
Queues are initialized on server startup:
// src/index.ts
import { scheduleSquareSyncs } from './jobs/square-sync.job';
import { scheduleReservationCleanup } from './jobs/reservation-cleanup.job';
import { initializeDigestScheduler } from './jobs/order-digest.job';
async function main() {
// ... database and Redis connection ...
// Schedule recurring jobs
await scheduleSquareSyncs();
await scheduleReservationCleanup();
initializeDigestScheduler();
// Start server
app.listen(env.PORT);
}
Job Types
1. Notification Job
File: src/jobs/notification.job.ts
Sends push and email notifications asynchronously.
Queue: notifications
Job Data:
interface NotificationJobData {
notificationId: string;
userId: string;
type: NotificationType;
title: string;
body: string;
orderId?: string;
orderNumber?: string;
}
Processing:
notificationQueue.process(async (job: Job<NotificationJobData>) => {
const { notificationId, userId, type, title, body } = job.data;
// 1. Get user's push tokens
const pushTokens = await prisma.pushToken.findMany({
where: { userId },
});
// 2. Send push notification to all devices
for (const tokenRecord of pushTokens) {
await sendPush(tokenRecord.token, {
title,
body,
data: { notificationId, type },
});
}
// 3. Send email for critical notifications
const emailTypes = [
'order_update',
'payment_issue',
'premium_subscription_expiring'
];
if (emailTypes.includes(type)) {
const user = await prisma.user.findUnique({
where: { id: userId },
select: { email: true },
});
if (user?.email) {
await sendEmail({
to: user.email,
subject: title,
htmlBody: `<p>${body}</p>`,
textBody: body,
});
}
}
});
Queueing:
import { queueNotification } from '../jobs/notification.job';
await queueNotification({
notificationId: notification.id,
userId: user.id,
type: 'order_update',
title: 'Order Shipped',
body: 'Your order #12345 has been shipped',
orderId: order.id,
orderNumber: order.orderNumber,
});
Retry Configuration:
- Attempts: 3
- Backoff: Exponential starting at 2 seconds
- Remove on complete: Yes
- Remove on fail: No
2. Square Sync Job
File: src/jobs/square-sync.job.ts
Hourly sync of inventory from Square POS as backup to webhooks.
Queue: square-sync
Job Data:
interface SquareSyncJob {
storeId: string;
}
Processing:
squareSyncQueue.process(async (job) => {
const { storeId } = job.data;
// Sync inventory from Square
const result = await inventoryService.syncFromSquare(storeId);
logger.info('square.sync_completed', { storeId, result });
return result;
});
Scheduling:
export async function scheduleSquareSyncs(): Promise<void> {
// Find all stores with Square integration
const stores = await prisma.store.findMany({
where: {
inventorySource: 'square',
squareAccessToken: { not: null },
},
});
// Add recurring job for each store
for (const store of stores) {
await squareSyncQueue.add(
{ storeId: store.id },
{
jobId: `square-sync-${store.id}`, // Prevents duplicates
repeat: {
cron: '0 * * * *', // Every hour at minute 0
},
removeOnComplete: true,
removeOnFail: false,
attempts: 3,
backoff: {
type: 'exponential',
delay: 60000, // 1 minute
},
}
);
}
}
Manual Trigger:
export async function triggerStoreSync(storeId: string): Promise<void> {
await squareSyncQueue.add(
{ storeId },
{
removeOnComplete: true,
attempts: 1,
}
);
}
Schedule: Every hour at :00 (e.g., 1:00, 2:00, 3:00)
3. Reservation Cleanup Job
File: src/jobs/reservation-cleanup.job.ts
Releases stuck inventory reservations from abandoned/failed orders.
Queue: reservation-cleanup
Job Data: None (no payload)
Processing:
reservationCleanupQueue.process(async () => {
// Find pending orders older than 30 minutes
const cutoffTime = new Date();
cutoffTime.setMinutes(cutoffTime.getMinutes() - 30);
const stuckOrders = await prisma.order.findMany({
where: {
status: 'pending',
createdAt: { lt: cutoffTime },
},
include: {
items: {
where: {
inventoryId: { not: null },
},
},
},
});
// Release inventory for each stuck order
for (const order of stuckOrders) {
for (const item of order.items) {
if (item.inventoryId) {
await prisma.inventory.update({
where: { id: item.inventoryId },
data: {
reservedQuantity: { decrement: item.quantity },
},
});
}
}
// Mark order as cancelled
await prisma.order.update({
where: { id: order.id },
data: {
status: 'cancelled',
paymentStatus: 'failed',
},
});
}
return {
releasedOrders: stuckOrders.length,
releasedItems: stuckOrders.reduce((sum, o) => sum + o.items.length, 0),
};
});
Scheduling:
export async function scheduleReservationCleanup(): Promise<void> {
await reservationCleanupQueue.add(
{},
{
jobId: 'reservation-cleanup-recurring',
repeat: {
cron: '*/15 * * * *', // Every 15 minutes
},
removeOnComplete: true,
removeOnFail: false,
attempts: 3,
backoff: {
type: 'exponential',
delay: 30000, // 30 seconds
},
}
);
}
Schedule: Every 15 minutes
4. Order Digest Job
File: src/jobs/order-digest.job.ts
Daily email digest of new orders for stores and BookWish operations.
Queue: order-digest
Job Data:
{ type: 'daily' }
Processing:
orderDigestQueue.process(async (job) => {
// Process store digests
await processStoreDigests();
// Process BookWish ops digest
await processBookwishOpsDigest();
});
async function processStoreDigests(): Promise<void> {
// Get stores with pending digest items
const storesWithPendingItems = await prisma.orderDigestQueue.groupBy({
by: ['storeId'],
where: { processed: false },
});
// Send digest email for each store
for (const storeGroup of storesWithPendingItems) {
const digestItems = await prisma.orderDigestQueue.findMany({
where: { storeId: storeGroup.storeId, processed: false },
include: { order: { include: { items: true, user: true } } },
});
const store = await prisma.store.findUnique({
where: { id: storeGroup.storeId },
include: { owner: true },
});
const ownerEmail = store.owner?.email || store.email;
if (ownerEmail) {
await emailService.sendStoreOrderDigest(
store,
ownerEmail,
digestItems.map(item => ({
order: item.order,
isSpecialOrder: item.isSpecialOrder,
}))
);
}
// Mark as processed
await prisma.orderDigestQueue.updateMany({
where: { id: { in: digestItems.map(i => i.id) } },
data: { processed: true },
});
}
}
Queueing Orders:
// Queue order for store digest
export async function queueStoreOrderForDigest(
storeId: string,
orderId: string,
isSpecialOrder: boolean
): Promise<void> {
await prisma.orderDigestQueue.create({
data: { storeId, orderId, isSpecialOrder },
});
}
// Queue order for BookWish ops digest
export async function queueOrderForOpsDigest(orderId: string): Promise<void> {
await prisma.bookwishOpsDigestQueue.create({
data: { orderId },
});
}
Scheduling:
export function initializeDigestScheduler(): void {
orderDigestQueue.add(
{ type: 'daily' },
{
repeat: { cron: '0 6 * * *' }, // Every day at 6 AM
removeOnComplete: true,
removeOnFail: false,
}
);
}
Schedule: Daily at 6:00 AM
Cron Expression Reference
┌───────────── minute (0 - 59)
│ ┌────── ─────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
│ │ │ │ │
* * * * *
Examples:
0 * * * *- Every hour at minute 0*/15 * * * *- Every 15 minutes0 6 * * *- Daily at 6:00 AM0 0 * * 0- Weekly on Sunday at midnight0 0 1 * *- Monthly on the 1st at midnight
Job Configuration Options
Common Options
await queue.add(
jobData,
{
// Job ID (prevents duplicates)
jobId: 'unique-job-id',
// Retry configuration
attempts: 3,
backoff: {
type: 'exponential', // or 'fixed'
delay: 2000, // milliseconds
},
// Cleanup
removeOnComplete: true,
removeOnFail: false,
// Delayed execution
delay: 5000, // 5 seconds
// Recurring jobs
repeat: {
cron: '0 * * * *',
},
// Priority (higher = processed first)
priority: 1,
// Timeout
timeout: 60000, // 60 seconds
}
);
Retry Strategies
Exponential Backoff:
backoff: {
type: 'exponential',
delay: 2000, // Start at 2 seconds
}
// Retry delays: 2s, 4s, 8s, 16s, ...
Fixed Backoff:
backoff: {
type: 'fixed',
delay: 5000, // Always 5 seconds
}
// Retry delays: 5s, 5s, 5s, ...
Job Events
Listening to Events
queue.on('completed', (job, result) => {
logger.info('job.completed', { jobId: job.id, result });
});
queue.on('failed', (job, error) => {
logger.error('job.failed', {
jobId: job.id,
error: error.message,
});
});
queue.on('stalled', (job) => {
logger.warn('job.stalled', { jobId: job.id });
});
queue.on('progress', (job, progress) => {
logger.info('job.progress', { jobId: job.id, progress });
});
Available Events
completed- Job finished successfullyfailed- Job failed after all retriesstalled- Job is stuckprogress- Job reported progressactive- Job started processingwaiting- Job added to queuedelayed- Job scheduled for laterremoved- Job removed from queuecleaned- Old jobs cleaned up
Job Progress Reporting
queue.process(async (job) => {
// Report progress (0-100)
await job.progress(25);
// Do some work
await doStep1();
await job.progress(50);
await doStep2();
await job.progress(75);
await doStep3();
await job.progress(100);
return { success: true };
});
Queue Management
Pause/Resume Queue
// Pause queue (stop processing)
await queue.pause();
// Resume queue
await queue.resume();
Clean Old Jobs
// Remove completed jobs older than 1 hour
await queue.clean(3600000, 'completed');
// Remove failed jobs older than 1 day
await queue.clean(86400000, 'failed');
Get Queue Status
const jobCounts = await queue.getJobCounts();
// {
// waiting: 5,
// active: 2,
// completed: 100,
// failed: 3,
// delayed: 0,
// paused: 0
// }
Error Handling
Automatic Retries
Jobs automatically retry on failure based on configuration:
await queue.add(
jobData,
{
attempts: 3, // Total attempts (original + 2 retries)
backoff: {
type: 'exponential',
delay: 2000,
},
}
);
Failed Job Handler
queue.on('failed', async (job, error) => {
logger.error('job.failed', {
jobId: job.id,
attempts: job.attemptsMade,
error: error.message,
stack: error.stack,
});
// Optionally alert on critical failures
if (job.data.type === 'critical') {
await alertOps(`Job ${job.id} failed after ${job.attemptsMade} attempts`);
}
});
Best Practices
- Idempotent Jobs - Jobs should be safe to run multiple times
- Fail Fast - Return early on invalid data
- Log Progress - Log important steps for debugging
- Handle Errors Gracefully - Catch and log errors, don't crash
- Set Timeouts - Prevent jobs from running forever
- Clean Up - Remove old completed jobs
- Monitor Queues - Track queue depth and failed jobs
- Use Job IDs - Prevent duplicate jobs with unique IDs
- Graceful Shutdown - Wait for active jobs to complete
- Test Jobs - Write tests for job processing logic
Monitoring
Queue Metrics
// Get queue stats
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const failed = await queue.getFailedCount();
logger.info('queue.metrics', {
queue: 'notifications',
waiting,
active,
failed,
});
Job Inspection
// Get specific job
const job = await queue.getJob(jobId);
// Get job state
const state = await job.getState();
// 'waiting' | 'active' | 'completed' | 'failed' | 'delayed'
// Get job logs
const logs = await queue.getJobLogs(jobId);
Graceful Shutdown
async function gracefulShutdown() {
logger.info('jobs.shutdown_initiated');
// Close all queues
await notificationQueue.close();
await squareSyncQueue.close();
await reservationCleanupQueue.close();
await orderDigestQueue.close();
logger.info('jobs.shutdown_completed');
}
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);