Bit Rot Algorithm

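The BitrotAlgorithm type gives the different hash algorithms one uniform interface. For example, when the MinIO server starts it runs a self-test of each hash algorithm, as shown below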

    func bitrotSelfTest() {
        var checksums = map[BitrotAlgorithm]string{
            SHA256:          "a7677ff19e0182e4d52e3a3db727804abc82a5818749336369552e54b838b004",
            BLAKE2b512:      "e519b7d84b1c3c917985f544773a35cf265dcab10948be3550320d156bab612124a5ae2ae5a8c73c0eea360f68b0e28136f26e858756dbfe7375a7389f26c669",
            HighwayHash256:  "39c0407ed3f01b18d22c85db4aeff11e060ca5f43131b0126731ca197cd42313",
            HighwayHash256S: "39c0407ed3f01b18d22c85db4aeff11e060ca5f43131b0126731ca197cd42313",
        }
        for algorithm := range bitrotAlgorithms {
            if !algorithm.Available() {
                continue
            }
            checksum, err := hex.DecodeString(checksums[algorithm])
            if err != nil {
                logger.Fatal(errSelfTestFailure, fmt.Sprintf("bitrot: failed to decode %v checksum %s for selftest: %v", algorithm, checksums[algorithm], err))
            }
            var (
                hash = algorithm.New()
                msg  = make([]byte, 0, hash.Size()*hash.BlockSize())
                sum  = make([]byte, 0, hash.Size())
            )
            for i := 0; i < hash.Size()*hash.BlockSize(); i += hash.Size() {
                hash.Write(msg)
                sum = hash.Sum(sum[:0])
                msg = append(msg, sum...)
                hash.Reset()
            }
            if !bytes.Equal(sum, checksum) {
                logger.Fatal(errSelfTestFailure, fmt.Sprintf("bitrot: %v selftest checksum mismatch: got %x - want %x", algorithm, sum, checksum))
            }
        }
    }

The core idea is to define a new type that standardizes how the hash algorithms are used. BitrotAlgorithm is defined as follows

    type BitrotAlgorithm uint

    const (
        // SHA256 represents the SHA-256 hash function
        SHA256 BitrotAlgorithm = 1 + iota
        // HighwayHash256 represents the HighwayHash-256 hash function
        HighwayHash256
        // HighwayHash256S represents the Streaming HighwayHash-256 hash function
        HighwayHash256S
        // BLAKE2b512 represents the BLAKE2b-512 hash function
        BLAKE2b512
    )

Its New method is shown below

    func (a BitrotAlgorithm) New() hash.Hash {
        switch a {
        case SHA256:
            return sha256.New()
        case BLAKE2b512:
            b2, _ := blake2b.New512(nil) // New512 never returns an error if the key is nil
            return b2
        case HighwayHash256:
            hh, _ := highwayhash.New(magicHighwayHash256Key) // New will never return error since key is 256 bit
            return hh
        case HighwayHash256S:
            hh, _ := highwayhash.New(magicHighwayHash256Key) // New will never return error since key is 256 bit
            return hh
        default:
            logger.CriticalIf(GlobalContext, errors.New("Unsupported bitrot algorithm"))
            return nil
        }
    }

Erasure Code

MinIO encodes data with Reed-Solomon erasure codes; the algorithm itself comes from klauspost/reedsolomon. MinIO wraps it as follows

    type Erasure struct {
        encoder                  func() reedsolomon.Encoder
        dataBlocks, parityBlocks int
        blockSize                int64
    }

    func NewErasure(ctx context.Context, dataBlocks, parityBlocks int, blockSize int64) (e Erasure, err error) {
        // Check the parameters for sanity now.
        if dataBlocks <= 0 || parityBlocks <= 0 {
            return e, reedsolomon.ErrInvShardNum
        }
        if dataBlocks+parityBlocks > 256 {
            return e, reedsolomon.ErrMaxShardNum
        }
        e = Erasure{
            dataBlocks:   dataBlocks,
            parityBlocks: parityBlocks,
            blockSize:    blockSize,
        }
        // Encoder when needed.
        var enc reedsolomon.Encoder
        var once sync.Once
        e.encoder = func() reedsolomon.Encoder {
            once.Do(func() {
                e, err := reedsolomon.New(dataBlocks, parityBlocks, reedsolomon.WithAutoGoroutines(int(e.ShardSize())))
                if err != nil {
                    // Error conditions should be checked above.
                    panic(err)
                }
                enc = e
            })
            return enc
        }
        return
    }

The encoding and decoding parts are shown below

    // EncodeData encodes the given data and returns the erasure-coded data.
    // It returns an error if the erasure coding failed.
    func (e *Erasure) EncodeData(ctx context.Context, data []byte) ([][]byte, error) {
        if len(data) == 0 {
            return make([][]byte, e.dataBlocks+e.parityBlocks), nil
        }
        encoded, err := e.encoder().Split(data)
        if err != nil {
            logger.LogIf(ctx, err)
            return nil, err
        }
        if err = e.encoder().Encode(encoded); err != nil {
            logger.LogIf(ctx, err)
            return nil, err
        }
        return encoded, nil
    }

    // DecodeDataBlocks decodes the given erasure-coded data.
    // It only decodes the data blocks but does not verify them.
    // It returns an error if the decoding failed.
    func (e *Erasure) DecodeDataBlocks(data [][]byte) error {
        var isZero = 0
        for _, b := range data[:] {
            if len(b) == 0 {
                isZero++
                break
            }
        }
        if isZero == 0 || isZero == len(data) {
            // If all are zero, payload is 0 bytes.
            return nil
        }
        return e.encoder().ReconstructData(data)
    }

    func (e *Erasure) DecodeDataAndParityBlocks(ctx context.Context, data [][]byte) error {
        if err := e.encoder().Reconstruct(data); err != nil {
            logger.LogIf(ctx, err)
            return err
        }
        return nil
    }

Certificate Pool

cli-cert-pool.svg
Figure 1: Diagram of x509 Certificate Pool AddCert

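CertPool comes from the Go standard library and is used to manage certificates; it is noted briefly here to make later sections easier to follow. AddCert shows clearly how Certificate and CertPool relate: in CertPool's two maps, the value field stores the index of the newly added certificate in the slice.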