[FLINK-23598] [core] Fix a bug that caused position to be updated twice when writing a string to a byte array
What is the purpose of the change
This PR fixes a bug in the DataOutputSerializer method writeBytes(String) and adds tests. writeBytes takes a string and writes it to the serializer's byte array, one byte per character. The bug is that writeBytes advances the position itself, even though the position has already been advanced by writeByte, which writeBytes calls for each character. As a result, the position moves twice for every string written. Although no other class in Flink currently calls this method, the bug could cause problems for future callers, so we fixed it and added tests.
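To illustrate the double update described above, here is a minimal sketch. `TinySerializer` and its methods are hypothetical stand-ins written for this example, not Flink's actual DataOutputSerializer code; the sketch only assumes that a per-byte write advances the position by one.

```java
import java.util.Arrays;

class DoubleAdvanceSketch {

    /** Hypothetical, simplified stand-in for a growable byte sink (not Flink's class). */
    static class TinySerializer {
        byte[] buffer = new byte[16];
        int position = 0;

        /** Writes one byte and advances the position by one. */
        void writeByte(int b) {
            if (position >= buffer.length) {
                buffer = Arrays.copyOf(buffer, buffer.length * 2);
            }
            buffer[position++] = (byte) (b & 0xff);
        }

        /** Buggy variant: writeByte already moved the position, then it is moved again. */
        void writeBytesBuggy(String s) {
            for (int i = 0; i < s.length(); i++) {
                writeByte(s.charAt(i));
            }
            position += s.length(); // second, unwanted update
        }

        /** Fixed variant: the position is left to writeByte alone. */
        void writeBytesFixed(String s) {
            for (int i = 0; i < s.length(); i++) {
                writeByte(s.charAt(i));
            }
        }
    }

    static int positionAfterBuggyWrite(String s) {
        TinySerializer out = new TinySerializer();
        out.writeBytesBuggy(s);
        return out.position;
    }

    static int positionAfterFixedWrite(String s) {
        TinySerializer out = new TinySerializer();
        out.writeBytesFixed(s);
        return out.position;
    }

    public static void main(String[] args) {
        System.out.println("buggy position = " + positionAfterBuggyWrite("Hello")); // 10
        System.out.println("fixed position = " + positionAfterFixedWrite("Hello")); // 5
    }
}
```

With the buggy variant, the reported position is twice the number of bytes actually written, so anything that trusts the position afterwards reads past the written data.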
Brief change log
- Remove the code that updates the position from the implementation of the method writeBytes
- Add a test to serialize a given string
- Add a test to serialize an arbitrary string
Verifying this change
This change added tests and can be verified as follows:
- Added a test to serialize the string "Hello, World!" and verify that the result is correct.
- Added a test to serialize an arbitrary string and verify that the result is correct.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: yes
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
CI report:
- dadc6fa86f25d65d195fb2ffebb133ff3a4aedfa Azure: FAILURE
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
Hello, could someone please clarify whether this PR requires any further modifications? It has been open for quite some time now.
I resolved the conflict with another PR that was merged and saw it pass the test :)
@all-cloudz, @injae-kim, thanks for the response. Does this mean it is good to be merged now?
@sn-12-3 Yes it's ok to merge now.
@injae-kim, I was looking at the change. This line here increases the position by 1 to write each character of the string into a byte. But this line here is to safely move the position pointer forward by the length of the data being written, isn't it? That is, advancing to a safe position so that data is written without any conflicts. I believe the same has been done for other methods as well in flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
cc @all-cloudz
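One way to frame the question above is by asking which method "owns" the position update. The sketch below is an assumption-laden illustration, not Flink's code: `Sink`, `writeIntDirect`, and `writeBytesDelegating` are hypothetical names, and the buffer is fixed-size for brevity. A method that stores bytes into the array by index has to advance the position itself, while a method that delegates to writeByte does not, because each delegated call already advances it.

```java
class PositionOwnershipSketch {

    /** Hypothetical fixed-size sink used only for illustration. */
    static class Sink {
        byte[] buffer = new byte[32];
        int position = 0;

        /** Stores four bytes by index, so this method must advance the position itself. */
        void writeIntDirect(int v) {
            buffer[position] = (byte) (v >>> 24);
            buffer[position + 1] = (byte) (v >>> 16);
            buffer[position + 2] = (byte) (v >>> 8);
            buffer[position + 3] = (byte) v;
            position += 4;
        }

        /** Stores one byte and advances the position by one. */
        void writeByte(int b) {
            buffer[position++] = (byte) (b & 0xff);
        }

        /** Delegates to writeByte, which already advances the position per character. */
        void writeBytesDelegating(String s) {
            for (int i = 0; i < s.length(); i++) {
                writeByte(s.charAt(i));
            }
            // No "position += s.length()" here: that would count each byte twice.
        }
    }

    static int positionAfterIntAndString(int v, String s) {
        Sink sink = new Sink();
        sink.writeIntDirect(v);
        sink.writeBytesDelegating(s);
        return sink.position;
    }

    public static void main(String[] args) {
        // 4 bytes for the int plus 3 bytes for "abc".
        System.out.println(positionAfterIntAndString(42, "abc")); // 7
    }
}
```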
But this line here is to safely move the position pointer forward by the length of the data being written, isn't it? That is, advancing to a safe position so that data is written without any conflicts.
@Test
public void testWriteBytes() {
    DataOutputSerializer serializer = new DataOutputSerializer(1);
    String givenStr = "Hello, World!";
    assertThatCode(() -> serializer.writeBytes(givenStr)).doesNotThrowAnyException();
    ByteBuffer wrote = serializer.wrapAsByteBuffer();
    String actual = Charset.defaultCharset().decode(wrote).toString();
    assertThat(actual).isEqualTo(givenStr);
}
Hmm, with the current code, the above test fails, doesn't it? @all-cloudz
I added code to move the position in the same way as the existing DataOutputSerializer code and ran the test. The following error occurred, which was due to moving the position multiple times. So, I believe the changes I made are correct.
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.040 s <<< FAILURE! -- in org.apache.flink.core.memory.DataInputOutputSerializerTest
[ERROR] org.apache.flink.core.memory.DataInputOutputSerializerTest.testWriteBytes -- Time elapsed: 0.032 s <<< ERROR!
java.lang.IllegalArgumentException: newLimit > capacity: (26 > 14)
    at java.base/java.nio.Buffer.createLimitException(Buffer.java:395)
    at java.base/java.nio.Buffer.limit(Buffer.java:369)
    at java.base/java.nio.ByteBuffer.limit(ByteBuffer.java:1529)
    at org.apache.flink.core.memory.DataOutputSerializer.wrapAsByteBuffer(DataOutputSerializer.java:52)
    at org.apache.flink.core.memory.DataInputOutputSerializerTest.testWriteBytes(DataInputOutputSerializerTest.java:152)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
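The numbers in the exception above can be reproduced with plain java.nio, independent of Flink. "Hello, World!" has 13 characters, so a position advanced twice lands at 13 + 13 = 26, which is then used as the buffer limit. The capacity of 14 is copied from the log; `limitAccepted` is a hypothetical helper written for this sketch.

```java
import java.nio.ByteBuffer;

class LimitOverflowSketch {

    /** Returns true if a buffer of the given capacity accepts the given limit. */
    static boolean limitAccepted(int capacity, int limit) {
        ByteBuffer wrapper = ByteBuffer.wrap(new byte[capacity]);
        try {
            wrapper.limit(limit);
            return true;
        } catch (IllegalArgumentException e) {
            // Thrown when the limit is negative or larger than the capacity.
            return false;
        }
    }

    public static void main(String[] args) {
        int written = "Hello, World!".length(); // 13 bytes actually written
        int doubled = written + written;        // 26, the position after a double update

        System.out.println(limitAccepted(14, doubled)); // false
        System.out.println(limitAccepted(14, written)); // true
    }
}
```

A limit equal to the number of bytes actually written fits within the capacity, while the doubled position does not, which is consistent with the failure shown in the log.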
While running this test, I noticed that the test code format does not adhere to the guidelines. I will correct it and commit the changes.
This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.
If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/
If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review.