
Aggregate results

One thing to be aware of with the MapGridAgent interface is its potential for a partition to send huge result maps back to a client. This is the nature of the map function: its output size is proportional to its input size unless we use a query to select specific objects to work with, or specify a key set. When we genuinely need a result for every key, we keep the key set as narrow as we can make it and accept the occasional large result map.

What if we need an aggregate result for a key set? Instead of an operation and result for each element, we need an operation over all elements with just one result. Simple examples include the highest or lowest number in a set, the earliest date in a series, or the total payroll expense across a management hierarchy. In these examples, we need data from a set of elements in a partition, but we don’t need a result for each. We only want one result for the entire set of objects.

Going back to our functional programming reference, this is where the reduce function shines. Like the map function, reduce has a corresponding grid agent interface. The reduce function takes a collection of input keys and only produces one result for the entire collection. The result is typically an aggregate result: a sum, product, max, min, average, or any other aggregate function.

Classes that implement ReduceGridAgent are used as parameters to the AgentManager#callReduceAgent(ReduceGridAgent agent, Collection keys) and AgentManager#callReduceAgent(agent) methods. The implementation itself is similar to the MapGridAgent pattern. The reduce grid agent we write operates on a collection of known keys or an unknown key set. If we have a known key set, we run the agent with AgentManager#callReduceAgent(agent, keys). If the key set is not known and we need a query to find the interesting objects, we call AgentManager#callReduceAgent(agent).

Let’s write a ReduceGridAgent that finds the largest integer in a set. We’ll start with a naïve implementation for finding the largest integer in an array:

// Naïve linear scan: track the largest value seen so far.
public int findLargestInteger(Integer[] ints) {
    int largestInt = ints[0];
    for (int i = 1; i < ints.length; i++) {
        if (ints[i] > largestInt) {
            largestInt = ints[i];
        }
    }
    return largestInt;
}

Implementing ReduceGridAgent requires three methods. Two of those methods look like the process methods in MapGridAgent. We have ReduceGridAgent#reduce(session, map, keys) and ReduceGridAgent#reduce(session, map). Like its MapGridAgent counterparts, the reduce method that accepts keys in the signature works with keys or Entity objects. The reduce method without keys in the signature should use a Query to find the objects most interesting to our business logic.

public class LargestIntReduceAgent implements ReduceGridAgent,
        EntityAgentMixin {

    public Object reduce(Session session, ObjectMap map,
            Collection keys) {
        MyInteger largestInt = null;
        Iterator iter = keys.iterator();
        while (iter.hasNext()) {
            MyInteger myInt = (MyInteger)iter.next();
            if (largestInt == null || myInt.greaterThan(largestInt)) {
                largestInt = myInt;
            }
        }
        return largestInt;
    }

    public Object reduce(Session session, ObjectMap map) {
        return null; // Nothing to do for now!
    }

    public Object reduceResults(Collection results) {
        return null; // Nothing to do for now!
    }

    public Class getClassForEntity() {
        return MyInteger.class;
    }
}

The first reduce method is similar in signature to the MapGridAgent#process(session, map, key) method. The difference is that the third argument in ReduceGridAgent#reduce(session, map, keys) is a collection of keys rather than one key. This immediately illustrates the difference between the two: a map operation takes place on one element at a time, while reduce operates on the entire collection.

With a known key set, the ReduceGridAgent#reduce(session, map, keys) method is called. Without a key set passed to the AgentManager#callReduceAgent(agent) method, the ReduceGridAgent#reduce(session, map) method is called. This method should use a Query to find the objects we want to use in our business logic. The keys or entity objects can then be passed to the ReduceGridAgent#reduce(session, map, keys) method for the actual business logic.

We submit this agent to the grid in almost the same way as we submit a MapGridAgent to the grid. AgentManager has two callReduceAgent methods. The first takes a collection of keys as an argument, while the second does not.

Submitting this agent to the grid looks like this:

// Build the collection of keys for the agent to operate on.
Collection numbers = new ArrayList();
for (int i = 0; i < 10000; i++) {
    numbers.add(i);
}
ReduceGridAgent agent = new LargestIntReduceAgent();
AgentManager am = session.getMap("MyInteger").getAgentManager();
Object largestInt = am.callReduceAgent(agent, numbers);

This looks so similar to submitting a MapGridAgent to the grid that you may miss the method change to am.callReduceAgent(agent, keys). The programming models are so similar that you may ask why there isn’t just one generic callAgent method. The answer lies in the ReduceGridAgent#reduceResults(results) method. This method is called on the client side after all instances of the agent return their results. At that point, we have a collection of results, one from each partition. AgentManager#callMapAgent(agent, keys) can simply return the merged results, but AgentManager#callReduceAgent(agent, keys) must return one result for the entire operation. The ReduceGridAgent#reduceResults(results) method aggregates each partition’s aggregate result:

public class LargestIntReduceAgent implements ReduceGridAgent,
        EntityAgentMixin {

    public Object reduce(Session session, ObjectMap map,
            Collection keys) {
        return findLargestInt(keys);
    }

    public Object reduce(Session session, ObjectMap map) {
        return null; // Nothing to do for now!
    }

    public Object reduceResults(Collection results) {
        // Aggregate the per-partition results into one final result.
        return findLargestInt(results);
    }

    public Class getClassForEntity() {
        return MyInteger.class;
    }

    private MyInteger findLargestInt(Collection keys) {
        MyInteger largestInt = null;
        Iterator iter = keys.iterator();
        while (iter.hasNext()) {
            MyInteger myInt = (MyInteger)iter.next();
            if (largestInt == null || myInt.greaterThan(largestInt)) {
                largestInt = myInt;
            }
        }
        return largestInt;
    }
}

ReduceGridAgent#reduceResults(results) is responsible for producing the final result passed back to the AgentManager#callReduceAgent(agent, keys) caller. Sometimes, the reduce operation performed in this final aggregation is the same as the operation performed in the ReduceGridAgent#reduce(session, map, keys) method. Sometimes, the operation is different. In our case, it is the same, and we refactor the reduce operation into a private method.
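To make the "sometimes different" case concrete, consider computing an average. Each partition’s reduce method can return a partial sum and count, and the final aggregation combines them and divides. This is a hypothetical sketch; SumCount and AverageReduceAgent are invented names, not classes from these examples:

// Hypothetical per-partition result: a partial sum and an element count.
// Requires java.io.Serializable, java.util.Collection, java.util.Iterator.
public class SumCount implements Serializable {
    public long sum;
    public long count;
    public SumCount(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }
}

// In a hypothetical AverageReduceAgent, reduce(session, map, keys) would
// return a SumCount, while the client-side aggregation combines and divides:
public Object reduceResults(Collection results) {
    long sum = 0;
    long count = 0;
    Iterator iter = results.iterator();
    while (iter.hasNext()) {
        SumCount sc = (SumCount)iter.next();
        sum += sc.sum;
        count += sc.count;
    }
    return count == 0 ? null : Double.valueOf((double)sum / count);
}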

Finishing off the ReduceGridAgent, we come to ReduceGridAgent#reduce(session, map). The method signature is similar to MapGridAgent#processAllEntries(session, map), which should be a hint that they have a similar purpose. ReduceGridAgent#reduce(session, map) is called when no key list is provided, that is, when we use the AgentManager#callReduceAgent(agent) method.

ReduceGridAgent#reduce(session, map) should limit the number of objects used in the reduce operation. Like MapGridAgent#processAllEntries(session, map), we typically use a Query. While the reduce agent does not send large results back to the client, we still care about finding objects that meet our criteria to use in the reduce operation:

public Object reduce(Session session, ObjectMap map) {
    EntityManager em = session.getEntityManager();
    Query q = em.createQuery("select m from MyInteger m " +
        "where m.integer > 0 " +
        "and m.integer < 10000");
    Iterator iter = q.getResultIterator();
    Collection<MyInteger> keys = new ArrayList<MyInteger>();
    while (iter.hasNext()) {
        MyInteger mi = (MyInteger)iter.next();
        keys.add(mi);
    }
    // Delegate to the keyed reduce method for the actual business logic.
    return reduce(session, map, keys);
}

Though these are not strict rules, this method usually follows a pattern like MapGridAgent#processAllEntries(session, map).

  1. Run a query to limit the number of objects used in the reduce operation.
  2. Create a collection of keys used by the reduce operation. We’re using entities here, so rather than duplicating the reduce operation in this method, we put the entities returned by the Query in the key collection. When using entities, ReduceGridAgent#reduce(session, map, keys) expects a collection of MyInteger objects.
  3. Call ReduceGridAgent#reduce(session, map, keys) using the key collection we just created. There is no rule against re-implementing the reduce operation in each method, but we’ll be good software engineers and keep it DRY. If we can massage the query results into arguments the reduce method accepts, then we have enough reason to reuse it.

At this point, we can submit this agent to the grid with or without a set of known keys and get the largest MyInteger back. In both the MapGridAgent and the ReduceGridAgent, we used a Query to limit the number of objects used in each operation:

Query q = em.createQuery("select m from MyInteger m " +
    "where m.integer > 0 " +
    "and m.integer < 10000");
Iterator iter = q.getResultIterator();

Obviously, this query is limited in what it can do. The criteria are hardcoded, so the query can only find MyIntegers with values between 0 and 10,000. We hardcoded these values initially because the agent runs on a partition in a container, far from the client that creates it. Fortunately, we can pass additional data along with our agent.

Using ephemeral objects in agents

In the previous examples, we hardcoded the query criteria in the process and reduce methods. Instead of dictating which range of numbers the queries operate on, we should let the client-side program set those parameters. Right now, our queries are limited to exactly what is coded. A grid agent is just a POJO: it can have fields, getter and setter methods, and any other methods outside of the implemented grid agent interface. It’s probably best to limit a grid agent class to grid agent functionality, but that doesn’t mean we can’t have fields or other objects on the implementing class.

Classes that implement the agent interfaces are POJOs. We’ll send additional data to the grid by adding fields to the implementing class:

public class LargestIntReduceAgent implements ReduceGridAgent,
        EntityAgentMixin {

    private Integer minValue;
    private Integer maxValue;

    // Reduce methods omitted for brevity

    public void setMinValue(Integer min) {
        this.minValue = min;
    }

    public void setMaxValue(Integer max) {
        this.maxValue = max;
    }
}

The only requirement for sending these additional fields to the grid is that each must be serializable. Sending these objects to the grid is a one-way trip: unless they’re passed back as part of a result, we cannot use them to communicate state between client and grid. The grid agent instance used on the client side does not get a copy of the grid agent’s field values when the agents finish execution in the grid. Including grid-side state objects in the result set is bad practice and unnecessary. Before we pass the agent to AgentManager#callReduceAgent(agent), we set the fields used in the partition-side query:

ReduceGridAgent agent = new LargestIntReduceAgent();
agent.setMinValue(500);
agent.setMaxValue(5000);
AgentManager am = session.getMap("MyInteger").getAgentManager();
am.callReduceAgent(agent);

The ReduceGridAgent#reduce(session, map) method requires a small change to use our new query parameters:

public Object reduce(Session session, ObjectMap map) {
    EntityManager em = session.getEntityManager();
    Query q = em.createQuery("select m from MyInteger m " +
        "where m.integer > ?1 " +
        "and m.integer < ?2");
    // Use the values serialized with the agent as query parameters.
    q.setParameter(1, minValue);
    q.setParameter(2, maxValue);
    Iterator iter = q.getResultIterator();
    Collection<MyInteger> keys = new ArrayList<MyInteger>();
    while (iter.hasNext()) {
        MyInteger mi = (MyInteger)iter.next();
        keys.add(mi);
    }
    return reduce(session, map, keys);
}

It’s almost the same as before. We’ve just parameterized the query. It now uses the two values we sent into the grid with the agent.

We can send more than query parameters along with an agent. We can send additional, complex business logic. If we obey the principles of object-oriented design, then we favor composition over inheritance. This allows the composition of agents with complex map or reduce operations, without cluttering the agent implementation class with business logic. To demonstrate, we’ll refactor the findLargestInt(collection) method out of the LargestIntReduceAgent class:

public interface MyHelper {
    public MyInteger call(Collection keys);
}

public class AgentHelper implements MyHelper, Serializable {
    public MyInteger call(Collection keys) {
        MyInteger largestInt = null;
        Iterator iter = keys.iterator();
        while (iter.hasNext()) {
            MyInteger myInt = (MyInteger)iter.next();
            if (largestInt == null || myInt.greaterThan(largestInt)) {
                largestInt = myInt;
            }
        }
        return largestInt;
    }
}

This is just a class that encapsulates the method formerly known as findLargestInt(collection). The name changed to conform to an imaginary calling convention used by our agents. The ReduceGridAgent changes a bit to accommodate this calling convention:

public class LargestIntReduceAgent implements ReduceGridAgent,
        EntityAgentMixin {

    private Integer minValue;
    private Integer maxValue;
    private MyHelper helper;

    public Object reduce(Session session, ObjectMap map,
            Collection keys) {
        // Delegate the reduce algorithm to the helper object.
        return helper.call(keys);
    }

    public Object reduce(Session session, ObjectMap map) {
        EntityManager em = session.getEntityManager();
        Query q = em.createQuery("select m from MyInteger m " +
            "where m.integer > ?1 " +
            "and m.integer < ?2");
        q.setParameter(1, minValue);
        q.setParameter(2, maxValue);
        Iterator iter = q.getResultIterator();
        Collection<MyInteger> keys = new ArrayList<MyInteger>();
        while (iter.hasNext()) {
            MyInteger mi = (MyInteger)iter.next();
            keys.add(mi);
        }
        return reduce(session, map, keys);
    }

    public Object reduceResults(Collection results) {
        return helper.call(results);
    }

    public Class getClassForEntity() {
        return MyInteger.class;
    }

    public void setMinValue(Integer min) {
        this.minValue = min;
    }

    public void setMaxValue(Integer max) {
        this.maxValue = max;
    }

    public void setHelper(MyHelper helper) {
        this.helper = helper;
    }
}

LargestIntReduceAgent’s concern is interacting with the grid. Refactoring the findLargestInt method into a different class keeps our code clean and more easily testable. It also allows algorithm replacement: if we come up with a better map or reduce operation, the GridAgent implementation doesn’t change.

LargestIntReduceAgent calls the helper.call(collection) method. The AgentHelper class is serialized with the agent and sent to each partition the agent runs on. Once on the grid, the AgentHelper#call(collection) method is available to the agent. The normal Java serialization process handles agent serialization, so anything serializable referenced by the agent is sent to the grid along with it. Serializing these objects and sending them to the grid requires that the appropriate class files be on the classpath of each ObjectGrid container process before the agent is sent to the grid.
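Because the helper travels with the agent, swapping in a different algorithm is just a matter of setting a different MyHelper before submitting the agent. Here is a minimal sketch with a hypothetical SmallestIntHelper; the class is invented for illustration, though it reuses the MyInteger#greaterThan method from the earlier examples:

// Hypothetical alternative helper; any Serializable MyHelper implementation
// can travel with the agent.
public class SmallestIntHelper implements MyHelper, Serializable {
    public MyInteger call(Collection keys) {
        MyInteger smallest = null;
        Iterator iter = keys.iterator();
        while (iter.hasNext()) {
            MyInteger myInt = (MyInteger)iter.next();
            // Keep the smaller of the two values.
            if (smallest == null || smallest.greaterThan(myInt)) {
                smallest = myInt;
            }
        }
        return smallest;
    }
}

The agent itself is untouched: the client simply calls agent.setHelper(new SmallestIntHelper()) before submitting it, although at that point the name LargestIntReduceAgent admittedly becomes a misnomer.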

Updates with agents

The agents we’ve seen so far are idempotent: they do not change any objects in the grid. They create new objects as a result of their operation, but the objects queried by the agents remain unchanged.

There is no rule against updating objects in an agent. Any operation valid inside an ObjectGrid transaction can also be performed in an agent, including inserts, updates, and deletes. In this way, an agent doesn’t necessarily need to perform a map or reduce operation; it acts as a code transport between the client and server. We should be cautious with this relaxed approach to agents because there is plenty of potential for abuse. Used with care, though, running agents on the grid for inserts, updates, and deletes makes for a powerful application driven by submitting agents to the grid. Building an application around agents reduces the need to run large numbers of client processes.

Let’s go back to our payment processor example to look at updates using a GridAgent. Specifically, we’ll update a batch of deposit payments to a status of PaymentStatus.SENT_TO_NETWORK after we receive the payments from the merchant and check for duplicates.

We need to make a choice between using a MapGridAgent and a ReduceGridAgent. The choice depends on the behavior our application needs with the result of the operation. If we want to do more work with each payment after it is sent to the network, then we choose a MapGridAgent. Because we only care that the payments are updated, we’ll choose the ReduceGridAgent. ReduceGridAgent gives one result for the entire operation, which in this case is the status of the operation, either success or failure.

We don’t have a particular known key set for all of the payments in a batch. A large batch has payments spread across nearly all partitions. We call our PaymentStatusReduceAgent with the AgentManager#callReduceAgent(agent) method:

PaymentStatusReduceAgent agent = new PaymentStatusReduceAgent();
agent.setBatch(batch);
agent.setFromStatus(PaymentStatus.WAITING);
agent.setToStatus(PaymentStatus.SENT_TO_NETWORK);
AgentManager am = session.getMap("Payment").getAgentManager();
am.callReduceAgent(agent);

We use the AgentManager#callReduceAgent(agent) method because we want all partitions in the grid to participate in the reduce operation. The reduce operation begins by finding all payments that match certain criteria: all payments for a batch that have a status of WAITING. We set these properties on the agent so that the AgentManager serializes them and sends them to the grid along with the agent. They are used as query parameters in the ReduceGridAgent#reduce(session, map) method:

public Object reduce(Session session, ObjectMap map) {
    EntityManager em = session.getEntityManager();
    Query q = em.createQuery("select p from Payment p " +
        "where p.batch = ?1 " +
        "and p.status = ?2");
    q.setParameter(1, batch);
    q.setParameter(2, fromStatus);
    Iterator iter = q.getResultIterator();
    Collection<Payment> keys = new ArrayList<Payment>();
    while (iter.hasNext()) {
        Payment payment = (Payment)iter.next();
        keys.add(payment);
    }
    return reduce(session, map, keys);
}

We create a collection of payments to pass to the ReduceGridAgent#reduce(session, map, keys) method, which performs the status update operations. Instead of an aggregate result calculated from objects in the grid, the result is based on the success or failure of the update operations on each object. The ReduceGridAgent#reduce(session, map, keys) method returns Boolean.TRUE if the update succeeds, and throws an exception if it does not:

public Object reduce(Session session, ObjectMap map,
        Collection keys) {
    try {
        // Use a new session with its own transaction for the updates.
        Session s = session.getObjectGrid().getSession();
        EntityManager em = s.getEntityManager();
        Iterator iter = keys.iterator();
        em.getTransaction().begin();
        while (iter.hasNext()) {
            Payment payment = (Payment)iter.next();
            payment.setStatus(toStatus);
            em.merge(payment);
        }
        em.getTransaction().commit();
        return Boolean.TRUE;
    } catch (ObjectGridException e) {
        throw new ObjectGridRuntimeException(e);
    }
}

Throwing an exception doesn’t exactly follow the spirit of the reduce operation. If the update operation fails, the exception is thrown up the call stack and across the network to the AgentManager#callReduceAgent(agent) caller, and at that point we have bigger problems to worry about than the exception itself. We throw the exception here because the situation is unrecoverable by the reduce operation, and a call to ReduceGridAgent#reduceResults(results) is meaningless when there is an exception.

Absent from this code are explicit transaction demarcations. When the MapGridAgent and ReduceGridAgent methods are called, they are under an already-active transaction on the session passed in to them. Should the grid agent methods throw an exception, the transaction is rolled back. This transaction is independent of the client transaction and any other active agent transactions. If one of the agent transactions rolls back, then the client transaction rolls back too.

We see a few interesting things from the payment update implemented as a reduce operation. In the happy-path case, each agent returns Boolean.TRUE, and only to conform to the method signature. A collection of Boolean.TRUE values is passed to the ReduceGridAgent#reduceResults(Collection results) method, where there is nothing more to do: the values in the results collection play no part in the update operation. We know the update was successful because no exception was thrown in any of the reduce methods.

This lets us implement a very simple ReduceGridAgent#reduceResults(Collection results) method:

public Object reduceResults(Collection results) {
    return null;
}

Either the update succeeds and we don’t need to do anything more, or we know the update failed because an exception is thrown out of the AgentManager#callReduceAgent(agent) method. It may seem strange that we don’t confirm the update is successful. Do we always explicitly check that JDBC updates were successful? No. We assume that, because no exception was thrown, the update happened. The same goes for our update in the ReduceGridAgent.
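On the client side, the calling code can lean on exactly that contract. Here is a minimal sketch, reusing the agent setup from earlier; the catch block and the handleFailedBatchUpdate method are illustrative assumptions, not a prescribed API:

PaymentStatusReduceAgent agent = new PaymentStatusReduceAgent();
agent.setBatch(batch);
agent.setFromStatus(PaymentStatus.WAITING);
agent.setToStatus(PaymentStatus.SENT_TO_NETWORK);
AgentManager am = session.getMap("Payment").getAgentManager();
try {
    am.callReduceAgent(agent);
    // No exception: we assume the updates committed on each partition.
} catch (RuntimeException e) {
    // The ObjectGridRuntimeException thrown in reduce(session, map, keys)
    // surfaces here; an agent transaction rolled back.
    handleFailedBatchUpdate(e); // hypothetical recovery hook
}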

For clarity, let’s look at the PaymentStatusReduceAgent in its entirety:

public class PaymentStatusReduceAgent implements ReduceGridAgent,
        EntityAgentMixin {

    private Batch batch;
    private PaymentStatus fromStatus;
    private PaymentStatus toStatus;

    public Object reduce(Session session, ObjectMap map,
            Collection keys) {
        try {
            Session s = session.getObjectGrid().getSession();
            EntityManager em = s.getEntityManager();
            Iterator iter = keys.iterator();
            em.getTransaction().begin();
            while (iter.hasNext()) {
                Payment payment = (Payment)iter.next();
                payment.setStatus(toStatus);
                em.merge(payment);
            }
            em.getTransaction().commit();
            return Boolean.TRUE;
        } catch (ObjectGridException e) {
            throw new ObjectGridRuntimeException(e);
        }
    }

    public Object reduce(Session session, ObjectMap map) {
        EntityManager em = session.getEntityManager();
        Query q = em.createQuery("select p from Payment p " +
            "where p.batch = ?1 " +
            "and p.status = ?2");
        q.setParameter(1, batch);
        q.setParameter(2, fromStatus);
        Iterator iter = q.getResultIterator();
        Collection<Payment> keys = new ArrayList<Payment>();
        while (iter.hasNext()) {
            Payment payment = (Payment)iter.next();
            keys.add(payment);
        }
        return reduce(session, map, keys);
    }

    public Object reduceResults(Collection results) {
        return null;
    }

    public Class getClassForEntity() {
        return Payment.class;
    }

    public void setBatch(Batch b) {
        this.batch = b;
    }

    public void setFromStatus(PaymentStatus status) {
        this.fromStatus = status;
    }

    public void setToStatus(PaymentStatus status) {
        this.toStatus = status;
    }
}

Scheduling agents

The AgentManager methods are blocking methods. A call to any AgentManager method blocks while the data grid runs the agent instances against its primary partitions; the calling thread must wait for the call to return before it proceeds. If blocking is unacceptable, we should schedule the call to the AgentManager methods using the java.util.concurrent API.

There are two cases to consider when thinking about scheduling agents. The first is with the AgentManager#callReduceAgent(agent) and AgentManager#callMapAgent(agent) methods. These methods do not pass any keys to the agents, and the agent is executed on all primary partitions. It may be acceptable for a client application to block here while it waits for the result from the grid. Scheduling insert, update, and delete operations provides some performance improvement when future client-side work does not depend on those objects being in the grid (or not, as the case may be). A read operation where the client depends on the result before proceeding probably shouldn’t be scheduled.
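As a minimal sketch of that scheduling, we can wrap the blocking call in a java.util.concurrent task. The executor wiring here is an assumption for illustration; session and agent are assumed to be final locals set up as in the previous section:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Run the blocking callReduceAgent call on a worker thread so the client
// thread can continue with other work.
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Object> pending = executor.submit(new Callable<Object>() {
    public Object call() throws Exception {
        AgentManager am = session.getMap("Payment").getAgentManager();
        return am.callReduceAgent(agent); // blocks the worker thread only
    }
});
// ... other client-side work proceeds here, in parallel with the grid ...
Object result = pending.get(); // rendezvous only when the result is needed
executor.shutdown();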

One case where scheduling read operations is important is when we have multiple sets of keys passed to agents of the same type. Given a large object set, where the objects hash to many different primary partitions, we don’t want to pass the entire key set to every agent. For a sufficiently large key set, an agent spends most of its time processing (ignoring) keys that do not belong to its partition. Instead, we can pre-sort the keys into collections in which every object belongs to the same partition. We then send the smaller, pre-sorted collections to the grid.

We determine an object’s partition with a PartitionManager. Each BackingMap has a PartitionManager associated with it, obtained with the BackingMap#getPartitionManager() method. PartitionManager#getPartition(Object key) returns the 0-based number of the partition that the PartitionManager puts the object in. This is easy when working with the ObjectMap API:

MyInteger mi = (MyInteger)myIntMap.get(35);
BackingMap map = session.getObjectGrid().getMap("MyInteger");
int partitionId = map.getPartitionManager().getPartition(35);

We don’t need the first line; it only shows that we have a MyInteger with a key of 35. We obtain the ObjectGrid reference, and then the BackingMap for the MyInteger map, from the session. We then call the getPartition(key) method for that same key. The result of this call is the ID of the partition that holds the MyInteger object with the key 35.

Now we can use object and entity keys to sort objects based on partitions. After sorting the objects into smaller collections, we pass them to AgentManager#callMapAgent(agent, keys) and AgentManager#callReduceAgent(agent, keys). These calls should be scheduled on different threads rather than made sequentially in a loop; otherwise, we effectively turn the data grid into an expensive client program, because the client blocks during each call to the AgentManager methods. If we have 20 key collections that map to 20 different partitions, sending the agents in a loop sends only one request at a time to the grid. Instead, we want them to execute in parallel, by sending each grid agent instance to the grid using a java.util.concurrent.ExecutorService, as in the sketch below.
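Here is a sketch of that pattern for the MyInteger map, assuming a pre-built allKeys collection and the LargestIntReduceAgent from earlier; the thread-pool sizing and the one-session-per-worker choice are assumptions, not prescribed API usage, and session is assumed to be a final local or field:

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Step 1: pre-sort the keys by partition using the PartitionManager.
PartitionManager pm = session.getObjectGrid().getMap("MyInteger")
    .getPartitionManager();
Map<Integer, Collection<Object>> keysByPartition =
    new HashMap<Integer, Collection<Object>>();
for (Object key : allKeys) {
    Integer partitionId = Integer.valueOf(pm.getPartition(key));
    Collection<Object> bucket = keysByPartition.get(partitionId);
    if (bucket == null) {
        bucket = new ArrayList<Object>();
        keysByPartition.put(partitionId, bucket);
    }
    bucket.add(key);
}

// Step 2: dispatch one agent call per pre-sorted key collection, in
// parallel, instead of looping over blocking calls.
ExecutorService executor =
    Executors.newFixedThreadPool(Math.max(1, keysByPartition.size()));
List<Future<Object>> futures = new ArrayList<Future<Object>>();
for (final Collection<Object> partitionKeys : keysByPartition.values()) {
    futures.add(executor.submit(new Callable<Object>() {
        public Object call() throws Exception {
            // Assumption: each worker uses its own Session rather than
            // sharing the client Session across threads.
            Session s = session.getObjectGrid().getSession();
            AgentManager am = s.getMap("MyInteger").getAgentManager();
            return am.callReduceAgent(new LargestIntReduceAgent(),
                partitionKeys);
        }
    }));
}
// Each Future now holds one partition-sized collection's aggregate result.

Collecting the per-partition results, for example by feeding them through the same findLargestInt logic, is left to the caller.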

Summary

We covered a lot of ground again in this article. Working with objects where they live produces much higher throughput than dragging objects to a client and pushing them back to the grid when we’re done. Co-locating logic and data is easy to do with the DataGrid API.

DataGrid gives us a few patterns to follow when writing agents. It also makes us think in terms of map operations and reduce operations. Though these two operations seem limiting at first, they are useful when operating on very large data sets. The map operation gives us a way to perform an algorithm on each object in a set. The reduce operation lets us create aggregate results from a set.

We aren’t limited to only sending logic to the grid with an agent. Thanks to Java serialization, we send any serializable object referenced by our agent to the grid along with it. This gives us flexibility in running queries in an agent, and in passing helper logic.

We also looked at pre-sorting objects into collections based on their partition ID. This reduces the number of bytes sent from the client to each partition, and lets the agent run only on keys known to be in the partition the agent runs on.

With a little imagination, we can put more work on the grid. This gives us much higher throughput and scales horizontally with the resources given to our grid. Appropriately partitioned, a grid can scale out and return results at a predictable rate, no matter how many objects it stores.
