来源
https://mp.weixin.qq.com/s/VZfwNlqSLzCK0hWSfG4Yzw

Go语言设计模式实战

嗯,Go设计模式实战系列,一个设计模式业务真实使用的golang系列。

前言

本系列主要分享,如何在我们的真实业务场景中使用设计模式。
本系列文章主要采用如下结构:

  • 什么是「XX设计模式」?
  • 什么真实业务场景可以使用「XX设计模式」?
  • 怎么用「XX设计模式」?

本文主要介绍「组合模式」结合Go语言天生的并发特性,如何在真实业务场景中使用。
之前文章《代码组件 | Go设计模式实战》已经介绍了「组合模式」的概念,以及在业务中的使用。今天我们结合Go语言天生的并发特性,升级「组合模式」为「并发组合模式」。
我们先来简单回顾下「组合模式」的知识,详细可以查看上篇文章《代码组件 | Go设计模式实战》

什么是「并发组合模式」?

组合模式的概念:

一个具有层级关系的对象由一系列拥有父子关系的对象通过树形结构组成。

并发组合模式的概念:

一个具有层级关系的对象由一系列拥有父子关系的对象通过树形结构组成,子对象即可被串行执行,也可被并发执行

并发组合模式的优势:

  • 原本串行的业务(存在阻塞的部分,比如网络IO等)可以被并发执行,利用多核优势提升性能。

    什么真实业务场景可以用「并发组合模式」?

    我们还是以「组合模式」中的“订单结算页面”为例,继续来看看某东的订单结算页面:
    7. 并发组件 - 图1

从页面的展示形式上,可以看出:

  • 页面由多个模块构成,比如:
    • 地址模块:获取用户地址数据
    • 支付方式模块:获取支付方式列表
    • 店铺模块:获取店铺、购物车选中商品等信息
    • 发票模块:获取发票类型列表
    • 优惠券模块:获取用户优惠券列表
    • 某豆模块:获取用户积分信息
    • 礼品卡模块:获取礼品卡列表列表
    • 订单详细金额模块:获取订单金额信息
  • 单个模块可以由多个子模块构成
    • 商品模块:获取购物车选中商品信息
    • 售后模块:获取商品售后信息
    • 优惠模块:获取商品参与的优惠活动信息
    • 物流模块:获取商品支持的配送方式列表
    • 店铺模块,又由如下模块构成:

按照「组合模式」的业务逻辑执行流程:
7. 并发组件 - 图2
但是,我们很清楚有些模块之间并没有依赖,且该模块涉及服务远程调用等阻塞操作,比如:

  • 地址模块调用地址服务获取用户地址数据时。
  • 支付方式模块也可以同时去读redis获取支付方式列表数据等等。

所以: 有的模块其实可以被并发的执行。
如果把上面不存在依赖关系的模块修改为并发的执行,则我们得到如下的执行流程:
7. 并发组件 - 图3

怎么用「并发组合模式」?

关于「并发组合模式」的建模过程完全可以参考之前文章《代码组件 | Go设计模式实战》,我们这里只说说需要着重注意的地方。
「并发组合模式」的核心还是Component组件接口,我们先看看「组合模式」的Component组件接口如下(再之前的文章上做了优化,进一步封装提取了BusinessLogicDo方法):

  1. // Component 组件接口
  2. type Component interface {
  3. // 添加一个子组件
  4. Mount(c Component, components ...Component) error
  5. // 移除一个子组件
  6. Remove(c Component) error
  7. // 执行当前组件业务和执行子组件
  8. // ctx 业务上下文
  9. // currentConponent 当前组件
  10. Do(ctx *Context, currentConponent Component) error
  11. // 执行当前组件业务业务逻辑
  12. BusinessLogicDo(ctx *Context) error
  13. // 执行子组件
  14. ChildsDo(ctx *Context) error
  15. }

再来看看「并发组合模式」的Component`组件接口,如下(重点看和「组合模式」的区别):

  1. // Component 组件接口
  2. type Component interface {
  3. // 添加一个子组件
  4. Mount(c Component, components ...Component) error
  5. // 移除一个子组件
  6. Remove(c Component) error
  7. // 执行当前组件业务:`BusinessLogicDo`和执行子组件:`ChildsDo`
  8. // ctx 业务上下文
  9. // currentConponent 当前组件
  10. // wg 父组件的WaitGroup对象
  11. // 区别1:增加了WaitGroup对象参数,目的是等待并发子组件的执行完成。
  12. Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) error
  13. // 执行当前组件业务逻辑
  14. // resChan 回写当前组件业务执行结果的channel
  15. // 区别2:增加了一个channel参数,目的是并发组件执行逻辑时引入了超时机制,需要一个channel接受组件的执行结果
  16. BusinessLogicDo(resChan chan interface{}) error
  17. // 执行子组件
  18. ChildsDo(ctx *Context) error
  19. }

我们详细再来看,相对于「组合模式」,引入并发之后需要着重关注如下几点:

  • 并发子组件需要设置超时时间:防止子组件执行时间过长,解决方案关键字context.WithTimeout
  • 区分普通组件和并发组件:合成复用基础组件,封装为并发基础组件
  • 拥有并发子组件的父组件需要等待并发子组件执行完毕(包含超时),解决方案关键字sync.WaitGroup
  • 并发子组件执行自身业务逻辑是需检测超时:防止子组件内部执行业务逻辑时间过长,解决方案关键字select<-ctx.Done()

    第一点:并发子组件需要设置超时时间

    1. // Context 业务上下文
    2. type Context struct {
    3. // context.WithTimeout派生的子上下文
    4. TimeoutCtx context.Context
    5. // 超时函数
    6. context.CancelFunc
    7. }

    第二点:区分普通组件和并发组件

    增加新的并发基础组件结构体BaseConcurrencyComponent,并合成复用「组合模式」中的基础组件BaseComponent,如下:

    1. // BaseConcurrencyComponent 并发基础组件
    2. type BaseConcurrencyComponent struct {
    3. // 合成复用基础组件
    4. BaseComponent
    5. // 当前组件是否有并发子组件
    6. HasChildConcurrencyComponents bool
    7. // 并发子组件列表
    8. ChildConcurrencyComponents []Component
    9. // wg 对象
    10. *sync.WaitGroup
    11. // 当前组件业务执行结果channel
    12. logicResChan chan interface{}
    13. // 当前组件执行过程中的错误信息
    14. Err error
    15. }

    第三点:拥有并发子组件的父组件需要等待并发子组件执行完毕(包含超时)

    修改「组合模式」中的ChildsDo方法,使其支持并发执行子组件,主要修改和实现如下:

  • 通过go关键字执行子组件

  • 通过*WaitGroup.Wait()等待子组件执行结果
    1. // ChildsDo 执行子组件
    2. func (bc *BaseConcurrencyComponent) ChildsDo(ctx *Context) (err error) {
    3. if bc.WaitGroup == nil {
    4. bc.WaitGroup = &sync.WaitGroup{}
    5. }
    6. // 执行并发子组件
    7. for _, childComponent := range bc.ChildConcurrencyComponents {
    8. bc.WaitGroup.Add(1)
    9. go childComponent.Do(ctx, childComponent, bc.WaitGroup)
    10. }
    11. // 执行子组件
    12. for _, childComponent := range bc.ChildComponents {
    13. if err = childComponent.Do(ctx, childComponent, nil); err != nil {
    14. return err
    15. }
    16. }
    17. if bc.HasChildConcurrencyComponents {
    18. // 等待并发组件执行结果
    19. bc.WaitGroup.Wait()
    20. }
    21. return
    22. }

    第四点:并发子组件执行自身业务逻辑是需检测超时

    select关键字context.WithTimeout()派生的子上下文Done()方案返回的channel,发生超时该channel会被关闭。具体实现代码如下:
    1. // Do 执行子组件
    2. // ctx 业务上下文
    3. // currentConponent 当前组件
    4. // wg 父组件的waitgroup对象
    5. func (bc *BaseConcurrencyComponent) Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) (err error) {
    6. defer wg.Done()
    7. // 初始化并发子组件channel
    8. if bc.logicResChan == nil {
    9. bc.logicResChan = make(chan interface{}, 1)
    10. }
    11. go currentConponent.BusinessLogicDo(bc.logicResChan)
    12. select {
    13. // 等待业务执行结果
    14. case <-bc.logicResChan:
    15. // 业务执行结果
    16. fmt.Println(runFuncName(), "bc.BusinessLogicDo wait.done...")
    17. break
    18. // 超时等待
    19. case <-ctx.TimeoutCtx.Done():
    20. // 超时退出
    21. fmt.Println(runFuncName(), "bc.BusinessLogicDo timeout...")
    22. bc.Err = ErrConcurrencyComponentTimeout
    23. break
    24. }
    25. // 执行子组件
    26. err = currentConponent.ChildsDo(ctx)
    27. return
    28. }

    代码demo

    1. package main
    2. import (
    3. "context"
    4. "errors"
    5. "fmt"
    6. "net/http"
    7. "reflect"
    8. "sync"
    9. "time"
    10. )
    11. //------------------------------------------------------------
    12. //Go设计模式实战系列
    13. //组合模式
    14. //@auhtor TIGERB<https://github.com/TIGERB>
    15. //------------------------------------------------------------
    16. //example:
    17. // 创建一个根组件
    18. // 如果子组件存在并发组件则父组件必须为并发组件
    19. // type RootComponent struct {
    20. // BaseConcurrencyComponent
    21. // }
    22. //
    23. // func (bc *RootComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    24. // // do nothing
    25. // return
    26. // }
    27. //
    28. // 创建一个并发组件
    29. // type DemoConcurrenyComponent struct {
    30. // BaseConcurrencyComponent
    31. // }
    32. //
    33. // func (bc *DemoConcurrenyComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    34. // // 并发组件业务逻辑填充到这
    35. // return
    36. // }
    37. //
    38. // 创建一个普通组件
    39. // type DemoComponent struct {
    40. // BaseComponent
    41. // }
    42. //
    43. // func (bc *DemoComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    44. // // 普通组件业务逻辑填充到这
    45. // return
    46. // }
    47. //
    48. // // 普通组件
    49. // root.Mount(
    50. // &DemoComponent{},
    51. // )
    52. //
    53. // // 并发组件
    54. // root := &RootComponent{}
    55. // root.MountConcurrency(
    56. // &DemoConcurrenyComponent{},
    57. // )
    58. //
    59. // // 初始化业务上下文 并设置超时时间
    60. // ctx := GetContext(5 * time.Second)
    61. // defer ctx.CancelFunc()
    62. // // 开始执行子组件
    63. // root.ChildsDo(ctx)
    64. var (
    65. // ErrConcurrencyComponentTimeout 并发组件业务超时
    66. ErrConcurrencyComponentTimeout = errors.New("Concurrency Component Timeout")
    67. )
    68. // Context 业务上下文
    69. type Context struct {
    70. // context.WithTimeout派生的子上下文
    71. TimeoutCtx context.Context
    72. // 超时函数
    73. context.CancelFunc
    74. }
    75. // GetContext 获取业务上下文实例
    76. // d 超时时间
    77. func GetContext(d time.Duration) *Context {
    78. c := &Context{}
    79. c.TimeoutCtx, c.CancelFunc = context.WithTimeout(context.Background(), d)
    80. return c
    81. }
    82. // Component 组件接口
    83. type Component interface {
    84. // 添加一个子组件
    85. Mount(c Component, components ...Component) error
    86. // 移除一个子组件
    87. Remove(c Component) error
    88. // 执行当前组件业务:`BusinessLogicDo`和执行子组件:`ChildsDo`
    89. // ctx 业务上下文
    90. // currentConponent 当前组件
    91. // wg 父组件的waitgroup对象
    92. Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) error
    93. // 执行当前组件业务逻辑
    94. // resChan 回写当前组件业务执行结果的channel
    95. BusinessLogicDo(resChan chan interface{}) error
    96. // 执行子组件
    97. ChildsDo(ctx *Context) error
    98. }
    99. // BaseComponent 基础组件
    100. // 实现Add:添加一个子组件
    101. // 实现Remove:移除一个子组件
    102. type BaseComponent struct {
    103. // 子组件列表
    104. ChildComponents []Component
    105. }
    106. // Mount 挂载一个子组件
    107. func (bc *BaseComponent) Mount(c Component, components ...Component) (err error) {
    108. bc.ChildComponents = append(bc.ChildComponents, c)
    109. if len(components) == 0 {
    110. return
    111. }
    112. bc.ChildComponents = append(bc.ChildComponents, components...)
    113. return
    114. }
    115. // Remove 移除一个子组件
    116. func (bc *BaseComponent) Remove(c Component) (err error) {
    117. if len(bc.ChildComponents) == 0 {
    118. return
    119. }
    120. for k, childComponent := range bc.ChildComponents {
    121. if c == childComponent {
    122. fmt.Println(runFuncName(), "移除:", reflect.TypeOf(childComponent))
    123. bc.ChildComponents = append(bc.ChildComponents[:k], bc.ChildComponents[k+1:]...)
    124. }
    125. }
    126. return
    127. }
    128. // Do 执行子组件
    129. // ctx 业务上下文
    130. // currentConponent 当前组件
    131. // wg 父组件的waitgroup对象
    132. func (bc *BaseComponent) Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) (err error) {
    133. //执行当前组件业务代码
    134. err = currentConponent.BusinessLogicDo(nil)
    135. if err != nil {
    136. return err
    137. }
    138. // 执行子组件
    139. return currentConponent.ChildsDo(ctx)
    140. }
    141. // BusinessLogicDo 当前组件业务逻辑代码填充处
    142. func (bc *BaseComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    143. // do nothing
    144. return
    145. }
    146. // ChildsDo 执行子组件
    147. func (bc *BaseComponent) ChildsDo(ctx *Context) (err error) {
    148. // 执行子组件
    149. for _, childComponent := range bc.ChildComponents {
    150. if err = childComponent.Do(ctx, childComponent, nil); err != nil {
    151. return err
    152. }
    153. }
    154. return
    155. }
    156. // BaseConcurrencyComponent 并发基础组件
    157. type BaseConcurrencyComponent struct {
    158. // 合成复用基础组件
    159. BaseComponent
    160. // 当前组件是否有并发子组件
    161. HasChildConcurrencyComponents bool
    162. // 并发子组件列表
    163. ChildConcurrencyComponents []Component
    164. // wg 对象
    165. *sync.WaitGroup
    166. // 当前组件业务执行结果channel
    167. logicResChan chan interface{}
    168. // 当前组件执行过程中的错误信息
    169. Err error
    170. }
    171. // Remove 移除一个子组件
    172. func (bc *BaseConcurrencyComponent) Remove(c Component) (err error) {
    173. if len(bc.ChildComponents) == 0 {
    174. return
    175. }
    176. for k, childComponent := range bc.ChildComponents {
    177. if c == childComponent {
    178. fmt.Println(runFuncName(), "移除:", reflect.TypeOf(childComponent))
    179. bc.ChildComponents = append(bc.ChildComponents[:k], bc.ChildComponents[k+1:]...)
    180. }
    181. }
    182. for k, childComponent := range bc.ChildConcurrencyComponents {
    183. if c == childComponent {
    184. fmt.Println(runFuncName(), "移除:", reflect.TypeOf(childComponent))
    185. bc.ChildConcurrencyComponents = append(bc.ChildComponents[:k], bc.ChildComponents[k+1:]...)
    186. }
    187. }
    188. return
    189. }
    190. // MountConcurrency 挂载一个并发子组件
    191. func (bc *BaseConcurrencyComponent) MountConcurrency(c Component, components ...Component) (err error) {
    192. bc.HasChildConcurrencyComponents = true
    193. bc.ChildConcurrencyComponents = append(bc.ChildConcurrencyComponents, c)
    194. if len(components) == 0 {
    195. return
    196. }
    197. bc.ChildConcurrencyComponents = append(bc.ChildConcurrencyComponents, components...)
    198. return
    199. }
    200. // ChildsDo 执行子组件
    201. func (bc *BaseConcurrencyComponent) ChildsDo(ctx *Context) (err error) {
    202. if bc.WaitGroup == nil {
    203. bc.WaitGroup = &sync.WaitGroup{}
    204. }
    205. // 执行并发子组件
    206. for _, childComponent := range bc.ChildConcurrencyComponents {
    207. bc.WaitGroup.Add(1)
    208. go childComponent.Do(ctx, childComponent, bc.WaitGroup)
    209. }
    210. // 执行子组件
    211. for _, childComponent := range bc.ChildComponents {
    212. if err = childComponent.Do(ctx, childComponent, nil); err != nil {
    213. return err
    214. }
    215. }
    216. if bc.HasChildConcurrencyComponents {
    217. // 等待并发组件执行结果
    218. bc.WaitGroup.Wait()
    219. }
    220. return
    221. }
    222. // Do 执行子组件
    223. // ctx 业务上下文
    224. // currentConponent 当前组件
    225. // wg 父组件的waitgroup对象
    226. func (bc *BaseConcurrencyComponent) Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) (err error) {
    227. defer wg.Done()
    228. // 初始化并发子组件channel
    229. if bc.logicResChan == nil {
    230. bc.logicResChan = make(chan interface{}, 1)
    231. }
    232. go currentConponent.BusinessLogicDo(bc.logicResChan)
    233. select {
    234. // 等待业务执行结果
    235. case <-bc.logicResChan:
    236. // 业务执行结果
    237. fmt.Println(runFuncName(), "bc.BusinessLogicDo wait.done...")
    238. break
    239. // 超时等待
    240. case <-ctx.TimeoutCtx.Done():
    241. // 超时退出
    242. fmt.Println(runFuncName(), "bc.BusinessLogicDo timeout...")
    243. bc.Err = ErrConcurrencyComponentTimeout
    244. break
    245. }
    246. // 执行子组件
    247. err = currentConponent.ChildsDo(ctx)
    248. return
    249. }
    250. // CheckoutPageComponent 订单结算页面组件
    251. type CheckoutPageComponent struct {
    252. // 合成复用基础组件
    253. BaseConcurrencyComponent
    254. }
    255. // BusinessLogicDo 当前组件业务逻辑代码填充处
    256. func (bc *CheckoutPageComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    257. // 当前组件的业务逻辑写这
    258. fmt.Println(runFuncName(), "订单结算页面组件...")
    259. return
    260. }
    261. // AddressComponent 地址组件
    262. type AddressComponent struct {
    263. // 合成复用基础组件
    264. BaseConcurrencyComponent
    265. }
    266. // BusinessLogicDo 并发组件实际填充业务逻辑的地方
    267. func (bc *AddressComponent) BusinessLogicDo(resChan chan interface{}) error {
    268. fmt.Println(runFuncName(), "地址组件...")
    269. fmt.Println(runFuncName(), "获取地址信息 ing...")
    270. // 模拟远程调用地址服务
    271. http.Get("http://example.com/")
    272. resChan <- struct{}{} // 写入业务执行结果
    273. fmt.Println(runFuncName(), "获取地址信息 done...")
    274. return nil
    275. }
    276. // PayMethodComponent 支付方式组件
    277. type PayMethodComponent struct {
    278. // 合成复用基础组件
    279. BaseConcurrencyComponent
    280. }
    281. // BusinessLogicDo 并发组件实际填充业务逻辑的地方
    282. func (bc *PayMethodComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    283. // 当前组件的业务逻辑写这
    284. fmt.Println(runFuncName(), "支付方式组件...")
    285. fmt.Println(runFuncName(), "获取支付方式 ing...")
    286. // 模拟远程调用地址服务 略
    287. resChan <- struct{}{}
    288. fmt.Println(runFuncName(), "获取支付方式 done...")
    289. return nil
    290. }
    291. // StoreComponent 店铺组件
    292. type StoreComponent struct {
    293. // 合成复用基础组件
    294. BaseComponent
    295. }
    296. // BusinessLogicDo 并发组件实际填充业务逻辑的地方
    297. func (bc *StoreComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    298. // 当前组件的业务逻辑写这
    299. fmt.Println(runFuncName(), "店铺组件...")
    300. return
    301. }
    302. // SkuComponent 商品组件
    303. type SkuComponent struct {
    304. // 合成复用基础组件
    305. BaseComponent
    306. }
    307. // BusinessLogicDo 并发组件实际填充业务逻辑的地方
    308. func (bc *SkuComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    309. // 当前组件的业务逻辑写这
    310. fmt.Println(runFuncName(), "商品组件...")
    311. return
    312. }
    313. // PromotionComponent 优惠信息组件
    314. type PromotionComponent struct {
    315. // 合成复用基础组件
    316. BaseComponent
    317. }
    318. // BusinessLogicDo 并发组件实际填充业务逻辑的地方
    319. func (bc *PromotionComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    320. // 当前组件的业务逻辑写这
    321. fmt.Println(runFuncName(), "优惠信息组件...")
    322. return
    323. }
    324. // ExpressComponent 物流组件
    325. type ExpressComponent struct {
    326. // 合成复用基础组件
    327. BaseComponent
    328. }
    329. // BusinessLogicDo 并发组件实际填充业务逻辑的地方
    330. func (bc *ExpressComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    331. // 当前组件的业务逻辑写这
    332. fmt.Println(runFuncName(), "物流组件...")
    333. return
    334. }
    335. // AftersaleComponent 售后组件
    336. type AftersaleComponent struct {
    337. // 合成复用基础组件
    338. BaseComponent
    339. }
    340. // BusinessLogicDo 并发组件实际填充业务逻辑的地方
    341. func (bc *AftersaleComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    342. // 当前组件的业务逻辑写这
    343. fmt.Println(runFuncName(), "售后组件...")
    344. return
    345. }
    346. // InvoiceComponent 发票组件
    347. type InvoiceComponent struct {
    348. // 合成复用基础组件
    349. BaseConcurrencyComponent
    350. }
    351. // BusinessLogicDo 并发组件实际填充业务逻辑的地方
    352. func (bc *InvoiceComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    353. // 当前组件的业务逻辑写这
    354. fmt.Println(runFuncName(), "发票组件...")
    355. fmt.Println(runFuncName(), "获取发票信息 ing...")
    356. // 模拟远程调用地址服务 略
    357. resChan <- struct{}{} // 写入业务执行结果
    358. fmt.Println(runFuncName(), "获取发票信息 done...")
    359. return
    360. }
    361. // CouponComponent 优惠券组件
    362. type CouponComponent struct {
    363. // 合成复用基础组件
    364. BaseConcurrencyComponent
    365. }
    366. // BusinessLogicDo 并发组件实际填充业务逻辑的地方
    367. func (bc *CouponComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    368. // 当前组件的业务逻辑写这
    369. fmt.Println(runFuncName(), "优惠券组件...")
    370. fmt.Println(runFuncName(), "获取最优优惠券 ing...")
    371. // 模拟远程调用优惠券服务
    372. http.Get("http://example.com/")
    373. // 写入业务执行结果
    374. resChan <- struct{}{}
    375. fmt.Println(runFuncName(), "获取最优优惠券 done...")
    376. return
    377. }
    378. // GiftCardComponent 礼品卡组件
    379. type GiftCardComponent struct {
    380. // 合成复用基础组件
    381. BaseConcurrencyComponent
    382. }
    383. // BusinessLogicDo 并发组件实际填充业务逻辑的地方
    384. func (bc *GiftCardComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    385. // 当前组件的业务逻辑写这
    386. fmt.Println(runFuncName(), "礼品卡组件...")
    387. fmt.Println(runFuncName(), "获取礼品卡信息 ing...")
    388. // 模拟远程调用地址服务 略
    389. resChan <- struct{}{} // 写入业务执行结果
    390. fmt.Println(runFuncName(), "获取礼品卡信息 done...")
    391. return
    392. }
    393. // OrderComponent 订单金额详细信息组件
    394. type OrderComponent struct {
    395. // 合成复用基础组件
    396. BaseComponent
    397. }
    398. // BusinessLogicDo 当前组件业务逻辑代码填充处
    399. func (bc *OrderComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    400. // 当前组件的业务逻辑写这
    401. fmt.Println(runFuncName(), "订单金额详细信息组件...")
    402. return
    403. }
    404. // Demo 示例
    405. func Demo() {
    406. // 初始化订单结算页面 这个大组件
    407. checkoutPage := &CheckoutPageComponent{}
    408. // 挂载子组件
    409. storeComponent := &StoreComponent{}
    410. skuComponent := &SkuComponent{}
    411. skuComponent.Mount(
    412. &PromotionComponent{},
    413. &AftersaleComponent{},
    414. )
    415. storeComponent.Mount(
    416. skuComponent,
    417. &ExpressComponent{},
    418. )
    419. // ---挂载组件---
    420. // 普通组件
    421. checkoutPage.Mount(
    422. storeComponent,
    423. &OrderComponent{},
    424. )
    425. // 并发组件
    426. checkoutPage.MountConcurrency(
    427. &AddressComponent{},
    428. &PayMethodComponent{},
    429. &InvoiceComponent{},
    430. &CouponComponent{},
    431. &GiftCardComponent{},
    432. )
    433. // 初始化业务上下文 并设置超时时间
    434. ctx := GetContext(5 * time.Second)
    435. defer ctx.CancelFunc()
    436. // 开始构建页面组件数据
    437. checkoutPage.ChildsDo(ctx)
    438. }
    439. func main() {
    440. runtime.GOMAXPROCS(runtime.NumCPU() - 1)
    441. DemoConcurrency(
    442. }
    443. // 获取正在运行的函数名
    444. func runFuncName() string {
    445. pc := make([]uintptr, 1)
    446. runtime.Callers(2, pc)
    447. f := runtime.FuncForPC(pc[0])
    448. return f.Name()
    449. return ""
    450. }
    代码运行结果:
    1. Running] go run "../easy-tips/go/patterns/composite/concurrency/composite-concurrency.go"
    2. main.(*StoreComponent).BusinessLogicDo 店铺组件...
    3. main.(*SkuComponent).BusinessLogicDo 商品组件...
    4. main.(*PromotionComponent).BusinessLogicDo 优惠信息组件...
    5. main.(*AftersaleComponent).BusinessLogicDo 售后组件...
    6. main.(*ExpressComponent).BusinessLogicDo 物流组件...
    7. main.(*OrderComponent).BusinessLogicDo 订单金额详细信息组件...
    8. main.(*PayMethodComponent).BusinessLogicDo 支付方式组件...
    9. main.(*PayMethodComponent).BusinessLogicDo 获取支付方式 ing...
    10. main.(*InvoiceComponent).BusinessLogicDo 发票组件...
    11. main.(*InvoiceComponent).BusinessLogicDo 获取发票信息 ing...
    12. main.(*GiftCardComponent).BusinessLogicDo 礼品卡组件...
    13. main.(*GiftCardComponent).BusinessLogicDo 获取礼品卡信息 ing...
    14. main.(*CouponComponent).BusinessLogicDo 优惠券组件...
    15. main.(*CouponComponent).BusinessLogicDo 获取发票信息 ing...
    16. main.(*AddressComponent).BusinessLogicDo 地址组件...
    17. main.(*AddressComponent).BusinessLogicDo 获取地址信息 ing...
    18. main.(*InvoiceComponent).BusinessLogicDo 获取发票信息 done...
    19. main.(*BaseConcurrencyComponent).Do bc.BusinessLogicDo wait.done...
    20. main.(*BaseConcurrencyComponent).Do bc.BusinessLogicDo wait.done...
    21. main.(*PayMethodComponent).BusinessLogicDo 获取支付方式 done...
    22. main.(*AddressComponent).BusinessLogicDo 获取地址信息 done...
    23. main.(*BaseConcurrencyComponent).Do bc.BusinessLogicDo wait.done...
    24. main.(*CouponComponent).BusinessLogicDo 获取发票信息 done...
    25. main.(*BaseConcurrencyComponent).Do bc.BusinessLogicDo wait.done...
    26. main.(*GiftCardComponent).BusinessLogicDo 获取礼品卡信息 done...
    27. main.(*BaseConcurrencyComponent).Do bc.BusinessLogicDo wait.done...

    「组合模式」和「并发组合模式」基准测试对比

    基准测试代码:
    1. package composite
    2. import (
    3. "easy-tips/go/patterns/composite/concurrency"
    4. "easy-tips/go/patterns/composite/normal"
    5. "runtime"
    6. "testing"
    7. )
    8. // go test -benchmem -run=^$ easy-tips/go/patterns/composite -bench . -v -count=1 --benchtime 20s
    9. func Benchmark_Normal(b *testing.B) {
    10. b.SetParallelism(runtime.NumCPU())
    11. b.RunParallel(func(pb *testing.PB) {
    12. for pb.Next() {
    13. normal.Demo()
    14. }
    15. })
    16. }
    17. func Benchmark_Concurrency(b *testing.B) {
    18. b.SetParallelism(runtime.NumCPU())
    19. b.RunParallel(func(pb *testing.PB) {
    20. for pb.Next() {
    21. concurrency.Demo()
    22. }
    23. })
    24. }
    本地机器Benchmark对比测试结果:
    1. (TIGERB) 🤔 composite git:(master) go test -benchmem -run=^$ easy-tips/go/patterns/composite -bench . -v -count=1 --benchtime 20s
    2. goos: darwin
    3. goarch: amd64
    4. pkg: easy-tips/go/patterns/composite
    5. Benchmark_Normal-4 376 56666895 ns/op 35339 B/op 286 allocs/op
    6. Benchmark_Concurrency-4 715 32669301 ns/op 36445 B/op 299 allocs/op
    7. PASS
    8. ok easy-tips/go/patterns/composite 68.835s
    从上面的基准测试结果可以看出来Benchmark_Concurrency-4平均每次的执行时间是32669301 ns是要优于Benchmark_Normal56666895 ns

    结语

    「并发组合模式」是一个由特定的设计模式结合Go语言天生的并发特性,通过适当封装形成的“新模式”。

    附录「并发组合模式」的基础代码模板与使用说明

    1. //------------------------------------------------------------
    2. //Go设计模式实战系列
    3. //组合模式
    4. //@auhtor TIGERB<https://github.com/TIGERB>
    5. //------------------------------------------------------------
    6. //example:
    7. // 创建一个根组件
    8. // 如果子组件存在并发组件则父组件必须为并发组件
    9. // type RootComponent struct {
    10. // BaseConcurrencyComponent
    11. // }
    12. //
    13. // func (bc *RootComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    14. // // do nothing
    15. // return
    16. // }
    17. //
    18. // 创建一个并发组件
    19. // type DemoConcurrenyComponent struct {
    20. // BaseConcurrencyComponent
    21. // }
    22. //
    23. // func (bc *DemoConcurrenyComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    24. // // 并发组件业务逻辑填充到这
    25. // return
    26. // }
    27. //
    28. // 创建一个普通组件
    29. // type DemoComponent struct {
    30. // BaseComponent
    31. // }
    32. //
    33. // func (bc *DemoComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    34. // // 普通组件业务逻辑填充到这
    35. // return
    36. // }
    37. //
    38. // // 普通组件
    39. // root.Mount(
    40. // &DemoComponent{},
    41. // )
    42. //
    43. // // 并发组件
    44. // root := &RootComponent{}
    45. // root.MountConcurrency(
    46. // &DemoConcurrenyComponent{},
    47. // )
    48. //
    49. // // 初始化业务上下文 并设置超时时间
    50. // ctx := GetContext(5 * time.Second)
    51. // defer ctx.CancelFunc()
    52. // // 开始执行子组件
    53. // root.ChildsDo(ctx)
    54. var (
    55. // ErrConcurrencyComponentTimeout 并发组件业务超时
    56. ErrConcurrencyComponentTimeout = errors.New("Concurrency Component Timeout")
    57. )
    58. // Context 业务上下文
    59. type Context struct {
    60. // context.WithTimeout派生的子上下文
    61. TimeoutCtx context.Context
    62. // 超时函数
    63. context.CancelFunc
    64. }
    65. // GetContext 获取业务上下文实例
    66. // d 超时时间
    67. func GetContext(d time.Duration) *Context {
    68. c := &Context{}
    69. c.TimeoutCtx, c.CancelFunc = context.WithTimeout(context.Background(), d)
    70. return c
    71. }
    72. // Component 组件接口
    73. type Component interface {
    74. // 添加一个子组件
    75. Mount(c Component, components ...Component) error
    76. // 移除一个子组件
    77. Remove(c Component) error
    78. // 执行当前组件业务:`BusinessLogicDo`和执行子组件:`ChildsDo`
    79. // ctx 业务上下文
    80. // currentConponent 当前组件
    81. // wg 父组件的waitgroup对象
    82. Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) error
    83. // 执行当前组件业务逻辑
    84. // resChan 回写当前组件业务执行结果的channel
    85. BusinessLogicDo(resChan chan interface{}) error
    86. // 执行子组件
    87. ChildsDo(ctx *Context) error
    88. }
    89. // BaseComponent 基础组件
    90. // 实现Add:添加一个子组件
    91. // 实现Remove:移除一个子组件
    92. type BaseComponent struct {
    93. // 子组件列表
    94. ChildComponents []Component
    95. }
    96. // Mount 挂载一个子组件
    97. func (bc *BaseComponent) Mount(c Component, components ...Component) (err error) {
    98. bc.ChildComponents = append(bc.ChildComponents, c)
    99. if len(components) == 0 {
    100. return
    101. }
    102. bc.ChildComponents = append(bc.ChildComponents, components...)
    103. return
    104. }
    105. // Remove 移除一个子组件
    106. func (bc *BaseComponent) Remove(c Component) (err error) {
    107. if len(bc.ChildComponents) == 0 {
    108. return
    109. }
    110. for k, childComponent := range bc.ChildComponents {
    111. if c == childComponent {
    112. fmt.Println(runFuncName(), "移除:", reflect.TypeOf(childComponent))
    113. bc.ChildComponents = append(bc.ChildComponents[:k], bc.ChildComponents[k+1:]...)
    114. }
    115. }
    116. return
    117. }
    118. // Do 执行子组件
    119. // ctx 业务上下文
    120. // currentConponent 当前组件
    121. // wg 父组件的waitgroup对象
    122. func (bc *BaseComponent) Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) (err error) {
    123. //执行当前组件业务代码
    124. err = currentConponent.BusinessLogicDo(nil)
    125. if err != nil {
    126. return err
    127. }
    128. // 执行子组件
    129. return currentConponent.ChildsDo(ctx)
    130. }
    131. // BusinessLogicDo 当前组件业务逻辑代码填充处
    132. func (bc *BaseComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
    133. // do nothing
    134. return
    135. }
    136. // ChildsDo 执行子组件
    137. func (bc *BaseComponent) ChildsDo(ctx *Context) (err error) {
    138. // 执行子组件
    139. for _, childComponent := range bc.ChildComponents {
    140. if err = childComponent.Do(ctx, childComponent, nil); err != nil {
    141. return err
    142. }
    143. }
    144. return
    145. }
    146. // BaseConcurrencyComponent 并发基础组件
    147. type BaseConcurrencyComponent struct {
    148. // 合成复用基础组件
    149. BaseComponent
    150. // 当前组件是否有并发子组件
    151. HasChildConcurrencyComponents bool
    152. // 并发子组件列表
    153. ChildConcurrencyComponents []Component
    154. // wg 对象
    155. *sync.WaitGroup
    156. // 当前组件业务执行结果channel
    157. logicResChan chan interface{}
    158. // 当前组件执行过程中的错误信息
    159. Err error
    160. }
    161. // Remove 移除一个子组件
    162. func (bc *BaseConcurrencyComponent) Remove(c Component) (err error) {
    163. if len(bc.ChildComponents) == 0 {
    164. return
    165. }
    166. for k, childComponent := range bc.ChildComponents {
    167. if c == childComponent {
    168. fmt.Println(runFuncName(), "移除:", reflect.TypeOf(childComponent))
    169. bc.ChildComponents = append(bc.ChildComponents[:k], bc.ChildComponents[k+1:]...)
    170. }
    171. }
    172. for k, childComponent := range bc.ChildConcurrencyComponents {
    173. if c == childComponent {
    174. fmt.Println(runFuncName(), "移除:", reflect.TypeOf(childComponent))
    175. bc.ChildConcurrencyComponents = append(bc.ChildComponents[:k], bc.ChildComponents[k+1:]...)
    176. }
    177. }
    178. return
    179. }
    180. // MountConcurrency 挂载一个并发子组件
    181. func (bc *BaseConcurrencyComponent) MountConcurrency(c Component, components ...Component) (err error) {
    182. bc.HasChildConcurrencyComponents = true
    183. bc.ChildConcurrencyComponents = append(bc.ChildConcurrencyComponents, c)
    184. if len(components) == 0 {
    185. return
    186. }
    187. bc.ChildConcurrencyComponents = append(bc.ChildConcurrencyComponents, components...)
    188. return
    189. }
    190. // ChildsDo 执行子组件
    191. func (bc *BaseConcurrencyComponent) ChildsDo(ctx *Context) (err error) {
    192. if bc.WaitGroup == nil {
    193. bc.WaitGroup = &sync.WaitGroup{}
    194. }
    195. // 执行并发子组件
    196. for _, childComponent := range bc.ChildConcurrencyComponents {
    197. bc.WaitGroup.Add(1)
    198. go childComponent.Do(ctx, childComponent, bc.WaitGroup)
    199. }
    200. // 执行子组件
    201. for _, childComponent := range bc.ChildComponents {
    202. if err = childComponent.Do(ctx, childComponent, nil); err != nil {
    203. return err
    204. }
    205. }
    206. if bc.HasChildConcurrencyComponents {
    207. // 等待并发组件执行结果
    208. bc.WaitGroup.Wait()
    209. }
    210. return
    211. }
    212. // Do 执行子组件
    213. // ctx 业务上下文
    214. // currentConponent 当前组件
    215. // wg 父组件的waitgroup对象
    216. func (bc *BaseConcurrencyComponent) Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) (err error) {
    217. defer wg.Done()
    218. // 初始化并发子组件channel
    219. if bc.logicResChan == nil {
    220. bc.logicResChan = make(chan interface{}, 1)
    221. }
    222. go currentConponent.BusinessLogicDo(bc.logicResChan)
    223. select {
    224. // 等待业务执行结果
    225. case <-bc.logicResChan:
    226. // 业务执行结果
    227. fmt.Println(runFuncName(), "bc.BusinessLogicDo wait.done...")
    228. break
    229. // 超时等待
    230. case <-ctx.TimeoutCtx.Done():
    231. // 超时退出
    232. fmt.Println(runFuncName(), "bc.BusinessLogicDo timeout...")
    233. bc.Err = ErrConcurrencyComponentTimeout
    234. break
    235. }
    236. // 执行子组件
    237. err = currentConponent.ChildsDo(ctx)
    238. return
    239. }
    1. 特别说明:
    2. 本系列的一些设计模式的概念可能和原概念存在差异,因为会结合实际使用,取其精华,适当改变,灵活使用。

Go设计模式实战系列 更多文章

image.gif

image.gif

image.gif
收录于话题 #Go语言设计模式实战
7个
下一篇
阅读原文

微信扫一扫
关注该公众号