Published using Google Docs
262a lesson outline
Updated automatically every 5 minutes

high-level outline:

  1. motivation and background for declarative distributed systems programming (-0:20)
  2. the runtime systems P2 and JOL (0:30)
  3. code examples, small and large(1:00)
  4. dedalus: state and asynchrony(1:15)
  5. debugging and analysis(1:30)

  1. motivation
  1. problem
  1. actual target
  1. overlay specification (it was the early 2000s)
  1. “protocol centric” v. “structure centric” specs
  2. academic: chord, CAN, tapestry, pastry, narada, viceroy, TOR etc
  3. practical, ad-hoc: gnutella, kazaa, freenet
  1. update target:
  1. distributed systems are hard to program
  1. concurrency
  2. asynchrony and nondeterminism
  3. partial failure
  1. distributed systems are becoming more widespread
  1. cloud, datacenter scale
  2. chipscale
  1. goal: we need to make distributed systems programmable
  1. abstraction!
  2. the code needs to get a lot smaller as the systems get bigger.  BOOM.
  3. do we need a DSL for DS?  why not a general purpose lang?
  1. but this is a hollow statement -- soon all systems will be distributed!
  1. the “right” language for a domain is one which
  1. hides the details that don’t matter (the easy stuff)
  2. brings into focus those that do (the hard stuff)
  3. take a language like erlang
  1. little person model of computation
  2. deal with the complexity of concurrency by narrowing the focus and thinking actor-wise (protocol-centric).
  3. commit to a dispatch model (event-driven, message-matching)
  1. programmer doesn’t need to worry about synchronizing threads etc
  1. build in primitives for send, timeout, peer failure etc.
  1. domain-specific stuff
  1. observations -- (where we’re coming from)
  1. the tricky stuff about distributed systems is managing the data.  
  1. data comes in many forms:
  1. messages
  2. protocol state
  3. session state
  4. process state
  5. caches
  6. the “data” itself
  1. the state is
  1. diverse in representation
  2. physically distributed
  3. concurrently updated
  1. but querying (even distributed) data is supposed to be easy!
  1. even data-parallel programming is “embarrassingly” parallel
  2. (mostly) PTIME or less expressive, which is good because exponential messages are usually undesirable :)
  3. not actor-centric, but data-centric
  1. hide the complexity by describing operations on data sets (that may be applied to subsets in partitions, etc)
  2. compare this to implementing correct little actors whose distributed behavior is correct by composition
  1. (if the protocol is correct)
  1. we need to get out from under the “grisly rigor” of the von neumann  model
  1. ordered data, ordered instructions
  2. terrible fit for parallel and DS
  3. threaded programming: take an ordered recipe, break it into pieces (or make many copies) and run many instances of sequential code concurrently
  1. hopefully not stepping on each other’s feet.
  2. programmer has complete burden of reasoning about concurrency
  1. event-driven programming
  1. let the programmer think about one thing at a time
  2. need to worry that nothing you do “takes time”; need to ensure all blocking calls are asynchronous.
  1. our approach: unordered data, unordered instructions
  1. order injected where necessary
  2. order is one of the “hard” things
  1. make it explicit!
  1. helps: declarative language
  1. describes properties of output
  2. not a recipe!
  1. MapReduce is a data-centric paradigm
  1. not declarative per se, really macro-dataflow and micro-functional, but similarly motivated
  2. but mapreduce’s expressivity (and style) is tuned for analytics -- like SQL.
  1. you can’t implement MR in MR (not expressive)
  2. you hard-wire the dataflow (still too orderly!)
  1. possible goal: do for general distributed systems what SQL and MR do for analytics
  1. lessons from the history of databases
  1. codd’s equivalence between RC and RA
  1. SQL -- a (almost) pure FO language that describes the output is sufficient to produce an executable dataflow!
  2. there is a wide (too wide) space of choices.
  3. let the computer worry about the optimal execution: write the spec.
  1. I am not saying this is easy, just that it’s well researched
  1. uniform representation of state
  1. tables are sets: they abstract away internal representation
  1. choice of data structure, and perhaps more fundamentally,
  2. order
  1. this leads to a ‘disorderly’ programming style that avoids overspecification of order
  1. joins instead of nested for loops
  2. aggregate operations (with known properties like commutativity, distributivity, associativity, idempotence)
  1. as P2 authors point out, many dist. sys use tables anyway
  1. not too painful a change (in this domain anyway)
  1. does this work? can we express distributed systems as queries?
  1. sorta; we could write http, dns etc in SQL:
  1. create view http_response as select r.client, w.content from www_root w, http_request r where =;
  2. silly example.  but...
  3. static web serving is the join of requests and pages.
  1. there are potentially many pages and many concurrent requests.
  2. so what?
  3. details like threads, process pools, polling, etc are abstracted away.
  4. there are still hotspots, tough issues
  1. how to colocate processing and data.
  2. what data to replicate, how aggressively, etc  
  1. but it’s an optimization decision now!
  2. a model (to start out with):
  1. cardinality of input relations
  2. distribution of their values
  1. need iteration in the language to do more interesting things (eg routing)
  1. FO(LFP) is a good place to operate.  still PTIME.
  2. Datalog -- bottom-up, function-free, declarative subset of prolog.
  1. path(A, B) :- link(A, B).
  2. path(A, B) :- path(A, C), link(C, B).
  1. that’ll do!
  1. given known facts, make implications true till nothing happens (naive)
  1. link(1, 2). link(2, 3), link(3, 4).
  2. round 1:  path(1,2). path(2,3), path(3,4)
  3. round 2(paths of len 2): round 1 + path(1,3), path(2,4)
  4. round 3(len 3): round 1 + round 2 + path(1, 4).
  1. semi-naive -- use only deltas
  1. same as above, without the (already hidden) redundant derivations .


  1. the system(s) -- P2 and JOL
  1. dataflow runtime at each node
  1. pipeline of operators that consume and emit tuples
  1. typed inputs and outputs
  1. typical database operators
  1. select, project, join, anti-join
  2. aggregation (eg count, min/max, etc)
  1. and some other stuff
  1. recursion (dataflows are not DAGs anymore)
  2. networking
  3. timers -- sources (tree leaves) that produce tuples periodically
  1. by composition, big distributed dataflow
  2. evaluator loop
  1. observe
  1. batch up input, clock events
  1. think
  1. stop time, do “pure logical deduction” to fixpoint
  2. EDB is union of stored state and batch of input
  1. act
  1. send batched messages and perform batched side effects.  update state.
  1. GOTO 1
  1. datalog-based language on top
  1. program statements are implications
  2. and some other stuff
  1. syntax to annotate location specifiers
  2. special syntax/semantics for update, deletion, keys
  3. UDFs, UDAs, tables functions


  1. writing protocols in logic
  1. networking protocols
  1. Shortest paths
  1. you already saw the transitive closure of a graph.
  1. path(A, B, B, 1) :- link(A, B).
  2. path(A, B, N, C + 1) :- link(A, N), path(N, B, _, C).
  3. mincost(A, B, min<C>) :- path(A, B,_,  C).
  4. nexthop(A,B, N) :- path(A, B, N, C), mincost(A, B, C).
  1. issues?
  1. cycles in the graph?
  1. fix by storing the path prefix (eg, as a string using string functions)
  1. more than one ‘shortest’ path?
  1. NP here, there are just multiple “choices” in nexthop.
  2. but how does one ‘choose’ in datalog??  
  1. for another time, but will come up in dedalus
  1. ok, but let’s distribute it!  each node only starts with local routes
  1. path(@A, B, B, 1) :- link(@A, B).
  2. path(@A, B, N, C + 1) :- link(@A, N), path(@N, B, C).
  3. mincost(@A, B, min<C>) :- path(@A, B,_,  C).
  4. nexthop(@A,B, N) :- path(@A, B, N, C), mincost(@A, B, C).
  1. note that these are all local computations, except for (2).
  1. how do we implement 2?
  1. localization rewrite.
  1. each node sends their link table to their immediate neighbor, where it is stored in a ‘proxy link’ table
  2. (2) may then be evaluated as a local query at “N”.
  3. N sends the derived path tuples back to “A”.
  1. in JOL/dedalus/bloom, such rewrites are not primitive
  1. don’t pretend networking isn’t networking (eg, RPC)
  2. but feel free to define language constructs that “hide” it if you really want to.

  1. path in graphs is kid stuff. what about programming distributed systems!
  1. heartbeats (need timers)
  1. heartbeat(@Master, Peer, Nonce) :- timer(@Peer), master(@Peer, Master), Nonce = create_nonce().
  1. timeoutsBFS
  1. heartbeat_log(@Master, Peer, Nonce, Time) :- heartbeat(@Master, Peer, Nonce), Time = time().
  2. latest_hb(@Master, Peer, max<Time>) :- heartbeat_log(@Master, Peer, Nonce, Time).
  3. dead_node(@Master, Peer) :- duty_cycle(@Master), config(@Master, “hb_interval”, Interval),  latest_hb(@Master, Peer, Last), time() - Last > Interval.
  1. synchronization (need aggregation.  note that we’ve implemented multicast!)
  1. roll_call(@Peer, Master) :- members(@Master, Peer).
  2. ack(@Master, Peer) :- roll_call(@Peer, Master).
  3. peer_cnt(@Master, count<Peer>) :- members(@Master, Peer).
  4. ack_cnt(@Master, count<Peer>) :- ack(@Master, Peer).
  5. all_in(@Master) :- peer_cnt(@Master, Cnt), ack_cnt(@Master, Cnt).
  1. BUT can anyone read this?
  1. we can do better I think
  2. here’s one way: people are becoming accustomeed to thinking in comprehensions:
  1. path <= { |l| [l.from,,, 1]
  2. path <= join([link, path]).map  do |l, p|

     [l.from,,, p.cost + 1] if == p.from


  1. writing systems in logic
  1. toys aside, what can you really build?
  1. path vector, distance vector routing
  2. Chord DHT
  3. Distributed inference (kuang, what to say?)
  4. 2-phase commit
  5. Paxos (basic and production multipaxos)
  6. HDFS
  7. MapReduce Scheduler.
  8. replicated key-value store
  9. various shopping cart apps.
  1. BFS
  2. 2PC/Paxos


  1. Dedalus -- the problem of state and asynchrony
  1. recall: these are two of the really tricky things about distributed systems
  1. why?  state is temporal!  
  2. distributed state is hard to manage because distribution induces asynchrony and it’s hard to reason about state changing when we can’t control the order.
  1. we have so far sidestepped these issues        
  1. or papered over them.
  2. eg, the timeout system above is all well and good, but all it does is accumulate a list of dead nodes.  
  1. what if we want to delete them from a list of good nodes?
  2. what if nodes can come back up?
  3. um, GC?
  4. I am talking about mutable state, people.
  1. eg, the heartbeat rule
  1. it reads like an implication, but the head isn’t true when the body is true
  2. get_nonce may return monotonically increasing numbers.  but the messages may not arrive in order.
  3. the messages may never arrive
  1. sidestepping state and asynchrony compromises semantics
  1. not “Declarative” -- a fuzzy criticism
  2. but seriously:
  1. declarative languages (SQL (mostly), datalog) have a model-theoretic semantics
  1. the meaning of a program is its smallest consistent model.
  2. the model produced by fixpoint evaluation is the minimal model -- semantics correspond to execution
  1. this doesn’t work out as nicely with nonmonotonic (w/ negation, aggregation) programs.
  2. but even so, among multiple minimal models, well-formed (stratifiable) programs always have a stratified model that IS the semantics, and corresponds to intuition.
  3. without negation, a third
  1. most importantly: to understand what a program means, we need not consider HOW it is executed!
  1. dedalus: explicitly temporal distributed datalog.
  1. state update requires moving the clock forward
  2. message transmissions require picking an “arbitrary” receipt time
  1. hence honest about indeterminacy of receipt clock time.
  2. some contraints make sense on this choice, but depend on the distributed system model
  1. synchronous? always pick successor, as with state update.
  2. truly asynchronous with time travel?  no constraints
  3. async, but causal?  lamport clocks
  1. you can send a message to the past..
  2. but you can’t depend on a message from the future.
  1. partially synchronous with guarantees?  hm, bounds constraining the mapping of logical to physical clocks?


  1. debugging
  1. state of the art in debugging logic systems
  1. “tee” the existing dataflow
  1. register continuous queries over the teed dataflow
  2. assert invariants over flow
  3. log the trace locally
  1. or forward to a central location
  1. count
  1. rule ‘firings’ (pretty operational)
  2. messages (these too are just data tuples)
  1. what about assertions over distributed state?
  1. eg, only one node has the token
  2. consistent global snapshots
  1. chandy-lamport
  1. where it needs to go
  1. provenance: whence does this tuple (absence of tuple) come?
  1. analysis
  1. static analysis is easy in declarative languages
  1. syntactic
  2. traditionally, two main classes of static analysis:
  1. safety (termination).
  1. show that the program always produces finite outputs given finite inputs
  2. basically impossible given the extensions (functions with infinite range, most critically successor())
  3. but, we can give conservative “quiescence” conditions (see the dedalus paper if interested)
  1. stratifiability
  1. if the program isn’t monotonic, how do we know if its execution converges?
  2. show the the program ‘makes sense’
  1. has a model
  1. p :- not p
  2. no model --  what does it mean??
  1. has only one model
  1. p :- not q
  2. q :- not p
  3. 2 models -- what does it mean??
  1. p
  2. q
  1. simple approach: syntactic stratification
  1. can we establish a partial order of evaluation over the goals, s.t. the program result is monotonic?
  2. intuition: if we negate or aggregate over a set, we are doing a ‘universal quantification’ over the set.
  3. we need to make a CWA that the set is ‘complete’.
  4. algorithm: order the program goals into strata s.t. we only negate or aggregate over goals in lower strata.  cycles are OK as long as they have no NM.
  1. syntactic strat is conservative
  1. many useful systems idioms are ruled out
  2. eg, do something if you haven’t already done so
  3. eg, update and deletion of stored state combined with iteration
  1. (wording from joe’s slides) there are less conservative things
  1. inheriting these analyses from datalog is nice, but can they help us analyze distributed systems?
  2. Hellerstein’s CALM Conjecture
  1. background:
  1. consistency ~ deterministic result of distributed computation.
  1. eg, replication.  consistency = replicas agree.
  1. strong consistency: replicas agree on total order of updates and all intermediate states
  2. eventual consistency: replicas agree on end state
  1. eventually consistent systems are gaining popularity b/c strongly consistent systems (managed via protocols like 2pc and paxos) are difficult to scale (failures partitions etc)
  2. unfortunately, EC boils down to best programming practices instead of consistency guarantees from the substrate
  1. eg, use commutative operations
  1. ACID2.0 vs ACID (0.9) (not LSD-25)
  2. that sounds hard!
  1. examples
  1. classic: escrow transactions
  1. a set of debits/credits accumulates
  1. common: shopping carts
  1. same principle: contents of cart are the union of the set of cart updates
  2. this is why, eg, when version conflicts occur in the cart implemented on amazon dynamo, the system can always resolve them automatically
  1. hard to reason formally about when systems are EC
  1. CALM:
  1. logically monotonic --> eventually consistent
  2. !logically monotonic --> !eventually consistent
  3. if true....
  1. NoSQL = Datalog ;)
  1. intuition
  1. asynchrony induces nondeterminism
  1. timestamps
  2. ordering
  1. straw man:
  1. single-node systems are EC :)
  1. barring internal ND
  1. all components that don’t communicate are deterministic, ok, fine.
  1. can we do better? YES
  1. async --> ND only if NM.
  2. set-union is order independent.  so is monontonic implication.
  3. p <- q, r   (with q and r network-derived)
  1. if q are r are persistent tables (sets, every q tuple commutes with every other q tuple and with every r tuple!!!
  2. imagine q and r racing to the server
  1. whenever p gets there, either
  1. r is there, or
  2. it waits till it gets there.
  1. (this accumulates substantial state)
  1. p <- q, not r
  1. problem!  q and r no longer co-commute
  2. the order of arrival of r tuples may affect the result, as may the inter-order btw q and r.
  1. we can adapt the stratification analyses from the datalog lit!
  1. draw a dependency graph among collections.  
  2. annotate async, temporal and nonmonotonic edges
  1. note: unpersisted async (events) are NM too
  1. implicit deletion
  1. NM edges in dataflow may require coordination to ensure deterministic results downstream
  2. identify maximal monotonic components
  1. messaging within the MCs is “free” -- no coordination necessary.
  1. refactor code to maximize the MCs
  1. “push back” points of order to as late as possible in the dataflow
  2. “localize” points of order so the coordination can occur at a single node.
  1. application: shopping carts
  1. take 1 : imperative shopping cart
  1. use a key-value store (overwriting)
  1. replicated via multicast
  1. key: session id
  2. value: array of items
  3. algo: read current value of session, append/delete from array, store
  4. analysis:
  1. there are “points of order” at every client update
  1. b/c overwriting old array value
  1. there are points of order at each replication of an update
  1. same reason
  1. a solution: synchronized message sending
  1. wait for acks.  hence ensure order.
  2. this is strong consistency!
  1. if client waits for ack, and server doesn’t ack till it gets acks from replicas...
  1. looks like 2pc!
  1. high latency.  bad failure scenarios
  2. and not strictly necessary.
  1. we could make a commutativity argument about updates
  2. the array is an ordered structure but in the end we treat it as a set
  1. only care about contents, not their internal order
  1. there are corner cases here
  1. deletion appears before corresponding insertion.
  1. if we drop the sync we may have incorrect behavior.
  1. take 2: a “disorderly shopping cart”
  1. model the user’s cart as a monotonically growing set of cart updates
  1. additions and deletions by session, item
  1. when a checkout message comes, aggregate
  1. sum up additions and deletions of items
  2. subtract
  3. calculate manifest.
  1. analysis tells us
  1. no point of order at client update
  1. client-server communication may be asynchronous, uncoordinated
  1. no point of order at replication        
  1. servers may replicate data asynchronously: result is “eventually consistent”
  1. there is a still a point of order at checkout
  1. this corresponds to intuition
  1. consider a cart update and checkout message racing to a replica.
  2. of course they don’t commute wrt the final cart contents
  1. we need to insert some coordination to ensure confluence.        
  1. eg, checkout contains a manifest of actions that precede it
  1. could just be a count...
  1. hence, we need to coordinate once per session rather than once per message

Bells to ring:

  1. click
  2. volcano?
  3. dynamo
  4. paxos
  5. parallel dbms
  6. selinger
  7. chord
  8. mapreduce
  9. system R, obviously