Adding a new source (where readings come from) or sink (where readings go) takes one new file plus a Cargo feature flag — never a refactor.
Sources and sinks are small async traits defined in gluco-hub-core:
// gluco-hub-core/src/source.rs
#[async_trait]
pub trait Source: Send + Sync + 'static {
    fn id(&self) -> &SourceId;
    async fn fetch_latest(&self) -> Result<Vec<Reading>, CoreError>;
}

// gluco-hub-core/src/sink.rs
#[async_trait]
pub trait Sink: Send + Sync + 'static {
    fn name(&self) -> &'static str;
    async fn push(&self, readings: &[Reading]) -> Result<(), CoreError>;
}

The poller fetches from one Source, caches the latest reading, and fans it out to every configured Sink in parallel. Each sink fails independently; the others keep running.
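The fan-out behaviour can be illustrated with a minimal sketch. It is not the poller's actual code: it assumes a simplified, synchronous Sink trait and uses scoped threads from the standard library instead of async tasks, and the Reading, CoreError, AlwaysOk, and AlwaysFails definitions are hypothetical stand-ins. What it shows is the property described above: every sink receives the same batch, and one sink's error does not stop the others.

```rust
use std::thread;

// Simplified stand-ins for the gluco-hub-core types (assumptions for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub struct Reading {
    pub mgdl: u16,
}

#[derive(Debug, PartialEq)]
pub enum CoreError {
    Sink(String),
}

// Synchronous analogue of the async Sink trait.
pub trait Sink: Send + Sync {
    fn name(&self) -> &'static str;
    fn push(&self, readings: &[Reading]) -> Result<(), CoreError>;
}

// Hypothetical demo sinks: one always succeeds, one always fails.
pub struct AlwaysOk;
pub struct AlwaysFails;

impl Sink for AlwaysOk {
    fn name(&self) -> &'static str { "ok" }
    fn push(&self, _readings: &[Reading]) -> Result<(), CoreError> { Ok(()) }
}

impl Sink for AlwaysFails {
    fn name(&self) -> &'static str { "failing" }
    fn push(&self, _readings: &[Reading]) -> Result<(), CoreError> {
        Err(CoreError::Sink("simulated failure".to_string()))
    }
}

/// Push the same batch to every sink concurrently.
/// Returns one (name, result) pair per sink, in sink order.
pub fn fan_out(
    sinks: &[Box<dyn Sink>],
    readings: &[Reading],
) -> Vec<(&'static str, Result<(), CoreError>)> {
    thread::scope(|scope| {
        // Spawn one scoped thread per sink before joining any of them.
        let handles: Vec<_> = sinks
            .iter()
            .map(|sink| scope.spawn(move || (sink.name(), sink.push(readings))))
            .collect();
        // Collect every result; an Err from one sink is just data here.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("sink thread panicked"))
            .collect()
    })
}
```

Calling fan_out with both demo sinks yields an Ok for the first and an Err for the second. Returning per-sink results rather than short-circuiting on the first error is what lets the caller log each failure separately.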
Building a complete sink takes five steps. The example below adds a sink that POSTs readings to a custom webhook.
gluco-hub/src/sinks/webhook/
├── mod.rs
└── sink.rs
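// gluco-hub/src/sinks/webhook/sink.rs
use async_trait::async_trait;
use gluco_hub_core::{CoreError, Reading, Sink};

pub struct WebhookSink {
    url: String,
    client: reqwest::Client,
}

impl WebhookSink {
    // Constructor called from build_sinks() and the tests; one Client is reused for all pushes.
    pub fn new(url: String) -> Self {
        Self { url, client: reqwest::Client::new() }
    }
}

#[async_trait]
impl Sink for WebhookSink {
    fn name(&self) -> &'static str { "webhook" }

    async fn push(&self, readings: &[Reading]) -> Result<(), CoreError> {
        self.client
            .post(&self.url)
            .json(readings)
            .send()
            .await
            // Transport-level failure (DNS, TLS, connection refused, ...).
            .map_err(|e| CoreError::Sink(e.to_string()))?
            // Turn 4xx/5xx responses into errors as well.
            .error_for_status()
            .map_err(|e| CoreError::Sink(e.to_string()))?;
        Ok(())
    }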
}

# gluco-hub/Cargo.toml
# "dep:reqwest" only resolves if reqwest is declared as an optional dependency.
[features]
sink-webhook = ["dep:reqwest"]

// gluco-hub/src/sinks/mod.rs
#[cfg(feature = "sink-webhook")]
pub mod webhook;

// gluco-hub/src/main.rs, inside build_sinks()
#[cfg(feature = "sink-webhook")]
if let Some(cfg) = cfg.sink.webhook.as_ref() {
    sinks.push(Arc::new(WebhookSink::new(cfg.url.clone())));
}

Add the matching [sink.webhook] section to Config in gluco-hub/src/config.rs. Build with --features sink-webhook and configure it via TOML or GLUCO_HUB__SINK__WEBHOOK__URL.
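The environment variable name appears to mirror the TOML path, with a GLUCO_HUB__ prefix and double underscores between nesting levels. The sketch below only illustrates that naming convention as inferred from the single variable shown above; env_key_to_config_path is a hypothetical helper, not the loader the project actually uses, and the real rules may differ.

```rust
/// Map an environment variable name to a dotted config path.
/// Returns None when the name lacks the GLUCO_HUB__ prefix or has nothing after it.
/// Assumption: "__" separates nesting levels and segments are lowercased.
pub fn env_key_to_config_path(key: &str) -> Option<String> {
    let rest = key.strip_prefix("GLUCO_HUB__")?;
    if rest.is_empty() {
        return None;
    }
    let segments: Vec<String> = rest
        .split("__")
        .map(|segment| segment.to_ascii_lowercase())
        .collect();
    Some(segments.join("."))
}
```

Under this assumption, GLUCO_HUB__SINK__WEBHOOK__URL corresponds to the url key of the [sink.webhook] table. Single underscores inside a segment are preserved, so a table name such as ns_socket survives intact.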
Sources follow the same pattern. The example below adds a source that reads from a local CGM file.
// gluco-hub/src/sources/file/source.rs
use std::path::PathBuf;

use async_trait::async_trait;
use gluco_hub_core::{CoreError, Reading, Source, SourceId};

pub struct FileSource {
    id: SourceId,
    path: PathBuf,
}

#[async_trait]
impl Source for FileSource {
    fn id(&self) -> &SourceId { &self.id }

    async fn fetch_latest(&self) -> Result<Vec<Reading>, CoreError> {
        // Read the whole file, then parse it as a JSON array of readings.
        let bytes = tokio::fs::read(&self.path).await
            .map_err(|e| CoreError::Source(e.to_string()))?;
        let readings: Vec<Reading> = serde_json::from_slice(&bytes)
            .map_err(|e| CoreError::Source(e.to_string()))?;
        Ok(readings)
    }
}

# gluco-hub/Cargo.toml
[features]
source-file = []

// gluco-hub/src/sources/mod.rs
#[cfg(feature = "source-file")]
pub mod file;

Add a branch for your variant to the source-selection code in main.rs, which picks one source per run based on [source.*] config.
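The shape of such a branch might look like the following sketch. It is written under assumptions: SourceConfig, FileSourceConfig, NsSocketConfig, SelectedSource, and select_source are hypothetical names, the fields are guesses, and the real code in main.rs presumably gates each branch behind its Cargo feature and constructs the Source directly. The point is the "exactly one source per run" rule.

```rust
use std::path::PathBuf;

// Hypothetical, simplified mirror of the [source.*] config tables.
#[derive(Default)]
pub struct SourceConfig {
    pub file: Option<FileSourceConfig>,
    pub ns_socket: Option<NsSocketConfig>,
}

pub struct FileSourceConfig {
    pub path: PathBuf,
}

pub struct NsSocketConfig {
    pub url: String,
}

// Which source was chosen, carrying just enough data to construct it.
#[derive(Debug, PartialEq)]
pub enum SelectedSource {
    File(PathBuf),
    NsSocket(String),
}

/// Pick the single configured source, rejecting both "none" and "several".
pub fn select_source(cfg: &SourceConfig) -> Result<SelectedSource, String> {
    match (&cfg.file, &cfg.ns_socket) {
        (Some(file), None) => Ok(SelectedSource::File(file.path.clone())),
        (None, Some(ns)) => Ok(SelectedSource::NsSocket(ns.url.clone())),
        (None, None) => Err("no [source.*] section configured".to_string()),
        (Some(_), Some(_)) => Err("more than one [source.*] section configured".to_string()),
    }
}
```

Matching on a tuple of Options makes the ambiguous case explicit, so a misconfiguration fails at startup rather than silently preferring one source.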
The source-ns-socket feature (module gluco-hub/src/sources/ns_socket/) uses a
Nightscout site as the upstream via its Socket.IO real-time feed. As of this
writing it is a scaffold: the module, config ([source.ns_socket]), Source
impl, and [NSS0xx] error codes exist, but NsSocketClient::connect is stubbed
and returns [NSS001]. The wire contract below was verified against the official
cgm-remote-monitor source so the eventual implementation does not have to
re-derive it.
- Transport / namespace: Socket.IO v4 over an Engine.IO websocket, on the
  default namespace (/); Nightscout uses no custom namespace. The Engine.IO
  path is the default /socket.io/. Use wss for https origins.
- Auth handshake: the client emits an authorize event with
  { client: "<id>", token | secret, history: <hours> }:
  - token: a Nightscout access token (e.g. myreader-0123456789abcdef),
    preferred on modern deployments (auth = "token");
  - secret: the SHA-1 hash of the API secret (auth = "api_secret");
  - history: hours of history to replay (server default 48).
  On success the server emits connected and acks with
  { read, write, write_treatment } booleans.
- Data push: the server broadcasts dataUpdate to authorized clients. The
  payload is a delta object (delta: true, lastUpdated in ms) carrying an sgvs
  array when glucose changed (full dataset on the first push).
- Entry fields: each sgv carries mills/date (epoch ms), sgv/mgdl (mg/dL), and
  direction (trend string: Flat, SingleUp, FortyFiveDown, NOT COMPUTABLE,
  RATE OUT OF RANGE, …, normalising onto gluco_hub_core::Trend).
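As a rough illustration of the entry-field normalisation, the sketch below maps Nightscout direction strings onto a trend enum and resolves the alternative field names. The Trend enum here is a hypothetical stand-in (the variants of gluco_hub_core::Trend are not shown in this document), RawSgv and NormalisedSgv are invented for the example, and preferring mills over date and mgdl over sgv is an assumption.

```rust
// Hypothetical stand-in for gluco_hub_core::Trend; the real variants may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Trend {
    DoubleUp,
    SingleUp,
    FortyFiveUp,
    Flat,
    FortyFiveDown,
    SingleDown,
    DoubleDown,
    NotComputable,
    RateOutOfRange,
    Unknown,
}

/// Map a Nightscout direction string onto Trend; unrecognised values become Unknown.
pub fn trend_from_direction(direction: &str) -> Trend {
    match direction.trim() {
        "DoubleUp" => Trend::DoubleUp,
        "SingleUp" => Trend::SingleUp,
        "FortyFiveUp" => Trend::FortyFiveUp,
        "Flat" => Trend::Flat,
        "FortyFiveDown" => Trend::FortyFiveDown,
        "SingleDown" => Trend::SingleDown,
        "DoubleDown" => Trend::DoubleDown,
        "NOT COMPUTABLE" => Trend::NotComputable,
        "RATE OUT OF RANGE" => Trend::RateOutOfRange,
        _ => Trend::Unknown,
    }
}

// One entry of the sgvs array, with every field optional (illustrative shape).
pub struct RawSgv {
    pub mills: Option<i64>,
    pub date: Option<i64>,
    pub sgv: Option<u16>,
    pub mgdl: Option<u16>,
    pub direction: Option<String>,
}

pub struct NormalisedSgv {
    pub epoch_ms: i64,
    pub mgdl: u16,
    pub trend: Trend,
}

/// Resolve the alternative field names; None if no timestamp or glucose value is present.
pub fn normalise(raw: &RawSgv) -> Option<NormalisedSgv> {
    Some(NormalisedSgv {
        epoch_ms: raw.mills.or(raw.date)?,
        mgdl: raw.mgdl.or(raw.sgv)?,
        trend: raw
            .direction
            .as_deref()
            .map(trend_from_direction)
            .unwrap_or(Trend::Unknown),
    })
}
```

Falling back to Unknown rather than returning an error keeps a reading usable when the server sends a direction string the client does not recognise.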
Sources: cgm-remote-monitor
(lib/server/websocket.js, lib/data/calcdelta.js),
Socket.IO v4 client API,
Nightscout setup variables.
Dependency note: a Socket.IO client (candidate
rust-socketio) must use a rustls
backend (no OpenSSL) and may only be added once cargo deny check still passes.
The scaffold adds zero new runtime deps.
For sources, use the in-memory MockSource in gluco-hub-core::mock to drive tests without external services. For sinks, use wiremock to mock the HTTP target. See gluco-hub/src/e2e_tests.rs for end-to-end examples.
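use wiremock::matchers::method;
use wiremock::{Mock, MockServer, ResponseTemplate};

#[tokio::test]
async fn webhook_sink_posts_readings() {
    // Local mock HTTP server that answers every POST with 200 OK.
    let server = MockServer::start().await;
    Mock::given(method("POST"))
        .respond_with(ResponseTemplate::new(200))
        // Require exactly one matching request; checked when the server is dropped.
        .expect(1)
        .mount(&server)
        .await;

    let sink = WebhookSink::new(server.uri());
    sink.push(&[Reading::test_fixture()]).await.unwrap();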
}

- Errors: use CoreError::Sink / CoreError::Source with a stable error code prefix (SNK* / SRC*).
- Logs: emit tracing events with structured fields; never log secrets.
- Idempotency: the poller may retry push(); deduplicate on the receiving side or via local state.
- Validation: validate config at startup via validator, not at push time.
- No new dependencies without cargo deny check passing.
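For the idempotency point, one way to deduplicate via local state is to remember which reading timestamps have already been delivered. The sketch below is an assumption-laden illustration: DeliveredLog is a hypothetical helper, and the Reading shown is a stand-in with an epoch_ms field that the real type may not have. Because Sink::push takes &self, a real sink would also need to wrap this state in a Mutex or similar.

```rust
use std::collections::HashSet;

// Stand-in for the real Reading type; the epoch_ms field is an assumption.
#[derive(Debug, Clone, PartialEq)]
pub struct Reading {
    pub epoch_ms: i64,
    pub mgdl: u16,
}

/// Tracks timestamps of readings that were delivered successfully.
#[derive(Default)]
pub struct DeliveredLog {
    delivered: HashSet<i64>,
}

impl DeliveredLog {
    /// Readings from the batch that have not been delivered yet, in input order.
    pub fn pending(&self, readings: &[Reading]) -> Vec<Reading> {
        readings
            .iter()
            .filter(|r| !self.delivered.contains(&r.epoch_ms))
            .cloned()
            .collect()
    }

    /// Record a batch as delivered; call this only after the push succeeded.
    pub fn mark_delivered(&mut self, readings: &[Reading]) {
        for reading in readings {
            self.delivered.insert(reading.epoch_ms);
        }
    }
}
```

Separating pending from mark_delivered means a failed push leaves the log unchanged, so the retry sends the same readings again instead of dropping them. The set grows without bound in this sketch; a long-running process would want to prune old timestamps.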
See ARCHITECTURE.md for the full data flow and module map.