Emitters
for Stage 1
Pedram Emrouznejad, Shu-yu Guo
Bloomberg
JS programmers solve
2 problems over and over,
often together
Community has thought about this space a lot
Sample of Libraries
Lodash
// The `lodash/map` iteratee receives three arguments:
// (value, index|key, collection)
_.map(['6', '8', '10'], parseInt)
// ➜ [6, NaN, 2]
// The `lodash/fp/map` iteratee is capped at one argument:
// (value)
fp.map(parseInt)(['6', '8', '10'])
// ➜ [6, 8, 10]
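The difference comes down to arity: `parseInt` accepts an optional radix as its second argument, so the index that `map` passes along is read as a radix. A minimal sketch of the capping idea, using plain `Array.prototype.map` and a hypothetical `unary` helper (an illustration only, not lodash's implementation):

```javascript
// Hypothetical helper: wrap fn so it only ever sees its first argument.
const unary = (fn) => (value) => fn(value)

const strings = ['6', '8', '10']

// Array.prototype.map calls parseInt(value, index, array),
// so the index is interpreted as the radix.
const uncapped = strings.map(parseInt)

// With the cap in place, parseInt only receives the value.
const capped = strings.map(unary(parseInt))

console.log(uncapped) // [6, NaN, 2]
console.log(capped)   // [6, 8, 10]
```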
Transducers
let { map, filter, comp, into } = t
let inc = (n) => n + 1
let isEven = (n) => n % 2 == 0
let xf = comp(map(inc), filter(isEven))
console.log(into([], xf, [0,1,2,3,4])) // [2,4]
Concurrency
const limit = pLimit(1);
const input = [
  limit(() => fetchSomething('foo')),
  limit(() => fetchSomething('bar')),
  limit(() => doSomething())
];
// Only one promise is run at once
const result = await Promise.all(input);
RxJS
import { range } from 'rxjs/observable/range'
import { map, filter, scan, toArray } from 'rxjs/operators'
const value$ = range(0, 10)
  |> filter(x => x % 2 === 0)
  |> map(x => x + x)
  |> scan((acc, x) => acc + x, 0)
  |> toArray()
value$.subscribe(x => console.log(x)) // [0, 4, 12, 24, 40]
most.js
import { constant, scan, merge, tap, runEffects } from '@most/core'
import { newDefaultScheduler } from '@most/scheduler'
import { click } from '@most/dom-event'
import { qs } from '../../common'
const incButton = qs('[name=inc]', document)
const decButton = qs('[name=dec]', document)
const value = qs('.value', document)
const inc = constant(1, click(incButton))
const dec = constant(-1, click(decButton))
const counter = scan((total, delta) => total + delta, 0, merge(inc, dec))
const render = tap(total => { value.innerText = String(total) }, counter)
runEffects(render, newDefaultScheduler())
Performance
filter -> map -> reduce 1000000 integers
-------------------------------------------------------
most 568.23 op/s ± 1.43% (80 samples)
rx 4 1.45 op/s ± 1.48% (12 samples)
rx 5 11.38 op/s ± 1.29% (55 samples)
xstream 18.41 op/s ± 0.91% (81 samples)
kefir 10.36 op/s ± 1.11% (51 samples)
bacon 0.83 op/s ± 1.93% (9 samples)
highland 4.82 op/s ± 1.90% (27 samples)
-------------------------------------------------------
Everybody and their dog has written map(fn), map(fn,source)
Platform and committee have thought about this too
Platform API
// Server-Side (EventEmitter)
process.addListener('message', callback)
process.removeListener('message', callback)
process.emit('message', ...args)
process.on('message', callback)
process.once('message', callback)
once(process, 'message')
on(process, 'message')
// Client-Side (EventTarget)
element.addEventListener('click', callback)
element.removeEventListener('click', callback)
element.dispatchEvent(new CustomEvent('click', { detail }))
Generics
import { len, map } from "std:builtins"
const ar = [1, 2, 3]
const st = new Set([4, 5, 6, 7])
len(ar) // => 3
len(st) // => 4
map(ar, (val, idx) => val * 2) // Array {2, 4, 6}
map(st, (val, idx) => val * 2) // Set {8, 10, 12, 14}
Iterator Helpers
function* naturals() {
  let i = 0;
  while (true) yield i++
}
const evens = naturals()
  .filter((n) => n % 2 === 0);
for (const even of evens)
  console.log(even, 'is an even number');
const MyIteratorPrototype = {
  next() {},
  throw() {},
  return() {},
  // but we don't properly implement %IteratorPrototype%!!!
};
// Previously...
// Object.setPrototypeOf(MyIteratorPrototype,
// Object.getPrototypeOf(Object.getPrototypeOf([][Symbol.iterator]())));
Object.setPrototypeOf(MyIteratorPrototype, Iterator.syncPrototype);
Observables
// filtering and mapping:
element.on("click")
  .filter(e => e.target.matches(".foo"))
  .map(e => ({ x: e.clientX, y: e.clientY }))
  .subscribe(handleClickAtPoint)
// conversion to promises for one-time events
document.on("DOMContentLoaded").first().then(e => …)
// ergonomic unsubscription via AbortControllers
const controller = element.on("input").subscribe(e => …)
controller.abort()
// or automatic/declarative unsubscription via the takeUntil method:
element.on("mousemove")
  .takeUntil(document.on("mouseup"))
  .subscribe(etc => …)
We propose a new
unifying primitive for
conveyance + transformation
High-level Overview
What is an Emitter
[Diagram: a block of code that receives values via next and emits values via send]
Unidirectional, multi-consumer pipelines
[Diagram: three blocks of code connected in a pipeline]
Emitter
const emitter = new Emitter()
emitter.each(d => console.log('d', d)) // d 42
emitter.next(42)
// only send true if an odd number was received
new Emitter((d, i, { send }) => { if (d % 2) send(true) })
// delay sending the value by 1 second
new Emitter((d, i, { send }) => { setTimeout(() => send(d), 1000) })
// send 3 values, then resolve the emitter
new Emitter((d, i, { send, resolve, reject }) => {
  send(1)
  send(2)
  send(3)
  resolve('done')
})
Emitter
// connects one emitter to another and returns last one
emitter.each(d => console.log('d', d))
emitter.each(new Emitter(d => console.log('d', d)))
// processing before send
emitter.each((d, i, n) => n.send(d + 1))
// operators, as higher-order functions
const map = fn => (d, i, n) => n.send(fn(d))
const filter = fn => (d, i, n) => fn(d) && n.send(d)
emitter
  .each(map(add1))
  .each(filter(even))
emitter.each(map(add1), filter(even))
emitter.run(map(add1), filter(even))
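To make the `next` / `send` / `each` flow concrete, here is a minimal sketch of how such an emitter could behave. `MiniEmitter` is a hypothetical stand-in written for illustration, not the proposed `Emitter`: it assumes that an emitter without a function forwards values unchanged, that `each` wraps plain functions in a new emitter and returns the last one, and it leaves out `resolve`, `reject` and `run` entirely.

```javascript
// Hypothetical stand-in for the proposed Emitter (assumed semantics only).
class MiniEmitter {
  // Assumption: with no function given, values pass straight through.
  constructor(fn = (d, i, n) => n.send(d)) {
    this.fn = fn
    this.consumers = []
    this.index = 0
    // Defined as an arrow function so `({ send })` destructuring keeps working.
    this.send = (value) => {
      for (const consumer of this.consumers) consumer.next(value)
    }
  }

  // Push a value in; the function decides what (if anything) to send on.
  next(value) {
    this.fn(value, this.index++, this)
    return this
  }

  // Connect each step in order and return the last emitter in the chain.
  each(...steps) {
    let last = this
    for (const step of steps) {
      const target = step instanceof MiniEmitter ? step : new MiniEmitter(step)
      last.consumers.push(target)
      last = target
    }
    return last
  }
}

// Operators as higher-order functions, in the shape used on the slide.
const map = (fn) => (d, i, n) => n.send(fn(d))
const filter = (fn) => (d, i, n) => fn(d) && n.send(d)
const add1 = (d) => d + 1
const even = (d) => d % 2 === 0

const seen = []
const emitter = new MiniEmitter()
emitter.each(map(add1), filter(even)).each((d) => seen.push(d))
for (const d of [1, 2, 3, 4, 5]) emitter.next(d)

console.log(seen) // [2, 4, 6]
```

Because `consumers` is an array, calling `each` twice on the same emitter fans the same values out to both branches, which is one way to read "multi-consumer".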
Collection Transformation
function* range(from, to) {
  for (let i = from; i <= to; i++) yield i
}
// before
const arr = []
// calling the generator function already returns the iterator;
// an extra call or a "priming" next() would throw or drop the first value
const itr = range(40, 60)
while (true) {
  const { value, done } = itr.next()
  if (done) break
  const mappedValue = value / 4
  if (mappedValue < 12)
    arr.push(mappedValue)
}
// after
const arr = run(
    range(40, 60)
  , map(d => d / 4)
  , filter(d => d < 12)
  , reduce([])
)
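One way to picture what `run` does with an iterable source is sketched here. Everything in it is an assumption made for illustration: `run` is synchronous, operators use the `(d, i, n)` shape from the earlier slides, `reduce(acc)` pushes into `acc` and sends it on, and `run` returns the last value sent by the final step (so an empty result comes back as `undefined` in this simplified version).

```javascript
function* range(from, to) {
  for (let i = from; i <= to; i++) yield i
}

// Hypothetical operators in the (value, index, next) shape.
const map = (fn) => (d, i, n) => n.send(fn(d))
const filter = (fn) => (d, i, n) => { if (fn(d)) n.send(d) }
const reduce = (acc) => (d, i, n) => { acc.push(d); n.send(acc) }

// Hypothetical run: pull from the source, push through each step in order.
function run(source, ...steps) {
  const counts = steps.map(() => 0) // per-step index counters
  let last
  const push = (k, value) => {
    if (k === steps.length) { last = value; return } // fell off the end
    steps[k](value, counts[k]++, { send: (v) => push(k + 1, v) })
  }
  for (const value of source) push(0, value)
  return last
}

const arr = run(
  range(40, 60),
  map((d) => d / 4),
  filter((d) => d < 12),
  reduce([])
)

console.log(arr) // [10, 10.25, 10.5, 10.75, 11, 11.25, 11.5, 11.75]
```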
Unifying
Mental Model
something.on('event', callback)
something.on('event').each(callback)
on(something, 'event')
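The three spellings above can be read as the same subscription viewed from different angles. A small sketch of the `on(something, 'event')` form over a standard `EventTarget` follows; the `on` helper here is hypothetical, and returning a `stop` function from `each` is an assumption made so the example can unsubscribe.

```javascript
// Hypothetical adapter: expose an event source as something with `each`.
const on = (target, type) => ({
  each(callback) {
    target.addEventListener(type, callback)
    // Assumption: hand back a function that removes the listener again.
    return () => target.removeEventListener(type, callback)
  }
})

const target = new EventTarget()
const received = []

const stop = on(target, 'ping').each((event) => received.push(event.type))

target.dispatchEvent(new Event('ping'))
target.dispatchEvent(new Event('ping'))
stop()
target.dispatchEvent(new Event('ping')) // no longer observed

console.log(received) // ['ping', 'ping']
```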
Cluster Stabilisation
const stable = (peers, start) => new Promise(resolve => {
  let timeout
  peers.map(peer =>
    peer.on('status', ({ checksum }) => {
      if (checksum == expected) {
        log('peer stable')
        if (peers.every(peer => peer.connected)) {
          log('cluster stable')
          if (timeout) clearTimeout(timeout)
          timeout = setTimeout(() => {
            resolve(peers)
            peers.map(peer => peer.removeAllListeners())
          }, 200)
        }
      }
    })
  )
})
Cluster Stabilisation
const stable = (peers, start) => compose(
    peers
  , flatten(peer => peer.on('status'))
  , filter(({ checksum }) => checksum == expected)
  , tap(({ peer }) => log('peer stable', peer))
  , filter(() => peers.every(peer => peer.stable))
  , tap(({ peer }) => log('cluster stable'))
  , debounce(200)
  , until(1)
)
Bonus: Concurrency!
// search 1000 ports, 2 at a time, up to 10 servers
await run(
    range(8000, 9000)
  , limit(2)
  , createServer
  , until(10)
  , reduce([])
)
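For comparison, the same behaviour written by hand with plain promises might look like the sketch below. `takeLimited` and `fakeCreateServer` are hypothetical names used only here; the proposal's `limit` and `until` operators are not implemented, and failed tasks are assumed to be skipped in favour of the next port.

```javascript
function* range(from, to) {
  for (let i = from; i <= to; i++) yield i
}

// Hypothetical helper: run `task` over `items`, at most `limit` at a time,
// and stop starting new tasks once `wanted` results have been collected.
async function takeLimited(items, limit, wanted, task) {
  const iterator = items[Symbol.iterator]()
  const results = []
  const worker = async () => {
    while (results.length < wanted) {
      const { value, done } = iterator.next()
      if (done) return
      try {
        const result = await task(value)
        if (results.length < wanted) results.push(result)
      } catch {
        // Assumption: a failed task (e.g. port in use) is skipped.
      }
    }
  }
  // `limit` workers share one iterator, so at most `limit` tasks are in flight.
  await Promise.all(Array.from({ length: limit }, worker))
  return results
}

// Stand-in for createServer: odd ports are "in use"; tracks peak concurrency.
let active = 0
let peak = 0
const fakeCreateServer = async (port) => {
  active++
  peak = Math.max(peak, active)
  await new Promise((resolve) => setTimeout(resolve, 1))
  active--
  if (port % 2) throw new Error('port in use')
  return port
}

const servers = takeLimited(range(8000, 9000), 2, 10, fakeCreateServer)
servers.then((ports) => console.log(ports.length, 'servers, peak concurrency', peak))
```

The hand-written version needs its own bookkeeping for the shared iterator, the in-flight count and the stopping condition, which is the part the pipeline form folds into `limit(2)` and `until(10)`.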
Summary
Prototype
each
next
resolve
reject
Static Helpers
from
run
on
emit
Operators
map
filter
reduce
flatten
until
Why do this as a
language feature?
For Stage 1
How does this relate to Transducers?
How does this relate to Observables?
How does this relate to Streams?
How does this relate to the Standard Library?