SAMZA-2235: Move the merging of side inputs with task inputs from the ExecutionPlanner to the JobModelManager

Open • prateekm opened this issue 5 years ago • 12 comments

Currently, side inputs configured using descriptors are merged with task.inputs in the planner (JobNodeConfigurationGenerator). With the low-level API, users are expected to add them to task.inputs manually. Since this is tedious and error-prone (it is easy to forget), it would be better for Samza to do this itself.

A clean way to get consistent behavior across all APIs would be to keep the side-input and task.inputs configs separate, and merge them at read time whenever we need "all inputs".
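
The read-time merge proposed above could look roughly like the following minimal sketch. It assumes a plain `Map<String, String>` config, comma-separated `system.stream` entries in `task.inputs`, and side inputs declared per store under `stores.<store>.side.inputs` in the same `system.stream` form. The class and the `getTaskInputs`/`getSideInputs`/`getAllInputs` helpers are hypothetical, not Samza's API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Samza: keeps task.inputs and side inputs
// as separate configs and only unions them when "all inputs" are requested.
final class AllInputsSketch {
  static final String TASK_INPUTS = "task.inputs";
  // Assumed per-store side-input key: stores.<store>.side.inputs
  static final String STORE_PREFIX = "stores.";
  static final String SIDE_INPUTS_SUFFIX = ".side.inputs";

  // Splits a comma-separated value into trimmed, non-empty entries, preserving order.
  static Set<String> splitList(String value) {
    if (value == null) {
      return new LinkedHashSet<>();
    }
    return Arrays.stream(value.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toCollection(LinkedHashSet::new));
  }

  // Only the explicitly configured task.inputs.
  static Set<String> getTaskInputs(Map<String, String> config) {
    return splitList(config.get(TASK_INPUTS));
  }

  // Side inputs of every store, collected from stores.<store>.side.inputs.
  static Set<String> getSideInputs(Map<String, String> config) {
    Set<String> sideInputs = new LinkedHashSet<>();
    for (Map.Entry<String, String> entry : config.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(STORE_PREFIX) && key.endsWith(SIDE_INPUTS_SUFFIX)) {
        sideInputs.addAll(splitList(entry.getValue()));
      }
    }
    return sideInputs;
  }

  // The read-time merge: callers that need every consumed stream use this.
  static Set<String> getAllInputs(Map<String, String> config) {
    Set<String> all = getTaskInputs(config);
    all.addAll(getSideInputs(config));
    return all;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("task.inputs", "kafka.page-views");
    config.put("stores.profiles.side.inputs", "kafka.profile-updates");
    System.out.println(getTaskInputs(config)); // [kafka.page-views]
    System.out.println(getAllInputs(config));  // [kafka.page-views, kafka.profile-updates]
  }
}
```

Because neither config is rewritten, the descriptor-based and low-level APIs would see the same result, and nothing has to be copied into task.inputs by the planner.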

prateekm • Jun 04 '19 22:06

@rmatharu @bharathkk Please take a look.

prateekm • Jun 04 '19 22:06

@rmatharu @bharathkk This also fixes an issue for jobs that want to use a rewriter to generate stores with side inputs: currently, the rewriter is run multiple times.

prateekm • Jun 04 '19 22:06

I'm not sure this will work in all scenarios. With our LI rewriter's system expansion, the rewriter would not expand the side-input system, since it would not be aware of it in the first place (it is no longer part of task.inputs), unless that system also happens to be one of the input systems.

mynameborat • Jun 05 '19 01:06

@mynameborat Good point, but we can fix that in the rewriter by making it aware of systems for side inputs (and probably for changelogs too). Would that work?
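
A rewriter made aware of side-input and changelog systems, as suggested above, might collect the systems to expand along these lines. This is a hypothetical sketch, not the LI rewriter or a Samza API; it assumes `system.stream` values in `task.inputs`, `stores.<store>.side.inputs` and `stores.<store>.changelog`, with the system name being the text before the first dot.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Samza or LinkedIn code: the set of systems a
// config rewriter would need to expand if it looked beyond task.inputs.
final class RewriterSystemsSketch {
  // Returns the system part of a "system.stream" name (text before the first '.').
  static String systemOf(String systemStream) {
    int dot = systemStream.indexOf('.');
    if (dot <= 0) {
      throw new IllegalArgumentException("Expected system.stream, got: " + systemStream);
    }
    return systemStream.substring(0, dot);
  }

  // Adds the system of every entry in a comma-separated list of system.stream names.
  static void addSystems(String csv, Set<String> systems) {
    if (csv == null) {
      return;
    }
    for (String entry : csv.split(",")) {
      String trimmed = entry.trim();
      if (!trimmed.isEmpty()) {
        systems.add(systemOf(trimmed));
      }
    }
  }

  // Systems referenced by task.inputs, stores.<store>.side.inputs and stores.<store>.changelog.
  static Set<String> systemsToExpand(Map<String, String> config) {
    Set<String> systems = new LinkedHashSet<>();
    addSystems(config.get("task.inputs"), systems);
    for (Map.Entry<String, String> entry : config.entrySet()) {
      String key = entry.getKey();
      boolean storeKey = key.startsWith("stores.");
      if (storeKey && (key.endsWith(".side.inputs") || key.endsWith(".changelog"))) {
        addSystems(entry.getValue(), systems);
      }
    }
    return systems;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("task.inputs", "kafka.page-views");
    config.put("stores.profiles.side.inputs", "kafka-lookup.profile-updates");
    config.put("stores.profiles.changelog", "kafka-changelog.profiles-changelog");
    // Contains kafka, kafka-lookup and kafka-changelog (store-key order follows HashMap iteration).
    System.out.println(systemsToExpand(config));
  }
}
```

Matching on the exact `.changelog` suffix keeps related keys such as `stores.<store>.changelog.replication.factor` out of the scan.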

prateekm • Jun 06 '19 17:06

> @mynameborat Good point, but we can fix that in the rewriter by making it aware of systems for side inputs (and probably for changelogs too). Would that work?

I think that would address the expansion problem. However, I also noticed a couple of other scenarios that rely on task.inputs:

  1. Partition count monitor
  2. Bootstrap and priority streams, which use it indirectly. Even with Ray's refactor, the CSM restores side inputs using SystemConsumers, which uses the default chooser.

If we need to get this change in without addressing the above, can we assess the priority of these scenarios and create follow-up tickets accordingly?

mynameborat • Jun 07 '19 05:06

@bharathkk Can you clarify? What do we need to address about partition count monitor and chooser for side inputs?

prateekm • Jun 07 '19 16:06

  1. PartitionCountMonitor is instantiated with the list of input streams to monitor. ClusterBasedJobCoordinator [1] and ZkJobCoordinator [2] construct it and use task.inputs to determine the input streams. We would regress on the feature that shuts down/restarts the container when the partition count of a side-input stream changes.
  2. Applications can specify priority and bootstrap properties for side inputs, as for any other stream. These configurations, combined with task.inputs, are used by the chooser to determine bootstrap streams [3] and priority streams [4]. If an application wants to specify priorities across side inputs, they would not be respected.

[1] https://github.com/apache/samza/blob/8621db68b64f3d4f2c1a2aa986e3b1ea0dd80969/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java#L321
[2] https://github.com/apache/samza/blob/8621db68b64f3d4f2c1a2aa986e3b1ea0dd80969/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java#L369
[3] https://github.com/apache/samza/blob/8621db68b64f3d4f2c1a2aa986e3b1ea0dd80969/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java#L54
[4] https://github.com/apache/samza/blob/8621db68b64f3d4f2c1a2aa986e3b1ea0dd80969/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java#L73
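
To make point 2 concrete, here is a simplified illustration, not Samza's actual DefaultChooserConfig, of why a priority lookup keyed off task.inputs silently drops a side input's priority. It assumes the per-stream key `systems.<system>.streams.<stream>.samza.priority`; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified illustration, not Samza's DefaultChooserConfig: per-stream
// priorities are only looked up for the streams the caller passes in.
final class ChooserPrioritySketch {
  static Set<String> parseStreams(String csv) {
    if (csv == null) {
      return new LinkedHashSet<>();
    }
    return Arrays.stream(csv.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toCollection(LinkedHashSet::new));
  }

  // Reads systems.<system>.streams.<stream>.samza.priority for each given system.stream.
  static Map<String, Integer> priorities(Map<String, String> config, Set<String> streams) {
    Map<String, Integer> result = new LinkedHashMap<>();
    for (String systemStream : streams) {
      int dot = systemStream.indexOf('.');
      if (dot <= 0) {
        throw new IllegalArgumentException("Expected system.stream, got: " + systemStream);
      }
      String system = systemStream.substring(0, dot);
      String stream = systemStream.substring(dot + 1);
      String value = config.get("systems." + system + ".streams." + stream + ".samza.priority");
      if (value != null) {
        result.put(systemStream, Integer.parseInt(value.trim()));
      }
    }
    return result;
  }

  // Only streams in task.inputs are considered, so a priority configured for a
  // side input that is not listed in task.inputs is ignored.
  static Map<String, Integer> prioritiesFromTaskInputs(Map<String, String> config) {
    return priorities(config, parseStreams(config.get("task.inputs")));
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("task.inputs", "kafka.page-views");
    config.put("systems.kafka.streams.page-views.samza.priority", "1");
    config.put("systems.kafka.streams.profile-updates.samza.priority", "2"); // side input
    System.out.println(prioritiesFromTaskInputs(config)); // {kafka.page-views=1}
    System.out.println(priorities(config, parseStreams("kafka.profile-updates"))); // {kafka.profile-updates=2}
  }
}
```

The same reasoning applies to bootstrap flags: whichever stream set the lookup iterates over decides which per-stream settings are honored.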

mynameborat • Jun 07 '19 22:06

@bharathkk Re 1: good point about the partition count monitor; let me see whether and how we can fix that. Re 2: I thought we had moved side inputs out of the run loop and into the CSM. Is the chooser's bootstrap/priority configuration still relevant? I'd imagine side-input bootstrapping is handled by the CSM now; does it not look at the bootstrap configuration itself?

prateekm • Jun 10 '19 21:06

@bharathkk Since the CSM creates a separate SystemConsumers instance with a default chooser just for side inputs, wouldn't that be sufficient to respect priorities across side inputs?

    MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config,
        sideInputSystemConsumersMetrics.registry(), systemAdmins);

    sideInputSystemConsumers =
        new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(this.sideInputConsumers), systemAdmins, serdeManager,
            sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(),
            SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(), TaskConfig.DEFAULT_POLL_INTERVAL_MS,
            ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));

rmatharu-zz • Jun 26 '19 17:06

@shanthoosh had a question about whether side inputs are part of the job model and will be accounted for in the partition count expansion check. This PR should ensure that they are.
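
Whether a side-input expansion is caught depends only on which streams the monitor is handed. The following hypothetical sketch, not Samza's PartitionCountMonitor API, compares partition counts between an initial snapshot and the latest metadata for a given stream set. The stream names and counts in `main` are illustrative.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch, not Samza's PartitionCountMonitor: only the streams the
// monitor is given are checked for partition count changes.
final class PartitionCountCheckSketch {
  static Set<String> changedStreams(Set<String> monitoredStreams,
                                    Map<String, Integer> initialCounts,
                                    Map<String, Integer> latestCounts) {
    Set<String> changed = new TreeSet<>();
    for (String stream : monitoredStreams) {
      Integer before = initialCounts.get(stream);
      Integer after = latestCounts.get(stream);
      if (before != null && after != null && !before.equals(after)) {
        changed.add(stream);
      }
    }
    return changed;
  }

  public static void main(String[] args) {
    Set<String> taskInputs = new LinkedHashSet<>();
    taskInputs.add("kafka.page-views");
    Set<String> allInputs = new LinkedHashSet<>(taskInputs);
    allInputs.add("kafka.profile-updates"); // side input

    Map<String, Integer> initial = new HashMap<>();
    initial.put("kafka.page-views", 8);
    initial.put("kafka.profile-updates", 4);
    Map<String, Integer> latest = new HashMap<>(initial);
    latest.put("kafka.profile-updates", 8); // side-input stream was expanded

    System.out.println(changedStreams(taskInputs, initial, latest)); // []
    System.out.println(changedStreams(allInputs, initial, latest));  // [kafka.profile-updates]
  }
}
```

Passing "all inputs" (task.inputs plus side inputs) rather than task.inputs alone is what makes the side-input change visible.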

prateekm • Jun 26 '19 17:06

> @bharathkk Since the CSM creates a separate SystemConsumers instance with a default chooser just for side inputs, wouldn't that be sufficient to respect priorities across side inputs?

SystemChooserConfig relies on task.inputs to fetch the priority/bootstrap-related configs.

mynameborat • Jun 26 '19 17:06

@prateekm Do we still need this?

mynameborat • Feb 11 '20 22:02