Building, testing, containerizing, deploying, and monitoring distributed microservices is difficult, but the tools from Lightbend can make the journey much faster and easier. In this series of blog posts, we’ll walk you through a working Proof of Concept (PoC) built with the open source Scala programming language and Lightbend’s Akka distributed toolkit.
In this part, we walk you through building, testing, and running the PoC locally, with instrumentation and monitoring wired in from the very beginning using Lightbend Telemetry. In Part 2: Docker and Local Deploy, we deploy to Docker on our local machine and test. In Part 3: Kubernetes & Monitoring, we move our PoC to Kubernetes (on Minikube), review some monitoring options, and dive deep into the Kubernetes deployment YAML files. Finally, in Part 4: Source Code, we look at the Scala source code required to create the PoC.
The GitHub project accompanying this blog series can be found and cloned from here.
In our PoC, the primary aim is to track two different digital artifact states, by user, for a user interface (UI) like LinkedIn. First, we want to know whether a given user has read a given artifact, which could be any type of digital payload. Second, we want to know whether the artifact is currently included in the user’s feed. The UI queries both states through a REST API and reflects them accordingly. In both cases these states are simply represented as booleans contained within a single Scala case class:
final case class CurrState(artifactRead: Boolean = false, artifactInUserFeed: Boolean = false)
Of course in the real world we’d want to maintain a lot more state than is described by these two fields, but for our purposes these will suffice.
The ultimate aim of our PoC is to scale out to handle millions of users in a highly responsive manner, with strongly consistent state, while maintaining high performance and resilience. In other words, the PoC delivers a highly performant distributed cache with a persistent backing that manifests the traits described in the Reactive Manifesto: responsive, elastic, resilient, and message driven.
Traditional architectures are built for Strong Consistency, but we’re building a microservice in a distributed system. So how can we achieve Strong Consistency in a distributed environment? The physics of distributed systems (network latency and partitions) prevents true Strong Consistency, so instead we must simulate it.
We could use locks and transactions to simulate it, but these create undesirable contention. Any two things that contend for a single resource are in competition, and that competition can have only one winner; the others are forced to wait for the winner to complete. This contention creates bottlenecks and thus reduces our ability to scale.
Another scalability factor we must take into consideration when building distributed microservices is the effect of Coherency Delay. In a distributed system, synchronizing the state of multiple nodes is done using crosstalk, or gossip: each node in a cluster sends messages to other nodes informing them of any state changes.
The time it takes for this synchronization to complete is called the Coherency Delay, and it adds further overhead to parallelizing a system, so there is a price to pay. The bottom line is that we need to be aware of the costs associated with crosstalk and reduce it where we can.
In our use case, we rely on the Akka toolkit to minimize the impact of crosstalk. For more information on the costs of Coherency Delay, please see the article on Gunther’s Universal Scalability Law.
We can balance the need for Strong Consistency with the need for scalability by introducing Akka Cluster Sharding, which limits the scope of contention and reduces crosstalk. Crosstalk is reduced because the same state no longer needs to be shared across the entire cluster. Contention is isolated to a single entity via a unique identifier, so contention in one entity has little or no impact on the others.
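To make this concrete, here’s a deliberately minimal sketch, assuming the Akka 2.6 Typed APIs, of how an entity type is registered with Cluster Sharding so that contention is scoped to one entity id at a time. The names here are placeholders, not the PoC’s actual source, which we cover in Part 4:

import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}

sealed trait ArtifactCommand // placeholder command protocol

object ArtifactStateSharding {
  val TypeKey: EntityTypeKey[ArtifactCommand] =
    EntityTypeKey[ArtifactCommand]("ArtifactState")

  // Registers the entity type; each unique entityId gets its own actor,
  // so contention is isolated per entity.
  def init(system: ActorSystem[_]): Unit =
    ClusterSharding(system).init(Entity(TypeKey) { entityContext =>
      entityBehavior(entityContext.entityId)
    })

  private def entityBehavior(entityId: String): Behavior[ArtifactCommand] =
    Behaviors.empty // stand-in for the real persistent entity behavior
}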
Akka provides many individual libraries, based upon the Actor Model, which are used to build highly scalable, cloud-native, distributed applications. The Actor Model was first introduced in a 1973 paper by Carl Hewitt and later popularized by the Erlang programming language. Now, with the advent of multi-core computers, cloud-based computing, and high-speed networks, the Actor Model on the JVM has come into its own and matured through Akka.
Akka’s features are packaged into separate modules so you only need to include the dependencies you’re using.
In our PoC we will be using seven different open source Akka modules:
These are just the seven Akka modules used to build this PoC, but there are many more interesting Akka modules that can be very useful in building distributed microservices. We'd like to call your attention to some of the others: Akka Streams, Akka Cluster Singleton, Distributed Data, Alpakka, gRPC, and Akka Enhancements.
This project takes advantage of additional technology enhancements that come with a Lightbend subscription, including:
The project is built and run using the Simple Build Tool (sbt). In case you’re not familiar with sbt, each project’s build configuration is based upon the contents of the project directory and the build.sbt file saved in the root of the project’s directory. The project directory holds the sbt plug-ins used by the build process and, depending on style, is sometimes used to declare library dependencies and their required versions. The build.sbt file provides the guts of the build process; in our PoC, it declares the specific build dependencies and the run options for monitoring.
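To give a sense of its shape, here’s a hypothetical build.sbt fragment. The module names are real Akka artifacts, but the versions and the selection shown are illustrative, not the repo’s actual list:

// Hypothetical build.sbt fragment; see the repo for the real dependency list.
name := "akka-typed-distributed-state-blog"
scalaVersion := "2.13.1"

val akkaVersion = "2.6.0" // illustrative version only

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence-typed"      % akkaVersion,
  "com.typesafe.akka" %% "akka-http"                   % "10.1.11"
)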
The source code directory layout is as follows:
src/
  main/
    resources/
    scala/
  multi-jvm/
    scala/
  test/
    scala/
One thing that you might find odd is the multi-jvm directory, which is used for our test cases. The test directory is standard and provided for future unit tests.
Throughout this section we'll be referring to the image above, which provides a visualization of a small deployment of the PoC, with five Akka clustered nodes.
The first two nodes (Node 1 and Node 2) are of the same type and provide two levels of functionality. The first, referred to as the Endpoint Router (EP), is an HTTP endpoint (Akka HTTP) that implements the REST API. The second, referred to as the Shard Region Proxy (SP), provides access into Akka Cluster Sharding. Commands and Queries are created by the EP, wrapped in a sharding envelope that includes the targeted entity’s unique Id, and finally sent through the proxy for handling.
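For a feel of the EP side, here’s a minimal, hypothetical sketch of the route structure. The path mirrors the curl examples later in this post; JSON marshalling and the ask to the Shard Region Proxy are elided:

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object EndpointRoutes {
  // Each path corresponds to one API operation; the real handler parses the
  // JSON command, wraps it in a sharding envelope, and forwards it via the SP.
  val routes: Route =
    pathPrefix("artifactState") {
      path("setArtifactReadByUser") {
        post {
          complete("OK") // stand-in for the real command handling
        }
      }
    }
}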
In the PoC, each Entity’s unique identity is a combination of the User’s Id and the specific Artifact’s Id. For this node type, the only code we’ve created is for the EP, which is made up of three source files saved in the package com.lightbend.artifactstate.endpoint.
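Conceptually, sending a command through the proxy might look like this hypothetical fragment. The command name, the reply type, and the exact id-composition scheme are assumptions; only the idea of combining the two ids follows the text:

import akka.actor.typed.ActorRef
import akka.cluster.sharding.typed.ShardingEnvelope

sealed trait ArtifactCommand // placeholder protocol
final case class SetArtifactRead(replyTo: ActorRef[String]) extends ArtifactCommand

object EndpointToEntity {
  // The entity id combines the artifact's id and the user's id, as described above.
  def send(proxy: ActorRef[ShardingEnvelope[ArtifactCommand]],
           artifactId: Long, userId: String, replyTo: ActorRef[String]): Unit =
    proxy ! ShardingEnvelope(s"$artifactId$userId", SetArtifactRead(replyTo))
}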
The next three nodes (Node 3, Node 4, and Node 5) are of the same type and handle all sharding operations, including but not limited to the Entity (E) life-cycle and shard rebalancing, all of which are provided by the Akka toolkit. The only source code required for this node type is the implementation of the Entity itself. That’s it!
The main advantage of having two types of nodes is that each type can be scaled independently, as needed. For a project such as this, which will ultimately be deployed through container orchestration, Lightbend recommends using a single code base without sub-projects, with each node’s behavior defined through configuration, so that a single image is used. If sub-projects were used for this PoC, we’d have two image types to manage.
For this PoC, each Entity persists its event log to Cassandra, as depicted in the bottom-right cluster, through the Akka Persistence Cassandra plug-in. If you don’t want to deal with building and managing a Cassandra cluster, which can be difficult, there are other Lightbend-supported persistence plug-ins available, including JDBC and Couchbase. A complete list of Akka Persistence community plug-ins can be found here.
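Enabling the plug-in is purely a configuration concern. A minimal sketch might look like the following; the plugin ids shown here are assumptions that vary by plug-in version, so check the plug-in’s documentation:

akka.persistence {
  # illustrative plugin ids; these vary by akka-persistence-cassandra version
  journal.plugin = "cassandra-journal"
  snapshot-store.plugin = "cassandra-snapshot-store"
}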
All circles in the image above represent actors; their size and color primarily reflect the parent/child hierarchy. All communication between actors is message driven, asynchronous, and non-blocking. For the most part, communication within each node follows the actor hierarchies through the Shard Region (SR), but node-to-node actor communication was too busy to document in the image.
The primary key to communication between the nodes is the Shard Coordinator (SC), which keeps track of which shard regions are available and which shards the nodes are hosting. Another chore of the SC is redistributing shards when nodes go down, are removed, or are added to the cluster. The SC is a cluster singleton: if the node hosting the SC goes down, a new SC is automatically recreated on another node. The cluster’s Distributed Data internals keep track of the internal state of the SC in case it ever needs to be reconstituted somewhere else.
The following sequence diagram illustrates a common message flow pattern when sending commands through the Endpoint and on to specific entities:
Notice that the response from the Entity above returns directly to the Endpoint where the command originated. This is because the internal API defined by the Entity requires a replyTo actor reference in the original command, allowing the response to bypass the Cluster Sharding actors entirely.
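An illustrative command/response pair (not the PoC’s exact protocol) shows why this works; because the command carries the replyTo reference, the entity can answer the Endpoint directly:

import akka.actor.typed.ActorRef

final case class ArtifactReadState(artifactRead: Boolean)
// The entity replies straight to replyTo, bypassing the Cluster Sharding actors.
final case class IsArtifactReadByUser(replyTo: ActorRef[ArtifactReadState])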
As mentioned in the Introduction, the PoC can be run and tested locally, in Docker, and finally in Kubernetes. Configuration files for each environment and node type are provided in src/main/resources.
src/main/resources
-rw-rw-r-- 1 sample sample 585 Jul 25 09:03 cluster-application-base.conf
-rw-rw-r-- 1 sample sample 1090 Jul 3 12:29 cluster-application.conf
-rw-rw-r-- 1 sample sample 1341 Jun 24 11:38 cluster-application-docker.conf
-rw-rw-r-- 1 sample sample 1231 Jun 24 14:19 cluster-application-k8s.conf
-rw-rw-r-- 1 sample sample 749 Jul 25 09:03 endpoint-application-base.conf
-rw-rw-r-- 1 sample sample 873 Jul 3 12:29 endpoint-application.conf
-rw-rw-r-- 1 sample sample 1139 Jun 24 11:39 endpoint-application-docker.conf
-rw-rw-r-- 1 sample sample 1321 Jun 24 14:22 endpoint-application-k8s.conf
File names beginning with endpoint create Endpoint nodes, and file names beginning with cluster create Cluster Sharded nodes. The platform is determined by the remaining suffix: application.conf, application-docker.conf, and application-k8s.conf support running locally, in Docker, and on Kubernetes, respectively. Files ending with base.conf provide common configuration settings for each node type and are not referenced directly when starting nodes.
Being able to monitor today’s distributed microservices is a “must have” when deploying distributed systems, and collecting metrics is the first requirement. To that end, we’re using Lightbend Telemetry to instrument the PoC and publish metrics to either the Elastic Stack or Prometheus. To display charts and graphs, we’ll feed the data to Grafana for rendering. Common configuration settings for Telemetry are stored in the telemetry.conf file.
src/main/resources
-rw-rw-r-- 1 sample sample 1515 Jul 3 15:56 telemetry.conf
-rw-rw-r-- 1 sample sample 303 Jun 19 08:23 telemetry-elasticsearch.conf
-rw-rw-r-- 1 sample sample 249 Jul 1 15:40 telemetry-prometheus.conf
When running locally, we’ll use Lightbend’s Developer Sandbox for Elasticsearch to collect and display the metrics gathered by Lightbend Telemetry. Later, when running on Kubernetes, we’ll switch to Prometheus to collect the metrics generated by the instrumentation, and use Lightbend Console with Grafana to display real-time telemetry alongside Lightbend-provided dashboards for the Actor System, Cluster operations, and more.
java -version
sbt
git clone git@github.com:michael-read/akka-typed-distributed-state-blog.git
If you don’t have a Lightbend subscription, and are following along, you don’t need to perform steps #4 & #5 above.
We’re using the Akka Multi-JVM toolkit to form an Akka cluster locally across multiple JVMs, and then running Scala integration tests to verify proper operation of the PoC microservice’s APIs.
We perform two types of tests through the ArtifactStatesSpec.scala file. First, we form an Akka cluster using three JVMs and test that we can set the state of a Persistent Entity internally through a proxy shard region. Then, we test the external API provided by Akka HTTP.
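For a flavor of what that looks like, a multi-node test configuration declares one role per JVM. Here’s a hypothetical sketch; the role names are assumptions, not the spec’s actual ones:

import akka.remote.testkit.MultiNodeConfig

object ArtifactStatesSpecConfig extends MultiNodeConfig {
  // Each role below runs in its own JVM when the multi-jvm test executes.
  val endpointNode = role("endpoint")
  val clusterNode1 = role("cluster1")
  val clusterNode2 = role("cluster2")
}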
In the final installment of this blog, we’ll do a deep dive into this testing approach.
To run tests locally:
docker-compose -f docker-compose-cassandra.yml up
sbt multi-jvm:test
It’s worth noting here that the first time an event is committed to Cassandra, the Akka Persistence Cassandra (akka-persistence-cassandra) plug-in creates the supporting tables in the database, which can take a few seconds.
This is usually fine for a development environment; in production, however, the DevOps team usually creates the supporting tables before deploying the application. For more information on these tables, please see “Default keyspace and table definitions.”
docker-compose up
sbt '; set javaOptions += "-Dconfig.resource=cluster-application.conf" ; run'
If you don’t have a Lightbend subscription you can instead use the alternate configuration file nonsup-cluster-application.conf in the command above.
sbt '; set javaOptions += "-Dconfig.resource=endpoint-application.conf" ; run'
If you don’t have a Lightbend subscription you can instead use the alternate configuration file nonsup-endpoint-application.conf in the command above.
If you take a look at the logs in either of the terminal windows containing the entity cluster or the HTTP server endpoint, you should see each of the applications start, find each other, and form a cluster.
If you want to verify the operation of the API manually, you can use any of the following curl commands in another terminal; together they cover all of the API options:
Note: all API responses are provided in JSON format.
Artifact / User Read
Artifact / User Read provides two APIs: setArtifactReadByUser is used to set a boolean indicating that the user has read the artifact, and isArtifactReadByUser is used to check whether the artifact has been read. The two parameters, which are provided as JSON, are artifactId and userId.
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/setArtifactReadByUser
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/isArtifactReadByUser
Artifact / User Feed
Artifact / User Feed provides three APIs: setArtifactAddedToUserFeed is used to add an artifact to the user’s feed, isArtifactInUserFeed is used to check whether a particular artifact is in the user’s feed, and finally setArtifactRemovedFromUserFeed is used to remove an artifact from the user’s feed.
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/setArtifactAddedToUserFeed
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/isArtifactInUserFeed
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/setArtifactRemovedFromUserFeed
Query All States
The final API provides access to all states through getAllStates. It’s worth noting here that both POST and GET can be used to retrieve the states: the POST sends the artifactId and userId as JSON, whereas the GET uses the corresponding URL parameters.
curl -d '{"artifactId":1, "userId":"Michael"}' -H "Content-Type: application/json" -X POST http://localhost:8082/artifactState/getAllStates
Or:
curl 'http://localhost:8082/artifactState/getAllStates?artifactId=1&userId=Michael'
Gatling is a highly capable load-testing tool, built on top of Akka, for exercising HTTP endpoints.
We’ve set up a testing scenario that runs through all of the APIs, emulating a ramp-up of one thousand users over a period of five minutes.
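As a point of reference, a minimal Gatling 3 simulation of the kind described above might look like the following sketch. The class name and the single request shown are illustrative; the repo’s actual simulation exercises every API:

import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._

class ArtifactStateSimulation extends Simulation {

  val httpProtocol = http
    .baseUrl("http://localhost:8082")
    .contentTypeHeader("application/json")

  val scn = scenario("artifact-state")
    .exec(
      http("setArtifactReadByUser")
        .post("/artifactState/setArtifactReadByUser")
        .body(StringBody("""{"artifactId":1, "userId":"Michael"}"""))
    )

  // ramp one thousand users over five minutes, as described above
  setUp(scn.inject(rampUsers(1000).during(5.minutes))).protocols(httpProtocol)
}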
To start the Gatling load test:
sbt gatling:test
You can find the Grafana Dashboards provided by Lightbend’s Developer Sandbox for Elasticsearch at http://localhost:3000.
Directions for using the Grafana Dashboards can be found here.
We recommend that you take a look at the following dashboards while the Gatling load test (previous section) is running.
When you’re finished, you can shut everything down with:
docker-compose down
In this blog post, we’ve introduced our PoC microservice use case, similar to how LinkedIn might manage the state of your feed, and shown how to maintain distributed state across a cluster in a highly performant manner using Akka and other Lightbend technologies. We’ve provided a working repository on GitHub with instructions for building and running the project locally on your own machine. We’ve also shown you how to run load testing with Gatling, how to view the instrumentation for the PoC using Lightbend’s Developer Sandbox for Elasticsearch, and how to test using the Multi-JVM testing toolkit.
In future blog posts, we’ll show you how to deploy the PoC to Docker and Kubernetes, and take deep dives into the source code, including Akka 2.6.x functional Typed Persistent Actors, and finally the configuration that makes the PoC work.
Next up in this series is Part 2: Docker and Local Deploy, in which we deploy to Docker on our local machine and test. Or, if you'd like to set up a demo or speak to someone about using Akka Cluster in your organization, click below to get started in that direction: