Skip to content

Commit

Permalink
api: New API with health endpoint (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Aug 22, 2023
1 parent 0095fc9 commit 9243819
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 11 deletions.
57 changes: 57 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package api

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"

log "github.com/sirupsen/logrus"

"github.com/algorand/conduit/conduit/pipeline"
)

// StatusProvider is a subset of the Pipeline interface required by the health handler.
type StatusProvider interface {
Status() (pipeline.Status, error)
}

func makeHealthHandler(p StatusProvider) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
status, err := p.Status()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, `{"error": "%s"}`, err)
return
}
w.WriteHeader(http.StatusOK)
data, _ := json.Marshal(status)
fmt.Fprint(w, string(data))
}
}

// StartServer starts an http server that exposes a health check endpoint.
// A callback is returned that can be used to gracefully shutdown the server.
func StartServer(logger *log.Logger, p StatusProvider, address string) (func(ctx context.Context), error) {
mux := http.NewServeMux()
mux.HandleFunc("/health", makeHealthHandler(p))

srv := &http.Server{
Addr: address,
Handler: mux,
}

go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Fatalf("failed to start API server: %s", err)
}
}()

shutdownCallback := func(ctx context.Context) {
if err := srv.Shutdown(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Fatalf("failed to shutdown API server: %s", err)
}
}
return shutdownCallback, nil
}
103 changes: 103 additions & 0 deletions api/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package api

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"

"github.com/algorand/conduit/conduit/pipeline"
)

type mockStatusProvider struct {
s pipeline.Status
err error
}

func (m mockStatusProvider) Status() (pipeline.Status, error) {
if m.err != nil {
return pipeline.Status{}, m.err
}
return m.s, nil
}

func TestStartServer_BadAddress(t *testing.T) {
l, h := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{}

shutdown, err := StartServer(l, sp, "bad address")
defer shutdown(context.Background())
require.NoError(t, err)
time.Sleep(1 * time.Millisecond)

require.Len(t, h.Entries, 1)
require.Equal(t, h.LastEntry().Level, logrus.FatalLevel)
}

func TestStartServer_GracefulShutdown(t *testing.T) {
l, h := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{}
shutdown, err := StartServer(l, sp, "bad address")
defer shutdown(context.Background())
require.NoError(t, err)
require.Len(t, h.Entries, 0)
}

func TestStartServer_HealthCheck(t *testing.T) {
l, _ := test.NewNullLogger()
// capture the fatal log if any.
l.ExitFunc = func(int) {}
sp := mockStatusProvider{
s: pipeline.Status{
Round: 999,
},
}

// Find an open port...
listener, err := net.Listen("tcp", ":0")
addr := listener.Addr().String()
listener.Close()
require.NoError(t, err)

// Start server.
shutdown, err := StartServer(l, sp, addr)
defer shutdown(context.Background())
require.NoError(t, err)

// Make request.
resp, err := http.Get("http://" + addr + "/health")
require.NoError(t, err)

// Make sure we got the right response.
require.Equal(t, http.StatusOK, resp.StatusCode)
var respStatus pipeline.Status
json.NewDecoder(resp.Body).Decode(&respStatus)
require.NoError(t, err)
require.Equal(t, sp.s, respStatus)
}

func TestHealthHandlerError(t *testing.T) {
sp := mockStatusProvider{
err: fmt.Errorf("some error"),
}
handler := makeHealthHandler(sp)
rec := httptest.NewRecorder()
handler(rec, nil)

// validate response
resp := rec.Result()
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
require.Contains(t, rec.Body.String(), "some error")
}
6 changes: 6 additions & 0 deletions conduit/data/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type Telemetry struct {
Password string `yaml:"password"`
}

// API defines parameters for the Conduit API server.
type API struct {
Address string `yaml:"addr"`
}

// Config stores configuration specific to the conduit pipeline
type Config struct {
// ConduitArgs are the program inputs. Should not be serialized for config.
Expand All @@ -72,6 +77,7 @@ type Config struct {
Processors []NameConfigPair `yaml:"processors"`
Exporter NameConfigPair `yaml:"exporter"`
Metrics Metrics `yaml:"metrics"`
API API `yaml:"api"`
// RetryCount is the number of retries to perform for an error in the pipeline
RetryCount uint64 `yaml:"retry-count"`
// RetryDelay is a duration amount interpreted from a string
Expand Down
23 changes: 23 additions & 0 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ type Pipeline interface {
Stop()
Error() error
Wait()
Status() (Status, error)
}

// Status is a struct that contains the current pipeline status.
type Status struct {
Round uint64 `json:"round"`
}

type pipelineImpl struct {
Expand All @@ -58,6 +64,9 @@ type pipelineImpl struct {
completeCallback []conduit.OnCompleteFunc

pipelineMetadata state

statusMu sync.Mutex
status Status
}

type pluginChannel chan data.BlockData
Expand Down Expand Up @@ -382,6 +391,10 @@ func (p *pipelineImpl) Init() error {
go p.startMetricsServer()
}

p.statusMu.Lock()
defer p.statusMu.Unlock()
p.status.Round = p.pipelineMetadata.NextRound

return err
}

Expand Down Expand Up @@ -602,6 +615,9 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug
// Increment Round, update metadata
nextRound := lastRound + 1
p.pipelineMetadata.NextRound = nextRound
p.statusMu.Lock()
p.status.Round = nextRound
p.statusMu.Unlock()
lastError = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir)
if lastError != nil {
lastError = fmt.Errorf("aborting after updating NextRound=%d BUT failing to save metadata: %w", nextRound, lastError)
Expand Down Expand Up @@ -688,6 +704,13 @@ func (p *pipelineImpl) Wait() {
p.wg.Wait()
}

func (p *pipelineImpl) Status() (Status, error) {
p.statusMu.Lock()
ret := p.status
p.statusMu.Unlock()
return ret, nil
}

// start a http server serving /metrics
func (p *pipelineImpl) startMetricsServer() {
http.Handle("/metrics", promhttp.Handler())
Expand Down
15 changes: 15 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/algorand/conduit/api"
"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/loggers"
"github.com/algorand/conduit/conduit/pipeline"
Expand Down Expand Up @@ -101,6 +102,20 @@ func runConduitCmdWithConfig(args *data.Args) error {
}
pline.Start()
defer pline.Stop()

// Start server
if pCfg.API.Address != "" {
shutdown, err := api.StartServer(logger, pline, pCfg.API.Address)
if err != nil {
// Suppress log, it is about to be printed to stderr.
if pCfg.LogFile != "" {
logger.Error(err)
}
return fmt.Errorf("failed to start API server: %w", err)
}
defer shutdown(context.Background())
}

pline.Wait()
return pline.Error()
}
Expand Down
15 changes: 4 additions & 11 deletions pkg/cli/internal/initialize/conduit.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ retry-delay: "1s"
# Whether or not to print the conduit banner on startup.
hide-banner: false

# When the address is not empty information is available on '/health'
api:
addr: ":8981"

# When enabled prometheus metrics are available on '/metrics'
metrics:
mode: OFF
Expand All @@ -33,14 +37,3 @@ processors:
# An exporter is defined to do something with the data.
exporter:
%s

# Enable telemetry for conduit
telemetry:
enabled: false

# By default the following fields will be configured to send data to Algorand.
# To store your own telemetry events, they can be overridden.
# uri: ""
# index: ""
# username: ""
# password: ""

0 comments on commit 9243819

Please sign in to comment.