Skip to content

Commit

Permalink
mass refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
a0s committed Oct 1, 2020
1 parent a594f81 commit 654e678
Show file tree
Hide file tree
Showing 6 changed files with 428 additions and 339 deletions.
89 changes: 68 additions & 21 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,35 @@ package config

import (
"flag"
"fmt"
"github.com/octago/sflags/gen/gflag"
"log"
"os"
"path/filepath"
"strings"
"time"
dict "tinkoff-invest-dumper/dictionary"
)

var VersionString = "development"

var Conf Config
type Logger interface {
Fatalln(v ...interface{})
}

type Config struct {
Token string `flag:"token" desc:"your sandbox's token"`
Path string `flag:"path" desc:"path to storage dir"`

Token string `flag:"token" desc:"your sandbox's token"`
Path string `flag:"path" desc:"path to storage dir"`
TimeSuffixEnabled bool `flag:"time-suffix-enabled" desc:"add the time suffix to every filename on (re)start"`
TimeSuffixFormat string `flag:"time-suffix-format" desc:"go format of the time suffix (see https://golang.org/src/time/format.go)"`
TimeSuffixStartedAt time.Time `flag:"-"`
Orderbook string `flag:"orderbook" desc:"list of tickers to subscribe for orderbooks"`
OrderbookDepth int `flag:"orderbook-depth" desc:"depth of orderbook: from 1 to 20"`
Candle string `flag:"candle" desc:"list of tickers to subscribe for candles"`
CandleInterval string `flag:"candle-interval" desc:"interval of candles: 1min,2min,3min,5min,10min,15min,30min,hour,2hour,4hour,day,week,month"`
Version bool `flag:"version" desc:"show version info"`

Orderbook string `flag:"orderbook" desc:"list of tickers to subscribe for orderbooks"`
OrderbookDepth int `flag:"orderbook-depth" desc:"depth of orderbook: from 1 to 20"`

Candle string `flag:"candle" desc:"list of tickers to subscribe for candles"`
CandleInterval string `flag:"candle-interval" desc:"interval of candles: 1min,2min,3min,5min,10min,15min,30min,hour,2hour,4hour,day,week,month"`

Version bool `flag:"version" desc:"show version info"`
logger Logger
}

func NewConfig() *Config {
func NewConfig(l Logger) *Config {
config := &Config{
Path: ".",
TimeSuffixEnabled: false,
Expand All @@ -39,20 +39,67 @@ func NewConfig() *Config {
OrderbookDepth: 20,
CandleInterval: "1min",
Version: false,

logger: l,
}
err := gflag.ParseToDef(config)
if err != nil {
log.Fatalf("config: %v", err)
l.Fatalln("new config:", err)
}
flag.Parse()

return config
}

func init() {
Conf = *NewConfig()
if Conf.Version {
fmt.Printf("%s\n", VersionString)
os.Exit(0)
func (c *Config) BuildOrderbookPath(ticker dict.Ticker) string {
var arr []string

arr = append(arr, string(ticker))
if c.TimeSuffixEnabled {
startedAt := c.TimeSuffixStartedAt.Format(c.TimeSuffixFormat)
arr = append(arr, startedAt)
}
arr = append(arr, "obk")

path, err := filepath.Abs(filepath.Join(c.Path, strings.Join(arr, "-")))
if err != nil {
c.logger.Fatalln(err)
}
return path
}

func (c *Config) BuildCandlePath(ticker dict.Ticker) string {
var arr []string

arr = append(arr, string(ticker))
if c.TimeSuffixEnabled {
startedAt := c.TimeSuffixStartedAt.Format(c.TimeSuffixFormat)
arr = append(arr, startedAt)
}
arr = append(arr, "cdl")

path, err := filepath.Abs(filepath.Join(c.Path, strings.Join(arr, "-")))
if err != nil {
c.logger.Fatalln(err)
}
return path
}

func (c *Config) GetOrderbookTickers() []dict.Ticker {
return stringToTickerList(c.Orderbook)
}

func (c *Config) GetCandleTickers() []dict.Ticker {
return stringToTickerList(c.Candle)
}

func stringToTickerList(flag string) []dict.Ticker {
var tickers []dict.Ticker
flags := strings.Split(flag, ",")
for _, f := range flags {
if f != "" {
tickers = append(tickers, dict.Ticker(f))
}
}
return tickers
}
18 changes: 14 additions & 4 deletions dictionary/dict.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,22 @@ TICKERS:
return &Dictionary{figiInstrument: fs, tickerInstrument: ts}, nil
}

func (d *Dictionary) GetFIGIByTicker(t Ticker) Figi {
return Figi(d.tickerInstrument[t].FIGI)
func (d *Dictionary) GetFIGIByTicker(t Ticker) (Figi, error) {
ins, ok := d.tickerInstrument[t]
if !ok {
return "", errors.New(fmt.Sprint("ticker not found:", t))
}

return Figi(ins.FIGI), nil
}

func (d *Dictionary) GetTickerByFIGI(f Figi) Ticker {
return Ticker(d.figiInstrument[f].Ticker)
func (d *Dictionary) GetTickerByFIGI(f Figi) (Ticker, error) {
ins, ok := d.figiInstrument[f]
if !ok {
return "", errors.New(fmt.Sprint("figi not found:", f))
}

return Ticker(ins.Ticker), nil
}

type sortTicker []Ticker
Expand Down
197 changes: 197 additions & 0 deletions eventer/event_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package eventer

import (
sdk "github.com/TinkoffCreditSystems/invest-openapi-go-sdk"
"math/rand"
"reflect"
"time"
dict "tinkoff-invest-dumper/dictionary"
)

type Logger interface {
Fatalln(v ...interface{})
}

type Dictionary interface {
GetFIGIByTicker(t dict.Ticker) (dict.Figi, error)
GetTickerByFIGI(figi dict.Figi) (dict.Ticker, error)
}

type OrderbookEvent struct {
Figi dict.Figi
Ticker dict.Ticker
LocalTime time.Time
Event sdk.OrderBookEvent
}

type CandleEvent struct {
Figi dict.Figi
Ticker dict.Ticker
LocalTime time.Time
Event sdk.CandleEvent
}

type EventReceiver struct {
streamingClient *sdk.StreamingClient
logger Logger
dictionary Dictionary

orderbooks map[dict.Ticker][]chan OrderbookEvent
candles map[dict.Ticker][]chan CandleEvent
}

func NewEventReceiver(lg Logger, sc *sdk.StreamingClient, dc Dictionary, ) *EventReceiver {
return &EventReceiver{
streamingClient: sc,
logger: lg,
dictionary: dc,
orderbooks: make(map[dict.Ticker][]chan OrderbookEvent),
candles: make(map[dict.Ticker][]chan CandleEvent),
}
}

func (l *EventReceiver) SubscribeToOrderbook(ticker dict.Ticker, depth int) chan OrderbookEvent {
_, ok := l.orderbooks[ticker]
if !ok {
figi, err := l.dictionary.GetFIGIByTicker(ticker)
if err != nil {
l.logger.Fatalln("new subscription to orderbook:", err)
}

err = l.streamingClient.SubscribeOrderbook(string(figi), depth, requestID())
if err != nil {
l.logger.Fatalln("new subscription to orderbook:", err)
}
l.orderbooks[ticker] = []chan OrderbookEvent{}
}

ch := make(chan OrderbookEvent)
l.orderbooks[ticker] = append(l.orderbooks[ticker], ch)
return ch
}

func (l *EventReceiver) SubscribeToCandle(ticker dict.Ticker, interval string) chan CandleEvent {
_, ok := l.candles[ticker]
if !ok {
figi, err := l.dictionary.GetFIGIByTicker(ticker)
if err != nil {
l.logger.Fatalln("new candle subscription:", err)
}

err = l.streamingClient.SubscribeCandle(string(figi), sdk.CandleInterval(interval), requestID())
if err != nil {
l.logger.Fatalln("new candle subscription:", err)
}
l.candles[ticker] = []chan CandleEvent{}
}

ch := make(chan CandleEvent)
l.candles[ticker] = append(l.candles[ticker], ch)
return ch
}

func (l *EventReceiver) WrapOrderbookEvent(e sdk.OrderBookEvent) *OrderbookEvent {
figi := dict.Figi(e.OrderBook.FIGI)

ticker, err := l.dictionary.GetTickerByFIGI(figi)
if err != nil {
l.logger.Fatalln("create orderbook event:", err)
}

return &OrderbookEvent{
Figi: figi,
Ticker: ticker,
LocalTime: time.Now(),
Event: e,
}
}

func (l *EventReceiver) WrapCandleEvent(e sdk.CandleEvent) *CandleEvent {
figi := dict.Figi(e.Candle.FIGI)

ticker, err := l.dictionary.GetTickerByFIGI(figi)
if err != nil {
l.logger.Fatalln("create candle event:", err)
}

return &CandleEvent{
Figi: figi,
Ticker: ticker,
LocalTime: time.Now(),
Event: e,
}
}

func (l *EventReceiver) Start() {
for {
err := l.streamingClient.RunReadLoop(func(event interface{}) error {
switch sdkEvent := event.(type) {
case sdk.OrderBookEvent:
ob := l.WrapOrderbookEvent(sdkEvent)
channels, ok := l.orderbooks[ob.Ticker]
if !ok {
l.logger.Fatalln("event receiver unknown channel:", ob.Ticker)
}
for _, ch := range channels {
ch <- *ob
}

case sdk.CandleEvent:
cd := l.WrapCandleEvent(sdkEvent)
channels, ok := l.candles[cd.Ticker]
if !ok {
l.logger.Fatalln("event receiver unknown channel:", cd.Ticker)
}
for _, ch := range channels {
ch <- *cd
}

default:
l.logger.Fatalln("event receiver unsupported event type:", reflect.TypeOf(event))
}

return nil
})
if err != nil {
l.logger.Fatalln("event lister:", err)
}
}
}

func requestID() string {
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

b := make([]rune, 12)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}

return string(b)
}

func init() {
rand.Seed(time.Now().UnixNano()) // for requestID
}
//
//func (s *mainScope) unsubscribeOrderbook(streamingClient *sdk.StreamingClient) {
// for _, ticker := range s.orderbookTickers {
// figi := s.dict.GetFIGIByTicker(ticker)
// err := streamingClient.UnsubscribeOrderbook(string(figi), config.Conf.OrderbookDepth, requestID())
// if err != nil {
// s.logger.Fatalln(err)
// }
// s.logger.Println("Unsubscribed from orderbook", ticker, figi)
// }
//}
//
//
//func (s *mainScope) unsubscribeCandles(streamingClient *sdk.StreamingClient) {
// for _, ticker := range s.candleTickers {
// figi := s.dict.GetFIGIByTicker(ticker)
// err := streamingClient.UnsubscribeCandle(string(figi), sdk.CandleInterval(config.Conf.CandleInterval), requestID())
// if err != nil {
// s.logger.Fatalln(err)
// }
// s.logger.Println("Unsubscribed from candles", ticker, figi)
// }
//}
Loading

0 comments on commit 654e678

Please sign in to comment.