Sync flow: use metadata for last offset fetch and cancel groupCtx
Consider the following scenario where you have a mirror to ClickHouse with 2 replicas:
- Normalize takes a while because of wide tables on ClickHouse with some MVs on them, so let's say normalize is 10 batches behind sync flow.
- Normalize is currently loading 10 batches of avro files into the raw table, and these queries run on replica A.
- While this is happening, sync flow opens a destination connection to call destinationConnector.GetLastOffset, and that connection goes to replica B.
- Let's say that at the time of this third step, replica B is down for some reason.
In this case, sync flow never retries. normalizeLoop is stuck normalizing batches, so it does not get the chance to see that the syncDone channel is closed (it is closed when sync flow errors out, here because the last offset fetch failed). We therefore stay blocked on the errgroup waiting for normalize to finish, and sync flow is never retried.
This PR:
- Calls metadata.GetLastOffset directly instead of going through connector construction, which opens connections to the data stores. That is an unneeded step when we just want to read something from the catalog.
- Cancels groupCtx if sync flow errors out. The two goroutines which rely on it are: i) maintainReplConn, which does not error out if groupCtx is cancelled, and ii) normalizeLoop, which will exit anyway because of syncDone.
The whole purpose of cancelling groupCtx when syncErr != nil is to hit this branch, which leads to the sync flow activity retrying:
if groupCtx.Err() != nil {
	// need to return ctx.Err(), avoid returning syncErr that's wrapped context canceled
	break
}
Seems like we should run the sync loop as another waitGroup.Go, so that it can return syncErr and cancel the errgroup.