Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added SQL repository support for assets. #267

Merged
merged 1 commit into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions asset/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ The information provided on this project is strictly for informational purposes
- [type Repository](<#Repository>)
- [func NewRepository\(name, config string\) \(Repository, error\)](<#NewRepository>)
- [type RepositoryBuilderFunc](<#RepositoryBuilderFunc>)
- [type SQLRepository](<#SQLRepository>)
- [func NewSQLRepository\(dbDriver, dbURL string, dialect SQLRepositoryDialect\) \(\*SQLRepository, error\)](<#NewSQLRepository>)
- [func \(s \*SQLRepository\) Append\(name string, snapshots \<\-chan \*Snapshot\) error](<#SQLRepository.Append>)
- [func \(s \*SQLRepository\) Assets\(\) \(\[\]string, error\)](<#SQLRepository.Assets>)
- [func \(s \*SQLRepository\) Close\(\) error](<#SQLRepository.Close>)
- [func \(s \*SQLRepository\) Drop\(\) error](<#SQLRepository.Drop>)
- [func \(s \*SQLRepository\) Get\(name string\) \(\<\-chan \*Snapshot, error\)](<#SQLRepository.Get>)
- [func \(s \*SQLRepository\) GetSince\(name string, date time.Time\) \(\<\-chan \*Snapshot, error\)](<#SQLRepository.GetSince>)
- [func \(s \*SQLRepository\) LastDate\(name string\) \(time.Time, error\)](<#SQLRepository.LastDate>)
- [type SQLRepositoryDialect](<#SQLRepositoryDialect>)
- [type Snapshot](<#Snapshot>)
- [type Sync](<#Sync>)
- [func NewSync\(\) \*Sync](<#NewSync>)
Expand Down Expand Up @@ -348,6 +358,116 @@ RepositoryBuilderFunc defines a function to build a new repository using the giv
type RepositoryBuilderFunc func(config string) (Repository, error)
```

<a name="SQLRepository"></a>
## type [SQLRepository](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L17-L35>)

SQLRepository provides a SQL backed storage facility for financial market data.

```go
type SQLRepository struct {
// contains filtered or unexported fields
}
```

<a name="NewSQLRepository"></a>
### func [NewSQLRepository](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L38>)

```go
func NewSQLRepository(dbDriver, dbURL string, dialect SQLRepositoryDialect) (*SQLRepository, error)
```

NewSQLRepository takes a database driver, URL, and dialect for the asset repository and connects to it.

<a name="SQLRepository.Append"></a>
### func \(\*SQLRepository\) [Append](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L170>)

```go
func (s *SQLRepository) Append(name string, snapshots <-chan *Snapshot) error
```

Append adds the given snapshots to the asset with the given name.

<a name="SQLRepository.Assets"></a>
### func \(\*SQLRepository\) [Assets](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L87>)

```go
func (s *SQLRepository) Assets() ([]string, error)
```

Assets returns the names of all assets in the respository.

<a name="SQLRepository.Close"></a>
### func \(\*SQLRepository\) [Close](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L82>)

```go
func (s *SQLRepository) Close() error
```

Close closes the database connection.

<a name="SQLRepository.Drop"></a>
### func \(\*SQLRepository\) [Drop](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L193>)

```go
func (s *SQLRepository) Drop() error
```

Drop drops the snapshots table.

<a name="SQLRepository.Get"></a>
### func \(\*SQLRepository\) [Get](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L112>)

```go
func (s *SQLRepository) Get(name string) (<-chan *Snapshot, error)
```

Get attempts to return a channel of snapshots for the asset with the given name.

<a name="SQLRepository.GetSince"></a>
### func \(\*SQLRepository\) [GetSince](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L117>)

```go
func (s *SQLRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error)
```

GetSince attempts to return a channel of snapshots for the asset with the given name since the given date.

<a name="SQLRepository.LastDate"></a>
### func \(\*SQLRepository\) [LastDate](<https://github.com/cinar/indicator/blob/master/asset/sql_repository.go#L152>)

```go
func (s *SQLRepository) LastDate(name string) (time.Time, error)
```

LastDate returns the date of the last snapshot for the asset with the given name.

<a name="SQLRepositoryDialect"></a>
## type [SQLRepositoryDialect](<https://github.com/cinar/indicator/blob/master/asset/sql_repository_dialect.go#L8-L26>)

SQLRepositoryDialect defines the SQL dialect for the SQL repository.

```go
type SQLRepositoryDialect interface {
// CreateTable returns the SQL statement to create the repository table.
CreateTable() string

// DropTable returns the SQL statement to drop the repository table.
DropTable() string

// Assets returns the SQL statement to get the names of all assets in the respository.
Assets() string

// GetSince returns the SQL statement to query snapshots for the asset with the given name since the given date.
GetSince() string

// LastDate returns the SQL statement to query for the last date for the asset with the given name.
LastDate() string

// Appends returns the SQL statement to add the given snapshots to the asset with the given name.
Append() string
}
```

<a name="Snapshot"></a>
## type [Snapshot](<https://github.com/cinar/indicator/blob/master/asset/snapshot.go#L15-L38>)

Expand Down
200 changes: 200 additions & 0 deletions asset/sql_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright (c) 2021-2024 Onur Cinar.
// The source code is provided under GNU AGPLv3 License.
// https://github.com/cinar/indicator

package asset

import (
"database/sql"
"fmt"
"log"
"time"

"github.com/cinar/indicator/v2/helper"
)

// SQLRepository provides a SQL backed storage facility for financial market data.
type SQLRepository struct {
// db is the database connection.
db *sql.DB

// dialect is the database dialect to use.
dialect SQLRepositoryDialect

// assetsQuery is the prepared assets query.
assetsQuery *sql.Stmt

// getSinceQuery is the prepared get since query.
getSinceQuery *sql.Stmt

// lastDateQuery is the prepared last date query.
lastDateQuery *sql.Stmt

// appendQuery is the prepared append query.
appendQuery *sql.Stmt
}

// NewSQLRepository takes a database driver, URL, and dialect for the asset repository and connects to it.
func NewSQLRepository(dbDriver, dbURL string, dialect SQLRepositoryDialect) (*SQLRepository, error) {
db, err := sql.Open(dbDriver, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect database: %w", err)
}

_, err = db.Exec(dialect.CreateTable())
if err != nil {
return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to create table: %w", err))
}

assetQuery, err := db.Prepare(dialect.Assets())
if err != nil {
return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare assets: %w", err))
}

getSinceQuery, err := db.Prepare(dialect.GetSince())
if err != nil {
return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare get since query: %w", err))
}

lastDateQuery, err := db.Prepare(dialect.LastDate())
if err != nil {
return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare last date query: %w", err))
}

appendQuery, err := db.Prepare(dialect.Append())
if err != nil {
return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare append: %w", err))
}

repository := &SQLRepository{
db,
dialect,
assetQuery,
getSinceQuery,
lastDateQuery,
appendQuery,
}

return repository, nil
}

// Close closes the database connection.
func (s *SQLRepository) Close() error {
return helper.CloseDatabaseWithError(s.db, nil)
}

// Assets returns the names of all assets in the respository.
func (s *SQLRepository) Assets() ([]string, error) {
rows, err := s.assetsQuery.Query()
if err != nil {
return nil, fmt.Errorf("unable to get assets: %w", err)
}

defer helper.CloseDatabaseRows(rows)

var assets []string

for rows.Next() {
var name string

err := rows.Scan(&name)
if err != nil {
return nil, fmt.Errorf("unable to scan assets: %w", err)
}

assets = append(assets, name)
}

return assets, nil
}

// Get attempts to return a channel of snapshots for the asset with the given name.
func (s *SQLRepository) Get(name string) (<-chan *Snapshot, error) {
return s.GetSince(name, time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
}

// GetSince attempts to return a channel of snapshots for the asset with the given name since the given date.
func (s *SQLRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error) {
rows, err := s.getSinceQuery.Query(name, date)
if err != nil {
return nil, fmt.Errorf("unable to get since: %w", err)
}

snapshots := make(chan *Snapshot)

go func() {
defer helper.CloseDatabaseRows(rows)
defer close(snapshots)

for rows.Next() {
snapshot := &Snapshot{}

err := rows.Scan(
&snapshot.Date,
&snapshot.Open,
&snapshot.High,
&snapshot.Low,
&snapshot.Close,
&snapshot.Volume,
)
if err != nil {
log.Printf("unable to scan row: %v", err)
}

snapshots <- snapshot
}
}()

return snapshots, nil
}

// LastDate returns the date of the last snapshot for the asset with the given name.
func (s *SQLRepository) LastDate(name string) (time.Time, error) {
row := s.lastDateQuery.QueryRow(name)

var date time.Time

err := row.Scan(&date)
if err != nil {
if err == sql.ErrNoRows {
return date, fmt.Errorf("unable to find asset")
}

return date, fmt.Errorf("unable to get the last date: %w", err)
}

return date, nil
}

// Append adds the given snapshots to the asset with the given name.
func (s *SQLRepository) Append(name string, snapshots <-chan *Snapshot) error {
go func() {
for snapshot := range snapshots {
_, err := s.appendQuery.Exec(
name,
snapshot.Date,
snapshot.Open,
snapshot.High,
snapshot.Low,
snapshot.Close,
snapshot.Volume,
)

if err != nil {
log.Printf("unable to append snapshot: %v", err)
}
}
}()

return nil
}

// Drop drops the snapshots table.
func (s *SQLRepository) Drop() error {
_, err := s.db.Exec(s.dialect.DropTable())
if err != nil {
return fmt.Errorf("unable to drop repository: %w", err)
}

return nil
}
26 changes: 26 additions & 0 deletions asset/sql_repository_dialect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2021-2024 Onur Cinar.
// The source code is provided under GNU AGPLv3 License.
// https://github.com/cinar/indicator

package asset

// SQLRepositoryDialect defines the SQL dialect for the SQL repository.
type SQLRepositoryDialect interface {
// CreateTable returns the SQL statement to create the repository table.
CreateTable() string

// DropTable returns the SQL statement to drop the repository table.
DropTable() string

// Assets returns the SQL statement to get the names of all assets in the respository.
Assets() string

// GetSince returns the SQL statement to query snapshots for the asset with the given name since the given date.
GetSince() string

// LastDate returns the SQL statement to query for the last date for the asset with the given name.
LastDate() string

// Appends returns the SQL statement to add the given snapshots to the asset with the given name.
Append() string
}
20 changes: 20 additions & 0 deletions helper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ The information provided on this project is strictly for informational purposes
- [func CheckEquals\[T comparable\]\(inputs ...\<\-chan T\) error](<#CheckEquals>)
- [func CloseAndLogError\(closer io.Closer, message string\)](<#CloseAndLogError>)
- [func CloseAndLogErrorWithLogger\(closer io.Closer, message string, logger \*slog.Logger\)](<#CloseAndLogErrorWithLogger>)
- [func CloseDatabaseRows\(rows \*sql.Rows\)](<#CloseDatabaseRows>)
- [func CloseDatabaseWithError\(db \*sql.DB, err error\) error](<#CloseDatabaseWithError>)
- [func CommonPeriod\(periods ...int\) int](<#CommonPeriod>)
- [func Count\[T Number, O any\]\(from T, other \<\-chan O\) \<\-chan T](<#Count>)
- [func DaysBetween\(from, to time.Time\) int](<#DaysBetween>)
Expand Down Expand Up @@ -337,6 +339,24 @@ func CloseAndLogErrorWithLogger(closer io.Closer, message string, logger *slog.L

CloseAndLogErrorWithLogger attempts to close the closer and logs any error to the given logger.

<a name="CloseDatabaseRows"></a>
## func [CloseDatabaseRows](<https://github.com/cinar/indicator/blob/master/helper/database.go#L31>)

```go
func CloseDatabaseRows(rows *sql.Rows)
```

CloseDatabaseRows closes the database rows.

<a name="CloseDatabaseWithError"></a>
## func [CloseDatabaseWithError](<https://github.com/cinar/indicator/blob/master/helper/database.go#L14>)

```go
func CloseDatabaseWithError(db *sql.DB, err error) error
```

CloseDatabaseWithError closes the database after an error.

<a name="CommonPeriod"></a>
## func [CommonPeriod](<https://github.com/cinar/indicator/blob/master/helper/sync.go#L24>)

Expand Down
Loading
Loading