Skip to content

Commit 9910b43

Browse files
committed
refactor: address feedback
1 parent dc52000 commit 9910b43

8 files changed

Lines changed: 167 additions & 103 deletions

File tree

client/retrieval/connnection.go

Lines changed: 116 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ import (
2929
"github.com/storacha/go-ucanto/ucan"
3030
)
3131

32+
// MaxPartialInvocationReqs is the maximum number of requests we'll send in
33+
// order to get an invocation executed.
34+
const MaxPartialInvocationReqs = 50
35+
3236
// Option is an option configuring a retrieval connection.
3337
type Option func(cfg *config)
3438

@@ -93,19 +97,45 @@ func (c *Connection) Hasher() hash.Hash {
9397
return c.hasher()
9498
}
9599

100+
// Execute performs a UCAN invocation using the headercar transport,
101+
// implementing a "probe and retry" pattern to handle HTTP header size
102+
// limitations when the invocation is too large to fit.
103+
//
104+
// The method first attempts to send the complete invocation (including all
105+
// proofs) in HTTP headers. If this fails due to size constraints (typically 4KB
106+
// header limit), it falls back to a multipart negotiation protocol:
107+
//
108+
// 1. Send invocation with ALL proofs omitted
109+
// 2. Server responds with 510 (Not Extended) listing missing proof CID(s)
110+
// 3. Send partial invocations with each missing proof attached one by one
111+
// 4. Repeat until server has all required proofs (200/206 response)
112+
//
113+
// This approach optimizes for the common case (shallow delegation chains that
114+
// fit in headers) while also handling deep proof chains that require
115+
// multiple round trips. The server caches proofs between requests, so each
116+
// proof only needs to be sent once per session.
117+
//
118+
// Note: The current implementation processes missing proofs sequentially rather
119+
// than in batches, which means deep delegation chains will result in multiple
120+
// HTTP round trips. This trade-off prioritizes implementation simplicity over
121+
// network efficiency, which is acceptable given current delegation chain depths
122+
// but may need optimization as authorization hierarchies grow deeper.
123+
//
124+
// Returns the execution response, the final HTTP response, and any error
125+
// encountered.
96126
func Execute(ctx context.Context, inv invocation.Invocation, conn client.Connection) (client.ExecutionResponse, transport.HTTPResponse, error) {
97127
input, err := message.Build([]invocation.Invocation{inv}, nil)
98128
if err != nil {
99129
return nil, nil, fmt.Errorf("building message: %w", err)
100130
}
101131

102132
var response transport.HTTPResponse
103-
needMultipartRequest := false
133+
multi := false
104134

105135
req, err := conn.Codec().Encode(input)
106136
if err != nil {
107137
if errors.Is(err, hcmsg.ErrHeaderTooLarge) {
108-
needMultipartRequest = true
138+
multi = true
109139
} else {
110140
return nil, nil, fmt.Errorf("encoding message: %w", err)
111141
}
@@ -116,7 +146,7 @@ func Execute(ctx context.Context, inv invocation.Invocation, conn client.Connect
116146
}
117147

118148
if response.Status() == http.StatusRequestHeaderFieldsTooLarge {
119-
needMultipartRequest = true
149+
multi = true
120150
err := response.Body().Close() // we don't need this anymore
121151
if err != nil {
122152
return nil, nil, fmt.Errorf("closing response body: %w", err)
@@ -126,98 +156,106 @@ func Execute(ctx context.Context, inv invocation.Invocation, conn client.Connect
126156

127157
// if the header fields are too big, we need to split the delegation into
128158
// multiple requests...
129-
if needMultipartRequest {
130-
br, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(inv.Export()))
131-
if err != nil {
132-
return nil, nil, fmt.Errorf("reading invocation blocks: %w", err)
133-
}
134-
part, err := omitProofs(inv)
159+
if multi {
160+
response, err = sendPartialInvocations(ctx, inv, conn)
135161
if err != nil {
136-
return nil, nil, fmt.Errorf("creating invocation %s with omitted proofs: %w", inv.Link().String(), err)
162+
return nil, nil, fmt.Errorf("sending partial invocations: %w", err)
137163
}
164+
}
138165

139-
parts := map[string]delegation.Delegation{}
140-
prfs := inv.Proofs()
141-
for len(prfs) > 0 {
142-
root := prfs[0]
143-
prfs = prfs[1:]
144-
prf, err := delegation.NewDelegationView(root, br)
145-
if err != nil {
146-
return nil, nil, fmt.Errorf("creating delegation: %w", err)
147-
}
148-
prfs = append(prfs, prf.Proofs()...)
149-
// now export without proofs
150-
prf, err = omitProofs(prf)
151-
if err != nil {
152-
return nil, nil, fmt.Errorf("creating delegation %s with omitted proofs: %w", prf.Link().String(), err)
153-
}
154-
parts[prf.Link().String()] = prf
166+
output, err := conn.Codec().Decode(response)
167+
if err != nil {
168+
return nil, nil, fmt.Errorf("decoding message: %w", err)
169+
}
170+
171+
return client.ExecutionResponse(output), response, nil
172+
}
173+
174+
func sendPartialInvocations(ctx context.Context, inv invocation.Invocation, conn client.Connection) (transport.HTTPResponse, error) {
175+
br, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(inv.Export()))
176+
if err != nil {
177+
return nil, fmt.Errorf("reading invocation blocks: %w", err)
178+
}
179+
part, err := omitProofs(inv)
180+
if err != nil {
181+
return nil, fmt.Errorf("creating invocation %s with omitted proofs: %w", inv.Link().String(), err)
182+
}
183+
184+
parts := map[string]delegation.Delegation{}
185+
prfs := inv.Proofs()
186+
for len(prfs) > 0 {
187+
root := prfs[0]
188+
prfs = prfs[1:]
189+
prf, err := delegation.NewDelegationView(root, br)
190+
if err != nil {
191+
return nil, fmt.Errorf("creating delegation: %w", err)
155192
}
156-
// we already tried this
157-
if len(parts) == 0 {
158-
return nil, nil, errors.New("invocation is too big to send in HTTP headers")
193+
prfs = append(prfs, prf.Proofs()...)
194+
// now export without proofs
195+
prf, err = omitProofs(prf)
196+
if err != nil {
197+
return nil, fmt.Errorf("creating delegation %s with omitted proofs: %w", prf.Link().String(), err)
159198
}
199+
parts[prf.Link().String()] = prf
200+
}
201+
// we already tried this
202+
if len(parts) == 0 {
203+
return nil, errors.New("invocation is too big to send in HTTP headers")
204+
}
160205

161-
// now send the parts
162-
for {
163-
input, err := newPartialInvocationMessage(inv.Link(), part)
164-
if err != nil {
165-
return nil, nil, fmt.Errorf("building message: %w", err)
166-
}
167-
168-
req, err := conn.Codec().Encode(input)
169-
if err != nil {
170-
return nil, nil, fmt.Errorf("encoding message: %w", err)
171-
}
206+
// now send the parts
207+
for range MaxPartialInvocationReqs {
208+
input, err := newPartialInvocationMessage(inv.Link(), part)
209+
if err != nil {
210+
return nil, fmt.Errorf("building message: %w", err)
211+
}
172212

173-
res, err := conn.Channel().Request(ctx, req)
174-
if err != nil {
175-
return nil, nil, fmt.Errorf("sending message: %w", err)
176-
}
213+
req, err := conn.Codec().Encode(input)
214+
if err != nil {
215+
return nil, fmt.Errorf("encoding message: %w", err)
216+
}
177217

178-
if res.Status() == http.StatusPartialContent || res.Status() == http.StatusOK {
179-
response = res
180-
break
181-
}
218+
res, err := conn.Channel().Request(ctx, req)
219+
if err != nil {
220+
return nil, fmt.Errorf("sending message: %w", err)
221+
}
182222

183-
// if still too big, then fail
184-
if res.Status() == http.StatusRequestHeaderFieldsTooLarge {
185-
return nil, nil, errors.New("invocation is too big to send in HTTP headers")
186-
}
223+
if res.Status() == http.StatusPartialContent || res.Status() == http.StatusOK {
224+
return res, nil
225+
}
187226

188-
if res.Status() != http.StatusNotExtended {
189-
return nil, nil, fmt.Errorf("unexpected status code: %d", res.Status())
190-
}
227+
// if still too big, then fail
228+
if res.Status() == http.StatusRequestHeaderFieldsTooLarge {
229+
return nil, errors.New("invocation is too big to send in HTTP headers")
230+
}
191231

192-
body, err := io.ReadAll(res.Body())
193-
if err != nil {
194-
return nil, nil, fmt.Errorf("reading not extended body: %w", err)
195-
}
232+
if res.Status() != http.StatusNotExtended {
233+
return nil, fmt.Errorf("unexpected status code: %d", res.Status())
234+
}
196235

197-
var model rdm.MissingProofsModel
198-
err = json.Decode(body, &model, rdm.MissingProofsType())
199-
if err != nil {
200-
return nil, nil, fmt.Errorf("decoding body: %w", err)
201-
}
202-
if len(model.Proofs) == 0 {
203-
return nil, nil, fmt.Errorf("missing missing proofs: %w", err)
204-
}
236+
body, err := io.ReadAll(res.Body())
237+
if err != nil {
238+
return nil, fmt.Errorf("reading not extended body: %w", err)
239+
}
205240

206-
p, ok := parts[model.Proofs[0].String()]
207-
if !ok {
208-
return nil, nil, fmt.Errorf("missing proof not found or was already sent: %s", model.Proofs[0].String())
209-
}
210-
part = p
211-
delete(parts, p.Link().String())
241+
var model rdm.MissingProofsModel
242+
err = json.Decode(body, &model, rdm.MissingProofsType())
243+
if err != nil {
244+
return nil, fmt.Errorf("decoding body: %w", err)
245+
}
246+
if len(model.Proofs) == 0 {
247+
return nil, errors.New("server did not include missing proofs in response")
212248
}
213-
}
214249

215-
output, err := conn.Codec().Decode(response)
216-
if err != nil {
217-
return nil, nil, fmt.Errorf("decoding message: %w", err)
250+
p, ok := parts[model.Proofs[0].String()]
251+
if !ok {
252+
return nil, fmt.Errorf("missing proof not found or was already sent: %s", model.Proofs[0].String())
253+
}
254+
part = p
255+
delete(parts, p.Link().String())
218256
}
219257

220-
return client.ExecutionResponse(output), response, nil
258+
return nil, fmt.Errorf("maximum partial invocation requests exceeded: %d", MaxPartialInvocationReqs)
221259
}
222260

223261
func omitProofs(dlg delegation.Delegation) (delegation.Delegation, error) {

server/retrieval/cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/storacha/go-ucanto/core/ipld"
1010
)
1111

12-
var MemoryDelegationCacheSize = 100
12+
const MemoryDelegationCacheSize = 100
1313

1414
type MemoryDelegationCache struct {
1515
data *lru.Cache[string, delegation.Delegation]

server/retrieval/cache_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package retrieval
22

33
import (
4+
"fmt"
45
"testing"
56

67
"github.com/storacha/go-ucanto/core/delegation"
@@ -52,4 +53,30 @@ func TestMemoryDelegationCache(t *testing.T) {
5253
require.False(t, ok)
5354
require.Nil(t, cached)
5455
})
56+
57+
t.Run("uses default size if not specified", func(t *testing.T) {
58+
cache, err := NewMemoryDelegationCache(-1)
59+
require.NoError(t, err)
60+
61+
for i := range MemoryDelegationCacheSize + 1 {
62+
dlg, err := delegation.Delegate(
63+
fixtures.Alice,
64+
fixtures.Alice,
65+
[]ucan.Capability[ucan.NoCaveats]{
66+
ucan.NewCapability(
67+
"test/cache",
68+
fixtures.Alice.DID().String(),
69+
ucan.NoCaveats{},
70+
),
71+
},
72+
delegation.WithNonce(fmt.Sprintf("%d", i)),
73+
)
74+
require.NoError(t, err)
75+
76+
err = cache.Put(t.Context(), dlg)
77+
require.NoError(t, err)
78+
}
79+
80+
require.Equal(t, MemoryDelegationCacheSize, cache.data.Len())
81+
})
5582
}

server/retrieval/error.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,26 @@ import (
55
rdm "github.com/storacha/go-ucanto/server/retrieval/datamodel"
66
)
77

8-
type AgentMessageInvocationError struct{}
8+
type AgentMessageInvocationCountError struct{}
99

10-
func (amie AgentMessageInvocationError) Error() string {
10+
func (amie AgentMessageInvocationCountError) Error() string {
1111
return "Agent Message is required to have a single invocation."
1212
}
1313

14-
func (amie AgentMessageInvocationError) Name() string {
14+
func (amie AgentMessageInvocationCountError) Name() string {
1515
return "AgentMessageInvocationError"
1616
}
1717

18-
func (amie AgentMessageInvocationError) ToIPLD() (ipld.Node, error) {
18+
func (amie AgentMessageInvocationCountError) ToIPLD() (ipld.Node, error) {
1919
mdl := rdm.AgentMessageInvocationErrorModel{
2020
Name: amie.Name(),
2121
Message: amie.Error(),
2222
}
2323
return ipld.WrapWithRecovery(&mdl, rdm.AgentMessageInvocationErrorType())
2424
}
2525

26-
func NewAgentMessageInvocationError() AgentMessageInvocationError {
27-
return AgentMessageInvocationError{}
26+
func NewAgentMessageInvocationCountError() AgentMessageInvocationCountError {
27+
return AgentMessageInvocationCountError{}
2828
}
2929

3030
type MissingProofs struct {

server/retrieval/server.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func NewServer(id principal.Signer, options ...Option) (*Server, error) {
106106

107107
dlgCache := cfg.delegationCache
108108
if dlgCache == nil {
109-
dc, err := NewMemoryDelegationCache(-1)
109+
dc, err := NewMemoryDelegationCache(MemoryDelegationCacheSize)
110110
if err != nil {
111111
return nil, err
112112
}
@@ -221,15 +221,16 @@ func handle(ctx context.Context, srv CachingServer, request transport.HTTPReques
221221

222222
msg, err := selection.Decoder().Decode(request)
223223
if err != nil {
224-
return thttp.NewResponse(http.StatusBadRequest, io.NopCloser(strings.NewReader("The server failed to decode the request payload. Please format the payload according to the specified media type.")), nil), nil
224+
msg := fmt.Sprintf("The server failed to decode the request payload. Please format the payload according to the specified media type: %s", err.Error())
225+
return thttp.NewResponse(http.StatusBadRequest, io.NopCloser(strings.NewReader(msg)), nil), nil
225226
}
226227

227228
// retrieval server supports only 1 invocation in the agent message, since
228229
// only a single handler can use the body.
229230
invs := msg.Invocations()
230231
if len(invs) != 1 {
231232
var rcpts []receipt.AnyReceipt
232-
res := result.NewFailure(NewAgentMessageInvocationError())
233+
res := result.NewFailure(NewAgentMessageInvocationCountError())
233234
for _, l := range invs {
234235
rcpt, err := receipt.Issue(srv.ID(), res, ran.FromLink(l))
235236
if err != nil {
@@ -298,7 +299,7 @@ func Execute(ctx context.Context, srv CachingServer, msg message.AgentMessage, r
298299
invs := msg.Invocations()
299300
if len(invs) != 1 {
300301
var rcpts []receipt.AnyReceipt
301-
res := result.NewFailure(NewAgentMessageInvocationError())
302+
res := result.NewFailure(NewAgentMessageInvocationCountError())
302303
for _, l := range invs {
303304
rcpt, err := receipt.Issue(srv.ID(), res, ran.FromLink(l))
304305
if err != nil {

transport/car/request/request.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func Decode(req transport.HTTPRequest) (message.AgentMessage, error) {
2929
return nil, fmt.Errorf("decoding CAR: %w", err)
3030
}
3131
if len(roots) != 1 {
32-
return nil, fmt.Errorf("unexpected number of roots: %d", len(roots))
32+
return nil, fmt.Errorf("unexpected number of roots: %d, expected: 1", len(roots))
3333
}
3434
bstore, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(blocks))
3535
if err != nil {

transport/car/response/response.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func Decode(response transport.HTTPResponse) (message.AgentMessage, error) {
2727
return nil, fmt.Errorf("decoding CAR: %w", err)
2828
}
2929
if len(roots) != 1 {
30-
return nil, fmt.Errorf("unexpected number of roots: %d", len(roots))
30+
return nil, fmt.Errorf("unexpected number of roots: %d, expected: 1", len(roots))
3131
}
3232
bstore, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(blocks))
3333
if err != nil {

0 commit comments

Comments
 (0)