Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Self Join + Initial GCP Code #202

Merged
merged 3 commits into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/build
/images
/images
/3rdParty
13 changes: 13 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,25 @@
# Unignore all dirs
!*/

*.tfstate

!/images/mkosi.skeleton/**
/images/mkosi.skeleton/usr/bin/delegatio-agent
!/container/sshd_config

/images/*qcow2*
/images/*.cmdline
/images/mkosi.cache/**
/images/.cache/**
/build
/3rdParty
/cli/infrastructure/terraform/gcp/delegatio.json

# Terraform
*.tfstate
*.tfstate.backup
.terraform
.terraform.tfstate.lock.info
*.tfvars
terraform.log
delegatio.json
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ add_custom_target(images ALL
#
add_custom_target(delegatio-agent ALL
go build -o ${CMAKE_BINARY_DIR}/delegatio-agent
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/agent/server
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/agent/vm
BYPRODUCTS delegatio-agent
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) Benedict Schlueter
*/

package vmapi
package containerapi

import (
"context"
Expand All @@ -13,7 +13,7 @@ import (
"strconv"
"syscall"

"github.com/benschlueter/delegatio/agent/vmapi/vmproto"
"github.com/benschlueter/delegatio/agent/manageapi/manageproto"
"github.com/benschlueter/delegatio/internal/config"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -22,8 +22,8 @@ import (
"k8s.io/client-go/tools/remotecommand"
)

// VMAPI interface contains functions to access the agent.
type VMAPI interface {
// ContainerAPI interface contains functions to access the container agent.
type ContainerAPI interface {
CreateExecInPodgRPC(context.Context, string, *config.KubeExecConfig) error
WriteFileInPodgRPC(context.Context, string, *config.KubeFileWriteConfig) error
}
Expand All @@ -33,7 +33,7 @@ type API struct {
logger *zap.Logger
core Core
dialer Dialer
vmproto.UnimplementedAPIServer
manageproto.UnimplementedAPIServer
}

// New creates a new API.
Expand Down Expand Up @@ -74,9 +74,9 @@ func (a *API) WriteFileInPodgRPC(ctx context.Context, endpoint string, conf *con
return err
}
defer conn.Close()
client := vmproto.NewAPIClient(conn)
client := manageproto.NewAPIClient(conn)
_, err = client.WriteFile(ctx,
&vmproto.WriteFileRequest{
&manageproto.WriteFileRequest{
Filepath: conf.FilePath,
Filename: conf.FileName,
Content: conf.FileData,
Expand All @@ -98,14 +98,14 @@ func (a *API) CreateExecInPodgRPC(ctx context.Context, endpoint string, conf *co
return err
}
defer conn.Close()
client := vmproto.NewAPIClient(conn)
client := manageproto.NewAPIClient(conn)
resp, err := client.ExecCommandStream(ctx)
if err != nil {
return err
}
err = resp.Send(&vmproto.ExecCommandStreamRequest{
Content: &vmproto.ExecCommandStreamRequest_Command{
Command: &vmproto.ExecCommandRequest{
err = resp.Send(&manageproto.ExecCommandStreamRequest{
Content: &manageproto.ExecCommandStreamRequest_Command{
Command: &manageproto.ExecCommandRequest{
Command: conf.Command,
Tty: conf.Tty,
},
Expand All @@ -132,7 +132,7 @@ func (a *API) CreateExecInPodgRPC(ctx context.Context, endpoint string, conf *co
return err
}

func (a *API) termSizeHandler(ctx context.Context, resp vmproto.API_ExecCommandStreamClient, resizeData remotecommand.TerminalSizeQueue) error {
func (a *API) termSizeHandler(ctx context.Context, resp manageproto.API_ExecCommandStreamClient, resizeData remotecommand.TerminalSizeQueue) error {
queue := make(chan *remotecommand.TerminalSize, 1)
go func() {
for {
Expand All @@ -155,9 +155,9 @@ func (a *API) termSizeHandler(ctx context.Context, resp vmproto.API_ExecCommandS
a.logger.Debug("terminalSizeHandler queue closed")
return errors.New("window size queue closed")
}
err := resp.Send(&vmproto.ExecCommandStreamRequest{
Content: &vmproto.ExecCommandStreamRequest_Termsize{
Termsize: &vmproto.TerminalSizeRequest{
err := resp.Send(&manageproto.ExecCommandStreamRequest{
Content: &manageproto.ExecCommandStreamRequest_Termsize{
Termsize: &manageproto.TerminalSizeRequest{
Width: int32(item.Width),
Height: int32(item.Height),
},
Expand All @@ -173,7 +173,7 @@ func (a *API) termSizeHandler(ctx context.Context, resp vmproto.API_ExecCommandS

// receiver is called from the agent.
// It receives data from the agent and writes it to the SSH Client (end-user).
func (a *API) receiver(ctx context.Context, cancel context.CancelFunc, resp vmproto.API_ExecCommandStreamClient, stdout io.Writer, stderr io.Writer) error {
func (a *API) receiver(ctx context.Context, cancel context.CancelFunc, resp manageproto.API_ExecCommandStreamClient, stdout io.Writer, stderr io.Writer) error {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -207,7 +207,7 @@ func (a *API) receiver(ctx context.Context, cancel context.CancelFunc, resp vmpr

// we don't need to cancel the context. If we fail to send something receiving will either return EOF or an error.
// Thus the receiver will stop and cancel the context.
func (a *API) sender(ctx context.Context, resp vmproto.API_ExecCommandStreamClient, stdin io.Reader) error {
func (a *API) sender(ctx context.Context, resp manageproto.API_ExecCommandStreamClient, stdin io.Reader) error {
// g, _ := errgroup.WithContext(ctx)
errChan := make(chan error, 1)

Expand All @@ -227,8 +227,8 @@ func (a *API) sender(ctx context.Context, resp vmproto.API_ExecCommandStreamClie
errChan <- err
return
}
err = resp.Send(&vmproto.ExecCommandStreamRequest{
Content: &vmproto.ExecCommandStreamRequest_Stdin{
err = resp.Send(&manageproto.ExecCommandStreamRequest{
Content: &manageproto.ExecCommandStreamRequest_Stdin{
Stdin: copier[:n],
},
})
Expand Down
8 changes: 8 additions & 0 deletions agent/container/containerapi/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* SPDX-License-Identifier: AGPL-3.0-only
* Copyright (c) Benedict Schlueter
*/

package containerapi

// Core interface contains functions to access the state Core data.
type Core interface{}
9 changes: 6 additions & 3 deletions agent/core/core.go → agent/container/core/core.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
/* SPDX-License-Identifier: AGPL-3.0-only
* Copyright (c) Benedict Schlueter
* Copyright (c) Edgeless Systems GmbH
*/

package core

import (
"sync"

"go.uber.org/zap"
)

// Core is responsible for maintaining state information
// of the VM-agent. Currently we do not need any state.
// Core is responsible for maintaining state information of the container-agent.
type Core struct {
zaplogger *zap.Logger
mux sync.Mutex
}

// NewCore creates and initializes a new Core object.
func NewCore(zapLogger *zap.Logger) (*Core, error) {
c := &Core{
zaplogger: zapLogger,
mux: sync.Mutex{},
}

return c, nil
}
32 changes: 31 additions & 1 deletion agent/server/main.go → agent/container/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"log"
"net"

"cloud.google.com/go/compute/metadata"
"github.com/benschlueter/delegatio/internal/config"
"github.com/benschlueter/delegatio/internal/config/definitions"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -46,5 +48,33 @@ func main() {
bindPort = config.PublicAPIport
dialer := &net.Dialer{}

run(dialer, bindIP, bindPort, zapLoggerCore, containerMode)
var ipAddr string
if metadata.OnGCE() {
ipAddr, err = metadata.InstanceAttributeValue("loadbalancer")
if err != nil {
zapLoggerCore.Fatal("failed to get loadbalancer ip from metadata | not running in cloud", zap.Error(err))
}

localIP, err := metadata.InternalIP()
if err != nil {
zapLoggerCore.Fatal("failed to get local ip from metadata", zap.Error(err))
}
zapLoggerCore.Info("local ip", zap.String("ip", localIP))

attr, err := metadata.ProjectAttributes()
if err != nil {
zapLoggerCore.Fatal("failed to get project attributes from metadata", zap.Error(err))
}
zapLoggerCore.Info("project attributes", zap.Any("attributes", attr))

iattr, err := metadata.InstanceAttributes()
if err != nil {
zapLoggerCore.Fatal("failed to get instance attributes from metadata", zap.Error(err))
}
zapLoggerCore.Info("instance attributes", zap.Any("attributes", iattr))
} else {
ipAddr = definitions.NetworkXMLConfig.IPs[0].Address
}

run(dialer, bindIP, bindPort, zapLoggerCore, containerMode, ipAddr)
}
82 changes: 82 additions & 0 deletions agent/container/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/* SPDX-License-Identifier: AGPL-3.0-only
* Copyright (c) Edgeless Systems GmbH
* Copyright (c) Benedict Schlueter
* Copyright (c) Leonard Cohnen
*/

package main

import (
"context"
"net"
"sync"

"github.com/benschlueter/delegatio/agent/manageapi"
"github.com/benschlueter/delegatio/agent/manageapi/manageproto"
"github.com/benschlueter/delegatio/agent/vm/core"
"github.com/benschlueter/delegatio/agent/vm/core/state"
"github.com/benschlueter/delegatio/internal/config"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var version = "0.0.0"

/*
* This will run on the VM's bare matel. We try to contact the control plane
* via the loadbalancerIPAddr to give us the join token. At the same time
* we are waiting for the init-request from a user *only* if we are a control plane.
*/
func run(dialer manageapi.Dialer, bindIP, bindPort string, zapLoggerCore *zap.Logger, containerMode *bool, loadbalancerIPAddr string) {
defer func() { _ = zapLoggerCore.Sync() }()
zapLoggerCore.Info("starting delegatio agent", zap.String("version", version), zap.String("commit", config.Commit))

if *containerMode {
zapLoggerCore.Info("running in container mode")
} else {
zapLoggerCore.Info("running in qemu mode")
}

core, err := core.NewCore(zapLoggerCore, loadbalancerIPAddr)
if err != nil {
zapLoggerCore.Fatal("failed to create core", zap.Error(err))
}

vapi := manageapi.New(zapLoggerCore.Named("vmapi"), core, dialer)
zapLoggergRPC := zapLoggerCore.Named("gRPC")

grpcServer := grpc.NewServer(
grpc.Creds(insecure.NewCredentials()),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_ctxtags.StreamServerInterceptor(),
grpc_zap.StreamServerInterceptor(zapLoggergRPC),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_zap.UnaryServerInterceptor(zapLoggergRPC),
)),
)
manageproto.RegisterAPIServer(grpcServer, vapi)

lis, err := net.Listen("tcp", net.JoinHostPort(bindIP, bindPort))
if err != nil {
zapLoggergRPC.Fatal("failed to create listener", zap.Error(err))
}
zapLoggergRPC.Info("server listener created", zap.String("address", lis.Addr().String()))
core.State.Advance(state.AcceptingInit)
go core.TryJoinCluster(context.Background())

var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
if err := grpcServer.Serve(lis); err != nil {
zapLoggergRPC.Fatal("failed to serve gRPC", zap.Error(err))
}
}()
}
2 changes: 1 addition & 1 deletion agent/vmapi/core.go → agent/manageapi/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) Benedict Schlueter
*/

package vmapi
package manageapi

// Core interface contains functions to access the state Core data.
type Core interface{}
Loading
Loading