Skip to content

Commit

Permalink
Update - add log
Browse files Browse the repository at this point in the history
  • Loading branch information
nicojwzhang committed Jul 27, 2021
1 parent 854273f commit 837351e
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 4 deletions.
8 changes: 5 additions & 3 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ func GetGroupById(gid string) (g *Group, err error) {

// GetGroups 获取包含 nid 的 group
// 如果 nid 为空,则获取所有的 group
// 获取机器节点分组,非任务分组
func GetGroups(nid string) (groups map[string]*Group, err error) {
log.Debugf("nid:%s", nid)
resp, err := DefalutClient.Get(conf.Config.Group, client.WithPrefix())
if err != nil {
return
return groups, err
}

count := len(resp.Kvs)
groups = make(map[string]*Group, count)
if count == 0 {
return
return groups, err
}

for _, g := range resp.Kvs {
Expand All @@ -57,7 +59,7 @@ func GetGroups(nid string) (groups map[string]*Group, err error) {
groups[group.ID] = group
}
}
return
return groups, err
}

func WatchGroups() client.WatchChan {
Expand Down
7 changes: 7 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ func (j *Job) unlimit() {
atomic.AddInt64(j.Count, -1)
}

// Init attaches the current node's identity (node ID, hostname and IP) to the
// job and points Count at a freshly allocated counter that starts at zero.
func (j *Job) Init(nodeID, hostname, ip string) {
	log.Debugf("nodeId:%s, hostname:%s, ip:%s", nodeID, hostname, ip)

	// A new int64 is allocated on every call, so Count never aliases the
	// counter from an earlier Init.
	j.Count = new(int64)
	j.runOn = nodeID
	j.hostname = hostname
	j.ip = ip
}
Expand Down Expand Up @@ -339,6 +341,7 @@ func DeleteJob(group, id string) (resp *client.DeleteResponse, err error) {
return DefalutClient.Delete(JobKey(group, id))
}

// 获取所有任务
func GetJobs() (jobs map[string]*Job, err error) {
resp, err := DefalutClient.Get(conf.Config.Cmd, client.WithPrefix())
if err != nil {
Expand Down Expand Up @@ -610,7 +613,9 @@ func (j *Job) Avg(t, et time.Time) {
j.AvgTime = (j.AvgTime + execTime) / 2
}

// Cmds returns this job's commands for node nid as a map; the map is empty when the job is paused.
func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd) {
log.Debugf("nid:%s, gs:%+v", nid, gs)
cmds = make(map[string]*Cmd)
if j.Pause {
return
Expand Down Expand Up @@ -641,7 +646,9 @@ LOOP_TIMER_CMD:
return
}

// 判断该job是否会在该节点上运行
func (j Job) IsRunOn(nid string, gs map[string]*Group) bool {
log.Debugf("nid:%s, gs:%+v", nid, gs)
LOOP_TIMER:
for _, r := range j.Rules {
for _, id := range r.ExcludeNodeIDs {
Expand Down
31 changes: 30 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ func (n *Node) Register() (err error) {
return n.set()
}

// 重置node的leaseid
func (n *Node) set() error {
log.Debugf("n.ttl:%d", n.ttl)
resp, err := n.Client.Grant(n.ttl + 2)
if err != nil {
return err
Expand All @@ -103,11 +105,13 @@ func (n *Node) set() error {
}

n.lID = resp.ID
// 又没有重启进程,pid没有改变为什么要写入pidfile呢?
n.writePIDFile()

return nil
}

// 将pid写入pidfile
func (n *Node) writePIDFile() {
if len(n.PIDFile) == 0 {
return
Expand All @@ -133,6 +137,7 @@ func (n *Node) writePIDFile() {
}
}

// 删除pidfile
func (n *Node) removePIDFile() {
if len(n.PIDFile) == 0 {
return
Expand All @@ -145,6 +150,7 @@ func (n *Node) removePIDFile() {

// 断网掉线重新注册
func (n *Node) keepAlive() {
log.Debugf("ttl:%d", n.ttl)
duration := time.Duration(n.ttl) * time.Second
timer := time.NewTimer(duration)
for {
Expand All @@ -153,6 +159,7 @@ func (n *Node) keepAlive() {
return
case <-timer.C:
if n.lID > 0 {
// 续期lease
_, err := n.Client.KeepAliveOnce(n.lID)
if err == nil {
timer.Reset(duration)
Expand All @@ -174,14 +181,17 @@ func (n *Node) keepAlive() {
}

func (n *Node) loadJobs() (err error) {
log.Debugf("")
if n.groups, err = cronsun.GetGroups(""); err != nil {
return
}
log.Debugf("n.groups:%+v", n.groups)

jobs, err := cronsun.GetJobs()
if err != nil {
return
}
log.Debugf("jobs:%+v", jobs)

if len(jobs) == 0 {
return
Expand All @@ -196,13 +206,17 @@ func (n *Node) loadJobs() (err error) {
}

func (n *Node) addJob(job *cronsun.Job, notice bool) {
log.Debugf("job:%+v, notice:%t", job, notice)
n.link.addJob(job)

// 判断该job是否会在该节点运行
if job.IsRunOn(n.ID, n.groups) {
n.jobs[job.ID] = job
}

// 将任务封装为cmd,一个job下的多个rule属于多个cmd
cmds := job.Cmds(n.ID, n.groups)
log.Debugf("cmds:%+v", cmds)
if len(cmds) == 0 {
return
}
Expand Down Expand Up @@ -236,6 +250,7 @@ func (n *Node) delJob(id string) {
}

func (n *Node) modJob(job *cronsun.Job) {
log.Debugf("job:%+v", job)
oJob, ok := n.jobs[job.ID]
// 之前此任务没有在当前结点执行,直接增加任务
if !ok {
Expand Down Expand Up @@ -263,6 +278,7 @@ func (n *Node) modJob(job *cronsun.Job) {
}

func (n *Node) addCmd(cmd *cronsun.Cmd, notice bool) {
log.Debugf("cmd:%+v, notice:%t", cmd, notice)
n.Cron.Schedule(cmd.JobRule.Schedule, cmd)
n.cmds[cmd.GetID()] = cmd

Expand All @@ -273,6 +289,7 @@ func (n *Node) addCmd(cmd *cronsun.Cmd, notice bool) {
}

func (n *Node) modCmd(cmd *cronsun.Cmd, notice bool) {
log.Debugf("cmd:%+v, notice:%t", cmd, notice)
c, ok := n.cmds[cmd.GetID()]
if !ok {
n.addCmd(cmd, notice)
Expand Down Expand Up @@ -440,9 +457,12 @@ func (n *Node) KillExcutingProc(process *cronsun.Process) {
}

func (n *Node) watchJobs() {
log.Debugf("")
rch := cronsun.WatchJobs()
for wresp := range rch {
log.Debugf("wresp:%+v", wresp)
for _, ev := range wresp.Events {
log.Debugf("ev:%+v", ev)
switch {
case ev.IsCreate():
job, err := cronsun.GetJobFromKv(ev.Kv.Key, ev.Kv.Value)
Expand Down Expand Up @@ -473,9 +493,10 @@ func (n *Node) watchJobs() {

func (n *Node) watchExcutingProc() {
rch := cronsun.WatchProcs(n.ID)

for wresp := range rch {
log.Debugf("wresp:%+v", wresp)
for _, ev := range wresp.Events {
log.Debugf("ev:%+v", ev)
switch {
case ev.IsModify():
key := string(ev.Kv.Key)
Expand All @@ -501,9 +522,12 @@ func (n *Node) watchExcutingProc() {
}

func (n *Node) watchGroups() {
log.Debugf("")
rch := cronsun.WatchGroups()
for wresp := range rch {
log.Debugf("wresp:%+v", wresp)
for _, ev := range wresp.Events {
log.Debugf("ev:%+v", ev)
switch {
case ev.IsCreate():
g, err := cronsun.GetGroupFromKv(ev.Kv.Key, ev.Kv.Value)
Expand Down Expand Up @@ -533,7 +557,9 @@ func (n *Node) watchGroups() {
func (n *Node) watchOnce() {
rch := cronsun.WatchOnce()
for wresp := range rch {
log.Debugf("wresp:%+v", wresp)
for _, ev := range wresp.Events {
log.Debugf("ev:%+v", ev)
switch {
case ev.IsCreate(), ev.IsModify():
if len(ev.Kv.Value) != 0 && string(ev.Kv.Value) != n.ID {
Expand All @@ -554,7 +580,9 @@ func (n *Node) watchOnce() {
func (n *Node) watchCsctl() {
rch := cronsun.WatchCsctl()
for wresp := range rch {
log.Debugf("wresp:%+v", wresp)
for _, ev := range wresp.Events {
log.Debugf("ev:%+v", ev)
switch {
case ev.IsCreate(), ev.IsModify():
n.executCsctlCmd(ev.Kv.Key, ev.Kv.Value)
Expand All @@ -565,6 +593,7 @@ func (n *Node) watchCsctl() {

// 启动服务
func (n *Node) Run() (err error) {
log.Debugf("")
go n.keepAlive()

defer func() {
Expand Down

0 comments on commit 837351e

Please sign in to comment.