
HBASE-29699 Scan#setLimit ignored in MapReduce jobs

Open junegunn opened this issue 1 month ago • 13 comments

https://issues.apache.org/jira/browse/HBASE-29699

junegunn avatar Nov 03 '25 05:11 junegunn

:confetti_ball: +1 overall

| Vote | Subsystem | Runtime | Logfile | Comment |
|:----:|:----------|--------:|:--------|:--------|
| +0 :ok: | reexec | 2m 11s | | Docker mode activated. |
| | _ Prechecks _ | | | |
| +1 :green_heart: | dupname | 0m 0s | | No case conflicting files found. |
| +0 :ok: | codespell | 0m 0s | | codespell was not available. |
| +0 :ok: | detsecrets | 0m 0s | | detect-secrets was not available. |
| +0 :ok: | buf | 0m 0s | | buf was not available. |
| +0 :ok: | buf | 0m 0s | | buf was not available. |
| +1 :green_heart: | @author | 0m 0s | | The patch does not contain any @author tags. |
| +1 :green_heart: | hbaseanti | 0m 0s | | Patch does not have any anti-patterns. |
| | _ master Compile Tests _ | | | |
| +0 :ok: | mvndep | 0m 17s | | Maven dependency ordering for branch |
| +1 :green_heart: | mvninstall | 5m 12s | | master passed |
| +1 :green_heart: | compile | 2m 55s | | master passed |
| +1 :green_heart: | checkstyle | 0m 57s | | master passed |
| +1 :green_heart: | spotbugs | 4m 51s | | master passed |
| +1 :green_heart: | spotless | 1m 11s | | branch has no errors when running spotless:check. |
| | _ Patch Compile Tests _ | | | |
| +0 :ok: | mvndep | 0m 13s | | Maven dependency ordering for patch |
| +1 :green_heart: | mvninstall | 4m 50s | | the patch passed |
| +1 :green_heart: | compile | 2m 47s | | the patch passed |
| +1 :green_heart: | cc | 2m 47s | | the patch passed |
| +1 :green_heart: | javac | 2m 47s | | the patch passed |
| +1 :green_heart: | blanks | 0m 0s | | The patch has no blanks issues. |
| +1 :green_heart: | checkstyle | 0m 53s | | the patch passed |
| +1 :green_heart: | spotbugs | 5m 19s | | the patch passed |
| +1 :green_heart: | hadoopcheck | 14m 49s | | Patch does not cause any errors with Hadoop 3.3.6 3.4.1. |
| +1 :green_heart: | hbaseprotoc | 1m 37s | | the patch passed |
| +1 :green_heart: | spotless | 1m 4s | | patch has no errors when running spotless:check. |
| | _ Other Tests _ | | | |
| +1 :green_heart: | asflicense | 0m 33s | | The patch does not generate ASF License warnings. |
| | | 59m 16s | | |
| Subsystem | Report/Notes |
|:----------|:-------------|
| Docker | ClientAPI=1.48 ServerAPI=1.48 base: https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7432/1/artifact/yetus-general-check/output/Dockerfile |
| GITHUB PR | https://github.com/apache/hbase/pull/7432 |
| Optional Tests | dupname asflicense javac spotbugs checkstyle codespell detsecrets compile hadoopcheck hbaseanti spotless cc buflint bufcompat hbaseprotoc |
| uname | Linux 019599eec802 6.8.0-1024-aws #26~22.04.1-Ubuntu SMP Wed Feb 19 06:54:57 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux |
| Build tool | maven |
| Personality | dev-support/hbase-personality.sh |
| git revision | master / 42a8c059894673f9adfe602b198b2cb18f38436a |
| Default Java | Eclipse Adoptium-17.0.11+9 |
| Max. process+thread count | 71 (vs. ulimit of 30000) |
| modules | C: hbase-protocol-shaded hbase-client hbase-mapreduce U: . |
| Console output | https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7432/1/console |
| versions | git=2.34.1 maven=3.9.8 spotbugs=4.7.3 |
| Powered by | Apache Yetus 0.15.0 https://yetus.apache.org |

This message was automatically generated.

Apache-HBase avatar Nov 03 '25 07:11 Apache-HBase

:confetti_ball: +1 overall

| Vote | Subsystem | Runtime | Logfile | Comment |
|:----:|:----------|--------:|:--------|:--------|
| +0 :ok: | reexec | 2m 13s | | Docker mode activated. |
| -0 :warning: | yetus | 0m 3s | | Unprocessed flag(s): --brief-report-file --spotbugs-strict-precheck --author-ignore-list --blanks-eol-ignore-file --blanks-tabs-ignore-file --quick-hadoopcheck |
| | _ Prechecks _ | | | |
| | _ master Compile Tests _ | | | |
| +0 :ok: | mvndep | 0m 18s | | Maven dependency ordering for branch |
| +1 :green_heart: | mvninstall | 5m 42s | | master passed |
| +1 :green_heart: | compile | 2m 17s | | master passed |
| +1 :green_heart: | javadoc | 1m 13s | | master passed |
| +1 :green_heart: | shadedjars | 9m 12s | | branch has no errors when building our shaded downstream artifacts. |
| | _ Patch Compile Tests _ | | | |
| +0 :ok: | mvndep | 0m 16s | | Maven dependency ordering for patch |
| +1 :green_heart: | mvninstall | 4m 54s | | the patch passed |
| +1 :green_heart: | compile | 1m 49s | | the patch passed |
| +1 :green_heart: | javac | 1m 49s | | the patch passed |
| +1 :green_heart: | javadoc | 0m 57s | | the patch passed |
| +1 :green_heart: | shadedjars | 8m 37s | | patch has no errors when building our shaded downstream artifacts. |
| | _ Other Tests _ | | | |
| +1 :green_heart: | unit | 0m 54s | | hbase-protocol-shaded in the patch passed. |
| +1 :green_heart: | unit | 3m 0s | | hbase-client in the patch passed. |
| +1 :green_heart: | unit | 41m 35s | | hbase-mapreduce in the patch passed. |
| | | 85m 8s | | |
| Subsystem | Report/Notes |
|:----------|:-------------|
| Docker | ClientAPI=1.48 ServerAPI=1.48 base: https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7432/1/artifact/yetus-jdk17-hadoop3-check/output/Dockerfile |
| GITHUB PR | https://github.com/apache/hbase/pull/7432 |
| Optional Tests | javac javadoc unit compile shadedjars |
| uname | Linux 1734650b9b51 6.8.0-1024-aws #26~22.04.1-Ubuntu SMP Wed Feb 19 06:54:57 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux |
| Build tool | maven |
| Personality | dev-support/hbase-personality.sh |
| git revision | master / 42a8c059894673f9adfe602b198b2cb18f38436a |
| Default Java | Eclipse Adoptium-17.0.11+9 |
| Test Results | https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7432/1/testReport/ |
| Max. process+thread count | 1578 (vs. ulimit of 30000) |
| modules | C: hbase-protocol-shaded hbase-client hbase-mapreduce U: . |
| Console output | https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7432/1/console |
| versions | git=2.34.1 maven=3.9.8 |
| Powered by | Apache Yetus 0.15.0 https://yetus.apache.org |

This message was automatically generated.

Apache-HBase avatar Nov 03 '25 07:11 Apache-HBase

When we run an MR job or a Spark job, we define how we want to scan a table or a snapshot via a Scan object, serialize it into a String, and set it as the hbase.mapreduce.scan (TableInputFormat.SCAN) property of a Configuration.

https://github.com/apache/hbase/blob/f800a13a3575be3d70537f40c3c21171ce4b2603/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java#L154-L193

TableInputFormat and TableSnapshotInputFormat then deserialize this back into a Scan.
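For reference, a minimal sketch of that setup (the table name and mapper below are placeholders, not part of any actual change):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class ScanLimitJob {

  // Placeholder mapper; it simply forwards each row.
  static class RowMapper extends TableMapper<ImmutableBytesWritable, Result> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    Scan scan = new Scan();
    scan.setCaching(500);
    scan.setLimit(10); // the limit this issue is about; currently lost in (de)serialization

    Job job = Job.getInstance(conf, "scan-limit-example");
    // This serializes the Scan into a String and stores it under
    // TableInputFormat.SCAN ("hbase.mapreduce.scan") in the job Configuration.
    TableMapReduceUtil.initTableMapperJob("my_table", scan, RowMapper.class,
        ImmutableBytesWritable.class, Result.class, job);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```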

junegunn avatar Nov 03 '25 15:11 junegunn

OK, so the problem is that we do not store the limit in the serialized Scan object, so when deserializing we cannot get the limit back.

But the design of the client API is that we do not pass the actual limit through the Scan object, since it is a global limit, not a per-region one, so we need to calculate the remaining limit for each region and pass it to the region server.

Maybe a possible way is to add a new config field where we serialize the Scan object as JSON? That way we can include all the fields we want.

Thanks.

Apache9 avatar Nov 04 '25 02:11 Apache9

Thanks for the comment.

> OK, so the problem is that we do not store the limit in the serialized Scan object, so when deserializing we cannot get the limit back.

Correct. When you pass a Scan with a custom limit to an MR job, you would expect each mapper to return at most that number of rows, but instead you end up getting all records in the table.

> since it is a global limit, not a per-region one

I assumed that users advanced enough to run MR/Spark jobs with HBase would already understand that, in that context, each partition (region) runs its own Scan in parallel, and that setLimit applies locally to each partition. Analogous to how you can't enforce a global limit using PageFilter, etc. But I could be wrong, maybe I just know too much about the quirks and limitations :)

When this patch is applied, users might be surprised to see that an MR job using setLimit returns "limit × regions" number of rows, but I think that's still better than having setLimit silently ignored.

As for storing the limit value in the serialized field, I don't see any problem with it. It might look a bit redundant, but it's harmless because it's not used anywhere else (please correct me if I'm wrong) and serves as a more accurate description of the original Scan itself.

junegunn avatar Nov 04 '25 05:11 junegunn

> Thanks for the comment.
>
> > OK, so the problem is that we do not store the limit in the serialized Scan object, so when deserializing we cannot get the limit back.
>
> Correct. When you pass a Scan with a custom limit to an MR job, you would expect each mapper to return at most that number of rows, but instead you end up getting all records in the table.
>
> > since it is a global limit, not a per-region one
>
> I assumed that users advanced enough to run MR/Spark jobs with HBase would already understand that, in that context, each partition (region) runs its own Scan in parallel, and that setLimit applies locally to each partition. Analogous to how you can't enforce a global limit using PageFilter, etc. But I could be wrong, maybe I just know too much about the quirks and limitations :)
>
> When this patch is applied, users might be surprised to see that an MR job using setLimit returns "limit × regions" number of rows, but I think that's still better than having setLimit silently ignored.

OK, I think this is the reason why we did not consider the scan limit in the past: under a parallel execution scenario, a global limit does not make sense. The rowsLimitPerSplit configuration is better because it says 'per split', so maybe we should also introduce a similar config in TableInputFormat? And when serializing the Scan object, if we find that users have set a scan limit, we could log a warning telling them that it will not work?

> As for storing the limit value in the serialized field, I don't see any problem with it. It might look a bit redundant, but it's harmless because it's not used anywhere else (please correct me if I'm wrong) and serves as a more accurate description of the original Scan itself.

It will increase the message size of the Scan object and does not bring any advantage to Scan itself, so I would prefer that we not add it if possible...

Apache9 avatar Nov 04 '25 08:11 Apache9

> It will increase the message size of the Scan object and does not bring any advantage to Scan itself, so I would prefer that we not add it if possible...

Fair enough; however small, it's still wasteful. If I'm not mistaken, the serialized size does not increase if we don't set the optional field. So how about overloading ProtobufUtil.toScan with a version that skips the field, and using that in normal scans (in RequestConverter.buildScanRequest)? It would benefit MR users without introducing unnecessary overhead to normal scans. If that sounds reasonable, I can update the patch; otherwise, I'd say we leave this as a known issue and close it as "wontfix".

junegunn avatar Nov 04 '25 14:11 junegunn

Oh, there was some formatting problem with my reply, so maybe you missed part of it...

As I explained above, the scan limit is a global limit, and under a parallel scenario a global limit does not make sense...

The rowsLimitPerSplit configuration is better because it says 'per split', so maybe we should also introduce a similar config in TableInputFormat? And when serializing the Scan object, if we find that users have set a scan limit, we could log a warning telling them that it will not work?

Apache9 avatar Nov 04 '25 14:11 Apache9

Thanks, that is one option. But can we update the documentation of Scan#setLimit to say:

> ... When used with TableInputFormat or TableSnapshotInputFormat, this limit is applied locally to each split.

From a user’s perspective, even having a per-split limit is still an improvement over the current behavior.

As for introducing a new dedicated option, I'd personally prefer fixing and reusing the existing interface. As long as we clearly document its behavior, I think it's better than introducing another configuration parameter for users to discover. HBase already has too many little-known parameters buried deep in the source code.

junegunn avatar Nov 04 '25 14:11 junegunn

> Thanks, that is one option. But can we update the documentation of Scan#setLimit to say:
>
> > ... When used with TableInputFormat or TableSnapshotInputFormat, this limit is applied locally to each split.
>
> From a user’s perspective, even having a per-split limit is still an improvement over the current behavior.
>
> As for introducing a new dedicated option, I'd personally prefer fixing and reusing the existing interface. As long as we clearly document its behavior, I think it's better than introducing another configuration parameter for users to discover. HBase already has too many little-known parameters buried deep in the source code.

We'd better follow the same pattern as TableSnapshotInputFormat, where we introduce a configuration to set the limit per split, instead of passing it through Scan. This is a clearer solution. Not all users will look deeply into the javadoc, and if we go with your solution we introduce different meanings for the Scan limit between a normal scan and a MapReduce job, which may confuse our users...

Apache9 avatar Nov 05 '25 14:11 Apache9

I understand your point.

> Not all users will look deeply into the javadoc

True, and unfortunately, those users will still complain that Scan#setLimit doesn't work as expected no matter what. So we should:

  • Document the limitation of the method in the javadoc ("this doesn't work with TableInputFormat"),
  • And in case it's overlooked, print a warning message when serializing a Scan with a limit for an MR job (rough sketch below).
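
Something along these lines (a rough sketch; the helper class, method, and message are made up for illustration, not part of the patch):

```java
import org.apache.hadoop.hbase.client.Scan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Rough sketch of the proposed warning; names and wording are illustrative only.
final class ScanLimitWarning {
  private static final Logger LOG = LoggerFactory.getLogger(ScanLimitWarning.class);

  // Would be called from the MR serialization path (i.e. when writing the Scan
  // into the job Configuration), not from the normal client scan path.
  static void warnIfLimitSet(Scan scan) {
    if (scan.getLimit() > 0) {
      LOG.warn("Scan#setLimit({}) has no effect in MapReduce jobs; "
          + "configure a per-split row limit instead.", scan.getLimit());
    }
  }
}
```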

In order to do that, we would need to introduce an internal version of ProtobufUtil.toScan that doesn't print a warning message and use it in RequestConverter.buildScanRequest. However, the public ProtobufUtil.toScan cannot tell whether users are setting the new per-split-limit parameter in their configuration, or whether they are already aware of the limitation; in those cases the warning message can feel redundant. Users would then have to manually unset the limit to silence it, which is not ideal, so I'm not entirely sure about adding the warning.

> introduce different meanings for the Scan limit

I thought it was acceptable, because we already have constructs that behave differently in parallel scenarios (e.g. stateful filters like PageFilter and WhileMatchFilter).

> This is because the filter is applied separately on different region servers. (from https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/filter/PageFilter.html)

So I assumed it was already well-understood that a separate Scan operates per split in such cases. But maybe that's just me.
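
For illustration, a minimal client-side sketch (the table name is a placeholder): against a table with several regions, this can print well over 10 even though PageFilter(10) is set, because the filter is evaluated independently on each region server.

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PageFilter;

public class PageFilterExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Table table = conn.getTable(TableName.valueOf("my_table"))) {
      Scan scan = new Scan().setFilter(new PageFilter(10));
      int count = 0;
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          count++;
        }
      }
      // With multiple regions, each region server applies PageFilter(10) on its own,
      // so the total can exceed 10.
      System.out.println("rows returned: " + count);
    }
  }
}
```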

junegunn avatar Nov 06 '25 01:11 junegunn

But we already have rowsLimitPerSplit for TableSnapshotInputFormat, right? So aligning TableInputFormat with TableSnapshotInputFormat is natural, and we also would not need to care about the serialization problem. The logic will get complicated if we support both (as in this PR).
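
For reference, the snapshot-side limit is just a configuration key today (a minimal sketch; the value 1000 is only an example, and a TableInputFormat counterpart would be a new, analogous key):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class PerSplitLimitExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Existing key honored by TableSnapshotInputFormat: caps the rows read per input split.
    conf.setInt("hbase.TableSnapshotInputFormat.row.limit.per.inputsplit", 1000);
  }
}
```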

Apache9 avatar Nov 06 '25 01:11 Apache9

I see. Actually, when I opened this PR I was considering deprecating the row.limit.per.inputsplit configuration, because with Scan#setLimit, we have a unified interface for per-split limit, eliminating the need for two separate configuration parameters:

  • hbase.TableSnapshotInputFormat.row.limit.per.inputsplit
  • hbase.TableInputFormat.row.limit.per.inputsplit

However, I respect your decision if you prefer not to promote setLimit for per-split limits. In that case the first thing we should do is to document the limitation in the javadoc.

By the way, the hbase.TableSnapshotInputFormat.row.limit.per.inputsplit parameter is defined in TableSnapshotInputFormatImpl, which is marked @InterfaceAudience.Private.

junegunn avatar Nov 06 '25 02:11 junegunn