azure-event-hubs-go
Add Receiver Options to EPH
Expected Behavior
Spinning up a Receiver directly lets me specify many more options than are currently available through the EPH.
e.g. I'd be really interested in using the recently added ReceiveFromTimestamp option.
Actual Behavior
I can't pass any temporal options/offset settings to the EPH. At least, I couldn't find a way to do it.
Environment
- OS: Mac OS X 10.14.1
- Go version: go1.11.2 darwin/amd64
- Version of Library: v1.1.0
This should be pretty simple to plumb via an EPH option. Does that sound good to you?
Yes, again exactly what I'm looking for 🥇 Thanks for being so responsive!!
@devigned I started implementing this but stopped since I'm not able to get the tests running on my end so far.
You can check it out here.
Is this how you would've implemented it too? I find it a bit clunky with the checkpointPersister, feel free to suggest how to improve this!
@elsesiy sorry to hear you are not able to get the tests running. What's failing and where did we let you down?
Hmm... I think you are going to run into issues with this approach. If you have multiple EPH processes starting at the same time, they are all going to try to write the same checkpoint without regard to existing checkpoints or other processors.
When I mentioned adding options to EPH, I was thinking the offset option would be stored on the EPH host and used in eph.setup() to set the initial state for a lease that doesn't exist (EnsureCheckpoint). See: https://github.com/Azure/azure-event-hubs-go/blob/783ca54fd1cdf63a7374ee8a788c977740d77b0c/eph/eph.go#L377-L404
What do you think?
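To make the idea concrete, here is a minimal sketch of that shape. It does not use the library's code: `Checkpoint`, `store`, `host`, `HostOption`, `WithInitialTimestamp`, and `setup` are all hypothetical stand-ins, and the mutex only models whatever conditional write the real checkpoint storage would need. The point is that the option is kept on the host and applied only when a partition has no checkpoint yet, so a second host starting later does not overwrite what the first one seeded.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Checkpoint is a simplified stand-in for persist.Checkpoint.
type Checkpoint struct {
	Offset      string
	EnqueueTime time.Time
}

// store is a hypothetical in-memory checkpoint store shared by all hosts.
// The mutex stands in for a conditional write in real storage.
type store struct {
	mu          sync.Mutex
	checkpoints map[string]Checkpoint
}

func newStore() *store {
	return &store{checkpoints: make(map[string]Checkpoint)}
}

// ensure returns the existing checkpoint for a partition, or stores and
// returns initial only when the partition has none yet.
func (s *store) ensure(partitionID string, initial Checkpoint) Checkpoint {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cp, ok := s.checkpoints[partitionID]; ok {
		return cp
	}
	s.checkpoints[partitionID] = initial
	return initial
}

// host is a hypothetical stand-in for the EPH host.
type host struct {
	store   *store
	initial Checkpoint
}

// HostOption configures a host (hypothetical name).
type HostOption func(*host)

// WithInitialTimestamp is a hypothetical option that records where new
// partitions should start; it writes nothing by itself.
func WithInitialTimestamp(ts time.Time) HostOption {
	return func(h *host) { h.initial = Checkpoint{EnqueueTime: ts} }
}

func newHost(s *store, opts ...HostOption) *host {
	h := &host{store: s}
	for _, opt := range opts {
		opt(h)
	}
	return h
}

// setup seeds partitions that lack a checkpoint and returns the starting
// checkpoint for each requested partition.
func (h *host) setup(partitionIDs ...string) map[string]Checkpoint {
	out := make(map[string]Checkpoint, len(partitionIDs))
	for _, id := range partitionIDs {
		out[id] = h.store.ensure(id, h.initial)
	}
	return out
}

// twoHosts starts two hosts with different timestamps against one store
// and returns what each of them sees for partition "0".
func twoHosts() (first, second Checkpoint) {
	s := newStore()
	h1 := newHost(s, WithInitialTimestamp(time.Unix(1000, 0)))
	h2 := newHost(s, WithInitialTimestamp(time.Unix(2000, 0)))
	return h1.setup("0")["0"], h2.setup("0")["0"]
}

func main() {
	first, second := twoHosts()
	// Both hosts agree because the second one finds an existing checkpoint.
	fmt.Println(first.EnqueueTime.Equal(second.EnqueueTime))
}
```

In this sketch the second host's timestamp is ignored for partition "0" because a checkpoint already exists, which is the behavior that avoids the overwrite problem described above.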
@devigned Oh yeah that makes much more sense! I won't be able to dedicate much time to this due to other commitments but this is probably the way to go.
@elsesiy @devigned Is this something you are working on? I was looking for a ReceiveFromTimestamp option in EPH but couldn't find one.
@aloysiustany I'm not actively working on this but it would be definitely useful!
I haven't had time to work on this to date. It's been on my backlog. I'll see if I can't sneak it in later this week.
@devigned did you get a chance to get to this ?
@jhendrixMSFT or @gavinfish any chance you folks could take a look at this?
Is there any activity going on w.r.t. this issue?
I also wanted to use ReceiveFromTimestamp with eph. I think I found a workaround with NewStorageLeaserCheckpointer and WithInitialCheckpoint:
leaserCheckpointer, _ := storageLeaser.NewStorageLeaserCheckpointer(
	credential, accountName, containerName, env,
	storageLeaser.WithInitialCheckpoint(func() persist.Checkpoint {
		t := time.Now() // whatever you wanted to use with ReceiveFromTimestamp
		// copied from eventhub.ReceiveFromTimestamp
		return persist.NewCheckpoint("", 0, t)
	}),
)
e, _ := eph.New(ctx, namespace, hubName, tokenProvider, leaserCheckpointer, leaserCheckpointer)
// TODO: register handlers and start
UPDATE: the above only works if there is no data in blob storage.
After much debugging, here's a workaround that mostly works:
type TimestampCheckpointer struct {
	eph.Checkpointer
	start       persist.Checkpoint
	initialized sync.Map
}

func NewTimestampCheckpointer(t time.Time, inner eph.Checkpointer) *TimestampCheckpointer {
	return &TimestampCheckpointer{
		Checkpointer: inner,
		// copied from eventhub.ReceiveFromTimestamp
		start: persist.NewCheckpoint("", 0, t),
	}
}

func (b *TimestampCheckpointer) UpdateCheckpoint(
	ctx context.Context,
	partitionID string,
	checkpoint persist.Checkpoint) error {
	// record that eph has started a receiver for this partition
	b.initialized.Store(partitionID, true)
	return b.Checkpointer.UpdateCheckpoint(ctx, partitionID, checkpoint)
}

func (b *TimestampCheckpointer) EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error) {
	_, ok := b.initialized.Load(partitionID)
	if !ok {
		// eph hasn't started a receiver yet
		return b.start, nil
	}
	return b.Checkpointer.EnsureCheckpoint(ctx, partitionID)
}
Then we use that when creating the eph:
t := time.Now() // whatever you wanted to use with ReceiveFromTimestamp
leaserCheckpointer, _ := storageLeaser.NewStorageLeaserCheckpointer(
	credential, accountName, containerName, env,
	storageLeaser.WithInitialCheckpoint(func() persist.Checkpoint { return persist.NewCheckpoint("", 0, t) }),
)
e, _ := eph.New(ctx, namespace, hubName, tokenProvider, leaserCheckpointer, NewTimestampCheckpointer(t, leaserCheckpointer))
// TODO: register handlers and start
This probably still breaks if you have more than one process and leases move back and forth.
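A rough model of why, using simplified stand-in types rather than the real `eph` and `persist` packages: the `initialized` set lives in one process's memory, so a process that picks up a lease later has no record of it. This assumes the host positions a new receiver from whatever `EnsureCheckpoint` returns, which I have not verified against the eph internals. All names below (`checkpointer`, `sharedStore`, `timestampWrapper`, `leaseHandoff`) are hypothetical.

```go
package main

import "fmt"

// checkpointer is a simplified stand-in for eph.Checkpointer;
// checkpoints are plain strings here.
type checkpointer interface {
	EnsureCheckpoint(partitionID string) string
	UpdateCheckpoint(partitionID, offset string)
}

// sharedStore models the persisted checkpoints visible to every process.
type sharedStore map[string]string

func (s sharedStore) EnsureCheckpoint(id string) string  { return s[id] }
func (s sharedStore) UpdateCheckpoint(id, offset string) { s[id] = offset }

// timestampWrapper mirrors the workaround: its initialized set is local
// to a single process.
type timestampWrapper struct {
	inner       checkpointer
	start       string
	initialized map[string]bool
}

func newWrapper(inner checkpointer, start string) *timestampWrapper {
	return &timestampWrapper{inner: inner, start: start, initialized: map[string]bool{}}
}

func (w *timestampWrapper) UpdateCheckpoint(id, offset string) {
	w.initialized[id] = true
	w.inner.UpdateCheckpoint(id, offset)
}

func (w *timestampWrapper) EnsureCheckpoint(id string) string {
	if !w.initialized[id] {
		// this process has not checkpointed the partition yet
		return w.start
	}
	return w.inner.EnsureCheckpoint(id)
}

// leaseHandoff lets process A checkpoint partition "0", then hands the
// lease to process B and reports where B would resume.
func leaseHandoff() (persisted, resumedAt string) {
	store := sharedStore{}
	a := newWrapper(store, "start-timestamp")
	b := newWrapper(store, "start-timestamp")

	a.EnsureCheckpoint("0")
	a.UpdateCheckpoint("0", "offset-100")

	// The lease moves to B, which has never checkpointed partition "0".
	return store["0"], b.EnsureCheckpoint("0")
}

func main() {
	persisted, resumedAt := leaseHandoff()
	fmt.Println("persisted:", persisted)
	fmt.Println("process B resumes at:", resumedAt)
}
```

In this model B goes back to the start timestamp even though A already persisted a later offset, so events between the two would be processed again.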