bullmq icon indicating copy to clipboard operation
bullmq copied to clipboard

[Bug]: Jobs gets auto removed from the queue and no logs to found

Open PixelwandDev opened this issue 1 month ago • 4 comments

Version

5.58.5

Platform

NodeJS

What happened?

I have 6 queue running and i am not sure what happening it gets auto remove after sometimes without execution.

Worker is running on ECS: with 3% CPU utilization and 18% memory. Redis is on the EC2

Worker config:


export const defaultJobOptions = {
  attempts: 3,
  backoff: {
    delay: 2000,
    type: "exponential",
  },
  removeOnComplete: {
    age: 7 * 24 * 60 * 60 * 1000, // keep for 7 days (in milliseconds)
  },
  removeOnFail: {
    age: 30 * 24 * 60 * 60 * 1000, // keep failed jobs for 30 days (in milliseconds)
  },
};

export const defaultWorkerOptions = {
  attempts: 5,
  autorun: true,
  backoff: {
    delay: 1000 * 60 * 60 * 24, // 24 hours
    type: "exponential",
  },
  lockDuration: 180000,
  lockRenewTime: 90000,
  maxStalledCount: 3,
  removeOnComplete: {
    age: 7 * 24 * 60 * 60 * 1000, // 7 days (in milliseconds)
  },
  removeOnFail: {
    age: 30 * 24 * 60 * 60 * 1000, // 30 days (in milliseconds)
  },
  stalledInterval: 120 * 1000, // 2 minutes between stalled checks (increased for EC2 Redis)
  useWorkerThreads: false,
};

export const getWorker = (
  name: keyof typeof QUEUE_NAMES,
  // biome-ignore lint/suspicious/noExplicitAny: processer type is dynamic
  processer: (job: any) => Promise<void>,
  redis: RedisOptions,
) => {
  const queueName = QUEUE_NAMES[name];
  const workerOptions: WorkerOptions = {
    ...defaultWorkerOptions,
    concurrency: WORKER_CONCURRENCY[name],
    connection: redis,
    // Remove prefix to use default BullMQ behavior - queue name is already unique
  };
  return new Worker(queueName, processer, workerOptions);
};

export const WORKER_CONCURRENCY = {
  [JobType.EMAIL]: 30,
  [JobType.VOICEMAIL]: 50,
  [JobType.WATCH]: 100,
  [JobType.CAMPAIGN]: 100,
  [JobType.CAMPAIGN_STEP]: 40,
  [JobType.SMS]: 50,
};

export const QUEUE_NAMES = {
  CAMPAIGN: "campaignQueue",
  CAMPAIGN_STEP: "campaignStepQueue",
  EMAIL: "emailQueue",
  SMS: "smsQueue",
  VOICEMAIL: "voicemailQueue",
  WATCH: "watchQueue",
} as const;

// Redis configuration for workers (matches campaign scheduler)
export const getRedisConfig = () => ({
  connection: env.REDIS_URL,
});

// Get Redis connection options for BullMQ (shared by workers)
export const getRedisConnectionOptions = () => {
  // Parse Redis URL to get host and port
  const redisUrl = env.REDIS_URL;

  // Assume host:port format
  const [host, portStr] = redisUrl.split(":");
  const port = portStr ? Number.parseInt(portStr, 10) : 6379;

  const redisOptions: RedisOptions = {
    connectTimeout: 20000, // 20 seconds timeout for EC2 connections
    enableOfflineQueue: true,
    enableReadyCheck: true,
    family: 4, // Force IPv4
    host: host || "localhost",
    keepAlive: 30000, // Keep connection alive every 30 seconds
    maxRetriesPerRequest: null, // Required by BullMQ for blocking operations
    port,
    // Retry strategy for network issues
    retryStrategy: (times: number) => {
      const delay = Math.min(times * 50, 2000);
      return delay;
    },
  };

  return redisOptions;
};

// Create Redis connection instance (for backward compatibility)
export const createRedisConnection = () => {
  const options = getRedisConnectionOptions();
  // Add ACL username and password from env
  return new Redis({
    ...options,
    connectionName: "worker-connection",
    lazyConnect: true,
  });
};


export const createQueueMQ = (name: keyof typeof QUEUE_NAMES, redis: ReturnType<typeof createRedisConnection>) => {
  const queueName = QUEUE_NAMES[name];
  return new QueueMQ(queueName, {
    connection: redis,
    defaultJobOptions: {
      ...defaultJobOptions,
      ...(name === "CAMPAIGN"
        ? {
            removeOnComplete: true,
          }
        : {}),
    },
  });
};

Worker


// Get Redis connection options for BullMQ
const redisConnectionOptions = getRedisConnectionOptions();

// Create a dedicated Redis connection for health checks
const healthCheckRedis = createRedisConnection();

// Create workers with enhanced processors using Redis connection options
const workers = {
  [JobType.EMAIL]: getWorker("EMAIL", emailProcessor, redisConnectionOptions),
  [JobType.VOICEMAIL]: getWorker("VOICEMAIL", voicemailProcessor, redisConnectionOptions),
  [JobType.WATCH]: getWorker("WATCH", processWatchJob, redisConnectionOptions),
  [JobType.CAMPAIGN]: getWorker("CAMPAIGN", campaignProcessor, redisConnectionOptions),
  [JobType.CAMPAIGN_STEP]: getWorker("CAMPAIGN_STEP", campaignStepProcessor, redisConnectionOptions),
  [JobType.SMS]: getWorker("SMS", processSMSJob, redisConnectionOptions),
};

// Add event handlers for all workers
for (const [jobType, worker] of Object.entries(workers)) {
  const typedJobType = jobType as JobType;

  worker.on("ready", () => {
    console.log(`🟢 [WORKER] ${typedJobType} worker is ready and waiting for jobs`);
  });

  worker.on("active", (job: Job) => {
    console.log(`🔄 [WORKER] ${typedJobType} job started: ${job.id}`);
  });

  worker.on("completed", (job: Job) => {
    console.log(`✅ [WORKER] ${typedJobType} job completed: ${job.id}`);
  });

  worker.on("failed", (job: Job | undefined, err: Error) => {
    console.error(`❌ [WORKER] ${typedJobType} job failed: ${job?.id || "unknown"}`);
    console.error(`   Error: ${err.message}`);
  });

  worker.on("error", (err: Error) => {
    if (err.message.includes("timeout") || err.message.includes("ETIMEDOUT")) {
      console.error(`⏰ [WORKER] ${typedJobType} worker timeout error: ${err.message}`);
    } else if (err.message.includes("ECONNRESET") || err.message.includes("connection")) {
      console.error(`🔌 [WORKER] ${typedJobType} worker connection error: ${err.message}`);
    } else {
      console.error(`💥 [WORKER] ${typedJobType} worker error: ${err.message}`);
    }
  });

  worker.on("stalled", (jobId: string) => {
    console.warn(`⚠️ [WORKER] ${typedJobType} job stalled: ${jobId}`);
    console.warn(`   This might indicate lock duration is too short or network issues with Redis`);
  });
}

// Graceful shutdown
export async function closeWorkers(): Promise<void> {
  console.log("🛑 [WORKERS] Closing workers...");

  await Promise.all(Object.values(workers).map((worker) => worker.close()));

  // Close watch job scheduler
  try {
    await watchJobScheduler.close();
    console.log("✅ [SCHEDULER] Watch job scheduler closed");
  } catch (error) {
    console.error("❌ [SCHEDULER] Error closing watch job scheduler:", error);
  }

  // Close health check Redis connection
  try {
    await healthCheckRedis.quit();
    console.log("✅ [REDIS] Health check Redis connection closed");
  } catch (error) {
    console.error("❌ [REDIS] Error closing health check Redis connection:", error);
  }

  // Disconnect from Redis (BullMQ manages Redis connections automatically)
  try {
    console.log("✅ [REDIS] Redis connections will be closed by BullMQ workers");
  } catch (error) {
    console.error("❌ [REDIS] Error with Redis cleanup:", error);
  }

  // Disconnect from database
  try {
    await db.$disconnect();
    console.log("✅ [DB] Database disconnected");
  } catch (error) {
    console.error("❌ [DB] Error disconnecting from database:", error);
  }

  console.log("✅ [WORKERS] All workers closed successfully");
}

How to reproduce.

No response

Relevant log output

I cant find logs what happening, if anyone can say what kind of logs I need to share I can share the logs.

Code of Conduct

  • [x] I agree to follow this project's Code of Conduct

PixelwandDev avatar Nov 06 '25 08:11 PixelwandDev

But you have auto remove enabled, so what is not working as expected?

manast avatar Nov 06 '25 09:11 manast

@manast Jobs was not executed and randomly queue was empty.

Code to add the job in queue

export const scheduleCampaign = async ({ campaignId, brandId }: TCampaignJobData) => {
  const jobKey = generateCampaignJobKey(campaignId);

  await createJob(jobKey, JOBTYPE);

  try {
    const redis = createRedisConnection();

    const queue = createQueueMQ(JOBTYPE, redis);

    const delayMinutes = env.NODE_ENV === "production" ? 5 : 0;

    await queue.add(
      "send_campaign",
      {
        brandId,
        campaignId,
        type: "PUBLISH",
      },
      {
        // 5 minutes delay before processing
        // This allows for any immediate updates to be processed first
        // If user wants to cancel or modify the campaign right after scheduling
        delay: delayMinutes * 60 * 1000,
        jobId: jobKey,
      },
    );

    await queue.close();

    await setScheduledAt(jobKey, new Date(Date.now() + 5 * 60 * 1000));

    // Close the Redis connection
    redis.disconnect();
  } catch (error) {
    console.error("Error scheduling campaign job:", error);
    await setFailedAt(jobKey, new Date(), (error as Error)?.message || "Unknown error");
    throw error;
  }
};

PixelwandDev avatar Nov 06 '25 10:11 PixelwandDev

I am not able to understand what is the issue you are reporting. Can you please clarify and provide a complete but minimal test case that we can run to reproduce the issue?

manast avatar Nov 11 '25 11:11 manast

Sure, will provide it

PixelwandDev avatar Nov 11 '25 19:11 PixelwandDev