
[Bug]: Workers suddenly stopped processing jobs on AWS memorydb

Open nullndr opened this issue 1 year ago • 12 comments

Version

5.12.12

Platform

NodeJS

What happened?

I was using bullmq 5.6, where I suffered from the issue https://github.com/taskforcesh/bullmq/issues/2466.

After upgrading to 5.12, some workers suddenly stop processing jobs. I actually think they get stuck, since I was unable to gracefully shut them down with the following code, which works flawlessly in 5.6:

let isClosing = false;

const runOnce = async (callback: () => Promise<void>) => {
  if (!isClosing) {
    isClosing = true;
    await callback();
  }
};

const closeFlows = async () => {
  const res = await Promise.allSettled([
    runAutomationFlow.close(),
    runCampaignFlow.close(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while closing flows"));
  } else {
    logger(logger.ok("Flows successfully closed"));
  }
};

const closeQueues = async () => {
  const res = await Promise.allSettled([
    foo.queue.close(),
    bar.queue.close(),
    baz.queue.close(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while closing queues"));
  } else {
    logger(logger.ok("Queues successfully closed"));
  }
};

const closeWorkers = async () => {
  const res = await Promise.allSettled([
    foo.worker.close(),
    bar.worker.close(),
    baz.worker.close(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while closing workers"));
  } else {
    logger(logger.ok("Workers successfully closed"));
  }
};

const disconnectDb = async () => {
  try {
    await db.$disconnect();
    logger(logger.ok("Database connection successfully closed"));
  } catch (error) {
    logger(
      logger.err("Something went wrong while disconnecting the database"),
      {
        error,
      },
    );
    throw error;
  }
};

const disconnectRedis = async () => {
  const res = await Promise.allSettled([
    queueRedisConnection.quit(),
    workerRedisConnection.quit(),
    flowRedisConnection.quit(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while redis connections"));
  } else {
    logger(logger.ok("Redis connections successfully closed"));
  }
};

const closeAll = async () => {
  await Promise.allSettled([closeWorkers(), closeQueues(), closeFlows()]);
  /**
   * The database and the redis connection must be closed after all workers complete.
   */
  await Promise.allSettled([disconnectDb(), disconnectRedis()]);
  await notify({
    type: NotifyType.JobStopped,
    pid: process.pid,
  });
};

My configs for the queues, workers and flows are the following:

const baseRedisOptions: RedisOptions = {
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
  showFriendlyErrorStack: true,
  retryStrategy: (t) => t * t * 1000,
  tls:
    process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
      ? /*
         * This is the same as the `--skipverify` flag in redli.
         * In production we must have a strong certificate, with a known authority.
         */
        { rejectUnauthorized: false }
      : undefined,
};

const queueRedisOptions: RedisOptions = {
  ...baseRedisOptions,
  enableOfflineQueue: false,
};

const workerRedisOptions: RedisOptions = {
  ...baseRedisOptions,
  enableOfflineQueue: true,
  maxRetriesPerRequest: null,
};

export const queueRedisConnection =
  process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
    ? new Redis.Cluster([{ host: Env.get("REDIS_HOST"), port: 6379 }], {
        clusterRetryStrategy: () => null,
        enableReadyCheck: false,
        enableOfflineQueue: false,
        showFriendlyErrorStack: true,
        redisOptions: {
          tls: {
            checkServerIdentity: () => undefined,
          },
          username: Env.get("REDIS_USERNAME"),
          password: Env.get("REDIS_PASSWORD"),
        },
      })
    : new Redis(Env.get("REDIS_HOST"), queueRedisOptions);

export const workerRedisConnection =
  process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
    ? new Redis.Cluster([{ host: Env.get("REDIS_HOST"), port: 6379 }], {
        clusterRetryStrategy: () => null,
        enableReadyCheck: false,
        enableOfflineQueue: true,
        showFriendlyErrorStack: true,
        redisOptions: {
          tls: {
            checkServerIdentity: () => undefined,
          },
          username: Env.get("REDIS_USERNAME"),
          password: Env.get("REDIS_PASSWORD"),
          maxRetriesPerRequest: null,
        },
      })
    : new Redis(Env.get("REDIS_HOST"), workerRedisOptions);

export const flowRedisConnection =
  process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
    ? new Redis.Cluster([{ host: Env.get("REDIS_HOST"), port: 6379 }], {
        clusterRetryStrategy: () => null,
        enableReadyCheck: false,
        enableOfflineQueue: false,
        showFriendlyErrorStack: true,
        redisOptions: {
          tls: {
            checkServerIdentity: () => undefined,
          },
          username: Env.get("REDIS_USERNAME"),
          password: Env.get("REDIS_PASSWORD"),
        },
      })
    : new Redis(Env.get("REDIS_HOST"), queueRedisOptions);

The sudden stop in processing can easily be seen in the MemoryDB metrics:

[screenshot: MemoryDB metrics showing the sudden drop in job processing]

Please tell me how I can provide you with more useful information.

  • [X] I agree to follow this project's Code of Conduct

nullndr avatar Sep 04 '24 06:09 nullndr

Unfortunately there is not a lot for us to go on with this information... are there jobs in the wait list, or delayed? Are all the expected workers actually online?
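
For instance, a quick sketch along these lines (the queue name and connection are placeholders) would show the counts:

import { Queue } from "bullmq";

// Sketch: check whether jobs are piling up in the wait or delayed lists.
const queue = new Queue("foo", {
  connection: { host: process.env.REDIS_HOST!, port: 6379 },
});

const counts = await queue.getJobCounts("wait", "delayed", "active");
console.log(counts); // e.g. { wait: 1234, delayed: 0, active: 0 }

await queue.close();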

manast avatar Sep 04 '24 07:09 manast

The jobs are not delayed ones, so they should be in the wait list.

are all the expected workers actually online?

Is there a simple way I can check this? Also, why would they go offline?

nullndr avatar Sep 04 '24 07:09 nullndr

The jobs are not delayed ones, so they should be in the wait list.

Could you please verify that this is the case, as "should" here seems to imply you do not know for sure...

Is there a simple way I can check this? Also what is the reason for which they can go offline?

You can use Taskforce.sh or any other frontend to check which workers are online for a given queue.

manast avatar Sep 04 '24 12:09 manast

Could you please verify that this is the case, as "should" here seems to imply you do not know for sure...

Yeah, as soon as we face the same issue again I will check it.

You can use Taskforce.sh or any other frontend to check which workers are online for a given queue.

Sadly I use AWS MemoryDB and it is not possible to connect to it from outside AWS resources. I am thinking about writing a little script that uses Queue.getWorkers(); is this the correct way to check this?
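
Something like this is what I had in mind (just a sketch; the queue name and connection are placeholders for the config above):

import { Queue } from "bullmq";

// Sketch: list the workers currently registered on a queue.
// getWorkers() is backed by the Redis CLIENT LIST command.
const queue = new Queue("foo", {
  connection: { host: process.env.REDIS_HOST!, port: 6379 },
});

const workers = await queue.getWorkers();
console.log(`${workers.length} worker(s) online`);
for (const worker of workers) {
  console.log(worker.name, worker.addr);
}

await queue.close();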

Also, I took another look at the docs and found out about the listener for the error event:

[screenshot: BullMQ docs on the worker "error" event listener]
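
In code that would look roughly like this (a sketch reusing the foo, bar, baz workers and the logger from the snippets above):

// Sketch: attach an error listener to each worker so connection-level
// errors are logged instead of being swallowed.
for (const { worker } of [foo, bar, baz]) {
  worker.on("error", (error) => {
    logger(logger.err("Worker error"), { error });
  });
}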

I will add it and check again.

nullndr avatar Sep 09 '24 06:09 nullndr

I can confirm they are in the waiting status

nullndr avatar Sep 10 '24 05:09 nullndr

hey @nullndr, could you connect to your redis instance and execute MONITOR? Let us know which commands are being executed while waiting jobs are not processed.
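
If redis-cli is hard to reach from inside the VPC, a rough ioredis sketch like this should do the same (the host is a placeholder; on a cluster you would need to monitor each node separately):

import Redis from "ioredis";

// Sketch: log every command the server receives, to see what BullMQ is
// (or is not) executing while jobs sit in the wait list.
const redis = new Redis({ host: process.env.REDIS_HOST, port: 6379, tls: {} });

const monitor = await redis.monitor();
monitor.on("monitor", (time, args) => {
  console.log(time, args.join(" "));
});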

roggervalf avatar Sep 10 '24 14:09 roggervalf

@roggervalf I will try. In the meantime I downgraded to 5.1.12; I'll test all versions to bisect the exact commit.

nullndr avatar Sep 10 '24 14:09 nullndr

You can use Taskforce.sh or any other frontend to check which workers are online for a given queue.

Sadly I use AWS MemoryDB and it is not possible to connect it outside AWS resources, I am thinking about writing a little script that use Queue.getWorkers(), is this the correct way to check this?

This scenario is very common; if your Redis instance is isolated you should use the Taskforce connector: https://github.com/taskforcesh/taskforce-connector

You can use getWorkers as you mention to get the list of online workers.

manast avatar Sep 10 '24 19:09 manast

I can confirm they are in the waiting status

And the workers are idling and online?

manast avatar Sep 10 '24 19:09 manast

I have been able to connect my AWS MemoryDB to taskforce.sh, but the dashboard shows no workers in any queue.

I think this is because I am missing the name option in WorkerOptions, since I had to downgrade bullmq to 5.1.12. I will try minor upgrades until I find the issue.

nullndr avatar Sep 11 '24 07:09 nullndr

It could also be that MemoryDB does not implement this command: https://redis.io/docs/latest/commands/client-setname/ but I could not find anything in the MemoryDB documentation confirming whether that is the case.
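
A quick way to rule that out would be something like this (a sketch; the host is a placeholder):

import Redis from "ioredis";

// Sketch: verify that MemoryDB accepts CLIENT SETNAME / CLIENT LIST,
// which getWorkers() and the dashboards rely on.
const redis = new Redis({ host: process.env.REDIS_HOST, port: 6379, tls: {} });

try {
  await redis.call("CLIENT", "SETNAME", "bullmq-test");
  console.log(await redis.call("CLIENT", "LIST"));
} catch (error) {
  console.error("CLIENT SETNAME/LIST rejected:", error);
} finally {
  await redis.quit();
}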

manast avatar Sep 11 '24 07:09 manast

hi @nullndr, I see that you are using cluster mode. There was a similar issue reported when using cluster mode that was fixed in https://github.com/taskforcesh/bullmq/pull/3504. Please upgrade your version to 5.61.2 and let us know how it goes.

roggervalf avatar Oct 24 '25 14:10 roggervalf