|
1 | 1 | package retrieval |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "bytes" |
4 | 5 | "context" |
5 | 6 | "errors" |
6 | 7 | "fmt" |
7 | 8 | "io" |
8 | 9 | "net/http" |
9 | 10 | "strings" |
| 11 | + "time" |
10 | 12 |
|
11 | 13 | "github.com/storacha/go-ucanto/core/dag/blockstore" |
12 | 14 | "github.com/storacha/go-ucanto/core/delegation" |
13 | 15 | "github.com/storacha/go-ucanto/core/invocation" |
14 | 16 | "github.com/storacha/go-ucanto/core/invocation/ran" |
15 | 17 | "github.com/storacha/go-ucanto/core/ipld" |
| 18 | + "github.com/storacha/go-ucanto/core/ipld/codec/json" |
16 | 19 | "github.com/storacha/go-ucanto/core/message" |
17 | 20 | "github.com/storacha/go-ucanto/core/receipt" |
18 | 21 | "github.com/storacha/go-ucanto/core/result" |
19 | 22 | "github.com/storacha/go-ucanto/core/result/failure" |
20 | 23 | "github.com/storacha/go-ucanto/principal" |
21 | 24 | "github.com/storacha/go-ucanto/server" |
| 25 | + rdm "github.com/storacha/go-ucanto/server/retrieval/datamodel" |
22 | 26 | "github.com/storacha/go-ucanto/server/transaction" |
23 | 27 | "github.com/storacha/go-ucanto/transport" |
24 | 28 | "github.com/storacha/go-ucanto/transport/headercar" |
@@ -66,9 +70,9 @@ type ServiceMethod[O ipld.Builder, X failure.IPLDBuilderFailure] func( |
66 | 70 | // service implementation. |
67 | 71 | type Service = map[ucan.Ability]ServiceMethod[ipld.Builder, failure.IPLDBuilderFailure] |
68 | 72 |
|
69 | | -// CachingServer is a retrieval that also caches invocations/delegations to |
70 | | -// allow invocations with delegations chains bigger than HTTP header size limits |
71 | | -// to be executed as multiple requests. |
| 73 | +// CachingServer is a retrieval server that also caches invocations/delegations |
| 74 | +// to allow invocations with delegations chains bigger than HTTP header size |
| 75 | +// limits to be executed as multiple requests. |
72 | 76 | type CachingServer interface { |
73 | 77 | server.Server[Service] |
74 | 78 | Cache() delegation.Store |
@@ -229,6 +233,12 @@ func Handle(ctx context.Context, srv CachingServer, request transport.HTTPReques |
229 | 233 | return nil, fmt.Errorf("executing invocations: %w", err) |
230 | 234 | } |
231 | 235 |
|
| 236 | + // if there is no agent message to respond with, we simply respond with the |
| 237 | + // response from execution (i.e. missing proofs response) |
| 238 | + if out == nil { |
| 239 | + return thttp.NewResponse(execResp.Status, execResp.Body, execResp.Headers), nil |
| 240 | + } |
| 241 | + |
232 | 242 | encResp, err := selection.Encoder().Encode(out) |
233 | 243 | if err != nil { |
234 | 244 | return nil, fmt.Errorf("encoding response message: %w", err) |
@@ -282,15 +292,18 @@ func Execute(ctx context.Context, srv CachingServer, msg message.AgentMessage, r |
282 | 292 | if err != nil { |
283 | 293 | mpe := MissingProofs{} |
284 | 294 | if errors.As(err, &mpe) { |
285 | | - rcpt, err := receipt.Issue(srv.ID(), result.NewFailure(mpe), ran.FromLink(invs[0])) |
286 | | - if err != nil { |
287 | | - return nil, Response{}, fmt.Errorf("issuing missing proofs receipt: %w", err) |
288 | | - } |
289 | | - out, err := message.Build(nil, []receipt.AnyReceipt{rcpt}) |
| 295 | + body, err := json.Encode(&mpe, rdm.MissingProofsType()) |
290 | 296 | if err != nil { |
291 | | - return nil, Response{}, fmt.Errorf("building missing proofs error message: %w", err) |
| 297 | + return nil, Response{}, fmt.Errorf("encoding missing proofs repsonse: %w", err) |
292 | 298 | } |
293 | | - return out, Response{Status: http.StatusNotExtended}, nil |
| 299 | + headers := http.Header{} |
| 300 | + expiry := time.Now().Add(10 * time.Minute).Unix() // TODO: honour this? |
| 301 | + headers.Set("X-UCAN-Cache-Expiry", fmt.Sprintf("%d", expiry)) |
| 302 | + return nil, Response{ |
| 303 | + Status: http.StatusNotExtended, |
| 304 | + Body: bytes.NewReader(body), |
| 305 | + Headers: headers, |
| 306 | + }, nil |
294 | 307 | } |
295 | 308 | return nil, Response{}, err |
296 | 309 | } |
|
0 commit comments