
[FLINK-23598] [core] Fix a bug that caused position to be updated twice when writing a string to a byte array

Open all-cloudz opened this issue 2 years ago • 8 comments

What is the purpose of the change

This PR fixes a bug in DataOutputSerializer's writeBytes(String) method and adds tests. writeBytes takes a string and writes it to a byte array. The bug is that writeBytes advances the position in the byte array itself, even though the writeByte method it calls for each character has already advanced it, so the position moves twice for every character written. Although no other class in Flink calls this method, it has the potential to cause problems, so this PR fixes the bug and adds related tests.
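
To illustrate the double update described above, here is a minimal sketch. PositionSketch is a hypothetical, simplified stand-in written for this example and is not the Flink class; the method names writeBytesBuggy and writeBytesFixed are likewise made up for illustration.

```java
import java.util.Arrays;

// Hypothetical, simplified stand-in for a serializer that tracks a write position.
// It is NOT DataOutputSerializer; it only mimics the pattern described in this PR.
final class PositionSketch {
    private byte[] buffer = new byte[16];
    private int position = 0;

    // Writes one byte and advances the position by one.
    public void writeByte(int b) {
        if (position >= buffer.length) {
            buffer = Arrays.copyOf(buffer, buffer.length * 2);
        }
        buffer[position++] = (byte) (b & 0xff);
    }

    // Buggy variant: writeByte already advanced the position once per character,
    // and the extra update below advances it by the string length a second time.
    public void writeBytesBuggy(String s) {
        for (int i = 0; i < s.length(); i++) {
            writeByte(s.charAt(i));
        }
        position += s.length();
    }

    // Fixed variant: the position is advanced only by writeByte.
    public void writeBytesFixed(String s) {
        for (int i = 0; i < s.length(); i++) {
            writeByte(s.charAt(i));
        }
    }

    public int length() {
        return position;
    }

    public static void main(String[] args) {
        PositionSketch buggy = new PositionSketch();
        buggy.writeBytesBuggy("Hello, World!");
        System.out.println(buggy.length()); // 26: twice the 13 characters written

        PositionSketch fixed = new PositionSketch();
        fixed.writeBytesFixed("Hello, World!");
        System.out.println(fixed.length()); // 13
    }
}
```

In the buggy variant the reported length is twice the number of bytes actually written, so anything that reads the buffer up to the position would see trailing garbage or run past the end of the array.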

Brief change log

  • Remove the code that updates the position from the implementation of the method writeBytes
  • Add a test to serialize a given string
  • Add a test to serialize an arbitrary string

Verifying this change

This change added tests and can be verified as follows:

  • Added a test to serialize the string "Hello, World!" and verify that the result is correct.
  • Added a test to serialize an arbitrary string and verify that the result is correct.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

all-cloudz avatar Oct 21 '23 09:10 all-cloudz

CI report:

  • dadc6fa86f25d65d195fb2ffebb133ff3a4aedfa Azure: FAILURE
Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Oct 21 '23 09:10 flinkbot

Hello, could someone please clarify whether this PR requires any further modifications? It has been open for quite some time now.

sn-12-3 avatar Jul 23 '24 04:07 sn-12-3

Hello, could someone please clarify whether this PR requires any further modifications? It has been open for quite some time now.

I resolved the conflict with another PR that was merged and saw it pass the test :)

all-cloudz avatar Jul 27 '24 03:07 all-cloudz

@all-cloudz, @injae-kim, thanks for the response. Does this mean it is good to be merged now?

sn-12-3 avatar Aug 12 '24 09:08 sn-12-3

@sn-12-3 Yes it's ok to merge now.

all-cloudz avatar Aug 12 '24 23:08 all-cloudz

@injae-kim, I was looking at the change. One line increases the position by 1 as each character of the string is written into a byte. But isn't the other line there to safely move the position pointer forward by the length of the data being written, advancing to a safe position so that data is written without any conflicts? I believe the same is done for other methods in flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java as well.

cc @all-cloudz

sn-12-3 avatar Aug 21 '24 13:08 sn-12-3

But isn't the other line there to safely move the position pointer forward by the length of the data being written, advancing to a safe position so that data is written without any conflicts?

    @Test
    public void testWriteBytes() {
        DataOutputSerializer serializer = new DataOutputSerializer(1);
        String givenStr = "Hello, World!";

        assertThatCode(() -> serializer.writeBytes(givenStr)).doesNotThrowAnyException();

        ByteBuffer wrote = serializer.wrapAsByteBuffer();
        String actual = Charset.defaultCharset().decode(wrote).toString();
        assertThat(actual).isEqualTo(givenStr);
    }

Hmm, with the current code, the above test fails, doesn't it? @all-cloudz

injae-kim avatar Aug 22 '24 09:08 injae-kim

I added code to move the position in the same way as the existing DataOutputSerializer code and ran the test. The following error occurred, which was due to moving the position multiple times. So, I believe the changes I made are correct.

[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.040 s <<< FAILURE! -- in org.apache.flink.core.memory.DataInputOutputSerializerTest
[ERROR] org.apache.flink.core.memory.DataInputOutputSerializerTest.testWriteBytes -- Time elapsed: 0.032 s <<< ERROR!
java.lang.IllegalArgumentException: newLimit > capacity: (26 > 14)
	at java.base/java.nio.Buffer.createLimitException(Buffer.java:395)
	at java.base/java.nio.Buffer.limit(Buffer.java:369)
	at java.base/java.nio.ByteBuffer.limit(ByteBuffer.java:1529)
	at org.apache.flink.core.memory.DataOutputSerializer.wrapAsByteBuffer(DataOutputSerializer.java:52)
	at org.apache.flink.core.memory.DataInputOutputSerializerTest.testWriteBytes(DataInputOutputSerializerTest.java:152)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
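
The two numbers in the exception (26 > 14) can be traced by hand. The sketch below assumes the buffer grows to max(2 * length, length + needed) when it is too small. That growth rule is an assumption made for this example, although it is consistent with the reported capacity of 14 for an initial size of 1 and a 13-character string. LimitSketch and its helper names are hypothetical; only java.nio.ByteBuffer is a real API.

```java
import java.nio.ByteBuffer;

// Hypothetical helper class for tracing the numbers in the stack trace above.
final class LimitSketch {
    // Assumed growth rule (not taken from this thread): at least double,
    // or enough room for the bytes about to be written.
    public static int grownCapacity(int current, int needed) {
        return Math.max(current * 2, current + needed);
    }

    // Returns true if ByteBuffer rejects the given limit for the given capacity.
    public static boolean limitRejected(int capacity, int limit) {
        try {
            ByteBuffer.wrap(new byte[capacity]).limit(limit);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int len = "Hello, World!".length();   // 13 characters
        int capacity = grownCapacity(1, len); // max(2, 14) = 14
        int doubleCounted = len + len;        // position advanced twice: 26
        System.out.println(limitRejected(capacity, doubleCounted)); // true
        System.out.println(limitRejected(capacity, len));           // false
    }
}
```

With the position advanced only once, the limit of 13 fits inside the 14-byte array and wrapping the buffer succeeds.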

While running this test, I noticed that the test code format does not adhere to the guidelines. I will correct it and commit the changes.

all-cloudz avatar Aug 24 '24 05:08 all-cloudz

This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

github-actions[bot] avatar Apr 04 '25 06:04 github-actions[bot]

This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review.

github-actions[bot] avatar May 05 '25 06:05 github-actions[bot]