kinesis-producer

Accept an interface instead of []byte in producer's Put method

Open · owais opened this issue 5 years ago · 4 comments

Put expects records to be written to Kinesis as []byte. This works fine for simple cases, but often a more complex layer wraps kinesis-producer and needs more control over how different cases (such as failures) are handled. For example, a program might want to handle failures in a special way, but right now the FailureRecord struct only contains the partition key and the raw data as bytes. There is no other information that could help identify the record.

Accepting an interface instead of raw bytes would make the library a lot more flexible. For example,

type Record struct {
    id           string
    data         []byte
    failureCount int
}

func (r Record) Bytes() []byte {
    return r.data
}

func process(r Record) {
    producer.Put(r, "partition-key")
}

func processErrors() {
    for f := range producer.NotifyFailures() {
        r := f.Item.(Record) // Item would hold the value originally passed to Put
        if r.failureCount > 3 {
            // log the failure, then move on to the next one
            continue
        }
        r.failureCount++
        process(r)
    }
}
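A self-contained sketch of this retry pattern follows. It assumes a hypothetical `Item` interface and `Failure` type, and replaces the real producer with an in-memory `fakeProducer` that fails every `Put`, so it illustrates the idea rather than kinesis-producer's actual API. Because each failure carries the original `Record`, the retry count is read straight off the struct with no decoding.

```go
package main

import (
	"errors"
	"fmt"
)

// Item is a hypothetical interface the proposed Put would accept.
// It uses the Bytes() method from the example above.
type Item interface {
	Bytes() []byte
}

// Record carries identifying metadata alongside its payload.
type Record struct {
	id           string
	data         []byte
	failureCount int
}

func (r Record) Bytes() []byte { return r.data }

// Failure is a hypothetical failure notification that keeps the original Item.
type Failure struct {
	Item         Item
	PartitionKey string
	Err          error
}

// fakeProducer stands in for kinesis-producer: every Put "fails" immediately
// so the retry path can be exercised. It is not the library's implementation.
type fakeProducer struct {
	failures chan Failure
	puts     int
}

func (p *fakeProducer) Put(item Item, partitionKey string) {
	p.puts++
	p.failures <- Failure{Item: item, PartitionKey: partitionKey, Err: errors.New("simulated failure")}
}

const maxFailures = 3

// handleFailures drains the failure channel, re-putting each record until it
// has failed more than maxFailures times, and returns the IDs it gave up on.
func handleFailures(p *fakeProducer) []string {
	var dropped []string
	for {
		select {
		case f := <-p.failures:
			r, ok := f.Item.(Record) // recover the concrete type; no unmarshaling
			if !ok {
				continue
			}
			if r.failureCount > maxFailures {
				dropped = append(dropped, r.id) // e.g. log the failure here
				continue
			}
			r.failureCount++
			p.Put(r, f.PartitionKey)
		default:
			return dropped // channel is empty: nothing left to retry
		}
	}
}

func main() {
	p := &fakeProducer{failures: make(chan Failure, 16)}
	p.Put(Record{id: "a", data: []byte("hello")}, "partition-key")
	dropped := handleFailures(p)
	fmt.Println("puts:", p.puts, "dropped:", dropped) // puts: 5 dropped: [a]
}
```

With the `> 3` check from the example, a record is sent once and retried four times before it is dropped.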

Alternatively, a record could marshal itself into bytes, which kinesis-producer would obtain internally by calling the record's Read() method. The client would then not have to unmarshal the bytes on every failure just to inspect the record (for specialized logging, retries, failure handling, etc.).

The kinesis-producer API would add an interface definition:

type Record interface {
    Read() []byte
}

func (p *Producer) Put(r Record, partitionKey string) error

type FailureRecord struct {
    Record       Record
    PartitionKey string
}

Alternatively, the partition key could be rolled into the record as well:

type Record interface {
    Read() []byte
    PartitionKey() string
}

func (p *Producer) Put(r Record) error

In this case, the failure channel would simply return the failed Records, and a client-side implementation could look like this:

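import (
    "encoding/json"
    "time"
)

type Record struct {
    ID        string
    UserID    string
    Timestamp time.Time
    // any other number of fields
}

// Read marshals r as JSON. The proposed interface gives Read no error
// return, so a marshaling error is discarded in this sketch.
func (r Record) Read() []byte {
    b, _ := json.Marshal(r)
    return b
}

func (r Record) PartitionKey() string {
    return r.ID
}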
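With the key folded into the interface, a single-argument `Put` could take both the key and the payload from the record, and a failure handler could branch on the concrete type instead of decoding bytes. A sketch with two hypothetical record types, `Click` and `Purchase`, and hypothetical helpers `toEntry` and `onFailure`:

```go
package main

import (
	"fmt"
	"strconv"
)

// Record is the proposed interface with the partition key folded in.
type Record interface {
	Read() []byte
	PartitionKey() string
}

// Click and Purchase are hypothetical client record types.
type Click struct{ UserID, URL string }

func (c Click) Read() []byte         { return []byte(c.UserID + " " + c.URL) }
func (c Click) PartitionKey() string { return c.UserID }

type Purchase struct {
	OrderID string
	Cents   int
}

func (p Purchase) Read() []byte         { return []byte(p.OrderID + " " + strconv.Itoa(p.Cents)) }
func (p Purchase) PartitionKey() string { return p.OrderID }

// toEntry is what a single-argument Put could do internally: both the key
// and the payload come from the record itself.
func toEntry(r Record) (partitionKey string, data []byte) {
	return r.PartitionKey(), r.Read()
}

// onFailure branches on the concrete type of a failed Record rather than
// decoding its bytes.
func onFailure(r Record) string {
	switch v := r.(type) {
	case Purchase:
		return "retry purchase " + v.OrderID
	case Click:
		return "drop click from " + v.UserID
	default:
		return "unhandled record with key " + r.PartitionKey()
	}
}

func main() {
	key, data := toEntry(Purchase{OrderID: "o-1", Cents: 500})
	fmt.Println(key, string(data))                          // o-1 o-1 500
	fmt.Println(onFailure(Click{UserID: "u-7", URL: "/home"})) // drop click from u-7
}
```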

The library could also ship with a basic implementation to cover simple cases. Usage would look like:

pr.Put(producer.Record{myData, myPartitionKey})
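Since `Record` is already the interface's name in this proposal, a bundled concrete type would need a different name. Its field also cannot be called `PartitionKey`, because Go rejects a type that has both a field and a method with the same name. A sketch using a hypothetical `SimpleRecord`:

```go
package main

import "fmt"

// Record is the proposed interface.
type Record interface {
	Read() []byte
	PartitionKey() string
}

// SimpleRecord is a hypothetical built-in implementation for callers that
// only have raw bytes and a key.
type SimpleRecord struct {
	Data []byte
	Key  string // not "PartitionKey": that name is taken by the method
}

func (r SimpleRecord) Read() []byte         { return r.Data }
func (r SimpleRecord) PartitionKey() string { return r.Key }

// Compile-time check that SimpleRecord satisfies Record.
var _ Record = SimpleRecord{}

func main() {
	var r Record = SimpleRecord{Data: []byte("my data"), Key: "my-partition-key"}
	fmt.Println(r.PartitionKey(), string(r.Read())) // my-partition-key my data
}
```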

owais · Nov 19 '18

I think the general idea is good and useful. I’ll give it more attention tomorrow morning.

a8m · Nov 19 '18

Has there been any progress on this front? It would be a great feature addition.

guyest · Aug 20 '19

I haven't made any progress on this, but I'm in favor of adding it, since it's needed for other use cases as well, like #17.

a8m · Aug 25 '19

This would be quite useful; I'll look at submitting a PR.

Quick question: should this be added to the existing Put, or as a new function, e.g. PutRecord?
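One way to weigh the two options: adding `PutRecord` alongside the existing `Put(data []byte, partitionKey string) error` keeps current callers compiling, with `Put` reduced to a thin wrapper. A sketch under that assumption; `Producer` here is a toy type and `rawRecord` is hypothetical:

```go
package main

import "fmt"

// Record is the single-argument variant of the proposed interface.
type Record interface {
	Read() []byte
	PartitionKey() string
}

// rawRecord is a hypothetical unexported adapter for plain bytes plus a key.
type rawRecord struct {
	data []byte
	key  string
}

func (r rawRecord) Read() []byte         { return r.data }
func (r rawRecord) PartitionKey() string { return r.key }

// Producer is a toy stand-in; sent replaces the real buffering logic.
type Producer struct {
	sent []Record
}

// PutRecord is the new entry point named in the question above.
func (p *Producer) PutRecord(r Record) error {
	p.sent = append(p.sent, r)
	return nil
}

// Put keeps its current signature so existing callers still compile,
// and simply wraps the bytes in a rawRecord.
func (p *Producer) Put(data []byte, partitionKey string) error {
	return p.PutRecord(rawRecord{data: data, key: partitionKey})
}

func main() {
	p := &Producer{}
	_ = p.Put([]byte("legacy bytes"), "key-1")
	_ = p.PutRecord(rawRecord{data: []byte("new"), key: "key-2"})
	fmt.Println(len(p.sent), p.sent[0].PartitionKey(), p.sent[1].PartitionKey()) // 2 key-1 key-2
}
```

Changing `Put` itself to take a `Record` would instead be a breaking change for every existing caller.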

tiny-dancer · Dec 03 '19