From 9243819c97c3564a7745ba820b74966dfca4c0ff Mon Sep 17 00:00:00 2001 From: Will Winder Date: Tue, 22 Aug 2023 14:05:38 -0400 Subject: [PATCH] api: New API with health endpoint (#139) --- api/api.go | 57 ++++++++++ api/api_test.go | 103 ++++++++++++++++++ conduit/data/config.go | 6 + conduit/pipeline/pipeline.go | 23 ++++ pkg/cli/cli.go | 15 +++ .../internal/initialize/conduit.yml.example | 15 +-- 6 files changed, 208 insertions(+), 11 deletions(-) create mode 100644 api/api.go create mode 100644 api/api_test.go diff --git a/api/api.go b/api/api.go new file mode 100644 index 00000000..09c02591 --- /dev/null +++ b/api/api.go @@ -0,0 +1,57 @@ +package api + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + + log "github.com/sirupsen/logrus" + + "github.com/algorand/conduit/conduit/pipeline" +) + +// StatusProvider is a subset of the Pipeline interface required by the health handler. +type StatusProvider interface { + Status() (pipeline.Status, error) +} + +func makeHealthHandler(p StatusProvider) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + status, err := p.Status() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, `{"error": "%s"}`, err) + return + } + w.WriteHeader(http.StatusOK) + data, _ := json.Marshal(status) + fmt.Fprint(w, string(data)) + } +} + +// StartServer starts an http server that exposes a health check endpoint. +// A callback is returned that can be used to gracefully shutdown the server. +func StartServer(logger *log.Logger, p StatusProvider, address string) (func(ctx context.Context), error) { + mux := http.NewServeMux() + mux.HandleFunc("/health", makeHealthHandler(p)) + + srv := &http.Server{ + Addr: address, + Handler: mux, + } + + go func() { + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Fatalf("failed to start API server: %s", err) + } + }() + + shutdownCallback := func(ctx context.Context) { + if err := srv.Shutdown(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Fatalf("failed to shutdown API server: %s", err) + } + } + return shutdownCallback, nil +} diff --git a/api/api_test.go b/api/api_test.go new file mode 100644 index 00000000..92cfc714 --- /dev/null +++ b/api/api_test.go @@ -0,0 +1,103 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/require" + + "github.com/algorand/conduit/conduit/pipeline" +) + +type mockStatusProvider struct { + s pipeline.Status + err error +} + +func (m mockStatusProvider) Status() (pipeline.Status, error) { + if m.err != nil { + return pipeline.Status{}, m.err + } + return m.s, nil +} + +func TestStartServer_BadAddress(t *testing.T) { + l, h := test.NewNullLogger() + // capture the fatal log if any. + l.ExitFunc = func(int) {} + sp := mockStatusProvider{} + + shutdown, err := StartServer(l, sp, "bad address") + defer shutdown(context.Background()) + require.NoError(t, err) + time.Sleep(1 * time.Millisecond) + + require.Len(t, h.Entries, 1) + require.Equal(t, h.LastEntry().Level, logrus.FatalLevel) +} + +func TestStartServer_GracefulShutdown(t *testing.T) { + l, h := test.NewNullLogger() + // capture the fatal log if any. + l.ExitFunc = func(int) {} + sp := mockStatusProvider{} + shutdown, err := StartServer(l, sp, "bad address") + defer shutdown(context.Background()) + require.NoError(t, err) + require.Len(t, h.Entries, 0) +} + +func TestStartServer_HealthCheck(t *testing.T) { + l, _ := test.NewNullLogger() + // capture the fatal log if any. + l.ExitFunc = func(int) {} + sp := mockStatusProvider{ + s: pipeline.Status{ + Round: 999, + }, + } + + // Find an open port... + listener, err := net.Listen("tcp", ":0") + addr := listener.Addr().String() + listener.Close() + require.NoError(t, err) + + // Start server. + shutdown, err := StartServer(l, sp, addr) + defer shutdown(context.Background()) + require.NoError(t, err) + + // Make request. + resp, err := http.Get("http://" + addr + "/health") + require.NoError(t, err) + + // Make sure we got the right response. + require.Equal(t, http.StatusOK, resp.StatusCode) + var respStatus pipeline.Status + json.NewDecoder(resp.Body).Decode(&respStatus) + require.NoError(t, err) + require.Equal(t, sp.s, respStatus) +} + +func TestHealthHandlerError(t *testing.T) { + sp := mockStatusProvider{ + err: fmt.Errorf("some error"), + } + handler := makeHealthHandler(sp) + rec := httptest.NewRecorder() + handler(rec, nil) + + // validate response + resp := rec.Result() + require.Equal(t, http.StatusInternalServerError, resp.StatusCode) + require.Contains(t, rec.Body.String(), "some error") +} diff --git a/conduit/data/config.go b/conduit/data/config.go index e82b9158..9cd94a96 100644 --- a/conduit/data/config.go +++ b/conduit/data/config.go @@ -56,6 +56,11 @@ type Telemetry struct { Password string `yaml:"password"` } +// API defines parameters for the Conduit API server. +type API struct { + Address string `yaml:"addr"` +} + // Config stores configuration specific to the conduit pipeline type Config struct { // ConduitArgs are the program inputs. Should not be serialized for config. @@ -72,6 +77,7 @@ type Config struct { Processors []NameConfigPair `yaml:"processors"` Exporter NameConfigPair `yaml:"exporter"` Metrics Metrics `yaml:"metrics"` + API API `yaml:"api"` // RetryCount is the number of retries to perform for an error in the pipeline RetryCount uint64 `yaml:"retry-count"` // RetryDelay is a duration amount interpreted from a string diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index c492ae0c..61e95da6 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -38,6 +38,12 @@ type Pipeline interface { Stop() Error() error Wait() + Status() (Status, error) +} + +// Status is a struct that contains the current pipeline status. +type Status struct { + Round uint64 `json:"round"` } type pipelineImpl struct { @@ -58,6 +64,9 @@ type pipelineImpl struct { completeCallback []conduit.OnCompleteFunc pipelineMetadata state + + statusMu sync.Mutex + status Status } type pluginChannel chan data.BlockData @@ -382,6 +391,10 @@ func (p *pipelineImpl) Init() error { go p.startMetricsServer() } + p.statusMu.Lock() + defer p.statusMu.Unlock() + p.status.Round = p.pipelineMetadata.NextRound + return err } @@ -602,6 +615,9 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug // Increment Round, update metadata nextRound := lastRound + 1 p.pipelineMetadata.NextRound = nextRound + p.statusMu.Lock() + p.status.Round = nextRound + p.statusMu.Unlock() lastError = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir) if lastError != nil { lastError = fmt.Errorf("aborting after updating NextRound=%d BUT failing to save metadata: %w", nextRound, lastError) @@ -688,6 +704,13 @@ func (p *pipelineImpl) Wait() { p.wg.Wait() } +func (p *pipelineImpl) Status() (Status, error) { + p.statusMu.Lock() + ret := p.status + p.statusMu.Unlock() + return ret, nil +} + // start a http server serving /metrics func (p *pipelineImpl) startMetricsServer() { http.Handle("/metrics", promhttp.Handler()) diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 23cf4776..de9cb2f9 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -10,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/algorand/conduit/api" "github.com/algorand/conduit/conduit/data" "github.com/algorand/conduit/conduit/loggers" "github.com/algorand/conduit/conduit/pipeline" @@ -101,6 +102,20 @@ func runConduitCmdWithConfig(args *data.Args) error { } pline.Start() defer pline.Stop() + + // Start server + if pCfg.API.Address != "" { + shutdown, err := api.StartServer(logger, pline, pCfg.API.Address) + if err != nil { + // Suppress log, it is about to be printed to stderr. + if pCfg.LogFile != "" { + logger.Error(err) + } + return fmt.Errorf("failed to start API server: %w", err) + } + defer shutdown(context.Background()) + } + pline.Wait() return pline.Error() } diff --git a/pkg/cli/internal/initialize/conduit.yml.example b/pkg/cli/internal/initialize/conduit.yml.example index 56709af6..ffe64324 100644 --- a/pkg/cli/internal/initialize/conduit.yml.example +++ b/pkg/cli/internal/initialize/conduit.yml.example @@ -17,6 +17,10 @@ retry-delay: "1s" # Whether or not to print the conduit banner on startup. hide-banner: false +# When the address is not empty information is available on '/health' +api: + addr: ":8981" + # When enabled prometheus metrics are available on '/metrics' metrics: mode: OFF @@ -33,14 +37,3 @@ processors: # An exporter is defined to do something with the data. exporter: %s - -# Enable telemetry for conduit -telemetry: - enabled: false - - # By default the following fields will be configured to send data to Algorand. - # To store your own telemetry events, they can be overridden. - # uri: "" - # index: "" - # username: "" - # password: ""