Skip to content

Commit

Permalink
Merge pull request #160 from getanteon/develop
Browse files Browse the repository at this point in the history
get address pair from ebpf combined with event
  • Loading branch information
fatihbaltaci authored Jul 14, 2024
2 parents 6971c60 + 9bfefe8 commit 51d81ea
Show file tree
Hide file tree
Showing 12 changed files with 397 additions and 140 deletions.
212 changes: 97 additions & 115 deletions aggregator/data.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
package aggregator

// aggregate data from different sources
// 1. k8s
// 2. containerd (TODO)
// 3. ebpf
// 4. cgroup (TODO)
// 5. docker (TODO)

import (
"bytes"
"context"
Expand Down Expand Up @@ -324,6 +317,7 @@ func (a *Aggregator) processEbpf(ctx context.Context) {
case l7_req.L7_EVENT:
d := data.(*l7_req.L7Event) // copy data's value
ctxPid := context.WithValue(a.ctx, log.LOG_CONTEXT, fmt.Sprint(d.Pid))
go a.signalTlsAttachment(d.Pid)
a.processL7(ctxPid, d)
case l7_req.TRACE_EVENT:
d := data.(*l7_req.TraceEvent)
Expand Down Expand Up @@ -460,7 +454,9 @@ func (a *Aggregator) processTcpConnect(ctx context.Context, d *tcp_state.TcpConn
}

var skLine *SocketLine
sockMap.mu.RLock()
skLine, ok = sockMap.M[d.Fd]
sockMap.mu.RUnlock()
if !ok {
return
}
Expand Down Expand Up @@ -571,19 +567,16 @@ func (a *Aggregator) processHttp2Frames() {
return
}

skInfo, err := a.findRelatedSocket(a.ctx, d)
if skInfo == nil || err != nil {
return
}
addrPair := extractAddressPair(d)

req.Latency = d.WriteTimeNs - req.Latency
req.StartTime = int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6) // nano to milli
req.Completed = true
req.FromIP = skInfo.Saddr
req.ToIP = skInfo.Daddr
req.FromIP = addrPair.Saddr
req.ToIP = addrPair.Daddr
req.Tls = d.Tls
req.FromPort = skInfo.Sport
req.ToPort = skInfo.Dport
req.FromPort = addrPair.Sport
req.ToPort = addrPair.Dport
req.FailReason = ""
if req.Protocol == "" {
req.Protocol = "HTTP2"
Expand All @@ -596,7 +589,7 @@ func (a *Aggregator) processHttp2Frames() {
}

// toUID is set to :authority header in client frame
err = a.setFromTo(skInfo, d, req, req.ToUID)
err := a.setFromToV2(addrPair, d, req, req.ToUID)
if err != nil {
return
}
Expand Down Expand Up @@ -814,6 +807,52 @@ func (a *Aggregator) getSvcWithIP(addr string) (types.UID, bool) {
svcUid, ok := a.clusterInfo.ServiceIPToServiceUid[addr]
a.clusterInfo.k8smu.RUnlock() // unlock for reading
return svcUid, ok

}

func (a *Aggregator) setFromToV2(addrPair *AddressPair, d *l7_req.L7Event, event datastore.DirectionalEvent, hostHeader string) error {
// find pod info
podUid, ok := a.getPodWithIP(addrPair.Saddr)
if !ok {
return fmt.Errorf("error finding pod with sockets saddr")
}

event.SetFromUID(string(podUid))
event.SetFromType(POD)
event.SetFromPort(addrPair.Sport)
event.SetToPort(addrPair.Dport)

// find service info
svcUid, ok := a.getSvcWithIP(addrPair.Daddr)
if ok {
event.SetToUID(string(svcUid))
event.SetToType(SVC)
} else {
podUid, ok := a.getPodWithIP(addrPair.Daddr)

if ok {
event.SetToUID(string(podUid))
event.SetToType(POD)
} else {
// 3rd party url
if hostHeader != "" {
event.SetToUID(hostHeader)
event.SetToType(OUTBOUND)
} else {
remoteDnsHost, err := getHostnameFromIP(addrPair.Daddr)
if err == nil {
// dns lookup successful
event.SetToUID(remoteDnsHost)
event.SetToType(OUTBOUND)
} else {
event.SetToUID(addrPair.Daddr)
event.SetToType(OUTBOUND)
}
}
}
}

return nil
}

func (a *Aggregator) setFromTo(skInfo *SockInfo, d *l7_req.L7Event, event datastore.DirectionalEvent, hostHeader string) error {
Expand Down Expand Up @@ -978,39 +1017,20 @@ func (a *Aggregator) processKafkaEvent(ctx context.Context, d *l7_req.L7Event) {
return
}

skInfo, err := a.findRelatedSocket(ctx, d)
if skInfo == nil || err != nil {
// requeue event if this is its first time
if !d.PutBack {
d.PutBack = true
a.ebpfChan <- d
return
}

log.Logger.Debug().
Ctx(ctx).
Err(err).
Uint32("pid", d.Pid).
Uint64("fd", d.Fd).
Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).
Any("payload", string(d.Payload[:d.PayloadSize])).
Msg("discarding kafka event, socket not found")
addrPair := extractAddressPair(d)

return
}
for _, msg := range kafkaMessages {
event := &datastore.KafkaEvent{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
FromIP: addrPair.Saddr,
FromType: "",
FromUID: "",
FromPort: 0,
ToIP: skInfo.Daddr,
FromPort: addrPair.Sport,
ToIP: addrPair.Daddr,
ToType: "",
ToUID: "",
ToPort: 0,
ToPort: addrPair.Dport,
Tls: d.Tls,
Topic: msg.TopicName,
Partition: uint32(msg.Partition),
Expand All @@ -1021,7 +1041,7 @@ func (a *Aggregator) processKafkaEvent(ctx context.Context, d *l7_req.L7Event) {
Seq: d.Seq,
}

err := a.setFromTo(skInfo, d, event, "")
err := a.setFromToV2(addrPair, d, event, "")
if err != nil {
return
}
Expand All @@ -1042,26 +1062,13 @@ func (a *Aggregator) processKafkaEvent(ctx context.Context, d *l7_req.L7Event) {
}

func (a *Aggregator) processAmqpEvent(ctx context.Context, d *l7_req.L7Event) {
skInfo, err := a.findRelatedSocket(ctx, d)
if skInfo == nil || err != nil {
// requeue event if this is its first time
if !d.PutBack {
d.PutBack = true
a.ebpfChan <- d
return
}
log.Logger.Debug().Uint32("pid", d.Pid).Err(err).
Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).Any("payload", string(d.Payload[:d.PayloadSize])).
Msg("discarding amqp event, socket not found")
return
}
addrPair := extractAddressPair(d)

reqDto := &datastore.Request{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
FromIP: addrPair.Saddr,
ToIP: addrPair.Daddr,
Protocol: d.Protocol,
Tls: d.Tls,
Completed: true,
Expand All @@ -1073,7 +1080,7 @@ func (a *Aggregator) processAmqpEvent(ctx context.Context, d *l7_req.L7Event) {
Seq: d.Seq,
}

err = a.setFromTo(skInfo, d, reqDto, "")
err := a.setFromToV2(addrPair, d, reqDto, "")
if err != nil {
return
}
Expand All @@ -1095,28 +1102,13 @@ func (a *Aggregator) processAmqpEvent(ctx context.Context, d *l7_req.L7Event) {
func (a *Aggregator) processRedisEvent(ctx context.Context, d *l7_req.L7Event) {
query := string(d.Payload[0:d.PayloadSize])

skInfo, err := a.findRelatedSocket(ctx, d)
if skInfo == nil || err != nil {
// requeue event if this is its first time
if !d.PutBack {
d.PutBack = true
a.ebpfChan <- d
return
}
log.Logger.Debug().
Ctx(ctx).
Err(err).
Uint32("pid", d.Pid).
Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).Any("payload", string(d.Payload[:d.PayloadSize])).Msg("discarding redis event, socket not found")
return
}
addrPair := extractAddressPair(d)

reqDto := &datastore.Request{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
FromIP: addrPair.Saddr,
ToIP: addrPair.Daddr,
Protocol: d.Protocol,
Tls: d.Tls,
Completed: true,
Expand All @@ -1128,7 +1120,7 @@ func (a *Aggregator) processRedisEvent(ctx context.Context, d *l7_req.L7Event) {
Seq: d.Seq,
}

err = a.setFromTo(skInfo, d, reqDto, "")
err := a.setFromToV2(addrPair, d, reqDto, "")
if err != nil {
return
}
Expand Down Expand Up @@ -1202,28 +1194,13 @@ func (a *Aggregator) processHttpEvent(ctx context.Context, d *l7_req.L7Event) {
_, path, _, reqHostHeader = parseHttpPayload(string(d.Payload[0:d.PayloadSize]))
}

skInfo, err := a.findRelatedSocket(ctx, d)
if skInfo == nil || err != nil {
// requeue event if this is its first time
if !d.PutBack {
d.PutBack = true
a.ebpfChan <- d
return
}
log.Logger.Debug().Uint32("pid", d.Pid).
Err(err).
Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).
Any("payload", string(d.Payload[:d.PayloadSize])).
Msg("discarding http event, socket not found")
return
}
addrPair := extractAddressPair(d)

reqDto := &datastore.Request{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
FromIP: addrPair.Saddr,
ToIP: addrPair.Daddr,
Protocol: d.Protocol,
Tls: d.Tls,
Completed: true,
Expand All @@ -1235,7 +1212,7 @@ func (a *Aggregator) processHttpEvent(ctx context.Context, d *l7_req.L7Event) {
Seq: d.Seq,
}

err = a.setFromTo(skInfo, d, reqDto, reqHostHeader)
err := a.setFromToV2(addrPair, d, reqDto, reqHostHeader)
if err != nil {
return
}
Expand All @@ -1262,28 +1239,13 @@ func (a *Aggregator) processPostgresEvent(ctx context.Context, d *l7_req.L7Event
return
}

skInfo, err := a.findRelatedSocket(ctx, d)
if skInfo == nil {
// requeue event if this is its first time
if !d.PutBack {
d.PutBack = true
a.ebpfChan <- d
return
}

log.Logger.Debug().Uint32("pid", d.Pid).
Err(err).
Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).Any("payload", string(d.Payload[:d.PayloadSize])).Msg("discarding postgres event, socket not found")

return
}
addrPair := extractAddressPair(d)

reqDto := &datastore.Request{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
FromIP: addrPair.Saddr,
ToIP: addrPair.Daddr,
Protocol: d.Protocol,
Tls: d.Tls,
Completed: true,
Expand All @@ -1295,7 +1257,7 @@ func (a *Aggregator) processPostgresEvent(ctx context.Context, d *l7_req.L7Event
Seq: d.Seq,
}

err = a.setFromTo(skInfo, d, reqDto, "")
err = a.setFromToV2(addrPair, d, reqDto, "")
if err != nil {
return
}
Expand Down Expand Up @@ -1580,3 +1542,23 @@ func convertKernelTimeToUserspaceTime(writeTime uint64) uint64 {
func convertUserTimeToKernelTime(now uint64) uint64 {
return l7_req.FirstKernelTime - (l7_req.FirstUserspaceTime - now)
}

// IntToIPv4 converts IP address of version 4 from integer to net.IP
// representation.
func IntToIPv4(ipaddr uint32) net.IP {
ip := make(net.IP, net.IPv4len)

// Proceed conversion
binary.BigEndian.PutUint32(ip, ipaddr)

return ip
}

func extractAddressPair(d *l7_req.L7Event) *AddressPair {
return &AddressPair{
Saddr: IntToIPv4(d.Saddr).String(),
Sport: d.Sport,
Daddr: IntToIPv4(d.Daddr).String(),
Dport: d.Dport,
}
}
1 change: 0 additions & 1 deletion aggregator/sock_num_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ func (nl *SocketLine) DeleteUnused() {
i-- // we deleted two values, so we need to decrement i by 2
}
}

}

type sock struct {
Expand Down
7 changes: 7 additions & 0 deletions aggregator/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
"github.com/ddosify/alaz/log"
)

type AddressPair struct {
Saddr string `json:"saddr"`
Sport uint16 `json:"sport"`
Daddr string `json:"daddr"`
Dport uint16 `json:"dport"`
}

// We need to keep track of the following
// in order to build find relationships between
// connections and pods/services
Expand Down
Loading

0 comments on commit 51d81ea

Please sign in to comment.