Merge pull request #71 from ddosify/develop
Distributed Tracing Init
fatihbaltaci authored Jan 11, 2024
2 parents ec2d8fd + 9ab170c commit f025174
Showing 21 changed files with 638 additions and 102 deletions.
128 changes: 95 additions & 33 deletions aggregator/data.go
@@ -13,9 +13,11 @@ import (
"fmt"
"net"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/ddosify/alaz/datastore"
@@ -126,8 +128,20 @@ var usePgDs bool = false
var useBackendDs bool = true // default to true
var reverseDnsCache *cache.Cache

var re *regexp.Regexp

func init() {
reverseDnsCache = cache.New(defaultExpiration, purgeTime)

keywords := []string{"SELECT", "INSERT", "UPDATE", "DELETE", "CREATE", "DROP", "ALTER", "FROM", "WHERE", "JOIN", "INNER", "OUTER", "LEFT", "RIGHT", "GROUP", "BY", "ORDER", "HAVING", "UNION", "ALL", "BEGIN", "COMMIT"}

// The pattern itself is case-sensitive; matching is made effectively
// case-insensitive by upper-casing the input in containsSQLKeywords.
re = regexp.MustCompile(strings.Join(keywords, "|"))
}

// containsSQLKeywords reports whether the input contains any SQL keyword.
func containsSQLKeywords(input string) bool {
return re.MatchString(strings.ToUpper(input))
}
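
A minimal standalone sketch of how this filter behaves; the trimmed keyword list and sample inputs are illustrative, not from the commit:

package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Trimmed-down version of the filter above, for illustration only.
var re = regexp.MustCompile(strings.Join([]string{"SELECT", "INSERT", "FROM", "WHERE"}, "|"))

func containsSQLKeywords(input string) bool {
	// Upper-casing the input lets the case-sensitive pattern match
	// lower- and mixed-case statements too.
	return re.MatchString(strings.ToUpper(input))
}

func main() {
	fmt.Println(containsSQLKeywords("select id from users")) // true
	fmt.Println(containsSQLKeywords("\x00\x01 binary junk")) // false
}

Note that bare alternation also matches keywords embedded in longer words ("BYPASS" contains "BY"), so wrapping each keyword in \b word boundaries would tighten the filter if false positives became a concern.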

func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, ec *ebpf.EbpfCollector, ds datastore.DataStore) *Aggregator {
@@ -182,6 +196,41 @@ func (a *Aggregator) Run() {
}

}()
go func() {
// every 2 minutes, check live processes and clear the ones left behind
// since events are processed concurrently, a short-lived process's exit event can arrive before its exec event
// which leaves behind zombie http2 workers

t := time.NewTicker(2 * time.Minute)
defer t.Stop()

for range t.C {
a.liveProcessesMu.Lock()

for pid := range a.liveProcesses {
// https://man7.org/linux/man-pages/man2/kill.2.html
// If sig is 0, then no signal is sent, but existence and permission
// checks are still performed; this can be used to check for the
// existence of a process ID or process group ID that the caller is
// permitted to signal.

err := syscall.Kill(int(pid), 0)
if err != nil {
// pid does not exist
delete(a.liveProcesses, pid)

a.clusterInfo.mu.Lock()
delete(a.clusterInfo.PidToSocketMap, pid)
a.clusterInfo.mu.Unlock()

// close the http2 worker if one exists
a.stopHttp2Worker(pid)
}
}

a.liveProcessesMu.Unlock()
}
}()
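
The kill-with-signal-0 probe used above can be exercised in isolation; a minimal sketch (pidAlive is an illustrative helper, not part of the commit):

package main

import (
	"fmt"
	"os"
	"syscall"
)

// pidAlive reports whether a process with the given pid exists, using
// kill(2) with signal 0: no signal is delivered, but existence and
// permission checks still run.
func pidAlive(pid int) bool {
	err := syscall.Kill(pid, 0)
	if err == nil {
		return true
	}
	// EPERM means the process exists but we lack permission to signal it.
	return err == syscall.EPERM
}

func main() {
	fmt.Println(pidAlive(os.Getpid())) // true: our own pid exists
	fmt.Println(pidAlive(1 << 22))     // almost certainly false on a default pid_max
}

One subtlety the cleanup loop glosses over: kill(2) returns EPERM for a live process the caller may not signal, so treating any error as "process gone" is only safe when the agent runs with sufficient privileges, as an eBPF collector typically does.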
go a.processk8s()

numWorker := 100
@@ -234,45 +283,43 @@ func (a *Aggregator) processEbpf(ctx context.Context) {
}
switch bpfEvent.Type() {
case tcp_state.TCP_CONNECT_EVENT:
d := data.(tcp_state.TcpConnectEvent) // copy data's value
tcpConnectEvent := tcp_state.TcpConnectEvent{
Fd: d.Fd,
Timestamp: d.Timestamp,
Type_: d.Type_,
Pid: d.Pid,
SPort: d.SPort,
DPort: d.DPort,
SAddr: d.SAddr,
DAddr: d.DAddr,
}
go a.processTcpConnect(tcpConnectEvent)
d := data.(*tcp_state.TcpConnectEvent) // event arrives as a pointer; no copy needed
go a.processTcpConnect(d)
case l7_req.L7_EVENT:
d := data.(*l7_req.L7Event) // event arrives as a pointer; no copy needed
go a.processL7(ctx, d)
case proc.PROC_EVENT:
d := data.(proc.ProcEvent) // copy data's value
d := data.(*proc.ProcEvent) // event arrives as a pointer; no copy needed
if d.Type_ == proc.EVENT_PROC_EXEC {
go a.processExec(d)
} else if d.Type_ == proc.EVENT_PROC_EXIT {
go a.processExit(d)
go a.stopHttp2Worker(d.Pid)
go a.processExit(d.Pid)
}
case l7_req.TRACE_EVENT:
d := data.(*l7_req.TraceEvent)
a.ds.PersistTraceEvent(d)
}

}
}
}
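
This hunk replaces field-by-field struct copies with pointer type assertions before handing events to goroutines. A minimal sketch of the trade-off, with illustrative event types that are not from the repository: the pointer assertion avoids the copy, but it is only safe if the reader allocates a fresh event per message rather than reusing one buffer.

package main

import (
	"fmt"
	"sync"
)

type ConnectEvent struct{ Pid uint32 }
type ExitEvent struct{ Pid uint32 }

var wg sync.WaitGroup

func dispatch(data interface{}) {
	switch d := data.(type) {
	case *ConnectEvent:
		// Pointer assertion: no struct copy. Only safe when the producer
		// allocates a fresh event per message, since the goroutine may
		// run after the next event has been read.
		wg.Add(1)
		go func() { defer wg.Done(); fmt.Println("connect", d.Pid) }()
	case *ExitEvent:
		wg.Add(1)
		go func() { defer wg.Done(); fmt.Println("exit", d.Pid) }()
	}
}

func main() {
	dispatch(&ConnectEvent{Pid: 42})
	dispatch(&ExitEvent{Pid: 7})
	wg.Wait()
}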

func (a *Aggregator) processExec(d proc.ProcEvent) {
func (a *Aggregator) processExec(d *proc.ProcEvent) {
a.liveProcessesMu.Lock()
a.liveProcesses[d.Pid] = struct{}{}
a.liveProcessesMu.Unlock()
}

func (a *Aggregator) processExit(d proc.ProcEvent) {
func (a *Aggregator) processExit(pid uint32) {
a.liveProcessesMu.Lock()
delete(a.liveProcesses, d.Pid)
delete(a.liveProcesses, pid)
a.liveProcessesMu.Unlock()

a.clusterInfo.mu.Lock()
delete(a.clusterInfo.PidToSocketMap, pid)
a.clusterInfo.mu.Unlock()

// close the http2 worker if one exists
a.stopHttp2Worker(pid)
}

func (a *Aggregator) stopHttp2Worker(pid uint32) {
@@ -297,8 +344,7 @@ func (a *Aggregator) stopHttp2Worker(pid uint32) {
}
}

func (a *Aggregator) processTcpConnect(data interface{}) {
d := data.(tcp_state.TcpConnectEvent)
func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) {
go a.ec.ListenForEncryptedReqs(d.Pid)
if d.Type_ == tcp_state.EVENT_TCP_ESTABLISHED {
// filter out localhost connections
Expand Down Expand Up @@ -482,7 +528,7 @@ func (a *Aggregator) processHttp2Frames(pid uint32, ch chan *l7_req.L7Event) {
return
}

req.StartTime = time.Now().UnixMilli()
req.StartTime = d.EventReadTime
req.Latency = d.WriteTimeNs - req.Latency
req.Completed = true
req.FromIP = skInfo.Saddr
@@ -773,6 +819,13 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
var ok bool
var ch chan *l7_req.L7Event

a.liveProcessesMu.RLock()
_, ok = a.liveProcesses[d.Pid]
a.liveProcessesMu.RUnlock()
if !ok {
return // if a late event arrives, do not create parsers or a new worker, to avoid a memory leak
}

connKey := a.getConnKey(d.Pid, d.Fd)
a.h2ParserMu.RLock()
_, ok = a.h2Parsers[connKey]
@@ -791,13 +844,6 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
ch, ok = a.h2Ch[d.Pid]
a.h2ChMu.RUnlock()
if !ok {
a.liveProcessesMu.RLock()
_, ok = a.liveProcesses[d.Pid]
a.liveProcessesMu.RUnlock()
if !ok {
return // if a late event comes, we do not open a new worker to avoid memory leak
}

// initialize channel
h2ChPid := make(chan *l7_req.L7Event, 100)
a.h2ChMu.Lock()
@@ -820,7 +866,7 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
// TCP events and L7 events can be processed out of order

reqDto := datastore.Request{
StartTime: time.Now().UnixMilli(),
StartTime: d.EventReadTime,
Latency: d.Duration,
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
@@ -830,13 +876,21 @@
StatusCode: d.Status,
FailReason: "",
Method: d.Method,
Tid: d.Tid,
Seq: d.Seq,
}

if d.Protocol == l7_req.L7_PROTOCOL_POSTGRES && d.Method == l7_req.SIMPLE_QUERY {
// parse sql command from payload
// path = sql command
// method = sql message type
reqDto.Path = parseSqlCommand(d.Payload[0:d.PayloadSize])
var err error
reqDto.Path, err = parseSqlCommand(d.Payload[0:d.PayloadSize])
if err != nil {
log.Logger.Error().AnErr("err", err).Msg("failed to parse sql command") // zerolog drops events that are not terminated with Msg/Send
return
}

}
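
The .Msg call above matters: zerolog only writes an event once it is terminated with Msg, Msgf, or Send, and a bare AnErr chain is silently discarded. A minimal standalone sketch (the message text is illustrative):

package main

import (
	"errors"
	"os"

	"github.com/rs/zerolog"
)

func main() {
	logger := zerolog.New(os.Stderr)
	parseErr := errors.New("no sql command found")
	// Terminated event: written to stderr as JSON.
	logger.Error().AnErr("err", parseErr).Msg("failed to parse sql command")
	// Un-terminated event: built, but never emitted.
	logger.Error().AnErr("err", parseErr)
}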
var reqHostHeader string
// parse http payload, extract path, query params, headers
@@ -981,7 +1035,8 @@ func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) *
return skInfo

}
func parseSqlCommand(r []uint8) string {

func parseSqlCommand(r []uint8) (string, error) {
// Q, 4 bytes of length, sql command

// skip Q, (simple query)
@@ -993,7 +1048,14 @@ func parseSqlCommand(r []uint8) string {
// get sql command
sqlStatement := string(r)

return sqlStatement
// garbage bytes can arrive on postgres connections; filter them out by
// searching the statement for SQL keywords
if containsSQLKeywords(sqlStatement) {
return sqlStatement, nil
} else {
return "", fmt.Errorf("no sql command found")
}

}
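
For reference, the simple-query ('Q') message sliced apart above has fixed framing: a one-byte type, a 4-byte big-endian length covering the length field and the body, then the NUL-terminated SQL text. A minimal sketch of building one and unpacking it the same way (buildSimpleQuery is an illustrative helper, not from the repository):

package main

import (
	"encoding/binary"
	"fmt"
)

// buildSimpleQuery encodes a PostgreSQL simple-query ('Q') message.
func buildSimpleQuery(sql string) []byte {
	body := append([]byte(sql), 0) // SQL text plus trailing NUL
	msg := make([]byte, 1+4+len(body))
	msg[0] = 'Q'
	binary.BigEndian.PutUint32(msg[1:5], uint32(4+len(body)))
	copy(msg[5:], body)
	return msg
}

func main() {
	msg := buildSimpleQuery("SELECT 1")
	// Mirror parseSqlCommand: skip the type byte and the 4-byte length.
	payload := msg[5:]
	fmt.Printf("%q\n", string(payload)) // "SELECT 1\x00"
}

Since the captured payload may be truncated or contain unrelated bytes, the keyword check above acts as a sanity filter on whatever survives this slicing.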

func (a *Aggregator) clearSocketLines(ctx context.Context) {
3 changes: 1 addition & 2 deletions aggregator/sock_num_line.go
@@ -111,7 +111,7 @@ func (nl *SocketLine) DeleteUnused() {
}

// assumedInterval is inversely proportional to the number of requests being discarded
assumedInterval := uint64(5 * time.Minute)
assumedInterval := uint64(3 * time.Minute)

// delete all values that
// closed and its LastMatch + assumedInterval < lastMatchedReqTime
@@ -155,7 +155,6 @@ func (nl *SocketLine) GetAlreadyExistingSockets() {
fdDir := strings.Join([]string{"/proc", fmt.Sprint(nl.pid), "fd"}, "/")
fdEntries, err := os.ReadDir(fdDir)
if err != nil {
log.Logger.Warn().Err(err).Msgf("failed to read directory %s", fdDir)
return
}

