Implement input/output streaming support
The PR adds support for Input/Output streams.
~~The implementation has a new type of MaskingState called "BufferedMaskingState". BufferedMaskingState is a wrapper around MaskingState, which got renamed to "ByteArrayMaskingState" as MaskingState now is an interface.~~ The implementation enhances MaskingState to support buffering data from the input stream and flushing replacement operations into the output stream. Both streaming mode and byte array mode share the same execution path.
BufferedMaskingState reads 8192-byte chunks from the provided input stream and buffers the data in ByteArrayMaskingState. If the end of the current buffer is reached while the masker is processing a JSON value, the buffer is extended with the next chunk and everything before the start of the current JSON value is cut. If the masker is not processing a JSON value, the buffer is replaced with the next chunk.
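The two refill cases described above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the class `RefillSketch`, its fields, and the `refill()` method are hypothetical names, and `IOException` is wrapped in `UncheckedIOException` only to keep the sketch short. Only the 8192-byte chunk size and the "inside a value" versus "not inside a value" distinction come from the description.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical sketch of the buffer refill logic; names are illustrative. */
class RefillSketch {
    static final int CHUNK_SIZE = 8192; // chunk size stated in the description

    private final InputStream in;
    byte[] buffer = new byte[0];
    int currentIndex = 0;            // next byte the masker will look at
    int currentValueStartIndex = -1; // -1 means "not inside a JSON value"

    RefillSketch(InputStream in) {
        this.in = in;
    }

    /** Reads the next chunk; returns false once the stream is exhausted. */
    boolean refill() {
        byte[] chunk = new byte[CHUNK_SIZE];
        int read;
        try {
            read = in.read(chunk);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (read <= 0) {
            return false;
        }
        if (currentValueStartIndex < 0) {
            // Not inside a value: the old buffer is no longer needed, replace it.
            buffer = Arrays.copyOf(chunk, read);
            currentIndex = 0;
        } else {
            // Inside a value: cut everything before the value start, keep the
            // bytes of the value seen so far, and append the new chunk.
            int kept = buffer.length - currentValueStartIndex;
            byte[] extended = new byte[kept + read];
            System.arraycopy(buffer, currentValueStartIndex, extended, 0, kept);
            System.arraycopy(chunk, 0, extended, kept, read);
            currentIndex -= currentValueStartIndex; // shift by the cut prefix
            currentValueStartIndex = 0;
            buffer = extended;
        }
        return true;
    }
}
```

In this sketch the pointers are shifted by the length of the cut prefix, so the masker can keep using plain array indexing after a refill.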
For writes, BufferedMaskingState applies the prepared replacement operations and flushes the resulting byte array into the provided output stream. If the data is flushed while the masker is still processing a JSON value, the output byte array is cut at the start of that value, so everything after the value start is held back.
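A rough sketch of the "cut at the start of the current value" rule on the write side is shown below. It is an assumption-laden illustration: `FlushSketch`, `flush`, and its parameters are hypothetical, replacement operations are left out entirely, and the reading that bytes after the value start are held back until a later flush is an interpretation of the description rather than something confirmed by the PR code.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Hypothetical sketch of a partial flush; replacement operations are omitted. */
class FlushSketch {
    /**
     * Writes buffer[flushedUpTo, limit) to out, where limit is the start of the
     * value still being processed, or the end of the buffer if there is none
     * (signalled here by a negative currentValueStartIndex).
     * Returns the offset up to which the buffer has now been flushed.
     */
    static int flush(byte[] buffer, int flushedUpTo, int currentValueStartIndex, OutputStream out) {
        int limit = currentValueStartIndex >= 0 ? currentValueStartIndex : buffer.length;
        if (limit <= flushedUpTo) {
            return flushedUpTo; // nothing safe to write yet
        }
        try {
            out.write(buffer, flushedUpTo, limit - flushedUpTo);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return limit;
    }
}
```

Holding back the bytes of an in-progress value means a later replacement of that value can still be applied before anything is written.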
The PR adds an assertion to every test that the outputs of the bytes API and the streams API are equivalent. There is also a disabled test that creates a ~1GB file and asserts that the masker can process it with the streams API without going OOM.
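An equivalence check of this kind might look like the sketch below. Everything in it is hypothetical: `EquivalenceSketch`, the `StreamMasker` interface, and the toy digit-masking functions merely stand in for the real bytes and streams entry points, whose signatures are not shown in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.function.UnaryOperator;

/** Hypothetical test helper: runs both APIs on the same input and compares. */
class EquivalenceSketch {
    /** Stand-in for a streams-based masking entry point (assumed shape). */
    interface StreamMasker {
        void mask(InputStream in, OutputStream out) throws IOException;
    }

    static byte[] assertBytesAndStreamsAgree(
            UnaryOperator<byte[]> bytesApi, StreamMasker streamsApi, byte[] input) {
        byte[] fromBytes = bytesApi.apply(input.clone());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            streamsApi.mask(new ByteArrayInputStream(input), out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!Arrays.equals(fromBytes, out.toByteArray())) {
            throw new AssertionError("bytes API and streams API disagree");
        }
        return fromBytes;
    }

    // Toy maskers used only to exercise the helper: replace ASCII digits with '*'.
    static byte[] toyMaskBytes(byte[] json) {
        byte[] result = json.clone();
        for (int i = 0; i < result.length; i++) {
            if (result[i] >= '0' && result[i] <= '9') {
                result[i] = '*';
            }
        }
        return result;
    }

    static void toyMaskStream(InputStream in, OutputStream out) throws IOException {
        int b;
        while ((b = in.read()) != -1) {
            out.write(b >= '0' && b <= '9' ? '*' : b);
        }
    }
}
```

Routing every test through one helper like this keeps the two code paths from drifting apart unnoticed.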
The PR also creates StreamsBenchmark JMH suite. These benchmarks should only be used to compare performance across different types of streams, as they use a specific JMH setup that affects the timing measurements.
[!NOTE] These results are affected by shared workloads on GitHub runners. Use the results only to detect possible regressions, but always rerun on a more stable machine before drawing any conclusions!
Benchmark results (pull-request, 7a0502148c0c0375c516e348039c0d9209836a1c)
Benchmark (characters) (jsonPath) (jsonSize) (keyLength) (maskedKeyProbability) (numberOfTargetKeys) (streamInputType) (streamOutputType) Mode Cnt Score Error Units
BaselineBenchmark.countBytes unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 2590774.930 ± 144199.334 ops/s
BaselineBenchmark.countBytes:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 0.001 ± 0.001 B/op
BaselineBenchmark.jacksonParseAndMask unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 29558.388 ± 899.027 ops/s
BaselineBenchmark.jacksonParseAndMask:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 64368.071 ± 0.048 B/op
BaselineBenchmark.jacksonParseOnly unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 50798.375 ± 844.626 ops/s
BaselineBenchmark.jacksonParseOnly:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 24304.041 ± 0.028 B/op
BaselineBenchmark.regexReplace unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 5302.964 ± 122.121 ops/s
BaselineBenchmark.regexReplace:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 61656.397 ± 0.273 B/op
BaselineBenchmark.writeFile unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 4884.155 ± 1673.103 ops/s
BaselineBenchmark.writeFile:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 10664.430 ± 0.261 B/op
InstanceCreationBenchmark.jsonMasker N/A N/A N/A N/A N/A 1000 N/A N/A thrpt 4 646.528 ± 8.596 ops/s
InstanceCreationBenchmark.jsonMasker:gc.alloc.rate.norm N/A N/A N/A N/A N/A 1000 N/A N/A thrpt 4 2796876.393 ± 6.282 B/op
JsonMaskerBenchmark.jsonMaskerByteArrayStreams unicode false 1kb N/A 0.1 N/A N/A N/A thrpt 4 277152.180 ± 3324.872 ops/s
JsonMaskerBenchmark.jsonMaskerByteArrayStreams:gc.alloc.rate.norm unicode false 1kb N/A 0.1 N/A N/A N/A thrpt 4 10816.007 ± 0.001 B/op
JsonMaskerBenchmark.jsonMaskerByteArrayStreams unicode true 1kb N/A 0.1 N/A N/A N/A thrpt 4 374381.742 ± 6183.266 ops/s
JsonMaskerBenchmark.jsonMaskerByteArrayStreams:gc.alloc.rate.norm unicode true 1kb N/A 0.1 N/A N/A N/A thrpt 4 10400.005 ± 0.001 B/op
JsonMaskerBenchmark.jsonMaskerBytes unicode false 1kb N/A 0.1 N/A N/A N/A thrpt 4 368311.929 ± 17670.629 ops/s
JsonMaskerBenchmark.jsonMaskerBytes:gc.alloc.rate.norm unicode false 1kb N/A 0.1 N/A N/A N/A thrpt 4 2256.005 ± 0.001 B/op
JsonMaskerBenchmark.jsonMaskerBytes unicode true 1kb N/A 0.1 N/A N/A N/A thrpt 4 504785.556 ± 21811.697 ops/s
JsonMaskerBenchmark.jsonMaskerBytes:gc.alloc.rate.norm unicode true 1kb N/A 0.1 N/A N/A N/A thrpt 4 1304.004 ± 0.001 B/op
JsonMaskerBenchmark.jsonMaskerString unicode false 1kb N/A 0.1 N/A N/A N/A thrpt 4 211776.239 ± 2017.484 ops/s
JsonMaskerBenchmark.jsonMaskerString:gc.alloc.rate.norm unicode false 1kb N/A 0.1 N/A N/A N/A thrpt 4 10160.009 ± 0.001 B/op
JsonMaskerBenchmark.jsonMaskerString unicode true 1kb N/A 0.1 N/A N/A N/A thrpt 4 222157.707 ± 4461.823 ops/s
JsonMaskerBenchmark.jsonMaskerString:gc.alloc.rate.norm unicode true 1kb N/A 0.1 N/A N/A N/A thrpt 4 10336.009 ± 0.001 B/op
LargeKeySetInstanceCreationBenchmark.jsonMasker N/A N/A N/A 100 N/A 1000 N/A N/A thrpt 4 9.804 ± 1.589 ops/s
LargeKeySetInstanceCreationBenchmark.jsonMasker:gc.alloc.rate.norm N/A N/A N/A 100 N/A 1000 N/A N/A thrpt 4 65694215.310 ± 13583605.669 B/op
StreamTypeBenchmark.jsonMaskerStreams N/A N/A 1kb N/A N/A N/A ByteArrayStream ByteArrayStream thrpt 4 308516.593 ± 5259.614 ops/s
StreamTypeBenchmark.jsonMaskerStreams:gc.alloc.rate.norm N/A N/A 1kb N/A N/A N/A ByteArrayStream ByteArrayStream thrpt 4 10400.007 ± 0.006 B/op
StreamTypeBenchmark.jsonMaskerStreams N/A N/A 1kb N/A N/A N/A ByteArrayStream FileStream thrpt 4 4977.686 ± 1833.213 ops/s
StreamTypeBenchmark.jsonMaskerStreams:gc.alloc.rate.norm N/A N/A 1kb N/A N/A N/A ByteArrayStream FileStream thrpt 4 8584.424 ± 0.185 B/op
StreamTypeBenchmark.jsonMaskerStreams N/A N/A 1kb N/A N/A N/A FileStream ByteArrayStream thrpt 4 99149.612 ± 467.078 ops/s
StreamTypeBenchmark.jsonMaskerStreams:gc.alloc.rate.norm N/A N/A 1kb N/A N/A N/A FileStream ByteArrayStream thrpt 4 10528.021 ± 0.018 B/op
StreamTypeBenchmark.jsonMaskerStreams N/A N/A 1kb N/A N/A N/A FileStream FileStream thrpt 4 4956.429 ± 977.492 ops/s
StreamTypeBenchmark.jsonMaskerStreams:gc.alloc.rate.norm N/A N/A 1kb N/A N/A N/A FileStream FileStream thrpt 4 8760.426 ± 0.274 B/op
ValueMaskerBenchmark.maskWithRawValueFunction unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 397721.837 ± 6940.919 ops/s
ValueMaskerBenchmark.maskWithRawValueFunction:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 1616.005 ± 0.001 B/op
ValueMaskerBenchmark.maskWithStatic unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 523438.882 ± 12981.929 ops/s
ValueMaskerBenchmark.maskWithStatic:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 1256.004 ± 0.001 B/op
ValueMaskerBenchmark.maskWithTextValueFunction unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 324624.702 ± 4445.994 ops/s
ValueMaskerBenchmark.maskWithTextValueFunction:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A N/A N/A thrpt 4 1904.006 ± 0.001 B/op
Benchmark results (master, 2458b81a40ff94aeb469a23b8fb5f4fefbfeaf2b)
Benchmark (characters) (jsonPath) (jsonSize) (keyLength) (maskedKeyProbability) (numberOfTargetKeys) Mode Cnt Score Error Units
BaselineBenchmark.countBytes unicode N/A 1kb N/A 0.1 N/A thrpt 4 2589068.526 ± 140023.334 ops/s
BaselineBenchmark.countBytes:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A thrpt 4 0.001 ± 0.001 B/op
BaselineBenchmark.jacksonParseAndMask unicode N/A 1kb N/A 0.1 N/A thrpt 4 29836.592 ± 1153.862 ops/s
BaselineBenchmark.jacksonParseAndMask:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A thrpt 4 64152.067 ± 0.012 B/op
BaselineBenchmark.jacksonParseOnly unicode N/A 1kb N/A 0.1 N/A thrpt 4 50533.741 ± 627.804 ops/s
BaselineBenchmark.jacksonParseOnly:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A thrpt 4 24304.039 ± 0.001 B/op
BaselineBenchmark.regexReplace unicode N/A 1kb N/A 0.1 N/A thrpt 4 5306.829 ± 56.158 ops/s
BaselineBenchmark.regexReplace:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A thrpt 4 61656.373 ± 0.016 B/op
InstanceCreationBenchmark.jsonMasker N/A N/A N/A N/A N/A 1000 thrpt 4 674.776 ± 30.441 ops/s
InstanceCreationBenchmark.jsonMasker:gc.alloc.rate.norm N/A N/A N/A N/A N/A 1000 thrpt 4 2638443.801 ± 5.318 B/op
JsonMaskerBenchmark.jsonMaskerBytes unicode false 1kb N/A 0.1 N/A thrpt 4 415928.508 ± 7590.403 ops/s
JsonMaskerBenchmark.jsonMaskerBytes:gc.alloc.rate.norm unicode false 1kb N/A 0.1 N/A thrpt 4 2232.005 ± 0.001 B/op
JsonMaskerBenchmark.jsonMaskerBytes unicode true 1kb N/A 0.1 N/A thrpt 4 539718.322 ± 42266.584 ops/s
JsonMaskerBenchmark.jsonMaskerBytes:gc.alloc.rate.norm unicode true 1kb N/A 0.1 N/A thrpt 4 1280.004 ± 0.001 B/op
JsonMaskerBenchmark.jsonMaskerString unicode false 1kb N/A 0.1 N/A thrpt 4 226024.295 ± 2152.855 ops/s
JsonMaskerBenchmark.jsonMaskerString:gc.alloc.rate.norm unicode false 1kb N/A 0.1 N/A thrpt 4 10136.009 ± 0.001 B/op
JsonMaskerBenchmark.jsonMaskerString unicode true 1kb N/A 0.1 N/A thrpt 4 239249.391 ± 4689.099 ops/s
JsonMaskerBenchmark.jsonMaskerString:gc.alloc.rate.norm unicode true 1kb N/A 0.1 N/A thrpt 4 10312.008 ± 0.001 B/op
LargeKeySetInstanceCreationBenchmark.jsonMasker N/A N/A N/A 100 N/A 1000 thrpt 4 11.091 ± 3.814 ops/s
LargeKeySetInstanceCreationBenchmark.jsonMasker:gc.alloc.rate.norm N/A N/A N/A 100 N/A 1000 thrpt 4 57869009.076 ± 14516826.819 B/op
ValueMaskerBenchmark.maskWithRawValueFunction unicode N/A 1kb N/A 0.1 N/A thrpt 4 560714.165 ± 3493.695 ops/s
ValueMaskerBenchmark.maskWithRawValueFunction:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A thrpt 4 1592.004 ± 0.001 B/op
ValueMaskerBenchmark.maskWithStatic unicode N/A 1kb N/A 0.1 N/A thrpt 4 704062.460 ± 9124.899 ops/s
ValueMaskerBenchmark.maskWithStatic:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A thrpt 4 1232.003 ± 0.001 B/op
ValueMaskerBenchmark.maskWithTextValueFunction unicode N/A 1kb N/A 0.1 N/A thrpt 4 552069.960 ± 9552.497 ops/s
ValueMaskerBenchmark.maskWithTextValueFunction:gc.alloc.rate.norm unicode N/A 1kb N/A 0.1 N/A thrpt 4 1880.004 ± 0.001 B/op
Without looking much into the changes in the PR:
> The implementation has a new type of MaskingState called "BufferedMaskingState". BufferedMaskingState is a wrapper around MaskingState, which got renamed to "ByteArrayMaskingState" as MaskingState now is an interface.
Why even make this distinction? The byte array can always be wrapped into a ByteArrayInputStream as input and a ByteArrayOutputStream as output and processed in the "streaming mode". I wouldn't keep two separate implementations of MaskingState unless there's a huge performance hit with the byte array wrappers.
Streaming mode turned out to be more complex than just wrapping the byte arrays into ByteArrayInputStream/ByteArrayOutputStream.
BufferedMaskingState is a wrapper which is primarily responsible for buffering. Essentially, it reads the next byte array from the stream and adjusts the pointers in the masking state (i.e. currentIndex and currentValueStartIndex) while taking care of nuances (such as the masker currently looping through a JSON value).
I also had in mind that someone might pass an exceptionally long PipedInputStream or FileInputStream whose contents could not be kept in memory. The current implementation handles them fine; there is a disabled test "LargeStreamTest" that creates a 1GB JSON file and asserts that the masker is able to process it.
@gavlyukovskiy to elaborate on the motivation to create a separate BufferedMaskingState implementation. Initially I tried to implement the streaming mode inside MaskingState, but there were 2 nuances. The first and the main nuance is the random access of the "message" byte array. There is logic that arbitrarily moves the pointer along the byte array including logic in KeyMatcher trie. The only way to achieve random access in input streams is to use mark/reset/skip methods, but rewriting every "message" array random access in terms of these methods turned out to be more cumbersome than I expected (maybe I did not try hard enough though). The second nuance is code clarity. Every input stream method declares that it throws IOException, so handling it in the same class makes the code in MaskingState (and in KeyMatcher) almost incomprehensible.
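To illustrate the first nuance: looking ahead in a byte array is a plain index expression such as `message[i + 2]`, while the same look-ahead on an input stream needs a mark/read/reset sequence. The sketch below uses only standard `java.io.InputStream` methods (`readNBytes(int)` requires Java 11 or later); the `MarkResetSketch` class and `peek` helper are hypothetical and not part of the PR.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical helper showing look-ahead on a stream via mark/reset. */
class MarkResetSketch {
    /**
     * Returns up to n upcoming bytes without consuming them.
     * Only works for streams where markSupported() is true
     * (e.g. ByteArrayInputStream or BufferedInputStream).
     */
    static byte[] peek(InputStream in, int n) {
        if (!in.markSupported()) {
            throw new IllegalArgumentException("stream must support mark/reset");
        }
        try {
            in.mark(n);                      // remember the current position
            byte[] ahead = in.readNBytes(n); // read ahead (may return fewer bytes at EOF)
            in.reset();                      // jump back to the marked position
            return ahead;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Every such call site also has to deal with `IOException`, which is the code clarity concern raised as the second nuance.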
Great work! Added a couple of comments. I think we'll need some more tests to make sure every possible scenario with buffer operations is covered sufficiently.
Also, file benchmarks look a bit sus
StreamTypeBenchmark.jsonMaskerStreams N/A N/A 1kb N/A N/A N/A ByteArrayStream ByteArrayStream thrpt 4 393902.826 ± 13635.863 ops/s
StreamTypeBenchmark.jsonMaskerStreams N/A N/A 1kb N/A N/A N/A ByteArrayStream FileStream thrpt 4 4326.934 ± 499.007 ops/s
StreamTypeBenchmark.jsonMaskerStreams N/A N/A 1kb N/A N/A N/A FileStream ByteArrayStream thrpt 4 89939.494 ± 499.991 ops/s
StreamTypeBenchmark.jsonMaskerStreams N/A N/A 1kb N/A N/A N/A FileStream FileStream thrpt 4 5089.371 ± 2363.523 ops/s
Is it because of flushing on every replacement? It would be nice to also update BaselineBenchmark with simply writing a json into a file without any masking to see how it compares.
Also not sure where the difference between StreamTypeBenchmark.jsonMaskerStreams and JsonMaskerBenchmark.jsonMaskerByteArrayStreams is coming from:
JsonMaskerBenchmark.jsonMaskerByteArrayStreams unicode false 1kb N/A 0.1 N/A N/A N/A thrpt 4 265071.377 ± 4694.735 ops/s
Both are 1kb jsons, but throughput is 265k vs 393k. Is it because of different masking options?
Removed the flushing on every replacement operation and created a baseline benchmark for writing a json into a file. Let's see how it compares now.
As to the ByteArray streams comparison, StreamType benchmarks always have jsonPath enabled. In the most recent run, the benchmarks look similar:
JsonMaskerBenchmark.jsonMaskerByteArrayStreams unicode true 1kb N/A 0.1 N/A N/A N/A thrpt 4 334683.542 ± 4351.616 ops/s
StreamTypeBenchmark.jsonMaskerStreams N/A N/A 1kb N/A N/A N/A ByteArrayStream ByteArrayStream thrpt 4 335748.850 ± 10837.677 ops/s
Discussion summary:
- Create ADR to cover streaming decision
- buffer extension
- max of 50k characters per token
- explain what a token is
- min buffer size == 5 (due to jumping)
- transferring the remaining buffer after the end of the outermost JSON
- when we need to extend the buffer while we're inside a token, we apply the speculative optimization of moving the token to the beginning of the buffer instead of extending the buffer
- Change all tests to always use the new method that checks both byte array and streaming APIs
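The buffer handling from the summary (move the in-progress token to the front, grow only when that frees no space, and cap the token size) could be sketched as below. All names are hypothetical, and the sketch treats the "50k characters" limit as 50,000 bytes, which is an assumption; the summary does not say how characters map to bytes.

```java
import java.util.Arrays;

/** Hypothetical sketch of "move the token to the front instead of extending". */
class CompactSketch {
    // Assumption: the 50k-character token limit is applied as a byte count here.
    static final int MAX_TOKEN_SIZE = 50_000;

    /**
     * Makes room for more input while a token starting at tokenStart is still
     * being read; filled is the number of valid bytes in buffer.
     * Afterwards the token starts at index 0 of the returned array.
     */
    static byte[] makeRoom(byte[] buffer, int tokenStart, int filled) {
        int tokenLength = filled - tokenStart;
        if (tokenLength >= MAX_TOKEN_SIZE) {
            throw new IllegalStateException("token exceeds " + MAX_TOKEN_SIZE + " bytes");
        }
        if (tokenStart > 0) {
            // Speculative path: reuse the same array and shift the token to the
            // front; System.arraycopy handles overlapping ranges correctly.
            System.arraycopy(buffer, tokenStart, buffer, 0, tokenLength);
            return buffer;
        }
        // The token already starts at index 0, so shifting frees nothing: grow.
        return Arrays.copyOf(buffer, buffer.length * 2);
    }
}
```

The caller would then continue reading into the returned array starting at offset `filled - tokenStart`.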
Quality Gate passed
Issues
3 New issues
0 Accepted issues
Measures
0 Security Hotspots
93.3% Coverage on New Code
0.0% Duplication on New Code