Defaults
// Default directory and file names used for MinIO configuration and HTTPS
// material. Each declaration sits on its own line so that the explanatory
// line comments cannot swallow the constant that follows them.
const (
	// defaultMinioConfigDir is the default minio configuration directory
	// where the configuration files/directories below are stored.
	defaultMinioConfigDir = ".minio"

	// certsDir is the directory that contains the files/directories below
	// for HTTPS configuration.
	certsDir = "certs"

	// certsCADir is the directory that contains all CA certificates other
	// than the system defaults for HTTPS.
	certsCADir = "CAs"

	// publicCertFile is the public certificate file for HTTPS.
	publicCertFile = "public.crt"

	// privateKeyFile is the private key file for HTTPS.
	privateKeyFile = "private.key"
)
defaultMinioConfigDir 表示 Minio 默认配置目录名为 .minio;默认证书目录为 .minio/certs,CA 目录位于证书目录之下,即 .minio/certs/CAs;默认公钥证书文件名为 public.crt,默认私钥文件名为 private.key。
getDefaultConfigDir 获取默认配置目录,从代码看默认配置目录为 homedir/.minio(获取用户主目录失败时返回空字符串)。默认证书、CA 目录分别为 homedir/.minio/certs、homedir/.minio/certs/CAs。配置文件路径由 getConfigFile 给出,即当前配置目录(globalConfigDir,默认为 homedir/.minio)下名为 minioConfigFile 的文件。
func getDefaultConfigDir() string {homeDir, err := homedir.Dir()if err != nil {return ""}return filepath.Join(homeDir, defaultMinioConfigDir)}func getDefaultCertsDir() string {return filepath.Join(getDefaultConfigDir(), certsDir)}func getDefaultCertsCADir() string {return filepath.Join(getDefaultCertsDir(), certsCADir)}func getConfigFile() string {return filepath.Join(globalConfigDir.Get(), minioConfigFile)}
以下变量保存初始默认目录
var (// Default config, certs and CA directories.defaultConfigDir = &ConfigDir{path: getDefaultConfigDir()}defaultCertsDir = &ConfigDir{path: getDefaultCertsDir()}defaultCertsCADir = &ConfigDir{path: getDefaultCertsCADir()}// Points to current configuration directory -- deprecated, to be removed in future.globalConfigDir = defaultConfigDir// Points to current certs directory set by user with --certs-dirglobalCertsDir = defaultCertsDir// Points to relative path to certs directory and is <value-of-certs-dir>/CAsglobalCertsCADir = defaultCertsCADir)
Certificate Manager
Types
Manager 用于管理多个证书。相关的结构有 pair,用于保存私钥文件名和证书文件名的组合;loadX509KeyPair 字段是 LoadX509KeyPairFunc 类型的证书加载方法,此处使用的是 config.LoadX509KeyPair。
type Manager struct {lock sync.RWMutexcertificates map[pair]*tls.Certificate // Mapping: certificate file name => TLS certificatesdefaultCert pairduration time.DurationloadX509KeyPair LoadX509KeyPairFuncdone <-chan struct{}reloadCerts []chan struct{}}type pair struct {KeyFile stringCertFile string}
接下来看 Manager 实际创建过程中核心部分。
// Build the manager with an empty certificate table; the default
// key/certificate pair is only recorded here and is loaded later.
manager = &Manager{
	certificates: map[pair]*tls.Certificate{},
	defaultCert: pair{
		KeyFile:  keyFile,
		CertFile: certFile,
	},
	loadX509KeyPair: loadX509KeyPair,
	// The watcher goroutines exit once the context is cancelled.
	done: ctx.Done(),
	// Timer interval used when polling symlinked files for changes.
	duration: 1 * time.Minute,
}
初始化完毕后,添加默认证书,接下来会详细分析 Manager 实现的各种细节。
// Load and register the default certificate; construction fails if it
// cannot be added.
if err := manager.AddCertificate(certFile, keyFile); err != nil {
	return nil, err
}
AddCertificate
图1: AddCertificate 示意图
AddCertificate 大致过程如上图所示。只有最后一步的监控文件变更需要特殊说明,根据输入文件名是否符号链接有两种监控方式,如下所示,至此已经出现两个不同功能的协程和一种文件变更检查方法,后续再展开。
// Choose how changes are detected: when both files are symbolic links a
// dedicated goroutine watches them, otherwise filesystem notifications are
// registered on the parent directories.
if certFileIsLink && keyFileIsLink {
	go m.watchSymlinks(certFile, keyFile)
} else {
	// Windows doesn't allow for watching file changes but instead allows
	// for directory changes only, while we can still watch for changes
	// on files on other platforms. Watch parent directory on all platforms
	// for simplicity.
	if err = notify.Watch(filepath.Dir(certFile), m.events, eventWrite...); err != nil {
		return err
	}
	if err = notify.Watch(filepath.Dir(keyFile), m.events, eventWrite...); err != nil {
		return err
	}
}
Load Certificate
config.LoadX509KeyPair 根据传入的证书文件名、私钥文件名获取需要的证书,方法附上,不做详细注释,以后类似场景下可修改使用。
func LoadX509KeyPair(certFile, keyFile string) (tls.Certificate, error) {certPEMBlock, err := ioutil.ReadFile(certFile)if err != nil {return tls.Certificate{}, ErrSSLUnexpectedError(err)}keyPEMBlock, err := ioutil.ReadFile(keyFile)if err != nil {return tls.Certificate{}, ErrSSLUnexpectedError(err)}key, rest := pem.Decode(keyPEMBlock)if len(rest) > 0 {return tls.Certificate{}, ErrSSLUnexpectedData(nil).Msg("The private key contains additional data")}if x509.IsEncryptedPEMBlock(key) {password := env.Get(EnvCertPassword, "")if len(password) == 0 {return tls.Certificate{}, ErrSSLNoPassword(nil)}decryptedKey, decErr := x509.DecryptPEMBlock(key, []byte(password))if decErr != nil {return tls.Certificate{}, ErrSSLWrongPassword(decErr)}keyPEMBlock = pem.EncodeToMemory(&pem.Block{Type: key.Type, Bytes: decryptedKey})}cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)if err != nil {return tls.Certificate{}, ErrSSLUnexpectedData(nil).Msg(err.Error())}// Ensure that the private key is not a P-384 or P-521 EC key.// The Go TLS stack does not provide constant-time implementations of P-384 and P-521.if priv, ok := cert.PrivateKey.(crypto.Signer); ok {if pub, ok := priv.Public().(*ecdsa.PublicKey); ok {switch pub.Params().Name {case "P-384":fallthroughcase "P-521":// unfortunately there is no cleaner way to checkreturn tls.Certificate{}, ErrSSLUnexpectedData(nil).Msg("tls: the ECDSA curve '%s' is not supported", pub.Params().Name)}}}return cert, nil}
Watching Symbolic Links
watchSymlinks 方法用于检查符号链接的变更。不难看出,这个监控的实现比较简单,完全靠定时器触发来进行检测,触发后,重新读取监控的符号链接指向文件的内容,并更改对应记录。
func (m *Manager) watchSymlinks(watch pair, reload <-chan struct{}) {t := time.NewTimer(m.duration)defer t.Stop()for {select {case <-m.done:return // Once stopped exits this routine.case <-t.C:case <-reload:}t.Reset(m.duration) // Reset timer for new durationcertificate, err := m.loadX509KeyPair(watch.CertFile, watch.KeyFile)if err != nil {continue}if certificate.Leaf == nil { // This is a performance optimisationcertificate.Leaf, err = x509.ParseCertificate(certificate.Certificate[0])if err != nil {continue}}m.lock.Lock()m.certificates[watch] = &certificatem.lock.Unlock()}}
Watch File Events
watchFileEvents 是在 Manager 创建时同步运行的一个协程。在代码中可以看出,这个协程的核心触发条件是有文件系统变更事件通知,且仅对写文件事件作出反应,动作一样是加载新证书,并变更记录即可。
func (m *Manager) watchFileEvents(watch pair, events chan notify.EventInfo, reload <-chan struct{}) {for {select {case <-m.done:returncase event := <-events:if !isWriteEvent(event.Event()) {continue}p := event.Path()if watch.KeyFile != p && watch.CertFile != p {continue}case <-reload:}// Do reloadcertificate, err := m.loadX509KeyPair(watch.CertFile, watch.KeyFile)if err != nil {continue}if certificate.Leaf == nil { // This is performance optimisationcertificate.Leaf, err = x509.ParseCertificate(certificate.Certificate[0])if err != nil {continue}}m.lock.Lock()m.certificates[watch] = &certificatem.lock.Unlock()}}
这段代码会引出一个疑问:需要监听变更的文件是在哪里注册的?回忆一下 AddCertificate 方法,当传入的证书路径、私钥路径是普通文件时,有以下操作
// Register write notifications on the parent directory of the certificate
// file, delivered to m.events.
if err = notify.Watch(filepath.Dir(certFile), m.events, eventWrite...); err != nil {
	return err
}
// Same registration for the parent directory of the private key file.
if err = notify.Watch(filepath.Dir(keyFile), m.events, eventWrite...); err != nil {
	return err
}
因此,目标文件的监控是由 notify.Watch 来注册并进行实际监控的,那么接下来,我们来看这个功能到底是如何实现的。
File Change Notification
Event
Event 类型就是 uint32,具体来说 Event 通过不同的位代表不同类型的操作,比如 Create 是 256,通过不同位的组合记录对一个文件的操作。共有四种独立操作:创建、移除、写入及重命名。
type Event uint32const (Create = osSpecificCreateRemove = osSpecificRemoveWrite = osSpecificWriteRename = osSpecificRename// All is handful alias for all platform-independent event values.All = Create | Remove | Write | Rename)
为方便调试,Event 实现了 String 方法,其中有两个映射:estr 及 osestr
func (e Event) String() string {var s []stringfor _, strmap := range []map[Event]string{estr, osestr} {for ev, str := range strmap {if e&ev == ev {s = append(s, str)}}}return strings.Join(s, "|")}
首先看 estr
// estr holds the printable names of the platform-independent events and of
// the two internal flags; it is consulted by Event.String.
var estr = map[Event]string{
	// Internal flags.
	recursive: "recursive",
	omit:      "omit",

	// Platform-independent events.
	Create: "notify.Create",
	Remove: "notify.Remove",
	Write:  "notify.Write",
	Rename: "notify.Rename",
}
再看 osestr。由于文件系统监控与操作系统相关,后续无特殊说明,均以 Linux 为例;需要注意的是,下面这段 osestr 及 osSpecific* 常量摘自 FSEvents(macOS)版本的实现,Linux(inotify)版本的结构与之类似,只是事件名不同。
const (osSpecificCreate = Event(FSEventsCreated)osSpecificRemove = Event(FSEventsRemoved)osSpecificWrite = Event(FSEventsModified)osSpecificRename = Event(FSEventsRenamed)// internal = Event(0x100000)// recursive is used to distinguish recursive eventsets from non-recursive onesrecursive = Event(0x200000)// omit is used for dispatching internal events; only those events are sent// for which both the event and the watchpoint has omit in theirs event sets.omit = Event(0x400000))var osestr = map[Event]string{FSEventsMustScanSubDirs: "notify.FSEventsMustScanSubDirs",FSEventsUserDropped: "notify.FSEventsUserDropped",FSEventsKernelDropped: "notify.FSEventsKernelDropped",FSEventsEventIdsWrapped: "notify.FSEventsEventIdsWrapped",FSEventsHistoryDone: "notify.FSEventsHistoryDone",FSEventsRootChanged: "notify.FSEventsRootChanged",FSEventsMount: "notify.FSEventsMount",FSEventsUnmount: "notify.FSEventsUnmount",FSEventsInodeMetaMod: "notify.FSEventsInodeMetaMod",FSEventsFinderInfoMod: "notify.FSEventsFinderInfoMod",FSEventsChangeOwner: "notify.FSEventsChangeOwner",FSEventsXattrMod: "notify.FSEventsXattrMod",FSEventsIsFile: "notify.FSEventsIsFile",FSEventsIsDir: "notify.FSEventsIsDir",FSEventsIsSymlink: "notify.FSEventsIsSymlink",}
Interface
图2:文件监控全景图
文件监控的大致流程如上图所示,核心接口有 tree、watcher 及 EventInfo。tree 接口可理解为被监控的文件树,watcher 为执行实际监控的实例,EventInfo 包含描述一次文件变更所必需的信息。watch loop 表示底层核心的监控模块。三个接口的定义如下
type tree interface {Watch(string, chan<- EventInfo, ...Event) errorStop(chan<- EventInfo)Close() error}type watcher interface {// Watch requests a watcher creation for the given path and given event set.Watch(path string, event Event) error// Unwatch requests a watcher deletion for the given path and given event set.Unwatch(path string) error// Rewatch provides a functionality for modifying existing watch-points, like// expanding its event set.//// Rewatch modifies existing watch-point under for the given path. It passes// the existing event set currently registered for the given path, and the// new, requested event set.//// It is guaranteed that Tree will not pass to Rewatch zero value for any// of its arguments. If old == new and watcher can be upgraded to// recursiveWatcher interface, a watch for the corresponding path is expected// to be changed from recursive to the non-recursive one.Rewatch(path string, old, new Event) error// Close unwatches all paths that are registered. When Close returns, it// is expected it will report no more events.Close() error}type EventInfo interface {Event() Event // event value for the filesystem actionPath() string // real path of the file or directorySys() interface{} // underlying data source (can return nil)}
INode Watcher
图3: Linux 监控文件变更全景图
首先我们看一下基本的数据类型,简要说明下这些基本类型间的关联,nonrecursiveTree 主要包含了两个核心结构:root 用于保存监控的文件树,并在 node 的 watchpoint 域中关联对外的 chan;inotify 用于控制 epoll 过程。注意在 watchpoint 中,将内部 EventInfo 接收 chan 映射为了外部 Event。
type nonrecursiveTree struct {rw sync.RWMutex // protects rootroot rootw watcherc chan EventInforec chan EventInfo}type root struct {nd node}type watchpoint map[chan<- EventInfo]Eventtype inotify struct {sync.RWMutex // protects inotify.m mapm map[int32]*watched // watch descriptor to watched objectfd int32 // inotify file descriptorpipefd []int // pipe's read and write descriptorsepfd int // epoll descriptorepes []unix.EpollEvent // epoll eventsbuffer [eventBufferSize]byte // inotify event bufferwg sync.WaitGroup // wait group used to close main loopc chan<- EventInfo // event dispatcher channel}type watched struct {path stringmask uint32}
Default Tree Creation
文件监控系统有一个默认的 tree 实例,通过如下代码创建。由于 Linux 实现中的 inotify 没有实现递归监控接口(recursiveWatcher),因此实际执行的是非递归树部分,即函数末尾的 newNonrecursiveTree 调用
func newTree() tree {c := make(chan EventInfo, buffer)w := newWatcher(c)if rw, ok := w.(recursiveWatcher); ok {return newRecursiveTree(rw, c)}return newNonrecursiveTree(w, c, make(chan EventInfo, buffer))}
watcher 部分创建没有什么好讲的,注意其中的 epes 用于保存 epoll 事件就好了
func newWatcher(c chan<- EventInfo) watcher {i := &inotify{m: make(map[int32]*watched),fd: invalidDescriptor,pipefd: []int{invalidDescriptor, invalidDescriptor},epfd: invalidDescriptor,epes: make([]unix.EpollEvent, 0),c: c,}runtime.SetFinalizer(i, func(i *inotify) {i.epollclose()if i.fd != invalidDescriptor {unix.Close(int(i.fd))}})return i}
继续跟踪至非递归树监控部分,可以看到它主要是启动了两个协程,一个用于分发事件,一个用于内部处理(方法名叫 internal,应该是合理的猜测)
func newNonrecursiveTree(w watcher, c, rec chan EventInfo) *nonrecursiveTree {if rec == nil {rec = make(chan EventInfo, buffer)}t := &nonrecursiveTree{root: root{nd: newnode("")},w: w,c: c,rec: rec,}go t.dispatch(c)go t.internal(rec)return t}
dispatch 协程从 watcher 接收事件,沿事件路径把它分发给途经各 node 上的 watchpoint;当路径上存在递归监控、事件类型为创建或移除且对象是目录时,还会把事件转发给 rec,触发内部处理(internal)协程执行
// dispatch reads events from c and handles each one in its own goroutine.
// An event is delivered to the watch-points found along its path and, when
// a recursive watch-point is involved and the event is a Create or Remove
// of a directory, it is also forwarded to t.rec for the internal goroutine.
func (t *nonrecursiveTree) dispatch(c <-chan EventInfo) {
	for ei := range c {
		dbgprintf("dispatching %v on %q", ei.Event(), ei.Path())
		go func(ei EventInfo) {
			var nd node
			var isrec bool
			dir, base := split(ei.Path())
			// fn is called for every node on the way to dir. It remembers
			// whether any of them is recursive, keeps the base (last) node
			// in nd, and notifies the others with the recursive flag.
			fn := func(it node, isbase bool) error {
				isrec = isrec || it.Watch.IsRecursive()
				if isbase {
					nd = it
				} else {
					it.Watch.Dispatch(ei, recursive)
				}
				return nil
			}
			t.rw.RLock()
			// Notify recursive watchpoints found on the path.
			if err := t.root.WalkPath(dir, fn); err != nil {
				dbgprint("dispatch did not reach leaf:", err)
				t.rw.RUnlock()
				return
			}
			// Notify parent watchpoint.
			nd.Watch.Dispatch(ei, 0)
			isrec = isrec || nd.Watch.IsRecursive()
			// If leaf watchpoint exists, notify it.
			if nd, ok := nd.Child[base]; ok {
				isrec = isrec || nd.Watch.IsRecursive()
				nd.Watch.Dispatch(ei, 0)
			}
			t.rw.RUnlock()
			// Only a Create or Remove seen under a recursive watch-point
			// is of interest to the internal goroutine.
			if !isrec || ei.Event()&(Create|Remove) == 0 {
				return
			}
			// ...and only when the event is about a directory.
			if ok, err := ei.(isDirer).isDir(); !ok || err != nil {
				return
			}
			t.rec <- ei
		}(ei)
	}
}
内部处理协程对两类事件分别处理:对于移除事件,将对应的 node 从树中移除,并在 watcher 上对其下的 watchpoint 执行 Unwatch 操作;对于其他事件(即新建目录),则沿路径找出注册在 rec 上的事件集合,并通过 AddDir 把新目录纳入监控
// internal consumes the events forwarded on rec by dispatch. For a Remove
// event it unwatches everything registered under the removed path and
// deletes the path from the tree. For any other event it extends the watch
// to the path via AddDir, provided a node on the path has an event set
// registered for t.rec. The tree lock is held while the tree is modified.
func (t *nonrecursiveTree) internal(rec <-chan EventInfo) {
	for ei := range rec {
		t.rw.Lock()
		if ei.Event() == Remove {
			nd, err := t.root.Get(ei.Path())
			if err != nil {
				// Path is not in the tree: nothing to clean up.
				t.rw.Unlock()
				continue
			}
			// Drop the watcher registration of every visited node; the
			// Unwatch error is deliberately ignored.
			t.walkWatchpoint(nd, func(_ Event, nd node) error {
				t.w.Unwatch(nd.Name)
				return nil
			})
			t.root.Del(ei.Path())
			t.rw.Unlock()
			continue
		}
		var nd node
		var eset = internal
		// Walk the path, keeping the largest event set registered for
		// t.rec and the last node that was visited.
		t.root.WalkPath(ei.Path(), func(it node, _ bool) error {
			if e := it.Watch[t.rec]; e != 0 && e > eset {
				eset = e
			}
			nd = it
			return nil
		})
		if eset == internal {
			// No node on the path carries a larger event set for t.rec.
			t.rw.Unlock()
			continue
		}
		if ei.Path() != nd.Name {
			nd = nd.Add(ei.Path())
		}
		err := nd.AddDir(t.recFunc(eset))
		t.rw.Unlock()
		if err != nil {
			dbgprintf("internal(%p) error: %v", rec, err)
		}
	}
}
Watch
Watch 操作将需要监控的文件记录在内部目录树中,并在 watcher 中进行添加,这个代码可自行进行分析,何时会执行 watchrec 何时会执行 watch,我们在此只继续跟踪 watch 方法
func (t *nonrecursiveTree) Watch(path string, c chan<- EventInfo, events ...Event) error {if c == nil {panic("notify: Watch using nil channel")}// Expanding with empty event set is a nop.if len(events) == 0 {return nil}path, isrec, err := cleanpath(path)if err != nil {return err}eset := joinevents(events)t.rw.Lock()defer t.rw.Unlock()nd := t.root.Add(path)if isrec {return t.watchrec(nd, c, eset|recursive)}return t.watch(nd, c, eset)}
inotify 的 Watch 只是 watch 的简单封装,这个方法中最重要的方法是 lazyinit
func (i *inotify) watch(path string, e Event) (err error) {if e&^(All|Event(unix.IN_ALL_EVENTS)) != 0 {return errors.New("notify: unknown event")}if err = i.lazyinit(); err != nil {return}iwd, err := unix.InotifyAddWatch(int(i.fd), path, encode(e))if err != nil {return}i.Lock()if wd, ok := i.m[int32(iwd)]; !ok {i.m[int32(iwd)] = &watched{path: path, mask: uint32(e)}} else {wd.path = pathwd.mask = uint32(e)}i.Unlock()return nil}
lazyinit 保证 inotify 与 epoll 被正确地初始化,并启动一个 loop 协程和 consumersCount 个 send 协程,loop 与 send 协程之间通过 esch 进行通信
func (i *inotify) lazyinit() error {if atomic.LoadInt32(&i.fd) == invalidDescriptor {i.Lock()defer i.Unlock()if atomic.LoadInt32(&i.fd) == invalidDescriptor {fd, err := unix.InotifyInit1(unix.IN_CLOEXEC)if err != nil {return err}i.fd = int32(fd)if err = i.epollinit(); err != nil {_, _ = i.epollclose(), unix.Close(int(fd)) // Ignore errors.i.fd = invalidDescriptorreturn err}esch := make(chan []*event)go i.loop(esch)i.wg.Add(consumersCount)for n := 0; n < consumersCount; n++ {go i.send(esch)}}}return nil}
loop 协程执行 epoll 操作,并将全部事件变更分发至 esch,等待 send 协程处理
// loop blocks in epoll_wait and reacts to the two descriptors it can be
// woken by: the inotify descriptor, whose events are read and sent on esch,
// and the read end of the pipe, which makes it close the inotify and epoll
// descriptors, close esch and return.
func (i *inotify) loop(esch chan<- []*event) {
	epes := make([]unix.EpollEvent, 1)
	fd := atomic.LoadInt32(&i.fd)
	for {
		switch _, err := unix.EpollWait(i.epfd, epes, -1); err {
		case nil:
			switch epes[0].Fd {
			case fd:
				// inotify descriptor is readable: hand the batch to the
				// send goroutines, then clear the slot for the next wait.
				esch <- i.read()
				epes[0].Fd = 0
			case int32(i.pipefd[0]):
				// Pipe is readable: tear everything down. The deferred
				// Unlock runs when the function returns below.
				i.Lock()
				defer i.Unlock()
				if err = unix.Close(int(fd)); err != nil && err != unix.EINTR {
					panic("notify: close(2) error " + err.Error())
				}
				// Mark the instance as uninitialised again.
				atomic.StoreInt32(&i.fd, invalidDescriptor)
				if err = i.epollclose(); err != nil && err != unix.EINTR {
					panic("notify: epollclose error " + err.Error())
				}
				// Closing esch ends the range loops of the send goroutines.
				close(esch)
				return
			}
		case unix.EINTR:
			// Interrupted wait: retry.
			continue
		default: // We should never reach this line.
			panic("notify: epoll_wait(2) error " + err.Error())
		}
	}
}
send 协程将事件做转换,并分发至 nonrecursiveTree 的 dispatch 协程,至此全部逻辑完成联通。
func (i *inotify) send(esch <-chan []*event) {for es := range esch {for _, e := range i.transform(es) {if e != nil {i.c <- e}}}i.wg.Done()}
