diff --git a/CHANGELOG.md b/CHANGELOG.md index db94688..3e93406 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ -# v0.7 (Unreleased) +# v0.7 (2017-12-30) + +- Add "-wait-after-start" option for services that aren't quite ready at + the moment their port is open. # v0.6 (2017-12-29) diff --git a/README.mkdn b/README.mkdn index 6872518..d8ebb0e 100644 --- a/README.mkdn +++ b/README.mkdn @@ -29,12 +29,13 @@ automatically if a request to them is made. One service can be configured on the command line: - -proxy # Address pairs to listen on/proxy to ("from:port to:port from2 to2") - -proxy-sep # Alternate character to separate proxy addresses - -timeout # Duration to wait before aborting connection - -stop-after # Duration after last connection to signal command to stop - -stop-signal # Signal to send to command (default it INT) - args... # Command to run + -proxy # Address pairs to listen on/proxy to ("from:port to:port from2 to2") + -proxy-sep # Alternate character to separate proxy addresses + -stop-after # Duration after last connection to signal command to stop + -stop-signal # Signal to send to command (default it INT) + -timeout # Duration to wait before aborting connection + -wait-after-start # Duration to wait after starting command before forwarding connections + args... # Command to run Alternatively (or additionally) multiple services can be configured by specifying a JSON configuration file: @@ -51,14 +52,17 @@ with something like this: ":6543": "localhost:7654" }, "Command": ["run", "some", "server"], - "Timeout": "10s", "StopAfter": "10m", "StopSignal": "TERM", + "Timeout": "10s", + "WaitAfterStart": "2s" } ] } -The command to run is optional, without it ynetd is just a port forwarder: +Everything is optional (except for "proxy"). + +Without a command to run, ynetd can be used as a simple port forwarder: ynetd -proxy "localhost:5001 remote.host:8080" @@ -67,14 +71,14 @@ or as "interface:name:port" ("interface:eth0:5001") to listen on (all addresses of) the named interface. This can be useful if you don't know the ip address up front -but want to "forward from public interface to loopback" +but want to forward from public interface to loopback to avoid having to use additional port numbers: -proxy "interface:eth0:5000 localhost:5000" ## Why? -- To reduce memory consumption for services until they are needed. +- To reduce resource utilization for services until they are needed. - To learn a little go. ## Why the name? diff --git a/config/cli.go b/config/cli.go index 09aebdf..c89e516 100644 --- a/config/cli.go +++ b/config/cli.go @@ -14,16 +14,18 @@ var proxySpec string var timeout = DefaultTimeout var stopAfter = DefaultStopAfter var stopSignal = "INT" +var waitAfterStart = DefaultWaitAfterStart func init() { const ( - configUsage = "Path to configuration file" - listenUsage = "Address to listen on (deprecated)" - proxySepUsage = "Separator character for -proxy" - proxyUsage = "Addresses to proxy, separated by spaces (\"fromhost:port tohost:port from to\")" - timeoutUsage = "Duration of time to allow command to start up" - stopAfterUsage = "Duration of time after the last client connection to stop the command" - stopSignalUsage = "Signal to send to stop" + configUsage = "Path to configuration file" + listenUsage = "Address to listen on (deprecated)" + proxySepUsage = "Separator character for -proxy" + proxyUsage = "Addresses to proxy, separated by spaces (\"fromhost:port tohost:port from to\")" + timeoutUsage = "Duration of time to allow connections to attempt to forward" + stopAfterUsage = "Duration of time after the last client connection to stop the command" + stopSignalUsage = "Signal to send to stop" + waitAfterStartUsage = "Duration of time to wait while command starts before forwarding" ) flag.StringVar(&configfile, "config", "", configUsage) @@ -43,6 +45,8 @@ func init() { flag.DurationVar(&timeout, "timeout", timeout, timeoutUsage) flag.DurationVar(&timeout, "t", timeout, timeoutUsage+" (shorthand)") + + flag.DurationVar(&waitAfterStart, "wait-after-start", waitAfterStart, waitAfterStartUsage) } // Load config from cli arguments. @@ -85,11 +89,12 @@ func Load(args []string) (cfg Config, err error) { if len(proxy) > 0 { cfg.Services = append(cfg.Services, Service{ - Proxy: proxy, - Command: args, - Timeout: timeout.String(), - StopAfter: stopAfter.String(), - StopSignal: stopSignal, + Proxy: proxy, + Command: args, + Timeout: timeout.String(), + StopAfter: stopAfter.String(), + StopSignal: stopSignal, + WaitAfterStart: waitAfterStart.String(), }) } diff --git a/config/cli_test.go b/config/cli_test.go index 7cf8a30..3dca6d7 100644 --- a/config/cli_test.go +++ b/config/cli_test.go @@ -27,6 +27,7 @@ func TestLoadArgs(t *testing.T) { listenAddress = "" proxySpec = ":5000 localhost:5001 some:6001 some:7001" timeout = 2 * time.Second + waitAfterStart = 500 * time.Millisecond cfg, err := Load([]string{"foo", "bar"}) if err != nil { @@ -47,6 +48,9 @@ func TestLoadArgs(t *testing.T) { if svc.Timeout != "2s" { t.Errorf("Timeout incorrect: %s", svc.Timeout) } + if svc.WaitAfterStart != "500ms" { + t.Errorf("WaitAfterStart incorrect: %s", svc.WaitAfterStart) + } } func TestLoadProxySep(t *testing.T) { @@ -147,7 +151,8 @@ func TestLoadConfigFile(t *testing.T) { { "Proxy": {":5000": "localhost:5001"}, "Command": ["3", "4"], - "Timeout": "15ms" + "Timeout": "15ms", + "WaitAfterStart": "25ms" } ] }`)) @@ -175,6 +180,9 @@ func TestLoadConfigFile(t *testing.T) { if svc.Timeout != "15ms" { t.Errorf("Timeout incorrect: %s", svc.Timeout) } + if svc.WaitAfterStart != "25ms" { + t.Errorf("WaitAfterStart incorrect: %s", svc.WaitAfterStart) + } } func TestLoadConfigFileError(t *testing.T) { diff --git a/config/config.go b/config/config.go index 9afa646..aa6d47c 100644 --- a/config/config.go +++ b/config/config.go @@ -9,11 +9,28 @@ type Config struct { Services []Service } -// DefaultTimeout is the default timeout duration for new connections -// to proxy to the service. +// DefaultTimeout is the default duration to allow new connections +// to attempt to forward to the service. var DefaultTimeout = 5 * time.Minute // DefaultStopAfter is the default duration of inactivity after which -// a command will be stopped. -// The default is zero (the command will not be stopped). +// a command will be stopped (zero means the command will not be stopped). var DefaultStopAfter = time.Duration(0) + +// DefaultWaitAfterStart is the default duration to wait after starting a +// command before forwarding connections (zero means as soon as the port is open). +var DefaultWaitAfterStart = time.Duration(0) + +// ParseDuration is a wrapper around time.ParseDuration. +// It returns the provided default if the string is blank +// and panics if there is an error in parsing. +func ParseDuration(str string, defaultVal time.Duration) time.Duration { + if str == "" { + return defaultVal + } + duration, err := time.ParseDuration(str) + if err != nil { + panic(err) + } + return duration +} diff --git a/config/json_test.go b/config/json_test.go index 502f6d4..f5d0e83 100644 --- a/config/json_test.go +++ b/config/json_test.go @@ -21,14 +21,16 @@ func TestParseConfigFile(t *testing.T) { "Command": ["sleep", "1"], "StopAfter": "10m", "StopSignal": "INT", - "Timeout": "150ms" + "Timeout": "150ms", + "WaitAfterStart": "250ms" }, { "Proxy": {":3001": "localhost:4001"}, "Command": ["sleep", "2"], "StopAfter": "11m", "StopSignal": "TERM", - "Timeout": "151ms" + "Timeout": "151ms", + "WaitAfterStart": "251ms" } ] }`)) @@ -45,17 +47,14 @@ func TestParseConfigFile(t *testing.T) { } for i, svc := range cfg.Services { - if len(svc.Proxy) != 1 { - t.Errorf("unexpected proxy: %v", svc.Proxy) - } - if svc.Proxy[fmt.Sprintf(":%d", i+3000)] != fmt.Sprintf("localhost:%d", i+4000) { - t.Errorf("unexpected proxy: %v", svc.Proxy) + if len(svc.Proxy) != 1 || svc.Proxy[fmt.Sprintf(":%d", i+3000)] != fmt.Sprintf("localhost:%d", i+4000) { + t.Errorf("unexpected Proxy: %v", svc.Proxy) } if fmt.Sprintf("%s", svc.Command) != fmt.Sprintf("[sleep %d]", i+1) { - t.Errorf("unexpected command: %s", svc.Command) + t.Errorf("unexpected Command: %s", svc.Command) } if svc.Timeout != fmt.Sprintf("%dms", i+150) { - t.Errorf("unexpected timeout: %s", svc.Timeout) + t.Errorf("unexpected Timeout: %s", svc.Timeout) } if svc.StopAfter != fmt.Sprintf("%dm", i+10) { t.Errorf("unexpected StopAfter: %s", svc.StopAfter) @@ -63,6 +62,9 @@ func TestParseConfigFile(t *testing.T) { if svc.StopSignal != []string{"INT", "TERM"}[i] { t.Errorf("unexpected StopSignal: %s", svc.StopSignal) } + if svc.WaitAfterStart != fmt.Sprintf("%dms", i+250) { + t.Errorf("unexpected WaitAfterStart: %s", svc.WaitAfterStart) + } } } diff --git a/config/service.go b/config/service.go index afd1210..030221f 100644 --- a/config/service.go +++ b/config/service.go @@ -2,9 +2,10 @@ package config // Service holds string representations of Service attributes. type Service struct { - Proxy map[string]string - Command []string - Timeout string - StopAfter string - StopSignal string + Proxy map[string]string + Command []string + StopAfter string + StopSignal string + Timeout string + WaitAfterStart string } diff --git a/procman/manager.go b/procman/manager.go index 86a1d36..73cde3a 100644 --- a/procman/manager.go +++ b/procman/manager.go @@ -4,6 +4,7 @@ import ( "os" "os/exec" "syscall" + "time" "github.com/hashicorp/go-reap" "github.com/rwstauner/ynetd/config" @@ -12,7 +13,7 @@ import ( // ProcessManager manages launching and reaping of processes. type ProcessManager struct { procs map[int]*Process // pid -> proc - launcher chan *Process + launcher chan *launchRequest stopper chan *Process signals chan os.Signal } @@ -20,7 +21,7 @@ type ProcessManager struct { // New returns a new ProcessManager. func New() *ProcessManager { return &ProcessManager{ - launcher: make(chan *Process), + launcher: make(chan *launchRequest), stopper: make(chan *Process), procs: make(map[int]*Process), signals: make(chan os.Signal), @@ -34,9 +35,10 @@ func (m *ProcessManager) Process(cfg config.Service) *Process { return nil } return &Process{ - argv: cfg.Command, - manager: m, - stopSignal: getSignal(cfg.StopSignal, syscall.SIGINT), + argv: cfg.Command, + manager: m, + stopSignal: getSignal(cfg.StopSignal, syscall.SIGINT), + waitAfterStart: config.ParseDuration(cfg.WaitAfterStart, config.DefaultWaitAfterStart), } } @@ -95,11 +97,21 @@ func (m *ProcessManager) Manage() { } return - case proc := <-m.launcher: + case req := <-m.launcher: + proc := req.process if proc.cmd == nil { proc.cmd = m.launch(proc) + proc.started = time.Now() + if proc.waitAfterStart > 0 { + proc.waitUntil = proc.started.Add(proc.waitAfterStart) + } m.procs[proc.cmd.Process.Pid] = proc } + if proc.waitAfterStart > 0 && proc.waitUntil.After(time.Now()) { + time.AfterFunc(proc.waitUntil.Sub(time.Now()), func() { req.ready <- true }) + } else { + req.ready <- true + } case proc := <-m.stopper: if proc.cmd != nil { diff --git a/procman/process.go b/procman/process.go index 4122b69..403879c 100644 --- a/procman/process.go +++ b/procman/process.go @@ -4,21 +4,31 @@ import ( "fmt" "os" "os/exec" - "sync" + "time" ) // Process represents a command managed by a ProcessManager. type Process struct { - argv []string - cmd *exec.Cmd - manager *ProcessManager - mutex *sync.Mutex - stopSignal os.Signal + argv []string + cmd *exec.Cmd + manager *ProcessManager + started time.Time + stopSignal os.Signal + waitAfterStart time.Duration + waitUntil time.Time +} + +type launchRequest struct { + process *Process + ready chan bool } // LaunchOnce launches the process if it isn't already running. func (p *Process) LaunchOnce() { - p.manager.launcher <- p + ready := make(chan bool) + req := &launchRequest{process: p, ready: ready} + p.manager.launcher <- req + <-ready } // Stop sends the configured signal to the process. diff --git a/service/config.go b/service/config.go index 9a45131..c22bf56 100644 --- a/service/config.go +++ b/service/config.go @@ -1,23 +1,10 @@ package service import ( - "time" - "github.com/rwstauner/ynetd/config" "github.com/rwstauner/ynetd/procman" ) -func parseDuration(str string, defaultVal time.Duration) time.Duration { - if str == "" { - return defaultVal - } - duration, err := time.ParseDuration(str) - if err != nil { - panic(err) - } - return duration -} - // New returns the address to a new Service based on the provided Config. func New(c config.Service, pm *procman.ProcessManager) (svc *Service, err error) { defer func() { @@ -28,8 +15,8 @@ func New(c config.Service, pm *procman.ProcessManager) (svc *Service, err error) svc = &Service{ Proxy: c.Proxy, Command: pm.Process(c), - Timeout: parseDuration(c.Timeout, config.DefaultTimeout), - StopAfter: parseDuration(c.StopAfter, config.DefaultStopAfter), + Timeout: config.ParseDuration(c.Timeout, config.DefaultTimeout), + StopAfter: config.ParseDuration(c.StopAfter, config.DefaultStopAfter), } return } diff --git a/test/wait_after_start.bats b/test/wait_after_start.bats new file mode 100755 index 0000000..a3f38a9 --- /dev/null +++ b/test/wait_after_start.bats @@ -0,0 +1,48 @@ +#!/usr/bin/env bats + +load helpers + +# Verify that ytester waits properly. +@test "no waiting" { + YARGS=() + ytester -serve "wait$YTAG" -serve-after 2s + + ! running ytester + + start=`date +%s` + is "`ysend ready`" = "not yet" + end=`date +%s` + + is $((end - start)) -lt 2 +} + +@test "wait after start" { + YARGS=(-wait-after-start 2500ms) + ytester -loop -serve "wait$YTAG" -serve-after 2s + + ! running ytester + + # First connection waits. + start=`date +%s` + is "`ysend ready`" = "wait$YTAG" + end=`date +%s` + + is $((end - start)) -ge 2 + + # Second does not. + start=`date +%s` + is "`ysend ready`" = "wait$YTAG" + end=`date +%s` + + is $((end - start)) -lt 2 + + kill -s INT `ypidof ytester` + ! running ytester + + # Each new start should wait. + start=`date +%s` + is "`ysend ready`" = "wait$YTAG" + end=`date +%s` + + is $((end - start)) -ge 2 +} diff --git a/test/ytester.go b/test/ytester.go index 3eb7618..e951181 100644 --- a/test/ytester.go +++ b/test/ytester.go @@ -10,15 +10,16 @@ import ( ) var ( - after = 0 * time.Millisecond - before = 0 * time.Millisecond - knock = false - loop = false - port = "" - send = "" - serve = "" - timeout = 2 * time.Second - logger = log.New(os.Stderr, "ytester ", log.Ldate|log.Ltime|log.Lmicroseconds) + after = 0 * time.Millisecond + before = 0 * time.Millisecond + knock = false + loop = false + port = "" + send = "" + serve = "" + serveAfter = 0 * time.Second + timeout = 2 * time.Second + logger = log.New(os.Stderr, "ytester ", log.Ldate|log.Ltime|log.Lmicroseconds) ) func init() { @@ -29,6 +30,7 @@ func init() { flag.StringVar(&port, "port", port, "port") flag.StringVar(&send, "send", send, "send") flag.StringVar(&serve, "serve", serve, "serve") + flag.DurationVar(&serveAfter, "serve-after", serveAfter, "serve-after") flag.DurationVar(&timeout, "timeout", timeout, "timeout") } @@ -45,6 +47,10 @@ func listen(addr string) { defer ln.Close() flog("listening %s", port) + + msg := "not yet" + time.AfterFunc(serveAfter, func() { msg = serve }) + for { conn, err := ln.Accept() if err != nil { @@ -52,8 +58,8 @@ func listen(addr string) { continue } - flog("serving: %s", serve) - conn.Write([]byte(serve + "\n")) + flog("serving: %s", msg) + conn.Write([]byte(msg + "\n")) conn.Close() time.Sleep(after)