15 min read

(For more resources on JBoss see here.)

CEP and ESP

CEP and ESP are styles of processing in an Event Driven Architecture (General introduction to Event Driven Architecture can be found at: http://elementallinks.typepad.com/bmichelson/2006/02/eventdriven_arc.html). One of the core benefits of such an architecture is that it provides loose coupling of its components. A component simply publishes events about actions that it is executing and other components can subscribe/listen to these events. The producer and the subscriber are completely unaware of each other. A subscriber listens for events and doesn’t care where they come from. Similarly, a publisher generates events and doesn’t know anything about who is listening to those events. Some orchestration layer then deals with the actual wiring of subscribers to publishers.

An event represents a significant change of state. It usually consists of a header and a body. The header contains meta information such as its name, time of occurrence, duration, and so on. The body describes what happened. For example, if a transaction has been processed, the event body would contain the transaction ID, the amount transferred, source account number, destination account number, and so on.

CEP deals with complex events. A complex event is a set of simple events. For example, a sequence of large withdrawals may raise a suspicious transaction event. The simple events are considered to infer that a complex event has occurred.

ESP is more about real-time processing of huge volume of events. For example, calculating the real-time average transaction volume over time.

More information about CEP and ESP can be found on the web site, http://complexevents.com/ or in a book written by Prof. David Luckham, The Power of Events. This book is considered the milestone for the modern research and development of CEP.

There are many existing pure CEP/ESP engines, both commercial and open source. Drools Fusion enhances the rule based programming with event support. It makes use of its Rete algorithm and provides an alternative to existing engines.

Drools Fusion

Drools Fusion is a Drools module that is a part of the Business Logic Integration Platform. It is the Drools event processing engine covering both CEP and ESP. Each event has a type, a time of occurrence, and possibly, a duration. Both point in time (zero duration) and interval-based events are supported. Events can also contain other data like any other facts—properties with a name and type. All events are facts but not all facts are events. An event’s state should not be changed. However, it is valid to populate the unpopulated values. Events have clear life cycle windows and may be transparently garbage collected after the life cycle window expires (for example, we may be interested only in transactions that happened in the last 24 hours). Rules can deal with time relationships between events.

Fraud detection

It will be easier to explain these concepts by using an example—a fraud detection system. Fraud in banking systems is becoming a major concern. The amount of online transactions is increasing every day. An automatic system for fraud detection is needed. The system should analyze various events happening in a bank and, based on a set of rules, raise an appropriate alarm.

This problem cannot be solved by the standard Drools rule engine. The volume of events is huge and it happens asynchronously. If we simply inserted them into the knowledge session, we would soon run out of memory. While the Rete algorithm behind Drools doesn’t have any theoretical limitation on number of objects in the session, we could use the processing power more wisely. Drools Fusion is the right candidate for this kind of task.

Problem description

Let’s consider the following set of business requirements for the fraud detection system:

  1. If a notification is received from a customer about a stolen card, block this account and any withdrawals from this account.
  2. Check each transaction against a blacklist of account numbers. If the transaction is transferring money from/to such an account, then flag this transaction as suspicious with the maximum severity.
  3. If there are two large debit transactions from the same account within a ninety second period and each transaction is withdrawing more than 300% of the average monthly (30 days) withdrawal amount, flag these transactions as suspicious with minor severity.
  4. If there is a sequence of three consecutive, increasing, debit transactions originating from a same account within a three minute period and these transactions are together withdrawing more than 90% of the account’s average balance over 30 days, then flag those transactions as suspicious with major severity and suspend the account.
  5. If the number of withdrawals over a day is 500% higher than the average number of withdrawals over a 30 day period and the account is left with less than 10% of the average balance over a month (30 days), then flag the account as suspicious with minor severity.
  6. Duplicate transactions check—if two transactions occur in a time window of 15 seconds that have the same source/destination account number, are of the same amount, and just differ in the transaction ID, then flag those transactions as duplicates.

Monitoring:

  1. Monitor the average withdrawal amount over all of the accounts for 30 days.
  2. Monitor the average balance across all of the accounts.

Design and modeling

Looking at the requirements, we’ll need a way of flagging a transaction as suspicious. This state can be added to an existing Transaction type, or we can externalize this state to a new event type. We’ll do the latter. The following new events will be defined:

  • TransactionCreatedEvent—An event that is triggered when a new transaction is created. It contains a transaction identifier, source account number, destination account number, and the actual amount transferred.
  • TransactionCompletedEvent—An event that is triggered when an existing transaction has been processed. It contains the same fields as the TransactionCreatedEvent class.
  • AccountUpdatedEvent—An event that is triggered when an account has been updated. It contains the account number, current balance, and the transaction identifier of a transaction that initiated this update.
  • SuspiciousAccount—An event triggered when there is some sort of a suspicion around the account. It contains the account number and severity of the suspicion. It is an enumeration that can have two values MINOR and MAJOR. This event’s implementation is shown in the following code.
  • SuspiciousTransaction—Similar to SuspiciousAccount, this is an event that flags a transaction as suspicious. It contains a transaction identifier and severity level.
  • LostCardEvent—An event indicating that a card was lost. It contains an account number.

One of events described—SuspiciousAccount—is shown in the following code. It also defines SuspiciousAccountSeverity enumeration that encapsulates various severity levels that the event can represent. The event will define two properties. One of them is already mentioned severity and the other one will identify the account—accountNumber.

/**
* marks an account as suspicious
*/
public class SuspiciousAccount implements Serializable {
public enum SuspiciousAccountSeverity {
MINOR, MAJOR
}
private final Long accountNumber;
private final SuspiciousAccountSeverity severity;

public SuspiciousAccount(Long accountNumber,
SuspiciousAccountSeverity severity) {
this.accountNumber = accountNumber;
this.severity = severity;
}
private transient String toString;

@Override
public String toString() {
if (toString == null) {
toString = new ToStringBuilder(this).appendSuper(
super.toString()).append("accountNumber",
accountNumber).append("severity", severity)
.toString();
}
return toString;
}

Code listing 1: Implementation of SuspiciousAccount event.

Please note that the equals and hashCode methods in SuspiciousAccount from the preceding code listing are not overridden. This is because an event represents an active entity, which means that each instance is unique. The toString method is implemented using org.apache.commons.lang.builder.ToStringBuilder. All of these event classes are lightweight, and they have no references to other domain classes (no object reference; only a number—accountNumber—in this case). They are also implementing the Serializable interface. This makes them easier to transfer between JVMs. As best practice, this event is immutable. The two properties above (accountNumber and severity) are marked as final. They can be set only through a constructor (there are no set methods—only two get methods). The get methods were excluded from this code listing.

The events themselves don’t carry a time of occurrence—a time stamp (they easily could, if we needed it; we’ll see how in the next set of code listings). When the event is inserted into the knowledge session, the rule engine assigns such a time stamp to FactHandle that is returned. (Remember? session.insert(..) returns a FactHandle). In fact, there is a special implementation of FactHandle called EventFactHandle. It extends the DefaultFactHandle (which is used for normal facts) and adds few additional fields, for example—startTimestamp and duration. Both contain millisecond values and are of type long.

Ok, we now have the event classes and we know that there is a special FactHandle for events. However, we still don’t know how to tell Drools that our class represents an event. Drools type declarations provide this missing link. It can define new types and enhance existing types. For example, to specify that the class TransactionCreatedEvent is an event, we have to write:

declare TransactionCreatedEvent
@role( event )
end

Code listing 2: Event role declaration (cep.drl file).

This code can reside inside a normal .drl file. If our event had a time stamp property or a duration property, we could map it into startTimestamp or duration properties of EventFactHandle by using the following mapping:

@duration( durationProperty )

Code listing 3: Duration property mapping.

The name in brackets is the actual name of the property of our event that will be mapped to the duration property of EventFactHandle. This can be done similarly for startTimestamp property.

As an event’s state should not be changed (only unpopulated values can be populated), think twice before declaring existing beans as events. Modification to a property may result in an unpredictable behavior.

In the following examples, we’ll also see how to define a new type declaration

Fraud detection rules

Let’s imagine that the system processes thousands of transactions at any given time. It is clear that this is challenging in terms of time and memory consumption. It is simply not possible to keep all of the data (transactions, accounts, and so on) in memory. A possible solution would be to keep all of the accounts in memory as there won’t be that many of them (in comparison to transactions) and keep only transactions for a certain period. With Drools Fusion, we can do this very easily by declaring that a Transaction is an event.

The transaction will then be inserted into the knowledge session through an entry-point. Each entry point defines a partition in the input data storage, reducing the match space and allowing patterns to target specific partitions. Matching data from a partition requires explicit reference at the pattern declaration. This makes sense, especially if there are large quantities of data and only some rules are interested in them. We’ll look at entry points in the following example.

If you are still concerned about the volume of objects in memory, this solution can be easily partitioned, for example, by account number. There might be more servers, each processing only a subset of accounts (simple routing strategy might be: accountNumber module totalNumberOfServersInCluster). Then each server would receive only appropriate events.

Notification

The requirement we’re going to implement here is essentially to block an account whenever a LostCardEvent is received. This rule will match two facts: one of type Account and one of type LostCardEvent. The rule will then set the the status of this account to blocked. The implementation of the rule is as follows:

rule notification
when
$account : Account( status != Account.Status.BLOCKED )
LostCardEvent( accountNumber == $account.number )
from entry-point LostCardStream
then
modify($account) {
setStatus(Account.Status.BLOCKED)
};
end

Code listing 4: Notification rule that blocks an account (cep.drl file).

As we already know, Account is an ordinary fact from the knowledge session. The second fact—LostCardEvent—is an event from an entry point called LostCardStream. Whenever a new event is created and goes through the entry point, LostCardStream, this rule tries to match (checks if its conditions can be satisfied). If there is an Account in the knowledge session that didn’t match with this event yet, and all conditions are met, the rule is activated. The consequence sets the status of the account to blocked in a modify block.

As we’re updating the account in the consequence and also matching on it in the condition, we have to add a constraint that matches only the non-blocked accounts to prevent looping (see above: status != Account.Status.BLOCKED).

Test configuration setup

Following the best practice that every code/rule needs to be tested, we’ll now set up a class for writing unit tests. All of the rules will be written in a file called cep.drl. When creating this file, just make sure it is on the classpath. The creation of knowledgeBase won’t be shown. It is similar to the previous tests that we’ve written. We just need to change the default knowledge base configuration slightly:

KnowledgeBaseConfiguration config = KnowledgeBaseFactory
.newKnowledgeBaseConfiguration();
config.setOption( EventProcessingOption.STREAM );

Code listing 5: Enabling STREAM event processing mode on knowledge base configuration.

This will enable the STREAM event processing mode. KnowledgeBaseConfiguration from the preceding code is then used when creating the knowledge base—KnowledgeBaseFactory.newKnowledgeBase(config).

Part of the setup is also clock initialization. We already know that every event has a time stamp. This time stamp comes from a clock which is inside the knowledge session. Drools supports several clock types, for example, a real-time clock or a pseudo clock. The real-time clock is the default and should be used in normal circumstances. The pseudo clock is especially useful for testing as we have complete control over the time. The following initialize method sets up a pseudo clock. This is done by setting the clock type on KnowledgeSessionConfiguration and passing this object to the newStatefulKnowledgeSession method of knowledgeBase. The initialize method then makes this clock available as a test instance variable called clock when calling session.getSessionClock(), as we can see in the following code:

public class CepTest {
static KnowledgeBase knowledgeBase;
StatefulKnowledgeSession session;
Account account;
FactHandle accountHandle;
SessionPseudoClock clock;
TrackingAgendaEventListener trackingAgendaEventListener;
WorkingMemoryEntryPoint entry;
@Before
public void initialize() throws Exception {
KnowledgeSessionConfiguration conf =
KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
conf.setOption( ClockTypeOption.get( "pseudo" ) );
session = knowledgeBase.newStatefulKnowledgeSession(conf,
null);
clock = (SessionPseudoClock) session.getSessionClock();
trackingAgendaEventListener =
new TrackingAgendaEventListener();
session.addEventListener(trackingAgendaEventListener);
account = new Account();
account.setNumber(123456l);
account.setBalance(BigDecimal.valueOf(1000.00));
accountHandle = session.insert(account);

Code listing 6: Unit tests setup (CepTest.java file).

The preceding initialize method also creates an event listener and passes it into the session. The event listener is called TrackingAgendaEventListener. It simply tracks all of the rule executions. It is useful for unit testing to verify whether a rule fired or not. Its implementation is as follows:


public class TrackingAgendaEventListener extends
DefaultAgendaEventListener {
List<String> rulesFiredList = new ArrayList<String>();
@Override
public void afterActivationFired(
AfterActivationFiredEvent event) {
rulesFiredList.add(event.getActivation().getRule()
.getName());
}
public boolean isRuleFired(String ruleName) {
for (String firedRuleName : rulesFiredList) {
if (firedRuleName.equals(ruleName)) {
return true;
}
}
return false;
}
public void reset() {
rulesFiredList.clear();
}
}

Code listing 7: Agenda event listener that tracks all rules that have been fired.

DefaultAgendaEventListener comes from the org.drools.event.rule package that is part of drools-api.jar file as opposed to the org.drools.event package that is part of the old API in drools-core.jar.

All of the Drools agenda event listeners must implement the AgendaEventListener interface. TrackingAgendaEventListener in the preceding code extends DefaultAgendaEventListener so that we don’t have to implement all of the methods defined in the AgendaEventListener interface. Our listener just overrides the afterActivationFired method that will be called by Drools every time a rule’s consequence has been executed. Our implementation of this method adds the fired rule name into a list of fired rules—rulesFiredList. Then the convenience method isRuleFired takes a ruleName as a parameter and checks if this rule has been executed/fired. The reset method is useful for clearing out the state of this listener, for example, after session.fireAllRules is called.

Now, back to the test configuration setup. The last part of the initialize method from code listing 6 is account object creation (account = new Account(); …). This is for convenience purposes so that every test does not have to create one. The account balance is set to 1000. The account is inserted into the knowledge session and its FactHandle is stored so that the account object can be easily updated.

Testing the notification rule

The test infrastructure is now fully set up and we can write a test for the notification rule from code listing 4 as follows:

@Test
public void notification() throws Exception {
session.fireAllRules();
assertNotSame(Account.Status.BLOCKED,account.getStatus());
entry = session
.getWorkingMemoryEntryPoint("LostCardStream");
entry.insert(new LostCardEvent(account.getNumber()));
session.fireAllRules();
assertSame(Account.Status.BLOCKED, account.getStatus());
}

Code listing 8: Notification rule’s unit test (CepTest.java file).

The test verifies that the account is not blocked by accident first. Then it gets the LostCardStream entry point from the session by calling: session.getWorkingMemoryEntryPoint(“LostCardStream”). Then the code listing demonstrates how an event can be inserted into the knowledge session through an entry-point—entry.insert(new LostCardEvent(…)).

In a real application, you’ll probably want to use Drools Pipeline for inserting events into the knowledge session. It can be easily connected to an existing ESB (Enterprise Service Bus) or a JMS topic or queue.

Drools entry points are thread-safe. Each entry point can be used by a different thread; however, no two threads can concurrently use the same entry point. In this case, it makes sense to start the engine in fireUntilHalt mode in a separate thread like this:

new Thread(new Runnable() {
public void run() {
session.fireUntilHalt();
}
}).start();

Code listing 9: Continuous execution of rules.

The engine will then continuously execute activations until the session.halt() method is called.

The test then verifies that the status of the account is blocked. If we simply executed session.insert(new LostCardEvent(..)); the test would fail because the rule wouldn’t see the event.

LEAVE A REPLY

Please enter your comment!
Please enter your name here