Skip to content

Commit

Permalink
Added SQL repository support for assets. (#267)
Browse files Browse the repository at this point in the history
# Describe Request

Added SQL repository support for assets. It does not contain any actual
implementation for a particular database. I will follow up with a
separate repo or submodule.

Related #266 

# Change Type

SQL repository.
  • Loading branch information
cinar authored Jan 18, 2025
1 parent 3ae91ac commit 8812266
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 0 deletions.
120 changes: 120 additions & 0 deletions asset/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ The information provided on this project is strictly for informational purposes
- [type Repository](<#Repository>)
- [func NewRepository\(name, config string\) \(Repository, error\)](<#NewRepository>)
- [type RepositoryBuilderFunc](<#RepositoryBuilderFunc>)
- [type SQLRepository](<#SQLRepository>)
- [func NewSQLRepository\(dbDriver, dbURL string, dialect SQLRepositoryDialect\) \(\*SQLRepository, error\)](<#NewSQLRepository>)
- [func \(s \*SQLRepository\) Append\(name string, snapshots \<\-chan \*Snapshot\) error](<#SQLRepository.Append>)
- [func \(s \*SQLRepository\) Assets\(\) \(\[\]string, error\)](<#SQLRepository.Assets>)
- [func \(s \*SQLRepository\) Close\(\) error](<#SQLRepository.Close>)
- [func \(s \*SQLRepository\) Drop\(\) error](<#SQLRepository.Drop>)
- [func \(s \*SQLRepository\) Get\(name string\) \(\<\-chan \*Snapshot, error\)](<#SQLRepository.Get>)
- [func \(s \*SQLRepository\) GetSince\(name string, date time.Time\) \(\<\-chan \*Snapshot, error\)](<#SQLRepository.GetSince>)
- [func \(s \*SQLRepository\) LastDate\(name string\) \(time.Time, error\)](<#SQLRepository.LastDate>)
- [type SQLRepositoryDialect](<#SQLRepositoryDialect>)
- [type Snapshot](<#Snapshot>)
- [type Sync](<#Sync>)
- [func NewSync\(\) \*Sync](<#NewSync>)
Expand Down Expand Up @@ -348,6 +358,116 @@ RepositoryBuilderFunc defines a function to build a new repository using the giv
type RepositoryBuilderFunc func(config string) (Repository, error)
```

<a name="SQLRepository"></a>
## type [SQLRepository](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L17-L35>)

SQLRepository provides a SQL backed storage facility for financial market data.

```go
type SQLRepository struct {
// contains filtered or unexported fields
}
```

<a name="NewSQLRepository"></a>
### func [NewSQLRepository](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L38>)

```go
func NewSQLRepository(dbDriver, dbURL string, dialect SQLRepositoryDialect) (*SQLRepository, error)
```

NewSQLRepository takes a database driver, URL, and dialect for the asset repository and connects to it.

<a name="SQLRepository.Append"></a>
### func \(\*SQLRepository\) [Append](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L170>)

```go
func (s *SQLRepository) Append(name string, snapshots <-chan *Snapshot) error
```

Append adds the given snapshots to the asset with the given name.

<a name="SQLRepository.Assets"></a>
### func \(\*SQLRepository\) [Assets](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L87>)

```go
func (s *SQLRepository) Assets() ([]string, error)
```

Assets returns the names of all assets in the respository.

<a name="SQLRepository.Close"></a>
### func \(\*SQLRepository\) [Close](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L82>)

```go
func (s *SQLRepository) Close() error
```

Close closes the database connection.

<a name="SQLRepository.Drop"></a>
### func \(\*SQLRepository\) [Drop](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L193>)

```go
func (s *SQLRepository) Drop() error
```

Drop drops the snapshots table.

<a name="SQLRepository.Get"></a>
### func \(\*SQLRepository\) [Get](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L112>)

```go
func (s *SQLRepository) Get(name string) (<-chan *Snapshot, error)
```

Get attempts to return a channel of snapshots for the asset with the given name.

<a name="SQLRepository.GetSince"></a>
### func \(\*SQLRepository\) [GetSince](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L117>)

```go
func (s *SQLRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error)
```

GetSince attempts to return a channel of snapshots for the asset with the given name since the given date.

<a name="SQLRepository.LastDate"></a>
### func \(\*SQLRepository\) [LastDate](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L152>)

```go
func (s *SQLRepository) LastDate(name string) (time.Time, error)
```

LastDate returns the date of the last snapshot for the asset with the given name.

<a name="SQLRepositoryDialect"></a>
## type [SQLRepositoryDialect](<https://github.com/cinar/indicator/blob/master/asset/sql_repository_dialect.go#L8-L26>)

SQLRepositoryDialect defines the SQL dialect for the SQL repository.

```go
type SQLRepositoryDialect interface {
// CreateTable returns the SQL statement to create the repository table.
CreateTable() string

// DropTable returns the SQL statement to drop the repository table.
DropTable() string

// Assets returns the SQL statement to get the names of all assets in the respository.
Assets() string

// GetSince returns the SQL statement to query snapshots for the asset with the given name since the given date.
GetSince() string

// LastDate returns the SQL statement to query for the last date for the asset with the given name.
LastDate() string

// Appends returns the SQL statement to add the given snapshots to the asset with the given name.
Append() string
}
```

<a name="Snapshot"></a>
## type [Snapshot](<https://github.com/cinar/indicator/blob/master/asset/snapshot.go#L15-L38>)

Expand Down
200 changes: 200 additions & 0 deletions asset/sql_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright (c) 2021-2024 Onur Cinar.
// The source code is provided under GNU AGPLv3 License.
// https://github.com/cinar/indicator

package asset

import (
"database/sql"
"fmt"
"log"
"time"

"github.com/cinar/indicator/v2/helper"
)

// SQLRepository provides a SQL backed storage facility for financial market data.
type SQLRepository struct {
// db is the database connection.
db *sql.DB

// dialect is the database dialect to use.
dialect SQLRepositoryDialect

// assetsQuery is the prepared assets query.
assetsQuery *sql.Stmt

// getSinceQuery is the prepared get since query.
getSinceQuery *sql.Stmt

// lastDateQuery is the prepared last date query.
lastDateQuery *sql.Stmt

// appendQuery is the prepared append query.
appendQuery *sql.Stmt
}

// NewSQLRepository takes a database driver, URL, and dialect for the asset repository and connects to it.
func NewSQLRepository(dbDriver, dbURL string, dialect SQLRepositoryDialect) (*SQLRepository, error) {
db, err := sql.Open(dbDriver, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect database: %w", err)
}

_, err = db.Exec(dialect.CreateTable())
if err != nil {
return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to create table: %w", err))
}

assetQuery, err := db.Prepare(dialect.Assets())
if err != nil {
return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare assets: %w", err))
}

getSinceQuery, err := db.Prepare(dialect.GetSince())
if err != nil {
return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare get since query: %w", err))
}

lastDateQuery, err := db.Prepare(dialect.LastDate())
if err != nil {
return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare last date query: %w", err))
}

appendQuery, err := db.Prepare(dialect.Append())
if err != nil {
return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare append: %w", err))
}

repository := &SQLRepository{
db,
dialect,
assetQuery,
getSinceQuery,
lastDateQuery,
appendQuery,
}

return repository, nil
}

// Close closes the database connection.
func (s *SQLRepository) Close() error {
return helper.CloseDatabaseWithError(s.db, nil)
}

// Assets returns the names of all assets in the respository.
func (s *SQLRepository) Assets() ([]string, error) {
rows, err := s.assetsQuery.Query()
if err != nil {
return nil, fmt.Errorf("unable to get assets: %w", err)
}

defer helper.CloseDatabaseRows(rows)

var assets []string

for rows.Next() {
var name string

err := rows.Scan(&name)
if err != nil {
return nil, fmt.Errorf("unable to scan assets: %w", err)
}

assets = append(assets, name)
}

return assets, nil
}

// Get attempts to return a channel of snapshots for the asset with the given name.
func (s *SQLRepository) Get(name string) (<-chan *Snapshot, error) {
return s.GetSince(name, time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
}

// GetSince attempts to return a channel of snapshots for the asset with the given name since the given date.
func (s *SQLRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error) {
rows, err := s.getSinceQuery.Query(name, date)
if err != nil {
return nil, fmt.Errorf("unable to get since: %w", err)
}

snapshots := make(chan *Snapshot)

go func() {
defer helper.CloseDatabaseRows(rows)
defer close(snapshots)

for rows.Next() {
snapshot := &Snapshot{}

err := rows.Scan(
&snapshot.Date,
&snapshot.Open,
&snapshot.High,
&snapshot.Low,
&snapshot.Close,
&snapshot.Volume,
)
if err != nil {
log.Printf("unable to scan row: %v", err)
}

snapshots <- snapshot
}
}()

return snapshots, nil
}

// LastDate returns the date of the last snapshot for the asset with the given name.
func (s *SQLRepository) LastDate(name string) (time.Time, error) {
row := s.lastDateQuery.QueryRow(name)

var date time.Time

err := row.Scan(&date)
if err != nil {
if err == sql.ErrNoRows {
return date, fmt.Errorf("unable to find asset")
}

return date, fmt.Errorf("unable to get the last date: %w", err)
}

return date, nil
}

// Append adds the given snapshots to the asset with the given name.
func (s *SQLRepository) Append(name string, snapshots <-chan *Snapshot) error {
go func() {
for snapshot := range snapshots {
_, err := s.appendQuery.Exec(
name,
snapshot.Date,
snapshot.Open,
snapshot.High,
snapshot.Low,
snapshot.Close,
snapshot.Volume,
)

if err != nil {
log.Printf("unable to append snapshot: %v", err)
}
}
}()

return nil
}

// Drop drops the snapshots table.
func (s *SQLRepository) Drop() error {
_, err := s.db.Exec(s.dialect.DropTable())
if err != nil {
return fmt.Errorf("unable to drop repository: %w", err)
}

return nil
}
26 changes: 26 additions & 0 deletions asset/sql_repository_dialect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2021-2024 Onur Cinar.
// The source code is provided under GNU AGPLv3 License.
// https://github.com/cinar/indicator

package asset

// SQLRepositoryDialect defines the SQL dialect for the SQL repository.
type SQLRepositoryDialect interface {
// CreateTable returns the SQL statement to create the repository table.
CreateTable() string

// DropTable returns the SQL statement to drop the repository table.
DropTable() string

// Assets returns the SQL statement to get the names of all assets in the respository.
Assets() string

// GetSince returns the SQL statement to query snapshots for the asset with the given name since the given date.
GetSince() string

// LastDate returns the SQL statement to query for the last date for the asset with the given name.
LastDate() string

// Appends returns the SQL statement to add the given snapshots to the asset with the given name.
Append() string
}
20 changes: 20 additions & 0 deletions helper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ The information provided on this project is strictly for informational purposes
- [func CheckEquals\[T comparable\]\(inputs ...\<\-chan T\) error](<#CheckEquals>)
- [func CloseAndLogError\(closer io.Closer, message string\)](<#CloseAndLogError>)
- [func CloseAndLogErrorWithLogger\(closer io.Closer, message string, logger \*slog.Logger\)](<#CloseAndLogErrorWithLogger>)
- [func CloseDatabaseRows\(rows \*sql.Rows\)](<#CloseDatabaseRows>)
- [func CloseDatabaseWithError\(db \*sql.DB, err error\) error](<#CloseDatabaseWithError>)
- [func CommonPeriod\(periods ...int\) int](<#CommonPeriod>)
- [func Count\[T Number, O any\]\(from T, other \<\-chan O\) \<\-chan T](<#Count>)
- [func DaysBetween\(from, to time.Time\) int](<#DaysBetween>)
Expand Down Expand Up @@ -337,6 +339,24 @@ func CloseAndLogErrorWithLogger(closer io.Closer, message string, logger *slog.L

CloseAndLogErrorWithLogger attempts to close the closer and logs any error to the given logger.

<a name="CloseDatabaseRows"></a>
## func [CloseDatabaseRows](<https://github.com/cinar/indicator/blob/master/helper/database.go#L31>)

```go
func CloseDatabaseRows(rows *sql.Rows)
```

CloseDatabaseRows closes the database rows.

<a name="CloseDatabaseWithError"></a>
## func [CloseDatabaseWithError](<https://github.com/cinar/indicator/blob/master/helper/database.go#L14>)

```go
func CloseDatabaseWithError(db *sql.DB, err error) error
```

CloseDatabaseWithError closes the database after an error.

<a name="CommonPeriod"></a>
## func [CommonPeriod](<https://github.com/cinar/indicator/blob/master/helper/sync.go#L24>)

Expand Down
Loading

0 comments on commit 8812266

Please sign in to comment.