
Queue Management with BullMQ

Introduction

In this article, we look at managing the backup job queue with BullMQ and Redis, covering worker configuration, retry logic, and job recovery.


BullMQ Overview

What is BullMQ?

BullMQ is a job queue system for Node.js:

  • Built on Redis
  • Reliable job processing
  • Retries and delayed jobs
  • Concurrency control
  • Job prioritization

Architecture

┌──────────────────┐
│ Backup Scheduler │
│ (Cron Job)       │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ Queue            │
│ (BullMQ)         │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ Worker           │
│ (BullMQ)         │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ Backup Service   │
│ (Execute)        │
└──────────────────┘

Queue Configuration

Initialize Queue Manager

typescript
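import { Queue, Worker, QueueEvents, Job } from 'bullmq';
import { createRedisConnection } from '../../config/redis-config';

class BackupQueueManager {
  private queue: Queue<BackupJobData> | null = null;
  private worker: Worker<BackupJobData> | null = null;
  private queueEvents: QueueEvents | null = null;

  // Configuration from environment variables (with defaults)
  private readonly concurrency = parseInt(
    process.env.BACKUP_CONCURRENCY || '7',
    10
  );
  private readonly maxAttempts = parseInt(
    process.env.BACKUP_MAX_ATTEMPTS || '3',
    10
  );

  async initialize(): Promise<void> {
    const connection = await createRedisConnection();

    // Create queue
    this.queue = new Queue<BackupJobData>('backup-processing', {
      connection,
      defaultJobOptions: {
        removeOnComplete: 50, // keep only the 50 most recent completed jobs
        removeOnFail: 20,     // keep only the 20 most recent failed jobs
        attempts: this.maxAttempts,
        backoff: {
          type: 'exponential',
          delay: 5000, // first retry after 5 s, doubling afterwards
        },
      },
    });

    // Create worker
    this.worker = new Worker<BackupJobData>(
      'backup-processing',
      async (job: Job<BackupJobData>) => this.processBackupJob(job),
      {
        connection,
        concurrency: this.concurrency, // 7 concurrent jobs by default
        maxStalledCount: 1,     // a stalled job is re-queued at most once, then failed
        stalledInterval: 30000, // check for stalled jobs every 30 s
        limiter: {
          max: 50,        // at most 50 jobs...
          duration: 1000, // ...per second, for the whole queue
        },
      }
    );

    // QueueEvents uses blocking Redis commands, so it needs its own connection
    // (assumes createRedisConnection() returns a new connection on each call)
    this.queueEvents = new QueueEvents('backup-processing', {
      connection: await createRedisConnection(),
    });

    // Set up event listeners
    this.setupEventListeners();
  }
}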

Job Data Structure

BackupJobData Interface

typescript
export interface BackupJobData {
  scheduleId: string;
  resourceType: string;
  resourceId: string;
  customerEmail: string;
  location?: string; // 'HCM' or 'HNI'
  attempt?: number;
}
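BullMQ stores job data in Redis as JSON, so `BackupJobData` is only checked at compile time. A minimal runtime guard, sketched below as a hypothetical `isBackupJobData` helper, could reject malformed payloads before the worker touches the database, including an empty `scheduleId` (the value `addBackupJob` falls back to when `schedule._id` is missing). It assumes `'HCM'` and `'HNI'` are the only valid locations, as the interface comment suggests.

```typescript
// Assumed copy of the BackupJobData interface above
interface BackupJobData {
  scheduleId: string;
  resourceType: string;
  resourceId: string;
  customerEmail: string;
  location?: string; // 'HCM' or 'HNI'
  attempt?: number;
}

const REQUIRED_STRING_FIELDS = [
  'scheduleId',
  'resourceType',
  'resourceId',
  'customerEmail',
] as const;

// Hypothetical type guard for job.data read back from Redis
function isBackupJobData(value: unknown): value is BackupJobData {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;

  // Required fields must be non-empty strings (catches scheduleId === '')
  for (const key of REQUIRED_STRING_FIELDS) {
    const field = v[key];
    if (typeof field !== 'string' || field.length === 0) return false;
  }

  // Assumption: 'HCM' and 'HNI' are the only valid locations
  const location = v['location'];
  if (location !== undefined && location !== 'HCM' && location !== 'HNI') {
    return false;
  }

  const attempt = v['attempt'];
  if (attempt !== undefined && typeof attempt !== 'number') return false;

  return true;
}
```

Inside `processBackupJob`, a failed check could throw before step 1, so a bad payload never marks the schedule as running.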

Add Job to Queue

Queue Backup Job

typescript
// Requires: import moment from 'moment-timezone';
async addBackupJob(
  schedule: IBackupSchedule,
  delay: number = 0
): Promise<string> {
  if (!this.queue) {
    throw new Error('Queue manager not initialized');
  }

  const jobData: BackupJobData = {
    scheduleId: schedule._id?.toString() || '',
    resourceType: schedule.resourceType,
    resourceId: schedule.resourceId,
    customerEmail: schedule.customerEmail,
    location: schedule.location,
  };

  // Deterministic jobId: one job per schedule per hourly time slot,
  // computed in the schedule's own timezone
  const tz = schedule.timezone || 'Asia/Ho_Chi_Minh';
  const nowTz = moment.tz(new Date(), tz);
  const slotId = nowTz.format('YYYY-MM-DD-HH');
  const deterministicJobId = `${schedule._id}-${slotId}`;

  try {
    const job = await this.queue.add(
      `backup-${schedule.resourceType}-${schedule.resourceId}`,
      jobData,
      {
        delay,
        priority: 1, // 1 is the highest priority in BullMQ
        jobId: deterministicJobId, // Prevent duplicates
      }
    );

    return job.id || '';
  } catch (error: any) {
    // Defensive only: BullMQ normally does not throw for a duplicate jobId.
    // It skips the add, keeps the existing job, and emits a 'duplicated'
    // event on QueueEvents.
    if (error.message.includes('Job already exists')) {
      return deterministicJobId;
    }
    throw error;
  }
}

Deterministic Job ID:

  • Format: {scheduleId}-{YYYY-MM-DD-HH}
  • Prevents duplicate jobs within the same hourly time slot: as long as a job with that jobId is still stored in Redis, BullMQ skips the new add
  • Example: 507f1f77bcf86cd799439011-2025-01-25-02
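The slot ID is the schedule's local date and hour, so every call within the same hour yields the same jobId. The sketch below reproduces that format with the built-in `Intl.DateTimeFormat` API instead of moment-timezone; the helper name `buildDeterministicJobId` is hypothetical. `hour12: false` can render midnight as "24" on some engines, so the hour is normalized to "00".

```typescript
// Hypothetical helper: builds the `{scheduleId}-{YYYY-MM-DD-HH}` jobId
// with the built-in Intl API instead of moment-timezone.
function buildDeterministicJobId(
  scheduleId: string,
  at: Date,
  timeZone: string = 'Asia/Ho_Chi_Minh'
): string {
  const parts = new Intl.DateTimeFormat('en-CA', {
    timeZone,
    year: 'numeric',
    month: '2-digit',
    day: '2-digit',
    hour: '2-digit',
    hour12: false,
  }).formatToParts(at);

  const part = (type: string): string =>
    parts.find((p) => p.type === type)?.value ?? '';

  // Some engines format midnight as "24" with hour12: false
  const hour = part('hour') === '24' ? '00' : part('hour');
  const slotId = `${part('year')}-${part('month')}-${part('day')}-${hour}`;
  return `${scheduleId}-${slotId}`;
}

// 19:05 and 19:55 UTC both fall in the 02:00 hour in Asia/Ho_Chi_Minh (UTC+7)
const first = buildDeterministicJobId('507f1f77bcf86cd799439011', new Date('2025-01-24T19:05:00Z'));
const second = buildDeterministicJobId('507f1f77bcf86cd799439011', new Date('2025-01-24T19:55:00Z'));
console.log(first);            // 507f1f77bcf86cd799439011-2025-01-25-02
console.log(first === second); // true
```

Because both calls produce the same ID, the second `queue.add` for that hour is skipped rather than creating a second backup.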

Process Backup Job

Worker Implementation

typescript
private async processBackupJob(job: Job<BackupJobData>): Promise<void> {
  const { scheduleId, resourceType, resourceId } = job.data;
  
  logger.info(
    `🚀 Processing backup job: ${resourceType}:${resourceId} ` +
    `(Job ID: ${job.id})`
  );
  
  try {
    // 1. Update schedule status to running
    await BackupSchedule.findByIdAndUpdate(scheduleId, {
      $set: {
        status: 'running',
        runningJobId: job.id,
        lastError: null,
        startedAt: new Date()
      }
    });
    
    // 2. Get fresh schedule data
    const schedule = await BackupSchedule.findById(scheduleId);
    if (!schedule) {
      throw new Error(`Schedule not found: ${scheduleId}`);
    }
    
    if (!schedule.enabled) {
      logger.warn(`⚠️ Schedule disabled, skipping: ${scheduleId}`);
      await BackupSchedule.findByIdAndUpdate(scheduleId, {
        $set: {
          status: 'idle',
          runningJobId: null,
          lastRunAt: new Date()
        }
      });
      return;
    }
    
    // 3. Execute backup
    await runSingleBackup(schedule, new Date());
    
    // 4. Update schedule status to completed
    await BackupSchedule.findByIdAndUpdate(scheduleId, {
      $set: {
        status: 'completed',
        runningJobId: null,
        lastRunAt: new Date(),
        retryCount: 0,
        completedAt: new Date(),
        nextRunAt: calcNextRunAt(schedule)
      }
    });
    
  } catch (error: any) {
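    // 5. Handle errors
    // job.attemptsMade counts attempts that already failed before this one,
    // so the current attempt number is attemptsMade + 1.
    const attemptNumber = job.attemptsMade + 1;
    const maxAttempts = job.opts.attempts ?? this.maxAttempts;
    const isFinalAttempt = attemptNumber >= maxAttempts;

    await BackupSchedule.findByIdAndUpdate(scheduleId, {
      $set: {
        status: isFinalAttempt ? 'failed' : 'pending',
        runningJobId: null,
        lastError: error.message,
        retryCount: attemptNumber,
        failedAt: isFinalAttempt ? new Date() : undefined
      }
    });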
    
    // Let BullMQ handle retry
    throw error;
  }
}
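BullMQ increments `attemptsMade` only after an attempt fails, so inside the processor it equals the number of earlier failed attempts. Comparing it directly with `maxAttempts` is never true on the last attempt (with `attempts: 3`, the processor sees 0, 1 and 2). The hypothetical helper below isolates that decision so it can be tested without Redis or MongoDB.

```typescript
type AttemptOutcome = {
  attemptNumber: number;
  isFinalAttempt: boolean;
  status: 'failed' | 'pending';
};

// Hypothetical helper mirroring the error branch of processBackupJob.
// attemptsMade = attempts that already failed before the current one.
function describeFailedAttempt(attemptsMade: number, maxAttempts: number): AttemptOutcome {
  const attemptNumber = attemptsMade + 1;
  const isFinalAttempt = attemptNumber >= maxAttempts;
  return {
    attemptNumber,
    isFinalAttempt,
    status: isFinalAttempt ? 'failed' : 'pending',
  };
}

// With attempts: 3 the processor sees attemptsMade = 0, 1, 2
for (const attemptsMade of [0, 1, 2]) {
  console.log(describeFailedAttempt(attemptsMade, 3).status);
}
// pending, pending, failed
```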

Retry Logic

Exponential Backoff

typescript
defaultJobOptions: {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 5000, // Start with 5 seconds
  }
}

Retry Schedule:

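  • Attempt 1: immediately
  • Attempt 2: 5 seconds after attempt 1 fails
  • Attempt 3: 10 seconds after attempt 2 fails (5 * 2)
  • With attempts: 3 there is no attempt 4; raising attempts to 4 would add one more retry 20 seconds after attempt 3 fails (5 * 2^2)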
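BullMQ documents its built-in `exponential` backoff as waiting `2^(attemptsMade - 1) * delay` milliseconds after a failure, where `attemptsMade` counts failures so far. A small hypothetical helper that lists the resulting waits makes it easy to check a configuration before deploying it:

```typescript
// Hypothetical helper mirroring BullMQ's documented 'exponential' backoff:
// the wait after the n-th failure is round(delay * 2^(n - 1)) milliseconds.
function exponentialBackoffDelays(attempts: number, delayMs: number): number[] {
  const delays: number[] = [];
  // Attempt 1 runs immediately; each retry follows a failure,
  // so there are (attempts - 1) waits in total.
  for (let failures = 1; failures < attempts; failures++) {
    delays.push(Math.round(delayMs * 2 ** (failures - 1)));
  }
  return delays;
}

console.log(exponentialBackoffDelays(3, 5000)); // [ 5000, 10000 ]
console.log(exponentialBackoffDelays(4, 5000)); // [ 5000, 10000, 20000 ]
```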

Retry Conditions

typescript
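// BullMQ retries a job (up to `attempts` in total, waiting per `backoff`)
// when the processor throws or returns a rejected promise.
//
// BullMQ has no per-job timeout option, so a time limit must be enforced
// inside the processor itself.
//
// If a worker crashes, the job's lock expires and the job is detected as
// stalled: it is moved back to wait, and moved to failed once it stalls
// more than `maxStalledCount` times.

// Manual retry: moves a job that is in the failed state back to wait
await job.retry();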
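One way to put a time limit on a backup is to enforce it inside the processor: if the wrapped promise rejects, the processor throws and BullMQ treats it as a failed attempt under the normal retry rules. The `withTimeout` helper below is a hypothetical sketch; it only stops waiting and does not cancel the underlying work, which keeps running in the background.

```typescript
// Hypothetical helper: rejects if `work` has not settled within `ms`.
// It stops waiting but does NOT cancel `work` itself.
async function withTimeout<T>(
  work: Promise<T>,
  ms: number,
  label: string = 'operation'
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} timed out after ${ms} ms`)),
      ms
    );
  });
  try {
    // Whichever settles first wins; a rejection propagates to the caller
    return await Promise.race([work, timeout]);
  } finally {
    clearTimeout(timer); // don't keep the process alive after `work` settles
  }
}
```

In `processBackupJob` this could wrap step 3, for example `await withTimeout(runSingleBackup(schedule, new Date()), 30 * 60 * 1000, 'backup')`, where 30 minutes is an arbitrary example value. Since the timed-out backup keeps running, a retry may start while it is still in progress, so the backup step would need to tolerate that.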

Event Listeners

Setup Event Listeners

typescript
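private setupEventListeners(): void {
  if (!this.worker) return;

  // Worker events: emitted only for jobs processed by this worker
  this.worker.on('completed', (job) => {
    logger.info(`✅ Backup job completed: ${job.data.resourceType}:${job.data.resourceId}`);
  });

  // `job` may be undefined in the 'failed' event, hence the optional chaining
  this.worker.on('failed', (job, err) => {
    logger.error(
      `❌ Backup job failed: ${job?.data.resourceType}:${job?.data.resourceId} - ` +
      `${err.message}`
    );
  });

  this.worker.on('stalled', (jobId) => {
    logger.warn(`⚠️ Backup job stalled: ${jobId}`);
  });

  // Queue events: global events for the whole queue, delivered through a
  // QueueEvents instance (this.queueEvents), which needs its own Redis connection
  this.queueEvents?.on('added', ({ jobId }) => {
    logger.debug(`📝 Job added to queue: ${jobId}`);
  });
}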

Queue Statistics

Get Queue Stats

typescript
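async getQueueStats(): Promise<BackupQueueStats> {
  if (!this.queue) {
    throw new Error('Queue manager not initialized');
  }

  const counts = await this.queue.getJobCounts(
    'wait',
    'active',
    'completed',
    'failed',
    'delayed'
  );

  return {
    waiting: counts.wait || 0,
    active: counts.active || 0,
    completed: counts.completed || 0,
    failed: counts.failed || 0,
    delayed: counts.delayed || 0,
    // A 'paused' job count only says whether jobs sit in the paused list;
    // isPaused() reports whether the queue itself is paused
    paused: await this.queue.isPaused(),
  };
}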

Job Cleanup

Clean Old Jobs

typescript
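async cleanOldJobs(): Promise<void> {
  if (!this.queue) return;

  const cleanInterval = 24 * 60 * 60 * 1000; // grace period: only jobs finished more than 24 hours ago
  const batchSize = 1000; // max jobs removed per clean() call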
  
  // Clean completed jobs
  const completedCleaned = await this.queue.clean(
    cleanInterval,
    batchSize,
    'completed'
  );
  
  // Clean failed jobs
  const failedCleaned = await this.queue.clean(
    cleanInterval,
    batchSize,
    'failed'
  );
  
  logger.info(
    `🧹 Cleaned ${completedCleaned.length + failedCleaned.length} old jobs`
  );
}

Health Check

Queue Health Check

typescript
async healthCheck(): Promise<{ healthy: boolean; issues: string[] }> {
  const issues: string[] = [];
  
  if (!this.isReady()) {
    issues.push('Queue manager not initialized');
    return { healthy: false, issues };
  }
  
  try {
    const stats = await this.getQueueStats();
    
    // Check thresholds
    if (stats.waiting > 100) {
      issues.push(`Too many waiting jobs: ${stats.waiting}`);
    }
    
    if (stats.active > 20) {
      issues.push(`Too many active jobs: ${stats.active}`);
    }
    
    if (stats.failed > 50) {
      issues.push(`Too many failed jobs: ${stats.failed}`);
    }
    
    return {
      healthy: issues.length === 0,
      issues
    };
  } catch (error: any) {
    issues.push(`Health check failed: ${error.message}`);
    return { healthy: false, issues };
  }
}
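Pulling the threshold logic out of `healthCheck` into a pure function makes it unit-testable without Redis. The sketch below uses the same limits as the hard-coded checks; `HealthThresholds` and `evaluateQueueHealth` are hypothetical names, and `BackupQueueStats` is inferred from the object `getQueueStats` returns.

```typescript
// Shape inferred from the object returned by getQueueStats()
interface BackupQueueStats {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  paused: boolean;
}

interface HealthThresholds {
  maxWaiting: number;
  maxActive: number;
  maxFailed: number;
}

// Same limits as the hard-coded checks in healthCheck()
const DEFAULT_THRESHOLDS: HealthThresholds = { maxWaiting: 100, maxActive: 20, maxFailed: 50 };

// Hypothetical pure function: no Redis access, so it can be tested directly
function evaluateQueueHealth(
  stats: BackupQueueStats,
  limits: HealthThresholds = DEFAULT_THRESHOLDS
): { healthy: boolean; issues: string[] } {
  const issues: string[] = [];
  if (stats.waiting > limits.maxWaiting) issues.push(`Too many waiting jobs: ${stats.waiting}`);
  if (stats.active > limits.maxActive) issues.push(`Too many active jobs: ${stats.active}`);
  if (stats.failed > limits.maxFailed) issues.push(`Too many failed jobs: ${stats.failed}`);
  return { healthy: issues.length === 0, issues };
}

const report = evaluateQueueHealth({ waiting: 150, active: 5, completed: 40, failed: 3, delayed: 0, paused: false });
console.log(report); // { healthy: false, issues: [ 'Too many waiting jobs: 150' ] }
```

The try block in `healthCheck` could then reduce to `return evaluateQueueHealth(await this.getQueueStats());`. Two of the limits are worth revisiting against the queue options shown earlier: with `removeOnFail: 20`, Redis keeps at most about 20 failed jobs, so `failed > 50` cannot trigger, and `active > 20` is only reachable with at least three worker processes at `concurrency: 7`.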

Best Practices

1. Concurrency Control

typescript
// ✅ DO: Set appropriate concurrency
concurrency: 7 // Based on server capacity

// ❌ DON'T: Set too high (overload system)
concurrency: 100 // Too many
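Since `concurrency` comes straight from `BACKUP_CONCURRENCY` via `parseInt`, nothing guards against a typo (which yields `NaN`) or an oversized value like 100. A minimal sketch of a safer reader, assuming a hypothetical `readIntEnv` helper and an upper bound of 20 chosen only for illustration:

```typescript
// Hypothetical helper: parse an integer setting from an environment
// variable, falling back to a default on missing/invalid input and
// clamping the result to [min, max].
function readIntEnv(
  raw: string | undefined,
  fallback: number,
  min: number,
  max: number
): number {
  const parsed = Number.parseInt(raw ?? '', 10);
  if (Number.isNaN(parsed)) return fallback;
  return Math.min(max, Math.max(min, parsed));
}

console.log(readIntEnv(undefined, 7, 1, 20)); // 7
console.log(readIntEnv('100', 7, 1, 20));     // 20
console.log(readIntEnv('abc', 7, 1, 20));     // 7
```

In `BackupQueueManager` this would replace the raw parse, e.g. `readIntEnv(process.env.BACKUP_CONCURRENCY, 7, 1, 20)`; the right ceiling depends on the server and the backup targets.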

2. Deterministic Job IDs

typescript
// ✅ DO: Use deterministic job IDs
const jobId = `${scheduleId}-${timeSlot}`;

// ❌ DON'T: Use random IDs
const jobId = uuid(); // New ID on every call, so the same slot can be queued twice

3. Error Handling

typescript
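// ✅ DO: Rethrow so BullMQ records the failure and retries
try {
  await runSingleBackup(schedule, new Date());
} catch (error) {
  logger.error(error);
  throw error; // BullMQ will retry automatically
}

// ❌ DON'T: Swallow errors
try {
  await runSingleBackup(schedule, new Date());
} catch (error) {
  logger.error(error);
  // No rethrow: the processor resolves normally, so the job is marked completed
}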

Summary

Key Points

  1. BullMQ Configuration

    • Queue with a Redis connection
    • Worker with concurrency control
    • Retries with exponential backoff
  2. Job Processing

    • Deterministic job IDs
    • Status tracking
    • Error handling
  3. Monitoring

    • Event listeners
    • Queue statistics
    • Health checks
  4. Cleanup

    • Remove old completed jobs
    • Remove old failed jobs

Next Steps

In the next article, we'll cover the Template System.


Last Updated: 2025-01-25
Previous: 04. Schedule System
Next: 06. Template System

Internal documentation for iNET Portal