Skip to content

参考

RxJS 7.x 常用 API 速查:创建函数、核心操作符、Subject 家族、调度器与子路径。所有操作符均从 'rxjs' 扁平导出,用 source$.pipe(...) 组合。

一、创建函数(Creation)

函数作用
of(a, b, c)把每个参数依次同步发出后 complete
from(input)把 Iterable / 数组 / Promise / Observable 转成流并逐项发出
fromEvent(target, name)把 DOM / 事件目标的事件包成流(自动 add/removeEventListener)
interval(period)period ms 发递增整数;首值滞后一个周期,永不自动完成
timer(due[, period])due 后发一个 0(单参);带 period 则之后周期发递增整数
range(start, count)同步发出一段连续整数
defer(factory)订阅时才调用工厂创建流,每次订阅生成新流(按订阅时刻求值)
EMPTY不发值、立即 complete
NEVER既不发值也不终止(永远挂起)
throwError(() => err)订阅时立即以指定错误 error(7.x 推荐传工厂函数)
iif(cond, a$, b$)订阅时按条件二选一

二、组合 / 合并(Combination)

函数 / 操作符行为
merge(a$, b$)并行订阅所有源,值交错发出
concat(a$, b$)串行:等前一个 complete 再订阅下一个,值不交错
combineLatest([a$, b$])每源都发过值后,任一源更新即组合「各源最新值」发出
withLatestFrom(other$)主源发值时输出,附带其它源最新值快照(不对称)
forkJoin([a$, b$])等所有源 complete,取各自最后值组合发一次(似 Promise.all
zip(a$, b$)按序「配对」各源的第 n 个值
race(a$, b$)采用最先发值的那个源,其余退订

三、变换 / 过滤(Transformation / Filtering)

操作符作用
map(fn)逐值变换,数量不变
filter(pred)只放行满足谓词的值
tap(fn)副作用(日志/调试),原样透传值、不改流
scan(acc, seed)每个值都发出当前累加结果(运行中的 reduce)
reduce(acc, seed)只在 complete 时发出最终累加结果一次
take(n) / takeUntil(n$) / takeWhile(p)取前 n 个 / 直到通知流发值 / 满足条件期间
skip(n) / first() / last()跳过 n 个 / 取第一个 / 取最后一个
distinctUntilChanged()连续重复(只比相邻)
startWith(...v)在源发值前先发出初始值
pairwise()把相邻两值配成 [prev, cur]
toArray()收集所有值,complete 时作为一个数组发出

四、高阶映射(Flattening,高频考点)

操作符新值到来时对未完成内层的处理典型场景
switchMap取消上一个内层,切到新的搜索补全(取最新、防竞态)
mergeMap并发保留所有内层(可传 concurrent 限流)并行请求
concatMap排队:等上一个完成再订阅下一个(保序不丢)顺序写操作
exhaustMap忽略新外层值直到当前内层完成防重复提交

xxxMapmap(...) + 对应的 xxxAllmergeAll/concatAll/switchAll/exhaustAll,用于已是高阶 Observable 的拍平)。

五、时间相关(Time-based,默认走 asyncScheduler)

操作符作用
debounceTime(d)源静默达 d ms 后发出最近一个值(搜索输入)
throttleTime(d[, sch, cfg])发一个值后窗口内忽略其余;{ leading, trailing } 控发头/发尾
auditTime(d)窗口结束时发出窗口内最近的值(采样最新)
sampleTime(d)d ms 采样一次最新值
delay(d)把每个值延迟 d ms 发出
bufferTime(d)d ms 内的所有值收集成数组整批发出(不丢值)

六、错误处理(Error Handling)

操作符作用
catchError((err, caught) => obs$)上游 error 时返回一个新 Observable 替代(降级 / 静默 / 重抛)
retry(count) / retry({ count, delay, resetOnSuccess })error 后重新订阅源;delay 可为 ms 或 (err, n) => 通知流
retryWhen(fn)⚠️ 已废弃(v9/v10 移除),改用 retry({ delay })
repeat(count)complete 后重新订阅源(轮询场景)
finalize(fn)源以任何方式终止(complete / error / 退订)时执行清理
timeout(due)超时未发值则 error

七、Subject 家族(多播)

类型特性
new Subject()多播;订阅者只收订阅之后的值;本身也是 Observer(可 next
new BehaviorSubject(init)需初始值;新订阅者立即收当前值.value / .getValue() 同步读
new ReplaySubject(n[, time])向新订阅者重放最近 n 个值(可带时间窗)
new AsyncSubject()仅在 complete 时发出最后一个值
subject.asObservable()返回只读 Observable 视图(隐藏 next,防外部写入)

八、多播操作符

操作符作用
share(config?)用 Subject 多播 + 引用计数,多订阅者共享一次源执行
shareReplay({ bufferSize, refCount })在多播之上缓存并重放给后来的订阅者(缓存 HTTP 响应)
connectable(source, { connector })需手动 .connect() 才开始多播,精确控制开闸时机
connect(selector)在局部把源多播给多个分支再合并

九、调度器(Scheduler)

调度器执行时机
asyncSchedulersetTimeout/setInterval(宏任务,时间相关默认)
asapScheduler微任务队列(尽快异步)
queueScheduler当前同步队列(递归调度时排队防栈溢出)
animationFrameSchedulerrequestAnimationFrame(动画)
VirtualTimeScheduler / TestScheduler虚拟时间(TestScheduler 来自 rxjs/testing,弹珠测试)

observeOn(sch) 控制其下游通知投递的上下文;subscribeOn(sch) 控制订阅/生产开始的上下文。

十、互操作与子路径

用法说明
firstValueFrom(obs[, {defaultValue}])Observable → Promise(第一个值,空流 reject EmptyError
lastValueFrom(obs[, {defaultValue}])Observable → Promise(最后一个值)
import { ... } from 'rxjs'7.x 扁平导入(操作符 + 创建函数)
'rxjs/operators'旧式子路径,仍可用但非首选
'rxjs/ajax' / 'rxjs/fetch'ajax / fromFetch HTTP 工具
'rxjs/testing'TestScheduler
'rxjs/webSocket'webSocket 客户端

API 查完,进 指南 · 进阶 看实战组合,或 指南 · 专家 看调度器与测试内核。