diff --git a/config/worker_config.go b/config/worker_config.go index b51127b..42271db 100644 --- a/config/worker_config.go +++ b/config/worker_config.go @@ -127,9 +127,15 @@ func WithTaskBodySizeMax(taskBodySizeMax int32) Option { } } -func WithActorSystemPort(port int32) Option { +func WithGrpcPort(port int32) Option { return func(config *WorkerConfig) { - config.actorSystemPort = port + config.grpcPort = port + } +} + +func WithIface(iface string) Option { + return func(config *WorkerConfig) { + config.iface = iface } } @@ -158,7 +164,8 @@ type WorkerConfig struct { workerParallelTaskMaxSize int32 workerMapPageSize int32 taskBodySizeMax int32 - actorSystemPort int32 + grpcPort int32 + iface string } func (w *WorkerConfig) IsShareContainerPool() bool { @@ -217,8 +224,12 @@ func (w *WorkerConfig) TaskBodySizeMax() int32 { return w.taskBodySizeMax } -func (w *WorkerConfig) ActorSystemPort() int32 { - return w.actorSystemPort +func (w *WorkerConfig) GrpcPort() int32 { + return w.grpcPort +} + +func (w *WorkerConfig) Iface() string { + return w.iface } func defaultWorkerConfig() *WorkerConfig { diff --git a/example/broadcast/main.go b/example/broadcast/main.go index 0e8d199..3c0fe86 100644 --- a/example/broadcast/main.go +++ b/example/broadcast/main.go @@ -24,9 +24,9 @@ func main() { // This is just an example, the real configuration needs to be obtained from the platform cfg := &schedulerx.Config{ Endpoint: "acm.aliyun.com", - Namespace: "a0e3ffd7-xxx-xxx-xxx-86ca9dc68932", - GroupId: "dts-demo", - AppKey: "xxxxx", + Namespace: "fa6ed99e-1469-4477-855c-a2bf1659d039", + GroupId: "xueren_test_sub", + AppKey: "myV5K5Xaf1knuzKdPBaj3A==", } client, err := schedulerx.GetClient(cfg) if err != nil { diff --git a/example/mapreduce/main.go b/example/mapreduce/main.go index d08e4f4..50aa392 100644 --- a/example/mapreduce/main.go +++ b/example/mapreduce/main.go @@ -25,9 +25,9 @@ func main() { // This is just an example, the real configuration needs to be obtained from the platform cfg := &schedulerx.Config{ Endpoint: "acm.aliyun.com", - Namespace: "a0e3ffd7-xxx-xxx-xxx-86ca9dc68932", - GroupId: "dts-demo", - AppKey: "xxxxx", + Namespace: "fa6ed99e-1469-4477-855c-a2bf1659d039", + GroupId: "xueren_test_sub", + AppKey: "myV5K5Xaf1knuzKdPBaj3A==", } client, err := schedulerx.GetClient(cfg) if err != nil { @@ -38,6 +38,6 @@ func main() { task := &TestMapReduceJob{ mapjob.NewMapReduceJobProcessor(), // FIXME how define user behavior } - client.RegisterTask("TestMapReduce", task) + client.RegisterTask("TestMapReduceJob", task) select {} } diff --git a/example/mapreduce/order_info.go b/example/mapreduce/order_info.go index 44e96eb..d081757 100644 --- a/example/mapreduce/order_info.go +++ b/example/mapreduce/order_info.go @@ -51,7 +51,7 @@ func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error { // Process the MapReduce model is used to distributed scan orders for timeout confirmation func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) { var ( - num = 100 * 10000 + num = 1000 err error ) taskName := jobCtx.TaskName() @@ -76,8 +76,8 @@ func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.P fmt.Printf("task is not OrderInfo, task=%+v\n", jobCtx.Task()) } fmt.Printf("orderInfo=%+v\n", orderInfo) - time.Sleep(1 * time.Second) - fmt.Println("Finish Process...") + time.Sleep(10 * time.Millisecond) + // fmt.Println("Finish Process...") return processor.NewProcessResult( processor.WithSucceed(), processor.WithResult(strconv.Itoa(orderInfo.Value)), diff --git a/internal/actor/common/utils.go b/internal/actor/common/utils.go index d90f3ce..08c54d0 100644 --- a/internal/actor/common/utils.go +++ b/internal/actor/common/utils.go @@ -25,7 +25,6 @@ import ( "github.com/asynkron/protoactor-go/actor" "github.com/alibaba/schedulerx-worker-go/internal/remoting/pool" - "github.com/alibaba/schedulerx-worker-go/internal/utils" ) const ( @@ -72,11 +71,6 @@ func SchedulerxServerPid(ctx context.Context) *actor.PID { // The workerAddr issued by the server is the address reported by the heartbeat. // It is the connection address obtained from the connection pool, not the ActorSystem address, so it needs to be converted. func GetRealWorkerAddr(workerIdAddr string) string { - localHostAddr, err := utils.GetIpv4AddrHost() - if err != nil { - panic(err) - } - parts := strings.Split(workerIdAddr, "@") workerAddr := parts[1] addrParts := strings.Split(workerAddr, ":") @@ -92,12 +86,7 @@ func GetRealWorkerAddr(workerIdAddr string) string { panic(fmt.Sprintf("invalid worker addr: %s", workerAddr)) } - if addrParts[0] == localHostAddr { - // Debugging on local machine, starting multiple processes - host = "127.0.0.1" - } else { - host = addrParts[0] - } + host = addrParts[0] if len(addrParts) == 2 { port = addrParts[1] diff --git a/internal/actor/container_actor.go b/internal/actor/container_actor.go index 70aa33e..dd19f52 100644 --- a/internal/actor/container_actor.go +++ b/internal/actor/container_actor.go @@ -17,6 +17,7 @@ package actor import ( + "context" "encoding/json" "fmt" "runtime/debug" @@ -346,5 +347,6 @@ func convertMasterStartContainerRequest2JobContext(req *schedulerx.MasterStartCo jobCtx.SetShardingNum(req.GetShardingNum()) jobCtx.SetTimeType(req.GetTimeType()) jobCtx.SetTimeExpression(req.GetTimeExpression()) + jobCtx.Context = context.Background() return jobCtx, nil } diff --git a/internal/actor/init.go b/internal/actor/init.go index 36fb175..f9734ae 100644 --- a/internal/actor/init.go +++ b/internal/actor/init.go @@ -17,6 +17,7 @@ package actor import ( + "github.com/alibaba/schedulerx-worker-go/internal/utils" "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/remote" "google.golang.org/grpc" @@ -85,11 +86,25 @@ func InitActors(actorSystem *actor.ActorSystem) error { }() var ( - host = "127.0.0.1" + host = "0.0.0.0" port = 0 // random port ) - if actorSystemPort := config.GetWorkerConfig().ActorSystemPort(); actorSystemPort != 0 { - port = int(actorSystemPort) + if grpcPort := config.GetWorkerConfig().GrpcPort(); grpcPort != 0 { + port = int(grpcPort) + } + + if config.GetWorkerConfig().Iface() != "" { + localHost, err := utils.GetIpv4AddrByIface(config.GetWorkerConfig().Iface()) + if err != nil { + panic(err) + } + host = localHost + } else { + localHost, err := utils.GetIpv4AddrHost() + if err != nil { + panic(err) + } + host = localHost } // The maximum limit for a subtask is 64kb, and a maximum of 1000 batches can be sent together, which is 64MB, diff --git a/internal/utils/ip_util.go b/internal/utils/ip_util.go index bbeec88..f210250 100644 --- a/internal/utils/ip_util.go +++ b/internal/utils/ip_util.go @@ -56,6 +56,31 @@ func GetIpv4AddrHost() (string, error) { return "", errors.New("cannot find valid ipv4 addr") } +func GetIpv4AddrByIface(_iface string) (string, error) { + ifaces, err := net.Interfaces() + if err != nil { + return "", err + } + + // 遍历所有网卡 + for _, iface := range ifaces { + if iface.Name == _iface { // 指定网卡名称 + addrs, err := iface.Addrs() + if err != nil { + return "", err + } + // 遍历网卡的地址信息 + for _, addr := range addrs { + ip, _, _ := net.ParseCIDR(addr.String()) + if ip.To4() != nil { + return ip.String(), nil + } + } + } + } + return "", errors.New("cannot find valid ipv4 addr") +} + func ParseIPAddr(addr string) (string, int, error) { host, portStr, err := net.SplitHostPort(addr) if err != nil {