Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): client package for indexer #31

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-varint v0.0.7
github.com/redis/go-redis/v9 v9.6.1
github.com/storacha/go-ucanto v0.1.1-0.20241022025657-f12c0d06a4ea
github.com/storacha/go-ucanto v0.1.1-0.20241025161953-704b565bf837
github.com/storacha/ipni-publisher v0.0.0-20241018055706-032286a2dc3f
github.com/stretchr/testify v1.9.0
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,8 @@ github.com/storacha/go-capabilities v0.0.0-20241022031210-04ae6a36f715 h1:cRhUpc
github.com/storacha/go-capabilities v0.0.0-20241022031210-04ae6a36f715/go.mod h1:1x2qMcvOUvF6prSiWXsEeEfV3OcX/L2fk6iNgPMNfZU=
github.com/storacha/go-metadata v0.0.0-20241021141939-f94d93dcda78 h1:NAti9hMLMo8F0Iyz5ldS41CY6MyukRH0OrTPV4u2340=
github.com/storacha/go-metadata v0.0.0-20241021141939-f94d93dcda78/go.mod h1:DcwwQnyFuTk531cKD9sUkQg/gzpwKaIqH9I7oZYyeRc=
github.com/storacha/go-ucanto v0.1.1-0.20241022025657-f12c0d06a4ea h1:v+OX7eIiHz+3F107dqjmGYOEEcFGfYIPxHHHLPN6auc=
github.com/storacha/go-ucanto v0.1.1-0.20241022025657-f12c0d06a4ea/go.mod h1:Bi7DFuo0nj9/QmkqbLNLWf41xnOoJSFGg21G+UtzWoY=
github.com/storacha/go-ucanto v0.1.1-0.20241025161953-704b565bf837 h1:7cxCkkdW3G8cRF78QBK8arESIUt2kzKkviIgpa+oFQw=
github.com/storacha/go-ucanto v0.1.1-0.20241025161953-704b565bf837/go.mod h1:P7OqkzmO01Bbb/fS6xBDrjFpKAKVyJXXa3LjTeFfYXg=
github.com/storacha/ipni-publisher v0.0.0-20241018055706-032286a2dc3f h1:62fTASO3wRPCWCkl6we2DftsFy/DfbmVpwJyqK7gmUc=
github.com/storacha/ipni-publisher v0.0.0-20241018055706-032286a2dc3f/go.mod h1:fEuGSF5WMaOSAyDQCYAvow6Y+YKzpXczEk3A+H+s1fQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
153 changes: 153 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package client

import (
"context"
"errors"
"fmt"
"io"
gohttp "net/http"
"net/url"

"github.com/multiformats/go-multibase"
"github.com/storacha/go-capabilities/pkg/assert"
"github.com/storacha/go-capabilities/pkg/claim"
"github.com/storacha/go-ucanto/client"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/go-ucanto/core/invocation"
"github.com/storacha/go-ucanto/core/receipt"
"github.com/storacha/go-ucanto/core/result"
"github.com/storacha/go-ucanto/core/result/failure"
fdm "github.com/storacha/go-ucanto/core/result/failure/datamodel"
unit "github.com/storacha/go-ucanto/core/result/ok"
udm "github.com/storacha/go-ucanto/core/result/ok/datamodel"
"github.com/storacha/go-ucanto/principal"
"github.com/storacha/go-ucanto/transport/http"
"github.com/storacha/go-ucanto/ucan"
"github.com/storacha/indexing-service/pkg/service/queryresult"
"github.com/storacha/indexing-service/pkg/types"
)

const claimsPath = "/claims"

var ErrNoReceiptFound = errors.New("missing receipt link")

type ErrFailedResponse struct {
StatusCode int
Body string
}

func errFromResponse(res *gohttp.Response) ErrFailedResponse {
err := ErrFailedResponse{StatusCode: res.StatusCode}

message, merr := io.ReadAll(res.Body)
if merr != nil {
err.Body = merr.Error()
} else {
err.Body = string(message)
}
return err
}

func (e ErrFailedResponse) Error() string {
return fmt.Sprintf("http request failed, status: %d %s, message: %s", e.StatusCode, gohttp.StatusText(e.StatusCode), e.Body)
}

type Client struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need a constructor

servicePrincipal ucan.Principal
serviceURL url.URL
connection client.Connection
}

func (c *Client) execute(inv invocation.Invocation) error {
resp, err := client.Execute([]invocation.Invocation{inv}, c.connection)
if err != nil {
return fmt.Errorf("sending invocation: %w", err)
}
rcptlnk, ok := resp.Get(inv.Link())
if !ok {
return ErrNoReceiptFound
}

reader, err := receipt.NewReceiptReaderFromTypes[unit.Unit, fdm.FailureModel](udm.UnitType(), fdm.FailureType())
if err != nil {
return fmt.Errorf("generating receipt reader: %w", err)
}

rcpt, err := reader.Read(rcptlnk, resp.Blocks())
if err != nil {
return fmt.Errorf("reading receipt: %w", err)
}

_, err = result.Unwrap(result.MapError(rcpt.Out(), failure.FromFailureModel))
return err
}

func (c *Client) PublishIndexClaim(ctx context.Context, issuer principal.Signer, caveats assert.IndexCaveats, options ...delegation.Option) error {
inv, err := assert.Index.Invoke(issuer, c.servicePrincipal, c.servicePrincipal.DID().String(), caveats, options...)
if err != nil {
return fmt.Errorf("generating invocation: %w", err)
}
return c.execute(inv)
}

func (c *Client) PublishEqualsClaim(ctx context.Context, issuer principal.Signer, caveats assert.EqualsCaveats, options ...delegation.Option) error {
inv, err := assert.Equals.Invoke(issuer, c.servicePrincipal, c.servicePrincipal.DID().String(), caveats, options...)
if err != nil {
return fmt.Errorf("generating invocation: %w", err)
}
return c.execute(inv)
}

func (c *Client) CacheClaim(ctx context.Context, issuer principal.Signer, cacheClaim delegation.Delegation, provider claim.Provider, options ...delegation.Option) error {
inv, err := claim.Cache.Invoke(issuer, c.servicePrincipal, c.servicePrincipal.DID().String(), claim.CacheCaveats{
Claim: cacheClaim.Link(),
Provider: provider,
}, options...)
if err != nil {
return fmt.Errorf("generating invocation: %w", err)
}

for blk, err := range cacheClaim.Blocks() {
if err != nil {
return fmt.Errorf("reading claim blocks: %w", err)
}
if err := inv.Attach(blk); err != nil {
return fmt.Errorf("attaching claim block: %w", err)
}
}

return c.execute(inv)
}

func (c *Client) QueryClaims(ctx context.Context, query types.Query) (types.QueryResult, error) {
url := c.serviceURL.JoinPath(claimsPath)
q := url.Query()
for _, mh := range query.Hashes {
mhString, err := multibase.Encode(multibase.Base64pad, mh)
if err != nil {
return nil, fmt.Errorf("encoding query multihash")
}
q.Add("multihash", mhString)
}
for _, space := range query.Match.Subject {
q.Add("spaces", space.String())
}
url.RawQuery = q.Encode()
res, err := gohttp.DefaultClient.Get(url.String())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably allow this to be configured...

if err != nil {
return nil, fmt.Errorf("sending query to server: %w", err)
}
if res.StatusCode < 200 || res.StatusCode > 299 {
return nil, errFromResponse(res)
}
return queryresult.Extract(res.Body)
}

func New(servicePrincipal ucan.Principal, serviceURL url.URL) (*Client, error) {
channel := http.NewHTTPChannel(serviceURL.JoinPath(claimsPath))
conn, err := client.NewConnection(servicePrincipal, channel)
if err != nil {
return nil, fmt.Errorf("creating connection: %w", err)
}
return &Client{servicePrincipal, serviceURL, conn}, nil
}
28 changes: 28 additions & 0 deletions pkg/service/contentclaims/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import (
"context"
"fmt"
"net/url"
"testing"

"github.com/ipld/go-ipld-prime/datamodel"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/storacha/go-capabilities/pkg/assert"
"github.com/storacha/go-capabilities/pkg/claim"
"github.com/storacha/go-ucanto/client"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/go-ucanto/core/invocation"
Expand Down Expand Up @@ -35,6 +38,30 @@
conn, err := client.NewConnection(testutil.Service, server)
require.NoError(t, err)

locationCommitment := testutil.Must(assert.Location.Delegate(testutil.Alice,
testutil.Alice,
testutil.Alice.DID().String(),
assert.LocationCaveats{
Content: assert.FromHash(testutil.RandomMultihash()),
Location: []url.URL{*testutil.Must(url.Parse("https://www.yahoo.com"))(t)},
Space: testutil.Bob.DID(),
}))(t)

cacheInvocation := testutil.Must(claim.Cache.Invoke(testutil.Service,
testutil.Service,
testutil.Service.DID().String(), claim.CacheCaveats{
Claim: locationCommitment.Link(),
Provider: claim.Provider{
Addresses: []multiaddr.Multiaddr{testutil.RandomMultiaddr()},
},
}))(t)
for b, err := range locationCommitment.Blocks() {
if err != nil {
t.Log(fmt.Sprintf("iterating claim blocks: %s", err))

Check failure on line 60 in pkg/service/contentclaims/server_test.go

View workflow job for this annotation

GitHub Actions / go-check / All

should use t.Logf(...) instead of t.Log(fmt.Sprintf(...)) (S1038)
t.FailNow()
}
require.NoError(t, cacheInvocation.Attach(b))
}
invs := []invocation.Invocation{
testutil.Must(assert.Equals.Invoke(
testutil.Service,
Expand All @@ -54,6 +81,7 @@
Index: testutil.RandomCID(),
},
))(t),
cacheInvocation,
}

for _, inv := range invs {
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/contentclaims/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package contentclaims

import (
"context"
"fmt"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -66,7 +67,10 @@ func NewService(indexer types.Service) map[ucan.Ability]server.ServiceMethod[ok.
return ok.Unit{}, nil, NewMissingClaimError()
}

claim := delegation.NewDelegation(rootbl, bs)
claim, err := delegation.NewDelegation(rootbl, bs)
if err != nil {
return ok.Unit{}, nil, fmt.Errorf("generating delegation: %w", err)
}
err = indexer.CacheClaim(context.TODO(), provider, claim)
if err != nil {
log.Errorf("caching claim: %w", err)
Expand Down
32 changes: 32 additions & 0 deletions pkg/service/queryresult/queryresult.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package queryresult

import (
"fmt"
"io"
"iter"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
multihash "github.com/multiformats/go-multihash/core"
"github.com/storacha/go-ucanto/core/car"
"github.com/storacha/go-ucanto/core/dag/blockstore"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/go-ucanto/core/ipld"
Expand Down Expand Up @@ -51,6 +53,36 @@ func (q *queryResult) Root() block.Block {
return q.root
}

func Extract(r io.Reader) (types.QueryResult, error) {
roots, blocks, err := car.Decode(r)
if err != nil {
return nil, fmt.Errorf("extracting car: %w", err)
}

if len(roots) != 1 {
return nil, types.ErrWrongRootCount
}

blks, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(blocks))
if err != nil {
return nil, fmt.Errorf("reading blocks from car: %w", err)
}
root, has, err := blks.Get(roots[0])
if err != nil {
return nil, fmt.Errorf("reading root block: %w", err)
}
if !has {
return nil, types.ErrNoRootBlock
}

var queryResultModel qdm.QueryResultModel
err = block.Decode(root, &queryResultModel, qdm.QueryResultType(), cbor.Codec, sha256.Hasher)
if err != nil {
return nil, fmt.Errorf("decoding query result: %w", err)
}
return &queryResult{root, queryResultModel.Result0_1, blks}, nil
}

// Build generates a new encodable QueryResult
func Build(claims map[cid.Cid]delegation.Delegation, indexes bytemap.ByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView]) (types.QueryResult, error) {
bs, err := blockstore.NewBlockStore()
Expand Down
7 changes: 7 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func (c ContextID) ToEncoded() (EncodedContextID, error) {
// ErrKeyNotFound means the key did not exist in the cache
var ErrKeyNotFound = errors.New("cache key not found")

// ErrWrongRootCount indicates a car file with multiple roots being unable to interpret
// as a query result
var ErrWrongRootCount = errors.New("query result should have exactly one root")

// ErrNoRootBlock indicates a root that is specified but not found in a CAR file
var ErrNoRootBlock = errors.New("query root block not found in car")

// Cache describes a generic cache interface
type Cache[Key, Value any] interface {
Set(ctx context.Context, key Key, value Value, expires bool) error
Expand Down
Loading