1. package main
    2. import (
    3. "fmt"
    4. "strings"
    5. "time"
    6. )
    7. func main() {
    8. p := obServer.NewPublisher(100*time.Millisecond, 10)
    9. defer p.Close()
    10. all := p.Subscribe()
    11. golang := p.SubscribeTopic(func(v interface{}) bool {
    12. if s, ok := v.(string); ok {
    13. return strings.Contains(s, "golang")
    14. }
    15. return false
    16. })
    17. p.Publish("hello, world!")
    18. p.Publish("hello, golang!")
    19. go func() {
    20. for msg := range all {
    21. fmt.Println("all:", msg)
    22. }
    23. }()
    24. go func() {
    25. for msg := range golang {
    26. fmt.Println("golang:", msg)
    27. }
    28. }()
    29. // 运行一定时间后退出
    30. time.Sleep(3 * time.Second)
    31. }
    1. package main
    2. import (
    3. "sync"
    4. "time"
    5. )
    // subscriber is the channel type through which one subscriber receives
    // published values.
    type subscriber chan interface{}

    // topicFunc is the filter type attached to a subscriber. It is called with
    // each published value and reports whether the subscriber wants it.
    type topicFunc func(v interface{}) bool
    10. // 发布者对象
    11. type Publisher struct {
    12. m sync.RWMutex // 读写锁
    13. buffer int // 订阅队列的缓存大小
    14. timeout time.Duration // 发布超时时间
    15. subscribers map[subscriber]topicFunc // 订阅者信息
    16. }
    17. // 构建一个发布者对象, 可以设置发布超时时间和缓存队列的长度
    18. func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
    19. return &Publisher{
    20. buffer: buffer,
    21. timeout: publishTimeout,
    22. subscribers: make(map[subscriber]topicFunc),
    23. }
    24. }
    25. // 添加一个新的订阅者
    26. func (p *Publisher) Subscribe() chan interface{} {
    27. return p.SubscribeTopic(nil)
    28. }
    29. // 添加一个新的订阅者,订阅过滤器筛选后的主题
    30. func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
    31. ch := make(chan interface{}, p.buffer)
    32. p.m.Lock()
    33. p.subscribers[ch] = topic
    34. p.m.Unlock()
    35. return ch
    36. }
    37. // 退出订阅
    38. func (p *Publisher) Evict(sub chan interface{}) {
    39. p.m.Lock()
    40. defer p.m.Unlock()
    41. delete(p.subscribers, sub)
    42. close(sub)
    43. }
    44. // 发布一个主题
    45. func (p *Publisher) Publish(v interface{}) {
    46. p.m.RLock()
    47. defer p.m.RUnlock()
    48. //var wg sync.WaitGroup
    49. //for sub, topic := range p.subscribers {
    50. // wg.Add(1)
    51. // go p.sendTopic(sub, topic, v, &wg)
    52. //}
    53. wg := p.send(v)
    54. wg.Wait()
    55. }
    56. // 关闭发布者对象,同时关闭所有的订阅者管道。
    57. func (p *Publisher) Close() {
    58. p.m.Lock()
    59. defer p.m.Unlock()
    60. for sub := range p.subscribers {
    61. delete(p.subscribers, sub)
    62. close(sub)
    63. }
    64. }
    65. // 发送主题,可以容忍一定的超时
    66. func (p *Publisher) sendTopic(
    67. sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
    68. ) {
    69. defer wg.Done()
    70. if topic != nil && !topic(v) {
    71. return
    72. }
    73. select {
    74. case sub <- v:
    75. case <-time.After(p.timeout):
    76. }
    77. }
    78. func (p *Publisher) send(v interface{}) *sync.WaitGroup {
    79. wg := &sync.WaitGroup{}
    80. for sub, topic := range p.subscribers {
    81. wg.Add(1)
    82. go func(sub subscriber, topic topicFunc) {
    83. defer wg.Done()
    84. if topic != nil && !topic(v) {
    85. return
    86. }
    87. select {
    88. case sub <- v:
    89. case <-time.After(p.timeout):
    90. }
    91. }(sub, topic)
    92. }
    93. return wg
    94. }