Relationship
图 1:Endpoint 核心数据结构关系
在上图中,EndpointServerPools 的类型是 []PoolEndpoints,每个 PoolEndpoints 中包含一个 Endpoint 数组(Endpoints)。SetCount 表示该 PoolEndpoints 中有几组(Set)Endpoint,DrivesPerSet 表示每个分组中有几个终端节点。
Create Endpoints
Arguments
创建 EndpointServerPools 的代码如下所示,其中,serverCmdArgs(ctx) 获取 minio 启动命令的参数信息,接下来,进入参数处理方法,看看都做了什么。
// Resolve the startup arguments (MINIO_ARGS / MINIO_ENDPOINTS env vars or the CLI,
// see serverCmdArgs) and build the global endpoint pools and setup type from them.
globalEndpoints, setupType, err = createServerEndpoints(globalMinioAddr, serverCmdArgs(ctx)...)
可以看到,minio 首先从环境变量中读取 MINIO_ARGS;如果没有配置,则读取 MINIO_ENDPOINTS;如果仍然没有,才直接使用命令行参数,这就是 minio 获取启动参数的优先顺序。环境变量中的值会按空白字符拆分为多个参数;若最终使用命令行参数但参数为空或为 help,则打印命令帮助并退出。需要注意,这里的环境变量读取(env.Get)并不只是简单地读取系统环境变量,详细请参阅环境变量一节。
// EnvArgs = "MINIO_ARGS"// EnvEndpoints = "MINIO_ENDPOINTS"func serverCmdArgs(ctx *cli.Context) []string {v := env.Get(config.EnvArgs, "")if v == "" {// Fall back to older ENV MINIO_ENDPOINTSv = env.Get(config.EnvEndpoints, "")}if v == "" {if !ctx.Args().Present() || ctx.Args().First() == "help" {cli.ShowCommandHelpAndExit(ctx, ctx.Command.Name, 1)}return ctx.Args()}return strings.Fields(v)}
Custom Set Drive Count
可通过环境变量 MINIO_ERASURE_SET_DRIVE_COUNT 来自定义 DrivesPerSet 的值(即每个分组的驱动器数),注意这个值最好设置为偶数。该值还必须是 setSizes 中的某个值,并且能整除各参数展开后条目数的最大公约数,否则启动会失败(见下文)。
var customSetDriveCount uint64if v := env.Get(EnvErasureSetDriveCount, ""); v != "" {driveCount, err := strconv.Atoi(v)if err != nil {return nil, config.ErrInvalidErasureSetSize(err)}customSetDriveCount = uint64(driveCount)}
Parse Endpoint Set
func parseEndpointSet(customSetDriveCount uint64, args ...string) (ep endpointSet, err error) {var argPatterns = make([]ellipses.ArgPattern, len(args))for i, arg := range args {patterns, perr := ellipses.FindEllipsesPatterns(arg)if perr != nil {return endpointSet{}, config.ErrInvalidErasureEndpoints(nil).Msg(perr.Error())}argPatterns[i] = patterns}ep.setIndexes, err = getSetIndexes(args, getTotalSizes(argPatterns), customSetDriveCount, argPatterns)if err != nil {return endpointSet{}, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())}ep.argPatterns = argPatternsreturn ep, nil}
L2 - L9:将参数进行解析,获取各自需要匹配的省略号模式;
L11: 根据模式,获取各个 Endpoint 分组信息;
L16: 保留省略号模式信息;
L11 中的 getTotalSizes 如下所示:它遍历每个参数的模式,计算该模式展开后的实际条目数量。注意 L6 的乘法运算:当一个参数中包含多个省略号模式时,如 xxx{1...64}/yyy{1...4},会展开为 xxx1/yyy1、xxx1/yyy2 …… 这样的组合项目,共 64 × 4 = 256 项,因此是乘法关系。
func getTotalSizes(argPatterns []ellipses.ArgPattern) []uint64 {var totalSizes []uint64for _, argPattern := range argPatterns {var totalSize uint64 = 1for _, p := range argPattern {totalSize = totalSize * uint64(len(p.Seq))}totalSizes = append(totalSizes, totalSize)}return totalSizes}
endpointSet 定义如下
type endpointSet struct {argPatterns []ellipses.ArgPatternendpoints []string // Endpoints saved from previous GetEndpoints().setIndexes [][]uint64 // All the sets.}
Get Set Index
getSetIndexes 方法较为复杂,函数声明如下所示。需要注意的是,args 中有可能包含带省略号的配置项,也有可能只有一个参数;后文如无特殊说明,均针对多参数且带省略号配置的情况来分析。由上面章节的分析可知,args 为输入参数,totalSizes[i] 为 args[i] 展开后的条目数,其余参数前文已介绍,不再赘述。
// getSetIndexes splits each argument's expanded entries into erasure sets and
// returns, per argument, the size of every set it is split into. The body is
// elided in this excerpt and walked through piece by piece in the text.
func getSetIndexes(
	args []string,
	totalSizes []uint64,
	customSetDriveCount uint64,
	argPatterns []ellipses.ArgPattern,
) (setIndexes [][]uint64, err error)
首先分配一个二维切片,长度与 totalSizes 相同;然后遍历 totalSizes,检查每个参数展开后的条目数是否至少为最小分组长度 setSizes[0],以及(若设置了)customSetDriveCount,只要有一项不满足就返回错误。
setIndexes = make([][]uint64, len(totalSizes))
// Every argument must expand to at least the smallest supported set size and,
// when one is configured, at least the custom set drive count.
for _, totalSize := range totalSizes {
	// Check if totalSize has minimum range upto setSize
	if totalSize < setSizes[0] || totalSize < customSetDriveCount {
		msg := fmt.Sprintf("Incorrect number of endpoints provided %s", args)
		return nil, config.ErrInvalidNumberOfErasureEndpoints(nil).Msg(msg)
	}
}
setSizes 定义如下,因此,一个分组的长度最少为 4、最多为 16。
// setSizes enumerates every supported erasure-set drive count in ascending
// order: the smallest allowed set has 4 drives and the largest has 16.
var setSizes = []uint64{
	4, 5, 6, 7, 8, 9, 10,
	11, 12, 13, 14, 15, 16,
}
然后计算 totalSizes 中全部条目数的最大公约数 commonSize,注意 GCD(辗转相除法)的实现在 L4 - L9。
commonSize := getDivisibleSize(totalSizes)

func getDivisibleSize(totalSizes []uint64) (result uint64) { // greatest common divisor of all totalSizes
	gcd := func(x, y uint64) uint64 {
		for y != 0 {
			x, y = y, x%y
		}
		return x
	}
	// totalSizes must be non-empty: the first element seeds the fold.
	result = totalSizes[0]
	for i := 1; i < len(totalSizes); i++ {
		result = gcd(result, totalSizes[i])
	}
	return result
}
根据计算的 commonSize,在 setSizes 中筛选能够整除 commonSize 的数,以数组形式返回
// possibleSetCounts returns, in ascending order, the supported set sizes
// that evenly divide setSize.
possibleSetCounts := func(setSize uint64) (ss []uint64) {
	for _, s := range setSizes {
		if setSize%s == 0 {
			ss = append(ss, s)
		}
	}
	return ss
}

setCounts := possibleSetCounts(commonSize)
if len(setCounts) == 0 {
	msg := fmt.Sprintf("Incorrect number of endpoints provided %s, number of disks %d is not divisible by any supported erasure set sizes %d", args, commonSize, setSizes)
	return nil, config.ErrInvalidNumberOfErasureEndpoints(nil).Msg(msg)
}
接下来计算 setSize,如果定义了 customSetDriveCount,那么从刚返回的 setCounts 中找到这个数字,如果找不到,返回失败;
var setSize uint64// Custom set drive count allows to override automatic distribution.// only meant if you want to further optimize drive distribution.if customSetDriveCount > 0 {msg := fmt.Sprintf("Invalid set drive count. Acceptable values for %d number drives are %d", commonSize, setCounts)var found boolfor _, ss := range setCounts {if ss == customSetDriveCount {found = true}}if !found {return nil, config.ErrInvalidErasureSetSize(nil).Msg(msg)}// No automatic symmetry calculation expected, user is on their ownsetSize = customSetDriveCountglobalCustomErasureDriveCount = true} else {// Returns possible set counts with symmetry.setCounts = possibleSetCountsWithSymmetry(setCounts, argPatterns)if len(setCounts) == 0 {msg := fmt.Sprintf("No symmetric distribution detected with input endpoints provided %s, disks %d cannot be spread symmetrically by any supported erasure set sizes %d", args, commonSize, setSizes)return nil, config.ErrInvalidNumberOfErasureEndpoints(nil).Msg(msg)}// Final set size with all the symmetry accounted for.setSize = commonSetDriveCount(commonSize, setCounts)}
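如果没有定义 customSetDriveCount,则调用 possibleSetCountsWithSymmetry 重新计算 setCounts:对原始 setCounts 中的每个候选值 ss,检查它与各省略号子序列的长度之间是否存在整除关系(较大者能被较小者整除),满足条件的候选值才会保留。其设计思路是:如果子模式的展开项数量能被 ss 整除,那么整个模式的展开项总数(各子模式展开项数的乘积)也必然能被 ss 整除。但需要注意 L5 - L13 的实现:symmetry 在每次内层迭代中都会被重新赋值,而不是累积,因此实际上只有最后遍历到的那个省略号子序列决定结果。另外,当 argPatterns 为 nil 时,所有候选值都会被保留。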
func possibleSetCountsWithSymmetry(setCounts []uint64, argPatterns []ellipses.ArgPattern) []uint64 {var newSetCounts = make(map[uint64]struct{})for _, ss := range setCounts {var symmetry boolfor _, argPattern := range argPatterns {for _, p := range argPattern {if uint64(len(p.Seq)) > ss {symmetry = uint64(len(p.Seq))%ss == 0} else {symmetry = ss%uint64(len(p.Seq)) == 0}}}// With no arg patterns, it is expected that user knows// the right symmetry, so either ellipses patterns are// provided (recommended) or no ellipses patterns.if _, ok := newSetCounts[ss]; !ok && (symmetry || argPatterns == nil) {newSetCounts[ss] = struct{}{}}}setCounts = []uint64{}for setCount := range newSetCounts {setCounts = append(setCounts, setCount)}// Not necessarily needed but it ensures to the readers// eyes that we prefer a sorted setCount slice for the// subsequent function to figure out the right common// divisor, it avoids loops.sort.Slice(setCounts, func(i, j int) bool {return setCounts[i] < setCounts[j]})return setCounts}
根据 commonSize 及新计算的 setCounts 重新计算 setSize。由于 setCounts 已升序排列,且其中每个值都能整除 commonSize,commonSetDriveCount 选出的是 setCounts 中最大的值,即分组数量最少的方案。
setSize = commonSetDriveCount(commonSize, setCounts)

// commonSetDriveCount picks the entry of setCounts that splits divisibleSize
// into the fewest sets; with setCounts sorted ascending this is the largest
// entry dividing divisibleSize. If divisibleSize is smaller than the largest
// candidate it is returned unchanged. setCounts must be non-empty.
func commonSetDriveCount(divisibleSize uint64, setCounts []uint64) (setSize uint64) {
	// prefers setCounts to be sorted for optimal behavior.
	if divisibleSize < setCounts[len(setCounts)-1] {
		return divisibleSize
	}

	// Figure out largest value of total_drives_in_erasure_set which results
	// in least number of total_drives/total_drives_erasure_set ratio.
	prevD := divisibleSize / setCounts[0]
	for _, cnt := range setCounts {
		if divisibleSize%cnt == 0 {
			d := divisibleSize / cnt
			// "<=" lets a later (larger) count win ties.
			if d <= prevD {
				prevD = d
				setSize = cnt
			}
		}
	}
	return setSize
}
最后,用每个参数的展开条目数除以 setSize 得到该参数的分组数量,并为每个分组填入 setSize。
// Give every argument one setSize entry per set its entries split into.
for i, total := range totalSizes {
	sets := total / setSize
	for n := uint64(0); n < sets; n++ {
		setIndexes[i] = append(setIndexes[i], setSize)
	}
}
因此,最后返回的 setIndexes 中全部元素值都是相同的,如
[[4, 4],[4, 4, 4, 4],[4, 4]]
Get Endpoints Set
endpointSet 中保存了原始参数的省略号模式以及每个分组的长度。Get 方法负责生成具体的分组:先通过 getEndpoints 将包含省略号的参数展开,得到全部 Endpoint 名称,然后按照 setIndexes 依次切分。
func (s endpointSet) Get() (sets [][]string) {var k = uint64(0)endpoints := s.getEndpoints()for i := range s.setIndexes {for j := range s.setIndexes[i] {sets = append(sets, endpoints[k:s.setIndexes[i][j]+k])k = s.setIndexes[i][j] + k}}return sets}
Create Server Pools
经过上面的讲解,继续看如何创建 EndpointServerPools。可以发现,上文 Arguments 部分介绍的处理全部在下面代码的 L2(GetAllSets)中执行。因为 GetAllSets 返回的是最终的分组,且所有分组长度均相同,所以 L12 - L13 中 SetCount 取分组数量、DrivesPerSet 取第一个分组的长度也就不难理解了。
for _, arg := range args {
	setArgs, err := GetAllSets(arg)
	if err != nil {
		return nil, -1, err
	}

	endpointList, gotSetupType, err := CreateEndpoints(serverAddr, foundPrevLocal, setArgs...)
	if err != nil {
		return nil, -1, err
	}
	if err = endpointServerPools.Add(PoolEndpoints{
		SetCount:     len(setArgs),
		DrivesPerSet: len(setArgs[0]), // all sets have the same length, so the first is representative
		Endpoints:    endpointList,
	}); err != nil {
		return nil, -1, err
	}
	foundPrevLocal = endpointList.atleastOneEndpointLocal()
	if setupType == UnknownSetupType {
		setupType = gotSetupType
	}
	// A distributed pool upgrades an otherwise local erasure setup to distributed.
	if setupType == ErasureSetupType && gotSetupType == DistErasureSetupType {
		setupType = DistErasureSetupType
	}
}
NewEndpoints
NewEndpoints 根据传入的节点地址逐个创建 Endpoint,注意全部节点的类型(以及 scheme)必须一致,否则会报错。这里的 StringSet 可以理解为字符串集合,用于检测重复节点,从而确保结果中没有重复元素。
func NewEndpoints(args ...string) (endpoints Endpoints, err error) {var endpointType EndpointTypevar scheme stringuniqueArgs := set.NewStringSet()// Loop through args and adds to endpoint list.for i, arg := range args {endpoint, err := NewEndpoint(arg)if err != nil {return nil, fmt.Errorf("'%s': %s", arg, err.Error())}// All endpoints have to be same type and scheme if applicable.if i == 0 {endpointType = endpoint.Type()scheme = endpoint.Scheme} else if endpoint.Type() != endpointType {return nil, fmt.Errorf("mixed style endpoints are not supported")} else if endpoint.Scheme != scheme {return nil, fmt.Errorf("mixed scheme is not supported")}arg = endpoint.String()if uniqueArgs.Contains(arg) {return nil, fmt.Errorf("duplicate endpoints found")}uniqueArgs.Add(arg)endpoints = append(endpoints, endpoint)}return endpoints, nil}
不难看出,核心方法是 NewEndpoint,代码如下
func NewEndpoint(arg string) (ep Endpoint, e error) {// isEmptyPath - check whether given path is not empty.isEmptyPath := func(path string) bool {return path == "" || path == SlashSeparator || path == `\`}if isEmptyPath(arg) {return ep, fmt.Errorf("empty or root endpoint is not supported")}var isLocal boolvar host stringu, err := url.Parse(arg)if err == nil && u.Host != "" {// URL style of endpoint.// Valid URL style endpoint is// - Scheme field must contain "http" or "https"// - All field should be empty except Host and Path.if !((u.Scheme == "http" || u.Scheme == "https") &&u.User == nil && u.Opaque == "" && !u.ForceQuery && u.RawQuery == "" && u.Fragment == "") {return ep, fmt.Errorf("invalid URL endpoint format")}var port stringhost, port, err = net.SplitHostPort(u.Host)if err != nil {if !strings.Contains(err.Error(), "missing port in address") {return ep, fmt.Errorf("invalid URL endpoint format: %w", err)}host = u.Host} else {var p intp, err = strconv.Atoi(port)if err != nil {return ep, fmt.Errorf("invalid URL endpoint format: invalid port number")} else if p < 1 || p > 65535 {return ep, fmt.Errorf("invalid URL endpoint format: port number must be between 1 to 65535")}}if i := strings.Index(host, "%"); i > -1 {host = host[:i]}if host == "" {return ep, fmt.Errorf("invalid URL endpoint format: empty host name")}// As this is path in the URL, we should use path package, not filepath package.// On MS Windows, filepath.Clean() converts into Windows path style ie `/foo` becomes `\foo`u.Path = path.Clean(u.Path)if isEmptyPath(u.Path) {return ep, fmt.Errorf("empty or root path is not supported in URL endpoint")}// On windows having a preceding SlashSeparator will cause problems, if the// command line already has C:/<export-folder/ in it. 
Final resulting// path on windows might become C:/C:/ this will cause problems// of starting minio server properly in distributed mode on windows.// As a special case make sure to trim the separator.// NOTE: It is also perfectly fine for windows users to have a path// without C:/ since at that point we treat it as relative path// and obtain the full filesystem path as well. Providing C:/// style is necessary to provide paths other than C:/,// such as F:/, D:/ etc.//// Another additional benefit here is that this style also// supports providing \\host\share support as well.if runtime.GOOS == globalWindowsOSName {if filepath.VolumeName(u.Path[1:]) != "" {u.Path = u.Path[1:]}}} else {// Only check if the arg is an ip address and ask for scheme since its absent.// localhost, example.com, any FQDN cannot be disambiguated from a regular file path such as// /mnt/export1. So we go ahead and start the minio server in FS modes in these cases.if isHostIP(arg) {return ep, fmt.Errorf("invalid URL endpoint format: missing scheme http or https")}absArg, err := filepath.Abs(arg)if err != nil {return Endpoint{}, fmt.Errorf("absolute path failed %s", err)}u = &url.URL{Path: path.Clean(absArg)}isLocal = true}return Endpoint{URL: u,IsLocal: isLocal,}, nil}
L14 - L75 处理 URL 式的参数;L77 - L88 处理文件路径式的参数。需要注意,仅当参数为文件路径时,isLocal 为 true。
Endpoint Type
Endpoint 只有两种类型,PathEndpointType 与 URLEndpointType,分别对应本地文件系统类型和网络类型的节点。
func (endpoint Endpoint) Type() EndpointType {if endpoint.Host == "" {return PathEndpointType}return URLEndpointType}
创建完全部节点后,判断节点类型。由于 NewEndpoints 已保证全部节点类型一致,只需检查第一个节点;如果是 Path 型,则设置为 ErasureSetupType 并直接返回。
// Presumably endpoints came from NewEndpoints, which rejects mixed styles, so
// the first endpoint's type stands for all of them (assumes a non-empty list).
if endpoints[0].Type() == PathEndpointType {
	setupType = ErasureSetupType
	return endpoints, setupType, nil
}
如果全部节点均为 URL 类型,则继续处理。首先调用 UpdateIsLocal 判断各节点是否为本地节点,其退出条件为找到至少一个本地节点或全部节点都解析完毕。然后统计全部节点的路径,以及本地节点的主机名、端口和数量。由于使用的都是 Set 类型,这几个集合中都不会包含重复元素。
if err = endpoints.UpdateIsLocal(foundLocal); err != nil {
	return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
}

// Here all endpoints are URL style.
endpointPathSet := set.NewStringSet()
localEndpointCount := 0
localServerHostSet := set.NewStringSet()
localPortSet := set.NewStringSet()

for _, endpoint := range endpoints {
	endpointPathSet.Add(endpoint.Path)
	if endpoint.IsLocal {
		localServerHostSet.Add(endpoint.Hostname())

		// Without an explicit port the endpoint uses the server's own port.
		// A scoped error variable keeps a split failure from leaking into err.
		port := serverAddrPort
		if _, p, splitErr := net.SplitHostPort(endpoint.Host); splitErr == nil {
			port = p
		}
		localPortSet.Add(port)

		localEndpointCount++
	}
}
确保同一路径不会在同一 IP 地址上通过不同端口被多个节点提供服务:按路径累积各节点主机解析出的 IP 集合,如果同一路径的 IP 集合之间出现交集,则报错。
// Reject the same path being served from different ports on the same address.
{
	pathIPMap := make(map[string]set.StringSet)
	for _, endpoint := range endpoints {
		host := endpoint.Hostname()
		// NOTE(review): resolution errors are ignored; presumably an unresolvable
		// host yields an empty set and so never conflicts — confirm.
		hostIPSet, _ := getHostIP(host)
		if ipSet, ok := pathIPMap[endpoint.Path]; ok {
			if !ipSet.Intersection(hostIPSet).IsEmpty() {
				return endpoints, setupType,
					config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("path '%s' can not be served by different port on same address", endpoint.Path))
			}
			pathIPMap[endpoint.Path] = ipSet.Union(hostIPSet)
		} else {
			pathIPMap[endpoint.Path] = hostIPSet
		}
	}
}
确保一个本地路径上最多只有一个节点
// Reject two local endpoints that share the same path.
{
	localPathSet := set.CreateStringSet()
	for _, endpoint := range endpoints {
		if !endpoint.IsLocal {
			continue
		}
		if localPathSet.Contains(endpoint.Path) {
			return endpoints, setupType,
				config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("path '%s' cannot be served by different address on same server", endpoint.Path))
		}
		localPathSet.Add(endpoint.Path)
	}
}
如果全部节点都是本地节点,且都使用同一端口,那么设置类型为 ErasureSetupType;此时若本地节点的主机名/IP 不一致,则报错。否则,即使全部均为本地节点,只要端口不同,仍然是 DistErasureSetupType 类型。
// Every endpoint is local.
if len(endpoints) == localEndpointCount {
	// If all endpoints have same port number, Just treat it as local erasure setup
	// using URL style endpoints.
	if len(localPortSet) == 1 {
		if len(localServerHostSet) > 1 {
			return endpoints, setupType,
				config.ErrInvalidErasureEndpoints(nil).Msg("all local endpoints should not have different hostnames/ips")
		}
		return endpoints, ErasureSetupType, nil
	}

	// Even though all endpoints are local, but those endpoints use different ports.
	// This means it is DistErasure setup.
}
检查是否存在没有配置端口的节点,如果存在,则补全为服务端口 serverAddrPort;如果是本地节点且其端口与服务端口不同,则将 IsLocal 设置为 false(视为远程节点)。最后将全部节点的 Host 收集到 uniqueArgs 中。
// Give port-less endpoints the server's port; a local endpoint on a different
// port than this server is treated as remote.
for i := range endpoints {
	_, port, err := net.SplitHostPort(endpoints[i].Host)
	if err != nil {
		endpoints[i].Host = net.JoinHostPort(endpoints[i].Host, serverAddrPort)
	} else if endpoints[i].IsLocal && serverAddrPort != port {
		// If endpoint is local, but port is different than serverAddrPort, then make it as remote.
		endpoints[i].IsLocal = false
	}
}

uniqueArgs := set.NewStringSet()
for _, endpoint := range endpoints {
	uniqueArgs.Add(endpoint.Host)
}
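检查是否配置了公共 IP 地址(config.EnvPublicIPs)。如果没有配置,则调用 updateDomainIPs,将各节点主机解析为 IP:Port 形式,并保存到 globalDomainIPs 中(回环地址和 localhost 会被排除)。最后将部署类型设置为 DistErasureSetupType。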
// Without explicitly configured public IPs, derive the domain IPs from the
// endpoint hosts collected above.
publicIPs := env.Get(config.EnvPublicIPs, "")
if len(publicIPs) == 0 {
	updateDomainIPs(uniqueArgs)
}

setupType = DistErasureSetupType
updateDomainIPs 代码如下
func updateDomainIPs(endPoints set.StringSet) {ipList := set.NewStringSet()for e := range endPoints {host, port, err := net.SplitHostPort(e)if err != nil {if strings.Contains(err.Error(), "missing port in address") {host = eport = globalMinioPort} else {continue}}if net.ParseIP(host) == nil {IPs, err := getHostIP(host)if err != nil {continue}IPsWithPort := IPs.ApplyFunc(func(ip string) string {return net.JoinHostPort(ip, port)})ipList = ipList.Union(IPsWithPort)}ipList.Add(net.JoinHostPort(host, port))}globalDomainIPs = ipList.FuncMatch(func(ip string, matchString string) bool {host, _, err := net.SplitHostPort(ip)if err != nil {host = ip}return !net.ParseIP(host).IsLoopback() && host != "localhost"}, "")}
