
Poller processing next message before previous message has finished

adamleemiller opened this issue 4 years ago

Question

I seem to be having an issue. Let's say I have 5 messages in my queue and I start the poller. The first message is pulled and work begins on handling it. During processing, I execute a shell command that can take quite a while to return, and I need the poller to wait until the previous message has completed entirely before it moves on. Unfortunately, it appears to wait only pollingWaitTimeMs and then process the next message before the previous one is done. It even fires the message_processed event even though the shell command has not completed.

Screenshots

NA

Additional context

Basically, I am downloading an image from an object storage service like S3 (but not S3) and saving it locally to my server. Once saved, I use that image to create map tiles with gdal2tiles.py; however, this command can take some time to finish. I need the poller to wait for this command to finish before it grabs the next message in the queue.

Code Sample

// Watcher Setup
const watcher = Consumer.create({
  queueUrl: global.config.msgsrvc.url,
  region: 'us-east-1',
  pollingWaitTimeMs: 60000,
  messageAttributeNames: ['companyId', 'farmId', 'fieldId', 'flightId', 'pipelineId', 'type', 'uuid', 'bucket', 'path', 'filename'],
  handleMessage: async(message) => {
    console.log("Processing Message...")

    await Pipeline.update({
      status: 2,
      startedAt: db.sequelize.fn('NOW')
    }, {
      where: {
        id: message.MessageAttributes.pipelineId.StringValue,
        uuid: message.MessageAttributes.uuid.StringValue
      }
    }).then(async() => {
      console.log("Downloading image from Wasabi...")

      let directory = `./tmp/${message.MessageAttributes.uuid.StringValue}-${message.MessageAttributes.type.StringValue}`

      fs.mkdir(directory, async() => {
        await storage.doDownload(
          message.MessageAttributes.bucket.StringValue,
          message.MessageAttributes.path.StringValue,
          message.MessageAttributes.filename.StringValue,
          directory
        ).then(async() => {
          console.log("Creating tiles...")

          let dirTiles = `${directory}/tiles`

          fs.mkdir(dirTiles, () => {
            let command = `gdal2tiles.py -z 1-20 --processes=10 ${directory}/${message.MessageAttributes.filename.StringValue} ${dirTiles}`

            // cmd.get(command, (data) => {
            //   console.log("Tiles created successfully...")
            //   console.log("Uploading tiles to Wasabi...")
            // })

            exec(command, (error, stdout, stderr) => {
              if (error) {
                console.log(`Error creating tiles: ${error.message}`)
              }

              console.log("Tiles created successfully...")
              console.log("Uploading tiles to Wasabi...")
            });
          })
        }).catch((error) => {
          console.log("Failed to download image: ", error)
        })
      })
    }).catch((error) => {
      console.log("Error: ", error)
    });
  },
  sqs: new aws.SQS({
    credentials: creds,
    apiVersion: '2012-11-05',
    region: 'us-east-1'
  })
});

watcher.on('message_received', (message) => {
  console.log("Message Received...")
})

watcher.on('message_processed', async(message) => {
  console.log("Message Processed...")
})

watcher.on('empty', () => {
  console.log("SQS Queue Empty...")
})

watcher.on('error', async(err, message) => {
  console.error("General Error:", err);
});

watcher.on('processing_error', async(err, message) => {
  console.error("Processing Error: ", err);
});

watcher.on('timeout_error', (err) => {
  console.error("Timeout Error: ", err);
});

console.log("Starting SQS Watcher")

watcher.start();

adamleemiller avatar Mar 25 '21 08:03 adamleemiller

This now needs to be changed to a bug. According to the documentation:

By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the batchSize option detailed below.

However, I know this to be untrue. My worker polls every 5 minutes, and when it does, it grabs a new item from the queue even if the last one has not completed.

adamleemiller avatar Apr 09 '21 23:04 adamleemiller

Well, I found a solution. Once a message has started processing inside the handleMessage handler, I execute .stop(), which stops the polling. Once the message has been fully processed (ours can take anywhere from 15 minutes to an hour or more), polling is started again. So far, this is working on three workers without a problem. For those that need a similar solution, I can confirm that it works.
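The workaround described above can be sketched as a small wrapper function. This is a minimal sketch, not part of sqs-consumer's API; it only assumes the consumer object exposes `.stop()` and `.start()`, as sqs-consumer's `Consumer` instances do, and `handler` is your long-running message handler:

```javascript
// Pause polling while a message is being handled, resume afterwards.
// `consumer` only needs .stop() and .start(), as on sqs-consumer's
// Consumer instances; `handler` is the long-running message handler.
function pauseWhileProcessing(consumer, handler) {
  return async (message) => {
    consumer.stop(); // no new receive calls while we work
    try {
      return await handler(message);
    } finally {
      consumer.start(); // resume polling once fully done
    }
  };
}
```

With sqs-consumer, the handler can safely reference the `watcher` variable in its closure, since the first message only arrives after `Consumer.create(...)` has returned and `watcher.start()` has been called.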

adamleemiller avatar Apr 11 '21 06:04 adamleemiller

Update: this comment is outdated now, solution in the linked PR.

The issue is that if your handler returns without an error, SQS Consumer considers the message processed and continues with its next task. The solution here is to return a Promise that actually awaits the processing; then this behaviour should not happen.

Otherwise, implement acknowledgment of message IDs once the PR is released.

nicholasgriffintn avatar Dec 21 '22 02:12 nicholasgriffintn

To make sure I understand the limitation here, is it correct to say the following:

Consumer processing time of messages must be shorter than the polling wait time, or unexpected consumption of messages will occur?

duncanhall avatar Jan 06 '23 13:01 duncanhall

We are noticing this behaviour too. The consumers poll hundreds of messages at the same time even though the batch size is set to 2 and there are just 4 consumers, so I should only see around 8 messages in flight at any given time, not the 400+ that I am seeing.

tushar-sg avatar Jan 11 '23 07:01 tushar-sg

I have a PR in progress for this (still validating at the time of writing):

https://github.com/bbc/sqs-consumer/pull/354

It should stop the consumer from polling for messages while there are still message IDs in its new internal queue; however, this implementation would require users to acknowledge their messages (return the message ID, or the message IDs for a batch).

nicholasgriffintn avatar Jan 22 '23 12:01 nicholasgriffintn

I've also updated the docs in that PR to state the following below the line mentioned:

By default, messages that are sent to the handleMessage and handleMessageBatch functions will be considered as processed if they return without an error. To acknowledge individual messages, please return the message that you want to acknowledge if you are using handleMessage or the messages for handleMessageBatch
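Based on that wording, handlers under the proposed behaviour would acknowledge by returning the processed message(s). This is a sketch of the proposal, not the released API, and `processTiles` is a hypothetical placeholder for the actual work:

```javascript
// Hypothetical stand-in for the real work (download, gdal2tiles, upload),
// with every step awaited inside it.
const processTiles = async (message) => { /* ...all awaited... */ };

// Acknowledge a single message by returning it.
const handleMessage = async (message) => {
  await processTiles(message);
  return message;
};

// For batches, return only the messages that were fully processed;
// anything not returned would not be acknowledged.
const handleMessageBatch = async (messages) => {
  const processed = [];
  for (const message of messages) {
    await processTiles(message);
    processed.push(message);
  }
  return processed;
};
```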

Hopefully this clears that up as well.

nicholasgriffintn avatar Jan 22 '23 12:01 nicholasgriffintn

Thinking about this recently, I'm not sure any change is actually needed here. The PR I have in progress technically does the trick, pending further validation, but at the cost of being potentially buggy.

I believe the issue here is just not awaiting the processing of your message before returning/acknowledging. I think I may close the PR for now and instead try to clarify our messaging.

I may go back to the drawing board at some point later on, but at the moment I think the best course of action is the recommendation above.

nicholasgriffintn avatar Feb 15 '23 21:02 nicholasgriffintn