For example, you might want to retrieve the total amount of all orders for a particular customer. One possible solution is to retrieve all the orders for the customer using a filter and to iterate over them on the client in order to calculate the total. While this will work, you need to consider the implications:

  1. You might end up moving a lot of data across the network in order to calculate a result that is only a few bytes long
  2. You will be calculating the result in a single-threaded fashion, which might introduce a performance bottleneck into your application

The better approach would be to calculate partial results on each cache node for the data it manages, and to aggregate those partial results into a single answer before returning it to the client. Fortunately, we can use Coherence aggregators to achieve exactly that.

By using an aggregator, we limit the data that needs to be moved across the wire to three things: the aggregator instance itself, the partial results returned by each Coherence node the aggregator is evaluated on, and the final result. This reduces the network traffic significantly and ensures that we use the network as efficiently as possible. It also allows us to perform the aggregation in parallel, using the full processing power of the Coherence cluster.
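The idea of per-node partial results can be sketched without Coherence at all. The following is a minimal simulation, assuming each "node" is simply a list of order totals; `PartialSumDemo`, `partialSum`, `combine`, and `aggregate` are hypothetical names used for illustration and are not part of the Coherence API.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a partitioned cache: each inner list holds the
// order totals stored on one node. This is not Coherence code.
class PartialSumDemo {

    // Runs "on the node": reduces that node's local data to a single number.
    static double partialSum(List<Double> localOrderTotals) {
        double sum = 0;
        for (double total : localOrderTotals) {
            sum += total;
        }
        return sum;
    }

    // Runs once: combines the per-node partial results into the final answer.
    static double combine(List<Double> partialSums) {
        return partialSum(partialSums);
    }

    // Nodes are processed in parallel; only one double per node is "sent back".
    static double aggregate(List<List<Double>> nodes) {
        List<Double> partials = nodes.parallelStream()
                .map(PartialSumDemo::partialSum)
                .collect(Collectors.toList());
        return combine(partials);
    }
}
```

In this sketch, the amount of data crossing the simulated network is one number per node, regardless of how many orders each node holds.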

At its most basic, an aggregator is an instance of a class that implements the com.tangosol.util.InvocableMap.EntryAggregator interface:

interface EntryAggregator extends Serializable {
    Object aggregate(Set set);
}

However, you will rarely need to implement this interface directly. Instead, you should extend the com.tangosol.util.aggregator.AbstractAggregator class, which also implements the com.tangosol.util.InvocableMap.ParallelAwareAggregator interface. That interface is required to ensure that the aggregation is performed in parallel across the cluster.

The AbstractAggregator class has a constructor that accepts a value extractor to use and defines the three abstract methods you need to override:

public abstract class AbstractAggregator
        implements InvocableMap.ParallelAwareAggregator {

    public AbstractAggregator(ValueExtractor valueExtractor) {
        ...
    }

    protected abstract void init(boolean isFinal);
    protected abstract void process(Object value, boolean isFinal);
    protected abstract Object finalizeResult(boolean isFinal);
}

The init method is used to initialize the result of aggregation, the process method is used to process a single aggregation value and include it in the result, and the finalizeResult method is used to create the final result of the aggregation.

Because aggregators can be executed in parallel, the init and finalizeResult methods accept a flag specifying whether the result to initialize or finalize is the final result that should be returned by the aggregator or a partial result, returned by one of the parallel aggregators.

The process method also accepts an isFinal flag, but in its case the semantics are somewhat different. If the isFinal flag is true, the object to process is the result of a single parallel aggregator execution that needs to be incorporated into the final result. Otherwise, it is the value extracted from a target object using the value extractor that was specified as a constructor argument.
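The call order implied by these three methods can be sketched as follows. This is a simplified stand-in written for illustration, assuming the sequence described above; `SketchAggregator`, `aggregatePartial`, `aggregateFinal`, and `CountSketch` are hypothetical names, and the code is not Coherence's actual implementation.

```java
import java.util.List;

// Simplified stand-in for the AbstractAggregator template methods.
// Assumption: it mirrors the call order described in the text, nothing more.
abstract class SketchAggregator {
    protected abstract void init(boolean isFinal);
    protected abstract void process(Object value, boolean isFinal);
    protected abstract Object finalizeResult(boolean isFinal);

    // What one node does: init, process each extracted value,
    // then return a partial result.
    Object aggregatePartial(List<?> extractedValues) {
        init(false);
        for (Object value : extractedValues) {
            process(value, false);
        }
        return finalizeResult(false);
    }

    // What happens once: init, process each partial result,
    // then return the final result.
    Object aggregateFinal(List<?> partialResults) {
        init(true);
        for (Object partial : partialResults) {
            process(partial, true);
        }
        return finalizeResult(true);
    }
}

// Counts values; here the partial and final results are both Integer.
class CountSketch extends SketchAggregator {
    private int count;

    protected void init(boolean isFinal) {
        count = 0;
    }

    protected void process(Object value, boolean isFinal) {
        // A partial result is a per-node count; a raw value counts as one.
        count += isFinal ? (Integer) value : 1;
    }

    protected Object finalizeResult(boolean isFinal) {
        return count;
    }
}
```

Note how process receives raw values when isFinal is false and per-node results when it is true, which is exactly the distinction the flag exists to express.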

This will all be much clearer when we look at an example. Let’s write a simple aggregator that returns an average value of a numeric attribute:

import java.io.Serializable;

import com.tangosol.util.ValueExtractor;
import com.tangosol.util.aggregator.AbstractAggregator;

public class AverageAggregator
        extends AbstractAggregator {

    // running state; recomputed on every execution, so never serialized
    private transient double sum;
    private transient int count;

    public AverageAggregator() {
        // deserialization constructor
    }

    public AverageAggregator(ValueExtractor valueExtractor) {
        super(valueExtractor);
    }

    public AverageAggregator(String propertyName) {
        super(propertyName);
    }

    protected void init(boolean isFinal) {
        sum = 0;
        count = 0;
    }

    protected void process(Object value, boolean isFinal) {
        if (value != null) {
            if (isFinal) {
                // value is a partial result from one parallel execution
                PartialResult pr = (PartialResult) value;
                sum += pr.getSum();
                count += pr.getCount();
            }
            else {
                // value is the extracted attribute of a single entry
                sum += ((Number) value).doubleValue();
                count++;
            }
        }
    }

    protected Object finalizeResult(boolean isFinal) {
        if (isFinal) {
            // the average is undefined when nothing was aggregated
            return count == 0 ? null : sum / count;
        }
        else {
            return new PartialResult(sum, count);
        }
    }

    static class PartialResult implements Serializable {
        private double sum;
        private int count;

        PartialResult(double sum, int count) {
            this.sum = sum;
            this.count = count;
        }

        public double getSum() {
            return sum;
        }

        public int getCount() {
            return count;
        }
    }
}

As you can see, the init method simply sets both the sum and the count fields to zero, completely ignoring the value of the isFinal flag. This is OK, as we want those values to start from zero whether we are initializing our main aggregator or one of the parallel aggregators.

The finalizeResult method, on the other hand, depends on the isFinal flag to decide which value to return. If it is true, it divides the sum by the count in order to calculate the average and returns it. The only exception is if the count is zero, in which case the result is undefined and the null value is returned.

However, if the isFinal flag is false, the finalizeResult method simply returns an instance of the nested PartialResult class, which is nothing more than a holder for the partial sum and the related count on a single node.
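It may help to see why the partial result carries a sum and a count rather than a per-node average. The following plain-Java sketch compares the two ways of merging; `AverageMergeDemo` and its methods are hypothetical names used for illustration only, and the input values are made up.

```java
// Hypothetical illustration: nodes that hold different numbers of values.
class AverageMergeDemo {

    // Correct merge: add the partial sums and counts, then divide once.
    static double mergeSumsAndCounts(double[] sums, int[] counts) {
        double sum = 0;
        int count = 0;
        for (int i = 0; i < sums.length; i++) {
            sum += sums[i];
            count += counts[i];
        }
        return sum / count;
    }

    // Incorrect merge: averaging per-node averages ignores how many
    // values each node contributed.
    static double averageOfAverages(double[] sums, int[] counts) {
        double total = 0;
        for (int i = 0; i < sums.length; i++) {
            total += sums[i] / counts[i];
        }
        return total / sums.length;
    }
}
```

Suppose one node holds the values 10, 20, and 30 (sum 60, count 3) and another holds only 100 (sum 100, count 1). Merging sums and counts gives 160 / 4 = 40, the true average, while averaging the two per-node averages gives (20 + 100) / 2 = 60.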

Finally, the process method also uses the isFinal flag to determine its correct behavior. If it’s true, the value to be processed is a PartialResult instance, so the method reads the partial sum and count from it and adds them to the main aggregator’s sum and count fields. Otherwise, it simply adds the value to the sum field and increments the count field by one.

We have implemented AverageAggregator in order to demonstrate with a simple example how the isFinal flag should be used to control the aggregation, as well as to show that the partial and the final result do not have to be of the same type. However, this particular aggregator is pretty much a throw-away piece of code, as we’ll see in the next section.
