Skip to content

Commit

Permalink
feat(client): client package for indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Oct 25, 2024
1 parent 12c4721 commit e86d6ae
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-varint v0.0.7
github.com/redis/go-redis/v9 v9.6.1
github.com/storacha/go-ucanto v0.1.1-0.20241022025657-f12c0d06a4ea
github.com/storacha/go-ucanto v0.1.1-0.20241025161953-704b565bf837
github.com/storacha/ipni-publisher v0.0.0-20241018055706-032286a2dc3f
github.com/stretchr/testify v1.9.0
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,8 @@ github.com/storacha/go-capabilities v0.0.0-20241022031210-04ae6a36f715 h1:cRhUpc
github.com/storacha/go-capabilities v0.0.0-20241022031210-04ae6a36f715/go.mod h1:1x2qMcvOUvF6prSiWXsEeEfV3OcX/L2fk6iNgPMNfZU=
github.com/storacha/go-metadata v0.0.0-20241021141939-f94d93dcda78 h1:NAti9hMLMo8F0Iyz5ldS41CY6MyukRH0OrTPV4u2340=
github.com/storacha/go-metadata v0.0.0-20241021141939-f94d93dcda78/go.mod h1:DcwwQnyFuTk531cKD9sUkQg/gzpwKaIqH9I7oZYyeRc=
github.com/storacha/go-ucanto v0.1.1-0.20241022025657-f12c0d06a4ea h1:v+OX7eIiHz+3F107dqjmGYOEEcFGfYIPxHHHLPN6auc=
github.com/storacha/go-ucanto v0.1.1-0.20241022025657-f12c0d06a4ea/go.mod h1:Bi7DFuo0nj9/QmkqbLNLWf41xnOoJSFGg21G+UtzWoY=
github.com/storacha/go-ucanto v0.1.1-0.20241025161953-704b565bf837 h1:7cxCkkdW3G8cRF78QBK8arESIUt2kzKkviIgpa+oFQw=
github.com/storacha/go-ucanto v0.1.1-0.20241025161953-704b565bf837/go.mod h1:P7OqkzmO01Bbb/fS6xBDrjFpKAKVyJXXa3LjTeFfYXg=
github.com/storacha/ipni-publisher v0.0.0-20241018055706-032286a2dc3f h1:62fTASO3wRPCWCkl6we2DftsFy/DfbmVpwJyqK7gmUc=
github.com/storacha/ipni-publisher v0.0.0-20241018055706-032286a2dc3f/go.mod h1:fEuGSF5WMaOSAyDQCYAvow6Y+YKzpXczEk3A+H+s1fQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
162 changes: 162 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package client

import (
"context"
"errors"
"fmt"
"io"
gohttp "net/http"
"net/url"

"github.com/multiformats/go-multibase"
"github.com/storacha/go-capabilities/pkg/assert"
"github.com/storacha/go-capabilities/pkg/claim"
"github.com/storacha/go-ucanto/client"
"github.com/storacha/go-ucanto/core/invocation"
"github.com/storacha/go-ucanto/core/receipt"
"github.com/storacha/go-ucanto/core/result"
"github.com/storacha/go-ucanto/core/result/failure"
fdm "github.com/storacha/go-ucanto/core/result/failure/datamodel"
unit "github.com/storacha/go-ucanto/core/result/ok"
udm "github.com/storacha/go-ucanto/core/result/ok/datamodel"
"github.com/storacha/go-ucanto/principal"
"github.com/storacha/go-ucanto/transport/http"
"github.com/storacha/go-ucanto/ucan"
"github.com/storacha/indexing-service/pkg/service/queryresult"
"github.com/storacha/indexing-service/pkg/types"
)

const claimsPath = "/claims"

var ErrNoReceiptFound = errors.New("missing receipt link")

type ErrFailedResponse struct {
StatusCode int
Body string
}

func errFromResponse(res *http.Response) ErrFailedResponse {

Check failure on line 38 in pkg/client/client.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

undefined: http.Response

Check failure on line 38 in pkg/client/client.go

View workflow job for this annotation

GitHub Actions / go-check / All

undefined: http.Response

Check failure on line 38 in pkg/client/client.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go this)

undefined: http.Response

Check failure on line 38 in pkg/client/client.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

undefined: http.Response
err := ErrFailedResponse{StatusCode: res.StatusCode}

message, merr := io.ReadAll(res.Body)
if merr != nil {
err.Body = merr.Error()
} else {
err.Body = string(message)
}
return err
}

func (e ErrFailedResponse) Error() string {
return fmt.Sprintf("http request failed, status: %d %s, message: %s", e.StatusCode, http.StatusText(e.StatusCode), e.Body)

Check failure on line 51 in pkg/client/client.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

undefined: http.StatusText

Check failure on line 51 in pkg/client/client.go

View workflow job for this annotation

GitHub Actions / go-check / All

undefined: http.StatusText (compile)

Check failure on line 51 in pkg/client/client.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go this)

undefined: http.StatusText

Check failure on line 51 in pkg/client/client.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

undefined: http.StatusText
}

type Client struct {
servicePrincipal ucan.Principal
serviceURL string
}

func (c *Client) connect() (client.Connection, error) {
url, err := url.Parse(c.serviceURL)
if err != nil {
return nil, fmt.Errorf("parsing service URL: %w", err)
}
return client.NewConnection(c.servicePrincipal, http.NewHTTPChannel(url.JoinPath(claimsPath)))
}

func (c *Client) execute(inv invocation.Invocation) error {
connection, err := c.connect()
if err != nil {
return fmt.Errorf("establishing client connection: %w", err)
}

resp, err := client.Execute([]invocation.Invocation{inv}, connection)
if err != nil {
return fmt.Errorf("sending invocation: %w", err)
}
rcptlnk, ok := resp.Get(inv.Link())
if !ok {
return ErrNoReceiptFound
}

reader, err := receipt.NewReceiptReaderFromTypes[unit.Unit, fdm.FailureModel](udm.UnitType(), fdm.FailureType())
if err != nil {
return fmt.Errorf("generating receipt reader: %w")
}

rcpt, err := reader.Read(rcptlnk, resp.Blocks())
if err != nil {
return fmt.Errorf("reading receipt: %w")
}

_, err = result.Unwrap(result.MapError(rcpt.Out(), failure.FromFailureModel))
return err
}

func (c *Client) PublishIndexClaim(ctx context.Context, issuer principal.Signer, caveats assert.IndexCaveats) error {
inv, err := assert.Index.Invoke(issuer, c.servicePrincipal, c.servicePrincipal.DID().String(), caveats)
if err != nil {
return fmt.Errorf("generating invocation")
}
return c.execute(inv)
}

func (c *Client) PublishEqualsClaim(ctx context.Context, issuer principal.Signer, caveats assert.EqualsCaveats) error {
inv, err := assert.Equals.Invoke(issuer, c.servicePrincipal, c.servicePrincipal.DID().String(), caveats)
if err != nil {
return fmt.Errorf("generating invocation")
}
return c.execute(inv)
}

func (c *Client) CacheClaim(ctx context.Context, issuer principal.Signer, provider claim.Provider, caveats assert.LocationCaveats) error {
lc, err := assert.Location.Invoke(issuer, issuer.DID(), caveats.Space.String(), caveats)
if err != nil {
return fmt.Errorf("building location commitment: %w", err)
}
inv, err := claim.Cache.Invoke(issuer, c.servicePrincipal, c.servicePrincipal.DID().String(), claim.CacheCaveats{
Claim: lc.Link(),
Provider: provider,
})
if err != nil {
return fmt.Errorf("generating invocation: %w", err)
}
for blk, err := range lc.Blocks() {
if err != nil {
return fmt.Errorf("reading blocks from location commitment: %w", err)
}
if err := inv.Attach(blk); err != nil {
return fmt.Errorf("attaching location commitment block: %w", err)
}
}

return c.execute(inv)
}

func (c *Client) QueryClaims(ctx context.Context, query types.Query) (types.QueryResult, error) {
url, err := url.Parse(c.serviceURL)
if err != nil {
return nil, fmt.Errorf("parsing service URL: %w", err)
}
url = url.JoinPath(claimsPath)
q := url.Query()
for _, mh := range query.Hashes {
mhString, err := multibase.Encode(multibase.Base64pad, mh)
if err != nil {
return nil, fmt.Errorf("encoding query multihash")
}
q.Add("multihash", mhString)
}
for _, space := range query.Match.Subject {
q.Add("spaces", space.String())
}
url.RawQuery = q.Encode()
res, err := gohttp.DefaultClient.Get(url.String())
if err != nil {
return nil, fmt.Errorf("sending query to server: %w", err)
}
if res.StatusCode < 200 || res.StatusCode > 299 {
return nil, errFromResponse(res)
}
return queryresult.Extract(res.Body)
}
28 changes: 28 additions & 0 deletions pkg/service/contentclaims/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package contentclaims
import (
"context"
"fmt"
"net/url"
"testing"

"github.com/ipld/go-ipld-prime/datamodel"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/storacha/go-capabilities/pkg/assert"
"github.com/storacha/go-capabilities/pkg/claim"
"github.com/storacha/go-ucanto/client"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/go-ucanto/core/invocation"
Expand Down Expand Up @@ -35,6 +38,30 @@ func TestServer(t *testing.T) {
conn, err := client.NewConnection(testutil.Service, server)
require.NoError(t, err)

locationCommitment := testutil.Must(assert.Location.Delegate(testutil.Alice,
testutil.Alice,
testutil.Alice.DID().String(),
assert.LocationCaveats{
Content: assert.FromHash(testutil.RandomMultihash()),
Location: []url.URL{*testutil.Must(url.Parse("https://www.yahoo.com"))(t)},
Space: testutil.Bob.DID(),
}))(t)

cacheInvocation := testutil.Must(claim.Cache.Invoke(testutil.Service,
testutil.Service,
testutil.Service.DID().String(), claim.CacheCaveats{
Claim: locationCommitment.Link(),
Provider: claim.Provider{
Addresses: []multiaddr.Multiaddr{testutil.RandomMultiaddr()},
},
}))(t)
for b, err := range locationCommitment.Blocks() {
if err != nil {
t.Log(fmt.Sprintf("iterating claim blocks: %s", err))

Check failure on line 60 in pkg/service/contentclaims/server_test.go

View workflow job for this annotation

GitHub Actions / go-check / All

should use t.Logf(...) instead of t.Log(fmt.Sprintf(...)) (S1038)
t.FailNow()
}
require.NoError(t, cacheInvocation.Attach(b))
}
invs := []invocation.Invocation{
testutil.Must(assert.Equals.Invoke(
testutil.Service,
Expand All @@ -54,6 +81,7 @@ func TestServer(t *testing.T) {
Index: testutil.RandomCID(),
},
))(t),
cacheInvocation,
}

for _, inv := range invs {
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/contentclaims/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package contentclaims

import (
"context"
"fmt"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -66,7 +67,10 @@ func NewService(indexer types.Service) map[ucan.Ability]server.ServiceMethod[ok.
return ok.Unit{}, nil, NewMissingClaimError()
}

claim := delegation.NewDelegation(rootbl, bs)
claim, err := delegation.NewDelegation(rootbl, bs)
if err != nil {
return ok.Unit{}, nil, fmt.Errorf("generating delegation: %w", err)
}
err = indexer.CacheClaim(context.TODO(), provider, claim)
if err != nil {
log.Errorf("caching claim: %w", err)
Expand Down
32 changes: 32 additions & 0 deletions pkg/service/queryresult/queryresult.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package queryresult

import (
"fmt"
"io"
"iter"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
multihash "github.com/multiformats/go-multihash/core"
"github.com/storacha/go-ucanto/core/car"
"github.com/storacha/go-ucanto/core/dag/blockstore"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/go-ucanto/core/ipld"
Expand Down Expand Up @@ -51,6 +53,36 @@ func (q *queryResult) Root() block.Block {
return q.root
}

func Extract(r io.Reader) (types.QueryResult, error) {
roots, blocks, err := car.Decode(r)
if err != nil {
return nil, fmt.Errorf("extracting car: %w", err)
}

if len(roots) != 1 {
return nil, types.ErrWrongRootCount
}

blks, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(blocks))
if err != nil {
return nil, fmt.Errorf("reading blocks from car: %w", err)
}
root, has, err := blks.Get(roots[0])
if err != nil {
return nil, fmt.Errorf("reading root block: %w", err)
}
if !has {
return nil, types.ErrNoRootBlock
}

var queryResultModel qdm.QueryResultModel
err = block.Decode(root, &queryResultModel, qdm.QueryResultType(), cbor.Codec, sha256.Hasher)
if err != nil {
return nil, fmt.Errorf("decoding query result: %w", err)
}
return &queryResult{root, queryResultModel.Result0_1, blks}, nil
}

// Build generates a new encodable QueryResult
func Build(claims map[cid.Cid]delegation.Delegation, indexes bytemap.ByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView]) (types.QueryResult, error) {
bs, err := blockstore.NewBlockStore()
Expand Down
7 changes: 7 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func (c ContextID) ToEncoded() (EncodedContextID, error) {
// ErrKeyNotFound means the key did not exist in the cache
var ErrKeyNotFound = errors.New("cache key not found")

// ErrWrongRootCount indicates a car file with multiple roots being unable to interpret
// as a query result
var ErrWrongRootCount = errors.New("query result should have exactly one root")

// ErrNoRootBlock indicates a root that is specified but not found in a CAR file
var ErrNoRootBlock = errors.New("query root block not found in car")

// Cache describes a generic cache interface
type Cache[Key, Value any] interface {
Set(ctx context.Context, key Key, value Value, expires bool) error
Expand Down

0 comments on commit e86d6ae

Please sign in to comment.