HttpServer
var configFile string
func initArgs() {
flag.StringVar(&configFile, "config", "./master.json", "配置文件位置")
flag.Parse()
}
func initEnv() {
runtime.GOMAXPROCS(runtime.NumCPU())
}
func main() {
initArgs()
initEnv()
// 加载配置
err := master.InitConfig(configFile)
if err != nil {
panic(err)
}
// 任务管理器
err = master.InitJobMgr()
if err != nil {
panic(err)
}
if err := master.InitApiServer(); err != nil {
panic(err)
}
for {
time.Sleep(1 * time.Second)
}
}
func InitApiServer() error {
mux := http.NewServeMux()
mux.HandleFunc("/job/save", handleJobSave)
go http.ListenAndServe(":"+strconv.Itoa(G_config.ApiPort), mux)
log.Println("ApiPort: ", G_config.ApiPort)
return nil
}
type Config struct {
ApiPort int `json:"apiPort"`
ReadTimeout int `json:"readTimeout"`
WriteTimeout int `json:"writeTimeout"`
EtcdEndpoints []string `json:"etcdEndpoints"`
EtcdDialTimeout int `json:"etcdDialTimeout"`
}
func InitConfig(filename string) error {
file, err := ioutil.ReadFile(filename)
if err != nil {
return err
}
var conf Config
err = json.Unmarshal(file, &conf)
if err != nil {
return err
}
G_config = &conf
return nil
}
保存任务接口, 保存到etcd
func handleJobSave(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", "text/json")
err := r.ParseForm()
if err != nil {
fmt.Println(`ParseForm err: `, err)
response, _ := common.BuildResponse(1, err.Error(), nil)
_, err = w.Write(response)
return
}
var job common.Job
postJob := r.PostForm.Get("job")
err = json.Unmarshal([]byte(postJob), &job)
if err != nil {
fmt.Println(`ParseForm err: `, err)
response, _ := common.BuildResponse(1, err.Error(), nil)
_, err = w.Write(response)
return
}
old, err := G_jobMgr.SaveJob(&job)
if err != nil {
response, _ := common.BuildResponse(1, err.Error(), old)
_, err := w.Write(response)
fmt.Println(err)
return
}
response, _ := common.BuildResponse(0, "ok", old)
_, err = w.Write(response)
return
}
func (jobMgr *JobMgr) SaveJob(job *common.Job) (old *common.Job, err error) {
jobKey := `/cron/jobs/` + job.Name
jobVal, err := json.Marshal(job)
if err != nil {
return nil, err
}
response, err := jobMgr.kv.Put(context.TODO(), jobKey, string(jobVal), clientv3.WithPrevKV())
if err != nil {
return nil, err
}
oldJob := common.Job{}
if response.PrevKv != nil {
err := json.Unmarshal(response.PrevKv.Value, &oldJob)
if err != nil {
return nil, err
}
}
return &oldJob, err
}
删除任务
func (jobMgr *JobMgr) DeleteJob(name string) (old *common.Job, err error) {
jobKey := `/cron/jobs/` + name
response, err := jobMgr.kv.Delete(context.TODO(), jobKey, clientv3.WithPrevKV())
if err != nil {
return nil, err
}
oldJob := common.Job{}
if len(response.PrevKvs) != 0 {
err := json.Unmarshal(response.PrevKvs[0].Value, &oldJob)
if err != nil {
return nil, err
}
}
return &oldJob, err
}
List Job
func (jobMgr *JobMgr) ListJobs() (list []*common.Job, err error) {
dirKey := common.JOB_SAVE_DIR
response, err := jobMgr.kv.Get(context.TODO(), dirKey, clientv3.WithPrefix())
if err != nil {
return nil, err
}
for _, kv := range response.Kvs {
job := &common.Job{}
err = json.Unmarshal(kv.Value, &job)
if err == nil {
list = append(list, &common.Job{
Name: job.Name,
Command: job.Command,
CronExpr: job.CronExpr,
})
}
}
return list, err
}
强制杀死任务
func (jobMgr *JobMgr) KillJob(name string) (err error) {
key := common.JOB_KILL_DIR + name
// 创建一个自动过期的租约
grant, err := jobMgr.lease.Grant(context.TODO(), 1)
if err != nil {
return err
}
leaseID := grant.ID
_, err = jobMgr.kv.Put(context.TODO(), key, "", clientv3.WithLease(leaseID))
return err
}