spark-rapids
Fix reading old data from Alluxio regardless of S3 changes when using the CONVERT_TIME replacement algorithm
Closes #6779
Root cause
There are 2 ways to create `InMemoryFileIndex`:

First:
```scala
val fi = new InMemoryFileIndex(
  relation.sparkSession,
  inputFiles, // This is actually the sub-files under the root path
  ...
```

Second:
```scala
val replacedPathsAndIndicator = rootPaths.map(replaceFunc.get)
...
val fi = new InMemoryFileIndex(
  relation.sparkSession,
  replacedPaths, // This is actually the replaced root path.
  ...
```
For the second case, no sub-files are passed in, so the listing of leaf files is left to `InMemoryFileIndex`. By default Alluxio does not sync the file list (it just lists the files cached in Alluxio), so the query reads old data from Alluxio.
The log looks like:
```
INFO InMemoryFileIndex: Start listing leaf files and directories. Size of Paths: 0; threshold: 32
DEBUG FileSystemMasterClient: Exit (OK): ListStatus(path=/liangcail/chongg-test,options=loadMetadataType: ONCE
commonOptions {
  syncIntervalMs: -1
  ttl: -1
  ttlAction: DELETE
}
loadMetadataOnly: false
) in 5 ms
```
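For illustration, the path replacement in the second case can be sketched as a pure string rewrite. The `replaceSchemePrefix` helper and the master address below are hypothetical, not the plugin's actual code:

```scala
// Sketch of an S3 -> Alluxio root-path rewrite, assuming a fixed Alluxio master
// address. `replaceSchemePrefix` is a hypothetical helper for illustration only.
object PathReplacement {
  private val alluxioPrefix = "alluxio://localhost:19998/"

  // Rewrite s3:// paths to point at Alluxio; leave other schemes untouched.
  def replaceSchemePrefix(path: String): String =
    if (path.startsWith("s3://")) alluxioPrefix + path.stripPrefix("s3://")
    else path

  def main(args: Array[String]): Unit = {
    val rootPaths = Seq("s3://bucket/table", "hdfs://nn/table")
    // Mirrors rootPaths.map(replaceFunc.get) in the snippet above.
    val replaced = rootPaths.map(replaceSchemePrefix)
    println(replaced.mkString(","))
    // prints: alluxio://localhost:19998/bucket/table,hdfs://nn/table
  }
}
```

Only the root paths are rewritten here; the sub-file listing under those roots is what `InMemoryFileIndex` then resolves, which is where the stale cache shows up.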
Solution
- Pass all the sub-files to `InMemoryFileIndex`. The file list comes from the S3 metadata; consider the 3 kinds of metadata changes:
  - S3 Add: the passed-in sub-files contain the new file, so Alluxio will pull it from the UFS.
  - S3 Delete: the passed-in sub-files do not contain the deleted file, so it has no effect.
  - S3 Update of the same file: this still reads old data. Because updates are rare cases, it should be enough to add Alluxio sync at a 1-hour interval: `alluxio.user.file.metadata.sync.interval=3600s`
- Keep the original code and just add the sync configuration. In order to catch Add/Delete/Update changes, a relatively small sync interval is needed, like: `alluxio.user.file.metadata.sync.interval=60s`
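To see why passing the S3-derived sub-file list handles Add and Delete but not Update, consider this standalone sketch. The file names and the modeling of Alluxio's cached listing as a simple map are illustrative assumptions, not real Alluxio behavior:

```scala
// Toy model of the first solution: the scan uses the fresh S3 file list,
// but file *contents* may still come from Alluxio's stale cache.
object SyncSketch {
  // Alluxio's stale cached state: file name -> cached content version.
  val cachedVersions: Map[String, String] =
    Map("a.parquet" -> "v1", "deleted.parquet" -> "v1")

  // Fresh sub-file list taken from S3 metadata (what the PR passes in).
  val s3SubFiles: Seq[String] = Seq("a.parquet", "added.parquet")

  // Files the query scans: exactly the passed-in S3 list, so an added file
  // appears and a deleted file is dropped, with no Alluxio sync needed.
  def filesToScan: Seq[String] = s3SubFiles

  // Content read per file: a file unknown to the cache is pulled from the
  // UFS, but an updated file still returns its stale cached version.
  def versionRead(f: String): String =
    cachedVersions.getOrElse(f, "pulled-from-UFS")

  def main(args: Array[String]): Unit = {
    filesToScan.foreach(f => println(s"$f -> ${versionRead(f)}"))
  }
}
```

In this model, `added.parquet` is scanned and pulled from the UFS, `deleted.parquet` is never scanned, but an in-place update to `a.parquet` would still read the cached `v1` until a metadata sync runs, which is why the 1-hour sync interval remains necessary for the Update case.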
I prefer the 1st solution.
Signed-off-by: Chong Gao [email protected]