flink-connectors

Reader Group is not deleted

Open claudiofahey opened this issue 5 years ago • 4 comments

Problem description: This connector creates a reader group but never deletes it.

Problem location: FlinkPravegaReader.java

Suggestions for an improvement: FlinkPravegaReader.close() should call ReaderGroupManager.deleteReaderGroup().
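To make the suggestion concrete, here is a minimal sketch of the "delete in close()" idea. It is not the connector's code: `GroupManager`, `InMemoryGroupManager`, `OwningSource`, and the group name are hypothetical stand-ins so that the example runs without a Pravega cluster. In the real client the calls would go to `ReaderGroupManager`, whose `createReaderGroup` also takes a reader group configuration.

```java
import java.util.Set;
import java.util.TreeSet;

class ReaderGroupCleanupSketch {

    /** Hypothetical stand-in for the two reader group calls discussed in this issue. */
    interface GroupManager {
        void createReaderGroup(String name);
        void deleteReaderGroup(String name);
    }

    /** In-memory fake (assumption) that only tracks which group names exist. */
    static class InMemoryGroupManager implements GroupManager {
        final Set<String> groups = new TreeSet<>();

        @Override
        public void createReaderGroup(String name) {
            groups.add(name);
        }

        @Override
        public void deleteReaderGroup(String name) {
            groups.remove(name);
        }
    }

    /** Hypothetical source that owns the reader group it creates. */
    static class OwningSource implements AutoCloseable {
        private final GroupManager manager;
        private final String groupName;
        private boolean closed;

        OwningSource(GroupManager manager, String groupName) {
            this.manager = manager;
            this.groupName = groupName;
            manager.createReaderGroup(groupName);
        }

        @Override
        public void close() {
            if (closed) {
                return; // a second close() must not fail or delete again
            }
            closed = true;
            manager.deleteReaderGroup(groupName);
        }
    }

    public static void main(String[] args) {
        InMemoryGroupManager manager = new InMemoryGroupManager();
        try (OwningSource source = new OwningSource(manager, "flink-example-group")) {
            System.out.println("while open: " + manager.groups);
        }
        System.out.println("after close: " + manager.groups);
    }
}
```

The sketch only covers a graceful shutdown. If the process is killed, close() never runs and the group is left behind.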

claudiofahey Jan 28 '20 15:01

It's not clear to me that the suggested change is the best course of action. A stream job can be restarted, and if we use the same group name, it should resume from the same revisioned stream, so deleting is not necessary. That said, starting a group from scratch and resetting it to a checkpoint should also work, so in principle deleting it on close should be fine, except on ungraceful shutdowns.

If the application has to delete the reader group out of band in at least one case (ungraceful shutdowns), would it work to not delete on close, require out-of-band deletion, and make sure that we use consistent reader group names? Let's talk about it before we jump to conclusions.

fpj Jan 28 '20 15:01

I support the first approach (deleting in close()). It follows the rule "whoever creates it deletes it" and is sufficient for restarts when checkpointing is enabled. Deleting the reader group out of band is not ideal, for several reasons.

  1. The naming. Now we are using random strings to identify each reader group. If we still control it, it's hard to get a UUID for each application reading the same stream. Plus, this name should also be known to the users to have the reader group deleted.
    public static String generateRandomReaderGroupName() {
        return "flink" + RandomStringUtils.randomAlphanumeric(20).toLowerCase();
    }
  2. How to do it out of band. We can't do it in Flink applications as it will have no effect in ungraceful shutdowns. I would think it as a cronjob to delete unused reader groups. It's hard to tell if unused and will be an extra burden to the users.

crazyzhou Jan 29 '20 02:01

If we only have a single reader group through the lifetime of a Flink application, then we only really ever have one reader group to delete, so there is no risk of having orphaned reader groups accumulating. The main question is whether we can fix the naming:

> The naming. Now we are using random strings to identify each reader group. If we still control it, it's hard to get a UUID for each application reading the same stream. Plus, this name should also be known to the users to have the reader group deleted.

Could we not rely on random strings and use a well-defined and known name?

> I would think it as a cronjob to delete unused reader groups.

How does the cronjob know which reader groups to delete?

> It's hard to tell if unused and will be an extra burden to the users.

I need to be convinced that we can really avoid that burden.

fpj Jan 29 '20 10:01

> Could we not rely on random strings and use a well-defined and known name?

A reader group (that is, a SourceFunction in Flink) is known to Flink as a single task. Because of task chaining in Flink, a typical task name (which is the "id" I can get from the connector side) would be something like

Source: Pravega Source -> Map

Different applications are very likely to end up with the same name.

Flink application developers can also override these names. We could ask developers to use different names for the readers in each Flink app, but that is too demanding and not a good solution in my view.

Another idea is to name it from the Pravega side, i.e. include the context of which stream/stream cut is being read, but that will also produce duplicates when two Flink applications read the same stream (multiple apps reading from one stream is a typical use case, I think).
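The collision described above can be shown with a small sketch. The naming scheme, `nameFromStream`, and the scope and stream names are hypothetical, chosen only to illustrate the point. They are not something the connector does.

```java
class StreamDerivedNameSketch {

    /** Hypothetical scheme: derive the group name only from what is being read. */
    static String nameFromStream(String scope, String stream) {
        return "flink-" + scope + "-" + stream;
    }

    public static void main(String[] args) {
        // Two independent Flink applications reading the same stream.
        String appA = nameFromStream("examples", "sensor-data");
        String appB = nameFromStream("examples", "sensor-data");

        // Both end up with the same reader group name.
        System.out.println(appA.equals(appB)); // prints "true"
    }
}
```

Any name built only from the stream context has this property, so something specific to the application would still be needed to tell the groups apart.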

> How does the cronjob know which reader groups to delete?

Sorry for the misunderstanding; this problem is exactly the extra burden I was referring to. Deleting the RG in close() is a good way to avoid that burden.

crazyzhou Jan 31 '20 03:01