参考资料: 前端中的pipeline Node设计模式之流模式

简介

计算机领域的 Pipeline 通常认为起源于 Unix的管道运算符。
ls -l | grep key # 查看当前目录文件列表,然后搜索文件名含有key的文件
Pipeline的特点:

  1. 各自过程高内聚,专注于解决特定问题,Simple & Sharp
  2. 所有的子过程具有一致的接口,例如从标准输入读取数据,正常结果输出到标准输出,异常结果输出到标准错误
  3. 能够通过一定的形式将自过程组合起来解决复杂问题,例如pipe

事实上,Pipeline作为化整为零、去繁就简的重要手段,在前端中也有诸多应用。

什么是流?

不同于缓冲,流可以读取数据的一部分同时立即提供给后续处理,这样提高了内存的空间效率,和程序运行的时间效率

缓存的内存空间问题:假如文件数据过大(几百GB) ,那么读取整个文件到缓冲,再一次返回,可能导致内存溢出 缓存的运行时间问题:只有文件全部读取到内存中才能进行后续处理,文件过大就会阻塞程序运行,随饭异步方法读取不会阻塞,但仍会把文件整个读到内存中

使用流可以缓解缓冲的以上问题

流的特点

  • 组合性: like中间件,每个中间件指责单一,管道中上一个处理单元的输出流入下一个处理单元的输入。
  • 流的操作模式:对象模式和二进制模式。
  • 流的背压: 数据写入缓存的速度大于被读出的数独,那么缓冲就会积攒大量数组,占用大量内存。如果使用管道的pipe()方法, 那么不用自己解决背压问题。

    流的使用场景

  • 处理IO

  • 对流程控制: 顺序执行、无序并行执行、无序有限制并行执行、顺序并行执行
  • 流的管道模式: 组合流、复制流、合并流、复合分解

    Compose

    reduce loadsh 都有各自的compose方法 ```javascript function compose(…fns){ const len = fns.length; if(!len) return => ; if(len === 1) return fns[0];

    return fns.reduce((a, b) => (args) => a(b(args))); } compose(f1,f2)(2); // 等价于 f1(f2(2)) function f1 (arg){ return arg*3; } function f2 (arg) { return arg + 5; }

// 支持同步异步的compose function compose2(…fns){ const len = fns.length; if(!len) return => Promise.resolve(); if(len === 1) return (args) => Promise.resolve(fns0);

return fns.reduce( (a, b) => async (args) => a(await b(args)) ); } function f3(arg) { return arg * 2; } function f4(arg) { return new Promise(resolve => { setTimeout(() => { resolve(arg + 3); }, 2000) }) } compose2(f4, f3)(1).then(console.log)

  1. <a name="FGkMY"></a>
  2. # Middleware Pipeline
  3. <a name="FC0vR"></a>
  4. ## Express中的pipeline
  5. 先通过几行代码,感受下Expresss中间件散发的优雅气息
  6. ```javascript
  7. express()
  8. .use(bodyParser.json())
  9. .use(cookieParser())
  10. .use(session(sessionOptions))
  11. .use('api', apiRoutes)
  12. .use(errorHandler);

Vue3中的pipeline

  1. import {createApp} from 'vue';
  2. import App from './App.vue';
  3. createApp(App)
  4. .use(store)
  5. .use(router)
  6. .use(installGlobalComps)
  7. .use(Toast)
  8. .mount('#app')

Koa中间件

  1. function compose(...middlewares) {
  2. return function(ctx) {
  3. return dispatch(0);
  4. function dispatch(i) {
  5. const fn = middlewares[i];
  6. if(!fn) return Promise.resolve()
  7. return Promise.resolve(
  8. fn(ctx, function next() {
  9. return dispatch(i+1)
  10. })
  11. )
  12. }
  13. }
  14. }
  15. async function mid1(ctx, next){
  16. console.log('mid1 start');
  17. await next();
  18. console.log('mid2 end');
  19. }
  20. async function mid2(ctx, next) {
  21. console.log('mid2 start');
  22. await next();
  23. console.log('mid2 end');
  24. }
  25. compose(mid1, mid2)({/* ... */})

中间件优点

  • 代码简练、符合直觉。
  • 合理的错误处理。任意 Middleware 出现问题,会越过后续所有普通 Middleware,直接由 Error Middleware 进行处理

Node Stream Pipeline

image.png

gulp

凭借对 Stream 惟妙惟肖地运用,Gulp 在与配置为主的 Grunt 的竞争中迅速取得了领先优势。

  1. gulp.task('js', () => {
  2. return gulp.src('./js/src/*.coffee')
  3. .pipe(coffee())
  4. .pipe(uglify())
  5. .pipe(gulp.dest('./js/'));
  6. });

Browserify

Browserify 的设计目标是将 CommonJS 模块组织的 JS 代码打包为可以在浏览器中运行的代码。实现这一目标所需要做的工作非常复杂,因此 Browserify 将其拆解为职责单一的多个子过程,例如分析依赖、拓扑排序、模块去重、打包合并等,并通过 Stream Pipeline 打通整个流程。这使得整个代码的架构异常清晰,对将来的维护和优化提供了良好基础。

  1. var pipeline = splicer.obj([
  2. 'record', [ this._recorder() ],
  3. 'deps', [ this._mdeps ],
  4. 'json', [ this._json() ],
  5. 'unbom', [ this._unbom() ],
  6. 'unshebang', [ this._unshebang() ],
  7. 'syntax', [ this._syntax() ],
  8. 'sort', [ depsSort(dopts) ],
  9. 'dedupe', [ this._dedupe() ],
  10. 'label', [ this._label(opts) ],
  11. 'emit-deps', [ this._emitDeps() ],
  12. 'debug', [ this._debug(opts) ],
  13. 'pack', [ this._bpack ],
  14. 'wrap', []
  15. ]);

点睛之笔在于,这个基于 labeled-stream-splicer 实现的 pipeline 还支持动态修改和扩展,而且不仅在内部实现中多处应用,还暴露为外部接口方便调用方进行定制。下面这个示例展示了将 deps 子过程输出结果的 source 属性改为大写的逻辑:

  1. pipeline.get('deps').push(through.obj(function (row, enc, next) {
  2. row.source = row.source.toUpperCase();
  3. this.push(row);
  4. next();
  5. }));

Promise Pipeline

与中间件Pipeline有一些相同之处,如支持异步错误处理。
区别:
Promise 属于Monad(函数式编程中的一种概念,也一种设计模式。表示将一个运算过程,通过函数拆解成互相连接的多个步骤你只要提供下一步运算所需的函数,整个运算就会自动进行下去。

  1. 我们总是可以通过 Promise.resolve/Promise.reject 将非 promise 的值转换为 promise,而 promise.then/promise.catch 也总是返回一个新的 promise 从而方便链式调用。
  2. Promise 还有一个 Killer Feature:实现优雅降级等业务需求。

    Ramda Pipeline

    将对象转换为query字符串 ```javascript const obj = { foo: ‘bar’, baz: true, qux: 3.1415, };

const objToQueryStr = R.pipe( R.toPairs, R.map(R.join(‘=’)), R.join(‘&’) ); objToQueryStr(objToQueryStr); ```