18 min read

In a client-server ObjectGrid interaction, local ObjectGrid instances run in the same memory process as the business application. Access to objects stored in the grid is extremely fast, and there are no network hops or routing done on ObjectGrid operations. The disadvantage with a local ObjectGrid instance is that all objects stored in the grid must fit into the heap space of one JVM.

The client-server distributed ObjectGrid instances overcomes that single heap space disadvantage by combining the resources of multiple JVMs on multiple servers. These combined resources hide behind the façade of an ObjectGrid instance. The ObjectGrid instance has far more CPU, memory, and network I/O available to it than the resources available to any single client.

In this article, we’ll learn how to use those resources held by the ObjectGrid instance to co-locate data and business logic on a single JVM. The client-server model relies on a client pulling objects across a network from an ObjectGrid shard. The client performs some operations on those objects. Any object whose state has changed must be sent back across the network to the appropriate shard. The client-server programming model co-locates data and code by moving data to the code. The data grid programming model does the opposite by moving code to the data. Rather than dragging megabytes of objects from an ObjectGrid shard to a client, only to send it right back to the ObjectGrid, we instead send our much smaller application code to an ObjectGrid shard to operate on the data in place. The end result is the same: code and data are co-located. We now have the resources of an entire data grid available to run that code instead of one client process.

What does DataGrid do for me?

The DataGrid API provides encapsulation to send application-specific methods into the grid and operate directly on the objects in shards. The API consists of only five public classes. These five classes provide us with several patterns to make an ObjectGrid instance do the heavy lifting for a client application. The client application did a lot of work by operating on the objects in the grid. The client requires a network hop to get an object from the grid and performs an operation on it, persisting that the object requires another network hop to the grid.

In a single client environment, the probable bottlenecks in dealing with ObjectGrid are all on the client side. A single client will not stress the resources in the ObjectGrid deployment. The client application is most likely the bottleneck. With all computers in a deployment being equal, one client application on one computer will not stress the combined resources of the grid.

In a naïve application that performs single object get and put operations, our application will first notice a bottleneck due to data starvation. This is where a client cannot get the data it needs fast enough, caused by network latency. Single object get and put operations (and the corresponding Entity API calls) won’t saturate a gigabit ethernet connection by any means, but the latency in making the RPC is higher than what the CPU can handle. The application works, but it’s slow.

A smarter application would use the ObjectMap#getAll method. This would go out to the grid and get an object for every key in the list. Instead of waiting for each individual object, the client application waits for the entire list to come over the network. While the cost of network RPC is amortized over the size of the list, the client still incurs that cost.

In addition to these network latency concerns, we may not want a near-cache that eats up client-side memory. Turning off the near-cache means that every get operation is an RPC. Turning it on means that some of our JVM heap space is used to store objects, which we may not need after the first use.

The fundamental problem is that our objects and client application are architecturally separated. For our application to do anything, it needs to operate on objects that exist in the grid. In the client-server model, we copy data from the server to the client. At this point, our data and code are co-located, and the application can perform some business logic with that data. This model breaks down when there are huge data sets copied between boxes.

Databases co-locate data and code with stored procedures. The processing power of the stored procedure is a product of the CPU and memory resources of the computer running the database. The stored procedure is code compiled into a module and executed by the database. Within that process, the stored procedure accesses data available in the same process.

ObjectGrid gives us the ability to run code in the same process that gives an object access via the DataGrid API. Unlike the database example, where the throughput and latency of getting the store procedure result is limited to the power of the server it’s on, ObjectGrid’s power is limited by the number of CPUs in the deployment, and it can scale out at any time.

ObjectGrid co-locates our code and objects by sending serialized classes with our application code methods to primary partitions in the grid. There are two ways to do this. The first way sends the code to every primary partition in the grid. The code executes and returns a result to the client. In the second way, we supply a collection of keys to the DataGrid API. With a list of keys, ObjectGrid only sends the application code to the partitions that contain at least one object with a key in the list. This reduces the amount of container processes doing the work for our client application, and is preferred instead of making the entire grid service on one client request.

The DataGrid API with IBM WebSphere eXtreme Scale 6: Part 1

Let’s look at finding an object by key in the client-server distributed model. The client has a key for an object. Calling the ObjectMap#get(key) method creates some work for the client. It first needs to determine to which partition the key belongs. The partition is important because the ClientClusterContext, already obtained by the client, knows how to get to the container that holds the primary shard in one hop. We find out the partition ID (pID) for a key with the PartitionManager class:

BackingMap bMap = grid.getMap("Payment");
PartitionManager pm = bMap.getPartitionManager();
int pId = pm.getPartition(key);

After obtaining the partition ID and the host running the container process, the client performs a network hop to request the object. The object is serialized and sent back to the client, where the client performs some operation with the object. Persisting an updated object requires one more network hop to put it back in the primary shard.

We can now repeat that process for every object in our multi-million object collection.

On second thought, that may not be such a great idea. Instead, we’ll create an agent that we send to the grid. The agent encapsulates the logic we want to perform. An AgentManager serializes the agent and sends it to each primary shard in the deployment. Once on a primary shard, the agent executes and produces a result which is sent back to the client.


 

Borrowing from functional programming

The DataGrid API borrows the “map” and “reduce” concepts from the world of functional programming. Just so we’re all on the same page, let’s go over the concepts behind these two functions. Functional programming focuses more on what a program does, instead of how it does it. This is in contrast to the most imperative programming we do in the C family of languages. That’s not to say we can’t follow a functional programming model, it’s just that we don’t. Other languages, like Lisp and its descendants, make functional programming the natural thing to do.

Map and reduce are commonly found in functional programming. They are known as higher-order functions because they take functions as arguments. This is similar to how we would use a function pointer in C, or an anonymous inner class in Java, to implement callbacks. Though the focus is on what to do, at some point, we need to tell our program how to do it. We do this with the function passed as an argument to map or reduce.

Let’s look at a simple example in Ruby, which has both functional and imperative programming influences:

>> numbers = [0,1,2,3,4,5,6,7,8,9]
>> numbers.map { |number| number * 2 }
=> [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

We assign an array of numbers 0-9 to the variable numbers. The array has a method called map that we call in the second line. Map is a higher-order function and accepts a function as its argument. The Array#map method calls the passed-in function for each element in the array. It passes the element in the variable numbers. In this way, we return a new array that contains the results of each call to our function which performs number * 2.

Let’s look at the reduce method. In Ruby, reduce is called inject but the concept is the same:

>> numbers = [0,1,2,3,4,5,6,7,8,9]
>> numbers.inject(0) { |sum, number| sum = sum + number }
=> 45

The inject (read as reduce) method takes a function that performs a running total on the numbers in the array. Instead of an array as our return type, we only get one number. The reduce operation returns a single result for an entire data set. The map operation returns a new set based on running the original set through a given function.

These concepts are relevant in the data grid environment because we work with large data sets where we frequently need to work with large segments of data. Pulling raw data across the network, and operating over the data set on one client, are both too slow. Map and reduce helps us by using the remote CPU resources of the grid to cut down on the data sent across the network and the CPU power required on the client. This help comes from writing methods that work like map and reduce and sending them to our objects in the grid.

java.util.M  ap, BackingMaps, ObjectMaps, HashMaps, like we need one more use for the word “map”. We just saw the functional origin of the map concept. Let’s take a look at a Java implementation. Map implements an algorithm that performs an operation on each element in a collection and returns a new collection of results:

public Collection doubleOddInts(Collection c) {
Collection results = new HashSet();
Iterator iter = c.iterator();
while (iter.hasNext()) {
int i = (Integer)iter.next();
if (i % 2 == 0) {
[ 172 ]
results.add(i);
} else {
results.add(i*2);
}
}
return results;
}

Our needs go beyond performing a map function over an array. In order to be useful in a DataGrid environment, the map function must operate on a distributed collection of objects in an ObjectGrid instance. The DataGrid API supports this by giving us the MapGridAgent interface. A business logic class implements the two methods in MapGridAgent to encapsulate the code we intend to run in the grid. Classes that implement MapGridAgent must implement two methods, namely, MapGridAgent#process(Session session, ObjectMap map, Object key) and MapGridAgent#processAllEntries(Session session, ObjectMap map).

Let’s implement the doubleOddInts algorithm with MapGridAgent. We first create a class that implements the MapGridAgent interface. We give this class a meaningful name that describes the map operation implemented in the process methods:

public class DoubleOddIntsMapAgent implements Serializable,
MapGridAgent {
public Object process(Session session, ObjectMap map, Object key)
{
int i = (Integer)map.get(key);
if (i % 2 == 0) {
return i;
} else {
return i*2;
}
}
public Map processAllEntries(Session session, ObjectMap map) {
// nothing to do here for now!
}
}

The map function itself is called by our client code. The process (session, map, key) method performs the how in the map function. Because ObjectGrid gives us the what for free (the map function), we only need to implement the how part. Like the Ruby example, this process (session, map, key) method is performed for each element in a collection. The Session and ObjectMap arguments are supplied by the AgentManager based on the current session and ObjectMap that starts the map function. The key is the crucial object for a given value in the collection, and that collection is supplied by us when we run the DoubleOddIntsMapAgent.

After implementing the MapGridAgent#process(session, map, key) method, the DoubleOddIntsMapAgent is ready to run. We want it to run on each shard in an ObjectGrid instance that has a key in the collection we pass to it. We do this with an instance of the AgentManager class. The AgentManager class has two methods to send a MapGridAgent to the grid: AgentManager#callMapAgent(MapGridAgent agent, Collection keys) and AgentManager#callMapAgent(MapGridAgent agent).

The first method provides a set of keys for our agent to use when run on each partition. Using this method is preferable to the non-keyed version because the non-keyed version runs the code on every primary shard in the grid. The Agent Manager#callMapAgent(agent, keys) method only runs the code on primary partitions that contain at least one key in the key collection. Whenever we have the choice to use part of the grid instead of the entire grid, we should take the choice that uses only part of the grid. Whenever we use the entire grid for one operation, we limit scalability and throughput. The AgentManager serializes the DoubleOddIntsMapAgent agent and sends it to each partition that has a key in the keys collection. Once on the primary partition, the process (session, map, key) method is called for each key in the keys collection supplied to AgentManager#callMapAgent(agent, keys). This set of keys is a subset of all of the keys in the BackingMap, and likely a subset of keys in each partition.

Let’s create an instance of this agent and submit it to the grid:

Collection numbers = new ArrayList();
for(int i = 0; i < 10000; i++) {
numbers.add(i);
}
MapGridAgent agent = new DoubleOddIntsAgent();
AgentManager am = session.getMap("Integer").getAgentManager();
am.callMapAgent(agent, numbers);

This example assumes that we have a BackingMap of Integer for both the key and value objects. The numbers collection is a list of keys to use. Once we create the agent, we submit it to the grid with the 10,000 keys to operate on. Before running the agent, the AgentManager sorts the keys by partition. The agent only runs on partitions that have a list of keys that hash to that partition. The agent runs on each partition that has a list of keys that hash to it. In each primary partition, the DoubleOddIntsMapAgent#process(session, map, key) method is called only for the keys that map to that partition.

GridAgent and Entity

GridAgent works with Entity classes as well. We don’t directly use key objects when working with Entity objects. The Entity API hides the key/value implementation from us to make working with Entity objects easier than working with the ObjectMap API.

The method definition for MapGridAgent#process(session, map, key) normally expects an object to be used as a key for an ObjectMap. We can still find the value object by converting key and value objects to their Tuple representations, but the DataGrid API makes it much easier for us. Instead of passing a key to the process method, we can convince the primary shard to pass us the Entity object itself, rather than a key using the EntityAgentMixin interface. EntityAgentMixin has one method, namely, EntityAgentMixin#getClassForEntity(). The implementation of this method should return the class object of the Entity. DataGrid needs this method defined in the grid agent implementation so it can provide the Entity object itself, rather than its key to the MapGridAgent#process(session, map, key) method.

Let’s assume that we have an Entity MyInteger that acts as a wrapper for Integer:

public class DoubleOddIntsMapAgent implements Serializable,
MapGridAgent, EntityAgentMixin {
public Object process(Session session, ObjectMap map, Object key)
{
MyInteger myInt = (MyInteger)key;
if (myInt.mod(2) == 0) {
return myInt;
} else {
return myInt.multiplyBy(2);
}
}
public Map processAllEntries(Session session, ObjectMap map) {
// nothing to do here for now!
}
public Class getClassForEntity() {
return MyInteger.class;
}
}

Our agent now implements the EntityAgentMixin interface and the getClassForEntity() method. The key is converted to the correct class before the MapGridAgent#process(session, map, key) method is called. Instead of the Tuple key for an Entity, the process method is passed a reference to the Entity itself. Because it is passed as an object, we must cast the Entity to its defined class. There is no need to look up for the Entity in its BackingMap because it’s already the Entity we want to work with. This means the collection of keys passed to AgentManager#callMapAgent(agent, keys) is a collection with all elements of the c lass returned by getClassForEntity().

GridAgent with an unknown key set

We may not always know the keys for each object we want to submit to an agent. In this situation, we send an agent into the grid without a key set. The grid agent cannot call the process (session, map, key) method because we don’t know which keys to use. Instead, our grid agent method relies on the Query API to narrow the number of objects in each partition we work with. The MapGridAgent interface gives us the MapGridAgent#processAllEntries(Session session, ObjectMap map) method for this situation.

The MapGridAgent#processAllEntries(session, map) method lets us specify what to do when we potentially need to work with all objects in a partition. Particularly, it lets us narrow the field with a query. In the past, we used a query to find card and address objects in a local ObjectGrid instance. This was fine for local instances with only one partition. The real power of the Query API is revealed when used with the DataGrid API.

Query does not work across partitions when called from an ObjectGrid client in a distributed environment. It works with just one partition. In a distributed deployment, where we use the DataGrid API, a grid agent instance runs on one partition. Each partition has an instance of the grid agent running in it and each agent can see the objects in its partition. If we have 20 partitions, then we have 20 grid agents running, one in each partition. Because we’re working with a single partition in each grid agent, we use the Query API to determine which objects are of interest to the business logic.

Now that we know how to run code in the grid, the Query API is suddenly much more useful. Now, we want a query to run against just one partition. Using a query in a GridAgent is a natural fit. Each agent runs on one partition, and each query runs on that partition in the primary shard container process:

public class DoubleOddIntsMapAgent implements Serializable,
MapGridAgent, EntityAgentMixin {
public Object process(Session session, ObjectMap map, Object key)
{
MyInteger myInt = (MyInteger)key;
if (myInt.mod(2) == 0) {
return myInt;
} else {
return myInt.multiplyBy(2);
}
}
public Map processAllEntries(Session session, ObjectMap map) {
EntityManager em = session.getEntityManager();
Query q = em.createQuery("select m from MyInteger m " +
"where m.integer > 0 " +
"and m.integer < 10000");
Iterator iter = q.getResultIterator();
Map<MyInteger, Integer> results =
new HashMap<MyInteger, Integer)();
while (iter.hasNext()) {
MyInteger mi = (MyInteger)iter.next();
results.put(mi, (Integer)process(session, map, mi));
}
return results;
}
public Class getClassForEntity() {
return MyInteger.class;
}
}

The MapGridAgent#processAllEntries(session, map) method generally follows the same pattern when implemented:

  1. Narrow the scope of objects in the partition. This is important in the MapGridAgent because it returns a result for every object it processes. This can result in hundreds of megabytes of objects sent back to a client from every partition for an indiscriminate query.
  2. Create a map to hold the results of each process operation. This map is keyed with the key object, or the value object, when using ObjectMap. The client application can perform its own gets if the keys are returned. Otherwise, it works directly with the value objects. We can also return a map of key/value objects. The map is keyed with the Entity class itself when using Entity.
  3. Iterate over the query results calling MapGridAgent#process(session, map, key) for each result. Calling the process method is required here since we didn’t pass a collection of keys to the AgentManager#callMapAgent(agent) method. The key set is unknown before the agent runs. The agent finds all objects in a partition that meet our criteria for processing, and then we call process to get each result.
  4. Return the results. This map contains an entry for each object that meets our processing criteria in this partition. This map is merged, client-side, with the maps from every other partition where the agent ran. The merged map is the final result, and it is the return value to the AgentManager#callMapAgent(agent) method.

Following the call to AgentManager#callMapAgent(agent), we have a Map that contains the combined agent results from every partition. We also split the workload between N partitions rather than performing all of the processing on the client. The ObjectGrid deployment performed our business logic because we passed the business logic to the grid rather than pulling objects out of the grid.

One of the great things about this pattern is that our task on many partitions completes in about 1/Nth the amount of time it would take for one huge partition containing the same objects running on one computer. Of course, there is the overhead of the merge operation and network connections, but this is amortized over the number of primary partitions used by the agent. This is distinctly different than scaling up a database server when it needs more CPU speed for stored procedures. Instead of incurred downtime for database server migration, we simply add more containers on additional computers. The power of our grid increases as easily as starting a few more JVMs.

>> Continue Reading: The DataGrid API with IBM WebSphere eXtreme Scale 6: Part 2

LEAVE A REPLY

Please enter your comment!
Please enter your name here