Relationship
图 1:Endpoint 核心数据结构关系
在上图中,EndpointServerPools 是类型 []PoolEndpoints,PoolEndpoints 中包含一个 Endpoint 数组。SetCount 表示这个 PoolEndpoints 中有几组 Endpoint,DrivesPerSet 表示一个分组中有几个终端节点。
Create Endpoints
Arguments
创建 EndpointServerPools 的代码如下所示,其中,serverCmdArgs(ctx) 获取 minio 启动命令的参数信息,接下来,进入参数处理方法,看看都做了什么。
// Build the global endpoint pools and the setup type from the listen address
// and the server arguments resolved by serverCmdArgs.
globalEndpoints, setupType, err = createServerEndpoints(globalMinioAddr, serverCmdArgs(ctx)...)
可以看到,minio 会先从环境变量中检查 MINIO_ARGS 参数,如果没有配置,则读取 MINIO_ENDPOINTS 参数,如果仍然没有,才读取命令行参数,这就是 minio 获取配置的顺序。这里需要注意,环境变量的管理并不只是读取系统环境变量这么简单,详细请参阅环境变量。
// serverCmdArgs resolves the arguments the server was started with.
//
// Sources are consulted in this order and the first non-empty one wins:
//
//  1. config.EnvArgs      ("MINIO_ARGS")
//  2. config.EnvEndpoints ("MINIO_ENDPOINTS", the older setting)
//  3. the positional command line arguments of ctx
//
// Values coming from the environment are split on white space. When only the
// command line is left and it is empty or starts with "help", the command
// help is shown via cli.ShowCommandHelpAndExit with exit code 1.
func serverCmdArgs(ctx *cli.Context) []string {
	fromEnv := env.Get(config.EnvArgs, "")
	if fromEnv == "" {
		// Fall back to older ENV MINIO_ENDPOINTS
		fromEnv = env.Get(config.EnvEndpoints, "")
	}
	if fromEnv != "" {
		return strings.Fields(fromEnv)
	}

	cliArgs := ctx.Args()
	if !cliArgs.Present() || cliArgs.First() == "help" {
		cli.ShowCommandHelpAndExit(ctx, ctx.Command.Name, 1)
	}
	return cliArgs
}
Custom Set Drive Count
可通过环境变量 MINIO_ERASURE_SET_DRIVE_COUNT 来自定义 DrivesPerSet 值,注意这个值最好设置为偶数。
// customSetDriveCount is the user requested number of drives per erasure set
// read from EnvErasureSetDriveCount. It stays zero when the setting is empty.
var customSetDriveCount uint64
if v := env.Get(EnvErasureSetDriveCount, ""); v != "" {
	driveCount, err := strconv.Atoi(v)
	if err != nil {
		return nil, config.ErrInvalidErasureSetSize(err)
	}
	// A negative number would wrap around to a huge value when converted
	// to uint64, so reject it here with an explicit error.
	if driveCount < 0 {
		msg := fmt.Sprintf("set drive count %d must not be negative", driveCount)
		return nil, config.ErrInvalidErasureSetSize(nil).Msg(msg)
	}
	customSetDriveCount = uint64(driveCount)
}
Parse Endpoint Set
// parseEndpointSet finds the ellipses patterns of every argument and asks
// getSetIndexes for the matching erasure set layout.
//
// customSetDriveCount is passed through to getSetIndexes unchanged. On
// success the returned endpointSet carries the patterns and the set indexes;
// any failure is reported as config.ErrInvalidErasureEndpoints carrying the
// underlying error text, together with an empty endpointSet.
func parseEndpointSet(customSetDriveCount uint64, args ...string) (ep endpointSet, err error) {
	patterns := make([]ellipses.ArgPattern, 0, len(args))
	for _, arg := range args {
		argPattern, findErr := ellipses.FindEllipsesPatterns(arg)
		if findErr != nil {
			return endpointSet{}, config.ErrInvalidErasureEndpoints(nil).Msg(findErr.Error())
		}
		patterns = append(patterns, argPattern)
	}

	indexes, idxErr := getSetIndexes(args, getTotalSizes(patterns), customSetDriveCount, patterns)
	if idxErr != nil {
		return endpointSet{}, config.ErrInvalidErasureEndpoints(nil).Msg(idxErr.Error())
	}

	return endpointSet{
		argPatterns: patterns,
		setIndexes:  indexes,
	}, nil
}
L2 - L9:将参数进行解析,获取各自需要匹配的省略号模式;
L11: 根据模式,获取各个 Endpoint 分组信息;
L16: 保留省略号模式信息;
L11 中的 getTotalSizes 如下所示,遍历每个模式,计算每个模式展开后的实际条目数量。注意 L6 的乘法运算:展开模式时,如果有多个省略号模式,如 xxx{1...64}/yyy{1...4},会扩展为 xxx1/yyy1 这样的项目,总条目数为各省略号范围长度的乘积(此例为 64 × 4 = 256),因此是乘法关系
// getTotalSizes returns one entry per argument pattern: the product of the
// sequence lengths of all ellipses patterns inside that argument, i.e. the
// number of items the argument expands to. An argument without patterns
// yields 1. The result is nil when argPatterns is empty.
func getTotalSizes(argPatterns []ellipses.ArgPattern) []uint64 {
	var sizes []uint64
	for i := range argPatterns {
		product := uint64(1)
		for j := range argPatterns[i] {
			product *= uint64(len(argPatterns[i][j].Seq))
		}
		sizes = append(sizes, product)
	}
	return sizes
}
endpointSet 定义如下
// endpointSet holds the parsed endpoint arguments together with the erasure
// set layout computed for them.
type endpointSet struct {
	// argPatterns are the ellipses patterns found in each input argument.
	argPatterns []ellipses.ArgPattern
	endpoints []string // Endpoints saved from previous GetEndpoints().
	// setIndexes has one row per argument; every element of a row is the
	// size of one set carved out of that argument's endpoints.
	setIndexes [][]uint64 // All the sets.
}
Get Set Index
getSetIndexes 方法较为复杂,函数声明如下所示。需要注意的是 args 中有可能含有包含省略号的配置项,也有可能只有一个参数,后续无特殊说明,全部都针对多参数项且带省略号配置来分析。在上面章节的分析中,我们知道 args 为输入参数,totalSizes[index] 为对应的 args[index] 展开后项目的条数,其他参数前文都已介绍,不再说明。
// getSetIndexes computes the erasure set layout for the given arguments.
//
// args are the raw endpoint arguments (used in error messages), totalSizes
// holds the expanded item count of each argument, customSetDriveCount is the
// user requested set size (zero when unset) and argPatterns are the ellipses
// patterns of the arguments. Only the signature is shown here; the body is
// presented piecewise below.
func getSetIndexes(
	args []string,
	totalSizes []uint64,
	customSetDriveCount uint64,
	argPatterns []ellipses.ArgPattern) (setIndexes [][]uint64, err error)
分配一个二维切片,长度与 totalSizes 相同,并遍历 totalSizes,筛查是否存在不满足条件的分组(展开后的条数小于最小分组长度 setSizes[0],或小于 customSetDriveCount),如果存在则返回错误并退出执行
// One row of set sizes per argument.
setIndexes = make([][]uint64, len(totalSizes))
for i := range totalSizes {
	// Every argument must expand to at least the smallest supported set
	// size and to at least the custom drive count.
	tooSmall := totalSizes[i] < setSizes[0] || totalSizes[i] < customSetDriveCount
	if !tooSmall {
		continue
	}
	return nil, config.ErrInvalidNumberOfErasureEndpoints(nil).Msg(
		fmt.Sprintf("Incorrect number of endpoints provided %s", args))
}
setSizes 定义如下,因此,一个分组长度至少为 4
// setSizes lists the supported erasure set sizes, in ascending order from
// 4 to 16 drives per set.
var setSizes = []uint64{4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
然后找到 totalSize 中全部长度的最大公约数,注意 GCD 计算方法在 L4 - L9
// commonSize is the greatest common divisor of all per-argument totals.
commonSize := getDivisibleSize(totalSizes)
// getDivisibleSize returns the greatest common divisor of all values in
// totalSizes.
//
// A single value is returned unchanged. For an empty (or nil) slice the
// result is 0 instead of an index-out-of-range panic.
func getDivisibleSize(totalSizes []uint64) (result uint64) {
	// Nothing to divide: totalSizes[0] below would panic otherwise.
	if len(totalSizes) == 0 {
		return 0
	}

	// Euclid's algorithm.
	gcd := func(x, y uint64) uint64 {
		for y != 0 {
			x, y = y, x%y
		}
		return x
	}

	result = totalSizes[0]
	for _, size := range totalSizes[1:] {
		result = gcd(result, size)
	}
	return result
}
根据计算的 commonSize,在 setSizes 中筛选能够整除 commonSize 的数,以数组形式返回
// possibleSetCounts returns every supported set size that divides setSize
// without remainder, in the order of setSizes.
possibleSetCounts := func(setSize uint64) (ss []uint64) {
	for i := range setSizes {
		if setSize%setSizes[i] != 0 {
			continue
		}
		ss = append(ss, setSizes[i])
	}
	return ss
}

setCounts := possibleSetCounts(commonSize)
if len(setCounts) == 0 {
	// No supported set size divides the common size.
	return nil, config.ErrInvalidNumberOfErasureEndpoints(nil).Msg(
		fmt.Sprintf("Incorrect number of endpoints provided %s, number of disks %d is not divisible by any supported erasure set sizes %d", args, commonSize, setSizes))
}
接下来计算 setSize,如果定义了 customSetDriveCount,那么从刚返回的 setCounts 中找到这个数字,如果找不到,返回失败;
var setSize uint64
// Custom set drive count allows to override automatic distribution.
// only meant if you want to further optimize drive distribution.
if customSetDriveCount > 0 {
msg := fmt.Sprintf("Invalid set drive count. Acceptable values for %d number drives are %d", commonSize, setCounts)
var found bool
for _, ss := range setCounts {
if ss == customSetDriveCount {
found = true
}
}
if !found {
return nil, config.ErrInvalidErasureSetSize(nil).Msg(msg)
}
// No automatic symmetry calculation expected, user is on their own
setSize = customSetDriveCount
globalCustomErasureDriveCount = true
} else {
// Returns possible set counts with symmetry.
setCounts = possibleSetCountsWithSymmetry(setCounts, argPatterns)
if len(setCounts) == 0 {
msg := fmt.Sprintf("No symmetric distribution detected with input endpoints provided %s, disks %d cannot be spread symmetrically by any supported erasure set sizes %d", args, commonSize, setSizes)
return nil, config.ErrInvalidNumberOfErasureEndpoints(nil).Msg(msg)
}
// Final set size with all the symmetry accounted for.
setSize = commonSetDriveCount(commonSize, setCounts)
}
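如果没有定义 customSetDriveCount,则通过 possibleSetCountsWithSymmetry 重新计算 setCounts。遍历原始 setCounts,找到能被全部模式展开后数量整除的数字,保存下来。注意 L5 - L13,对每个子模式,若其展开项数量大于候选值,则要求展开项数量能被候选值整除,否则要求候选值能被展开项数量整除;如果一个子模式的全部展开项数量能整除,就说明这个模式能被该数字整除(一个模式展开项总数必然是子项目展开项总数的倍数)。另外需要留意,symmetry 变量在每次内层循环中都会被重新赋值,因此最终起决定作用的是最后一个子模式的判断结果。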
// possibleSetCountsWithSymmetry filters setCounts down to the values that are
// symmetric with respect to the ellipses patterns and returns them sorted in
// ascending order without duplicates.
//
// For a pattern with n sequence items a candidate c is symmetric when n is a
// multiple of c (n > c) or c is a multiple of n (otherwise). The flag is
// reassigned for every pattern visited, so the verdict of the last pattern
// is the one that counts. A nil argPatterns accepts every candidate.
func possibleSetCountsWithSymmetry(setCounts []uint64, argPatterns []ellipses.ArgPattern) []uint64 {
	accepted := make(map[uint64]struct{})
	for _, candidate := range setCounts {
		symmetry := false
		for i := range argPatterns {
			for j := range argPatterns[i] {
				seqLen := uint64(len(argPatterns[i][j].Seq))
				if seqLen > candidate {
					symmetry = seqLen%candidate == 0
				} else {
					symmetry = candidate%seqLen == 0
				}
			}
		}

		// With no arg patterns, it is expected that user knows
		// the right symmetry, so either ellipses patterns are
		// provided (recommended) or no ellipses patterns.
		if symmetry || argPatterns == nil {
			accepted[candidate] = struct{}{}
		}
	}

	result := make([]uint64, 0, len(accepted))
	for candidate := range accepted {
		result = append(result, candidate)
	}

	// Map iteration order is random; sort so the subsequent common divisor
	// search sees an ascending slice.
	sort.Slice(result, func(a, b int) bool {
		return result[a] < result[b]
	})
	return result
}
根据 commonSize,及新计算的 setCounts 重新计算 setSize。setSize 选择的是 setCounts 中最大的能整除 commonSize 的数。
// Pick the final set size from the symmetric candidates.
setSize = commonSetDriveCount(commonSize, setCounts)
// commonSetDriveCount picks the set size to use for divisibleSize drives out
// of the candidate setCounts.
//
// setCounts must be non-empty and is expected to be sorted ascending. When
// divisibleSize is smaller than the last candidate, divisibleSize itself is
// returned. Otherwise the candidate that divides divisibleSize and gives the
// smallest divisibleSize/candidate ratio is returned (later candidates win
// ties); the result is 0 when no candidate divides divisibleSize.
func commonSetDriveCount(divisibleSize uint64, setCounts []uint64) (setSize uint64) {
	largest := setCounts[len(setCounts)-1]
	if divisibleSize < largest {
		return divisibleSize
	}

	// Figure out largest value of total_drives_in_erasure_set which results
	// in least number of total_drives/total_drives_erasure_set ratio.
	bestRatio := divisibleSize / setCounts[0]
	for i := range setCounts {
		candidate := setCounts[i]
		if divisibleSize%candidate != 0 {
			continue
		}
		if ratio := divisibleSize / candidate; ratio <= bestRatio {
			bestRatio, setSize = ratio, candidate
		}
	}
	return setSize
}
最后,将每个参数展开项的数量分别除以 setSize 得到分组数量,再为每个分组填入 setSize。
// Every argument contributes total/setSize sets, each of size setSize.
for i, total := range totalSizes {
	for remaining := total / setSize; remaining > 0; remaining-- {
		setIndexes[i] = append(setIndexes[i], setSize)
	}
}
因此,最后返回的 setIndexes 中全部元素值都是相同的,如
[
[4, 4],
[4, 4, 4, 4],
[4, 4]
]
Get Endpoints Set
endpointSet 中,保存了原始参数,每个分组长度。Get 方法获取具体的分组方式,getEndpoints 将包含省略号的参数展开,获取全部的 Endpoint 名称,然后根据 setIndexes 进行划分。
// Get slices the flat endpoint list returned by s.getEndpoints() into sets,
// walking s.setIndexes row by row and taking consecutive windows of the
// recorded set sizes. The returned sub-slices share the backing array of that
// endpoint list.
func (s endpointSet) Get() (sets [][]string) {
	endpoints := s.getEndpoints()

	var start uint64
	for _, sizes := range s.setIndexes {
		for _, size := range sizes {
			end := start + size
			sets = append(sets, endpoints[start:end])
			start = end
		}
	}
	return sets
}
Create Server Pools
经过上面的讲解,继续看如何创建 EndpointServerPools,可以发现,整个 Arguments 部分都在下面代码的 L2 中执行。因为 GetAllSets 获取的是最终的分组,且分组长度均为相同值,那么 L12 - L13 行的计算也就不难理解了。
// Each argument becomes one pool.
for _, arg := range args {
	setArgs, err := GetAllSets(arg)
	if err != nil {
		return nil, -1, err
	}

	endpointList, gotSetupType, err := CreateEndpoints(serverAddr, foundPrevLocal, setArgs...)
	if err != nil {
		return nil, -1, err
	}

	// All sets have the same length, so the first one gives the number of
	// drives per set.
	pool := PoolEndpoints{
		SetCount:     len(setArgs),
		DrivesPerSet: len(setArgs[0]),
		Endpoints:    endpointList,
	}
	if err = endpointServerPools.Add(pool); err != nil {
		return nil, -1, err
	}

	foundPrevLocal = endpointList.atleastOneEndpointLocal()

	// The first pool fixes the setup type; a later distributed pool
	// upgrades a plain erasure setup to a distributed one.
	switch {
	case setupType == UnknownSetupType:
		setupType = gotSetupType
	case setupType == ErasureSetupType && gotSetupType == DistErasureSetupType:
		setupType = DistErasureSetupType
	}
}
NewEndpoints
根据传入的节点地址,创建全部节点,注意全部节点类型必须一致,否则会报错。String Set 按照字符串集合来理解,可确认内部没有重复元素。
// NewEndpoints converts every argument into an Endpoint via NewEndpoint.
//
// All endpoints must share the type and the scheme of the first one, and no
// two arguments may produce the same endpoint string. The first violation,
// or the first NewEndpoint failure, aborts with a nil list and an error.
func NewEndpoints(args ...string) (endpoints Endpoints, err error) {
	var (
		firstType   EndpointType
		firstScheme string
	)
	seen := set.NewStringSet()

	for idx, rawArg := range args {
		endpoint, newErr := NewEndpoint(rawArg)
		if newErr != nil {
			return nil, fmt.Errorf("'%s': %s", rawArg, newErr.Error())
		}

		// All endpoints have to be same type and scheme if applicable.
		switch {
		case idx == 0:
			firstType, firstScheme = endpoint.Type(), endpoint.Scheme
		case endpoint.Type() != firstType:
			return nil, fmt.Errorf("mixed style endpoints are not supported")
		case endpoint.Scheme != firstScheme:
			return nil, fmt.Errorf("mixed scheme is not supported")
		}

		// Duplicates are detected on the normalized endpoint string.
		key := endpoint.String()
		if seen.Contains(key) {
			return nil, fmt.Errorf("duplicate endpoints found")
		}
		seen.Add(key)

		endpoints = append(endpoints, endpoint)
	}

	return endpoints, nil
}
不难看出,核心方法是 NewEndpoint,代码如下
// NewEndpoint parses arg into an Endpoint.
//
// arg is handled as a URL style endpoint when url.Parse succeeds and yields a
// non-empty host; anything else is handled as a file system path.
//
//   - URL style: the scheme must be http or https; user info, opaque data,
//     query and fragment must be absent; an explicit port must lie within
//     1-65535; the cleaned path must not be empty or root. IsLocal is left
//     false in this branch.
//   - Path style: a value that isHostIP reports as an IP is rejected because
//     the scheme is missing; any other value is made absolute, cleaned and
//     marked IsLocal.
//
// An error is returned when arg is empty or root, or when one of the rules
// above is violated.
func NewEndpoint(arg string) (ep Endpoint, e error) {
	// isEmptyPath - check whether given path is not empty.
	isEmptyPath := func(path string) bool {
		return path == "" || path == SlashSeparator || path == `\`
	}
	if isEmptyPath(arg) {
		return ep, fmt.Errorf("empty or root endpoint is not supported")
	}
	var isLocal bool
	// host is only used to validate the URL; the endpoint keeps u.Host.
	var host string
	u, err := url.Parse(arg)
	if err == nil && u.Host != "" {
		// URL style of endpoint.
		// Valid URL style endpoint is
		// - Scheme field must contain "http" or "https"
		// - All field should be empty except Host and Path.
		if !((u.Scheme == "http" || u.Scheme == "https") &&
			u.User == nil && u.Opaque == "" && !u.ForceQuery && u.RawQuery == "" && u.Fragment == "") {
			return ep, fmt.Errorf("invalid URL endpoint format")
		}
		var port string
		host, port, err = net.SplitHostPort(u.Host)
		if err != nil {
			// A host without a port is acceptable; any other split
			// failure is reported.
			if !strings.Contains(err.Error(), "missing port in address") {
				return ep, fmt.Errorf("invalid URL endpoint format: %w", err)
			}
			host = u.Host
		} else {
			// An explicit port must be numeric and within range.
			var p int
			p, err = strconv.Atoi(port)
			if err != nil {
				return ep, fmt.Errorf("invalid URL endpoint format: invalid port number")
			} else if p < 1 || p > 65535 {
				return ep, fmt.Errorf("invalid URL endpoint format: port number must be between 1 to 65535")
			}
		}
		// Drop everything from the first '%' on before the emptiness
		// check (presumably an IPv6 zone identifier - TODO confirm).
		if i := strings.Index(host, "%"); i > -1 {
			host = host[:i]
		}
		if host == "" {
			return ep, fmt.Errorf("invalid URL endpoint format: empty host name")
		}
		// As this is path in the URL, we should use path package, not filepath package.
		// On MS Windows, filepath.Clean() converts into Windows path style ie `/foo` becomes `\foo`
		u.Path = path.Clean(u.Path)
		if isEmptyPath(u.Path) {
			return ep, fmt.Errorf("empty or root path is not supported in URL endpoint")
		}
		// On windows having a preceding SlashSeparator will cause problems, if the
		// command line already has C:/<export-folder/ in it. Final resulting
		// path on windows might become C:/C:/ this will cause problems
		// of starting minio server properly in distributed mode on windows.
		// As a special case make sure to trim the separator.
		// NOTE: It is also perfectly fine for windows users to have a path
		// without C:/ since at that point we treat it as relative path
		// and obtain the full filesystem path as well. Providing C:/
		// style is necessary to provide paths other than C:/,
		// such as F:/, D:/ etc.
		//
		// Another additional benefit here is that this style also
		// supports providing \\host\share support as well.
		if runtime.GOOS == globalWindowsOSName {
			if filepath.VolumeName(u.Path[1:]) != "" {
				u.Path = u.Path[1:]
			}
		}
	} else {
		// Only check if the arg is an ip address and ask for scheme since its absent.
		// localhost, example.com, any FQDN cannot be disambiguated from a regular file path such as
		// /mnt/export1. So we go ahead and start the minio server in FS modes in these cases.
		if isHostIP(arg) {
			return ep, fmt.Errorf("invalid URL endpoint format: missing scheme http or https")
		}
		absArg, err := filepath.Abs(arg)
		if err != nil {
			return Endpoint{}, fmt.Errorf("absolute path failed %s", err)
		}
		// Path style endpoints carry only a path and are always local.
		u = &url.URL{Path: path.Clean(absArg)}
		isLocal = true
	}
	return Endpoint{
		URL: u,
		IsLocal: isLocal,
	}, nil
}
L14 - L75 处理 URL 式的参数;L77 - L88 处理文件路径式的参数。需要注意,仅当参数为文件路径时,isLocal 为 true。
Endpoint Type
Endpoint 只有两种类型,PathEndpointType 与 URLEndpointType,分别对应本地文件系统类型和网络类型的节点。
// Type reports the endpoint style: URLEndpointType when the endpoint has a
// host, PathEndpointType when the host is empty.
func (endpoint Endpoint) Type() EndpointType {
	if endpoint.Host != "" {
		return URLEndpointType
	}
	return PathEndpointType
}
创建完全部的节点后,判断节点类型,如果均为 Path 型,完成退出。
// Only the first endpoint is inspected here; path style endpoints form a
// plain erasure setup and need none of the URL checks that follow.
if endpoints[0].Type() == PathEndpointType {
	setupType = ErasureSetupType
	return endpoints, setupType, nil
}
如果全部节点均为 URL 类型,继续处理,首先解析全部节点或至少找到一个本地节点,UpdateIsLocal 退出条件为找到至少一个本地节点或全部节点都解析完毕。然后继续统计全部节点的 URL 路径,本地端口、本地路径等,因为使用的都是 Set 类型,因此这几个变量中都不会包含重复元素。
if err = endpoints.UpdateIsLocal(foundLocal); err != nil {
return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
}
// Here all endpoints are URL style.
endpointPathSet := set.NewStringSet()
localEndpointCount := 0
localServerHostSet := set.NewStringSet()
localPortSet := set.NewStringSet()
for _, endpoint := range endpoints {
endpointPathSet.Add(endpoint.Path)
if endpoint.IsLocal {
localServerHostSet.Add(endpoint.Hostname())
var port string
_, port, err = net.SplitHostPort(endpoint.Host)
if err != nil {
port = serverAddrPort
}
localPortSet.Add(port)
localEndpointCount++
}
}
确保不存在位于同一主机 IP、却通过不同端口提供同一路径的节点
{
	// pathIPMap records, per endpoint path, the IPs of all hosts seen so
	// far that serve this path.
	pathIPMap := make(map[string]set.StringSet)
	for _, endpoint := range endpoints {
		host := endpoint.Hostname()
		// NOTE(review): the lookup error is dropped here; what hostIPSet
		// holds in that case depends on getHostIP - confirm.
		hostIPSet, _ := getHostIP(host)
		if IPSet, ok := pathIPMap[endpoint.Path]; ok {
			// The same path was already seen: it must not share an IP
			// with any earlier endpoint serving that path.
			if !IPSet.Intersection(hostIPSet).IsEmpty() {
				return endpoints, setupType,
					config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("path '%s' can not be served by different port on same address", endpoint.Path))
			}
			pathIPMap[endpoint.Path] = IPSet.Union(hostIPSet)
		} else {
			pathIPMap[endpoint.Path] = hostIPSet
		}
	}
}
确保一个本地路径上最多只有一个节点
{
	// Every path may be used by at most one local endpoint.
	localPathSet := set.CreateStringSet()
	for _, endpoint := range endpoints {
		if endpoint.IsLocal {
			if localPathSet.Contains(endpoint.Path) {
				return endpoints, setupType,
					config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("path '%s' cannot be served by different address on same server", endpoint.Path))
			}
			localPathSet.Add(endpoint.Path)
		}
	}
}
如果全部节点都是本地节点,且都监听在同一端口,那么设置类型为 ErasureSetupType。否则,即使全部均为本地节点,仍然是 DistErasureSetupType 类型。
// If all endpoints are local and have same port number, Just treat it as
// local erasure setup using URL style endpoints.
// When all endpoints are local but use different ports, fall through:
// this means it is DistErasure setup.
if len(endpoints) == localEndpointCount && len(localPortSet) == 1 {
	if len(localServerHostSet) > 1 {
		return endpoints, setupType,
			config.ErrInvalidErasureEndpoints(nil).Msg("all local endpoints should not have different hostnames/ips")
	}
	return endpoints, ErasureSetupType, nil
}
检查是否存在没有配置监听端口的节点,如果存在,用全局端口补全;如果是本地节点且实际监听端口和全局端口不同,设置 IsLocal 为 false。
for i := range endpoints {
	_, port, splitErr := net.SplitHostPort(endpoints[i].Host)
	switch {
	case splitErr != nil:
		// The host could not be split into host and port: append the
		// server port.
		endpoints[i].Host = net.JoinHostPort(endpoints[i].Host, serverAddrPort)
	case endpoints[i].IsLocal && serverAddrPort != port:
		// If endpoint is local, but port is different than serverAddrPort, then make it as remote.
		endpoints[i].IsLocal = false
	}
}

// Collect the distinct host:port values of all endpoints.
uniqueArgs := set.NewStringSet()
for i := range endpoints {
	uniqueArgs.Add(endpoints[i].Host)
}
检查是否配置了公共 IP 地址,如果没有配置公共 IP,更新主机为 IP:Port 形式。
// Without configured public IPs (config.EnvPublicIPs empty), derive the
// domain IPs from the endpoint hosts collected in uniqueArgs.
publicIPs := env.Get(config.EnvPublicIPs, "")
if len(publicIPs) == 0 {
	updateDomainIPs(uniqueArgs)
}
// Reaching this point means the setup is distributed erasure.
setupType = DistErasureSetupType
updateDomainIPs 代码如下
// updateDomainIPs rebuilds globalDomainIPs from the given host[:port] entries.
//
// Entries without a port get globalMinioPort; entries that fail to split for
// any other reason are skipped. A host that is not an IP literal is resolved
// through getHostIP and every resolved IP is added with the entry's port
// (hosts that fail to resolve are skipped). The entry itself is added as
// host:port too. Finally loopback IPs and "localhost" are filtered out.
func updateDomainIPs(endPoints set.StringSet) {
	collected := set.NewStringSet()
	for entry := range endPoints {
		host, port, err := net.SplitHostPort(entry)
		if err != nil {
			if !strings.Contains(err.Error(), "missing port in address") {
				continue
			}
			host, port = entry, globalMinioPort
		}

		if net.ParseIP(host) == nil {
			resolved, err := getHostIP(host)
			if err != nil {
				continue
			}
			withPort := resolved.ApplyFunc(func(ip string) string {
				return net.JoinHostPort(ip, port)
			})
			collected = collected.Union(withPort)
		}

		collected.Add(net.JoinHostPort(host, port))
	}

	// keep reports whether an address is neither loopback nor "localhost".
	keep := func(addr string, matchString string) bool {
		host, _, err := net.SplitHostPort(addr)
		if err != nil {
			host = addr
		}
		if host == "localhost" {
			return false
		}
		return !net.ParseIP(host).IsLoopback()
	}
	globalDomainIPs = collected.FuncMatch(keep, "")
}