Defaults

  1. const (
  2. // Default minio configuration directory where below configuration files/directories are stored.
  3. defaultMinioConfigDir = ".minio"
  4. // Directory contains below files/directories for HTTPS configuration.
  5. certsDir = "certs"
  6. // Directory contains all CA certificates other than system defaults for HTTPS.
  7. certsCADir = "CAs"
  8. // Public certificate file for HTTPS.
  9. publicCertFile = "public.crt"
  10. // Private key file for HTTPS.
  11. privateKeyFile = "private.key"
  12. )

defaultMinioConfigDir 表示 Minio 默认配置文件所在目录为 .minio;默认证书目录为 .minio/certs,CA 目录位于证书目录之下,即 .minio/certs/CAs,默认公共证书文件名为 public.crt,默认私钥文件名为 private.key。

getDefaultConfigDir 获取默认配置目录,从代码看默认配置目录为 homedir/.minio(获取用户主目录失败时返回空字符串)。默认证书、CA 目录分别为 homedir/.minio/certs、homedir/.minio/certs/CAs。配置文件位于当前配置目录之下,默认即 homedir/.minio/config.json(文件名由 minioConfigFile 决定,其定义未在此处列出)。

  1. func getDefaultConfigDir() string {
  2. homeDir, err := homedir.Dir()
  3. if err != nil {
  4. return ""
  5. }
  6. return filepath.Join(homeDir, defaultMinioConfigDir)
  7. }
  8. func getDefaultCertsDir() string {
  9. return filepath.Join(getDefaultConfigDir(), certsDir)
  10. }
  11. func getDefaultCertsCADir() string {
  12. return filepath.Join(getDefaultCertsDir(), certsCADir)
  13. }
  14. func getConfigFile() string {
  15. return filepath.Join(globalConfigDir.Get(), minioConfigFile)
  16. }

以下变量保存初始默认目录

  1. var (
  2. // Default config, certs and CA directories.
  3. defaultConfigDir = &ConfigDir{path: getDefaultConfigDir()}
  4. defaultCertsDir = &ConfigDir{path: getDefaultCertsDir()}
  5. defaultCertsCADir = &ConfigDir{path: getDefaultCertsCADir()}
  6. // Points to current configuration directory -- deprecated, to be removed in future.
  7. globalConfigDir = defaultConfigDir
  8. // Points to current certs directory set by user with --certs-dir
  9. globalCertsDir = defaultCertsDir
  10. // Points to relative path to certs directory and is <value-of-certs-dir>/CAs
  11. globalCertsCADir = defaultCertsCADir
  12. )

Certificate Manager

Types

Manager 管理多个证书。相关的结构有 pair,用于保存私钥文件名和证书文件名的组合;loadX509KeyPair 字段的类型为 LoadX509KeyPairFunc,是用于加载证书的函数,此处实际使用的是 config.LoadX509KeyPair。

  1. type Manager struct {
  2. lock sync.RWMutex
  3. certificates map[pair]*tls.Certificate // Mapping: certificate file name => TLS certificates
  4. defaultCert pair
  5. duration time.Duration
  6. loadX509KeyPair LoadX509KeyPairFunc
  7. done <-chan struct{}
  8. reloadCerts []chan struct{}
  9. }
  10. type pair struct {
  11. KeyFile string
  12. CertFile string
  13. }

接下来看 Manager 实际创建过程中核心部分。

  1. manager = &Manager{
  2. certificates: map[pair]*tls.Certificate{},
  3. defaultCert: pair{
  4. KeyFile: keyFile,
  5. CertFile: certFile,
  6. },
  7. loadX509KeyPair: loadX509KeyPair,
  8. done: ctx.Done(),
  9. duration: 1 * time.Minute,
  10. }

初始化完毕后,添加默认证书,接下来会详细分析 Manager 实现的各种细节。

  1. if err := manager.AddCertificate(certFile, keyFile); err != nil {
  2. return nil, err
  3. }

AddCertificate

cli-add-certificate.svg
图1: AddCertificate 示意图

AddCertificate 大致过程如上图所示。只有最后一步的监控文件变更需要特殊说明:只有当证书文件和私钥文件都是符号链接时,才启动 watchSymlinks 协程进行检查;否则通过 notify.Watch 监控两个文件所在的父目录。两种监控方式如下所示,至此已经出现两个不同功能的协程和一种文件变更检查方法,后续再展开。

  1. if certFileIsLink && keyFileIsLink {
  2. go m.watchSymlinks(certFile, keyFile)
  3. } else {
  4. // Windows doesn't allow for watching file changes but instead allows
  5. // for directory changes only, while we can still watch for changes
  6. // on files on other platforms. Watch parent directory on all platforms
  7. // for simplicity.
  8. if err = notify.Watch(filepath.Dir(certFile), m.events, eventWrite...); err != nil {
  9. return err
  10. }
  11. if err = notify.Watch(filepath.Dir(keyFile), m.events, eventWrite...); err != nil {
  12. return err
  13. }
  14. }

Load Certificate

config.LoadX509KeyPair 根据传入的证书文件名、私钥文件名获取需要的证书,方法附上,不做详细注释,以后类似场景下可修改使用。

  1. func LoadX509KeyPair(certFile, keyFile string) (tls.Certificate, error) {
  2. certPEMBlock, err := ioutil.ReadFile(certFile)
  3. if err != nil {
  4. return tls.Certificate{}, ErrSSLUnexpectedError(err)
  5. }
  6. keyPEMBlock, err := ioutil.ReadFile(keyFile)
  7. if err != nil {
  8. return tls.Certificate{}, ErrSSLUnexpectedError(err)
  9. }
  10. key, rest := pem.Decode(keyPEMBlock)
  11. if len(rest) > 0 {
  12. return tls.Certificate{}, ErrSSLUnexpectedData(nil).Msg("The private key contains additional data")
  13. }
  14. if x509.IsEncryptedPEMBlock(key) {
  15. password := env.Get(EnvCertPassword, "")
  16. if len(password) == 0 {
  17. return tls.Certificate{}, ErrSSLNoPassword(nil)
  18. }
  19. decryptedKey, decErr := x509.DecryptPEMBlock(key, []byte(password))
  20. if decErr != nil {
  21. return tls.Certificate{}, ErrSSLWrongPassword(decErr)
  22. }
  23. keyPEMBlock = pem.EncodeToMemory(&pem.Block{Type: key.Type, Bytes: decryptedKey})
  24. }
  25. cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
  26. if err != nil {
  27. return tls.Certificate{}, ErrSSLUnexpectedData(nil).Msg(err.Error())
  28. }
  29. // Ensure that the private key is not a P-384 or P-521 EC key.
  30. // The Go TLS stack does not provide constant-time implementations of P-384 and P-521.
  31. if priv, ok := cert.PrivateKey.(crypto.Signer); ok {
  32. if pub, ok := priv.Public().(*ecdsa.PublicKey); ok {
  33. switch pub.Params().Name {
  34. case "P-384":
  35. fallthrough
  36. case "P-521":
  37. // unfortunately there is no cleaner way to check
  38. return tls.Certificate{}, ErrSSLUnexpectedData(nil).Msg("tls: the ECDSA curve '%s' is not supported", pub.Params().Name)
  39. }
  40. }
  41. }
  42. return cert, nil
  43. }

Watching Symbolic Links

watchSymlinks 方法用于检查符号链接的变更。不难看出,这个监控的实现比较简单:除了退出信号 m.done 之外,它由定时器(周期为 m.duration)或 reload 通道触发;触发后先重置定时器,再重新加载符号链接所指向的证书和私钥文件,并更新对应记录。加载或解析失败时不更新记录,等待下一次触发。

  1. func (m *Manager) watchSymlinks(watch pair, reload <-chan struct{}) {
  2. t := time.NewTimer(m.duration)
  3. defer t.Stop()
  4. for {
  5. select {
  6. case <-m.done:
  7. return // Once stopped exits this routine.
  8. case <-t.C:
  9. case <-reload:
  10. }
  11. t.Reset(m.duration) // Reset timer for new duration
  12. certificate, err := m.loadX509KeyPair(watch.CertFile, watch.KeyFile)
  13. if err != nil {
  14. continue
  15. }
  16. if certificate.Leaf == nil { // This is a performance optimisation
  17. certificate.Leaf, err = x509.ParseCertificate(certificate.Certificate[0])
  18. if err != nil {
  19. continue
  20. }
  21. }
  22. m.lock.Lock()
  23. m.certificates[watch] = &certificate
  24. m.lock.Unlock()
  25. }
  26. }

Watch File Events

watchFileEvents 是在 Manager 创建时一并启动的一个协程。在代码中可以看出,这个协程有两种触发条件:一是收到文件系统变更事件通知,且仅当事件为写事件、事件路径恰好是被监控的证书文件或私钥文件时才作出反应;二是收到 reload 通道的信号。两种情况下动作一样,都是加载新证书并更新记录。

  1. func (m *Manager) watchFileEvents(watch pair, events chan notify.EventInfo, reload <-chan struct{}) {
  2. for {
  3. select {
  4. case <-m.done:
  5. return
  6. case event := <-events:
  7. if !isWriteEvent(event.Event()) {
  8. continue
  9. }
  10. p := event.Path()
  11. if watch.KeyFile != p && watch.CertFile != p {
  12. continue
  13. }
  14. case <-reload:
  15. }
  16. // Do reload
  17. certificate, err := m.loadX509KeyPair(watch.CertFile, watch.KeyFile)
  18. if err != nil {
  19. continue
  20. }
  21. if certificate.Leaf == nil { // This is performance optimisation
  22. certificate.Leaf, err = x509.ParseCertificate(certificate.Certificate[0])
  23. if err != nil {
  24. continue
  25. }
  26. }
  27. m.lock.Lock()
  28. m.certificates[watch] = &certificate
  29. m.lock.Unlock()
  30. }
  31. }

这里有一个疑问:需要监听变更的文件是在哪里注册的?回忆一下 AddCertificate 方法,当传入的证书路径、私钥路径不全是符号链接时,有以下操作

  1. if err = notify.Watch(filepath.Dir(certFile), m.events, eventWrite...); err != nil {
  2. return err
  3. }
  4. if err = notify.Watch(filepath.Dir(keyFile), m.events, eventWrite...); err != nil {
  5. return err
  6. }

因此,目标文件的监控是由 notify.Watch 来注册并进行实际监控的,那么接下来,我们来看这个功能到底是如何实现的。

File Change Notification

Event

Event 类型就是 uint32,具体来说 Event 通过不同的位代表不同类型的操作,比如 Create 是 256,通过不同位的组合记录对一个文件的操作。共有四种独立操作:创建、移除、写入及重命名。

  1. type Event uint32
  2. const (
  3. Create = osSpecificCreate
  4. Remove = osSpecificRemove
  5. Write = osSpecificWrite
  6. Rename = osSpecificRename
  7. // All is handful alias for all platform-independent event values.
  8. All = Create | Remove | Write | Rename
  9. )

为方便调试,Event 实现了 String 方法,其中有两个映射:estr 及 osestr

  1. func (e Event) String() string {
  2. var s []string
  3. for _, strmap := range []map[Event]string{estr, osestr} {
  4. for ev, str := range strmap {
  5. if e&ev == ev {
  6. s = append(s, str)
  7. }
  8. }
  9. }
  10. return strings.Join(s, "|")
  11. }

首先看 estr

  1. var estr = map[Event]string{
  2. Create: "notify.Create",
  3. Remove: "notify.Remove",
  4. Write: "notify.Write",
  5. Rename: "notify.Rename",
  6. recursive: "recursive",
  7. omit: "omit",
  8. }

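再看 osestr。由于文件系统监控与操作系统相关,后续无特殊说明,均以 Linux 为例。需要注意的是,下面这段代码中的 FSEvents* 常量来自 macOS(FSEvents)的实现;Linux 下使用的是 inotify 的事件常量(如后文 watch 方法中出现的 unix.IN_ALL_EVENTS)。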

  1. const (
  2. osSpecificCreate = Event(FSEventsCreated)
  3. osSpecificRemove = Event(FSEventsRemoved)
  4. osSpecificWrite = Event(FSEventsModified)
  5. osSpecificRename = Event(FSEventsRenamed)
  6. // internal = Event(0x100000)
  7. // recursive is used to distinguish recursive eventsets from non-recursive ones
  8. recursive = Event(0x200000)
  9. // omit is used for dispatching internal events; only those events are sent
  10. // for which both the event and the watchpoint has omit in theirs event sets.
  11. omit = Event(0x400000)
  12. )
  13. var osestr = map[Event]string{
  14. FSEventsMustScanSubDirs: "notify.FSEventsMustScanSubDirs",
  15. FSEventsUserDropped: "notify.FSEventsUserDropped",
  16. FSEventsKernelDropped: "notify.FSEventsKernelDropped",
  17. FSEventsEventIdsWrapped: "notify.FSEventsEventIdsWrapped",
  18. FSEventsHistoryDone: "notify.FSEventsHistoryDone",
  19. FSEventsRootChanged: "notify.FSEventsRootChanged",
  20. FSEventsMount: "notify.FSEventsMount",
  21. FSEventsUnmount: "notify.FSEventsUnmount",
  22. FSEventsInodeMetaMod: "notify.FSEventsInodeMetaMod",
  23. FSEventsFinderInfoMod: "notify.FSEventsFinderInfoMod",
  24. FSEventsChangeOwner: "notify.FSEventsChangeOwner",
  25. FSEventsXattrMod: "notify.FSEventsXattrMod",
  26. FSEventsIsFile: "notify.FSEventsIsFile",
  27. FSEventsIsDir: "notify.FSEventsIsDir",
  28. FSEventsIsSymlink: "notify.FSEventsIsSymlink",
  29. }

Interface

external-notify-interface.svg
图2:文件监控全景图

文件监控的大致流程如上图所示,核心接口有 tree、watcher 及 EventInfo。tree 接口可理解为监控的文件树,watcher 为监控文件实例,EventInfo 包含文件变更必需的信息。watch loop 表示底层核心的监控模块。三个接口的定义如下

  1. type tree interface {
  2. Watch(string, chan<- EventInfo, ...Event) error
  3. Stop(chan<- EventInfo)
  4. Close() error
  5. }
  6. type watcher interface {
  7. // Watch requests a watcher creation for the given path and given event set.
  8. Watch(path string, event Event) error
  9. // Unwatch requests a watcher deletion for the given path and given event set.
  10. Unwatch(path string) error
  11. // Rewatch provides a functionality for modifying existing watch-points, like
  12. // expanding its event set.
  13. //
  14. // Rewatch modifies existing watch-point under for the given path. It passes
  15. // the existing event set currently registered for the given path, and the
  16. // new, requested event set.
  17. //
  18. // It is guaranteed that Tree will not pass to Rewatch zero value for any
  19. // of its arguments. If old == new and watcher can be upgraded to
  20. // recursiveWatcher interface, a watch for the corresponding path is expected
  21. // to be changed from recursive to the non-recursive one.
  22. Rewatch(path string, old, new Event) error
  23. // Close unwatches all paths that are registered. When Close returns, it
  24. // is expected it will report no more events.
  25. Close() error
  26. }
  27. type EventInfo interface {
  28. Event() Event // event value for the filesystem action
  29. Path() string // real path of the file or directory
  30. Sys() interface{} // underlying data source (can return nil)
  31. }

INode Watcher

external-watch-linux.svg
图3: Linux 监控文件变更全景图

首先我们看一下基本的数据类型,简要说明下这些基本类型间的关联。nonrecursiveTree 主要包含两个核心成员:root 用于保存监控的文件树,并在 node 的 watchpoint 域中关联对外的 chan;w 是 watcher 接口,在 Linux 下由 inotify 实现,用于控制 epoll 过程。注意 watchpoint 是一个映射:键是用于向外发送事件的通道(chan<- EventInfo),值是该通道对应的事件集合(Event)。

  1. type nonrecursiveTree struct {
  2. rw sync.RWMutex // protects root
  3. root root
  4. w watcher
  5. c chan EventInfo
  6. rec chan EventInfo
  7. }
  8. type root struct {
  9. nd node
  10. }
  11. type watchpoint map[chan<- EventInfo]Event
  12. type inotify struct {
  13. sync.RWMutex // protects inotify.m map
  14. m map[int32]*watched // watch descriptor to watched object
  15. fd int32 // inotify file descriptor
  16. pipefd []int // pipe's read and write descriptors
  17. epfd int // epoll descriptor
  18. epes []unix.EpollEvent // epoll events
  19. buffer [eventBufferSize]byte // inotify event buffer
  20. wg sync.WaitGroup // wait group used to close main loop
  21. c chan<- EventInfo // event dispatcher channel
  22. }
  23. type watched struct {
  24. path string
  25. mask uint32
  26. }

Default Tree Creation

文件监控系统有一个默认的 tree 实例,通过如下代码创建,由于 Linux 实现中的 inotify 没有实现递归监控接口,因此实际执行非递归树(第 7 行)部分

  1. func newTree() tree {
  2. c := make(chan EventInfo, buffer)
  3. w := newWatcher(c)
  4. if rw, ok := w.(recursiveWatcher); ok {
  5. return newRecursiveTree(rw, c)
  6. }
  7. return newNonrecursiveTree(w, c, make(chan EventInfo, buffer))
  8. }

watcher 部分创建没有什么好讲的,注意其中的 epes 用于保存 epoll 事件就好了

  1. func newWatcher(c chan<- EventInfo) watcher {
  2. i := &inotify{
  3. m: make(map[int32]*watched),
  4. fd: invalidDescriptor,
  5. pipefd: []int{invalidDescriptor, invalidDescriptor},
  6. epfd: invalidDescriptor,
  7. epes: make([]unix.EpollEvent, 0),
  8. c: c,
  9. }
  10. runtime.SetFinalizer(i, func(i *inotify) {
  11. i.epollclose()
  12. if i.fd != invalidDescriptor {
  13. unix.Close(int(i.fd))
  14. }
  15. })
  16. return i
  17. }

继续跟踪至非递归树监控部分,可以看到它主要是启动了两个协程,一个用于分发事件,一个用于内部处理(方法名叫 internal,应该是合理的猜测)

  1. func newNonrecursiveTree(w watcher, c, rec chan EventInfo) *nonrecursiveTree {
  2. if rec == nil {
  3. rec = make(chan EventInfo, buffer)
  4. }
  5. t := &nonrecursiveTree{
  6. root: root{nd: newnode("")},
  7. w: w,
  8. c: c,
  9. rec: rec,
  10. }
  11. go t.dispatch(c)
  12. go t.internal(rec)
  13. return t
  14. }

dispatch 协程从 watcher 接收事件,沿事件路径把它分发给各级 node 上的 watchpoint;如果路径上存在递归监控点,且事件是目录的创建或移除,还会把事件转发给 rec,触发内部处理(internal)协程执行

  1. func (t *nonrecursiveTree) dispatch(c <-chan EventInfo) {
  2. for ei := range c {
  3. dbgprintf("dispatching %v on %q", ei.Event(), ei.Path())
  4. go func(ei EventInfo) {
  5. var nd node
  6. var isrec bool
  7. dir, base := split(ei.Path())
  8. fn := func(it node, isbase bool) error {
  9. isrec = isrec || it.Watch.IsRecursive()
  10. if isbase {
  11. nd = it
  12. } else {
  13. it.Watch.Dispatch(ei, recursive)
  14. }
  15. return nil
  16. }
  17. t.rw.RLock()
  18. // Notify recursive watchpoints found on the path.
  19. if err := t.root.WalkPath(dir, fn); err != nil {
  20. dbgprint("dispatch did not reach leaf:", err)
  21. t.rw.RUnlock()
  22. return
  23. }
  24. // Notify parent watchpoint.
  25. nd.Watch.Dispatch(ei, 0)
  26. isrec = isrec || nd.Watch.IsRecursive()
  27. // If leaf watchpoint exists, notify it.
  28. if nd, ok := nd.Child[base]; ok {
  29. isrec = isrec || nd.Watch.IsRecursive()
  30. nd.Watch.Dispatch(ei, 0)
  31. }
  32. t.rw.RUnlock()
  33. // If the event describes newly leaf directory created within
  34. if !isrec || ei.Event()&(Create|Remove) == 0 {
  35. return
  36. }
  37. if ok, err := ei.(isDirer).isDir(); !ok || err != nil {
  38. return
  39. }
  40. t.rec <- ei
  41. }(ei)
  42. }
  43. }

内部处理协程处理两类事件:对于移除事件,在 watcher 上对相应 node 下的监控点执行 Unwatch 操作,并将该 node 从树中删除;对于其他事件(即 dispatch 转发来的目录创建事件),则沿路径查找为 t.rec 注册的事件集合,若存在则通过 AddDir 为新目录补充监控

  1. func (t *nonrecursiveTree) internal(rec <-chan EventInfo) {
  2. for ei := range rec {
  3. t.rw.Lock()
  4. if ei.Event() == Remove {
  5. nd, err := t.root.Get(ei.Path())
  6. if err != nil {
  7. t.rw.Unlock()
  8. continue
  9. }
  10. t.walkWatchpoint(nd, func(_ Event, nd node) error {
  11. t.w.Unwatch(nd.Name)
  12. return nil
  13. })
  14. t.root.Del(ei.Path())
  15. t.rw.Unlock()
  16. continue
  17. }
  18. var nd node
  19. var eset = internal
  20. t.root.WalkPath(ei.Path(), func(it node, _ bool) error {
  21. if e := it.Watch[t.rec]; e != 0 && e > eset {
  22. eset = e
  23. }
  24. nd = it
  25. return nil
  26. })
  27. if eset == internal {
  28. t.rw.Unlock()
  29. continue
  30. }
  31. if ei.Path() != nd.Name {
  32. nd = nd.Add(ei.Path())
  33. }
  34. err := nd.AddDir(t.recFunc(eset))
  35. t.rw.Unlock()
  36. if err != nil {
  37. dbgprintf("internal(%p) error: %v", rec, err)
  38. }
  39. }
  40. }

Watch

Watch 操作将需要监控的文件记录在内部目录树中,并在 watcher 中进行添加,这个代码可自行进行分析,何时会执行 watchrec 何时会执行 watch,我们在此只继续跟踪 watch 方法

  1. func (t *nonrecursiveTree) Watch(path string, c chan<- EventInfo, events ...Event) error {
  2. if c == nil {
  3. panic("notify: Watch using nil channel")
  4. }
  5. // Expanding with empty event set is a nop.
  6. if len(events) == 0 {
  7. return nil
  8. }
  9. path, isrec, err := cleanpath(path)
  10. if err != nil {
  11. return err
  12. }
  13. eset := joinevents(events)
  14. t.rw.Lock()
  15. defer t.rw.Unlock()
  16. nd := t.root.Add(path)
  17. if isrec {
  18. return t.watchrec(nd, c, eset|recursive)
  19. }
  20. return t.watch(nd, c, eset)
  21. }

inotify 的 Watch 只是 watch 的简单封装,这个方法中最重要的方法是 lazyinit

  1. func (i *inotify) watch(path string, e Event) (err error) {
  2. if e&^(All|Event(unix.IN_ALL_EVENTS)) != 0 {
  3. return errors.New("notify: unknown event")
  4. }
  5. if err = i.lazyinit(); err != nil {
  6. return
  7. }
  8. iwd, err := unix.InotifyAddWatch(int(i.fd), path, encode(e))
  9. if err != nil {
  10. return
  11. }
  12. i.Lock()
  13. if wd, ok := i.m[int32(iwd)]; !ok {
  14. i.m[int32(iwd)] = &watched{path: path, mask: uint32(e)}
  15. } else {
  16. wd.path = path
  17. wd.mask = uint32(e)
  18. }
  19. i.Unlock()
  20. return nil
  21. }

lazyinit 保证 inotify 与 epoll 被正确地初始化(仅在 fd 尚未初始化时执行),并启动一个 loop 协程和 consumersCount 个 send 协程,两类协程通过 esch 进行通信

  1. func (i *inotify) lazyinit() error {
  2. if atomic.LoadInt32(&i.fd) == invalidDescriptor {
  3. i.Lock()
  4. defer i.Unlock()
  5. if atomic.LoadInt32(&i.fd) == invalidDescriptor {
  6. fd, err := unix.InotifyInit1(unix.IN_CLOEXEC)
  7. if err != nil {
  8. return err
  9. }
  10. i.fd = int32(fd)
  11. if err = i.epollinit(); err != nil {
  12. _, _ = i.epollclose(), unix.Close(int(fd)) // Ignore errors.
  13. i.fd = invalidDescriptor
  14. return err
  15. }
  16. esch := make(chan []*event)
  17. go i.loop(esch)
  18. i.wg.Add(consumersCount)
  19. for n := 0; n < consumersCount; n++ {
  20. go i.send(esch)
  21. }
  22. }
  23. }
  24. return nil
  25. }

loop 协程执行 epoll 操作:inotify 描述符就绪时读取事件并发送至 esch,等待 send 协程处理;管道读端就绪时则关闭 inotify 描述符和 epoll、关闭 esch 并退出

  1. func (i *inotify) loop(esch chan<- []*event) {
  2. epes := make([]unix.EpollEvent, 1)
  3. fd := atomic.LoadInt32(&i.fd)
  4. for {
  5. switch _, err := unix.EpollWait(i.epfd, epes, -1); err {
  6. case nil:
  7. switch epes[0].Fd {
  8. case fd:
  9. esch <- i.read()
  10. epes[0].Fd = 0
  11. case int32(i.pipefd[0]):
  12. i.Lock()
  13. defer i.Unlock()
  14. if err = unix.Close(int(fd)); err != nil && err != unix.EINTR {
  15. panic("notify: close(2) error " + err.Error())
  16. }
  17. atomic.StoreInt32(&i.fd, invalidDescriptor)
  18. if err = i.epollclose(); err != nil && err != unix.EINTR {
  19. panic("notify: epollclose error " + err.Error())
  20. }
  21. close(esch)
  22. return
  23. }
  24. case unix.EINTR:
  25. continue
  26. default: // We should never reach this line.
  27. panic("notify: epoll_wait(2) error " + err.Error())
  28. }
  29. }
  30. }

send 协程将事件做转换,并分发至 nonrecursiveTree 的 dispatch 协程,至此全部逻辑完成联通。

  1. func (i *inotify) send(esch <-chan []*event) {
  2. for es := range esch {
  3. for _, e := range i.transform(es) {
  4. if e != nil {
  5. i.c <- e
  6. }
  7. }
  8. }
  9. i.wg.Done()
  10. }