iceberg icon indicating copy to clipboard operation
iceberg copied to clipboard

Support for incremental reads using Spark SQL proposal

Open igorcalabria opened this issue 2 years ago • 5 comments

Feature Request / Improvement

Hi everyone,

I asked around in slack and @singhpk234 pointed me towards https://github.com/apache/spark/pull/34072 where spark rejected the idea of passing DataFrame options via hints, which was a reasonable decision(in my option). The funny thing is that in that thread a user proposed using table valued functions as an alternative and it matched my initial idea. Instead of using hints to pass read options, one could simply use a table function like this

SELECT * FROM incremental_read('table_name', 'snapshot-start', 'snapshot-end')

Or something more generic

SELECT * FROM with_options('table_name', 'option1-key', 'option1-value', 'option2-key', 'option2-value')

I'm terrible with naming things so please ignore the function names :smile:. This syntax is a bit odd, but it's present on other databases too

From initial testing, implementing a function for this is surprisingly easy. We just have to inject a table function https://spark.apache.org/docs/3.3.0/api/java/org/apache/spark/sql/SparkSessionExtensions.html#injectTableFunction-scala.Tuple3- that returns an UnresolvedRelation with the injected options.

Simple proof of concept(using java, so just ignore the weird scala conversions):

public class Extensions implements Function1<SparkSessionExtensions, BoxedUnit> {
    @Override
    public BoxedUnit apply(SparkSessionExtensions sparkSessionExtensions) {
        FunctionIdentifier identifer = FunctionIdentifier.apply("incremental_read");
        ExpressionInfo info = new ExpressionInfo("noclass", "incremental_read");
        sparkSessionExtensions.injectTableFunction(Tuple3.apply(identifer, info, new Function1<Seq<Expression>, LogicalPlan>() {
            @Override
            public LogicalPlan apply(Seq<Expression> children) {
                if (children.size() != 1) {
                    throw new RuntimeException("Wrong number of arguments my dude");
                }
                var identifierExpression = children.apply(0);

                if (!identifierExpression.foldable()) {
                    throw new RuntimeException("Only constant arguments supported my dude");
                }

                if (!identifierExpression.dataType().sameType(DataTypes.StringType)) {
                    throw new RuntimeException("Only string types my dude");
                }

                String table = ((UTF8String)identifierExpression.eval(InternalRow.empty())).toString();
                Seq<String> tableIdentifier = CollectionConverters.CollectionHasAsScala(Arrays.asList(table.split("\\."))).asScala().toSeq();
                HashMap<String, String> options = new HashMap<>();
                options.put("start-snapshot-id", "3807160769970787368"); // hard coded for simplicity
                return new UnresolvedRelation(tableIdentifier, new CaseInsensitiveStringMap(options), false);
            }
        }));

        return null;
    }
}

The best thing about this is that it doesn't even need changes to iceberg or spark. One could load this extension separately, but I do think that incremental reads are a very important part of iceberg's API and shouldn't be left out of the SQL syntax.

I'd love to hear opinions on this strategy and if (or when) we all agree on something, I could provide a PR for this.

Cheers,

Query engine

Spark

igorcalabria avatar Aug 19 '22 20:08 igorcalabria

Incremental read is fine when you have append only tables, but starts to get tricky for tables with deletes. @marton-bod did some experimentation with it in Hive, but we did not continued with it after we saw the complexity caused by the deletes and the SQL specification of such reads.

pvary avatar Sep 15 '22 05:09 pvary

Hi, I'm sorry but I'm not really sure if I understood your comment. Iceberg already supports incremental reads (https://iceberg.apache.org/docs/latest/spark-queries/#incremental-read) and it only works on append only tables:

Currently gets only the data from append operation. Cannot support replace, overwrite, delete operations. Incremental read works with both V1 and V2 format-version. Incremental read is not supported by Spark’s SQL syntax.

The proposal here is about exposing the functionality that's already there to the SQL syntax, not expanding it with support for more operations.

igorcalabria avatar Sep 15 '22 10:09 igorcalabria

Yep, the work we did was exactly that, to expose this existing Iceberg functionality via SQL syntax. See PR: https://github.com/apache/hive/pull/3222

It was ultimately abandoned, because we wanted to reuse the SQL standard FOR SYSTEM TIME BETWEEN ... AND ...., but the semantics of what Iceberg returns (appends that happened during this time period) is different than the semantics of the SQL standard (returns all rows that were active during this time period - so rows could have been appended before the time window as well). At the time we found no justification for introducing a new, Hive-specific SQL syntax for this.

marton-bod avatar Sep 15 '22 14:09 marton-bod

@marton-bod That makes sense now, thanks!

In my case, we're not really adding new syntax(table functions are already part of spark's SQL). Depending on the approach it might not even be specific to incremental reads either, it's just a mechanism to pass read options to the data source.

igorcalabria avatar Sep 15 '22 16:09 igorcalabria

@igorcalabria: Let’s wait for the Spark team members to chime in then.

pvary avatar Sep 15 '22 20:09 pvary

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Mar 16 '23 00:03 github-actions[bot]

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

github-actions[bot] avatar Apr 07 '23 00:04 github-actions[bot]