From 837351e0c4c0ffa9d83e6da3534dbaecf6c69c41 Mon Sep 17 00:00:00 2001 From: nicojwzhang Date: Tue, 27 Jul 2021 17:20:37 +0800 Subject: [PATCH] Update - add log --- group.go | 8 +++++--- job.go | 7 +++++++ node/node.go | 31 ++++++++++++++++++++++++++++++- 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/group.go b/group.go index a2ffeea..f43ae23 100644 --- a/group.go +++ b/group.go @@ -35,16 +35,18 @@ func GetGroupById(gid string) (g *Group, err error) { // GetGroups 获取包含 nid 的 group // 如果 nid 为空,则获取所有的 group +// 获取机器节点分组,非任务分组 func GetGroups(nid string) (groups map[string]*Group, err error) { + log.Debugf("nid:%s", nid) resp, err := DefalutClient.Get(conf.Config.Group, client.WithPrefix()) if err != nil { - return + return groups, err } count := len(resp.Kvs) groups = make(map[string]*Group, count) if count == 0 { - return + return groups, err } for _, g := range resp.Kvs { @@ -57,7 +59,7 @@ func GetGroups(nid string) (groups map[string]*Group, err error) { groups[group.ID] = group } } - return + return groups, err } func WatchGroups() client.WatchChan { diff --git a/job.go b/job.go index 4006a13..7c30bdb 100644 --- a/job.go +++ b/job.go @@ -188,7 +188,9 @@ func (j *Job) unlimit() { atomic.AddInt64(j.Count, -1) } +// 附加当前node的信息到job上 func (j *Job) Init(nodeID, hostname, ip string) { + log.Debugf("nodeId:%s, hostname:%s, ip:%s", nodeID, hostname, ip) var c int64 j.Count, j.runOn, j.hostname, j.ip = &c, nodeID, hostname, ip } @@ -339,6 +341,7 @@ func DeleteJob(group, id string) (resp *client.DeleteResponse, err error) { return DefalutClient.Delete(JobKey(group, id)) } +// 获取所有任务 func GetJobs() (jobs map[string]*Job, err error) { resp, err := DefalutClient.Get(conf.Config.Cmd, client.WithPrefix()) if err != nil { @@ -610,7 +613,9 @@ func (j *Job) Avg(t, et time.Time) { j.AvgTime = (j.AvgTime + execTime) / 2 } +// func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd) { + log.Debugf("nid:%s, gs:%+v", nid, gs) cmds = make(map[string]*Cmd) if j.Pause { return @@ -641,7 +646,9 @@ LOOP_TIMER_CMD: return } +// 判断该job是否会在该节点上运行 func (j Job) IsRunOn(nid string, gs map[string]*Group) bool { + log.Debugf("nid:%s, gs:%+v", nid, gs) LOOP_TIMER: for _, r := range j.Rules { for _, id := range r.ExcludeNodeIDs { diff --git a/node/node.go b/node/node.go index 74f2049..336f027 100644 --- a/node/node.go +++ b/node/node.go @@ -92,7 +92,9 @@ func (n *Node) Register() (err error) { return n.set() } +// 重置node的leaseid func (n *Node) set() error { + log.Debugf("n.ttl:%d", n.ttl) resp, err := n.Client.Grant(n.ttl + 2) if err != nil { return err @@ -103,11 +105,13 @@ func (n *Node) set() error { } n.lID = resp.ID + // 又没有重启进程,pid没有改变为什么要写入pidfile呢? n.writePIDFile() return nil } +// 将pid写入pidfile func (n *Node) writePIDFile() { if len(n.PIDFile) == 0 { return @@ -133,6 +137,7 @@ func (n *Node) writePIDFile() { } } +// 删除pidfile func (n *Node) removePIDFile() { if len(n.PIDFile) == 0 { return @@ -145,6 +150,7 @@ func (n *Node) removePIDFile() { // 断网掉线重新注册 func (n *Node) keepAlive() { + log.Debugf("ttl:%d", n.ttl) duration := time.Duration(n.ttl) * time.Second timer := time.NewTimer(duration) for { @@ -153,6 +159,7 @@ func (n *Node) keepAlive() { return case <-timer.C: if n.lID > 0 { + // 续期lease _, err := n.Client.KeepAliveOnce(n.lID) if err == nil { timer.Reset(duration) @@ -174,14 +181,17 @@ func (n *Node) keepAlive() { } func (n *Node) loadJobs() (err error) { + log.Debugf("") if n.groups, err = cronsun.GetGroups(""); err != nil { return } + log.Debugf("n.groups:%+v", n.groups) jobs, err := cronsun.GetJobs() if err != nil { return } + log.Debugf("jobs:%+v", jobs) if len(jobs) == 0 { return @@ -196,13 +206,17 @@ func (n *Node) loadJobs() (err error) { } func (n *Node) addJob(job *cronsun.Job, notice bool) { + log.Debugf("job:%+v, notice:%t", job, notice) n.link.addJob(job) + // 判断该job是否会在该节点运行 if job.IsRunOn(n.ID, n.groups) { n.jobs[job.ID] = job } + // 将任务封装为cmd,一个job下的多个rule属于多个cmd cmds := job.Cmds(n.ID, n.groups) + log.Debugf("cmds:%+v", cmds) if len(cmds) == 0 { return } @@ -236,6 +250,7 @@ func (n *Node) delJob(id string) { } func (n *Node) modJob(job *cronsun.Job) { + log.Debugf("job:%+v", job) oJob, ok := n.jobs[job.ID] // 之前此任务没有在当前结点执行,直接增加任务 if !ok { @@ -263,6 +278,7 @@ func (n *Node) modJob(job *cronsun.Job) { } func (n *Node) addCmd(cmd *cronsun.Cmd, notice bool) { + log.Debugf("cmd:%+v, notice:%t", cmd, notice) n.Cron.Schedule(cmd.JobRule.Schedule, cmd) n.cmds[cmd.GetID()] = cmd @@ -273,6 +289,7 @@ func (n *Node) addCmd(cmd *cronsun.Cmd, notice bool) { } func (n *Node) modCmd(cmd *cronsun.Cmd, notice bool) { + log.Debugf("cmd:%+v, notice:%t", cmd, notice) c, ok := n.cmds[cmd.GetID()] if !ok { n.addCmd(cmd, notice) @@ -440,9 +457,12 @@ func (n *Node) KillExcutingProc(process *cronsun.Process) { } func (n *Node) watchJobs() { + log.Debugf("") rch := cronsun.WatchJobs() for wresp := range rch { + log.Debugf("wresp:%+v", wresp) for _, ev := range wresp.Events { + log.Debugf("ev:%+v", ev) switch { case ev.IsCreate(): job, err := cronsun.GetJobFromKv(ev.Kv.Key, ev.Kv.Value) @@ -473,9 +493,10 @@ func (n *Node) watchJobs() { func (n *Node) watchExcutingProc() { rch := cronsun.WatchProcs(n.ID) - for wresp := range rch { + log.Debugf("wresp:%+v", wresp) for _, ev := range wresp.Events { + log.Debugf("ev:%+v", ev) switch { case ev.IsModify(): key := string(ev.Kv.Key) @@ -501,9 +522,12 @@ func (n *Node) watchExcutingProc() { } func (n *Node) watchGroups() { + log.Debugf("") rch := cronsun.WatchGroups() for wresp := range rch { + log.Debugf("wresp:%+v", wresp) for _, ev := range wresp.Events { + log.Debugf("ev:%+v", ev) switch { case ev.IsCreate(): g, err := cronsun.GetGroupFromKv(ev.Kv.Key, ev.Kv.Value) @@ -533,7 +557,9 @@ func (n *Node) watchGroups() { func (n *Node) watchOnce() { rch := cronsun.WatchOnce() for wresp := range rch { + log.Debugf("wresp:%+v", wresp) for _, ev := range wresp.Events { + log.Debugf("ev:%+v", ev) switch { case ev.IsCreate(), ev.IsModify(): if len(ev.Kv.Value) != 0 && string(ev.Kv.Value) != n.ID { @@ -554,7 +580,9 @@ func (n *Node) watchOnce() { func (n *Node) watchCsctl() { rch := cronsun.WatchCsctl() for wresp := range rch { + log.Debugf("wresp:%+v", wresp) for _, ev := range wresp.Events { + log.Debugf("ev:%+v", ev) switch { case ev.IsCreate(), ev.IsModify(): n.executCsctlCmd(ev.Kv.Key, ev.Kv.Value) @@ -565,6 +593,7 @@ func (n *Node) watchCsctl() { // 启动服务 func (n *Node) Run() (err error) { + log.Debugf("") go n.keepAlive() defer func() {