qmq
qmq copied to clipboard
change LogRecord.class to MessageLogRecord.class to solve the problem…
Hi, guys, I found a problem, after you started delay server, you could see that messages could be written to message_log, but no messages were replayed to schedule_log. I checked the code and found that in LogIterateService#processLog(), then visitor.nextRecord(), readOneRecord(currentBuffer), then in DelayMessageLogVisitor#readOneRecord(SegmentBuffer segmentBuffer), at the last line you could see return LogVisitorRecord.data(new MessageLogRecord(header, recordBytes, wroteOffset, payloadSize, message)), the event.getClass() was MessageLogRecord.
However, in DefaultDelayLogFacade#DefaultDelayLogFacade(final StoreConfiguration config, final Function<ScheduleIndex, Boolean> func), you guys subscribed LogRecord in
bus.subscribe(LogRecord.class, e -> { AppendLogResult<ScheduleIndex> result = appendScheduleLog(e); int code = result.getCode(); if (MessageProducerCode.SUCCESS != code) { LOGGER.error("appendMessageLog schedule log error,log:{} {},code:{}", e.getSubject(), e.getMessageId(), code); throw new AppendException("appendScheduleLogError"); } func.apply(result.getAdditional()); }); bus.subscribe(Record.class, e -> { long checkpoint = e.getStartWroteOffset() + e.getRecordSize(); updateIterateOffset(checkpoint); });
After I fixed the bug and restarted the delay server, it could run correctly.
Please have a review and approve it, thanks.