
In this article by Tanmay Deshpande, the author of the book Mastering Apache Flink, we will start learning about the libraries provided by Apache Flink and how we can use them for specific use cases. To start with, let's look at the library for complex event processing (CEP). CEP is a very interesting but complex topic that has value in various industries. Wherever a stream of events is expected, people naturally want to perform complex event processing on it. Let's try to understand what CEP is all about.


What is complex event processing?

CEP is a technique to analyze streams of disparate events occurring with high frequency and low latency. These days, streaming events can be found in various industries, for example:

  • In the oil and gas domain, sensor data comes from various drilling tools or from upstream oil pipeline equipment
  • In the security domain, activity data, malware information, and usage pattern data come from various end points
  • In the wearable domain, data comes from various wrist bands with information about your heart beat rate, your activity, and so on
  • In the banking domain, data comes from credit card usage, banking activities, and so on

It is very important to analyze variation patterns in such streams so that we can be notified in real time about any deviation from the regular behavior. CEP is able to understand patterns across streams of events, sub-events, and their sequences. CEP helps to identify meaningful patterns and complex relationships among seemingly unrelated events, and sends notifications in real time or near real time to prevent damage.

A typical CEP flow reads an input stream of events, matches defined patterns against it, and generates alerts. Even though the flow looks simple, CEP has various abilities, such as:

  • Ability to produce results as soon as the input event stream is available
  • Ability to provide computations such as aggregations over time and timeouts between two events of interest
  • Ability to provide real-time/near real-time alerts and notifications on detecting complex event patterns
  • Ability to connect and correlate heterogeneous sources and analyze patterns across them
  • Ability to achieve high-throughput, low-latency processing

There are various CEP solutions available in the market. With big data technology advancements, we have multiple options such as Apache Spark, Apache Samza, and Apache Beam, but none of them has a dedicated library that fits all CEP use cases. Flink, however, provides one. Now let us try to understand what we can achieve with Flink's CEP library.

Flink CEP

Apache Flink provides the Flink CEP library, which offers APIs to perform complex event processing. The library consists of the following core components:

  • Event stream
  • Pattern definition
  • Pattern detection
  • Alert generation

Flink CEP works on Flink’s streaming API called DataStream. A programmer needs to define the pattern to be detected from the stream of events and then Flink’s CEP engine detects the pattern and takes appropriate action, such as generating alerts.

In order to get started, we need to add the following Maven dependency:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.10 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.10</artifactId>
    <version>1.1.2</version>
</dependency>
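The flink-cep-scala artifact can also be used from Java. If you prefer a Java-only dependency, the flink-cep artifact should work as well (assuming the same Scala build suffix and version):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.10</artifactId>
    <version>1.1.2</version>
</dependency>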

Event stream

A very important component of CEP is its input event stream. We have already seen the details of the DataStream API, so now let's use that knowledge to implement CEP. The very first thing we need to do is define a Java POJO for the event. Let's assume we need to monitor a temperature sensor event stream.

First we define an abstract class and then extend it.

While defining the event POJOs, we need to make sure to implement the hashCode() and equals() methods, as the framework uses them when comparing events.

The following code snippets demonstrate this.

First, we write an abstract class as shown here:

package com.demo.chapter05;

public abstract class MonitoringEvent {

    private String machineName;

    public MonitoringEvent(String machineName) {
        super();
        this.machineName = machineName;
    }

    public String getMachineName() {
        return machineName;
    }

    public void setMachineName(String machineName) {
        this.machineName = machineName;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        MonitoringEvent other = (MonitoringEvent) obj;
        if (machineName == null) {
            if (other.machineName != null)
                return false;
        } else if (!machineName.equals(other.machineName))
            return false;
        return true;
    }
}

Then we write the actual temperature event:

package com.demo.chapter05;

public class TemperatureEvent extends MonitoringEvent {

    private double temperature;

    public TemperatureEvent(String machineName) {
        super(machineName);
    }

    public TemperatureEvent(String machineName, double temperature) {
        super(machineName);
        this.temperature = temperature;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = super.hashCode();
        long temp;
        temp = Double.doubleToLongBits(temperature);
        result = prime * result + (int) (temp ^ (temp >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (!super.equals(obj))
            return false;
        if (getClass() != obj.getClass())
            return false;
        TemperatureEvent other = (TemperatureEvent) obj;
        if (Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return "TemperatureEvent [getTemperature()=" + getTemperature() + ", getMachineName()=" + getMachineName() + "]";
    }
}
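Other sensor readings can be modeled in exactly the same way. For illustration only, a hypothetical HumidityEvent (not part of the original example) would extend MonitoringEvent with its own field and override hashCode() and equals() following the same template:

package com.demo.chapter05;

// Hypothetical sibling event type, for illustration only
public class HumidityEvent extends MonitoringEvent {

    private double humidity;

    public HumidityEvent(String machineName, double humidity) {
        super(machineName);
        this.humidity = humidity;
    }

    public double getHumidity() {
        return humidity;
    }

    // hashCode() and equals() would follow the same template as TemperatureEvent
}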

Now we can define the event source as follows.

In Java:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TemperatureEvent> inputEventStream = env.fromElements(
        new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1),
        new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2),
        new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3),
        new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4),
        new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0));

In Scala:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val input: DataStream[TemperatureEvent] = env.fromElements(
    new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1),
    new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2),
    new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3),
    new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4),
    new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0))

Pattern API

The Pattern API allows you to define complex event patterns very easily. Each pattern consists of multiple states, and to go from one state to another, we generally need to define conditions. A condition can filter events by their contents or require a certain contiguity between events.

Let’s try to understand each pattern operation in detail.

Begin

The initial state can be defined as follows:

In Java:

Pattern<Event, ?> start = Pattern.<Event>begin("start");

In Scala:

val start : Pattern[Event, _] = Pattern.begin("start")

Filter

We can also specify the filter condition for the initial state:

In Java:

start.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // condition
    }
});

In Scala:

start.where(event => ... /* condition */)
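As a concrete instance, using the TemperatureEvent defined earlier, a filter that flags readings at or above 26.0 degrees could look like this in Java (the pattern and state names are illustrative):

Pattern<TemperatureEvent, ?> hotReadings = Pattern.<TemperatureEvent> begin("start")
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent value) {
                // Keep only readings at or above the threshold
                return value.getTemperature() >= 26.0;
            }
        });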

Subtype

We can also filter events based on their subtypes, using the subtype() method.

In Java:

start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // condition
    }
});

In Scala:

start.subtype(classOf[SubEvent]).where(subEvent => ... /* condition */)

Or

The Pattern API also allows us to define multiple conditions together, combining them with OR and AND semantics.

In Java:

pattern.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // condition
    }
}).or(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // or condition
    }
});

In Scala:

pattern.where(event => ... /* condition */).or(event => ... /* or condition */)
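An AND can always be expressed inside a single filter by combining the conditions with &&. Here is a minimal Java sketch, again using our TemperatureEvent and an illustrative machine name:

pattern.where(new FilterFunction<TemperatureEvent>() {
    @Override
    public boolean filter(TemperatureEvent value) {
        // Both conditions must hold: a hot reading AND a specific machine
        return value.getTemperature() >= 26.0
                && "xyz".equals(value.getMachineName());
    }
});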

Continuity

As stated earlier, we do not always need to filter events. Some patterns are instead defined by the contiguity of events rather than by their contents.

Continuity can be of two types: strict continuity and non-strict continuity.

Strict continuity

Strict continuity requires two events to follow each other directly, which means there should be no other event in between. This pattern is defined by next().

In Java:

Pattern<Event, ?> strictNext = start.next("middle");

In Scala:

val strictNext: Pattern[Event, _] = start.next("middle")

Non-strict continuity

Non-strict continuity allows other events to occur in between the two specific events. This pattern is defined by followedBy().

In Java:

Pattern<Event, ?> nonStrictNext = start.followedBy("middle");

In Scala:

val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")

Within

The Pattern API also allows us to constrain pattern matching by time. We can define such a temporal constraint as follows.

In Java:

next.within(Time.seconds(10));

In Scala:

next.within(Time.seconds(10))
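Putting contiguity and the temporal constraint together, a hypothetical pattern that looks for two directly consecutive TemperatureEvent readings arriving within 20 seconds of each other could be sketched as:

Pattern<TemperatureEvent, ?> twoReadings = Pattern.<TemperatureEvent> begin("first")
        .next("second")
        .within(Time.seconds(20));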

Detecting patterns

To detect patterns against the stream of events, we need to run the stream through the pattern. CEP.pattern() returns a PatternStream.

The following code snippet shows how we can detect a pattern. First, the pattern is defined to check whether the temperature value is greater than or equal to 26.0 degrees within 10 seconds.

In Java:

Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first")
        .subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() {
            public boolean filter(TemperatureEvent value) {
                // Match readings at or above the 26.0 degree threshold
                return value.getTemperature() >= 26.0;
            }
        }).within(Time.seconds(10));

PatternStream<TemperatureEvent> patternStream = CEP.pattern(inputEventStream, warningPattern);

In Scala:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val input = // data

val pattern: Pattern[TemperatureEvent, _] = Pattern.begin("start").where(event => event.getTemperature >= 26.0)

val patternStream: PatternStream[TemperatureEvent] = CEP.pattern(input, pattern)

Use case – complex event processing on temperature sensor

In earlier sections, we learnt about the various features provided by the Flink CEP engine. Now it's time to see how we can use it in a real-world solution. Let's assume we work for a mechanical company that needs to constantly monitor certain machines in its factory. The factory has already set up sensors which keep sending the temperature of the machines at given intervals.

Now we will set up a system that constantly monitors the temperature values and generates an alert if a temperature exceeds a certain threshold.

We can use an architecture in which the sensors publish their readings to Kafka, and Flink consumes the events, evaluates the CEP pattern, and generates alerts.

In order to write the Java application, we first need to create a Maven project and add the following dependencies:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.10 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.10</artifactId>
    <version>1.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java_2.10 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala_2.10 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.10</artifactId>
    <version>1.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>1.0.0</version>
</dependency>

Next, we need to do the following things to use Kafka.

First, we need to define a custom Kafka deserializer. This will read bytes from a Kafka topic and convert them into a TemperatureEvent. The following is the code to do this.

EventDeserializationSchema.java:

package com.demo.chapter05;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class EventDeserializationSchema implements DeserializationSchema<TemperatureEvent> {

    @Override
    public TypeInformation<TemperatureEvent> getProducedType() {
        return TypeExtractor.getForClass(TemperatureEvent.class);
    }

    @Override
    public TemperatureEvent deserialize(byte[] arg0) throws IOException {
        // Messages are expected in the machineName=temperature format, for example xyz=22.5
        String str = new String(arg0, StandardCharsets.UTF_8);
        String[] parts = str.split("=");
        return new TemperatureEvent(parts[0], Double.parseDouble(parts[1]));
    }

    @Override
    public boolean isEndOfStream(TemperatureEvent arg0) {
        return false;
    }
}

Next, we create a Kafka topic called temperature:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic temperature

Now we move to the Java code that will listen to these events in a Flink stream:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<TemperatureEvent> inputEventStream = env.addSource(
        new FlinkKafkaConsumer09<TemperatureEvent>("temperature", new EventDeserializationSchema(), properties));

Next, we define the pattern that checks whether the temperature is greater than or equal to 26.0 degrees Celsius within 10 seconds:

Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first")
        .subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() {
            private static final long serialVersionUID = 1L;

            public boolean filter(TemperatureEvent value) {
                // Match readings at or above the 26.0 degree threshold
                return value.getTemperature() >= 26.0;
            }
        }).within(Time.seconds(10));

Next, we match this pattern against the stream of events and select the matching events. We also map the matches to alert messages in a results stream, as shown here:

DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
        .select(new PatternSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;

            public Alert select(Map<String, TemperatureEvent> event) throws Exception {
                // "first" is the name of the pattern's begin state defined above
                return new Alert("Temperature Rise Detected:" + event.get("first").getTemperature()
                        + " on machine name:" + event.get("first").getMachineName());
            }
        });
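The Alert class is not listed in this excerpt; a minimal POJO consistent with the output printed below could look like this sketch:

package com.demo.chapter05;

// Minimal alert holder; toString() matches the format seen in the example output
public class Alert {

    private String message;

    public Alert(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public String toString() {
        return "Alert [message=" + message + "]";
    }
}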

In order to see the alerts generated, we print the results:

patternStream.print();

And we execute the stream:

env.execute("CEP on Temperature Sensor");

Now we are all set to execute the application. As messages arrive on the Kafka topic, the CEP job will keep evaluating them against the pattern.
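To feed test events into the topic, we can use Kafka's console producer (assuming a local broker on the default port) and type messages in the machineName=temperature format expected by our deserializer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic temperature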

An actual execution looks like the following.

Example input:

xyz=21.0
xyz=30.0
LogShaft=29.3
Boiler=23.1
Boiler=24.2
Boiler=27.0
Boiler=29.0

Example output:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1010488393]
10/09/2016 18:15:55 Job execution switched to status RUNNING.
10/09/2016 18:15:55 Source: Custom Source(1/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(1/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(2/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(2/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(3/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(3/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(4/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(4/4) switched to DEPLOYING
10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to SCHEDULED
10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(2/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source(3/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source(4/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source(1/4) switched to RUNNING
10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to RUNNING
1> Alert [message=Temperature Rise Detected:30.0 on machine name:xyz]
2> Alert [message=Temperature Rise Detected:29.3 on machine name:LogShaft]
3> Alert [message=Temperature Rise Detected:27.0 on machine name:Boiler]
4> Alert [message=Temperature Rise Detected:29.0 on machine name:Boiler]

We can also configure a mail client or use an external webhook to send e-mail or messenger notifications.

The code for the application can be found on GitHub: https://github.com/deshpandetanmay/mastering-flink.

Summary

In this article, we learnt about complex event processing (CEP). We discussed the challenges involved and how we can use the Flink CEP library to solve CEP problems. We also learnt about the Pattern API and the various operators we can use to define patterns. In the final section, we connected the dots and walked through one complete use case; with some changes, this setup can also be used in various other domains.

Next, we will see how to use Flink's built-in machine learning library to solve complex problems.
