Skip to content

Commit

Permalink
feat: support broadcast and mapreduce (#5)
Browse files Browse the repository at this point in the history
* feat: support broadcast and mapreduce

Co-authored-by: xiaomeng.hxm <[email protected]>
  • Loading branch information
SinnerA and xiaomeng.hxm authored Jan 20, 2024
1 parent 68fecec commit 772e642
Show file tree
Hide file tree
Showing 112 changed files with 11,043 additions and 1,299 deletions.
51 changes: 37 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ import (
"sync"
"time"

"github.com/asynkron/protoactor-go/actor"

"github.com/alibaba/schedulerx-worker-go/config"
sxactor "github.com/alibaba/schedulerx-worker-go/internal/actor"
"github.com/alibaba/schedulerx-worker-go/internal/actor/common"
"github.com/alibaba/schedulerx-worker-go/internal/discovery"
"github.com/alibaba/schedulerx-worker-go/internal/masterpool"
"github.com/alibaba/schedulerx-worker-go/internal/openapi"
"github.com/alibaba/schedulerx-worker-go/internal/remoting"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/connpool"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/pool"
"github.com/alibaba/schedulerx-worker-go/internal/tasks"
"github.com/alibaba/schedulerx-worker-go/logger"
"github.com/alibaba/schedulerx-worker-go/processor"
Expand All @@ -42,8 +47,12 @@ var (
)

type Client struct {
cfg *Config
opts *Options
cfg *Config
opts *Options
connpool pool.ConnPool
tasks *tasks.TaskMap
actorSystem *actor.ActorSystem
taskMasterPool *masterpool.TaskMasterPool
}

type Config struct {
Expand All @@ -52,7 +61,8 @@ type Config struct {
Namespace string `json:"Namespace"`
// GroupId may contain multiple values separated by commas, such as "group1,group2"
GroupId string `json:"GroupId"`
// AppKey may be exited multiple, separated by comma, such as "appKey1,appKey2", appKey and groupId are in one-to-one correspondence
// AppKey may contain multiple values separated by commas, such as "appKey1,appKey2";
// each appKey corresponds one-to-one with a groupId
AppKey string `json:"AppKey"`
}

Expand Down Expand Up @@ -121,29 +131,42 @@ func newClient(cfg *Config, opts ...Option) (*Client, error) {

// Init connection pool
dialer := func() (net.Conn, error) {
logger.Infof("Schedulerx discovery active server addr=%s", getActiveServer())
logger.Infof("SchedulerX discovery active server addr=%s", getActiveServer())
return net.DialTimeout("tcp", getActiveServer(), time.Millisecond*500)
}
singleConnPool := connpool.NewSingleConnPool(ctx, dialer,
connpool.WithPostDialer(remoting.Handshake),
connpool.WithAddrChangedSignalCh(serverDiscover.ResultChangedCh()))
connpool.InitConnPool(singleConnPool)
singleConnPool := pool.NewSingleConnPool(ctx, dialer,
pool.WithPostDialer(remoting.Handshake),
pool.WithAddrChangedSignalCh(serverDiscover.ResultChangedCh()))
pool.InitConnPool(singleConnPool)
if conn, err := singleConnPool.Get(ctx); err != nil {
return nil, fmt.Errorf("cannot connect schedulerx server, maybe network was broken, err=%s", err.Error())
} else {
logger.Infof("Schedulerx server connected, remoteAddr=%s, localAddr=%s", conn.RemoteAddr(), conn.LocalAddr().String())
logger.Infof("SchedulerX server connected, remoteAddr=%s, localAddr=%s", conn.RemoteAddr(), conn.LocalAddr().String())
}

tasks := tasks.GetTaskMap()
masterpool.InitTaskMasterPool(masterpool.NewTaskMasterPool(tasks))

// Init actors
actorSystem := actor.NewActorSystem()
actorcomm.InitActorSystem(actorSystem)
if err := sxactor.InitActors(actorSystem); err != nil {
return nil, fmt.Errorf("Init actors faild, err=%s. ", err.Error())
}

// Keep heartbeat, and receive message
go remoting.KeepHeartbeat(ctx)
// KeepHeartbeat must be started after the actors are initialized, so that it can get actorSystemPort from actorSystem
go remoting.KeepHeartbeat(ctx, actorSystem)
go remoting.OnMsgReceived(ctx)

return &Client{
cfg: cfg,
opts: options,
cfg: cfg,
opts: options,
tasks: tasks,
actorSystem: actorSystem,
}, nil
}

func (c *Client) RegisterTask(name string, task processor.Processor) {
tasks.GetTaskMap().Register(name, task)
c.tasks.Register(name, task)
}
61 changes: 43 additions & 18 deletions config/worker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package config

import (
"sync"
"time"

"github.com/alibaba/schedulerx-worker-go/internal/constants"
)
Expand All @@ -42,12 +43,6 @@ func GetWorkerConfig() *WorkerConfig {

type Option func(*WorkerConfig)

// WithDebugMode returns an Option that sets the isDebugMode flag of the
// WorkerConfig to true.
func WithDebugMode() Option {
	enableDebug := func(wc *WorkerConfig) {
		wc.isDebugMode = true
	}
	return enableDebug
}

func WithEnableShareContainerPool() Option {
return func(config *WorkerConfig) {
config.isShareContainerPool = true
Expand All @@ -72,6 +67,18 @@ func WithEnableDispatchSecondDelayStandalone() Option {
}
}

// WithDisableBroadcastMasterExec returns an Option that sets the
// broadcastMasterExecEnable flag of the WorkerConfig to false.
func WithDisableBroadcastMasterExec() Option {
	disable := func(wc *WorkerConfig) {
		wc.broadcastMasterExecEnable = false
	}
	return disable
}

// WithBroadcastDispatchRetryTimes returns an Option that stores the given
// value in the broadcastDispatchRetryTimes field of the WorkerConfig.
func WithBroadcastDispatchRetryTimes(broadcastDispatchRetryTimes int32) Option {
	apply := func(wc *WorkerConfig) {
		wc.broadcastDispatchRetryTimes = broadcastDispatchRetryTimes
	}
	return apply
}

func WithMapMasterPageSize(mapMasterPageSize int32) Option {
return func(config *WorkerConfig) {
config.mapMasterPageSize = mapMasterPageSize
Expand All @@ -90,6 +97,12 @@ func WithMapMasterDispatcherSize(mapMasterDispatcherSize int32) Option {
}
}

// WithMapMasterStatusCheckInterval returns an Option that stores the given
// duration in the mapMasterStatusCheckInterval field of the WorkerConfig.
func WithMapMasterStatusCheckInterval(mapMasterStatusCheckInterval time.Duration) Option {
	apply := func(wc *WorkerConfig) {
		wc.mapMasterStatusCheckInterval = mapMasterStatusCheckInterval
	}
	return apply
}

func WithSharePoolSize(sharePoolSize int32) Option {
return func(config *WorkerConfig) {
config.sharePoolSize = sharePoolSize
Expand All @@ -114,9 +127,9 @@ func WithTaskBodySizeMax(taskBodySizeMax int32) Option {
}
}

func WithWorkerLabel(workerLabel string) Option {
func WithActorSystemPort(port int32) Option {
return func(config *WorkerConfig) {
config.workerLabel = workerLabel
config.actorSystemPort = port
}
}

Expand All @@ -131,23 +144,21 @@ func NewWorkerConfig(opts ...Option) *WorkerConfig {
}

type WorkerConfig struct {
isDebugMode bool
isShareContainerPool bool
isMapMasterFailover bool
isSecondDelayIntervalMS bool
isDispatchSecondDelayStandalone bool
broadcastMasterExecEnable bool
broadcastDispatchRetryTimes int32
mapMasterPageSize int32
mapMasterQueueSize int32
mapMasterDispatcherSize int32
mapMasterStatusCheckInterval time.Duration
sharePoolSize int32
workerParallelTaskMaxSize int32
workerMapPageSize int32
taskBodySizeMax int32
workerLabel string
}

func (w *WorkerConfig) IsDebugMode() bool {
return w.isDebugMode
actorSystemPort int32
}

func (w *WorkerConfig) IsShareContainerPool() bool {
Expand All @@ -166,6 +177,14 @@ func (w *WorkerConfig) IsDispatchSecondDelayStandalone() bool {
return w.isDispatchSecondDelayStandalone
}

// BroadcastMasterExecEnable returns the value of the broadcastMasterExecEnable flag.
func (w *WorkerConfig) BroadcastMasterExecEnable() bool {
return w.broadcastMasterExecEnable
}

// BroadcastDispatchRetryTimes returns the configured broadcastDispatchRetryTimes value.
func (w *WorkerConfig) BroadcastDispatchRetryTimes() int32 {
return w.broadcastDispatchRetryTimes
}

// MapMasterPageSize returns the configured mapMasterPageSize value.
func (w *WorkerConfig) MapMasterPageSize() int32 {
return w.mapMasterPageSize
}
Expand All @@ -178,6 +197,10 @@ func (w *WorkerConfig) MapMasterDispatcherSize() int32 {
return w.mapMasterDispatcherSize
}

// MapMasterStatusCheckInterval returns the configured mapMasterStatusCheckInterval duration.
func (w *WorkerConfig) MapMasterStatusCheckInterval() time.Duration {
return w.mapMasterStatusCheckInterval
}

// SharePoolSize returns the configured sharePoolSize value.
func (w *WorkerConfig) SharePoolSize() int32 {
return w.sharePoolSize
}
Expand All @@ -194,22 +217,24 @@ func (w *WorkerConfig) TaskBodySizeMax() int32 {
return w.taskBodySizeMax
}

func (w *WorkerConfig) WorkerLabel() string {
return w.workerLabel
func (w *WorkerConfig) ActorSystemPort() int32 {
return w.actorSystemPort
}

func defaultWorkerConfig() *WorkerConfig {
return &WorkerConfig{
isDebugMode: false,
isSecondDelayIntervalMS: false,
isShareContainerPool: false,
isDispatchSecondDelayStandalone: false,
isMapMasterFailover: true,
broadcastMasterExecEnable: true,
broadcastDispatchRetryTimes: constants.BroadcastDispatchRetryTimesDefault,
mapMasterPageSize: constants.MapMasterPageSizeDefault,
mapMasterQueueSize: constants.MapMasterQueueSizeDefault,
mapMasterDispatcherSize: constants.MapMasterDispatcherSizeDefault,
mapMasterStatusCheckInterval: constants.MapMasterStatusCheckIntervalDefault,
sharePoolSize: constants.SharedPoolSizeDefault,
workerParallelTaskMaxSize: constants.ParallelTaskListSizeMax,
workerParallelTaskMaxSize: constants.ParallelTaskListSizeMaxDefault,
workerMapPageSize: constants.WorkerMapPageSizeDefault,
taskBodySizeMax: constants.TaskBodySizeMaxDefault,
}
Expand Down
7 changes: 2 additions & 5 deletions config/worker_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ package config
import "testing"

func TestStructAssign(t *testing.T) {
config := NewWorkerConfig(WithDebugMode())
if config.isDebugMode == false {
t.Fatalf("Except enableMapMasterFailover=true, but got false")
}
if config.isMapMasterFailover == false {
config := NewWorkerConfig(WithDisableMapMasterFailover())
if config.isMapMasterFailover == true {
t.Fatalf("Except enableMapMasterFailover=true, but got false")
}
}
40 changes: 40 additions & 0 deletions example/broadcast/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"github.com/alibaba/schedulerx-worker-go"
)

// main builds a SchedulerX client from a placeholder configuration, registers
// the TestBroadcast processor, and then blocks forever.
func main() {
	// Placeholder values for demonstration only; the real configuration has
	// to be obtained from the platform.
	client, err := schedulerx.GetClient(&schedulerx.Config{
		Endpoint:  "acm.aliyun.com",
		Namespace: "a0e3ffd7-xxx-xxx-xxx-86ca9dc68932",
		GroupId:   "dts-demo",
		AppKey:    "xxxxx",
	})
	if err != nil {
		panic(err)
	}

	// The registered name "TestBroadcast" must match the name configured on
	// the platform.
	client.RegisterTask("TestBroadcast", &TestBroadcast{})

	// An empty select never returns, so main does not exit.
	select {}
}
69 changes: 69 additions & 0 deletions example/broadcast/test_broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"fmt"
"math/rand"
"strconv"

"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

// TestBroadcast is the example processor for broadcast jobs; it carries no state.
type TestBroadcast struct{}

// Compile-time check that *TestBroadcast satisfies processor.BroadcastProcessor.
var _ processor.BroadcastProcessor = (*TestBroadcast)(nil)

// Process is executed by all machines. It draws a random integer in [0, 10),
// prints it together with the sharding information from ctx, and returns a
// succeeded result whose payload is that integer rendered as a decimal string.
func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	picked := rand.Intn(10)
	fmt.Printf("Total sharding num=%d, sharding=%d, value=%d\n", ctx.ShardingNum(), ctx.ShardingId(), picked)

	result := &processor.ProcessResult{}
	result.SetSucceed()
	result.SetResult(strconv.Itoa(picked))
	return result, nil
}

// PreProcess is executed by only one machine. This example implementation
// just prints a marker line and always returns nil.
func (t TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
fmt.Println("TestBroadcastJob PreProcess")
return nil
}

// PostProcess is executed by only one machine.
//
// It prints every task result found in ctx, sums the integer results of the
// tasks whose status is taskstatus.TaskStatusSucceed, and returns a succeeded
// result whose payload is that sum rendered as a decimal string. Results of
// succeeded tasks that cannot be parsed as an integer are reported and left
// out of the sum.
func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("TestBroadcastJob PostProcess")
	allTaskResults := ctx.TaskResults()
	allTaskStatuses := ctx.TaskStatuses()
	num := 0

	for key, val := range allTaskResults {
		fmt.Printf("%v:%v\n", key, val)
		if allTaskStatuses[key] != taskstatus.TaskStatusSucceed {
			continue
		}
		valInt, err := strconv.Atoi(val)
		if err != nil {
			// On a range error Atoi returns a clamped extreme value rather
			// than 0, so the value must not be added when err is non-nil.
			fmt.Printf("TestBroadcastJob PostProcess(), skip unparsable result, key=%v, val=%v, err=%v\n", key, val, err)
			continue
		}
		num += valInt
	}

	fmt.Printf("TestBroadcastJob PostProcess(), num=%d\n", num)
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(num))
	return ret, nil
}
Loading

0 comments on commit 772e642

Please sign in to comment.