kinesis-producer
Accept an interface instead of []byte in producer's Put method
Put expects records to be written to Kinesis as []byte. This works fine for simple cases, but often a more complex layer wraps kinesis-producer and needs more control over how different cases (such as failures) are handled. For example, a program might want to handle failures in a special way, but right now the FailureResult struct contains only the partition key and the raw data as bytes. Nothing else is available to identify the failed records.
Accepting an interface instead of raw bytes would make the library a lot more flexible. For example,
type Record struct {
	id           string
	data         []byte
	failureCount int
}

// Bytes returns the payload to write to Kinesis.
func (r Record) Bytes() []byte {
	return r.data
}

func process(r Record) {
	producer.Put(r, "partition-key")
}

func processErrors() {
	for f := range producer.NotifyFailures() {
		r := f.Item.(Record)
		if r.failureCount > 3 {
			// log failure and give up on this record
			continue
		}
		r.failureCount++
		process(r)
	}
}
Alternatively, a record could marshal itself to bytes on demand: kinesis-producer would call its Read() method internally, and the client would not have to unmarshal the bytes on every failure just to inspect the record (for specialized logging, retries, failure handling, etc.).
The kinesis-producer API would add an interface definition:
type Record interface {
	Read() []byte
}

producer.Put(r Record, k string)

type FailureRecord struct {
	Record       Record
	PartitionKey string
}
Or the partition key could be rolled into the record as well:
type Record interface {
	Read() []byte
	PartitionKey() string
}

producer.Put(r Record)
In this case, the failure channel would just return the failed Records, and a client-side implementation could look like:
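type Record struct {
	ID        string
	UserID    string
	Timestamp time.Time
	// any other number of fields
}

func (r Record) Read() []byte {
	// Marshal r and return the bytes. JSON (encoding/json) is only one
	// possible encoding; the error is ignored here for brevity.
	b, _ := json.Marshal(r)
	return b
}

func (r Record) PartitionKey() string {
	return r.ID
}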
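On the consuming side, handling failures could then be a plain loop over the failure channel with a type assertion back to the client's own type. The sketch below is self-contained and makes several assumptions: the channel is simulated locally rather than obtained from the producer, and Event and failedIDs are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Record is the proposed interface with the partition key rolled in.
type Record interface {
	Read() []byte
	PartitionKey() string
}

// Event is a hypothetical client-side record type.
type Event struct {
	ID        string
	UserID    string
	Timestamp time.Time
}

// Read marshals the event as JSON; any other encoding would work too.
func (e Event) Read() []byte {
	b, err := json.Marshal(e)
	if err != nil {
		return nil
	}
	return b
}

func (e Event) PartitionKey() string { return e.ID }

// failedIDs drains a failure channel and collects the IDs of failed events.
// Records of any other concrete type are skipped.
func failedIDs(failures <-chan Record) []string {
	var ids []string
	for r := range failures {
		if e, ok := r.(Event); ok {
			ids = append(ids, e.ID)
		}
	}
	return ids
}

func main() {
	// Simulated failure channel; in the proposal the producer would own it.
	failures := make(chan Record, 2)
	failures <- Event{ID: "a1", UserID: "u1", Timestamp: time.Unix(0, 0).UTC()}
	failures <- Event{ID: "b2", UserID: "u2", Timestamp: time.Unix(0, 0).UTC()}
	close(failures)

	fmt.Println(failedIDs(failures)) // prints [a1 b2]
}
```

Using the two-value form of the type assertion keeps the loop from panicking if more than one record type is ever sent through the same producer.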
The library could also ship with a simple implementation to cover simple cases. Usage would look like:
pr.Put(producer.Record{myData, myPartitionKey})
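A built-in implementation like that could be very small. Here is a minimal sketch, assuming the interface variant with PartitionKey(). Since Record would already name the interface, the concrete type is called DataRecord here; that name and its field names are hypothetical and not part of the proposal.

```go
package main

import "fmt"

// Record is the proposed interface.
type Record interface {
	Read() []byte
	PartitionKey() string
}

// DataRecord is a hypothetical ready-made Record for callers that only
// have raw bytes and a partition key.
type DataRecord struct {
	Data []byte
	Key  string
}

// Read returns the raw bytes unchanged.
func (d DataRecord) Read() []byte { return d.Data }

// PartitionKey returns the key supplied by the caller.
func (d DataRecord) PartitionKey() string { return d.Key }

// Compile-time check that DataRecord satisfies Record.
var _ Record = DataRecord{}

func main() {
	r := DataRecord{Data: []byte("payload"), Key: "user-1"}
	fmt.Printf("%s %s\n", r.Read(), r.PartitionKey()) // prints "payload user-1"
}
```

Callers with simple needs would keep a one-line Put call, while callers with richer types implement the interface themselves.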
I think the general idea is good and useful. I’ll give it more attention tomorrow morning.
Has there been any progress on this front? It would be a great feature addition.
I haven't made any progress on this, but I'm in favor of adding it, since it's actually needed for other use cases as well, like #17.
This would be quite useful; I'll look at submitting a PR.
Quick question: do we think this should be added using the existing Put or as a new function, e.g. PutRecord?