Skip to content

Commit

Permalink
split agent into vm and container
Browse files Browse the repository at this point in the history
Signed-off-by: Benedict Schlueter <[email protected]>
  • Loading branch information
benschlueter committed Nov 2, 2024
1 parent e937b3c commit 414c9f7
Show file tree
Hide file tree
Showing 37 changed files with 2,142 additions and 1,973 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ add_custom_target(images ALL
#
add_custom_target(delegatio-agent ALL
go build -o ${CMAKE_BINARY_DIR}/delegatio-agent
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/agent/server
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/agent/vm
BYPRODUCTS delegatio-agent
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) Benedict Schlueter
*/

package vmapi
package containerapi

import (
"context"
Expand All @@ -13,7 +13,7 @@ import (
"strconv"
"syscall"

"github.com/benschlueter/delegatio/agent/vmapi/vmproto"
"github.com/benschlueter/delegatio/agent/manageapi/manageproto"
"github.com/benschlueter/delegatio/internal/config"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -22,8 +22,8 @@ import (
"k8s.io/client-go/tools/remotecommand"
)

// VMAPI interface contains functions to access the agent.
type VMAPI interface {
// ContainerAPI interface contains functions to access the container agent.
type ContainerAPI interface {
CreateExecInPodgRPC(context.Context, string, *config.KubeExecConfig) error
WriteFileInPodgRPC(context.Context, string, *config.KubeFileWriteConfig) error
}
Expand All @@ -33,7 +33,7 @@ type API struct {
logger *zap.Logger
core Core
dialer Dialer
vmproto.UnimplementedAPIServer
manageproto.UnimplementedAPIServer
}

// New creates a new API.
Expand Down Expand Up @@ -74,9 +74,9 @@ func (a *API) WriteFileInPodgRPC(ctx context.Context, endpoint string, conf *con
return err
}
defer conn.Close()
client := vmproto.NewAPIClient(conn)
client := manageproto.NewAPIClient(conn)
_, err = client.WriteFile(ctx,
&vmproto.WriteFileRequest{
&manageproto.WriteFileRequest{
Filepath: conf.FilePath,
Filename: conf.FileName,
Content: conf.FileData,
Expand All @@ -98,14 +98,14 @@ func (a *API) CreateExecInPodgRPC(ctx context.Context, endpoint string, conf *co
return err
}
defer conn.Close()
client := vmproto.NewAPIClient(conn)
client := manageproto.NewAPIClient(conn)
resp, err := client.ExecCommandStream(ctx)
if err != nil {
return err
}
err = resp.Send(&vmproto.ExecCommandStreamRequest{
Content: &vmproto.ExecCommandStreamRequest_Command{
Command: &vmproto.ExecCommandRequest{
err = resp.Send(&manageproto.ExecCommandStreamRequest{
Content: &manageproto.ExecCommandStreamRequest_Command{
Command: &manageproto.ExecCommandRequest{
Command: conf.Command,
Tty: conf.Tty,
},
Expand All @@ -132,7 +132,7 @@ func (a *API) CreateExecInPodgRPC(ctx context.Context, endpoint string, conf *co
return err
}

func (a *API) termSizeHandler(ctx context.Context, resp vmproto.API_ExecCommandStreamClient, resizeData remotecommand.TerminalSizeQueue) error {
func (a *API) termSizeHandler(ctx context.Context, resp manageproto.API_ExecCommandStreamClient, resizeData remotecommand.TerminalSizeQueue) error {
queue := make(chan *remotecommand.TerminalSize, 1)
go func() {
for {
Expand All @@ -155,9 +155,9 @@ func (a *API) termSizeHandler(ctx context.Context, resp vmproto.API_ExecCommandS
a.logger.Debug("terminalSizeHandler queue closed")
return errors.New("window size queue closed")
}
err := resp.Send(&vmproto.ExecCommandStreamRequest{
Content: &vmproto.ExecCommandStreamRequest_Termsize{
Termsize: &vmproto.TerminalSizeRequest{
err := resp.Send(&manageproto.ExecCommandStreamRequest{
Content: &manageproto.ExecCommandStreamRequest_Termsize{
Termsize: &manageproto.TerminalSizeRequest{
Width: int32(item.Width),
Height: int32(item.Height),
},
Expand All @@ -173,7 +173,7 @@ func (a *API) termSizeHandler(ctx context.Context, resp vmproto.API_ExecCommandS

// receiver is called from the agent.
// It receives data from the agent and writes it to the SSH Client (end-user).
func (a *API) receiver(ctx context.Context, cancel context.CancelFunc, resp vmproto.API_ExecCommandStreamClient, stdout io.Writer, stderr io.Writer) error {
func (a *API) receiver(ctx context.Context, cancel context.CancelFunc, resp manageproto.API_ExecCommandStreamClient, stdout io.Writer, stderr io.Writer) error {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -207,7 +207,7 @@ func (a *API) receiver(ctx context.Context, cancel context.CancelFunc, resp vmpr

// we don't need to cancel the context. If we fail to send something receiving will either return EOF or an error.
// Thus the receiver will stop and cancel the context.
func (a *API) sender(ctx context.Context, resp vmproto.API_ExecCommandStreamClient, stdin io.Reader) error {
func (a *API) sender(ctx context.Context, resp manageproto.API_ExecCommandStreamClient, stdin io.Reader) error {
// g, _ := errgroup.WithContext(ctx)
errChan := make(chan error, 1)

Expand All @@ -227,8 +227,8 @@ func (a *API) sender(ctx context.Context, resp vmproto.API_ExecCommandStreamClie
errChan <- err
return
}
err = resp.Send(&vmproto.ExecCommandStreamRequest{
Content: &vmproto.ExecCommandStreamRequest_Stdin{
err = resp.Send(&manageproto.ExecCommandStreamRequest{
Content: &manageproto.ExecCommandStreamRequest_Stdin{
Stdin: copier[:n],
},
})
Expand Down
8 changes: 8 additions & 0 deletions agent/container/containerapi/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* SPDX-License-Identifier: AGPL-3.0-only
* Copyright (c) Benedict Schlueter
*/

package containerapi

// Core interface contains functions to access the state Core data.
type Core interface{}
27 changes: 27 additions & 0 deletions agent/container/core/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* SPDX-License-Identifier: AGPL-3.0-only
* Copyright (c) Benedict Schlueter
* Copyright (c) Edgeless Systems GmbH
*/

package core

import (
"sync"

"go.uber.org/zap"
)

// Core is responsible for maintaining state information of the container-agent.
type Core struct {
zaplogger *zap.Logger
mux sync.Mutex
}

// NewCore creates and initializes a new Core object.
func NewCore(zapLogger *zap.Logger) (*Core, error) {
c := &Core{
zaplogger: zapLogger,
mux: sync.Mutex{},
}
return c, nil
}
File renamed without changes.
82 changes: 82 additions & 0 deletions agent/container/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/* SPDX-License-Identifier: AGPL-3.0-only
* Copyright (c) Edgeless Systems GmbH
* Copyright (c) Benedict Schlueter
* Copyright (c) Leonard Cohnen
*/

package main

import (
"context"
"net"
"sync"

"github.com/benschlueter/delegatio/agent/manageapi"
"github.com/benschlueter/delegatio/agent/manageapi/manageproto"
"github.com/benschlueter/delegatio/agent/vm/core"
"github.com/benschlueter/delegatio/agent/vm/core/state"
"github.com/benschlueter/delegatio/internal/config"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var version = "0.0.0"

/*
* This will run on the VM's bare matel. We try to contact the control plane
* via the loadbalancerIPAddr to give us the join token. At the same time
* we are waiting for the init-request from a user *only* if we are a control plane.
*/
func run(dialer manageapi.Dialer, bindIP, bindPort string, zapLoggerCore *zap.Logger, containerMode *bool, loadbalancerIPAddr string) {
defer func() { _ = zapLoggerCore.Sync() }()
zapLoggerCore.Info("starting delegatio agent", zap.String("version", version), zap.String("commit", config.Commit))

if *containerMode {
zapLoggerCore.Info("running in container mode")
} else {
zapLoggerCore.Info("running in qemu mode")
}

core, err := core.NewCore(zapLoggerCore, loadbalancerIPAddr)
if err != nil {
zapLoggerCore.Fatal("failed to create core", zap.Error(err))
}

vapi := manageapi.New(zapLoggerCore.Named("vmapi"), core, dialer)
zapLoggergRPC := zapLoggerCore.Named("gRPC")

grpcServer := grpc.NewServer(
grpc.Creds(insecure.NewCredentials()),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_ctxtags.StreamServerInterceptor(),
grpc_zap.StreamServerInterceptor(zapLoggergRPC),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_zap.UnaryServerInterceptor(zapLoggergRPC),
)),
)
manageproto.RegisterAPIServer(grpcServer, vapi)

lis, err := net.Listen("tcp", net.JoinHostPort(bindIP, bindPort))
if err != nil {
zapLoggergRPC.Fatal("failed to create listener", zap.Error(err))
}
zapLoggergRPC.Info("server listener created", zap.String("address", lis.Addr().String()))
core.State.Advance(state.AcceptingInit)
go core.TryJoinCluster(context.Background())

var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
if err := grpcServer.Serve(lis); err != nil {
zapLoggergRPC.Fatal("failed to serve gRPC", zap.Error(err))
}
}()
}
8 changes: 8 additions & 0 deletions agent/manageapi/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* SPDX-License-Identifier: AGPL-3.0-only
* Copyright (c) Benedict Schlueter
*/

package manageapi

// Core interface contains functions to access the state Core data.
type Core interface{}
34 changes: 17 additions & 17 deletions agent/vmapi/exec.go → agent/manageapi/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) Benedict Schlueter
*/

package vmapi
package manageapi

import (
"bytes"
Expand All @@ -12,7 +12,7 @@ import (
"io"
"os/exec"

"github.com/benschlueter/delegatio/agent/vmapi/vmproto"
"github.com/benschlueter/delegatio/agent/manageapi/manageproto"
"github.com/creack/pty"
"go.uber.org/zap"
"golang.org/x/sys/unix"
Expand All @@ -25,7 +25,7 @@ func setSize(fd uintptr, size *TerminalSize) error {
return unix.IoctlSetWinsize(int(fd), unix.TIOCSWINSZ, winsize)
}

func (a *API) ttyCmd(execCmd *exec.Cmd, stdin io.Reader, stdout io.WriteCloser, handler *TerminalSizeHandler) error {
func (a *ManageAPI) ttyCmd(execCmd *exec.Cmd, stdin io.Reader, stdout io.WriteCloser, handler *TerminalSizeHandler) error {
p, err := pty.Start(execCmd)
if err != nil {
return err
Expand Down Expand Up @@ -67,7 +67,7 @@ func (a *API) ttyCmd(execCmd *exec.Cmd, stdin io.Reader, stdout io.WriteCloser,

// ExecCommandStream executes a command in the VM and streams the output to the caller.
// This is useful if the command needs much time to run and we want to log the current state, i.e. kubeadm.
func (a *API) ExecCommandStream(srv vmproto.API_ExecCommandStreamServer) error {
func (a *ManageAPI) ExecCommandStream(srv manageproto.API_ExecCommandStreamServer) error {
a.logger.Info("ExecCommandStream")
in, err := srv.Recv()
if err != nil {
Expand All @@ -84,17 +84,17 @@ func (a *API) ExecCommandStream(srv vmproto.API_ExecCommandStreamServer) error {

errorStreamWriter := &streamWriterWrapper{
forwardFunc: func(b []byte) error {
return srv.Send(&vmproto.ExecCommandStreamResponse{
Content: &vmproto.ExecCommandStreamResponse_Stderr{
return srv.Send(&manageproto.ExecCommandStreamResponse{
Content: &manageproto.ExecCommandStreamResponse_Stderr{
Stderr: b,
},
})
},
}
stdoutStreamWrtier := &streamWriterWrapper{
forwardFunc: func(b []byte) error {
return srv.Send(&vmproto.ExecCommandStreamResponse{
Content: &vmproto.ExecCommandStreamResponse_Stdout{
return srv.Send(&manageproto.ExecCommandStreamResponse{
Content: &manageproto.ExecCommandStreamResponse_Stdout{
Stdout: b,
},
})
Expand Down Expand Up @@ -161,8 +161,8 @@ func (a *API) ExecCommandStream(srv vmproto.API_ExecCommandStreamServer) error {
exitCode = -1
}
// Instead of done we should return the exit code of the command.
if err := srv.Send(&vmproto.ExecCommandStreamResponse{
Content: &vmproto.ExecCommandStreamResponse_Err{
if err := srv.Send(&manageproto.ExecCommandStreamResponse{
Content: &manageproto.ExecCommandStreamResponse_Err{
Err: fmt.Sprint(exitCode),
},
}); err != nil {
Expand All @@ -174,13 +174,13 @@ func (a *API) ExecCommandStream(srv vmproto.API_ExecCommandStreamServer) error {

// ExecCommandReturnStream executes a command in the VM and streams the output to the caller.
// This is useful if the command needs much time to run and we want to log the current state, i.e. kubeadm.
func (a *API) ExecCommandReturnStream(in *vmproto.ExecCommandRequest, srv vmproto.API_ExecCommandReturnStreamServer) error {
func (a *ManageAPI) ExecCommandReturnStream(in *manageproto.ExecCommandRequest, srv manageproto.API_ExecCommandReturnStreamServer) error {
a.logger.Info("request to execute command", zap.String("command", in.Command), zap.Strings("args", in.Args))
command := exec.Command(in.Command, in.Args...)
streamer := &streamWriterWrapper{forwardFunc: func(b []byte) error {
return srv.Send(&vmproto.ExecCommandReturnStreamResponse{
Content: &vmproto.ExecCommandReturnStreamResponse_Log{
Log: &vmproto.Log{
return srv.Send(&manageproto.ExecCommandReturnStreamResponse{
Content: &manageproto.ExecCommandReturnStreamResponse_Log{
Log: &manageproto.Log{
Message: string(b),
},
},
Expand All @@ -198,18 +198,18 @@ func (a *API) ExecCommandReturnStream(in *vmproto.ExecCommandRequest, srv vmprot
if err := command.Wait(); err != nil {
return status.Errorf(codes.Internal, "command exited with error code: %v and output: %s", err, stdoutBuf.Bytes())
}
return srv.Send(&vmproto.ExecCommandReturnStreamResponse{Content: &vmproto.ExecCommandReturnStreamResponse_Output{Output: stdoutBuf.Bytes()}})
return srv.Send(&manageproto.ExecCommandReturnStreamResponse{Content: &manageproto.ExecCommandReturnStreamResponse_Output{Output: stdoutBuf.Bytes()}})
}

// ExecCommand executes a command in the VM.
func (a *API) ExecCommand(_ context.Context, in *vmproto.ExecCommandRequest) (*vmproto.ExecCommandResponse, error) {
func (a *ManageAPI) ExecCommand(_ context.Context, in *manageproto.ExecCommandRequest) (*manageproto.ExecCommandResponse, error) {
a.logger.Info("request to execute command", zap.String("command", in.Command), zap.Strings("args", in.Args))
command := exec.Command(in.Command, in.Args...)
output, err := command.Output()
if err != nil {
return nil, status.Errorf(codes.Internal, "command exited with error code: %v and output: %s", err, string(output))
}
return &vmproto.ExecCommandResponse{Output: output}, nil
return &manageproto.ExecCommandResponse{Output: output}, nil
}

type streamWriterWrapper struct {
Expand Down
Loading

0 comments on commit 414c9f7

Please sign in to comment.