diff --git a/asset/README.md b/asset/README.md index da73b35..7f6025c 100644 --- a/asset/README.md +++ b/asset/README.md @@ -50,6 +50,16 @@ The information provided on this project is strictly for informational purposes - [type Repository](<#Repository>) - [func NewRepository\(name, config string\) \(Repository, error\)](<#NewRepository>) - [type RepositoryBuilderFunc](<#RepositoryBuilderFunc>) +- [type SQLRepository](<#SQLRepository>) + - [func NewSQLRepository\(dbDriver, dbURL string, dialect SQLRepositoryDialect\) \(\*SQLRepository, error\)](<#NewSQLRepository>) + - [func \(s \*SQLRepository\) Append\(name string, snapshots \<\-chan \*Snapshot\) error](<#SQLRepository.Append>) + - [func \(s \*SQLRepository\) Assets\(\) \(\[\]string, error\)](<#SQLRepository.Assets>) + - [func \(s \*SQLRepository\) Close\(\) error](<#SQLRepository.Close>) + - [func \(s \*SQLRepository\) Drop\(\) error](<#SQLRepository.Drop>) + - [func \(s \*SQLRepository\) Get\(name string\) \(\<\-chan \*Snapshot, error\)](<#SQLRepository.Get>) + - [func \(s \*SQLRepository\) GetSince\(name string, date time.Time\) \(\<\-chan \*Snapshot, error\)](<#SQLRepository.GetSince>) + - [func \(s \*SQLRepository\) LastDate\(name string\) \(time.Time, error\)](<#SQLRepository.LastDate>) +- [type SQLRepositoryDialect](<#SQLRepositoryDialect>) - [type Snapshot](<#Snapshot>) - [type Sync](<#Sync>) - [func NewSync\(\) \*Sync](<#NewSync>) @@ -348,6 +358,116 @@ RepositoryBuilderFunc defines a function to build a new repository using the giv type RepositoryBuilderFunc func(config string) (Repository, error) ``` + +## type [SQLRepository]() + +SQLRepository provides a SQL backed storage facility for financial market data. + +```go +type SQLRepository struct { + // contains filtered or unexported fields +} +``` + + +### func [NewSQLRepository]() + +```go +func NewSQLRepository(dbDriver, dbURL string, dialect SQLRepositoryDialect) (*SQLRepository, error) +``` + +NewSQLRepository takes a database driver, URL, and dialect for the asset repository and connects to it. + + +### func \(\*SQLRepository\) [Append]() + +```go +func (s *SQLRepository) Append(name string, snapshots <-chan *Snapshot) error +``` + +Append adds the given snapshots to the asset with the given name. + + +### func \(\*SQLRepository\) [Assets]() + +```go +func (s *SQLRepository) Assets() ([]string, error) +``` + +Assets returns the names of all assets in the respository. + + +### func \(\*SQLRepository\) [Close]() + +```go +func (s *SQLRepository) Close() error +``` + +Close closes the database connection. + + +### func \(\*SQLRepository\) [Drop]() + +```go +func (s *SQLRepository) Drop() error +``` + +Drop drops the snapshots table. + + +### func \(\*SQLRepository\) [Get]() + +```go +func (s *SQLRepository) Get(name string) (<-chan *Snapshot, error) +``` + +Get attempts to return a channel of snapshots for the asset with the given name. + + +### func \(\*SQLRepository\) [GetSince]() + +```go +func (s *SQLRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error) +``` + +GetSince attempts to return a channel of snapshots for the asset with the given name since the given date. + + +### func \(\*SQLRepository\) [LastDate]() + +```go +func (s *SQLRepository) LastDate(name string) (time.Time, error) +``` + +LastDate returns the date of the last snapshot for the asset with the given name. + + +## type [SQLRepositoryDialect]() + +SQLRepositoryDialect defines the SQL dialect for the SQL repository. + +```go +type SQLRepositoryDialect interface { + // CreateTable returns the SQL statement to create the repository table. + CreateTable() string + + // DropTable returns the SQL statement to drop the repository table. + DropTable() string + + // Assets returns the SQL statement to get the names of all assets in the respository. + Assets() string + + // GetSince returns the SQL statement to query snapshots for the asset with the given name since the given date. + GetSince() string + + // LastDate returns the SQL statement to query for the last date for the asset with the given name. + LastDate() string + + // Appends returns the SQL statement to add the given snapshots to the asset with the given name. + Append() string +} +``` + ## type [Snapshot]() diff --git a/asset/sql_repository.go b/asset/sql_repository.go new file mode 100644 index 0000000..8164931 --- /dev/null +++ b/asset/sql_repository.go @@ -0,0 +1,200 @@ +// Copyright (c) 2021-2024 Onur Cinar. +// The source code is provided under GNU AGPLv3 License. +// https://github.com/cinar/indicator + +package asset + +import ( + "database/sql" + "fmt" + "log" + "time" + + "github.com/cinar/indicator/v2/helper" +) + +// SQLRepository provides a SQL backed storage facility for financial market data. +type SQLRepository struct { + // db is the database connection. + db *sql.DB + + // dialect is the database dialect to use. + dialect SQLRepositoryDialect + + // assetsQuery is the prepared assets query. + assetsQuery *sql.Stmt + + // getSinceQuery is the prepared get since query. + getSinceQuery *sql.Stmt + + // lastDateQuery is the prepared last date query. + lastDateQuery *sql.Stmt + + // appendQuery is the prepared append query. + appendQuery *sql.Stmt +} + +// NewSQLRepository takes a database driver, URL, and dialect for the asset repository and connects to it. +func NewSQLRepository(dbDriver, dbURL string, dialect SQLRepositoryDialect) (*SQLRepository, error) { + db, err := sql.Open(dbDriver, dbURL) + if err != nil { + return nil, fmt.Errorf("unable to connect database: %w", err) + } + + _, err = db.Exec(dialect.CreateTable()) + if err != nil { + return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to create table: %w", err)) + } + + assetQuery, err := db.Prepare(dialect.Assets()) + if err != nil { + return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare assets: %w", err)) + } + + getSinceQuery, err := db.Prepare(dialect.GetSince()) + if err != nil { + return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare get since query: %w", err)) + } + + lastDateQuery, err := db.Prepare(dialect.LastDate()) + if err != nil { + return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare last date query: %w", err)) + } + + appendQuery, err := db.Prepare(dialect.Append()) + if err != nil { + return nil, helper.CloseDatabaseWithError(db, fmt.Errorf("unable to prepare append: %w", err)) + } + + repository := &SQLRepository{ + db, + dialect, + assetQuery, + getSinceQuery, + lastDateQuery, + appendQuery, + } + + return repository, nil +} + +// Close closes the database connection. +func (s *SQLRepository) Close() error { + return helper.CloseDatabaseWithError(s.db, nil) +} + +// Assets returns the names of all assets in the respository. +func (s *SQLRepository) Assets() ([]string, error) { + rows, err := s.assetsQuery.Query() + if err != nil { + return nil, fmt.Errorf("unable to get assets: %w", err) + } + + defer helper.CloseDatabaseRows(rows) + + var assets []string + + for rows.Next() { + var name string + + err := rows.Scan(&name) + if err != nil { + return nil, fmt.Errorf("unable to scan assets: %w", err) + } + + assets = append(assets, name) + } + + return assets, nil +} + +// Get attempts to return a channel of snapshots for the asset with the given name. +func (s *SQLRepository) Get(name string) (<-chan *Snapshot, error) { + return s.GetSince(name, time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)) +} + +// GetSince attempts to return a channel of snapshots for the asset with the given name since the given date. +func (s *SQLRepository) GetSince(name string, date time.Time) (<-chan *Snapshot, error) { + rows, err := s.getSinceQuery.Query(name, date) + if err != nil { + return nil, fmt.Errorf("unable to get since: %w", err) + } + + snapshots := make(chan *Snapshot) + + go func() { + defer helper.CloseDatabaseRows(rows) + defer close(snapshots) + + for rows.Next() { + snapshot := &Snapshot{} + + err := rows.Scan( + &snapshot.Date, + &snapshot.Open, + &snapshot.High, + &snapshot.Low, + &snapshot.Close, + &snapshot.Volume, + ) + if err != nil { + log.Printf("unable to scan row: %v", err) + } + + snapshots <- snapshot + } + }() + + return snapshots, nil +} + +// LastDate returns the date of the last snapshot for the asset with the given name. +func (s *SQLRepository) LastDate(name string) (time.Time, error) { + row := s.lastDateQuery.QueryRow(name) + + var date time.Time + + err := row.Scan(&date) + if err != nil { + if err == sql.ErrNoRows { + return date, fmt.Errorf("unable to find asset") + } + + return date, fmt.Errorf("unable to get the last date: %w", err) + } + + return date, nil +} + +// Append adds the given snapshots to the asset with the given name. +func (s *SQLRepository) Append(name string, snapshots <-chan *Snapshot) error { + go func() { + for snapshot := range snapshots { + _, err := s.appendQuery.Exec( + name, + snapshot.Date, + snapshot.Open, + snapshot.High, + snapshot.Low, + snapshot.Close, + snapshot.Volume, + ) + + if err != nil { + log.Printf("unable to append snapshot: %v", err) + } + } + }() + + return nil +} + +// Drop drops the snapshots table. +func (s *SQLRepository) Drop() error { + _, err := s.db.Exec(s.dialect.DropTable()) + if err != nil { + return fmt.Errorf("unable to drop repository: %w", err) + } + + return nil +} diff --git a/asset/sql_repository_dialect.go b/asset/sql_repository_dialect.go new file mode 100644 index 0000000..72d465c --- /dev/null +++ b/asset/sql_repository_dialect.go @@ -0,0 +1,26 @@ +// Copyright (c) 2021-2024 Onur Cinar. +// The source code is provided under GNU AGPLv3 License. +// https://github.com/cinar/indicator + +package asset + +// SQLRepositoryDialect defines the SQL dialect for the SQL repository. +type SQLRepositoryDialect interface { + // CreateTable returns the SQL statement to create the repository table. + CreateTable() string + + // DropTable returns the SQL statement to drop the repository table. + DropTable() string + + // Assets returns the SQL statement to get the names of all assets in the respository. + Assets() string + + // GetSince returns the SQL statement to query snapshots for the asset with the given name since the given date. + GetSince() string + + // LastDate returns the SQL statement to query for the last date for the asset with the given name. + LastDate() string + + // Appends returns the SQL statement to add the given snapshots to the asset with the given name. + Append() string +} diff --git a/helper/README.md b/helper/README.md index 57369c8..0e6ff70 100644 --- a/helper/README.md +++ b/helper/README.md @@ -38,6 +38,8 @@ The information provided on this project is strictly for informational purposes - [func CheckEquals\[T comparable\]\(inputs ...\<\-chan T\) error](<#CheckEquals>) - [func CloseAndLogError\(closer io.Closer, message string\)](<#CloseAndLogError>) - [func CloseAndLogErrorWithLogger\(closer io.Closer, message string, logger \*slog.Logger\)](<#CloseAndLogErrorWithLogger>) +- [func CloseDatabaseRows\(rows \*sql.Rows\)](<#CloseDatabaseRows>) +- [func CloseDatabaseWithError\(db \*sql.DB, err error\) error](<#CloseDatabaseWithError>) - [func CommonPeriod\(periods ...int\) int](<#CommonPeriod>) - [func Count\[T Number, O any\]\(from T, other \<\-chan O\) \<\-chan T](<#Count>) - [func DaysBetween\(from, to time.Time\) int](<#DaysBetween>) @@ -337,6 +339,24 @@ func CloseAndLogErrorWithLogger(closer io.Closer, message string, logger *slog.L CloseAndLogErrorWithLogger attempts to close the closer and logs any error to the given logger. + +## func [CloseDatabaseRows]() + +```go +func CloseDatabaseRows(rows *sql.Rows) +``` + +CloseDatabaseRows closes the database rows. + + +## func [CloseDatabaseWithError]() + +```go +func CloseDatabaseWithError(db *sql.DB, err error) error +``` + +CloseDatabaseWithError closes the database after an error. + ## func [CommonPeriod]() diff --git a/helper/database.go b/helper/database.go new file mode 100644 index 0000000..de60672 --- /dev/null +++ b/helper/database.go @@ -0,0 +1,36 @@ +// Copyright (c) 2021-2024 Onur Cinar. +// The source code is provided under GNU AGPLv3 License. +// https://github.com/cinar/indicator + +package helper + +import ( + "database/sql" + "fmt" + "log" +) + +// CloseDatabaseWithError closes the database after an error. +func CloseDatabaseWithError(db *sql.DB, err error) error { + closeErr := db.Close() + if closeErr == nil { + return err + } + + closeErr = fmt.Errorf("unable to close database: %w", closeErr) + + if err != nil { + log.Println(closeErr) + return err + } + + return closeErr +} + +// CloseDatabaseRows closes the database rows. +func CloseDatabaseRows(rows *sql.Rows) { + err := rows.Close() + if err != nil { + log.Println(err) + } +}