tmail-backend icon indicating copy to clipboard operation
tmail-backend copied to clipboard

[Firebase] Implement push with leaky

Open vttranlina opened this issue 2 years ago • 2 comments

Why

  • In order to avoid too a lot push request to push gateway/ firebase in the short time

How

Implement leaky algorithm

  • PushLeakyPrepareListener: append new WebPushEvent to a repository (redis?), and re-dispatch event contain createdDate info
  • PushWithLeakyListener: hander WebPushEvent, use createdDatefor "delay". Then fetch allWebPushEvent` is available, merge it, and push to gateway with single request.

Here is raw idea:


case class WebPushEvent(eventId: EventId,
                        username: Username,
                        createdDate: Instant,
                        pushPayload: String) extends Event {
  override def getUsername: Username = username

  override def isNoop: Boolean = false

  override def getEventId: EventId = eventId
}

trait WebPushLeakyRepository {

  def appendEvent(event: WebPushEvent): SMono[Unit]

  def fetchAndDelEvents(username: Username): SFlux[WebPushEvent]

  def isQueueAvailable(username: Username): SMono[Boolean]
}

case class RedisWebPushLeakyRepository() extends WebPushLeakyRepository {
  override def appendEvent(event: WebPushEvent): SMono[Unit] = ???

  override def fetchAndDelEvents(username: Username): SFlux[WebPushEvent] = ???

  override def isQueueAvailable(username: Username): SMono[Boolean] = ???
}

case class PushLeakyPrepareListener()(eventBus: EventBus,
                                      clock: Clock,
                                      webPushLeakyRepository: WebPushLeakyRepository) extends ReactiveGroupEventListener {

  override def reactiveEvent(event: Event): Publisher[Void] =
    SMono.fromCallable(() => buildWebPushEvent(event))
      .flatMap(webPushEvent => webPushLeakyRepository.appendEvent(buildWebPushEvent(event))
        .`then`(SMono(eventBus.dispatch(webPushEvent, AccountIdRegistrationKey(???)))))

  override def getDefaultGroup: Group = PushLeakyPrepareListenerGroup()

  private def buildWebPushEvent(event: Event): WebPushEvent = ???
}

case class PushWithLeakyListener()(clock: Clock,
                                   webPushLeakyRepository: WebPushLeakyRepository,
                                   webPushClient: WebPushClient) extends ReactiveGroupEventListener {

  val WINDOW_TIME_IN_SECONDS: Int = 20

  override def isHandling(event: Event): Boolean = event.isInstanceOf[WebPushEvent]

  override def reactiveEvent(event: Event): Publisher[Void] = {
    val webPushEvent: WebPushEvent = event.asInstanceOf[WebPushEvent]

    val delayTime = Math.max(WINDOW_TIME_IN_SECONDS - java.time.Duration.between(clock.instant(), webPushEvent.createdDate).toSeconds, 0)

    SMono(webPushLeakyRepository.isQueueAvailable(event.getUsername))
      .filter(queueIsNotEmpty => queueIsNotEmpty)
      .flatMap(_ => SMono.delay(new DurationInt(delayTime.toInt).seconds)
        .flatMap(_ => webPushLeakyRepository.fetchAndDelEvents(event.getUsername)
          .collectSeq()))
      .flatMap(listWebPushEvent => SMono(webPushClient.push(???, ???)))
      .`then`()
  }

  override def getDefaultGroup: Group = PushLeakyPrepareListenerGroup()
}

case class PushLeakyPrepareListenerGroup() extends Group {}

vttranlina avatar Oct 18 '22 01:10 vttranlina