Bora Kaplan

Data Engineer

Akka Stream Coexistence With Akka Typed

Posted at — May 19, 2020

Akka Typed is the new API for writing type-safe actors. The untyped API is now referred to as Classic Actors, which means the typed API is the future of all Akka projects. As it has been production-ready for a while, I started using it in production. The experience has been great so far: I can finally use Scala's strong type system to build more maintainable projects.

In this post we will write a basic streaming application. The example I chose is deliberately simple to keep the business logic small, so that we can focus on the techniques and tools we are going to use. It is basically the kind of data collection stream a data engineer has to write quite often.

The example is built upon spending data. The stream will read expense events, each with a description and a spending amount in USD. We are going to enrich the data by including the amount in TRY and EUR. There will be an actor that fetches current conversion rates from an external API and updates itself at a given time interval. The stream will parse each event, enrich it with the different currencies, and publish the results to another Kafka topic.

(Diagram: Stream Flow)

Akka Typed Introduction

The new API introduces a lot of changes to the actor environment. From ActorSystem down to ActorRef itself, there are many new features that are finally type safe. Some of the important changes are:

- ActorRef[T] is parameterized with the type of message the actor accepts, so you can only send it messages it understands.
- Actors are defined as a Behavior[T] built from factory methods instead of extending the Actor trait.
- ActorSystem[T] is itself typed: it is started with a guardian behavior and acts as an ActorRef to that guardian.
- There is no sender(); if you expect a reply, the message must carry a replyTo reference of the expected response type.
- Supervision is opt-in, and the default strategy is to stop the failing actor instead of restarting it.

As you can see, there are quite a lot of changes in the new Typed API. The best part is that you can start using it without rewriting anything in your current codebase. There is a package called akka.actor.typed.scaladsl.adapter which includes implicit methods to switch seamlessly between typed and untyped. For example, you can use actorSystem.toClassic to get an untyped instance of your actor system so that you can use it somewhere else, like in an Akka Stream.
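Here is a minimal sketch of those adapters in practice. The names are placeholders, not part of the example we will build:

import akka.actor.typed.{ActorSystem, SpawnProtocol}
import akka.actor.typed.scaladsl.adapter._

// A typed system, similar to the one used later in this post.
val typedSystem: ActorSystem[SpawnProtocol.Command] = ActorSystem(SpawnProtocol(), "adapter-demo")

// Adapt it to a classic system for APIs that still expect akka.actor.ActorSystem,
// e.g. Akka Streams materialization or Alpakka settings.
val classicSystem: akka.actor.ActorSystem = typedSystem.toClassic

// The adapters work in the other direction too, e.g. classicSystem.spawn(someBehavior, "name")
// to spawn a typed actor from a classic system, or classicRef.toTyped[MyMessage] for actor refs.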

Let's write some code now to actually see these changes in action.

You can find the full example on Github.

Master Actor

This example uses actors to hold and update the state of currency conversion rates. The actor hierarchy is composed of two different actors: the Master Actor and the Currency Actor. There will be only one Master Actor and multiple Currency Actors. The Currency Actors will be created by our stream depending on the number of Kafka partitions, so there will be one Currency Actor per partition.

The Master Actor is responsible for spawning and supervising Currency Actors. It updates their state periodically by sending an HTTP request to an external API and forwarding the response to each Currency Actor. Currency Actors do just one thing: serve the current conversion rates given to them. Our stream will use this actor system to get the latest conversion rates so that it can enrich the data.

The Master Actor is a great example of the new typed actors, as it uses plenty of the new API's features. Let's start by creating its protocol.

sealed trait Command

case class UpdatedRates(conversionRates: ConversionRates)                          extends Command
case class ChildTerminated(child: ActorRef[CurrencyActor.Command], partition: Int) extends Command
case class SpawnCurrencyActor(partition: Int, replyTo: ActorRef[SpawnResponse])    extends Command
case object TriggerUpdate                                                          extends Command

case class SpawnResponse(currencyActor: ActorRef[CurrencyActor.Command])

def apply(initialRates: ConversionRates,
          refreshInterval: FiniteDuration,
          stateUpdater: () => Future[ConversionRates]): Behavior[Command] = ???

Being able to use sum types for receiving messages is my favorite thing about the new API: finally we can have a typed receive method instead of PartialFunction[Any, Unit]. This actor does quite a bit, so there are a few kinds of messages to handle: updating our state so that we can update the Currency Actors, spawning and watching Currency Actors, and a timer message to trigger the update. Instead of using Props to get the dependencies, we just use function parameters. We also don't have a var to hold the state; we simply create an updated version of the actor's behavior whenever the state changes. Yet another point for type safety!

In Akka Typed, we define an actor by a Behavior[T]. A behavior is a blueprint that states how the actor should be constructed and how it should behave. To build one we use factory methods from the Behaviors object. Let's see some examples.

Behaviors.supervise[Command] {
  Behaviors.withTimers { timer: TimerScheduler[Command] =>
    timer.startTimerWithFixedDelay(TriggerUpdate, refreshInterval)
    onMessage(currentRates = initialRates, children = Map.empty)
  }
}.onFailure(
  SupervisorStrategy.restartWithBackoff(minBackoff = 100.millis, maxBackoff = 30.second, randomFactor = 0.2)
)

You can wrap multiple Behaviors inside each other. We used supervise to get the .onFailure method and withTimers to get the timer: TimerScheduler[Command]. We set a timer to send the actor itself a TriggerUpdate message at the given refresh interval; this will trigger the state update we will define shortly. Notice that even the timer itself is now typed! All of these carry the type parameter Command, which is the message type we declared in the protocol.

Now let's define our typed receive function onMessage, where we pattern match on the incoming Command.

def onMessage(currentRates: ConversionRates,
              children: Map[Int, ActorRef[CurrencyActor.Command]]): Behavior[Command] = 

  Behaviors.receive { case (context, message) => 
    message match {
      case SpawnCurrencyActor(partition, replyTo) =>
        val newActor = context.spawn(CurrencyActor(currentRates), s"currency-actor-$partition")
        replyTo ! SpawnResponse(newActor)
        context.watchWith(newActor, ChildTerminated(newActor, partition))
        onMessage(currentRates, children + (partition -> newActor))
      case TriggerUpdate =>
        context.pipeToSelf(stateUpdater())(
          _.fold(_ => UpdatedRates(currentRates), newRates => UpdatedRates(newRates))
        )
        Behaviors.same
      case UpdatedRates(newRates) =>
        children.values.foreach(_ ! CurrencyActor.UpdateRates(newRates))
        onMessage(newRates, children)
      case ChildTerminated(_, partition) =>
        onMessage(currentRates, children - partition)
    }
  }

The Master Actor is responsible for creating Currency Actors when asked, which you can see in the first message. After spawning the actor we watch it using a custom termination message, ChildTerminated. We need to get an ActorRef out of the message so we can reply back: there is no sender() in the typed API, because in a typed world we cannot know who the sender is or what kind of message it expects.

The scheduler in the previous snippet was set up to send the actor itself a TriggerUpdate message. When we receive this message we use stateUpdater to refresh our state; it is just a function that calls an external HTTP API. It returns a Future, so we need a way to handle the asynchrony. As usual, we use messages to update the state once the Future completes, and the pipeToSelf function comes in handy here. We send ourselves a message carrying either the existing state or the new state, depending on how the Future completed. When we receive this new state via UpdatedRates, we broadcast it to the Currency Actors.

We have to return a Behavior after handling a message. You can use helpers from Behaviors, which include useful ones like Behaviors.same and Behaviors.stopped. If you want to change the actor's state, you call your behavior function again. There is one important detail here regarding this recursion: we call onMessage again instead of apply to create the new behavior. If we used apply it would still work, but the supervision and the timer would be set up again; every time the state was updated after the timer fired, apply would create another timer and cancel the existing one. This is unnecessary overhead to be careful about, so only re-create the part of the behavior that actually needs to change.
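One piece we have not shown is the stateUpdater itself; the post keeps it abstract as a () => Future[ConversionRates]. A minimal, hypothetical sketch using Akka HTTP and spray-json could look like the following. The endpoint, the token parameter and the JSON format are assumptions, not part of the original example:

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.unmarshalling.Unmarshal
import spray.json._
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical rate fetcher: calls an external API and parses the JSON body into ConversionRates.
// Uses the classic system obtained via toClassic in main; assumes a spray-json
// JsonFormat[ConversionRates] is in scope, just like the formats used for parsing expense events.
def getLatestRates(apiToken: String)(implicit system: akka.actor.ActorSystem,
                                     ec: ExecutionContext): Future[ConversionRates] =
  Http()
    .singleRequest(HttpRequest(uri = s"https://api.example.com/latest?base=USD&access_key=$apiToken"))
    .flatMap(response => Unmarshal(response.entity).to[String])  // read the body as a String
    .map(body => body.parseJson.convertTo[ConversionRates])      // then convert it with spray-json

// The master actor's stateUpdater is then just a thunk over this call:
// val stateUpdater: () => Future[ConversionRates] = () => getLatestRates(config.apiToken)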

Currency Actor

Most of the work is done by the Master Actor already. All that is left for the Currency Actor is to serve the state it has been given.

sealed trait Command
case class RetrieveRates(replyTo: ActorRef[CurrencyResponse]) extends Command
case class UpdateRates(conversionRates: ConversionRates)      extends Command

case class CurrencyResponse(conversionRates: ConversionRates)

def apply(conversionRates: ConversionRates): Behavior[Command] =
  Behaviors.receiveMessage {
    case RetrieveRates(replyTo) =>
      replyTo ! CurrencyResponse(conversionRates)
      Behaviors.same
    case UpdateRates(newRates) =>
      apply(newRates)
  }

The Currency Actor's protocol is really simple: one message to provide the conversion rates when asked, one message to receive updated conversion rates from the Master Actor, and a response that contains the current conversion rates.

We are done with the actors. Now let's create the stream where they will come in handy.

Creating Stream Sources, Sinks and Flows

We are going to consume messages from one Kafka topic and produce messages to another. For this we need to create Sources and Sinks for our stream, which we can do easily with Alpakka Kafka. Notice that we use some type aliases in this demonstration so that the code is easier to read.

We are going to create a Source for each Kafka partition so that we get the best scalability possible. Consumer.committablePartitionedSource gives us just what we need. We also need a committer sink so that we can commit the messages we read.

type KafkaMessage = ConsumerMessage.CommittableMessage[String, String]

def makeSource: Source[(TopicPartition, Source[KafkaMessage, NotUsed]), Consumer.Control] =
  Consumer.committablePartitionedSource(makeConsumerSettings, Subscriptions.topics(config.sourceTopic))

def makeSink: Sink[ConsumerMessage.Committable, Future[Done]] =
  Committer.sink(CommitterSettings(system))

def makeConsumerSettings: ConsumerSettings[String, String] =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers(config.hosts)
    .withClientId(config.clientId)
    .withGroupId(config.groupId)

def makeProducerSettings: ProducerSettings[String, String] =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(config.hosts)
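The config object used above is not shown in the post; a hypothetical shape, with field names inferred from how it is used here and in the main method later, might look like this:

import scala.concurrent.duration.FiniteDuration

// Hypothetical configuration models; the real ones are in the full example on Github.
case class KafkaConfig(hosts: String,
                       clientId: String,
                       groupId: String,
                       sourceTopic: String,
                       destinationTopic: String)

case class Config(apiToken: String,
                  refreshInterval: FiniteDuration,
                  kafka: KafkaConfig)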

Now let's create our flows. Remember the diagram: first we have to map incoming JSON events into a model. If parsing is successful, we move forward with the event itself and its offset. If it fails, we just keep the offset so we can commit it in the next flow.

case class ExpenseEvent(id: String, amount: BigDecimal, description: String, timestamp: Long)

type KafkaEvent    = (CommittableOffset, ExpenseEvent)
type KafkaEnvelope = ProducerMessage.Envelope[String, String, CommittableOffset]
type KafkaSource   = Source[ConsumerMessage.CommittableMessage[String, String], NotUsed] 

val parser: Flow[KafkaMessage, KafkaEvent, NotUsed] =
  Flow[KafkaMessage]
    .map { event =>
      Try(event.record.value.parseJson.convertTo[ExpenseEvent])
        .fold(
          _           => Left(event.committableOffset),
          parsedEvent => Right(event.committableOffset -> parsedEvent)
        )
    }
    .divertLeft(invalidEventsCommitter)

We use Either to handle both outcomes of parsing, so that we can divert the stream to another flow when parsing fails. The Either usage does not leak everywhere thanks to this wonderful diversion technique, which is explained by @bszwej.

implicit class FlowOps[A, L, R, M](flow: Flow[A, Either[L, R], M]) {
  def divertLeft(diversionFlow: Graph[SinkShape[Either[L, R]], M]): Flow[A, R, M] =
    flow.via {
      Flow[Either[L, R]]
        .divertTo(diversionFlow, _.isLeft)
        .collect { case Right(right) => right }
    }
}

val invalidEventsCommitter: Sink[Either[CommittableOffset, _], NotUsed] =
  Flow[Either[CommittableOffset, _]]
    .collect { case Left(offset) => offset }
    .to(committerSink)

Now that we have data we can use, let's enrich it using the state held by our actor. The Currency Actor holds conversion rates based on USD. Let's ask it for the current conversion rates and enrich our data with the amount in EUR and TRY.

def enricher(currencyActor: ActorRef[CurrencyActor.Command]): Flow[KafkaEvent, KafkaEnvelope, NotUsed] =
  Flow[KafkaEvent]
    .mapAsyncUnordered(1) { case (offset, event) =>
      currencyActor.ask(RetrieveRates)
        .map { currencyResponse =>
          val enrichedEvent = event.enrich(currencyResponse.conversionRates.rates).toJson.compactPrint
          val record = new ProducerRecord(destinationTopic, event.id, enrichedEvent)
          ProducerMessage.single(record, offset)
        }
    }
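The enrich helper and the ConversionRates model are not shown in the post; here is a minimal sketch of what they might look like, assuming rates keyed by currency code with USD as the base. The field names are assumptions:

// Hypothetical models for illustration only.
case class ConversionRates(base: String, rates: Map[String, BigDecimal])

case class EnrichedExpenseEvent(id: String,
                                description: String,
                                amountUsd: BigDecimal,
                                amountEur: BigDecimal,
                                amountTry: BigDecimal,
                                timestamp: Long)

implicit class ExpenseEventOps(event: ExpenseEvent) {
  // Multiply the USD amount by each USD-based conversion rate.
  // A spray-json format for EnrichedExpenseEvent is also needed for the .toJson call above.
  def enrich(rates: Map[String, BigDecimal]): EnrichedExpenseEvent =
    EnrichedExpenseEvent(
      id          = event.id,
      description = event.description,
      amountUsd   = event.amount,
      amountEur   = event.amount * rates.getOrElse("EUR", BigDecimal(1)),
      amountTry   = event.amount * rates.getOrElse("TRY", BigDecimal(1)),
      timestamp   = event.timestamp
    )
}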

The enricher flow is created per Kafka partition, because there is a Source per partition. The same goes for the Currency Actor: each partition's source has its own Currency Actor. Using a single actor for the whole stream might have become a bottleneck.

You can see that ask works just like in the classic API. On the surface it might look the same, but under the hood there is a lot of type inference going on. Here is the same code with all the types explicitly annotated.

val response: Future[CurrencyResponse] = currencyActor.ask[CurrencyResponse] {
  replyTo: ActorRef[CurrencyResponse] => RetrieveRates(replyTo)
}

After enriching our data, the only thing left to do is to publish it as a message to another Kafka topic. Let's create the Kafka producer as a Flow instead of a Sink: we need this so that we can commit the messages we read using the committer sink as the final step of our graph.

val producer: Flow[KafkaEnvelope, CommittableOffset, NotUsed] =
  Producer
    .flexiFlow[String, String, CommittableOffset](makeProducerSettings)
    .map(_.passThrough)

Putting it all together, here is what our stream looks like.

val partitionedKafkaSource = makeSource
val committerSink = makeSink

def run: Future[Done] =
  partitionedKafkaSource
    .mapAsyncUnordered(1)(spawnActors)
    .map { case (currencyActor, kafkaSource) =>
      kafkaSource via parser via enricher(currencyActor) via producer runWith committerSink
    }
    .runWith(Sink.ignore)

val spawnActors: ((TopicPartition, KafkaSource)) => Future[(ActorRef[CurrencyActor.Command], KafkaSource)] = {
  case (kafkaPartition, kafkaSource) =>
    masterActor.ask(SpawnCurrencyActor(kafkaPartition.partition, _))
      .map { case SpawnResponse(currencyActor) => currencyActor -> kafkaSource }
}

Starting the Stream

Let's finalize all of this by putting it together and starting the stream.

import akka.actor.typed.scaladsl.adapter._

def main(args: Array[String]): Unit = {
  val config = ConfigSource.default.loadOrThrow[Config]

  implicit val system: ActorSystem[SpawnProtocol.Command] = ActorSystem(SpawnProtocol(), "expense-stream")
  implicit val classicSystem: actor.ActorSystem = system.toClassic
  implicit val ec: ExecutionContext = system.executionContext
  implicit val timeout: Timeout = Timeout(10.seconds)

  val stateUpdater: () => Future[ConversionRates] = () => getLatestRates(config.apiToken)

  for {
    initialState   <- getLatestRates(config.apiToken)
    masterActor    <- spawnMasterActor(initialState, config.refreshInterval, stateUpdater)
    expenseStream   = new ExpenseStream(new KafkaIO(config.kafka), masterActor)
    streamResult   <- expenseStream.run
  } yield streamResult
}

You can see the adapter method toClassic converting the typed actor system into its untyped version. It is really that simple to have typed and untyped code coexist without a problem.

With the typed API we have to provide the behavior of our typed ActorSystem. For this use case we only need the system to spawn our Master Actor, so we use SpawnProtocol, which lets us ask the system to create a new actor via the SpawnProtocol.Spawn message. You can think of SpawnProtocol's behavior as how actor systems behaved before the typed API. This is how it's used:

system.ask(
  SpawnProtocol.Spawn(
    MasterActor(initialState, refreshInterval, stateUpdater),
    "master-actor",
    Props.empty,
    _
  )
)
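This ask replies with the ActorRef of the newly spawned actor, so the spawnMasterActor helper used in main could be as simple as the following sketch. The signature is assumed, not shown in the post:

import akka.actor.typed.{ActorRef, ActorSystem, Props, SpawnProtocol}
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

// Ask the guardian's SpawnProtocol behavior to spawn the master actor and
// return the typed ActorRef it replies with.
def spawnMasterActor(initialRates: ConversionRates,
                     refreshInterval: FiniteDuration,
                     stateUpdater: () => Future[ConversionRates])(
    implicit system: ActorSystem[SpawnProtocol.Command],
    timeout: Timeout): Future[ActorRef[MasterActor.Command]] =
  system.ask[ActorRef[MasterActor.Command]](
    SpawnProtocol.Spawn(MasterActor(initialRates, refreshInterval, stateUpdater), "master-actor", Props.empty, _)
  )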

Summary

We have implemented a nice data collection example that is type safe, scalable, and maintainable. We have created a stream that is resilient to errors and scalable enough to handle high throughput, and an actor system that holds our state and scales with the stream it is attached to. All around, a great Scala and Akka example for data collection.

Thank you for taking the time to read this post. There is a lot of code to read, but I hope you got a feel for the new typed API. I absolutely love it and use it over the classic API whenever I can, and I suggest you do the same.

You can find the full example on Github.