From 48cef8056614b7230a47c2d74a9f8b09aaad0ca9 Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Thu, 9 Nov 2023 20:02:10 +0100 Subject: [PATCH 01/12] feat: Add Relayer struct --- pkg/relayer/interface.go | 2 +- pkg/relayer/miner/miner.go | 18 +++++++------- pkg/relayer/proxy/proxy.go | 2 +- pkg/relayer/relayer.go | 49 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 60 insertions(+), 11 deletions(-) create mode 100644 pkg/relayer/relayer.go diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index d0aa062fc..3af354bb7 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -20,7 +20,7 @@ import ( type Miner interface { MineRelays( ctx context.Context, - servedRelays observable.Observable[servicetypes.Relay], + servedRelays observable.Observable[*servicetypes.Relay], ) } diff --git a/pkg/relayer/miner/miner.go b/pkg/relayer/miner/miner.go index a1f3043e9..380951baf 100644 --- a/pkg/relayer/miner/miner.go +++ b/pkg/relayer/miner/miner.go @@ -41,7 +41,7 @@ type miner struct { // minedRelay is a wrapper around a relay that has been serialized and hashed. type minedRelay struct { - servicetypes.Relay + *servicetypes.Relay bytes []byte hash []byte } @@ -82,13 +82,13 @@ func NewMiner( // It does not block as map operations run in their own goroutines. func (mnr *miner) MineRelays( ctx context.Context, - servedRelays observable.Observable[servicetypes.Relay], + servedRelays observable.Observable[*servicetypes.Relay], ) { // sessiontypes.Relay ==> either.Either[minedRelay] eitherMinedRelays := mnr.mineRelays(ctx, servedRelays) // either.Either[minedRelay] ==> error - miningErrors := mnr.addReplayToSessionTree(ctx, eitherMinedRelays) + miningErrors := mnr.addRelayToSessionTree(ctx, eitherMinedRelays) logging.LogErrors(ctx, miningErrors) claimedSessions := mnr.createClaims(ctx) @@ -161,20 +161,20 @@ func (mnr *miner) validateConfigAndSetDefaults() error { // method to each relay. It returns an observable of the mined relays. func (mnr *miner) mineRelays( ctx context.Context, - servedRelays observable.Observable[servicetypes.Relay], + servedRelays observable.Observable[*servicetypes.Relay], ) observable.Observable[either.Either[minedRelay]] { // servicetypes.Relay ==> either.Either[minedRelay] return channel.Map(ctx, servedRelays, mnr.mapMineRelay) } // mapMineRelay is intended to be used as a MapFn. It hashes the relay and compares -// its difficulty to the minimum threshold. If the relay difficulty is sifficient, +// its difficulty to the minimum threshold. If the relay difficulty is sufficient, // it returns an either populated with the minedRelay value. Otherwise, it skips // the relay. If it encounters an error, it returns an either populated with the // error. func (mnr *miner) mapMineRelay( _ context.Context, - relay servicetypes.Relay, + relay *servicetypes.Relay, ) (_ either.Either[minedRelay], skip bool) { relayBz, err := relay.Marshal() if err != nil { @@ -198,10 +198,10 @@ func (mnr *miner) mapMineRelay( }), false } -// addReplayToSessionTree maps over the eitherMinedRelays observable, applying the +// addRelayToSessionTree maps over the eitherMinedRelays observable, applying the // mapAddRelayToSessionTree method to each relay. It returns an observable of the // errors encountered. -func (mnr *miner) addReplayToSessionTree( +func (mnr *miner) addRelayToSessionTree( ctx context.Context, eitherMinedRelays observable.Observable[either.Either[minedRelay]], ) observable.Observable[error] { @@ -311,7 +311,7 @@ func (mnr *miner) waitForBlock(ctx context.Context, height int64) client.Block { } // newMapClaimSessionFn returns a new MapFn that creates a claim for the given -// session. Any session which encouters errors while creating a claim is sent +// session. Any session which encounters errors while creating a claim is sent // on the failedCreateClaimSessions channel. func (mnr *miner) newMapClaimSessionFn( failedCreateClaimSessions chan<- relayer.SessionTree, diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 6c64516cd..13309b47c 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -102,7 +102,7 @@ func NewRelayerProxy( } // Start concurrently starts all advertised relay servers and returns an error if any of them fails to start. -// This method is blocking until all RelayServers are started. +// This method is blocking as long as all RelayServers are running. func (rp *relayerProxy) Start(ctx context.Context) error { // The provided services map is built from the supplier's on-chain advertised information, // which is a runtime parameter that can be changed by the supplier. diff --git a/pkg/relayer/relayer.go b/pkg/relayer/relayer.go new file mode 100644 index 000000000..bdad5bb1d --- /dev/null +++ b/pkg/relayer/relayer.go @@ -0,0 +1,49 @@ +package relayer + +import ( + "context" + + "cosmossdk.io/depinject" +) + +type RelayerOption func(*Relayer) + +// Relayer is the main struct that encapsulates the relayer's responsibilities. +// It starts and stops the RelayerProxy and provide the served relays observable to them miner. +type Relayer struct { + relayerProxy RelayerProxy + miner Miner +} + +// NewRelayer creates a new Relayer instance with the given dependencies. +// It injects the dependencies into the Relayer instance and returns it. +func NewRelayer( + deps depinject.Config, + opts ...RelayerOption, +) (*Relayer, error) { + rel := &Relayer{} + + if err := depinject.Inject(deps, &rel.relayerProxy, &rel.miner); err != nil { + return nil, err + } + + for _, opt := range opts { + opt(rel) + } + + return rel, nil +} + +// Start provides the miner with the served relays observable and starts the relayer proxy. +// This method is blocking while the relayer proxy is running and returns when Stop is called +// or when the relayer proxy fails to start. +func (rel *Relayer) Start(ctx context.Context) error { + rel.miner.MineRelays(ctx, rel.relayerProxy.ServedRelays()) + return rel.relayerProxy.Start(ctx) +} + +// Stop stops the relayer proxy which in turn stops all advertised relay servers +// and unsubscribes the miner from the served relays observable. +func (rel *Relayer) Stop(ctx context.Context) error { + return rel.relayerProxy.Stop(ctx) +} From cba145e67b49bae2089b0ea334f0666d28d544e5 Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Thu, 9 Nov 2023 22:16:30 +0100 Subject: [PATCH 02/12] chore: Rename to RelayMiner --- pkg/relayer/relayer.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/relayer/relayer.go b/pkg/relayer/relayer.go index bdad5bb1d..a5225557a 100644 --- a/pkg/relayer/relayer.go +++ b/pkg/relayer/relayer.go @@ -6,22 +6,22 @@ import ( "cosmossdk.io/depinject" ) -type RelayerOption func(*Relayer) +type RelayerOption func(*relayMiner) -// Relayer is the main struct that encapsulates the relayer's responsibilities. +// relayMiner is the main struct that encapsulates the relayer's responsibilities (i.e. Relay Mining). // It starts and stops the RelayerProxy and provide the served relays observable to them miner. -type Relayer struct { +type relayMiner struct { relayerProxy RelayerProxy miner Miner } -// NewRelayer creates a new Relayer instance with the given dependencies. +// NewRelayMiner creates a new Relayer instance with the given dependencies. // It injects the dependencies into the Relayer instance and returns it. -func NewRelayer( +func NewRelayMiner( deps depinject.Config, opts ...RelayerOption, -) (*Relayer, error) { - rel := &Relayer{} +) (*relayMiner, error) { + rel := &relayMiner{} if err := depinject.Inject(deps, &rel.relayerProxy, &rel.miner); err != nil { return nil, err @@ -37,13 +37,14 @@ func NewRelayer( // Start provides the miner with the served relays observable and starts the relayer proxy. // This method is blocking while the relayer proxy is running and returns when Stop is called // or when the relayer proxy fails to start. -func (rel *Relayer) Start(ctx context.Context) error { - rel.miner.MineRelays(ctx, rel.relayerProxy.ServedRelays()) +func (rel *relayMiner) Start(ctx context.Context) error { + // MineRelays does not block and subscribes to the served relays observable. + rel.miner.StartMiningRelays(ctx, rel.relayerProxy.ServedRelays()) return rel.relayerProxy.Start(ctx) } // Stop stops the relayer proxy which in turn stops all advertised relay servers // and unsubscribes the miner from the served relays observable. -func (rel *Relayer) Stop(ctx context.Context) error { +func (rel *relayMiner) Stop(ctx context.Context) error { return rel.relayerProxy.Stop(ctx) } From c81c274d583d775f1e983525be67fd267376c048 Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Thu, 9 Nov 2023 22:18:13 +0100 Subject: [PATCH 03/12] chore: Rename relay miner file --- pkg/relayer/{relayer.go => relayminer.go} | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) rename pkg/relayer/{relayer.go => relayminer.go} (94%) diff --git a/pkg/relayer/relayer.go b/pkg/relayer/relayminer.go similarity index 94% rename from pkg/relayer/relayer.go rename to pkg/relayer/relayminer.go index a5225557a..6cf9bbf97 100644 --- a/pkg/relayer/relayer.go +++ b/pkg/relayer/relayminer.go @@ -23,7 +23,11 @@ func NewRelayMiner( ) (*relayMiner, error) { rel := &relayMiner{} - if err := depinject.Inject(deps, &rel.relayerProxy, &rel.miner); err != nil { + if err := depinject.Inject( + deps, + &rel.relayerProxy, + &rel.miner, + ); err != nil { return nil, err } From 0e38e7dc9a183ea88413957179f3329b1f2956b2 Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Thu, 9 Nov 2023 22:20:44 +0100 Subject: [PATCH 04/12] chore: Remove unused RelayerOption parameter --- pkg/relayer/relayminer.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/pkg/relayer/relayminer.go b/pkg/relayer/relayminer.go index 6cf9bbf97..16c16f7b9 100644 --- a/pkg/relayer/relayminer.go +++ b/pkg/relayer/relayminer.go @@ -6,8 +6,6 @@ import ( "cosmossdk.io/depinject" ) -type RelayerOption func(*relayMiner) - // relayMiner is the main struct that encapsulates the relayer's responsibilities (i.e. Relay Mining). // It starts and stops the RelayerProxy and provide the served relays observable to them miner. type relayMiner struct { @@ -17,10 +15,7 @@ type relayMiner struct { // NewRelayMiner creates a new Relayer instance with the given dependencies. // It injects the dependencies into the Relayer instance and returns it. -func NewRelayMiner( - deps depinject.Config, - opts ...RelayerOption, -) (*relayMiner, error) { +func NewRelayMiner(deps depinject.Config) (*relayMiner, error) { rel := &relayMiner{} if err := depinject.Inject( @@ -31,10 +26,6 @@ func NewRelayMiner( return nil, err } - for _, opt := range opts { - opt(rel) - } - return rel, nil } From e0414728e3240d68a995cabed5aa76e49d76eeb5 Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Thu, 9 Nov 2023 22:25:34 +0100 Subject: [PATCH 05/12] chore: update start mining comment --- pkg/relayer/relayminer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/relayer/relayminer.go b/pkg/relayer/relayminer.go index 16c16f7b9..f4f8b6981 100644 --- a/pkg/relayer/relayminer.go +++ b/pkg/relayer/relayminer.go @@ -33,7 +33,7 @@ func NewRelayMiner(deps depinject.Config) (*relayMiner, error) { // This method is blocking while the relayer proxy is running and returns when Stop is called // or when the relayer proxy fails to start. func (rel *relayMiner) Start(ctx context.Context) error { - // MineRelays does not block and subscribes to the served relays observable. + // StartMiningRelays does not block, it only subscribes to the served relays observable. rel.miner.StartMiningRelays(ctx, rel.relayerProxy.ServedRelays()) return rel.relayerProxy.Start(ctx) } From 8cf07843b42c26e60ab527cd1752071c9b586fcb Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Thu, 9 Nov 2023 22:28:40 +0100 Subject: [PATCH 06/12] fix: Update Miner interface --- pkg/relayer/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 3af354bb7..6a048bbaa 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -18,7 +18,7 @@ import ( // - "Creating claims": The session SMST is flushed and a claim is created on-chain. // - "Submitting proofs": The session SMST is proven and a proof is submitted on-chain. type Miner interface { - MineRelays( + StartMiningRelays( ctx context.Context, servedRelays observable.Observable[*servicetypes.Relay], ) From 5af2ae9e9869e759df615c7e3517ae5d02731c3d Mon Sep 17 00:00:00 2001 From: Bryan White Date: Fri, 10 Nov 2023 22:00:49 +0100 Subject: [PATCH 07/12] [Miner] feat: add `Miner` component (#168) * refactor: `MapFn`s receive context arg * chore: add `ForEach` map shorthand operator * chore: add `/pkg/observable/filter` * chore: add `/pkg/observable/logging` * chore: add `/pkg/relayer/protocol` * chore: add `Miner` interface * feat: add `Miner` implementation * test: `Miner` implementation * chore: fix comment * chore: add godoc comments * [Test] First step for automated E2E Relay test (#167) - Fixed helpers for localnet regenesis - Added an application & supplier to the genesis file - Initializing appMap & supplierMap in E2E tests - Add support for the app's codec (for unmarshaling responses) in E2E tests - Adding a placeholder for `e2e/tests/relay.feature` --- Co-authored-by: harry <53987565+h5law@users.noreply.github.com> * [Relayer] refactor: simplify `RelayerSessionsManager` (#169) * refactor: `MapFn`s receive context arg * feat: add `MapExpand` observable operator * refactor: `RelayerSessionsManager` to be more reactive * chore: add godoc comment * chore: review feedback improvements * trigger CI * chore: review feedback improvements Co-authored-by: Daniel Olshansky * chore: review feedback improvements * fix: import cycle & goimports * chore: review feedback improvements * chore: cleanup TODO_THIS_COMMIT comments * chore: improve var & func names for clarity and consistency * refactor: move claim/proof lifecycle concerns to `relayerSessionsManager`. * chore: review feedback improvements * chore: review feedback improvements * refactor: `miner#hash()` method * chore: tidy up * chore: simplify * chore: review feedback improvements Co-authored-by: Daniel Olshansky * chore: review feedback improvements Co-authored-by: Daniel Olshansky * chore: review feedback improvements Co-authored-by: Daniel Olshansky * chore: review feedback improvements * chore: review feedback improvements * fix: incomplete refactor * chore: simplify --------- Co-authored-by: Daniel Olshansky Co-authored-by: harry <53987565+h5law@users.noreply.github.com> --- pkg/either/types.go | 7 +- pkg/observable/types.go | 5 ++ pkg/relayer/interface.go | 50 ++++++++--- pkg/relayer/miner/miner.go | 122 ++++++++++++++++++++++++++ pkg/relayer/miner/miner_test.go | 10 +++ pkg/relayer/protocol/block_heights.go | 45 ++++++++++ pkg/relayer/protocol/difficulty.go | 17 ++++ pkg/relayer/session/claim.go | 117 ++++++++++++++++++++++++ pkg/relayer/session/proof.go | 116 ++++++++++++++++++++++++ pkg/relayer/session/session.go | 87 ++++++++++++++++-- pkg/relayer/types.go | 10 +++ 11 files changed, 565 insertions(+), 21 deletions(-) create mode 100644 pkg/observable/types.go create mode 100644 pkg/relayer/miner/miner.go create mode 100644 pkg/relayer/miner/miner_test.go create mode 100644 pkg/relayer/protocol/block_heights.go create mode 100644 pkg/relayer/protocol/difficulty.go create mode 100644 pkg/relayer/session/claim.go create mode 100644 pkg/relayer/session/proof.go create mode 100644 pkg/relayer/types.go diff --git a/pkg/either/types.go b/pkg/either/types.go index ae7092479..4f5f53f00 100644 --- a/pkg/either/types.go +++ b/pkg/either/types.go @@ -1,9 +1,12 @@ package either +import "github.com/pokt-network/poktroll/pkg/relayer" + type ( // AsyncError represents a value which could either be a synchronous error or // an asynchronous error (sent through a channel). It wraps the more generic // `Either` type specific for error channels. - AsyncError Either[chan error] - Bytes = Either[[]byte] + AsyncError Either[chan error] + Bytes = Either[[]byte] + SessionTree = Either[relayer.SessionTree] ) diff --git a/pkg/observable/types.go b/pkg/observable/types.go new file mode 100644 index 000000000..04df98201 --- /dev/null +++ b/pkg/observable/types.go @@ -0,0 +1,5 @@ +package observable + +type ( + Error = Observable[error] +) diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 24ef17dfd..134ac8f53 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -7,10 +7,23 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/x/service/types" + servicetypes "github.com/pokt-network/poktroll/x/service/types" sessiontypes "github.com/pokt-network/poktroll/x/session/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) +// Miner is responsible for observing servedRelayObs, hashing and checking the +// difficulty of each, finally publishing those with sufficient difficulty to +// minedRelayObs as they are applicable for relay volume. +type Miner interface { + MinedRelays( + ctx context.Context, + servedRelayObs observable.Observable[*servicetypes.Relay], + ) (minedRelaysObs observable.Observable[*MinedRelay]) +} + +type MinerOption func(Miner) + // RelayerProxy is the interface for the proxy that serves relays to the application. // It is responsible for starting and stopping all supported RelayServers. // While handling requests and responding in a closed loop, it also notifies @@ -59,19 +72,32 @@ type RelayServer interface { Service() *sharedtypes.Service } -// RelayerSessionsManager is an interface for managing the relayer's sessions and Sparse -// Merkle Sum Trees (SMSTs). It provides notifications about closing sessions that are -// ready to be claimed, and handles the creation and retrieval of SMSTs for a given session. -// It also handles the creation and retrieval of SMSTs for a given session. +// RelayerSessionsManager is responsible for managing the relayer's session lifecycles. +// It handles the creation and retrieval of SMSTs (trees) for a given session, as +// well as the respective and subsequent claim creation and proof submission. +// This is largely accomplished by pipelining observables of relays and sessions +// through a series of map operations. +// +// TODO_TECHDEBT: add architecture diagrams covering observable flows throughout +// the relayer package. type RelayerSessionsManager interface { - // SessionsToClaim returns an observable that notifies of sessions ready to be claimed. - SessionsToClaim() observable.Observable[SessionTree] - - // EnsureSessionTree returns the SMST (Sparse Merkle State Tree) for a given session header. - // It is used to retrieve the SMST and update it when a Relay has been successfully served. - // If the session is seen for the first time, it creates a new SMST for it before returning it. - // An error is returned if the corresponding KVStore for SMST fails to be created. - EnsureSessionTree(sessionHeader *sessiontypes.SessionHeader) (SessionTree, error) + // InsertRelays receives an observable of relays that should be included + // in their respective session's SMST (tree). + InsertRelays(minedRelaysObs observable.Observable[*MinedRelay]) + + // Start iterates over the session trees at the end of each, respective, session. + // The session trees are piped through a series of map operations which progress + // them through the claim/proof lifecycle, broadcasting transactions to the + // network as necessary. + Start(ctx context.Context) + + // Stop unsubscribes all observables from the InsertRelays observable which + // will close downstream observables as they drain. + // + // TODO_TECHDEBT: Either add a mechanism to wait for draining to complete + // and/or ensure that the state at each pipeline stage is persisted to disk + // and exit as early as possible. + Stop() } type RelayerSessionsManagerOption func(RelayerSessionsManager) diff --git a/pkg/relayer/miner/miner.go b/pkg/relayer/miner/miner.go new file mode 100644 index 000000000..79d905ac7 --- /dev/null +++ b/pkg/relayer/miner/miner.go @@ -0,0 +1,122 @@ +package miner + +import ( + "context" + "crypto/sha256" + "hash" + + "github.com/pokt-network/poktroll/pkg/either" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/observable/filter" + "github.com/pokt-network/poktroll/pkg/observable/logging" + "github.com/pokt-network/poktroll/pkg/relayer" + "github.com/pokt-network/poktroll/pkg/relayer/protocol" + servicetypes "github.com/pokt-network/poktroll/x/service/types" +) + +var ( + _ relayer.Miner = (*miner)(nil) + defaultRelayHasher = sha256.New + // TODO_BLOCKER: query on-chain governance params once available. + // Setting this to 0 to effectively disables mining for now. + // I.e., all relays are added to the tree. + defaultRelayDifficulty = 0 +) + +// Miner is responsible for observing servedRelayObs, hashing and checking the +// difficulty of each, finally publishing those with sufficient difficulty to +// minedRelayObs as they are applicable for relay volume. +// +// TODO_BLOCKER: The relay hashing and relay difficulty mechanisms & values must come +type miner struct { + // relayHasher is a function which returns a hash.Hash interfact type. It is + // used to hash serialized relays to measure their mining difficulty. + relayHasher func() hash.Hash + // relayDifficulty is the minimum difficulty that a relay must have to be + // volume / reward applicable. + relayDifficulty int +} + +// NewMiner creates a new miner from the given dependencies and options. It +// returns an error if it has not been sufficiently configured or supplied. +func NewMiner( + opts ...relayer.MinerOption, +) (*miner, error) { + mnr := &miner{} + + for _, opt := range opts { + opt(mnr) + } + + mnr.setDefaults() + + return mnr, nil +} + +// MinedRelays maps servedRelaysObs through a pipeline which: +// 1. Hashes the relay +// 2. Checks if it's above the mining difficulty +// 3. Adds it to the session tree if so +// It DOES NOT BLOCK as map operations run in their own goroutines. +func (mnr *miner) MinedRelays( + ctx context.Context, + servedRelaysObs observable.Observable[*servicetypes.Relay], +) observable.Observable[*relayer.MinedRelay] { + // Map servedRelaysObs to a new observable of an either type, populated with + // the minedRelay or an error. It is notified after the relay has been mined + // or an error has been encountered, respectively. + eitherMinedRelaysObs := channel.Map(ctx, servedRelaysObs, mnr.mapMineRelay) + logging.LogErrors(ctx, filter.EitherError(ctx, eitherMinedRelaysObs)) + + return filter.EitherSuccess(ctx, eitherMinedRelaysObs) +} + +// setDefaults ensures that the miner has been configured with a hasherConstructor and uses +// the default hasherConstructor if not. +func (mnr *miner) setDefaults() { + if mnr.relayHasher == nil { + mnr.relayHasher = defaultRelayHasher + } +} + +// mapMineRelay is intended to be used as a MapFn. +// 1. It hashes the relay and compares its difficult to the minimum threshold. +// 2. If the relay difficulty is sufficient -> return an Either[MineRelay Value] +// 3. If an error is encountered -> return an Either[error] +// 4. Otherwise, skip the relay. +func (mnr *miner) mapMineRelay( + _ context.Context, + relay *servicetypes.Relay, +) (_ either.Either[*relayer.MinedRelay], skip bool) { + relayBz, err := relay.Marshal() + if err != nil { + return either.Error[*relayer.MinedRelay](err), false + } + + // TODO_BLOCKER: Centralize the logic of hashing a relay. It should live + // alongside signing & verification. + // + // TODO_IMPROVE: We need to hash the key; it would be nice if smst.Update() could do it + // since smst has a reference to the hasherConstructor + relayHash := mnr.hash(relayBz) + + // The relay IS NOT volume / reward applicable + if !protocol.BytesDifficultyGreaterThan(relayHash, defaultRelayDifficulty) { + return either.Success[*relayer.MinedRelay](nil), true + } + + // The relay IS volume / reward applicable + return either.Success(&relayer.MinedRelay{ + Relay: *relay, + Bytes: relayBz, + Hash: relayHash, + }), false +} + +// hash constructs a new hasher and hashes the given input bytes. +func (mnr *miner) hash(inputBz []byte) []byte { + hasher := mnr.relayHasher() + hasher.Write(inputBz) + return hasher.Sum(nil) +} diff --git a/pkg/relayer/miner/miner_test.go b/pkg/relayer/miner/miner_test.go new file mode 100644 index 000000000..c362005bd --- /dev/null +++ b/pkg/relayer/miner/miner_test.go @@ -0,0 +1,10 @@ +package miner_test + +import ( + "testing" +) + +// TODO_TECHDEBT(@bryanchriswhite): add all the test coverage... +func TestNewMiner(t *testing.T) { + t.Skip("TODO_TECHDEBT(@bryanchriswhite): add all the test coverage...") +} diff --git a/pkg/relayer/protocol/block_heights.go b/pkg/relayer/protocol/block_heights.go new file mode 100644 index 000000000..b372376f7 --- /dev/null +++ b/pkg/relayer/protocol/block_heights.go @@ -0,0 +1,45 @@ +package protocol + +import ( + "encoding/binary" + "log" + "math/rand" + + "github.com/pokt-network/poktroll/pkg/client" +) + +// GetEarliestCreateClaimHeight returns the earliest block height at which a claim +// for a session with the given createClaimWindowStartHeight can be created. +// +// TODO_TEST(@bryanchriswhite): Add test coverage and more logs +func GetEarliestCreateClaimHeight(createClaimWindowStartBlock client.Block) int64 { + createClaimWindowStartBlockHash := createClaimWindowStartBlock.Hash() + log.Printf("using createClaimWindowStartBlock %d's hash %x as randomness", createClaimWindowStartBlock.Height(), createClaimWindowStartBlockHash) + rngSeed, _ := binary.Varint(createClaimWindowStartBlockHash) + randomNumber := rand.NewSource(rngSeed).Int63() + + // TODO_TECHDEBT: query the on-chain governance parameter once available. + // randCreateClaimHeightOffset := randomNumber % (claimproofparams.GovCreateClaimIntervalBlocks - claimproofparams.GovCreateClaimWindowBlocks - 1) + _ = randomNumber + randCreateClaimHeightOffset := int64(0) + + return createClaimWindowStartBlock.Height() + randCreateClaimHeightOffset +} + +// GetEarliestSubmitProofHeight returns the earliest block height at which a proof +// for a session with the given submitProofWindowStartHeight can be submitted. +// +// TODO_TEST(@bryanchriswhite): Add test coverage and more logs +func GetEarliestSubmitProofHeight(submitProofWindowStartBlock client.Block) int64 { + earliestSubmitProofBlockHash := submitProofWindowStartBlock.Hash() + log.Printf("using submitProofWindowStartBlock %d's hash %x as randomness", submitProofWindowStartBlock.Height(), earliestSubmitProofBlockHash) + rngSeed, _ := binary.Varint(earliestSubmitProofBlockHash) + randomNumber := rand.NewSource(rngSeed).Int63() + + // TODO_TECHDEBT: query the on-chain governance parameter once available. + // randSubmitProofHeightOffset := randomNumber % (claimproofparams.GovSubmitProofIntervalBlocks - claimproofparams.GovSubmitProofWindowBlocks - 1) + _ = randomNumber + randSubmitProofHeightOffset := int64(0) + + return submitProofWindowStartBlock.Height() + randSubmitProofHeightOffset +} diff --git a/pkg/relayer/protocol/difficulty.go b/pkg/relayer/protocol/difficulty.go new file mode 100644 index 000000000..4743d546d --- /dev/null +++ b/pkg/relayer/protocol/difficulty.go @@ -0,0 +1,17 @@ +package protocol + +import ( + "encoding/hex" + "strings" +) + +// TODO_BLOCKER: Revisit this part of the algorithm after initial TestNet Launch. +// TODO_TEST: Add extensive tests for the core relay mining business logic. +// BytesDifficultyGreaterThan determines if the bytes exceed a certain difficulty, and it +// is used to determine if a relay is volume applicable. See the spec for more details: https://github.com/pokt-network/pocket-network-protocol +func BytesDifficultyGreaterThan(bz []byte, compDifficultyBytes int) bool { + hexZerosPrefix := strings.Repeat("0", compDifficultyBytes*2) // 2 hex chars per byte. + hexBz := hex.EncodeToString(bz) + + return strings.HasPrefix(hexBz, hexZerosPrefix) +} diff --git a/pkg/relayer/session/claim.go b/pkg/relayer/session/claim.go new file mode 100644 index 000000000..712e7a9e5 --- /dev/null +++ b/pkg/relayer/session/claim.go @@ -0,0 +1,117 @@ +package session + +import ( + "context" + "log" + + "github.com/pokt-network/poktroll/pkg/either" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/observable/filter" + "github.com/pokt-network/poktroll/pkg/observable/logging" + "github.com/pokt-network/poktroll/pkg/relayer" + "github.com/pokt-network/poktroll/pkg/relayer/protocol" +) + +// createClaims maps over the sessionsToClaimObs observable. For each claim, it: +// 1. Calculates the earliest block height at which it is safe to CreateClaim +// 2. Waits for said block and creates the claim on-chain +// 3. Maps errors to a new observable and logs them +// 4. Returns an observable of the successfully claimed sessions +// It DOES NOT BLOCK as map operations run in their own goroutines. +func (rs *relayerSessionsManager) createClaims(ctx context.Context) observable.Observable[relayer.SessionTree] { + // Map sessionsToClaimObs to a new observable of the same type which is notified + // when the session is eligible to be claimed. + sessionsWithOpenClaimWindowObs := channel.Map( + ctx, rs.sessionsToClaimObs, + rs.mapWaitForEarliestCreateClaimHeight, + ) + + failedCreateClaimSessionsObs, failedCreateClaimSessionsPublishCh := + channel.NewObservable[relayer.SessionTree]() + + // Map sessionsWithOpenClaimWindowObs to a new observable of an either type, + // populated with the session or an error, which is notified after the session + // claim has been created or an error has been encountered, respectively. + eitherClaimedSessionsObs := channel.Map( + ctx, sessionsWithOpenClaimWindowObs, + rs.newMapClaimSessionFn(failedCreateClaimSessionsPublishCh), + ) + + // TODO_TECHDEBT: pass failed create claim sessions to some retry mechanism. + _ = failedCreateClaimSessionsObs + logging.LogErrors(ctx, filter.EitherError(ctx, eitherClaimedSessionsObs)) + + // Map eitherClaimedSessions to a new observable of relayer.SessionTree which + // is notified when the corresponding claim creation succeeded. + return filter.EitherSuccess(ctx, eitherClaimedSessionsObs) +} + +// mapWaitForEarliestCreateClaimHeight is intended to be used as a MapFn. It +// calculates and waits for the earliest block height, allowed by the protocol, +// at which a claim can be created for the given session, then emits the session +// **at that moment**. +func (rs *relayerSessionsManager) mapWaitForEarliestCreateClaimHeight( + ctx context.Context, + session relayer.SessionTree, +) (_ relayer.SessionTree, skip bool) { + rs.waitForEarliestCreateClaimHeight( + ctx, session.GetSessionHeader().GetSessionEndBlockHeight(), + ) + return session, false +} + +// waitForEarliestCreateClaimHeight calculates and waits for (blocking until) the +// earliest block height, allowed by the protocol, at which a claim can be created +// for a session with the given sessionEndHeight. It is calculated relative to +// sessionEndHeight using on-chain governance parameters and randomized input. +// It IS A BLOCKING function. +func (rs *relayerSessionsManager) waitForEarliestCreateClaimHeight( + ctx context.Context, + sessionEndHeight int64, +) { + // TODO_TECHDEBT: refactor this logic to a shared package. + + createClaimWindowStartHeight := sessionEndHeight + // TODO_TECHDEBT: query the on-chain governance parameter once available. + // + claimproofparams.GovCreateClaimWindowStartHeightOffset + + // we wait for createClaimWindowStartHeight to be received before proceeding since we need its hash + // to know where this servicer's claim submission window starts. + log.Printf("waiting & blocking for global earliest claim submission createClaimWindowStartBlock height: %d", createClaimWindowStartHeight) + createClaimWindowStartBlock := rs.waitForBlock(ctx, createClaimWindowStartHeight) + + log.Printf("received earliest claim submission createClaimWindowStartBlock height: %d, use its hash to have a random submission for the servicer", createClaimWindowStartBlock.Height()) + + earliestCreateClaimHeight := + protocol.GetEarliestCreateClaimHeight(createClaimWindowStartBlock) + + log.Printf("earliest claim submission createClaimWindowStartBlock height for this supplier: %d", earliestCreateClaimHeight) + _ = rs.waitForBlock(ctx, earliestCreateClaimHeight) +} + +// newMapClaimSessionFn returns a new MapFn that creates a claim for the given +// session. Any session which encouters an error while creating a claim is sent +// on the failedCreateClaimSessions channel. +func (rs *relayerSessionsManager) newMapClaimSessionFn( + failedCreateClaimSessionsPublishCh chan<- relayer.SessionTree, +) channel.MapFn[relayer.SessionTree, either.SessionTree] { + return func( + ctx context.Context, + session relayer.SessionTree, + ) (_ either.SessionTree, skip bool) { + // this session should no longer be updated + claimRoot, err := session.Flush() + if err != nil { + return either.Error[relayer.SessionTree](err), false + } + + sessionHeader := session.GetSessionHeader() + if err := rs.supplierClient.CreateClaim(ctx, *sessionHeader, claimRoot); err != nil { + failedCreateClaimSessionsPublishCh <- session + return either.Error[relayer.SessionTree](err), false + } + + return either.Success(session), false + } +} diff --git a/pkg/relayer/session/proof.go b/pkg/relayer/session/proof.go new file mode 100644 index 000000000..4a7c415aa --- /dev/null +++ b/pkg/relayer/session/proof.go @@ -0,0 +1,116 @@ +package session + +import ( + "context" + "log" + + "github.com/pokt-network/poktroll/pkg/either" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/observable/filter" + "github.com/pokt-network/poktroll/pkg/observable/logging" + "github.com/pokt-network/poktroll/pkg/relayer" + "github.com/pokt-network/poktroll/pkg/relayer/protocol" +) + +// submitProofs maps over the given claimedSessions observable. +// For each session, it: +// 1. Calculates the earliest block height at which to submit a proof +// 2. Waits for said height and submits the proof on-chain +// 3. Maps errors to a new observable and logs them +// It DOES NOT BLOCKas map operations run in their own goroutines. +func (rs *relayerSessionsManager) submitProofs( + ctx context.Context, + claimedSessionsObs observable.Observable[relayer.SessionTree], +) { + // Map claimedSessionsObs to a new observable of the same type which is notified + // when the session is eligible to be proven. + sessionsWithOpenProofWindowObs := channel.Map( + ctx, claimedSessionsObs, + rs.mapWaitForEarliestSubmitProofHeight, + ) + + failedSubmitProofSessionsObs, failedSubmitProofSessionsPublishCh := + channel.NewObservable[relayer.SessionTree]() + + // Map sessionsWithOpenProofWindow to a new observable of an either type, + // populated with the session or an error, which is notified after the session + // proof has been submitted or an error has been encountered, respectively. + eitherProvenSessionsObs := channel.Map( + ctx, sessionsWithOpenProofWindowObs, + rs.newMapProveSessionFn(failedSubmitProofSessionsPublishCh), + ) + + // TODO_TECHDEBT: pass failed submit proof sessions to some retry mechanism. + _ = failedSubmitProofSessionsObs + logging.LogErrors(ctx, filter.EitherError(ctx, eitherProvenSessionsObs)) +} + +// mapWaitForEarliestSubmitProofHeight is intended to be used as a MapFn. It +// calculates and waits for the earliest block height, allowed by the protocol, +// at which a proof can be submitted for the given session, then emits the session +// **at that moment**. +func (rs *relayerSessionsManager) mapWaitForEarliestSubmitProofHeight( + ctx context.Context, + session relayer.SessionTree, +) (_ relayer.SessionTree, skip bool) { + rs.waitForEarliestSubmitProofHeight( + ctx, session.GetSessionHeader().GetSessionEndBlockHeight(), + ) + return session, false +} + +// waitForEarliestSubmitProofHeight calculates and waits for (blocking until) the +// earliest block height, allowed by the protocol, at which a proof can be submitted +// for a session which was claimed at createClaimHeight. It is calculated relative +// to createClaimHeight using on-chain governance parameters and randomized input. +// It IS A BLOCKING function. +func (rs *relayerSessionsManager) waitForEarliestSubmitProofHeight( + ctx context.Context, + createClaimHeight int64, +) { + submitProofWindowStartHeight := createClaimHeight + // TODO_TECHDEBT: query the on-chain governance parameter once available. + // + claimproofparams.GovSubmitProofWindowStartHeightOffset + + // we wait for submitProofWindowStartHeight to be received before proceeding since we need its hash + log.Printf("waiting and blocking for global earliest proof submission submitProofWindowStartBlock height: %d", submitProofWindowStartHeight) + submitProofWindowStartBlock := rs.waitForBlock(ctx, submitProofWindowStartHeight) + + earliestSubmitProofHeight := protocol.GetEarliestSubmitProofHeight(submitProofWindowStartBlock) + _ = rs.waitForBlock(ctx, earliestSubmitProofHeight) +} + +// newMapProveSessionFn returns a new MapFn that submits a proof for the given +// session. Any session which encouters errors while submitting a proof is sent +// on the failedSubmitProofSessions channel. +func (rs *relayerSessionsManager) newMapProveSessionFn( + failedSubmitProofSessionsCh chan<- relayer.SessionTree, +) channel.MapFn[relayer.SessionTree, either.SessionTree] { + return func( + ctx context.Context, + session relayer.SessionTree, + ) (_ either.SessionTree, skip bool) { + // TODO_BLOCKER: The block that'll be used as a source of entropy for which + // branch(es) to prove should be deterministic and use on-chain governance params + // rather than latest. + latestBlock := rs.blockClient.LatestBlock(ctx) + proof, err := session.ProveClosest(latestBlock.Hash()) + if err != nil { + return either.Error[relayer.SessionTree](err), false + } + + log.Printf("currentBlock: %d, submitting proof", latestBlock.Height()+1) + // SubmitProof ensures on-chain proof inclusion so we can safely prune the tree. + if err := rs.supplierClient.SubmitProof( + ctx, + *session.GetSessionHeader(), + proof, + ); err != nil { + failedSubmitProofSessionsCh <- session + return either.Error[relayer.SessionTree](err), false + } + + return either.Success(session), false + } +} diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 7a45880e1..4cf8b0d2c 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -6,9 +6,11 @@ import ( "sync" "cosmossdk.io/depinject" + "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/observable/logging" "github.com/pokt-network/poktroll/pkg/relayer" sessiontypes "github.com/pokt-network/poktroll/x/session/types" ) @@ -20,8 +22,10 @@ type sessionsTreesMap = map[int64]map[string]relayer.SessionTree // relayerSessionsManager is an implementation of the RelayerSessions interface. // TODO_TEST: Add tests to the relayerSessionsManager. type relayerSessionsManager struct { - // sessionsToClaim notifies about sessions that are ready to be claimed. - sessionsToClaim observable.Observable[relayer.SessionTree] + relayObs observable.Observable[*relayer.MinedRelay] + + // sessionsToClaimObs notifies about sessions that are ready to be claimed. + sessionsToClaimObs observable.Observable[relayer.SessionTree] // sessionTrees is a map of block heights pointing to a map of SessionTrees // indexed by their sessionId. @@ -33,6 +37,9 @@ type relayerSessionsManager struct { // blockClient is used to get the notifications of committed blocks. blockClient client.BlockClient + // supplierClient is used to create claims and submit proofs for sessions. + supplierClient client.SupplierClient + // storesDirectory points to a path on disk where KVStore data files are created. storesDirectory string } @@ -50,6 +57,7 @@ func NewRelayerSessions( if err := depinject.Inject( deps, &rs.blockClient, + &rs.supplierClient, ); err != nil { return nil, err } @@ -62,7 +70,7 @@ func NewRelayerSessions( return nil, err } - rs.sessionsToClaim = channel.MapExpand[client.Block, relayer.SessionTree]( + rs.sessionsToClaimObs = channel.MapExpand[client.Block, relayer.SessionTree]( ctx, rs.blockClient.CommittedBlocksSequence(ctx), rs.mapBlockToSessionsToClaim, @@ -71,14 +79,40 @@ func NewRelayerSessions( return rs, nil } +// Start iterates over the session trees at the end of each, respective, session. +// The session trees are piped through a series of map operations which progress +// them through the claim/proof lifecycle, broadcasting transactions to the +// network as necessary. +func (rs *relayerSessionsManager) Start(ctx context.Context) { + // Map eitherMinedRelays to a new observable of an error type which is + // notified if an error was encountered while attampting to add the relay to + // the session tree. + miningErrorsObs := channel.Map(ctx, rs.relayObs, rs.mapAddRelayToSessionTree) + logging.LogErrors(ctx, miningErrorsObs) + + // Start claim/proof pipeline. + claimedSessionsObs := rs.createClaims(ctx) + rs.submitProofs(ctx, claimedSessionsObs) +} + +// Stop unsubscribes all observables from the InsertRelays observable which +// will close downstream observables as they drain. +// +// TODO_TECHDEBT: Either add a mechanism to wait for draining to complete +// and/or ensure that the state at each pipeline stage is persisted to disk +// and exit as early as possible. +func (rs *relayerSessionsManager) Stop() { + rs.relayObs.UnsubscribeAll() +} + // SessionsToClaim returns an observable that notifies when sessions are ready to be claimed. -func (rs *relayerSessionsManager) SessionsToClaim() observable.Observable[relayer.SessionTree] { - return rs.sessionsToClaim +func (rs *relayerSessionsManager) InsertRelays(relays observable.Observable[*relayer.MinedRelay]) { + rs.relayObs = relays } -// EnsureSessionTree returns the SessionTree for a given session. +// ensureSessionTree returns the SessionTree for a given session. // If no tree for the session exists, a new SessionTree is created before returning. -func (rs *relayerSessionsManager) EnsureSessionTree(sessionHeader *sessiontypes.SessionHeader) (relayer.SessionTree, error) { +func (rs *relayerSessionsManager) ensureSessionTree(sessionHeader *sessiontypes.SessionHeader) (relayer.SessionTree, error) { rs.sessionsTreesMu.Lock() defer rs.sessionsTreesMu.Unlock() @@ -157,3 +191,42 @@ func (rp *relayerSessionsManager) validateConfig() error { return nil } + +// waitForBlock blocks until the block at the given height (or greater) is +// observed as having been committed. +func (rs *relayerSessionsManager) waitForBlock(ctx context.Context, height int64) client.Block { + subscription := rs.blockClient.CommittedBlocksSequence(ctx).Subscribe(ctx) + defer subscription.Unsubscribe() + + for block := range subscription.Ch() { + if block.Height() >= height { + return block + } + } + + return nil +} + +// mapAddRelayToSessionTree is intended to be used as a MapFn. It adds the relay +// to the session tree. If it encounters an error, it returns the error. Otherwise, +// it skips output (only outputs errors). +func (rs *relayerSessionsManager) mapAddRelayToSessionTree( + _ context.Context, + relay *relayer.MinedRelay, +) (_ error, skip bool) { + // ensure the session tree exists for this relay + sessionHeader := relay.GetReq().GetMeta().GetSessionHeader() + smst, err := rs.ensureSessionTree(sessionHeader) + if err != nil { + log.Printf("failed to ensure session tree: %s\n", err) + return err, false + } + + if err := smst.Update(relay.Hash, relay.Bytes, 1); err != nil { + log.Printf("failed to update smt: %s\n", err) + return err, false + } + + // Skip because this map function only outputs errors. + return nil, true +} diff --git a/pkg/relayer/types.go b/pkg/relayer/types.go new file mode 100644 index 000000000..1216dd25e --- /dev/null +++ b/pkg/relayer/types.go @@ -0,0 +1,10 @@ +package relayer + +import "github.com/pokt-network/poktroll/x/service/types" + +// MinedRelay is a wrapper around a relay that has been serialized and hashed. +type MinedRelay struct { + types.Relay + Bytes []byte + Hash []byte +} From 059b7dffe94884fbeec864463fadf12b1b2f165d Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Fri, 10 Nov 2023 22:26:10 +0100 Subject: [PATCH 08/12] chore: Reflect responsibility changes of session manager --- pkg/relayer/interface.go | 3 +-- pkg/relayer/relayminer.go | 31 +++++++++++++++++++++++++------ pkg/relayer/session/session.go | 2 +- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 134ac8f53..539361f2d 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -7,7 +7,6 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/x/service/types" - servicetypes "github.com/pokt-network/poktroll/x/service/types" sessiontypes "github.com/pokt-network/poktroll/x/session/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) @@ -18,7 +17,7 @@ import ( type Miner interface { MinedRelays( ctx context.Context, - servedRelayObs observable.Observable[*servicetypes.Relay], + servedRelayObs observable.Observable[*types.Relay], ) (minedRelaysObs observable.Observable[*MinedRelay]) } diff --git a/pkg/relayer/relayminer.go b/pkg/relayer/relayminer.go index f4f8b6981..dbbe25aec 100644 --- a/pkg/relayer/relayminer.go +++ b/pkg/relayer/relayminer.go @@ -2,6 +2,7 @@ package relayer import ( "context" + "log" "cosmossdk.io/depinject" ) @@ -9,23 +10,30 @@ import ( // relayMiner is the main struct that encapsulates the relayer's responsibilities (i.e. Relay Mining). // It starts and stops the RelayerProxy and provide the served relays observable to them miner. type relayMiner struct { - relayerProxy RelayerProxy - miner Miner + relayerProxy RelayerProxy + miner Miner + relayerSessionsManager RelayerSessionsManager } // NewRelayMiner creates a new Relayer instance with the given dependencies. // It injects the dependencies into the Relayer instance and returns it. -func NewRelayMiner(deps depinject.Config) (*relayMiner, error) { +func NewRelayMiner(ctx context.Context, deps depinject.Config) (*relayMiner, error) { rel := &relayMiner{} if err := depinject.Inject( deps, &rel.relayerProxy, &rel.miner, + &rel.relayerSessionsManager, ); err != nil { return nil, err } + // Set up relay pipeline + servedRelaysObs := rel.relayerProxy.ServedRelays() + minedRelaysObs := rel.miner.MinedRelays(ctx, servedRelaysObs) + rel.relayerSessionsManager.InsertRelays(minedRelaysObs) + return rel, nil } @@ -33,9 +41,20 @@ func NewRelayMiner(deps depinject.Config) (*relayMiner, error) { // This method is blocking while the relayer proxy is running and returns when Stop is called // or when the relayer proxy fails to start. func (rel *relayMiner) Start(ctx context.Context) error { - // StartMiningRelays does not block, it only subscribes to the served relays observable. - rel.miner.StartMiningRelays(ctx, rel.relayerProxy.ServedRelays()) - return rel.relayerProxy.Start(ctx) + // relayerSessionsManager.Start does not block. + // Set up the session (proof/claim) lifecycle pipeline. + log.Println("INFO: Starting relayer sessions manager...") + rel.relayerSessionsManager.Start(ctx) + + // Start the flow of relays by starting relayer proxy. + // This is a blocking call as it waits for the waitgroup to be done. + log.Println("INFO: Starting relayer proxy...") + if err := rel.relayerProxy.Start(ctx); err != nil { + return err + } + + log.Println("INFO: Relayer proxy stopped; exiting") + return nil } // Stop stops the relayer proxy which in turn stops all advertised relay servers diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 4cf8b0d2c..4a444dfd0 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -85,7 +85,7 @@ func NewRelayerSessions( // network as necessary. func (rs *relayerSessionsManager) Start(ctx context.Context) { // Map eitherMinedRelays to a new observable of an error type which is - // notified if an error was encountered while attampting to add the relay to + // notified if an error was encountered while attempting to add the relay to // the session tree. miningErrorsObs := channel.Map(ctx, rs.relayObs, rs.mapAddRelayToSessionTree) logging.LogErrors(ctx, miningErrorsObs) From 5f9bd5f7d33cd643f1ed868de1a86db09280e3cb Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Fri, 10 Nov 2023 22:47:52 +0100 Subject: [PATCH 09/12] feat: Use relay miner to start --- pkg/relayer/cmd/cmd.go | 44 ++++++++++++++++++--------------------- pkg/relayer/relayminer.go | 12 +++++------ 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 216d53078..886667d5d 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -66,25 +66,14 @@ func runRelayer(cmd *cobra.Command, _ []string) error { return err } - var ( - relayerProxy relayer.RelayerProxy - miner relayer.Miner - relayerSessionsManager relayer.RelayerSessionsManager - ) + var relayMiner relayer.RelayMiner if err := depinject.Inject( deps, - &relayerProxy, - &miner, - &relayerSessionsManager, + &relayMiner, ); err != nil { return err } - // Set up relay pipeline. - servedRelaysObs := relayerProxy.ServedRelays() - minedRelaysObs := miner.MinedRelays(ctx, servedRelaysObs) - relayerSessionsManager.InsertRelays(minedRelaysObs) - // Handle interrupts in a goroutine. go func() { sigCh := make(chan os.Signal, 1) @@ -97,18 +86,11 @@ func runRelayer(cmd *cobra.Command, _ []string) error { cancelCtx() }() - // Set up the session (proof/claim) lifecycle pipeline. - log.Println("INFO: Starting relayer sessions manager...") - relayerSessionsManager.Start(ctx) - - // Start the flow of relays by starting relayer proxy. - // This is a blocking call as it waits for the waitgroup to be done. - log.Println("INFO: Starting relayer proxy...") - if err := relayerProxy.Start(ctx); err != nil { - return err - } + // Start the relay miner + log.Println("INFO: Starting relay miner...") + relayMiner.Start(ctx) - log.Println("INFO: Relayer proxy stopped; exiting") + log.Println("INFO: Relay miner stopped; exiting") return nil } @@ -151,6 +133,11 @@ func setupRelayerDependencies( return nil, err } + deps, err = supplyRelayMiner(ctx, deps) + if err != nil { + return nil, err + } + return deps, nil } @@ -252,3 +239,12 @@ func supplyRelayerProxy(deps depinject.Config) (depinject.Config, error) { return depinject.Configs(deps, depinject.Supply(relayerProxy)), nil } + +func supplyRelayMiner(ctx context.Context, deps depinject.Config) (depinject.Config, error) { + relayMiner, err := relayer.NewRelayMiner(ctx, deps) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(relayMiner)), nil +} diff --git a/pkg/relayer/relayminer.go b/pkg/relayer/relayminer.go index dbbe25aec..e8602a2c4 100644 --- a/pkg/relayer/relayminer.go +++ b/pkg/relayer/relayminer.go @@ -7,9 +7,9 @@ import ( "cosmossdk.io/depinject" ) -// relayMiner is the main struct that encapsulates the relayer's responsibilities (i.e. Relay Mining). +// RelayMiner is the main struct that encapsulates the relayer's responsibilities (i.e. Relay Mining). // It starts and stops the RelayerProxy and provide the served relays observable to them miner. -type relayMiner struct { +type RelayMiner struct { relayerProxy RelayerProxy miner Miner relayerSessionsManager RelayerSessionsManager @@ -17,8 +17,8 @@ type relayMiner struct { // NewRelayMiner creates a new Relayer instance with the given dependencies. // It injects the dependencies into the Relayer instance and returns it. -func NewRelayMiner(ctx context.Context, deps depinject.Config) (*relayMiner, error) { - rel := &relayMiner{} +func NewRelayMiner(ctx context.Context, deps depinject.Config) (*RelayMiner, error) { + rel := &RelayMiner{} if err := depinject.Inject( deps, @@ -40,7 +40,7 @@ func NewRelayMiner(ctx context.Context, deps depinject.Config) (*relayMiner, err // Start provides the miner with the served relays observable and starts the relayer proxy. // This method is blocking while the relayer proxy is running and returns when Stop is called // or when the relayer proxy fails to start. -func (rel *relayMiner) Start(ctx context.Context) error { +func (rel *RelayMiner) Start(ctx context.Context) error { // relayerSessionsManager.Start does not block. // Set up the session (proof/claim) lifecycle pipeline. log.Println("INFO: Starting relayer sessions manager...") @@ -59,6 +59,6 @@ func (rel *relayMiner) Start(ctx context.Context) error { // Stop stops the relayer proxy which in turn stops all advertised relay servers // and unsubscribes the miner from the served relays observable. -func (rel *relayMiner) Stop(ctx context.Context) error { +func (rel *RelayMiner) Stop(ctx context.Context) error { return rel.relayerProxy.Stop(ctx) } From 2673bb27ef4e0d108f2cae50187ffdd9e751aa61 Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Fri, 10 Nov 2023 23:42:38 +0100 Subject: [PATCH 10/12] [Relayer] feat: Add Relayer struct (#172) * refactor: `MapFn`s receive context arg * chore: add `ForEach` map shorthand operator * chore: add `/pkg/observable/filter` * chore: add `/pkg/observable/logging` * chore: add `/pkg/relayer/protocol` * chore: add `Miner` interface * feat: add `Miner` implementation * test: `Miner` implementation * chore: fix comment * chore: add godoc comments * feat: Add Relayer struct * chore: Rename to RelayMiner * chore: Rename relay miner file * chore: Remove unused RelayerOption parameter * [Test] First step for automated E2E Relay test (#167) - Fixed helpers for localnet regenesis - Added an application & supplier to the genesis file - Initializing appMap & supplierMap in E2E tests - Add support for the app's codec (for unmarshaling responses) in E2E tests - Adding a placeholder for `e2e/tests/relay.feature` --- Co-authored-by: harry <53987565+h5law@users.noreply.github.com> * [Relayer] refactor: simplify `RelayerSessionsManager` (#169) * refactor: `MapFn`s receive context arg * feat: add `MapExpand` observable operator * refactor: `RelayerSessionsManager` to be more reactive * chore: add godoc comment * chore: review feedback improvements * trigger CI * chore: review feedback improvements Co-authored-by: Daniel Olshansky * chore: review feedback improvements * chore: update start mining comment * fix: Update Miner interface * fix: import cycle & goimports * chore: review feedback improvements * chore: cleanup TODO_THIS_COMMIT comments * chore: improve var & func names for clarity and consistency * refactor: move claim/proof lifecycle concerns to `relayerSessionsManager`. * chore: review feedback improvements * chore: review feedback improvements * refactor: `miner#hash()` method * chore: tidy up * chore: simplify * chore: review feedback improvements Co-authored-by: Daniel Olshansky * chore: review feedback improvements Co-authored-by: Daniel Olshansky * chore: review feedback improvements Co-authored-by: Daniel Olshansky * chore: review feedback improvements * chore: review feedback improvements * fix: incomplete refactor * chore: simplify * chore: Reflect responsibility changes of session manager * chore: Improve comments about waitgroup --------- Co-authored-by: Bryan White Co-authored-by: Daniel Olshansky Co-authored-by: harry <53987565+h5law@users.noreply.github.com> --- pkg/relayer/interface.go | 3 +- pkg/relayer/proxy/proxy.go | 2 +- pkg/relayer/relayminer.go | 65 ++++++++++++++++++++++++++++++++++ pkg/relayer/session/session.go | 2 +- 4 files changed, 68 insertions(+), 4 deletions(-) create mode 100644 pkg/relayer/relayminer.go diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 134ac8f53..539361f2d 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -7,7 +7,6 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/x/service/types" - servicetypes "github.com/pokt-network/poktroll/x/service/types" sessiontypes "github.com/pokt-network/poktroll/x/session/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) @@ -18,7 +17,7 @@ import ( type Miner interface { MinedRelays( ctx context.Context, - servedRelayObs observable.Observable[*servicetypes.Relay], + servedRelayObs observable.Observable[*types.Relay], ) (minedRelaysObs observable.Observable[*MinedRelay]) } diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index ea73031c8..58b9549fd 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -113,7 +113,7 @@ func NewRelayerProxy( } // Start concurrently starts all advertised relay servers and returns an error if any of them fails to start. -// This method is blocking until all RelayServers are started. +// This method is blocking as long as all RelayServers are running. func (rp *relayerProxy) Start(ctx context.Context) error { // The provided services map is built from the supplier's on-chain advertised information, // which is a runtime parameter that can be changed by the supplier. diff --git a/pkg/relayer/relayminer.go b/pkg/relayer/relayminer.go new file mode 100644 index 000000000..46ac9d5d8 --- /dev/null +++ b/pkg/relayer/relayminer.go @@ -0,0 +1,65 @@ +package relayer + +import ( + "context" + "log" + + "cosmossdk.io/depinject" +) + +// relayMiner is the main struct that encapsulates the relayer's responsibilities (i.e. Relay Mining). +// It starts and stops the RelayerProxy and provide the served relays observable to the miner. +type relayMiner struct { + relayerProxy RelayerProxy + miner Miner + relayerSessionsManager RelayerSessionsManager +} + +// NewRelayMiner creates a new Relayer instance with the given dependencies. +// It injects the dependencies into the Relayer instance and returns it. +func NewRelayMiner(ctx context.Context, deps depinject.Config) (*relayMiner, error) { + rel := &relayMiner{} + + if err := depinject.Inject( + deps, + &rel.relayerProxy, + &rel.miner, + &rel.relayerSessionsManager, + ); err != nil { + return nil, err + } + + // Set up relay pipeline + servedRelaysObs := rel.relayerProxy.ServedRelays() + minedRelaysObs := rel.miner.MinedRelays(ctx, servedRelaysObs) + rel.relayerSessionsManager.InsertRelays(minedRelaysObs) + + return rel, nil +} + +// Start provides the miner with the served relays observable and starts the relayer proxy. +// This method is blocking while the relayer proxy is running and returns when Stop is called +// or when the relayer proxy fails to start. +func (rel *relayMiner) Start(ctx context.Context) error { + // relayerSessionsManager.Start does not block. + // Set up the session (proof/claim) lifecycle pipeline. + log.Println("INFO: Starting relayer sessions manager...") + rel.relayerSessionsManager.Start(ctx) + + // Start the flow of relays by starting relayer proxy. + // This is a blocking call as it waits for the waitgroup in relayerProxy.Start() + // that starts all the relay servers to be done. + log.Println("INFO: Starting relayer proxy...") + if err := rel.relayerProxy.Start(ctx); err != nil { + return err + } + + log.Println("INFO: Relayer proxy stopped; exiting") + return nil +} + +// Stop stops the relayer proxy which in turn stops all advertised relay servers +// and unsubscribes the miner from the served relays observable. +func (rel *relayMiner) Stop(ctx context.Context) error { + return rel.relayerProxy.Stop(ctx) +} diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 4cf8b0d2c..4a444dfd0 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -85,7 +85,7 @@ func NewRelayerSessions( // network as necessary. func (rs *relayerSessionsManager) Start(ctx context.Context) { // Map eitherMinedRelays to a new observable of an error type which is - // notified if an error was encountered while attampting to add the relay to + // notified if an error was encountered while attempting to add the relay to // the session tree. miningErrorsObs := channel.Map(ctx, rs.relayObs, rs.mapAddRelayToSessionTree) logging.LogErrors(ctx, miningErrorsObs) From 1dcdbbe8cfb05d9520649f2192ea1635ca8a2598 Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Fri, 10 Nov 2023 23:49:58 +0100 Subject: [PATCH 11/12] chore: Improve comment about startig relayer proxy --- pkg/relayer/proxy/proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 09b05539b..685e13f86 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -114,7 +114,7 @@ func NewRelayerProxy( // Start concurrently starts all advertised relay servers and returns an error // if any of them errors. -// This method is blocking until all RelayServers are stopped. +// This method IS BLOCKING until all RelayServers are stopped. func (rp *relayerProxy) Start(ctx context.Context) error { // The provided services map is built from the supplier's on-chain advertised information, // which is a runtime parameter that can be changed by the supplier. From 81a808a5e9a8fd9ab14bdd73d986fa91375b844d Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Sat, 11 Nov 2023 00:18:21 +0100 Subject: [PATCH 12/12] [AppGate] Implement the MVP AppGateServer (#108) Co-authored-by: h5law <53987565+h5law@users.noreply.github.com> --- cmd/pocketd/cmd/root.go | 6 + go.mod | 2 + go.sum | 4 + pkg/appgateserver/cmd/cmd.go | 147 ++++++++++ pkg/appgateserver/endpoint_selector.go | 45 +++ pkg/appgateserver/errors.go | 13 + pkg/appgateserver/jsonrpc.go | 132 +++++++++ pkg/appgateserver/options.go | 19 ++ pkg/appgateserver/relay_verifier.go | 73 +++++ pkg/appgateserver/rings.go | 146 ++++++++++ pkg/appgateserver/server.go | 284 +++++++++++++++++++ pkg/appgateserver/session.go | 47 +++ pkg/client/block/client.go | 7 +- pkg/client/gomock_reflect_3526400147/prog.go | 66 ----- pkg/relayer/proxy/errors.go | 2 + pkg/relayer/proxy/proxy.go | 13 + pkg/relayer/proxy/relay_signer.go | 23 +- pkg/relayer/proxy/relay_verifier.go | 64 +++-- pkg/relayer/proxy/rings.go | 119 ++++++++ pkg/signer/interface.go | 9 + pkg/signer/ring_signer.go | 38 +++ pkg/signer/simple_signer.go | 23 ++ x/service/types/relay.go | 23 ++ 23 files changed, 1214 insertions(+), 91 deletions(-) create mode 100644 pkg/appgateserver/cmd/cmd.go create mode 100644 pkg/appgateserver/endpoint_selector.go create mode 100644 pkg/appgateserver/errors.go create mode 100644 pkg/appgateserver/jsonrpc.go create mode 100644 pkg/appgateserver/options.go create mode 100644 pkg/appgateserver/relay_verifier.go create mode 100644 pkg/appgateserver/rings.go create mode 100644 pkg/appgateserver/server.go create mode 100644 pkg/appgateserver/session.go delete mode 100644 pkg/client/gomock_reflect_3526400147/prog.go create mode 100644 pkg/relayer/proxy/rings.go create mode 100644 pkg/signer/interface.go create mode 100644 pkg/signer/ring_signer.go create mode 100644 pkg/signer/simple_signer.go create mode 100644 x/service/types/relay.go diff --git a/cmd/pocketd/cmd/root.go b/cmd/pocketd/cmd/root.go index 5b1fb3276..cf58f2447 100644 --- a/cmd/pocketd/cmd/root.go +++ b/cmd/pocketd/cmd/root.go @@ -43,6 +43,7 @@ import ( "github.com/pokt-network/poktroll/app" appparams "github.com/pokt-network/poktroll/app/params" + appgateservercmd "github.com/pokt-network/poktroll/pkg/appgateserver/cmd" ) // NewRootCmd creates a new root command for a Cosmos SDK application @@ -148,6 +149,11 @@ func initRootCmd( txCommand(), keys.Commands(app.DefaultNodeHome), ) + + // add the appgate server command + rootCmd.AddCommand( + appgateservercmd.AppGateServerCmd(), + ) } // queryCommand returns the sub-command to send queries to the app diff --git a/go.mod b/go.mod index 51aa32938..adbedcf85 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( cosmossdk.io/depinject v1.0.0-alpha.3 cosmossdk.io/errors v1.0.0-beta.7 cosmossdk.io/math v1.0.1 + github.com/athanorlabs/go-dleq v0.1.0 github.com/cometbft/cometbft v0.37.2 github.com/cometbft/cometbft-db v0.8.0 github.com/cosmos/cosmos-proto v1.0.0-beta.2 @@ -20,6 +21,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 + github.com/noot/ring-go v0.0.0-20231019173746-6c4b33bcf03f github.com/pokt-network/smt v0.7.1 github.com/regen-network/gocuke v0.6.2 github.com/spf13/cast v1.5.1 diff --git a/go.sum b/go.sum index b554afc3b..24322a896 100644 --- a/go.sum +++ b/go.sum @@ -300,6 +300,8 @@ github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgI github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= github.com/ashanbrown/forbidigo v1.3.0/go.mod h1:vVW7PEdqEFqapJe95xHkTfB1+XvZXBFg8t0sG2FIxmI= github.com/ashanbrown/makezero v1.1.1/go.mod h1:i1bJLCRSCHOcOa9Y6MyF2FTfMZMFdHvxKHxgO5Z1axI= +github.com/athanorlabs/go-dleq v0.1.0 h1:0/llWZG8fz2uintMBKOiBC502zCsDA8nt8vxI73W9Qc= +github.com/athanorlabs/go-dleq v0.1.0/go.mod h1:DWry6jSD7A13MKmeZA0AX3/xBeQCXDoygX99VPwL3yU= github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.23.20/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.25.37/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= @@ -1481,6 +1483,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/nishanths/exhaustive v0.8.1/go.mod h1:qj+zJJUgJ76tR92+25+03oYUhzF4R7/2Wk7fGTfCHmg= github.com/nishanths/predeclared v0.0.0-20190419143655-18a43bb90ffc/go.mod h1:62PewwiQTlm/7Rj+cxVYqZvDIUc+JjZq6GHAC1fsObQ= github.com/nishanths/predeclared v0.2.2/go.mod h1:RROzoN6TnGQupbC+lqggsOlcgysk3LMK/HI84Mp280c= +github.com/noot/ring-go v0.0.0-20231019173746-6c4b33bcf03f h1:1+NP/H13eFAqBYrGpRkbJUWVWIO2Zr2eP7a/q0UtZVQ= +github.com/noot/ring-go v0.0.0-20231019173746-6c4b33bcf03f/go.mod h1:0t3gzoSfW2bkTce1E/Jis3MQpjiKGhAgqieFK+nkQsI= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go new file mode 100644 index 000000000..21052e270 --- /dev/null +++ b/pkg/appgateserver/cmd/cmd.go @@ -0,0 +1,147 @@ +package cmd + +import ( + "context" + "errors" + "fmt" + "log" + "net/http" + "net/url" + "os" + "os/signal" + + "cosmossdk.io/depinject" + cosmosclient "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/client/flags" + "github.com/spf13/cobra" + + "github.com/pokt-network/poktroll/pkg/appgateserver" + blockclient "github.com/pokt-network/poktroll/pkg/client/block" + eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" +) + +var ( + flagSigningKey string + flagSelfSigning bool + flagListeningEndpoint string + flagCometWebsocketUrl string +) + +func AppGateServerCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "appgate-server", + Short: "Starts the AppGate server", + Long: `Starts the AppGate server that listens for incoming relay requests and handles +the necessary on-chain interactions (sessions, suppliers, etc) to receive the +respective relay response. + +-- App Mode (Flag)- - +If the server is started with a defined '--self-signing' flag, it will behave +as an Application. Any incoming requests will be signed by using the private +key and ring associated with the '--signing-key' flag. + +-- Gateway Mode (Flag)-- +If the '--self-signing' flag is not provided, the server will behave as a Gateway. +It will sign relays on behalf of any Application sending it relays, provided +that the address associated with '--signing-key' has been delegated to. This is +necessary for the application<->gateway ring signature to function. + +-- App Mode (HTTP) -- +If an application doesn't provide the '--self-signing' flag, it can still send +relays to the AppGate server and function as an Application, provided that: +1. Each request contains the '?senderAddress=[address]' query parameter +2. The key associated with the '--signing-key' flag belongs to the address + provided in the request, otherwise the ring signature will not be valid.`, + Args: cobra.NoArgs, + RunE: runAppGateServer, + } + + cmd.Flags().StringVar(&flagSigningKey, "signing-key", "", "The name of the key that will be used to sign relays") + cmd.Flags().StringVar(&flagListeningEndpoint, "listening-endpoint", "http://localhost:42069", "The host and port that the appgate server will listen on") + cmd.Flags().StringVar(&flagCometWebsocketUrl, "comet-websocket-url", "ws://localhost:36657/websocket", "The URL of the comet websocket endpoint to communicate with the pocket blockchain") + cmd.Flags().BoolVar(&flagSelfSigning, "self-signing", false, "Whether the server should sign all incoming requests with its own ring (for applications)") + + cmd.Flags().String(flags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") + cmd.Flags().String(flags.FlagNode, "tcp://localhost:36657", "The URL of the comet tcp endpoint to communicate with the pocket blockchain") + + return cmd +} + +func runAppGateServer(cmd *cobra.Command, _ []string) error { + // Create a context that is canceled when the command is interrupted + ctx, cancelCtx := context.WithCancel(cmd.Context()) + defer cancelCtx() + + // Handle interrupts in a goroutine. + go func() { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt) + + // Block until we receive an interrupt or kill signal (OS-agnostic) + <-sigCh + log.Println("INFO: Interrupt signal received, shutting down...") + + // Signal goroutines to stop + cancelCtx() + }() + + // Parse the listening endpoint. + listeningUrl, err := url.Parse(flagListeningEndpoint) + if err != nil { + return fmt.Errorf("failed to parse listening endpoint: %w", err) + } + + // Setup the AppGate server dependencies. + appGateServerDeps, err := setupAppGateServerDependencies(cmd, ctx, flagCometWebsocketUrl) + if err != nil { + return fmt.Errorf("failed to setup AppGate server dependencies: %w", err) + } + + log.Println("INFO: Creating AppGate server...") + + // Create the AppGate server. + appGateServer, err := appgateserver.NewAppGateServer( + appGateServerDeps, + appgateserver.WithSigningInformation(&appgateserver.SigningInformation{ + // provide the name of the key to use for signing all incoming requests + SigningKeyName: flagSigningKey, + // provide whether the appgate server should sign all incoming requests + // with its own ring (for applications) or not (for gateways) + SelfSigning: flagSelfSigning, + }), + appgateserver.WithListeningUrl(listeningUrl), + ) + if err != nil { + return fmt.Errorf("failed to create AppGate server: %w", err) + } + + log.Printf("INFO: Starting AppGate server, listening on %s...", listeningUrl.String()) + + // Start the AppGate server. + if err := appGateServer.Start(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { + return fmt.Errorf("failed to start app gate server: %w", err) + } else if errors.Is(err, http.ErrServerClosed) { + log.Println("INFO: AppGate server stopped") + } + + return nil +} + +func setupAppGateServerDependencies(cmd *cobra.Command, ctx context.Context, cometWebsocketUrl string) (depinject.Config, error) { + // Retrieve the client context for the chain interactions. + clientCtx := cosmosclient.GetClientContextFromCmd(cmd) + + // Create the events client. + eventsQueryClient := eventsquery.NewEventsQueryClient(flagCometWebsocketUrl) + + // Create the block client. + log.Printf("INFO: Creating block client, using comet websocket URL: %s...", flagCometWebsocketUrl) + deps := depinject.Supply(eventsQueryClient) + blockClient, err := blockclient.NewBlockClient(ctx, deps, flagCometWebsocketUrl) + if err != nil { + return nil, fmt.Errorf("failed to create block client: %w", err) + } + + // Return the dependencie config. + return depinject.Supply(clientCtx, blockClient), nil +} diff --git a/pkg/appgateserver/endpoint_selector.go b/pkg/appgateserver/endpoint_selector.go new file mode 100644 index 000000000..380c5dad5 --- /dev/null +++ b/pkg/appgateserver/endpoint_selector.go @@ -0,0 +1,45 @@ +package appgateserver + +import ( + "context" + "log" + "net/url" + + sessiontypes "github.com/pokt-network/poktroll/x/session/types" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +// TODO_IMPROVE: This implements a naive greedy approach that defaults to the +// first available supplier. Future optimizations (e.g. Quality-of-Service) can be introduced here. +// TODO(@h5law): Look into different endpoint selection depending on their suitability. +// getRelayerUrl gets the URL of the relayer for the given service. +func (app *appGateServer) getRelayerUrl( + ctx context.Context, + serviceId string, + rpcType sharedtypes.RPCType, + session *sessiontypes.Session, +) (supplierUrl *url.URL, supplierAddress string, err error) { + for _, supplier := range session.Suppliers { + for _, service := range supplier.Services { + // Skip services that don't match the requested serviceId. + if service.Service.Id != serviceId { + continue + } + + for _, endpoint := range service.Endpoints { + // Return the first endpoint url that matches the JSON RPC RpcType. + if endpoint.RpcType == rpcType { + supplierUrl, err := url.Parse(endpoint.Url) + if err != nil { + log.Printf("error parsing url: %s", err) + continue + } + return supplierUrl, supplier.Address, nil + } + } + } + } + + // Return an error if no relayer endpoints were found. + return nil, "", ErrAppGateNoRelayEndpoints +} diff --git a/pkg/appgateserver/errors.go b/pkg/appgateserver/errors.go new file mode 100644 index 000000000..2c8f281bd --- /dev/null +++ b/pkg/appgateserver/errors.go @@ -0,0 +1,13 @@ +package appgateserver + +import sdkerrors "cosmossdk.io/errors" + +var ( + codespace = "appgateserver" + ErrAppGateInvalidRelayResponseSignature = sdkerrors.Register(codespace, 1, "invalid relay response signature") + ErrAppGateNoRelayEndpoints = sdkerrors.Register(codespace, 2, "no relay endpoints found") + ErrAppGateInvalidRequestURL = sdkerrors.Register(codespace, 3, "invalid request URL") + ErrAppGateMissingAppAddress = sdkerrors.Register(codespace, 4, "missing application address") + ErrAppGateMissingSigningInformation = sdkerrors.Register(codespace, 5, "missing app client signing information") + ErrAppGateMissingListeningEndpoint = sdkerrors.Register(codespace, 6, "missing app client listening endpoint") +) diff --git a/pkg/appgateserver/jsonrpc.go b/pkg/appgateserver/jsonrpc.go new file mode 100644 index 000000000..3be01cca7 --- /dev/null +++ b/pkg/appgateserver/jsonrpc.go @@ -0,0 +1,132 @@ +package appgateserver + +import ( + "bytes" + "context" + "io" + "log" + "net/http" + + "github.com/cometbft/cometbft/crypto" + + "github.com/pokt-network/poktroll/x/service/types" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +// handleJSONRPCRelay handles JSON RPC relay requests. +// It does everything from preparing, signing and sending the request. +// It then blocks on the response to come back and forward it to the provided writer. +func (app *appGateServer) handleJSONRPCRelay( + ctx context.Context, + appAddress, serviceId string, + request *http.Request, + writer http.ResponseWriter, +) error { + // Read the request body bytes. + payloadBz, err := io.ReadAll(request.Body) + if err != nil { + return err + } + + // Create the relay request payload. + relayRequestPayload := &types.RelayRequest_JsonRpcPayload{} + relayRequestPayload.JsonRpcPayload.Unmarshal(payloadBz) + + session, err := app.getCurrentSession(ctx, appAddress, serviceId) + if err != nil { + return err + } + log.Printf("DEBUG: Current session ID: %s", session.SessionId) + + // Get a supplier URL and address for the given service and session. + supplierUrl, supplierAddress, err := app.getRelayerUrl(ctx, serviceId, sharedtypes.RPCType_JSON_RPC, session) + if err != nil { + return err + } + + // Create the relay request. + relayRequest := &types.RelayRequest{ + Meta: &types.RelayRequestMetadata{ + SessionHeader: session.Header, + Signature: nil, // signature added below + }, + Payload: relayRequestPayload, + } + + // Get the application's signer. + signer, err := app.getRingSingerForAppAddress(ctx, appAddress) + if err != nil { + return err + } + + // Hash and sign the request's signable bytes. + signableBz, err := relayRequest.GetSignableBytes() + if err != nil { + return err + } + + hash := crypto.Sha256(signableBz) + signature, err := signer.Sign(hash) + if err != nil { + return err + } + relayRequest.Meta.Signature = signature + + // Marshal the relay request to bytes and create a reader to be used as an HTTP request body. + relayRequestBz, err := relayRequest.Marshal() + if err != nil { + return err + } + relayRequestReader := io.NopCloser(bytes.NewReader(relayRequestBz)) + + // Create the HTTP request to send the request to the relayer. + relayHTTPRequest := &http.Request{ + Method: request.Method, + Header: request.Header, + URL: supplierUrl, + Body: relayRequestReader, + } + + // Perform the HTTP request to the relayer. + log.Printf("DEBUG: Sending signed relay request to %s", supplierUrl) + relayHTTPResponse, err := http.DefaultClient.Do(relayHTTPRequest) + if err != nil { + return err + } + + // Read the response body bytes. + relayResponseBz, err := io.ReadAll(relayHTTPResponse.Body) + if err != nil { + return err + } + + // Unmarshal the response bytes into a RelayResponse. + relayResponse := &types.RelayResponse{} + if err := relayResponse.Unmarshal(relayResponseBz); err != nil { + return err + } + + // Verify the response signature. We use the supplier address that we got from + // the getRelayerUrl function since this is the address we are expecting to sign the response. + // TODO_TECHDEBT: if the RelayResponse is an internal error response, we should not verify the signature + // as in some relayer early failures, it may not be signed by the supplier. + // TODO_IMPROVE: Add more logging & telemetry so we can get visibility and signal into + // failed responses. + log.Println("DEBUG: Verifying signed relay response from...") + if err := app.verifyResponse(ctx, supplierAddress, relayResponse); err != nil { + return err + } + + // Marshal the response payload to bytes to be sent back to the application. + var responsePayloadBz []byte + if _, err = relayResponse.Payload.MarshalTo(responsePayloadBz); err != nil { + return err + } + + // Reply with the RelayResponse payload. + if _, err := writer.Write(relayRequestBz); err != nil { + return err + } + + return nil +} diff --git a/pkg/appgateserver/options.go b/pkg/appgateserver/options.go new file mode 100644 index 000000000..fb164029b --- /dev/null +++ b/pkg/appgateserver/options.go @@ -0,0 +1,19 @@ +package appgateserver + +import ( + "net/url" +) + +// WithSigningInformation sets the signing information for the appgate server. +func WithSigningInformation(signingInfo *SigningInformation) appGateServerOption { + return func(appGateServer *appGateServer) { + appGateServer.signingInformation = signingInfo + } +} + +// WithListeningUrl sets the listening URL for the appgate server. +func WithListeningUrl(listeningUrl *url.URL) appGateServerOption { + return func(appGateServer *appGateServer) { + appGateServer.listeningEndpoint = listeningUrl + } +} diff --git a/pkg/appgateserver/relay_verifier.go b/pkg/appgateserver/relay_verifier.go new file mode 100644 index 000000000..712eda7f9 --- /dev/null +++ b/pkg/appgateserver/relay_verifier.go @@ -0,0 +1,73 @@ +package appgateserver + +import ( + "context" + + "github.com/cometbft/cometbft/crypto" + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" + + "github.com/pokt-network/poktroll/x/service/types" +) + +// verifyResponse verifies the relay response signature. +func (app *appGateServer) verifyResponse( + ctx context.Context, + supplierAddress string, + relayResponse *types.RelayResponse, +) error { + // Get the supplier's public key. + supplierPubKey, err := app.getSupplierPubKeyFromAddress(ctx, supplierAddress) + if err != nil { + return err + } + + // Extract the supplier's signature + supplierSignature := relayResponse.Meta.SupplierSignature + + // Get the relay response signable bytes and hash them. + responseBz, err := relayResponse.GetSignableBytes() + if err != nil { + return err + } + hash := crypto.Sha256(responseBz) + + // Verify the relay response signature. + if !supplierPubKey.VerifySignature(hash, supplierSignature) { + return ErrAppGateInvalidRelayResponseSignature + } + + return nil +} + +// getSupplierPubKeyFromAddress gets the supplier's public key from the cache or +// queries if it is not found. The public key is then cached before being returned. +func (app *appGateServer) getSupplierPubKeyFromAddress( + ctx context.Context, + supplierAddress string, +) (cryptotypes.PubKey, error) { + supplierPubKey, ok := app.supplierAccountCache[supplierAddress] + if ok { + return supplierPubKey, nil + } + + // Query for the supplier account to get the application's public key + // to verify the relay request signature. + accQueryReq := &accounttypes.QueryAccountRequest{Address: supplierAddress} + accQueryRes, err := app.accountQuerier.Account(ctx, accQueryReq) + if err != nil { + return nil, err + } + + // Unmarshal the query response into a BaseAccount. + account := new(accounttypes.BaseAccount) + if err := account.Unmarshal(accQueryRes.Account.Value); err != nil { + return nil, err + } + + fetchedPubKey := account.GetPubKey() + // Cache the retrieved public key. + app.supplierAccountCache[supplierAddress] = fetchedPubKey + + return fetchedPubKey, nil +} diff --git a/pkg/appgateserver/rings.go b/pkg/appgateserver/rings.go new file mode 100644 index 000000000..945a5e13b --- /dev/null +++ b/pkg/appgateserver/rings.go @@ -0,0 +1,146 @@ +// TODO_BLOCKER(@h5law): Move all this logic out into a shared package to avoid +// the duplication of core business logic between `pkg/relayer/proxy/rings.go` +// and `pkg/appgateserver/rings.go` +package appgateserver + +import ( + "context" + "fmt" + "log" + + ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" + ringtypes "github.com/athanorlabs/go-dleq/types" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" + accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" + ring "github.com/noot/ring-go" + + "github.com/pokt-network/poktroll/pkg/signer" + apptypes "github.com/pokt-network/poktroll/x/application/types" +) + +// getRingSingerForAppAddress returns the RingSinger used to sign relays. +// This method first attempts to get the points of the ring from the cache, if it +// fails it queries the application module for the points and creates the ring. +func (app *appGateServer) getRingSingerForAppAddress(ctx context.Context, appAddress string) (*signer.RingSigner, error) { + var ring *ring.Ring + var err error + + // lock the cache for reading + app.ringCacheMutex.RLock() + defer app.ringCacheMutex.RUnlock() + + // check if the ring is in the cache + points, ok := app.ringCache[appAddress] + if !ok { + // if the ring is not in the cache, get it from the application module + log.Printf("DEBUG: No ring cached for address: %s", appAddress) + ring, err = app.getRingForAppAddress(ctx, appAddress) + } else { + // if the ring is in the cache, create it from the points + log.Printf("DEBUG: Ring cached for address: %s", appAddress) + ring, err = newRingFromPoints(points) + } + if err != nil { + log.Printf("ERROR: Unable to get ring for address: %s [%v]", appAddress, err) + return nil, err + } + + // return the ring signer + return signer.NewRingSigner(ring, app.signingInformation.SigningKey), nil +} + +// getRingForAppAddress returns the RingSinger used to sign relays. It does so by fetching +// the latest information from the application module and creating the correct ring. +// This method also caches the ring's public keys for future use. +func (app *appGateServer) getRingForAppAddress(ctx context.Context, appAddress string) (*ring.Ring, error) { + points, err := app.getDelegatedPubKeysForAddress(ctx, appAddress) + if err != nil { + return nil, err + } + return newRingFromPoints(points) +} + +// newRingFromPoints creates a new ring from a slice of points on the secp256k1 curve +func newRingFromPoints(points []ringtypes.Point) (*ring.Ring, error) { + return ring.NewFixedKeyRingFromPublicKeys(ring_secp256k1.NewCurve(), points) +} + +// getDelegatedPubKeysForAddress returns the ring used to sign a message for the given +// application address, by querying the application module for it's delegated pubkeys +// and converting them to points on the secp256k1 curve in order to create the ring. +func (app *appGateServer) getDelegatedPubKeysForAddress( + ctx context.Context, + appAddress string, +) ([]ringtypes.Point, error) { + app.ringCacheMutex.RLock() + defer app.ringCacheMutex.RUnlock() + + // get the application's on chain state + req := &apptypes.QueryGetApplicationRequest{Address: appAddress} + res, err := app.applicationQuerier.Application(ctx, req) + if err != nil { + return nil, fmt.Errorf("unable to retrieve application for address: %s [%w]", appAddress, err) + } + + // create a slice of addresses for the ring + ringAddresses := make([]string, 0) + ringAddresses = append(ringAddresses, appAddress) // app address is index 0 + if len(res.Application.DelegateeGatewayAddresses) < 1 { + // add app address twice to make the ring size of mininmum 2 + // TODO_HACK: We are adding the appAddress twice because a ring + // signature requires AT LEAST two pubKeys. When the Application has + // not delegated to any gateways, we add the application's own address + // twice. This is a HACK and should be investigated as to what is the + // best approach to take in this situation. + ringAddresses = append(ringAddresses, appAddress) + } else if len(res.Application.DelegateeGatewayAddresses) > 0 { + // add the delegatee gateway addresses + ringAddresses = append(ringAddresses, res.Application.DelegateeGatewayAddresses...) + } + + // get the points on the secp256k1 curve for the addresses + points, err := app.addressesToPoints(ctx, ringAddresses) + if err != nil { + return nil, err + } + + // update the cache overwriting the previous value + app.ringCache[appAddress] = points + + // return the public key points on the secp256k1 curve + return points, nil +} + +// addressesToPoints converts a slice of addresses to a slice of points on the +// secp256k1 curve, by querying the account module for the public key for each +// address and converting them to the corresponding points on the secp256k1 curve +func (app *appGateServer) addressesToPoints(ctx context.Context, addresses []string) ([]ringtypes.Point, error) { + curve := ring_secp256k1.NewCurve() + points := make([]ringtypes.Point, len(addresses)) + for i, addr := range addresses { + pubKeyReq := &accounttypes.QueryAccountRequest{Address: addr} + pubKeyRes, err := app.accountQuerier.Account(ctx, pubKeyReq) + if err != nil { + return nil, fmt.Errorf("unable to get account for address: %s [%w]", addr, err) + } + var acc accounttypes.AccountI + reg := codectypes.NewInterfaceRegistry() + accounttypes.RegisterInterfaces(reg) + cdc := codec.NewProtoCodec(reg) + if err := cdc.UnpackAny(pubKeyRes.Account, &acc); err != nil { + return nil, fmt.Errorf("unable to deserialise account for address: %s [%w]", addr, err) + } + key := acc.GetPubKey() + if _, ok := key.(*secp256k1.PubKey); !ok { + return nil, fmt.Errorf("public key is not a secp256k1 key: got %T", key) + } + point, err := curve.DecodeToPoint(key.Bytes()) + if err != nil { + return nil, err + } + points[i] = point + } + return points, nil +} diff --git a/pkg/appgateserver/server.go b/pkg/appgateserver/server.go new file mode 100644 index 000000000..d6410021c --- /dev/null +++ b/pkg/appgateserver/server.go @@ -0,0 +1,284 @@ +package appgateserver + +import ( + "context" + "fmt" + "log" + "net/http" + "net/url" + "strings" + "sync" + + "cosmossdk.io/depinject" + ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" + ringtypes "github.com/athanorlabs/go-dleq/types" + sdkclient "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" + + blocktypes "github.com/pokt-network/poktroll/pkg/client" + apptypes "github.com/pokt-network/poktroll/x/application/types" + "github.com/pokt-network/poktroll/x/service/types" + sessiontypes "github.com/pokt-network/poktroll/x/session/types" +) + +type SigningInformation struct { + // SelfSigning indicates whether the server is running in self-signing mode + SelfSigning bool + + // SigningKeyName is the name of the key in the keyring that corresponds to the + // private key used to sign relay requests. + SigningKeyName string + + // SigningKey is the scalar point on the appropriate curve corresponding to the + // signer's private key, and is used to sign relay requests via a ring signature + SigningKey ringtypes.Scalar + + // AppAddress is the address of the application that the server is serving if + // If it is nil, then the application address must be included in each request via a query parameter. + AppAddress string +} + +// appGateServer is the server that listens for application requests and relays them to the supplier. +// It is responsible for maintaining the current session for the application, signing the requests, +// and verifying the response signatures. +// The appGateServer is the basis for both applications and gateways, depending on whether the application +// is running their own instance of the appGateServer or they are sending requests to a gateway running an +// instance of the appGateServer, they will need to either include the application address in the request or not. +type appGateServer struct { + // signing information holds the signing key and application address for the server + signingInformation *SigningInformation + + // ringCache is a cache of the public keys used to create the ring for a given application + // they are stored in a map of application address to a slice of points on the secp256k1 curve + // TODO(@h5law): subscribe to on-chain events to update this cache as the ring changes over time + ringCache map[string][]ringtypes.Point + ringCacheMutex *sync.RWMutex + + // clientCtx is the client context for the application. + // It is used to query for the application's account to unmarshal the supplier's account + // and get the public key to verify the relay response signature. + clientCtx sdkclient.Context + + // sessionQuerier is the querier for the session module. + // It used to get the current session for the application given a requested service. + sessionQuerier sessiontypes.QueryClient + + // sessionMu is a mutex to protect currentSession map reads and and updates. + sessionMu sync.RWMutex + + // currentSessions is the current session for the application given a block height. + // It is updated by the goListenForNewSessions goroutine. + currentSessions map[string]*sessiontypes.Session + + // accountQuerier is the querier for the account module. + // It is used to get the the supplier's public key to verify the relay response signature. + accountQuerier accounttypes.QueryClient + + // applicationQuerier is the querier for the application module. + // It is used to get the ring for a given application address. + applicationQuerier apptypes.QueryClient + + // blockClient is the client for the block module. + // It is used to get the current block height to query for the current session. + blockClient blocktypes.BlockClient + + // listeningEndpoint is the endpoint that the appGateServer will listen on. + listeningEndpoint *url.URL + + // server is the HTTP server that will be used capture application requests + // so that they can be signed and relayed to the supplier. + server *http.Server + + // accountCache is a cache of the supplier accounts that has been queried + // TODO_TECHDEBT: Add a size limit to the cache. + supplierAccountCache map[string]cryptotypes.PubKey +} + +func NewAppGateServer( + deps depinject.Config, + opts ...appGateServerOption, +) (*appGateServer, error) { + app := &appGateServer{ + ringCacheMutex: &sync.RWMutex{}, + ringCache: make(map[string][]ringtypes.Point), + currentSessions: make(map[string]*sessiontypes.Session), + supplierAccountCache: make(map[string]cryptotypes.PubKey), + } + + if err := depinject.Inject( + deps, + &app.clientCtx, + &app.blockClient, + ); err != nil { + return nil, err + } + + for _, opt := range opts { + opt(app) + } + + if err := app.validateConfig(); err != nil { + return nil, err + } + + keyRecord, err := app.clientCtx.Keyring.Key(app.signingInformation.SigningKeyName) + if err != nil { + return nil, fmt.Errorf("failed to get key from keyring: %w", err) + } + + appAddress, err := keyRecord.GetAddress() + if err != nil { + return nil, fmt.Errorf("failed to get address from key: %w", err) + } + if app.signingInformation.SelfSigning { + app.signingInformation.AppAddress = appAddress.String() + } + + // Convert the key record to a private key and return the scalar + // point on the secp256k1 curve that it corresponds to. + // If the key is not a secp256k1 key, this will return an error. + signingKey, err := recordLocalToScalar(keyRecord.GetLocal()) + if err != nil { + return nil, fmt.Errorf("failed to convert private key to scalar: %w", err) + } + app.signingInformation.SigningKey = signingKey + + app.sessionQuerier = sessiontypes.NewQueryClient(app.clientCtx) + app.accountQuerier = accounttypes.NewQueryClient(app.clientCtx) + app.applicationQuerier = apptypes.NewQueryClient(app.clientCtx) + app.server = &http.Server{Addr: app.listeningEndpoint.Host} + + return app, nil +} + +// Start starts the appgate server and blocks until the context is done +// or the server returns an error. +func (app *appGateServer) Start(ctx context.Context) error { + // Shutdown the HTTP server when the context is done. + go func() { + <-ctx.Done() + app.server.Shutdown(ctx) + }() + + // Set the HTTP handler. + app.server.Handler = app + + // Start the HTTP server. + return app.server.ListenAndServe() +} + +// Stop stops the appgate server and returns any error that occurred. +func (app *appGateServer) Stop(ctx context.Context) error { + return app.server.Shutdown(ctx) +} + +// ServeHTTP is the HTTP handler for the appgate server. +// It captures the application request, signs it, and sends it to the supplier. +// After receiving the response from the supplier, it verifies the response signature +// before returning the response to the application. +// The serviceId is extracted from the request path. +// The request's path should be of the form: +// +// "://host:port/serviceId[/other/path/segments]?senderAddr=" +// +// where the serviceId is the id of the service that the application is requesting +// and the other (possible) path segments are the JSON RPC request path. +// TODO_TECHDEBT: Revisit the requestPath above based on the SDK that'll be exposed in the future. +func (app *appGateServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + ctx := request.Context() + + // Extract the serviceId from the request path. + path := request.URL.Path + serviceId := strings.Split(path, "/")[1] + + // Determine the application address. + appAddress := app.signingInformation.AppAddress + if appAddress == "" { + appAddress = request.URL.Query().Get("senderAddr") + } + if appAddress == "" { + app.replyWithError(writer, ErrAppGateMissingAppAddress) + log.Print("ERROR: no application address provided") + } + + // TODO_TECHDEBT: Currently, there is no information about the RPC type requested. It should + // be extracted from the request and used to determine the RPC type to handle. handle*Relay() + // calls should be wrapped into a switch statement to handle different types of relays. + err := app.handleJSONRPCRelay(ctx, appAddress, serviceId, request, writer) + if err != nil { + // Reply with an error response if there was an error handling the relay. + app.replyWithError(writer, err) + log.Printf("ERROR: failed handling relay: %s", err) + return + } + + log.Print("INFO: request serviced successfully") +} + +// replyWithError replies to the application with an error response. +// TODO_TECHDEBT: This method should be aware of the nature of the error to use the appropriate JSONRPC +// Code, Message and Data. Possibly by augmenting the passed in error with the adequate information. +func (app *appGateServer) replyWithError(writer http.ResponseWriter, err error) { + relayResponse := &types.RelayResponse{ + Payload: &types.RelayResponse_JsonRpcPayload{ + JsonRpcPayload: &types.JSONRPCResponsePayload{ + Id: make([]byte, 0), + Jsonrpc: "2.0", + Error: &types.JSONRPCResponseError{ + // Using conventional error code indicating internal server error. + Code: -32000, + Message: err.Error(), + Data: nil, + }, + }, + }, + } + + relayResponseBz, err := relayResponse.Marshal() + if err != nil { + log.Printf("ERROR: failed marshaling relay response: %s", err) + return + } + + if _, err = writer.Write(relayResponseBz); err != nil { + log.Printf("ERROR: failed writing relay response: %s", err) + return + } +} + +// validateConfig validates the appGateServer configuration. +func (app *appGateServer) validateConfig() error { + if app.signingInformation == nil { + return ErrAppGateMissingSigningInformation + } + if app.listeningEndpoint == nil { + return ErrAppGateMissingListeningEndpoint + } + return nil +} + +// recordLocalToScalar converts the private key obtained from a +// key record to a scalar point on the secp256k1 curve +func recordLocalToScalar(local *keyring.Record_Local) (ringtypes.Scalar, error) { + if local == nil { + return nil, fmt.Errorf("cannot extract private key from key record: nil") + } + priv, ok := local.PrivKey.GetCachedValue().(cryptotypes.PrivKey) + if !ok { + return nil, fmt.Errorf("cannot extract private key from key record: %T", local.PrivKey.GetCachedValue()) + } + if _, ok := priv.(*secp256k1.PrivKey); !ok { + return nil, fmt.Errorf("unexpected private key type: %T, want %T", priv, &secp256k1.PrivKey{}) + } + crv := ring_secp256k1.NewCurve() + privKey, err := crv.DecodeToScalar(priv.Bytes()) + if err != nil { + return nil, fmt.Errorf("failed to decode private key: %w", err) + } + return privKey, nil +} + +type appGateServerOption func(*appGateServer) diff --git a/pkg/appgateserver/session.go b/pkg/appgateserver/session.go new file mode 100644 index 000000000..5db25465d --- /dev/null +++ b/pkg/appgateserver/session.go @@ -0,0 +1,47 @@ +package appgateserver + +import ( + "context" + + sessiontypes "github.com/pokt-network/poktroll/x/session/types" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +// getCurrentSession gets the current session for the given service +// It returns the current session if it exists and is still valid, otherwise it +// queries for the latest session, caches and returns it. +func (app *appGateServer) getCurrentSession( + ctx context.Context, + appAddress, serviceId string, +) (*sessiontypes.Session, error) { + app.sessionMu.RLock() + defer app.sessionMu.RUnlock() + + latestBlock := app.blockClient.LatestBlock(ctx) + if currentSession, ok := app.currentSessions[serviceId]; ok { + sessionEndBlockHeight := currentSession.Header.SessionStartBlockHeight + currentSession.NumBlocksPerSession + + // Return the current session if it is still valid. + if latestBlock.Height() < sessionEndBlockHeight { + return currentSession, nil + } + } + + // Query for the current session. + sessionQueryReq := sessiontypes.QueryGetSessionRequest{ + ApplicationAddress: appAddress, + Service: &sharedtypes.Service{Id: serviceId}, + BlockHeight: latestBlock.Height(), + } + sessionQueryRes, err := app.sessionQuerier.GetSession(ctx, &sessionQueryReq) + if err != nil { + return nil, err + } + + session := sessionQueryRes.Session + + // Cache the current session. + app.currentSessions[serviceId] = session + + return session, nil +} diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index 3fd6489a7..375171d28 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -82,8 +82,7 @@ func NewBlockClient( ) (client.BlockClient, error) { // Initialize block client bClient := &blockClient{endpointURL: cometWebsocketURL} - bClient.latestBlockObsvbls, bClient.latestBlockObsvblsReplayPublishCh = - channel.NewReplayObservable[client.BlocksObservable](ctx, latestBlockObsvblsReplayBufferSize) + bClient.latestBlockObsvbls, bClient.latestBlockObsvblsReplayPublishCh = channel.NewReplayObservable[client.BlocksObservable](ctx, latestBlockObsvblsReplayBufferSize) // Inject dependencies if err := depinject.Inject(deps, &bClient.eventsClient); err != nil { @@ -141,7 +140,9 @@ func (bClient *blockClient) goPublishBlocks(ctx context.Context) { // If we get here, the retry limit was reached and the retry loop exited. // Since this function runs in a goroutine, we can't return the error to the // caller. Instead, we panic. - panic(fmt.Errorf("BlockClient.goPublishBlocks shold never reach this spot: %w", publishErr)) + if publishErr != nil { + panic(fmt.Errorf("BlockClient.goPublishBlocks should never reach this spot: %w", publishErr)) + } } // retryPublishBlocksFactory returns a function which is intended to be passed to diff --git a/pkg/client/gomock_reflect_3526400147/prog.go b/pkg/client/gomock_reflect_3526400147/prog.go deleted file mode 100644 index 6003ba81a..000000000 --- a/pkg/client/gomock_reflect_3526400147/prog.go +++ /dev/null @@ -1,66 +0,0 @@ -package main - -import ( - "encoding/gob" - "flag" - "fmt" - "os" - "path" - "reflect" - - "github.com/golang/mock/mockgen/model" - - pkg_ "github.com/pokt-network/poktroll/pkg/client" -) - -var output = flag.String("output", "", "The output file name, or empty to use stdout.") - -func main() { - flag.Parse() - - its := []struct { - sym string - typ reflect.Type - }{ - - {"TxContext", reflect.TypeOf((*pkg_.TxContext)(nil)).Elem()}, - - {"TxClient", reflect.TypeOf((*pkg_.TxClient)(nil)).Elem()}, - } - pkg := &model.Package{ - // NOTE: This behaves contrary to documented behaviour if the - // package name is not the final component of the import path. - // The reflect package doesn't expose the package name, though. - Name: path.Base("github.com/pokt-network/poktroll/pkg/client"), - } - - for _, it := range its { - intf, err := model.InterfaceFromInterfaceType(it.typ) - if err != nil { - fmt.Fprintf(os.Stderr, "Reflection: %v\n", err) - os.Exit(1) - } - intf.Name = it.sym - pkg.Interfaces = append(pkg.Interfaces, intf) - } - - outfile := os.Stdout - if len(*output) != 0 { - var err error - outfile, err = os.Create(*output) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to open output file %q", *output) - } - defer func() { - if err := outfile.Close(); err != nil { - fmt.Fprintf(os.Stderr, "failed to close output file %q", *output) - os.Exit(1) - } - }() - } - - if err := gob.NewEncoder(outfile).Encode(pkg); err != nil { - fmt.Fprintf(os.Stderr, "gob encode: %v\n", err) - os.Exit(1) - } -} diff --git a/pkg/relayer/proxy/errors.go b/pkg/relayer/proxy/errors.go index 6a0815b58..e16d41e17 100644 --- a/pkg/relayer/proxy/errors.go +++ b/pkg/relayer/proxy/errors.go @@ -10,4 +10,6 @@ var ( ErrRelayerProxyInvalidSupplier = sdkerrors.Register(codespace, 4, "invalid relayer proxy supplier") ErrRelayerProxyUndefinedSigningKeyName = sdkerrors.Register(codespace, 5, "undefined relayer proxy signing key name") ErrRelayerProxyUndefinedProxiedServicesEndpoints = sdkerrors.Register(codespace, 6, "undefined proxied services endpoints for relayer proxy") + ErrRelayerProxyInvalidRelayRequest = sdkerrors.Register(codespace, 7, "invalid relay request") + ErrRelayerProxyInvalidRelayResponse = sdkerrors.Register(codespace, 8, "invalid relay response") ) diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 58b9549fd..9ed38e963 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -3,8 +3,10 @@ package proxy import ( "context" "net/url" + "sync" "cosmossdk.io/depinject" + ringtypes "github.com/athanorlabs/go-dleq/types" sdkclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/crypto/keyring" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" @@ -14,6 +16,7 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/pkg/relayer" + apptypes "github.com/pokt-network/poktroll/x/application/types" "github.com/pokt-network/poktroll/x/service/types" sessiontypes "github.com/pokt-network/poktroll/x/session/types" suppliertypes "github.com/pokt-network/poktroll/x/supplier/types" @@ -54,6 +57,10 @@ type relayerProxy struct { // which is needed to check if the relay proxy should be serving an incoming relay request. sessionQuerier sessiontypes.QueryClient + // applicationQuerier is the querier for the application module. + // It is used to get the ring for a given application address. + applicationQuerier apptypes.QueryClient + // advertisedRelayServers is a map of the services provided by the relayer proxy. Each provided service // has the necessary information to start the server that listens for incoming relay requests and // the client that relays the request to the supported proxied service. @@ -69,6 +76,12 @@ type relayerProxy struct { // servedRelays observable can fan out the notifications to its subscribers. servedRelaysProducer chan<- *types.Relay + // ringCache is a cache of the public keys used to create the ring for a given application + // they are stored in a map of application address to a slice of points on the secp256k1 curve + // TODO(@h5law): subscribe to on-chain events to update this cache as the ring changes over time + ringCache map[string][]ringtypes.Point + ringCacheMutex *sync.RWMutex + // clientCtx is the Cosmos' client context used to build the needed query clients and unmarshal their replies. clientCtx sdkclient.Context diff --git a/pkg/relayer/proxy/relay_signer.go b/pkg/relayer/proxy/relay_signer.go index 5ab929cbe..ac3ec2089 100644 --- a/pkg/relayer/proxy/relay_signer.go +++ b/pkg/relayer/proxy/relay_signer.go @@ -1,8 +1,10 @@ package proxy import ( + sdkerrors "cosmossdk.io/errors" "github.com/cometbft/cometbft/crypto" + "github.com/pokt-network/poktroll/pkg/signer" "github.com/pokt-network/poktroll/x/service/types" ) @@ -12,14 +14,23 @@ import ( // that should not be responsible for signing relay responses. // See https://github.com/pokt-network/poktroll/issues/160 for a better design. func (rp *relayerProxy) SignRelayResponse(relayResponse *types.RelayResponse) error { - var responseBz []byte - _, err := relayResponse.MarshalTo(responseBz) + // create a simple signer for the request + signer := signer.NewSimpleSigner(rp.keyring, rp.signingKeyName) + + // extract and hash the relay response's signable bytes + signableBz, err := relayResponse.GetSignableBytes() if err != nil { - return err + return sdkerrors.Wrapf(ErrRelayerProxyInvalidRelayResponse, "error getting signable bytes: %v", err) } + hash := crypto.Sha256(signableBz) - hash := crypto.Sha256(responseBz) - relayResponse.Meta.SupplierSignature, _, err = rp.keyring.Sign(rp.signingKeyName, hash) + // sign the relay response + sig, err := signer.Sign(hash) + if err != nil { + return sdkerrors.Wrapf(ErrRelayerProxyInvalidRelayResponse, "error signing relay response: %v", err) + } - return err + // set the relay response's signature + relayResponse.Meta.SupplierSignature = sig + return nil } diff --git a/pkg/relayer/proxy/relay_verifier.go b/pkg/relayer/proxy/relay_verifier.go index db9cbe7dc..e64955d55 100644 --- a/pkg/relayer/proxy/relay_verifier.go +++ b/pkg/relayer/proxy/relay_verifier.go @@ -3,8 +3,10 @@ package proxy import ( "context" + sdkerrors "cosmossdk.io/errors" + ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" "github.com/cometbft/cometbft/crypto" - accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" + "github.com/noot/ring-go" "github.com/pokt-network/poktroll/x/service/types" sessiontypes "github.com/pokt-network/poktroll/x/session/types" @@ -17,33 +19,63 @@ func (rp *relayerProxy) VerifyRelayRequest( relayRequest *types.RelayRequest, service *sharedtypes.Service, ) error { - // Query for the application account to get the application's public key to verify the relay request signature. - applicationAddress := relayRequest.Meta.SessionHeader.ApplicationAddress - accQueryReq := &accounttypes.QueryAccountRequest{Address: applicationAddress} - accQueryRes, err := rp.accountsQuerier.Account(ctx, accQueryReq) + // extract the relay request's ring signature + signature := relayRequest.Meta.Signature + if signature == nil { + return sdkerrors.Wrapf( + ErrRelayerProxyInvalidRelayRequest, + "missing signature from relay request: %v", relayRequest, + ) + } + + ringSig := new(ring.RingSig) + if err := ringSig.Deserialize(ring_secp256k1.NewCurve(), signature); err != nil { + return sdkerrors.Wrapf( + ErrRelayerProxyInvalidRelayRequestSignature, + "error deserializing ring signature: %v", err, + ) + } + + // get the ring for the application address of the relay request + appAddress := relayRequest.Meta.SessionHeader.ApplicationAddress + appRing, err := rp.getRingForAppAddress(ctx, appAddress) if err != nil { - return err + return sdkerrors.Wrapf( + ErrRelayerProxyInvalidRelayRequest, + "error getting ring for application address %s: %v", appAddress, err, + ) } - var payloadBz []byte - if _, err = relayRequest.Payload.MarshalTo(payloadBz); err != nil { - return err + // verify the ring signature against the ring + if !ringSig.Ring().Equals(appRing) { + return sdkerrors.Wrapf( + ErrRelayerProxyInvalidRelayRequestSignature, + "ring signature does not match ring for application address %s", appAddress, + ) } - hash := crypto.Sha256(payloadBz) - account := new(accounttypes.BaseAccount) - if err := account.Unmarshal(accQueryRes.Account.Value); err != nil { - return err + // get and hash the signable bytes of the relay request + signableBz, err := relayRequest.GetSignableBytes() + if err != nil { + return sdkerrors.Wrapf(ErrRelayerProxyInvalidRelayRequest, "error getting signable bytes: %v", err) } - if !account.GetPubKey().VerifySignature(hash, relayRequest.Meta.Signature) { - return ErrRelayerProxyInvalidRelayRequestSignature + hash := crypto.Sha256(signableBz) + var hash32 [32]byte + copy(hash32[:], hash) + + // verify the relay request's signature + if valid := ringSig.Verify(hash32); !valid { + return sdkerrors.Wrapf( + ErrRelayerProxyInvalidRelayRequestSignature, + "invalid ring signature", + ) } // Query for the current session to check if relayRequest sessionId matches the current session. currentBlock := rp.blockClient.LatestBlock(ctx) sessionQuery := &sessiontypes.QueryGetSessionRequest{ - ApplicationAddress: applicationAddress, + ApplicationAddress: appAddress, Service: service, BlockHeight: currentBlock.Height(), } diff --git a/pkg/relayer/proxy/rings.go b/pkg/relayer/proxy/rings.go new file mode 100644 index 000000000..59a19ae70 --- /dev/null +++ b/pkg/relayer/proxy/rings.go @@ -0,0 +1,119 @@ +// TODO_BLOCKER(@h5law): Move all this logic out into a shared package to avoid +// the duplication of core business logic between `pkg/relayer/proxy/rings.go` +// and `pkg/appgateserver/rings.go` +package proxy + +import ( + "context" + "fmt" + + ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" + ringtypes "github.com/athanorlabs/go-dleq/types" + "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" + accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" + ring "github.com/noot/ring-go" + apptypes "github.com/pokt-network/poktroll/x/application/types" +) + +// getRingForAppAddress returns the RingSinger used to sign relays. It does so by fetching +// the latest information from the application module and creating the correct ring. +// This method also caches the ring's public keys for future use. +func (rp *relayerProxy) getRingForAppAddress(ctx context.Context, appAddress string) (*ring.Ring, error) { + // lock the cache for reading + rp.ringCacheMutex.RLock() + + // check if the ring is in the cache + points, ok := rp.ringCache[appAddress] + rp.ringCacheMutex.RUnlock() // unlock the cache incase not found in cache + var err error + if !ok { + // if the ring is not in the cache, get it from the application module + points, err = rp.getDelegatedPubKeysForAddress(ctx, appAddress) + } + if err != nil { + return nil, err + } + + // create the ring from the points + return newRingFromPoints(points) +} + +// newRingFromPoints creates a new ring from a slice of points on the secp256k1 curve +func newRingFromPoints(points []ringtypes.Point) (*ring.Ring, error) { + return ring.NewFixedKeyRingFromPublicKeys(ring_secp256k1.NewCurve(), points) +} + +// getDelegatedPubKeysForAddress returns the ring used to sign a message for the given +// application address, by querying the application module for it's delegated pubkeys +// and converting them to points on the secp256k1 curve in order to create the ring. +func (rp *relayerProxy) getDelegatedPubKeysForAddress( + ctx context.Context, + appAddress string, +) ([]ringtypes.Point, error) { + rp.ringCacheMutex.RLock() + defer rp.ringCacheMutex.RUnlock() + + // get the application's on chain state + req := &apptypes.QueryGetApplicationRequest{Address: appAddress} + res, err := rp.applicationQuerier.Application(ctx, req) + if err != nil { + return nil, fmt.Errorf("unable to retrieve application for address: %s [%w]", appAddress, err) + } + + // create a slice of addresses for the ring + ringAddresses := make([]string, 0) + ringAddresses = append(ringAddresses, appAddress) // app address is index 0 + if len(res.Application.DelegateeGatewayAddresses) < 1 { + // add app address twice to make the ring size of mininmum 2 + // TODO_TECHDEBT: We are adding the appAddress twice because a ring + // signature requires AT LEAST two pubKeys. When the Application has + // not delegated to any gateways, we add the application's own address + // twice. This is a HACK and should be investigated as to what is the + // best approach to take in this situation. + ringAddresses = append(ringAddresses, appAddress) + } else if len(res.Application.DelegateeGatewayAddresses) > 0 { + // add the delegatee gateway addresses + ringAddresses = append(ringAddresses, res.Application.DelegateeGatewayAddresses...) + } + + // get the points on the secp256k1 curve for the addresses + points, err := rp.addressesToPoints(ctx, ringAddresses) + if err != nil { + return nil, err + } + + // update the cache overwriting the previous value + rp.ringCache[appAddress] = points + + // return the public key points on the secp256k1 curve + return points, nil +} + +// addressesToPoints converts a slice of addresses to a slice of points on the +// secp256k1 curve, by querying the account module for the public key for each +// address and converting them to the corresponding points on the secp256k1 curve +func (rp *relayerProxy) addressesToPoints(ctx context.Context, addresses []string) ([]ringtypes.Point, error) { + curve := ring_secp256k1.NewCurve() + points := make([]ringtypes.Point, len(addresses)) + for i, addr := range addresses { + pubKeyReq := &accounttypes.QueryAccountRequest{Address: addr} + pubKeyRes, err := rp.accountsQuerier.Account(ctx, pubKeyReq) + if err != nil { + return nil, fmt.Errorf("unable to get account for address: %s [%w]", addr, err) + } + acc := new(accounttypes.BaseAccount) + if err := acc.Unmarshal(pubKeyRes.Account.Value); err != nil { + return nil, fmt.Errorf("unable to deserialise account for address: %s [%w]", addr, err) + } + key := acc.GetPubKey() + if _, ok := key.(*secp256k1.PubKey); !ok { + return nil, fmt.Errorf("public key is not a secp256k1 key: got %T", key) + } + point, err := curve.DecodeToPoint(key.Bytes()) + if err != nil { + return nil, err + } + points[i] = point + } + return points, nil +} diff --git a/pkg/signer/interface.go b/pkg/signer/interface.go new file mode 100644 index 000000000..5d91ae42a --- /dev/null +++ b/pkg/signer/interface.go @@ -0,0 +1,9 @@ +package signer + +// Signer is an interface that abstracts the signing of a message, it is used +// to sign both relay requests and responses via one of the two implementations. +// The Signer interface expects a 32 byte message (sha256 hash) and returns a +// byte slice containing the signature or any error that occurred during signing. +type Signer interface { + Sign(msg []byte) (signature []byte, err error) +} diff --git a/pkg/signer/ring_signer.go b/pkg/signer/ring_signer.go new file mode 100644 index 000000000..de401c25f --- /dev/null +++ b/pkg/signer/ring_signer.go @@ -0,0 +1,38 @@ +package signer + +import ( + "fmt" + + ringtypes "github.com/athanorlabs/go-dleq/types" + ring "github.com/noot/ring-go" +) + +var _ Signer = (*RingSigner)(nil) + +// RingSigner is a signer implementation that uses a ring to sign messages, for +// verification the ring signature must be verified and confirmed to be using +// the expected ring. +type RingSigner struct { + ring *ring.Ring + privKey ringtypes.Scalar +} + +// NewRingSigner creates a new RingSigner instance with the ring and private key provided +func NewRingSigner(ring *ring.Ring, privKey ringtypes.Scalar) *RingSigner { + return &RingSigner{ring: ring, privKey: privKey} +} + +// Sign uses the ring and private key to sign the message provided and returns the +// serialised ring signature that can be deserialised and verified by the verifier +func (r *RingSigner) Sign(msg []byte) ([]byte, error) { + if len(msg) != 32 { + return nil, fmt.Errorf("message must be 32 bytes long, got %d", len(msg)) + } + var msg32 [32]byte + copy(msg32[:], msg) + ringSig, err := r.ring.Sign(msg32, r.privKey) + if err != nil { + return nil, fmt.Errorf("failed to sign message [%v]: %w", msg, err) + } + return ringSig.Serialize() +} diff --git a/pkg/signer/simple_signer.go b/pkg/signer/simple_signer.go new file mode 100644 index 000000000..208d56f67 --- /dev/null +++ b/pkg/signer/simple_signer.go @@ -0,0 +1,23 @@ +package signer + +import "github.com/cosmos/cosmos-sdk/crypto/keyring" + +var _ Signer = (*SimpleSigner)(nil) + +// SimpleSigner is a signer implementation that uses the local keyring to sign +// messages, for verification using the signer's corresponding public key +type SimpleSigner struct { + keyring keyring.Keyring + keyName string +} + +// NewSimpleSigner creates a new SimpleSigner instance with the keyring and keyName provided +func NewSimpleSigner(keyring keyring.Keyring, keyName string) *SimpleSigner { + return &SimpleSigner{keyring: keyring, keyName: keyName} +} + +// Sign signs the given message using the SimpleSigner's keyring and keyName +func (s *SimpleSigner) Sign(msg []byte) (signature []byte, err error) { + sig, _, err := s.keyring.Sign(s.keyName, msg[:]) + return sig, err +} diff --git a/x/service/types/relay.go b/x/service/types/relay.go new file mode 100644 index 000000000..c7b2a1894 --- /dev/null +++ b/x/service/types/relay.go @@ -0,0 +1,23 @@ +package types + +// GetSignableBytes returns the signable bytes for the relay request +// this involves setting the signature to nil and marshaling the message. +// A value receiver is used to avoid overwriting any pre-existing signature +func (req RelayRequest) GetSignableBytes() ([]byte, error) { + // set signature to nil + req.Meta.Signature = nil + + // return the marshaled message + return req.Marshal() +} + +// GetSignableBytes returns the signable bytes for the relay response +// this involves setting the signature to nil and marshaling the message. +// A value receiver is used to avoid overwriting any pre-existing signature +func (res RelayResponse) GetSignableBytes() ([]byte, error) { + // set signature to nil + res.Meta.SupplierSignature = nil + + // return the marshaled message + return res.Marshal() +}