Overview
从上图可以看出,multiWriteSyncer 使用了 Composite 模式;lockedWriteSyncer 使用了 Decorator 模式。multiWriteSyncer 可以包含一切实现了 WriteSyncer 接口的对象。
实现细节
lockedWriteSyncer 修饰器
为被包装的 WriteSyncer 加上互斥锁,使其 Write 和 Sync 可以安全地被多个 goroutine 并发调用
实现如下:
// lockedWriteSyncer pairs a WriteSyncer with a mutex. Its Write and Sync
// methods take the mutex before delegating to ws.
type lockedWriteSyncer struct {
// Embedded so that Lock and Unlock can be called directly on the syncer.
sync.Mutex
// ws is the wrapped WriteSyncer that Write and Sync forward to.
ws WriteSyncer
}
// Lock returns a WriteSyncer that serializes calls to ws behind a mutex so it
// can be shared between goroutines. *os.File values in particular need this
// before concurrent use.
//
// A WriteSyncer that is already a *lockedWriteSyncer is returned unchanged
// rather than being wrapped in a second lock.
func Lock(ws WriteSyncer) WriteSyncer {
	switch ws.(type) {
	case *lockedWriteSyncer:
		// Already guarded; stacking another mutex would add nothing.
		return ws
	default:
		return &lockedWriteSyncer{ws: ws}
	}
}
// Write acquires the mutex, forwards bs to the wrapped WriteSyncer, and
// returns that call's byte count and error unchanged.
//
// The unlock is deferred so the mutex is released even if the wrapped Write
// panics; otherwise a single panic would leave the lock held and block every
// later Write or Sync on this syncer.
func (s *lockedWriteSyncer) Write(bs []byte) (int, error) {
	s.Lock()
	defer s.Unlock()
	return s.ws.Write(bs)
}
// Sync acquires the mutex, calls Sync on the wrapped WriteSyncer, and returns
// its error unchanged.
//
// The unlock is deferred so the mutex is released even if the wrapped Sync
// panics; otherwise a single panic would leave the lock held and block every
// later Write or Sync on this syncer.
func (s *lockedWriteSyncer) Sync() error {
	s.Lock()
	defer s.Unlock()
	return s.ws.Sync()
}
multiWriteSyncer
定义:
// multiWriteSyncer is a slice of WriteSyncers. Its Write and Sync methods call
// every element in order and combine the errors they return.
type multiWriteSyncer []WriteSyncer
WriteSyncer 接口实现:
// Write passes p to every WriteSyncer in ws, in order, and never stops early:
// each syncer is called even if an earlier one failed.
//
// It returns the smallest byte count reported by any syncer (0 when ws is
// empty) together with all non-nil errors combined via multierr.Append.
func (ws multiWriteSyncer) Write(p []byte) (int, error) {
	var writeErr error
	nWritten := 0
	for i, w := range ws {
		n, err := w.Write(p)
		writeErr = multierr.Append(writeErr, err)
		// Seed the minimum from the first syncer instead of treating 0 as
		// "unset": a syncer that wrote nothing must pull the result down to 0
		// no matter where it sits in the slice.
		if i == 0 || n < nWritten {
			nWritten = n
		}
	}
	return nWritten, writeErr
}
// Sync calls Sync on every WriteSyncer in ws, in order, and returns all of
// their non-nil errors combined via multierr.Append. A failing syncer does not
// prevent the remaining ones from being synced.
func (ws multiWriteSyncer) Sync() error {
	var combined error
	for i := range ws {
		combined = multierr.Append(combined, ws[i].Sync())
	}
	return combined
}
Sink
全局变量:
// _sinkMutex guards _sinkFactories.
var _sinkMutex sync.RWMutex

// _sinkFactories maps a URL scheme to the factory that builds a Sink from a
// URL using that scheme.
var _sinkFactories map[string]func(*url.URL) (Sink, error)
初始化时注册文件类型 Sink:
func resetSinkRegistry() {
_sinkMutex.Lock()
defer _sinkMutex.Unlock()
_sinkFactories = map[string]func(*url.URL) (Sink, error){
schemeFile: newFileSink,
}
}
自定义 Sink 注册:
// RegisterSink registers factory as the constructor for sinks whose URL uses
// the given scheme.
//
// It returns an error when scheme is empty, when normalizeScheme rejects it,
// or when a factory is already registered under the normalized scheme. The
// registry is left untouched in all three cases.
func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error {
	_sinkMutex.Lock()
	defer _sinkMutex.Unlock()

	if len(scheme) == 0 {
		return errors.New("can't register a sink factory for empty string")
	}

	key, normErr := normalizeScheme(scheme)
	if normErr != nil {
		return fmt.Errorf("%q is not a valid scheme: %v", scheme, normErr)
	}

	// Refuse to silently replace an existing factory.
	if _, taken := _sinkFactories[key]; taken {
		return fmt.Errorf("sink factory already registered for scheme %q", key)
	}

	_sinkFactories[key] = factory
	return nil
}
根据 url 生成 sink:
// newSink parses rawURL and builds a Sink using the factory registered for
// the URL's scheme. A URL without a scheme is treated as schemeFile.
//
// It returns an error if rawURL cannot be parsed, an *errSinkNotFound if no
// factory is registered for the scheme, or whatever the factory returns.
func newSink(rawURL string) (Sink, error) {
	parsed, parseErr := url.Parse(rawURL)
	if parseErr != nil {
		return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, parseErr)
	}

	// Bare paths carry no scheme; route them to the file sink.
	if len(parsed.Scheme) == 0 {
		parsed.Scheme = schemeFile
	}

	// Hold the read lock only for the lookup, not while the factory runs.
	_sinkMutex.RLock()
	build, found := _sinkFactories[parsed.Scheme]
	_sinkMutex.RUnlock()

	if found {
		return build(parsed)
	}
	return nil, &errSinkNotFound{parsed.Scheme}
}
生成 Writer
open 返回一组 zapcore.WriteSyncer,它们实际上都是 Sink 实例。由于 Sink 同时实现了 io.Closer 接口,返回值中还包含一个 func() 类型的关闭函数,用于统一关闭所有已创建的 Sink。
// open builds one sink per entry in paths via newSink.
//
// On success it returns the sinks as WriteSyncers plus a function that closes
// all of them. If any path fails, every sink that did open is closed and open
// returns nil writers, a nil close function, and the combined error; the
// already-closed sinks are deliberately not handed back to the caller.
func open(paths []string) ([]zapcore.WriteSyncer, func(), error) {
	writers := make([]zapcore.WriteSyncer, 0, len(paths))
	closers := make([]io.Closer, 0, len(paths))
	// Named closeAll so it does not shadow the builtin close.
	closeAll := func() {
		for _, c := range closers {
			// Best-effort cleanup: the func() signature has no way to
			// report a Close failure.
			_ = c.Close()
		}
	}

	var openErr error
	for _, path := range paths {
		sink, err := newSink(path) // build the sink from its URL
		if err != nil {
			// Keep going so the caller sees every bad path at once.
			openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err))
			continue
		}
		writers = append(writers, sink)
		closers = append(closers, sink)
	}
	if openErr != nil {
		closeAll()
		return nil, nil, openErr
	}
	return writers, closeAll, nil
}