Many real-world applications must process data as it arrives, as a stream of real-time events. Examples include financial transactions, GPS signals emitted by cars in motion, measurements from industrial sensors, web traffic, and so on.
One of the main requirements for such systems is low latency processing of the streaming data, but it is not the only requirement. As such systems are moving to the forefront of enterprise application architecture, other requirements for a well designed architecture include:
Many of these capabilities are implemented by the open source project Cloudflow, which helps developers and architects quickly develop, orchestrate, and operate distributed streaming applications on Kubernetes.
Cloudflow supports building streaming applications from a set of small, composable components that communicate over Kafka and are wired together with schema-based contracts. This approach can significantly improve component reuse and dramatically accelerate streaming application development. At the time of this writing, such components can be implemented using Akka Streams, Apache Flink, and Apache Spark.
Everything in Cloudflow is done in the context of an application, which represents a self-contained distributed system (graph) of data processing services connected together by data streams over Kafka.
In the following figure we show a Cloudflow application that contains four components that communicate through message passing, leveraging Kafka topics.
This application has two Akka Streams components, one Flink component and one Spark component.
Cloudflow supports:
With multiple runtimes natively supported by Cloudflow, we often hear questions about which runtime to use for a specific implementation. In this series of blog posts, we will look at some of the capabilities of Apache Flink and Akka Streams Cloudflow runtimes and show how to implement the same functionality for both.
Apache Flink is an open source streaming platform which supports real-time data processing pipelines in a fault-tolerant way at scale–i.e. millions of events per second. As defined here, the main features of Flink are:
Flink is an application server and as such is based on organizing computations in blocks and leveraging cluster architectures [1]. Splitting computations into blocks enables execution parallelism, where different blocks run on different threads on the same machine or on different machines. It also enables failover by moving execution blocks from failed machines to healthy ones.
As with any application engine, Flink requires a developer to adhere to its programming and deployment model. Such engines also tend to have a steeper learning curve.
Fortunately, Cloudflow hides many of the details of packaging and deploying Flink applications, which means that the user only has to write the required functionality as a set of classes.
Akka Streams is a library built on top of the Akka actor toolkit that adheres to the Reactive Streams initiative and protocols. The Akka Streams API allows us to easily compose data transformation flows from independent steps. It provides easy-to-use APIs to create streams that leverage the power of Akka without explicitly defining actor behaviors and messages (you can, of course, explicitly use actors in Akka Streams, as we will show later in this blog post).
Akka Streams provides a higher-level abstraction over Akka’s existing actor model. This allows you to focus on business logic and forget about the boilerplate code required to manage actors. Akka Streams is based on the following core API concepts:
The Akka Streams API provides a builder-style syntax for chaining source, flow, and sink components in order to create a graph (RunnableGraph in Akka Streams terminology). A graph must have at least one Source, one Sink, and any number of flows. The graph is just a description of the topology and is completely “lazy”: nothing is executed until the graph is run. Running a graph allocates the resources needed to execute the topology, such as actors and threads.
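To make the Source, Flow, and Sink terminology concrete, here is a minimal sketch of a graph that is first described and only later run. It assumes Akka 2.6 (where an implicit ActorSystem is enough to materialize a stream); the object and method names (GraphSketch, doubled) are made up for illustration and are not part of Cloudflow.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object GraphSketch {
  // Blueprint only: building this value does not process any element.
  val graph: RunnableGraph[Future[Seq[Int]]] =
    Source(1 to 5)                      // Source: emits 1..5
      .via(Flow[Int].map(_ * 2))        // Flow: doubles each element
      .toMat(Sink.seq[Int])(Keep.right) // Sink: collects the elements

  // Running the graph allocates actors and threads and produces the result.
  def doubled(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("graph-sketch")
    try Await.result(graph.run(), 5.seconds)
    finally system.terminate()
  }
}
```

The same blueprint can be run more than once; each run is an independent materialization with its own resources.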
The main features of Akka Streams are:
Unlike Flink, Akka Streams is not an application server, but rather a stream processing library with a domain-specific language (DSL) that provides a set of constructs simplifying the building of streaming applications. As outlined in Jay Kreps' blog, stream processing engines and stream processing libraries are two very different approaches to building streaming applications.
Stream processing libraries are typically easier to use and provide more flexibility, but require a specific implementation of deployment, scalability, and load balancing. Fortunately, Cloudflow takes care of most of these concerns, so, as with Flink, the user only has to write the required functionality as a set of classes.
Additionally, the Akka Streams runtime in Cloudflow is fully integrated with Akka Cluster, leveraging Kafka aware cluster sharding, which allows for very powerful stateful processing as described later in this blog.
One of the common considerations for choosing a runtime is the requirement for windowing. While Flink provides well-defined support for time and windowing out of the box, similar support in Akka Streams is fairly weak. Currently Akka Streams provides several operators that can be (sort of) used for windowing:
None of these implementations really provide the functionality necessary for true time/window support. Does this mean that every time a service has a time/window requirement, one has to use Flink? Not necessarily.
Over time there have been quite a few attempts to enhance Akka Streams with windowing. This blog post provides an excellent overview of windowing along with an implementation; other implementations are provided here and here. Unfortunately, none of these implementations provide full time functionality: they are all based on processing time, not event time. They also do not protect against data loss in the case of execution restarts, which Cloudflow prevents by committing to Kafka only after a message has been processed. As a result, we provide here a new implementation based on an Akka Streams custom graph stage. We implemented the following timing windows:
Both types of timing windows support both event time (stored inside the message) and processing time, as well as watermarking (for event time). We also provide two types of session windows:
Both types of session windows support both event time (stored inside the message) and processing time. All of the window implementations support a maximum window data size. If this size is exceeded, only the most recent data is kept.
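The size limit can be pictured with a small helper like the one below. This is only a sketch of one possible shape for the updateData helper that the tumbling window code calls; the actual helper is not shown in this post, so the body here is an assumption.

```scala
import scala.collection.mutable.ListBuffer

object WindowBuffer {
  // Append an element; if the buffer grows beyond maxSize,
  // drop the oldest entries so that only the most recent data remains.
  def updateData[T](element: T, buffer: ListBuffer[T], maxSize: Int): Unit = {
    buffer += element
    if (buffer.size > maxSize) buffer.remove(0, buffer.size - maxSize)
  }
}
```

Note that only the data is trimmed this way. The offsets of dropped messages still have to be retained, otherwise they could never be committed.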
As we mentioned above, all window implementations are based on a custom graph stage, which internally captures the content of the window and a list of Kafka offsets for each message inside the window.
As a result, none of the messages are committed until the window is processed as a whole. This means that although a window's data is collected in memory, if the execution fails and the instance restarts, all of the messages of the window(s) that have not been processed will be reread.
Support for watermarks in time-based windows and for inactivity in session windows is based on a timer (we use TimerGraphStageLogic as the base class for the graph stage logic). We use timers with fixed intervals (which can be controlled by window parameters) and check the required conditions each time the timer fires.
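The effect of holding back offsets until a window is complete can be simulated without Kafka. The following is a toy model, not the Cloudflow API: Partition, Record, and processWindow are hypothetical names, and a "failed" window simply skips the commit.

```scala
object CommitSketch {
  final case class Record(offset: Long, value: String)

  // Hypothetical stand-in for a Kafka partition plus its committed offset.
  final class Partition(records: Vector[Record]) {
    private var committed: Long = -1L // last committed offset
    def commit(offset: Long): Unit = committed = math.max(committed, offset)
    def uncommitted: Vector[Record] = records.filter(_.offset > committed)
  }

  // Read up to `take` uncommitted records as one window and commit
  // only if the whole window was processed successfully.
  def processWindow(p: Partition, take: Int, succeed: Boolean): Vector[String] = {
    val window = p.uncommitted.take(take)
    if (succeed && window.nonEmpty) p.commit(window.last.offset)
    window.map(_.value)
  }
}
```

If the first attempt fails, the next attempt sees the same records again, which is the at-least-once behavior described above.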
Below is the code for the tumbling window implementation:
case class TumblingWindow[T](duration: FiniteDuration,
time_extractor: T ⇒ Long = (_: T) ⇒ System.currentTimeMillis(),
watermark: FiniteDuration = 0.millisecond, maxsize: Int = 1000,
watermakInterval: FiniteDuration = 100.millisecond)
extends GraphStage[FlowShape[(T, Committable), (Seq[T], Committable)]] {
// Define inlets and outlets and shape
private val inlet = Inlet[(T, Committable)]("TumblingWindow.inlet")
private val outlet = Outlet[(Seq[T], Committable)]("TumblingWindow.outlet")
override val shape: FlowShape[(T, Committable), (Seq[T], Committable)] =
FlowShape(inlet, outlet)
// Create stage logic with timer
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
// Internal parameters
private val buffer = new ListBuffer[T] // Data collector
private val offsets = new ListBuffer[Committable] // Context collector
private var windowStart: Long = -1 // Collection start time
private var windowEnd: Long = -1 // Collection end time
private val completed = new ListBuffer[ReadyWindow[T]] // Windows waiting for watermark
// Convert timings to milliseconds
private val durationMS = duration.toMillis
private val watermarkMS = watermark.toMillis
// Setup handlers
this.setHandlers(inlet, outlet, this)
// pre start - start timer
override def preStart(): Unit =
scheduleWithFixedDelay(shape, 0.millisecond, watermakInterval)
// post stop - cleanup
override def postStop(): Unit = {
buffer.clear()
offsets.clear()
completed.clear()
}
// Get new event
override def onPush(): Unit = {
// Get element and its time
val event = grab(inlet)
val time = time_extractor(event._1)
// First time through - initial setup
if (windowStart < 0) {
setup(time)
}
if ((windowStart until windowEnd) contains time) {
// The event belongs to the current window
updateData[T](event._1, buffer, maxsize)
} else if (time >= windowEnd) {
val currentTime = System.currentTimeMillis()
// The event is past the end of the current window
completed +=
ReadyWindow[T](windowStart until windowEnd, buffer.clone(), maxsize,
offsets.clone(), currentTime + watermarkMS)
// Reset for next window
setup(windowEnd)
// Store this event
buffer += event._1
// Push as quickly as possible
submitReadyWindows(currentTime)
} else {
// Late arriving event - store it if window is still around
for (window ← completed) {
if (window.range.contains(time))
window.addLateEvent(event._1)
}
}
// Regardless of where the actual data goes, the offset should go here
// to avoid premature commits
offsets += event._2
// Get next request
if (!hasBeenPulled(inlet)) pull(inlet)
}
// new window request
override def onPull(): Unit = {
if (!hasBeenPulled(inlet)) pull(inlet)
}
// New timer event
override protected def onTimer(timerKey: Any): Unit = {
// Timer event, see if we can submit some waiting windows
submitReadyWindows(System.currentTimeMillis())
}
// Support methods
private def setup(time: Long): Unit = {
buffer.clear()
offsets.clear()
windowStart = time
windowEnd = windowStart + durationMS
}
private def submitReadyWindows(currentTime: Long): Unit =
// Iterate over a snapshot, because ready windows are removed from completed inside the loop
for (window ← completed.toList) {
if (window.isReady(currentTime)) {
val commitable = CommittableOffsetBatch(window.committable)
println(s"Send committable $commitable")
// Make sure to remove from completed
completed -= window
emit(outlet, (window.buffer, commitable))
}
}
}
}
The rest of the window implementations can be found on GitHub for Sliding window, SessionInactivityWindow and SessionValueWindow.
With these implementations in place we can now use them. Here is an example of using tumbling windows:
def runnableGraph() = sourceWithCommittableContext(in)
.via(TumblingWindow[SimpleMessage](duration = 3.second, time_extractor = (msg) ⇒ msg.ts,
watermark = 1.2.second))
.map(records ⇒ {
println("Got new Tumbling window")
records.foreach(record ⇒ println(s" time ${record.ts} - value ${record.value}"))
})
.to(committableSink(committerSettings))
Here we are running the source (messages from Kafka) through a tumbling window with a duration of 3 seconds, using event time (located in the ts element of the message) with the watermark of 1.2 seconds.
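To see what these parameters mean for individual events, the window assignment can be worked through in plain Scala. This is a simplified sketch, not the graph stage above: it assumes that the first window starts at the earliest event time and that a closed window becomes ready once the watermark delay has passed. TumblingSketch and its methods are illustrative names.

```scala
object TumblingSketch {
  // Group event timestamps (ms) into consecutive windows of durationMs,
  // anchoring the first window at the earliest timestamp.
  // The map key is the start time of each window.
  def assign(timestamps: Seq[Long], durationMs: Long): Map[Long, Seq[Long]] =
    if (timestamps.isEmpty) Map.empty
    else {
      val origin = timestamps.min
      timestamps.groupBy(t => origin + ((t - origin) / durationMs) * durationMs)
    }

  // A closed window is emitted once the watermark delay has elapsed
  // since the moment it was closed.
  def isReady(closedAtMs: Long, watermarkMs: Long, nowMs: Long): Boolean =
    nowMs >= closedAtMs + watermarkMs
}
```

With a duration of 3000 ms and events at 1000, 2500, 3999, 4000, 6900 and 7000, the windows start at 1000, 4000 and 7000. A window closed at time 10000 with a watermark of 1200 ms is held until 11200, which gives late events a chance to be added.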
Usage of the sliding window is very similar:
def runnableGraph() = sourceWithCommittableContext(in)
.via(SlidingWindow[SimpleMessage](duration = 3.second, slide = 1.5.second,
time_extractor = (msg) ⇒ msg.ts, watermark = 1.2.second))
.map(records ⇒ {
println("Got new Sliding window")
records.foreach(record ⇒ println(s" time ${record.ts} - value ${record.value}"))
})
.to(committableSink(committerSettings))
The structure of the code in this case is identical, and the window parameters are very similar as well, with the addition of the slide parameter, which defines where in the current window the next window starts. Usage of the session windows can be found on GitHub for SessionInactivityWindow and SessionValueWindow.
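The relationship between duration and slide is easiest to see by computing which windows a single event falls into. The sketch below assumes that window starts are aligned to multiples of the slide; the actual SlidingWindow stage may anchor its windows differently, so treat the alignment as an assumption.

```scala
object SlidingSketch {
  // Start times of all windows [start, start + durationMs) that contain t,
  // assuming window starts are multiples of slideMs and t >= 0.
  def windowStarts(t: Long, durationMs: Long, slideMs: Long): Seq[Long] = {
    val lastStart = t - (t % slideMs) // latest window start not after t
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start > t - durationMs && start >= 0)
      .toList
      .reverse
  }
}
```

With a duration of 3 seconds and a slide of 1.5 seconds, an event at 4000 ms belongs to the windows starting at 1500 and 3000, so every event is held in memory twice.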
Windowing is a very important technique in stream processing, but it is important to understand that it is not free. Windowing requires memory, typically proportional to the window size. The implementation allows you to limit the amount of memory used for window data (not offsets), but even so, when designing windows you need to consider their memory requirements and make sure that enough memory is available.
The windowing implementation described here allows users to leverage Akka Streams in cases where time/windowing functionality is required.
Any application that processes a stream of events and does not just perform trivial record-at-a-time transformations needs to be stateful, with the ability to store and access intermediate data.
As we have mentioned above, stateful processing is one of the main capabilities of Flink. It supports two types of state: keyed state and operator state.
Keyed state can only be used in functions and operators on a KeyedStream, where each record has an explicit key. It can be thought of as a partitioned or sharded hashmap, with exactly one state partition per key. Keyed state is organized into key groups, which are the atomic units by which Flink can redistribute keyed state. The number of key groups is equal to the application’s maximum parallelism. Flink guarantees that all keys from a given key group are processed in the same task manager.
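The mapping from a key to a key group, and from a key group to a parallel operator instance, can be sketched as follows. This is a simplification for illustration: Flink additionally applies a murmur hash to the key's hash code before taking the modulo, which is replaced here by a plain floorMod. KeyGroupSketch is not a Flink class.

```scala
object KeyGroupSketch {
  // Simplified key-to-key-group mapping (Flink also murmur-hashes the hash code).
  def keyGroup(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  // Index of the parallel operator instance that owns a key group.
  def operatorIndex(keyGroup: Int, parallelism: Int, maxParallelism: Int): Int =
    keyGroup * parallelism / maxParallelism
}
```

With a maximum parallelism of 128 and a parallelism of 4, key groups 0 to 31 belong to instance 0, 32 to 63 to instance 1, and so on. Changing the parallelism moves whole key groups between instances, never individual keys.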
For operator (non-keyed) state, each operator state is bound to one parallel operator instance. A good example of operator state can be found in the Kafka connector implementation: each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its operator state. Operator state also supports redistributing state among parallel operator instances when the parallelism is changed.
Both keyed and operator state can exist in two forms: managed and raw.
Managed state is implemented by data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. Flink’s runtime encodes the states and writes them into the checkpoints as part of checkpointing implementation. The types of managed state include ValueState, ListState, etc.
Raw state is a state that implementations keep in their own data structures. Flink knows nothing about the state’s data structures and sees only the raw bytes. When checkpointed, Flink only writes a sequence of bytes into the checkpoint.
Although Akka Streams by itself is stateless, there are many ways to implement stateful streaming in Akka Streams by leveraging existing elements like statefulMapConcat or a custom graph stage. However, the most straightforward and natural way of doing this is integration with Akka actors.
In this case, Akka Streams manages the overall execution flow, while individual actors both maintain state and implement stateful operations. Coupled with Akka Persistence, this approach provides a great foundation for stateful, fault-tolerant stream processing using Akka Streams.
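For comparison with the actor-based approach, here is a minimal sketch of in-stream state using statefulMapConcat, assuming Akka 2.6. It keeps a running total in a local variable; the names RunningTotalSketch and totals are made up for this illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object RunningTotalSketch {
  // The outer function runs once per materialization, so every
  // running stream gets its own private `total`.
  val runningTotal: Flow[Int, Int, NotUsed] =
    Flow[Int].statefulMapConcat { () =>
      var total = 0
      element => {
        total += element
        List(total) // emit the updated total for every input element
      }
    }

  def totals(input: List[Int]): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("running-total")
    try Await.result(Source(input).via(runningTotal).runWith(Sink.seq[Int]), 5.seconds)
    finally system.terminate()
  }
}
```

State held this way lives only inside one stream instance and is lost on restart, which is one reason the actor-based approach is preferred here.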
Here we will use a simplified version of the TaxiRide application to show how stateful stream processing can be implemented in Cloudflow using both the Flink and Akka Streams runtimes.
The main part of the Flink implementation is a processor that reads two streams (taxi rides and taxi fares) and, for each rideId, matches the ride with its fare.
The actual merge is implemented by a rich function, RichCoFlatMapFunction, that leverages shared keyed state:
class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
@transient var rideState: ValueState[TaxiRide] = null
@transient var fareState: ValueState[TaxiFare] = null
override def open(params: Configuration): Unit = {
super.open(params)
rideState =
getRuntimeContext.getState(new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
fareState = getRuntimeContext.getState(new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
}
override def flatMap1(ride: TaxiRide, out: Collector[TaxiRideFare]): Unit = {
val fare = fareState.value
if (fare != null) {
fareState.clear()
out.collect(new TaxiRideFare(ride.rideId, fare.totalFare))
} else {
rideState.update(ride)
}
}
override def flatMap2(fare: TaxiFare, out: Collector[TaxiRideFare]): Unit = {
val ride = rideState.value
if (ride != null) {
rideState.clear()
out.collect(new TaxiRideFare(ride.rideId, fare.totalFare))
} else {
fareState.update(fare)
}
}
}
This function is used by the main logic of the Flink implementation:
override def createLogic() = new FlinkStreamletLogic {
override def buildExecutionGraph = {
val rides: DataStream[TaxiRide] =
readStream(inTaxiRide)
.filter { ride ⇒
ride.isStart.booleanValue
}
.keyBy("rideId")
val fares: DataStream[TaxiFare] =
readStream(inTaxiFare)
.keyBy("rideId")
val processed: DataStream[TaxiRideFare] =
rides
.connect(fares)
.flatMap(new EnrichmentFunction)
writeStream(out, processed)
}
}
Here we read both streams, key them by the same key (ride ID) and connect them using EnrichmentFunction (shown above).
The overall implementation is quite simple, very expressive and easy to read. Because Cloudflow runs a Flink server with checkpointing, the state here is persistent, which means that if any of the task managers restarts, the state will be restored. In the current version (Cloudflow is currently on Flink v1.10), this does not ensure full high availability: in the case of job manager failures, the snapshots are not restored.
High availability for Flink Kubernetes deployments was introduced in Flink 1.12. Once Cloudflow supports this Flink version, full high availability for Flink stateful stream processing will be in place.
As described above, Akka Streams stateful streaming is implemented using Akka actors with cluster sharding. To use Kafka-aware sharding efficiently, both streams should come from the same topic, to ensure that their sharding is the same. As a result, our Akka Streams processor implementation has to contain two streamlets: a stream merger and the actual processor.
For the merger implementation we introduce an additional data type that can carry either a taxi ride or a taxi fare (we are using protobufs here):
message TaxiRideOrFare {
int64 rideId = 1;
oneof message_type {
TaxiFare fare = 2;
TaxiRide ride = 3;
}
}
With this new combined data type the implementation of the merger streamlet is straightforward:
class MessageMerger extends AkkaStreamlet {
val inTaxiRide = ProtoInlet[TaxiRide]("in-taxiride")
val inTaxiFare = ProtoInlet[TaxiFare]("in-taxifare")
val out = ProtoOutlet[TaxiRideOrFare]("taxirideorfare", _.rideId.toString)
val shape = StreamletShape.withInlets(inTaxiRide, inTaxiFare).withOutlets(out)
override protected def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {
def runnableGraph = {
Merger.source(Seq(
sourceWithCommittableContext(inTaxiFare).map(f => TaxiRideOrFare(rideId = f.rideId, messageType = MessageType.Fare(f))),
sourceWithCommittableContext(inTaxiRide).map(r => TaxiRideOrFare(rideId = r.rideId, messageType = MessageType.Ride(r)))
)).to(committableSink(out))
}
}
}
Once we have a combined input, the Akka Streams implementation, similar to the Flink one, consists of two parts: the actor that implements the data merge (compare with the EnrichmentFunction implementation for Flink) and the main logic that leverages this actor (compare with the main logic for Flink). The actor can be implemented as follows:
object RideShare {
def apply(rideid: String): Behavior[ProcessMessage] = {
// Execute behavior with the current state
def executionstate(rideState: Option[TaxiRide], fareState: Option[TaxiFare]): Behavior[ProcessMessage] = {
// Behaviour describes processing of the actor
Behaviors.receive { (context, msg) => {
msg.record.messageType match {
// Ride message
case MessageType.Ride(ride) =>
fareState match {
case Some(fare) =>
// We have a fare with the same ride ID - produce result
msg.reply ! Some(TaxiRideFare(ride.rideId, fare.totalFare))
executionstate(rideState, None)
case _ =>
// Remember the ride to be used when the fare with the same ride ID arrives
msg.reply ! None
executionstate(Some(ride), fareState)
}
// Fare message
case MessageType.Fare(fare) =>
rideState match {
case Some(ride) =>
// We have a ride with the same ride ID - produce result
msg.reply ! Some(TaxiRideFare(ride.rideId, fare.totalFare))
executionstate(None, fareState)
case None =>
// Remember the fare to be used when the ride with the same ride ID arrives
msg.reply ! None
executionstate(rideState, Some(fare))
}
// Unknown message type - reply with no result so that the ask does not time out
case MessageType.Empty =>
msg.reply ! None
executionstate(rideState, fareState)
}
}}
}
// Initialize state
executionstate(None, None)
}
}
We can see that this implementation is virtually identical to the rich function in Flink. A notable difference is that the actor handles all incoming messages in a single handler and dispatches on the message type using match, thus supporting as many message types as required.
With the actor in place, it can be used to implement the main logic:
override protected def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {
val typeKey = EntityTypeKey[ProcessMessage]("RideShare")
val entity = Entity(typeKey)(createBehavior = entityContext => RideShare(entityContext.entityId))
val sharding = clusterSharding()
def runnableGraph = {
shardedSourceWithCommittableContext(inTaxiMessage, entity).via(messageFlow).to(committableSink(out))
}
private def messageFlow =
FlowWithCommittableContext[TaxiRideOrFare]
.mapAsync(1)(msg ⇒ {
val actor = sharding.entityRefFor(typeKey, msg.rideId.toString)
actor.ask[Option[TaxiRideFare]](ref => ProcessMessage(ref, msg))
}).collect{ case Some(v) => v }
}
Unlike the Flink implementation, there is no explicit keyBy clause here. Instead, the implementation relies on the key from the incoming message and chooses the actor instance based on this key. This is done to ensure the locality of the actors for a running instance of Akka Streams (see scaling applications below).
The Flink data model is not based on key-value pairs. Therefore, it does not require physically packing the data set types into keys and values. Keys are “virtual”: they are defined as functions over the actual data (not the message key) to guide the grouping operator. Although this is also the case in the Akka Streams implementation, in order to ensure “locality” of the actor, the key has to be defined at the message level and is effectively the key of the Kafka message. Of course, sharding can be defined differently, but this will lead to cross-instance communication.
Additionally, unlike the Flink implementation, where streams are connected before invoking RichCoFlatMapFunction, in the case of Akka we merge in a separate streamlet and then process messages, which can contain either ride or fare information.
This implementation works, but it lacks persistence. All of the object state is in memory and will be destroyed on an instance restart. Let's take a look at how to add persistence to this implementation.
One of the important strengths of the Akka toolkit is its richness: it includes a wealth of features, including persistence. Persistence allows us to persist the state of actors [2], which makes it possible both to restore actors after failures and to move actors from one instance to another (we will return to this when we discuss scaling).
Akka Persistence supports multiple database backends, including Cassandra and several relational databases: Postgres, MySQL, H2, Oracle, SQL Server. For our implementation we decided to use Postgres. The first step is to add the following additional dependencies to our build:
val akkaVersion = "2.6.10"
val jdbcVersion = "4.0.0"
val postgreVersion = "42.2.16"
"com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
"com.lightbend.akka" %% "akka-persistence-jdbc" % jdbcVersion,
"org.postgresql" % "postgresql" % postgreVersion,
Additionally, we need to create and initialize our Postgres database. With this in place, we first need to modify the actor's implementation, following the Cluster Sharding persistence documentation. We start by creating the case classes used by the implementation:
// Command
final case class TaxiRideMessage(reply: ActorRef[Option[TaxiRideFare]], record : TaxiRideOrFare) extends CborSerializable
// Event
final case class TaxiRideEvent(state : TaxiState) extends CborSerializable
// State
final case class TaxiState(rideState: Option[TaxiRide], fareState: Option[TaxiFare]) extends CborSerializable{
def reset(updated: TaxiState): TaxiState = copy(updated.rideState, updated.fareState)
}
With this in place, the actor’s implementation looks as follows:
object RideShare{
private val commandHandler: (TaxiState, TaxiRideMessage) => Effect[TaxiRideEvent, TaxiState] = { (state, cmd) =>
cmd match {
case cmd: TaxiRideMessage => processTaxiMessage(state, cmd)
}
}
private val eventHandler: (TaxiState, TaxiRideEvent) => TaxiState = {
(state, evt) => state.reset(evt.state)
}
def apply(entityId: String, persistenceId: PersistenceId): Behavior[TaxiRideMessage] = {
Behaviors.setup { context =>
EventSourcedBehavior(persistenceId, emptyState = TaxiState(None, None), commandHandler, eventHandler)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 50, keepNSnapshots = 3))
}
}
}
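The implementation consists of three main parts: a command handler, which delegates to processTaxiMessage (discussed below); an event handler, which replaces the current state with the state carried by the event; and an apply method, which creates the event-sourced behavior with periodic snapshotting [3].
The implementation above uses processTaxiMessage to implement the taxi-ride-specific logic: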
private def processTaxiMessage(state: TaxiState, cmd: TaxiRideMessage): Effect[TaxiRideEvent, TaxiState] = {
def processMessage(): (TaxiState, Option[TaxiRideFare]) =
cmd.record.messageType match {
// Ride message
case MessageType.Ride(ride) =>
state.fareState match {
case Some(fare) =>
// We have a fare with the same ride ID - produce result
(TaxiState(state.rideState, None), Some(TaxiRideFare(ride.rideId, fare.totalFare)))
case _ =>
// Remember the ride to be used when the fare with the same ride ID arrives
(TaxiState(Some(ride), state.fareState), None)
}
// Fare message
case MessageType.Fare(fare) =>
state.rideState match {
case Some(ride) =>
// We have a ride with the same ride ID - produce result
(TaxiState(None, state.fareState), Some(TaxiRideFare(ride.rideId, fare.totalFare)))
case None =>
// Remember the fare to be used when the ride with the same ride ID arrives
(TaxiState(state.rideState, Some(fare)), None)
}
// Unknown message type - ignore
case MessageType.Empty =>
(state, None)
}
// Calculate new State and reply
val stateWithReply = processMessage()
// Persist state and send reply
Effect.persist(TaxiRideEvent(stateWithReply._1)).thenRun(state => cmd.reply ! stateWithReply._2)
}
This implementation is very similar to the original one, with the difference that we use the persist effect to store data. Finally, we need to modify the streamlet implementation slightly to add a persistence ID:
override protected def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {
val typeKey = EntityTypeKey[TaxiRideMessage]("RideShare")
val entity = Entity(typeKey)(createBehavior = entityContext =>
RideShare(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)))
val sharding = clusterSharding()
def runnableGraph = {
shardedSourceWithCommittableContext(inTaxiMessage, entity).via(messageFlow).to(committableSink(out))
}
private def messageFlow =
FlowWithCommittableContext[TaxiRideOrFare]
.mapAsync(1)(msg ⇒ {
val actor = sharding.entityRefFor(typeKey, msg.rideId.toString)
actor.ask[Option[TaxiRideFare]](ref => TaxiRideMessage(ref, msg))
}).collect{ case Some(v) => v }
}
Adding persistence to our Akka Streams implementation makes it fully reliable: after any application restart, the state will be fully restored. Although this implementation is slightly less readable than the Flink one, it provides the same functionality and even better reliability.
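How the state is restored after a restart can be pictured as a fold of the event handler over the persisted events, optionally starting from a snapshot. The sketch below is plain Scala and deliberately differs from the implementation above: it uses small, fine-grained events (a hypothetical design) rather than events that carry the full state, because that makes the replay visible.

```scala
object RecoverySketch {
  // Hypothetical, simplified stand-ins for the state and event types.
  final case class State(rides: Set[Long], fares: Set[Long])
  sealed trait Event
  final case class RideStored(rideId: Long) extends Event
  final case class FareStored(rideId: Long) extends Event
  final case class Joined(rideId: Long) extends Event

  val empty: State = State(Set.empty, Set.empty)

  def eventHandler(state: State, event: Event): State = event match {
    case RideStored(id) => state.copy(rides = state.rides + id)
    case FareStored(id) => state.copy(fares = state.fares + id)
    case Joined(id)     => State(state.rides - id, state.fares - id)
  }

  // Recovery = latest snapshot (or the empty state) + replay of later events.
  def recover(snapshot: Option[State], eventsSinceSnapshot: Seq[Event]): State =
    eventsSinceSnapshot.foldLeft(snapshot.getOrElse(empty))(eventHandler)
}
```

Replaying all events from the beginning and replaying only the events after a snapshot lead to the same state; the snapshot just shortens the replay.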
The application that we are using is a little bit unusual. In our application, the new keys are constantly created and are fairly short lived. As a result, actors are being created and garbage collection is happening all the time.
To avoid this constant churn of actors, recall that Akka cluster sharding is synchronized with Kafka partitioning. This means that we can create one actor per Kafka partition [4], holding all the state for this partition.
To do this, recall that the default Kafka partitioner uses a 32-bit murmur2 hash to compute the partition ID based on the key (bytes) and the number of partitions. If we assume that the default partitioner is used, we can implement the key calculation as follows:
def convertRideToPartition(rideID : Long) : String = {
// hash the keyBytes to choose a partition
val bytes = BigInt(rideID).toByteArray
val converted = Utils.toPositive(Utils.murmur2(bytes)) % numberOfPartitions
converted.toString
}
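For readers who want to see what the hashing step does without pulling in kafka-clients, here is a plain Scala transcription of the murmur2 variant and the positive-modulo step. It is written from memory of Kafka's Utils class and should be checked against the Kafka sources before being relied on; PartitionSketch and partitionFor are illustrative names. Also note an assumption that applies to the function above as well: the result matches Kafka's partition only if the bytes passed in are exactly the bytes the producer used as the message key, so the key serialization (for example, a string form of the ride ID versus its binary form) is worth verifying.

```scala
object PartitionSketch {
  // Transcription of the murmur2 variant used by Kafka's default partitioner.
  def murmur2(data: Array[Byte]): Int = {
    val length = data.length
    val m = 0x5bd1e995
    val r = 24
    var h = 0x9747b28c ^ length
    var i = 0
    // Mix four bytes at a time (little-endian).
    while (i + 4 <= length) {
      var k = (data(i) & 0xff) | ((data(i + 1) & 0xff) << 8) |
        ((data(i + 2) & 0xff) << 16) | ((data(i + 3) & 0xff) << 24)
      k *= m; k ^= k >>> r; k *= m
      h *= m; h ^= k
      i += 4
    }
    // Mix the remaining 0 to 3 bytes.
    val tail = length & ~3
    val rem = length % 4
    if (rem == 3) h ^= (data(tail + 2) & 0xff) << 16
    if (rem >= 2) h ^= (data(tail + 1) & 0xff) << 8
    if (rem >= 1) { h ^= data(tail) & 0xff; h *= m }
    h ^= h >>> 13; h *= m; h ^= h >>> 15
    h
  }

  // Clear the sign bit, then take the modulo, as the default partitioner does.
  def partitionFor(keyBytes: Array[Byte], numPartitions: Int): Int =
    (murmur2(keyBytes) & 0x7fffffff) % numPartitions
}
```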
With this function for calculating actor key we can modify message flow to pick the actor id based on the shards:
private def messageFlow =
FlowWithCommittableContext[TaxiRideOrFare]
.mapAsync(1)(msg ⇒ {
val rideId = if(msg.messageType.isFare) msg.messageType.fare.get.rideId else msg.messageType.ride.get.rideId
val actor = sharding.entityRefFor(typeKey, KafkaSupport.convertRideToPartition(rideId))
actor.ask[Option[TaxiRideFare]](ref => TaxiRideMessage(ref, msg))
}).collect{ case Some(v) => v }
}
The implementation of the actor itself has to be changed as well. First we need to change supporting state classes to reflect that the state now is not Options, but rather Maps, that can contain information for multiple ride ids belonging to a given partition.
// Command
final case class TaxiRideMessage(reply: ActorRef[Option[TaxiRideFare]], record : TaxiRideOrFare) extends CborSerializable
// Event
final case class TaxiRideEvent(state : TaxiState) extends CborSerializable
// State
final case class TaxiState(rideState: Map[Long, TaxiRide], fareState: Map[Long, TaxiFare]) extends CborSerializable{
def reset(updated: TaxiState): TaxiState = copy(updated.rideState, updated.fareState)
}
With this in place, both the command and event handlers stay unchanged, while the apply method has to change to reflect the new state definition:
def apply(entityId: String, persistenceId: PersistenceId): Behavior[TaxiRideMessage] = {
Behaviors.setup { context =>
EventSourcedBehavior(persistenceId, emptyState = TaxiState(Map(), Map()), commandHandler, eventHandler)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 50, keepNSnapshots = 3))
}
}
The biggest change in this case is in processTaxiMessage:
private def processTaxiMessage(state: TaxiState, cmd: TaxiRideMessage): Effect[TaxiRideEvent, TaxiState] = {
def processMessage(): (TaxiState, Option[TaxiRideFare]) =
cmd.record.messageType match {
// Ride message
case MessageType.Ride(ride) =>
state.fareState.get(ride.rideId) match {
case Some(fare) =>
// We have a fare with the same ride ID - produce result
(TaxiState(state.rideState, state.fareState - ride.rideId), Some(TaxiRideFare(ride.rideId, fare.totalFare)))
case _ =>
// Remember the ride to be used when the fare with the same ride ID arrives
(TaxiState(state.rideState + (ride.rideId -> ride), state.fareState), None)
}
// Fare message
case MessageType.Fare(fare) =>
state.rideState.get(fare.rideId) match {
case Some(ride) =>
// We have a ride with the same ride ID - produce result
(TaxiState(state.rideState - ride.rideId, state.fareState), Some(TaxiRideFare(ride.rideId, fare.totalFare)))
case None =>
// Remember the fare to be used when the ride with the same ride ID arrives
(TaxiState(state.rideState, state.fareState + (fare.rideId -> fare)), None)
}
// Unknown message type - ignore
case MessageType.Empty =>
(state, None)
}
// Calculate new State and reply
val stateWithReply = processMessage()
// Persist state and send reply
Effect.persist(TaxiRideEvent(stateWithReply._1)).thenRun(state => cmd.reply ! stateWithReply._2)
}
As we have shown here, we can successfully use Akka Streams runtime (along with Akka cluster) for implementing reliable stateful streaming applications.
When it comes to scaling streaming applications, the Flink and Akka Streams runtimes take completely different approaches.
In the case of Flink, an implementation runs inside the Flink cluster. As defined here, a Flink execution is split into several parallel instances, and each parallel instance processes a subset of the task’s input data. The number of parallel instances of a task is called its parallelism. So scaling Flink-based applications requires changing the parallelism, and potentially the number and resources of the task managers where the applications are running.
Currently there is no way to rescale a Flink job dynamically. Any scaling of a Flink job requires stopping the cluster and restarting it. In order to preserve execution state in such situations, Flink provides a savepoint mechanism, which is a way to create a consistent image of the execution state of a streaming job. So a scaling operation in Flink involves taking a savepoint, stopping the complete job, and restarting it from this savepoint with new parallelism and potentially new resources. Currently Cloudflow does not support savepointing, so scaling Flink in Cloudflow leads to the loss of state.
In the case of Akka Streams, scaling is based on Kafka consumer groups, a set of consumers that cooperate to consume data from some topics. By default all instances of a given service belong to the same consumer group.
The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as “rebalancing the group”.
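What a rebalance does to the partition assignment can be sketched with a simple round-robin split. This is a simplification for illustration only: Kafka's real assignors are configurable and more elaborate, and RebalanceSketch is a made-up name.

```scala
object RebalanceSketch {
  // Simplified round-robin assignment of partitions 0 until `partitions`
  // to the given instances (not one of Kafka's actual assignors).
  def assign(partitions: Int, instances: Seq[String]): Map[String, Seq[Int]] =
    instances.zipWithIndex.map { case (name, idx) =>
      name -> (0 until partitions).filter(_ % instances.size == idx)
    }.toMap
}
```

With six partitions, two instances get three partitions each; after a third instance joins, every instance gets two. The partitions that change owner are the ones whose state has to follow them.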
Because our implementation leverages Kafka-aware sharding, all of the actors for a given partition will be relocated to the instance receiving messages from this partition without losing state. As a result, scaling stateful streaming applications based on Akka Streams simply entails adding or removing instances of the application.
Scaling with Akka Streams happens with no interruption to the execution.
In this blog post we have looked at some of the features of Flink and Akka Streams runtimes for Cloudflow and showed how they can be implemented. Both runtimes have their strengths and weaknesses, and we hope that examples in this blog post will allow you to make a more informed choice, while building your own implementations.
[1] Compare with the MapReduce architecture.
[2] The only state in our implementation is contained in actors; as a result, persisting the state of actors is sufficient to make the overall implementation persistent.
[3] Without snapshotting, an actor’s restore will replay all of the messages (commands) sent to the actor, which can take a considerable amount of time.
[4] Kafka partitions are the basic unit of scalability (see below), which is why partition-based actors provide an optimal actor granularity.