rqueue icon indicating copy to clipboard operation
rqueue copied to clipboard

RqueueMessageManager.deleteMessage(queueName, Id) not deleting the message from the Queue

Open pcastroadc opened this issue 1 year ago • 5 comments

What's not working?

RqueueMessageManager.deleteMessage(queueName, Id) not deleting the message from the Queue, I basically call it but if I call RqueueMessageManager.getMessage() is still there afterwards

While RqueueMessageManager.deleteAllMessages(queueName) does work

What're application dependencies ?

  • Rqueue Version: 2.10.2-RELEASE
  • Spring Boot Version: 2.6.8
  • Spring Data Redis Version: 2.6.4

Sample code:

@Slf4j
public abstract class AbstractTaskScheduler {
    @Value("${auction-segment.online.rqueue.enabled:true}")
    private boolean isRqueueEnabled;
    @Value("${auction-segment.online.rqueue.slack-offset:100ms}")
    private Duration slackOffset;

    private final RqueueMessageEnqueuer rqueueMessageEnqueuer;
    private final RqueueMessageManager rqueueMessageManager;
    private final String queueName;

    protected final TaskScheduler scheduler;
    protected Map<Long, ScheduledFuture<?>> jobsMap = new HashMap<>();

    public AbstractTaskScheduler(
        TaskScheduler scheduler,
        RqueueMessageEnqueuer rqueueMessageEnqueuer,
        RqueueMessageManager rqueueMessageManager,
        String queueName
    ) {
        this.scheduler = scheduler;
        this.rqueueMessageEnqueuer = rqueueMessageEnqueuer;
        this.rqueueMessageManager = rqueueMessageManager;
        this.queueName = queueName;
    }

    @SuppressWarnings("PMD.DoNotUseThreads")
    protected abstract Runnable getTask(Long id);

    public List<Long> getTaskIds() {
        if (isRqueueEnabled) {
            List<Object> taskIds = rqueueMessageManager.getAllMessages(queueName);
            return !CollectionUtils.isEmpty(taskIds)
                ? taskIds.stream().map(t -> Long.valueOf(t.toString()))
                .collect(Collectors.toList())
                : Collections.emptyList();
        } else {
            return jobsMap != null
                ? new ArrayList<>(jobsMap.keySet())
                : Collections.emptyList();
        }
    }

    public RqueueMessage getTaskById(Long id) {
        if (isRqueueEnabled) {
            return rqueueMessageManager.getRqueueMessage(queueName, String.valueOf(id));
        }
        return null;
    }

    public boolean taskExists(Long id) {
        return getTaskById(id) != null;
    }

    @Retryable(
        maxAttempts = 3,
        backoff = @Backoff(delay = 30),
        value = {Exception.class}
    )
    protected void addTaskToScheduler(long id, Instant startDateTime) {
        if (isRqueueEnabled) {
            log.info("adding message to {} queue - message id {} - to be started at {}", queueName, id, startDateTime);
            boolean success = rqueueMessageEnqueuer.enqueueAt(
                queueName,
                String.valueOf(id),
                id,
                startDateTime.minusMillis(slackOffset.toMillis())
            );

            if (!success) {
               throw new IllegalStateException(String.format("Failed to enqueue message %s for listing %s", queueName, id));
            }
        } else {
            ScheduledFuture<?> scheduledTask = scheduler.schedule(getTask(id), startDateTime);
            jobsMap.put(id, scheduledTask);
        }
    }

    public void removeTaskFromScheduler(long id) {
        if (isRqueueEnabled) {
            log.info("removing message from {} queue - message id {}", queueName, id);
            rqueueMessageManager.deleteMessage(queueName, String.valueOf(id));
        } else {
            ScheduledFuture<?> scheduledTask = jobsMap.get(id);
            if(scheduledTask != null) {
                scheduledTask.cancel(true);
                jobsMap.put(id, null);
            }
        }
    }

    @EventListener({ ContextRefreshedEvent.class })
    protected abstract void contextRefreshedEvent();
}

I am not sure if this is related to this part of your code:

  @Override
  public boolean deleteMessage(String queueName, String messageId, Duration duration) {
    String lockValue = UUID.randomUUID().toString();
    try {
      if (lockManager.acquireLock(messageId, lockValue, Duration.ofSeconds(1))) {
        String id = RqueueMessageUtils.getMessageMetaId(queueName, messageId);
        MessageMetadata messageMetadata = rqueueMessageMetadataDao.get(id);
        if (messageMetadata == null) {
          messageMetadata = new MessageMetadata(id, MessageStatus.DELETED);
        }
        messageMetadata.setDeleted(true);
        messageMetadata.setDeletedOn(System.currentTimeMillis());
        save(messageMetadata, duration);
        return true;
      }
    } finally {
      lockManager.releaseLock(messageId, lockValue);
    }
    return false;
  }

you are not releasing the lock here when it enters in the IF

pcastroadc avatar Aug 05 '22 02:08 pcastroadc

you are not releasing the lock here when it enters in the IF

Finally block is always called.

Can you read MessageMetadata it should have the flag deleted is true, this is not a bug? This is just an inconsistency about the usage of MesssageMetadata.

sonus21 avatar Aug 05 '22 07:08 sonus21

how is it not a bug if calling RqueueMessageManager.getMessage(queueName, id) after deleting it still returns it?

and also if you call deleteAllMessages it does delete it and then RqueueMessageManager.getMessage() doesnt return it anymore. That's not consistent behavior.

And what do you mean with calling read MessageMetadata exactly? using another one of your classes to do it? can you provide a quick exmaple?

pcastroadc avatar Aug 05 '22 12:08 pcastroadc

I meant to say, this method does not check delete flag , we can add a fix for this

On Fri, 5 Aug 2022 at 5:46 PM, pcastroadc @.***> wrote:

how is it not a bug if calling RqueueMessageManager.getMessage(queueName, id) after deleting it still returns it?

and also if you call deleteAllMessages it does delete it and then RqueueMessageManager.getMessage() doesnt return it anymore. That's not consistent behavior, what do you mean with calling read MessageMetadata exactly? using another one of your classes to do it?

— Reply to this email directly, view it on GitHub https://github.com/sonus21/rqueue/issues/162#issuecomment-1206379809, or unsubscribe https://github.com/notifications/unsubscribe-auth/AC2FCPVIO42BSFCK2TGQB2DVXUAZHANCNFSM55URTWZA . You are receiving this because you were assigned.Message ID: @.***>

-- Thanks, Sonu Kumar Linkedin https://in.linkedin.com/in/sonus21 GitHub https://github.com/sonus21

sonus21 avatar Aug 05 '22 12:08 sonus21

One more question, Will messages mark as deleted still be picked up by the listeners?

pcastroadc avatar Aug 05 '22 12:08 pcastroadc

no, it will not be picked by message listener.

Thanks, Sonu Kumar Linkedin https://in.linkedin.com/in/sonus21 GitHub https://github.com/sonus21

On Fri, Aug 5, 2022 at 6:26 PM pcastroadc @.***> wrote:

One more question, Will messages mark as deleted still be picked up by the listeners?

— Reply to this email directly, view it on GitHub https://github.com/sonus21/rqueue/issues/162#issuecomment-1206425417, or unsubscribe https://github.com/notifications/unsubscribe-auth/AC2FCPSJJ6IRDX4RM2PCINTVXUFPPANCNFSM55URTWZA . You are receiving this because you were assigned.Message ID: @.***>

sonus21 avatar Aug 05 '22 16:08 sonus21