Skip to content

Commit

Permalink
Merge pull request #79 from ddosify/develop
Browse files Browse the repository at this point in the history
Feat/testbed
  • Loading branch information
fatihbaltaci authored Feb 7, 2024
2 parents b709e4d + a649900 commit a682796
Show file tree
Hide file tree
Showing 16 changed files with 1,220 additions and 425 deletions.
293 changes: 195 additions & 98 deletions aggregator/data.go

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions aggregator/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,19 @@ func (a *Aggregator) processPod(d k8s.K8sResourceMessage) {

switch d.EventType {
case k8s.ADD:
a.clusterInfo.mu.Lock()
a.clusterInfo.k8smu.Lock()
a.clusterInfo.PodIPToPodUid[pod.Status.PodIP] = pod.UID
a.clusterInfo.mu.Unlock()
a.clusterInfo.k8smu.Unlock()
go a.persistPod(dtoPod, ADD)
case k8s.UPDATE:
a.clusterInfo.mu.Lock()
a.clusterInfo.k8smu.Lock()
a.clusterInfo.PodIPToPodUid[pod.Status.PodIP] = pod.UID
a.clusterInfo.mu.Unlock()
a.clusterInfo.k8smu.Unlock()
go a.persistPod(dtoPod, UPDATE)
case k8s.DELETE:
a.clusterInfo.mu.Lock()
a.clusterInfo.k8smu.Lock()
delete(a.clusterInfo.PodIPToPodUid, pod.Status.PodIP)
a.clusterInfo.mu.Unlock()
a.clusterInfo.k8smu.Unlock()
go a.persistPod(dtoPod, DELETE)
}
}
Expand Down Expand Up @@ -110,19 +110,19 @@ func (a *Aggregator) processSvc(d k8s.K8sResourceMessage) {

switch d.EventType {
case k8s.ADD:
a.clusterInfo.mu.Lock()
a.clusterInfo.k8smu.Lock()
a.clusterInfo.ServiceIPToServiceUid[service.Spec.ClusterIP] = service.UID
a.clusterInfo.mu.Unlock()
a.clusterInfo.k8smu.Unlock()
go a.persistSvc(dtoSvc, ADD)
case k8s.UPDATE:
a.clusterInfo.mu.Lock()
a.clusterInfo.k8smu.Lock()
a.clusterInfo.ServiceIPToServiceUid[service.Spec.ClusterIP] = service.UID
a.clusterInfo.mu.Unlock()
a.clusterInfo.k8smu.Unlock()
go a.persistSvc(dtoSvc, UPDATE)
case k8s.DELETE:
a.clusterInfo.mu.Lock()
a.clusterInfo.k8smu.Lock()
delete(a.clusterInfo.ServiceIPToServiceUid, service.Spec.ClusterIP)
a.clusterInfo.mu.Unlock()
a.clusterInfo.k8smu.Unlock()
go a.persistSvc(dtoSvc, DELETE)
}
}
Expand Down
4 changes: 3 additions & 1 deletion aggregator/sock_num_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func NewSocketLine(pid uint32, fd uint64) *SocketLine {
fd: fd,
Values: make([]TimestampedSocket, 0),
}
go skLine.DeleteUnused()

return skLine
}
Expand Down Expand Up @@ -107,6 +106,9 @@ func (nl *SocketLine) DeleteUnused() {
}

if lastMatchedReqTime == 0 {
// in case of tracking only tcp sockets without any requests matching them, socketLine will consume memory over time
// we need to delete all values in this case
nl.Values = make([]TimestampedSocket, 0)
return
}

Expand Down
5 changes: 3 additions & 2 deletions config/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ type PostgresConfig struct {
DBName string
}

type BackendConfig struct {
type BackendDSConfig struct {
Host string
Port string
MetricsExport bool
MetricsExportInterval int // in seconds

ReqBufferSize int
}
11 changes: 8 additions & 3 deletions datastore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ var kernelVersion string
var cloudProvider CloudProvider

func init() {

TestMode := os.Getenv("TEST_MODE")
if TestMode == "true" {
return
}

MonitoringID = os.Getenv("MONITORING_ID")
if MonitoringID == "" {
log.Logger.Fatal().Msg("MONITORING_ID is not set")
Expand Down Expand Up @@ -186,7 +192,7 @@ func (ll LeveledLogger) Warn(msg string, keysAndValues ...interface{}) {
ll.l.Warn().Fields(keysAndValues).Msg(msg)
}

func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *BackendDS {
func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS {
ctx, _ := context.WithCancel(parentCtx)
rand.Seed(time.Now().UnixNano())

Expand Down Expand Up @@ -270,12 +276,11 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *Backend
ds := &BackendDS{
ctx: ctx,
host: conf.Host,
port: conf.Port,
c: client,
batchSize: bs,
reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}),
traceInfoPool: newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}),
reqChanBuffer: make(chan *ReqInfo, 40000),
reqChanBuffer: make(chan *ReqInfo, conf.ReqBufferSize),
podEventChan: make(chan interface{}, 100),
svcEventChan: make(chan interface{}, 100),
rsEventChan: make(chan interface{}, 100),
Expand Down
48 changes: 0 additions & 48 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package datastore

import (
"github.com/ddosify/alaz/ebpf/l7_req"
"github.com/ddosify/alaz/log"
)

type DataStore interface {
Expand All @@ -18,50 +17,3 @@ type DataStore interface {

PersistTraceEvent(trace *l7_req.TraceEvent) error
}

type MockDataStore struct {
}

func (m *MockDataStore) PersistPod(pod Pod, eventType string) error {
log.Logger.Debug().Str("pod", pod.Name).Msg("PersistPod")
return nil
}

func (m *MockDataStore) PersistService(service Service, eventType string) error {
log.Logger.Debug().Str("service", service.Name).Msg("PersistService")
return nil
}

func (m *MockDataStore) PersistReplicaSet(rs ReplicaSet, eventType string) error {
log.Logger.Debug().Str("replicaset", rs.Name).Msg("PersistReplicaSet")
return nil
}

func (m *MockDataStore) PersistDeployment(d Deployment, eventType string) error {
log.Logger.Debug().Str("deployment", d.Name).Msg("PersistDeployment")
return nil
}

func (m *MockDataStore) PersistEndpoints(e Endpoints, eventType string) error {
log.Logger.Debug().Str("endpoints", e.Name).Msg("PersistEndpoints")
return nil
}

func (m *MockDataStore) PersistContainer(c Container, eventType string) error {
log.Logger.Debug().Str("container", c.Name).Msg("PersistContainer")
return nil
}

func (m *MockDataStore) PersistDaemonSet(ds DaemonSet, eventType string) error {
log.Logger.Debug().Str("daemonset", ds.Name).Msg("PersistDaemonSet")
return nil
}

func (m *MockDataStore) PersistRequest(request *Request) error {
log.Logger.Debug().Bool("isTls", request.Tls).Str("path", request.Path).Msg("PersistRequest")
return nil
}

func (m *MockDataStore) PersistTraceEvent(trace *l7_req.TraceEvent) error {
return nil
}
11 changes: 11 additions & 0 deletions ebpf/bpf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ebpf

import "context"

type Program interface {
Load() // Load bpf program to kernel
Attach() // attach links to programs, in case error process must exit
InitMaps() // initialize bpf map readers, must be called before Consume
Consume(ctx context.Context, ch chan interface{}) // consume bpf events, publishes to chan provided
Close() // release resources
}
64 changes: 39 additions & 25 deletions ebpf/deploy.go → ebpf/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ type EbpfCollector struct {
done chan struct{}
ebpfEvents chan interface{}
ebpfProcEvents chan interface{}
ebpfTcpEvents chan interface{}

tlsAttachQueue chan uint32

// TODO: objectify l7_req and tcp_state
bpfPrograms map[string]Program

sslWriteUprobes map[uint32]link.Link
sslReadEnterUprobes map[uint32]link.Link
Expand All @@ -59,11 +61,19 @@ type EbpfCollector struct {
func NewEbpfCollector(parentCtx context.Context) *EbpfCollector {
ctx, _ := context.WithCancel(parentCtx)

bpfPrograms := make(map[string]Program)

// initialize bpfPrograms
bpfPrograms["tcp_state_prog"] = tcp_state.InitTcpStateProg(nil)
bpfPrograms["l7_prog"] = l7_req.InitL7Prog(nil)
bpfPrograms["proc_prog"] = proc.InitProcProg(nil)

return &EbpfCollector{
ctx: ctx,
done: make(chan struct{}),
ebpfEvents: make(chan interface{}, 100000), // interface is 16 bytes, 16 * 100000 = 8 Megabytes
ebpfProcEvents: make(chan interface{}, 100),
ebpfTcpEvents: make(chan interface{}, 1000),
tlsPidMap: make(map[uint32]struct{}),
sslWriteUprobes: make(map[uint32]link.Link),
sslReadEnterUprobes: make(map[uint32]link.Link),
Expand All @@ -72,6 +82,7 @@ func NewEbpfCollector(parentCtx context.Context) *EbpfCollector {
goTlsReadUprobes: make(map[uint32]link.Link),
goTlsReadUretprobes: make(map[uint32][]link.Link),
tlsAttachQueue: make(chan uint32, 10),
bpfPrograms: bpfPrograms,
}
}

Expand All @@ -87,43 +98,46 @@ func (e *EbpfCollector) EbpfProcEvents() chan interface{} {
return e.ebpfProcEvents
}

func (e *EbpfCollector) Deploy() {
// load programs and convert them to user space structs
go e.AttachUprobesForEncrypted()
func (e *EbpfCollector) EbpfTcpEvents() chan interface{} {
return e.ebpfTcpEvents
}

tcp_state.LoadBpfObjects()
l7_req.LoadBpfObjects()
proc.LoadBpfObjects()
func (e *EbpfCollector) TlsAttachQueue() chan uint32 {
return e.tlsAttachQueue
}

// function to version to program
func (e *EbpfCollector) Init() {
for _, p := range e.bpfPrograms {
p.Load()
p.Attach()
p.InitMaps()
}

wg := sync.WaitGroup{}
wg.Add(3)
go func() {
defer wg.Done()
tcp_state.DeployAndWait(e.ctx, e.ebpfEvents)
}()
go func() {
defer wg.Done()
l7_req.DeployAndWait(e.ctx, e.ebpfEvents)
}()
go func() {
defer wg.Done()
proc.DeployAndWait(e.ctx, e.ebpfProcEvents)
<-e.ctx.Done()
e.close()
close(e.done)
}()
wg.Wait()
}

log.Logger.Info().Msg("reading ebpf maps stopped")
e.close()
close(e.done)
func (e *EbpfCollector) ListenEvents() {
go e.bpfPrograms["tcp_state_prog"].Consume(e.ctx, e.ebpfTcpEvents)
go e.bpfPrograms["l7_prog"].Consume(e.ctx, e.ebpfEvents)
go e.bpfPrograms["proc_prog"].Consume(e.ctx, e.ebpfProcEvents)

// go listenDebugMsgs()
go e.AttachUprobesForEncrypted()
}

func (e *EbpfCollector) close() {
log.Logger.Info().Msg("closing ebpf links")

for _, p := range e.bpfPrograms {
p.Close()
}

close(e.ebpfEvents)
close(e.ebpfProcEvents)
close(e.ebpfTcpEvents)

e.probesMu.Lock()
defer e.probesMu.Unlock()
Expand Down
Loading

0 comments on commit a682796

Please sign in to comment.