From 654e678b5809b4cefbec6087597d92300ff40266 Mon Sep 17 00:00:00 2001 From: Anton Osenenko <418868+a0s@users.noreply.github.com> Date: Thu, 1 Oct 2020 12:58:37 +0300 Subject: [PATCH] mass refactoring --- config/config.go | 89 ++++++++--- dictionary/dict.go | 18 ++- eventer/event_receiver.go | 197 +++++++++++++++++++++++++ main.go | 57 ++++++-- main_scope.go | 300 -------------------------------------- writer/writer.go | 106 ++++++++++++++ 6 files changed, 428 insertions(+), 339 deletions(-) create mode 100644 eventer/event_receiver.go delete mode 100644 main_scope.go create mode 100644 writer/writer.go diff --git a/config/config.go b/config/config.go index ffa5253..b985f47 100644 --- a/config/config.go +++ b/config/config.go @@ -2,35 +2,35 @@ package config import ( "flag" - "fmt" "github.com/octago/sflags/gen/gflag" - "log" - "os" + "path/filepath" + "strings" "time" + dict "tinkoff-invest-dumper/dictionary" ) var VersionString = "development" -var Conf Config +type Logger interface { + Fatalln(v ...interface{}) +} type Config struct { - Token string `flag:"token" desc:"your sandbox's token"` - Path string `flag:"path" desc:"path to storage dir"` - + Token string `flag:"token" desc:"your sandbox's token"` + Path string `flag:"path" desc:"path to storage dir"` TimeSuffixEnabled bool `flag:"time-suffix-enabled" desc:"add the time suffix to every filename on (re)start"` TimeSuffixFormat string `flag:"time-suffix-format" desc:"go format of the time suffix (see https://golang.org/src/time/format.go)"` TimeSuffixStartedAt time.Time `flag:"-"` + Orderbook string `flag:"orderbook" desc:"list of tickers to subscribe for orderbooks"` + OrderbookDepth int `flag:"orderbook-depth" desc:"depth of orderbook: from 1 to 20"` + Candle string `flag:"candle" desc:"list of tickers to subscribe for candles"` + CandleInterval string `flag:"candle-interval" desc:"interval of candles: 1min,2min,3min,5min,10min,15min,30min,hour,2hour,4hour,day,week,month"` + Version bool `flag:"version" desc:"show version info"` - Orderbook string `flag:"orderbook" desc:"list of tickers to subscribe for orderbooks"` - OrderbookDepth int `flag:"orderbook-depth" desc:"depth of orderbook: from 1 to 20"` - - Candle string `flag:"candle" desc:"list of tickers to subscribe for candles"` - CandleInterval string `flag:"candle-interval" desc:"interval of candles: 1min,2min,3min,5min,10min,15min,30min,hour,2hour,4hour,day,week,month"` - - Version bool `flag:"version" desc:"show version info"` + logger Logger } -func NewConfig() *Config { +func NewConfig(l Logger) *Config { config := &Config{ Path: ".", TimeSuffixEnabled: false, @@ -39,20 +39,67 @@ func NewConfig() *Config { OrderbookDepth: 20, CandleInterval: "1min", Version: false, + + logger: l, } err := gflag.ParseToDef(config) if err != nil { - log.Fatalf("config: %v", err) + l.Fatalln("new config:", err) } flag.Parse() return config } -func init() { - Conf = *NewConfig() - if Conf.Version { - fmt.Printf("%s\n", VersionString) - os.Exit(0) +func (c *Config) BuildOrderbookPath(ticker dict.Ticker) string { + var arr []string + + arr = append(arr, string(ticker)) + if c.TimeSuffixEnabled { + startedAt := c.TimeSuffixStartedAt.Format(c.TimeSuffixFormat) + arr = append(arr, startedAt) + } + arr = append(arr, "obk") + + path, err := filepath.Abs(filepath.Join(c.Path, strings.Join(arr, "-"))) + if err != nil { + c.logger.Fatalln(err) + } + return path +} + +func (c *Config) BuildCandlePath(ticker dict.Ticker) string { + var arr []string + + arr = append(arr, string(ticker)) + if c.TimeSuffixEnabled { + startedAt := c.TimeSuffixStartedAt.Format(c.TimeSuffixFormat) + arr = append(arr, startedAt) + } + arr = append(arr, "cdl") + + path, err := filepath.Abs(filepath.Join(c.Path, strings.Join(arr, "-"))) + if err != nil { + c.logger.Fatalln(err) + } + return path +} + +func (c *Config) GetOrderbookTickers() []dict.Ticker { + return stringToTickerList(c.Orderbook) +} + +func (c *Config) GetCandleTickers() []dict.Ticker { + return stringToTickerList(c.Candle) +} + +func stringToTickerList(flag string) []dict.Ticker { + var tickers []dict.Ticker + flags := strings.Split(flag, ",") + for _, f := range flags { + if f != "" { + tickers = append(tickers, dict.Ticker(f)) + } } + return tickers } diff --git a/dictionary/dict.go b/dictionary/dict.go index a166935..c46680a 100644 --- a/dictionary/dict.go +++ b/dictionary/dict.go @@ -51,12 +51,22 @@ TICKERS: return &Dictionary{figiInstrument: fs, tickerInstrument: ts}, nil } -func (d *Dictionary) GetFIGIByTicker(t Ticker) Figi { - return Figi(d.tickerInstrument[t].FIGI) +func (d *Dictionary) GetFIGIByTicker(t Ticker) (Figi, error) { + ins, ok := d.tickerInstrument[t] + if !ok { + return "", errors.New(fmt.Sprint("ticker not found:", t)) + } + + return Figi(ins.FIGI), nil } -func (d *Dictionary) GetTickerByFIGI(f Figi) Ticker { - return Ticker(d.figiInstrument[f].Ticker) +func (d *Dictionary) GetTickerByFIGI(f Figi) (Ticker, error) { + ins, ok := d.figiInstrument[f] + if !ok { + return "", errors.New(fmt.Sprint("figi not found:", f)) + } + + return Ticker(ins.Ticker), nil } type sortTicker []Ticker diff --git a/eventer/event_receiver.go b/eventer/event_receiver.go new file mode 100644 index 0000000..a8dc74b --- /dev/null +++ b/eventer/event_receiver.go @@ -0,0 +1,197 @@ +package eventer + +import ( + sdk "github.com/TinkoffCreditSystems/invest-openapi-go-sdk" + "math/rand" + "reflect" + "time" + dict "tinkoff-invest-dumper/dictionary" +) + +type Logger interface { + Fatalln(v ...interface{}) +} + +type Dictionary interface { + GetFIGIByTicker(t dict.Ticker) (dict.Figi, error) + GetTickerByFIGI(figi dict.Figi) (dict.Ticker, error) +} + +type OrderbookEvent struct { + Figi dict.Figi + Ticker dict.Ticker + LocalTime time.Time + Event sdk.OrderBookEvent +} + +type CandleEvent struct { + Figi dict.Figi + Ticker dict.Ticker + LocalTime time.Time + Event sdk.CandleEvent +} + +type EventReceiver struct { + streamingClient *sdk.StreamingClient + logger Logger + dictionary Dictionary + + orderbooks map[dict.Ticker][]chan OrderbookEvent + candles map[dict.Ticker][]chan CandleEvent +} + +func NewEventReceiver(lg Logger, sc *sdk.StreamingClient, dc Dictionary, ) *EventReceiver { + return &EventReceiver{ + streamingClient: sc, + logger: lg, + dictionary: dc, + orderbooks: make(map[dict.Ticker][]chan OrderbookEvent), + candles: make(map[dict.Ticker][]chan CandleEvent), + } +} + +func (l *EventReceiver) SubscribeToOrderbook(ticker dict.Ticker, depth int) chan OrderbookEvent { + _, ok := l.orderbooks[ticker] + if !ok { + figi, err := l.dictionary.GetFIGIByTicker(ticker) + if err != nil { + l.logger.Fatalln("new subscription to orderbook:", err) + } + + err = l.streamingClient.SubscribeOrderbook(string(figi), depth, requestID()) + if err != nil { + l.logger.Fatalln("new subscription to orderbook:", err) + } + l.orderbooks[ticker] = []chan OrderbookEvent{} + } + + ch := make(chan OrderbookEvent) + l.orderbooks[ticker] = append(l.orderbooks[ticker], ch) + return ch +} + +func (l *EventReceiver) SubscribeToCandle(ticker dict.Ticker, interval string) chan CandleEvent { + _, ok := l.candles[ticker] + if !ok { + figi, err := l.dictionary.GetFIGIByTicker(ticker) + if err != nil { + l.logger.Fatalln("new candle subscription:", err) + } + + err = l.streamingClient.SubscribeCandle(string(figi), sdk.CandleInterval(interval), requestID()) + if err != nil { + l.logger.Fatalln("new candle subscription:", err) + } + l.candles[ticker] = []chan CandleEvent{} + } + + ch := make(chan CandleEvent) + l.candles[ticker] = append(l.candles[ticker], ch) + return ch +} + +func (l *EventReceiver) WrapOrderbookEvent(e sdk.OrderBookEvent) *OrderbookEvent { + figi := dict.Figi(e.OrderBook.FIGI) + + ticker, err := l.dictionary.GetTickerByFIGI(figi) + if err != nil { + l.logger.Fatalln("create orderbook event:", err) + } + + return &OrderbookEvent{ + Figi: figi, + Ticker: ticker, + LocalTime: time.Now(), + Event: e, + } +} + +func (l *EventReceiver) WrapCandleEvent(e sdk.CandleEvent) *CandleEvent { + figi := dict.Figi(e.Candle.FIGI) + + ticker, err := l.dictionary.GetTickerByFIGI(figi) + if err != nil { + l.logger.Fatalln("create candle event:", err) + } + + return &CandleEvent{ + Figi: figi, + Ticker: ticker, + LocalTime: time.Now(), + Event: e, + } +} + +func (l *EventReceiver) Start() { + for { + err := l.streamingClient.RunReadLoop(func(event interface{}) error { + switch sdkEvent := event.(type) { + case sdk.OrderBookEvent: + ob := l.WrapOrderbookEvent(sdkEvent) + channels, ok := l.orderbooks[ob.Ticker] + if !ok { + l.logger.Fatalln("event receiver unknown channel:", ob.Ticker) + } + for _, ch := range channels { + ch <- *ob + } + + case sdk.CandleEvent: + cd := l.WrapCandleEvent(sdkEvent) + channels, ok := l.candles[cd.Ticker] + if !ok { + l.logger.Fatalln("event receiver unknown channel:", cd.Ticker) + } + for _, ch := range channels { + ch <- *cd + } + + default: + l.logger.Fatalln("event receiver unsupported event type:", reflect.TypeOf(event)) + } + + return nil + }) + if err != nil { + l.logger.Fatalln("event lister:", err) + } + } +} + +func requestID() string { + var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + + b := make([]rune, 12) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + + return string(b) +} + +func init() { + rand.Seed(time.Now().UnixNano()) // for requestID +} +// +//func (s *mainScope) unsubscribeOrderbook(streamingClient *sdk.StreamingClient) { +// for _, ticker := range s.orderbookTickers { +// figi := s.dict.GetFIGIByTicker(ticker) +// err := streamingClient.UnsubscribeOrderbook(string(figi), config.Conf.OrderbookDepth, requestID()) +// if err != nil { +// s.logger.Fatalln(err) +// } +// s.logger.Println("Unsubscribed from orderbook", ticker, figi) +// } +//} +// +// +//func (s *mainScope) unsubscribeCandles(streamingClient *sdk.StreamingClient) { +// for _, ticker := range s.candleTickers { +// figi := s.dict.GetFIGIByTicker(ticker) +// err := streamingClient.UnsubscribeCandle(string(figi), sdk.CandleInterval(config.Conf.CandleInterval), requestID()) +// if err != nil { +// s.logger.Fatalln(err) +// } +// s.logger.Println("Unsubscribed from candles", ticker, figi) +// } +//} diff --git a/main.go b/main.go index 332286d..0f3d823 100644 --- a/main.go +++ b/main.go @@ -1,40 +1,69 @@ package main import ( + "fmt" sdk "github.com/TinkoffCreditSystems/invest-openapi-go-sdk" "log" "os" - "tinkoff-invest-dumper/config" + conf "tinkoff-invest-dumper/config" + dict "tinkoff-invest-dumper/dictionary" + "tinkoff-invest-dumper/eventer" + "tinkoff-invest-dumper/writer" ) func main() { logger := log.New(os.Stdout, "", log.LstdFlags) - sandboxRestClient := sdk.NewSandboxRestClient(config.Conf.Token) - streamingClient, err := sdk.NewStreamingClient(logger, config.Conf.Token) + config := conf.NewConfig(logger) + if config.Version { + fmt.Printf("%s\n", conf.VersionString) + os.Exit(0) + } + + sandboxRestClient := sdk.NewSandboxRestClient(config.Token) + + streamingClient, err := sdk.NewStreamingClient(logger, config.Token) if err != nil { - logger.Fatalln("create streaming client:", err) + logger.Fatalln("streaming client:", err) } defer func() { err := streamingClient.Close() if err != nil { - logger.Fatalln("close streaming client:", err) + logger.Fatalln("streaming client:", err) } }() - scope, err := NewMainScope(sandboxRestClient, parseTickersList(config.Conf.Orderbook), parseTickersList(config.Conf.Candle), logger) + dictionary, err := dict.NewDictionary(sandboxRestClient, dict.MergeTickers(config.GetOrderbookTickers(), config.GetCandleTickers())) if err != nil { - logger.Fatalln(err) + logger.Fatalln("dictionary:", err) } - scope.initChannels() - scope.initDiskWriters() - go scope.eventReceiver(streamingClient) + receiver := eventer.NewEventReceiver(logger, streamingClient, dictionary) + writer := writer.NewWriter(logger, dictionary) + + for _, ticker := range config.GetOrderbookTickers() { + channel := receiver.SubscribeToOrderbook(ticker, config.OrderbookDepth) + figi, err := dictionary.GetFIGIByTicker(ticker) + if err != nil { + logger.Fatalln("subscribe ticker:", err) + } + logger.Println("Subscribed to orderbook", ticker, figi) + path := config.BuildOrderbookPath(ticker) + go writer.OrderbookWriter(channel, path) + } + + for _, ticker := range config.GetCandleTickers() { + channel := receiver.SubscribeToCandle(ticker, config.CandleInterval) + figi, err := dictionary.GetFIGIByTicker(ticker) + if err != nil { + logger.Fatalln("subscribe candle:", err) + } + logger.Println("Subscribed to candles", ticker, figi) + path := config.BuildCandlePath(ticker) + go writer.CandleWriter(channel, path) + } - scope.subscribeOrderbook(streamingClient) - scope.subscribeCandles(streamingClient) - defer scope.unsubscribeOrderbook(streamingClient) - defer scope.unsubscribeCandles(streamingClient) + go receiver.Start() select {} // sleep(0), epta } diff --git a/main_scope.go b/main_scope.go deleted file mode 100644 index 13ddcf4..0000000 --- a/main_scope.go +++ /dev/null @@ -1,300 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - sdk "github.com/TinkoffCreditSystems/invest-openapi-go-sdk" - "log" - "math/rand" - "os" - "path/filepath" - "strings" - "time" - "tinkoff-invest-dumper/config" - dict "tinkoff-invest-dumper/dictionary" -) - -type wrappedEvent struct { - time time.Time - ticker dict.Ticker - event interface{} -} -type eventChannel chan *wrappedEvent - -type mainScope struct { - orderbookTickers []dict.Ticker - candleTickers []dict.Ticker - - orderbookFigiChannels map[dict.Figi]eventChannel - candlesFigiChannels map[dict.Figi]eventChannel - - dict *dict.Dictionary - logger *log.Logger -} - -func NewMainScope(restClient *sdk.SandboxRestClient, orderbookTickers []dict.Ticker, candleTickers []dict.Ticker, logger *log.Logger) (*mainScope, error) { - dictionary, err := dict.NewDictionary(restClient, dict.MergeTickers(orderbookTickers, candleTickers)) - if err != nil { - return nil, err - } - - scope := &mainScope{ - orderbookTickers: orderbookTickers, - candleTickers: candleTickers, - - orderbookFigiChannels: map[dict.Figi]eventChannel{}, - candlesFigiChannels: map[dict.Figi]eventChannel{}, - - dict: dictionary, - logger: logger, - } - - return scope, nil -} - -func (s *mainScope) initChannels() { - for _, ticker := range dict.MergeTickers(s.orderbookTickers, s.candleTickers) { - figi := s.dict.GetFIGIByTicker(ticker) - - if _, ok := findTicker(s.orderbookTickers, ticker); ok { - s.orderbookFigiChannels[figi] = make(eventChannel) - } - - if _, ok := findTicker(s.candleTickers, ticker); ok { - s.candlesFigiChannels[figi] = make(eventChannel) - } - } -} - -func (s *mainScope) eventReceiver(streamingClient *sdk.StreamingClient) { - err := streamingClient.RunReadLoop(func(event interface{}) error { - var f dict.Figi - - switch realEvent := event.(type) { - case sdk.OrderBookEvent: - f = dict.Figi(realEvent.OrderBook.FIGI) - case sdk.CandleEvent: - f = dict.Figi(realEvent.Candle.FIGI) - default: - s.logger.Fatalln("unsupported event type", event) - } - - t := s.dict.GetTickerByFIGI(f) - - ce := wrappedEvent{ - time: time.Now(), - ticker: t, - event: event, - } - - switch event.(type) { - case sdk.OrderBookEvent: - s.orderbookFigiChannels[f] <- &ce - case sdk.CandleEvent: - s.candlesFigiChannels[f] <- &ce - } - - return nil - }) - if err != nil { - s.logger.Fatalln(err) - } -} - -func (s *mainScope) subscribeOrderbook(streamingClient *sdk.StreamingClient) { - for _, ticker := range s.orderbookTickers { - figi := s.dict.GetFIGIByTicker(ticker) - err := streamingClient.SubscribeOrderbook(string(figi), config.Conf.OrderbookDepth, requestID()) - if err != nil { - s.logger.Fatalln(err) - } - s.logger.Println("Subscribed to orderbook", ticker, figi) - } -} - -func (s *mainScope) unsubscribeOrderbook(streamingClient *sdk.StreamingClient) { - for _, ticker := range s.orderbookTickers { - figi := s.dict.GetFIGIByTicker(ticker) - err := streamingClient.UnsubscribeOrderbook(string(figi), config.Conf.OrderbookDepth, requestID()) - if err != nil { - s.logger.Fatalln(err) - } - s.logger.Println("Unsubscribed from orderbook", ticker, figi) - } -} - -func (s *mainScope) subscribeCandles(streamingClient *sdk.StreamingClient) { - for _, ticker := range s.candleTickers { - figi := s.dict.GetFIGIByTicker(ticker) - err := streamingClient.SubscribeCandle(string(figi), sdk.CandleInterval(config.Conf.CandleInterval), requestID()) - if err != nil { - s.logger.Fatalln(err) - } - s.logger.Println("Subscribed to candles", ticker, figi) - } -} - -func (s *mainScope) unsubscribeCandles(streamingClient *sdk.StreamingClient) { - for _, ticker := range s.candleTickers { - figi := s.dict.GetFIGIByTicker(ticker) - err := streamingClient.UnsubscribeCandle(string(figi), sdk.CandleInterval(config.Conf.CandleInterval), requestID()) - if err != nil { - s.logger.Fatalln(err) - } - s.logger.Println("Unsubscribed from candles", ticker, figi) - } -} - -func (s *mainScope) orderbookWriter(ch eventChannel, filePath string) { - file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - s.logger.Fatalln(err) - } - - defer func() { - if err := file.Close(); err != nil { - s.logger.Fatalln(err) - } - }() - - for wrappedEvent := range ch { - event := wrappedEvent.event.(sdk.OrderBookEvent) - row := map[string]interface{}{ - "ticker": wrappedEvent.ticker, - "figi": dict.Figi(event.OrderBook.FIGI), - - "t": event.Time, - "lt": wrappedEvent.time.Format(time.RFC3339Nano), - "b": event.OrderBook.Bids, - "a": event.OrderBook.Asks, - } - - jsonBytes, err := json.Marshal(row) - if err != nil { - s.logger.Fatalln(err) - } - - _, err = file.WriteString(fmt.Sprintf("%v\n", string(jsonBytes))) - if err != nil { - s.logger.Fatalln(err) - } - } -} - -func (s *mainScope) candleWriter(ch eventChannel, filePath string) { - file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - s.logger.Fatalln(err) - } - - defer func() { - if err := file.Close(); err != nil { - s.logger.Fatalln(err) - } - }() - - for wrappedEvent := range ch { - event := wrappedEvent.event.(sdk.CandleEvent) - row := map[string]interface{}{ - "ticker": wrappedEvent.ticker, - "figi": dict.Figi(event.Candle.FIGI), - - "t": event.Time, - "lt": wrappedEvent.time.Format(time.RFC3339Nano), - "o": event.Candle.OpenPrice, - "c": event.Candle.ClosePrice, - "h": event.Candle.HighPrice, - "l": event.Candle.LowPrice, - "v": event.Candle.Volume, - "ts": event.Candle.TS, - "i": event.Candle.Interval, - } - - jsonBytes, err := json.Marshal(row) - if err != nil { - s.logger.Fatalln(err) - } - - _, err = file.WriteString(fmt.Sprintf("%v\n", string(jsonBytes))) - if err != nil { - s.logger.Fatalln(err) - } - } -} - -func (s *mainScope) buildFileName(ticker dict.Ticker) (orderbookName, candleName string) { - var orderbook []string - var candle []string - - orderbook = append(orderbook, string(ticker)) - candle = append(candle, string(ticker)) - - if config.Conf.TimeSuffixEnabled { - startedAt := config.Conf.TimeSuffixStartedAt.Format(config.Conf.TimeSuffixFormat) - orderbook = append(orderbook, startedAt) - candle = append(candle, startedAt) - } - - orderbook = append(orderbook, "obk") - candle = append(candle, "cdl") - - var err error - orderbookName, err = filepath.Abs(filepath.Join(config.Conf.Path, strings.Join(orderbook, "-"))) - if err != nil { - s.logger.Fatalln(err) - } - candleName, err = filepath.Abs(filepath.Join(config.Conf.Path, strings.Join(candle, "-"))) - if err != nil { - s.logger.Fatalln(err) - } - return -} - -func (s *mainScope) initDiskWriters() { - for _, ticker := range dict.MergeTickers(s.orderbookTickers, s.candleTickers) { - figi := s.dict.GetFIGIByTicker(ticker) - - orderbookFilePath, candleFilePath := s.buildFileName(ticker) - - if _, ok := findTicker(s.orderbookTickers, ticker); ok { - ch := s.orderbookFigiChannels[figi] - go s.orderbookWriter(ch, orderbookFilePath) - } - - if _, ok := findTicker(s.candleTickers, ticker); ok { - ch := s.candlesFigiChannels[figi] - go s.candleWriter(ch, candleFilePath) - } - } -} - -func parseTickersList(flag string) []dict.Ticker { - var tickers []dict.Ticker - flags := strings.Split(flag, ",") - for _, f := range flags { - if f != "" { - tickers = append(tickers, dict.Ticker(f)) - } - } - return tickers -} - -func requestID() string { - var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - - b := make([]rune, 12) - for i := range b { - b[i] = letterRunes[rand.Intn(len(letterRunes))] - } - - return string(b) -} - -func findTicker(slice []dict.Ticker, val dict.Ticker) (int, bool) { - for i, item := range slice { - if item == val { - return i, true - } - } - return -1, false -} diff --git a/writer/writer.go b/writer/writer.go new file mode 100644 index 0000000..b77dcf4 --- /dev/null +++ b/writer/writer.go @@ -0,0 +1,106 @@ +package writer + +import ( + "encoding/json" + "fmt" + "os" + "time" + dict "tinkoff-invest-dumper/dictionary" + "tinkoff-invest-dumper/eventer" +) + +type Logger interface { + Fatalln(v ...interface{}) +} + +type Dictionary interface { + GetFIGIByTicker(t dict.Ticker) (dict.Figi, error) + GetTickerByFIGI(figi dict.Figi) (dict.Ticker, error) +} + +type Writer struct { + dictionary Dictionary + logger Logger +} + +func NewWriter(lg Logger, dc Dictionary) *Writer { + return &Writer{ + dictionary: dc, + logger: lg, + } +} + +func (s *Writer) OrderbookWriter(ch chan eventer.OrderbookEvent, filePath string) { + file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + s.logger.Fatalln(err) + } + + defer func() { + if err := file.Close(); err != nil { + s.logger.Fatalln(err) + } + }() + + for event := range ch { + row := map[string]interface{}{ + "ticker": event.Ticker, + "figi": event.Figi, + + "t": event.Event.Time, + "lt": event.LocalTime.Format(time.RFC3339Nano), + "b": event.Event.OrderBook.Bids, + "a": event.Event.OrderBook.Asks, + } + + jsonBytes, err := json.Marshal(row) + if err != nil { + s.logger.Fatalln(err) + } + + _, err = file.WriteString(fmt.Sprintf("%v\n", string(jsonBytes))) + if err != nil { + s.logger.Fatalln(err) + } + } +} + +func (s *Writer) CandleWriter(ch chan eventer.CandleEvent, filePath string) { + file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + s.logger.Fatalln(err) + } + + defer func() { + if err := file.Close(); err != nil { + s.logger.Fatalln(err) + } + }() + + for event := range ch { + row := map[string]interface{}{ + "ticker": event.Ticker, + "figi": event.Figi, + + "t": event.Event.Time, + "lt": event.LocalTime.Format(time.RFC3339Nano), + "o": event.Event.Candle.OpenPrice, + "c": event.Event.Candle.ClosePrice, + "h": event.Event.Candle.HighPrice, + "l": event.Event.Candle.LowPrice, + "v": event.Event.Candle.Volume, + "ts": event.Event.Candle.TS, + "i": event.Event.Candle.Interval, + } + + jsonBytes, err := json.Marshal(row) + if err != nil { + s.logger.Fatalln(err) + } + + _, err = file.WriteString(fmt.Sprintf("%v\n", string(jsonBytes))) + if err != nil { + s.logger.Fatalln(err) + } + } +}