diff --git a/api/handlers.go b/api/handlers.go index 9908bf3e5..e55685e06 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -1340,7 +1340,7 @@ func uploadHandler(w http.ResponseWriter, r *http.Request) { com.SanitiseLogString(dbOwner), com.SanitiseLogString(dbName), numBytes) // Send a request to the job queue to set up the database - liveNode, err := com.LiveCreateDB(dbOwner, dbName, objectID) + liveNode, err := com.LiveCreateDB(com.AmqpChan, dbOwner, dbName, objectID) if err != nil { log.Println(err) jsonErr(w, err.Error(), http.StatusInternalServerError) diff --git a/api/main.go b/api/main.go index 3d681e61f..7b5c939d5 100644 --- a/api/main.go +++ b/api/main.go @@ -74,7 +74,7 @@ func main() { } // Connect to job queue server - err = com.ConnectQueue() + com.AmqpChan, err = com.ConnectQueue() if err != nil { log.Fatal(err) } @@ -98,11 +98,13 @@ func main() { } // Start background goroutines to handle job queue responses - com.ResponseWaiters = com.NewResponseReceiver() - com.CheckResponsesQueue = make(chan struct{}) - com.SubmitterInstance = com.RandomString(3) - go com.ResponseQueueCheck() - go com.ResponseQueueListen() + if !com.UseAMQP { + com.ResponseWaiters = com.NewResponseReceiver() + com.CheckResponsesQueue = make(chan struct{}) + com.SubmitterInstance = com.RandomString(3) + go com.ResponseQueueCheck() + go com.ResponseQueueListen() + } // Load our self signed CA chain ourCAPool = x509.NewCertPool() diff --git a/common/config_types.go b/common/config_types.go index d68a51156..f249b0c73 100644 --- a/common/config_types.go +++ b/common/config_types.go @@ -14,6 +14,7 @@ type TomlConfig struct { Live LiveInfo Memcache MemcacheInfo Minio MinioInfo + MQ MQInfo Pg PGInfo Sign SigningInfo UserMgmt UserMgmtInfo @@ -92,6 +93,15 @@ type MinioInfo struct { Server string } +type MQInfo struct { + CertFile string `toml:"cert_file"` + KeyFile string `toml:"key_file"` + Password string `toml:"password"` + Port int `toml:"port"` + Server string `toml:"server"` + Username string `toml:"username"` +} + // PGInfo contains the PostgreSQL connection parameters type PGInfo struct { Database string diff --git a/common/cypress.go b/common/cypress.go index 53eb81b52..37784bb58 100644 --- a/common/cypress.go +++ b/common/cypress.go @@ -87,7 +87,7 @@ func CypressSeed(w http.ResponseWriter, r *http.Request) { // Send the live database file to our job queue backend for setup dbOwner := "default" dbName := "Join Testing with index.sqlite" - liveNode, err := LiveCreateDB(dbOwner, dbName, objectID) + liveNode, err := LiveCreateDB(AmqpChan, dbOwner, dbName, objectID) if err != nil { log.Println(err) http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/common/live.go b/common/live.go index 84e9afbbd..e475c6132 100644 --- a/common/live.go +++ b/common/live.go @@ -2,6 +2,7 @@ package common import ( "context" + "crypto/tls" "encoding/json" "errors" "fmt" @@ -16,6 +17,7 @@ import ( sqlite "github.com/gwenn/gosqlite" "github.com/jackc/pgx/v5" pgpool "github.com/jackc/pgx/v5/pgxpool" + amqp "github.com/rabbitmq/amqp091-go" ) const ( @@ -35,117 +37,289 @@ var ( ) // ConnectQueue creates the connections to the backend queue server -func ConnectQueue() (err error) { - // Connect to PostgreSQL based queue server - // Note: JobListenConn uses a dedicated, non-pooled connection to the job queue database, while JobQueueConn uses - // a standard database connection pool - JobListenConn, err = pgx.ConnectConfig(context.Background(), listenConfig) - if err != nil { - return fmt.Errorf("%s: couldn't connect to backend queue server: %v", Conf.Live.Nodename, err) - } - JobQueueConn, err = pgpool.New(context.Background(), pgConfig.ConnString()) - if err != nil { - return fmt.Errorf("%s: couldn't connect to backend queue server: %v", Conf.Live.Nodename, err) +func ConnectQueue() (channel *amqp.Channel, err error) { + if UseAMQP { + // AMQP only + var conn *amqp.Connection + if Conf.Environment.Environment == "production" { + // If certificate/key files have been provided, then we can use mutual TLS (mTLS) + if Conf.MQ.CertFile != "" && Conf.MQ.KeyFile != "" { + var cert tls.Certificate + cert, err = tls.LoadX509KeyPair(Conf.MQ.CertFile, Conf.MQ.KeyFile) + if err != nil { + return + } + cfg := &tls.Config{Certificates: []tls.Certificate{cert}} + conn, err = amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s:%d/", Conf.MQ.Username, Conf.MQ.Password, Conf.MQ.Server, Conf.MQ.Port), cfg) + if err != nil { + return + } + log.Printf("%s connected to AMQP server using mutual TLS (mTLS): %v:%d", Conf.Live.Nodename, Conf.MQ.Server, Conf.MQ.Port) + } else { + // Fallback to just verifying the server certs for TLS. This is needed by the DB4S end point, as it + // uses certs from our own CA, so mTLS won't easily work with it. + conn, err = amqp.Dial(fmt.Sprintf("amqps://%s:%s@%s:%d/", Conf.MQ.Username, Conf.MQ.Password, Conf.MQ.Server, Conf.MQ.Port)) + if err != nil { + return + } + log.Printf("%s connected to AMQP server with server-only TLS: %v:%d", Conf.Live.Nodename, Conf.MQ.Server, Conf.MQ.Port) + } + } else { + // Everywhere else (eg docker container) doesn't *have* to use TLS + conn, err = amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", Conf.MQ.Username, Conf.MQ.Password, Conf.MQ.Server, Conf.MQ.Port)) + if err != nil { + return + } + log.Printf("%s connected to AMQP server without encryption: %v:%d", Conf.Live.Nodename, Conf.MQ.Server, Conf.MQ.Port) + } + + channel, err = conn.Channel() + } else { + // Connect to PostgreSQL based queue server + // Note: JobListenConn uses a dedicated, non-pooled connection to the job queue database, while JobQueueConn uses + // a standard database connection pool + JobListenConn, err = pgx.ConnectConfig(context.Background(), listenConfig) + if err != nil { + return nil, fmt.Errorf("%s: couldn't connect to backend queue server: %v", Conf.Live.Nodename, err) + } + JobQueueConn, err = pgpool.New(context.Background(), pgConfig.ConnString()) + if err != nil { + return nil, fmt.Errorf("%s: couldn't connect to backend queue server: %v", Conf.Live.Nodename, err) + } } return } // LiveBackup asks the job queue backend to store the given database back into Minio func LiveBackup(liveNode, loggedInUser, dbOwner, dbName string) (err error) { - // Send the backup request to our job queue backend - var resp JobResponseDBError - err = JobSubmit(&resp, liveNode, "backup", loggedInUser, dbOwner, dbName, "") - if err != nil { - return - } + if UseAMQP { + var rawResponse []byte + rawResponse, err = MQRequest(AmqpChan, liveNode, "backup", loggedInUser, dbOwner, dbName, "") + if err != nil { + return + } + + // Decode the response + var resp LiveDBErrorResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + return + } + + // If the backup failed, then provide the error message to the user + if resp.Error != "" { + err = errors.New(resp.Error) + return + } + if resp.Node == "" { + log.Println("A node responded to a 'backup' request, but didn't identify itself.") + return + } + } else { + // Send the backup request to our job queue backend + var resp JobResponseDBError + err = JobSubmit(&resp, liveNode, "backup", loggedInUser, dbOwner, dbName, "") + if err != nil { + return + } - log.Printf("%s: node which handled the database backup request: %s", Conf.Live.Nodename, liveNode) + log.Printf("%s: node which handled the database backup request: %s", Conf.Live.Nodename, liveNode) - // Handle error response from the live node - if resp.Err != "" { - err = errors.New(resp.Err) - log.Printf("%s: an error was returned during database backup on '%s': '%v'", Conf.Live.Nodename, liveNode, resp.Err) + // Handle error response from the live node + if resp.Err != "" { + err = errors.New(resp.Err) + log.Printf("%s: an error was returned during database backup on '%s': '%v'", Conf.Live.Nodename, liveNode, resp.Err) + } } return } // LiveColumns requests the job queue backend to return a list of all columns of the given table func LiveColumns(liveNode, loggedInUser, dbOwner, dbName, table string) (columns []sqlite.Column, pk []string, err error) { - // Send the column list request to our job queue backend - var resp JobResponseDBColumns - err = JobSubmit(&resp, liveNode, "columns", loggedInUser, dbOwner, dbName, table) - if err != nil { - return - } + if UseAMQP { + var rawResponse []byte + rawResponse, err = MQRequest(AmqpChan, liveNode, "columns", loggedInUser, dbOwner, dbName, table) + if err != nil { + return + } - // Return the requested data - columns = resp.Columns - pk = resp.PkColumns + // Decode the response + var resp LiveDBColumnsResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + return + } + if resp.Node == "" { + log.Println("A node responded to a 'columns' request, but didn't identify itself.") + return + } + columns = resp.Columns + pk = resp.PkColumns + } else { + // Send the column list request to our job queue backend + var resp JobResponseDBColumns + err = JobSubmit(&resp, liveNode, "columns", loggedInUser, dbOwner, dbName, table) + if err != nil { + return + } + + // Return the requested data + columns = resp.Columns + pk = resp.PkColumns - // Handle error response from the live node - if resp.Err != "" { - err = errors.New(resp.Err) - log.Printf("%s: an error was returned when retrieving the column list for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + // Handle error response from the live node + if resp.Err != "" { + err = errors.New(resp.Err) + log.Printf("%s: an error was returned when retrieving the column list for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + } } return } // LiveCreateDB requests the job queue backend create a new live SQLite database -func LiveCreateDB(dbOwner, dbName, objectID string) (liveNode string, err error) { - // Send the database setup request to our job queue backend - var resp JobResponseDBCreate - err = JobSubmit(&resp, "any", "createdb", "", dbOwner, dbName, objectID) - if err != nil { - return - } +func LiveCreateDB(channel *amqp.Channel, dbOwner, dbName, objectID string) (liveNode string, err error) { + if UseAMQP { + // Send the database setup request to our AMQP backend + var rawResponse []byte + rawResponse, err = MQRequest(channel, "create_queue", "createdb", "", dbOwner, dbName, objectID) + if err != nil { + return + } - // Return the name of the node which has the database - liveNode = resp.NodeName + // Decode the response + var resp LiveDBResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + log.Println(err) + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + return + } + if resp.Node == "" { + log.Println("A node responded to a 'create' request, but didn't identify itself.") + return + } + if resp.Result != "success" { + err = errors.New(fmt.Sprintf("LIVE database (%s/%s) creation apparently didn't fail, but the response didn't include a success message", + dbOwner, dbName)) + return + } - log.Printf("%s: node which handled the database creation request: %s", Conf.Live.Nodename, liveNode) + // Return the name of the node which has the database + liveNode = resp.Node - // Handle error response from the live node - if resp.Err != "" { - err = errors.New(resp.Err) - log.Printf("%s: an error was returned during database creation on '%s': '%v'", Conf.Live.Nodename, resp.NodeName, resp.Err) + } else { + // Send the database setup request to our job queue backend + var resp JobResponseDBCreate + err = JobSubmit(&resp, "any", "createdb", "", dbOwner, dbName, objectID) + if err != nil { + return + } + + // Return the name of the node which has the database + liveNode = resp.NodeName + + log.Printf("%s: node which handled the database creation request: %s", Conf.Live.Nodename, liveNode) + + // Handle error response from the live node + if resp.Err != "" { + err = errors.New(resp.Err) + log.Printf("%s: an error was returned during database creation on '%s': '%v'", Conf.Live.Nodename, resp.NodeName, resp.Err) + } } return } // LiveDelete asks our job queue backend to delete a database func LiveDelete(liveNode, loggedInUser, dbOwner, dbName string) (err error) { - // Send the database setup request to our job queue backend - var resp JobResponseDBError - err = JobSubmit(&resp, liveNode, "delete", loggedInUser, dbOwner, dbName, "") - if err != nil { - return - } + if UseAMQP { + // Delete the database from our AMQP backend + var rawResponse []byte + rawResponse, err = MQRequest(AmqpChan, liveNode, "delete", loggedInUser, dbOwner, dbName, "") + if err != nil { + log.Println(err) + return + } + + // Decode the response + var resp LiveDBErrorResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + log.Println(err) + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + return + } + if resp.Node == "" { + log.Println("A node responded to a 'delete' request, but didn't identify itself.") + return + } + } else { + // Send the database setup request to our job queue backend + var resp JobResponseDBError + err = JobSubmit(&resp, liveNode, "delete", loggedInUser, dbOwner, dbName, "") + if err != nil { + return + } - // Handle error response from the live node - if resp.Err != "" { - err = errors.New(resp.Err) - log.Printf("%s: an error was returned during database deletion on '%s': '%v'", Conf.Live.Nodename, liveNode, resp.Err) + // Handle error response from the live node + if resp.Err != "" { + err = errors.New(resp.Err) + log.Printf("%s: an error was returned during database deletion on '%s': '%v'", Conf.Live.Nodename, liveNode, resp.Err) + } } return } // LiveExecute asks our job queue backend to execute a SQL statement on a database func LiveExecute(liveNode, loggedInUser, dbOwner, dbName, sql string) (rowsChanged int, err error) { - // Send the execute request to our job queue backend - var resp JobResponseDBExecute - err = JobSubmit(&resp, liveNode, "execute", loggedInUser, dbOwner, dbName, sql) - if err != nil { - return - } + if UseAMQP { + var rawResponse []byte + rawResponse, err = MQRequest(AmqpChan, liveNode, "execute", loggedInUser, dbOwner, dbName, sql) + if err != nil { + return + } - // Return the number of rows changed by the execution run - rowsChanged = resp.RowsChanged + // Decode the response + var resp LiveDBExecuteResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + log.Println(err) + return + } - // Handle error response from the live node - if resp.Err != "" { - err = errors.New(resp.Err) - if !strings.HasPrefix(err.Error(), "don't use exec with") { - log.Printf("%s: an error was returned when retrieving the execution result for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + // If the SQL execution failed, then provide the error message to the user + if resp.Error != "" { + err = errors.New(resp.Error) + return + } + rowsChanged = resp.RowsChanged + + } else { + // Send the execute request to our job queue backend + var resp JobResponseDBExecute + err = JobSubmit(&resp, liveNode, "execute", loggedInUser, dbOwner, dbName, sql) + if err != nil { + return + } + + // Return the number of rows changed by the execution run + rowsChanged = resp.RowsChanged + + // Handle error response from the live node + if resp.Err != "" { + err = errors.New(resp.Err) + if !strings.HasPrefix(err.Error(), "don't use exec with") { + log.Printf("%s: an error was returned when retrieving the execution result for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + } } } return @@ -153,108 +327,244 @@ func LiveExecute(liveNode, loggedInUser, dbOwner, dbName, sql string) (rowsChang // LiveIndexes asks our job queue backend to provide the list of indexes in a database func LiveIndexes(liveNode, loggedInUser, dbOwner, dbName string) (indexes []APIJSONIndex, err error) { - // Send the index request to our job queue backend - var resp JobResponseDBIndexes - err = JobSubmit(&resp, liveNode, "indexes", loggedInUser, dbOwner, dbName, "") - if err != nil { - return - } + if UseAMQP { + // Send the index request to our job queue backend + var rawResponse []byte + rawResponse, err = MQRequest(AmqpChan, liveNode, "indexes", loggedInUser, dbOwner, dbName, "") + if err != nil { + return + } + + // Decode the response + var resp LiveDBIndexesResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + return + } + if resp.Node == "" { + log.Println("A node responded to a 'indexes' request, but didn't identify itself.") + return + } + // Return the index list for the live database + indexes = resp.Indexes + + } else { + // Send the index request to our job queue backend + var resp JobResponseDBIndexes + err = JobSubmit(&resp, liveNode, "indexes", loggedInUser, dbOwner, dbName, "") + if err != nil { + return + } - // Return the index list for the live database - indexes = resp.Indexes + // Return the index list for the live database + indexes = resp.Indexes - // Handle error response from the live node - if resp.Err != "" { - err = errors.New(resp.Err) - log.Printf("%s: an error was returned when retrieving the index list for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + // Handle error response from the live node + if resp.Err != "" { + err = errors.New(resp.Err) + log.Printf("%s: an error was returned when retrieving the index list for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + } } return } // LiveQuery sends a SQLite query to a live database on its hosting node func LiveQuery(liveNode, loggedInUser, dbOwner, dbName, query string) (rows SQLiteRecordSet, err error) { - // Send the query to our job queue backend - var resp JobResponseDBQuery - err = JobSubmit(&resp, liveNode, "query", loggedInUser, dbOwner, dbName, query) - if err != nil { - return - } + if UseAMQP { + // Send the query request to our AMQP backend + var rawResponse []byte + rawResponse, err = MQRequest(AmqpChan, liveNode, "query", loggedInUser, dbOwner, dbName, query) + if err != nil { + return + } + + // Decode the response + var resp LiveDBQueryResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + log.Println(err) + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + return + } + // Return the query response + rows = resp.Results + + } else { + // Send the query to our job queue backend + var resp JobResponseDBQuery + err = JobSubmit(&resp, liveNode, "query", loggedInUser, dbOwner, dbName, query) + if err != nil { + return + } - // Return the query response - rows = resp.Results + // Return the query response + rows = resp.Results - // Handle error response from the live node - if resp.Err != "" { - err = errors.New(resp.Err) - log.Printf("%s: an error was returned when retrieving the query response for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + // Handle error response from the live node + if resp.Err != "" { + err = errors.New(resp.Err) + log.Printf("%s: an error was returned when retrieving the query response for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + } } return } // LiveRowData asks our job queue backend to send us the SQLite table data for a given range of rows func LiveRowData(liveNode, loggedInUser, dbOwner, dbName string, reqData JobRequestRows) (rowData SQLiteRecordSet, err error) { - // Serialise the row data request to JSON - // NOTE - This actually causes the serialised field to be stored in PG as base64 instead. Not sure why, but we can work with it. - reqJSON, err := json.Marshal(reqData) - if err != nil { - log.Println(err) - return - } + if UseAMQP { + var rawResponse []byte + rawResponse, err = MQRequest(AmqpChan, liveNode, "rowdata", loggedInUser, dbOwner, dbName, reqData) + if err != nil { + log.Println(err) + return + } - // Send the row data request to our job queue backend - var resp JobResponseDBRows - err = JobSubmit(&resp, liveNode, "rowdata", loggedInUser, dbOwner, dbName, reqJSON) - if err != nil { - return - } + // Decode the response + var resp LiveDBRowsResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + log.Println(err) + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + log.Println(err) + return + } + if resp.Node == "" { + log.Println("A node responded to a 'rowdata' request, but didn't identify itself.") + return + } - // Return the row data for the requested table - rowData = resp.RowData + // Return the row data for the requested table + rowData = resp.RowData + + } else { + // Serialise the row data request to JSON + // NOTE - This actually causes the serialised field to be stored in PG as base64 instead. Not sure why, but we can work with it. + var reqJSON []byte + reqJSON, err = json.Marshal(reqData) + if err != nil { + log.Println(err) + return + } - // Handle error response from the live node - if resp.Err != "" { - err = errors.New(resp.Err) - log.Printf("%s: an error was returned when retrieving the row data for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + // Send the row data request to our job queue backend + var resp JobResponseDBRows + err = JobSubmit(&resp, liveNode, "rowdata", loggedInUser, dbOwner, dbName, reqJSON) + if err != nil { + return + } + + // Return the row data for the requested table + rowData = resp.RowData + + // Handle error response from the live node + if resp.Err != "" { + err = errors.New(resp.Err) + log.Printf("%s: an error was returned when retrieving the row data for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + } } return } // LiveSize asks our job queue backend for the file size of a database func LiveSize(liveNode, loggedInUser, dbOwner, dbName string) (size int64, err error) { - // Send the size request to our job queue backend - var resp JobResponseDBSize - err = JobSubmit(&resp, liveNode, "size", loggedInUser, dbOwner, dbName, "") - if err != nil { - return - } + if UseAMQP { + // Send the size request to our AMQP backend + var rawResponse []byte + rawResponse, err = MQRequest(AmqpChan, liveNode, "size", loggedInUser, dbOwner, dbName, "") + if err != nil { + return + } - // Return the size of the live database - size = resp.Size + // Decode the response + var resp LiveDBSizeResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + return + } + if resp.Node == "" { + log.Println("A node responded to a 'size' request, but didn't identify itself.") + return + } + // Return the size of the live database + size = resp.Size + + } else { + // Send the size request to our job queue backend + var resp JobResponseDBSize + err = JobSubmit(&resp, liveNode, "size", loggedInUser, dbOwner, dbName, "") + if err != nil { + return + } + + // Return the size of the live database + size = resp.Size - // Handle error response from the live node - if resp.Err != "" { - err = errors.New(resp.Err) - log.Printf("%s: an error was returned when checking the on disk database size for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + // Handle error response from the live node + if resp.Err != "" { + err = errors.New(resp.Err) + log.Printf("%s: an error was returned when checking the on disk database size for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + } } return } // LiveTables asks our job queue backend to provide the list of tables (not including views!) in a database func LiveTables(liveNode, loggedInUser, dbOwner, dbName string) (tables []string, err error) { - // Send the tables request to our job queue backend - var resp JobResponseDBTables - err = JobSubmit(&resp, liveNode, "tables", loggedInUser, dbOwner, dbName, "") - if err != nil { - return - } + if UseAMQP { + // Send the tables request to our AMQP backend + var rawResponse []byte + rawResponse, err = MQRequest(AmqpChan, liveNode, "tables", loggedInUser, dbOwner, dbName, "") + if err != nil { + return + } + + // Decode the response + var resp LiveDBTablesResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + return + } + if resp.Node == "" { + log.Println("A node responded to a 'tables' request, but didn't identify itself.") + return + } + // Return the table list for the live database + tables = resp.Tables + + } else { + // Send the tables request to our job queue backend + var resp JobResponseDBTables + err = JobSubmit(&resp, liveNode, "tables", loggedInUser, dbOwner, dbName, "") + if err != nil { + return + } - // Return the table list for the live database - tables = resp.Tables + // Return the table list for the live database + tables = resp.Tables - // Handle error response from the live node - if resp.Err != "" { - err = errors.New(resp.Err) - log.Printf("%s: an error was returned when retrieving the table list for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + // Handle error response from the live node + if resp.Err != "" { + err = errors.New(resp.Err) + log.Printf("%s: an error was returned when retrieving the table list for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + } } return } @@ -282,20 +592,46 @@ func LiveTablesAndViews(liveNode, loggedInUser, dbOwner, dbName string) (list [] // LiveViews asks our job queue backend to provide the list of views (not including tables!) in a database func LiveViews(liveNode, loggedInUser, dbOwner, dbName string) (views []string, err error) { - // Send the views request to our job queue backend - var resp JobResponseDBViews - err = JobSubmit(&resp, liveNode, "views", loggedInUser, dbOwner, dbName, "") - if err != nil { - return - } + if UseAMQP { + var rawResponse []byte + rawResponse, err = MQRequest(AmqpChan, liveNode, "views", loggedInUser, dbOwner, dbName, "") + if err != nil { + return + } - // Return the view list for the live database - views = resp.Views + // Decode the response + var resp LiveDBViewsResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + return + } + if resp.Node == "" { + log.Println("A node responded to a 'views' request, but didn't identify itself.") + return + } + // Return the view list for the live database + views = resp.Views + + } else { + // Send the views request to our job queue backend + var resp JobResponseDBViews + err = JobSubmit(&resp, liveNode, "views", loggedInUser, dbOwner, dbName, "") + if err != nil { + return + } - // Handle error response from the live node - if resp.Err != "" { - err = errors.New(resp.Err) - log.Printf("%s: an error was returned when retrieving the view list for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + // Return the view list for the live database + views = resp.Views + + // Handle error response from the live node + if resp.Err != "" { + err = errors.New(resp.Err) + log.Printf("%s: an error was returned when retrieving the view list for '%s/%s': '%v'", Conf.Live.Nodename, dbOwner, dbName, resp.Err) + } } return } diff --git a/common/live_amqp.go b/common/live_amqp.go new file mode 100644 index 000000000..4ed9a35d9 --- /dev/null +++ b/common/live_amqp.go @@ -0,0 +1,183 @@ +package common + +import ( + "context" + "encoding/json" + "fmt" + "log" + + amqp "github.com/rabbitmq/amqp091-go" +) + +var ( + // AmqpChan is the AMQP channel handle we use for communication with our AMQP backend + AmqpChan *amqp.Channel + + // UseAMQP switches between running in AMQP mode (true) or job queue server mode (false) + UseAMQP = true +) + +// CloseMQChannel closes an open AMQP channel +func CloseMQChannel(channel *amqp.Channel) (err error) { + err = channel.Close() + return +} + +// CloseMQConnection closes an open AMQP connection +func CloseMQConnection(connection *amqp.Connection) (err error) { + err = connection.Close() + return +} + +// MQResponse sends an AMQP response back to its requester +func MQResponse(requestType string, msg amqp.Delivery, channel *amqp.Channel, nodeName string, responseData interface{}) (err error) { + var z []byte + z, err = json.Marshal(responseData) + if err != nil { + log.Println(err) + // It's super unlikely we can safely return here without ack-ing the message. So as something has gone + // wrong with json.Marshall() we'd better just attempt passing back info about that error message instead (!) + z = []byte(fmt.Sprintf(`{"node":"%s","error":"%s"}`, nodeName, err.Error())) // This is a LiveDBErrorResponse structure + } + + // Send the message + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() + err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false, + amqp.Publishing{ + ContentType: "text/json", + CorrelationId: msg.CorrelationId, + Body: z, + }) + if err != nil { + log.Println(err) + } + + // Acknowledge the request, so it doesn't stick around in the queue + err = msg.Ack(false) + if err != nil { + log.Println(err) + } + + if JobQueueDebug > 0 { + log.Printf("[%s] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", requestType, nodeName, msg.CorrelationId, msg.ReplyTo) + return + } + return +} + +// MQCreateDBQueue creates a queue on the MQ server for "create database" messages +func MQCreateDBQueue(channel *amqp.Channel) (queue amqp.Queue, err error) { + queue, err = channel.QueueDeclare("create_queue", true, false, false, false, nil) + if err != nil { + return + } + + // FIXME: Re-read the docs for this, and work out if this is needed + err = channel.Qos(1, 0, false) + if err != nil { + return + } + return +} + +// MQCreateQueryQueue creates a queue on the MQ server for sending database queries to +func MQCreateQueryQueue(channel *amqp.Channel, nodeName string) (queue amqp.Queue, err error) { + queue, err = channel.QueueDeclare(nodeName, false, false, false, false, nil) + if err != nil { + return + } + + // FIXME: Re-read the docs for this, and work out if this is needed + err = channel.Qos(0, 0, false) + if err != nil { + return + } + return +} + +// MQCreateResponse sends a success/failure response back +func MQCreateResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName, result string) (err error) { + // Construct the response. It's such a simple string we just create it directly instead of using json.Marshall() + resp := fmt.Sprintf(`{"node":"%s","dbowner":"","dbname":"","result":"%s","error":""}`, nodeName, result) + + // Send the message + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() + err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false, + amqp.Publishing{ + ContentType: "text/json", + CorrelationId: msg.CorrelationId, + Body: []byte(resp), + }) + if err != nil { + log.Println(err) + } + msg.Ack(false) + if JobQueueDebug > 0 { + log.Printf("[CREATE] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", nodeName, msg.CorrelationId, msg.ReplyTo) + } + return +} + +// MQRequest is the main function used for sending requests to our AMQP backend +func MQRequest(channel *amqp.Channel, queue, operation, requestingUser, dbOwner, dbName string, data interface{}) (result []byte, err error) { + // Create a temporary AMQP queue for receiving the response + var q amqp.Queue + q, err = channel.QueueDeclare("", false, false, true, false, nil) + if err != nil { + return + } + + // Construct the request + bar := LiveDBRequest{ + Operation: operation, + DBOwner: dbOwner, + DBName: dbName, + Data: data, + RequestingUser: requestingUser, + } + var z []byte + z, err = json.Marshal(bar) + if err != nil { + log.Println(err) + return + } + + // Send the request via AMQP + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() + corrID := RandomString(32) + err = channel.PublishWithContext(ctx, "", queue, false, false, + amqp.Publishing{ + ContentType: "text/json", + CorrelationId: corrID, + ReplyTo: q.Name, + Body: z, + }) + if err != nil { + log.Println(err) + return + } + + // Start processing messages from the AMQP response queue + msgs, err := channel.Consume(q.Name, "", true, false, false, false, nil) + if err != nil { + return + } + + // Wait for, then extract the response. Without json unmarshalling it yet + for d := range msgs { + if corrID == d.CorrelationId { + result = d.Body + break + } + } + + // Delete the temporary queue + _, err = channel.QueueDelete(q.Name, false, false, false) + if err != nil { + log.Println(err) + } + return +} diff --git a/common/live_types.go b/common/live_types.go index df5a65ee8..ceada8db5 100644 --- a/common/live_types.go +++ b/common/live_types.go @@ -102,7 +102,7 @@ type ResponseReceivers struct { receivers map[int]*chan ResponseInfo } -// NewResponseReceiver is the constructor function for correcting creating new ResponseReceivers structures +// NewResponseReceiver is the constructor function for creating new ResponseReceivers func NewResponseReceiver() *ResponseReceivers { z := ResponseReceivers{ Mutex: sync.Mutex{}, @@ -125,3 +125,99 @@ func (r *ResponseReceivers) RemoveReceiver(jobID int) { delete(r.receivers, jobID) r.Unlock() } + +// *** Legacy (hopefully) AMQP related types + +// LiveDBColumnsResponse holds the fields used for receiving column list responses from our AMQP backend +type LiveDBColumnsResponse struct { + Node string `json:"node"` + Columns []sqlite.Column `json:"columns"` + PkColumns []string `json:"pkColuns"` + Error string `json:"error"` + ErrCode JobQueueErrorCode `json:"error_code"` +} + +// LiveDBErrorResponse holds just the node name and any error message used in responses by our AMQP backend +// It's useful for error message, and other responses where no other fields are needed +type LiveDBErrorResponse struct { + Node string `json:"node"` + Error string `json:"error"` +} + +// LiveDBExecuteResponse returns the number of rows changed by an Execute() call +type LiveDBExecuteResponse struct { + Node string `json:"node"` + RowsChanged int `json:"rows_changed"` + Error string `json:"error"` +} + +// LiveDBIndexesResponse holds the fields used for receiving index list responses from our AMQP backend +type LiveDBIndexesResponse struct { + Node string `json:"node"` + Indexes []APIJSONIndex `json:"indexes"` + Error string `json:"error"` +} + +// LiveDBQueryResponse holds the fields used for receiving query responses from our AMQP backend +type LiveDBQueryResponse struct { + Node string `json:"node"` + Results SQLiteRecordSet `json:"results"` + Error string `json:"error"` +} + +// LiveDBRequest holds the fields used for sending requests to our AMQP backend +type LiveDBRequest struct { + Operation string `json:"operation"` + DBOwner string `json:"dbowner"` + DBName string `json:"dbname"` + Data interface{} `json:"data,omitempty"` + RequestingUser string `json:"requesting_user"` +} + +// LiveDBResponse holds the fields used for receiving (non-query) responses from our AMQP backend +type LiveDBResponse struct { + Node string `json:"node"` + Result string `json:"result"` + Error string `json:"error"` +} + +// LiveDBRowsRequest holds the data used when making an AMQP rows request +type LiveDBRowsRequest struct { + DbTable string `json:"db_table"` + SortCol string `json:"sort_col"` + SortDir string `json:"sort_dir"` + CommitID string `json:"commit_id"` + RowOffset int `json:"row_offset"` + MaxRows int `json:"max_rows"` +} + +// LiveDBRowsResponse holds the fields used for receiving database page row responses from our AMQP backend +type LiveDBRowsResponse struct { + Node string `json:"node"` + DatabaseSize int64 `json:"database_size"` + DefaultTable string `json:"default_table"` + Error string `json:"error"` + RowData SQLiteRecordSet `json:"row_data"` + Tables []string `json:"tables"` +} + +// LiveDBSizeResponse holds the fields used for receiving database size responses from our AMQP backend +type LiveDBSizeResponse struct { + Node string `json:"node"` + Size int64 `json:"size"` + Error string `json:"error"` +} + +// LiveDBTablesResponse holds the fields used for receiving table list responses from our AMQP backend +type LiveDBTablesResponse struct { + Node string `json:"node"` + Tables []string `json:"tables"` + Error string `json:"error"` +} + +// LiveDBViewsResponse holds the fields used for receiving view list responses from our AMQP backend +type LiveDBViewsResponse struct { + Node string `json:"node"` + Views []string `json:"views"` + Error string `json:"error"` +} diff --git a/database/migrations/000005_job_submission_tables.down.sql b/database/migrations/000005_job_submission_tables.down.sql index aa959f0ef..0bcc3c4c5 100644 --- a/database/migrations/000005_job_submission_tables.down.sql +++ b/database/migrations/000005_job_submission_tables.down.sql @@ -1,8 +1,15 @@ BEGIN; -DROP TABLE IF EXISTS job_submissions; -DROP TABLE IF EXISTS job_responses; DROP TRIGGER IF EXISTS job_submissions_trigger ON job_submissions; -DROP TRIGGER IF EXISTS job_responses_trigger ON job_responses; DROP FUNCTION IF EXISTS job_submissions_notify(); +DROP INDEX IF EXISTS job_submissions_completed_date_index; +DROP INDEX IF EXISTS job_submissions_state_index; +DROP INDEX IF EXISTS job_submissions_submission_date_index; +DROP INDEX IF EXISTS job_submissions_target_node_index; +DROP TABLE IF EXISTS job_submissions; +DROP TRIGGER IF EXISTS job_responses_trigger ON job_responses; DROP FUNCTION IF EXISTS job_responses_notify(); +DROP INDEX IF EXISTS job_responses_processed_date_index; +DROP INDEX IF EXISTS job_responses_response_date_index; +DROP INDEX IF EXISTS job_responses_submitter_node_index; +DROP TABLE IF EXISTS job_responses; COMMIT; \ No newline at end of file diff --git a/database/migrations/000005_job_submission_tables.up.sql b/database/migrations/000005_job_submission_tables.up.sql index af3b9ff91..0261c26b7 100644 --- a/database/migrations/000005_job_submission_tables.up.sql +++ b/database/migrations/000005_job_submission_tables.up.sql @@ -12,6 +12,18 @@ CREATE TABLE IF NOT EXISTS job_submissions ( completed_date TIMESTAMP WITH TIME ZONE ); +CREATE INDEX job_submissions_completed_date_index + ON job_submissions (completed_date); + +CREATE INDEX job_submissions_state_index + ON job_submissions (state); + +CREATE INDEX job_submissions_submission_date_index + ON job_submissions (submission_date); + +CREATE INDEX job_submissions_target_node_index + ON job_submissions (target_node); + -- job_responses table CREATE TABLE IF NOT EXISTS job_responses ( @@ -27,6 +39,15 @@ CREATE TABLE IF NOT EXISTS job_responses processed_date TIMESTAMP WITH TIME ZONE ); +CREATE INDEX job_responses_processed_date_index + ON job_responses (processed_date); + +CREATE INDEX job_responses_response_date_index + ON job_responses (response_date); + +CREATE INDEX job_responses_submitter_node_index + ON job_responses (submitter_node); + -- notify function for the job_submissions table CREATE OR REPLACE FUNCTION job_submissions_notify() RETURNS trigger AS $$ diff --git a/docker/Dockerfile b/docker/Dockerfile index 9cc79ed6b..73dc359ac 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,10 +1,26 @@ # vim:set ft=dockerfile: +# We grab RabbitMQ and Erlang from the official RabbitMQ image +FROM rabbitmq:3-alpine AS rabbit + # Build our own image FROM alpine:3.19 LABEL maintainer="Justin Clift " +# Copy the RabbitMQ files we need +COPY --from=rabbit /etc/rabbitmq /etc/rabbitmq +COPY --from=rabbit /opt/erlang /opt/erlang +COPY --from=rabbit /opt/openssl /opt/openssl +COPY --from=rabbit /opt/rabbitmq /opt/rabbitmq +COPY --from=rabbit /var/lib/rabbitmq /var/lib/rabbitmq +COPY --from=rabbit /var/log/rabbitmq /var/log/rabbitmq + +# Create the rabbitmq user and group +RUN addgroup -S rabbitmq && \ + adduser -S -D -h /var/lib/rabbitmq -s /sbin/nologin -g "Linux User,,," -G rabbitmq rabbitmq && \ + chown -Rh rabbitmq: /opt/rabbitmq /var/log/rabbitmq + # Use a fast Australian mirror for the Alpine package repositories # Without doing this, building the image can take 2+ hours. :( RUN echo "https://mirror.aarnet.edu.au/pub/alpine/v3.19/main" > /etc/apk/repositories && \ @@ -75,8 +91,13 @@ RUN echo "echo 127.0.0.1 docker-dev.dbhub.io docker-dev >> /etc/hosts" >> /usr/l echo "su - minio -c '/usr/bin/minio server --quiet --anonymous /var/lib/minio/data 2>&1 &'" >> /usr/local/bin/start.sh && \ echo "su - postgres -c '/usr/libexec/postgresql/pg_ctl start'" >> /usr/local/bin/start.sh && \ echo "" >> /usr/local/bin/start.sh && \ - echo "# Delay long enough for the DBHub.io daemons to start" >> /usr/local/bin/start.sh && \ - echo "sleep 1" >> /usr/local/bin/start.sh && \ + echo "unset CONFIG_FILE" >> /usr/local/bin/start.sh && \ + echo "export RABBITMQ_CONFIG_FILES=/etc/rabbitmq/conf.d" >> /usr/local/bin/start.sh && \ + echo "export PATH=/opt/rabbitmq/sbin:/opt/erlang/bin:/opt/openssl/bin:$PATH" >> /usr/local/bin/start.sh && \ + echo "/opt/rabbitmq/sbin/rabbitmq-server &" >> /usr/local/bin/start.sh && \ + echo "" >> /usr/local/bin/start.sh && \ + echo "# Wait for RabbitMQ to start before launching the DBHub.io daemons" >> /usr/local/bin/start.sh && \ + echo "sleep 15" >> /usr/local/bin/start.sh && \ echo "" >> /usr/local/bin/start.sh && \ echo "su - dbhub -c 'if [ -f "${DBHUB_SOURCE}/.env" ]; then source ${DBHUB_SOURCE}/.env; fi; CONFIG_FILE=${CONFIG_FILE} /usr/local/bin/dbhub-webui >>/home/dbhub/output.log 2>&1 &'" >> /usr/local/bin/start.sh && \ echo "su - dbhub -c 'if [ -f "${DBHUB_SOURCE}/.env" ]; then source ${DBHUB_SOURCE}/.env; fi; CONFIG_FILE=${CONFIG_FILE} /usr/local/bin/dbhub-api >>/home/dbhub/output.log 2>&1 &'" >> /usr/local/bin/start.sh && \ diff --git a/docker/config.toml b/docker/config.toml index 420ca9c2a..9640ddca8 100644 --- a/docker/config.toml +++ b/docker/config.toml @@ -45,6 +45,14 @@ access_key = "minio" secret = "minio123" https = false +[mq] +cert_file = "" +key_file = "" +password = "guest" +port = 5672 +server = "localhost" +username = "guest" + [pg] database = "dbhub" num_connections = 5 diff --git a/go.mod b/go.mod index 8217476f9..bea79c343 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/leodido/go-urn v1.2.0 // indirect github.com/memcachier/mc v2.0.1+incompatible // indirect github.com/microcosm-cc/bluemonday v1.0.16 // indirect + github.com/rabbitmq/amqp091-go v1.9.0 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/russross/blackfriday v2.0.0+incompatible // indirect github.com/sergi/go-diff v1.1.0 // indirect diff --git a/go.sum b/go.sum index ffbd003f6..d5d9545db 100644 --- a/go.sum +++ b/go.sum @@ -95,6 +95,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -133,12 +135,16 @@ github.com/sqlitebrowser/blackfriday v9.0.0+incompatible/go.mod h1:/zga9sqpWzcew github.com/sqlitebrowser/github_flavored_markdown v0.0.0-20190120045821-b8cf8f054e47 h1:s0+Ea95n1LrsKh6rtclU/9Qb2/5ofvnfnR7gDDiFTw8= github.com/sqlitebrowser/github_flavored_markdown v0.0.0-20190120045821-b8cf8f054e47/go.mod h1:8vPIKi5FslxCXEgfQxrFtWfdclGy6VWAc9NA1ZTYCJg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= @@ -172,6 +178,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= @@ -179,3 +186,4 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/live/main.go b/live/main.go index 2bae81573..a17356e04 100644 --- a/live/main.go +++ b/live/main.go @@ -3,10 +3,14 @@ package main // Internal daemon for running SQLite queries sent by the other DBHub.io daemons import ( + "encoding/json" "errors" + "fmt" "log" "os" + "path/filepath" + sqlite "github.com/gwenn/gosqlite" com "github.com/sqlitebrowser/dbhub.io/common" ) @@ -55,15 +59,370 @@ func main() { } // Connect to the job queue - com.CheckJobQueue = make(chan struct{}) - err = com.ConnectQueue() + if !com.UseAMQP { + com.CheckJobQueue = make(chan struct{}) + } + ch, err := com.ConnectQueue() if err != nil { log.Fatal(err) } + // Make sure the channel to the AMQP server is still open + if com.UseAMQP { + // Create queue for receiving new database creation requests + createQueue, err := com.MQCreateDBQueue(ch) + if err != nil { + log.Fatal(err) + } + + // Start consuming database creation requests + createDBMsgs, err := ch.Consume(createQueue.Name, "", false, false, false, false, nil) + + go func() { + for d := range createDBMsgs { + // Decode JSON request + var req com.LiveDBRequest + err = json.Unmarshal(d.Body, &req) + if err != nil { + log.Println(err) + err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "failure") + if err != nil { + log.Printf("Error: occurred on live node '%s' in the create db code, while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + // Verify that the object ID was passed through the interface correctly + objectID, ok := req.Data.(string) + if !ok { + err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "failure") + if err != nil { + log.Printf("Error: occurred on live node '%s' in the create db code, while converting the Minio object ID to a string: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + // Set up the live database locally + _, err = com.LiveRetrieveDatabaseMinio(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, objectID) + if err != nil { + log.Println(err) + err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "failure") + if err != nil { + log.Printf("Error: occurred on live node '%s' in the create db code, while constructing an AMQP error message response (location 2): '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + // Respond to the creation request with a success message + err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "success") + if err != nil { + continue + } + } + }() + + // Create the queue for receiving database queries + queryQueue, err := com.MQCreateQueryQueue(ch, com.Conf.Live.Nodename) + if err != nil { + log.Fatal(err) + } + + // Start consuming database query requests + requests, err := ch.Consume(queryQueue.Name, "", false, false, false, false, nil) + if err != nil { + log.Fatal(err) + } + go func() { + for msg := range requests { + if com.JobQueueDebug > 1 { + log.Printf("'%s' received AMQP REQUEST (of not-yet-determined type)", com.Conf.Live.Nodename) + } + + // Decode JSON request + var req com.LiveDBRequest + err = json.Unmarshal(msg.Body, &req) + if err != nil { + resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: err.Error()} + err = com.MQResponse("NOT-YET-DETERMINED", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' the main live node switch{} while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + if com.JobQueueDebug > 1 { + log.Printf("Decoded request on '%s'. Correlation ID: '%s', request operation: '%s', request query: '%v'", com.Conf.Live.Nodename, msg.CorrelationId, req.Operation, req.Data) + } else if com.JobQueueDebug == 1 { + log.Printf("Decoded request on '%s'. Correlation ID: '%s', request operation: '%s'", com.Conf.Live.Nodename, msg.CorrelationId, req.Operation) + } + + // Handle each operation + switch req.Operation { + case "backup": + err = com.SQLiteBackupLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) + if err != nil { + resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: err.Error()} + err = com.MQResponse("BACKUP", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + if com.JobQueueDebug > 0 { + log.Printf("Running [BACKUP] on '%s/%s'", req.DBOwner, req.DBName) + } + + // Return a success message to the caller + resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: ""} // Use an empty error message to indicate success + err = com.MQResponse("BACKUP", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP backup response: '%s'", com.Conf.Live.Nodename, err) + } + continue + + case "columns": + columns, pk, err, errCode := com.SQLiteGetColumnsLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, fmt.Sprintf("%s", req.Data)) + if err != nil { + resp := com.LiveDBColumnsResponse{Node: com.Conf.Live.Nodename, Columns: []sqlite.Column{}, PkColumns: nil, Error: err.Error(), ErrCode: errCode} + err = com.MQResponse("COLUMNS", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + if com.JobQueueDebug > 0 { + log.Printf("Running [COLUMNS] on '%s/%s': '%s'", req.DBOwner, req.DBName, req.Data) + } + + // Return the columns list to the caller + resp := com.LiveDBColumnsResponse{Node: com.Conf.Live.Nodename, Columns: columns, PkColumns: pk, Error: "", ErrCode: com.JobQueueNoError} + err = com.MQResponse("COLUMNS", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP columns list response: '%s'", com.Conf.Live.Nodename, err) + } + continue + + case "delete": + // Delete the database file on the node + err = com.RemoveLiveDB(req.DBOwner, req.DBName) + if err != nil { + resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: err.Error()} + err = com.MQResponse("DELETE", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + if com.JobQueueDebug > 0 { + log.Printf("Running [DELETE] on '%s/%s'", req.DBOwner, req.DBName) + } + + // Return a success message (empty string in this case) to the caller + resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: ""} + err = com.MQResponse("DELETE", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP delete database response: '%s'", com.Conf.Live.Nodename, err) + } + continue + + case "execute": + // Execute a SQL statement on the database file + var rowsChanged int + rowsChanged, err = com.SQLiteExecuteQueryLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, req.RequestingUser, fmt.Sprintf("%s", req.Data)) + if err != nil { + resp := com.LiveDBExecuteResponse{Node: com.Conf.Live.Nodename, RowsChanged: 0, Error: err.Error()} + err = com.MQResponse("EXECUTE", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + if com.JobQueueDebug > 0 { + log.Printf("Running [EXECUTE] on '%s/%s': '%s'", req.DBOwner, req.DBName, req.Data) + } + + // Return a success message to the caller + resp := com.LiveDBExecuteResponse{Node: com.Conf.Live.Nodename, RowsChanged: rowsChanged, Error: ""} + err = com.MQResponse("EXECUTE", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP execute query response: '%s'", com.Conf.Live.Nodename, err) + } + continue + + case "indexes": + var indexes []com.APIJSONIndex + indexes, err = com.SQLiteGetIndexesLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) + if err != nil { + resp := com.LiveDBIndexesResponse{Node: com.Conf.Live.Nodename, Indexes: []com.APIJSONIndex{}, Error: err.Error()} + err = com.MQResponse("INDEXES", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + if com.JobQueueDebug > 0 { + log.Printf("Running [INDEXES] on '%s/%s'", req.DBOwner, req.DBName) + } + + // Return the indexes list to the caller + resp := com.LiveDBIndexesResponse{Node: com.Conf.Live.Nodename, Indexes: indexes, Error: ""} + err = com.MQResponse("INDEXES", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP indexes list response: '%s'", com.Conf.Live.Nodename, err) + } + continue + + case "query": + var rows com.SQLiteRecordSet + rows, err = com.SQLiteRunQueryLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, req.RequestingUser, fmt.Sprintf("%s", req.Data)) + if err != nil { + resp := com.LiveDBQueryResponse{Node: com.Conf.Live.Nodename, Results: com.SQLiteRecordSet{}, Error: err.Error()} + err = com.MQResponse("QUERY", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + if com.JobQueueDebug > 0 { + log.Printf("Running [QUERY] on '%s/%s': '%s'", req.DBOwner, req.DBName, req.Data) + } + + // Return the query response to the caller + resp := com.LiveDBQueryResponse{Node: com.Conf.Live.Nodename, Results: rows, Error: ""} + err = com.MQResponse("QUERY", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP query response: '%s'", com.Conf.Live.Nodename, err) + } + continue + + case "rowdata": + // Extract the request information + // FIXME: Add type checks for safety instead of blind coercing + reqData := req.Data.(map[string]interface{}) + dbTable := reqData["db_table"].(string) + sortCol := reqData["sort_col"].(string) + sortDir := reqData["sort_dir"].(string) + commitID := reqData["commit_id"].(string) + maxRows := int(reqData["max_rows"].(float64)) + rowOffset := int(reqData["row_offset"].(float64)) + + // Open the SQLite database and read the row data + resp := com.LiveDBRowsResponse{Node: com.Conf.Live.Nodename, RowData: com.SQLiteRecordSet{}} + resp.Tables, resp.DefaultTable, resp.RowData, resp.DatabaseSize, err = + com.SQLiteReadDatabasePage("", "", req.RequestingUser, req.DBOwner, req.DBName, dbTable, sortCol, sortDir, commitID, rowOffset, maxRows, true) + if err != nil { + resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: err.Error()} + err = com.MQResponse("ROWDATA", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + if com.JobQueueDebug > 0 { + log.Printf("Running [ROWDATA] on '%s/%s'", req.DBOwner, req.DBName) + } + + // Return the row data to the caller + err = com.MQResponse("ROWDATA", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP query response: '%s'", com.Conf.Live.Nodename, err) + } + continue + + case "size": + dbPath := filepath.Join(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, "live.sqlite") + var db os.FileInfo + db, err = os.Stat(dbPath) + if err != nil { + resp := com.LiveDBSizeResponse{Node: com.Conf.Live.Nodename, Size: 0, Error: err.Error()} + err = com.MQResponse("SIZE", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + if com.JobQueueDebug > 0 { + log.Printf("Running [SIZE] on '%s/%s'", req.DBOwner, req.DBName) + } + + // Return the database size to the caller + resp := com.LiveDBSizeResponse{Node: com.Conf.Live.Nodename, Size: db.Size(), Error: ""} + err = com.MQResponse("SIZE", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP size response: '%s'", com.Conf.Live.Nodename, err) + } + continue + + case "tables": + var tables []string + tables, err = com.SQLiteGetTablesLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) + if err != nil { + resp := com.LiveDBTablesResponse{Node: com.Conf.Live.Nodename, Tables: nil, Error: err.Error()} + err = com.MQResponse("TABLES", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + if com.JobQueueDebug > 0 { + log.Printf("Running [TABLES] on '%s/%s'", req.DBOwner, req.DBName) + } + + // Return the tables list to the caller + resp := com.LiveDBTablesResponse{Node: com.Conf.Live.Nodename, Tables: tables, Error: ""} + err = com.MQResponse("TABLES", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP tables list response: '%s'", com.Conf.Live.Nodename, err) + } + continue + + case "views": + var views []string + views, err = com.SQLiteGetViewsLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) + if err != nil { + resp := com.LiveDBViewsResponse{Node: com.Conf.Live.Nodename, Views: nil, Error: err.Error()} + err = com.MQResponse("VIEWS", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + continue + } + + if com.JobQueueDebug > 0 { + log.Printf("Running [VIEWS] on '%s/%s'", req.DBOwner, req.DBName) + } + + // Return the views list to the caller + resp := com.LiveDBViewsResponse{Node: com.Conf.Live.Nodename, Views: views, Error: ""} + err = com.MQResponse("VIEWS", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP views list response: '%s'", com.Conf.Live.Nodename, err) + } + continue + + default: + log.Printf("'%s' received unknown '%s' request on this queue for %s/%s", com.Conf.Live.Nodename, req.Operation, req.DBOwner, req.DBName) + } + } + }() + } + + log.Printf("Live server '%s' listening for requests", com.Conf.Live.Nodename) + // Launch go workers to process submitted jobs - go com.JobQueueCheck() - go com.JobQueueListen() + if !com.UseAMQP { + go com.JobQueueCheck() + go com.JobQueueListen() + } // Launch goroutine event generator for checking submitted jobs // TODO: This seems to work fine, but is kind of a pita to have enabled while developing this code atm. So we disable it for now. @@ -84,4 +443,9 @@ func main() { // Endless loop var forever chan struct{} <-forever + + // Close the channel to the MQ server + if com.UseAMQP { + _ = com.CloseMQChannel(ch) + } } diff --git a/package.json b/package.json index 358fe3149..c728f3dc3 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,12 @@ "docker:start": "docker run -itd --rm --name dbhub-build -p 9443-9445:9443-9445/tcp -p 5550:5550/tcp dbhub-build:latest", "docker:startlocal": "docker run -itd --rm --name dbhub-build --net host --mount type=bind,src=\"$(pwd)\",target=/dbhub.io dbhub-build:latest", "docker:stop": "docker container stop dbhub-build", - "docker:tail": "docker exec -it dbhub-build tail -F /home/dbhub/output.log" + "docker:tail": "docker exec -it dbhub-build tail -F /home/dbhub/output.log", + "mq:bind": "docker exec -it dbhub-build rabbitmqctl list_bindings", + "mq:conn": "docker exec -it dbhub-build rabbitmqctl list_connections", + "mq:cons": "docker exec -it dbhub-build rabbitmqctl list_consumers", + "mq:ex": "docker exec -it dbhub-build rabbitmqctl list_exchanges", + "mq:q": "docker exec -it dbhub-build rabbitmqctl list_queues" }, "engines": { "node": "^20.10.0", diff --git a/standalone/analysis/main.go b/standalone/analysis/main.go index 44139f3c0..41d724583 100644 --- a/standalone/analysis/main.go +++ b/standalone/analysis/main.go @@ -41,7 +41,7 @@ func main() { // Connect to job queue server com.Conf.Live.Nodename = "Usage Analysis" - err = com.ConnectQueue() + com.AmqpChan, err = com.ConnectQueue() if err != nil { log.Fatal(err) } diff --git a/webui/main.go b/webui/main.go index d5897d130..c8214723f 100644 --- a/webui/main.go +++ b/webui/main.go @@ -3164,7 +3164,7 @@ func main() { } // Connect to job queue server - err = com.ConnectQueue() + com.AmqpChan, err = com.ConnectQueue() if err != nil { log.Fatal(err) } @@ -3188,11 +3188,13 @@ func main() { go com.SendEmails() // Start background goroutines to handle job queue responses - com.ResponseWaiters = com.NewResponseReceiver() - com.CheckResponsesQueue = make(chan struct{}) - com.SubmitterInstance = com.RandomString(3) - go com.ResponseQueueCheck() - go com.ResponseQueueListen() + if !com.UseAMQP { + com.ResponseWaiters = com.NewResponseReceiver() + com.CheckResponsesQueue = make(chan struct{}) + com.SubmitterInstance = com.RandomString(3) + go com.ResponseQueueCheck() + go com.ResponseQueueListen() + } // Our pages http.Handle("/", gz.GzipHandler(logReq(mainHandler))) @@ -3503,6 +3505,11 @@ func main() { if err != nil { log.Println(err) } + + err = com.CloseMQChannel(com.AmqpChan) + if err != nil { + log.Fatal(err) + } } func mainHandler(w http.ResponseWriter, r *http.Request) { @@ -5677,7 +5684,7 @@ func uploadDataHandler(w http.ResponseWriter, r *http.Request) { com.SanitiseLogString(dbOwner), com.SanitiseLogString(dbName), numBytes) // Send a request to the job queue to set up the database - liveNode, err := com.LiveCreateDB(dbOwner, dbName, objectID) + liveNode, err := com.LiveCreateDB(com.AmqpChan, dbOwner, dbName, objectID) if err != nil { log.Println(err) errorPage(w, r, http.StatusInternalServerError, err.Error())