Overview

Writer - 图1
从上图可以看出,multiWriteSyncer 使用了 Composite 模式;lockedWriteSyncer 使用了 Decorator 模式。multiWriteSyncer 可以包含一切实现了 WriteSyncer 接口的对象。

实现细节

lockedWriteSyncer 修饰器

通过互斥锁为 Write 和 Sync 提供并发安全(可被多个 goroutine 同时调用)的访问
Writer - 图2
实现如下:

  1. type lockedWriteSyncer struct {
  2. sync.Mutex
  3. ws WriteSyncer
  4. }
  5. // Lock wraps a WriteSyncer in a mutex to make it safe for concurrent use. In
  6. // particular, *os.Files must be locked before use.
  7. func Lock(ws WriteSyncer) WriteSyncer {
  8. if _, ok := ws.(*lockedWriteSyncer); ok {
  9. // no need to layer on another lock
  10. return ws
  11. }
  12. return &lockedWriteSyncer{ws: ws}
  13. }
  14. func (s *lockedWriteSyncer) Write(bs []byte) (int, error) {
  15. s.Lock()
  16. n, err := s.ws.Write(bs)
  17. s.Unlock()
  18. return n, err
  19. }
  20. func (s *lockedWriteSyncer) Sync() error {
  21. s.Lock()
  22. err := s.ws.Sync()
  23. s.Unlock()
  24. return err
  25. }

multiWriteSyncer

定义:

  // multiWriteSyncer is a list of WriteSyncers that is itself used as a single
  // WriteSyncer (see its Write and Sync methods).
  1. type multiWriteSyncer []WriteSyncer

WriteSyncer 接口实现:

  1. func (ws multiWriteSyncer) Write(p []byte) (int, error) {
  2. var writeErr error
  3. nWritten := 0
  4. for _, w := range ws { // 遍历全部 WriteSyncer 实例
  5. n, err := w.Write(p)
  6. writeErr = multierr.Append(writeErr, err)
  7. if nWritten == 0 && n != 0 { // 更新实际写入数量
  8. nWritten = n
  9. } else if n < nWritten {
  10. nWritten = n
  11. }
  12. }
  13. return nWritten, writeErr
  14. }
  15. func (ws multiWriteSyncer) Sync() error {
  16. var err error
  17. for _, w := range ws {
  18. err = multierr.Append(err, w.Sync())
  19. }
  20. return err
  21. }

Sink

全局变量:

  // Package-level sink registry.
  1. var (
  // _sinkMutex guards every read and write of _sinkFactories.
  2. _sinkMutex sync.RWMutex
  // _sinkFactories maps a URL scheme to the factory that builds a Sink from a
  // parsed URL using that scheme.
  3. _sinkFactories map[string]func(*url.URL) (Sink, error) // keyed by scheme
  4. )

初始化(重置)注册表时,只注册 schemeFile 对应的文件类型 Sink:

  1. func resetSinkRegistry() {
  2. _sinkMutex.Lock()
  3. defer _sinkMutex.Unlock()
  4. _sinkFactories = map[string]func(*url.URL) (Sink, error){
  5. schemeFile: newFileSink,
  6. }
  7. }

自定义 Sink 注册:

  1. func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error {
  2. _sinkMutex.Lock()
  3. defer _sinkMutex.Unlock()
  4. if scheme == "" {
  5. return errors.New("can't register a sink factory for empty string")
  6. }
  7. normalized, err := normalizeScheme(scheme)
  8. if err != nil {
  9. return fmt.Errorf("%q is not a valid scheme: %v", scheme, err)
  10. }
  11. if _, ok := _sinkFactories[normalized]; ok {
  12. return fmt.Errorf("sink factory already registered for scheme %q", normalized)
  13. }
  14. _sinkFactories[normalized] = factory
  15. return nil
  16. }

根据 URL 生成 Sink(URL 中没有 scheme 时按 schemeFile 处理):

  1. func newSink(rawURL string) (Sink, error) {
  2. u, err := url.Parse(rawURL)
  3. if err != nil {
  4. return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, err)
  5. }
  6. if u.Scheme == "" {
  7. u.Scheme = schemeFile
  8. }
  9. _sinkMutex.RLock()
  10. factory, ok := _sinkFactories[u.Scheme]
  11. _sinkMutex.RUnlock()
  12. if !ok {
  13. return nil, &errSinkNotFound{u.Scheme}
  14. }
  15. return factory(u)
  16. }

生成 Writer

返回 zapcore.WriteSyncer 实例切片,其中的元素实际上都是 Sink 实例。由于 Sink 同时实现了 io.Closer 接口,返回值中还包含一个 func() 类型的 close 函数,用于统一关闭所有已打开的 Sink。

  1. func open(paths []string) ([]zapcore.WriteSyncer, func(), error) {
  2. writers := make([]zapcore.WriteSyncer, 0, len(paths))
  3. closers := make([]io.Closer, 0, len(paths))
  4. close := func() { // 封装 close 方法
  5. for _, c := range closers {
  6. c.Close()
  7. }
  8. }
  9. var openErr error
  10. for _, path := range paths {
  11. sink, err := newSink(path) // 根据 URL 生成 Sink
  12. if err != nil {
  13. openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err))
  14. continue
  15. }
  16. writers = append(writers, sink)
  17. closers = append(closers, sink)
  18. }
  19. if openErr != nil {
  20. close()
  21. return writers, nil, openErr
  22. }
  23. return writers, close, nil
  24. }