Skip to content

Commit

Permalink
Merge pull request #74 from ddosify/develop
Browse files Browse the repository at this point in the history
Performance Improvement
  • Loading branch information
fatihbaltaci authored Jan 23, 2024
2 parents e61df08 + 2cc1266 commit 7167336
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 151 deletions.
233 changes: 127 additions & 106 deletions aggregator/data.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion aggregator/sock_num_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (nl *SocketLine) DeleteUnused() {
}

// assumedInterval is inversely proportional to the number of requests being discarded
assumedInterval := uint64(3 * time.Minute)
assumedInterval := uint64(1 * time.Minute)

// delete all values that
// closed and its LastMatch + assumedInterval < lastMatchedReqTime
Expand Down
65 changes: 44 additions & 21 deletions datastore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package datastore

import (
"bytes"
"container/list"
"context"
"encoding/json"
"fmt"
Expand All @@ -12,6 +13,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/ddosify/alaz/config"
Expand Down Expand Up @@ -133,8 +135,10 @@ type BackendDS struct {
reqChanBuffer chan *ReqInfo
reqInfoPool *poolutil.Pool[*ReqInfo]

traceEventChan chan *TraceInfo
traceInfoPool *poolutil.Pool[*TraceInfo]
traceEventQueue *list.List
traceEventMu sync.RWMutex

traceInfoPool *poolutil.Pool[*TraceInfo]

podEventChan chan interface{} // *PodEvent
svcEventChan chan interface{} // *SvcEvent
Expand Down Expand Up @@ -279,7 +283,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *Backend
epEventChan: make(chan interface{}, 100),
containerEventChan: make(chan interface{}, 100),
dsEventChan: make(chan interface{}, 20),
traceEventChan: make(chan *TraceInfo, 100000), // 8 bytes * 100000 = 0.8 mb
traceEventQueue: list.New(),
}

go ds.sendReqsInBatch(bs)
Expand Down Expand Up @@ -377,6 +381,33 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *Backend
return ds
}

func (b *BackendDS) enqueueTraceInfo(traceInfo *TraceInfo) {
b.traceEventMu.Lock()
defer b.traceEventMu.Unlock()
b.traceEventQueue.PushBack(traceInfo)
}

func (b *BackendDS) dequeueTraceEvents(batchSize uint64) []*TraceInfo {
b.traceEventMu.Lock()
defer b.traceEventMu.Unlock()

batch := make([]*TraceInfo, 0, batchSize)

for i := 0; i < int(batchSize); i++ {
if b.traceEventQueue.Len() == 0 {
return batch
}

elem := b.traceEventQueue.Front()
b.traceEventQueue.Remove(elem)
tInfo, _ := elem.Value.(*TraceInfo)

batch = append(batch, tInfo)
}

return batch
}

func (b *BackendDS) DoRequest(req *http.Request) error {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
Expand Down Expand Up @@ -446,21 +477,11 @@ func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint s
}

func (b *BackendDS) sendTraceEventsInBatch(batchSize uint64) {
t := time.NewTicker(5 * time.Second)
t := time.NewTicker(1 * time.Second)
defer t.Stop()

send := func() {
batch := make([]*TraceInfo, 0, batchSize)
loop := true

for i := 0; (i < int(batchSize)) && loop; i++ {
select {
case trace := <-b.traceEventChan:
batch = append(batch, trace)
case <-time.After(50 * time.Millisecond):
loop = false
}
}
batch := b.dequeueTraceEvents(batchSize)

if len(batch) == 0 {
return
Expand Down Expand Up @@ -640,7 +661,7 @@ func (b *BackendDS) PersistTraceEvent(trace *l7_req.TraceEvent) error {

t[3] = ingress

go func() { b.traceEventChan <- t }()
b.enqueueTraceInfo(t)
return nil
}

Expand Down Expand Up @@ -686,7 +707,7 @@ func (b *BackendDS) PersistContainer(c Container, eventType string) error {
return nil
}

func (b *BackendDS) SendHealthCheck(ebpf bool, metrics bool, k8sVersion string) {
func (b *BackendDS) SendHealthCheck(ebpf bool, metrics bool, dist bool, k8sVersion string) {
t := time.NewTicker(10 * time.Second)
defer t.Stop()

Expand All @@ -699,11 +720,13 @@ func (b *BackendDS) SendHealthCheck(ebpf bool, metrics bool, k8sVersion string)
AlazVersion: tag,
},
Info: struct {
EbpfEnabled bool `json:"ebpf"`
MetricsEnabled bool `json:"metrics"`
EbpfEnabled bool `json:"ebpf"`
MetricsEnabled bool `json:"metrics"`
DistTracingEnabled bool `json:"traffic"`
}{
EbpfEnabled: ebpf,
MetricsEnabled: metrics,
EbpfEnabled: ebpf,
MetricsEnabled: metrics,
DistTracingEnabled: dist,
},
Telemetry: struct {
KernelVersion string `json:"kernel_version"`
Expand Down
5 changes: 3 additions & 2 deletions datastore/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ type Metadata struct {
type HealthCheckPayload struct {
Metadata Metadata `json:"metadata"`
Info struct {
EbpfEnabled bool `json:"ebpf"`
MetricsEnabled bool `json:"metrics"`
EbpfEnabled bool `json:"ebpf"`
MetricsEnabled bool `json:"metrics"`
DistTracingEnabled bool `json:"traffic"`
} `json:"alaz_info"`
Telemetry struct {
KernelVersion string `json:"kernel_version"`
Expand Down
9 changes: 8 additions & 1 deletion ebpf/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type EbpfCollector struct {
ctx context.Context
done chan struct{}
ebpfEvents chan interface{}
ebpfProcEvents chan interface{}
tlsAttachQueue chan uint32

// TODO: objectify l7_req and tcp_state
Expand All @@ -62,6 +63,7 @@ func NewEbpfCollector(parentCtx context.Context) *EbpfCollector {
ctx: ctx,
done: make(chan struct{}),
ebpfEvents: make(chan interface{}, 100000), // interface is 16 bytes, 16 * 100000 = 8 Megabytes
ebpfProcEvents: make(chan interface{}, 100),
tlsPidMap: make(map[uint32]struct{}),
sslWriteUprobes: make(map[uint32]link.Link),
sslReadEnterUprobes: make(map[uint32]link.Link),
Expand All @@ -81,6 +83,10 @@ func (e *EbpfCollector) EbpfEvents() chan interface{} {
return e.ebpfEvents
}

func (e *EbpfCollector) EbpfProcEvents() chan interface{} {
return e.ebpfProcEvents
}

func (e *EbpfCollector) Deploy() {
// load programs and convert them to user space structs
go e.AttachUprobesForEncrypted()
Expand All @@ -103,7 +109,7 @@ func (e *EbpfCollector) Deploy() {
}()
go func() {
defer wg.Done()
proc.DeployAndWait(e.ctx, e.ebpfEvents)
proc.DeployAndWait(e.ctx, e.ebpfProcEvents)
}()
wg.Wait()

Expand All @@ -117,6 +123,7 @@ func (e *EbpfCollector) Deploy() {
func (e *EbpfCollector) close() {
log.Logger.Info().Msg("closing ebpf links")
close(e.ebpfEvents)
close(e.ebpfProcEvents)

e.probesMu.Lock()
defer e.probesMu.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions ebpf/l7_req/http2.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ int is_http2_magic_2(char *buf){

static __always_inline
int is_http2_frame(char *buf, __u64 size) {
if (size < 9) {
return 0;
}

// magic message is not a frame
if (is_http2_magic_2(buf)){
return 1;
Expand Down
53 changes: 37 additions & 16 deletions ebpf/l7_req/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"os"
"strconv"
"time"
"unsafe"

Expand All @@ -14,6 +15,23 @@ import (
"github.com/cilium/ebpf/rlimit"
)

var DIST_TRACING_ENABLED bool

func init() {
DIST_TRACING_ENABLED = false

flag := os.Getenv("DIST_TRACING_ENABLED")

if flag != "" {
enabled, err := strconv.ParseBool(flag)
if err != nil {
log.Logger.Warn().Str("flag", "DIST_TRACING_ENABLED").Err(err).Msg("flag set incorrect")
DIST_TRACING_ENABLED = false
}
DIST_TRACING_ENABLED = enabled
}
}

// match with values in l7_req.c
const (
BPF_L7_PROTOCOL_UNKNOWN = iota
Expand Down Expand Up @@ -363,7 +381,7 @@ func DeployAndWait(parentCtx context.Context, ch chan interface{}) {
}()

// initialize perf event readers
l7Events, err := perf.NewReader(L7BpfProgsAndMaps.L7Events, 512*os.Getpagesize())
l7Events, err := perf.NewReader(L7BpfProgsAndMaps.L7Events, 4096*os.Getpagesize())
if err != nil {
log.Logger.Fatal().Err(err).Msg("error creating perf event array reader")
}
Expand All @@ -381,12 +399,12 @@ func DeployAndWait(parentCtx context.Context, ch chan interface{}) {
logs.Close()
}()

distTraceCalls, err := perf.NewReader(L7BpfProgsAndMaps.IngressEgressCalls, 512*os.Getpagesize())
distTraceCalls, err := perf.NewReader(L7BpfProgsAndMaps.IngressEgressCalls, 4096*os.Getpagesize())
if err != nil {
log.Logger.Fatal().Err(err).Msg("error creating ringbuf reader")
log.Logger.Fatal().Err(err).Msg("error creating perf reader")
}
defer func() {
log.Logger.Info().Msg("closing distTraceCalls ringbuf reader")
log.Logger.Info().Msg("closing distTraceCalls perf reader")
distTraceCalls.Close()
}()

Expand All @@ -404,11 +422,11 @@ func DeployAndWait(parentCtx context.Context, ch chan interface{}) {
}

if record.LostSamples != 0 {
log.Logger.Warn().Msgf("lost #%d samples due to ring buffer's full", record.LostSamples)
log.Logger.Warn().Msgf("lost #%d bpf logs", record.LostSamples)
}

if record.RawSample == nil || len(record.RawSample) == 0 {
log.Logger.Warn().Msgf("read empty record from perf array")
log.Logger.Warn().Msgf("read empty record from log perf array")
return
}

Expand Down Expand Up @@ -584,22 +602,25 @@ func DeployAndWait(parentCtx context.Context, ch chan interface{}) {
log.Logger.Warn().Msgf("lost samples dist-trace %d", record.LostSamples)
}

// TODO: investigate why this is happening
if record.RawSample == nil || len(record.RawSample) == 0 {
log.Logger.Warn().Msgf("read sample dist-trace nil or empty")
return
}

bpfTraceEvent := (*bpfTraceEvent)(unsafe.Pointer(&record.RawSample[0]))

traceEvent := TraceEvent{
Pid: bpfTraceEvent.Pid,
Tid: bpfTraceEvent.Tid,
Tx: time.Now().UnixMilli(),
Type_: bpfTraceEvent.Type_,
Seq: bpfTraceEvent.Seq,
// TODO: we need to compile bpf bytecode accordingly and select the one complies with the flag to attach into kernel.
// That way, we'll not send data from bpf that will not get into processing in user space.
if DIST_TRACING_ENABLED {
bpfTraceEvent := (*bpfTraceEvent)(unsafe.Pointer(&record.RawSample[0]))

traceEvent := TraceEvent{
Pid: bpfTraceEvent.Pid,
Tid: bpfTraceEvent.Tid,
Tx: time.Now().UnixMilli(),
Type_: bpfTraceEvent.Type_,
Seq: bpfTraceEvent.Seq,
}
ch <- &traceEvent
}
ch <- &traceEvent
}
for {
select {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ require (
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/time v0.3.0
golang.org/x/tools v0.11.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.31.0 // indirect
Expand Down
12 changes: 10 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,16 @@ func main() {
go k8sCollector.Init(kubeEvents)
}

ebpfEnabled, _ := strconv.ParseBool(os.Getenv("EBPF_ENABLED"))
// EBPF_ENABLED changed to SERVICE_MAP_ENABLED
// for backwards compatibility
ebpfEnabled, err := strconv.ParseBool(os.Getenv("SERVICE_MAP_ENABLED"))
if err != nil {
// if SERVICE_MAP_ENABLED not given, check EBPF_ENABLED
ebpfEnabled, _ = strconv.ParseBool(os.Getenv("EBPF_ENABLED"))
}

metricsEnabled, _ := strconv.ParseBool(os.Getenv("METRICS_ENABLED"))
distTracingEnabled, _ := strconv.ParseBool(os.Getenv("DIST_TRACING_ENABLED"))

// datastore backend
dsBackend := datastore.NewBackendDS(ctx, config.BackendConfig{
Expand All @@ -57,7 +65,7 @@ func main() {
MetricsExport: metricsEnabled,
MetricsExportInterval: 10,
})
go dsBackend.SendHealthCheck(ebpfEnabled, metricsEnabled, k8sVersion)
go dsBackend.SendHealthCheck(ebpfEnabled, metricsEnabled, distTracingEnabled, k8sVersion)

// deploy ebpf programs
var ec *ebpf.EbpfCollector
Expand Down
4 changes: 3 additions & 1 deletion resources/alaz.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ spec:
hostPID: true
containers:
- env:
- name: EBPF_ENABLED
- name: SERVICE_MAP_ENABLED
value: "true"
- name: METRICS_ENABLED
value: "true"
- name: DIST_TRACING_ENABLED
value: "true"
- name: BACKEND_HOST
value: https://api.ddosify.com:443
- name: LOG_LEVEL
Expand Down

0 comments on commit 7167336

Please sign in to comment.