armeria icon indicating copy to clipboard operation
armeria copied to clipboard

Cache an aggregated result of a `StreamMessage` for non-pooled objects

Open ikhoon opened this issue 3 years ago • 1 comments

Motivation:

HttpRequest.aggregate() and HttpResonse.aggregate() allows to call only once because StreamMessage can be subscribed once. It seems acceptable if they can store the aggregated request or response to somewhere.

It can make a nuisance if it is a request is automatically aggregated by a decorator or service and users want to get the raw HTTP request in the business logic. See #3789 for details.

Modifications:

  • Add AggregationOptions and AggregationOptionsBuilder to control the behavior of aggregation.
  • Add StreamMessage.aggregate(AggregationOptions) and StreamMessage.aggregate(Function<T, U>) operators to easily aggregate the elements in a StreamMessage.
  • Rename the original AbstractStreamMessage to CancellableStreamMessage and add a new AbstractStreamMessage
    • All known StreamMessage implementations now extend AbstractStreamMessage apart from delegators.
  • Make HttpRequest.aggregate() and HttpResponse.aggregate() cached.
    • Note that the behavior of HttpRequest.aggregateWithPooledObjects() and HttpResponse.aggregateWithPooledObjects() are preserved. Pooled objects have their own life cycle which is intricate to cache.

Result:

  • You can now repeatedly aggregate an HttpRequest and HttpResonse. If an HttpRequest and HttpResonse is aggregated, the cached value is returned for the next calls.
    HttpRequest request = ...;
    AggregatedHttpRequest aggregatedReq0 = request.aggregate().join();
    AggregatedHttpRequest aggregatedReq1 = request.aggregate().join();
    assert aggregatedReq0 == aggregatedReq1;
    
    HttpResponse response = ...;
    AggregatedHttpResponse aggregatedRes0 = response.aggregate().join();
    AggregatedHttpResponse aggregatedRes1 = response.aggregate().join();
    assert aggregatedRes0 == aggregatedRes1;
    
  • You can now conveniently aggregate a StreamMessage using StreamMessage.aggregate(Function).
    StreamMessage stream = StreamMessage.of(1, 2, 3, 4);
    int sum = stream.aggregate(nums -> nums.stream().reduce(0, Integer::sum))
                    .join();
    assert sum == 10;
    
  • Fixes #3789

ikhoon avatar Jul 26 '22 09:07 ikhoon

Codecov Report

Merging #4366 (7e53ad3) into master (7395a2e) will increase coverage by 0.43%. The diff coverage is 81.07%.

@@             Coverage Diff              @@
##             master    #4366      +/-   ##
============================================
+ Coverage     73.53%   73.96%   +0.43%     
- Complexity    17648    17905     +257     
============================================
  Files          1500     1520      +20     
  Lines         66010    66555     +545     
  Branches       8328     8372      +44     
============================================
+ Hits          48540    49229     +689     
- Misses        13254    13303      +49     
+ Partials       4216     4023     -193     
Impacted Files Coverage Δ
...p/armeria/common/stream/FilteredStreamMessage.java 78.16% <ø> (+1.40%) :arrow_up:
...ria/common/stream/PublisherBasedStreamMessage.java 83.64% <ø> (+1.14%) :arrow_up:
...rp/armeria/common/stream/StreamMessageWrapper.java 86.95% <ø> (ø)
...eria/internal/common/AbstractSplitHttpMessage.java 88.88% <ø> (ø)
...a/internal/common/stream/AbortedStreamMessage.java 66.66% <ø> (ø)
...p/armeria/common/stream/DeferredStreamMessage.java 81.39% <42.85%> (-3.18%) :arrow_down:
...corp/armeria/common/DefaultAggregationOptions.java 59.09% <59.09%> (ø)
...corp/armeria/common/stream/AggregationSupport.java 72.91% <72.91%> (ø)
...armeria/internal/common/HttpMessageAggregator.java 85.50% <82.82%> (+0.89%) :arrow_up:
...ternal/common/stream/RecoverableStreamMessage.java 73.84% <84.21%> (+1.77%) :arrow_up:
... and 323 more

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

codecov[bot] avatar Jul 27 '22 11:07 codecov[bot]

Do you mind updating the description?

For sure. Updated.

ikhoon avatar Sep 06 '22 14:09 ikhoon

Thanks for reviewing! 🙇‍♂️

ikhoon avatar Sep 08 '22 03:09 ikhoon