
How To Distribute Application State with Akka Cluster - Part 1: Getting Started

Michael Read Principal Consultant, Lightbend, Inc.

Introduction To This Series

Building, testing, containerizing, deploying, and monitoring distributed microservices is difficult, but the tools from Lightbend can make the job much faster and easier. In this series of blog posts, we’ll walk you through a working Proof of Concept (PoC) built with the open source Scala programming language and Lightbend’s Akka distributed toolkit.

In this part, we walk you through building, testing, and running the PoC locally, with instrumentation and monitoring wired in from the very beginning using Lightbend Telemetry. In Part 2: Docker and Local Deploy, we deploy the PoC to Docker on our local machine and test it. In Part 3: Kubernetes & Monitoring, we move the PoC to Kubernetes (on Minikube), review some monitoring options, and dive deep into the Kubernetes deployment YAML files. Finally, in Part 4: Source Code, we look at the Scala source code required to create the PoC.

The PoC takes advantage of Akka’s commercial resilience enhancements as well as Lightbend Telemetry, so you’ll need a current Lightbend subscription with properly configured credentials to build the project successfully. If you don’t have a current subscription, you can ask Lightbend for a quote from here. In any case, we’ve adapted our build process (build.sbt) to key off the existence of a Lightbend subscription credentials file stored locally as ~/.lightbend/commercial.credentials. If this file doesn’t exist, none of Lightbend’s commercial resilience enhancements are included in the build. Separate configurations are included, prefixed with “nonsup-” (non-support), and called out where appropriate.
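The gating can be as simple as checking for the credentials file inside build.sbt. The following is a minimal sketch of that idea, with illustrative names rather than the PoC’s exact build code:

// build.sbt (sketch): include commercial modules only when the local
// Lightbend credentials file exists
lazy val hasCommercialCredentials: Boolean =
  (file(sys.props("user.home")) / ".lightbend" / "commercial.credentials").exists

// elsewhere in build.sbt:
// libraryDependencies ++= (if (hasCommercialCredentials) commercialDeps else Seq.empty)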

The Github project covering this blog can be found and cloned from here.


The Premise: Proving a Microservice Use Case

In our PoC, the primary aim is to track two digital artifact states, per user, for a user interface (UI) like LinkedIn’s. First, we want to know whether a given user has read a given artifact, which could be literally any type of digital payload. Second, we want to know whether the artifact is currently included in the user’s feed. Both states are queried by the UI through a REST API and then reflected in the interface. In both cases the state is simply represented as a boolean within a single Scala case class:

final case class CurrState(artifactRead: Boolean = false, artifactInUserFeed: Boolean = false)

Of course in the real world we’d want to maintain a lot more state than is described by these two fields, but for our purposes these will suffice.

The ultimate aim of our PoC is to scale out to handle millions of users in a highly responsive manner, with strongly consistent state, while maintaining a high level of performance and resilience. In other words, the PoC delivers a highly performant distributed cache with a persistent backing that manifests the traits described in the Reactive Manifesto: responsive, elastic, resilient, and message driven.

Can we use Strong Consistency?

Traditional architectures are built around Strong Consistency, but we’re building a microservice for a distributed system. So how can we achieve Strong Consistency in a distributed environment? The laws of physics prevent true Strong Consistency across a network, so instead we must simulate it.

We could use locks and transactions to simulate it, but these create undesirable contention. Any two things that contend for a single resource are in competition. That competition can have only one winner, and the others are forced to wait for the winner to complete. This contention creates bottlenecks and thus reduces our ability to scale.

Another scalability factor we must take into consideration when building distributed microservices is the effect of Coherency Delay. In a distributed system, synchronizing the state of multiple nodes is done using crosstalk, or gossip: each node in a cluster sends messages to the other nodes informing them of any state changes.

The time it takes for this synchronization to complete is called the Coherency Delay, and it adds further overhead to parallelizing a system; there is always a price to pay. The bottom line is that we need to be aware of the costs associated with crosstalk and reduce it where we can.

In our use case we’re relying on the Akka toolkit to minimize the impact of crosstalk. For more information on the costs of Coherency Delay, please see the article on Gunther’s Universal Scalability Law.
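For a sense of the math involved, the Universal Scalability Law models the relative capacity C(N) of N nodes in terms of a contention penalty α and a coherency-delay penalty β:

C(N) = N / (1 + α(N − 1) + β·N(N − 1))

Because the β (crosstalk) term grows quadratically with N, adding nodes eventually causes throughput to peak and then decline, which is exactly why reducing crosstalk matters.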

We can balance the need for Strong Consistency against the need for scalability by introducing Akka Cluster Sharding, which limits the scope of contention and reduces crosstalk. Crosstalk is reduced because the same state no longer needs to be shared across the entire cluster. With Akka Cluster Sharding, contention is isolated to a single entity, addressed by a unique identifier, so contention in one entity has little or no impact on the other entities.

We’re using the Akka Toolkit

Akka provides many individual libraries, based upon the Actor Model, for building highly scalable, cloud-native, distributed applications. The Actor Model was first introduced in a 1973 paper by Carl Hewitt and later popularized by the Erlang programming language. Now, with the advent of multi-core computers, cloud-based computing, and high-speed networks, the Actor Model on the JVM has come into its own and matured through Akka.

Akka’s features are packaged into separate modules so you only need to include the dependencies you’re using.

In our PoC we will be using seven different open source Akka modules:

  1. HTTP Server – Akka HTTP provides fully asynchronous, non-blocking HTTP endpoints and the routing DSL we use to implement a REST API. Documentation can be found here: https://doc.akka.io/docs/akka-http/current/introduction.html
  2. Actor System – The Akka Actor system enables each actor to execute within its own isolated context, greatly simplifying concurrency and distribution. Each actor has its own mailbox and communicates with other actors via message passing. In the past, Akka messages were untyped (of any type), but now, with Akka Typed, actors can only receive messages of the expected type(s). Actors can crash too, and in fact Akka promotes the concept of “let it crash”: resilience is provided through actor hierarchies (parent / children) with supervision strategies that can be implemented for recovery and self-healing. Documentation can be found here: https://doc.akka.io/docs/akka/current/typed/index.html
  3. Cluster – Akka Cluster provides a fault-tolerant distributed cluster of actor systems (member nodes) that are typically run in separate containers and JVMs. Documentation can be found here: https://doc.akka.io/docs/akka/current/typed/cluster.html
  4. Cluster Sharding – Akka Cluster Sharding sits on top of Akka Cluster and distributes data, in shards, and load across the members of a cluster without developers needing to keep track of where the data actually resides. Data is stored in actors that represent individual entities, identified by a unique key, which closely corresponds to an Aggregate Root in Domain-Driven Design terminology. For example, in our PoC, the key to each entity is a combination of User and Artifact Id. Documentation can be found here: https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html
  5. Persistence – Combined with Cluster Sharding, Akka Persistence gives entity actors the ability to persist their state as an event log, following the pattern of Command Query Responsibility Segregation with Event Sourcing (CQRS / ES). When an actor receives a message (command) that would cause its state to change, the resulting event is saved to a database first, and only once the write is confirmed is the state change reflected inside the actor. The event journal can then be used to reconstitute the actor, by replaying its events, at any time after a failure or passivation (a trimmed-down sketch of such an entity follows this list). Documentation can be found here: https://doc.akka.io/docs/akka/current/typed/persistence.html
  6. Multi-JVM Testing – Akka provides a toolkit that allows you to form Akka clusters locally on your own machine and then run Scala tests against all the various components of your microservice. Documentation can be found here: https://doc.akka.io/docs/akka/current/multi-jvm-testing.html
  7. Akka Cluster Bootstrap – a small subset of the Akka Management module that provides the dynamic seed nodes needed to form an Akka cluster in environments where IP addresses are assigned during the bootstrapping process. We’ll be using this module to help form our cluster when deploying to Kubernetes. Documentation can be found here: https://doc.akka.io/docs/akka-management/current/bootstrap/index.html
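To make the Cluster Sharding and Persistence modules (4 and 5) concrete, here is a trimmed-down, hypothetical sketch of a persistent, sharded entity written against the Akka Typed APIs. The command, event, and object names are illustrative; the PoC’s real entity handles more commands:

import akka.actor.typed.ActorRef
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

object ArtifactStateEntitySketch {
  sealed trait Command
  final case class SetArtifactRead(replyTo: ActorRef[Ack]) extends Command
  final case class Ack(ok: Boolean)

  sealed trait Event
  case object ArtifactRead extends Event

  final case class CurrState(artifactRead: Boolean = false, artifactInUserFeed: Boolean = false)

  val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ArtifactState")

  def apply(entityId: String): EventSourcedBehavior[Command, Event, CurrState] =
    EventSourcedBehavior[Command, Event, CurrState](
      persistenceId = PersistenceId(TypeKey.name, entityId),
      emptyState = CurrState(),
      commandHandler = (_, command) =>
        command match {
          case SetArtifactRead(replyTo) =>
            // persist the event first; reply only once the write is confirmed
            Effect.persist(ArtifactRead).thenReply(replyTo)(_ => Ack(ok = true))
        },
      eventHandler = (state, event) =>
        event match {
          case ArtifactRead => state.copy(artifactRead = true)
        }
    )
}

// Registered with Cluster Sharding at startup, for example:
// ClusterSharding(system).init(Entity(ArtifactStateEntitySketch.TypeKey) { ctx =>
//   ArtifactStateEntitySketch(ctx.entityId)
// })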

These are just the seven Akka modules that were used to build this PoC but there are many more interesting Akka modules that can be very useful in building distributed microservices. We'd like to call your attention to some of the others: Akka Streams, Akka Cluster Singleton, Distributed Data, Alpakka, gRPC, and Akka Enhancements.

Commercial Resilience Enhancements

This project takes advantage of additional technology enhancements that come with a Lightbend subscription, including:

  • Lightbend Telemetry - Telemetry, part of Lightbend’s Intelligent Monitoring feature set, is a suite of insight tools that provides a view into the workings of our distributed platforms, including distributed tracing, customized metrics, and all the integrations needed to connect with other APM and metrics tools.
  • Lightbend Console - The Console works with Lightbend Telemetry to provide visibility into your distributed applications. Grafana and Prometheus are included and pre-configured for quick and easy setup, allowing you to focus on building the core business value of your applications.
  • Akka Split Brain Resolver - When operating an Akka cluster you must consider how to handle network partitions (a.k.a. split brain scenarios) and machine crashes (including JVM and hardware failures). This is crucial for correct behavior if you use Cluster Singleton or Cluster Sharding, especially together with Akka Persistence.
  • Akka Thread Starvation Detector - The Akka Thread Starvation Detector is a diagnostic tool that monitors the dispatcher of an ActorSystem and logs a warning if the dispatcher becomes unresponsive.
  • Akka Configuration Checker - Akka comes with a massive amount of configuration settings that can be tweaked. It can be difficult to know which knobs to turn and which to leave alone. Finding correct values and appropriate relations between different settings may seem like a black art. In many cases incorrect configuration values contribute to terrible stability and bad performance.
  • Akka Diagnostics Recorder - The Akka Diagnostics Recorder writes configuration and system information to a file that can be attached to your Lightbend support cases. The information helps us at Lightbend to give you the best possible support.

Project Layout with sbt

The project is built and run using the Simple Build Tool (sbt). If you’re not familiar with sbt: each project’s build configuration is based on the contents of the project directory and the build.sbt file saved in the root of the project directory. The project directory adds the sbt plug-ins used by the build process and, depending on style, is sometimes used to declare a library of dependencies and their required versions. The build.sbt file provides the guts of the build process. In our PoC, the build.sbt file declares the specific build dependencies and the run options for monitoring.

The source code directory layout is as follows:

src/
  main/
    resources/
    scala/
  multi-jvm/
    scala/
  test/
    scala/

One thing that you might find odd is the multi-jvm directory, which is used for our test cases. The test directory is standard and provided for future unit tests.

Reactive System Architecture

Throughout this section we'll be referring to the image above, which provides a visualization of a small deployment of the PoC, with five Akka clustered nodes.

The first two nodes (Node 1 and Node 2) are of the same type and provide two levels of functionality. The first, referred to as the Endpoint Router (EP), is an HTTP endpoint (Akka HTTP) which implements the REST API. The second, referred to as the Shard Region Proxy (SP), provides access into Akka Cluster Sharding. Commands and queries are created by the EP, wrapped in a sharding envelope that includes the targeted entity’s unique Id, and finally sent through the proxy for handling.

In the PoC, each Entity’s unique identity is a combination of the User’s Id and the specific Artifact’s Id. For this node type, the only code I’ve created is for the EP, which is made up of three source files saved in the package com.lightbend.artifactstate.endpoint.

The next three nodes (Node 3, Node 4, and Node 5) are of the same type and handle all sharding operations, including, but not limited to, each Entity’s (E) life-cycle and the rebalancing of shards, all of which is provided by the Akka toolkit. The only source code required for this node type is the implementation of the Entity itself. That’s it!

The main advantage of having two types of nodes is that each type can be scaled independently, as needed. For a project such as this one, which will ultimately be deployed through container orchestration, Lightbend recommends using a single code base without sub-projects, with a node’s behavior defined through configuration, so that a single image can be used. If sub-projects were used for this PoC, we’d have two image types to manage.
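A minimal sketch of that idea, assuming role names like "endpoint" and "sharded" (the PoC’s actual role names and guardian behavior may differ), looks like this:

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory

object MainSketch extends App {
  // the configuration file is selected at startup via -Dconfig.resource=...
  val config = ConfigFactory.load()
  val roles  = config.getStringList("akka.cluster.roles")

  val guardian = Behaviors.setup[Nothing] { ctx =>
    if (roles.contains("endpoint"))
      ctx.log.info("starting an HTTP endpoint node")   // bind the Akka HTTP routes here
    else
      ctx.log.info("starting a cluster sharding node") // initialize Cluster Sharding here
    Behaviors.empty
  }

  ActorSystem[Nothing](guardian, "ArtifactStateCluster", config)
}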

For this PoC, each Entity persists its event log to Cassandra, as depicted in the bottom-right cluster, through the Akka Persistence Cassandra plug-in. If you don’t want to deal with building and managing a Cassandra cluster, which can be difficult, there are other Lightbend-supported persistence plug-ins available, including JDBC and Couchbase. A complete list of Akka Persistence community plug-ins can be found here.

All circles in the image above represent actors, and their size and color are primarily related to the hierarchy of the parent / child relationship. All communication between actors is message driven, asynchronous, and non-blocking. For the most part, communication within each node follows the actor hierarchies through the Shard Region (SR), but node-to-node actor communication was too busy to document in the image.

The primary key to communication between the nodes is the Shard Coordinator (SC), which keeps track of which shard regions are available and which shards each node is hosting. Another chore of the SC is handling the redistribution of shards when nodes go down, are removed, or are added to the cluster. The SC is a cluster singleton, and if the node hosting the SC goes down, a new SC is automatically recreated on another node. The cluster’s Distributed Data internals keep track of the internal state of the SC in case it ever needs to be reconstituted somewhere else.

The following sequence diagram illustrates a common message flow pattern when sending commands through the Endpoint and on to specific entities:

Notice that the response from the Entity above returns directly to the Endpoint where the command originated. This is because the internal API, defined by the Entity, requires a replyTo actor reference in the original command, thus bypassing the need to contact any of the Cluster Sharding actors.
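Under assumptions about the message protocol (the hypothetical IsArtifactReadByUser command below stands in for the PoC’s real messages), the Endpoint’s side of that exchange looks roughly like this:

import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.typed.ActorRef
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityTypeKey}
import akka.util.Timeout

object EndpointAskSketch {
  sealed trait Command
  final case class IsArtifactReadByUser(replyTo: ActorRef[ArtifactReadByUser]) extends Command
  final case class ArtifactReadByUser(artifactRead: Boolean)

  val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ArtifactState")

  def isArtifactRead(sharding: ClusterSharding, artifactId: Long, userId: String): Future[ArtifactReadByUser] = {
    implicit val timeout: Timeout = 3.seconds
    // the entity id combines the Artifact and User Ids (exact format is an assumption)
    val entityRef = sharding.entityRefFor(TypeKey, s"$artifactId$userId")
    // ask injects the replyTo reference, so the entity's reply returns
    // directly to the Endpoint, bypassing the sharding actors on the way back
    entityRef.ask(replyTo => IsArtifactReadByUser(replyTo))
  }
}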

Bootstrapping Configurations

As mentioned in the Introduction, the PoC can be run and tested locally, in Docker, and finally in Kubernetes. Configuration files for each environment and node type are provided in src/main/resources.

src/main/resources

-rw-rw-r-- 1 sample sample  585 Jul 25 09:03 cluster-application-base.conf
-rw-rw-r-- 1 sample sample 1090 Jul  3 12:29 cluster-application.conf
-rw-rw-r-- 1 sample sample 1341 Jun 24 11:38 cluster-application-docker.conf
-rw-rw-r-- 1 sample sample 1231 Jun 24 14:19 cluster-application-k8s.conf
-rw-rw-r-- 1 sample sample  749 Jul 25 09:03 endpoint-application-base.conf
-rw-rw-r-- 1 sample sample  873 Jul  3 12:29 endpoint-application.conf
-rw-rw-r-- 1 sample sample 1139 Jun 24 11:39 endpoint-application-docker.conf
-rw-rw-r-- 1 sample sample 1321 Jun 24 14:22 endpoint-application-k8s.conf

File names beginning with endpoint create Endpoint nodes, and file names beginning with cluster create Cluster Sharded nodes. The target platform is determined by the rest of the name: application.conf, application-docker.conf, and application-k8s.conf support running locally, in Docker, and in Kubernetes, respectively. Files ending with base.conf provide common configuration settings for each node type and are not referenced directly when starting nodes.

Monitoring and Telemetry

Being able to monitor today’s distributed microservices is a “must have” when deploying distributed systems, and collecting metrics is the first requirement. To that end, we’re using Lightbend Telemetry to instrument the PoC and send the collected metrics to either the Elastic Stack or Prometheus. To display charts and graphs, we feed the data to Grafana for rendering. Common configuration settings for Telemetry are stored in the telemetry.conf file.

src/main/resources

-rw-rw-r-- 1 sample sample 1515 Jul  3 15:56 telemetry.conf
-rw-rw-r-- 1 sample sample  303 Jun 19 08:23 telemetry-elasticsearch.conf
-rw-rw-r-- 1 sample sample  249 Jul  1 15:40 telemetry-prometheus.conf

When running locally, we’ll be using Lightbend’s Developer Sandbox for Elasticsearch to collect and display the metrics gathered by Lightbend Telemetry. Later, when running on Kubernetes, we’ll switch to Prometheus to collect the metrics generated by the instrumentation, and to Lightbend Console with Grafana to display real-time telemetry along with Lightbend-provided dashboards for the Actor System, Cluster operations, and more.

Testing the PoC Locally

Prerequisites

  1. Make sure you have the Java 8 JDK installed.
    java -version
    Make sure you have version 1.8.
    (If you don't have it installed, download Java here.)
  2. Make sure you have sbt installed.
    To check, open a terminal and type:
    sbt
    Make sure you have sbt 1.x.
    (If you don’t have it installed, follow these instructions.)
  3. Make sure you have Docker and Docker Compose installed. For more information, see this link.
  4. Make sure your Lightbend subscription credentials are properly configured. For more information, please see this link.
  5. Download and install Lightbend’s Developer Sandbox for Elasticsearch.
  6. Clone the PoC repository to your local machine. Open a terminal, change to the desired destination directory and type:
    git clone git@github.com:michael-read/akka-typed-distributed-state-blog.git

If you don’t have a Lightbend subscription and are following along, you don’t need to perform steps #4 and #5 above.

Testing locally with Akka Multi-JVM

We’re using the Akka Multi-JVM toolkit to form an Akka cluster locally using multiple JVMs and then running Scala integration tests to verify proper operation of the APIs for the PoC microservice.

We perform two types of tests through the ArtifactStatesSpec.scala file. First, we form an Akka cluster using three JVMs and test that we can set the state of a persistent Entity internally through a shard region proxy. Then, we test the external API provided by Akka HTTP.
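The Multi-JVM plug-in keys off a class-naming convention: test classes that share a prefix and end in MultiJvm<NodeName> are launched together, one JVM each. A minimal, hypothetical illustration (the PoC’s ArtifactStatesSpec is considerably more involved):

import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

// sbt multi-jvm:test starts one JVM per class below and runs them concurrently
class SampleSpecMultiJvmNode1 extends AnyWordSpec with Matchers {
  "node 1" should { "participate in the test" in { succeed } }
}

class SampleSpecMultiJvmNode2 extends AnyWordSpec with Matchers {
  "node 2" should { "participate in the test" in { succeed } }
}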

In the final installment of this blog, we’ll do a deep dive into this testing approach.

To run tests locally:

  1. Start a single Cassandra node locally using docker-compose:
    From a new terminal go to the root directory of the PoC project, and start Cassandra by issuing the command:
    docker-compose -f docker-compose-cassandra.yml up
  2. Start the tests:
    From a new terminal go to the root directory of the PoC project, and start the tests with the command:
    sbt multi-jvm:test

It’s worth noting here that the first time an event is committed to Cassandra, the Akka Persistence Cassandra (akka-persistence-cassandra) plug-in creates the supporting tables in the database, which can take a few seconds.

This is usually fine for a development environment; in production, however, the DevOps team usually creates the supporting tables before deploying an application. For more information on these tables, please see “Default keyspace and table definitions.”

Running the PoC locally

  1. Make sure Cassandra is still running from the docker-compose-cassandra.yml script described in the previous step.
  2. Start Lightbend’s Developer Sandbox for Elasticsearch that you downloaded earlier.
    From a new terminal, change into the unzipped directory for the downloaded developer sandbox scripts (cinnamon-elasticsearch-docker-sandbox-<version>), and issue the command:
    docker-compose up
  3. Start the entity cluster:
    From a new terminal go to the root directory of the PoC project, and start the Akka entity cluster with the command:
    sbt '; set javaOptions += "-Dconfig.resource=cluster-application.conf" ; run'

    If you don’t have a Lightbend subscription you can instead use the alternate configuration file nonsup-cluster-application.conf in the command above.

  4. Start the HTTP server to provide an API endpoint:
    From a new terminal go to the root directory of the PoC project, and start the HTTP endpoint with the command:
    sbt '; set javaOptions += "-Dconfig.resource=endpoint-application.conf" ; run'

    If you don’t have a Lightbend subscription you can instead use the alternate configuration file nonsup-endpoint-application.conf in the command above.

If you take a look at the logs for either of the terminal windows containing the entity cluster or HTTP server endpoint, you should see each of the applications start, find each other, and form a cluster.

If you want to verify the operation of the API manually, you can use any of the following curl commands in another terminal; together they exercise all of the API options:

Note: all API responses are provided in JSON format.

Artifact / User Read

Artifact / User Read provides two APIs: setArtifactReadByUser is used to set a boolean indicating that the user has read the artifact, and isArtifactReadByUser is used to check whether the artifact has been read. The two parameters, provided as JSON, are artifactId and userId.

curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/setArtifactReadByUser
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/isArtifactReadByUser

Artifact / User Feed

Artifact / User Feed provides three APIs: setArtifactAddedToUserFeed is used to add an artifact to the User’s feed, isArtifactInUserFeed is used to see if a particular artifact is in the user’s feed, and finally setArtifactRemovedFromUserFeed is used to remove an artifact from the user’s feed.

curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/setArtifactAddedToUserFeed
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/isArtifactInUserFeed
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/setArtifactRemovedFromUserFeed

Query All States

The final API provides access to all states through getAllStates. It’s worth noting that both POST and GET can be used to retrieve the states: the POST sends the artifactId and userId as JSON, whereas the GET uses the corresponding URL parameters (a sketch of such a route follows the examples below).

curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/getAllStates

Or:

curl 'http://localhost:8082/artifactState/getAllStates?artifactId=1&userId=Michael'
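As a hedged sketch of the server side (assuming the Akka HTTP routing DSL; the PoC’s real route also wires in JSON unmarshalling and forwards to the sharded entity), one route can serve both verbs:

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object GetAllStatesRouteSketch {
  // hypothetical handler; the real endpoint queries the sharded entity
  def queryAllStates(artifactId: Long, userId: String): String =
    s"""{"artifactId":$artifactId,"userId":"$userId"}"""

  val route: Route =
    path("artifactState" / "getAllStates") {
      get {
        // GET variant: ids arrive as URL query parameters
        parameters("artifactId".as[Long], "userId") { (artifactId, userId) =>
          complete(queryAllStates(artifactId, userId))
        }
      } ~
      post {
        // POST variant: ids arrive in the JSON body; entity(as[...])
        // unmarshalling is omitted here for brevity
        complete("see the PoC source for the JSON variant")
      }
    }
}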

Load testing w/ Gatling

Gatling is a highly capable load-testing tool, itself built on top of Akka, that is used to stress HTTP endpoints.

We’ve set up a testing scenario that runs through all the APIs, emulating a ramp-up of one thousand users over a period of five minutes.
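That scenario is roughly of the following shape. This is a sketch assuming Gatling 3.x syntax and a single endpoint; the PoC’s actual simulation exercises all of the APIs shown earlier:

import scala.concurrent.duration._
import io.gatling.core.Predef._
import io.gatling.http.Predef._

class ArtifactStateSimulationSketch extends Simulation {
  val httpProtocol = http.baseUrl("http://localhost:8082")

  val scn = scenario("artifact-state")
    .exec(
      http("setArtifactReadByUser")
        .post("/artifactState/setArtifactReadByUser")
        .header("Content-Type", "application/json")
        .body(StringBody("""{"artifactId":1, "userId":"Michael"}"""))
    )

  // ramp up one thousand users over a period of five minutes
  setUp(scn.inject(rampUsers(1000).during(5.minutes))).protocols(httpProtocol)
}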

To start the Gatling load test:

  1. In a new terminal, switch to the gatling directory from the PoC’s root directory.
  2. Enter the command:
    sbt gatling:test
    While the load test is running, we recommend jumping to the next section and taking a look at the metrics being collected in real time by Telemetry.
  3. Once the test has completed, a message is displayed referring you to the specific report. All reports created by Gatling are stored in the ./gatling/target/gatling directory; a new report is created there each time a load test is run.

Browsing through the Metrics captured by Lightbend Telemetry

You can find the Grafana Dashboards provided by Lightbend’s Developer Sandbox for Elasticsearch at http://localhost:3000.

Directions for using the Grafana Dashboards can be found here.

We recommend that you take a look at the following dashboards while the Gatling load test is running (previous section).

  • Akka Actors
  • Akka and Lagom Persistence
  • Akka HTTP and Play Servers
  • Akka Cluster
  • Akka Cluster Sharding
  • Akka Dispatchers
  • JVM Metrics

Shutting Down:

  1. In each of the terminal windows running the PoC (cluster & endpoint), simply issue a control+c to shut down.
  2. In the terminal window running Cassandra, issue a control+c, and then issue the command:
    docker-compose down
  3. In the terminal window running Lightbend’s Developer Sandbox for Elasticsearch, issue a control+c.

Conclusion

In this blog post, we’ve introduced our PoC microservice use case, similar to how the state of your feed might be managed by LinkedIn, and shown how we can maintain distributed state across a cluster in a highly performant manner using Akka and other Lightbend technologies. We’ve provided a working repository on Github with instructions on how to build and run the project locally on your own machine. We’ve also shown you how to run load tests with Gatling, how to view the PoC’s instrumentation using Lightbend’s Developer Sandbox for Elasticsearch, and how to test using the Multi-JVM testing toolkit.

In future blog posts, we’ll show you how to deploy the PoC to Docker and Kubernetes, as well as perform deep dives into the source code, including Akka 2.6.x functional Typed Persistent Actors, and finally the configuration that makes the PoC work.

Next up in this series is Part 2: Docker and Local Deploy, in which we deploy the PoC to Docker on our local machine and test it. Or, if you'd like to set up a demo or speak to someone about using Akka Cluster in your organization, click below to get started in that direction:

SEE AKKA CLUSTER IN ACTION