Improve deterministic guarantees implementations, or at least update API docs

Open scottsand-db opened this issue 3 years ago • 18 comments

  • DeltaLog.getChanges - are the actions inside a given VersionLog in deterministic order? If not, can we make them so?
  • Snapshot.getAllFiles - are the AddFiles in deterministic order? If not, can we make them so?
  • DeltaScan.getFiles - are the AddFiles in deterministic order? If not, can we make them so?

scottsand-db avatar Jan 13 '22 17:01 scottsand-db

  • DeltaLog.getChanges: Yes.
  • Snapshot.getAllFiles: No. We don't want to add the guarantee so that we can optimize how to read transaction logs.
  • DeltaScan.getFiles: No. We don't want to add the guarantee so that we can optimize how to read transaction logs.

zsxwing avatar Jan 13 '22 18:01 zsxwing

@zsxwing if it could benefit connectors ... is it worth adding APIs, or adding optional parameters, that if set ensure that we do have some sort of order guarantee?

e.g. reading deterministic actions through an iterator could be useful for failure recovery for the Flink Source connector.

scottsand-db avatar Jan 13 '22 19:01 scottsand-db

> e.g. reading deterministic actions through an iterator could be useful for failure recovery for the Flink Source connector.

This means we need to sort by something before returning the result. It means we would need to load all actions to memory.
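
A minimal sketch of that cost, using plain strings in place of AddFile paths. The class and method names here are hypothetical and are not Delta Standalone APIs; the point is only that a sorted result requires draining the whole iterator into memory first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class SortedOnRead {
    // Hypothetical helper: buffers every element, then sorts.
    // Memory use grows with the number of actions, which is the cost described above.
    static List<String> sortedPaths(Iterator<String> paths) {
        List<String> buffered = new ArrayList<>();
        while (paths.hasNext()) {
            buffered.add(paths.next());
        }
        Collections.sort(buffered);
        return buffered;
    }

    public static void main(String[] args) {
        List<String> unordered = List.of("part-00003", "part-00000", "part-00001");
        System.out.println(sortedPaths(unordered.iterator()));
    }
}
```

Sorting by path is only one possible key; it is used here because a path identifies a single active file within a snapshot.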

zsxwing avatar Jan 13 '22 20:01 zsxwing

Fair. Can't we depend on the order guarantees of the other APIs we use? e.g. if listFiles returns them in order ... and we read each action in the .json file line by line ... shouldn't that guarantee deterministic order?

scottsand-db avatar Jan 13 '22 21:01 scottsand-db

> if listFiles returns them in order ... and we read each action in the .json file line by line ... shouldn't that guarantee deterministic order?

For example, a snapshot version 10 can be loaded from:

  1. 10.checkpoint
  2. 0.json, 1.json, ..., 10.json.

We won't be able to know the original order of files in the checkpoint when they were in 0.json, 1.json, ..., 10.json.

zsxwing avatar Jan 13 '22 22:01 zsxwing

Makes sense. Thanks

scottsand-db avatar Jan 18 '22 17:01 scottsand-db

> if listFiles returns them in order ... and we read each action in the .json file line by line ... shouldn't that guarantee deterministic order?
>
> For example, a snapshot version 10 can be loaded from:
>
>   1. 10.checkpoint
>   2. 0.json, 1.json, ..., 10.json.
>
> We won't be able to know the original order of files in the checkpoint when they were in 0.json, 1.json, ..., 10.json.

Can we make the files in the checkpoint content ordered? The current checkpoint content = last checkpoint + recent transaction logs.

horizonzy avatar Sep 09 '22 11:09 horizonzy

@horizonzy - when we create the checkpoint, we use snapshot.allFilesScala (see Checkpoints.scala). To generate snapshot.allFilesScala, we perform an in-memory-log-replay, where we keep track of the AddFiles seen so far using a hashMap. To create the allFilesScala, we get an iterable from that hash map. Thus, ordering is not guaranteed.

So I don't see a way to make the checkpoint contain files in order.

scottsand-db avatar Sep 12 '22 16:09 scottsand-db

Changing the type of (transactions, activeFiles, tombstones) to LinkedHashMap in InMemoryLogReplay might make it work.
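
A rough sketch of the idea, with a simplified stand-in for log replay. `ReplaySketch` and `FileAction` are hypothetical names, not the real InMemoryLogReplay or Action classes. It replays the same actions into a LinkedHashMap and into a HashMap to show which one keeps the order in which paths were first added.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplaySketch {
    // Simplified stand-in for a log action; not the Delta Standalone Action class.
    static final class FileAction {
        final boolean isAdd;
        final String path;

        FileAction(boolean isAdd, String path) {
            this.isAdd = isAdd;
            this.path = path;
        }
    }

    // Replays the actions into the supplied map (keyed by path) and returns
    // the surviving paths in that map's iteration order.
    static List<String> replay(List<FileAction> actions, Map<String, FileAction> activeFiles) {
        for (FileAction a : actions) {
            if (a.isAdd) {
                activeFiles.put(a.path, a);
            } else {
                activeFiles.remove(a.path);
            }
        }
        return new ArrayList<>(activeFiles.keySet());
    }

    public static void main(String[] args) {
        List<FileAction> log = List.of(
            new FileAction(true, "part-00000"),
            new FileAction(true, "part-00001"),
            new FileAction(true, "part-00003"),
            new FileAction(false, "part-00001"),
            new FileAction(true, "part-00005"));

        // LinkedHashMap iterates in insertion order.
        System.out.println(replay(log, new LinkedHashMap<>()));
        // HashMap holds the same files, but its iteration order is unspecified.
        System.out.println(replay(log, new HashMap<>()));
    }
}
```

This only covers files replayed by one client from the json commits; it says nothing about the order inside a checkpoint written by someone else.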

horizonzy avatar Sep 12 '22 17:09 horizonzy

@horizonzy - perhaps. But this is only one delta client. We don't know who wrote the previous json files or checkpoints.

If another delta client wrote the previous checkpoint, and mixed up the order of files, doesn't that defeat the purpose?

scottsand-db avatar Sep 12 '22 18:09 scottsand-db

It doesn't matter. We just keep the order consistent with the order in the json file, rather than an unpredictable order.

horizonzy avatar Sep 12 '22 18:09 horizonzy

We cannot enforce any ordering on the checkpoint files. A checkpoint may contain multiple files, and it's up to the writer to decide how to split a checkpoint.

zsxwing avatar Sep 12 '22 18:09 zsxwing

> A checkpoint may contain multiple files, and it's up to the writer to decide how to split a checkpoint.

The checkpoint content comes from the transactions; we just add all the actions, from the first transaction's first action to the last transaction's last action.

horizonzy avatar Sep 13 '22 02:09 horizonzy

> The checkpoint content comes from the transactions; we just add all the actions, from the first transaction's first action to the last transaction's last action.

The current [protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md) says nothing about the order. And there is no order guarantee in the current checkpoint writer implementation. We cannot add the order requirement when the checkpoints in existing tables cannot satisfy the requirement.

If we want to provide an API in Delta Standalone, we have to sort actions in some deterministic order.

zsxwing avatar Sep 13 '22 05:09 zsxwing

> We cannot add the order requirement when the checkpoints in existing tables cannot satisfy the requirement.

Yes, it is difficult to stay compatible with the old data.

> If we want to provide an API in Delta Standalone, we have to sort actions in some deterministic order.

We follow the order of the content in the transactions.

Example:

00000000000000000000.json

{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"6ce78fa5-c24a-41c2-a350-10ffd40004c7","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1662709324810}}
{"add":{"path":"part-00000-bfa47503-f267-422d-b495-6bf8f775c3f3-c000.snappy.parquet","partitionValues":{},"size":296,"modificationTime":1662709326234,"dataChange":true,"stats":"{\"numRecords\":0,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}"}}
{"add":{"path":"part-00001-516ff7ee-14af-4386-9516-d083ea917f3e-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1662709326645,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part-00003-6f99edaf-31eb-4d14-99d7-8adac6bf8da8-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1662709326645,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part-00005-09b48bc3-f629-4003-b663-a906c2d36cf5-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1662709326645,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part-00007-f386b922-92de-4c02-9674-6094cf61a799-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1662709326645,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part-00009-fe41f0c5-3bac-43c8-8bcd-82ac9d73cea1-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1662709326645,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":0}}"}}
{"commitInfo":{"timestamp":1662709326676,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputRows":"5","numOutputBytes":"2686"},"engineInfo":"Apache-Spark/3.2.1 Delta-Lake/1.2.0","txnId":"02a8ed70-271b-4ad9-b7fa-69a907d8ae66"}}

It contains 6 AddFiles; we should follow their order in the txn content. The next txn log follows the same rule.

The order example: json0_AddFile0 -> json0_AddFile1 -> json0_AddFile2 -> json0_AddFile3 -> json0_AddFile4 -> json0_AddFile5 -> json1_AddFile0 -> json1_AddFile1 -> json1_AddFile2 -> json1_AddFile3 -> json1_AddFile4 -> json1_AddFile5 ...
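
As a sketch of that ordering, assuming each AddFile can be tagged with the commit version and its position within that commit while the .json files are read (the `Positioned` class and its fields are hypothetical, not part of Delta Standalone):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CommitOrder {
    // Hypothetical: an AddFile path tagged with where it appeared in the log.
    static final class Positioned {
        final long version; // commit version, e.g. 0 for 00000000000000000000.json
        final int index;    // position of the action within that commit file
        final String path;

        Positioned(long version, int index, String path) {
            this.version = version;
            this.index = index;
            this.path = path;
        }
    }

    // Orders by commit version first, then by position inside the commit.
    static final Comparator<Positioned> BY_COMMIT_ORDER =
        Comparator.comparingLong((Positioned p) -> p.version)
                  .thenComparingInt(p -> p.index);

    // Returns the paths sorted into commit order, leaving the input untouched.
    static List<String> orderedPaths(List<Positioned> files) {
        List<Positioned> sorted = new ArrayList<>(files);
        sorted.sort(BY_COMMIT_ORDER);
        List<String> paths = new ArrayList<>();
        for (Positioned p : sorted) {
            paths.add(p.path);
        }
        return paths;
    }

    public static void main(String[] args) {
        List<Positioned> files = List.of(
            new Positioned(1, 0, "json1_AddFile0"),
            new Positioned(0, 1, "json0_AddFile1"),
            new Positioned(0, 0, "json0_AddFile0"));
        System.out.println(orderedPaths(files));
    }
}
```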

horizonzy avatar Sep 13 '22 05:09 horizonzy

We don't have the original file name of an action when reading a checkpoint. So we cannot sort by the json files. If you propose to add the original file name of an action into the checkpoint files, what should we do with the existing tables not having such information?

zsxwing avatar Sep 13 '22 06:09 zsxwing

We make sure the checkpoint's actions are ordered. Every time a checkpoint is generated, it appends the current snapshot.activeFiles. If activeFiles is always ordered, the content of the checkpoint is always ordered. But there is one uncertain point: the snapshot separates actions into setTransactions, tombstones, and activeFiles, so maybe we can't ensure the order between setTransactions, tombstones, and activeFiles.

horizonzy avatar Sep 13 '22 06:09 horizonzy

Hi @horizonzy - we would need all clients to do this. Else, when you read a checkpoint, you wouldn't know which client wrote it and if it is sorted. Enforcing all clients to do this would be a protocol change.

We are allowed to propose new protocol changes, but I think you'd have to really argue your case. What benefit do we get from this?

scottsand-db avatar Sep 13 '22 18:09 scottsand-db

I want to filter for a particular AddFile to read data. I will append a tag to each AddFile, like (start:0, end:100), which means this AddFile contains the data from 0 to 100. If I want to fetch data 50, the AddFile(start:0, end:100) contains the data I want, so I read the data from AddFile.getPath().

After reading the source code, I found out that this does not work. Even if the AddFiles are ordered, the AddFile tags still need to be checked one by one. If there are lots of AddFiles (millions), it is inefficient.

I think some index info should be added to find the target AddFile faster.
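
One possible shape for such an index, as a sketch only. It assumes the (start, end) ranges do not overlap, and `RangeIndex` is a hypothetical client-side structure, not something Delta Standalone provides. A sorted map keyed by `start` finds the candidate file in logarithmic time instead of checking every tag.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class RangeIndex {
    // Hypothetical entry: the (start, end) tag of an AddFile plus its path.
    static final class Range {
        final long start;
        final long end;
        final String path;

        Range(long start, long end, String path) {
            this.start = start;
            this.end = end;
            this.path = path;
        }
    }

    // Ranges keyed by their start value; assumes ranges never overlap.
    private final TreeMap<Long, Range> byStart = new TreeMap<>();

    void add(long start, long end, String path) {
        byStart.put(start, new Range(start, end, path));
    }

    // Finds the file whose [start, end] range contains the key, if any.
    Optional<String> find(long key) {
        // floorEntry returns the range with the greatest start <= key.
        Map.Entry<Long, Range> candidate = byStart.floorEntry(key);
        if (candidate == null || key > candidate.getValue().end) {
            return Optional.empty();
        }
        return Optional.of(candidate.getValue().path);
    }

    public static void main(String[] args) {
        RangeIndex index = new RangeIndex();
        index.add(0, 100, "part-00000.parquet");
        index.add(101, 200, "part-00001.parquet");
        System.out.println(index.find(50));
    }
}
```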

horizonzy avatar Sep 14 '22 13:09 horizonzy

@horizonzy can you partition your data? We provide snapshot.scan(Expression) APIs to let you partition prune.

scottsand-db avatar Sep 14 '22 15:09 scottsand-db

snapshot.scan(Expression) also checks the AddFiles one by one: it checks whether each AddFile's partitionValues match the expression, so it's also inefficient.

horizonzy avatar Sep 14 '22 15:09 horizonzy

We are discussing the new topic about AddFile scanning in https://delta-users.slack.com/archives/CJ70UCSHM/p1663156471536779

zsxwing avatar Sep 14 '22 16:09 zsxwing

@horizonzy what if we added an API/config that sorted the data on read? Also, what would we sort it by?

scottsand-db avatar Oct 11 '22 17:10 scottsand-db

This repo has been deprecated and the code has been moved under the connectors module in the https://github.com/delta-io/delta repository. Please create the issue in that repository. See delta-io/connectors#556 for details.

vkorukanti avatar Jul 11 '23 17:07 vkorukanti