Skip to content

Commit 4ae4a6d

Browse files
committed
feat: server implementation
1 parent af2d42d commit 4ae4a6d

24 files changed

Lines changed: 610 additions & 809 deletions

File tree

client/retrieval/client.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package retrieval
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"errors"
7+
"fmt"
8+
"hash"
9+
"net/http"
10+
"net/url"
11+
12+
"github.com/storacha/go-ucanto/client"
13+
"github.com/storacha/go-ucanto/core/invocation"
14+
"github.com/storacha/go-ucanto/core/message"
15+
"github.com/storacha/go-ucanto/transport"
16+
"github.com/storacha/go-ucanto/transport/headercar"
17+
thttp "github.com/storacha/go-ucanto/transport/http"
18+
"github.com/storacha/go-ucanto/ucan"
19+
)
20+
21+
// NewConnection creates a new connection to a retrieval server that uses the
22+
// headercar transport.
23+
func NewConnection(id ucan.Principal, endpoint string) (*Connection, error) {
24+
hasher := sha256.New
25+
url, err := url.Parse(endpoint)
26+
if err != nil {
27+
return nil, err
28+
}
29+
channel := thttp.NewChannel(
30+
url,
31+
thttp.WithMethod("GET"),
32+
thttp.WithSuccessStatusCode(
33+
http.StatusOK,
34+
http.StatusPartialContent,
35+
http.StatusNoContent,
36+
http.StatusNotExtended,
37+
),
38+
)
39+
codec := headercar.NewOutboundCodec()
40+
return &Connection{id, channel, codec, hasher}, nil
41+
}
42+
43+
type Connection struct {
44+
id ucan.Principal
45+
channel transport.Channel
46+
codec transport.OutboundCodec
47+
hasher func() hash.Hash
48+
}
49+
50+
var _ client.Connection = (*Connection)(nil)
51+
52+
func (c *Connection) ID() ucan.Principal {
53+
return c.id
54+
}
55+
56+
func (c *Connection) Codec() transport.OutboundCodec {
57+
return c.codec
58+
}
59+
60+
func (c *Connection) Channel() transport.Channel {
61+
return c.channel
62+
}
63+
64+
func (c *Connection) Hasher() hash.Hash {
65+
return c.hasher()
66+
}
67+
68+
func Execute(ctx context.Context, inv invocation.Invocation, conn client.Connection) (client.ExecutionResponse, transport.HTTPResponse, error) {
69+
input, err := message.Build([]invocation.Invocation{inv}, nil)
70+
if err != nil {
71+
return nil, nil, fmt.Errorf("building message: %w", err)
72+
}
73+
74+
var response transport.HTTPResponse
75+
for {
76+
req, err := conn.Codec().Encode(input)
77+
if err != nil {
78+
return nil, nil, fmt.Errorf("encoding message: %w", err)
79+
}
80+
81+
// TODO: split request if too large
82+
83+
res, err := conn.Channel().Request(ctx, req)
84+
if err != nil {
85+
return nil, nil, fmt.Errorf("sending message: %w", err)
86+
}
87+
88+
if res.Status() == http.StatusNotExtended {
89+
return nil, nil, errors.New("not implemented")
90+
}
91+
92+
response = res
93+
break
94+
}
95+
96+
output, err := conn.Codec().Decode(response)
97+
if err != nil {
98+
return nil, nil, fmt.Errorf("decoding message: %w", err)
99+
}
100+
101+
return client.ExecutionResponse(output), response, nil
102+
}

retrieval/client/connection.go

Lines changed: 0 additions & 70 deletions
This file was deleted.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package datamodel
2+
3+
import (
4+
// for go:embed
5+
_ "embed"
6+
"fmt"
7+
8+
"github.com/ipld/go-ipld-prime"
9+
"github.com/ipld/go-ipld-prime/schema"
10+
)
11+
12+
//go:embed errors.ipldsch
13+
var errorsch []byte
14+
15+
var (
16+
errorTypeSystem *schema.TypeSystem
17+
)
18+
19+
func init() {
20+
ts, err := ipld.LoadSchemaBytes(errorsch)
21+
if err != nil {
22+
panic(fmt.Errorf("failed to load IPLD schema: %w", err))
23+
}
24+
errorTypeSystem = ts
25+
}
26+
27+
func Schema() []byte {
28+
return errorsch
29+
}
30+
31+
func MissingProofsType() schema.Type {
32+
return errorTypeSystem.TypeByName("MissingProofs")
33+
}
34+
35+
type MissingProofsModel struct {
36+
Name string
37+
Message string
38+
Proofs []ipld.Link
39+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
type MissingProofs struct {
2+
name String
3+
message String
4+
proofs [Link]
5+
}

server/retrieval/error.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package retrieval
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"github.com/storacha/go-ucanto/core/ipld"
8+
"github.com/storacha/go-ucanto/core/result/failure"
9+
rdm "github.com/storacha/go-ucanto/server/retrieval/datamodel"
10+
)
11+
12+
type AgentMessageInvocationError struct{}
13+
14+
func (amie AgentMessageInvocationError) Error() string {
15+
return "Agent Message is required to have a single invocation."
16+
}
17+
18+
func (amie AgentMessageInvocationError) Name() string {
19+
return "AgentMessageInvocationError"
20+
}
21+
22+
func (amie AgentMessageInvocationError) ToIPLD() (ipld.Node, error) {
23+
return failure.FromError(amie).ToIPLD()
24+
}
25+
26+
func NewAgentMessageInvocationError() AgentMessageInvocationError {
27+
return AgentMessageInvocationError{}
28+
}
29+
30+
type MissingProofs struct {
31+
proofs []ipld.Link
32+
}
33+
34+
func (mpe MissingProofs) Error() string {
35+
var links []string
36+
for _, p := range mpe.proofs {
37+
links = append(links, p.String())
38+
}
39+
return fmt.Sprintf("Missing proofs: %s", strings.Join(links, ", "))
40+
}
41+
42+
func (mpe MissingProofs) Name() string {
43+
return "MissingProofs"
44+
}
45+
46+
func (mpe MissingProofs) Proofs() []ipld.Link {
47+
return mpe.proofs
48+
}
49+
50+
func (mpe MissingProofs) ToIPLD() (ipld.Node, error) {
51+
mdl := rdm.MissingProofsModel{
52+
Name: mpe.Name(),
53+
Message: mpe.Error(),
54+
Proofs: mpe.Proofs(),
55+
}
56+
return ipld.WrapWithRecovery(&mdl, rdm.MissingProofsType())
57+
}
58+
59+
func NewMissingProofsError(proofs []ipld.Link) MissingProofs {
60+
return MissingProofs{proofs}
61+
}

server/retrieval/handler.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,38 @@ import (
66
"github.com/storacha/go-ucanto/core/invocation"
77
"github.com/storacha/go-ucanto/core/ipld"
88
"github.com/storacha/go-ucanto/core/receipt/fx"
9+
"github.com/storacha/go-ucanto/core/result"
10+
"github.com/storacha/go-ucanto/core/result/failure"
911
"github.com/storacha/go-ucanto/server"
1012
"github.com/storacha/go-ucanto/server/transaction"
1113
"github.com/storacha/go-ucanto/ucan"
1214
"github.com/storacha/go-ucanto/validator"
1315
)
1416

1517
// HandlerFunc is an invocation handler function. It is different to
16-
// [server.HandlerFunc] in that it allows an [RetrievalResponse] to be returned,
18+
// [server.HandlerFunc] in that it allows an [Response] to be returned,
1719
// which for a retrieval server will determine the HTTP headers and body content
1820
// of the HTTP response. The usual handler response (out and effects) are added
1921
// to the X-Agent-Message HTTP header.
20-
type HandlerFunc[C any, O ipld.Builder] func(ctx context.Context, capability ucan.Capability[C], invocation invocation.Invocation, context server.InvocationContext) (O, fx.Effects, *RetrievalResponse, error)
22+
type HandlerFunc[C any, O ipld.Builder, X failure.IPLDBuilderFailure] func(
23+
ctx context.Context,
24+
capability ucan.Capability[C],
25+
invocation invocation.Invocation,
26+
context server.InvocationContext,
27+
request Request,
28+
) (result result.Result[O, X], fx fx.Effects, resp Response, err error)
2129

2230
// Provide is used to define given capability provider. It decorates the passed
2331
// handler and takes care of UCAN validation. It only calls the handler
2432
// when validation succeeds.
25-
func Provide[C any, O ipld.Builder](capability validator.CapabilityParser[C], handler HandlerFunc[C, O]) ServiceMethod[O] {
26-
return func(ctx context.Context, inv invocation.Invocation, ictx server.InvocationContext) (transaction.Transaction[O, ipld.Builder], *RetrievalResponse, error) {
27-
var response *RetrievalResponse
28-
method := server.Provide(capability, func(ctx context.Context, capability ucan.Capability[C], inv invocation.Invocation, ictx server.InvocationContext) (O, fx.Effects, error) {
29-
out, fx, res, err := handler(ctx, capability, inv, ictx)
33+
func Provide[C any, O ipld.Builder, X failure.IPLDBuilderFailure](
34+
capability validator.CapabilityParser[C],
35+
handler HandlerFunc[C, O, X],
36+
) ServiceMethod[O, failure.IPLDBuilderFailure] {
37+
return func(ctx context.Context, inv invocation.Invocation, ictx server.InvocationContext, req Request) (transaction.Transaction[O, failure.IPLDBuilderFailure], Response, error) {
38+
var response Response
39+
method := server.Provide(capability, func(ctx context.Context, capability ucan.Capability[C], inv invocation.Invocation, ictx server.InvocationContext) (result.Result[O, X], fx.Effects, error) {
40+
out, fx, res, err := handler(ctx, capability, inv, ictx, req)
3041
response = res
3142
return out, fx, err
3243
})

server/retrieval/options.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,15 @@ import (
77
"github.com/storacha/go-ucanto/core/invocation"
88
"github.com/storacha/go-ucanto/core/ipld"
99
"github.com/storacha/go-ucanto/core/result"
10+
"github.com/storacha/go-ucanto/core/result/failure"
1011
"github.com/storacha/go-ucanto/server"
1112
"github.com/storacha/go-ucanto/server/transaction"
1213
"github.com/storacha/go-ucanto/ucan"
1314
"github.com/storacha/go-ucanto/validator"
1415
)
1516

1617
// Option is an option configuring a ucanto retrieval server. It does not
17-
// include a transport codec option as it must be [headercar]. Service methods
18-
// also have a different signature to allow them to return response body data,
19-
// HTTP status codes and HTTP headers.
18+
// include a transport codec option as it must be [headercar].
2019
type Option func(cfg *srvConfig) error
2120

2221
type srvConfig struct {
@@ -32,15 +31,19 @@ type srvConfig struct {
3231
delegationCache delegation.Store
3332
}
3433

35-
func WithServiceMethod[O ipld.Builder](can string, handleFunc ServiceMethod[O]) Option {
34+
func WithServiceMethod[O ipld.Builder, X failure.IPLDBuilderFailure](can string, handler ServiceMethod[O, X]) Option {
3635
return func(cfg *srvConfig) error {
37-
cfg.service[can] = func(ctx context.Context, input invocation.Invocation, ictx server.InvocationContext, request *RetrievalRequest) (transaction.Transaction[ipld.Builder, ipld.Builder], *RetrievalResponse, error) {
38-
tx, res, err := handleFunc(ctx, input, ictx, request)
36+
cfg.service[can] = func(ctx context.Context, input invocation.Invocation, invCtx server.InvocationContext, req Request) (transaction.Transaction[ipld.Builder, failure.IPLDBuilderFailure], Response, error) {
37+
tx, resp, err := handler(ctx, input, invCtx, req)
3938
if err != nil {
40-
return nil, nil, err
39+
return nil, resp, err
4140
}
42-
out := result.MapOk(tx.Out(), func(o O) ipld.Builder { return o })
43-
return transaction.NewTransaction(out, transaction.WithEffects(tx.Fx())), res, nil
41+
out := result.MapResultR0(
42+
tx.Out(),
43+
func(o O) ipld.Builder { return o },
44+
func(x X) failure.IPLDBuilderFailure { return x },
45+
)
46+
return transaction.NewTransaction(out, transaction.WithEffects(tx.Fx())), resp, nil
4447
}
4548
return nil
4649
}

0 commit comments

Comments
 (0)