
Use worker number instead of task id in MSQ for communication to/from workers.

Open LakshSingla opened this issue 2 years ago • 2 comments

Description

In the current version of MSQ, workers are bound to a specific task. This is a poor fit for a fault-tolerant engine, where worker tasks can be ephemeral, need not be bound to a specific task id, and can be respawned in case the task they are bound to fails. This PR introduces a set of changes and refactors which make the following possible:

  1. WorkerClient uses workerNumber instead of taskId to communicate between the worker tasks, abstracting away from callers the complexity of resolving a workerNumber to a taskId.
  2. Since the worker client should always read from durable storage during fault-tolerant execution, once a worker writes its outputs to durable storage, it adds a file named __success, containing its task id, to that worker number's output directory for the stage (iff the file is not present already). This lets readers identify the worker which successfully wrote its outputs to durable storage, and differentiate those outputs from partial outputs that might have crept in from orphaned or failed worker tasks.
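The workerNumber-to-taskId indirection in point 1 can be sketched as below. This is a minimal illustration, not the actual Druid WorkerClient API; the class and method names here are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: callers address workers by workerNumber; the client resolves the
// current taskId internally, so a respawned worker (new taskId, same
// workerNumber) is transparent to callers.
public class WorkerNumberClient
{
    // workerNumber -> currently live taskId; updated when a worker is respawned
    private final Map<Integer, String> workerToTask = new ConcurrentHashMap<>();

    public void registerWorker(int workerNumber, String taskId)
    {
        workerToTask.put(workerNumber, taskId);
    }

    public String resolveTaskId(int workerNumber)
    {
        String taskId = workerToTask.get(workerNumber);
        if (taskId == null) {
            throw new IllegalStateException("No live task for worker " + workerNumber);
        }
        return taskId;
    }

    public static void main(String[] args)
    {
        WorkerNumberClient client = new WorkerNumberClient();
        client.registerWorker(0, "task-abc");
        // Worker 0 fails and is respawned under a new task id:
        client.registerWorker(0, "task-xyz");
        System.out.println(client.resolveTaskId(0)); // prints "task-xyz"
    }
}
```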

Key changed/added classes in this PR
  • DurableStorageInputChannelFactory
  • DurableStorageOutputChannelFactory

This PR has:

  • [ ] been self-reviewed.
    • [ ] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
  • [ ] added documentation for new or modified features or behaviors.
  • [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • [ ] added or updated version, license, or notice information in licenses.yaml
  • [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • [ ] added integration tests.
  • [ ] been tested in a test Druid cluster.

LakshSingla avatar Sep 09 '22 07:09 LakshSingla

Undrafted the PR; the S3 storage connector's ls() still needs to be implemented.

LakshSingla avatar Sep 13 '22 13:09 LakshSingla

I have updated the code in WorkerImpl to do the following (when durable storage is enabled). Once the data for all the partitions has been generated, the worker:

  1. Writes the data to the path "controller_task_id/stage_a/worker_b/taskId_c/part_d". (Note: this is a slight change from the existing path structure.)
  2. Checks the folder "controller_task_id/stage_a/worker_b" for a file named "__success". If present, it does nothing; otherwise it writes its task id into that file.
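The write path above can be sketched as follows, using the local filesystem to stand in for durable (cloud) storage. The path layout and the "__success" marker follow the comment; the class and method names are illustrative, not the actual Druid classes.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the worker's write path: write partition data under a taskId-scoped
// directory, then claim the worker slot by writing the task id into __success
// iff no earlier task has already claimed it.
public class DurableStorageWriter
{
    static Path partitionPath(Path root, String controllerTaskId, int stage,
                              int worker, String taskId, int part)
    {
        return root.resolve(controllerTaskId)
                   .resolve("stage_" + stage)
                   .resolve("worker_" + worker)
                   .resolve(taskId)
                   .resolve("part_" + part);
    }

    static Path successFilePath(Path root, String controllerTaskId, int stage, int worker)
    {
        return root.resolve(controllerTaskId)
                   .resolve("stage_" + stage)
                   .resolve("worker_" + worker)
                   .resolve("__success");
    }

    static void writePartition(Path root, String controllerTaskId, int stage, int worker,
                               String taskId, int part, byte[] data) throws IOException
    {
        Path partition = partitionPath(root, controllerTaskId, stage, worker, taskId, part);
        Files.createDirectories(partition.getParent());
        Files.write(partition, data);

        // Only the first task to finish writes the marker; later (duplicate or
        // respawned) tasks leave it untouched.
        Path success = successFilePath(root, controllerTaskId, stage, worker);
        if (!Files.exists(success)) {
            Files.write(success, taskId.getBytes());
        }
    }

    public static void main(String[] args) throws IOException
    {
        Path root = Files.createTempDirectory("msq-sketch");
        writePartition(root, "ctrl", 1, 0, "task-abc", 0, "rows".getBytes());
        Path success = successFilePath(root, "ctrl", 1, 0);
        System.out.println(new String(Files.readAllBytes(success))); // prints "task-abc"
    }
}
```

Note that against real cloud storage the exists-then-write pair is not atomic; the actual implementation would need whatever conditional-write or list-based checks the storage connector offers.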

While reading the data for a particular stage, worker number and partition, the worker then does the following:

  1. Checks the folder "controller_task_id/stage_a/worker_b" for a file named "__success" and attempts to read the task id of the task which successfully wrote to it.
  2. If the file is not present or cannot be read, the worker throws an error.
  3. Otherwise, it reads the task id from that file and then reads the data from "controller_task_id/stage_a/worker_b/taskId_c/part_d", where the task id was fetched as above.
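The read path above can be sketched in the same local-filesystem setting; again, the names are illustrative rather than the actual Druid classes.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the read path: resolve the winning task id from the __success
// marker, then read the partition written by that task. Failed or orphaned
// tasks never appear in __success, so their partial outputs are ignored.
public class DurableStorageReader
{
    static byte[] readPartition(Path root, String controllerTaskId, int stage,
                                int worker, int part) throws IOException
    {
        Path workerDir = root.resolve(controllerTaskId)
                             .resolve("stage_" + stage)
                             .resolve("worker_" + worker);
        Path success = workerDir.resolve("__success");
        if (!Files.exists(success)) {
            throw new IOException(
                "No __success marker for worker " + worker + " in stage " + stage);
        }
        String taskId = new String(Files.readAllBytes(success)).trim();
        return Files.readAllBytes(workerDir.resolve(taskId).resolve("part_" + part));
    }

    public static void main(String[] args) throws IOException
    {
        // Lay out files the way a successful writer would have left them.
        Path root = Files.createTempDirectory("msq-sketch");
        Path workerDir = root.resolve("ctrl").resolve("stage_1").resolve("worker_0");
        Files.createDirectories(workerDir.resolve("task-abc"));
        Files.write(workerDir.resolve("task-abc").resolve("part_0"), "rows".getBytes());
        Files.write(workerDir.resolve("__success"), "task-abc".getBytes());
        System.out.println(new String(readPartition(root, "ctrl", 1, 0, 0))); // prints "rows"
    }
}
```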

LakshSingla avatar Sep 19 '22 10:09 LakshSingla

Thanks for the contribution @LakshSingla

cryptoe avatar Nov 03 '22 04:11 cryptoe