tmail-backend
tmail-backend copied to clipboard
[Firebase] Implement push with leaky
Why
- To avoid sending too many push requests to the push gateway / Firebase within a short period of time.
How
Implement a leaky-bucket algorithm:
- `PushLeakyPrepareListener`: appends a new `WebPushEvent` to a repository (Redis?), and re-dispatches the event containing the `createdDate` info.
- `PushWithLeakyListener`: handles the `WebPushEvent`, using `createdDate` to compute the "delay". It then fetches all the `WebPushEvent`s that are available, merges them, and pushes them to the gateway in a single request.
Here is a rough idea:
/**
 * Event carrying a web push payload for a given user.
 *
 * @param eventId     identifier exposed through `getEventId`
 * @param username    user exposed through `getUsername`
 * @param createdDate instant attached to the event
 *                    (NOTE(review): presumably the creation time of the event - confirm)
 * @param pushPayload payload of the push, as a string
 */
case class WebPushEvent(eventId: EventId,
                        username: Username,
                        createdDate: Instant,
                        pushPayload: String) extends Event {

  override def getEventId: EventId = eventId

  override def getUsername: Username = username

  // This event always carries work to do: it is never reported as a no-op.
  override def isNoop: Boolean = false
}
/**
 * Reactive store of [[WebPushEvent]]s, queried per user.
 */
trait WebPushLeakyRepository {

  /**
   * Reports the state of the queue of the given user.
   *
   * NOTE(review): presumably `true` when at least one event is pending for the user - confirm.
   */
  def isQueueAvailable(username: Username): SMono[Boolean]

  /** Adds the given event to the store. */
  def appendEvent(event: WebPushEvent): SMono[Unit]

  /**
   * Emits the events stored for the given user.
   *
   * NOTE(review): the name suggests the emitted events are also removed from the store - confirm.
   */
  def fetchAndDelEvents(username: Username): SFlux[WebPushEvent]
}
/**
 * Placeholder [[WebPushLeakyRepository]] implementation.
 *
 * Every operation is still unimplemented (`???`) and throws `NotImplementedError` when invoked.
 * NOTE(review): the name indicates a Redis-backed store is intended - no Redis access exists yet.
 */
case class RedisWebPushLeakyRepository() extends WebPushLeakyRepository {

  // TODO: not implemented yet.
  override def isQueueAvailable(username: Username): SMono[Boolean] = ???

  // TODO: not implemented yet.
  override def appendEvent(event: WebPushEvent): SMono[Unit] = ???

  // TODO: not implemented yet.
  override def fetchAndDelEvents(username: Username): SFlux[WebPushEvent] = ???
}
/**
 * Group listener that converts each incoming event into a [[WebPushEvent]], appends it to the
 * [[WebPushLeakyRepository]] and then re-dispatches that very same [[WebPushEvent]] on the event bus.
 *
 * @param eventBus               bus on which the built [[WebPushEvent]] is re-dispatched
 * @param clock                  time source (currently unused here;
 *                               NOTE(review): presumably meant for `buildWebPushEvent` - confirm)
 * @param webPushLeakyRepository store the built [[WebPushEvent]] is appended to
 */
case class PushLeakyPrepareListener()(eventBus: EventBus,
                                      clock: Clock,
                                      webPushLeakyRepository: WebPushLeakyRepository) extends ReactiveGroupEventListener {

  // The WebPushEvent re-dispatched below must not be picked up again by this listener,
  // otherwise it would be appended and re-dispatched in an endless loop.
  override def isHandling(event: Event): Boolean = event match {
    case _: WebPushEvent => false
    case _ => true
  }

  override def reactiveEvent(event: Event): Publisher[Void] =
    SMono.fromCallable(() => buildWebPushEvent(event))
      // Build the WebPushEvent exactly once: the stored event and the dispatched event have to be
      // the same value (same id, same createdDate), so reuse `webPushEvent` instead of rebuilding it.
      .flatMap(webPushEvent => webPushLeakyRepository.appendEvent(webPushEvent)
        .`then`(SMono(eventBus.dispatch(webPushEvent, AccountIdRegistrationKey(???)))))

  override def getDefaultGroup: Group = PushLeakyPrepareListenerGroup()

  // TODO: not implemented yet.
  private def buildWebPushEvent(event: Event): WebPushEvent = ???
}
/**
 * Group listener handling [[WebPushEvent]]s: when the user's queue is reported available, it waits
 * for the remainder of the time window (counted from the event's `createdDate`), then fetches the
 * user's stored events and pushes through the [[WebPushClient]].
 *
 * NOTE(review): `getDefaultGroup` returns the same `PushLeakyPrepareListenerGroup` as
 * `PushLeakyPrepareListener`; two listeners sharing one group are likely to conflict at
 * registration - confirm and consider a dedicated group.
 *
 * @param clock                  time source used to compute the elapsed time since `createdDate`
 * @param webPushLeakyRepository store the pending events are fetched from
 * @param webPushClient          client used to perform the push
 */
case class PushWithLeakyListener()(clock: Clock,
                                   webPushLeakyRepository: WebPushLeakyRepository,
                                   webPushClient: WebPushClient) extends ReactiveGroupEventListener {

  // Length of the batching window, in seconds.
  val WINDOW_TIME_IN_SECONDS: Int = 20

  override def isHandling(event: Event): Boolean = event.isInstanceOf[WebPushEvent]

  override def reactiveEvent(event: Event): Publisher[Void] = event match {
    case webPushEvent: WebPushEvent =>
      webPushLeakyRepository.isQueueAvailable(webPushEvent.getUsername)
        // Nothing to do when the queue is not reported available.
        .filter(queueIsNotEmpty => queueIsNotEmpty)
        .flatMap(_ => SMono.delay(new DurationInt(remainingDelayInSeconds(webPushEvent)).seconds))
        .flatMap(_ => webPushLeakyRepository.fetchAndDelEvents(webPushEvent.getUsername).collectSeq())
        // TODO: merge `listWebPushEvent` and supply the real push arguments.
        .flatMap(listWebPushEvent => SMono(webPushClient.push(???, ???)))
        .`then`()
        // Adapt to the Publisher[Void] expected by the listener contract.
        .asJava()
        .`then`()
    // Events other than WebPushEvent are not handled (see isHandling): complete without doing anything.
    case _ => SMono.empty[Void]
  }

  override def getDefaultGroup: Group = PushLeakyPrepareListenerGroup()

  /**
   * Seconds left before the window opened at `createdDate` elapses.
   *
   * The elapsed time is measured from `createdDate` to now (not the other way round, which would
   * yield a negative duration and make the delay grow instead of shrink). The result is clamped to
   * `[0, WINDOW_TIME_IN_SECONDS]` so that a `createdDate` in the future cannot extend the delay.
   */
  private def remainingDelayInSeconds(webPushEvent: WebPushEvent): Int = {
    val elapsedSeconds: Long = java.time.Duration.between(webPushEvent.createdDate, clock.instant()).toSeconds
    val remaining: Long = WINDOW_TIME_IN_SECONDS - elapsedSeconds
    Math.min(Math.max(remaining, 0L), WINDOW_TIME_IN_SECONDS.toLong).toInt
  }
}
/** Event bus group returned by the leaky push listeners' `getDefaultGroup`. */
case class PushLeakyPrepareListenerGroup() extends Group