Skip to content

指南 · 专家

版本基线 RxJS 7.x。深入内核:调度器与执行时机、多播底层、TestScheduler 弹珠测试、自定义 pipeable 操作符、内存泄漏防治、以及 v6 → v7 破坏性变更全清单。

一、Scheduler:控制执行时机

Scheduler 不改变值的内容,只控制「订阅何时开始」与「通知何时投递」的执行上下文与时机——即「什么时候、在哪种执行队列上跑」。

调度器底层机制用途
asyncSchedulersetTimeout/setInterval(宏任务)基于时间的延迟(时间操作符默认)
asapScheduler微任务队列尽快异步,比 setTimeout 早
queueScheduler当前同步队列递归调度时排队,防栈溢出
animationFrameSchedulerrequestAnimationFrame动画
VirtualTimeScheduler / TestScheduler虚拟时钟测试时把时间同步化
ts
import { of, observeOn, subscribeOn, asyncScheduler } from 'rxjs';

of(1, 2, 3).pipe(observeOn(asyncScheduler)).subscribe(console.log);
// 「just after subscribe」会先打印,1/2/3 被异步投递到下一个宏任务
  • subscribeOn(sch):决定订阅 / 生产开始在哪个调度器执行(影响整链起点,一条链一个即可,放哪都作用于源)。
  • observeOn(sch):决定其位置之后的下游通知投递在哪个调度器执行(可放链中任意处,只影响下游)。

调度器调时机,不调线程

JS 是单线程,Scheduler 不能把同步代码变成真正并行。可用 asyncScheduler 把发值切片成异步任务、让出主线程,缓解卡顿;但 CPU 密集计算仍在主线程跑。真正并行需 Web Worker

二、多播内核:从 share 到 connectable

share() / shareReplay() 是日常多播首选。需要精确控制多播开始时机时,用 connectable

ts
import { connectable, Subject, interval, take } from 'rxjs';

const source$ = connectable(interval(1000).pipe(take(3)), {
  connector: () => new Subject(),
});
source$.subscribe((v) => console.log('A:', v));
source$.subscribe((v) => console.log('B:', v));
// 此时源还没开始执行!
source$.connect(); // 显式开闸,A、B 同时从头收到 0,1,2

connectable 让你「先把所有订阅者接好,再统一 connect() 开闸」,避免早订阅者错过值。它是已废弃的 multicast() / ConnectableObservable 系列的现代替代。

shareReplay({ bufferSize: 1, refCount: true }) 是缓存 HTTP 响应的常用配方:一次请求、结果共享并对后续订阅者重放。注意 refCount: false + 源不 complete 时会一直保活,可能内存泄漏。

三、TestScheduler:弹珠测试

测试基于时间的流(debounceTimeinterval)不应真的 setTimeout 等待。RxJS 提供 TestSchedulerrxjs/testing)+ 弹珠图,把时间虚拟化、同步执行:

ts
import { TestScheduler } from 'rxjs/testing';
import { throttleTime } from 'rxjs';

const scheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected); // 接入你的断言库
});

it('throttleTime', () => {
  scheduler.run(({ cold, time, expectObservable }) => {
    const source = ' -a--b--c---|';
    const t = time('   ---|       '); // t = 3 帧
    const expected = '-a-----c---|';
    expectObservable(cold(source).pipe(throttleTime(t))).toBe(expected);
  });
});

弹珠语法:- = 一帧时间流逝;字母/数字 = 发出一个值;| = complete;# = error;() = 同帧分组;^ = hot Observable 的订阅点。在 run() 回调里,所有基于 asyncScheduler 的时间操作都被虚拟时钟接管。

四、自定义 pipeable 操作符

一个 pipeable 操作符就是「接收源 Observable、返回新 Observable 的函数」:(source) => Observable。推荐用现有操作符通过独立 pipe 组合,而非手写底层 subscriber

ts
import { pipe, filter, map, type Observable } from 'rxjs';

// 用组合实现:只取偶数并翻倍
const evenDouble = () =>
  pipe(
    filter((x: number) => x % 2 === 0),
    map((x) => x * 2),
  );

source$.pipe(evenDouble()).subscribe(console.log);

独立 pipe(...ops)函数组合(把多个操作符组成一个可复用操作符),与具体 Observable 解耦;实例 source$.pipe(...) 则是直接应用到某条流。

五、内存泄漏防治清单

隐患防治
持续型流(interval/fromEvent)忘退订takeUntil(destroy$) / take / 手动 unsubscribe
长生命周期 Subject 持有未退订的订阅者订阅方用 takeUntil 退订;对外用 asObservable() 暴露只读
shareReplay 源不 complete + refCount:falserefCount:true 或确保源会终止
嵌套订阅(subscribe 里再 subscribe)改用高阶映射 switchMap/mergeMap,自动管理内层退订
Angular 组件优先模板 async 管道;或 takeUntilDestroyed()
ts
import { Subject, interval, takeUntil } from 'rxjs';

class Component {
  private destroy$ = new Subject<void>();
  init() {
    interval(1000).pipe(takeUntil(this.destroy$)).subscribe(/* ... */);
    // 多条业务流都 pipe(takeUntil(this.destroy$))
  }
  onDestroy() {
    this.destroy$.next();      // 一处触发,批量退订
    this.destroy$.complete();
  }
}

六、v6 → v7 破坏性变更全清单

变更说明
扁平导入操作符与创建函数从 'rxjs' 直接导出;'rxjs/operators' 仍可用但非首选
toPromise() 废弃改用 firstValueFrom/lastValueFrom;空流 reject EmptyError(旧版 resolve undefined),可传 { defaultValue }
subscribe 位置回调废弃subscribe(next, error, complete) 改为 subscribe({ next, error, complete });单 subscribe(fn) 仍合法
retryWhen 废弃改用 retry({ count, delay })delay 可为 ms 或 (err, n) => 通知流
多播 API 重构multicast/publish/refCount 系列废弃,改用 share/shareReplay/connectable/connect
pipeable 全面取代 patch不再支持 obs.map(...).filter(...) 原型链操作符
retry 负数行为移除旧的「传负数 = 无限重试」被移除
tree-shaking 优化纯函数操作符 + 扁平导出,未用到的可被摇掉,7.x 体积与类型推断更优

迁移建议:坚持「扁平导入 + pipeable 操作符 + firstValueFrom/lastValueFrom」,并把旧 toPromise/retryWhen/多播 patch 逐步替换。RxJS 8 仍在演进,上述废弃项多计划在 v8 移除。


回到 入门 复习三件套,或查 参考 速览 API 与调度器。