Skip to content

Commit

Permalink
Merge pull request #30 from alibaba/bugfix-panic-mapreduce
Browse files Browse the repository at this point in the history
Bugfix panic mapreduce
  • Loading branch information
HuangXiaomeng authored Oct 24, 2024
2 parents a5ce30e + 85f888a commit 9ee5844
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 47 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ func (mr *TestMapReduceJob) Reduce(jobCtx *jobcontext.JobContext) (*processor.Pr
package main
import (
"os"
"os/signal"
"syscall"
"time"
"github.com/alibaba/schedulerx-worker-go"
)
Expand Down Expand Up @@ -379,7 +384,12 @@ func main() {
client.RegisterTask("TestBroadcast", task2)
client.RegisterTask("TestMapJob", task3)
client.RegisterTask("TestMapReduceJob", task4)
select {}
// wait for the stop signal
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)
<-c
time.Sleep(time.Second * 5)
}
```
Expand Down
47 changes: 28 additions & 19 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"fmt"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/asynkron/protoactor-go/actor"

"github.com/alibaba/schedulerx-worker-go/config"
sxactor "github.com/alibaba/schedulerx-worker-go/internal/actor"
actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common"
Expand All @@ -47,12 +48,21 @@ var (
)

type Client struct {
cfg *Config
opts *Options
connpool pool.ConnPool
tasks *tasks.TaskMap
actorSystem *actor.ActorSystem
taskMasterPool *masterpool.TaskMasterPool
cfg *Config
opts *Options
tasks *tasks.TaskMap

stopChan chan os.Signal
}

// RegisterTask registers the given processor under name by delegating to the
// client's task map (c.tasks.Register).
func (c *Client) RegisterTask(name string, task processor.Processor) {
c.tasks.Register(name, task)
}

// Shutdown gracefully stops the client.
//
// It posts a stop notification on c.stopChan and then waits five seconds
// before returning, giving background work time to react to the stop request.
// Shutdown is safe to call more than once.
func (c *Client) Shutdown() {
	// Use a non-blocking send. A plain send would hang the caller forever if
	// the channel is already full (a stop request or OS signal is pending),
	// if nothing is receiving anymore, or if the channel is nil. In all of
	// those cases there is nothing further to deliver, so skip the send.
	select {
	case c.stopChan <- syscall.SIGINT:
	default:
	}
	time.Sleep(time.Second * 5)
}

type Config struct {
Expand Down Expand Up @@ -149,23 +159,22 @@ func newClient(cfg *Config, opts ...Option) (*Client, error) {

// Init actors
actorSystem := actorcomm.GetActorSystem()
if err := sxactor.InitActors(actorSystem); err != nil {
return nil, fmt.Errorf("Init actors faild, err=%s. ", err.Error())
if err = sxactor.InitActors(actorSystem); err != nil {
return nil, fmt.Errorf("init actors faild, err=%s", err.Error())
}

stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)

// Keep heartbeat, and receive message
// KeepHeartbeat must start after the actors are initialized, so that it can get actorSystemPort from actorSystem
go remoting.KeepHeartbeat(ctx, actorSystem, cfg.AppKey)
go remoting.KeepHeartbeat(ctx, actorSystem, cfg.AppKey, stopChan)
go remoting.OnMsgReceived(ctx)

return &Client{
cfg: cfg,
opts: options,
tasks: taskMap,
actorSystem: actorSystem,
cfg: cfg,
opts: options,
tasks: taskMap,
stopChan: stopChan,
}, nil
}

// RegisterTask registers the given processor under name by delegating to the
// client's task map (c.tasks.Register).
func (c *Client) RegisterTask(name string, task processor.Processor) {
c.tasks.Register(name, task)
}
12 changes: 11 additions & 1 deletion example/broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
package main

import (
"os"
"os/signal"
"syscall"
"time"

"github.com/alibaba/schedulerx-worker-go"
)

Expand All @@ -36,5 +41,10 @@ func main() {
// The name TestBroadcast registered here must be consistent with the one configured on the platform
task := &TestBroadcast{}
client.RegisterTask("TestBroadcast", task)
select {}

// wait for the stop signal
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)
<-c
time.Sleep(time.Second * 5)
}
8 changes: 4 additions & 4 deletions example/broadcast/test_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var _ processor.BroadcastProcessor = &TestBroadcast{}
type TestBroadcast struct{}

// Process is executed on all machines.
func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
func (t *TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
value := rand.Intn(10)
fmt.Printf("Total sharding num=%d, sharding=%d, value=%d\n", ctx.ShardingNum(), ctx.ShardingId(), value)
ret := new(processor.ProcessResult)
Expand All @@ -41,13 +41,13 @@ func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessRe
}

// PreProcess is executed on only one machine.
func (t TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
func (t *TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
fmt.Println("TestBroadcastJob PreProcess")
return nil
}

// PostProcess is executed on only one machine.
func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
func (t *TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
fmt.Println("TestBroadcastJob PostProcess")
allTaskResults := ctx.TaskResults()
allTaskStatuses := ctx.TaskStatuses()
Expand All @@ -68,7 +68,7 @@ func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.Proce
return ret, nil
}

func (t TestBroadcast) Kill(ctx *jobcontext.JobContext) error {
func (t *TestBroadcast) Kill(ctx *jobcontext.JobContext) error {
fmt.Println("[Kill] Start kill my task: TestBroadcast")
return nil
}
12 changes: 11 additions & 1 deletion example/mapreduce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
package main

import (
"os"
"os/signal"
"syscall"
"time"

"github.com/alibaba/schedulerx-worker-go"
"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
)
Expand All @@ -39,5 +44,10 @@ func main() {
mapjob.NewMapReduceJobProcessor(), // FIXME how define user behavior
}
client.RegisterTask("TestMapReduceJob", task)
select {}

// wait for the stop signal
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)
<-c
time.Sleep(time.Second * 5)
}
12 changes: 11 additions & 1 deletion example/standalone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
package main

import (
"os"
"os/signal"
"syscall"
"time"

"github.com/alibaba/schedulerx-worker-go"
)

Expand All @@ -38,5 +43,10 @@ func main() {
// The name HelloWorld registered here must be consistent with the one configured on the platform
task := &HelloWorld{}
client.RegisterTask("HelloWorld", task)
select {}

// wait for the stop signal
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)
<-c
time.Sleep(time.Second * 5)
}
1 change: 0 additions & 1 deletion internal/batch/base_req_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ func (rcvr *BaseReqHandler) Stop() {
}
if rcvr.reqsQueue != nil {
rcvr.reqsQueue.Clear()
rcvr.reqsQueue = nil
}
if rcvr.activeRunnableNum != nil {
rcvr.activeRunnableNum = nil
Expand Down
32 changes: 13 additions & 19 deletions internal/remoting/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ import (
"fmt"
"math"
"os"
"os/signal"
"runtime"
"syscall"
"time"

"github.com/asynkron/protoactor-go/actor"
"github.com/shirou/gopsutil/load"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"

"github.com/alibaba/schedulerx-worker-go/config"
Expand All @@ -51,22 +49,21 @@ var (
waitHeartbeatRespTimeout = 5 * time.Second
)

func KeepHeartbeat(ctx context.Context, actorSystem *actor.ActorSystem, appKey string) {
func KeepHeartbeat(ctx context.Context, actorSystem *actor.ActorSystem, appKey string, stopChan chan os.Signal) {
var (
online = atomic.NewBool(true)
taskMasterPool = masterpool.GetTaskMasterPool()
groupManager = discovery.GetGroupManager()
)

heartbeat := func() {
heartbeat := func(online bool) {
_, actorSystemPort, err := actorSystem.GetHostPort()
if err != nil {
logger.Errorf("Write heartbeat to remote failed due to get actorSystem port failed, err=%s", err.Error())
return
}
for groupId, appGroupId := range groupManager.GroupId2AppGroupIdMap() {
jobInstanceIds := taskMasterPool.GetInstanceIds(appGroupId)
heartbeatReq := genHeartBeatRequest(groupId, appGroupId, jobInstanceIds, actorSystemPort, online.Load(), appKey)
heartbeatReq := genHeartBeatRequest(groupId, appGroupId, jobInstanceIds, actorSystemPort, online, appKey)
if err := sendHeartbeat(ctx, heartbeatReq); err != nil {
if errors.Is(err, syscall.EPIPE) || errors.Is(err, os.ErrDeadlineExceeded) {
pool.GetConnPool().ReconnectTrigger() <- struct{}{}
Expand All @@ -77,22 +74,19 @@ func KeepHeartbeat(ctx context.Context, actorSystem *actor.ActorSystem, appKey s
logger.Debugf("Write heartbeat to remote succeed.")
}
}
heartbeat()

// send worker offline heartbeat when shutdown
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)
<-c
online.Store(false)
heartbeat()
logger.Infof("Write shutdown heartbeat to remote succeed.")
}()
heartbeat(true)

ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()
for range ticker.C {
heartbeat()
for {
select {
case <-stopChan:
heartbeat(false)
logger.Infof("Write shutdown heartbeat to remote succeed.")
return
case <-ticker.C:
heartbeat(true)
}
}
}

Expand Down

0 comments on commit 9ee5844

Please sign in to comment.