通常我们更新应用程序的配置文件,都需要手动重启程序或手动重新加载配置。假设一组服务部署在10台机器上,你需要借助批量运维工具执行重启命令,而且10台同时重启可能还会造成服务短暂不可用。要是更新配置后,服务自动刷新配置多好…今天我们就用go实现配置文件热加载的小功能,以后更新配置再也不用手动重启了…

基本思路

通常应用程序启动的流程:加载配置,然后run()。我们怎么做到热加载呢?我们的思路是这样的:
【1】在加载配置文件之后,启动一个线程。
【2】该线程定时监听这个配置文件是否有改动。
【3】如果配置文件有变动,就重新加载一下。
【4】重新加载之后通知需要使用这些配置的应用程序(进程或线程),实际上就是刷新内存中配置。

加载配置

首先我们要实现加载配置功能。假设配置文件是k=v格式的,如下:
配置文件热加载的go语言实现 - 图1

那我们得写一个解析配置的包了…让我们一起面向对象:
配置文件热加载的go语言实现 - 图2

  • filename string 配置文件名称
  • data map[string]string 将配置文件中的k/v解析存放到map中
  • lastModifyTime int64 记录配置文件上一次更改时间
  • rwLock sync.RWMutex 读写锁,处理这样一种竞争情况:更新这个结构体时其他线程正在读取改结构体中的内容,后续用到的时候会讲
  • notifyList []Notifyer 存放所有观察者,此处我们用到了观察者模式,也就是需要用到这个配置的对象,我们就把它加到这个切片。当配置更新之后,通知切片中的对象配置更新了。

构造函数

  1. 1 func NewConfig(file string)(conf *Config, err error){
  2. 2 conf = &Config{
  3. 3 filename: file,
  4. 4 data: make(map[string]string, 1024),
  5. 5 }
  6. 6
  7. 7 m, err := conf.parse()
  8. 8 if err != nil {
  9. 9 fmt.Printf("parse conf error:%v\n", err)
  10. 10 return
  11. 11 }
  12. 12
  13. 13 // 将解析配置文件后的数据更新到结构体的map中,写锁
  14. 14 conf.rwLock.Lock()
  15. 15 conf.data = m
  16. 16 conf.rwLock.Unlock()
  17. 17
  18. 18 // 启一个后台线程去检测配置文件是否更改
  19. 19 go conf.reload()
  20. 20 return
  21. 21}

构造函数做了三件事:
【1】初始化Config 
【2】调用parse()函数,解析配置文件,并把解析后的map更新到Config
【3】启动一个线程,准确说是启动一个goroutine,即reload()
注意此处更新map时加了写锁了,目的在于不影响拥有读锁的线程读取数据。

parse()

解析函数比较简单,主要是读取配置文件,一行行解析,数据存放在map中。

  1. 1 func (c *Config) parse() (m map[string]string, err error) {
  2. 2 // 如果在parse()中定义一个map,这样就是一个新的map不用加锁
  3. 3 m = make(map[string]string, 1024)
  4. 4
  5. 5 f, err := os.Open(c.filename)
  6. 6 if err != nil {
  7. 7 return
  8. 8 }
  9. 9 defer f.Close()
  10. 10
  11. 11 reader := bufio.NewReader(f)
  12. 12 // 声明一个变量存放读取行数
  13. 13 var lineNo int
  14. 14 for {
  15. 15 line, errRet := reader.ReadString('\n')
  16. 16 if errRet == io.EOF {
  17. 17 // 这里有一个坑,最后一行如果不是\n结尾会漏读
  18. 18 lineParse(&lineNo, &line, &m)
  19. 19 break
  20. 20 }
  21. 21 if errRet != nil {
  22. 22 err = errRet
  23. 23 return
  24. 24 }
  25. 25
  26. 26 lineParse(&lineNo, &line, &m)
  27. 27 }
  28. 28
  29. 29 return
  30. 30}
  31. 31
  32. 32 func lineParse(lineNo *int, line *string, m *map[string]string) {
  33. 33 *lineNo++
  34. 34
  35. 35 l := strings.TrimSpace(*line)
  36. 36 // 如果空行 或者 是注释 跳过
  37. 37 if len(l) == 0 || l[0] =='\n' || l[0]=='#' || l[0]==';' {
  38. 38 return
  39. 39 }
  40. 40
  41. 41 itemSlice := strings.Split(l, "=")
  42. 42 // =
  43. 43 if len(itemSlice) == 0 {
  44. 44 fmt.Printf("invalid config, line:%d", lineNo)
  45. 45 return
  46. 46 }
  47. 47
  48. 48 key := strings.TrimSpace(itemSlice[0])
  49. 49 if len(key) == 0 {
  50. 50 fmt.Printf("invalid config, line:%d", lineNo)
  51. 51 return
  52. 52 }
  53. 53 if len(key) == 1 {
  54. 54 (*m)[key] = ""
  55. 55 return
  56. 56 }
  57. 57
  58. 58 value := strings.TrimSpace(itemSlice[1])
  59. 59 (*m)[key] = value
  60. 60
  61. 61 return
  62. 62}

这里我写了两个函数。lineParse()是解析每一行配置的。parse()就是解析的主函数,在parse()中我调用了两次lineParse()。原因是在使用bufio按行读取配置文件的时候,有时候会出现这样的情况:有的人在写配置文件的时候,最后一行没有换行,也就是没有‘\n’,然后我们就直接读到io.EOF了,这时候如果直接break就会导致最后一行丢失。所以这种情况下我们应该在break之前调用lineParse()把最后一行处理了。

封装接口

  1. 上面我们已经实现了读取配置文件,并放到一个Config示例中,我们需要为这个Config封装一些接口,方便用户通过接口访问Config的内容。这步比较简单:
  1. 1func (c *Config) GetInt(key string)(value int, err error){
  2. 2 c.rwLock.RLock()
  3. 3 defer c.rwLock.RUnlock()
  4. 4
  5. 5 str, ok := c.data[key]
  6. 6 if !ok {
  7. 7 err = fmt.Errorf("key [%s] not found", key)
  8. 8 }
  9. 9 value, err = strconv.Atoi(str)
  10. 10 return
  11. 11}
  12. 12
  13. 13func (c *Config) GetIntDefault(key string, defaultInt int)(value int){
  14. 14 c.rwLock.RLock()
  15. 15 defer c.rwLock.RUnlock()
  16. 16
  17. 17 str, ok := c.data[key]
  18. 18 if !ok {
  19. 19 value = defaultInt
  20. 20 return
  21. 21 }
  22. 22 value, err := strconv.Atoi(str)
  23. 23 if err != nil {
  24. 24 value = defaultInt
  25. 25 }
  26. 26 return
  27. 27}
  28. 28
  29. 29func (c *Config) GetString(key string)(value string, err error){
  30. 30 c.rwLock.RLock()
  31. 31 defer c.rwLock.RUnlock()
  32. 32
  33. 33 value, ok := c.data[key]
  34. 34 if !ok {
  35. 35 err = fmt.Errorf("key [%s] not found", key)
  36. 36 }
  37. 37 return
  38. 38}
  39. 39
  40. 40func (c *Config) GetIStringDefault(key string, defaultStr string)(value string){
  41. 41 c.rwLock.RLock()
  42. 42 defer c.rwLock.RUnlock()
  43. 43
  44. 44 value, ok := c.data[key]
  45. 45 if !ok {
  46. 46 value = defaultStr
  47. 47 return
  48. 48 }
  49. 49 return
  50. 50}

如上,一共封装了4个接口:

  • GetInt(key string)(value int, err error) 通过key获取value,并将value转成int类
  • GetIntDefault(key string, defaultInt int)(value int) 通过key获取value,并将value转成int类型;如果获取失败,使用默认值
  • GetString(key string)(value string, err error) 通过key获取value,默认value为string类
  • GetIStringDefault(key string, defaultStr string)(value string) 通过key获取value,默认value为string类型;如果获取失败,使用默认值


    注意:四个接口都用了读锁

reload()

上面我们已经实现了解析,加载配置文件,并为Config封装了比较友好的接口。接下来,我们可以仔细看一下我们之前启动的goroutine了,即reload()方法。

  1. 1func (c *Config) reload(){
  2. 2 // 定时器
  3. 3 ticker := time.NewTicker(time.Second * 5)
  4. 4 for _ = range ticker.C {
  5. 5 // 打开文件
  6. 6 // 为什么使用匿名函数? 当匿名函数退出时可用defer去关闭文件
  7. 7 // 如果不用匿名函数,在循环中不好关闭文件,一不小心就内存泄露
  8. 8 func (){
  9. 9 f, err := os.Open(c.filename)
  10. 10 if err != nil {
  11. 11 fmt.Printf("open file error:%s\n", err)
  12. 12 return
  13. 13 }
  14. 14 defer f.Close()
  15. 15
  16. 16 fileInfo, err := f.Stat()
  17. 17 if err != nil {
  18. 18 fmt.Printf("stat file error:%s\n", err)
  19. 19 return
  20. 20 }
  21. 21 // 或取当前文件修改时间
  22. 22 curModifyTime := fileInfo.ModTime().Unix()
  23. 23 if curModifyTime > c.lastModifyTime {
  24. 24 // 重新解析时,要考虑应用程序正在读取这个配置因此应该加锁
  25. 25 m, err := c.parse()
  26. 26 if err != nil {
  27. 27 fmt.Printf("parse config error:%v\n", err)
  28. 28 return
  29. 29 }
  30. 30
  31. 31 c.rwLock.Lock()
  32. 32 c.data = m
  33. 33 c.rwLock.Unlock()
  34. 34
  35. 35 c.lastModifyTime = curModifyTime
  36. 36
  37. 37 // 配置更新通知所有观察者
  38. 38 for _, n := range c.notifyList {
  39. 39 n.Callback(c)
  40. 40 }
  41. 41 }
  42. 42 }()
  43. 43 }
  44. 44}

reload()函数中做了这几件事:
【1】用time.NewTicker每隔5秒去检查一下配置文件
【2】如果配置文件的修改时间比上一次修改时间大,我们认为配置文件更新了。那么我们调用parse()解析配置文件,并更新conf实例中的数据。并且更新配置文件的修改时间。
【3】通知所有观察者,即通知所有使用配置文件的程序、进程或实例,配置更新了。

观察者模式

我们反复提到观察者,反复提到通知所有观察者配置文件更新。那么我们就要实现这个观察者:

  1. type Notifyer interface {
  2. Callback(*Config)
  3. }

定义这样一个Notifyer接口,只要实现了Callback方法的对象,就都实现了这个Notifyer接口。实现了这个接口的对象,如果都需要被通知配置文件更新,那这些对象都可以加入到前面定义的notifyList []Notifyer这个切片中,等待被通知配置文件更新。

好了,此处我们是否少写了添加观察者的方法呢??是的,马上写:

  1. // 添加观察者
  2. func (c *Config) AddObserver(n Notifyer) {
  3. c.notifyList = append(c.notifyList, n)
  4. }

测试

经过上面一番折腾,咱们的热加载就快实现了,我们来测一测:
通常我们在应用程序中怎么使用配置文件?【1】加载配置文件,加载之后数据放在一个全局结构体中 【2】run()
也就是run()中我们要使用全局的结构体,但是这个全局结构体会因为配置文件的更改被更新。此时又存在需要加锁的情况了。我擦,是不是很麻烦。。不用担心,我们用一个原子操作搞定。

假设我们的配置文件中存放的是hostname/port/kafkaAddr/kafkaPort这几个字段。。

  1. type AppConfig struct {
  2. hostname string
  3. port int
  4. kafkaAddr string
  5. kafkaPort int
  6. }

接下来我们要用原子操作保证数据一致性了:

  1. // reload()协程写 和 for循环的读,都是对Appconfig对象,因此有读写冲突
  2. type AppConfigMgr struct {
  3. config atomic.Value
  4. }
  5. // 初始化结构体
  6. var appConfigMgr = &AppConfigMgr{}

atomic.Value能保证存放数据和读取出数据不会有冲突。所以当我们更新数据时存放到atomic.Value中,我们使用数据从atomic.Value加载出来,这样不用加锁就能保证数据的一致性了。完美~~

我们需要AppConfigMgr实现Callback方法,即实现Notifyer接口,这样才能被通知配置更新:

  1. func (a *AppConfigMgr)Callback(conf *reconf.Config) {
  2. appConfig := &AppConfig{}
  3. hostname, err := conf.GetString("hostname")
  4. if err != nil {
  5. fmt.Printf("get hostname err: %v\n", err)
  6. return
  7. }
  8. appConfig.hostname = hostname
  9. kafkaPort, err := conf.GetInt("kafkaPort")
  10. if err != nil {
  11. fmt.Printf("get kafkaPort err: %v\n", err)
  12. return
  13. }
  14. appConfig.kafkaPort = kafkaPort
  15. appConfigMgr.config.Store(appConfig)
  16. }

这个Callback实现功能是:当被通知配置更新时,马上读取更新的数据并存放到config atomic.Value 中。

好了,我们要写主函数了。

  1. func initConfig(file string) {
  2. // [1] 打开配置文件
  3. conf, err := reconf.NewConfig(file)
  4. if err != nil {
  5. fmt.Printf("read config file err: %v\n", err)
  6. return
  7. }
  8. // 添加观察者
  9. conf.AddObserver(appConfigMgr)
  10. // [2]第一次读取配置文件
  11. var appConfig AppConfig
  12. appConfig.hostname, err = conf.GetString("hostname")
  13. if err != nil {
  14. fmt.Printf("get hostname err: %v\n", err)
  15. return
  16. }
  17. fmt.Println("Hostname:", appConfig.hostname)
  18. appConfig.kafkaPort, err = conf.GetInt("kafkaPort")
  19. if err != nil {
  20. fmt.Printf("get kafkaPort err: %v\n", err)
  21. return
  22. }
  23. fmt.Println("kafkaPort:", appConfig.kafkaPort)
  24. // [3] 把读取到的配置文件数据存储到atomic.Value
  25. appConfigMgr.config.Store(&appConfig)
  26. fmt.Println("first load sucess.")
  27. }
  28. func run(){
  29. for {
  30. appConfig := appConfigMgr.config.Load().(*AppConfig)
  31. fmt.Println("Hostname:", appConfig.hostname)
  32. fmt.Println("kafkaPort:", appConfig.kafkaPort)
  33. fmt.Printf("%v\n", "--------------------")
  34. time.Sleep(5 * time.Second)
  35. }
  36. }
  37. func main() {
  38. confFile := "../parseConfig/test.cfg"
  39. initConfig(confFile)
  40. // 应用程序 很多配置已经不是存在文件中而是etcd
  41. run()
  42. }

主函数中调用了initConfig()和run()。

  • initConfig()中:reconf.NewConfig(file)的时候我们已经第一次解析配置,并启动线程不断更新配置了。此外initConfig()还做了一些事,就是通过Config提供的接口,将配置文件中的数据读取到appConfig 中,然后再将appConfig 存储到 atomic.Value中。
  • run()就是模拟应用程序在运行过程中使用配置的过程:run()中获取配置信息就是从 atomic.Value加载出来,这样保证数据一致性。


    编译运行,然后不断更改配置文件中kafkaAddr,测试结果如下:
    配置文件热加载的go语言实现 - 图3

这样配置文集热加载就实现了。


image.jpeg