Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Benedict Schlueter <[email protected]>
  • Loading branch information
benschlueter committed Nov 6, 2024
1 parent 92910dd commit d307154
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 53 deletions.
3 changes: 2 additions & 1 deletion agent/container/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"flag"
"log"
"net"
"strconv"

"github.com/benschlueter/delegatio/internal/config"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
Expand Down Expand Up @@ -42,7 +43,7 @@ func main() {
zapLoggerCore := zapLogger.Named("core")

bindIP = config.DefaultIP
bindPort = config.PublicAPIport
bindPort = strconv.Itoa(config.AgentPort)
dialer := &net.Dialer{}

run(dialer, bindIP, bindPort, zapLoggerCore)
Expand Down
6 changes: 3 additions & 3 deletions agent/vm/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,20 +193,20 @@ func (c *Core) executeKubeadm(ctx context.Context, endpoint, token, caCert strin
}

// TryJoinCluster tries to join the cluster every 5 seconds until it succeeds.
func (c *Core) TryJoinCluster(ctx context.Context) {
func (c *Core) TryJoinCluster(ctx context.Context) error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if c.State.Get() >= state.JoiningCluster {
return
return nil
}
if err := c.JoinCluster(ctx); err != nil {
c.zaplogger.Info("Failed to join cluster, retrying in 5 seconds", zap.Error(err))
}
case <-ctx.Done():
return
return nil
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions agent/vm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"flag"
"log"
"net"
"strconv"

"cloud.google.com/go/compute/metadata"
"github.com/benschlueter/delegatio/internal/config"
Expand All @@ -25,7 +26,7 @@ import (
* CLI rpc calls.
*/
func main() {
var bindIP, bindPort string
var bindIP, agentPort, vmPort string
cfg := zap.NewDevelopmentConfig()

logLevelUser := flag.Bool("debug", false, "enables gRPC debug output")
Expand All @@ -45,7 +46,8 @@ func main() {
zapLoggerCore := zapLogger.Named("core")

bindIP = config.DefaultIP
bindPort = config.PublicAPIport
agentPort = strconv.Itoa(config.AgentPort)
vmPort = strconv.Itoa(config.VMAgentPort)
dialer := &net.Dialer{}

var ipAddr string
Expand Down Expand Up @@ -76,5 +78,5 @@ func main() {
ipAddr = definitions.NetworkXMLConfig.IPs[0].Address
}

run(dialer, bindIP, bindPort, zapLoggerCore, containerMode, ipAddr)
run(dialer, bindIP, agentPort, vmPort, zapLoggerCore, containerMode, ipAddr)
}
80 changes: 66 additions & 14 deletions agent/vm/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ package main
import (
"context"
"net"
"sync"
"os/signal"
"syscall"

"github.com/benschlueter/delegatio/agent/manageapi"
"github.com/benschlueter/delegatio/agent/manageapi/manageproto"
Expand All @@ -22,6 +23,7 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
Expand All @@ -33,7 +35,7 @@ var version = "0.0.0"
* via the loadbalancerIPAddr to give us the join token. At the same time
* we are waiting for the init-request from a user *only* if we are a control plane.
*/
func run(dialer vmapi.Dialer, bindIP, bindPort string, zapLoggerCore *zap.Logger, containerMode *bool, loadbalancerIPAddr string) {
func run(dialer vmapi.Dialer, bindIP, agentPort, vmPort string, zapLoggerCore *zap.Logger, containerMode *bool, loadbalancerIPAddr string) {
defer func() { _ = zapLoggerCore.Sync() }()
zapLoggerCore.Info("starting delegatio agent", zap.String("version", version), zap.String("commit", config.Commit))

Expand Down Expand Up @@ -61,7 +63,7 @@ func run(dialer vmapi.Dialer, bindIP, bindPort string, zapLoggerCore *zap.Logger
}
zapLoggerCore.Info("TLS config generated")

grpcServer := grpc.NewServer(
grpcServerAgent := grpc.NewServer(
grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_ctxtags.StreamServerInterceptor(),
Expand All @@ -72,24 +74,74 @@ func run(dialer vmapi.Dialer, bindIP, bindPort string, zapLoggerCore *zap.Logger
grpc_zap.UnaryServerInterceptor(zapLoggergRPC),
)),
)
vmproto.RegisterAPIServer(grpcServer, vapi)
manageproto.RegisterAPIServer(grpcServer, mapi)

lis, err := net.Listen("tcp", net.JoinHostPort(bindIP, bindPort))
grpcServerVM := grpc.NewServer(
grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_ctxtags.StreamServerInterceptor(),
grpc_zap.StreamServerInterceptor(zapLoggergRPC),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_zap.UnaryServerInterceptor(zapLoggergRPC),
)),
)

vmproto.RegisterAPIServer(grpcServerVM, vapi)

manageproto.RegisterAPIServer(grpcServerAgent, mapi)

lisAgent, err := net.Listen("tcp", net.JoinHostPort(bindIP, agentPort))
if err != nil {
zapLoggergRPC.Fatal("failed to create listener", zap.Error(err))
}
zapLoggergRPC.Info("server listener created", zap.String("address", lisAgent.Addr().String()))
lisVM, err := net.Listen("tcp", net.JoinHostPort(bindIP, vmPort))
if err != nil {
zapLoggergRPC.Fatal("failed to create listener", zap.Error(err))
}
zapLoggergRPC.Info("server listener created", zap.String("address", lis.Addr().String()))
zapLoggergRPC.Info("server listener created", zap.String("address", lisAgent.Addr().String()))

core.State.Advance(state.AcceptingInit)
go core.TryJoinCluster(context.Background())

var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
ctx, cancel := registerSignalHandler(context.Background(), zapLoggerCore)
defer cancel()
g, _ := errgroup.WithContext(ctx)
g.Go(func() error {
return grpcServerAgent.Serve(lisAgent)
})
g.Go(func() error {
return grpcServerVM.Serve(lisVM)
})
g.Go(func() error {
return core.TryJoinCluster(context.Background())
})

if err := g.Wait(); err != nil {
zapLoggergRPC.Fatal("server error", zap.Error(err))
}
}

func registerSignalHandler(ctx context.Context, log *zap.Logger) (context.Context, context.CancelFunc) {
ctx, cancelFunc := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
stopped := make(chan struct{}, 1)
done := make(chan struct{}, 1)

go func() {
defer wg.Done()
if err := grpcServer.Serve(lis); err != nil {
zapLoggergRPC.Fatal("failed to serve gRPC", zap.Error(err))
defer func() {
cancelFunc()
stopped <- struct{}{}
}()
select {
case <-ctx.Done():
log.Info("ctrl+c caught, stopping gracefully")
case <-done:
log.Info("done signal received, stopping gracefully")
}
}()

return ctx, func() {
done <- struct{}{}
<-stopped
}
}
12 changes: 6 additions & 6 deletions agent/vm/vmapi/vmapipub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (

// VMAPI is the interface for the VM API.
type VMAPI interface {
InstallKubernetes(context.Context, string, []byte) error
GetKubernetesConfig(context.Context, string) ([]byte, error)
GetEtcdCredentials(context.Context, string) ([]byte, []byte, error)
InstallKubernetes(context.Context, []byte) error
GetKubernetesConfig(context.Context) ([]byte, error)
GetEtcdCredentials(context.Context) ([]byte, []byte, error)
GetJoinData(ctx context.Context) (*vmproto.GetJoinDataKubeResponse, error)
}

Expand Down Expand Up @@ -116,7 +116,7 @@ type Dialer interface {
}

// InstallKubernetes initializes a kubernetes cluster using the gRPC API.
func (a *APIExternal) InstallKubernetes(ctx context.Context, _ string, kubernetesInitConfiguration []byte) (err error) {
func (a *APIExternal) InstallKubernetes(ctx context.Context, kubernetesInitConfiguration []byte) (err error) {
conn, err := a.dialFirstMaster()
if err != nil {
a.logger.Error("dial", zap.Error(err))
Expand Down Expand Up @@ -168,7 +168,7 @@ func (a *APIExternal) executeKubeadm(ctx context.Context, client vmproto.APIClie
}

// GetKubernetesConfig returns the kubernetes config for the instance.
func (a *APIExternal) GetKubernetesConfig(ctx context.Context, _ string) (output []byte, err error) {
func (a *APIExternal) GetKubernetesConfig(ctx context.Context) (output []byte, err error) {
conn, err := a.dialFirstMaster()
if err != nil {
return nil, err
Expand All @@ -187,7 +187,7 @@ func (a *APIExternal) GetKubernetesConfig(ctx context.Context, _ string) (output
}

// GetEtcdCredentials returns the etcd credentials for the instance.
func (a *APIExternal) GetEtcdCredentials(ctx context.Context, _ string) ([]byte, []byte, error) {
func (a *APIExternal) GetEtcdCredentials(ctx context.Context) ([]byte, []byte, error) {
conn, err := a.dialFirstMaster()
if err != nil {
return nil, nil, err
Expand Down
4 changes: 2 additions & 2 deletions cli/bootstrapper/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type Bootstrapper interface {
}

// NewKubernetes creates a new Kubernetes bootstrapper.
func NewKubernetes(log *zap.Logger, controlPlaneEndpoint string, k8sConfig []byte) (Bootstrapper, error) {
instance, err := kubernetes.NewBootstrapper(log, controlPlaneEndpoint, k8sConfig)
func NewKubernetes(log *zap.Logger, k8sConfig []byte) (Bootstrapper, error) {
instance, err := kubernetes.NewBootstrapper(log, k8sConfig)
if err != nil {
return nil, err
}
Expand Down
24 changes: 11 additions & 13 deletions cli/bootstrapper/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,30 @@ import (

// Bootstrapper communicates with the agent inside the control-plane VM after Kubernetes was initialized.
type Bootstrapper struct {
log *zap.Logger
client kubernetes.Interface
adminConf []byte
controlPlaneEndpoint string
k8sConfig []byte
vmAPI vmapi.VMAPI
log *zap.Logger
client kubernetes.Interface
adminConf []byte
k8sConfig []byte
vmAPI vmapi.VMAPI
}

// NewBootstrapper creates a new agent object.
func NewBootstrapper(log *zap.Logger, controlPlaneEndpoint string, k8sConfig []byte) (*Bootstrapper, error) {
func NewBootstrapper(log *zap.Logger, k8sConfig []byte) (*Bootstrapper, error) {
agentLog := log.Named("bootstrapper")
vmapi, err := vmapi.NewExternal(log.Named("vmapi"), &net.Dialer{})
if err != nil {
return nil, err
}
return &Bootstrapper{
log: agentLog,
controlPlaneEndpoint: controlPlaneEndpoint,
k8sConfig: k8sConfig,
vmAPI: vmapi,
log: agentLog,
k8sConfig: k8sConfig,
vmAPI: vmapi,
}, nil
}

// BootstrapKubernetes initializes the kubernetes cluster.
func (a *Bootstrapper) BootstrapKubernetes(ctx context.Context) (*config.EtcdCredentials, error) {
if err := a.vmAPI.InstallKubernetes(ctx, a.controlPlaneEndpoint, a.k8sConfig); err != nil {
if err := a.vmAPI.InstallKubernetes(ctx, a.k8sConfig); err != nil {
return nil, err
}
a.log.Info("kubernetes init successful")
Expand All @@ -53,7 +51,7 @@ func (a *Bootstrapper) BootstrapKubernetes(ctx context.Context) (*config.EtcdCre
return nil, err
}
a.log.Info("kubernetes configured")
caCert, caKey, err := a.vmAPI.GetEtcdCredentials(ctx, a.controlPlaneEndpoint)
caCert, caKey, err := a.vmAPI.GetEtcdCredentials(ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cli/bootstrapper/kubernetes/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// writeKubeconfigToDisk writes the kubeconfig to disk.
func (a *Bootstrapper) writeKubeconfigToDisk(ctx context.Context) (err error) {
file, err := a.vmAPI.GetKubernetesConfig(ctx, a.controlPlaneEndpoint)
file, err := a.vmAPI.GetKubernetesConfig(ctx)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion cli/infrastructure/qemu/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
_ "embed"
"fmt"
"net"
"strconv"
"time"

"github.com/benschlueter/delegatio/agent/manageapi/manageproto"
Expand Down Expand Up @@ -111,7 +112,8 @@ func (l *LibvirtInstance) blockUntilDelegatioAgentIsReady(ctx context.Context, i
if err != nil {
return err
}
conn, err := grpc.DialContext(ctx, net.JoinHostPort(ip, config.PublicAPIport), grpc.WithTransportCredentials(credentials.NewTLS(tlsconfig)))
// No DNS available, so we have to use the IP
conn, err := grpc.DialContext(ctx, net.JoinHostPort(ip, strconv.Itoa(config.AgentPort)), grpc.WithTransportCredentials(credentials.NewTLS(tlsconfig)))
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package main

import (
"context"
"net"

"github.com/benschlueter/delegatio/cli/bootstrapper"
"github.com/benschlueter/delegatio/cli/infrastructure"
Expand Down Expand Up @@ -45,7 +44,9 @@ func run(ctx context.Context, log *zap.Logger, imageLocation string) error {
}
log.Info("finished infrastructure initialization")
/// --- kubernetes ---
creds, err := bootstrapKubernetes(ctx, log)
controlPlaneIP := definitions.NetworkXMLConfig.IPs[0].Address

creds, err := bootstrapKubernetes(ctx, controlPlaneIP, log)
if err != nil {
log.With(zap.Error(err)).DPanic("bootstrap kubernetes")
}
Expand All @@ -60,13 +61,12 @@ func run(ctx context.Context, log *zap.Logger, imageLocation string) error {
return handleTermination(log, creds)
}

func bootstrapKubernetes(ctx context.Context, log *zap.Logger) (*config.EtcdCredentials, error) {
controlPlaneIP := definitions.NetworkXMLConfig.IPs[0].Address
kubeConf, err := utils.GetKubeInitConfig(controlPlaneIP)
func bootstrapKubernetes(ctx context.Context, loadbalancerIP string, log *zap.Logger) (*config.EtcdCredentials, error) {
kubeConf, err := utils.GetKubeInitConfig(loadbalancerIP)
if err != nil {
log.With(zap.Error(err)).DPanic("failed to get kubeConfig")
}
agent, err := bootstrapper.NewKubernetes(log, net.JoinHostPort(controlPlaneIP, config.PublicAPIport), kubeConf)
agent, err := bootstrapper.NewKubernetes(log, kubeConf)
if err != nil {
log.Error("failed to initialize bootstrapper", zap.Error(err))
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions internal/config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ const (
Version = "0.0.1"
// DefaultIP is the default IP address to bind to.
DefaultIP = "0.0.0.0"
// PublicAPIport is the port where we can access the public API.
PublicAPIport = "9000"
// GradeAPIport is the port where a client can request grading of the exercises.
GradeAPIport = 9027
// DefaultTimeout for the API.
Expand All @@ -60,8 +58,10 @@ const (
UserNamespace = "users"
// NodeNameEnvVariable is the environment variable name of the node a user-pod is running on.
NodeNameEnvVariable = "NODE_NAME"
// AgentPort is the port where the agent is listening.
// AgentPort is the port where the default agent gRPCs are listening (for VM and Container).
AgentPort = 9000
// VMAgentPort is the port where the VM-specific agent is listening.
VMAgentPort = 9010
// SSHServiceAccountName is the name of the Kubernetes ssh service account with cluster access.
SSHServiceAccountName = "development-ssh"
// SSHPort is the port where the ssh server is listening.
Expand Down
1 change: 1 addition & 0 deletions internal/k8sapi/templates/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func Pod(identifier *config.KubeRessourceIdentifier) *coreAPI.PodSpec {
Exec: &coreAPI.ExecAction{
Command: []string{"whoami"},
},
GRPC: &v1.GRPCAction{},
},
},
VolumeMounts: []coreAPI.VolumeMount{
Expand Down

0 comments on commit d307154

Please sign in to comment.