使用RESTClientFor(Config)可以创建指定Kubeconfig配置的Kubernetes Client-go REST客户端。
使用如下:

  1. restClient, err := rest.RESTClientFor(config)

代码片段1:RESTClientFor()

代码路径k8s.io/client-go/rest/config.go

  1. // RESTClientFor returns a RESTClient that satisfies the requested attributes on a client Config
  2. // object. Note that a RESTClient may require fields that are optional when initializing a Client.
  3. // A RESTClient created by this method is generic - it expects to operate on an API that follows
  4. // the Kubernetes conventions, but may not be the Kubernetes API.
  5. func RESTClientFor(config *Config) (*RESTClient, error) {
  6. // 需要在Config中指定GV和Codec
  7. if config.GroupVersion == nil {
  8. return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
  9. }
  10. if config.NegotiatedSerializer == nil {
  11. return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
  12. }
  13. // 从config中获取访问的K8s apiserver的baseURL和versionedAPIPath
  14. // 详细分析见后文代码片段1.1
  15. baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
  16. if err != nil {
  17. return nil, err
  18. }
  19. transport, err := TransportFor(config)
  20. if err != nil {
  21. return nil, err
  22. }
  23. var httpClient *http.Client
  24. if transport != http.DefaultTransport {
  25. httpClient = &http.Client{Transport: transport}
  26. if config.Timeout > 0 {
  27. httpClient.Timeout = config.Timeout
  28. }
  29. }
  30. rateLimiter := config.RateLimiter
  31. if rateLimiter == nil {
  32. qps := config.QPS
  33. if config.QPS == 0.0 {
  34. qps = DefaultQPS
  35. }
  36. burst := config.Burst
  37. if config.Burst == 0 {
  38. burst = DefaultBurst
  39. }
  40. if qps > 0 {
  41. rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
  42. }
  43. }
  44. var gv schema.GroupVersion
  45. if config.GroupVersion != nil {
  46. gv = *config.GroupVersion
  47. }
  48. clientContent := ClientContentConfig{
  49. AcceptContentTypes: config.AcceptContentTypes,
  50. ContentType: config.ContentType,
  51. GroupVersion: gv,
  52. Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
  53. }
  54. restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
  55. if err == nil && config.WarningHandler != nil {
  56. restClient.warningHandler = config.WarningHandler
  57. }
  58. return restClient, err
  59. }

代码片段1.1:defaultServerUrlFor(config)

defaultServerUrlFor(config)用来获取config配置中的Host信息。
代码路径:k8s.io/client-go/rest/url_utils

  1. // defaultServerUrlFor is shared between IsConfigTransportTLS and RESTClientFor. It
  2. // requires Host and Version to be set prior to being called.
  3. func defaultServerUrlFor(config *Config) (*url.URL, string, error) {
  4. // TODO: move the default to secure when the apiserver supports TLS by default
  5. // config.Insecure is taken to mean "I want HTTPS but don't bother checking the certs against a CA."
  6. hasCA := len(config.CAFile) != 0 || len(config.CAData) != 0
  7. hasCert := len(config.CertFile) != 0 || len(config.CertData) != 0
  8. defaultTLS := hasCA || hasCert || config.Insecure // 判断是否开启TLS认证
  9. host := config.Host
  10. if host == "" {
  11. host = "localhost" //如果host为空,则使用Localhost
  12. }
  13. // 调用DefaultServerURL获取baseURL, versionedAPIPath
  14. if config.GroupVersion != nil {
  15. return DefaultServerURL(host, config.APIPath, *config.GroupVersion, defaultTLS)
  16. }
  17. return DefaultServerURL(host, config.APIPath, schema.GroupVersion{}, defaultTLS)
  18. }

代码片段1.1.1:DefaultServerURL()

  1. // DefaultServerURL converts a host, host:port, or URL string to the default base server API path
  2. // to use with a Client at a given API version following the standard conventions for a
  3. // Kubernetes API.
  4. func DefaultServerURL(host, apiPath string, groupVersion schema.GroupVersion, defaultTLS bool) (*url.URL, string, error) {
  5. if host == "" {
  6. return nil, "", fmt.Errorf("host must be a URL or a host:port pair")
  7. }
  8. base := host
  9. // 使用内置函数url.Parse()将config中的host字段解析为net/url.URL结构,hostURL内容如下图所示。
  10. hostURL, err := url.Parse(base)
  11. // 这里处理解析错误的情况、scheme解析为空、host解析为空的情况
  12. if err != nil || hostURL.Scheme == "" || hostURL.Host == "" {
  13. scheme := "http://"
  14. if defaultTLS {
  15. scheme = "https://"
  16. }
  17. hostURL, err = url.Parse(scheme + base) // 使用scheme+base再尝试一次,可能config.Host中未配置scheme.
  18. if err != nil {
  19. return nil, "", err
  20. }
  21. if hostURL.Path != "" && hostURL.Path != "/" {
  22. return nil, "", fmt.Errorf("host must be a URL or a host:port pair: %q", base)
  23. }
  24. }
  25. // hostURL.Path is optional; a non-empty Path is treated as a prefix that is to be applied to
  26. // all URIs used to access the host. this is useful when there's a proxy in front of the
  27. // apiserver that has relocated the apiserver endpoints, forwarding all requests from, for
  28. // example, /a/b/c to the apiserver. in this case the Path should be /a/b/c.
  29. //
  30. // if running without a frontend proxy (that changes the location of the apiserver), then
  31. // hostURL.Path should be blank.
  32. //
  33. // versionedAPIPath, a path relative to baseURL.Path, points to a versioned API base
  34. // DefaultVersionedAPIPath将apiPath和groupVersion的内容拼凑为versionedAPIPath,详细分析见代码片段1.1.2
  35. versionedAPIPath := DefaultVersionedAPIPath(apiPath, groupVersion)
  36. return hostURL, versionedAPIPath, nil
  37. }

如果config中配置的host为https://10.176.122.1:6443,则url.Parse()将config中的host字段解析为net/url.URL结构如下所示:
image.png

代码片段1.1.2:DefaultVersionedAPIPath

设置API访问的版本路径。

  1. // DefaultVersionedAPIPathFor constructs the default path for the given group version, assuming the given
  2. // API path, following the standard conventions of the Kubernetes API.
  3. func DefaultVersionedAPIPath(apiPath string, groupVersion schema.GroupVersion) string {
  4. // 使用path包下的Join函数将apiPath添加到/末尾,通常versionedAPIPath为/api或者apis
  5. // /api主要用于Kubernetes内置的资源对象(即核心资源组,没有组名),例如/api/v1/pods
  6. // /apis主要用于非核心资源组,有组名,例如/apis/apps/v1/deployments
  7. versionedAPIPath := path.Join("/", apiPath)
  8. // Add the version to the end of the path
  9. if len(groupVersion.Group) > 0 { // 如果有组名,则将versionedAPIPath+group+version
  10. versionedAPIPath = path.Join(versionedAPIPath, groupVersion.Group, groupVersion.Version)
  11. } else { // 如果没有组名,则versionedAPIPath+version
  12. versionedAPIPath = path.Join(versionedAPIPath, groupVersion.Version)
  13. }
  14. return versionedAPIPath
  15. }

代码片段1.2:TransportFor(Config)

  1. // TransportFor returns an http.RoundTripper that will provide the authentication
  2. // or transport level security defined by the provided Config. Will return the
  3. // default http.DefaultTransport if no special case behavior is needed.
  4. // TransportForfan将返回一个http.RoundTripper对象,这个对象将提供Config配置的认证或者传输层的安全保证。
  5. // 如果没有特别的需要,将返回http.DefaultTransport
  6. func TransportFor(config *Config) (http.RoundTripper, error) {
  7. // 使用config.TransportConfig()方法,将client Config转化为 transport Config
  8. cfg, err := config.TransportConfig()
  9. if err != nil {
  10. return nil, err
  11. }
  12. return transport.New(cfg)
  13. }

代码片段1.2.1:TransportConfig()

将rest.Config转化为transport.Config,以便用于传输层的安全认证

  1. // TransportConfig converts a client config to an appropriate transport config.
  2. func (c *Config) TransportConfig() (*transport.Config, error) {
  3. conf := &transport.Config{
  4. UserAgent: c.UserAgent,
  5. Transport: c.Transport,
  6. WrapTransport: c.WrapTransport,
  7. DisableCompression: c.DisableCompression,
  8. TLS: transport.TLSConfig{
  9. Insecure: c.Insecure,
  10. ServerName: c.ServerName,
  11. CAFile: c.CAFile,
  12. CAData: c.CAData,
  13. CertFile: c.CertFile,
  14. CertData: c.CertData,
  15. KeyFile: c.KeyFile,
  16. KeyData: c.KeyData,
  17. NextProtos: c.NextProtos,
  18. },
  19. Username: c.Username,
  20. Password: c.Password,
  21. BearerToken: c.BearerToken,
  22. BearerTokenFile: c.BearerTokenFile,
  23. Impersonate: transport.ImpersonationConfig{
  24. UserName: c.Impersonate.UserName,
  25. Groups: c.Impersonate.Groups,
  26. Extra: c.Impersonate.Extra,
  27. },
  28. Dial: c.Dial,
  29. Proxy: c.Proxy,
  30. }
  31. if c.ExecProvider != nil && c.AuthProvider != nil {
  32. return nil, errors.New("execProvider and authProvider cannot be used in combination")
  33. }
  34. if c.ExecProvider != nil {
  35. provider, err := exec.GetAuthenticator(c.ExecProvider)
  36. if err != nil {
  37. return nil, err
  38. }
  39. if err := provider.UpdateTransportConfig(conf); err != nil {
  40. return nil, err
  41. }
  42. }
  43. if c.AuthProvider != nil {
  44. provider, err := GetAuthProvider(c.Host, c.AuthProvider, c.AuthConfigPersister)
  45. if err != nil {
  46. return nil, err
  47. }
  48. conf.Wrap(provider.WrapTransport)
  49. }
  50. return conf, nil
  51. }

下图为rest.Config配置
image.png
下图为transport.Config配置
image.png

代码片段1.2.2:

  1. // New returns an http.RoundTripper that will provide the authentication
  2. // or transport level security defined by the provided Config.
  3. func New(config *Config) (http.RoundTripper, error) {
  4. // Set transport level security
  5. if config.Transport != nil && (config.HasCA() || config.HasCertAuth() || config.HasCertCallback() || config.TLS.Insecure) {
  6. return nil, fmt.Errorf("using a custom transport with TLS certificate options or the insecure flag is not allowed")
  7. }
  8. var (
  9. rt http.RoundTripper
  10. err error
  11. )
  12. if config.Transport != nil {
  13. rt = config.Transport
  14. } else {
  15. // client-go内部维护了一个transport的缓存,map[tlsCacheKey]*http.Transport
  16. rt, err = tlsCache.get(config)
  17. if err != nil {
  18. return nil, err
  19. }
  20. }
  21. return HTTPWrappersForConfig(config, rt)
  22. }

代码片段1.2.2.1

  1. func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
  2. // 为tls.Config生成为一个Key,详细分析见1.2.2.1.1
  3. key, canCache, err := tlsConfigKey(config)
  4. if err != nil {
  5. return nil, err
  6. }
  7. // 如果可以缓存,则为给定的tls生成单个transport
  8. if canCache {
  9. // Ensure we only create a single transport for the given TLS options
  10. c.mu.Lock()
  11. defer c.mu.Unlock()
  12. // See if we already have a custom transport for this config
  13. if t, ok := c.transports[key]; ok {
  14. return t, nil
  15. }
  16. }
  17. // Get the TLS options for this client config
  18. tlsConfig, err := TLSConfigFor(config)
  19. if err != nil {
  20. return nil, err
  21. }
  22. // The options didn't require a custom TLS config
  23. if tlsConfig == nil && config.Dial == nil && config.Proxy == nil {
  24. return http.DefaultTransport, nil
  25. }
  26. dial := config.Dial
  27. if dial == nil {
  28. dial = (&net.Dialer{
  29. Timeout: 30 * time.Second,
  30. KeepAlive: 30 * time.Second,
  31. }).DialContext
  32. }
  33. // If we use are reloading files, we need to handle certificate rotation properly
  34. // TODO(jackkleeman): We can also add rotation here when config.HasCertCallback() is true
  35. if config.TLS.ReloadTLSFiles {
  36. dynamicCertDialer := certRotatingDialer(tlsConfig.GetClientCertificate, dial)
  37. tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate
  38. dial = dynamicCertDialer.connDialer.DialContext
  39. go dynamicCertDialer.Run(wait.NeverStop)
  40. }
  41. proxy := http.ProxyFromEnvironment
  42. if config.Proxy != nil {
  43. proxy = config.Proxy
  44. }
  45. transport := utilnet.SetTransportDefaults(&http.Transport{
  46. Proxy: proxy,
  47. TLSHandshakeTimeout: 10 * time.Second,
  48. TLSClientConfig: tlsConfig,
  49. MaxIdleConnsPerHost: idleConnsPerHost,
  50. DialContext: dial,
  51. DisableCompression: config.DisableCompression,
  52. })
  53. if canCache {
  54. // Cache a single transport for these options
  55. c.transports[key] = transport
  56. }
  57. return transport, nil
  58. }
  59. // TlsTransportCache caches TLS http.RoundTrippers different configurations. The
  60. // same RoundTripper will be returned for configs with identical TLS options If
  61. // the config has no custom TLS options, http.DefaultTransport is returned.
  62. type tlsTransportCache struct {
  63. mu sync.Mutex
  64. transports map[tlsCacheKey]*http.Transport
  65. }

1.2.2.1.1
  1. // tlsConfigKey returns a unique key for tls.Config objects returned from TLSConfigFor
  2. func tlsConfigKey(c *Config) (tlsCacheKey, bool, error) {
  3. // Make sure ca/key/cert content is loaded
  4. if err := loadTLSFiles(c); err != nil {
  5. return tlsCacheKey{}, false, err
  6. }
  7. if c.TLS.GetCert != nil || c.Dial != nil || c.Proxy != nil {
  8. // cannot determine equality for functions
  9. return tlsCacheKey{}, false, nil
  10. }
  11. k := tlsCacheKey{
  12. insecure: c.TLS.Insecure,
  13. caData: string(c.TLS.CAData),
  14. serverName: c.TLS.ServerName,
  15. nextProtos: strings.Join(c.TLS.NextProtos, ","),
  16. disableCompression: c.DisableCompression,
  17. }
  18. if c.TLS.ReloadTLSFiles {
  19. k.certFile = c.TLS.CertFile
  20. k.keyFile = c.TLS.KeyFile
  21. } else {
  22. k.certData = string(c.TLS.CertData)
  23. k.keyData = string(c.TLS.KeyData)
  24. }
  25. return k, true, nil
  26. }