azure-event-hubs-go

Add Receiver Options to EPH

Open elsesiy opened this issue 6 years ago • 13 comments

Expected Behavior

Spinning up a Receiver directly lets me specify many more options than are currently available through the EPH.

For example, I'd be really interested in using the recently added ReceiveFromTimestamp option.

Actual Behavior

I can't pass any temporal options/offset settings to the EPH. At least, I couldn't find a way to do it.

Environment

  • OS: Mac OS X 10.14.1
  • Go version: go1.11.2 darwin/amd64
  • Version of Library: v1.1.0

elsesiy avatar Dec 13 '18 00:12 elsesiy

This should be pretty simple to plumb via an EPH option. Does that sound good to you?

devigned avatar Dec 13 '18 01:12 devigned

Yes, again exactly what I'm looking for 🥇 Thanks for being so responsive!!

elsesiy avatar Dec 13 '18 01:12 elsesiy

@devigned I started implementing this but stopped since I'm not able to get the tests running on my end so far.

You can check it out here.

Is this how you would've implemented it too? I find it a bit clunky with the checkpointPersister, feel free to suggest how to improve this!

elsesiy avatar Mar 25 '19 02:03 elsesiy

@elsesiy sorry to hear you are not able to get the tests running. What's failing and where did we let you down?

Hmm... I think you are going to run into issues with this approach. If you have multiple EPH processes starting at the same time, they are all going to try to write the same checkpoint without regard to any existing checkpoint or to the other processors.

When I mentioned adding options to EPH, I was thinking the offset option would be stored on the EPH host and used in eph.setup() to set the initial state for a lease that doesn't exist (EnsureCheckpoint). See: https://github.com/Azure/azure-event-hubs-go/blob/783ca54fd1cdf63a7374ee8a788c977740d77b0c/eph/eph.go#L377-L404

What do you think?

devigned avatar Mar 25 '19 18:03 devigned

@devigned Oh yeah that makes much more sense! I won't be able to dedicate much time to this due to other commitments but this is probably the way to go.

elsesiy avatar Mar 27 '19 00:03 elsesiy

@elsesiy @devigned Is this something you are working on? I was looking for a ReceiveFromTimestamp option in EPH but couldn't find one.

aloysiustany avatar May 15 '19 07:05 aloysiustany

@aloysiustany I'm not actively working on this, but it would definitely be useful!

elsesiy avatar May 15 '19 19:05 elsesiy

I haven't had time to work on this to date. It's been on my backlog. I'll see if I can't sneak it in later this week.

devigned avatar May 21 '19 16:05 devigned

@devigned did you get a chance to look at this?

aloysiustany avatar Sep 24 '19 08:09 aloysiustany

@jhendrixMSFT or @gavinfish any chance you folks could take a look at this?

devigned avatar Sep 24 '19 18:09 devigned

Is there any activity going on w.r.t. this issue?

elsesiy avatar Nov 28 '19 07:11 elsesiy

I also wanted to use ReceiveFromTimestamp with eph. I think I found a workaround with NewStorageLeaserCheckpointer and WithInitialCheckpoint:

leaserCheckpointer, _ := storageLeaser.NewStorageLeaserCheckpointer(
	credential, accountName, containerName, env,
	storageLeaser.WithInitialCheckpoint(func() persist.Checkpoint {
		t := time.Now() // whatever you wanted to use with ReceiveFromTimestamp
		// copied from eventhub.ReceiveFromTimestamp
		return persist.NewCheckpoint("", 0, t)
	}),
)
e, _ := eph.New(ctx, namespace, hubName, tokenProvider, leaserCheckpointer, leaserCheckpointer)
// TODO: register handlers and start

UPDATE: the above only works if there is no data in blob storage.

ryepup avatar Mar 29 '22 20:03 ryepup

After much debugging, here's a workaround that mostly works:


type TimestampCheckpointer struct {
	eph.Checkpointer
	start       persist.Checkpoint
	initialized sync.Map
}

func NewTimestampCheckpointer(t time.Time, inner eph.Checkpointer) *TimestampCheckpointer {
	return &TimestampCheckpointer{
		Checkpointer: inner,
		// copied from eventhub.ReceiveFromTimestamp
		start: persist.NewCheckpoint("", 0, t),
	}
}

func (b *TimestampCheckpointer) UpdateCheckpoint(
	ctx context.Context,
	partitionID string,
	checkpoint persist.Checkpoint) error {
	// record that eph has started a receiver for this partition
	b.initialized.Store(partitionID, true)
	return b.Checkpointer.UpdateCheckpoint(ctx, partitionID, checkpoint)
}

func (b *TimestampCheckpointer) EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error) {
	_, ok := b.initialized.Load(partitionID)
	if !ok {
		// eph hasn't started a receiver yet
		return b.start, nil
	}

	return b.Checkpointer.EnsureCheckpoint(ctx, partitionID)
}

Then we use that when creating the eph:

t := time.Now() // whatever you wanted to use with ReceiveFromTimestamp
leaserCheckpointer, _ := storageLeaser.NewStorageLeaserCheckpointer(
	credential, accountName, containerName, env,
	storageLeaser.WithInitialCheckpoint(func() persist.Checkpoint { return persist.NewCheckpoint("", 0, t) }),
)
e, _ := eph.New(ctx, namespace, hubName, tokenProvider, leaserCheckpointer, NewTimestampCheckpointer(t, leaserCheckpointer))
// TODO: register handlers and start

This probably still breaks if you have more than one process and leases move back and forth.
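To make that concern concrete, here is a small self-contained simulation of one way it could go wrong. It does not use the real library: `sharedStore`, `timestampCheckpointer`, and `newProcess` are hypothetical, string-based stand-ins for the blob-backed checkpointer and the wrapper above, and a plain map replaces `sync.Map` because the simulation is single-threaded. The point is that the "initialized" record lives in each process's memory, so a second process that picks up the lease falls back to the start time even though a newer checkpoint is stored.

```go
package main

import "fmt"

// checkpointer is a simplified, hypothetical version of the checkpointer interface.
type checkpointer interface {
	EnsureCheckpoint(partitionID string) string
	UpdateCheckpoint(partitionID, offset string)
}

// sharedStore stands in for the blob storage shared by all processes.
type sharedStore map[string]string

func (s sharedStore) EnsureCheckpoint(id string) string {
	if _, ok := s[id]; !ok {
		s[id] = "-1" // placeholder for "start of stream"
	}
	return s[id]
}

func (s sharedStore) UpdateCheckpoint(id, offset string) { s[id] = offset }

// timestampCheckpointer mirrors the wrapper's logic with in-memory state.
type timestampCheckpointer struct {
	checkpointer
	start       string
	initialized map[string]bool // per process, not shared between processes
}

func (c *timestampCheckpointer) UpdateCheckpoint(id, offset string) {
	c.initialized[id] = true
	c.checkpointer.UpdateCheckpoint(id, offset)
}

func (c *timestampCheckpointer) EnsureCheckpoint(id string) string {
	if !c.initialized[id] {
		return c.start
	}
	return c.checkpointer.EnsureCheckpoint(id)
}

// newProcess simulates a separate process sharing the same storage.
func newProcess(store sharedStore) *timestampCheckpointer {
	return &timestampCheckpointer{
		checkpointer: store,
		start:        "start-time",
		initialized:  map[string]bool{},
	}
}

func main() {
	store := sharedStore{}
	a, b := newProcess(store), newProcess(store)

	fmt.Println(a.EnsureCheckpoint("0")) // prints "start-time"
	a.UpdateCheckpoint("0", "4096")
	fmt.Println(a.EnsureCheckpoint("0")) // prints "4096"

	// The lease for partition "0" moves to process b, which has no record
	// of the partition and ignores the stored checkpoint.
	fmt.Println(b.EnsureCheckpoint("0")) // prints "start-time"
}
```

Under these assumptions, process b would reprocess everything between the start time and offset 4096 after taking over the lease.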

ryepup avatar Mar 30 '22 19:03 ryepup