Skip to content

Commit

Permalink
Merge pull request #66 from ddosify/develop
Browse files Browse the repository at this point in the history
fix memory leaks related to hpack
  • Loading branch information
fatihbaltaci authored Jan 3, 2024
2 parents 1ec1a53 + 3221027 commit c3fed66
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 135 deletions.
152 changes: 81 additions & 71 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,7 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, ec *eb
PidToSocketMap: make(map[uint32]*SocketMap, 0),
}

go clearSocketLines(ctx, clusterInfo.PidToSocketMap)

return &Aggregator{
a := &Aggregator{
ctx: ctx,
k8sChan: k8sChan,
ebpfChan: ec.EbpfEvents(),
Expand All @@ -151,6 +149,10 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, ec *eb
h2Parsers: make(map[string]*http2Parser),
liveProcesses: make(map[uint32]struct{}),
}

go a.clearSocketLines(ctx)

return a
}

func (a *Aggregator) Run() {
Expand Down Expand Up @@ -245,27 +247,8 @@ func (a *Aggregator) processEbpf(ctx context.Context) {
}
go a.processTcpConnect(tcpConnectEvent)
case l7_req.L7_EVENT:
d := data.(l7_req.L7Event) // copy data's value

// copy payload slice
payload := [1024]uint8{}
copy(payload[:], d.Payload[:])

l7Event := l7_req.L7Event{
Fd: d.Fd,
Pid: d.Pid,
Status: d.Status,
Duration: d.Duration,
Protocol: d.Protocol,
Tls: d.Tls,
Method: d.Method,
Payload: payload,
PayloadSize: d.PayloadSize,
PayloadReadComplete: d.PayloadReadComplete,
Failed: d.Failed,
WriteTimeNs: d.WriteTimeNs,
}
go a.processL7(ctx, l7Event)
d := data.(*l7_req.L7Event) // copy data's value
go a.processL7(ctx, d)
case proc.PROC_EVENT:
d := data.(proc.ProcEvent) // copy data's value
if d.Type_ == proc.EVENT_PROC_EXEC {
Expand Down Expand Up @@ -298,6 +281,19 @@ func (a *Aggregator) stopHttp2Worker(pid uint32) {
if ch, ok := a.h2Ch[pid]; ok {
close(ch)
delete(a.h2Ch, pid)

a.h2ParserMu.Lock()
for key, parser := range a.h2Parsers {
// h2Parsers map[string]*http2Parser // pid-fd -> http2Parser
if strings.HasPrefix(key, fmt.Sprint(pid)) {
parser.clientHpackDecoder.Close()
parser.serverHpackDecoder.Close()

delete(a.h2Parsers, key)
}
}
a.h2ParserMu.Unlock()

}
}

Expand Down Expand Up @@ -388,7 +384,13 @@ func (a *Aggregator) processTcpConnect(data interface{}) {

// remove h2Parser if exists
a.h2ParserMu.Lock()
delete(a.h2Parsers, a.getConnKey(d.Pid, d.Fd))
key := a.getConnKey(d.Pid, d.Fd)
h2Parser, ok := a.h2Parsers[key]
if ok {
h2Parser.clientHpackDecoder.Close()
h2Parser.serverHpackDecoder.Close()
}
delete(a.h2Parsers, key)
a.h2ParserMu.Unlock()

}
Expand Down Expand Up @@ -431,7 +433,7 @@ type FrameArrival struct {
}

// called once per pid
func (a *Aggregator) processHttp2Frames(ch chan *l7_req.L7Event) {
func (a *Aggregator) processHttp2Frames(pid uint32, ch chan *l7_req.L7Event) {
mu := sync.RWMutex{}

createFrameKey := func(fd uint64, streamId uint32) string {
Expand All @@ -440,12 +442,31 @@ func (a *Aggregator) processHttp2Frames(ch chan *l7_req.L7Event) {
// fd-streamId -> frame
frames := make(map[string]*FrameArrival)

persistReq := func(d *l7_req.L7Event, req *datastore.Request, statusCode uint32, grpcStatus uint32) {
skInfo := a.findRelatedSocket(a.ctx, d)
if skInfo == nil {
return
done := make(chan bool, 1)

go func() {
t := time.NewTicker(1 * time.Minute)
defer t.Stop()

for {
select {
case <-t.C:
mu.Lock()
for key, f := range frames {
if f.ClientHeadersFrameArrived && !f.ServerHeadersFrameArrived {
delete(frames, key)
} else if !f.ClientHeadersFrameArrived && f.ServerHeadersFrameArrived {
delete(frames, key)
}
}
mu.Unlock()
case <-done:
return
}
}
}()

persistReq := func(d *l7_req.L7Event, req *datastore.Request, statusCode uint32, grpcStatus uint32) {
if req.Method == "" || req.Path == "" {
// if we couldn't parse the request, discard
// this is possible because of hpack dynamic table, we can't parse the request until a new connection is established
Expand All @@ -456,6 +477,11 @@ func (a *Aggregator) processHttp2Frames(ch chan *l7_req.L7Event) {
return
}

skInfo := a.findRelatedSocket(a.ctx, d)
if skInfo == nil {
return
}

req.StartTime = time.Now().UnixMilli()
req.Latency = d.WriteTimeNs - req.Latency
req.Completed = true
Expand Down Expand Up @@ -566,7 +592,6 @@ func (a *Aggregator) processHttp2Frames(ch chan *l7_req.L7Event) {
}

fa := frames[key]
mu.Unlock()
fa.ClientHeadersFrameArrived = true
fa.req.Latency = d.WriteTimeNs // set latency to write time here, will be updated later

Expand Down Expand Up @@ -600,10 +625,15 @@ func (a *Aggregator) processHttp2Frames(ch chan *l7_req.L7Event) {
// if ReadFrame were used, f.HeaderBlockFragment()
h2Parser.clientHpackDecoder.Write(buf[offset:endOfFrame])

if fh.Flags.Has(http2.FlagHeadersEndHeaders) {
break
}
offset = endOfFrame

if fa.ServerHeadersFrameArrived {
req := *fa.req
go persistReq(d, &req, fa.statusCode, fa.grpcStatus)
delete(frames, key)
}
mu.Unlock()
break
}
} else if d.Method == l7_req.SERVER_FRAME {
for {
Expand All @@ -621,13 +651,14 @@ func (a *Aggregator) processHttp2Frames(ch chan *l7_req.L7Event) {
break
}

if fh.Type != http2.FrameHeaders && fh.Type != http2.FrameData {
streamId := fh.StreamID
key := createFrameKey(fd, streamId)

if fh.Type != http2.FrameHeaders {
offset = endOfFrame
continue
}

streamId := fh.StreamID
key := createFrameKey(fd, streamId)
if fh.Type == http2.FrameHeaders {
mu.Lock()
if _, ok := frames[key]; !ok {
Expand All @@ -637,7 +668,6 @@ func (a *Aggregator) processHttp2Frames(ch chan *l7_req.L7Event) {
}
}
fa := frames[key]
mu.Unlock()
fa.ServerHeadersFrameArrived = true
// Process server headers frame
respHeaderSet := func(req *datastore.Request) func(hf hpack.HeaderField) {
Expand All @@ -655,36 +685,13 @@ func (a *Aggregator) processHttp2Frames(ch chan *l7_req.L7Event) {
h2Parser.serverHpackDecoder.SetEmitFunc(respHeaderSet(fa.req))
h2Parser.serverHpackDecoder.Write(buf[offset:endOfFrame])

if fa.ClientHeadersFrameArrived && fa.ServerHeadersFrameArrived {
mu.Lock()
if fa.ClientHeadersFrameArrived {
req := *fa.req
go persistReq(d, &req, fa.statusCode, fa.grpcStatus)
delete(frames, key)
mu.Unlock()
}

} else if fh.Type == http2.FrameData {
mu.Lock()
if _, ok := frames[key]; !ok {
frames[key] = &FrameArrival{
ServerDataFrameArrived: true,
req: &datastore.Request{},
}
}
fa := frames[key]
mu.Unlock()
fa.ServerDataFrameArrived = true
fa.event = d

if fa.ClientHeadersFrameArrived && fa.ServerHeadersFrameArrived {
mu.Lock()
req := *fa.req // copy
go persistReq(d, &req, fa.statusCode, fa.grpcStatus)
delete(frames, key)
mu.Unlock()
}

break // only process the first data frame for now
break
}
}
} else {
Expand All @@ -693,6 +700,7 @@ func (a *Aggregator) processHttp2Frames(ch chan *l7_req.L7Event) {
}
}

done <- true // signal cleaning goroutine
}

func (a *Aggregator) setFromTo(skInfo *SockInfo, d *l7_req.L7Event, reqDto *datastore.Request, hostHeader string) error {
Expand Down Expand Up @@ -751,7 +759,7 @@ func (a *Aggregator) getConnKey(pid uint32, fd uint64) string {
return fmt.Sprintf("%d-%d", pid, fd)
}

func (a *Aggregator) processL7(ctx context.Context, d l7_req.L7Event) {
func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
// other protocols events come as whole, but http2 events come as frames
// we need to aggregate frames to get the whole request
defer func() {
Expand Down Expand Up @@ -791,19 +799,19 @@ func (a *Aggregator) processL7(ctx context.Context, d l7_req.L7Event) {
}

// initialize channel
h2ChPid := make(chan *l7_req.L7Event, 1000)
h2ChPid := make(chan *l7_req.L7Event, 100)
a.h2ChMu.Lock()
a.h2Ch[d.Pid] = h2ChPid
ch = h2ChPid
a.h2ChMu.Unlock()
go a.processHttp2Frames(ch) // worker per pid, will be called once
go a.processHttp2Frames(d.Pid, ch) // worker per pid, will be called once
}

ch <- &d
ch <- d
return
}

skInfo := a.findRelatedSocket(ctx, &d)
skInfo := a.findRelatedSocket(ctx, d)
if skInfo == nil {
return
}
Expand Down Expand Up @@ -836,7 +844,7 @@ func (a *Aggregator) processL7(ctx context.Context, d l7_req.L7Event) {
_, reqDto.Path, _, reqHostHeader = parseHttpPayload(string(d.Payload[0:d.PayloadSize]))
}

err := a.setFromTo(skInfo, &d, &reqDto, reqHostHeader)
err := a.setFromTo(skInfo, d, &reqDto, reqHostHeader)
if err != nil {
return
}
Expand Down Expand Up @@ -988,7 +996,7 @@ func parseSqlCommand(r []uint8) string {
return sqlStatement
}

func clearSocketLines(ctx context.Context, pidToSocketMap map[uint32]*SocketMap) {
func (a *Aggregator) clearSocketLines(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
skLineCh := make(chan *SocketLine, 1000)

Expand All @@ -1004,10 +1012,12 @@ func clearSocketLines(ctx context.Context, pidToSocketMap map[uint32]*SocketMap)
}()

for range ticker.C {
for _, socketMap := range pidToSocketMap {
a.clusterInfo.mu.RLock()
for _, socketMap := range a.clusterInfo.PidToSocketMap {
for _, socketLine := range socketMap.M {
skLineCh <- socketLine
}
}
a.clusterInfo.mu.RUnlock()
}
}
Loading

0 comments on commit c3fed66

Please sign in to comment.