Defaults
// Names of the default configuration directory and of the TLS-related
// files/directories stored beneath it.
const (
// Name of the default minio configuration directory; the entries below are stored inside it.
defaultMinioConfigDir = ".minio"
// Name of the directory holding the files/directories used for HTTPS configuration.
certsDir = "certs"
// Name of the directory holding all CA certificates other than the system defaults for HTTPS.
certsCADir = "CAs"
// File name of the public certificate used for HTTPS.
publicCertFile = "public.crt"
// File name of the private key used for HTTPS.
privateKeyFile = "private.key"
)
defaultMinioConfigDir 表示 Minio 默认配置目录名为 .minio;默认证书目录为 .minio/certs,CA 目录位于证书目录之下,即 .minio/certs/CAs;默认公钥证书文件名为 public.crt,默认私钥文件名为 private.key。
getDefaultConfigDir 获取默认配置目录,从代码看默认配置目录为 homedir/.minio(获取 home 目录失败时返回空字符串)。默认证书、CA 目录分别为 homedir/.minio/certs、homedir/.minio/certs/CAs。配置文件路径由 getConfigFile 给出,即当前配置目录(默认为 homedir/.minio)下的 minioConfigFile(该常量的取值未在此处列出,应为 config.json)。
// getDefaultConfigDir returns the default configuration directory, i.e.
// defaultMinioConfigDir inside the user's home directory. It returns an empty
// string when the home directory cannot be determined.
func getDefaultConfigDir() string {
	home, err := homedir.Dir()
	if err == nil {
		return filepath.Join(home, defaultMinioConfigDir)
	}
	return ""
}
// getDefaultCertsDir returns the default certificates directory: certsDir
// inside the default configuration directory.
func getDefaultCertsDir() string {
	configDir := getDefaultConfigDir()
	return filepath.Join(configDir, certsDir)
}
// getDefaultCertsCADir returns the default CA certificates directory:
// certsCADir inside the default certificates directory.
func getDefaultCertsCADir() string {
	certs := getDefaultCertsDir()
	return filepath.Join(certs, certsCADir)
}
// getConfigFile returns the path of the configuration file: minioConfigFile
// inside the directory currently held by globalConfigDir.
func getConfigFile() string {
	currentDir := globalConfigDir.Get()
	return filepath.Join(currentDir, minioConfigFile)
}
以下变量保存初始默认目录
// Package-level directory handles. The default* values are computed once from
// the getDefault*Dir helpers; the global* values start out pointing at the
// same ConfigDir instances.
var (
// Default config, certs and CA directories.
defaultConfigDir = &ConfigDir{path: getDefaultConfigDir()}
defaultCertsDir = &ConfigDir{path: getDefaultCertsDir()}
defaultCertsCADir = &ConfigDir{path: getDefaultCertsCADir()}
// Points to the current configuration directory -- deprecated, to be removed in future.
globalConfigDir = defaultConfigDir
// Points to the current certs directory set by the user with --certs-dir.
globalCertsDir = defaultCertsDir
// Points to the CA directory, which is <value-of-certs-dir>/CAs.
globalCertsCADir = defaultCertsCADir
)
Certificate Manager
Types
Manager 管理多个证书,相关的结构有 pair,用于保存私钥文件名和证书文件名的组合;加载证书的方法 LoadX509KeyPairFunc 类型,本域使用 config.LoadX509KeyPair。
// Manager holds a set of TLS certificates, each identified by the pair of
// key/certificate file names it was loaded from.
type Manager struct {
lock sync.RWMutex // taken by the watcher goroutines around writes to certificates
certificates map[pair]*tls.Certificate // Mapping: key/cert file pair => loaded TLS certificate
defaultCert pair // file pair of the default certificate
duration time.Duration // interval of the timer used by watchSymlinks
loadX509KeyPair LoadX509KeyPairFunc // loader called to (re)read a key pair from disk
done <-chan struct{} // watcher goroutines return once a receive on this channel succeeds
reloadCerts []chan struct{} // presumably one reload-trigger channel per watcher -- TODO confirm
}
// pair identifies a certificate by the names of its private key file and
// certificate file; it is used as the key of Manager.certificates.
type pair struct {
KeyFile string // private key file name
CertFile string // certificate file name
}
接下来看 Manager 实际创建过程中核心部分。
// Construct the Manager with an empty certificate map, the given key/cert
// files recorded as the default pair, the context's Done channel as the stop
// signal and a one-minute duration.
manager = &Manager{
certificates: map[pair]*tls.Certificate{},
defaultCert: pair{
KeyFile: keyFile,
CertFile: certFile,
},
loadX509KeyPair: loadX509KeyPair,
done: ctx.Done(),
duration: 1 * time.Minute,
}
初始化完毕后,添加默认证书,接下来会详细分析 Manager 实现的各种细节。
// Register the default certificate; on failure return the error to the caller
// instead of a manager.
if err := manager.AddCertificate(certFile, keyFile); err != nil {
return nil, err
}
AddCertificate
图1: AddCertificate 示意图
AddCertificate 大致过程如上图所示。只有最后一步的监控文件变更需要特殊说明,根据输入文件名是否符号链接有两种监控方式,如下所示,至此已经出现两个不同功能的协程和一种文件变更检查方法,后续再展开。
// When both files are symbolic links, watch them from a dedicated goroutine;
// otherwise register filesystem notifications on their parent directories.
// NOTE(review): watchSymlinks is declared in this document as
// (watch pair, reload <-chan struct{}) while this call passes two file names;
// the two excerpts presumably come from different revisions -- confirm.
if certFileIsLink && keyFileIsLink {
go m.watchSymlinks(certFile, keyFile)
} else {
// Windows doesn't allow for watching file changes but instead allows
// for directory changes only, while we can still watch for changes
// on files on other platforms. Watch parent directory on all platforms
// for simplicity.
if err = notify.Watch(filepath.Dir(certFile), m.events, eventWrite...); err != nil {
return err
}
if err = notify.Watch(filepath.Dir(keyFile), m.events, eventWrite...); err != nil {
return err
}
}
Load Certificate
config.LoadX509KeyPair 根据传入的证书文件名、私钥文件名获取需要的证书,方法附上,不做详细注释,以后类似场景下可修改使用。
// LoadX509KeyPair reads a PEM-encoded certificate file and private key file
// and parses them into a tls.Certificate.
//
// The key file must contain exactly one PEM block. If that block is
// encrypted, it is decrypted with the password obtained from
// env.Get(EnvCertPassword, ""). Certificates whose ECDSA key uses the P-384 or
// P-521 curve are rejected.
//
// On failure the zero tls.Certificate is returned together with one of the
// ErrSSL* errors.
func LoadX509KeyPair(certFile, keyFile string) (tls.Certificate, error) {
	certPEMBlock, err := ioutil.ReadFile(certFile)
	if err != nil {
		return tls.Certificate{}, ErrSSLUnexpectedError(err)
	}
	keyPEMBlock, err := ioutil.ReadFile(keyFile)
	if err != nil {
		return tls.Certificate{}, ErrSSLUnexpectedError(err)
	}

	key, rest := pem.Decode(keyPEMBlock)
	if len(rest) > 0 {
		return tls.Certificate{}, ErrSSLUnexpectedData(nil).Msg("The private key contains additional data")
	}
	// pem.Decode returns a nil block when the input holds no PEM data (for
	// example an empty key file). Bail out here, because
	// x509.IsEncryptedPEMBlock would dereference the nil block.
	if key == nil {
		return tls.Certificate{}, ErrSSLUnexpectedData(nil).Msg("The private key is not readable")
	}

	if x509.IsEncryptedPEMBlock(key) {
		password := env.Get(EnvCertPassword, "")
		if len(password) == 0 {
			return tls.Certificate{}, ErrSSLNoPassword(nil)
		}
		decryptedKey, decErr := x509.DecryptPEMBlock(key, []byte(password))
		if decErr != nil {
			return tls.Certificate{}, ErrSSLWrongPassword(decErr)
		}
		// Re-encode the decrypted key so tls.X509KeyPair can parse it.
		keyPEMBlock = pem.EncodeToMemory(&pem.Block{Type: key.Type, Bytes: decryptedKey})
	}

	cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
	if err != nil {
		return tls.Certificate{}, ErrSSLUnexpectedData(nil).Msg(err.Error())
	}

	// Ensure that the private key is not a P-384 or P-521 EC key.
	// The Go TLS stack does not provide constant-time implementations of P-384 and P-521.
	if priv, ok := cert.PrivateKey.(crypto.Signer); ok {
		if pub, ok := priv.Public().(*ecdsa.PublicKey); ok {
			switch pub.Params().Name {
			case "P-384", "P-521":
				// unfortunately there is no cleaner way to check
				return tls.Certificate{}, ErrSSLUnexpectedData(nil).Msg("tls: the ECDSA curve '%s' is not supported", pub.Params().Name)
			}
		}
	}
	return cert, nil
}
Watching Symbolic Links
watchSymlinks 方法用于检查符号链接的变更。不难看出,这个监控的实现比较简单,完全靠定时器触发来进行检测,触发后,重新读取监控的符号链接指向文件的内容,并更改对应记录。
// watchSymlinks periodically reloads the certificate identified by watch.
//
// A reload is attempted every m.duration, or as soon as a value is received
// on reload. The function returns once a receive on m.done succeeds. When
// loading or parsing fails the iteration is skipped, so the entry previously
// stored in m.certificates is left untouched.
func (m *Manager) watchSymlinks(watch pair, reload <-chan struct{}) {
	t := time.NewTimer(m.duration)
	defer t.Stop()

	for {
		select {
		case <-m.done:
			return // Once stopped exits this routine.
		case <-t.C:
		case <-reload:
		}

		// Stop and drain before Reset: when woken by reload the timer may still
		// be pending, or may have fired in the meantime. Resetting it without
		// draining could leave a stale tick in t.C that triggers an extra,
		// immediate reload. The drain is non-blocking so it is also safe when
		// the tick was already consumed by the select above.
		if !t.Stop() {
			select {
			case <-t.C:
			default:
			}
		}
		t.Reset(m.duration) // Reset timer for new duration

		certificate, err := m.loadX509KeyPair(watch.CertFile, watch.KeyFile)
		if err != nil {
			continue
		}
		if certificate.Leaf == nil { // This is a performance optimisation
			certificate.Leaf, err = x509.ParseCertificate(certificate.Certificate[0])
			if err != nil {
				continue
			}
		}

		m.lock.Lock()
		m.certificates[watch] = &certificate
		m.lock.Unlock()
	}
}
Watch File Events
watchFileEvents 是在 Manager 创建时同步运行的一个协程。在代码中可以看出,这个协程的核心触发条件是有文件系统变更事件通知,且仅对写文件事件作出反应,动作一样是加载新证书,并变更记录即可。
// watchFileEvents reloads the certificate identified by watch whenever a write
// event for its key or certificate file arrives on events, or a value is
// received on reload. It returns once a receive on m.done succeeds. Reloads
// that fail to load or parse are skipped and leave the stored certificate as is.
func (m *Manager) watchFileEvents(watch pair, events chan notify.EventInfo, reload <-chan struct{}) {
	// concernsWatch reports whether ev is a write event on one of the two
	// watched files.
	concernsWatch := func(ev notify.EventInfo) bool {
		if !isWriteEvent(ev.Event()) {
			return false
		}
		changed := ev.Path()
		return changed == watch.KeyFile || changed == watch.CertFile
	}

	for {
		select {
		case <-m.done:
			return
		case ev := <-events:
			if !concernsWatch(ev) {
				continue
			}
		case <-reload:
		}

		// Do reload
		loaded, loadErr := m.loadX509KeyPair(watch.CertFile, watch.KeyFile)
		if loadErr != nil {
			continue
		}
		if loaded.Leaf == nil { // This is performance optimisation
			leaf, parseErr := x509.ParseCertificate(loaded.Certificate[0])
			if parseErr != nil {
				continue
			}
			loaded.Leaf = leaf
		}

		m.lock.Lock()
		m.certificates[watch] = &loaded
		m.lock.Unlock()
	}
}
这个代码有一个问题,需要监听变更的文件在哪里注册的?回忆一下 AddCertificate 方法,当传入的证书路径、私钥路径是普通文件时,有以下操作
// Register write-event notifications on the parent directories of the
// certificate file and of the key file; both deliver to m.events.
if err = notify.Watch(filepath.Dir(certFile), m.events, eventWrite...); err != nil {
return err
}
if err = notify.Watch(filepath.Dir(keyFile), m.events, eventWrite...); err != nil {
return err
}
因此,目标文件的监控是由 notify.Watch 来注册并进行实际监控的,那么接下来,我们来看这个功能到底是如何实现的。
File Change Notification
Event
Event 类型就是 uint32,具体来说 Event 通过不同的位代表不同类型的操作,比如 Create 是 256,通过不同位的组合记录对一个文件的操作。共有四种独立操作:创建、移除、写入及重命名。
// Event is a bitmask describing one or more filesystem actions.
type Event uint32
// Portable event values; each one is an alias for the corresponding
// platform-specific constant.
const (
Create = osSpecificCreate
Remove = osSpecificRemove
Write = osSpecificWrite
Rename = osSpecificRename
// All is a handy alias for all platform-independent event values.
All = Create | Remove | Write | Rename
)
为方便调试,Event 实现了 String 方法,其中有两个映射:estr 及 osestr
// String returns the names of all events contained in e, joined with "|".
// Names are looked up in estr and then in osestr; because both are maps, the
// order of names within each group is not fixed.
func (e Event) String() string {
	var names []string
	tables := [...]map[Event]string{estr, osestr}
	for i := range tables {
		for bit, name := range tables[i] {
			if e&bit != bit {
				continue
			}
			names = append(names, name)
		}
	}
	return strings.Join(names, "|")
}
首先看 estr
// estr maps the platform-independent events and the internal recursive/omit
// flags to their printable names; it is consulted by Event.String.
var estr = map[Event]string{
Create: "notify.Create",
Remove: "notify.Remove",
Write: "notify.Write",
Rename: "notify.Rename",
recursive: "recursive",
omit: "omit",
}
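再看 osestr。由于文件系统监控与操作系统相关,后续无特殊说明,均以 Linux 为例。需要注意的是,下面这段常量定义与 osestr 引用的都是 FSEvents* 标识,对应的是 macOS(FSEvents)实现;Linux(inotify)实现的结构与之相同,只是换成了基于 unix.IN_* 的常量。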
// Platform-specific values backing the portable Create/Remove/Write/Rename
// events, plus the internal flag bits used by the watch tree.
const (
osSpecificCreate = Event(FSEventsCreated)
osSpecificRemove = Event(FSEventsRemoved)
osSpecificWrite = Event(FSEventsModified)
osSpecificRename = Event(FSEventsRenamed)
// internal = Event(0x100000)
// NOTE(review): internal is commented out here yet is referenced by
// nonrecursiveTree.internal; presumably it is declared in another source
// file -- confirm.
// recursive is used to distinguish recursive eventsets from non-recursive ones
recursive = Event(0x200000)
// omit is used for dispatching internal events; only those events are sent
// for which both the event and the watchpoint have omit in their event sets.
omit = Event(0x400000)
)
// osestr maps the FSEvents-specific event flags to their printable names; it
// is consulted by Event.String after estr.
var osestr = map[Event]string{
FSEventsMustScanSubDirs: "notify.FSEventsMustScanSubDirs",
FSEventsUserDropped: "notify.FSEventsUserDropped",
FSEventsKernelDropped: "notify.FSEventsKernelDropped",
FSEventsEventIdsWrapped: "notify.FSEventsEventIdsWrapped",
FSEventsHistoryDone: "notify.FSEventsHistoryDone",
FSEventsRootChanged: "notify.FSEventsRootChanged",
FSEventsMount: "notify.FSEventsMount",
FSEventsUnmount: "notify.FSEventsUnmount",
FSEventsInodeMetaMod: "notify.FSEventsInodeMetaMod",
FSEventsFinderInfoMod: "notify.FSEventsFinderInfoMod",
FSEventsChangeOwner: "notify.FSEventsChangeOwner",
FSEventsXattrMod: "notify.FSEventsXattrMod",
FSEventsIsFile: "notify.FSEventsIsFile",
FSEventsIsDir: "notify.FSEventsIsDir",
FSEventsIsSymlink: "notify.FSEventsIsSymlink",
}
Interface
图2:文件监控全景图
文件监控的大致流程如上图所示,核心接口有 tree、watcher 及 EventInfo。tree 接口可理解为监控的文件树,watcher 为监控文件的实例,EventInfo 包含文件变更所必需的信息。watch loop 表示底层核心的监控模块。三个接口的定义如下
// tree is the interface of the watch tree that sits between callers and the
// platform watcher. newTree returns either a recursive or a non-recursive
// implementation of it.
type tree interface {
// Watch registers the channel to receive the given events for the given path.
Watch(string, chan<- EventInfo, ...Event) error
// Stop presumably removes the channel from every watchpoint -- TODO confirm.
Stop(chan<- EventInfo)
// Close presumably shuts the tree and its watcher down -- TODO confirm.
Close() error
}
// watcher is the platform-specific backend that creates, modifies and removes
// watches on individual paths. newWatcher returns the implementation.
type watcher interface {
// Watch requests a watcher creation for the given path and given event set.
Watch(path string, event Event) error
// Unwatch requests a watcher deletion for the given path and given event set.
Unwatch(path string) error
// Rewatch provides a functionality for modifying existing watch-points, like
// expanding its event set.
//
// Rewatch modifies the existing watch-point for the given path. It passes
// the existing event set currently registered for the given path, and the
// new, requested event set.
//
// It is guaranteed that Tree will not pass to Rewatch zero value for any
// of its arguments. If old == new and watcher can be upgraded to
// recursiveWatcher interface, a watch for the corresponding path is expected
// to be changed from recursive to the non-recursive one.
Rewatch(path string, old, new Event) error
// Close unwatches all paths that are registered. When Close returns, it
// is expected it will report no more events.
Close() error
}
// EventInfo describes a single filesystem event delivered to subscribers.
type EventInfo interface {
Event() Event // event value for the filesystem action
Path() string // real path of the file or directory
Sys() interface{} // underlying data source (can return nil)
}
INode Watcher
图3: Linux 监控文件变更全景图
首先我们看一下基本的数据类型,并简要说明这些基本类型间的关联。nonrecursiveTree 主要包含两个核心结构:root 用于保存被监控的文件树,每个 node 的 watchpoint 域关联着对外发送事件的 chan;watcher(Linux 下即 inotify)用于控制 epoll 过程。注意 watchpoint 是一个映射:键是用于向订阅者发送 EventInfo 的 chan(chan<- EventInfo),值是该 chan 订阅的事件集合(Event)。
// nonrecursiveTree is the tree implementation that newTree uses when the
// watcher does not implement recursiveWatcher.
type nonrecursiveTree struct {
rw sync.RWMutex // protects root
root root // tree of watched nodes
w watcher // platform watcher backend
c chan EventInfo // events coming from the watcher; consumed by dispatch
rec chan EventInfo // events forwarded by dispatch; consumed by internal
}
// root wraps the top-level node of the watch tree.
type root struct {
nd node // top-level node; created with newnode("") by newNonrecursiveTree
}
// watchpoint maps a subscriber's send-only channel to the event set
// registered for that channel.
type watchpoint map[chan<- EventInfo]Event
// inotify is the watcher returned by newWatcher. It keeps the inotify and
// epoll descriptors together with the table of active watches.
type inotify struct {
sync.RWMutex // protects inotify.m map
m map[int32]*watched // watch descriptor to watched object
fd int32 // inotify file descriptor
pipefd []int // pipe's read and write descriptors
epfd int // epoll descriptor
epes []unix.EpollEvent // epoll events
buffer [eventBufferSize]byte // inotify event buffer
wg sync.WaitGroup // wait group used to close main loop
c chan<- EventInfo // event dispatcher channel
}
// watched records what was registered for one inotify watch descriptor.
type watched struct {
path string // path passed when the watch was added
mask uint32 // event set passed when the watch was added
}
Default Tree Creation
文件监控系统有一个默认的 tree 实例,通过如下代码创建,由于 Linux 实现中的 inotify 没有实现递归监控接口,因此实际执行非递归树(第 7 行)部分
// newTree builds the watch tree on top of a freshly created watcher. A
// recursive tree is returned when the watcher implements recursiveWatcher;
// otherwise a non-recursive tree with its own buffered internal channel is
// returned.
func newTree() tree {
	events := make(chan EventInfo, buffer)
	switch w := newWatcher(events).(type) {
	case recursiveWatcher:
		return newRecursiveTree(w, events)
	default:
		return newNonrecursiveTree(w, events, make(chan EventInfo, buffer))
	}
}
watcher 部分创建没有什么好讲的,注意其中的 epes 用于保存 epoll 事件就好了
// newWatcher returns an inotify-backed watcher that reports events on c. All
// descriptors start out invalid. A finalizer is registered that closes the
// epoll state and, if one is still open, the inotify descriptor.
func newWatcher(c chan<- EventInfo) watcher {
	w := new(inotify)
	w.m = make(map[int32]*watched)
	w.fd = invalidDescriptor
	w.pipefd = []int{invalidDescriptor, invalidDescriptor}
	w.epfd = invalidDescriptor
	w.epes = make([]unix.EpollEvent, 0)
	w.c = c

	release := func(in *inotify) {
		in.epollclose()
		if in.fd == invalidDescriptor {
			return
		}
		unix.Close(int(in.fd))
	}
	runtime.SetFinalizer(w, release)
	return w
}
继续跟踪至非递归树监控部分,可以看到它主要是启动了两个协程,一个用于分发事件,一个用于内部处理(方法名叫 internal,应该是合理的猜测)
// newNonrecursiveTree creates a non-recursive tree around w and starts its two
// goroutines: dispatch, which consumes c, and internal, which consumes rec. A
// nil rec is replaced by a new buffered channel.
func newNonrecursiveTree(w watcher, c, rec chan EventInfo) *nonrecursiveTree {
	internalEvents := rec
	if internalEvents == nil {
		internalEvents = make(chan EventInfo, buffer)
	}

	nt := new(nonrecursiveTree)
	nt.root = root{nd: newnode("")}
	nt.w, nt.c, nt.rec = w, c, internalEvents

	go nt.dispatch(c)
	go nt.internal(internalEvents)
	return nt
}
dispatch 协程从 watcher 接收事件,沿事件路径找到相应的 node,并分发给其上注册的 watchpoint;对于递归监控下目录的创建/移除事件,还会转发给 rec,触发内部处理(internal)协程执行
// dispatch reads events from c and, in a separate goroutine per event, hands
// each one to the watchpoints found along the event's path. Create/Remove
// events for directories seen under a recursive watchpoint are additionally
// forwarded to t.rec. The loop ends when c is closed.
func (t *nonrecursiveTree) dispatch(c <-chan EventInfo) {
for ei := range c {
dbgprintf("dispatching %v on %q", ei.Event(), ei.Path())
// The event is passed as an argument so every goroutine works on its own value.
go func(ei EventInfo) {
var nd node
var isrec bool
dir, base := split(ei.Path())
// fn is invoked by WalkPath for the nodes on the way to dir. It records
// whether any visited watchpoint is recursive, remembers the node flagged
// isbase in nd, and dispatches to every other node with the recursive flag.
fn := func(it node, isbase bool) error {
isrec = isrec || it.Watch.IsRecursive()
if isbase {
nd = it
} else {
it.Watch.Dispatch(ei, recursive)
}
return nil
}
// The tree is only read here, so the read lock is sufficient.
t.rw.RLock()
// Notify recursive watchpoints found on the path.
if err := t.root.WalkPath(dir, fn); err != nil {
dbgprint("dispatch did not reach leaf:", err)
t.rw.RUnlock()
return
}
// Notify parent watchpoint.
nd.Watch.Dispatch(ei, 0)
isrec = isrec || nd.Watch.IsRecursive()
// If leaf watchpoint exists, notify it.
if nd, ok := nd.Child[base]; ok {
isrec = isrec || nd.Watch.IsRecursive()
nd.Watch.Dispatch(ei, 0)
}
t.rw.RUnlock()
// Only Create/Remove events seen under a recursive watchpoint continue
// past this point; everything else is done.
if !isrec || ei.Event()&(Create|Remove) == 0 {
return
}
// Of those, only events that concern a directory are forwarded.
if ok, err := ei.(isDirer).isDir(); !ok || err != nil {
return
}
// Hand the event to the internal goroutine, which updates the tree.
t.rec <- ei
}(ei)
}
}
内部处理协程对移除事件的处理是:将对应的 node 从树中移除,并在 watcher 上对其执行 Unwatch 操作;对其余事件(即递归监控下新建的目录),则沿路径查找为 t.rec 注册的事件集合,并通过 AddDir 为新目录补充监控
// internal consumes the events forwarded on rec and updates the watch tree
// accordingly, holding the write lock while it does so. The loop ends when
// rec is closed.
func (t *nonrecursiveTree) internal(rec <-chan EventInfo) {
for ei := range rec {
t.rw.Lock()
// Removal: unwatch every watchpoint found under the removed node, then
// delete the node from the tree.
if ei.Event() == Remove {
nd, err := t.root.Get(ei.Path())
if err != nil {
t.rw.Unlock()
continue
}
t.walkWatchpoint(nd, func(_ Event, nd node) error {
t.w.Unwatch(nd.Name)
return nil
})
t.root.Del(ei.Path())
t.rw.Unlock()
continue
}
// Any other event: walk the path and keep the largest event set registered
// for t.rec on the visited nodes; nd ends up as the last node visited.
var nd node
var eset = internal
t.root.WalkPath(ei.Path(), func(it node, _ bool) error {
if e := it.Watch[t.rec]; e != 0 && e > eset {
eset = e
}
nd = it
return nil
})
// No visited node carries a larger event set for t.rec: nothing to do.
if eset == internal {
t.rw.Unlock()
continue
}
// The last visited node is not the event path itself, so add a node for it.
if ei.Path() != nd.Name {
nd = nd.Add(ei.Path())
}
// NOTE(review): AddDir and recFunc are defined elsewhere; presumably they
// set up watches for the new directory using eset -- confirm.
err := nd.AddDir(t.recFunc(eset))
t.rw.Unlock()
if err != nil {
dbgprintf("internal(%p) error: %v", rec, err)
}
}
}
Watch
Watch 操作将需要监控的文件记录在内部目录树中,并在 watcher 中进行添加,这个代码可自行进行分析,何时会执行 watchrec 何时会执行 watch,我们在此只继续跟踪 watch 方法
// Watch registers c to receive the given events for path.
//
// It panics when c is nil and is a no-op when no events are given. The path is
// normalised with cleanpath, which also reports whether a recursive watch was
// requested; recursive requests go to watchrec with the recursive flag added
// to the event set, all others go to watch. The tree's write lock is held
// while the node is added and the watch is set up.
func (t *nonrecursiveTree) Watch(path string, c chan<- EventInfo, events ...Event) error {
	switch {
	case c == nil:
		panic("notify: Watch using nil channel")
	case len(events) == 0:
		// Expanding with empty event set is a nop.
		return nil
	}

	cleaned, wantRecursive, err := cleanpath(path)
	if err != nil {
		return err
	}
	mask := joinevents(events)

	t.rw.Lock()
	defer t.rw.Unlock()

	target := t.root.Add(cleaned)
	if !wantRecursive {
		return t.watch(target, c, mask)
	}
	return t.watchrec(target, c, mask|recursive)
}
inotify 的 Watch 只是 watch 的简单封装,这个方法中最重要的方法是 lazyinit
// watch adds an inotify watch for path with event set e, or updates the
// recorded path and mask if the returned watch descriptor is already known.
//
// It rejects event sets containing bits outside All and unix.IN_ALL_EVENTS,
// and makes sure the inotify instance is initialised via lazyinit before the
// watch is added.
func (i *inotify) watch(path string, e Event) (err error) {
	if unknown := e &^ (All | Event(unix.IN_ALL_EVENTS)); unknown != 0 {
		return errors.New("notify: unknown event")
	}
	if err = i.lazyinit(); err != nil {
		return err
	}
	wd, err := unix.InotifyAddWatch(int(i.fd), path, encode(e))
	if err != nil {
		return err
	}

	key := int32(wd)
	i.Lock()
	defer i.Unlock()
	if entry, known := i.m[key]; known {
		entry.path, entry.mask = path, uint32(e)
	} else {
		i.m[key] = &watched{path: path, mask: uint32(e)}
	}
	return nil
}
lazyinit 保证 inotify 与 epoll 只被正确地初始化一次,并启动一个 loop 协程和 consumersCount 个 send 协程,loop 与 send 协程之间通过 esch 进行通信
// lazyinit initialises the inotify instance on first use: it creates the
// inotify descriptor, sets up epoll, and starts one loop goroutine plus
// consumersCount send goroutines connected through an unbuffered channel.
//
// Calls made after a successful initialisation return nil immediately. If
// inotify or epoll setup fails, the error is returned and the descriptor is
// reset to invalidDescriptor so a later call can retry.
func (i *inotify) lazyinit() error {
	// Fast path without the lock: already initialised.
	if atomic.LoadInt32(&i.fd) != invalidDescriptor {
		return nil
	}
	i.Lock()
	defer i.Unlock()
	// Re-check under the lock; another goroutine may have initialised it
	// between the first check and the lock being taken.
	if atomic.LoadInt32(&i.fd) != invalidDescriptor {
		return nil
	}

	fd, err := unix.InotifyInit1(unix.IN_CLOEXEC)
	if err != nil {
		return err
	}
	// i.fd is read with atomic loads outside the lock (above and in loop), so
	// every write has to be atomic as well.
	atomic.StoreInt32(&i.fd, int32(fd))
	if err = i.epollinit(); err != nil {
		_, _ = i.epollclose(), unix.Close(int(fd)) // Ignore errors.
		atomic.StoreInt32(&i.fd, invalidDescriptor)
		return err
	}

	esch := make(chan []*event)
	go i.loop(esch)
	i.wg.Add(consumersCount)
	for n := 0; n < consumersCount; n++ {
		go i.send(esch)
	}
	return nil
}
loop 协程执行 epoll 操作,并将全部事件变更分发至 esch,等待 send 协程处理
// loop blocks in epoll_wait and reacts to the two descriptors it can be woken
// by: the inotify descriptor, whose events are read and sent on esch, and the
// read end of the pipe, which makes the loop shut down. On shutdown it closes
// the inotify descriptor and the epoll state, closes esch and returns.
func (i *inotify) loop(esch chan<- []*event) {
epes := make([]unix.EpollEvent, 1)
// Snapshot of the inotify descriptor; used for comparison and for closing.
fd := atomic.LoadInt32(&i.fd)
for {
// A timeout of -1 blocks until an event arrives.
switch _, err := unix.EpollWait(i.epfd, epes, -1); err {
case nil:
switch epes[0].Fd {
case fd:
// inotify has data: read the batch and pass it to the send goroutines.
esch <- i.read()
epes[0].Fd = 0
case int32(i.pipefd[0]):
// Woken through the pipe: tear everything down. The deferred Unlock
// runs on the return below.
i.Lock()
defer i.Unlock()
if err = unix.Close(int(fd)); err != nil && err != unix.EINTR {
panic("notify: close(2) error " + err.Error())
}
atomic.StoreInt32(&i.fd, invalidDescriptor)
if err = i.epollclose(); err != nil && err != unix.EINTR {
panic("notify: epollclose error " + err.Error())
}
// Closing esch ends the range loops of the send goroutines.
close(esch)
return
}
case unix.EINTR:
// Interrupted system call: retry.
continue
default: // We should never reach this line.
panic("notify: epoll_wait(2) error " + err.Error())
}
}
}
send 协程将事件做转换,并分发至 nonrecursiveTree 的 dispatch 协程,至此全部逻辑完成联通。
// send receives batches of raw events from esch, converts them with
// i.transform and forwards every non-nil result on i.c. When esch is closed it
// marks itself done on i.wg and returns.
func (i *inotify) send(esch <-chan []*event) {
	defer i.wg.Done()
	for batch := range esch {
		for _, ei := range i.transform(batch) {
			if ei == nil {
				continue
			}
			i.c <- ei
		}
	}
}