examples
examples copied to clipboard
FraudService solution has NPE and also doesn't aggregate first order in the method simpleMerge
The solution for the FraudService causes an NPE in the aggregate because total.getValue() could be null.
https://github.com/confluentinc/examples/blob/5.3.1-post/microservices-orders/exercises/FraudService.java
Solution:
final KTable<Windowed<Long>, OrderValue> aggregate = orders
.groupBy((id, order) -> order.getCustomerId(), Grouped.with(Serdes.Long(), Schemas.Topics.ORDERS.getValueSerde()))
.windowedBy(SessionWindows.with(Duration.ofHours(1)))
.aggregate(OrderValue::new,
(custId, order, total) -> {
return new OrderValue(order, (total.getValue() == null ? 0 : total.getValue()) + order.getQuantity() * order.getPrice());
},
(k, a, b) -> simpleMerge(a, b),
Materialized.with(null, new JsonSerdes<OrderValue>(OrderValue.class)));
The simpleMerge also doesn't aggregate the first order because it sets the value to 0 if a is null.
private OrderValue simpleMerge(final OrderValue a, final OrderValue b) {
return new OrderValue(b.getOrder(), (a == null ? 0D : a.getValue()) + b.getValue());
}
should be:
private OrderValue simpleMerge(final OrderValue a, final OrderValue b) {
return new OrderValue(b.getOrder(), ((a == null || a.getValue() == null) ? b.getValue() : a.getValue() + b.getValue()));
}