1 of 34

Emitters

for Stage 1

Pedram Emrouznejad, Shu-yu Guo

Bloomberg

2 of 34

JS programmers solve

2 problems over and over,

often together

  1. Conveyance (w/ control)
    1. Events (server and client side)
    2. Iterator and stream-likes
  2. Transformation (w/ side effects)
    • Generics (e.g. map, filter, reduce)

3 of 34

Community has thought about this space a lot

Sample of Libraries

4 of 34

Lodash

  • Older pioneer, but popular
  • Sync-only
  • Representationally rigid: Array, Object, etc
  • Used by: 3.5 million, npm: 20 million/week

// The `lodash/map` iteratee receives three arguments:

// (value, index|key, collection)

_.map(['6', '8', '10'], parseInt)

// ➜ [6, NaN, 2]

// The `lodash/fp/map` iteratee is capped at one argument:

// (value)

fp.map(parseInt)(['6', '8', '10'])

// ➜ [6, 8, 10]

5 of 34

Transducers

  • Everything (map, filter, etc) implemented as 3-arity reducing function
  • Compose functions into a single transformation function
  • Transformation decoupled from its inputs/outputs, so it can be used across different mediums, sync or async
  • Does not serialise intermediate representations
  • Complete paradigm, academic history

let { map, filter, comp, into } = t

let inc = (n) => n + 1

let isEven = (n) => n % 2 == 0

let xf = comp(map(inc), filter(isEven))

console.log(into([], xf, [0,1,2,3,4])) // [2,4]
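
As a rough illustration of the bullets above, here is a minimal hand-rolled sketch of the same helpers. It assumes a simplified two-argument reducing function rather than the full 3-arity protocol, and this `into` only handles arrays. The names mirror the example, but this is not the API of any particular transducer library.

```javascript
// Hypothetical, hand-rolled helpers (illustration only, not a real library's API).
// A reducing function has the shape (accumulator, input) => accumulator.
// A transducer takes a reducing function and returns a new one.
const map = (fn) => (step) => (acc, x) => step(acc, fn(x))
const filter = (pred) => (step) => (acc, x) => (pred(x) ? step(acc, x) : acc)

// Compose transducers so that values flow through them left to right.
const comp = (...xfs) => (step) => xfs.reduceRight((next, xf) => xf(next), step)

// Pour a source array into a target array, transforming on the way in.
// No intermediate array is built between the map and filter steps.
const into = (target, xf, source) => {
  const push = (acc, x) => { acc.push(x); return acc }
  return source.reduce(xf(push), target)
}

const inc = (n) => n + 1
const isEven = (n) => n % 2 === 0
const xf = comp(map(inc), filter(isEven))

const result = into([], xf, [0, 1, 2, 3, 4])
console.log(result) // [ 2, 4 ]
```

Because `xf` never mentions its source or destination, the same value could in principle be reused with a different `into`-like function for another medium.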

6 of 34

Concurrency

  • Iteration protocol: do one thing at a time
  • Generalisation: do N things at a time
  • Hard to get right, and painfully needed
  • p-limit: Used by: 1.5 million, npm: 13 million/week
  • p-map: Used by: 1.5 million, npm: 6 million/week

const limit = pLimit(1);

const input = [

limit(() => fetchSomething('foo')),

limit(() => fetchSomething('bar')),

limit(() => doSomething())

];

// Only one promise is run at once

const result = await Promise.all(input);
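
To make "do N things at a time" concrete, the following is a minimal sketch of a limiter written from scratch. `createLimit` and `demo` are hypothetical names for illustration; this is not the implementation of p-limit, only one plausible way to get the same behaviour.

```javascript
// Hypothetical helper: returns a function that queues tasks and runs
// at most `concurrency` of them at any one time.
const createLimit = (concurrency) => {
  const queue = []
  let active = 0

  const runNext = () => {
    if (active >= concurrency || queue.length === 0) return
    active++
    const { task, resolve, reject } = queue.shift()
    // Promise.resolve().then(task) also captures synchronous throws.
    Promise.resolve()
      .then(task)
      .then(resolve, reject)
      .finally(() => {
        active--
        runNext()
      })
  }

  return (task) =>
    new Promise((resolve, reject) => {
      queue.push({ task, resolve, reject })
      runNext()
    })
}

// Usage: six tasks, never more than two in flight.
const demo = async () => {
  const limit = createLimit(2)
  let inFlight = 0
  let maxInFlight = 0
  const work = (n) => async () => {
    inFlight++
    maxInFlight = Math.max(maxInFlight, inFlight)
    await new Promise((done) => setTimeout(done, 10))
    inFlight--
    return n * 2
  }
  const results = await Promise.all([1, 2, 3, 4, 5, 6].map((n) => limit(work(n))))
  return { results, maxInFlight }
}
```

Setting the limit to 1 reduces this to the "one thing at a time" behaviour of the iteration protocol.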

7 of 34

RxJS

  • Reactive programming library
  • Representation agnostic
  • Sync and async sources
  • Single-consumer
  • Bi-di
  • Used by: 1.5 million, npm: 9 million/week

import { range } from 'rxjs/observable/range'

import { map, filter, scan, toArray } from 'rxjs/operators'

const value$ = range(0, 10)

|> filter(x => x % 2 === 0)

|> map(x => x + x)

|> scan((acc, x) => acc + x, 0)

|> toArray()

value$.subscribe(x => console.log(x)) // [0, 4, 12, 24, 40]

8 of 34

most.js

  • Representation agnostic
  • Sync and async
  • Single-consumer
  • Bi-di (“onion architecture”)
  • Ultra-high performance reactive event programming library
  • Used by: 3k, npm: 50k/week

import { constant, scan, merge, tap, runEffects } from '@most/core'

import { newDefaultScheduler } from '@most/scheduler'

import { click } from '@most/dom-event'

import { qs } from '../../common'

const incButton = qs('[name=inc]', document)

const decButton = qs('[name=dec]', document)

const value = qs('.value', document)

const inc = constant(1, click(incButton))

const dec = constant(-1, click(decButton))

const counter = scan((total, delta) => total + delta, 0, merge(inc, dec))

const render = tap(total => { value.innerText = String(total) }, counter)

runEffects(render, newDefaultScheduler())

9 of 34

Performance

  • Several orders of magnitude difference

filter -> map -> reduce 1000000 integers

-------------------------------------------------------

most       568.23 op/s ± 1.43% (80 samples)

rx 4         1.45 op/s ± 1.48% (12 samples)

rx 5        11.38 op/s ± 1.29% (55 samples)

xstream     18.41 op/s ± 0.91% (81 samples)

kefir       10.36 op/s ± 1.11% (51 samples)

bacon        0.83 op/s ± 1.93% (9 samples)

highland     4.82 op/s ± 1.90% (27 samples)

-------------------------------------------------------

10 of 34

Everybody and their dog has written map(fn), map(fn,source)

11 of 34

Platform and committee have thought about this too

12 of 34

Platform API

  • Just conveyance
  • Very similar interfaces
  • Lots of divergences too

// Server-Side (EventEmitter)

process.addListener('message', callback)

process.removeListener('message', callback)

process.emit('message', ...args)

process.on('message', callback)

process.once('message', callback)

once(process, 'message') // from 'events': returns a Promise

on(process, 'message') // from 'events': returns an AsyncIterator

// Client-Side (EventTarget)

element.addEventListener('click', callback)

element.removeEventListener('click', callback)

element.dispatchEvent(new CustomEvent('click', { detail }))

13 of 34

Generics

  • Some of the most commonly used functions, as part of the standard library
  • Operate on any data structure

import { len, map } from "std:builtins"

const ar = [1, 2, 3]

const st = new Set([4, 5, 6, 7])

len(ar) // => 3

len(st) // => 4

map(ar, (val, idx) => val * 2) // Array [2, 4, 6]

map(st, (val, idx) => val * 2) // Set {8, 10, 12, 14}
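
Since `std:builtins` is not an importable module today, here is a small sketch of how `len` and `map` might behave if written in userland. The dispatch on `Array` and `Set` is an assumption made for illustration, not the proposal's actual mechanism.

```javascript
// Hypothetical stand-ins for the `len` and `map` generics shown above.
const len = (collection) => {
  if (Array.isArray(collection) || typeof collection === 'string') return collection.length
  if (collection instanceof Set || collection instanceof Map) return collection.size
  throw new TypeError('len: unsupported collection')
}

// Returns a collection of the same kind as the input.
const map = (collection, fn) => {
  if (Array.isArray(collection)) return collection.map((val, idx) => fn(val, idx))
  if (collection instanceof Set) {
    const out = new Set()
    let idx = 0
    for (const val of collection) out.add(fn(val, idx++))
    return out
  }
  throw new TypeError('map: unsupported collection')
}

const ar = [1, 2, 3]
const st = new Set([4, 5, 6, 7])

console.log(len(ar), len(st)) // 3 4
console.log(map(ar, (val) => val * 2)) // [ 2, 4, 6 ]
console.log(map(st, (val) => val * 2)) // Set(4) { 8, 10, 12, 14 }
```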

14 of 34

Iterator Helpers

  • Create operators that return iterators
  • Used by: 16, npm: ~400/week

function* naturals() {

let i = 0;

while (true) yield i++

}

const evens = naturals()

.filter((n) => n % 2 === 0);

for (const even of evens)

console.log(even, 'is an even number');

const MyIteratorPrototype = {

next() {},

throw() {},

return() {},

// but we don't properly implement %IteratorPrototype%!!!

};

// Previously...

// Object.setPrototypeOf(MyIteratorPrototype,

// Object.getPrototypeOf(Object.getPrototypeOf([][Symbol.iterator]())));

Object.setPrototypeOf(MyIteratorPrototype, Iterator.syncPrototype);

15 of 34

Observables

  • Previously in TC39; various criticisms stalled the proposal
  • WHATWG proposal for integrating with the DOM
  • Some new ad hoc promise-returning operators
  • Bi-di

// filtering and mapping:

element.on("click")

.filter(e => e.target.matches(".foo"))

.map(e => ({x: e.clientX, y: e.clientY }))

.subscribe(handleClickAtPoint)

// conversion to promises for one-time events

document.on("DOMContentLoaded").first().then(e => …)

// ergonomic unsubscription via AbortControllers

const controller = element.on("input").subscribe(e => …)

controller.abort()

// or automatic/declarative unsubscription via the takeUntil method:

element.on("mousemove")

.takeUntil(document.on("mouseup"))

.subscribe(etc => …)

16 of 34

We propose a new

unifying primitive for

conveyance + transformation

  • Conveyance (w/ control)
    • Events (server and client side)
    • Iterator and stream-likes
  • Transformation (w/ side effects)
    • Generics (e.g. map, filter, reduce)

17 of 34

High-level Overview

18 of 34

  • Representation agnostic (sync and async)
  • Push-based
  • Multi-consumer
  • Unidirectional
  • Transformations as first-class values
  • Expressive
  • Easier to reason about for conveyance and computation
  • Ergonomic (composable)
  • Performant
  • Unifying building block for both client and server-side events

19 of 34

What is an Emitter

[Diagram: an Emitter wraps author code; values come in via next and go out via send]

20 of 34

Unidirectional, multi-consumer pipelines

[Diagram: three code stages connected in a pipeline]

21 of 34

Emitter

  • Author can define how to process values
  • Win: Avoids allowing anyone who gets hold of one to directly send values
  • Uses same error-handling mechanism as promises
  • Win: Avoids creating new semantics, and better language integration

const emitter = new Emitter()

emitter.each(d => console.log('d', d)) // d 42

emitter.next(42)

// only send true if an odd number was received

new Emitter((d, i, { send }) => { if (d % 2) send(true) })

// delay sending the value by 1 second

new Emitter((d, i, { send }) => { setTimeout(() => send(d), 1000) })

// send 3 values, then resolve the emitter

new Emitter((d, i, { send, resolve, reject }) => {

send(1)

send(2)

send(3)

resolve('done')

})

22 of 34

Emitter

  • Simple and expressive to implement any operator
  • Fast to implement

// connects one emitter to another and returns last one

emitter.each(d => console.log('d', d))

emitter.each(new Emitter(d => console.log('d', d)))

// processing before send

emitter.each((d, i, n) => n.send(d + 1))

// operators, as higher-order functions

const map = fn => (d, i, n) => n.send(fn(d))

const filter = fn => (d, i, n) => fn(d) && n.send(d)

emitter

.each(map(add1))

.each(filter(even))

emitter.each(map(add1), filter(even))

emitter.run(map(add1), filter(even))
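
The following is a minimal sketch of what an Emitter with this shape could look like, to show why operators reduce to plain `(d, i, n)` functions. It is an assumption-laden illustration, not the proposal's implementation: it omits `resolve`, `reject`, `run` and all Promise integration, and the field names `fn`, `index` and `children` are invented here.

```javascript
// Hypothetical sketch of the Emitter shape used on these slides.
class Emitter {
  // With no function given, values are passed straight through.
  constructor(fn = (d, i, { send }) => send(d)) {
    this.fn = fn
    this.index = 0
    this.children = new Set() // multi-consumer: any number of downstream emitters
  }

  // Push a value in; the author's function decides what, if anything, is sent on.
  next(value) {
    const send = (out) => { for (const child of this.children) child.next(out) }
    this.fn(value, this.index++, { send })
  }

  // Connect one or more stages in sequence and return the last one.
  each(...stages) {
    let current = this
    for (const stage of stages) {
      const child = stage instanceof Emitter ? stage : new Emitter(stage)
      current.children.add(child)
      current = child
    }
    return current
  }
}

// Operators as higher-order functions, as on the slide.
const map = (fn) => (d, i, n) => n.send(fn(d))
const filter = (fn) => (d, i, n) => fn(d) && n.send(d)

const add1 = (d) => d + 1
const even = (d) => d % 2 === 0

const source = new Emitter()
const seen = []
source.each(map(add1), filter(even)).each((d) => seen.push(d))
const alsoSeen = []
source.each((d) => alsoSeen.push(d)) // a second, independent consumer

for (const d of [1, 2, 3, 4]) source.next(d)
console.log(seen) // [ 2, 4 ]
console.log(alsoSeen) // [ 1, 2, 3, 4 ]
```

Note that only the code inside a stage receives `send`, so holding a reference to a downstream emitter gives no way to inject values on behalf of an upstream one.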

23 of 34

Collection Transformation

  • You can use Iterable/AsyncIterable at the boundaries: Emitter.from creates an Emitter from other collections, and the reduce operator serialises back into a collection.

function* range(from, to) {

for (let i = from; i <= to; i++) yield i

}

// before

const arr = []

const itr = range(40, 60) // calling the generator function already returns the iterator; no “priming” needed

while (true) {

const { value, done } = itr.next()

if (done) break

const mappedValue = value / 4

if (mappedValue < 12)

arr.push(mappedValue)

}

// after

const arr = run(

range(40, 60)

, map(d => d / 4)

, filter(d => d < 12)

, reduce([])

)
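
For readers who want to try the "after" version, here is a small synchronous sketch of a `run` that would make it work. How `run` and `reduce` actually cooperate is not specified on the slide, so the `result` property used to hand back the collected array is purely an assumption of this sketch.

```javascript
function* range(from, to) {
  for (let i = from; i <= to; i++) yield i
}

const map = (fn) => (d, i, n) => n.send(fn(d))
const filter = (fn) => (d, i, n) => { if (fn(d)) n.send(d) }

// Hypothetical collector: appends everything it receives to `acc`
// and exposes it as `result` so that `run` can return it.
const reduce = (acc) => Object.assign((d) => { acc.push(d) }, { result: acc })

// Hypothetical synchronous `run`: push each source value through the stages in order.
const run = (source, ...stages) => {
  // Build the chain back to front so each stage's `send` feeds the next stage.
  let push = () => {}
  for (const stage of [...stages].reverse()) {
    const next = push
    let i = 0
    push = (d) => stage(d, i++, { send: next })
  }
  for (const d of source) push(d)
  const last = stages[stages.length - 1]
  return last && last.result
}

const arr = run(
  range(40, 60)
, map(d => d / 4)
, filter(d => d < 12)
, reduce([])
)

console.log(arr) // [ 10, 10.25, 10.5, 10.75, 11, 11.25, 11.5, 11.75 ]
```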

24 of 34

Unifying

Mental Model

  • Unifying mental model with callbacks: omitting the callback on platform API returns an Emitter. Examples here are equivalent
  • Two options: latter more backward-compatible

something.on('event', callback)

something.on('event').each(callback)

on(something, 'event')

25 of 34

Cluster Stabilisation

  • Callback Hell
  • Shared mutable state
  • Intertwined Logic

const stable = (peers, start) => new Promise(resolve => {

let timeout

peers.map(peer =>

peer.on('status', ({ checksum }) => {

if (checksum == expected) {

log('peer stable')

if (peers.every(peer => peer.connected)) {

log('cluster stable')

if (timeout) clearTimeout(timeout)

timeout = setTimeout(() => {

resolve(peers)

peers.map(peer => peer.removeAllListeners())

}, 200)

}

}

})

)

})

26 of 34

Cluster Stabilisation

  • No Callback Hell
  • No Shared mutable state
  • Easily, linearly, extensible
  • This is self-contained, and could be part of a section of a larger pipeline!

const stable = (peers, start) => compose(

peers

, flatten(peer => peer.on('status'))

, filter(({ checksum }) => checksum == expected)

, tap(({ peer }) => log('peer stable', peer))

, filter(() => peers.every(peer => peer.stable))

, tap(({ peer }) => log('cluster stable'))

, debounce(200)

, until(1)

)

27 of 34

Bonus: Concurrency!

  • Not only does it unify those problem spaces, this API is more expressive for concurrency
  • An Emitter can return a Promise to signal when it’s done
  • Caller has fine-grained information to decide how much information to send
  • Running: run to completion, do them as fast as you can
  • Iteration protocol: do one thing at a time
  • Generalisation: do N things at a time
  • This is the limit(N) function

// search 1000 ports, 2 at a time, up to 10 servers

await run(

range(8000, 9000)

, limit(2)

, createServer

, until(10)

, reduce([])

)

28 of 34

Summary

  • The language’s push-based primitive
  • Unidirectional
  • Multi-consumer
  • Compat with existing APIs

Prototype

each

next

resolve

reject

Static Helpers

from

run

on

emit

Operators

map

filter

reduce

flatten

until

29 of 34

Why do this as a

language feature?

  • Cross-environment event class, makes future API and abstractions portable
  • Backwards-compatible upgrade to benefit from better ergonomics
  • Core operators of Standard Library (map, filter, reduce), and foundation for others (e.g. stats)
  • Interest in this being done in the runtime
  • Option to consider potential syntax
  • Future-proof

30 of 34

For Stage 1

  • Identifying interested parties who want to be involved
  • We’ve collected feedback to work on
  • Advancement criteria
  • For a deeper demo, please speak with Shu/Pedram later

31 of 34

How does this relate to Transducers

  • Composable algorithmic transformations: You can compose Emitters together, and use in place

  • Both do not serialise any intermediate representations

  • Esoteric signature for operators, same for other baseless alternatives (e.g. callbags)
  • Trying to implement everything only with reducing functions becomes awkward e.g. early termination

  • Higher-order functions have performance cost

32 of 34

How does this relate to Observables?

  • Both push

  • Observables push to a single consumer; Emitter supports multiple consumers

  • Observables have “complete/error/done”. Emitter integrates with Promises and has same error-handling.

  • Observables have bidirectional signals. Emitter has unidirectional guarantee. Easier to reason about.

  • Observables are factories, create a new Subscription every time. Observables could be layered on top.

33 of 34

How does this relate to Streams?

  • One is push, the other is pull. Serve different use-cases.

  • You can implement Streams on top of Emitter (see buffer operator) - not a goal.

  • WHATWG and Node streams now implement Symbol.asyncIterator, so they interoperate well.

  • You can use at boundaries, e.g. pull values, map, filter, then reduce into a file.

34 of 34

How does this relate to Standard Library?

  • Operators of Emitter form a core part of the standard library, like Generics work on any data-structure

  • Differences: pipelinable, lazy, no intermediate representations

  • These are the most widely used transformations to consider standardising.

  • Great foundation for others to extend
    • e.g. stats libraries can be implemented as operators, they’d benefit from being lazy, composable, etc