Cache an aggregated result of a `StreamMessage` for non-pooled objects
Motivation:
HttpRequest.aggregate() and HttpResonse.aggregate() allows to call
only once because StreamMessage can be subscribed once.
It seems acceptable if they can store the aggregated request or response
to somewhere.
It can make a nuisance if it is a request is automatically aggregated by a decorator or service and users want to get the raw HTTP request in the business logic. See #3789 for details.
Modifications:
- Add
AggregationOptionsandAggregationOptionsBuilderto control the behavior of aggregation. - Add
StreamMessage.aggregate(AggregationOptions)andStreamMessage.aggregate(Function<T, U>)operators to easily aggregate the elements in aStreamMessage. - Rename the original
AbstractStreamMessagetoCancellableStreamMessageand add a newAbstractStreamMessage- All known
StreamMessageimplementations now extendAbstractStreamMessageapart from delegators.
- All known
- Make
HttpRequest.aggregate()andHttpResponse.aggregate()cached.- Note that the behavior of
HttpRequest.aggregateWithPooledObjects()andHttpResponse.aggregateWithPooledObjects()are preserved. Pooled objects have their own life cycle which is intricate to cache.
- Note that the behavior of
Result:
- You can now repeatedly aggregate an
HttpRequestandHttpResonse. If anHttpRequestandHttpResonseis aggregated, the cached value is returned for the next calls.HttpRequest request = ...; AggregatedHttpRequest aggregatedReq0 = request.aggregate().join(); AggregatedHttpRequest aggregatedReq1 = request.aggregate().join(); assert aggregatedReq0 == aggregatedReq1; HttpResponse response = ...; AggregatedHttpResponse aggregatedRes0 = response.aggregate().join(); AggregatedHttpResponse aggregatedRes1 = response.aggregate().join(); assert aggregatedRes0 == aggregatedRes1; - You can now conveniently aggregate a
StreamMessageusingStreamMessage.aggregate(Function).StreamMessage stream = StreamMessage.of(1, 2, 3, 4); int sum = stream.aggregate(nums -> nums.stream().reduce(0, Integer::sum)) .join(); assert sum == 10; - Fixes #3789
Codecov Report
Merging #4366 (7e53ad3) into master (7395a2e) will increase coverage by
0.43%. The diff coverage is81.07%.
@@ Coverage Diff @@
## master #4366 +/- ##
============================================
+ Coverage 73.53% 73.96% +0.43%
- Complexity 17648 17905 +257
============================================
Files 1500 1520 +20
Lines 66010 66555 +545
Branches 8328 8372 +44
============================================
+ Hits 48540 49229 +689
- Misses 13254 13303 +49
+ Partials 4216 4023 -193
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...p/armeria/common/stream/FilteredStreamMessage.java | 78.16% <ø> (+1.40%) |
:arrow_up: |
| ...ria/common/stream/PublisherBasedStreamMessage.java | 83.64% <ø> (+1.14%) |
:arrow_up: |
| ...rp/armeria/common/stream/StreamMessageWrapper.java | 86.95% <ø> (ø) |
|
| ...eria/internal/common/AbstractSplitHttpMessage.java | 88.88% <ø> (ø) |
|
| ...a/internal/common/stream/AbortedStreamMessage.java | 66.66% <ø> (ø) |
|
| ...p/armeria/common/stream/DeferredStreamMessage.java | 81.39% <42.85%> (-3.18%) |
:arrow_down: |
| ...corp/armeria/common/DefaultAggregationOptions.java | 59.09% <59.09%> (ø) |
|
| ...corp/armeria/common/stream/AggregationSupport.java | 72.91% <72.91%> (ø) |
|
| ...armeria/internal/common/HttpMessageAggregator.java | 85.50% <82.82%> (+0.89%) |
:arrow_up: |
| ...ternal/common/stream/RecoverableStreamMessage.java | 73.84% <84.21%> (+1.77%) |
:arrow_up: |
| ... and 323 more |
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.
Do you mind updating the description?
For sure. Updated.
Thanks for reviewing! 🙇♂️