Skip to content

Commit

Permalink
Add back the AMQP code (hopefully temporarily)
Browse files Browse the repository at this point in the history
It's likely safer for our job queue migration to have the DBHub.io
daemons be able to run with either job queue system.

That way if our new job queue code has problems we can revert to
using AMQP until the problems are fixed.

With this commit the daemons can run in either AMQP mode or job
queue server mode.  There's a hard coded boolean value near the
top of common/live_amqp.go to switch between them.

The Cypress tests pass in both modes, so things should be ok.
  • Loading branch information
justinclift committed Dec 11, 2023
1 parent e1cf6d5 commit 4001719
Show file tree
Hide file tree
Showing 17 changed files with 1,247 additions and 178 deletions.
2 changes: 1 addition & 1 deletion api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,7 +1340,7 @@ func uploadHandler(w http.ResponseWriter, r *http.Request) {
com.SanitiseLogString(dbOwner), com.SanitiseLogString(dbName), numBytes)

// Send a request to the job queue to set up the database
liveNode, err := com.LiveCreateDB(dbOwner, dbName, objectID)
liveNode, err := com.LiveCreateDB(com.AmqpChan, dbOwner, dbName, objectID)
if err != nil {
log.Println(err)
jsonErr(w, err.Error(), http.StatusInternalServerError)
Expand Down
14 changes: 8 additions & 6 deletions api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func main() {
}

// Connect to job queue server
err = com.ConnectQueue()
com.AmqpChan, err = com.ConnectQueue()
if err != nil {
log.Fatal(err)
}
Expand All @@ -98,11 +98,13 @@ func main() {
}

// Start background goroutines to handle job queue responses
com.ResponseWaiters = com.NewResponseReceiver()
com.CheckResponsesQueue = make(chan struct{})
com.SubmitterInstance = com.RandomString(3)
go com.ResponseQueueCheck()
go com.ResponseQueueListen()
if !com.UseAMQP {
com.ResponseWaiters = com.NewResponseReceiver()
com.CheckResponsesQueue = make(chan struct{})
com.SubmitterInstance = com.RandomString(3)
go com.ResponseQueueCheck()
go com.ResponseQueueListen()
}

// Load our self signed CA chain
ourCAPool = x509.NewCertPool()
Expand Down
10 changes: 10 additions & 0 deletions common/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type TomlConfig struct {
Live LiveInfo
Memcache MemcacheInfo
Minio MinioInfo
MQ MQInfo
Pg PGInfo
Sign SigningInfo
UserMgmt UserMgmtInfo
Expand Down Expand Up @@ -92,6 +93,15 @@ type MinioInfo struct {
Server string
}

type MQInfo struct {
CertFile string `toml:"cert_file"`
KeyFile string `toml:"key_file"`
Password string `toml:"password"`
Port int `toml:"port"`
Server string `toml:"server"`
Username string `toml:"username"`
}

// PGInfo contains the PostgreSQL connection parameters
type PGInfo struct {
Database string
Expand Down
2 changes: 1 addition & 1 deletion common/cypress.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func CypressSeed(w http.ResponseWriter, r *http.Request) {
// Send the live database file to our job queue backend for setup
dbOwner := "default"
dbName := "Join Testing with index.sqlite"
liveNode, err := LiveCreateDB(dbOwner, dbName, objectID)
liveNode, err := LiveCreateDB(AmqpChan, dbOwner, dbName, objectID)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
Loading

0 comments on commit 4001719

Please sign in to comment.