Author: Ben Lesh <benlesh@google.com> <ben@benlesh.com>
The idea here is to reduce the size and overhead of RxJS by switching operators over to something akin to https://github.com/staltz/callbag-basics
We are in a good position to move people over to this somewhat seamlessly, because we have moved everyone over to pipe() operators.
The idea is that all operators would return an "ObservableFunction", which is essentially a callbag. We could then have pipe (implemented as operate below) convert the Observable to an ObservableFunction, set up the operations, and then convert back to an Observable.
NOTE: A goal of this design is to limit the Public API changes of RxJS to practically nothing. Our users shouldn't know what happened, just that everything is smaller and faster.
```typescript
function subscribe(fn) {
  var fobs = this;
  fobs(SUBSCRIBE, fn);
}

function x(msg, value, subscription) { }
prev.patchFn(x);
x.lift = lift;
....
x: Subscribable & FOBs
```
We'll need to implement something similar. The Callbags implementation has a few limitations that make it inadequate for RxJS:
```typescript
function (type: number, arg: any, subs: Subscription) {
}
```
```typescript
export enum FOType {
  SUBSCRIBE = 0,
  NEXT = 1,
  COMPLETE = 2,
  ERROR = 3,
}

export type FObsArg<T> = T | void | any;

export interface Sink<T> {
  (type: FOType.NEXT, value: T, subs: Subscription): void;
  (type: FOType.ERROR, err: any, subs: Subscription): void;
  (type: FOType.COMPLETE, arg: void, subs: Subscription): void;
  (type: FOType, arg: FObsArg<T>, subs: Subscription): void;
}

export interface Source<T> {
  (type: FOType.SUBSCRIBE, sink: Sink<T>, subs: Subscription): void;
}

export interface FObs<T> extends Source<T>, Sink<T> {
  (type: FOType, arg: FObsArg<T>, subs: Subscription): void;
}

export type Operation<T, R> = (source: Observable<T>) => Observable<R>;

export interface ObservableConstructor {
  new <T>(init?: (subscriber: Subscriber<T>) => void): Observable<T>;
}

export interface Observable<T> extends FObs<T> {
  subscribe(observer: PartialObserver<T>, scheduler?: Scheduler): Subscription;
  subscribe(
    nextHandler?: (value: T, subscription: Subscription) => void,
    errorHandler?: (err: any) => void,
    completeHandler?: () => void,
    scheduler?: Scheduler,
  ): Subscription;
  subscribe(): Subscription;
  // TODO: flesh out types
  pipe(...operations: Array<Operation<any, any>>): Observable<any>;
}
```
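A minimal, self-contained sketch of what the shared signature buys us. It is not the experimental-branch code: `FOType` is restated as a const object mirroring the enum above, `Subscription` is a bare-bones stand-in, and `createSubscription` and `fromArray` are hypothetical helpers. The source checks `subs.closed` on every iteration, which is what lets a consumer stop a synchronous source partway through.

```typescript
// Mirrors the FOType enum above (as a const object so the sketch is standalone).
const FOType = { SUBSCRIBE: 0, NEXT: 1, COMPLETE: 2, ERROR: 3 } as const;
type FOType = (typeof FOType)[keyof typeof FOType];

// Hypothetical bare-bones Subscription: just enough for this sketch.
interface Subscription {
  readonly closed: boolean;
  add(teardown: () => void): void;
  unsubscribe(): void;
}

// One signature for sources and sinks alike: (type, arg, subs).
type FObs<T> = (type: FOType, arg: any, subs: Subscription) => void;

function createSubscription(): Subscription {
  let closed = false;
  const teardowns: Array<() => void> = [];
  return {
    get closed() {
      return closed;
    },
    add(teardown) {
      // Adding to an already-closed subscription tears down immediately.
      if (closed) teardown();
      else teardowns.push(teardown);
    },
    unsubscribe() {
      if (closed) return;
      closed = true;
      teardowns.splice(0).forEach((teardown) => teardown());
    },
  };
}

// Hypothetical synchronous source: on SUBSCRIBE it pushes each value to the
// sink, checking the shared subscription so a consumer can stop it mid-stream.
function fromArray<T>(values: T[]): FObs<T> {
  return (type, sink: FObs<T>, subs) => {
    if (type !== FOType.SUBSCRIBE) return;
    for (const value of values) {
      if (subs.closed) return;
      sink(FOType.NEXT, value, subs);
    }
    if (!subs.closed) sink(FOType.COMPLETE, undefined, subs);
  };
}

// Usage: the subscription is created first and passed in alongside the sink.
const log: string[] = [];
const subs = createSubscription();
fromArray([1, 2, 3])(
  FOType.SUBSCRIBE,
  (type: FOType, arg: unknown) => {
    log.push(`${type}:${String(arg)}`);
    if (arg === 2) subs.unsubscribe(); // stop after the second value
  },
  subs,
);
```

The value `3` and the COMPLETE notification are never delivered, because the source sees the closed subscription before its next iteration.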
**REVISED**
When I spoke with Misko about the previous design, in which FObs was only used as an internal construct of Rx, he had the awesome idea of making the observable instance itself a FObs. This is done by "tricking" JavaScript (and TypeScript) into treating a function that creates the FObs as a constructor for Observable.
This solves the problem where previously created operators that went from (source: Observable<T>) => Observable<R> wouldn't work with the new architecture, because pipe would expect (source: FObs<T>) => FObs<R>. If Observable is a FObs, then that problem is solved.
Additionally, shared methods can be defined elsewhere and cheaply patched on at creation time, so the function references are still shared.
Also, the `__proto__` can be set, so `instanceof` checks pass.
Another amazing side effect of this design is that we're not using classes anywhere. This has the added benefit of not being downleveled to IIFE-wrapped definitions, which tree shakers have a hard time with, and it also means we don't require the class definition helpers that ship with things like tslib.
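A rough sketch of that "tricking", assuming a heavily simplified `Observable` whose `init` callback only receives a `next` function and whose `Subscription` is just a `closed` flag (both simplifications are hypothetical, not the real API). It shows the three pieces together: a plain function used with `new` that returns the FObs, a shared `subscribe` patched on at creation time, and `Object.setPrototypeOf` so `instanceof` checks pass.

```typescript
const FOType = { SUBSCRIBE: 0, NEXT: 1, COMPLETE: 2, ERROR: 3 } as const;
type FOType = (typeof FOType)[keyof typeof FOType];

// Simplified for the sketch: the subscription is only a `closed` flag.
interface Subscription { closed: boolean }
type FObs<T> = (type: FOType, arg: any, subs: Subscription) => void;

interface Observable<T> extends FObs<T> {
  subscribe(this: Observable<T>, next: (value: T) => void): Subscription;
}
interface ObservableConstructor {
  new <T>(init: (next: (value: T) => void) => void): Observable<T>;
  prototype: object;
}

// Shared method: defined once, so every instance holds the same reference.
function subscribe<T>(this: Observable<T>, next: (value: T) => void): Subscription {
  const subs: Subscription = { closed: false };
  this(FOType.SUBSCRIBE, (type: FOType, value: T) => {
    if (type === FOType.NEXT) next(value);
  }, subs);
  return subs;
}

// Not a class. When called with `new`, it returns an object (the FObs
// function), and that object replaces `this` as the result of `new`.
const Observable: ObservableConstructor = function <T>(init: (next: (value: T) => void) => void) {
  const fobs = ((type: FOType, sink: FObs<T>, subs: Subscription) => {
    if (type !== FOType.SUBSCRIBE) return;
    init((value) => {
      if (!subs.closed) sink(FOType.NEXT, value, subs);
    });
  }) as Observable<T>;
  fobs.subscribe = subscribe; // cheap patch: a shared function reference
  Object.setPrototypeOf(fobs, Observable.prototype); // makes instanceof pass
  return fobs;
} as any;

// Usage
const numbers$ = new Observable<number>((next) => {
  next(1);
  next(2);
});
const seen: number[] = [];
numbers$.subscribe((value) => seen.push(value));
```

Changing the prototype does not affect callability, so `numbers$` can still be invoked directly as a FObs while looking like an Observable to consumers.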
The basic idea is as follows:
```typescript
// (given the interfaces above…)
export const Observable: ObservableConstructor = function <T>(init?: (subscriber: Subscriber<T>) => void) {
  return sourceAsObservable((type: FOType.SUBSCRIBE, dest: Sink<T>, subs: Subscription) => {
    let teardown: Teardown;
    subs.add(() => teardownToFunction(teardown)());
    const subscriber = createSubscriber(dest, subs);
    teardown = init(subscriber);
  });
} as any;
```
Subscriptions are all over Rx internals. In an ideal design, there would be one subscription shared across several operations (if they're synchronous), and for async operations, such as mergeMap, child subscriptions would be added to that shared subscription. Also, in an ideal architecture, the subscription would be created ahead of the actual act of initializing the observable stream, and passed in somewhat like a token. This makes decisions about whether the subscription is open or closed much easier to manage in the internals of the library. The downside to a token-esque design is that its API is less ergonomic, as you need to create a subscription before you subscribe to the observable.
This design gives us the best of both worlds: the external API will use the current RxJS paradigm of returning the Subscription from the subscribe call, but internally we will use the FSubs, which, as of the time of this writing, is just a function that can add or remove child subscriptions, unsubscribe, and return whether or not it's closed. Internally, we can create the subscription ahead of time and pass it in as an argument to the FObs at subscribe time, as well as keep it available during all parts of the reactive lifecycle.
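A sketch of the token-style flow, under stated assumptions: the `Subscription` shape below (add/remove/unsubscribe/closed) follows the description above but is written as a plain object rather than a patched function, and `subscribe`, `createSubscription`, and the sample source are hypothetical. The public `subscribe` still returns a Subscription, but it creates that Subscription before the source is initialized and hands it in as the third argument.

```typescript
const FOType = { SUBSCRIBE: 0, NEXT: 1, COMPLETE: 2, ERROR: 3 } as const;
type FOType = (typeof FOType)[keyof typeof FOType];

type Child = Subscription | (() => void);

// Hypothetical Subscription: add/remove children, unsubscribe, report `closed`.
interface Subscription {
  readonly closed: boolean;
  add(child: Child): void;
  remove(child: Child): void;
  unsubscribe(): void;
}
type FObs<T> = (type: FOType, arg: any, subs: Subscription) => void;

function createSubscription(): Subscription {
  let closed = false;
  const children = new Set<Child>();
  const release = (child: Child) => (typeof child === "function" ? child() : child.unsubscribe());
  return {
    get closed() {
      return closed;
    },
    add(child) {
      if (closed) release(child); // late additions are torn down immediately
      else children.add(child);
    },
    remove(child) {
      children.delete(child);
    },
    unsubscribe() {
      if (closed) return;
      closed = true;
      const all = Array.from(children);
      children.clear();
      all.forEach(release);
    },
  };
}

// External API is unchanged: subscribe() returns the Subscription. Internally,
// the Subscription exists *before* the source is initialized.
function subscribe<T>(source: FObs<T>, next: (value: T) => void): Subscription {
  const subs = createSubscription();
  source(FOType.SUBSCRIBE, (type: FOType, value: T) => {
    if (type === FOType.NEXT) next(value);
  }, subs);
  return subs;
}

// A hypothetical source that registers teardown on the shared subscription,
// the way a timer or an inner mergeMap subscription would.
const events: string[] = [];
const source: FObs<string> = (type, sink: FObs<string>, subs) => {
  if (type !== FOType.SUBSCRIBE) return;
  subs.add(() => events.push("teardown"));
  sink(FOType.NEXT, "hello", subs);
};

const received: string[] = [];
const subscription = subscribe<string>(source, (value) => received.push(value));
subscription.unsubscribe();

// Child subscriptions (e.g. for an inner subscription) hang off the parent.
const parentSubs = createSubscription();
const childSubs = createSubscription();
parentSubs.add(childSubs);
parentSubs.unsubscribe(); // closes the child as well
```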
The Subscription also uses a pattern similar to what we're doing with Observable above: it's defined as a function, then patched with its methods.
The FObs really shines here. Because both Sinks and Sources (Observers and Observables) have the same FObs signature, and because Subject is both an Observer and an Observable, we're able to implement the entire Subject as one function. This reduces its size immensely compared to the v5/v6 implementation.
We are, again, using the same approach as we did with Observable, so that each Subject instance is actually also a FObs.
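A minimal sketch of a Subject implemented as a single FObs function. The names `createSubject` and `createSubscription` are hypothetical, and the behavior is intentionally simple: SUBSCRIBE registers a sink (the Subject acting as a Source), NEXT/ERROR/COMPLETE fan out to registered sinks (the Subject acting as a Sink), and late subscribers to a finished Subject just receive the terminal notification. There is no Subject-level `unsubscribe` and no ObjectUnsubscribedError state.

```typescript
const FOType = { SUBSCRIBE: 0, NEXT: 1, COMPLETE: 2, ERROR: 3 } as const;
type FOType = (typeof FOType)[keyof typeof FOType];

interface Subscription {
  readonly closed: boolean;
  add(teardown: () => void): void;
  unsubscribe(): void;
}
type FObs<T> = (type: FOType, arg: any, subs: Subscription) => void;

function createSubscription(): Subscription {
  let closed = false;
  const teardowns: Array<() => void> = [];
  return {
    get closed() {
      return closed;
    },
    add(teardown) {
      if (closed) teardown();
      else teardowns.push(teardown);
    },
    unsubscribe() {
      if (closed) return;
      closed = true;
      teardowns.splice(0).forEach((teardown) => teardown());
    },
  };
}

// The whole Subject is one function with the FObs signature.
function createSubject<T>(): FObs<T> {
  const sinks = new Map<FObs<T>, Subscription>();
  let terminal: { type: FOType; arg: unknown } | null = null;
  return (type, arg, subs) => {
    if (type === FOType.SUBSCRIBE) {
      const sink: FObs<T> = arg;
      if (terminal) {
        sink(terminal.type, terminal.arg, subs); // late subscriber
      } else {
        sinks.set(sink, subs);
        subs.add(() => sinks.delete(sink)); // unsubscribing removes the sink
      }
      return;
    }
    if (terminal) return; // already errored or completed
    if (type !== FOType.NEXT) terminal = { type, arg };
    for (const [sink, sinkSubs] of Array.from(sinks)) {
      if (!sinkSubs.closed) sink(type, arg, sinkSubs);
    }
    if (terminal) sinks.clear();
  };
}

// Usage: the same function is subscribed to and pushed into.
const subject = createSubject<number>();
const record = (log: string[]): FObs<number> => (type, arg) => {
  log.push(`${type}:${String(arg)}`);
};
const a: string[] = [];
const b: string[] = [];
const subsB = createSubscription();
subject(FOType.SUBSCRIBE, record(a), createSubscription());
subject(FOType.SUBSCRIBE, record(b), subsB);
subject(FOType.NEXT, 1, createSubscription());
subsB.unsubscribe(); // B leaves, A stays
subject(FOType.NEXT, 2, createSubscription());
subject(FOType.COMPLETE, undefined, createSubscription());
const late: string[] = [];
subject(FOType.SUBSCRIBE, record(late), createSubscription());
```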
Another thing I'd like to do with this implementation, to simplify it slightly, is to do away with the current behavior of Subject.prototype.unsubscribe as it stands in RxJS 5 and 6. What it does is basically put the Subject instance into a state where any action taken on it throws an ObjectUnsubscribedError. In practice, this is rarely if ever used, and its use cases are debatable. If we need to re-add it later, we can certainly do so with some trickery. The implementation is just much cleaner without it.
Until all operators are migrated and we can release a new version, we can convert the operators one by one during version 6 by simply decorating those operators (or otherwise marking them) as the "new type" of operator, so that the logic inside pipe knows whether or not to convert them.
This would mean that we could move to these new operators internally, but not break anyone's custom v5.5-and-up pipeable operators.
The basic logic would be (newOperator is a symbol, below):
```typescript
Observable.prototype.pipe = function <T, R>(...ops: Array<OperatorFunction<T, R>|Operation<T, R>>) {
```
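A hedged sketch of the branching that `pipe` would do. Assumptions: "new type" operators carry a `newOperator` symbol property (set by a hypothetical `markNew` helper), legacy pipeable operators work against a minimal stand-in `LegacyObservable` with only `subscribe(next)`, and the conversion helpers `toFObs`/`fromFObs` forward NEXT only, ignoring completion and errors for brevity.

```typescript
const FOType = { SUBSCRIBE: 0, NEXT: 1, COMPLETE: 2, ERROR: 3 } as const;
type FOType = (typeof FOType)[keyof typeof FOType];
interface Subscription { closed: boolean }
type FObs<T> = (type: FOType, arg: any, subs: Subscription) => void;

// Marker for operators that already work on FObs ("new type" operators).
const newOperator = Symbol("newOperator");
type Operation<T, R> = ((source: FObs<T>) => FObs<R>) & { [newOperator]?: true };

// Hypothetical stand-in for today's Observable, as legacy operators see it.
interface LegacyObservable<T> { subscribe(next: (value: T) => void): void }
type OperatorFunction<T, R> = (source: LegacyObservable<T>) => LegacyObservable<R>;

const markNew = <T, R>(op: (source: FObs<T>) => FObs<R>): Operation<T, R> =>
  Object.assign(op, { [newOperator]: true as const });

// Conversions between the two worlds (NEXT only, for brevity).
const fromFObs = <T>(fobs: FObs<T>): LegacyObservable<T> => ({
  subscribe: (next) =>
    fobs(FOType.SUBSCRIBE, (type: FOType, value: T) => {
      if (type === FOType.NEXT) next(value);
    }, { closed: false }),
});
const toFObs = <T>(obs: LegacyObservable<T>): FObs<T> => (type, sink: FObs<T>, subs) => {
  if (type === FOType.SUBSCRIBE) obs.subscribe((value) => sink(FOType.NEXT, value, subs));
};

// Apply marked operators directly; wrap unmarked (legacy) ones in conversions.
function pipe(source: FObs<any>, ...ops: Array<OperatorFunction<any, any> | Operation<any, any>>): FObs<any> {
  return ops.reduce<FObs<any>>(
    (prev, op) =>
      newOperator in op
        ? (op as Operation<any, any>)(prev)
        : toFObs((op as OperatorFunction<any, any>)(fromFObs(prev))),
    source,
  );
}

// A "new type" operator, written against FObs directly.
const double = markNew<number, number>((source) => (type, sink: FObs<number>, subs) => {
  if (type !== FOType.SUBSCRIBE) return;
  source(FOType.SUBSCRIBE, (t: FOType, v: number) => sink(t, t === FOType.NEXT ? v * 2 : v, subs), subs);
});
// A legacy pipeable operator, written against the Observable-like stand-in.
const evensOnly = (): OperatorFunction<number, number> => (source) => ({
  subscribe: (next) => source.subscribe((v) => { if (v % 2 === 0) next(v); }),
});

const source: FObs<number> = (type, sink: FObs<number>, subs) => {
  if (type === FOType.SUBSCRIBE) {
    [1, 2, 3, 4].forEach((v) => sink(FOType.NEXT, v, subs));
  }
};
const out: number[] = [];
pipe(source, evensOnly(), double)(FOType.SUBSCRIBE, (t: FOType, v: number) => {
  if (t === FOType.NEXT) out.push(v);
}, { closed: false });
```

Legacy and new operators can be mixed in one `pipe` call; only the unmarked ones pay for the round trip through the conversion helpers.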
This could be done with a simple function, so that any currently "Pipeable" operator could be used as an Observable function operator:
```typescript
function convertOperatorFunction<T, R>(op: (source: Observable<T>) => Observable<any>): Operation<T, R> {
  return (obfn: ObservableFunction<T>) => {
    return toObservableFunction(op(fromObservableFunction(obfn)));
  };
}
```
As this approach has all operators returning FObs functions, and ConnectableObservable relies on a connect() method, operators like multicast and publish will prove difficult to implement. This might be supported by an additional "type" that can be passed as the first argument to the FObs, such as FOType.CONNECT, but that may complicate composition of operators.
Another option is to only allow the variants of multicast and publish that have a selector function. With this, multicast and publish only return Observable and not ConnectableObservable. This has pros and cons:
Pros: It reduces API surface area for the library, and forces what some would say is a better way to use publish and multicast-type operators.
Cons: It makes it slightly more difficult to imperatively pass around a multicast instance prior to connecting it. (Note: It's still not impossible, as developers could use function composition to get the same effect)
Example of the selector function with publish:
```typescript
const published$ = source$.pipe(
  publish()
);
published$.pipe(map(x => x + x)).subscribe(doAThing);
published$.pipe(map(x => x - 2)).subscribe(doSomethingElse);
published$.connect();

/// is the same as this (Selector function version):
source$.pipe(
  publish(published$ => merge(
    published$.pipe(map(x => x + x), tap(doAThing)),
    published$.pipe(map(x => x - 2), tap(doSomethingElse)),
  ))
).subscribe();
```
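To show why the selector variant needs no ConnectableObservable, here is a sketch of a selector-only `publish` written against FObs. `createSubject`, `merge`, and `publish` here are hypothetical minimal versions, not the RxJS operators. The key ordering is that everything the selector builds is subscribed to the Subject first, and only then is the source subscribed, so even a synchronous source reaches every consumer while being subscribed to only once.

```typescript
const FOType = { SUBSCRIBE: 0, NEXT: 1, COMPLETE: 2, ERROR: 3 } as const;
type FOType = (typeof FOType)[keyof typeof FOType];
interface Subscription { closed: boolean }
type FObs<T> = (type: FOType, arg: any, subs: Subscription) => void;

// Minimal Subject: registers sinks on SUBSCRIBE, multicasts everything else.
function createSubject<T>(): FObs<T> {
  const sinks: Array<[FObs<T>, Subscription]> = [];
  return (type, arg, subs) => {
    if (type === FOType.SUBSCRIBE) {
      sinks.push([arg, subs]);
      return;
    }
    for (const [sink, sinkSubs] of sinks.slice()) {
      if (!sinkSubs.closed) sink(type, arg, sinkSubs);
    }
  };
}

// Minimal merge: forwards everything, completes once every input completed.
function merge<T>(...sources: Array<FObs<T>>): FObs<T> {
  return (type, sink: FObs<T>, subs) => {
    if (type !== FOType.SUBSCRIBE) return;
    let active = sources.length;
    for (const source of sources) {
      source(FOType.SUBSCRIBE, (t: FOType, v: unknown) => {
        if (t !== FOType.COMPLETE) sink(t, v, subs);
        else if (--active === 0) sink(FOType.COMPLETE, undefined, subs);
      }, subs);
    }
  };
}

// Selector-only publish: no ConnectableObservable, no connect() method.
function publish<T, R>(selector: (shared: FObs<T>) => FObs<R>) {
  return (source: FObs<T>): FObs<R> => (type, sink: FObs<R>, subs) => {
    if (type !== FOType.SUBSCRIBE) return;
    const subject = createSubject<T>();
    selector(subject)(FOType.SUBSCRIBE, sink, subs); // 1. wire up consumers
    source(FOType.SUBSCRIBE, subject, subs);          // 2. then "connect"
  };
}

// Usage: a synchronous source that counts how often it is subscribed to.
let sourceSubscriptions = 0;
const source$: FObs<number> = (type, sink: FObs<number>, subs) => {
  if (type !== FOType.SUBSCRIBE) return;
  sourceSubscriptions++;
  for (const v of [1, 2, 3]) sink(FOType.NEXT, v, subs);
  sink(FOType.COMPLETE, undefined, subs);
};

const log: string[] = [];
publish((shared: FObs<number>) => merge(shared, shared))(source$)(
  FOType.SUBSCRIBE,
  (t: FOType, v: unknown) => {
    log.push(t === FOType.NEXT ? String(v) : "complete");
  },
  { closed: false },
);
```

`sourceSubscriptions` ends at 1 while both branches of the merge see every value, which is the multicast behavior that `publish()` plus `connect()` provides today.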
The use cases for publish/connect and publish/selector patterns are primarily focused on multicasting completely synchronous observables. In those situations, refCounted variants like share and shareReplay do not work, because a synchronous observable will complete and drop the reference count back to zero before the next subscriber even subscribes.
It is true that these situations are not the 80% or even 90% use case, and in most cases share() or shareReplay() are recommended, which are easy to implement in this proposed design.
Another option for people here is to use share() but schedule subscription with subscribeOn:
Workaround for synchronous observables with share:
```typescript
import { asapScheduler, queueScheduler } from 'rxjs';
import { map, share, subscribeOn } from 'rxjs/operators';

const shared$ = source$.pipe(
  subscribeOn(asapScheduler),
  share(),
);
shared$.pipe(map(x => x + x)).subscribe(doAThing);
shared$.pipe(map(x => x - 2)).subscribe(doSomethingElse);

// or with queue scheduling:
const shared$ = source$.pipe(
  subscribeOn(queueScheduler),
  share(),
);
queueScheduler.schedule(() => {
  shared$.pipe(map(x => x + x)).subscribe(doAThing);
  shared$.pipe(map(x => x - 2)).subscribe(doSomethingElse);
});
```
Moving away from ConnectableObservable would simplify the library a lot, and it would be both good and bad for consumers of the library. Currently, along with refCount(), there are 10-12 or more variants of multicasting in RxJS, each with its own nuances (Subjects going "dead" in some cases but not in others, etc.). Removing ConnectableObservable and moving toward a more functional approach would narrow these options down to 5 or 6 while retaining the same available functionality. But patterns would change.
I spoke with Andre, and he pointed out that rather than Subscriptions, callbags use what he called "talkbacks", which are passed to sinks with a SUBSCRIBE (0) command. Callbags also treat Subscriptions as callbags, so Observables, Subscribers, and Subscriptions all have the same "shape": basically a function with two arguments, (type: number, arg: any) => void.
This effectively means that to set up observation, one must create a subscription, then imperatively pass it to the Sink with a command like sink(FOType.SUBSCRIBE, subscription), before the sink can really be used and provide teardown guarantees.
I spiked this approach out in the experimental branch, with tests, and while it does work I found there to be a few problems with the design:
Instead, I've opted for having Subscription be a type totally separate from the FObs type, and have FObs always take a third argument of the Subscription. This means that:
You can see the difference between the two approaches in these commits on GitHub. In particular, look at the differences in the take operator or the map operator where it's more obvious what's changed and has been simplified. The Subscriber logic also simplified, with a mild downside of needing to keep a handle to the Subscription in a Symbol on the subscriber instance.
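As a hedged illustration of why the third argument helps (this is not the code from those commits), here is a `take` sketch that closes the shared subscription as soon as it has seen enough values. `naturals`, `take`, and the `Subscription` stand-in are hypothetical. Because the subscription already exists when the source starts, even a synchronous, infinite source can be stopped from downstream.

```typescript
const FOType = { SUBSCRIBE: 0, NEXT: 1, COMPLETE: 2, ERROR: 3 } as const;
type FOType = (typeof FOType)[keyof typeof FOType];

interface Subscription { readonly closed: boolean; unsubscribe(): void }
type FObs<T> = (type: FOType, arg: any, subs: Subscription) => void;

function createSubscription(): Subscription {
  let closed = false;
  return {
    get closed() {
      return closed;
    },
    unsubscribe() {
      closed = true;
    },
  };
}

// A synchronous, infinite source: it only stops because it can see `subs.closed`.
const naturals: FObs<number> = (type, sink: FObs<number>, subs) => {
  if (type !== FOType.SUBSCRIBE) return;
  for (let i = 0; !subs.closed; i++) sink(FOType.NEXT, i, subs);
};

// take(count): forward notifications, and once `count` values have passed,
// complete downstream and close the shared subscription.
function take<T>(count: number) {
  return (source: FObs<T>): FObs<T> => (type, sink: FObs<T>, subs) => {
    if (type !== FOType.SUBSCRIBE) return;
    let seen = 0;
    source(FOType.SUBSCRIBE, (t: FOType, value: T) => {
      sink(t, value, subs);
      if (t === FOType.NEXT && ++seen >= count) {
        sink(FOType.COMPLETE, undefined, subs);
        subs.unsubscribe(); // the source's loop sees this on its next check
      }
    }, subs);
  };
}

const log: string[] = [];
const subs = createSubscription();
take<number>(3)(naturals)(FOType.SUBSCRIBE, (t: FOType, v: unknown) => {
  log.push(t === FOType.NEXT ? String(v) : "complete");
}, subs);
```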
Since observable instances are going to technically be functions here, we can have the static version of concat, merge, et al, return a function that:
Basically:
```typescript
function concat<T>(...sources: Array<ObservableInput<T>>) {
  return (typeOrSource: any, sink: Sink<T>, subs: Subscription) => {
    if (isObservable(typeOrSource)) {
      // operator logic to return new Observable
    } else if (typeOrSource === FOType.SUBSCRIBE) {
      // subscribe to the concatenated observable here.
    }
  };
}
```
This means that both of these calls to concat would be valid.
```typescript
import { concat } from 'rxjs';

concat(a$, b$, c$);
// and
a$.pipe(concat(b$, c$));
```
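A runnable sketch of the dual-mode idea, assuming (hypothetically) that the `isObservable` check can be done with `typeof arg === "function"`, since observables are functions in this design. The `DualConcat` interface spells out the two call shapes; `of` and `collect` are small hypothetical helpers for the usage example, not RxJS functions.

```typescript
const FOType = { SUBSCRIBE: 0, NEXT: 1, COMPLETE: 2, ERROR: 3 } as const;
type FOType = (typeof FOType)[keyof typeof FOType];
interface Subscription { closed: boolean }
type FObs<T> = (type: FOType, arg: any, subs: Subscription) => void;

// The two shapes a dual-mode concat has to satisfy.
interface DualConcat<T> {
  (source: FObs<T>): FObs<T>; // operator mode: a$.pipe(concat(b$, c$))
  (type: FOType, sink: FObs<T>, subs: Subscription): void; // observable mode
}

function concat<T>(...sources: Array<FObs<T>>): DualConcat<T> {
  // Subscribe to each source in turn, moving on when the current one completes.
  function subscribeInOrder(remaining: Array<FObs<T>>, sink: FObs<T>, subs: Subscription): void {
    if (remaining.length === 0) {
      sink(FOType.COMPLETE, undefined, subs);
      return;
    }
    const [first, ...rest] = remaining;
    first(FOType.SUBSCRIBE, (t: FOType, v: unknown) => {
      if (t === FOType.COMPLETE) subscribeInOrder(rest, sink, subs);
      else sink(t, v, subs);
    }, subs);
  }

  function dual(source: FObs<T>): FObs<T>;
  function dual(type: FOType, sink: FObs<T>, subs: Subscription): void;
  function dual(typeOrSource: FOType | FObs<T>, sink?: FObs<T>, subs?: Subscription): FObs<T> | void {
    // A function argument is an observable, so act as an operator.
    if (typeof typeOrSource === "function") return concat(typeOrSource, ...sources);
    if (typeOrSource === FOType.SUBSCRIBE && sink && subs) subscribeInOrder(sources, sink, subs);
    return undefined;
  }
  return dual;
}

// Usage helpers (hypothetical).
const of = (...values: number[]): FObs<number> => (type, sink: FObs<number>, subs) => {
  if (type !== FOType.SUBSCRIBE) return;
  for (const value of values) sink(FOType.NEXT, value, subs);
  sink(FOType.COMPLETE, undefined, subs);
};
const collect = (source: FObs<number>): string[] => {
  const log: string[] = [];
  source(FOType.SUBSCRIBE, (t: FOType, v: unknown) => {
    log.push(t === FOType.NEXT ? String(v) : "complete");
  }, { closed: false });
  return log;
};

const a$ = of(1);
const b$ = of(2);
const c$ = of(3);
const staticResult = collect(concat(a$, b$, c$)); // like concat(a$, b$, c$)
const operatorResult = collect(concat(b$, c$)(a$)); // like a$.pipe(concat(b$, c$))
```

Both calls produce the same sequence; in operator mode the piped source is simply prepended to the list of sources.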
It also means that we might be able to fold all operators into the main rxjs export site, thus reducing exports to one location.
We are looking at utilizing centralized error handling similar to what we used in the original RxJS 5 implementation.
An advantage of this design is that we could introduce the change in RxJS 6 as a non-breaking change, if the desired behavior of overriding the error handling for Node is needed before we can get what's in experimental out to the public.
In this case, a developer (or we, as a core team) should be able to pull in the tryUserFunction function from the utility module and patch it to simply execute the function it's given and throw if it errors.
Optionally, we could have an implementation that looks for the presence of another node module like "rxjs-node-panic" or some similar name, and if it's there, use that function for tryUserFunction instead of the default implementation.
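One way to make that patching possible: in native ES modules, importers cannot reassign exported bindings, so library internals would need to read the implementation from a mutable location. The sketch assumes a hypothetical `errorHandling` hook object (not an existing RxJS export) and swaps a catching implementation, modeled on `tryUserFunction` from experimental, for a "panic" one that simply lets errors throw. Detecting a module such as "rxjs-node-panic" would just be another way of choosing which function goes into the hook.

```typescript
const ERROR_OBJECT = { error: null as unknown };

type TryUserFunction = <R>(fn: (...args: any[]) => R, ...args: any[]) => R | typeof ERROR_OBJECT;

// Default: catch the error and hand back ERROR_OBJECT instead of throwing.
const catchingTryUserFunction: TryUserFunction = (fn, ...args) => {
  ERROR_OBJECT.error = null;
  try {
    return fn(...args);
  } catch (err) {
    ERROR_OBJECT.error = err;
    return ERROR_OBJECT;
  }
};

// "Panic" behavior for Node: just run the function and let any error throw.
const panickingTryUserFunction: TryUserFunction = (fn, ...args) => fn(...args);

// Hypothetical hook: internals always read the implementation from here.
const errorHandling: { tryUserFunction: TryUserFunction } = {
  tryUserFunction: catchingTryUserFunction,
};

// Example of an internal call site.
function callUserCode<R>(fn: (...args: any[]) => R, ...args: any[]): R | typeof ERROR_OBJECT {
  return errorHandling.tryUserFunction(fn, ...args);
}

// Usage
const boom = (): number => {
  throw new Error("boom");
};
const caught = callUserCode(boom) === ERROR_OBJECT; // true: the error was captured

errorHandling.tryUserFunction = panickingTryUserFunction; // patch for Node
let threw = false;
try {
  callUserCode(boom);
} catch {
  threw = true; // the error now propagates instead of being captured
}
```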
We have a lot of options here.
Basic implementation is as follows (copied from experimental):
```typescript
const ERROR_OBJECT = {
  error: null as any,
};

/**
 * Executes a user-provided function within a try-catch, and returns either the result
 * or returns {@link ERROR_OBJECT} if an error has occurred.
 * Use {@link resultIsError} to verify whether the result is an error or not.
 *
 * @param fn the user-provided function to wrap in some error handling for safety
 * @param args The arguments to execute the user-provided function with.
 */
export function tryUserFunction<R>(fn: (...args: any[]) => R, ...args: any[]): typeof ERROR_OBJECT | R {
  ERROR_OBJECT.error = null;
  let result: R;
  try {
    result = fn(...args);
  } catch (err) {
    ERROR_OBJECT.error = err;
    return ERROR_OBJECT;
  }
  return result;
}

/**
 * Used to verify whether the result of {@link tryUserFunction}
 * is an error or not. If this returns true, check {@link ERROR_OBJECT}'s
 * error property for the error value.
 */
export function resultIsError<R>(result: R|typeof ERROR_OBJECT): result is typeof ERROR_OBJECT {
  return result === ERROR_OBJECT;
}
```
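As a usage sketch (a hypothetical `map`, not the experimental-branch operator), here is how an operator might call user code through `tryUserFunction` and `resultIsError` instead of wrapping each call site in its own try/catch. The two helpers are restated from the listing above so the snippet compiles on its own; `FOType`, `FObs`, and `Subscription` are simplified stand-ins.

```typescript
const FOType = { SUBSCRIBE: 0, NEXT: 1, COMPLETE: 2, ERROR: 3 } as const;
type FOType = (typeof FOType)[keyof typeof FOType];
interface Subscription { closed: boolean; unsubscribe(): void }
type FObs<T> = (type: FOType, arg: any, subs: Subscription) => void;

// Restated from the listing above so this sketch is self-contained.
const ERROR_OBJECT = { error: null as any };
function tryUserFunction<R>(fn: (...args: any[]) => R, ...args: any[]): typeof ERROR_OBJECT | R {
  ERROR_OBJECT.error = null;
  try {
    return fn(...args);
  } catch (err) {
    ERROR_OBJECT.error = err;
    return ERROR_OBJECT;
  }
}
function resultIsError<R>(result: R | typeof ERROR_OBJECT): result is typeof ERROR_OBJECT {
  return result === ERROR_OBJECT;
}

// Hypothetical map: no try/catch of its own. A throwing projection becomes an
// ERROR notification, and the shared subscription is closed.
function map<T, R>(project: (value: T) => R) {
  return (source: FObs<T>): FObs<R> => (type, sink: FObs<R>, subs) => {
    if (type !== FOType.SUBSCRIBE) return;
    source(FOType.SUBSCRIBE, (t: FOType, value: T) => {
      if (t !== FOType.NEXT) {
        sink(t, value, subs);
        return;
      }
      const result = tryUserFunction(project, value);
      if (resultIsError(result)) {
        sink(FOType.ERROR, result.error, subs);
        subs.unsubscribe();
      } else {
        sink(FOType.NEXT, result, subs);
      }
    }, subs);
  };
}

// Usage: a synchronous source that respects the shared subscription.
const source$: FObs<number> = (type, sink: FObs<number>, subs) => {
  if (type !== FOType.SUBSCRIBE) return;
  for (const v of [1, 2, 3]) {
    if (subs.closed) return;
    sink(FOType.NEXT, v, subs);
  }
  if (!subs.closed) sink(FOType.COMPLETE, undefined, subs);
};

const log: string[] = [];
const subs: Subscription = {
  closed: false,
  unsubscribe() {
    this.closed = true;
  },
};
const mapped$ = map((x: number) => {
  if (x === 2) throw new Error("bad value");
  return x * 10;
})(source$);
mapped$(FOType.SUBSCRIBE, (t: FOType, v: any) => {
  log.push(t === FOType.ERROR ? `error: ${v.message}` : `${t}:${v}`);
}, subs);
```

The error thrown for `2` is delivered as an ERROR notification, and closing the shared subscription stops the source before it emits `3`.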