23 October 2018
Akka stack and high-performance microservices
Setting up the backstage
long time ago, in a galaxy far, far away
RedMart delivery slots
Constraints:
Live load test demo
88 mph
Contents
Live load test demo
Akka-streams
Akka-persistence
Akka cluster and actor sharding
Distributed data
Overview of microservices ecosystem
Q&A
Akka-http
Capacity services - Overview
Capacity services - Technology map
Akka-streams:
Akka-persistence:
Akka-cluster Singleton:
Akka-cluster Sharding:
Akka Distributed Data:
Akka-streams
the spice must flow
Akka streams - overview
Docs:
The purpose is to offer an intuitive and safe way to formulate stream processing setups [and] execute them efficiently and with bounded resource usage [...]. In order to achieve this our streams [...] need to be able to slow down producers [...]. This feature is called back-pressure and is at the core of the Reactive Streams initiative …
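To make the back-pressure idea concrete, here is a library-free Scala sketch of demand signalling in the spirit of the Reactive Streams `request(n)` call: the producer never emits more than the consumer has asked for. `DemandDrivenSource` and `inbox` are hypothetical names; Akka Streams runs this protocol for you (asynchronously, between stages), so this only illustrates the principle, not Akka's internals.

```scala
import scala.collection.mutable

// Producer side: emits at most as many elements as the consumer has requested.
final class DemandDrivenSource[A](elements: Iterator[A], onNext: A => Unit) {
  private var outstandingDemand = 0L

  // Called by the consumer to signal that it can accept `n` more elements.
  def request(n: Long): Unit = {
    require(n > 0, "demand must be positive")
    outstandingDemand += n
    while (outstandingDemand > 0 && elements.hasNext) {
      outstandingDemand -= 1
      onNext(elements.next())
    }
  }
}

// Consumer side: a bounded inbox that only asks for what it has room for.
val inbox = mutable.Queue.empty[Int]
val source = new DemandDrivenSource[Int](Iterator.from(1), inbox += _)
source.request(4) // room for 4 elements: the infinite source emits exactly 4
inbox.dequeue()   // the consumer processes two of them...
inbox.dequeue()
source.request(2) // ...and only then signals demand for two more
```

Even though the source is infinite, memory stays bounded: the slow side dictates the pace.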
Features:
Akka streams - use in capacity services
Akka streams - use in capacity services
Advantages:
Disadvantages:
Caveats:
Akka streams - examples
// Functional combinators API
Flow[CapacityReservationRequest]
  .via(serviceTimeValidator.flow)
  .mapError(transformValidationExceptions)
  .mapAsync(config.parallelism)(saveReservation)
  .map(rsvId => CapacityReservation(rsvId))
// Graph API
override protected val flow: Flow[_, _, _] = {
  import GraphDSL.Implicits._
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    // Request: the flow's input element type, elided on the slide (hypothetical name)
    val queryAddress = b.add(Flow[Request].mapAsync(1)(serviceApi.getAddress))
    // other stages, also added via b.add(...): broadcast, searchWindow, merge
    // Graph
    broadcast ~> queryAddress ~> merge.in0
    broadcast ~> searchWindow ~> merge.in1
    FlowShape(broadcast.in, merge.out) // not limited to single-in/single-out: can be FanIn, FanOut, BidiFlow, etc.
  })
}
Akka streams - examples
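def parallelExecution: Flow[_, _, _] = Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  // Request: incoming element type, elided on the slide (hypothetical name)
  val broadcast = b.add(Broadcast[Request](3))
  val collect = b.add(ZipWith[UUID, UUID, UUID, UUID](joinUUIDs))
  // Both "reserve*" flows are executed in parallel, then their results are zipped together
  broadcast ~> Flow[Request].map(_.id) ~> collect.in0
  broadcast ~> reserveInFcFlow ~> collect.in1
  broadcast ~> reserveInTransportFlow ~> collect.in2
  FlowShape(broadcast.in, collect.out)
})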
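def stuck: Flow[_, _, _] = Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  // Request: incoming element type, elided on the slide (hypothetical name)
  val broadcast = b.add(Broadcast[Request](2))
  val buffer = Flow[Request].buffer(1, OverflowStrategy.backpressure)
  val queryCart = Flow[Request].map(_.cartId).mapAsync(1)(cartService.getById)
  val orElse = b.add(OrElse())
  // Without `buffer` the graph deadlocks: Broadcast emits only once ALL its outputs have demand,
  // but OrElse pulls its second input only after the first one completes empty,
  // so nothing ever pulls broadcast.out(1). The buffer pulls eagerly and provides that demand.
  broadcast.out(0) ~> Flow[Request].map(_.items) ~> orElse.in(0)
  broadcast.out(1) ~> /* buffer ~> */ queryCart ~> orElse.in(1)
  FlowShape(broadcast.in, orElse.out)
})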
Akka-HTTP
kessel run in 12 parsecs
Akka http - overview
Docs:
It’s not a web-framework but rather a more general toolkit for providing and consuming HTTP-based services
Features:
Akka http - use in capacity services
Akka http - use in capacity services
Advantages:
Disadvantages:
Caveats:
Akka http - server example
// RootRouter
val log = LoggerFactory.getLogger(this.getClass) // or LazyLogging, or ScalaLogging
val logAndMonitor = withLog(log) & logRequestResult(LoggingMagnet(logRequestAndResponse)) & requestResponseMetrics()
val route: Route = handleExceptions(exceptionHandler) {
  healthRouter.route ~ logAndMonitor {
    pathPrefix("v1") { appV1Router.route } ~
    pathPrefix("v2") { appV1Router.route }
  }
}
// appV1Router
override def route: Route =
  pathPrefix("order") {
    get {
      path(IntNumber) { orderId: Int => complete(StatusCodes.OK -> orderService.getById(orderId)) }
    } ~
    post {
      entity(as[CreateOrderRequest]) { request =>
        complete(StatusCodes.Created -> orderService.createOrder(request))
      }
    }
  }
Akka http - client example
// OnDemandClient
def getSlotAvailability(request: SlotAvailabilityRequest): Future[Option[Response]] =
  makeRequest(Post(Endpoints.availability, request), handleResponse)
// ResponseHandler[T] is assumed to be PartialFunction[HttpResponse, Future[T]] (definition not shown)
def handleResponse: ResponseHandler[Option[Response]] = {
  case HttpResponse(OK, _, entity, _) => Unmarshal(entity).to[Response].map(Some(_))
  case HttpResponse(NotFound, _, entity, _) =>
    entity.discardBytes() // an unconsumed entity stalls the connection, so always consume or discard it
    Future.successful(None)
}
// HTTPClientBase
protected def makeRequest[Result]
    (request: HttpRequest, handleExpected: ResponseHandler[Result])
    (implicit timeout: FiniteDuration): Future[Result] = {
  Future.firstCompletedOf(Seq(
    Http().singleRequest(request),
    timeoutFuture(timeout)
  ))
    .flatMap(handleExpected orElse defaultHandler)
    .recoverWith(convertExceptions andThen Future.failed)
}
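`timeoutFuture` is not shown on the slide. One possible shape for it, sketched in plain Scala under the assumption that it simply fails after the given duration (`TimeoutRace`, `withTimeout` and the scheduler thread are hypothetical names):

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration

object TimeoutRace {
  // Single daemon thread, so pending timers never keep the JVM alive.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "timeout-scheduler")
      t.setDaemon(true)
      t
    }
  })

  // A future that never succeeds: it fails with TimeoutException after `timeout`.
  def timeoutFuture[T](timeout: FiniteDuration): Future[T] = {
    val promise = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit = promise.tryFailure(new TimeoutException(s"no response within $timeout"))
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    promise.future
  }

  // Whichever future completes first wins; the slower one is NOT cancelled.
  def withTimeout[T](work: Future[T], timeout: FiniteDuration)(implicit ec: ExecutionContext): Future[T] =
    Future.firstCompletedOf(Seq(work, timeoutFuture[T](timeout)))
}
```

Because the losing request keeps running, an akka-http response that arrives after the timeout still needs its entity consumed or discarded. With an `ActorSystem` at hand, `akka.pattern.after` is a ready-made alternative to the hand-rolled scheduler.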
Akka-persistence
make it so
Akka persistence - overview
Docs:
Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept [...] is that only changes to an actor’s internal state are persisted but never its current state directly [...]. These changes are only ever appended to storage [...]
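The key idea, that only changes are appended and the current state is derived from them, fits in a few lines of plain Scala. This is a library-free sketch with hypothetical event and state types (`Reserved`, `Cancelled`, `ShiftState`, `ShiftRecovery`); in Akka Persistence the replay is driven by `receiveRecover`, and snapshots arrive as `SnapshotOffer`.

```scala
// Only changes (events) are stored, and they are only ever appended.
sealed trait ShiftEvent
final case class Reserved(reservationId: String) extends ShiftEvent
final case class Cancelled(reservationId: String) extends ShiftEvent

// The current state is never stored directly: it is derived from the events.
final case class ShiftState(reservations: Set[String]) {
  def applyEvent(event: ShiftEvent): ShiftState = event match {
    case Reserved(id)  => ShiftState(reservations + id)
    case Cancelled(id) => ShiftState(reservations - id)
  }
}

object ShiftRecovery {
  val empty: ShiftState = ShiftState(Set.empty)

  // Recovery: start from the latest snapshot, if any, and replay only the events written after it.
  def recover(snapshot: Option[ShiftState], eventsAfterSnapshot: Seq[ShiftEvent]): ShiftState =
    eventsAfterSnapshot.foldLeft(snapshot.getOrElse(empty))((state, event) => state.applyEvent(event))
}
```

Replaying from scratch and replaying from a snapshot must give the same result, which is why the same event handler is used both when persisting and when recovering.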
Features:
Akka persistence - use in capacity services
Akka persistence - use in capacity services
Advantages:
Disadvantages:
Caveats:
Akka persistence - persistent actor example
class ShiftScheduleActor(shiftName: String) extends PersistentActor with ActorLogging {
  protected var state: Option[ActorState] = None
  override def persistenceId: String = s"shift-$shiftName" // MUST be unique
  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: ShiftScheduleSnapshot) => state = Some(ActorState.fromSnapshot(snapshot))
    case evt: ShiftScheduleEvent => handleEvent(evt)
    case RecoveryCompleted => log.info("Recovery completed") // or other tasks
  }
  override def receiveCommand: Receive = {
    case MakeReservation(id, someOtherData) => persist(ReservationMade(id, someOtherData))(handleEvent)
    case CancelReservation(id) => persist(ReservationCancelled(id))(handleEvent)
  }
  // shared by recovery and command handling, so replayed events rebuild exactly the same state
  protected def handleEvent(event: ShiftScheduleEvent): Unit = event match {
    case made: ReservationMade => state = reservationMade(state, made)
    case cancelled: ReservationCancelled => state = reservationCancelled(state, cancelled)
  }
}
Detour: event-sourcing and schema migration (+rollback)
In-app techniques:
Out-of-band techniques:
Radical:
Some rollback strategies require a specific sequence and content of releases, so plan releases in advance.
Rollback strategies:
Trick: both approaches can be mixed to some extent
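One common in-app technique for evolving event schemas is upcasting: old event versions stay in the journal untouched and are converted to the current version when they are read back (Akka Persistence offers event adapters for this). A library-free sketch, with hypothetical event types and a hypothetical default for the new field:

```scala
// Two versions of the same event as they may sit in the journal.
sealed trait StoredSlotEvent
final case class SlotBookedV1(slotId: String) extends StoredSlotEvent
final case class SlotBookedV2(slotId: String, units: Int) extends StoredSlotEvent

object Upcaster {
  // Hypothetical default for the field that V1 events lack.
  val DefaultUnits = 1

  // Applied on read: the journal is never rewritten, old events are converted in memory.
  def upcast(event: StoredSlotEvent): SlotBookedV2 = event match {
    case SlotBookedV1(slotId)  => SlotBookedV2(slotId, DefaultUnits)
    case current: SlotBookedV2 => current
  }
}
```

Upcasting on its own only supports rolling forward: a release rolled back to code that cannot read `SlotBookedV2` would still fail, which is one reason the order of releases matters.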
Cluster and actor sharding
the Grid
Reasoning about distributed systems: Consistency models
We’ll mostly stay in the right branch of the diagram, which covers single-object consistency models.
For more details (and a clickable version of the map), see http://jepsen.io/consistency
Reasoning about distributed systems: PACELC theorem
Wikipedia:
PACELC theorem is an extension to the CAP theorem. It states that in case of network partitioning (P) in a distributed computer system, one has to choose between availability (A) and consistency (C) (as per the CAP theorem), but else (E), even when the system is running normally in the absence of partitions, one has to choose between latency (L) and consistency (C).
Relational databases are generally outside this framework, as they are normally not distributed; instead they rely on other techniques for high availability and performance (e.g. read replicas, master-slave replication, hot standby servers).
Examples:
Some databases (e.g. Cassandra, MongoDB) let the application make the E+L vs. E+C choice itself: per-request consistency levels in Cassandra, “read concern” and “write concern” in MongoDB.
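Tunable consistency in Dynamo-style stores such as Cassandra comes down to quorum arithmetic: with N replicas, a read quorum R and a write quorum W, every read overlaps the latest acknowledged write whenever R + W > N. Smaller quorums lower latency (E+L), larger ones buy consistency (E+C). A minimal sketch of that check in the idealized model (it ignores clock skew and partially failed writes; `quorumsOverlap` is a hypothetical name):

```scala
// n = replicas per key, r = replicas that must answer a read, w = replicas that must ack a write.
def quorumsOverlap(n: Int, r: Int, w: Int): Boolean = {
  require(n > 0 && (1 to n).contains(r) && (1 to n).contains(w), "r and w must be between 1 and n")
  // Pigeonhole: if r + w > n, any read set and any write set share at least one replica.
  r + w > n
}
```

With 3 replicas, QUORUM reads plus QUORUM writes (2 + 2) overlap, ONE plus ONE (1 + 1) does not, and ONE reads are only safe when writes go to ALL (1 + 3).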
Akka cluster Singleton - overview
Docs:
For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster. [...] Using a singleton should not be the first design choice. It has several drawbacks, such as single-point of bottleneck. Single-point of failure is also a relevant concern, [...]
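`ClusterSingletonManager` places the singleton on the oldest cluster member (optionally restricted to members with a given role), which is exactly why it becomes a bottleneck and a hand-over point when that member leaves. A library-free sketch of that placement rule; `ClusterMember` and its `upNumber` field are simplified, hypothetical stand-ins for Akka's `Member` and its age ordering:

```scala
// Simplified cluster member: upNumber grows in the order members became Up.
final case class ClusterMember(address: String, upNumber: Int, roles: Set[String])

object SingletonPlacement {
  // The singleton runs on the oldest member with the required role (any member if no role is set).
  // Ties are broken by address so that every node computes the same answer.
  def host(members: Seq[ClusterMember], role: Option[String]): Option[ClusterMember] = {
    val eligible = members.filter(m => role.forall(m.roles.contains))
    if (eligible.isEmpty) None
    else Some(eligible.minBy(m => (m.upNumber, m.address)))
  }
}
```

When the oldest member leaves, the next-oldest takes over; until the hand-over completes, the singleton is unavailable.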
Features:
Akka cluster Singleton - use in capacity services
Akka cluster Singleton - use in capacity services
Advantages:
Disadvantages:
Caveats:
Overall: really stick to the recommendation given in the docs - singleton should not be your first design choice.
Akka cluster Singleton - code example
// starts and supervises the singleton
system.actorOf(
  ClusterSingletonManager.props(
    singletonProps = ScheduleActor.props(...),
    terminationMessage = SingletonTermination(), // caveat: evaluated once (call by value) - don’t put DateTime.now or similar here
    settings = ClusterSingletonManagerSettings(system)
  ),
  name = "ScheduleActor"
)
// automatically resolves the singleton's ActorRef; buffers messages (bounded, see buffer-size) while the singleton is unavailable
val proxy = system.actorOf(
  ClusterSingletonProxy.props(
    settings = ClusterSingletonProxySettings(system),
    singletonManagerPath = "/user/ScheduleActor"
  )
)
// sending/asking to singleton via proxy
proxy ! Reserve(...)
(proxy ? Reserve(...)).mapTo[ReservationResult] // `?` needs an implicit akka.util.Timeout in scope
Akka cluster Sharding - overview
Docs:
Cluster sharding is useful when you need to distribute actors across several nodes in the cluster [...] without having to care about their physical location in the cluster, which might also change over time.
It could [...] be actors representing Aggregate Roots in Domain-Driven Design terminology. [...]. These actors typically have persistent (durable) state, but this feature is not limited to actors with persistent state.
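Routing in cluster sharding happens in two steps: an entity id is mapped to a shard, and each shard is allocated to one node. A library-free sketch of both steps (`ShardingSketch` is a hypothetical name); the shard function follows the same idea as Akka's `HashCodeMessageExtractor`, and the allocation rule the idea of `LeastShardAllocationStrategy`, without its rebalancing:

```scala
object ShardingSketch {
  // Step 1, entity -> shard: deterministic, and stable only as long as numShards never changes.
  // floorMod keeps the result in [0, numShards) even for negative hash codes.
  def shardId(entityId: String, numShards: Int): Int =
    Math.floorMod(entityId.hashCode, numShards)

  // Step 2, shard -> node: a newly needed shard goes to the node hosting the fewest shards
  // (ties broken by node name so the choice is deterministic).
  def allocate(allocations: Map[String, Set[Int]]): String =
    allocations.minBy { case (node, shards) => (shards.size, node) }._1
}
```

Because of step 1, changing the number of shards would move entities to different shards, so it should be treated as a fixed setting.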
Features:
Akka cluster Sharding - use in capacity services
Akka cluster Sharding - use in capacity services
Advantages:
Disadvantages:
Caveats:
Akka cluster Sharding - code example
// handles message routing
class ShiftMessageExtractor(numShards: Int) extends ShardRegion.MessageExtractor {
  override def entityId(msg: Any): String = msg match { case Envelope(id: ShiftReference, _) => s"${id.date}-${id.shift}" }
  override def entityMessage(msg: Any): Any = msg match { case Envelope(_, payload) => payload }
  // simple hash-mod partitioning (not consistent hashing): numShards must be identical on all nodes and treated as fixed;
  // floorMod keeps shard ids non-negative even for negative hash codes
  override def shardId(msg: Any): String =
    msg match { case Envelope(id, _) => Math.floorMod(id.hashCode, numShards).toString }
}
// starts sharding
val shardRegion: ActorRef = ClusterSharding(context.system).start(
  typeName = shardName,
  entityProps = ShiftScheduleActor.props(...),
  settings = ClusterShardingSettings(context.system),
  messageExtractor = new ShiftMessageExtractor(ShardingConfig.numShards),
  allocationStrategy = new LeastShardAllocationStrategy(...),
  handOffStopMessage = HandoffStop() // caveat: call-by-value
)
// alternative way to get a reference to an already started ShardRegion actor
val shardRegion: ActorRef = ClusterSharding(context.system).shardRegion(shardName)
shardRegion ! Envelope(shiftRef, message) // sends message to sharded actor
Distributed data
Voight-Kampff test
Akka Distributed Data - overview
Docs:
Akka Distributed Data is useful when you need to share data between nodes in an Akka Cluster. The data is accessed with an actor providing a key-value store like API. The keys are unique identifiers with type information of the data values. The values are Conflict Free Replicated Data Types (CRDTs).
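A CRDT's merge is commutative, associative and idempotent, so replicas converge no matter in which order they exchange state. A library-free sketch of a last-writer-wins map without deletes, the same idea as Akka's `LWWMap` (timestamps plus a node-id tie-break; `Stamped` and `LwwMapSketch` are hypothetical names):

```scala
// Each value carries the timestamp of its write and the id of the node that wrote it.
final case class Stamped[V](value: V, timestamp: Long, node: String)

object LwwMapSketch {
  type LwwMap[K, V] = Map[K, Stamped[V]]

  // Later timestamp wins; equal timestamps are broken by node id so every replica picks the same value.
  private def newer[V](a: Stamped[V], b: Stamped[V]): Stamped[V] =
    if (a.timestamp != b.timestamp) { if (a.timestamp > b.timestamp) a else b }
    else if (a.node.compareTo(b.node) >= 0) a
    else b

  // Key-wise merge of two replicas' states.
  def merge[K, V](x: LwwMap[K, V], y: LwwMap[K, V]): LwwMap[K, V] =
    (x.keySet ++ y.keySet).map { k =>
      k -> (x.get(k).toList ++ y.get(k).toList).reduce((a, b) => newer(a, b))
    }.toMap
}
```

Last-writer-wins relies on reasonably synchronized clocks: of two concurrent updates within the clock skew, the one with the later timestamp wins, which is not necessarily the intended one.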
Features:
Akka Distributed Data - use in capacity services
Akka Distributed Data - use in capacity services
Advantages:
Disadvantages:
Caveats:
Akka Distributed Data - code example
// assumes in scope: implicit Cluster node (required by LWWMap's `+`), implicit Timeout and ExecutionContext
val replicator: ActorRef = DistributedData(actorSystem).replicator
// Capacity: value type of CapacitySheet.capacities, elided on the slide (hypothetical name)
val dataKey: Key[LWWMap[BucketKey, Capacity]] = LWWMapKey.create[BucketKey, Capacity]("ddata-capacityMap")
def saveCapacities(capacity: CapacitySheet): Future[Unit] = {
  val updateRequest = Update(dataKey, LWWMap.empty[BucketKey, Capacity], WriteLocal) {
    lwwMap => capacity.capacities.foldLeft(lwwMap)(_ + _)
  }
  (replicator ? updateRequest).collect({
    case _: UpdateSuccess[_] => logger.info("Success")
    case failure: UpdateFailure[_] => throw DDataException(...)
  })
}
def findCapacities(keys: Set[BucketKey]): Future[CapacitySheet] = {
  (replicator ? Get(dataKey, ReadLocal)).collect({
    case success @ GetSuccess(`dataKey`, _) => CapacitySheet(success.get(dataKey).entries.filterKeys(keys contains _))
    case NotFound(`dataKey`, _) => CapacitySheet(Map.empty) // nothing written under this key yet
    case failure => throw DDataException(...)
  })
}
Recap
where we’re going we don’t need roads
Capacity services - Technology map
Akka-streams:
Akka-persistence:
Akka-cluster Singleton:
Akka-cluster Sharding:
Akka Distributed Data:
Reference
Questions?
42
Thank you
hasta la vista, baby