diff --git a/Makefile b/Makefile index 147a86b2b..5330a6fdb 100644 --- a/Makefile +++ b/Makefile @@ -170,6 +170,29 @@ proto_clean_pulsar: ## TODO_TECHDEBT(@bryanchriswhite): Add a proper explanation .PHONY: proto_regen proto_regen: proto_clean proto_ignite_gen proto_fix_self_import ## Regenerate protobuf artifacts +######################## +### Localnet Helpers ### +######################## + +.PHONY: localnet_relayminer1_ping +localnet_relayminer1_ping: + @echo "Pinging relayminer 1..." + @curl -X GET localhost:7001 || (echo "Failed to ping relayminer1. Make sure your localnet environment or the relayminer 1 pod is up and running"; exit 1) + @echo "OK" + +.PHONY: localnet_relayminer2_ping +localnet_relayminer2_ping: + @echo "Pinging relayminer 2..." + @curl -X GET localhost:7002 || (echo "Failed to ping relayminer2. Make sure your localnet environment or the relayminer 2 pod is up and running"; exit 1) + @echo "OK" + +.PHONY: localnet_relayminer3_ping +localnet_relayminer3_ping: + @echo "Pinging relayminer 3..." + @curl -X GET localhost:7003 || (echo "Failed to ping relayminer3. Make sure your localnet environment or the relayminer 3 pod is up and running"; exit 1) + @echo "OK" + + ####################### ### Docker Helpers ### ####################### @@ -404,7 +427,6 @@ act_reviewdog: check_act check_gh ## Run the reviewdog workflow locally like so: @echo "Detected architecture: $(CONTAINER_ARCH)" act -v -s GITHUB_TOKEN=$(GITHUB_TOKEN) -W .github/workflows/reviewdog.yml --container-architecture $(CONTAINER_ARCH) - ########################### ### Release Helpers ### ########################### diff --git a/Tiltfile b/Tiltfile index 7a9af482d..1e05fe885 100644 --- a/Tiltfile +++ b/Tiltfile @@ -264,10 +264,8 @@ helm_resource( actor_number = 0 for x in range(localnet_config["relayminers"]["count"]): actor_number = actor_number + 1 - helm_resource( - "relayminer" + str(actor_number), - chart_prefix + "relayminer", - flags=[ + + flags=[ "--values=./localnet/kubernetes/values-common.yaml", "--values=./localnet/kubernetes/values-relayminer-common.yaml", "--values=./localnet/kubernetes/values-relayminer-" + str(actor_number) + ".yaml", @@ -275,10 +273,38 @@ for x in range(localnet_config["relayminers"]["count"]): "--set=development.delve.enabled=" + str(localnet_config["relayminers"]["delve"]["enabled"]), "--set=logLevel=" + str(localnet_config["relayminers"]["logs"]["level"]), "--set=image.repository=poktrolld", - ], + ] + + supplier_number = 0 + + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_id=anvil") + flags.append("--set=config.suppliers["+str(supplier_number)+"].listen_url=http://0.0.0.0:8545") + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.backend_url=http://anvil:8547/") + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.publicly_exposed_endpoints[0]=relayminer"+str(actor_number)) + supplier_number = supplier_number + 1 + + if localnet_config["rest"]["enabled"]: + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_id=rest") + flags.append("--set=config.suppliers["+str(supplier_number)+"].listen_url=http://0.0.0.0:8545") + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.backend_url=http://rest:10000/") + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.publicly_exposed_endpoints[0]=relayminer"+str(actor_number)) + supplier_number = supplier_number + 1 + + if localnet_config["ollama"]["enabled"]: + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_id=ollama") + flags.append("--set=config.suppliers["+str(supplier_number)+"].listen_url=http://0.0.0.0:8545") + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.backend_url=http://ollama:11434/") + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.publicly_exposed_endpoints[0]=relayminer"+str(actor_number)) + supplier_number = supplier_number + 1 + + helm_resource( + "relayminer" + str(actor_number), + chart_prefix + "relayminer", + flags=flags, image_deps=["poktrolld"], image_keys=[("image.repository", "image.tag")], ) + k8s_resource( "relayminer" + str(actor_number), labels=["suppliers"], @@ -299,6 +325,7 @@ for x in range(localnet_config["relayminers"]["count"]): # Use with pprof like this: `go tool pprof -http=:3333 http://localhost:6070/debug/pprof/goroutine` str(6069 + actor_number) + ":6060", # Relayminer pprof port. relayminer1 - exposes 6070, relayminer2 exposes 6071, etc. + str(7000 + actor_number) + ":8081", # Relayminer ping port. relayminer1 - exposes 7001, relayminer2 exposes 7002, etc. ], ) diff --git a/docusaurus/docs/operate/configs/relayminer_config.md b/docusaurus/docs/operate/configs/relayminer_config.md index 3b1272b98..f9967f009 100644 --- a/docusaurus/docs/operate/configs/relayminer_config.md +++ b/docusaurus/docs/operate/configs/relayminer_config.md @@ -23,6 +23,7 @@ You can find a fully featured example configuration at [relayminer_config_full_e - [`smt_store_path`](#smt_store_path) - [`metrics`](#metrics) - [`pprof`](#pprof) + - [`ping`](#ping) - [Pocket node connectivity](#pocket-node-connectivity) - [`query_node_rpc_url`](#query_node_rpc_url) - [`query_node_grpc_url`](#query_node_grpc_url) @@ -173,6 +174,23 @@ pprof: You can learn how to use that endpoint on the [Performance Troubleshooting](../../develop/developer_guide/performance_troubleshooting.md) page. +### `ping` + +_`Optional`_ + +Configures a `ping` server to test the connectivity of all backend URLs. If +all the backend URLs are reachable, the endpoint returns a 204 HTTP +Code. If one or more backend URLs aren't reachable, the service +returns an appropriate HTTP error. + +Example configuration: + +```yaml +ping: + enabled: true + addr: localhost:8081 +``` + ## Pocket node connectivity ```yaml @@ -528,4 +546,4 @@ can disrupt the operator’s participation in the Pocket Network. To maintain a smooth operation, avoid being slashed, and earn your rewards, operators must plan and manage their account balance as part of their operational procedures. -::: \ No newline at end of file +::: diff --git a/go.mod b/go.mod index 162807307..8e32e4018 100644 --- a/go.mod +++ b/go.mod @@ -80,6 +80,7 @@ require ( require ( cosmossdk.io/x/tx v0.13.4 + github.com/foxcpp/go-mockdns v1.1.0 github.com/jhump/protoreflect v1.16.0 github.com/mitchellh/mapstructure v1.5.0 ) @@ -223,6 +224,7 @@ require ( github.com/manifoldco/promptui v0.9.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/miekg/dns v1.1.57 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-testing-interface v1.14.1 // indirect diff --git a/go.sum b/go.sum index 761f5909e..7edca2823 100644 --- a/go.sum +++ b/go.sum @@ -498,6 +498,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/foxcpp/go-mockdns v1.1.0 h1:jI0rD8M0wuYAxL7r/ynTrCQQq0BVqfB99Vgk7DlmewI= +github.com/foxcpp/go-mockdns v1.1.0/go.mod h1:IhLeSFGed3mJIAXPH2aiRQB+kqz7oqu8ld2qVbOu7Wk= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -889,6 +891,8 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/miekg/dns v1.1.57 h1:Jzi7ApEIzwEPLHWRcafCN9LZSBbqQpxjt/wpgvg7wcM= +github.com/miekg/dns v1.1.57/go.mod h1:uqRjCRUuEAA6qsOiJvDd+CFo/vW+y5WR6SNmHE55hZk= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= @@ -1235,6 +1239,9 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1276,6 +1283,9 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8= golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1337,6 +1347,11 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.0.0-20220909164309-bea034e7d591/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1380,6 +1395,10 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1479,13 +1498,22 @@ golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.14.0/go.mod h1:TySc+nGkYR6qt8km8wUhuFRTVSMIX3XPR58y2lC8vww= golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1498,6 +1526,10 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1569,6 +1601,9 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= +golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk= golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg= golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/localnet/kubernetes/values-relayminer-1.yaml b/localnet/kubernetes/values-relayminer-1.yaml index b17bd5765..0a365afe9 100644 --- a/localnet/kubernetes/values-relayminer-1.yaml +++ b/localnet/kubernetes/values-relayminer-1.yaml @@ -4,22 +4,4 @@ config: signing_key_name: supplier1 default_signing_key_names: [supplier1] - suppliers: - - service_id: anvil - listen_url: http://0.0.0.0:8545 - service_config: - backend_url: http://anvil:8547/ - publicly_exposed_endpoints: - - relayminer1 - - service_id: ollama - listen_url: http://0.0.0.0:8545 - service_config: - backend_url: http://ollama:11434/ - publicly_exposed_endpoints: - - relayminer1 - - service_id: rest - listen_url: http://0.0.0.0:8545 - service_config: - backend_url: http://rest:10000/ - publicly_exposed_endpoints: - - relayminer1 + suppliers: [] # suppliers list is dynamically defined in poktroll/Tiltfile. diff --git a/localnet/kubernetes/values-relayminer-2.yaml b/localnet/kubernetes/values-relayminer-2.yaml index de12138d4..2d67535b5 100644 --- a/localnet/kubernetes/values-relayminer-2.yaml +++ b/localnet/kubernetes/values-relayminer-2.yaml @@ -4,16 +4,4 @@ config: signing_key_name: supplier2 default_signing_key_names: [supplier2] - suppliers: - - service_id: anvil - listen_url: http://0.0.0.0:8545 - service_config: - backend_url: http://anvil:8547/ - publicly_exposed_endpoints: - - relayminer2 - - service_id: ollama - listen_url: http://0.0.0.0:8545 - service_config: - backend_url: http://ollama:11434/ - publicly_exposed_endpoints: - - relayminer2 + suppliers: [] # suppliers list is dynamically defined in poktroll/Tiltfile. diff --git a/localnet/kubernetes/values-relayminer-3.yaml b/localnet/kubernetes/values-relayminer-3.yaml index 624aaa1bf..e6b21ad0a 100644 --- a/localnet/kubernetes/values-relayminer-3.yaml +++ b/localnet/kubernetes/values-relayminer-3.yaml @@ -4,16 +4,4 @@ config: signing_key_name: supplier3 default_signing_key_names: [supplier3] - suppliers: - - service_id: anvil - listen_url: http://0.0.0.0:8545 - service_config: - backend_url: http://anvil:8547/ - publicly_exposed_endpoints: - - relayminer3 - - service_id: ollama - listen_url: http://0.0.0.0:8545 - service_config: - backend_url: http://ollama:11434/ - publicly_exposed_endpoints: - - relayminer3 + suppliers: [] # suppliers list is dynamically defined in poktroll/Tiltfile. diff --git a/localnet/kubernetes/values-relayminer-common.yaml b/localnet/kubernetes/values-relayminer-common.yaml index 207c636b2..f4fbe4b5b 100644 --- a/localnet/kubernetes/values-relayminer-common.yaml +++ b/localnet/kubernetes/values-relayminer-common.yaml @@ -11,3 +11,6 @@ config: pprof: enabled: true addr: localhost:6060 + ping: + enabled: true + addr: localhost:8081 diff --git a/localnet/poktrolld/config/relayminer_config.yaml b/localnet/poktrolld/config/relayminer_config.yaml index 1b9895122..71d71cf94 100644 --- a/localnet/poktrolld/config/relayminer_config.yaml +++ b/localnet/poktrolld/config/relayminer_config.yaml @@ -17,3 +17,6 @@ suppliers: pprof: enabled: false addr: localhost:6060 +ping: + enabled: false + addr: localhost:8082 diff --git a/localnet/poktrolld/config/relayminer_config_full_example.yaml b/localnet/poktrolld/config/relayminer_config_full_example.yaml index 7196024fd..5cf0c4f10 100644 --- a/localnet/poktrolld/config/relayminer_config_full_example.yaml +++ b/localnet/poktrolld/config/relayminer_config_full_example.yaml @@ -18,6 +18,12 @@ pprof: enabled: false addr: localhost:6060 +# Ping server configuration to test the connectivity of every +# suppliers.[].service_config.backend_url +ping: + enabled: false + addr: localhost:8081 + pocket_node: # Pocket node URL exposing the CometBFT JSON-RPC API. # Used by the Cosmos client SDK, event subscriptions, etc. diff --git a/localnet/poktrolld/config/relayminer_config_localnet_vscode.yaml b/localnet/poktrolld/config/relayminer_config_localnet_vscode.yaml index de20c4d13..ee56b0c1c 100644 --- a/localnet/poktrolld/config/relayminer_config_localnet_vscode.yaml +++ b/localnet/poktrolld/config/relayminer_config_localnet_vscode.yaml @@ -47,3 +47,6 @@ suppliers: pprof: enabled: false addr: localhost:6070 +ping: + enabled: false + addr: localhost:8081 diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 574f405b4..b73ce1984 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -139,6 +139,12 @@ func runRelayer(cmd *cobra.Command, _ []string) error { } } + if relayMinerConfig.Ping.Enabled { + if err := relayMiner.ServePing(ctx, "tcp", relayMinerConfig.Ping.Addr); err != nil { + return fmt.Errorf("failed to start ping endpoint: %w", err) + } + } + // Start the relay miner logger.Info().Msg("Starting relay miner...") if err := relayMiner.Start(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { diff --git a/pkg/relayer/config/relayminer_configs_reader.go b/pkg/relayer/config/relayminer_configs_reader.go index 1cc708908..319303fe1 100644 --- a/pkg/relayer/config/relayminer_configs_reader.go +++ b/pkg/relayer/config/relayminer_configs_reader.go @@ -42,6 +42,11 @@ func ParseRelayMinerConfigs(configContent []byte) (*RelayMinerConfig, error) { Addr: yamlRelayMinerConfig.Pprof.Addr, } + relayMinerConfig.Ping = &RelayMinerPingConfig{ + Enabled: yamlRelayMinerConfig.Ping.Enabled, + Addr: yamlRelayMinerConfig.Ping.Addr, + } + // Hydrate the pocket node urls if err := relayMinerConfig.HydratePocketNodeUrls(&yamlRelayMinerConfig.PocketNode); err != nil { return nil, err diff --git a/pkg/relayer/config/types.go b/pkg/relayer/config/types.go index 8c6ece4ab..b81ba8b5f 100644 --- a/pkg/relayer/config/types.go +++ b/pkg/relayer/config/types.go @@ -24,6 +24,14 @@ type YAMLRelayMinerConfig struct { Pprof YAMLRelayMinerPprofConfig `yaml:"pprof"` SmtStorePath string `yaml:"smt_store_path"` Suppliers []YAMLRelayMinerSupplierConfig `yaml:"suppliers"` + Ping YAMLRelayMinerPingConfig `yaml:"ping"` +} + +// YAMLRelayMinerPingConfig represents the configuration to expose a ping server. +type YAMLRelayMinerPingConfig struct { + Enabled bool `yaml:"enabled"` + // Addr is the address to bind to (format: 'hostname:port') where 'hostname' can be a DNS name or an IP + Addr string `yaml:"addr"` } // YAMLRelayMinerPocketNodeConfig is the structure used to unmarshal the pocket @@ -83,6 +91,15 @@ type RelayMinerConfig struct { Pprof *RelayMinerPprofConfig Servers map[string]*RelayMinerServerConfig SmtStorePath string + Ping *RelayMinerPingConfig +} + +// RelayMinerPingConfig is the structure resulting from parsing the ping +// server configuration. +type RelayMinerPingConfig struct { + Enabled bool + // Addr is the address to bind to (format: hostname:port) where 'hostname' can be a DNS name or an IP + Addr string } // RelayMinerPocketNodeConfig is the structure resulting from parsing the pocket diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index ee9c8e484..3b8656db6 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -73,6 +73,9 @@ type RelayerProxy interface { // TODO_TECHDEBT(@red-0ne): This method should be moved out of the RelayerProxy interface // that should not be responsible for signing relay responses. SignRelayResponse(relayResponse *servicetypes.RelayResponse, supplierOperatorAddr string) error + + // PingAll tests the connectivity between all the managed relay servers and their respective backend URLs. + PingAll(ctx context.Context) error } type RelayerProxyOption func(RelayerProxy) @@ -84,8 +87,14 @@ type RelayServer interface { // Stop terminates the service server and returns an error if it fails. Stop(ctx context.Context) error + + // Ping tests the connection between the relay server and its backend URL. + Ping(ctx context.Context) error } +// RelayServers aggregates a slice of RelayServer interface. +type RelayServers []RelayServer + // RelayerSessionsManager is responsible for managing the relayer's session lifecycles. // It handles the creation and retrieval of SMSTs (trees) for a given session, as // well as the respective and subsequent claim creation and proof submission. diff --git a/pkg/relayer/proxy/errors.go b/pkg/relayer/proxy/errors.go index ff2fc285f..f940c3f13 100644 --- a/pkg/relayer/proxy/errors.go +++ b/pkg/relayer/proxy/errors.go @@ -20,4 +20,5 @@ var ( ErrRelayerProxyUnknownSession = sdkerrors.Register(codespace, 12, "relayer proxy encountered unknown session") ErrRelayerProxyRateLimited = sdkerrors.Register(codespace, 13, "offchain rate limit hit by relayer proxy") ErrRelayerProxyUnclaimRelayPrice = sdkerrors.Register(codespace, 14, "failed to unclaim relay price") + ErrRelayerProxySupplierNotReachable = sdkerrors.Register(codespace, 15, "supplier(s) not reachable") ) diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 9a76eb953..8a6ad09cc 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "errors" "cosmossdk.io/depinject" "github.com/cosmos/cosmos-sdk/crypto/keyring" @@ -166,6 +167,13 @@ func (rp *relayerProxy) Start(ctx context.Context) error { for _, relayServer := range rp.servers { server := relayServer // create a new variable scoped to the anonymous function + + // Ensure that each backing data node responds to a ping request + // (at least) before continuing operation. + if err := server.Ping(ctx); err != nil { + return err + } + startGroup.Go(func() error { return server.Start(ctx) }) } @@ -206,3 +214,22 @@ func (rp *relayerProxy) validateConfig() error { return nil } + +// PingAll tests the connectivity between all the managed relay servers and their respective backend URLs. +func (rp *relayerProxy) PingAll(ctx context.Context) error { + var err error + + for _, srv := range rp.servers { + if e := srv.Ping(ctx); e != nil { + err = errors.Join(err, e) + } + } + + if err != nil { + rp.logger.Error().Err(err). + Msg("an unexpected error occured while pinging backend URL(s)") + return err + } + + return nil +} diff --git a/pkg/relayer/proxy/proxy_test.go b/pkg/relayer/proxy/proxy_test.go index b9cb0b642..5e77db425 100644 --- a/pkg/relayer/proxy/proxy_test.go +++ b/pkg/relayer/proxy/proxy_test.go @@ -3,8 +3,10 @@ package proxy_test import ( "bytes" "context" + "errors" "fmt" "io" + "net" "net/http" "net/url" "strings" @@ -12,8 +14,10 @@ import ( "time" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" + "github.com/foxcpp/go-mockdns" sdktypes "github.com/pokt-network/shannon-sdk/types" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "github.com/pokt-network/poktroll/pkg/relayer/config" "github.com/pokt-network/poktroll/pkg/relayer/proxy" @@ -151,6 +155,9 @@ func TestRelayerProxy_StartAndStop(t *testing.T) { // Block so relayerProxy has sufficient time to start time.Sleep(100 * time.Millisecond) + err = rp.PingAll(ctx) + require.NoError(t, err) + // Test that RelayerProxy is handling requests (ignoring the actual response content) res, err := http.DefaultClient.Get(fmt.Sprintf("http://%s/", servicesConfigMap[defaultRelayMinerServer].ListenAddress)) require.NoError(t, err) @@ -551,6 +558,489 @@ func TestRelayerProxy_Relays(t *testing.T) { } } +// RelayProxyPingAllSuite implements the suite to test the relay proxy ping +// application logic. +type RelayProxyPingAllSuite struct { + suite.Suite + relayerProxyBehavior []func(*testproxy.TestBehavior) + servicesConfigMap map[string]*config.RelayMinerServerConfig + supplierEndpoints map[string][]*sharedtypes.SupplierEndpoint +} + +// TestRelayProxyPingAllSuite executes the RelayProxyPingAllSuite test suite. +func TestRelayProxyPingAllSuite(t *testing.T) { + suite.Run(t, new(RelayProxyPingAllSuite)) +} + +// SetupSuite setups a single default relayminer with one supplier. The +// default relayminer will be reused in every subsequent tests in the +// suite. +func (t *RelayProxyPingAllSuite) SetupSuite() { + appPrivateKey := secp256k1.GenPrivKey() + defaultRelayMinerServerAddress := "127.0.0.1:8245" + supplierEndpoints := map[string][]*sharedtypes.SupplierEndpoint{ + defaultService: { + { + Url: "http://supplier1pingall:8645", + RpcType: sharedtypes.RPCType_JSON_RPC, + }, + }, + } + + t.servicesConfigMap = map[string]*config.RelayMinerServerConfig{ + defaultRelayMinerServerAddress: { + ServerType: config.RelayMinerServerTypeHTTP, + ListenAddress: defaultRelayMinerServerAddress, + SupplierConfigsMap: map[string]*config.RelayMinerSupplierConfig{ + defaultService: { + ServiceId: defaultService, + ServerType: config.RelayMinerServerTypeHTTP, + ServiceConfig: &config.RelayMinerSupplierServiceConfig{ + BackendUrl: &url.URL{ + Scheme: "http", + Host: "127.0.0.1:8645", + Path: "/", + }, + }, + PubliclyExposedEndpoints: []string{ + "supplier1pingall", + }, + }, + }, + }, + } + + t.relayerProxyBehavior = []func(*testproxy.TestBehavior){ + testproxy.WithRelayerProxyDependenciesForBlockHeight(supplierOperatorKeyName, 1), + testproxy.WithServicesConfigMap(t.servicesConfigMap), + testproxy.WithDefaultSupplier(supplierOperatorKeyName, supplierEndpoints), + testproxy.WithDefaultApplication(appPrivateKey), + testproxy.WithDefaultSessionSupplier(supplierOperatorKeyName, defaultService, appPrivateKey), + testproxy.WithRelayMeter(), + } +} + +// TestOKPingAllWithSingleRelayServer reuses the default relayminer with one +// supplier to test the relayproxy.PingAll method. +func (t *RelayProxyPingAllSuite) TestOKPingAllWithSingleRelayServer() { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + testBehavoirs := testproxy.NewRelayerProxyTestBehavior(ctx, t.T(), t.relayerProxyBehavior...) + + rp, err := proxy.NewRelayerProxy( + testBehavoirs.Deps, + proxy.WithSigningKeyNames([]string{supplierOperatorKeyName}), + proxy.WithServicesConfigMap(t.servicesConfigMap), + ) + require.NoError(t.T(), err) + + go func() { + err := rp.Start(ctx) + if !errors.Is(err, http.ErrServerClosed) { + require.NoError(t.T(), err) + } + }() + + // waiting for relayer proxy to start + // and perform ping request. + time.Sleep(100 * time.Millisecond) + + err = rp.PingAll(ctx) + require.NoError(t.T(), err) + + err = rp.Stop(ctx) + require.NoError(t.T(), err) +} + +// TestOKPingAllWithMultipleRelayServers reuses default relayminer and +// instantiates an additional one to test the connectivity. +func (t *RelayProxyPingAllSuite) TestOKPingAllWithMultipleRelayServers() { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + firstRelayMinerAddr := "127.0.0.1:8246" + firstServiceName := "firstService" + secondRelayMinerAddr := "127.0.0.1:8247" + secondServiceName := "secondService" + + newSupplierOperatorKeyName := "newSupplierKeyName" + appPrivateKey := secp256k1.GenPrivKey() + + // adding supplier endpoint. + supplierEndpoints := map[string][]*sharedtypes.SupplierEndpoint{ + firstServiceName: []*sharedtypes.SupplierEndpoint{ + { + Url: "http://firstservice:8646", + RpcType: sharedtypes.RPCType_JSON_RPC, + }, + }, + secondServiceName: []*sharedtypes.SupplierEndpoint{ + { + Url: "http://secondservice:8647", + RpcType: sharedtypes.RPCType_JSON_RPC, + }, + }, + } + + servicesConfigMap := map[string]*config.RelayMinerServerConfig{ + firstRelayMinerAddr: &config.RelayMinerServerConfig{ + ServerType: config.RelayMinerServerTypeHTTP, + ListenAddress: firstRelayMinerAddr, + SupplierConfigsMap: map[string]*config.RelayMinerSupplierConfig{ + firstServiceName: { + ServiceId: firstServiceName, + ServerType: config.RelayMinerServerTypeHTTP, + ServiceConfig: &config.RelayMinerSupplierServiceConfig{ + BackendUrl: &url.URL{ + Scheme: "http", + Host: "127.0.0.1:8646", + Path: "/", + }, + }, + PubliclyExposedEndpoints: []string{ + "firstservice", + }, + }, + }, + }, + secondRelayMinerAddr: &config.RelayMinerServerConfig{ + ServerType: config.RelayMinerServerTypeHTTP, + ListenAddress: secondRelayMinerAddr, + SupplierConfigsMap: map[string]*config.RelayMinerSupplierConfig{ + secondServiceName: { + ServiceId: secondServiceName, + ServerType: config.RelayMinerServerTypeHTTP, + ServiceConfig: &config.RelayMinerSupplierServiceConfig{ + BackendUrl: &url.URL{ + Scheme: "http", + Host: "127.0.0.1:8647", + Path: "/", + }, + }, + PubliclyExposedEndpoints: []string{ + "secondservice", + }, + }, + }, + }, + } + + relayerProxyBehavior := []func(*testproxy.TestBehavior){ + testproxy.WithRelayerProxyDependenciesForBlockHeight(newSupplierOperatorKeyName, 1), + testproxy.WithDefaultSupplier(newSupplierOperatorKeyName, supplierEndpoints), + testproxy.WithServicesConfigMap(servicesConfigMap), + testproxy.WithDefaultApplication(appPrivateKey), + testproxy.WithDefaultSessionSupplier(newSupplierOperatorKeyName, defaultService, appPrivateKey), + testproxy.WithRelayMeter(), + } + + testBehavoirs := testproxy.NewRelayerProxyTestBehavior(ctx, t.T(), relayerProxyBehavior...) + + rp, err := proxy.NewRelayerProxy( + testBehavoirs.Deps, + proxy.WithSigningKeyNames([]string{newSupplierOperatorKeyName}), + proxy.WithServicesConfigMap(servicesConfigMap), + ) + require.NoError(t.T(), err) + + go func() { + err := rp.Start(ctx) + if !errors.Is(err, http.ErrServerClosed) { + require.NoError(t.T(), err) + } + }() + + // waiting for relayer proxy to start + // and perform ping request. + time.Sleep(100 * time.Millisecond) + + err = rp.PingAll(ctx) + require.NoError(t.T(), err) + + err = rp.Stop(ctx) + require.NoError(t.T(), err) +} + +// TestNOKPingAllWithPartialFailureAtStartup test the connectivity for multiple +// relayminer with a partial sucess/failure at startup. +func (t *RelayProxyPingAllSuite) TestNOKPingAllWithPartialFailureAtStartup() { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + failingRelayMinerAddr := "127.0.0.1:8247" + failingServiceName := "failingservice" + + supplierEndpoints := map[string][]*sharedtypes.SupplierEndpoint{ + failingServiceName: { + { + Url: "http://failingservice:8647", + RpcType: sharedtypes.RPCType_JSON_RPC, + }, + }, + } + + servicesConfigMap := map[string]*config.RelayMinerServerConfig{ + failingRelayMinerAddr: &config.RelayMinerServerConfig{ + ListenAddress: failingRelayMinerAddr, + ServerType: config.RelayMinerServerTypeHTTP, + SupplierConfigsMap: map[string]*config.RelayMinerSupplierConfig{ + failingServiceName: &config.RelayMinerSupplierConfig{ + ServerType: config.RelayMinerServerTypeHTTP, + ServiceId: failingServiceName, + ServiceConfig: &config.RelayMinerSupplierServiceConfig{ + BackendUrl: &url.URL{ + Scheme: "http", + Host: "127.0.0.1:8647", + Path: "/", + }, + }, + PubliclyExposedEndpoints: []string{ + "failingservice", + }, + }, + }, + }, + } + + relayProxyBehavior := append(t.relayerProxyBehavior, []func(*testproxy.TestBehavior){ + testproxy.WithDefaultSupplier(supplierOperatorKeyName, supplierEndpoints), + testproxy.WithServicesConfigMap(servicesConfigMap), + }...) + + test := testproxy.NewRelayerProxyTestBehavior(ctx, t.T(), relayProxyBehavior...) + + // copying the default relayminer in the test service config + // map for the relay proxy. + for k, v := range t.servicesConfigMap { + servicesConfigMap[k] = v + } + + rp, err := proxy.NewRelayerProxy( + test.Deps, + proxy.WithSigningKeyNames([]string{supplierOperatorKeyName}), + proxy.WithServicesConfigMap(servicesConfigMap), + ) + require.NoError(t.T(), err) + + // we are explicitly shutting down a supplier to simulate a + // failure while testing the connectivity at startup. + err = test.ShutdownServiceID(failingServiceName) + require.NoError(t.T(), err) + + // testing connectivity at startup + err = rp.Start(ctx) + require.Error(t.T(), err) + require.True(t.T(), strings.Contains(err.Error(), "connection refused")) + + err = rp.Stop(ctx) + require.NoError(t.T(), err) +} + +// TestNOKPingAllWithPartialFailureAfterStartup test the connectivity for multiple +// relayminer with a partial sucess/failure at runtime. +func (t *RelayProxyPingAllSuite) TestNOKPingAllWithPartialFailureAfterStartup() { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + failingRelayMinerAddr := "127.0.0.1:8248" + failingServiceName := "faillingservice" + + supplierEndpoints := map[string][]*sharedtypes.SupplierEndpoint{ + failingServiceName: { + { + Url: "http://failingservice:8648", + RpcType: sharedtypes.RPCType_JSON_RPC, + }, + }, + } + + servicesConfigMap := map[string]*config.RelayMinerServerConfig{ + failingRelayMinerAddr: &config.RelayMinerServerConfig{ + ListenAddress: failingRelayMinerAddr, + ServerType: config.RelayMinerServerTypeHTTP, + SupplierConfigsMap: map[string]*config.RelayMinerSupplierConfig{ + failingServiceName: &config.RelayMinerSupplierConfig{ + ServerType: config.RelayMinerServerTypeHTTP, + ServiceId: failingServiceName, + ServiceConfig: &config.RelayMinerSupplierServiceConfig{ + BackendUrl: &url.URL{ + Scheme: "http", + Host: "127.0.0.1:8648", + Path: "/", + }, + }, + PubliclyExposedEndpoints: []string{ + "failingservice", + }, + }, + }, + }, + } + + relayProxyBehavior := append(t.relayerProxyBehavior, []func(*testproxy.TestBehavior){ + testproxy.WithDefaultSupplier(supplierOperatorKeyName, supplierEndpoints), + testproxy.WithServicesConfigMap(servicesConfigMap), + }...) + + test := testproxy.NewRelayerProxyTestBehavior(ctx, t.T(), relayProxyBehavior...) + + // copying the default relayminer in the test service config + // map for the relay proxy. + for k, v := range t.servicesConfigMap { + servicesConfigMap[k] = v + } + + rp, err := proxy.NewRelayerProxy( + test.Deps, + proxy.WithSigningKeyNames([]string{supplierOperatorKeyName}), + proxy.WithServicesConfigMap(servicesConfigMap), + ) + require.NoError(t.T(), err) + + go func() { + err := rp.Start(ctx) + if !errors.Is(err, http.ErrServerClosed) { + require.NoError(t.T(), err) + } + }() + + // waiting for relayer proxy to start + // and perform ping request. + time.Sleep(100 * time.Millisecond) + + // we are explicitly shutting down a supplier to simulate an error + // while testing the connectivity. + err = test.ShutdownServiceID(failingServiceName) + require.NoError(t.T(), err) + + err = rp.PingAll(ctx) + require.Error(t.T(), err) + require.True(t.T(), strings.Contains(err.Error(), "connection refused")) + + err = rp.Stop(ctx) + require.NoError(t.T(), err) +} + +// TestOKPingAllDifferentEndpoint test the connectivity with different type of +// endpoints (ipv6, domain name). +func (t *RelayProxyPingAllSuite) TestOKPingAllDifferentEndpoint() { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + srv, err := mockdns.NewServer(map[string]mockdns.Zone{ + "exampleservice.org.": { + A: []string{"127.0.0.1"}, + }, + }, false) + require.NoError(t.T(), err) + defer srv.Close() + + srv.PatchNet(net.DefaultResolver) + defer mockdns.UnpatchNet(net.DefaultResolver) + + relayminerDomainNameAddr := "exampleservice.org:8249" + domainNameServiceName := "exampleservice.org" + + relayminerIPV6ServiceAddr := "localhost:8250" + IPV6ServiceName := "ipv6service" + + supplierEndpoints := map[string][]*sharedtypes.SupplierEndpoint{ + domainNameServiceName: { + { + Url: "http://exampleservice.org:8649", + RpcType: sharedtypes.RPCType_JSON_RPC, + }, + }, + IPV6ServiceName: { + { + Url: "http://ipv6service:8650", + RpcType: sharedtypes.RPCType_JSON_RPC, + }, + }, + } + + servicesConfigMap := map[string]*config.RelayMinerServerConfig{ + relayminerDomainNameAddr: &config.RelayMinerServerConfig{ + ListenAddress: relayminerDomainNameAddr, + ServerType: config.RelayMinerServerTypeHTTP, + SupplierConfigsMap: map[string]*config.RelayMinerSupplierConfig{ + domainNameServiceName: &config.RelayMinerSupplierConfig{ + ServerType: config.RelayMinerServerTypeHTTP, + ServiceId: domainNameServiceName, + ServiceConfig: &config.RelayMinerSupplierServiceConfig{ + BackendUrl: &url.URL{ + Scheme: "http", + Host: "exampleservice.org:8649", + Path: "/", + }, + }, + PubliclyExposedEndpoints: []string{ + "exampleservice.org", + }, + }, + }, + }, + relayminerIPV6ServiceAddr: &config.RelayMinerServerConfig{ + ListenAddress: relayminerIPV6ServiceAddr, + ServerType: config.RelayMinerServerTypeHTTP, + SupplierConfigsMap: map[string]*config.RelayMinerSupplierConfig{ + IPV6ServiceName: &config.RelayMinerSupplierConfig{ + ServerType: config.RelayMinerServerTypeHTTP, + ServiceId: IPV6ServiceName, + ServiceConfig: &config.RelayMinerSupplierServiceConfig{ + BackendUrl: &url.URL{ + Scheme: "http", + Host: "[::1]:8650", + Path: "/", + }, + }, + PubliclyExposedEndpoints: []string{ + "ipv6service", + }, + }, + }, + }, + } + + relayProxyBehavior := append(t.relayerProxyBehavior, []func(*testproxy.TestBehavior){ + testproxy.WithDefaultSupplier(supplierOperatorKeyName, supplierEndpoints), + testproxy.WithServicesConfigMap(servicesConfigMap), + }...) + + test := testproxy.NewRelayerProxyTestBehavior(ctx, t.T(), relayProxyBehavior...) + + // copying the default relayminer in the test service config + // map for the relay proxy. + for k, v := range t.servicesConfigMap { + servicesConfigMap[k] = v + } + + rp, err := proxy.NewRelayerProxy( + test.Deps, + proxy.WithSigningKeyNames([]string{supplierOperatorKeyName}), + proxy.WithServicesConfigMap(servicesConfigMap), + ) + require.NoError(t.T(), err) + + go func() { + err := rp.Start(ctx) + if !errors.Is(err, http.ErrServerClosed) { + require.NoError(t.T(), err) + } + }() + + // waiting for relayer proxy to start + // and perform ping request. + time.Sleep(100 * time.Millisecond) + + err = rp.PingAll(ctx) + require.NoError(t.T(), err) + + err = rp.Stop(ctx) + require.NoError(t.T(), err) +} + func sendRequestWithUnparsableBody( t *testing.T, test *testproxy.TestBehavior, diff --git a/pkg/relayer/proxy/synchronous.go b/pkg/relayer/proxy/synchronous.go index a94af25dc..2b1cf652d 100644 --- a/pkg/relayer/proxy/synchronous.go +++ b/pkg/relayer/proxy/synchronous.go @@ -97,6 +97,29 @@ func (sync *synchronousRPCServer) Stop(ctx context.Context) error { return sync.server.Shutdown(ctx) } +// Ping tries to dial the suppliers backend URLs to test the connection. +func (sync *synchronousRPCServer) Ping(ctx context.Context) error { + for _, supplierCfg := range sync.serverConfig.SupplierConfigsMap { + httpClient := &http.Client{Timeout: 2 * time.Second} + endpointURL := supplierCfg.ServiceConfig.BackendUrl.String() + + resp, err := httpClient.Head(endpointURL) + if err != nil { + return err + } + _ = resp.Body.Close() + + // DEV_NOTE: Return ANY HTTP error. + if resp.StatusCode >= http.StatusBadRequest { + return ErrRelayerProxySupplierNotReachable.Wrapf( + "endpoint URL %q; status code: %d", + supplierCfg.ServiceConfig.BackendUrl.String(), resp.StatusCode) + } + } + + return nil +} + // ServeHTTP listens for incoming relay requests. It implements the respective // method of the http.Handler interface. It is called by http.ListenAndServe() // when synchronousRPCServer is used as an http.Handler with an http.Server. diff --git a/pkg/relayer/relayminer.go b/pkg/relayer/relayminer.go index 505d5dbd5..d43c371ef 100644 --- a/pkg/relayer/relayminer.go +++ b/pkg/relayer/relayminer.go @@ -2,9 +2,11 @@ package relayer import ( "context" + "errors" "net" "net/http" "net/http/pprof" + "net/url" "cosmossdk.io/depinject" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -134,3 +136,48 @@ func (rel *relayMiner) ServePprof(ctx context.Context, addr string) error { return nil } + +// ServePing exposes ping HTTP server to check the reachability between the +// relay miner and its dependencies (Ex: relay server and their respective +// backend URLs). +func (rel *relayMiner) ServePing(ctx context.Context, network, addr string) error { + ln, err := net.Listen(network, addr) + if err != nil { + return err + } + + // Start a long-lived goroutine that starts an HTTP server responding to + // ping requests. A single ping request on the relay server broadcasts a + // ping to all backing services/data nodes. + go func() { + if err := http.Serve(ln, rel.newPinghandlerFn(ctx)); err != nil && !errors.Is(http.ErrServerClosed, err) { + rel.logger.Error().Err(err).Msg("ping server unexpectedly closed") + } + }() + + go func() { + <-ctx.Done() + rel.logger.Info().Str("endpoint", addr).Msg("stopping ping server") + _ = ln.Close() + }() + + return nil +} + +func (rel *relayMiner) newPinghandlerFn(ctx context.Context) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + rel.logger.Debug().Msg("pinging relay servers...") + + if err := rel.relayerProxy.PingAll(ctx); err != nil { + var urlError *url.Error + if errors.As(err, &urlError) && urlError.Temporary() { + w.WriteHeader(http.StatusGatewayTimeout) + } else { + w.WriteHeader(http.StatusBadGateway) + } + return + } + + w.WriteHeader(http.StatusNoContent) + }) +} diff --git a/pkg/relayer/relayminer_test.go b/pkg/relayer/relayminer_test.go index f7de39d39..7946d5b98 100644 --- a/pkg/relayer/relayminer_test.go +++ b/pkg/relayer/relayminer_test.go @@ -2,15 +2,24 @@ package relayer_test import ( "context" + "errors" + "net" + "net/http" + "net/url" + "path/filepath" "testing" "time" "cosmossdk.io/depinject" + "github.com/cometbft/cometbft/libs/os" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/pkg/polylog/polyzero" "github.com/pokt-network/poktroll/pkg/relayer" + "github.com/pokt-network/poktroll/testutil/mockrelayer" "github.com/pokt-network/poktroll/testutil/testrelayer" servicetypes "github.com/pokt-network/poktroll/x/service/types" ) @@ -57,3 +66,217 @@ func TestRelayMiner_StartAndStop(t *testing.T) { err = relayminer.Stop(ctx) require.NoError(t, err) } + +type RelayMinerPingSuite struct { + suite.Suite + + servedRelaysObs relayer.RelaysObservable + minedRelaysObs relayer.MinedRelaysObservable + + relayerProxyMock *mockrelayer.MockRelayerProxy + minerMock *mockrelayer.MockMiner + relayerSessionsManagerMock *mockrelayer.MockRelayerSessionsManager +} + +func TestRelayMinerPingSuite(t *testing.T) { + suite.Run(t, new(RelayMinerPingSuite)) +} + +func (t *RelayMinerPingSuite) SetupTest() { + // servedRelaysObs is NEVER published to. It exists to satisfy test mocks. + srObs, _ := channel.NewObservable[*servicetypes.Relay]() + t.servedRelaysObs = relayer.RelaysObservable(srObs) + + // minedRelaysObs is NEVER published to. It exists to satisfy test mocks. + mrObs, _ := channel.NewObservable[*relayer.MinedRelay]() + t.minedRelaysObs = relayer.MinedRelaysObservable(mrObs) +} + +func (t *RelayMinerPingSuite) TestOKPingAll() { + ctx := polyzero.NewLogger().WithContext(context.Background()) + relayerProxyMock := testrelayer.NewMockOneTimeRelayerProxyWithPing( + ctx, t.T(), + t.servedRelaysObs, + ) + + minerMock := testrelayer.NewMockOneTimeMiner( + ctx, t.T(), + t.servedRelaysObs, + t.minedRelaysObs, + ) + + relayerSessionsManagerMock := testrelayer.NewMockOneTimeRelayerSessionsManager( + ctx, t.T(), + t.minedRelaysObs, + ) + + deps := depinject.Supply( + relayerProxyMock, + minerMock, + relayerSessionsManagerMock, + ) + + relayminer, err := relayer.NewRelayMiner(ctx, deps) + require.NoError(t.T(), err) + require.NotNil(t.T(), relayminer) + + err = relayminer.Start(ctx) + require.NoError(t.T(), err) + + time.Sleep(time.Millisecond) + + relayminerSocketPath := filepath.Join(t.T().TempDir(), "1d031ace") + + relayminer.ServePing(ctx, "unix", relayminerSocketPath) + + time.Sleep(time.Millisecond) + + transport := &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return net.Dial("unix", relayminerSocketPath) + }, + } + + // Override transport configuration to adapt the http client to the unix socket listener. + httpClient := http.Client{Transport: transport} + require.NoError(t.T(), err) + + resp, err := httpClient.Get("http://unix") + require.NoError(t.T(), err) + + require.Equal(t.T(), http.StatusNoContent, resp.StatusCode) + + err = relayminer.Stop(ctx) + require.NoError(t.T(), err) +} + +func (t *RelayMinerPingSuite) TestNOKPingAllWithTemporaryError() { + ctx := polyzero.NewLogger().WithContext(context.Background()) + relayerProxyMock := testrelayer.NewMockOneTimeRelayerProxy(ctx, t.T(), t.servedRelaysObs) + + urlErr := url.Error{ + Op: http.MethodGet, + URL: "http://unix", + Err: &net.DNSError{ + Err: "fake temporary and timeout error", + Name: "example.com", + Server: "8.8.8.8", + IsTemporary: true, + IsTimeout: true, + }, + } + + relayerProxyMock.EXPECT().PingAll(gomock.Eq(ctx)). + Times(1).Return(&urlErr) + + minerMock := testrelayer.NewMockOneTimeMiner( + ctx, t.T(), + t.servedRelaysObs, + t.minedRelaysObs, + ) + + relayerSessionsManagerMock := testrelayer.NewMockOneTimeRelayerSessionsManager( + ctx, t.T(), + t.minedRelaysObs, + ) + + deps := depinject.Supply( + relayerProxyMock, + minerMock, + relayerSessionsManagerMock, + ) + + relayminer, err := relayer.NewRelayMiner(ctx, deps) + require.NoError(t.T(), err) + require.NotNil(t.T(), relayminer) + + err = relayminer.Start(ctx) + require.NoError(t.T(), err) + + time.Sleep(time.Millisecond) + + relayminerSocketPath := filepath.Join(t.T().TempDir(), "5478a402") + + err = relayminer.ServePing(ctx, "unix", relayminerSocketPath) + require.NoError(t.T(), err) + + require.True(t.T(), os.FileExists(relayminerSocketPath)) + + time.Sleep(time.Millisecond) + + transport := &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return net.Dial("unix", relayminerSocketPath) + }, + } + + // Override transport configuration to adapt the http client to the unix socket listener. + httpClient := http.Client{Transport: transport} + require.NoError(t.T(), err) + + resp, err := httpClient.Get("http://unix") + require.NoError(t.T(), err) + + require.Equal(t.T(), http.StatusGatewayTimeout, resp.StatusCode) + + err = relayminer.Stop(ctx) + require.NoError(t.T(), err) +} + +func (t *RelayMinerPingSuite) NOKPingWithoutTemporaryError() { + ctx := polyzero.NewLogger().WithContext(context.Background()) + relayerProxyMock := testrelayer.NewMockOneTimeRelayerProxy(ctx, t.T(), t.servedRelaysObs) + + relayerProxyMock.EXPECT().PingAll(gomock.Eq(ctx)). + Times(1).Return(errors.New("fake")) + + minerMock := testrelayer.NewMockOneTimeMiner( + ctx, t.T(), + t.servedRelaysObs, + t.minedRelaysObs, + ) + + relayerSessionsManagerMock := testrelayer.NewMockOneTimeRelayerSessionsManager( + ctx, t.T(), + t.minedRelaysObs, + ) + + deps := depinject.Supply( + relayerProxyMock, + minerMock, + relayerSessionsManagerMock, + ) + + relayminer, err := relayer.NewRelayMiner(ctx, deps) + require.NoError(t.T(), err) + require.NotNil(t.T(), relayminer) + + err = relayminer.Start(ctx) + require.NoError(t.T(), err) + + time.Sleep(time.Millisecond) + + relayminerSocketPath := filepath.Join(t.T().TempDir(), "aae252f8") + + relayminer.ServePing(ctx, "unix", relayminerSocketPath) + + time.Sleep(time.Millisecond) + + transport := &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return net.Dial("unix", relayminerSocketPath) + }, + } + + // Override transport configuration to adapt the http client to the unix socket listener. + httpClient := http.Client{Transport: transport} + require.NoError(t.T(), err) + + resp, err := httpClient.Get("http://unix") + require.NoError(t.T(), err) + + require.Equal(t.T(), http.StatusBadGateway, resp.StatusCode) + + err = relayminer.Stop(ctx) + require.NoError(t.T(), err) +} diff --git a/testutil/testproxy/relayerproxy.go b/testutil/testproxy/relayerproxy.go index e3d0981a6..92003d491 100644 --- a/testutil/testproxy/relayerproxy.go +++ b/testutil/testproxy/relayerproxy.go @@ -8,10 +8,12 @@ import ( "errors" "fmt" "io" + "net" "net/http" "net/url" "os" "testing" + "time" "cosmossdk.io/depinject" ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" @@ -20,14 +22,17 @@ import ( "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" "github.com/cosmos/cosmos-sdk/types/bech32" + "github.com/golang/mock/gomock" "github.com/pokt-network/ring-go" sdktypes "github.com/pokt-network/shannon-sdk/types" "github.com/stretchr/testify/require" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/pkg/polylog" + "github.com/pokt-network/poktroll/pkg/relayer" "github.com/pokt-network/poktroll/pkg/relayer/config" "github.com/pokt-network/poktroll/pkg/signer" + "github.com/pokt-network/poktroll/testutil/mockrelayer" testsession "github.com/pokt-network/poktroll/testutil/session" "github.com/pokt-network/poktroll/testutil/testclient/testblock" "github.com/pokt-network/poktroll/testutil/testclient/testdelegation" @@ -97,6 +102,19 @@ func NewRelayerProxyTestBehavior( return test } +// ShutdownServiceID gracefully shuts down the http server for a given service id. +func (t *TestBehavior) ShutdownServiceID(serviceID string) error { + srv, ok := t.proxyServersMap[serviceID] + if !ok { + return fmt.Errorf("shutdown service id: not found") + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + return srv.Shutdown(ctx) +} + // WithRelayerProxyDependenciesForBlockHeight creates the dependencies for the relayer proxy // from the TestBehavior.mocks so they have the right interface and can be // used by the dependency injection framework. @@ -139,10 +157,21 @@ func WithRelayerProxyDependenciesForBlockHeight( } } +// WithRelayMeter creates the dependencies mocks for the relayproxy to use a relay meter. +func WithRelayMeter() func(*TestBehavior) { + return func(test *TestBehavior) { + relayMeter := newMockRelayMeter(test.t) + test.Deps = depinject.Configs(test.Deps, depinject.Supply(relayMeter)) + } +} + // WithServicesConfigMap creates the services that the relayer proxy will -// proxy requests to. -// It creates an HTTP server for each service and starts listening on the -// provided host. +// proxy requests to. It creates an HTTP server for each service and starts +// listening on the provided host. +// +// It is recommended to run this function on the main Go routine to ensure that +// the HTTP servers created for each service are fully initialized and ready to +// receive requests before executing the test cases. func WithServicesConfigMap( servicesConfigMap map[string]*config.RelayMinerServerConfig, ) func(*TestBehavior) { @@ -158,13 +187,17 @@ $ go test -v -count=1 -run TestRelayerProxy ./pkg/relayer/...`) } for _, serviceConfig := range servicesConfigMap { for serviceId, supplierConfig := range serviceConfig.SupplierConfigsMap { - server := &http.Server{Addr: supplierConfig.ServiceConfig.BackendUrl.Host} - server.Handler = http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - sendJSONRPCResponse(test.t, w) - }) + listener, err := net.Listen("tcp", supplierConfig.ServiceConfig.BackendUrl.Host) + require.NoError(test.t, err) + + server := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + sendJSONRPCResponse(test.t, w) + }), + } go func() { - err := server.ListenAndServe() + err := server.Serve(listener) if err != nil && !errors.Is(err, http.ErrServerClosed) { require.NoError(test.t, err) } @@ -277,6 +310,15 @@ func WithSuccessiveSessions( } } +func newMockRelayMeter(t *testing.T) relayer.RelayMeter { + ctrl := gomock.NewController(t) + + relayMeter := mockrelayer.NewMockRelayMeter(ctrl) + relayMeter.EXPECT().Start(gomock.Any()).Return(nil).AnyTimes() + + return relayMeter +} + // MarshalAndSend marshals the request and sends it to the provided service. func MarshalAndSend( test *TestBehavior, diff --git a/testutil/testrelayer/proxy.go b/testutil/testrelayer/proxy.go index a876ebec3..86168984d 100644 --- a/testutil/testrelayer/proxy.go +++ b/testutil/testrelayer/proxy.go @@ -33,5 +33,23 @@ func NewMockOneTimeRelayerProxy( ServedRelays(). Return(returnedRelaysObs). Times(1) + + return relayerProxyMock +} + +// NewMockOneTimeRelayerProxyWithPing creates a new mock RelayerProxy. This mock +// RelayerProxy will expect a call to ServedRelays with the given context, and +// when that call is made, returnedRelaysObs is returned. It also expects a call +// to Start, Ping, and Stop with the given context. +func NewMockOneTimeRelayerProxyWithPing( + ctx context.Context, + t *testing.T, + returnedRelaysObs relayer.RelaysObservable, +) *mockrelayer.MockRelayerProxy { + relayerProxyMock := NewMockOneTimeRelayerProxy(ctx, t, returnedRelaysObs) + relayerProxyMock.EXPECT(). + PingAll(gomock.Eq(ctx)). + Times(1) + return relayerProxyMock }