Skip to content

Commit 044368c

Browse files
committed
feat: header CAR transport codec
1 parent 06a2c2d commit 044368c

4 files changed

Lines changed: 261 additions & 6 deletions

File tree

transport/headercar/codec.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package car
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/storacha/go-ucanto/core/message"
7+
"github.com/storacha/go-ucanto/transport"
8+
"github.com/storacha/go-ucanto/transport/car/request"
9+
"github.com/storacha/go-ucanto/transport/car/response"
10+
thttp "github.com/storacha/go-ucanto/transport/http"
11+
)
12+
13+
type carOutbound struct{}
14+
15+
func (oc *carOutbound) Encode(msg message.AgentMessage) (transport.HTTPRequest, error) {
16+
return request.Encode(msg)
17+
}
18+
19+
func (oc *carOutbound) Decode(res transport.HTTPResponse) (message.AgentMessage, error) {
20+
return response.Decode(res)
21+
}
22+
23+
var _ transport.OutboundCodec = (*carOutbound)(nil)
24+
25+
func NewOutboundCodec() transport.OutboundCodec {
26+
return &carOutbound{}
27+
}
28+
29+
type carInboundAcceptCodec struct{}
30+
31+
func (cic *carInboundAcceptCodec) Encoder() transport.ResponseEncoder {
32+
return cic
33+
}
34+
35+
func (cic *carInboundAcceptCodec) Decoder() transport.RequestDecoder {
36+
return cic
37+
}
38+
39+
func (cic *carInboundAcceptCodec) Encode(msg message.AgentMessage) (transport.HTTPResponse, error) {
40+
return response.Encode(msg)
41+
}
42+
43+
func (cic *carInboundAcceptCodec) Decode(req transport.HTTPRequest) (message.AgentMessage, error) {
44+
return request.Decode(req)
45+
}
46+
47+
type carInbound struct {
48+
codec transport.InboundAcceptCodec
49+
}
50+
51+
func (ic *carInbound) Accept(req transport.HTTPRequest) (transport.InboundAcceptCodec, transport.HTTPError) {
52+
msgHdr := req.Headers().Get("X-Agent-Message")
53+
if msgHdr == "" {
54+
return nil, thttp.NewHTTPError(
55+
"The server cannot process the request because the payload format is not supported. Please send the X-Agent-Message header.",
56+
http.StatusUnsupportedMediaType,
57+
http.Header{},
58+
)
59+
}
60+
return ic.codec, nil
61+
}
62+
63+
var _ transport.InboundCodec = (*carInbound)(nil)
64+
65+
func NewInboundCodec() transport.InboundCodec {
66+
return &carInbound{codec: &carInboundAcceptCodec{}}
67+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package request
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"errors"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
11+
"github.com/multiformats/go-multibase"
12+
"github.com/storacha/go-ucanto/core/car"
13+
"github.com/storacha/go-ucanto/core/dag/blockstore"
14+
"github.com/storacha/go-ucanto/core/ipld"
15+
"github.com/storacha/go-ucanto/core/message"
16+
"github.com/storacha/go-ucanto/transport"
17+
uhttp "github.com/storacha/go-ucanto/transport/http"
18+
)
19+
20+
func Encode(message message.AgentMessage) (transport.HTTPRequest, error) {
21+
headers := http.Header{}
22+
body := car.Encode([]ipld.Link{message.Root().Link()}, message.Blocks())
23+
24+
r, w := io.Pipe()
25+
go func() {
26+
gz := gzip.NewWriter(w)
27+
_, err := io.Copy(gz, body)
28+
gz.Close()
29+
w.CloseWithError(err)
30+
}()
31+
32+
var b bytes.Buffer
33+
_, err := b.ReadFrom(r)
34+
if err != nil {
35+
return nil, fmt.Errorf("reading encoded CAR: %w", err)
36+
}
37+
38+
msgHdr, err := multibase.Encode(multibase.Base64, b.Bytes())
39+
headers.Set("X-Agent-Message", msgHdr)
40+
reader := car.Encode([]ipld.Link{message.Root().Link()}, message.Blocks())
41+
return uhttp.NewHTTPRequest(reader, headers), nil
42+
}
43+
44+
func Decode(req transport.HTTPRequest) (message.AgentMessage, error) {
45+
msgHdr := req.Headers().Get("X-Agent-Message")
46+
if msgHdr == "" {
47+
return nil, errors.New("missing X-Agent-Message header in request")
48+
}
49+
_, data, err := multibase.Decode(msgHdr)
50+
if msgHdr == "" {
51+
return nil, fmt.Errorf("multibase decoding X-Agent-Message header: %w", err)
52+
}
53+
gz, err := gzip.NewReader(bytes.NewReader(data))
54+
if err != nil {
55+
return nil, fmt.Errorf("creating gzip reader: %w", err)
56+
}
57+
defer gz.Close()
58+
roots, blocks, err := car.Decode(gz)
59+
if err != nil {
60+
return nil, fmt.Errorf("decoding CAR: %w", err)
61+
}
62+
bstore, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(blocks))
63+
if err != nil {
64+
return nil, fmt.Errorf("creating blockstore: %w", err)
65+
}
66+
return message.NewMessage(roots, bstore)
67+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package response
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"errors"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
11+
"github.com/multiformats/go-multibase"
12+
"github.com/storacha/go-ucanto/core/car"
13+
"github.com/storacha/go-ucanto/core/dag/blockstore"
14+
"github.com/storacha/go-ucanto/core/ipld"
15+
"github.com/storacha/go-ucanto/core/message"
16+
"github.com/storacha/go-ucanto/transport"
17+
uhttp "github.com/storacha/go-ucanto/transport/http"
18+
)
19+
20+
func Encode(msg message.AgentMessage) (transport.HTTPResponse, error) {
21+
headers := http.Header{}
22+
body := car.Encode([]ipld.Link{msg.Root().Link()}, msg.Blocks())
23+
24+
r, w := io.Pipe()
25+
go func() {
26+
gz := gzip.NewWriter(w)
27+
_, err := io.Copy(gz, body)
28+
gz.Close()
29+
w.CloseWithError(err)
30+
}()
31+
32+
var b bytes.Buffer
33+
_, err := b.ReadFrom(r)
34+
if err != nil {
35+
return nil, fmt.Errorf("reading encoded CAR: %w", err)
36+
}
37+
38+
msgHdr, err := multibase.Encode(multibase.Base64, b.Bytes())
39+
headers.Set("X-Agent-Message", msgHdr)
40+
return uhttp.NewHTTPResponse(http.StatusOK, nil, headers), nil
41+
}
42+
43+
func Decode(response transport.HTTPResponse) (message.AgentMessage, error) {
44+
msgHdr := response.Headers().Get("X-Agent-Message")
45+
if msgHdr == "" {
46+
return nil, errors.New("missing X-Agent-Message header in response")
47+
}
48+
_, data, err := multibase.Decode(msgHdr)
49+
if msgHdr == "" {
50+
return nil, fmt.Errorf("multibase decoding X-Agent-Message header: %w", err)
51+
}
52+
gz, err := gzip.NewReader(bytes.NewReader(data))
53+
if err != nil {
54+
return nil, fmt.Errorf("creating gzip reader: %w", err)
55+
}
56+
defer gz.Close()
57+
roots, blocks, err := car.Decode(gz)
58+
if err != nil {
59+
return nil, fmt.Errorf("decoding CAR: %w", err)
60+
}
61+
bstore, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(blocks))
62+
if err != nil {
63+
return nil, fmt.Errorf("creating blockstore: %w", err)
64+
}
65+
return message.NewMessage(roots, bstore)
66+
}

transport/http/channel.go

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,53 @@ import (
55
"net/http"
66
"net/url"
77

8+
"slices"
9+
810
"github.com/storacha/go-ucanto/transport"
911
)
1012

13+
// Option is an option configuring a HTTP channel.
14+
type Option func(cfg *chanConfig)
15+
16+
type chanConfig struct {
17+
client *http.Client
18+
method string
19+
statuses []int
20+
}
21+
22+
// WithClient configures the HTTP client the channel should use to make
23+
// requests.
24+
func WithClient(c *http.Client) Option {
25+
return func(cfg *chanConfig) {
26+
cfg.client = c
27+
}
28+
}
29+
30+
// WithMethod configures the HTTP method the channel should use when making
31+
// requests.
32+
func WithMethod(method string) Option {
33+
return func(cfg *chanConfig) {
34+
cfg.method = method
35+
}
36+
}
37+
38+
// WithSuccessStatusCode configures the HTTP status code(s) that will indicate a
39+
// successful request.
40+
func WithSuccessStatusCode(codes ...int) Option {
41+
return func(cfg *chanConfig) {
42+
cfg.statuses = codes
43+
}
44+
}
45+
1146
type channel struct {
12-
url *url.URL
13-
client *http.Client
47+
url *url.URL
48+
client *http.Client
49+
method string
50+
statuses []int
1451
}
1552

1653
func (c *channel) Request(req transport.HTTPRequest) (transport.HTTPResponse, error) {
17-
hr, err := http.NewRequest("POST", c.url.String(), req.Body())
54+
hr, err := http.NewRequest(c.method, c.url.String(), req.Body())
1855
if err != nil {
1956
return nil, fmt.Errorf("creating HTTP request: %s", err)
2057
}
@@ -24,13 +61,31 @@ func (c *channel) Request(req transport.HTTPRequest) (transport.HTTPResponse, er
2461
if err != nil {
2562
return nil, fmt.Errorf("doing HTTP request: %s", err)
2663
}
27-
if res.StatusCode != http.StatusOK {
64+
if !slices.Contains(c.statuses, res.StatusCode) {
2865
return nil, NewHTTPError(fmt.Sprintf("HTTP Request failed. %s %s → %d", hr.Method, c.url.String(), res.StatusCode), res.StatusCode, res.Header)
2966
}
3067

3168
return NewHTTPResponse(res.StatusCode, res.Body, res.Header), nil
3269
}
3370

34-
func NewHTTPChannel(url *url.URL) transport.Channel {
35-
return &channel{url: url, client: &http.Client{}}
71+
func NewHTTPChannel(url *url.URL, options ...Option) transport.Channel {
72+
cfg := chanConfig{}
73+
for _, opt := range options {
74+
opt(&cfg)
75+
}
76+
if cfg.client == nil {
77+
cfg.client = &http.Client{}
78+
}
79+
if cfg.method == "" {
80+
cfg.method = "POST"
81+
}
82+
if len(cfg.statuses) == 0 {
83+
cfg.statuses = append(cfg.statuses, http.StatusOK)
84+
}
85+
return &channel{
86+
url: url,
87+
client: cfg.client,
88+
method: cfg.method,
89+
statuses: cfg.statuses,
90+
}
3691
}

0 commit comments

Comments
 (0)