
In this article by Mat Johns, author of the book Getting Started with Hazelcast – Second Edition, we will learn the following topics:

  • Creating and using collection listeners
  • Instance, lifecycle, and cluster membership listeners
  • The partition migration listener
  • Quorum functions and listeners


Listening to the goings-on

One great feature of Hazelcast is its ability to notify us of the goings-on of our persisted data and the cluster as a whole by allowing us to register an interest in events. The listener concept is borrowed from Java, so it should feel pretty familiar. To support this, there are a number of listener interfaces that we can implement in order to receive, process, and handle different types of events, one of which we previously encountered. The following are the listener interfaces:

  • Collection listeners:
    • EntryListener is used for map-based (IMap and MultiMap) events
    • ItemListener is used for flat collection-based (IList, ISet, and IQueue) events
    • MessageListener is used to receive topic events, but as we’ve seen before, it is used as a part of the standard operation of topics
    • QuorumListener is used for quorum state change events
  • Cluster listeners:
    • DistributedObjectListener is used for collection creation and destruction events
    • MembershipListener is used for cluster membership events
    • LifecycleListener is used for local node state events
    • MigrationListener is used for partition migration state events

The sound of our own data

Being notified about data changes can be rather useful, as we can make an application-level decision regarding whether the change is important or not and react accordingly. The first interface that we are going to look at is EntryListener. This interface will notify us when changes are made to the entries stored in a map collection. If we take a look at the interface, we can see four entry event types and two map-wide events that we will be notified about. EntryListener has also been broken up into a number of individual MapListener super-interfaces, so should we be interested in only a subset of the event types, we can implement just the appropriate interfaces as required. Let’s take a look at the following code:

void entryAdded(EntryEvent<K, V> event);
void entryRemoved(EntryEvent<K, V> event);
void entryUpdated(EntryEvent<K, V> event);
void entryEvicted(EntryEvent<K, V> event);
void mapCleared(MapEvent event);
void mapEvicted(MapEvent event);

Hopefully, the first three are pretty self-explanatory. However, the fourth is a little less clear and, in fact, one of the most useful. The entryEvicted method is invoked when an entry is removed from a map non-programmatically (that is, Hazelcast has done it all by itself). This will occur in one of the following two scenarios:

  • An entry’s TTL has been reached and the entry has been expired
  • The maximum map size, according to the configured size policy, has been reached, and the appropriate eviction policy has kicked in to clear out space in the map

The first scenario allows us a capability that is very rarely found in data sources—to have our application be told when a time-bound record has expired and the ability to trigger some behavior based on it. For example, we can use it to automatically trigger a teardown operation if an entry is not correctly maintained by a user’s interactions. This will allow us to generate an event based on the absence of activity, which is rather useful!
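
For reference, both scenarios are driven by the map configuration rather than by our code. A hazelcast.xml sketch along the following lines (the map name and values here are illustrative, not from the original example) would enable a time-to-live alongside a size-bound eviction policy:

<hazelcast>
    <map name="capitals">
        <!-- scenario one: entries expire 10 seconds after creation -->
        <time-to-live-seconds>10</time-to-live-seconds>
        <!-- scenario two: beyond 1000 entries per node, evict the least recently used -->
        <eviction-policy>LRU</eviction-policy>
        <max-size policy="PER_NODE">1000</max-size>
    </map>
</hazelcast>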

Let’s create an example MapEntryListener class to illustrate the various events firing off:

public class MapEntryListener
        implements EntryListener<String, String> {
    @Override
    public void entryAdded(EntryEvent<String, String> event) {
        System.err.println("Added: " + event);
    }
    @Override
    public void entryRemoved(EntryEvent<String, String> event) {
        System.err.println("Removed: " + event);
    }
    @Override
    public void entryUpdated(EntryEvent<String, String> event) {
        System.err.println("Updated: " + event);
    }
    @Override
    public void entryEvicted(EntryEvent<String, String> event) {
        System.err.println("Evicted: " + event);
    }
    @Override
    public void mapCleared(MapEvent event) {
        System.err.println("Map Cleared: " + event);
    }
    @Override
    public void mapEvicted(MapEvent event) {
        System.err.println("Map Evicted: " + event);
    }
}
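
The driver that produces the following output is not shown in this excerpt, but a minimal sketch along these lines (the class name and the exact sequence of operations are assumptions reconstructed from the output below) would exercise each event type:

import java.util.concurrent.TimeUnit;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

public class MapEntryListenerExample {
    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IMap<String, String> capitals = hz.getMap("capitals");

        // true: deliver entry values along with the events
        capitals.addEntryListener(new MapEntryListener(), true);

        capitals.put("GB", "Winchester");
        capitals.put("GB", "London");
        // this entry carries a 10-second TTL, later triggering the eviction event
        capitals.put("DE", "Berlin", 10, TimeUnit.SECONDS);
        capitals.remove("GB");

        Thread.sleep(15000); // allow the TTL to expire
        hz.shutdown();
    }
}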

We shall see the various events firing off as expected, with a short 10-second wait for the Berlin entry to expire, which will trigger the eviction event, as follows:

Added: EntryEvent {c:capitals} key=GB, oldValue=null, value=Winchester,
event=ADDED, by Member [127.0.0.1]:5701 this
Updated: EntryEvent {c:capitals} key=GB, oldValue=Winchester,
value=London, event=UPDATED, by Member [127.0.0.1]:5701 this
Added: EntryEvent {c:capitals} key=DE, oldValue=null, value=Berlin,
event=ADDED, by Member [127.0.0.1]:5701 this
Removed: EntryEvent {c:capitals} key=GB, oldValue=null, value=London,
event=REMOVED, by Member [127.0.0.1]:5701 this
Evicted: EntryEvent {c:capitals} key=DE, oldValue=null, value=Berlin,
event=EVICTED, by Member [127.0.0.1]:5701 this

We can implement the interface as extensively as our application requires, potentially creating no-op stubs should we wish not to handle a particular type of event.
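
Alternatively, rather than stubbing out unwanted methods, we can implement just the appropriate super-interfaces mentioned earlier. A minimal sketch (the class name here is illustrative), assuming the com.hazelcast.map.listener package from Hazelcast 3.5, that reacts only to additions:

import com.hazelcast.core.EntryEvent;
import com.hazelcast.map.listener.EntryAddedListener;

// Only the "added" events are delivered; no no-op stubs required
public class MapEntryAddedListener
        implements EntryAddedListener<String, String> {
    @Override
    public void entryAdded(EntryEvent<String, String> event) {
        System.err.println("Added: " + event);
    }
}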

Continuously querying

The previous example focuses on notifying us of all the entry events. However, what if we were only interested in some particular data? We could obviously filter within our listener to handle only the entries that we are interested in. However, it is potentially expensive to have all the events flying around the cluster, especially if our interest lies in only a minority of the potential data. To address this, we can combine capabilities from the map-searching features that we looked at a while back. When we register the entry listener with a collection, we can optionally provide a search Predicate that is used as a filter within Hazelcast itself.

We can whittle events down to relevant data before they even reach our listener, as follows:

IMap<String, String> capitals = hz.getMap("capitals");
capitals.addEntryListener(new MapEntryListener(),
        new SqlPredicate("name = 'London'"), true);

Listeners racing into action

One issue with the previous example is that we retrospectively reconfigured the map to feature the listener after it was already in service. To avoid this race condition, we should wire up the listener before the node enters service. We can do this by registering the listener within the map configuration, as follows:

<hazelcast>
    <map name="default">
        <entry-listeners>
            <entry-listener include-value="true">
                com.packtpub.hazelcast.listeners.MapEntryListener
            </entry-listener>
        </entry-listeners>
    </map>
</hazelcast>

However, in both methods of configuration, we have provided a Boolean flag when registering the listener with the map. This include-value flag configures whether the listener is invoked with just the key of the event entry or with its value as well. The default behavior (true) is to include the value, but there is a performance benefit in not having to provide it to the listener. So, if the use case does not require this extra data, it is beneficial to set this flag to false.

Keyless collections

Though the keyless collections (set, list, and queue) are very similar to map collections, they feature their own interface to define the available events: ItemListener. It is not as extensive as its map counterpart, offering visibility of just the itemAdded and itemRemoved events, but it can be used in the same way.
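
As a brief sketch (the class and collection names here are illustrative, not from the original text), an ItemListener can be implemented and registered in much the same way:

import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;

public class SetItemListener implements ItemListener<String> {
    @Override
    public void itemAdded(ItemEvent<String> event) {
        System.err.println("Added: " + event);
    }
    @Override
    public void itemRemoved(ItemEvent<String> event) {
        System.err.println("Removed: " + event);
    }
}

It can then be attached with, for example, hz.getSet("capitalNames").addItemListener(new SetItemListener(), true), where the Boolean flag carries the same include-value meaning as before.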

Programmatic configuration ahead of time

So far, most of the extra configuration that we have applied has been done either by customizing the hazelcast.xml file or by retrospectively modifying a collection in the code. However, what if we want to programmatically configure Hazelcast without the race condition that we discovered earlier? Fortunately, there is a way. By creating an instance of the Config class, we can configure the appropriate behavior on it by using a hierarchy that is similar to the XML configuration, but in code, before passing this configuration object to the instance creation method. The previous example can be reconfigured to do so, as follows:

public static void main(String[] args) {
    Config conf = new Config();
    conf.addListenerConfig(
            new EntryListenerConfig(new MapEntryListener(), false, true));
    HazelcastInstance hz = Hazelcast.newHazelcastInstance(conf);
}

Events unfolding in the wider world

Now that we can determine what is going on with our data within the cluster, we may wish to have a higher degree of visibility of the state of the cluster itself. We can use this either to trigger application-level responses to cluster instability or to provide mechanisms that enable graceful scaling. We are provided with a number of interfaces for different types of cluster activity. All of these listeners can be configured retrospectively, as we have seen in the previous examples. However, in production, it is better to configure them in advance, for the same race condition reasons that apply to the collection listeners. We can do this either by using the hazelcast.xml configuration or by using the Config class, as follows:

<hazelcast>
    <listeners>
        <listener>com.packtpub.hazelcast.MyClusterListener</listener>
    </listeners>
</hazelcast>

The first of these, DistributedObjectListener, simply notifies all the nodes in the cluster when collection objects are created or destroyed. Again, let’s create a new example listener, ClusterObjectListener, to receive events, as follows:

public class ClusterObjectListener
        implements DistributedObjectListener {
    @Override
    public void distributedObjectCreated(DistributedObjectEvent event) {
        System.err.println("Created: " + event);
    }
    @Override
    public void distributedObjectDestroyed(DistributedObjectEvent event) {
        System.err.println("Destroyed: " + event);
    }
}

As this listener is for cluster-wide events, the example usage is rather simple: it mainly creates an instance with the appropriate listener registered, as follows:

public class ClusterListeningExample {
    public static void main(String[] args) {
        Config config = new Config();
        config.addListenerConfig(
                new ListenerConfig(new ClusterObjectListener()));
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    }
}

When using the TestApp console, we can create and destroy some collections, as follows:

hazelcast[default] > ns test
namespace: test
hazelcast[test] > m.put foo bar
null
hazelcast[test] > m.destroy
Destroyed!

The preceding commands will produce the following logging on all the nodes that feature the listener:

Created: DistributedObjectEvent{eventType=CREATED, serviceName='hz:impl:mapService', 
distributedObject=IMap{name='test'}}
Destroyed: DistributedObjectEvent{eventType=DESTROYED, serviceName='hz:impl:mapService', 
distributedObject=IMap{name='test'}}

The next type of cluster listener is MembershipListener, which notifies all the nodes when a member joins or leaves the cluster. Let’s create another example class, this time ClusterMembershipListener, as follows:

public class ClusterMembershipListener
        implements MembershipListener {
    @Override
    public void memberAdded(MembershipEvent membershipEvent) {
        System.err.println("Added: " + membershipEvent);
    }
    @Override
    public void memberRemoved(MembershipEvent membershipEvent) {
        System.err.println("Removed: " + membershipEvent);
    }
    @Override
    public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
        System.err.println("Changed: " + memberAttributeEvent);
    }
}

Let’s add the following code to the previous example application:

conf.addListenerConfig(new ListenerConfig(new ClusterMembershipListener()));

Lastly, we have LifecycleListener, which is local to an individual node. It allows the application built on top of Hazelcast to understand its particular node’s state by being notified as it changes, whether starting, pausing, resuming, or even shutting down, as follows:

public class NodeLifecycleListener implements LifecycleListener {
    @Override
    public void stateChanged(LifecycleEvent event) {
        System.err.println(event);
    }
}
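
As with the other listeners, this can be registered ahead of time via a ListenerConfig; however, being node-local, it can also be attached retrospectively through the instance's lifecycle service. A one-line sketch, assuming an existing HazelcastInstance named hz:

// Register against the local node's lifecycle service
hz.getLifecycleService().addLifecycleListener(new NodeLifecycleListener());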

Moving data around the place

The final listener is very useful, as it lets an application know when Hazelcast is rebalancing data within the cluster. This gives us an opportunity to prevent or even block the shutdown of a node, as we might be in a period of increased risk to data resilience while data is actively being moved around. The interface used in this case is MigrationListener. It will notify the application when partitions start to migrate from one node to another and when they complete or fail:

public class ClusterMigrationListener implements MigrationListener {
    @Override
    public void migrationStarted(MigrationEvent migrationEvent) {
        System.err.println("Started: " + migrationEvent);
    }
    @Override
    public void migrationCompleted(MigrationEvent migrationEvent) {
        System.err.println("Completed: " + migrationEvent);
    }
    @Override
    public void migrationFailed(MigrationEvent migrationEvent) {
        System.err.println("Failed: " + migrationEvent);
    }
}

If we register this cluster listener in the example application and create and destroy various nodes, we will see a deluge of events showing the ongoing migrations. The more astute among you may have previously spotted repartitioning tasks being logged when spinning up multiple nodes:

INFO: [127.0.0.1]:5701 [dev] [3.5] Re-partitioning cluster data... Migration queue size: 271

The preceding log line indicates that 271 tasks (one migration task for each partition being migrated; 271 is Hazelcast’s default partition count) have been scheduled to rebalance the cluster. The new listener will now give us significantly more visibility of these events as they occur and, hopefully, complete successfully:

Started: MigrationEvent{partitionId=98, oldOwner=Member [127.0.0.1]:5701,
newOwner=Member [127.0.0.1]:5702 this}
Completed: MigrationEvent{partitionId=98, oldOwner=Member [127.0.0.1]:5701,
newOwner=Member [127.0.0.1]:5702 this}
Started: MigrationEvent{partitionId=99, oldOwner=Member [127.0.0.1]:5701,
newOwner=Member [127.0.0.1]:5702 this}
Completed: MigrationEvent{partitionId=99, oldOwner=Member [127.0.0.1]:5701,
newOwner=Member [127.0.0.1]:5702 this}

However, this logging information is overwhelming and actually not all that useful to us. So, let’s expand on the listener to provide the application with the ability to check whether the cluster is currently migrating data partitions or has recently done so.

Let’s create a new abstract utility class, MigrationStatus, to hold information about cluster migration and help us interrogate its current status:

public abstract class MigrationStatus {
    private static final Map<Integer, Boolean> MIGRATION_STATE =
            new ConcurrentHashMap<Integer, Boolean>();
    private static final AtomicLong LAST_MIGRATION_TIME =
            new AtomicLong(System.currentTimeMillis());

    public static void migrationEvent(int partitionId, boolean state) {
        MIGRATION_STATE.put(partitionId, state);
        if (!state) {
            LAST_MIGRATION_TIME.set(System.currentTimeMillis());
        }
    }

    public static boolean isMigrating() {
        Collection<Boolean> migrationStates = MIGRATION_STATE.values();
        long lastMigrationTime = LAST_MIGRATION_TIME.get();
        // did we recently (< 10 seconds ago) complete a migration?
        if (System.currentTimeMillis() < lastMigrationTime + 10000) {
            return true;
        }
        // are any partitions currently migrating?
        for (Boolean partition : migrationStates) {
            if (partition) return true;
        }
        // otherwise we're not migrating
        return false;
    }
}

Then, we will update the listener to pass through the appropriate calls in response to the events coming into it, as follows:

@Override
public void migrationStarted(MigrationEvent migrationEvent) {
    MigrationStatus.migrationEvent(migrationEvent.getPartitionId(), true);
}
@Override
public void migrationCompleted(MigrationEvent migrationEvent) {
    MigrationStatus.migrationEvent(migrationEvent.getPartitionId(), false);
}
@Override
public void migrationFailed(MigrationEvent migrationEvent) {
    System.err.println("Failed: " + migrationEvent);
    MigrationStatus.migrationEvent(migrationEvent.getPartitionId(), false);
}

Finally, let’s add a loop to the example application to print out the migration state over time, as follows:

public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.addListenerConfig(
            new ListenerConfig(new ClusterMembershipListener()));
    conf.addListenerConfig(
            new ListenerConfig(new MigrationStatusListener()));
    HazelcastInstance hz = Hazelcast.newHazelcastInstance(conf);

    while (true) {
        System.err.println("Is Migrating?: " + MigrationStatus.isMigrating());
        Thread.sleep(5000);
    }
}

When starting and stopping various nodes, we should see each node detect the rebalance occurring, though it passes by quite quickly. It is in these small, critical periods of time, when data is being moved around, that resilience is most at risk; although, depending on the configured number of backups, the risk could be quite small.

Added: MembershipEvent {member=Member [127.0.0.1]:5703,type=added}
Is Migrating?: true
Is Migrating?: true
Is Migrating?: false

Extending quorum

We previously saw how we can configure a simple cluster health check to ensure that a given number of nodes is present to support the application. However, should we need more detailed control over the quorum definition beyond a simple node count check, we can create our own quorum function, which allows us to programmatically define what it means to be healthy. This can be as simple or as complex as the application requires. In the following example, we source an expected cluster size (in practice, this would probably come from somewhere more suitable than a hard-coded value) and dynamically check whether a majority of the nodes are present:

public class QuorumExample {
    public static void main(String[] args) throws Exception {
        QuorumConfig quorumConf = new QuorumConfig();
        quorumConf.setName("atLeastTwoNodesWithMajority");
        quorumConf.setEnabled(true);
        quorumConf.setType(QuorumType.WRITE);

        final int expectedClusterSize = 5;
        quorumConf.setQuorumFunctionImplementation(new QuorumFunction() {
            @Override
            public boolean apply(Collection<Member> members) {
                return members.size() >= 2
                        && members.size() > expectedClusterSize / 2;
            }
        });

        MapConfig mapConf = new MapConfig();
        mapConf.setName("default");
        mapConf.setQuorumName("atLeastTwoNodesWithMajority");

        Config conf = new Config();
        conf.addQuorumConfig(quorumConf);
        conf.addMapConfig(mapConf);

        HazelcastInstance hz = Hazelcast.newHazelcastInstance(conf);
        new ConsoleApp(hz).start(args);
    }
}

We can also create a listener for the quorum health check so that we can be notified when the state of the quorum changes, as follows:

public class ClusterQuorumListener implements QuorumListener {
    @Override
    public void onChange(QuorumEvent quorumEvent) {
        System.err.println("Changed: " + quorumEvent);
    }
}

Let’s attach the new listener to the appropriate configuration, as follows:

quorumConf.addListenerConfig(new QuorumListenerConfig(new ClusterQuorumListener()));
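
Beyond being notified of changes, we can also interrogate the quorum state on demand via the QuorumService. A short sketch, assuming the hz instance and quorum name from the earlier example:

import com.hazelcast.quorum.Quorum;

// Look up the named quorum and check whether it is currently satisfied
Quorum quorum = hz.getQuorumService().getQuorum("atLeastTwoNodesWithMajority");
System.err.println("Quorum present?: " + quorum.isPresent());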

Summary

Hazelcast allows us to be a first-hand witness to a lot of its internal state information. By registering listeners so that they are notified as events occur, we can further enhance an application not only in terms of its functionality, but also with respect to its resilience. By allowing the application to know when and what events are unfolding underneath it, we can add defensiveness to it, embracing the dynamic and disposable nature of ephemeral approaches to applications and infrastructure.
