
In this article by Christian Baxter, author of the book Mastering Akka, we will see that when it comes to the persistence needs of an application, the most common, tried-and-true approach is to model the data in a relational database. That has been the de facto way to store data until recently, when NoSQL (and, to a lesser extent, NewSQL) started to chip away at the footholds of relational database dominance. There’s nothing wrong with storing your application’s data this way—it’s how we initially chose to do so for the bookstore application, using PostgreSQL as the storage engine.

This article deals with event sourcing and how to implement that approach using Akka persistence.


Akka persistence for event sourcing

Akka persistence is a relatively new module within the Akka toolkit. It became available as experimental in the 2.3.x series. Throughout that series, it went through quite a few changes as the team worked on getting the API and functionality right. When Akka 2.4.2 was released, the experimental label was removed, signifying that persistence was stable and ready to be leveraged in production code.

Akka persistence allows stateful actors to persist their internal state. It does this not by persisting the state itself, but by persisting the changes to that state. It uses an append-only model to store these state changes, allowing you to later reconstitute the state by replaying the changes against it. It also allows you to take periodic snapshots and use those to reestablish an actor’s state, as a performance optimization for long-lived entities with lots of state changes.
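To make the replay idea concrete, here is a minimal, framework-free sketch of what reconstituting state from events amounts to. Everything in it (the event and state types, and the applyEvent function) is hypothetical and just for illustration; recovery is conceptually a left fold of the journaled events over the initial state:

// Hypothetical event and state types, for illustration only
sealed trait OrderEvent
case object OrderApproved extends OrderEvent

case class OrderState(status: String)

// Apply a single journaled change to the current state
def applyEvent(state: OrderState, event: OrderEvent): OrderState =
  event match {
    case OrderApproved => state.copy(status = "approved")
  }

// Recovery replays each event, in order, over the initial state
val events = List[OrderEvent](OrderApproved)
val recovered = events.foldLeft(OrderState("new"))(applyEvent)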

Akka persistence’s approach should certainly sound familiar, as it maps almost directly onto the features of event sourcing. In fact, it was inspired by the eventsourced Scala library, so that overlap is no coincidence. Because of this alignment with event sourcing, Akka persistence will be the perfect tool for us to switch over to an event sourced model.

Before getting into the details of the refactor, I want to describe some of the high-level concepts in the framework.

The PersistentActor trait

The PersistentActor trait is the core building block to create event sourced entities. This actor is able to persist its events to a pluggable journal. When a persistent actor is restarted (reloaded), it will replay its journaled events to reestablish its current internal state. These two behaviors perfectly fit what we need to do for our event sourced entities, so this will be our core building block.

The PersistentActor trait has a lot of features, more than I will cover in the next few sections. I’ll cover the things that we will use in the bookstore refactoring, which I consider to be the most useful features of PersistentActor. If you want to learn more, then I suggest you take a look at the Akka documentation, as it pretty much covers everything else that you can do with PersistentActor.

Persistent actor state handling

A PersistentActor implementation has two basic states that it can be in—Recovering and Receiving Commands. When Recovering, it’s in the process of reloading its event stream from the journal to rebuild its internal state. Any external messages that come in during this time will be stashed until the recovery process is complete. Once recovery completes, the persistent actor transitions into the Receiving Commands state, where it can start to handle commands. These commands can then generate new events that further modify the state of the entity.

These two states are both represented by custom actor receive handling partial functions. You must provide implementations for both of the following vals in order to properly implement these two states for your persistent actor:

val receiveRecover: Receive = {
  . . .
}
 
val receiveCommand: Receive = {
  . . .
}

While in the recovering state, there are two possible messages that you need to be able to handle. The first is one of the event types that you previously persisted for this entity type. When you get that type of message, you have to reapply the change implied by that event to the internal state of the actor. For example, if we had a SalesOrderFO fields object as the internal state, and we received a replayed event indicating that the order was approved, the handling might look something like this:

var state: SalesOrderFO = ...
val receiveRecover: Receive = {
  case OrderApproved(id) =>
    state = state.copy(status = SalesOrderStatus.Approved)
}

We’d, of course, need to handle a lot more than that one event. This code sample was just to show you how you can modify the internal state of a persistent actor when it’s being recovered.

Once the actor has completed the recovery process, it can transition into the state where it starts to handle incoming command requests. Event sourcing is all about Action (command) and Reaction (events). When the persistent actor receives a command, it has the option to generate zero to many events as a result of that command. These events represent a happening on that entity that will affect its current state.

Events you receive while in the Recovering state will have been previously generated while in the Receiving Commands state. So, in the preceding example, the OrderApproved event we receive must have come from some command that we handled earlier. The handling of that command could have looked something like this:

val receiveCommand: Receive = {
  case ApproveOrder(id) =>
    persist(OrderApproved(id)){ event =>
      state = state.copy(status = SalesOrderStatus.Approved)
      sender() ! FullResult(state)
    }
}

After receiving the command request to change the order status to approved, the code makes a call to persist, which will asynchronously write an event into the journal. The full signature for persist is:

persist[A](event: A)(handler: (A) ⇒ Unit): Unit

The first argument represents the event that you want to write to the journal. The second argument is a callback function that will be executed after the event has been successfully persisted (and won’t be called at all if the persistence fails). For our example, we use that callback to mutate the internal state, updating the status field to match the requested action.
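Since a single command can produce more than one event, Akka also offers a persistAll variant that journals a sequence of events and invokes the handler once per event, with the same guarantees as persist. A minimal sketch (the OrderShipped event and the applyEvent helper are made up for illustration):

persistAll(List(OrderApproved(id), OrderShipped(id))){ event =>
  // Invoked once per event, in order, after each is journaled
  state = applyEvent(state, event)
}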

One thing to note is that the write to the journal is asynchronous. One might then think that it’s possible to be closing over that internal state in an unsafe way when the callback function is executed. If you persisted two events in rapid succession, couldn’t it be possible for both callbacks to access that internal state at the same time, in separate threads, kind of like when using Futures in an actor?

Thankfully, this is not the case. The completion of a persistence action is sent back as a new message to the actor. The hidden receive handling for this message will then invoke the callback associated with that persistence action. By using the mailbox again, we will know these post-persistence actions will be executed one at a time, in a safe manner. As an added bonus, the sender associated with those post-persistence messages will be the original sender of the command so you can safely use sender() in a persistence callback to reply to the original requestor, as shown in my example.

Another guarantee that the persistence framework makes when persisting events is that no other commands will be processed in between the persistence action and the associated callback. Any commands that come in during that time will be stashed until all of the post-persistence actions have been completed. This makes the persist/callback sequence atomic and isolated, in that nothing else can interfere with it while it’s happening. Allowing additional commands to be executed during this process may lead to an inconsistent state and response to the caller who sent the commands.

If, for some reason, persisting to the journal fails, there is an onPersistFailure callback that will be invoked. If you want to implement custom handling for this, you can override that method. No matter what, when persistence fails, the actor will be stopped after making this callback. At that point, it’s possible that the actor is in an inconsistent state, so it’s safer to stop it than to allow it to continue on in that state. Persistence failures probably mean something is wrong with the journal anyway, so restarting, as opposed to stopping, would more than likely lead to even more failures.

There’s one more callback that you can implement in your persistent actors, and that’s onPersistRejected. This is invoked if the serialization framework rejects the serialization of the event to store. When this happens, the persist callback does not get invoked, so no internal state update will happen. In this case, the actor is neither stopped nor restarted, because it’s not in an inconsistent state and the journal itself is not failing.
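Both callbacks have the same shape, receiving the cause of the failure, the offending event, and its sequence number. A minimal sketch of overriding them, assuming the actor also mixes in Akka’s ActorLogging for the log reference:

override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
  // The framework stops the actor after this callback returns
  log.error(cause, "Journal write failed for event {} at sequenceNr {}", event, seqNr)
}

override def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
  // The actor keeps running; the event was never journaled
  log.warning("Event {} at sequenceNr {} was rejected: {}", event, seqNr, cause.getMessage)
}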

The PersistenceId

Another important concept that you need to understand with PersistentActor is the persistenceId method. This abstract method must be defined for every type of PersistentActor you define, returning a String that must be unique both across different entity types and between actor instances within the same type.

Let’s say I create the Book entity as a PersistentActor and define the persistenceId method as follows:

override def persistenceId = "book"

If I do that, then I will have a problem with this entity, in that every instance will share a single event stream with every other Book instance. If I want each instance of the Book entity to have its own separate event stream (and trust me, you will), then I will do something like this when defining the Book PersistentActor:

class Book(id:Int) extends PersistentActor{
  override def persistenceId = s"book-$id"
}

If I follow an approach like this, then I can be assured that each of my entity instances will have its own separate event stream as the persistenceId will be unique for every Int keyed book we have.

In the current model, when creating a new instance of an entity, we pass in the special ID of 0 to indicate that this entity does not yet exist and needs to be persisted. We defer ID creation to the database, and once we have an ID (after persisting), we stop that actor instance, as it is not properly associated with the newly generated ID. With the persistenceId model of associating the event stream with an entity, we will need the ID as soon as we create the actor instance. This means we will need a way to have a unique identifier even before persisting the initial entity state. This is something to think about before we get to the upcoming refactor.
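One common way to meet that requirement is to generate the identifier on the application side, before the actor is ever created. A quick sketch, switching the key from a database-assigned Int to a String UUID (the Book.props factory here is hypothetical):

import java.util.UUID

// Generate the identity up front so the persistenceId is stable
// from the very first persisted event
val id = UUID.randomUUID().toString
val book = system.actorOf(Book.props(id), s"book-$id")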

Taking snapshots for faster recovery

I’ve mentioned the concept of taking a snapshot of the current state of an entity to speed up the process of recovering that state. If you have a long-lived entity that has generated a large number of events, it will take progressively longer to recover its state. Akka’s PersistentActor supports the snapshot concept, putting it in your hands as to when to take the snapshots. Once you have taken snapshots, the latest one will be offered to the entity during the recovery phase, instead of all of the events that led up to it. This reduces the total number of events to process to recover state, thus speeding up that process.

This is a two-part process: the first part is taking snapshots periodically, and the second is handling them during the recovery phase. Let’s look at the snapshot-taking process first. Say you coded a particular entity to save a new snapshot for every one hundred events received. To make this happen, your command handling block may look something like this:

var eventTotal = ...
val receiveCommand: Receive = {
  case UpdateStatus(status) =>
    persist(StatusUpdated(status)){ event =>
      state = state.copy(status = event.status)
      eventTotal += 1
      if (eventTotal % 100 == 0)
        saveSnapshot(state)
    }

  case SaveSnapshotSuccess(metadata) => . . .
  case SaveSnapshotFailure(metadata, reason) => . . .
}

You can see in the post-persist logic that we’re making a specific call to saveSnapshot, passing the latest version of the actor’s internal state. You’re not limited to doing this in the post-persist logic in reaction to a new event; you can also set up the actor to save a snapshot at regular intervals. You can leverage Akka’s scheduler to send a special message to the entity instructing it to save a snapshot periodically.
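As a sketch of that scheduler-based approach, the entity below sends itself a TakeSnapshot message on a fixed interval. The TakeSnapshot message, the StatusEntity class, and its messages are all made up for this example; only the scheduler, PersistentActor, and saveSnapshot pieces are Akka’s:

import akka.persistence.{PersistentActor, SnapshotOffer}
import scala.concurrent.duration._

// Hypothetical messages and events, just for this sketch
case object TakeSnapshot
case class UpdateStatus(status: String)
case class StatusUpdated(status: String)

class StatusEntity(id: Int) extends PersistentActor {
  import context.dispatcher
  override def persistenceId = s"status-$id"

  var state = "new"

  // Have the scheduler remind us to snapshot every 10 minutes,
  // independent of how many events we have persisted
  val snapshotTask =
    context.system.scheduler.schedule(10.minutes, 10.minutes, self, TakeSnapshot)

  override def postStop(): Unit = {
    snapshotTask.cancel()
    super.postStop()
  }

  val receiveRecover: Receive = {
    case SnapshotOffer(_, snap: String) => state = snap
    case StatusUpdated(status)          => state = status
  }

  val receiveCommand: Receive = {
    case UpdateStatus(status) =>
      persist(StatusUpdated(status))(event => state = event.status)
    case TakeSnapshot =>
      saveSnapshot(state)
  }
}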

If you start saving snapshots, then you will have to start handling the two new messages that will be sent to the entity indicating the status of the save. These two new message types are SaveSnapshotSuccess and SaveSnapshotFailure. The metadata that appears on both messages will tell you things such as the persistence ID of the snapshot, its sequence number, and the timestamp of when it was taken. You can see these two new messages being handled in the command handling block of the earlier 100-events example.

Once you have saved a snapshot, you will need to start handling it in the recovery phase. The logic to handle a snapshot during recovery will look like the following code block:

val receiveRecover: Receive = {
  case SnapshotOffer(metadata, offeredSnapshot) =>
    state = offeredSnapshot
  case event => . . .
}

Here, you can see that if we get a snapshot during recovery, instead of just making an incremental change, as we do with real replayed events, we set the entire state to whatever the offered snapshot is. There may be hundreds of events that led up to that snapshot, but all we need to handle here is one message in order to wind the state forward to when we took that snapshot. This process will certainly pay dividends if we have lots of events for this entity and we continue to take periodic snapshots.

One thing to note about snapshots is that you will only ever be offered the latest snapshot (per persistence ID) during the recovery process. Even though I’m taking a new snapshot every 100 events, I will only ever be offered one, the latest one, during the recovery phase.

Another thing to note is that there is no real harm in losing a snapshot. If your snapshot storage was wiped out for some reason, the only negative side effect is that you’ll be stuck processing all of the events for an entity when recovering it. When you take snapshots, you don’t lose any of the event history. Snapshots are completely supplemental and only benefit the performance of the recovery phase. You don’t need to take them, and you can live without them if something happens to the ones you had taken.

Serialization of events and snapshots

Within both the persistence and snapshot examples, you can see that I was passing objects into the persist and saveSnapshot calls. So, how are these objects marshaled to and from a format that can actually be written to those stores? The answer is: via Akka serialization.

Akka persistence depends on Akka serialization to convert event and snapshot objects to and from a binary format that can be saved into a data store. If you don’t make any changes to the default serialization configuration, your objects will be converted into binary via Java serialization. Java serialization is both slow and inefficient in terms of the size of the serialized output. It also doesn’t cope well with an object’s definition changing after the binary was produced, when you later try to read it back in. It’s not a good choice for our event sourced app.

Luckily, Akka serialization allows you to provide your own custom serializers. If, for instance, you wanted to use JSON as your serialized object representation, you could pretty easily build a custom serializer to do that. Akka also has a built-in Google Protobuf serializer that can convert your Protobuf binding classes into their binary format. We’ll explore both custom serializers and the Protobuf serializer when we get into the sections dealing with the refactors.
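Custom serializers are wired in via configuration. A hedged sketch of what that looks like, with made-up class names standing in for a real serializer implementation and an event marker trait:

akka {
  actor {
    serializers {
      # "json" is just a label; the class is a hypothetical custom serializer
      json = "com.example.bookstore.JsonSerializer"
    }
    serialization-bindings {
      # Any class assignable to this (hypothetical) trait uses the json serializer
      "com.example.bookstore.EntityEvent" = json
    }
  }
}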

The AsyncWriteJournal

Another important component in Akka persistence, which I’ve mentioned a few times already, is the AsyncWriteJournal. This component is an append-only data store that stores the sequence of events (per persistence ID) that a PersistentActor generates via calls to persist. The journal also stores the highestSequenceNr per persistence ID, which tracks the total number of persisted events for that persistence ID.

The journal is a pluggable component. You have the ability to configure the default journal and, also, override it on a per-entity basis. The default configuration for Akka does not provide a value for the journal to use, so you must either configure this setting or add a per-entity override (more on that in a moment) in order to start using persistence. If you want to set the default journal, then it can be set in your config with the following property:

akka.persistence.journal.plugin="akka.persistence.journal.leveldb"

The value in the preceding code must be the fully qualified path to another configuration section of the same name where the journal plugin’s config lives. For this example, I set it to the already provided leveldb config section (from Akka’s reference.conf).
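Because the leveldb section already exists in Akka’s reference.conf, pointing at it is all that’s strictly required. You can still override individual settings within it; for instance, dir controls where the journal files are written (the path shown is just an example):

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"

# Optional override: where the leveldb journal writes its files
akka.persistence.journal.leveldb.dir = "target/journal"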

If you want to override the journal plugin for a particular entity instance only, then you can do so by overriding the journalPluginId method on that entity actor, as follows:

class MyEntity extends PersistentActor{
  override def journalPluginId = "my-other-journal"
  . . .
}

The same rules apply here: my-other-journal must be the fully qualified name of a config section where the config for that plugin lives.
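That means your config must contain a section of that name, supplying, at minimum, the plugin’s implementing class. A sketch, using the leveldb journal class as a stand-in and an example directory:

my-other-journal {
  # Every journal plugin section must name its implementing class
  class = "akka.persistence.journal.leveldb.LeveldbJournal"
  dir = "target/other-journal"
}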

My example config showed the use of the leveldb plugin that writes to the local file system. If you actually want to play around using this simple plugin, then you will also need to add the following dependencies into your sbt file:

"org.iq80.leveldb" % "leveldb" % "0.7"
"org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"

If you want to use something different, then you can check the community plugins page on the Akka site to find one that suits your needs. For our app, we will use the Cassandra journal plugin. I’ll show you how to set up the config for that in the section dealing with the installation of Cassandra.

The SnapshotStore

The last thing I want to cover before we start the refactoring process is the SnapshotStore. Like the AsyncWriteJournal, the SnapshotStore is a pluggable and configurable storage system, but this one stores just snapshots as opposed to the entire event stream for a persistence id. As I mentioned earlier, you don’t need snapshots, and you can survive if the storage system you used for them gets wiped out for some reason. Because of this, you may consider using a separate storage plugin for them.

When selecting the storage system for your events, you need something that is robust, distributed, highly available, fault tolerant, and backup capable. If you lose these events, you lose the entire data set for your application. But, the same is not true for snapshots. So, take that information into consideration when selecting the storage. You may decide to use the same system for both, but you certainly don’t have to. Also, not every journal plugin can act as a snapshot plugin; so, if you decide to use the same for both, make sure that the journal plugin you select can handle snapshots.

If you want to configure the snapshot store, then the config setting to do that is as follows:

akka.persistence.snapshot-store.plugin="my-snapshot-plugin"

The setting here follows the same rules as the write journal; the value must be the fully qualified name of a config section where the plugin’s config lives. If you want to override the default setting on a per-entity basis, then you can do so by overriding the snapshotPluginId method on your actor, like this:

class MyEntity extends PersistentActor{
  override def snapshotPluginId = "my-other-snap-plugin"
  . . .
}

The same rules apply here as well: the value must be a fully qualified path to a config section where the plugin’s config lives. Also, there is no out-of-the-box default setting for the snapshot store, so if you want to use snapshots, you must either set the appropriate setting in your config or provide the earlier mentioned override on a per-entity basis.

For our needs, we will use the same storage mechanism—Cassandra—for both the write journal and the snapshot storage. We have a multi-node system currently, so using something that writes to the local file system, or a simple in-memory plugin, won’t work for us.

Summary

In this article, you learned how Akka persistence supports event sourcing: persistent actors journal their state changes as events, recover their state by replaying those events, and can take snapshots of their current state to speed up that recovery.
