From 403a495340aa69dc831de926bf3892b87755a127 Mon Sep 17 00:00:00 2001 From: fryd Date: Tue, 5 Jul 2022 15:28:36 +0200 Subject: [PATCH] feat: add coingecko (#37) --- go.mod | 1 + go.sum | 2 + pricing/coingecko.go | 146 +++++++++++++++++++++++++++++++++++++++++++ pricing/pricing.go | 10 +++ service/service.go | 2 + 5 files changed, 161 insertions(+) create mode 100644 pricing/coingecko.go diff --git a/go.mod b/go.mod index af6c7ba..4be0f6d 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/kr/pretty v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e // indirect + golang.org/x/time v0.0.0-20220609170525-579cf78fd858 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/yaml.v2 v2.2.2 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect diff --git a/go.sum b/go.sum index c30eb38..57347e2 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e h1:CsOuNlbOuf0mzxJIefr6Q4uAUetRUwZE4qt7VfzP+xo= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U= +golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pricing/coingecko.go b/pricing/coingecko.go new file mode 100644 index 0000000..39baf4d --- /dev/null +++ b/pricing/coingecko.go @@ -0,0 +1,146 @@ +package pricing + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "code.vegaprotocol.io/priceproxy/config" + log "github.com/sirupsen/logrus" + "golang.org/x/time/rate" +) + +var ( + coingeckoExtraPairs = []config.PriceConfig{} + coingeckoSourceName = "coingecko" +) + +func coingeckoAddExtraPriceConfig(priceconfig config.PriceConfig) error { + coingeckoExtraPairs = append(coingeckoExtraPairs, priceconfig) + return nil +} + +type priceBoard interface { + UpdatePrice(pricecfg config.PriceConfig, newPrice PriceInfo) +} + +func coingeckoStartFetching( + board priceBoard, + sourcecfg config.SourceConfig, +) { + var ( + fetchURL = sourcecfg.URL.String() + oneRequestEvery = time.Duration(sourcecfg.SleepReal) * time.Second + rateLimiter = rate.NewLimiter(rate.Every(oneRequestEvery), 1) + ctx = context.Background() + err error + ) + log.WithFields(log.Fields{ + "sourceName": coingeckoSourceName, + "URL": fetchURL, + "rateLimitDuration": oneRequestEvery, + }).Infof("Starting Coingecko Fetching\n") + for { + if err = rateLimiter.Wait(ctx); err != nil { + log.WithFields(log.Fields{ + "error": err.Error(), + "sourceName": coingeckoSourceName, + "URL": fetchURL, + "rateLimitDuration": oneRequestEvery, + }).Errorln("Rate Limiter Failed. Falling back to Sleep.") + // fallback + time.Sleep(oneRequestEvery) + } + + prices, err := coingeckoSingleFetch(fetchURL) + if err != nil { + log.WithFields(log.Fields{ + "error": err.Error(), + "sourceName": coingeckoSourceName, + "URL": fetchURL, + "rateLimitDuration": oneRequestEvery, + }).Errorf("Retry in %d sec.\n", oneRequestEvery) + continue + } + + for base, data := range *prices { + board.UpdatePrice( + config.PriceConfig{ + Source: coingeckoSourceName, + Base: base, + Quote: "ETH", + Factor: 1.0, + Wander: true, + }, + PriceInfo{ + Price: data.ETH, + LastUpdatedReal: time.Unix(int64(data.LastUpdatedAt), 0), + LastUpdatedWander: time.Now().Round(0), + }, + ) + } + + for _, extraPair := range coingeckoExtraPairs { + base, ok := (*prices)[extraPair.Base] + if !ok { + log.WithFields(log.Fields{ + "sourceName": coingeckoSourceName, + "URL": fetchURL, + "rateLimitDuration": oneRequestEvery, + }).Errorf("Failed to get base %s for extra pair %v\n", extraPair.Base, extraPair) + continue + } + var price float64 + if strings.EqualFold(extraPair.Quote, "EUR") { + price = base.EUR + } else if strings.EqualFold(extraPair.Quote, "USD") { + price = base.USD + } else if strings.EqualFold(extraPair.Quote, "BTC") { + price = base.BTC + } else { + quote, ok := (*prices)[extraPair.Quote] + if !ok { + log.WithFields(log.Fields{ + "sourceName": coingeckoSourceName, + "URL": fetchURL, + "rateLimitDuration": oneRequestEvery, + }).Errorf("Failed to get quote %s for extra pair %v\n", extraPair.Source, extraPair) + continue + } + price = base.USD / quote.USD + } + board.UpdatePrice( + extraPair, + PriceInfo{ + Price: price, + LastUpdatedReal: time.Unix(int64(base.LastUpdatedAt), 0), + LastUpdatedWander: time.Now().Round(0), + }, + ) + } + } +} + +type coingeckoFetchData map[string]struct { + USD float64 `json:"usd"` + EUR float64 `json:"eur"` + BTC float64 `json:"btc"` + ETH float64 `json:"eth"` + LastUpdatedAt uint64 `json:"last_updated_at"` +} + +func coingeckoSingleFetch(url string) (*coingeckoFetchData, error) { + resp, err := http.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to get coingecko data, %w", err) + } + defer resp.Body.Close() + var prices coingeckoFetchData + if err = json.NewDecoder(resp.Body).Decode(&prices); err != nil { + return nil, fmt.Errorf("failed to parse coingecko data, %w", err) + } + return &prices, nil +} diff --git a/pricing/pricing.go b/pricing/pricing.go index a97a093..94e1221 100644 --- a/pricing/pricing.go +++ b/pricing/pricing.go @@ -38,6 +38,8 @@ type Engine interface { GetPrice(pricecfg config.PriceConfig) (PriceInfo, error) GetPrices() map[config.PriceConfig]PriceInfo UpdatePrice(pricecfg config.PriceConfig, newPrice PriceInfo) + + StartFetching() } type engine struct { @@ -129,6 +131,8 @@ func (e *engine) AddPrice(pricecfg config.PriceConfig) error { go e.stream(pricecfg, source, nil, headers, getPriceCoinmarketcap) } else if strings.HasPrefix(source.Name, "ftx-") { go e.stream(pricecfg, source, nil, headers, getPriceFTX) + } else if source.Name == coingeckoSourceName { + coingeckoAddExtraPriceConfig(pricecfg) } else { return fmt.Errorf("no source for %s", source.Name) } @@ -185,6 +189,12 @@ func (e *engine) UpdatePrice(pricecfg config.PriceConfig, newPrice PriceInfo) { e.pricesMu.Unlock() } +func (e *engine) StartFetching() { + if sourcecfg, ok := e.sources[coingeckoSourceName]; ok { + go coingeckoStartFetching(e, sourcecfg) + } +} + func (e *engine) stream(pricecfg config.PriceConfig, sourcecfg config.SourceConfig, u *url.URL, headers map[string][]string, fetchPrice fetchPriceFunc) { if u == nil { p2 := config.PriceConfig{ diff --git a/service/service.go b/service/service.go index cd36f73..21f8d27 100644 --- a/service/service.go +++ b/service/service.go @@ -145,6 +145,8 @@ func (s *Service) initPricingEngine() error { }).Info("Added price") } + s.pe.StartFetching() + for _, pricecfg := range s.config.Prices { pi := s.pe.WaitForPrice(*pricecfg) log.WithFields(log.Fields{