From 8422730bd5da607d2bdabf7ad3f23dd4d302ec07 Mon Sep 17 00:00:00 2001 From: Nick Peters Date: Sun, 17 Sep 2023 19:51:50 -0700 Subject: [PATCH] Adding [slightly better] socket support. --- command/socket/leak.go | 117 +++++++++++++++++++++++++++++++++++++ command/socket/register.go | 48 +++++++++++++++ command/tcp/leak.go | 33 ----------- command/tcp/register.go | 24 -------- go.mod | 2 + go.sum | 5 ++ main.go | 4 +- metrics/metrics.go | 16 +++++ 8 files changed, 190 insertions(+), 59 deletions(-) create mode 100644 command/socket/leak.go create mode 100644 command/socket/register.go delete mode 100644 command/tcp/leak.go delete mode 100644 command/tcp/register.go diff --git a/command/socket/leak.go b/command/socket/leak.go new file mode 100644 index 0000000..8c496e7 --- /dev/null +++ b/command/socket/leak.go @@ -0,0 +1,117 @@ +package socket + +import ( + "fmt" + "github.com/strategicpause/memory-leak/metrics" + "golang.org/x/sys/unix" + "time" +) + +const ( + // MaxConnections defines the maximum number of connections that can be accepted by a given socket. This value is + // defined by /proc/sys/net/core/somaxconn. + MaxConnections = 4096 + StartPort = 9090 + // By default each established socket connection will write 1 KiB to the buffer. + KiB = 1024 +) + +var ( + LocalAddr = [4]byte{127, 0, 0, 1} +) + +type Params struct { + NumSockets int64 + NetworkAddressDomain int + ConnectionType int + CommunicationProtocol int + PauseTimeInSeconds time.Duration +} + +func tcpLeak(params *Params) error { + PrintParams(params) + + var stopChan chan bool + var err error + + nextPort := StartPort + currentPort := nextPort + + for i := int64(0); i < params.NumSockets; i++ { + // Setup a new service every 4096 connections. + if i%MaxConnections == 0 { + if stopChan != nil { + stopChan <- true + } + currentPort = nextPort + stopChan, err = resetServer(params, currentPort) + if err != nil { + return err + } + nextPort = nextPort + 1 + } + + clientFd, err := unix.Socket(params.NetworkAddressDomain, params.ConnectionType, params.CommunicationProtocol) + if err != nil { + return err + } + + if err = unix.Connect(clientFd, &unix.SockaddrInet4{Port: currentPort, Addr: LocalAddr}); err != nil { + return err + } + + _, err = unix.Write(clientFd, make([]byte, KiB)) + + if i%100 == 0 { + metrics.PrintSocketStats() + time.Sleep(params.PauseTimeInSeconds) + } + } + + return nil +} + +// resetServer will create a new service socket after 4096 connections (which is the maximum backlog value for listen). +func resetServer(params *Params, port int) (chan bool, error) { + stopChan := make(chan bool, 1) + serverFd := Must(unix.Socket(params.NetworkAddressDomain, params.ConnectionType, params.CommunicationProtocol)) + + serviceAddr := &unix.SockaddrInet4{ + Port: port, + Addr: LocalAddr, + } + + if err := unix.Bind(serverFd, serviceAddr); err != nil { + return stopChan, err + } + + if err := unix.Listen(serverFd, MaxConnections); err != nil { + return stopChan, err + } + fmt.Printf("Service bound to 127.0.0.1:%d.\n", port) + + go func() { + for { + select { + case <-stopChan: + unix.Close(serverFd) + return + default: + unix.Accept(serverFd) + } + } + }() + + return stopChan, nil +} + +func PrintParams(params *Params) { + fmt.Printf("NumSockets: %v\n", params.NumSockets) +} + +func Must[T any](obj T, err error) T { + if err != nil { + panic(err) + } + return obj +} diff --git a/command/socket/register.go b/command/socket/register.go new file mode 100644 index 0000000..5085e21 --- /dev/null +++ b/command/socket/register.go @@ -0,0 +1,48 @@ +package socket + +import ( + "github.com/urfave/cli" + "golang.org/x/sys/unix" + "time" +) + +const ( + NumSocketsName = "num-sockets" + CommunicationProtocolName = "comm-protocol" + PauseDurationName = "pause" +) + +func Register() cli.Command { + return cli.Command{ + Name: "socket", + Usage: "Reproduces a socket leak.", + Action: action, + Flags: flags(), + } +} + +func flags() []cli.Flag { + return []cli.Flag{ + cli.Int64Flag{ + Name: NumSocketsName, + Usage: "specify the number of sockets to create.", + Value: 9223372036854775807, + }, + cli.DurationFlag{ + Name: PauseDurationName, + Usage: "Time between allocations in seconds.", + Value: time.Second, + }, + } +} + +func action(ctx *cli.Context) error { + params := &Params{ + NumSockets: ctx.Int64(NumSocketsName), + NetworkAddressDomain: unix.AF_INET, + ConnectionType: unix.SOCK_STREAM, + CommunicationProtocol: unix.IPPROTO_TCP, + PauseTimeInSeconds: ctx.Duration(PauseDurationName), + } + return tcpLeak(params) +} diff --git a/command/tcp/leak.go b/command/tcp/leak.go deleted file mode 100644 index 30f9ad1..0000000 --- a/command/tcp/leak.go +++ /dev/null @@ -1,33 +0,0 @@ -package tcp - -import ( - "fmt" - "github.com/strategicpause/memory-leak/metrics" - "net/http" - "time" -) - -func tcpLeak() error { - go func() { - fmt.Println("Listening on port", Port) - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - _, _ = w.Write([]byte(time.Now().String())) - }) - _ = http.ListenAndServe(Port, nil) - }() - for { - go func() { - req := Must(http.NewRequest("GET", "http://localhost:8080/", nil)) - client := http.Client{} - Must(client.Do(req)) - metrics.PrintMemory() - }() - } -} - -func Must[T any](obj T, err error) T { - if err != nil { - panic(err) - } - return obj -} diff --git a/command/tcp/register.go b/command/tcp/register.go deleted file mode 100644 index 57a55c1..0000000 --- a/command/tcp/register.go +++ /dev/null @@ -1,24 +0,0 @@ -package tcp - -import ( - "github.com/urfave/cli" -) - -const Port = ":8080" - -func Register() cli.Command { - return cli.Command{ - Name: "tcp", - Usage: "Reproduces a TCP socket leak.", - Action: action, - Flags: flags(), - } -} - -func flags() []cli.Flag { - return []cli.Flag{} -} - -func action(_ *cli.Context) error { - return tcpLeak() -} diff --git a/go.mod b/go.mod index f17cfb1..509b692 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,9 @@ go 1.19 require ( github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf + github.com/prometheus/procfs v0.11.1 github.com/urfave/cli v1.22.14 + golang.org/x/sys v0.12.0 ) require ( diff --git a/go.sum b/go.sum index 5591ee3..a35b7ee 100644 --- a/go.sum +++ b/go.sum @@ -4,10 +4,13 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf h1:FtEj8sfIcaaBfAKrE1Cwb61YDtYq9JxChK1c7AKce7s= github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod h1:yrqSXGoD/4EKfF26AOGzscPOgTTJcyAwM2rpixWT+t4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= +github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -19,6 +22,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk= github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 3e7bc99..b53b058 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,7 @@ package main import ( "github.com/strategicpause/memory-leak/command/memory" - "github.com/strategicpause/memory-leak/command/tcp" + "github.com/strategicpause/memory-leak/command/socket" "github.com/urfave/cli" "log" "os" @@ -21,6 +21,6 @@ func main() { func RegisterCommands() cli.Commands { return cli.Commands{ memory.Register(), - tcp.Register(), + socket.Register(), } } diff --git a/metrics/metrics.go b/metrics/metrics.go index eb6b323..bb5a168 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "github.com/prometheus/procfs" "runtime" ) @@ -39,3 +40,18 @@ func BToMiB[T uint64 | uint32](b T) uint64 { func NsToUs(s uint64) uint64 { return s / 1000 } + +func PrintSocketStats() { + fs, _ := procfs.NewDefaultFS() + stats, _ := fs.NetSockstat() + for _, protocol := range stats.Protocols { + if protocol.Protocol == "TCP" { + fmt.Printf("TCP: inuse %d orphan %d tw %d alloc %d mem %d total %d\n", protocol.InUse, + *protocol.Orphan, + *protocol.TW, + *protocol.Alloc, + *protocol.Mem, + *stats.Used) + } + } +}