datafusion-ballista icon indicating copy to clipboard operation
datafusion-ballista copied to clipboard

feat: Improve fetch partition performance, support skip validation arrow ipc files

Open westhide opened this issue 9 months ago • 7 comments

Which issue does this PR close?

Closes #1189.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

westhide avatar Mar 28 '25 16:03 westhide

Thanks @westhide this PR makes sense,

It also brings configurable validation option, which I did not really think about. I have few questions, for which I do not have definitive answer, so I'd like to hear your opinion:

  1. In which cases do we want to enable validation?
  2. If we want to have option to enable/disable it, should that be decision of who ever operates scheduler/executor rather than connected client?

milenkovicm avatar Mar 30 '25 06:03 milenkovicm

Thanks @westhide this PR makes sense,

It also brings configurable validation option, which I did not really think about. I have few questions, for which I do not have definitive answer, so I'd like to hear your opinion:

  1. In which cases do we want to enable validation?
  2. If we want to have option to enable/disable it, should that be decision of who ever operates scheduler/executor rather than connected client?

Q1: As the BallistaFlightService keep listenning on each Executor,writting it allow client to send a do_get request, and without check FetchPartition action's path is created by shuffle writer, so the client can try to read any file on the executor. In this scene, we can enable validation.

Q2: I'm not sure. As currently we just read ipc file created by ShuffleWriterExec, it's safe to skip all validation.

Addition

Should we consider the power down scene? It may cause Arrow ipc file broken if ShuffleWriterExec is writting. Maybe we can support Job recover and reuse the partition file in the future.

westhide avatar Mar 30 '25 07:03 westhide

Q1: As the BallistaFlightService keep listenning on each Executor,writting it allow client to send a do_get request, and without check FetchPartition action's path is created by shuffle writer, so the client can try to read any file on the executor. In this scene, we can enable validation.

I guess we can safely assume that only shuffle files are accessed, not a random files.

Q2: I'm not sure. As currently we just read ipc file created by ShuffleWriterExec, it's safe to skip all validation.

what I was having in mind is enabling unsafe (without validation) by default but having a executor configuration switch which could revert this. It might be easy to cover case with arrow flight but a bit harder when ShuffleReader reads local shuffle file

Addition

Should we consider the power down scene? It may cause Arrow ipc file broken if ShuffleWriterExec is writting. Maybe we can support Job recover and reuse the partition file in the future.

Im not sure this case can be achieved with this validation? how can we be sure that file is written fully? I guess we'd need some kind of checksum for this scenario.

milenkovicm avatar Mar 30 '25 18:03 milenkovicm

Q1: As the BallistaFlightService keep listenning on each Executor,writting it allow client to send a do_get request, and without check FetchPartition action's path is created by shuffle writer, so the client can try to read any file on the executor. In this scene, we can enable validation.

I guess we can safely assume that only shuffle files are accessed, not a random files.

Q2: I'm not sure. As currently we just read ipc file created by ShuffleWriterExec, it's safe to skip all validation.

what I was having in mind is enabling unsafe (without validation) by default but having a executor configuration switch which could revert this. It might be easy to cover case with arrow flight but a bit harder when ShuffleReader reads local shuffle file

Addition

Should we consider the power down scene? It may cause Arrow ipc file broken if ShuffleWriterExec is writting. Maybe we can support Job recover and reuse the partition file in the future.

Im not sure this case can be achieved with this validation? how can we be sure that file is written fully? I guess we'd need some kind of checksum for this scenario.

All right, will move the validation config to Executor.

And will try to add a checksum validation with failed Job rerun after finish the wasm udf.

westhide avatar Apr 04 '25 06:04 westhide

hey @westhide are you still interested to get this PR merged?

milenkovicm avatar Apr 18 '25 21:04 milenkovicm

moving to draft as its waiting for changes

milenkovicm avatar Apr 21 '25 21:04 milenkovicm

As far as I can see, we don't have to validate the IPC files:

  • Ballista has control over writing the output
  • In a power down scenario where the file is being written but the stage is not yet completed, we will / should not read the resulting (incomplete) files. As the stage is never completed, the files may be ignored (or cleaned up).

Maybe we can support Job recover and reuse the partition file in the future

I think it makes sense to support job recovery either by stage or by completed task rather than per IPC file. Supporting it by partial IPC file seems it might be complicated, as it keep track of input and execution state, which is lost when powering down as well.

Dandandan avatar Jun 01 '25 10:06 Dandandan