feat!: retrieval server and client#48
Conversation
044368c to
d303891
Compare
|
Note to self: need to set caching headers to expire when delegation expires + |
8544963 to
af2d42d
Compare
frrist
left a comment
There was a problem hiding this comment.
Good stuff, there's a lot here!
Will give another look after some conversation in comments and related RFC.
| return nil, nil, fmt.Errorf("decoding body: %w", err) | ||
| } | ||
| if len(model.Proofs) == 0 { | ||
| return nil, nil, fmt.Errorf("missing missing proofs: %w", err) |
There was a problem hiding this comment.
maybe something like: "Server did not include missing proofs in response"?
| return c.hasher() | ||
| } | ||
|
|
||
| func Execute(ctx context.Context, inv invocation.Invocation, conn client.Connection) (client.ExecutionResponse, transport.HTTPResponse, error) { |
There was a problem hiding this comment.
nit: preference for this method to be broken down a bit more. Ideaally separate functions for "regular" requests and "multipart" requests at a minimum.
There was a problem hiding this comment.
Additionally, I'd appreciate a comment on the top of this method describing the flow of things, something like:
Execute performs a UCAN invocation using the headercar transport, implementing
a "probe and retry" pattern to handle HTTP header size limitations.
The method first attempts to send the complete invocation (including all proofs)
in HTTP headers. If this fails due to size constraints (4KB header limit), it
falls back to a multipart negotiation protocol:
1. Send invocation with ALL proofs omitted
2. Server responds with 510 (Not Extended) listing missing proof CIDs
3. Send partial invocations with each missing proof attached one by one as requested (TODO I probably have this wrong)
4. Repeat until server has all required proofs (200/206 response)
This approach optimizes for the common case (shallow delegation chains that fit
in headers) while also handling deep proof chains that require
multiple round trips. The server caches proofs between requests, so each proof
only needs to be sent once per session.
Note: The current implementation processes missing proofs sequentially rather
than in batches, which means deep delegation chains will result in multiple
HTTP round trips. This trade-off prioritizes implementation simplicity over
network efficiency, which is acceptable given current delegation chain depths
but may need optimization as authorization hierarchies grow deeper.
Returns the execution response, the final HTTP response, and any error encountered.
Based on a read of the implementation, I think my comment here: https://github.com/storacha/specs/pull/139/files#r2304572067 is probably an incorrect understanding. Though this does motivate, to me at least, a separate endpoint on nodes where proofs can be Put once before a Get, though I haven't really thought this through completely, so I might be speaking non-sense 😅
There was a problem hiding this comment.
👍 will do. Please check my response https://github.com/storacha/specs/pull/139/files#r2334017925
| }.Sum(bytes) | ||
| return cidlink.Link{Cid: c} | ||
| } | ||
|
|
There was a problem hiding this comment.
might want to replace this package with https://github.com/storacha/go-libstoracha/blob/main/testutil in the future.
There was a problem hiding this comment.
That ends up being circular I think...
| return nil, fmt.Errorf("decoding CAR: %w", err) | ||
| } | ||
| if len(roots) != 1 { | ||
| return nil, fmt.Errorf("unexpected number of roots: %d", len(roots)) |
There was a problem hiding this comment.
nit: include the expected number of roots in the error message, i.e. 1
| if len(roots) != 1 { | ||
| return nil, fmt.Errorf("unexpected number of roots: %d", len(roots)) | ||
| } |
| r, w := io.Pipe() | ||
| go func() { | ||
| gz := gzip.NewWriter(w) | ||
| _, err := io.Copy(gz, data) | ||
| gz.Close() | ||
| w.CloseWithError(err) | ||
| }() | ||
|
|
||
| var b bytes.Buffer | ||
| _, err := b.ReadFrom(r) | ||
| if err != nil { | ||
| return "", fmt.Errorf("reading encoded CAR: %w", err) | ||
| } |
There was a problem hiding this comment.
Can we remove this go routine, maybe something like this?
var b bytes.Buffer
gz := gzip.NewWriter(&b)
_, err := io.Copy(gz, data)
if err != nil {
gz.Close()
return "", fmt.Errorf("compressing CAR data: %w", err)
}
if err := gz.Close(); err != nil {
return "", fmt.Errorf("closing gzip writer: %w", err)
}
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
Co-authored-by: Forrest <forrest@storacha.network>
frrist
left a comment
There was a problem hiding this comment.
I don't see anything in here that causes me to want to block a merge. I'd like the Execute method to be broken into smaller methods, but not worth blocking on that point alone.
sooo LGTM 🚢 🚂 good stuff!
(though, it would probably be good to have a second set of eyes/approval - your call ofc)
|
@frrist feedback addressed. I have also added an upper bound to the number of requests that will be made, if you have a delegation chain longer than 50 then you cannot use this code... Let me know if you have any issues with that and I'll address in a new PR. |
| // if the header fields are too big, we need to split the delegation into | ||
| // multiple requests... | ||
| if multi { | ||
| response, err = sendPartialInvocations(ctx, inv, conn) | ||
| if err != nil { | ||
| return nil, nil, fmt.Errorf("sending partial invocations: %w", err) | ||
| } | ||
| } | ||
|
|
||
| output, err := conn.Codec().Decode(response) | ||
| if err != nil { | ||
| return nil, nil, fmt.Errorf("decoding message: %w", err) | ||
| } | ||
|
|
||
| return client.ExecutionResponse(output), response, nil |
There was a problem hiding this comment.
This is way better! Much easier to follow chain of execution logic 🫶
| return nil, fmt.Errorf("unexpected status code: %d", res.Status()) | ||
| } | ||
|
|
||
| body, err := io.ReadAll(res.Body()) |
There was a problem hiding this comment.
do we need to close the res.Body()?
| } | ||
|
|
||
| // now send the parts | ||
| for range MaxPartialInvocationReqs { |
There was a problem hiding this comment.
In a future version, we could do something like:
for {
select {
case ctx.Done():
return ctx.Err()
default:
// continue with existing logic
}
// existing logic
}This would allow the caller to decide how long they want to let this request run; MaxPartialInvocationReqs seems pragmatic enough for now.
SGTM, I suspect this case will be very unlikely, but proposed an alternative in the comments. My only concern is this: #48 (comment) - it looks like we may have a resource leak by not closing the body, but unsure. |
This PR implements the RFC here. It exposes a server and client implementation that allows UCAN authorized retrieval requests via invocations (and receipts) passed in HTTP headers. This leaves the HTTP response body available to be used for retrieved bytes.
A retrieval server is very similar to a normal Ucanto server, except it requires invocations to be sent using the
headercartransport codec. The only other difference is that invocation handlers receive an extra argument - the HTTP request info, and can return and additional value - a HTTP response.The retrieval client is also very similar to a Ucanto client, except that it has the ability to send an invocation in multiple parts, if it does not fit in HTTP headers. Essentially it'll send proofs one by one until the server has all the proofs required to execute the invocation. The server has an LRU cache allowing for this.
The PR also includes a transport codec that encodes agent messages into HTTP headers.
🎬 Demo: https://youtu.be/11np-cGTe48?si=kw88R1DAlMSq-b1T
resolves #59