HttpServer

  1. var configFile string
  2. func initArgs() {
  3. flag.StringVar(&configFile, "config", "./master.json", "配置文件位置")
  4. flag.Parse()
  5. }
  6. func initEnv() {
  7. runtime.GOMAXPROCS(runtime.NumCPU())
  8. }
  9. func main() {
  10. initArgs()
  11. initEnv()
  12. // 加载配置
  13. err := master.InitConfig(configFile)
  14. if err != nil {
  15. panic(err)
  16. }
  17. // 任务管理器
  18. err = master.InitJobMgr()
  19. if err != nil {
  20. panic(err)
  21. }
  22. if err := master.InitApiServer(); err != nil {
  23. panic(err)
  24. }
  25. for {
  26. time.Sleep(1 * time.Second)
  27. }
  28. }
  1. func InitApiServer() error {
  2. mux := http.NewServeMux()
  3. mux.HandleFunc("/job/save", handleJobSave)
  4. go http.ListenAndServe(":"+strconv.Itoa(G_config.ApiPort), mux)
  5. log.Println("ApiPort: ", G_config.ApiPort)
  6. return nil
  7. }
  1. type Config struct {
  2. ApiPort int `json:"apiPort"`
  3. ReadTimeout int `json:"readTimeout"`
  4. WriteTimeout int `json:"writeTimeout"`
  5. EtcdEndpoints []string `json:"etcdEndpoints"`
  6. EtcdDialTimeout int `json:"etcdDialTimeout"`
  7. }
  8. func InitConfig(filename string) error {
  9. file, err := ioutil.ReadFile(filename)
  10. if err != nil {
  11. return err
  12. }
  13. var conf Config
  14. err = json.Unmarshal(file, &conf)
  15. if err != nil {
  16. return err
  17. }
  18. G_config = &conf
  19. return nil
  20. }

保存任务接口, 保存到etcd

  1. func handleJobSave(w http.ResponseWriter, r *http.Request) {
  2. w.Header().Set("content-type", "text/json")
  3. err := r.ParseForm()
  4. if err != nil {
  5. fmt.Println(`ParseForm err: `, err)
  6. response, _ := common.BuildResponse(1, err.Error(), nil)
  7. _, err = w.Write(response)
  8. return
  9. }
  10. var job common.Job
  11. postJob := r.PostForm.Get("job")
  12. err = json.Unmarshal([]byte(postJob), &job)
  13. if err != nil {
  14. fmt.Println(`ParseForm err: `, err)
  15. response, _ := common.BuildResponse(1, err.Error(), nil)
  16. _, err = w.Write(response)
  17. return
  18. }
  19. old, err := G_jobMgr.SaveJob(&job)
  20. if err != nil {
  21. response, _ := common.BuildResponse(1, err.Error(), old)
  22. _, err := w.Write(response)
  23. fmt.Println(err)
  24. return
  25. }
  26. response, _ := common.BuildResponse(0, "ok", old)
  27. _, err = w.Write(response)
  28. return
  29. }
  1. func (jobMgr *JobMgr) SaveJob(job *common.Job) (old *common.Job, err error) {
  2. jobKey := `/cron/jobs/` + job.Name
  3. jobVal, err := json.Marshal(job)
  4. if err != nil {
  5. return nil, err
  6. }
  7. response, err := jobMgr.kv.Put(context.TODO(), jobKey, string(jobVal), clientv3.WithPrevKV())
  8. if err != nil {
  9. return nil, err
  10. }
  11. oldJob := common.Job{}
  12. if response.PrevKv != nil {
  13. err := json.Unmarshal(response.PrevKv.Value, &oldJob)
  14. if err != nil {
  15. return nil, err
  16. }
  17. }
  18. return &oldJob, err
  19. }

删除任务

  1. func (jobMgr *JobMgr) DeleteJob(name string) (old *common.Job, err error) {
  2. jobKey := `/cron/jobs/` + name
  3. response, err := jobMgr.kv.Delete(context.TODO(), jobKey, clientv3.WithPrevKV())
  4. if err != nil {
  5. return nil, err
  6. }
  7. oldJob := common.Job{}
  8. if len(response.PrevKvs) != 0 {
  9. err := json.Unmarshal(response.PrevKvs[0].Value, &oldJob)
  10. if err != nil {
  11. return nil, err
  12. }
  13. }
  14. return &oldJob, err
  15. }

List Job

  1. func (jobMgr *JobMgr) ListJobs() (list []*common.Job, err error) {
  2. dirKey := common.JOB_SAVE_DIR
  3. response, err := jobMgr.kv.Get(context.TODO(), dirKey, clientv3.WithPrefix())
  4. if err != nil {
  5. return nil, err
  6. }
  7. for _, kv := range response.Kvs {
  8. job := &common.Job{}
  9. err = json.Unmarshal(kv.Value, &job)
  10. if err == nil {
  11. list = append(list, &common.Job{
  12. Name: job.Name,
  13. Command: job.Command,
  14. CronExpr: job.CronExpr,
  15. })
  16. }
  17. }
  18. return list, err
  19. }

强制杀死任务

  1. func (jobMgr *JobMgr) KillJob(name string) (err error) {
  2. key := common.JOB_KILL_DIR + name
  3. // 创建一个自动过期的租约
  4. grant, err := jobMgr.lease.Grant(context.TODO(), 1)
  5. if err != nil {
  6. return err
  7. }
  8. leaseID := grant.ID
  9. _, err = jobMgr.kv.Put(context.TODO(), key, "", clientv3.WithLease(leaseID))
  10. return err
  11. }