goka icon indicating copy to clipboard operation
goka copied to clipboard

How to fill up local cache with messages from a custom offset on start up?

Open onkarbanerjee opened this issue 1 year ago • 1 comments

We are using goka to join event streams and it has been running for over two years now. So everytime the pod is restarted, the goka processor starts backfilling with two year old data. Now this is causing an issue with huge memory requirements for the service using the goka processor and also we do not need the two year old data. How can I configure the goka processor to start backfilling from custom offset or timestamp?

onkarbanerjee avatar Jul 06 '23 09:07 onkarbanerjee

Hi @onkarbanerjee,

sorry for late reply! Currently there is no way to configure a goka-processor to stark backfilling from other than the beginning. This is intentional to guarantee integrity of the data. If you wanted to remove old entries in the processor table, then those entries should be deleted by the processor itself using ctx.Delete(). But how to do that is a different story :).

Usually, a processor is not meant to recover (backfill) all data from its table-topic. Instead, the table is persisted on disk and only recovers the changes since the last run. If you have only one instance, there wouldn't be anything to recover after a restart. I'm guessing you're running on kubernetes, so consider the following points:

  • use statefulsets with persistent disks instead of deployments
  • configure a storage directory (default directory for goak is /tmp, which is not persisted)
  • do not use memory-based storage

How to configure a storage directory

When you initialize your processor you will have to configure the storage layer one way or another. Otherwise the data ends up in /tmp/goka, which is not the right place in almost all cases. To get the default storage, with just the folder changed, do

	goka.NewProcessor(
		[]string{"localhost:9092"}, // brokers
		goka.DefineGroup(
			"group",
			/* edges ...*/
		),
		goka.WithStorageBuilder(storage.DefaultBuilder("/path/to/persisted/directory")),
	)

As said, this is required in most cases for initializing processors or views and the documentation is probably lacking that info a bit :)

Let us know if that fixes the issue!

frairon avatar Jul 24 '23 05:07 frairon