fx 是一个完备的流式处理组件。 和 MapReduce 类似的,fx 也存在并发处理的函数:Parallel(fn, options)。但同时它也不只有并发处理。From(chan) ,Map(fn),Filter(fn),Reduce(fn) 等,从数据源读取成流,到处理流数据,最后聚合流数据。是不是有点像 Java Lambda ,如果你之前是 Java 开发者,看到这也就明白整个基本设计。
整体API
还是从整体上概览 fx 到底是怎么构建的: 
被标注的部分就是整个 fx 最重要的部分:
- 由 From(fn) 这类API,产生数据流 Stream
 - 对Stream 转换,聚合,求值的API集合
 
所以列举出目前支持的 Stream API:
| API | 作用 | 
|---|---|
| Distinct(fn) | fn中选定特定item类型,对其去重 | 
| Filter(fn, option) | fn指定具体规则,满足规则的element传递给下一个 stream | 
| Group(fn) | 根据fn把stream中的element分到不同的组中 | 
| Head(num) | 取出stream中前 num 个element ,生成一个新的stream | 
| Map(fn, option) | 将每个ele转换为另一个对应的ele, 传递给下一个 stream | 
| Merge() | 将所有ele合并到一个slice中并生成一个新stream | 
| Reverse() | 反转stream中的element。【使用双指针】 | 
| Sort(fn) | 按照 fn 排序stream中的element | 
| Tail(num) | 取出stream最后的 num 个element,生成一个新 stream。【使用双向环状链表】 | 
| Walk(fn, option) | 把 fn 作用在 source 的每个元素。生成新的 stream | 
不再生成新的 stream,做最后的求值操作:
| API | 作用 | 
|---|---|
| ForAll(fn) | 按照fn处理stream,且不再产生stream【求值操作】 | 
| ForEach(fn) | 对 stream 中所有 element 执行fn【求值操作】 | 
| Parallel(fn, option) | 将给定的fn与给定的worker数量并发应用于每个element【求值操作】 | 
| Reduce(fn) | 直接处理stream【求值操作】 | 
| Done() | 啥也不做,等待所有的操作完成 | 
如何使用
result := make(map[string]string)fx.From(func(source chan<- interface{}) {for _, item := range data {source <- item}}).Walk(func(item interface{}, pipe chan<- interface{}) {each := item.(*model.ClassData)class, err := l.rpcLogic.GetClassInfo()if err != nil {l.Errorf("get class %s failed: %s", each.ClassId, err.Error())return}students, err := l.rpcLogic.GetUsersInfo(class.ClassId)if err != nil {l.Errorf("get students %s failed: %s", each.ClassId, err.Error())return}pipe <- &classObj{classId: each.ClassIdstudentIds: students}}).ForEach(func(item interface{}) {o := item.(*classObj)result[o.classId] = o.studentIds})
- From() 从一个 slice 生成 stream
 - Walk() 接收与一个 stream ,对流中每个 ele 转换重组,生成新的 stream
 - 最后由 求值操作 把 stream 输出(fmt.Println),存储(map,slice),持久化(db操作)
 
简要分析
fx 中的函数命名语义化,开发者只需要知道业务逻辑需要什么样的转换,调用与之匹配的函数即可。
所以这里只简要分析几个比较典型的函数。
Walk()
Walk() 在整个 fx 被多个函数当成底层实现,Map(), Filter() 等。
所以本质就是:Walk() 负责并发将传进来的函数作用在 输入流 的每个 ele,并 生成新的 stream。
跟到源码,分成两个子函数:自定义 worker 数,默认 worker 数
// 自定义 workder 数func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {pipe := make(chan interface{}, option.workers)go func() {var wg sync.WaitGroup// channel<- 如果达到设定workers数,channel阻塞,从而达到控制并发数。// 简易的 goroutine poolpool := make(chan lang.PlaceholderType, option.workers)for {// 每一次for循环都会开启一个goroutine。如果达到workers数,从而阻塞pool <- lang.Placeholderitem, ok := <-p.sourceif !ok {<-poolbreak}// 使用 waitgroup 保证任务完成的完整性wg.Add(1)threading.GoSafe(func() {defer func() {wg.Done()// 归还<-pool}()fn(item, pipe)})}wg.Wait()close(pipe)}()return Range(pipe)}
- 使用 有缓冲channel 做并发队列,限制并发数
 - waitgroup 保证任务完成的完整性
 
另外一个 walkUnlimited():也使用了 waitgroup 做并发控制,因为没有自定义并发数限制,所以也就没有另外一个 channel 做并发数控制。
Tail()
介绍这个主要是里面运用了 ring 这个双向链表,其中的简单算法还是很有意思的。
func (p Stream) Tail(n int64) Stream {source := make(chan interface{})go func() {ring := collection.NewRing(int(n))// “顺序”插入,源的顺序和ring的顺序一致for item := range p.source {ring.Add(item)}// 取出 ring 中全部的 itemfor _, item := range ring.Take() {source <- item}close(source)}()return Range(source)}
至于为什么 Tail() 可以做到把源的后n个取出来,这个就留给大家去细品了。这里给出我的理解: 
Tip 假设有以下这个场景,Tail(5)
- stream size :7
 - ring size:5
 
这里可以使用把环状链表拉开的方式,环转线,此时以全部长度划分对称轴,翻转多余的元素,之后的元素就是 Tail(5) 需要的部分了。
Tip 这里采用图的方式更清晰的表现,不过代码大家也要看看。算法要考的
Stream Transform Design
分析整个 fx ,会发现整体设计遵循一个设计模版:
func (p Stream) Transform(fn func(item interface{}) interface{}) Stream {// make channelsource := make(chan interface{})// goroutine workergo func() {// tranformfor item := range p.source {...source <- item...}...// 关闭输入,但是依然可以从这个 stream 输出。防止内存泄漏close(source)}()// channel -> streamreturn Range(source)}
- channel 作为流的容器
 - 开 goroutine 对 source 做转换,聚合,输送到 channel
 - 处理完毕,close(outputStream)
 - channel -> stream
总结
到这就把 fx 基本介绍完了,如果你对其他API源码感兴趣,可以跟着上面的 API 列表挨个读一读。 
同时也建议大家把 java stream 的API大致看看,对这种 stream call 理解可以更加深 。
同时在 go-zero 中还有很多实用的组件工具,用好工具对于提升服务性能和开发效率都有很大的帮助,希望本篇文章能给大家带来一些收获。
参考资料
示例
package mainimport ("fmt""github.com/tal-tech/go-zero/core/fx")func main() {result, err := fx.From(func(source chan<- interface{}) {for i := 0; i < 10; i++ {source <- i}}).Map(func(item interface{}) interface{} {i := item.(int)return i * i}).Filter(func(item interface{}) bool {i := item.(int)return i%2 == 0}).Distinct(func(item interface{}) interface{} {return item}).Reduce(func(pipe <-chan interface{}) (interface{}, error) {var result intfor item := range pipe {i := item.(int)result += i}return result, nil})if err != nil {fmt.Println(err)} else {fmt.Println(result)}}
PS D:\Projects\Github\NoobWu\zero-examples\fx> go run .\square.go120
