flink-connectors

Improve fairness of initial allocation of segments

Open • EronWright opened this issue 8 years ago • 4 comments

Problem description

When the Flink job starts, the readers acquire segments as they start up. The first reader to start is overly greedy: it acquires all segments and starves the other readers. Rebalancing eventually occurs as the other readers come online.

This is an optimization issue, not a functional problem.
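A toy model makes the race concrete. The sketch below is hypothetical and is not Pravega's actual assignment logic. It assumes each reader targets ceil(segments / readersOnline), computed only from the readers online at that moment, and that over-allocated readers release their excess when a new reader arrives. The class and method names (GreedyStartupModel, simulate, fairShare) are illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, NOT Pravega's real assignment logic: a reader's target
// is ceil(segments / readersOnline), computed only from readers already online.
class GreedyStartupModel {

    static int fairShare(int segments, int readersOnline) {
        return (segments + readersOnline - 1) / readersOnline; // ceiling division
    }

    /** Brings readers online in the given order; returns a snapshot after each arrival. */
    static List<Map<String, Integer>> simulate(int segments, List<String> startOrder) {
        Map<String, Integer> held = new LinkedHashMap<>();
        List<Map<String, Integer>> snapshots = new ArrayList<>();
        int unassigned = segments;
        for (String newcomer : startOrder) {
            held.put(newcomer, 0);
            int share = fairShare(segments, held.size());
            // Over-allocated readers release their excess back to the pool.
            for (Map.Entry<String, Integer> e : held.entrySet()) {
                int excess = Math.max(0, e.getValue() - share);
                e.setValue(e.getValue() - excess);
                unassigned += excess;
            }
            // Readers below their share acquire from the pool, in arrival order.
            for (Map.Entry<String, Integer> e : held.entrySet()) {
                int take = Math.min(unassigned, share - e.getValue());
                e.setValue(e.getValue() + take);
                unassigned -= take;
            }
            snapshots.add(new LinkedHashMap<>(held));
        }
        return snapshots;
    }

    public static void main(String[] args) {
        // Start order taken from the log in this issue: 2/4, 3/4, 1/4, 4/4.
        List<String> order = List.of("2/4", "3/4", "1/4", "4/4");
        for (Map<String, Integer> snapshot : simulate(4, order)) {
            System.out.println(snapshot);
        }
    }
}
```

With the start order from the log in this issue and four segments, the first snapshot shows 2/4 holding all four segments. Reader 1/4 holds nothing until the fourth reader arrives and triggers another round of releases.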

Problem location

FlinkPravegaReader

Suggestions for an improvement

The problem is essentially a race between reader initialization and segment acquisition. Since the number of subtasks (reader instances) is known when the reader group is created, their names could be pre-registered with the reader group during its initialization. This requires the reader names to be stable (issue #16).
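One way to meet the stable-name requirement is to derive each reader's name purely from the reader group name and the subtask index. That way, the code that creates the reader group and each subtask compute the same names independently. The naming scheme and the StableReaderNames class below are hypothetical, not the connector's. In Flink, a subtask can get its (zero-based) index and the parallelism from its RuntimeContext via getIndexOfThisSubtask() and getNumberOfParallelSubtasks().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical naming scheme (not from the connector): a reader's name depends
// only on the reader group name and the subtask index, so it is stable across
// restarts and can be computed before any reader starts.
class StableReaderNames {

    static String readerName(String readerGroupName, int subtaskIndex) {
        return readerGroupName + "-reader-" + subtaskIndex;
    }

    /** All names, computed once at reader group creation from the known parallelism. */
    static List<String> preRegisteredNames(String readerGroupName, int parallelism) {
        List<String> names = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            names.add(readerName(readerGroupName, i));
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> registered = preRegisteredNames("myGroup", 4);
        // Each subtask derives its own name independently; it must match a pre-registered entry.
        for (int subtask = 0; subtask < 4; subtask++) {
            String name = readerName("myGroup", subtask);
            System.out.println(subtask + " -> " + name + " registered=" + registered.contains(name));
        }
    }
}
```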

Here's a log showing that the first reader to start, 'Source: Custom Source (2/4)', acquires all four segments before the other readers come online. Subsequent rebalancing is also shown.

2017-05-08 16:58:46,501 28409 [Source: Custom Source (2/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (2/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,506 28414 [flink-akka.actor.default-dispatcher-7] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (1/4) (4d19a3c2c16c6f6086f6b235f4335dbc) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,509 28417 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (2/4) (86099972408624a6918951a22530243d) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,511 28419 [Source: Custom Source (2/4)] INFO  i.p.c.s.impl.EventStreamReaderImpl - EventStreamReaderImpl( id=Source: Custom Source (2/4)) acquiring segments {Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=2)=10236, Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=3)=9996, Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=0)=10308, Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=1)=10308}
2017-05-08 16:58:46,518 28426 [Source: Custom Source (3/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (3/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,547 28455 [flink-akka.actor.default-dispatcher-6] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (3/4) (30fd38ce163e51e765d3b236b522c811) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,556 28464 [Source: Custom Source (1/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (1/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,558 28466 [Source: Custom Source (4/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (4/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,608 28516 [flink-akka.actor.default-dispatcher-6] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (4/4) (03d62d7c81dae731e685adeca1175177) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,610 28518 [flink-akka.actor.default-dispatcher-6] INFO  o.a.f.r.e.ExecutionGraph - Map -> Sink: Unnamed (1/1) (9aff8650dc230d80764275ab6a5e2dc2) switched from DEPLOYING to RUNNING.
...
2017-05-08 16:58:53,142 25485 [Source: Custom Source (4/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 3858
2017-05-08 16:58:53,144 25487 [Source: Custom Source (4/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 3859
2017-05-08 16:58:54,723 28631 [Map -> Sink: Unnamed (1/1)] DEBUG i.p.c.f.u.IntSequenceExactlyOnceValidator - IntSequenceExactlyOnceValidator - received element: 3858
2017-05-08 16:58:54,723 28631 [Map -> Sink: Unnamed (1/1)] DEBUG i.p.c.f.u.IntSequenceExactlyOnceValidator - IntSequenceExactlyOnceValidator - received element: 3859
...
2017-05-08 16:59:23,314 25657 [Source: Custom Source (4/4)] INFO  i.p.c.s.impl.EventStreamReaderImpl - EventStreamReaderImpl( id=Source: Custom Source (4/4)) releasing segment Segment(scope=scope, streamName=OwKBLcwfDwfhNkhIqXuK, segmentNumber=2)
2017-05-08 16:59:23,332 25675 [Source: Custom Source (1/4)] INFO  i.p.c.s.impl.EventStreamReaderImpl - EventStreamReaderImpl( id=Source: Custom Source (1/4)) acquiring segments {Segment(scope=scope, streamName=OwKBLcwfDwfhNkhIqXuK, segmentNumber=2)=4572}
...
2017-05-08 16:59:56,142 25485 [Source: Custom Source (1/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 4002
2017-05-08 16:59:56,144 25487 [Source: Custom Source (4/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 4021
...

EronWright • May 09 '17 00:05

We can easily add a delay to this acquisition. I think it is currently one second, but we can make that a config parameter.

tkaitchuck • May 11 '17 00:05
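The effect of an acquisition delay can be estimated from the reader start times in the log in this issue (2/4 at 46,501 ms, 3/4 at 46,518, 1/4 at 46,556, 4/4 at 46,558). The sketch below is hypothetical. It assumes a made-up acquisitionDelayMillis setting and assumes the first reader takes ceil(segments / readersOnline) once the delay expires. It is not Pravega's configuration API.

```java
import java.util.Arrays;

// Hypothetical sketch: models a configurable delay before the first segment
// acquisition. NOT Pravega's actual ReaderGroup configuration or API.
class AcquisitionDelayModel {

    /** Number of readers that have started by time t (all times in ms). */
    static long readersOnlineAt(long[] startTimesMillis, long t) {
        return Arrays.stream(startTimesMillis).filter(s -> s <= t).count();
    }

    /** Segments the first reader would take if it waits acquisitionDelayMillis after starting. */
    static int firstReaderShare(long[] startTimesMillis, long acquisitionDelayMillis, int segments) {
        if (acquisitionDelayMillis < 0) {
            throw new IllegalArgumentException("delay must be non-negative");
        }
        long first = Arrays.stream(startTimesMillis).min().getAsLong();
        long online = readersOnlineAt(startTimesMillis, first + acquisitionDelayMillis);
        return (int) ((segments + online - 1) / online); // ceiling division
    }

    public static void main(String[] args) {
        // Start times (ms past 16:58:46) of the four readers in this issue's log.
        long[] starts = {501, 518, 556, 558};
        for (long delay : new long[] {10, 60, 1000}) {
            System.out.println("delay=" + delay + "ms -> first reader takes "
                    + firstReaderShare(starts, delay, 4) + " of 4 segments");
        }
    }
}
```

With delays of 10, 60 and 1000 ms, the first reader would take 4, 1 and 1 of the 4 segments. A delay only helps when every reader starts inside the window. A reader that starts later is still starved until rebalancing, so the outcome is likely rather than guaranteed.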

I'd like to make it deterministic by adding an optional 'initial readers' parameter to reader group initialization.

EronWright • May 11 '17 18:05
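Here is a minimal sketch of what an 'initial readers' option could do. It assumes the reader list is fixed at reader group creation and uses a simple round-robin split. The class and method names are hypothetical, and round-robin is only one possible policy. The point is that the initial assignment no longer depends on which reader starts first.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an 'initial readers' option: segments are assigned
// round-robin to a reader list fixed at reader group creation, so the result
// does not depend on reader start order.
class InitialReadersModel {

    static Map<String, List<Integer>> initialAssignment(List<String> initialReaders, int segments) {
        if (initialReaders.isEmpty()) {
            throw new IllegalArgumentException("at least one initial reader is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String reader : initialReaders) {
            assignment.put(reader, new ArrayList<>());
        }
        // Segment i goes to reader i mod n.
        for (int segment = 0; segment < segments; segment++) {
            String owner = initialReaders.get(segment % initialReaders.size());
            assignment.get(owner).add(segment);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> readers = List.of("reader-0", "reader-1", "reader-2", "reader-3");
        System.out.println(initialAssignment(readers, 4));
        System.out.println(initialAssignment(readers, 6));
    }
}
```

With four readers and six segments, reader-0 and reader-1 get two segments each and the others get one. Every pre-registered reader has work as soon as it starts.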

Those aren't mutually exclusive. But is it normal for a master process to know the IDs of all the readers? It seems like the IDs would often be generated at process startup.

tkaitchuck • May 15 '17 18:05

It is normal in Flink's case that all the readers are known upfront.

EronWright • May 15 '17 21:05