Skip to content

Commit

Permalink
add time prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
a0s committed Aug 24, 2020
1 parent f903a86 commit 241b01b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 20 deletions.
10 changes: 7 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ var (
token = flag.String("token", "", "your sandbox's token")
path = flag.String("path", ".", "path to storage dir")

timeSuffixEnabled = flag.Bool("time-suffix-enabled", false, "add the time suffix to every filename on (re)start")
timeSuffixFormat = flag.String("time-suffix-format", "2006010215", "go format of the time suffix (see https://golang.org/src/time/format.go)")
timeSuffixStartedAt = time.Now()

orderbook = flag.String("orderbook", "", "list of tickers to subscribe for orderbooks")
orderbookDepth = flag.Int("orderbook-depth", 20, "depth of orderbook: from 1 to 20")

Expand Down Expand Up @@ -41,16 +45,16 @@ func main() {
}
defer streamingClient.Close()

scope := NewMainScope(listToTickers(*orderbook), listToTickers(*candle), logger)
scope := NewMainScope(parseTickersList(*orderbook), parseTickersList(*candle), logger)
scope.initInstruments(sandboxRestClient)
scope.initChannels()
scope.initDiskWriters()

go scope.eventReceiver(streamingClient)

scope.subscribeOrderbooks(streamingClient)
scope.subscribeOrderbook(streamingClient)
scope.subscribeCandles(streamingClient)
defer scope.unsubscribeOrderbooks(streamingClient)
defer scope.unsubscribeOrderbook(streamingClient)
defer scope.unsubscribeCandles(streamingClient)

select {} // sleep(0), epta
Expand Down
55 changes: 38 additions & 17 deletions main_scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"
)

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

type ticker string // eg. MSFT
type figi string // eg. BBG000BPH459

Expand Down Expand Up @@ -98,7 +100,7 @@ func (s *mainScope) eventReceiver(streamingClient *sdk.StreamingClient) {
case sdk.CandleEvent:
f = figi(realEvent.Candle.FIGI)
default:
s.logger.Fatalln("unsupport event type", event)
s.logger.Fatalln("unsupported event type", event)
}
t = ticker(s.figiInstrument[f].Ticker)

Expand All @@ -122,7 +124,7 @@ func (s *mainScope) eventReceiver(streamingClient *sdk.StreamingClient) {
}
}

func (s *mainScope) subscribeOrderbooks(streamingClient *sdk.StreamingClient) {
func (s *mainScope) subscribeOrderbook(streamingClient *sdk.StreamingClient) {
for _, ticker := range s.orderbookTickers {
instrument := s.tickerInstrument[ticker]
err := streamingClient.SubscribeOrderbook(instrument.FIGI, *orderbookDepth, requestID())
Expand All @@ -133,7 +135,7 @@ func (s *mainScope) subscribeOrderbooks(streamingClient *sdk.StreamingClient) {
}
}

func (s *mainScope) unsubscribeOrderbooks(streamingClient *sdk.StreamingClient) {
func (s *mainScope) unsubscribeOrderbook(streamingClient *sdk.StreamingClient) {
for _, ticker := range s.orderbookTickers {
instrument := s.tickerInstrument[ticker]
err := streamingClient.UnsubscribeOrderbook(instrument.FIGI, *orderbookDepth, requestID())
Expand Down Expand Up @@ -243,28 +245,49 @@ func (s *mainScope) candleWriter(ch eventChannel, filePath string) {
}
}

func (s *mainScope) buildFileName(ticker ticker) (orderbookName, candleName string) {
var orderbook []string
var candle []string

orderbook = append(orderbook, string(ticker))
candle = append(candle, string(ticker))

if *timeSuffixEnabled {
startedAt := timeSuffixStartedAt.Format(*timeSuffixFormat)
orderbook = append(orderbook, startedAt)
candle = append(candle, startedAt)
}

orderbook = append(orderbook, "obk")
candle = append(candle, "cdl")

var err error
orderbookName, err = filepath.Abs(filepath.Join(*path, strings.Join(orderbook, "-")))
if err != nil {
s.logger.Fatalln(err)
}
candleName, err = filepath.Abs(filepath.Join(*path, strings.Join(candle, "-")))
if err != nil {
s.logger.Fatalln(err)
}
return
}

func (s *mainScope) initDiskWriters() {
for _, ticker := range s.allTickers() {

instrument := s.tickerInstrument[ticker]
figi := figi(instrument.FIGI)

orderbookFilePath, candleFilePath := s.buildFileName(ticker)

if _, ok := findTicker(s.orderbookTickers, ticker); ok {
filePath, err := filepath.Abs(filepath.Join(*path, fmt.Sprintf("%s%s", string(ticker), "_orderbook")))
if err != nil {
s.logger.Fatalln(err)
}
ch := s.orderbookFigiChannels[figi]
go s.orderbookWriter(ch, filePath)
go s.orderbookWriter(ch, orderbookFilePath)
}

if _, ok := findTicker(s.candleTickers, ticker); ok {
filePath, err := filepath.Abs(filepath.Join(*path, fmt.Sprintf("%s%s", string(ticker), "_candles")))
if err != nil {
s.logger.Fatalln(err)
}
ch := s.candlesFigiChannels[figi]
go s.candleWriter(ch, filePath)
go s.candleWriter(ch, candleFilePath)
}
}
}
Expand All @@ -287,7 +310,7 @@ func (s *mainScope) allTickers() []ticker {
return keys
}

func listToTickers(flag string) []ticker {
func parseTickersList(flag string) []ticker {
var tickers []ticker
flags := strings.Split(flag, ",")
for _, f := range flags {
Expand All @@ -298,8 +321,6 @@ func listToTickers(flag string) []ticker {
return tickers
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func requestID() string {
b := make([]rune, 12)
for i := range b {
Expand Down

0 comments on commit 241b01b

Please sign in to comment.