HttpServer
var configFile stringfunc initArgs() { flag.StringVar(&configFile, "config", "./master.json", "配置文件位置") flag.Parse()}func initEnv() { runtime.GOMAXPROCS(runtime.NumCPU())}func main() { initArgs() initEnv() // 加载配置 err := master.InitConfig(configFile) if err != nil { panic(err) } // 任务管理器 err = master.InitJobMgr() if err != nil { panic(err) } if err := master.InitApiServer(); err != nil { panic(err) } for { time.Sleep(1 * time.Second) }}
func InitApiServer() error { mux := http.NewServeMux() mux.HandleFunc("/job/save", handleJobSave) go http.ListenAndServe(":"+strconv.Itoa(G_config.ApiPort), mux) log.Println("ApiPort: ", G_config.ApiPort) return nil}
type Config struct { ApiPort int `json:"apiPort"` ReadTimeout int `json:"readTimeout"` WriteTimeout int `json:"writeTimeout"` EtcdEndpoints []string `json:"etcdEndpoints"` EtcdDialTimeout int `json:"etcdDialTimeout"`}func InitConfig(filename string) error { file, err := ioutil.ReadFile(filename) if err != nil { return err } var conf Config err = json.Unmarshal(file, &conf) if err != nil { return err } G_config = &conf return nil}
保存任务接口, 保存到etcd
func handleJobSave(w http.ResponseWriter, r *http.Request) { w.Header().Set("content-type", "text/json") err := r.ParseForm() if err != nil { fmt.Println(`ParseForm err: `, err) response, _ := common.BuildResponse(1, err.Error(), nil) _, err = w.Write(response) return } var job common.Job postJob := r.PostForm.Get("job") err = json.Unmarshal([]byte(postJob), &job) if err != nil { fmt.Println(`ParseForm err: `, err) response, _ := common.BuildResponse(1, err.Error(), nil) _, err = w.Write(response) return } old, err := G_jobMgr.SaveJob(&job) if err != nil { response, _ := common.BuildResponse(1, err.Error(), old) _, err := w.Write(response) fmt.Println(err) return } response, _ := common.BuildResponse(0, "ok", old) _, err = w.Write(response) return}
func (jobMgr *JobMgr) SaveJob(job *common.Job) (old *common.Job, err error) { jobKey := `/cron/jobs/` + job.Name jobVal, err := json.Marshal(job) if err != nil { return nil, err } response, err := jobMgr.kv.Put(context.TODO(), jobKey, string(jobVal), clientv3.WithPrevKV()) if err != nil { return nil, err } oldJob := common.Job{} if response.PrevKv != nil { err := json.Unmarshal(response.PrevKv.Value, &oldJob) if err != nil { return nil, err } } return &oldJob, err}
删除任务
func (jobMgr *JobMgr) DeleteJob(name string) (old *common.Job, err error) { jobKey := `/cron/jobs/` + name response, err := jobMgr.kv.Delete(context.TODO(), jobKey, clientv3.WithPrevKV()) if err != nil { return nil, err } oldJob := common.Job{} if len(response.PrevKvs) != 0 { err := json.Unmarshal(response.PrevKvs[0].Value, &oldJob) if err != nil { return nil, err } } return &oldJob, err}
List Jobs
func (jobMgr *JobMgr) ListJobs() (list []*common.Job, err error) { dirKey := common.JOB_SAVE_DIR response, err := jobMgr.kv.Get(context.TODO(), dirKey, clientv3.WithPrefix()) if err != nil { return nil, err } for _, kv := range response.Kvs { job := &common.Job{} err = json.Unmarshal(kv.Value, &job) if err == nil { list = append(list, &common.Job{ Name: job.Name, Command: job.Command, CronExpr: job.CronExpr, }) } } return list, err}
强制杀死任务
func (jobMgr *JobMgr) KillJob(name string) (err error) { key := common.JOB_KILL_DIR + name // 创建一个自动过期的租约 grant, err := jobMgr.lease.Grant(context.TODO(), 1) if err != nil { return err } leaseID := grant.ID _, err = jobMgr.kv.Put(context.TODO(), key, "", clientv3.WithLease(leaseID)) return err}