RxJS FObs (Functional Observables)

Author: Ben Lesh <benlesh@google.com> <ben@benlesh.com>

Experimental Impl: https://github.com/ReactiveX/rxjs/tree/experimental

Basic Idea

The idea here is to reduce the size and overhead of RxJS by switching operators over to something akin to https://github.com/staltz/callbag-basics

We are in a good position to move people over to this somewhat seamlessly, because we have moved everyone over to pipe() operators.

The idea is that all operators would return an "ObservableFunction", which is essentially a callbag. We could then have pipe (implemented as operate below) convert the Observable to an ObservableFunction, set up the operations, and then convert back to an Observable.

NOTE: A goal of this design is to limit the Public API changes of RxJS to practically nothing. Our users shouldn't know what happened, just that everything is smaller and faster.

Pros

  • The size of operators is drastically reduced. For example, map is defined as a simple higher-order function in a few lines of code.
  • The memory footprint is likely an order of magnitude smaller. Before, every operation needed to allocate a subscription and a subscriber, plus several functions. Now only one or two functions are allocated.
  • We *could* migrate to this without breaking existing operators.
  • Due to the "sometimes sync, sometimes async" nature of Observables, a token-style subscription is much easier to implement, but less ergonomic for users. This design gives us the best of both worlds: the FObs, which is really the internal implementation of Observable, can pass a subscription around as an argument like a token, while the external API, Observable, still returns a Subscription from the subscribe call. This greatly cleans up the logic around subscription.
  • An FObs can represent an Observable, a Subscriber or a Subject as a function.
  • Fewer allocations per operator
  • Gets rid of Subscriber as Observer and Subscription architecture from v5. It was great for optimizing performance in an object-oriented sorta way, but ultimately it's a heavy and confusing construct.
  • Minifiers have better results since there are no properties which minifiers can't rename (closure excluded)
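
To make the "few lines of code" claim concrete, here is a minimal sketch of map in this style. It is not the code from the experimental branch: the Subs interface is a stand-in for Subscription, and the types are simplified versions of the ones proposed later in this document.

```typescript
// Simplified stand-ins for the types proposed later in this document.
enum FOType { SUBSCRIBE = 0, NEXT = 1, COMPLETE = 2, ERROR = 3 }
interface Subs { closed: boolean }               // hypothetical stand-in for Subscription
type FObsArg<T> = T | void | any;
type FObs<T> = (type: FOType, arg: FObsArg<T>, subs: Subs) => void;

// map is a higher-order function: it takes a source FObs and returns a new FObs.
function map<T, R>(project: (value: T) => R) {
  return (source: FObs<T>): FObs<R> => (type, sink, subs) => {
    if (type !== FOType.SUBSCRIBE) return;
    source(FOType.SUBSCRIBE, (t: FOType, v: any, s: Subs) => {
      // Only NEXT values are projected; COMPLETE and ERROR pass straight through.
      sink(t, t === FOType.NEXT ? project(v) : v, s);
    }, subs);
  };
}

// A synchronous source that emits 1, 2, 3 and then completes.
const source: FObs<number> = (type, sink, subs) => {
  if (type !== FOType.SUBSCRIBE) return;
  for (const n of [1, 2, 3]) {
    if (subs.closed) return;
    sink(FOType.NEXT, n, subs);
  }
  sink(FOType.COMPLETE, undefined, subs);
};

const received: Array<number | string> = [];
map((n: number) => n * 10)(source)(FOType.SUBSCRIBE, (t: FOType, v: any) => {
  if (t === FOType.NEXT) received.push(v);
  if (t === FOType.COMPLETE) received.push('done');
}, { closed: false });
// received is [10, 20, 30, 'done']
```

Note that the whole operator allocates one closure per subscription, rather than a Subscriber plus a Subscription.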

Cons

  • Maintenance is a little more mind-bending
  • During migration, there will be a performance cost at Observable setup time.
  • Eventually we'll want people to migrate their operators to the new style… which isn't very ergonomic to implement.
  • Operators that return ConnectableObservable, such as multicast, publish, publishLast and publishReplay, present a significant design problem for this approach. (See section on ConnectableObservable Challenges below)

function subscribe(fn) {

  var fobs = this;

  fobs(SUBSCRIBE, fn); 

}

function x(msg, value, subscription) {

}

prev.patchFn(x);

x.lift = lift;

....

x: Subscribable & FOBs

Callbags Aren't Quite Perfect for RxJS

We'll need to implement something similar. The Callbags implementation has a few limitations that make it inadequate for RxJS:

  1. Current teardown semantics aren't as efficient as we'd like (See section on Callbags Approach to Subscriptions below)
  2. There are only two communication channels: Next and Complete. The completion channel takes an optional argument to signal an error. This doesn't work as a design for RxJS because someone could write a mapping function that throws undefined, meaning you'd have to do an argument length check every time to see whether there's an error.
  3. There is no scheduling accounted for
  4. Callbags operators enforce no error handling or safety.
  5. Callbags doesn't provide the same semantic guarantees OOTB.
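
The second limitation can be illustrated with a small sketch. The callbag-style sink below is a simplified assumption about how a termination payload is typically inspected, not code taken from the callbags library; all names are hypothetical.

```typescript
enum FOType { SUBSCRIBE = 0, NEXT = 1, COMPLETE = 2, ERROR = 3 }

// Callbag-style termination: sink(2) means "complete", sink(2, err) means "error".
type CallbagSink = (type: 0 | 1 | 2, payload?: unknown) => void;

const callbagLog: string[] = [];
const callbagSink: CallbagSink = (type, payload) => {
  // Checking the payload value cannot tell "no error" apart from "error is undefined".
  if (type === 2) callbagLog.push(payload === undefined ? 'complete' : 'error');
};

function runCallbagStyle(userFn: () => void, sink: CallbagSink): void {
  try {
    userFn();
  } catch (err) {
    sink(2, err); // err is undefined when the user function throws undefined
    return;
  }
  sink(2);
}

runCallbagStyle(() => { throw undefined; }, callbagSink);
// callbagLog is ['complete'], even though the user function threw.

// With a dedicated ERROR channel the signal type itself carries the meaning.
type FObsSink = (type: FOType, arg: unknown) => void;

const fobsLog: string[] = [];
const fobsSink: FObsSink = (type) => {
  if (type === FOType.ERROR) fobsLog.push('error');
  if (type === FOType.COMPLETE) fobsLog.push('complete');
};

function runFObsStyle(userFn: () => void, sink: FObsSink): void {
  try {
    userFn();
  } catch (err) {
    sink(FOType.ERROR, err);
    return;
  }
  sink(FOType.COMPLETE, undefined);
}

runFObsStyle(() => { throw undefined; }, fobsSink);
// fobsLog is ['error']
```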

function (type: number, arg: any, subs: Subscription) {

}

Current Recommended Implementation

Types

export enum FOType {

  SUBSCRIBE = 0,

  NEXT = 1,

  COMPLETE = 2,

  ERROR = 3,

}

export type FObsArg<T> = T | void | any;

export interface Sink<T> {

  (type: FOType.NEXT, value: T, subs: Subscription): void;

  (type: FOType.ERROR, err: any, subs: Subscription): void;

  (type: FOType.COMPLETE, arg: void, subs: Subscription): void;

  (type: FOType, arg: FObsArg<T>, subs: Subscription): void;

}

export interface Source<T> {

  (type: FOType.SUBSCRIBE, sink: Sink<T>, subs: Subscription): void;

}

export interface FObs<T> extends Source<T>, Sink<T> {

  (type: FOType, arg: FObsArg<T>, subs: Subscription): void;

}

export type Operation<T, R> = (source: Observable<T>) => Observable<R>;

export interface ObservableConstructor {

 new<T>(init?: (subscriber: Subscriber<T>) => void): Observable<T>;

}

export interface Observable<T> extends FObs<T> {

 subscribe(observer: PartialObserver<T>, scheduler?: Scheduler): Subscription;

 subscribe(

   nextHandler?: (value: T, subscription: Subscription) => void,

   errorHandler?: (err: any) => void,

   completeHandler?: () => void,

   scheduler?: Scheduler,

 ): Subscription;

 subscribe(): Subscription;

 // TODO: flesh out types

 pipe(...operations: Array<Operation<any, any>>): Observable<any>;

}

The Observable Implementation Itself

**REVISED**

After speaking with Misko about the previous design, in which FObs was only used as an internal construct of Rx, he had the awesome idea of making the observable instance itself be a FObs. This is done by "tricking" JavaScript (and TypeScript) into treating a function that creates the FObs as a constructor for Observable.


This solves the problem where previously created operators that went from (source: Observable<T>) => Observable<R> wouldn't work with the new architecture, because pipe would expect (source: FObs<T>) => FObs<R>. If Observable is a FObs, then that problem is solved.

Additionally, shared methods can be defined elsewhere and cheaply patched on at creation time, so the function references are still shared.

Also the __proto__ can be set, so instanceof checks pass.

Another amazing side-effect of this design is that we're not using classes anywhere. This has the added benefit of not downleveling to IIFE-wrapped definitions, which tree shakers have a hard time with, and it also means we don't require the class definition helpers that ship with things like tslib.

The basic idea is as follows:

// (given the interfaces above…)

export const Observable: ObservableConstructor = function <T>(init?: (subscriber: Subscriber<T>) => void) {

 return sourceAsObservable((type: FOType.SUBSCRIBE, dest: Sink<T>, subs: Subscription) => {

   let teardown: Teardown;

   subs.add(() => teardownToFunction(teardown)());

   const subscriber = createSubscriber(dest, subs);

   if (init) teardown = init(subscriber); // init is optional, so guard the call

 });

} as any

export function sourceAsObservable<T>(source: Source<T>): Observable<T> {
 const result = source as Observable<T>;
 (result as any).__proto__ = Observable.prototype;
 result.subscribe = subscribe;
 result.pipe = observablePipe;
 return result;
}
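
The same trick can be shown in isolation. The sketch below uses hypothetical names (Callable, CallableBase, asCallable) rather than the real Observable, and uses Object.setPrototypeOf in place of assigning __proto__. The line that links the prototype back to Function.prototype is an addition of this sketch: without it, the patched function would still be callable but would lose call, apply and bind.

```typescript
interface Callable {
  (value: number): number;
  describe(): string;
}

// Constructor-like function; only its prototype is used, for instanceof checks.
function CallableBase(): void {}

// Assumption of this sketch: keep Function.prototype in the chain so that
// call/apply/bind remain available on patched functions.
Object.setPrototypeOf(CallableBase.prototype, Function.prototype);

// Shared method, defined once and patched onto each instance by reference.
function describe(this: Callable): string {
  return `callable returning ${this(1)} for 1`;
}

function asCallable(fn: (value: number) => number): Callable {
  const result = fn as Callable;
  Object.setPrototypeOf(result, CallableBase.prototype);
  result.describe = describe;
  return result;
}

const double = asCallable(n => n * 2);
const triple = asCallable(n => n * 3);

const checks = {
  callable: double(21),                              // 42
  isInstance: double instanceof CallableBase,        // true
  sharedMethod: double.describe === triple.describe, // true
  described: triple.describe(),                      // 'callable returning 3 for 1'
};
```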

The Subscription Implementation

Subscriptions are all over in Rx internals. In an ideal design, there would be one subscription shared throughout several operations (if they're synchronous), and for async operations, such as a mergeMap, etc, child subscriptions would be added to that shared subscription. Also, as an ideal architecture, the subscription would be created ahead of the actual act of initializing the observable stream, and passed in somewhat like a token. This makes decision paths WRT subscription being open or closed much easier to manage in the internals of the library. The downside to a token-esque design is that the API for it is less ergonomic, as you need to create a subscription before you subscribe to the observable.

This design gives us the best of both worlds, as the external API will use the current RxJS paradigm of returning the Subscription from the subscribe call. But internally, we will use the FSubs, which, as of the time of this writing, is just a function that can add or remove child subscriptions, unsubscribe, and return whether or not it's closed. Internally, we can create the subscription ahead of time and pass it in as an argument to the FObs at subscribe time, as well as keep it available during all parts of the reactive lifecycle.


The Subscription is also using a similar pattern to what we're doing with Observable above, where it's defined as a function, then patched with methods that it has.
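
As a rough illustration of the token style, here is a sketch of a subscription that is a function patched with methods, created before subscribing and handed to the source. The shape (calling the function unsubscribes, plus add, remove and closed) and the name createSubs are assumptions of this sketch, not the FSubs signature from the experimental branch.

```typescript
type Teardown = () => void;

interface Subs {
  (): void;                    // calling the function unsubscribes
  add(t: Teardown): void;
  remove(t: Teardown): void;
  closed: boolean;
}

function createSubs(): Subs {
  const teardowns: Teardown[] = [];
  const subs: Subs = (() => {
    if (subs.closed) return;
    subs.closed = true;
    while (teardowns.length > 0) teardowns.shift()!();
  }) as Subs;
  subs.closed = false;
  // Adding to an already-closed subscription runs the teardown immediately.
  subs.add = t => { if (subs.closed) t(); else teardowns.push(t); };
  subs.remove = t => {
    const i = teardowns.indexOf(t);
    if (i >= 0) teardowns.splice(i, 1);
  };
  return subs;
}

const events: string[] = [];

// A synchronous source: because the subscription exists before the loop starts,
// it can check `closed` between emissions.
function source(sink: (n: number, subs: Subs) => void, subs: Subs): void {
  subs.add(() => events.push('teardown'));
  for (let n = 1; n <= 5 && !subs.closed; n++) sink(n, subs);
}

source((n, s) => {
  events.push(`next ${n}`);
  if (n === 2) s(); // unsubscribe synchronously, from inside the handler
}, createSubs());
// events is ['next 1', 'next 2', 'teardown']
```

With a subscription that is only returned after subscribe finishes, the handler above would have no way to stop the synchronous loop.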

The Subject Implementation

The FObs really shines here. Because both Sinks and Sources (Observers and Observables) have the same FObs signature, and because Subject is both an Observer and an Observable, we're able to implement the entire Subject as one function, reducing its size immensely compared to the v5/v6 implementation.

We are, again, using the same approach as we did with Observable, so that each Subject instance is actually also a FObs.
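
A heavily simplified sketch of a Subject as a single function follows. It ignores details a real Subject needs (for example, notifying late subscribers of a prior error or completion), and the Subs interface and the subject and recorder helpers are hypothetical stand-ins.

```typescript
enum FOType { SUBSCRIBE = 0, NEXT = 1, COMPLETE = 2, ERROR = 3 }
interface Subs { closed: boolean }               // hypothetical stand-in for Subscription
type FObsArg<T> = T | void | any;
type FObs<T> = (type: FOType, arg: FObsArg<T>, subs: Subs) => void;

function subject<T>(): FObs<T> {
  let sinks: Array<{ sink: FObs<T>; subs: Subs }> = [];
  let done = false;
  return (type, arg, subs) => {
    if (type === FOType.SUBSCRIBE) {
      // Acting as a Source: register the sink with its subscription.
      if (!done) sinks.push({ sink: arg, subs });
      return;
    }
    // Acting as a Sink: fan the signal out to every open subscriber.
    if (done) return;
    if (type !== FOType.NEXT) done = true;
    for (const entry of sinks.slice()) {
      if (!entry.subs.closed) entry.sink(type, arg, entry.subs);
    }
    if (done) sinks = [];
  };
}

// Helper that records signals as strings.
function recorder(into: string[]): FObs<number> {
  return (type, arg) => {
    into.push(type === FOType.NEXT ? `next ${arg}` : FOType[type]);
  };
}

const a: string[] = [];
const b: string[] = [];
const s = subject<number>();
const upstream: Subs = { closed: false };

s(FOType.SUBSCRIBE, recorder(a), { closed: false });
s(FOType.NEXT, 1, upstream);
s(FOType.SUBSCRIBE, recorder(b), { closed: false });
s(FOType.NEXT, 2, upstream);
s(FOType.COMPLETE, undefined, upstream);
s(FOType.NEXT, 3, upstream); // ignored after completion
// a is ['next 1', 'next 2', 'COMPLETE']; b is ['next 2', 'COMPLETE']
```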

Another thing I'd like to do with this implementation, to simplify it slightly, is to do away with the current behavior of Subject.prototype.unsubscribe as it stands in RxJS 5 and 6. What it does is basically put the Subject instance into a state where any action taken on it throws an ObjectUnsubscribedError. In practice, this is rarely if ever used, and the use cases are generally debatable. If we need to re-add it later, we can do so with some trickery for sure. The implementation is just much cleaner without it.

Migration Possibility: Mixing Pipeable Operators with New ObservableFunction Operators

During version 6, until all operators are migrated and we can release a new version, we can convert the operators one by one. Each converted operator would be decorated (or otherwise marked) as the "new type" of operator, so that the logic inside pipe knows whether or not to convert it.

This would mean that we could move to these new operators internally, but not break anyone's custom v5.5-and-up pipeable operators.

The basic logic would be (newOperator is a symbol, below):

Observable.prototype.pipe = function <T, R>(...ops: Array<OperatorFunction<T, R>|Operation<T, R>>) {
 ops = ops.map(op => op[newOperator] ? op : convertOperatorFunction(op as OperatorFunction<T, R>));
 return fromObservableFunction(pipe(...ops)(toObservableFunction(this)) as any);
}
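
To show the marker check end to end, here is a self-contained sketch with deliberately tiny stand-ins: an "old style" observable is an object with a subscribe method, and a "new style" one is a bare function. Only newOperator and convertOperatorFunction are names from this document; OldObservable, ObservableFunction (as typed here), markNew, isNew, toFn, fromFn and pipeMixed are hypothetical.

```typescript
const newOperator = Symbol('newOperator');

interface OldObservable { subscribe(next: (v: number) => void): void }
type ObservableFunction = (next: (v: number) => void) => void;

type OldOp = (source: OldObservable) => OldObservable;
type FnOp = (source: ObservableFunction) => ObservableFunction;

const toFn = (o: OldObservable): ObservableFunction => next => o.subscribe(next);
const fromFn = (f: ObservableFunction): OldObservable => ({ subscribe: next => f(next) });

// Marks an operator as "new style" by attaching the symbol to it.
const markNew = (op: FnOp): FnOp => Object.defineProperty(op, newOperator, { value: true });
const isNew = (op: OldOp | FnOp): boolean =>
  Object.prototype.hasOwnProperty.call(op, newOperator);

// Wraps an old-style pipeable operator so it can run in the new pipeline.
function convertOperatorFunction(op: OldOp): FnOp {
  return source => toFn(op(fromFn(source)));
}

function pipeMixed(source: OldObservable, ...ops: Array<OldOp | FnOp>): OldObservable {
  const converted = ops.map(op =>
    isNew(op) ? (op as FnOp) : convertOperatorFunction(op as OldOp));
  return fromFn(converted.reduce((acc, op) => op(acc), toFn(source)));
}

// One operator of each kind.
const oldDouble: OldOp = source => ({
  subscribe: next => source.subscribe(v => next(v * 2)),
});
const newAddOne = markNew(source => next => source(v => next(v + 1)));

const out: number[] = [];
const src: OldObservable = { subscribe: next => [1, 2, 3].forEach(v => next(v)) };
pipeMixed(src, oldDouble, newAddOne).subscribe(v => out.push(v));
// out is [3, 5, 7]
```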


Possibility: Converting Pipeable Operators to ObservableFunction Operators

This could be done with a simple function, so that any currently "Pipeable" operator could be used as an Observable function operator:

function convertOperatorFunction<T, R>(op: (source: Observable<T>) => Observable<any>): Operation<T, R> {

  return (obfn: ObservableFunction<T>) => {

    return toObservableFunction(op(fromObservableFunction(obfn)));

  }

}

ConnectableObservable Challenges

As this approach has all operators returning FObs functions, and ConnectableObservable relies on a connect() method, operators like multicast and publish will prove difficult to implement. This might be supported by an additional "type" that can be passed as the first argument to the FObs, such as FOType.CONNECT, but that may complicate composition of operators.

Another option is to only allow the variants of multicast and publish that have a selector function. With this, multicast and publish only return Observable and not ConnectableObservable. This has pros and cons:

Pros: It reduces API surface area for the library, and forces what some would say is a better way to use publish and multicast-type operators.

Cons: It makes it slightly more difficult to imperatively pass around a multicast instance prior to connecting it. (Note: It's still not impossible, as developers could use function composition to get the same effect)

Example of the selector function with publish:

const published$ = source$.pipe(

  publish()

)

published$.pipe(map(x => x + x)).subscribe(doAThing);

published$.pipe(map(x => x - 2)).subscribe(doSomethingElse);

published$.connect()

/// is the same as this (Selector function version):

source$.pipe(

  publish(published$ => merge(

    published$.pipe(map(x => x + x), tap(doAThing)),

    published$.pipe(map(x => x - 2), tap(doSomethingElse)),

  ))

).subscribe();

The use cases for publish/connect and publish/selector patterns are primarily focused on multicasting completely synchronous observables. In those situations, refCounted variants like share and shareReplay do not work, as a synchronous observable will complete and jump the reference count back to zero before even getting to the next subscriber.
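
The reference-count problem can be reproduced with a toy model. The share below is a deliberately small, hand-rolled stand-in (it ignores upstream teardown and errors) and is not the RxJS implementation; SimpleObserver, SimpleSource and the counter are hypothetical names.

```typescript
type SimpleObserver = { next(v: number): void; complete(): void };
type SimpleSource = (observer: SimpleObserver) => () => void; // returns an unsubscribe function

// Counts how many times the underlying source is actually subscribed.
let sourceSubscriptions = 0;

const syncSource: SimpleSource = observer => {
  sourceSubscriptions++;
  observer.next(1);
  observer.next(2);
  observer.complete(); // completes before subscribe() even returns
  return () => {};
};

// A tiny refCount-style share: connect on first subscriber, reset on completion.
function share(source: SimpleSource): SimpleSource {
  let observers: SimpleObserver[] = [];
  let connected = false;
  return observer => {
    observers.push(observer);
    if (!connected) {
      connected = true;
      source({
        next: v => observers.slice().forEach(o => o.next(v)),
        complete: () => {
          const current = observers;
          observers = [];
          connected = false; // the reference count is back to zero
          current.forEach(o => o.complete());
        },
      });
    }
    return () => { observers = observers.filter(o => o !== observer); };
  };
}

const shared = share(syncSource);
const first: number[] = [];
const second: number[] = [];
shared({ next: v => first.push(v), complete: () => {} });
shared({ next: v => second.push(v), complete: () => {} });
// sourceSubscriptions is 2: the second subscriber re-ran the source instead of sharing it.
```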

It is true that these situations are not the 80% or even 90% use case, and in most cases share() or shareReplay() are recommended, which are easy to implement in this proposed design.

Another option for people here is to use share() but schedule subscription with subscribeOn:

Workaround for synchronous observables with share:

const shared$ = source$.pipe(

  subscribeOn(asapScheduler),

  share(),

);

shared$.pipe(map(x => x + x)).subscribe(doAThing);

shared$.pipe(map(x => x - 2)).subscribe(doSomethingElse);

// or with queue scheduling:

const shared$ = source$.pipe(

  subscribeOn(queueScheduler),

  share(),

);

queueScheduler.schedule(() => {

  shared$.pipe(map(x => x + x)).subscribe(doAThing);

  shared$.pipe(map(x => x - 2)).subscribe(doSomethingElse);

});

It would be easier and simplify the library a lot to move away from ConnectableObservable, and it would be both good and bad for consumers of the library. Currently, along with refCount(), there are 10-12 or more variants of multicasting in RxJS, each with their own nuances (Subjects going "dead" in some cases, not in others, etc). Removing ConnectableObservable and moving towards a more functional approach would narrow these options down to 5 or 6, but retain the same available functionality. But patterns would change.

Callbags Approach To Subscriptions Didn't Work Out

I spoke with Andre, and he pointed out that rather than Subscriptions, callbags uses what he called "talkbacks" that are passed to sinks with a SUBSCRIBE (0) command. Callbags also treats Subscriptions as callbags, such that Observables, Subscribers, and Subscriptions all have the same "shape", basically a function with two arguments: (type: number, arg: any) => void.

This effectively means that to set up observation, one must create a subscription, then imperatively pass it to the Sink with a command like sink(FOType.SUBSCRIBE, subscription), before the sink could really be used and provide teardown guarantees.

I spiked this approach out in the experimental branch, with tests, and while it does work I found there to be a few problems with the design:

  1. There was a lot of additional code required, both in terms of lines of code, and execution, to pass subscriptions around between functional observers.
  2. It was very easy to miss imperatively passing the subscription to a Sink that needed it.
  3. Subscriptions, in particular subscriptions with "child subscriptions" like you might find in a flattening operator such as mergeMap, either required functional composition that would be slow, or required a "composite subscription" implementation that maintained an inner list of teardown logic to execute on unsubscribe. This meant adding additional FOTypes that had nothing to do with the rest of the FObs functionality: ADD and REMOVE.

Instead, I've opted for having Subscription be a type totally separate from the FObs type, and have FObs always take a third argument of the Subscription. This means that:

  1. We no longer require additional lines of code to pass subscription instances around.
  2. We no longer require closing over a subscription instance so that we might capture it when a particular signal is sent
  3. Overall, less code to write and execute.
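
A sketch of a take operator written against the three-argument form follows. It is not the code from the commits mentioned below; the Subs interface and createSubs helper are hypothetical, and the operator assumes a count of at least 1.

```typescript
enum FOType { SUBSCRIBE = 0, NEXT = 1, COMPLETE = 2, ERROR = 3 }
interface Subs { closed: boolean; unsubscribe(): void } // hypothetical stand-in
type FObsArg<T> = T | void | any;
type FObs<T> = (type: FOType, arg: FObsArg<T>, subs: Subs) => void;

function createSubs(): Subs {
  const subs: Subs = { closed: false, unsubscribe() { subs.closed = true; } };
  return subs;
}

// take: assumes count >= 1 for brevity.
function take<T>(count: number) {
  return (source: FObs<T>): FObs<T> => (type, sink, subs) => {
    if (type !== FOType.SUBSCRIBE) return;
    let seen = 0;
    source(FOType.SUBSCRIBE, (t: FOType, v: any, s: Subs) => {
      if (t !== FOType.NEXT) { sink(t, v, s); return; }
      seen++;
      sink(FOType.NEXT, v, s);
      if (seen >= count) {
        sink(FOType.COMPLETE, undefined, s);
        // The subscription arrives with every signal, so there is nothing to
        // capture or close over in order to tear down.
        s.unsubscribe();
      }
    }, subs);
  };
}

// A synchronous, unbounded source that stops only when the subscription closes.
const naturals: FObs<number> = (type, sink, subs) => {
  if (type !== FOType.SUBSCRIBE) return;
  for (let n = 1; !subs.closed; n++) sink(FOType.NEXT, n, subs);
};

const taken: Array<number | string> = [];
const subs = createSubs();
take<number>(3)(naturals)(FOType.SUBSCRIBE, (t: FOType, v: any) => {
  taken.push(t === FOType.NEXT ? v : FOType[t]);
}, subs);
// taken is [1, 2, 3, 'COMPLETE'] and subs.closed is true
```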

You can see the difference between the two approaches in these commits on GitHub. In particular, look at the differences in the take operator or the map operator, where it's more obvious what has changed and been simplified. The Subscriber logic is also simplified, with the mild downside of needing to keep a handle to the Subscription in a Symbol on the subscriber instance.

Idea For Not Deprecating concat/merge/zip/combineLatest Operators

Since observable instances are going to technically be functions here, we can have the static version of concat, merge, et al, return a function that:

  1. Examines the first argument:
     a. If it's an Observable, acts as an operator.
     b. If it's an FOType (number), acts as an Observable subscription.
     c. If subscribe is called on it directly, obviously go to b.

Basically:

function concat<T>(...sources: Array<ObservableInput<T>>) {

  return (typeOrSource: any, sink: Sink<T>, subs: Subscription) => {

    if (isObservable(typeOrSource)) {

      // operator logic to return new Observable

    } else if (typeOrSource === FOType.SUBSCRIBE) {

      // subscribe to the concatenated observable here.

    }

  }

}


This means that both of these calls to concat would be valid.

import { concat } from 'rxjs';

concat(a$, b$, c$);

// and

a$.pipe(concat(b$, c$));

It also means that we might be able to fold all operators into the main rxjs export site, thus reducing exports to one location.
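
The two comment placeholders in the concat outline above could be filled in roughly as follows. This is a sketch under simplifying assumptions: any function counts as an observable (the real check would be isObservable), pipe is replaced by calling the operator directly, and Subs, of, subscribeInOrder and collect are hypothetical helpers.

```typescript
enum FOType { SUBSCRIBE = 0, NEXT = 1, COMPLETE = 2, ERROR = 3 }
interface Subs { closed: boolean }               // hypothetical stand-in for Subscription
type FObsArg<T> = T | void | any;
type FObs<T> = (type: FOType, arg: FObsArg<T>, subs: Subs) => void;

// Assumption: in this sketch any function counts as an observable.
const isSource = (x: unknown): x is FObs<any> => typeof x === 'function';

// Subscribes to each source in turn, moving on when the current one completes.
function subscribeInOrder<T>(sources: Array<FObs<T>>, sink: FObs<T>, subs: Subs): void {
  const next = (i: number): void => {
    if (i === sources.length) { sink(FOType.COMPLETE, undefined, subs); return; }
    sources[i](FOType.SUBSCRIBE, (t: FOType, v: any, s: Subs) => {
      if (t === FOType.COMPLETE) next(i + 1);
      else sink(t, v, s);
    }, subs);
  };
  next(0);
}

function concat<T>(...sources: Array<FObs<T>>) {
  return (typeOrSource: any, sink?: FObs<T>, subs?: Subs): any => {
    if (isSource(typeOrSource)) {
      // Operator form: the piped source goes first.
      return concat(typeOrSource, ...sources);
    } else if (typeOrSource === FOType.SUBSCRIBE && sink && subs) {
      // Observable form: subscribe to the concatenated sources.
      subscribeInOrder(sources, sink, subs);
    }
  };
}

const of = <T>(...values: T[]): FObs<T> => (type, sink, subs) => {
  if (type !== FOType.SUBSCRIBE) return;
  for (const v of values) {
    if (subs.closed) return;
    sink(FOType.NEXT, v, subs);
  }
  sink(FOType.COMPLETE, undefined, subs);
};

const collect = (from: FObs<number>): number[] => {
  const out: number[] = [];
  from(FOType.SUBSCRIBE, (t: FOType, v: any) => {
    if (t === FOType.NEXT) out.push(v);
  }, { closed: false });
  return out;
};

const asStatic = collect(concat(of(1), of(2), of(3)));   // [1, 2, 3]
const asOperator = collect(concat(of(2), of(3))(of(1))); // [1, 2, 3]
```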

Centralized Error Handling

We are looking at using centralized error handling similar to what we used in the original RxJS 5 implementation.

An advantage of this design is that we could introduce the change in RxJS 6 as a non-breaking change, if the ability to override error handling for Node is needed before we can get what's in experimental out to the public.

Goals:

  • More optimized functions (via fewer try-catch blocks)
  • The ability to override this behavior for Node purposes. Node developers frequently desire unhandled errors to be thrown immediately, rather than having them in a try-catch. This is because they want the core dump and other information provided when the process panics.

Overriding Error Handling For Node's Case

In this case a developer (or us, as a core team) should be able to pull in the tryUserFunction function from the utility module and patch it to simply execute the function it's given and throw if it errors.

Optionally, we could have an implementation that looks for the presence of another node module like "rxjs-node-panic" or some similar name, and if it's there, use that function for tryUserFunction instead of the default implementation.

We have a lot of options here.
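
One of those options, sketched under assumptions: the default implementation sits behind a replaceable reference, and a "panic" variant simply calls the function without a try-catch. The names TryFn, defaultTry, panicTry and setTryUserFunction are hypothetical; only tryUserFunction and ERROR_OBJECT come from this document.

```typescript
const ERROR_OBJECT = { error: null as unknown };

type TryFn = <R>(fn: (...args: any[]) => R, ...args: any[]) => R | typeof ERROR_OBJECT;

// Default behavior: catch the error and report it through ERROR_OBJECT.
const defaultTry: TryFn = (fn, ...args) => {
  ERROR_OBJECT.error = null;
  try {
    return fn(...args);
  } catch (err) {
    ERROR_OBJECT.error = err;
    return ERROR_OBJECT;
  }
};

// "Panic" behavior: no try-catch, so the error propagates synchronously.
const panicTry: TryFn = (fn, ...args) => fn(...args);

let currentTry: TryFn = defaultTry;

// Hypothetical hook for swapping the implementation.
function setTryUserFunction(impl: TryFn): void {
  currentTry = impl;
}

function tryUserFunction<R>(fn: (...args: any[]) => R, ...args: any[]): R | typeof ERROR_OBJECT {
  return currentTry(fn, ...args);
}

const boom = (): number => { throw new Error('boom'); };

const caughtByDefault = tryUserFunction(boom) === ERROR_OBJECT; // true

setTryUserFunction(panicTry);
let threwWithPanic = false;
try {
  tryUserFunction(boom);
} catch (err) {
  threwWithPanic = err instanceof Error; // true: the error escaped immediately
}
```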

Basic Implementation Is As Follows

(copied from experimental):

const ERROR_OBJECT = {

  error: null as any,

};

/**

 * Executes a user-provided function within a try-catch, and returns either the result

 * or returns {@link ERROR_OBJECT} if an error has occurred.

 * Use {@link resultIsError} to verify whether the result is an error or not.

 *

 * @param fn the user-provided function to wrap in some error handling for safety

 * @param args The arguments to execute the user-provided function with.

 */

export function tryUserFunction<R>(fn: (...args: any[]) => R, ...args: any[]): typeof ERROR_OBJECT | R {

  ERROR_OBJECT.error = null;

  let result: R;

  try {

    result = fn(...args);

  } catch (err) {

    ERROR_OBJECT.error = err;

    return ERROR_OBJECT;

  }

  return result;

}

/**

 * Used to verify whether the result of {@link tryUserFunction}

 * is an error or not. If this returns true, check {@link ERROR_OBJECT}'s

 * error property for the error value.

 */

export function resultIsError<R>(result: R|typeof ERROR_OBJECT): result is typeof ERROR_OBJECT {

  return result === ERROR_OBJECT;

}
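
For context, a call site might look like the sketch below. The two functions are repeated from the listing above so the block runs on its own; mapValue and the two-argument Sink type are hypothetical and only stand in for an operator's inner logic. Because resultIsError compares by identity, a thrown undefined is still routed to the ERROR channel.

```typescript
enum FOType { SUBSCRIBE = 0, NEXT = 1, COMPLETE = 2, ERROR = 3 }

// Repeated from the listing above.
const ERROR_OBJECT = { error: null as any };

function tryUserFunction<R>(fn: (...args: any[]) => R, ...args: any[]): typeof ERROR_OBJECT | R {
  ERROR_OBJECT.error = null;
  let result: R;
  try {
    result = fn(...args);
  } catch (err) {
    ERROR_OBJECT.error = err;
    return ERROR_OBJECT;
  }
  return result;
}

function resultIsError<R>(result: R | typeof ERROR_OBJECT): result is typeof ERROR_OBJECT {
  return result === ERROR_OBJECT;
}

// Hypothetical, simplified sink for this sketch (no subscription argument).
type Sink = (type: FOType, arg: any) => void;

// What the body of a map-like operator could do for each incoming value.
function mapValue<T, R>(project: (value: T) => R, value: T, sink: Sink): void {
  const result = tryUserFunction(project, value);
  if (resultIsError(result)) {
    sink(FOType.ERROR, result.error);
  } else {
    sink(FOType.NEXT, result);
  }
}

const signals: string[] = [];
const sink: Sink = (type, arg) => signals.push(`${FOType[type]}:${String(arg)}`);

mapValue((n: number) => n + 1, 1, sink);
mapValue((_: number): number => { throw undefined; }, 1, sink);
// signals is ['NEXT:2', 'ERROR:undefined']
```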