Merge pull request #86 from ddosify/develop
Send Alive Connections & capture prepared stmts with parse and bind frames
fatihbaltaci authored Feb 11, 2024
2 parents a9fb231 + 35b6e8a commit 6fd6daf
Showing 12 changed files with 568 additions and 140 deletions.
322 changes: 285 additions & 37 deletions aggregator/data.go

Large diffs are not rendered by default.

122 changes: 122 additions & 0 deletions aggregator/pg_test.go

Large diffs are not rendered by default.

87 changes: 0 additions & 87 deletions aggregator/sock_num_line.go
@@ -8,15 +8,11 @@ import (
"fmt"
"net"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/ddosify/alaz/log"

"inet.af/netaddr"
)

@@ -105,13 +101,6 @@ func (nl *SocketLine) DeleteUnused() {
}
}

if lastMatchedReqTime == 0 {
// in case of tracking only tcp sockets without any requests matching them, socketLine will consume memory over time
// we need to delete all values in this case
nl.Values = make([]TimestampedSocket, 0)
return
}

// assumedInterval is inversely proportional to the number of requests being discarded
assumedInterval := uint64(1 * time.Minute)

@@ -130,82 +119,6 @@ func (nl *SocketLine) DeleteUnused() {

}

func (nl *SocketLine) GetAlreadyExistingSockets() {
nl.mu.Lock()
defer nl.mu.Unlock()

log.Logger.Debug().Msgf("getting already existing sockets for pid %d, fd %d", nl.pid, nl.fd)

socks := map[string]sock{}

// Get the sockets for the process.
var err error
for _, f := range []string{"tcp", "tcp6"} {
sockPath := strings.Join([]string{"/proc", fmt.Sprint(nl.pid), "net", f}, "/")

ss, err := readSockets(sockPath)
if err != nil {
continue
}

for _, s := range ss {
socks[s.Inode] = sock{TcpSocket: s}
}
}

// Get the file descriptors for the process.
fdDir := strings.Join([]string{"/proc", fmt.Sprint(nl.pid), "fd"}, "/")
fdEntries, err := os.ReadDir(fdDir)
if err != nil {
return
}

fds := make([]Fd, 0, len(fdEntries))
for _, entry := range fdEntries {
fd, err := strconv.ParseUint(entry.Name(), 10, 64)
if err != nil {
log.Logger.Warn().Err(err).Uint32("pid", nl.pid).
Uint64("fd", nl.fd).Msgf("failed to parse %s as uint", entry.Name())
continue
}
dest, err := os.Readlink(path.Join(fdDir, entry.Name()))
if err != nil {
log.Logger.Warn().Err(err).
Uint32("pid", nl.pid).
Uint64("fd", nl.fd).Msgf("failed to read link %s", path.Join(fdDir, entry.Name()))
continue
}
var socketInode string
if strings.HasPrefix(dest, "socket:[") && strings.HasSuffix(dest, "]") {
socketInode = dest[len("socket:[") : len(dest)-1]
}
fds = append(fds, Fd{Fd: fd, Dest: dest, SocketInode: socketInode})
}

// Match the sockets to the file descriptors.
for _, fd := range fds {
if fd.SocketInode != "" && nl.fd == fd.Fd {
// add to values
s := socks[fd.SocketInode].TcpSocket
ts := TimestampedSocket{
Timestamp: 0, // start time unknown
LastMatch: 0,
SockInfo: &SockInfo{
Pid: nl.pid,
Fd: fd.Fd,
Saddr: s.SAddr.IP().String(),
Sport: s.SAddr.Port(),
Daddr: s.DAddr.IP().String(),
Dport: s.DAddr.Port(),
},
}
log.Logger.Debug().Any("skInfo", ts).Uint32("pid", nl.pid).
Uint64("fd", nl.fd).Msg("adding already established socket")
nl.Values = append(nl.Values, ts)
}
}
}

type sock struct {
pid uint32
fd uint64
3 changes: 2 additions & 1 deletion config/db.go
@@ -13,5 +13,6 @@ type BackendDSConfig struct {
MetricsExport bool
MetricsExportInterval int // in seconds

- ReqBufferSize int
+ ReqBufferSize  int
+ ConnBufferSize int
}
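The new ConnBufferSize sizes the channel that buffers alive-connection reports before batching (connChanBuffer in datastore/backend.go below). A minimal wiring sketch, with illustrative buffer sizes and a hypothetical Host field:

```go
// Sketch only: buffer sizes are illustrative, and Host is assumed to be
// part of BackendDSConfig (the full struct is not shown in this diff).
conf := config.BackendDSConfig{
	Host:                  "https://backend.example",
	MetricsExport:         true,
	MetricsExportInterval: 10,    // in seconds
	ReqBufferSize:         40000, // requests waiting to be batched
	ConnBufferSize:        1000,  // alive connections waiting to be batched
}
ds := datastore.NewBackendDS(ctx, conf) // ctx is the parent context
```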
115 changes: 102 additions & 13 deletions datastore/backend.go
@@ -138,8 +138,10 @@ type BackendDS struct {
c *http.Client
batchSize uint64

- reqChanBuffer chan *ReqInfo
- reqInfoPool   *poolutil.Pool[*ReqInfo]
+ reqChanBuffer  chan *ReqInfo
+ connChanBuffer chan *ConnInfo
+ reqInfoPool    *poolutil.Pool[*ReqInfo]
+ aliveConnPool  *poolutil.Pool[*ConnInfo]

traceEventQueue *list.List
traceEventMu sync.RWMutex
@@ -161,18 +163,19 @@ type BackendDS struct {
}

const (
podEndpoint = "/alaz/k8s/pod/"
svcEndpoint = "/alaz/k8s/svc/"
rsEndpoint = "/alaz/k8s/replicaset/"
depEndpoint = "/alaz/k8s/deployment/"
epEndpoint = "/alaz/k8s/endpoint/"
containerEndpoint = "/alaz/k8s/container/"
dsEndpoint = "/alaz/k8s/daemonset/"
reqEndpoint = "/alaz/"
podEndpoint = "/pod/"
svcEndpoint = "/svc/"
rsEndpoint = "/replicaset/"
depEndpoint = "/deployment/"
epEndpoint = "/endpoint/"
containerEndpoint = "/container/"
dsEndpoint = "/daemonset/"
reqEndpoint = "/requests/"
connEndpoint = "/connections/"

traceEventEndpoint = "/dist_tracing/traffic/"

healthCheckEndpoint = "/alaz/healthcheck/"
healthCheckEndpoint = "/healthcheck/"
)

type LeveledLogger struct {
@@ -279,8 +282,10 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS {
c: client,
batchSize: bs,
reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}),
aliveConnPool: newAliveConnPool(func() *ConnInfo { return &ConnInfo{} }, func(r *ConnInfo) {}),
traceInfoPool: newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}),
reqChanBuffer: make(chan *ReqInfo, conf.ReqBufferSize),
connChanBuffer: make(chan *ConnInfo, conf.ConnBufferSize),
podEventChan: make(chan interface{}, 100),
svcEventChan: make(chan interface{}, 100),
rsEventChan: make(chan interface{}, 100),
@@ -292,6 +297,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS {
}

go ds.sendReqsInBatch(bs)
go ds.sendConnsInBatch(bs)
go ds.sendTraceEventsInBatch(10 * bs)

eventsInterval := 10 * time.Second
@@ -339,7 +345,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS {
return
}

- req, err = http.NewRequest(http.MethodPost, fmt.Sprintf("%s/alaz/metrics/scrape/?instance=%s&monitoring_id=%s", ds.host, NodeID, MonitoringID), resp.Body)
+ req, err = http.NewRequest(http.MethodPost, fmt.Sprintf("%s/metrics/scrape/?instance=%s&monitoring_id=%s", ds.host, NodeID, MonitoringID), resp.Body)
if err != nil {
log.Logger.Error().Msgf("error creating metrics request: %v", err)
return
@@ -449,6 +455,18 @@ func convertReqsToPayload(batch []*ReqInfo) RequestsPayload {
}
}

func convertConnsToPayload(batch []*ConnInfo) ConnInfoPayload {
return ConnInfoPayload{
Metadata: Metadata{
MonitoringID: MonitoringID,
IdempotencyKey: string(uuid.NewUUID()),
NodeID: NodeID,
AlazVersion: tag,
},
Connections: batch,
}
}

func convertTraceEventsToPayload(batch []*TraceInfo) TracePayload {
return TracePayload{
Metadata: Metadata{
@@ -555,6 +573,49 @@ func (b *BackendDS) sendReqsInBatch(batchSize uint64) {

}

func (b *BackendDS) sendConnsInBatch(batchSize uint64) {
t := time.NewTicker(30 * time.Second)
defer t.Stop()

send := func() {
batch := make([]*ConnInfo, 0, batchSize)
loop := true

for i := 0; (i < int(batchSize)) && loop; i++ {
select {
case conn := <-b.connChanBuffer:
batch = append(batch, conn)
case <-time.After(50 * time.Millisecond):
loop = false
}
}

if len(batch) == 0 {
return
}

connsPayload := convertConnsToPayload(batch)
log.Logger.Debug().Any("conns", connsPayload).Msgf("sending %d conns to backend", len(batch))
go b.sendToBackend(http.MethodPost, connsPayload, connEndpoint)

// return openConns to the pool
for _, conn := range batch {
b.aliveConnPool.Put(conn)
}
}

for {
select {
case <-b.ctx.Done():
log.Logger.Info().Msg("stopping sending reqs to backend")
return
case <-t.C:
send()
}
}

}
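Note the drain pattern above: the inner loop pulls at most batchSize connections from connChanBuffer and gives up after 50 ms of inactivity, so every 30-second tick flushes whatever has accumulated instead of blocking until a full batch arrives; the POST itself runs in its own goroutine, so a slow backend cannot stall the ticker.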

func (b *BackendDS) send(ch <-chan interface{}, endpoint string) {
batch := make([]interface{}, 0, resourceBatchSize)
loop := true
@@ -611,6 +672,14 @@ func newReqInfoPool(factory func() *ReqInfo, close func(*ReqInfo)) *poolutil.Pool[*ReqInfo] {
}
}

func newAliveConnPool(factory func() *ConnInfo, close func(*ConnInfo)) *poolutil.Pool[*ConnInfo] {
return &poolutil.Pool[*ConnInfo]{
Items: make(chan *ConnInfo, 500),
Factory: factory,
Close: close,
}
}

func newTraceInfoPool(factory func() *TraceInfo, close func(*TraceInfo)) *poolutil.Pool[*TraceInfo] {
return &poolutil.Pool[*TraceInfo]{
Items: make(chan *TraceInfo, 50000),
@@ -619,11 +688,31 @@ func newTraceInfoPool(factory func() *TraceInfo, close func(*TraceInfo)) *poolutil.Pool[*TraceInfo] {
}
}

func (b *BackendDS) PersistAliveConnection(aliveConn *AliveConnection) error {
// get a connInfo from the pool
oc := b.aliveConnPool.Get()

// overwrite the connInfo, all fields must be set in order to avoid conflict
oc[0] = aliveConn.CheckTime
oc[1] = aliveConn.FromIP
oc[2] = aliveConn.FromType
oc[3] = aliveConn.FromUID
oc[4] = aliveConn.FromPort
oc[5] = aliveConn.ToIP
oc[6] = aliveConn.ToType
oc[7] = aliveConn.ToUID
oc[8] = aliveConn.ToPort

b.connChanBuffer <- oc

return nil
}
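A hedged usage sketch of the new persistence path (field values are placeholders): the datastore copies the struct into a pooled ConnInfo and queues it for the next 30-second batch.

```go
// Illustrative values only; UID and type strings are placeholders.
conn := &datastore.AliveConnection{
	CheckTime: time.Now().UnixMilli(),
	FromIP:    "10.244.1.7",
	FromType:  "pod",
	FromUID:   "example-pod-uid",
	FromPort:  51234,
	ToIP:      "10.96.0.12",
	ToType:    "service",
	ToUID:     "example-svc-uid",
	ToPort:    5432,
}
if err := ds.PersistAliveConnection(conn); err != nil {
	log.Logger.Error().Err(err).Msg("could not persist alive connection")
}
```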

func (b *BackendDS) PersistRequest(request *Request) error {
// get a reqInfo from the pool
reqInfo := b.reqInfoPool.Get()

- // overwrite the reqInfo, all fields must be set in order to avoid comple
+ // overwrite the reqInfo, all fields must be set in order to avoid conflict
reqInfo[0] = request.StartTime
reqInfo[1] = request.Latency
reqInfo[2] = request.FromIP
2 changes: 2 additions & 0 deletions datastore/datastore.go
@@ -16,4 +16,6 @@ type DataStore interface {
PersistRequest(request *Request) error

PersistTraceEvent(trace *l7_req.TraceEvent) error

PersistAliveConnection(conn *AliveConnection) error
}
12 changes: 12 additions & 0 deletions datastore/dto.go
@@ -85,6 +85,18 @@ type Container struct {
} `json:"ports"`
}

type AliveConnection struct {
CheckTime int64 // connection is alive at this time, ms
FromIP string
FromType string
FromUID string
FromPort uint16
ToIP string
ToType string
ToUID string
ToPort uint16
}

type Request struct {
StartTime int64
Latency uint64 // in ns
17 changes: 16 additions & 1 deletion datastore/payload.go
@@ -116,14 +116,29 @@ type ContainerEvent struct {
// 15) Encrypted (bool)
// 16) Seq
// 17) Tid

type ReqInfo [18]interface{}

type RequestsPayload struct {
Metadata Metadata `json:"metadata"`
Requests []*ReqInfo `json:"requests"`
}

// 0) CheckTime // connection is alive at that time
// 1) Source IP
// 2) Source Type
// 3) Source ID
// 4) Source Port
// 5) Destination IP
// 6) Destination Type
// 7) Destination ID
// 8) Destination Port
type ConnInfo [9]interface{}

type ConnInfoPayload struct {
Metadata Metadata `json:"metadata"`
Connections []*ConnInfo `json:"connections"`
}
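Because ConnInfo is a [9]interface{}, encoding/json marshals each connection as a positional array rather than an object, so the backend must index fields by the order documented above. A self-contained sketch with illustrative values:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type ConnInfo [9]interface{}

func main() {
	ci := ConnInfo{
		int64(1707654321000), // 0) CheckTime, ms
		"10.244.1.7",         // 1) Source IP
		"pod",                // 2) Source Type (placeholder)
		"example-pod-uid",    // 3) Source ID (placeholder)
		uint16(51234),        // 4) Source Port
		"10.96.0.12",         // 5) Destination IP
		"service",            // 6) Destination Type (placeholder)
		"example-svc-uid",    // 7) Destination ID (placeholder)
		uint16(5432),         // 8) Destination Port
	}
	out, _ := json.Marshal(ci)
	fmt.Println(string(out))
	// [1707654321000,"10.244.1.7","pod","example-pod-uid",51234,"10.96.0.12","service","example-svc-uid",5432]
}
```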

// 0) Timestamp
// 1) Tcp Seq Num
// 2) Tid
2 changes: 2 additions & 0 deletions ebpf/l7_req/l7.c
@@ -605,6 +605,8 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
e->status = parse_postgres_server_resp(read_info->buf, ret);
if (active_req->request_type == POSTGRES_MESSAGE_SIMPLE_QUERY) {
e->method = METHOD_SIMPLE_QUERY;
}else if (active_req->request_type == POSTGRES_MESSAGE_PARSE || active_req->request_type == POSTGRES_MESSAGE_BIND){
e->method = METHOD_EXTENDED_QUERY;
}
}
}else{
4 changes: 4 additions & 0 deletions ebpf/l7_req/l7.go
@@ -103,6 +103,7 @@ const (
BPF_POSTGRES_METHOD_UNKNOWN = iota
BPF_POSTGRES_METHOD_STATEMENT_CLOSE_OR_CONN_TERMINATE
BPF_POSTGRES_METHOD_SIMPLE_QUERY
BPF_POSTGRES_METHOD_EXTENDED_QUERY // for prepared statements

// BPF_POSTGRES_METHOD_QUERY
// BPF_POSTGRES_METHOD_EXECUTE
@@ -143,6 +144,7 @@ const (
const (
CLOSE_OR_TERMINATE = "CLOSE_OR_TERMINATE"
SIMPLE_QUERY = "SIMPLE_QUERY"
EXTENDED_QUERY = "EXTENDED_QUERY"
)

// for http2, user space
@@ -205,6 +207,8 @@ func (e PostgresMethodConversion) String() string {
return CLOSE_OR_TERMINATE
case BPF_POSTGRES_METHOD_SIMPLE_QUERY:
return SIMPLE_QUERY
case BPF_POSTGRES_METHOD_EXTENDED_QUERY:
return EXTENDED_QUERY
default:
return "Unknown"
}
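For context on what EXTENDED_QUERY captures: in the PostgreSQL wire protocol, a simple query travels as a single 'Q' frame, while a prepared statement uses the extended protocol, where Parse ('P') carries the statement text and Bind ('B') supplies the parameter values. A user-space sketch of the same first-byte classification (not the repo's kernel code, which lives in ebpf/l7_req/l7.c):

```go
// classifyPostgresFrame maps a frontend message's type byte to the method
// names defined above. Sketch only; the real capture happens in eBPF.
func classifyPostgresFrame(buf []byte) string {
	if len(buf) == 0 {
		return "Unknown"
	}
	switch buf[0] {
	case 'Q': // simple query protocol
		return SIMPLE_QUERY
	case 'P', 'B': // Parse / Bind: extended query protocol (prepared statements)
		return EXTENDED_QUERY
	default:
		return "Unknown"
	}
}
```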
