Queue Management with BullMQ
Introduction
In this article, we look at managing a queue of backup jobs with BullMQ and Redis, covering worker configuration, retry logic, and job recovery.
BullMQ Overview
What is BullMQ?
BullMQ is a job queue library for Node.js:
- Built on Redis
- Reliable job processing
- Retries and delayed jobs
- Concurrency control
- Job prioritization
Architecture
┌──────────────────┐
│ Backup Scheduler │
│    (Cron Job)    │
└────────┬─────────┘
         │
         ↓
┌──────────────────┐
│      Queue       │
│     (BullMQ)     │
└────────┬─────────┘
         │
         ↓
┌──────────────────┐
│      Worker      │
│     (BullMQ)     │
└────────┬─────────┘
         │
         ↓
┌──────────────────┐
│  Backup Service  │
│    (Execute)     │
└──────────────────┘
Queue Configuration
Initialize Queue Manager
typescript
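import { Queue, QueueEvents, Worker } from 'bullmq';
import { createRedisConnection } from '../../config/redis-config';
// BackupJobData is defined in the "Job Data Structure" section below.
class BackupQueueManager {
  private queue: Queue<BackupJobData> | null = null;
  private worker: Worker<BackupJobData> | null = null;
  private queueEvents: QueueEvents | null = null; // used by setupEventListeners()
  // Configuration from environment (base-10 parsing with defaults)
  private readonly concurrency = parseInt(
    process.env.BACKUP_CONCURRENCY || '7', 10
  );
  private readonly maxAttempts = parseInt(
    process.env.BACKUP_MAX_ATTEMPTS || '3', 10
  );
  async initialize(): Promise<void> {
    // BullMQ Workers require a Redis connection with maxRetriesPerRequest: null;
    // createRedisConnection() is assumed to configure this.
    const connection = await createRedisConnection();
    // Create queue
    this.queue = new Queue<BackupJobData>('backup-processing', {
      connection,
      defaultJobOptions: {
        removeOnComplete: 50, // keep only the 50 most recent completed jobs
        removeOnFail: 20, // keep only the 20 most recent failed jobs
        attempts: this.maxAttempts,
        backoff: {
          type: 'exponential',
          delay: 5000, // 5s before the first retry, doubling afterwards
        },
      },
    });
    // Create worker
    this.worker = new Worker<BackupJobData>(
      'backup-processing',
      async (job) => {
        return await this.processBackupJob(job);
      },
      {
        connection,
        concurrency: this.concurrency, // up to 7 jobs in parallel by default
        maxStalledCount: 1, // re-queue a stalled job once, fail it on the next stall
        stalledInterval: 30000, // check for stalled jobs every 30s
        limiter: {
          max: 50, // at most 50 jobs
          duration: 1000, // per second
        },
      }
    );
    // The "added" event used in setupEventListeners() is emitted by QueueEvents
    this.queueEvents = new QueueEvents('backup-processing', { connection });
    // Setup event listeners
    this.setupEventListeners();
  }
}
Job Data Structure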
BackupJobData Interface
typescript
export interface BackupJobData {
  scheduleId: string;
  resourceType: string;
  resourceId: string;
  customerEmail: string;
  location?: string; // 'HCM' or 'HNI'
  attempt?: number;
}
Add Job to Queue
Queue Backup Job
typescript
// Requires: import moment from 'moment-timezone';
async addBackupJob(
  schedule: IBackupSchedule,
  delay: number = 0
): Promise<string> {
  if (!this.queue) {
    throw new Error('Queue manager not initialized');
  }
  const scheduleId = schedule._id?.toString() || '';
  const jobData: BackupJobData = {
    scheduleId,
    resourceType: schedule.resourceType,
    resourceId: schedule.resourceId,
    customerEmail: schedule.customerEmail,
    location: schedule.location,
  };
  // Deterministic jobId: one job per schedule per hourly slot in the schedule's timezone
  const tz = schedule.timezone || 'Asia/Ho_Chi_Minh';
  const slotId = moment.tz(new Date(), tz).format('YYYY-MM-DD-HH');
  const deterministicJobId = `${scheduleId}-${slotId}`;
  try {
    const job = await this.queue.add(
      `backup-${schedule.resourceType}-${schedule.resourceId}`,
      jobData,
      {
        delay,
        priority: 1, // lower number = higher priority
        jobId: deterministicJobId, // BullMQ does not add a job whose id already exists
      }
    );
    return job.id || deterministicJobId;
  } catch (error: any) {
    // Defensive fallback: treat a "job already exists" error as a duplicate
    if (error?.message?.includes('Job already exists')) {
      return deterministicJobId;
    }
    throw error;
  }
}
Deterministic Job ID:
- Format: {scheduleId}-{YYYY-MM-DD-HH}
- Purpose: prevents duplicate jobs within the same hourly time slot (in the schedule's timezone)
- Example: 507f1f77bcf86cd799439011-2025-01-25-02
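The ID scheme can be checked without a Redis connection. The following is a minimal sketch that assumes the slot is the wall-clock hour in the schedule's timezone; it uses the built-in Intl.DateTimeFormat instead of moment-timezone only to stay dependency-free, and buildDeterministicJobId is a hypothetical helper name, not part of the article's code.

```typescript
// Hypothetical helper: builds `${scheduleId}-${YYYY-MM-DD-HH}` from the
// wall-clock hour in an IANA timezone, using Intl instead of moment-timezone.
function buildDeterministicJobId(
  scheduleId: string,
  now: Date,
  timeZone: string = 'Asia/Ho_Chi_Minh'
): string {
  const formatter = new Intl.DateTimeFormat('en-US', {
    timeZone,
    year: 'numeric',
    month: '2-digit',
    day: '2-digit',
    hour: '2-digit',
    hour12: false,
  });
  const parts = formatter.formatToParts(now);

  // Read one named part ("year", "month", ...) and pad it to two digits
  const get = (type: string): string =>
    (parts.find((p) => p.type === type)?.value ?? '').padStart(2, '0');

  // Some engines render midnight as "24" when hour12 is false; normalize to "00"
  const hour = get('hour') === '24' ? '00' : get('hour');
  const slotId = `${get('year')}-${get('month')}-${get('day')}-${hour}`;
  return `${scheduleId}-${slotId}`;
}

// Two enqueue attempts within the same local hour produce the same ID
const first = buildDeterministicJobId('507f1f77bcf86cd799439011', new Date('2025-01-24T19:05:00Z'));
const second = buildDeterministicJobId('507f1f77bcf86cd799439011', new Date('2025-01-24T19:55:00Z'));
console.log(first); // 507f1f77bcf86cd799439011-2025-01-25-02
console.log(first === second); // true
```

Because the slot is hourly, a second trigger within the same hour (for example after a scheduler restart) maps to the same jobId, while a run in the next hour gets a new one.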
Process Backup Job
Worker Implementation
typescript
// Requires: import { Job } from 'bullmq';
private async processBackupJob(job: Job<BackupJobData>): Promise<void> {
  const { scheduleId, resourceType, resourceId } = job.data;
  logger.info(
    `🚀 Processing backup job: ${resourceType}:${resourceId} ` +
    `(Job ID: ${job.id})`
  );
  try {
    // 1. Mark the schedule as running
    await BackupSchedule.findByIdAndUpdate(scheduleId, {
      $set: {
        status: 'running',
        runningJobId: job.id,
        lastError: null,
        startedAt: new Date(),
      },
    });
    // 2. Reload the schedule to get fresh data
    const schedule = await BackupSchedule.findById(scheduleId);
    if (!schedule) {
      throw new Error(`Schedule not found: ${scheduleId}`);
    }
    if (!schedule.enabled) {
      logger.warn(`⚠️ Schedule disabled, skipping: ${scheduleId}`);
      await BackupSchedule.findByIdAndUpdate(scheduleId, {
        $set: {
          status: 'idle',
          runningJobId: null,
          lastRunAt: new Date(),
        },
      });
      return;
    }
    // 3. Execute the backup
    await runSingleBackup(schedule, new Date());
    // 4. Mark the schedule as completed
    await BackupSchedule.findByIdAndUpdate(scheduleId, {
      $set: {
        status: 'completed',
        runningJobId: null,
        lastRunAt: new Date(),
        retryCount: 0,
        completedAt: new Date(),
        nextRunAt: calcNextRunAt(schedule),
      },
    });
  } catch (error: any) {
    // 5. Handle errors. attemptsMade counts only the *previous* failed
    // attempts, so this attempt is the last one when attemptsMade + 1 >= attempts.
    const attemptsMade = job.attemptsMade || 0;
    const maxAttempts = job.opts.attempts ?? this.maxAttempts;
    const isFinalAttempt = attemptsMade + 1 >= maxAttempts;
    await BackupSchedule.findByIdAndUpdate(scheduleId, {
      $set: {
        status: isFinalAttempt ? 'failed' : 'pending',
        runningJobId: null,
        lastError: error.message,
        retryCount: attemptsMade,
        failedAt: isFinalAttempt ? new Date() : undefined,
      },
    });
    // Rethrow so BullMQ marks this attempt as failed and schedules a retry
    throw error;
  }
}
Retry Logic
Exponential Backoff
typescript
defaultJobOptions: {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 5000, // wait 5 seconds before the first retry
  },
}
Retry Schedule:
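- Attempt 1: runs immediately
- Attempt 2: 5 seconds after attempt 1 fails (5s × 2^0)
- Attempt 3: 10 seconds after attempt 2 fails (5s × 2^1)
- After attempt 3 fails, the job is moved to failed; a 4th attempt (20 seconds later, 5s × 2^2) would only run with attempts: 4 or higher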
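The arithmetic behind this schedule can be modeled in a few lines. The sketch below assumes BullMQ's plain exponential strategy, where the wait after the n-th failure is delay × 2^(n−1); retryPlan, exponentialBackoff and RetryStep are hypothetical names. It also encodes the final-attempt rule: because attemptsMade counts only earlier failed attempts, the current attempt is final when attemptsMade + 1 >= attempts.

```typescript
// Hypothetical model of BullMQ's built-in "exponential" backoff strategy:
// after the n-th failed attempt, wait round(2^(n - 1) * delay) ms.
interface RetryStep {
  attempt: number; // 1-based attempt number
  waitBeforeMs: number; // wait after the previous failure (0 for the first run)
  isFinal: boolean; // true if failing this attempt moves the job to "failed"
}

function exponentialBackoff(attemptsMade: number, delay: number): number {
  return Math.round(Math.pow(2, attemptsMade - 1) * delay);
}

function retryPlan(attempts: number, delay: number): RetryStep[] {
  const steps: RetryStep[] = [];
  for (let attempt = 1; attempt <= attempts; attempt++) {
    // attemptsMade counts only the attempts that already failed
    const attemptsMade = attempt - 1;
    steps.push({
      attempt,
      waitBeforeMs: attemptsMade === 0 ? 0 : exponentialBackoff(attemptsMade, delay),
      isFinal: attemptsMade + 1 >= attempts,
    });
  }
  return steps;
}

for (const step of retryPlan(3, 5000)) {
  console.log(`Attempt ${step.attempt}: wait ${step.waitBeforeMs / 1000}s, final=${step.isFinal}`);
}
// Attempt 1: wait 0s, final=false
// Attempt 2: wait 5s, final=false
// Attempt 3: wait 10s, final=true
```

Calling retryPlan(4, 5000) adds a fourth step with a 20-second wait, which is the 5s × 2^2 case above.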
Retry Conditions
typescript
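// BullMQ retries a job (up to `attempts` times) when the processor
// throws or returns a rejected promise.
// Stalled jobs (e.g. the worker process crashed, or its event loop was
// blocked too long to renew the job lock) are moved back to "wait" by the
// stalled-job check; once a job stalls more than maxStalledCount times, it is failed.
// Unlike the older Bull library, BullMQ has no built-in per-job timeout option.
// Manually retry a job that is in the failed state
await job.retry();
Event Listeners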
Setup Event Listeners
typescript
private setupEventListeners(): void {
  if (!this.worker) return;
  // Worker events (emitted locally by this worker)
  this.worker.on('completed', (job) => {
    logger.info(`✅ Backup job completed: ${job.data.resourceType}:${job.data.resourceId}`);
  });
  this.worker.on('failed', (job, err) => {
    // job may be undefined here, hence the optional chaining
    logger.error(
      `❌ Backup job failed: ${job?.data.resourceType}:${job?.data.resourceId} - ` +
      `${err.message}`
    );
  });
  this.worker.on('stalled', (jobId) => {
    logger.warn(`⚠️ Backup job stalled: ${jobId}`);
  });
  // Queue events: requires a QueueEvents instance (this.queueEvents) created in initialize()
  this.queueEvents?.on('added', ({ jobId }) => {
    logger.debug(`📝 Job added to queue: ${jobId}`);
  });
}
Queue Statistics
Get Queue Stats
typescript
async getQueueStats(): Promise<BackupQueueStats> {
  if (!this.queue) {
    throw new Error('Queue manager not initialized');
  }
  const counts = await this.queue.getJobCounts(
    'wait',
    'active',
    'completed',
    'failed',
    'delayed'
  );
  return {
    waiting: counts.wait || 0,
    active: counts.active || 0,
    completed: counts.completed || 0,
    failed: counts.failed || 0,
    delayed: counts.delayed || 0,
    // A "paused" job count does not say whether the queue itself is paused
    paused: await this.queue.isPaused(),
  };
}
Job Cleanup
Clean Old Jobs
typescript
async cleanOldJobs(): Promise<void> {
  if (!this.queue) return;
  // Grace period: only remove jobs that finished more than 24 hours ago
  const gracePeriodMs = 24 * 60 * 60 * 1000;
  // Remove at most this many jobs per state in one call
  const batchSize = 1000;
  // Clean completed jobs (clean() returns the IDs of removed jobs)
  const completedCleaned = await this.queue.clean(
    gracePeriodMs,
    batchSize,
    'completed'
  );
  // Clean failed jobs
  const failedCleaned = await this.queue.clean(
    gracePeriodMs,
    batchSize,
    'failed'
  );
  logger.info(
    `🧹 Cleaned ${completedCleaned.length + failedCleaned.length} old jobs`
  );
}
Health Check
Queue Health Check
typescript
async healthCheck(): Promise<{ healthy: boolean; issues: string[] }> {
  const issues: string[] = [];
  // isReady() is assumed to report whether initialize() has completed
  if (!this.isReady()) {
    issues.push('Queue manager not initialized');
    return { healthy: false, issues };
  }
  try {
    const stats = await this.getQueueStats();
    // Check thresholds
    if (stats.waiting > 100) {
      issues.push(`Too many waiting jobs: ${stats.waiting}`);
    }
    if (stats.active > 20) {
      issues.push(`Too many active jobs: ${stats.active}`);
    }
    // Note: with removeOnFail: 20, BullMQ retains only about 20 failed jobs,
    // so this threshold fires only if jobs are added with other options
    if (stats.failed > 50) {
      issues.push(`Too many failed jobs: ${stats.failed}`);
    }
    return {
      healthy: issues.length === 0,
      issues,
    };
  } catch (error: any) {
    issues.push(`Health check failed: ${error.message}`);
    return { healthy: false, issues };
  }
}
Best Practices
1. Concurrency Control
typescript
// ✅ DO: Size concurrency to server capacity
concurrency: 7
// ❌ DON'T: Set it too high; 100 parallel backups can overload the server
concurrency: 100
2. Deterministic Job IDs
typescript
// ✅ DO: Use deterministic job IDs (same schedule + slot => same ID)
const jobId = `${scheduleId}-${timeSlot}`;
// ❌ DON'T: Use random IDs; nothing stops the same slot from being enqueued twice
const jobId = uuid();
3. Error Handling
typescript
// ✅ DO: Rethrow and let BullMQ handle retries
catch (error) {
  throw error; // BullMQ retries automatically
}
// ❌ DON'T: Swallow errors
catch (error) {
  // No rethrow: BullMQ marks the job as completed and never retries it
}
Summary
Key Points
BullMQ Configuration
- Queue backed by a Redis connection
- Worker with concurrency control
- Retries with exponential backoff
Job Processing
- Deterministic job IDs
- Status tracking
- Error handling
Monitoring
- Event listeners
- Queue statistics
- Health checks
Cleanup
- Remove old completed jobs
- Remove old failed jobs
Next Steps
In the next article, we will cover:
- Template System - Template-based backup configuration
Last Updated: 2025-01-25
Previous: 04. Schedule System
Next: 06. Template System