23 October 2018

Akka stack and high-performance microservices

Setting up the backstage

long time ago, in a galaxy far, far away

Redmart delivery slots

Constraints:

  • “Synchronous” process - can only proceed to the next phase if “Reserve” was successful
  • Must keep promises - once a reservation is confirmed, there is no acceptable way to tell the customer “sorry, we couldn’t proceed with your order” (SOP forbids it)
  • Overbooking is costly (overtime costs, service quality risks).
  • Grid is displayed for 7 days
  • SLAs are as follows:
    • Latency <200ms 99th percentile
    • 99.95% availability

Live load test demo

88 mph

Contents

Live load test demo

Overview of microservices ecosystem

Akka-streams

Akka-http

Akka-persistence

Akka cluster and actor sharding

Distributed data

Q&A

Capacity services - Overview

Capacity services - Technology map

Akka-streams:

  • Parallel processing of multiple outgoing requests
  • Backpressure

Akka-persistence:

  • Event-sourcing

Akka-cluster Singleton:

  • Lock-free “serialized” write access to the in-memory state

Akka-cluster Sharding:

  • Lock-free “serialized” write access to the in-memory state

Akka Distributed Data:

  • Fast eventually-consistent read access to an in-memory replicated state (+a bit of CQRS)

Akka-streams

the spice must flow

Akka streams - overview

Docs:

The purpose is to offer an intuitive and safe way to formulate stream processing setups [and] execute them efficiently and with bounded resource usage [...]. In order to achieve this our streams [...] need to be able to slow down producers [...]. This feature is called back-pressure and is at the core of the Reactive Streams initiative …

Features:

  • Back pressure as a first-class concept
  • Explicit buffering and rate control
  • Runs atop actors - same concurrent execution guarantees and benefits
  • Works with both Actors and Futures
  • Streams are only a “recipe” for computation - actual execution happens only when they are materialized and run
  • Stream fusion - transparently merges multiple “synchronous” stages into a single execution unit - avoids unnecessary async boundaries
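
A minimal runnable sketch of these properties (all names and numbers are illustrative, assuming the 2018-era Akka 2.5 APIs): the stream below is only a blueprint until run() is called, and the throttled consumer backpressures the producer.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

object BackpressureDemo extends App {
  implicit val system: ActorSystem = ActorSystem("demo")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Only a "recipe" so far - nothing executes yet
  val recipe = Source(1 to 100)
    .map(_ * 2)                                       // fused with the source - no async boundary
    .throttle(10, 1.second, 1, ThrottleMode.Shaping)  // slow consumer side
    .to(Sink.foreach(println))

  // Materialization starts the actual execution; the throttle backpressures the source
  recipe.run()
}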

Akka streams - use in capacity services

Akka streams - use in capacity services

Advantages:

  • Concurrent execution without the usual concurrency caveats and performance pitfalls
  • Backpressure prevents overloading slow components
  • Similar to Simulink/MATLAB - people with a hardware/embedded background like it

Disadvantages:

  • Built-in logging is somewhat insufficient - need to throw in custom logging stages
  • Complex graphs are complex
  • Debugging is hard - “step into/out” almost never makes sense

Caveats:

  • Need to think “right to left” - if the consumer does not pull, the producer does not run
  • Materialization is costly
  • Easy to convert sync and async functions to a Flow - just use Flow.map or Flow.mapAsync
  • Hard and expensive to go backward - requires stream materialization (see the sketch below)
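
A sketch of the last caveat (helper names are ours): lifting a function into a Flow is a one-liner, while turning a Flow back into a function costs one stream materialization per call.

import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

// Cheap direction: lift plain sync/async functions into stream stages
def toFlow[A, B](f: A => B): Flow[A, B, _] = Flow.fromFunction(f)
def toFlowAsync[A, B](parallelism: Int)(f: A => Future[B]): Flow[A, B, _] =
  Flow[A].mapAsync(parallelism)(f)

// Expensive direction: every call materializes and runs a fresh stream
def toFunction[A, B](flow: Flow[A, B, _])(implicit mat: Materializer): A => Future[B] =
  a => Source.single(a).via(flow).runWith(Sink.head)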

Akka streams - examples

// Functional combinators API
Flow[CapacityReservationRequest]
  .via(serviceTimeValidator.flow)
  .mapError(transformValidationExceptions)
  .mapAsync(config.parallelism)(saveReservation)
  .map(rsvId => CapacityReservation(rsvId))

// Graph API
override protected val flow: Flow[_, _, _] = {
  import GraphDSL.Implicits._
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    val queryAddress = b.add(Flow.mapAsync(1)(serviceApi.getAddress))
    // other stages: broadcast, searchWindow, merge

    broadcast ~> queryAddress ~> merge.in0
    broadcast ~> searchWindow ~> merge.in1

    // Not limited to SingleIn-SingleOut - can be FanIn, FanOut, BidiFlow, etc.
    FlowShape(broadcast.in, merge.out)
  })
}

Akka streams - examples

def parallelExecution: Flow[_, _, _] = {
  val broadcast = Broadcast(3)
  val collect = ZipWith[UUID, UUID, UUID, UUID](joinUUIDs)

  // Both "reserve*" flows are executed in parallel, then zipped back together
  broadcast ~> Flow.map(_.id)         ~> collect.in0
  broadcast ~> reserveInFcFlow        ~> collect.in1
  broadcast ~> reserveInTransportFlow ~> collect.in2

  FlowShape(broadcast.in, collect.out)
}

def stuck: Flow[_, _, _] = {
  val broadcast = Broadcast(2)
  val buffer = Flow.buffer(1, OverflowStrategy.backpressure)
  val queryCart = Flow.map(_.cartId).mapAsync(1)(cartService.getById)
  val orElse = OrElse()

  // Without `buffer` this graph doesn't run, as broadcast needs both ports to be pulled
  broadcast.out(0) ~> Flow.map(_.items) ~> orElse.in(0)
  broadcast.out(1) ~> /* buffer ~> */ queryCart ~> orElse.in(1)

  FlowShape(broadcast.in, orElse.out)
}

Akka-HTTP

kessel run in 12 parsecs

Akka http - overview

Docs:

It’s not a web-framework but rather a more general toolkit for providing and consuming HTTP-based services

Features:

  • Supports WebSockets and HTTPS
  • Works great with streams and actors
  • Uses akka-streams internally, hence supports backpressure
  • API definitions are composable
  • Out of the box streaming both in server and client APIs
  • Opt-in caching out of the box
  • Easily extendable
  • Low overhead
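
A minimal end-to-end sketch using the 2018-era bindAndHandle API (host, port and route are illustrative):

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

object MinimalServer extends App {
  implicit val system: ActorSystem = ActorSystem("http-demo")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Routes are plain composable values - directives wrap inner routes
  val route = pathPrefix("ping") {
    get { complete("pong") }
  }

  // Uses akka-streams internally: responses stream and backpressure end to end
  Http().bindAndHandle(route, "0.0.0.0", 8080)
}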

Akka http - use in capacity services

Akka http - use in capacity services

Advantages:

  • Composability eases handling cross-cutting concerns (logging, metrics, etc.)
  • Backpressure avoids overloading downstream services
  • Can be plugged directly into akka-streams; note, however, that the default integration method materializes a stream on every request

Disadvantages:

  • Harder to find an endpoint compared to Play’s routes file

Caveats:

  • Watch out for akka version compatibility (minor versions not 100% compatible)
  • The client must consume or discard the entire response body even if it doesn’t need it - otherwise the unread entity backpressures incoming data and the connection stalls

Akka http - server example

// RootRouter
val log = LoggerFactory.getLogger(this.getClass) // or LazyLogging, or ScalaLogging

val logAndMonitor = withLog(log) &
  logRequestResult(LoggingMagnet(logRequestAndResponse)) &
  requestResponseMetrics()

val route: Route = handleExceptions(exceptionHandler) {
  healthRouter.route ~ logAndMonitor {
    pathPrefix("v1") { appV1Router.route } ~
    pathPrefix("v2") { appV1Router.route }
  }
}

// appV1Router
override def route: Route =
  pathPrefix("order") {
    get {
      path(IntNumber) { orderId => complete(StatusCodes.OK -> orderService.getById(orderId)) }
    } ~
    post {
      entity(as[CreateOrderRequest]) { request =>
        complete(StatusCodes.Created -> orderService.createOrder(request))
      }
    }
  }

Akka http - client example

// OnDemandClient
def getSlotAvailability(request: SlotAvailabilityRequest) =
  makeRequest(Post(Endpoints.availability, request), handleResponse)

val handleResponse: ResponseHandler[Option[Response]] = {
  case HttpResponse(OK, _, entity, _) =>
    Unmarshal(entity).to[Response].map(Some(_))
  case HttpResponse(NotFound, _, entity, _) =>
    entity.discardBytes()
    Future.successful(None)
}

// HTTPClientBase
protected def makeRequest[Result](request: HttpRequest, handleExpected: ResponseHandler[Result])
                                 (implicit timeout: FiniteDuration): Future[Result] =
  Future.firstCompletedOf(Seq(
    Http().singleRequest(request),
    timeoutFuture(timeout)
  ))
    .flatMap(handleExpected orElse defaultHandler)
    .recoverWith(convertExceptions andThen Future.failed)

Akka-persistence

make it so

Akka persistence - overview

Docs:

Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept [...] is that only changes to an actor’s internal state are persisted but never its current state directly [...]. These changes are only ever appended to storage [...]

Features:

  • Event-sourcing
  • State is always in memory (in an Actor) - no need to read from DB first
  • Multiple storage plugins
  • At-Least-Once delivery for actor messaging
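
A sketch of the At-Least-Once delivery feature (message and event types are ours): deliveries are retried until the peer confirms, and the delivery state itself is recovered from events.

import akka.actor.ActorSelection
import akka.persistence.{AtLeastOnceDelivery, PersistentActor}

case class DoWork(payload: String)
case class WorkEnvelope(deliveryId: Long, payload: String)
case class WorkAck(deliveryId: Long)
case class WorkSent(payload: String)
case class WorkConfirmed(deliveryId: Long)

class Sender(destination: ActorSelection) extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "sender-1"

  override def receiveCommand: Receive = {
    case DoWork(payload) =>
      // Persist first, then schedule the delivery; it is retried until confirmed
      persist(WorkSent(payload))(evt => deliver(destination)(id => WorkEnvelope(id, evt.payload)))
    case WorkAck(deliveryId) =>
      persist(WorkConfirmed(deliveryId))(evt => confirmDelivery(evt.deliveryId))
  }

  override def receiveRecover: Receive = {
    case WorkSent(payload)         => deliver(destination)(id => WorkEnvelope(id, payload))
    case WorkConfirmed(deliveryId) => confirmDelivery(deliveryId)
  }
}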

Akka persistence - use in capacity services

Akka persistence - use in capacity services

Advantages:

  • Very high throughput and low latency, as application state stays in memory
  • All the event-sourcing advantages - immutability, easier transition to data-streaming architectures, works well with CQRS, etc.

Disadvantages:

  • No state records in the DB - can’t just look it up
  • All the event-sourcing disadvantages: harder (than state-sourcing) schema evolution, data size concerns, memory footprint concerns, etc.

Caveats:

  • Pick serialization plugin carefully - very hard to change
  • Before v0.85 eager initialization was partially broken
  • Persistence failure forces actor to stop (bypasses supervisor)
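
For the last caveat, a sketch: the failure can be observed via the onPersistFailure hook, but the actor is still stopped unconditionally afterwards - the supervisor strategy never sees it.

import akka.actor.{Actor, ActorLogging}
import akka.persistence.PersistentActor

class AuditedActor extends PersistentActor with ActorLogging {
  override def persistenceId: String = "audited-1"

  // Called when the journal write fails; the actor stops right after this hook,
  // bypassing supervision - we can only log/alert here, not recover
  override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
    log.error(cause, "Persist failed for event {} at seqNr {}", event, seqNr)
    super.onPersistFailure(cause, event, seqNr)
  }

  override def receiveCommand: Receive = Actor.emptyBehavior
  override def receiveRecover: Receive = Actor.emptyBehavior
}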

Akka persistence - persistent actor example

class ShiftScheduleActor(shiftName: String) extends PersistentActor with ActorLogging {
  protected var state: Option[ActorState] = None

  override def persistenceId: String = s"shift-$shiftName" // MUST be unique

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: ShiftScheduleSnapshot) => state = Some(ActorState.fromSnapshot(snapshot))
    case evt: ShiftScheduleEvent                           => handleEvent(evt)
    case RecoveryCompleted                                 => log.info("Recovery completed") // or other tasks
  }

  override def receiveCommand: Receive = {
    case MakeReservation(id, someOtherData) => persist(ReservationMade(id, someOtherData))(handleEvent)
    case CancelReservation(id)              => persist(ReservationCancelled(id))(handleEvent)
  }

  protected def handleEvent(event: ShiftScheduleEvent): Unit = event match {
    case evt: ReservationMade      => state = reservationMade(state, evt)
    case evt: ReservationCancelled => state = reservationCancelled(state, evt)
  }
}

Detour: event-sourcing and schema migration (+rollback)

In-app techniques:

  • Upcast/convert in the application (akka’s take on this: EventAdapter - see the sketch below)
  • Weak schema - simplest and works great for backward compatible changes
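
A sketch of the EventAdapter approach (event versions are illustrative): old events are upcast on the read side, current events pass through unchanged.

import java.util.UUID
import akka.persistence.journal.{EventAdapter, EventSeq}

case class ReservationMadeV1(id: UUID)
case class ReservationMadeV2(id: UUID, quantity: Int)

class ReservationEventAdapter extends EventAdapter {
  override def manifest(event: Any): String = "" // no manifest needed here

  // Write side: persist the current schema as-is
  override def toJournal(event: Any): Any = event

  // Read side: upcast old events to the current schema during replay
  override def fromJournal(event: Any, manifest: String): EventSeq = event match {
    case old: ReservationMadeV1 => EventSeq.single(ReservationMadeV2(old.id, quantity = 1))
    case other                  => EventSeq.single(other)
  }
}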

Out-of-band techniques:

  • Update in the persistence storage - similar to “classical” database migrations
  • Copy-transform - extract events, transform as necessary (upcast/convert/delete) and load into a different store/collection/stream

Radical:

  • Create a completely new version of the microservice; if necessary, run it in parallel for some time to accumulate data and/or migrate old data

Some rollback strategies require a specific sequence and content of releases - plan releases ahead of time.

Rollback strategies:

  • Persisted events are an accurate representation of reality - don’t roll back; roll forward with a hotfix
  • Persisted events are wrong and should be treated as if they never happened - erase them from the event store (e.g. restore from a backup) or ignore them in the application (caveat: “new” and “old” events might be intermixed)

Trick: can mix both approaches to some extent

  • Find “last good” snapshot
  • “Copy” the snapshot forward as if it were the latest system state

Cluster and actor sharding

the Grid

Reasoning about distributed systems: Consistency models

We’ll mostly stay in the right-hand branch of the diagram, as it describes single-object consistency models.

For more details (and a clickable version of the map) see http://jepsen.io/consistency

Reasoning about distributed systems: PACELC theorem

Wikipedia:

PACELC theorem is an extension to the CAP theorem. It states that in case of network partitioning (P) in a distributed computer system, one has to choose between availability (A) and consistency (C) (as per the CAP theorem), but else (E), even when the system is running normally in the absence of partitions, one has to choose between latency (L) and consistency (C).

Relational databases generally fall outside this framework, as they are normally not distributed but employ different techniques for high availability and performance (e.g. read replicas, master-slave replication, hot standby servers)

Examples:

  • MongoDB: P+A, E+C
  • Cassandra: P+A, E+L
  • DynamoDB: P+A, E+L
  • Hazelcast: P+A, E+L/C

Some DBs (e.g. Cassandra, Mongo) let the application choose E+L vs. E+C per operation - e.g. by specifying a “read concern” and/or “write concern” (sketch below)
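
For illustration, a sketch with the DataStax Cassandra driver (3.x-era API; keyspace, table and query are made up) - the consistency level picks the E+L vs. E+C trade-off per statement:

import com.datastax.driver.core.{Cluster, ConsistencyLevel, SimpleStatement}

val session = Cluster.builder().addContactPoint("127.0.0.1").build().connect("capacity")

// E+L: fast, may return stale data
val fastRead = new SimpleStatement("SELECT * FROM slots WHERE day = ?", "2018-10-23")
  .setConsistencyLevel(ConsistencyLevel.ONE)

// E+C: waits for a quorum of replicas, pays in latency
val safeRead = new SimpleStatement("SELECT * FROM slots WHERE day = ?", "2018-10-23")
  .setConsistencyLevel(ConsistencyLevel.QUORUM)

session.execute(fastRead)
session.execute(safeRead)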

Akka cluster Singleton - overview

Docs:

For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster. [...] Using a singleton should not be the first design choice. It has several drawbacks, such as single-point of bottleneck. Single-point of failure is also a relevant concern, [...]

Features:

  • Guaranteed to have a single instance somewhere in the cluster
  • Automatic restart if node fails
  • All the nodes in the cluster can send messages to the singleton via proxies
  • Consistency model: Linearizable
  • PACELC: P+C, E+C

Akka cluster Singleton - use in capacity services

Akka cluster Singleton - use in capacity services

Advantages:

  • Strong consistency model - can be used to “serialize” access to certain state or resource without any locking (in application code)

Disadvantages:

  • Performance bottleneck
  • Single point of failure
  • Can’t scale out
  • Only one instance runs in the entire cluster - other nodes sit idle (unless doing something else, e.g. serving as read-only replicas in CQRS fashion)

Caveats:

  • Can’t rely on the singleton always being up - failure detection and recovery take time
  • Need to be careful with node downing
  • (Unverified) crashes and does not restart if persistence fails

Overall: stick to the recommendation in the docs - a singleton should not be your first design choice.

Akka cluster Singleton - code example

// starts and supervises the singleton
system.actorOf(
  ClusterSingletonManager.props(
    singletonProps = ScheduleActor.props(...),
    terminationMessage = SingletonTermination(), // caveat: call-by-value - don’t put DateTime.now or similar here
    settings = ClusterSingletonManagerSettings(system)
  ),
  name = "ScheduleActor"
)

// automatically resolves the singleton’s actorRef, buffers messages to the singleton while it’s down
val proxy = system.actorOf(
  ClusterSingletonProxy.props(
    singletonManagerPath = "/user/ScheduleActor",
    settings = ClusterSingletonProxySettings(system)
  )
)

// sending/asking the singleton via the proxy
proxy ! Reserve(...)
(proxy ? Reserve(...)).mapTo[ReservationResult]

Akka cluster Sharding - overview

Docs:

Cluster sharding is useful when you need to distribute actors across several nodes in the cluster [...] without having to care about their physical location in the cluster, which might also change over time.

It could [...] be actors representing Aggregate Roots in Domain-Driven Design terminology. [...]. These actors typically have persistent (durable) state, but this feature is not limited to actors with persistent state.

Features:

  • A collection of actors spread across all the nodes in the cluster
  • With akka-persistence allows durable “state” to migrate between nodes in the cluster
  • Senders need to know only the “logical” location and are agnostic about the actual node running the actor
  • Consistency model: Sequential
  • PACELC: P+C, E+C

Akka cluster Sharding - use in capacity services

Akka cluster Sharding - use in capacity services

Advantages:

  • Still a strong consistency model (per entity)
  • Scales out easily - just add more nodes
  • No lazy nodes in the cluster

Disadvantages:

  • Rebalancing/crash recovery leaves some entities unavailable for a while
  • Custom supervision is complicated - need to introduce an “intermediate” actor solely for supervision purposes (see the sketch after the caveats)

Caveats:

  • If used with persistence - must have unique persistence IDs
  • Sharding coordinator is a singleton
  • “Hot” entity might still be a bottleneck
  • Actors are spawned by a shard region, which runs in the /system actor space - and hence they also run in /system.
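
For the custom supervision disadvantage above, a sketch of the usual workaround (names are ours): the shard region spawns a thin parent whose only job is to supervise the real entity.

import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}

class EntitySupervisor(entityProps: Props) extends Actor {
  // The sharded entity is our child, so this strategy actually applies to it
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3) {
      case _: IllegalStateException => SupervisorStrategy.Restart
    }

  private val entity = context.actorOf(entityProps, "entity")

  override def receive: Receive = {
    case msg => entity forward msg // keep the original sender
  }
}

// Register the supervisor, not the entity, with sharding
ClusterSharding(system).start(
  typeName = shardName,
  entityProps = Props(new EntitySupervisor(ShiftScheduleActor.props(...))),
  settings = ClusterShardingSettings(system),
  messageExtractor = new ShiftMessageExtractor(ShardingConfig.numShards)
)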

Akka cluster Sharding - code example

// handles message routing
class ShiftMessageExtractor(numShards: Int) extends ShardRegion.MessageExtractor {
  override def entityId(msg: Any): String =
    msg match { case Envelope(id: ShiftReference, _) => s"${id.date}-${id.shift}" }

  override def entityMessage(msg: Any): Any =
    msg match { case Envelope(_, payload) => payload }

  override def shardId(msg: Any): String =
    msg match { case Envelope(id, _) => math.abs(id.hashCode % numShards).toString } // consistent hashing
}

// starts sharding
val shardRegion: ActorRef = ClusterSharding(context.system).start(
  typeName = shardName,
  entityProps = ShiftScheduleActor.props(...),
  settings = ClusterShardingSettings(context.system),
  messageExtractor = new ShiftMessageExtractor(ShardingConfig.numShards),
  allocationStrategy = new LeastShardAllocationStrategy(...),
  handOffStopMessage = HandoffStop() // caveat: call-by-value
)

// alternative way to get a reference to an already created ShardRegion actor
val shardRegion: ActorRef = ClusterSharding(ctx.system).shardRegion(shardName)

shardRegion ! Envelope(shiftRef, message) // sends a message to the sharded actor

Distributed data

Voight-Kampff test

Akka Distributed Data - overview

Docs:

Akka Distributed Data is useful when you need to share data between nodes in an Akka Cluster. The data is accessed with an actor providing a key-value store like API. The keys are unique identifiers with type information of the data values. The values are Conflict Free Replicated Data Types (CRDTs).

Features:

  • Non-blocking masterless updates
  • Eventually consistent copy of the same data on every node in the cluster
  • Controlled consistency levels
  • Opt-in durable storage
  • Consistency model: depends, in our case Monotonic Reads + Monotonic Writes
  • PACELC: P+A, E+L

Akka Distributed Data - use in capacity services

Akka Distributed Data - use in capacity services

Advantages:

  • Blazingly fast read queries (with ReadLocal)
  • Concurrent non-blocking updates from anywhere in the cluster

Disadvantages:

  • Not suited for high-cardinality data (100K keys and more)
  • Limited to CRDT data types

Caveats:

  • ReadMajority+WriteMajority does not mean “no stale reads” - cluster membership changes can still trick it
  • Risk of OOM crashes with “medium”-cardinality + frequent-update data.

Akka Distributed Data - code example

val replicator: ActorRef = DistributedData(actorSystem).replicator
val dataKey: Key[LWWMap[_, _]] = LWWMapKey.create("ddata-capacityMap")

def saveCapacities(capacity: CapacitySheet): Future[Unit] = {
  val updateRequest = Update(dataKey, LWWMap.empty, WriteLocal) {
    lwwMap => capacity.capacities.foldLeft(lwwMap)(_ + _)
  }
  (replicator ? updateRequest).collect {
    case _: UpdateSuccess[_] => logger.info("Success")
    case _: UpdateFailure[_] => throw DDataException(...)
  }
}

def findCapacities(keys: Set[BucketKey]): Future[CapacitySheet] = {
  (replicator ? Get(dataKey, ReadLocal)).collect {
    case success: GetSuccess[_] => CapacitySheet(success.get(dataKey).entries.filterKeys(keys contains _))
    case failure                => throw DDataException(...)
  }
}
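
The example above uses the E+L flavour (WriteLocal/ReadLocal). A sketch of raising consistency per operation (timeouts are illustrative; the majority caveat from the previous slide still applies):

import akka.cluster.ddata.Replicator.{ReadMajority, WriteMajority}
import scala.concurrent.duration._

// E+C flavour: wait for a majority of nodes, pay in latency
def saveCapacitiesMajority(capacity: CapacitySheet) =
  replicator ? Update(dataKey, LWWMap.empty, WriteMajority(timeout = 3.seconds)) {
    lwwMap => capacity.capacities.foldLeft(lwwMap)(_ + _)
  }

def findCapacitiesMajority =
  replicator ? Get(dataKey, ReadMajority(timeout = 3.seconds))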

Recap

where we’re going we don’t need roads

Capacity services - Technology map

Akka-streams:

  • Parallel processing of multiple outgoing requests
  • Backpressure

Akka-persistence:

  • Event-sourcing

Akka-cluster Singleton:

  • Lock-free “serialized” write access to the in-memory state

Akka-cluster Sharding:

  • Lock-free “serialized” write access to the in-memory state

Akka Distributed Data:

  • Fast eventually-consistent read access to an in-memory replicated state (+a bit of CQRS)

Questions?

Thank you

hasta la vista, baby
