samza
SAMZA-2235: Move the merging of side inputs with task inputs from the ExecutionPlanner to the JobModelManager
Currently, side inputs configured using descriptors are merged with task.inputs in the Planner (JobNodeConfigurationGenerator). For the low-level API, users are expected to add them to task.inputs manually. Since this is error-prone (easy to forget) and tedious, it would be better to do this ourselves.
A clean way to get the same behavior consistently across all APIs would be to treat the side inputs and task.inputs configs separately and do the side inputs and task inputs merge at read time when we want to get "all inputs".
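The read-time merge described above could look roughly like the following. This is a minimal sketch over a plain `Map<String, String>`, not the code in this PR: the class `AllInputs` and the method `getAllInputs` are hypothetical names, and the `stores.<store-name>.side.inputs` key pattern is assumed to be where side inputs are configured.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, for illustration only; not part of the Samza API.
class AllInputs {

    // Splits a comma-separated config value into trimmed, non-empty entries.
    static Set<String> parseList(String value) {
        Set<String> out = new LinkedHashSet<>();
        if (value == null) {
            return out;
        }
        for (String part : value.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                out.add(trimmed);
            }
        }
        return out;
    }

    // Returns task.inputs plus every stream listed under an assumed
    // "stores.<store-name>.side.inputs" key, without duplicates.
    static Set<String> getAllInputs(Map<String, String> config) {
        Set<String> all = parseList(config.get("task.inputs"));
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith("stores.") && key.endsWith(".side.inputs")) {
                all.addAll(parseList(entry.getValue()));
            }
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("task.inputs", "kafka.page-views");
        config.put("stores.profile-store.side.inputs", "kafka.profiles");
        System.out.println(getAllInputs(config));
    }
}
```

With a helper like this, task.inputs and the side input configs stay separate in the stored config, and any component that needs "all inputs" calls the merge instead of reading task.inputs directly.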
@rmatharu @bharathkk Please take a look.
@rmatharu @bharathkk This also fixes the issue where jobs want to write a rewriter to generate stores with side inputs, but the rewriter is currently run multiple times.
Not sure if this will work in all scenarios. In our LI rewriter system expansion, the rewriter would not expand the side input system since it wouldn't be aware of it in the first place (it is no longer part of task.inputs), unless the system happens to be one of the input systems.
@mynameborat Good point, but we can fix that in the rewriter by making it aware of systems for side inputs (and probably for changelogs too). Would that work?
I think that would address the expansion problem. However, I also noticed a couple of other scenarios that rely on task.inputs:
- Partition count monitor
- Bootstrap and priority streams use it indirectly. Even with Ray's refactor, the CSM restores side inputs using SystemConsumers, which uses the default chooser.
If we need to land this change without addressing the above, can we assess the priorities of these scenarios and create follow-up tickets appropriately?
@bharathkk Can you clarify? What do we need to address about partition count monitor and chooser for side inputs?
- PartitionCountMonitor is instantiated with the list of input streams to monitor. ClusterBasedJobCoordinator[1] and ZkJobCoordinator[2] construct it and use task.inputs to determine the input streams. We would regress on the feature that shuts down/restarts the container when the partition count changes for side input streams.
- Applications can specify priority and bootstrap properties for side inputs like every other stream. These configurations, combined with task.inputs, are used by the chooser to determine bootstrap streams[3] and priority streams[4]. If an application wants to specify priority across side inputs, it wouldn't be respected.
[1] https://github.com/apache/samza/blob/8621db68b64f3d4f2c1a2aa986e3b1ea0dd80969/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java#L321
[2] https://github.com/apache/samza/blob/8621db68b64f3d4f2c1a2aa986e3b1ea0dd80969/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java#L369
[3] https://github.com/apache/samza/blob/8621db68b64f3d4f2c1a2aa986e3b1ea0dd80969/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java#L54
[4] https://github.com/apache/samza/blob/8621db68b64f3d4f2c1a2aa986e3b1ea0dd80969/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java#L73
@bharathkk re: 1, good point about partition count monitor, let me see if / how we can fix that. re: 2, I thought we moved side inputs out of the run loop to the CSM. Is the chooser bootstrap / priority configuration still relevant? I'd imagine the bootstrap for side inputs is handled by the CSM now, does it not look at the bootstrap configuration itself?
@bharathkk Since the CSM creates a separate SystemConsumers with a default chooser just for side inputs, wouldn't that be sufficient to respect priorities across side inputs?
MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config, sideInputSystemConsumersMetrics.registry(), systemAdmins);
sideInputSystemConsumers =
    new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(this.sideInputConsumers), systemAdmins, serdeManager,
        sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
        TaskConfig.DEFAULT_POLL_INTERVAL_MS, ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
@shanthoosh had a question about whether side inputs are part of the job model and will be accounted for in the partition count expansion check. This PR should ensure that they are.
SystemChooserConfig relies on task.inputs to fetch priority/bootstrap related configs.
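To make the dependency on task.inputs concrete, here is a rough sketch of how a chooser config might derive bootstrap streams by walking task.inputs. It is not the actual DefaultChooserConfig code: the class `BootstrapStreams` is hypothetical, and the `systems.<system>.streams.<stream>.samza.bootstrap` key format is an assumption used for illustration.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not the actual Samza implementation.
class BootstrapStreams {

    // Returns the entries of task.inputs ("system.stream") whose assumed
    // per-stream bootstrap flag is set to true.
    static Set<String> bootstrapStreams(Map<String, String> config) {
        Set<String> result = new LinkedHashSet<>();
        String inputs = config.getOrDefault("task.inputs", "");
        for (String input : inputs.split(",")) {
            String systemStream = input.trim();
            int dot = systemStream.indexOf('.');
            if (dot <= 0) {
                continue; // skip empty or malformed entries
            }
            String system = systemStream.substring(0, dot);
            String stream = systemStream.substring(dot + 1);
            // Assumed key format for the per-stream bootstrap flag.
            String key = "systems." + system + ".streams." + stream + ".samza.bootstrap";
            if (Boolean.parseBoolean(config.get(key))) {
                result.add(systemStream);
            }
        }
        return result;
    }
}
```

Because only streams listed in task.inputs are visited, a side input that carries a bootstrap flag but is no longer part of task.inputs would be silently skipped by logic of this shape.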
@prateekm Do we still need this?