Skip to content

Commit 1ce60bb

Browse files
rycegclaude
andauthored
feat(alerts): Home Assistant as first-class alert delivery target (#216)
* feat(alerts): add home_assistant channel type Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(alerts): add metadata jsonb column to alert_rule_channels Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(alerts): add HomeAssistantHub for HA SignalR integration Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(alerts): add HomeAssistantProvider for targeted SignalR delivery Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(alerts): wire HomeAssistantProvider into delivery pipeline and DI Add HomeAssistant case to DispatchToProviderAsync switch, register HomeAssistantProvider in DI, and map the HomeAssistantHub endpoint. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(alerts): relay glucose and alert events to HomeAssistantHub subscribers Add IHubContext<HomeAssistantHub> to SignalRBroadcastService and relay glucose readings and alert events (resolved, acknowledged) to connected Home Assistant instances via the ha-glucose tenant group. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: address code review issues in HomeAssistantHub and provider - Replace connectionId-based tracking with ConcurrentDictionary<string,int> count-based tracking in HomeAssistantHub; use Context.Items to store instanceId for clean decrement in OnDisconnectedAsync - Add IsInstanceConnected static method checking count > 0 - Add ha-alerts group subscription in Subscribe; relay alert events to ha-alerts group in SignalRBroadcastService (not ha-glucose) - Make HomeAssistantProvider.SendAsync return Task<bool>; skip dispatch and return false when no client is connected - Update AlertDeliveryService to conditionally call MarkDeliveredAsync based on bool return; call MarkFailedAsync when not connected so catch-up mechanism can retry on reconnect - Pass channelMetadata through DispatchToProviderAsync; parse allow_ack from metadata and send channelMeta alongside payload - Add AlertRuleChannelSnapshot.Metadata property and propagate in AlertRepository; pass allow_ack in CatchUpFailedDeliveriesAsync - Replace FindFirst scope check with FindAll in Acknowledge (Fix 4) - Replace CancellationToken.None with Context.ConnectionAborted throughout CatchUpFailedDeliveriesAsync and Acknowledge (Fix 5) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(alerts): address HA hub review issues — delivery tracking, group naming, scope parsing All 5 review fixes now applied: Fix 1 (Critical): HomeAssistantHub tracks active instances via ConcurrentDictionary<string,int>; OnDisconnectedAsync decrements count. HomeAssistantProvider.SendAsync returns bool and skips dispatch when no HA client is connected. AlertDeliveryService calls MarkFailedAsync (not MarkDeliveredAsync) when provider returns false so the catch-up mechanism in CatchUpFailedDeliveriesAsync can retry on reconnect. Fix 2 (Critical): Subscribe now joins ha-alerts group in addition to ha-glucose. SignalRBroadcastService.BroadcastAlertEventAsync relays alert events to ha-alerts group instead of ha-glucose, separating glucose and alert subscribers. Fix 3 (Important): CatchUpFailedDeliveriesAsync includes AlertRuleChannel in its query and parses allow_ack from channel metadata, sending it as a second argument alongside the payload. AlertRuleChannelSnapshot gains a Metadata property (default null); AlertRepository populates it. DispatchToProviderAsync threads channelMetadata to HomeAssistantProvider. Fix 4 (Important): Acknowledge uses FindAll("scope") + SelectMany to handle multiple scope claims and space-delimited values, replacing the fragile FindFirst approach. Fix 5 (Important): Subscribe and Acknowledge pass Context.ConnectionAborted to all async calls instead of CancellationToken.None. Also fix cref attribute resolution in HomeAssistantHub XML doc comment. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(tests): add IHubContext<HomeAssistantHub> to SignalRBroadcastService test constructors Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9244e6b commit 1ce60bb

15 files changed

Lines changed: 8635 additions & 11 deletions

File tree

src/API/Nocturne.API/Extensions/ServiceRegistrationExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,7 @@ IConfiguration configuration
689689
services.AddScoped<Nocturne.API.Services.Alerts.Providers.InAppProvider>();
690690
services.AddScoped<Nocturne.API.Services.Alerts.Providers.WebhookProvider>();
691691
services.AddScoped<Nocturne.API.Services.Alerts.Providers.ChatBotProvider>();
692+
services.AddScoped<Nocturne.API.Services.Alerts.Providers.HomeAssistantProvider>();
692693
services.AddHttpClient("ChatBot");
693694

694695
// Chat identity
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
using System.Collections.Concurrent;
2+
using System.Text.Json;
3+
using Microsoft.AspNetCore.SignalR;
4+
using Microsoft.EntityFrameworkCore;
5+
using Nocturne.Core.Contracts.Alerts;
6+
using Nocturne.Core.Models;
7+
using Nocturne.Core.Models.Alerts;
8+
using Nocturne.Infrastructure.Data;
9+
10+
namespace Nocturne.API.Hubs;
11+
12+
/// <summary>
13+
/// SignalR hub for Home Assistant integration. HA instances subscribe to receive
14+
/// real-time glucose relays and alert dispatches, and can acknowledge excursions
15+
/// when the channel is configured to allow it.
16+
/// Mounted at /hubs/home-assistant.
17+
/// </summary>
18+
public class HomeAssistantHub : TenantAwareHub
19+
{
20+
/// <summary>
21+
/// Tracks active HA instance connection counts. Key is the tenant-scoped group name for
22+
/// "ha:{instanceId}", value is the number of active connections in that group.
23+
/// Used by <see cref="Services.Alerts.Providers.HomeAssistantProvider"/> to gate delivery marking.
24+
/// </summary>
25+
private static readonly ConcurrentDictionary<string, int> _instanceConnectionCounts = new();
26+
27+
/// <summary>
28+
/// Subscribe the calling connection to glucose relay and per-instance alert groups.
29+
/// Also performs catch-up delivery for any failed HA deliveries targeting this instance.
30+
/// </summary>
31+
/// <param name="instanceId">The Home Assistant instance identifier (matches channel Destination).</param>
32+
public async Task Subscribe(string instanceId)
33+
{
34+
var ct = Context.ConnectionAborted;
35+
36+
if (string.IsNullOrWhiteSpace(instanceId))
37+
throw new HubException("instanceId must not be empty.");
38+
39+
var tenantId = TenantContext?.TenantId
40+
?? throw new HubException("No tenant context resolved.");
41+
42+
// Join tenant-scoped glucose relay, per-instance, and alert groups
43+
await Groups.AddToGroupAsync(Context.ConnectionId, TenantGroup("ha-glucose"), ct);
44+
await Groups.AddToGroupAsync(Context.ConnectionId, TenantGroup($"ha:{instanceId}"), ct);
45+
await Groups.AddToGroupAsync(Context.ConnectionId, TenantGroup("ha-alerts"), ct);
46+
47+
// Track connection count for this instance and store instanceId for cleanup on disconnect
48+
var instanceGroupKey = FormatTenantGroup(tenantId.ToString(), $"ha:{instanceId}");
49+
_instanceConnectionCounts.AddOrUpdate(instanceGroupKey, 1, (_, count) => count + 1);
50+
Context.Items["ha_instance_id"] = instanceId;
51+
52+
// Catch-up: re-dispatch failed deliveries for this instance
53+
await CatchUpFailedDeliveriesAsync(tenantId, instanceId, ct);
54+
}
55+
56+
/// <summary>
57+
/// Decrements the connection count for the instance this connection was subscribed to.
58+
/// </summary>
59+
public override async Task OnDisconnectedAsync(Exception? exception)
60+
{
61+
var instanceId = Context.Items.TryGetValue("ha_instance_id", out var val) ? val as string : null;
62+
if (instanceId is not null)
63+
{
64+
var tenantId = TenantContext?.TenantId.ToString();
65+
if (tenantId is not null)
66+
{
67+
var groupKey = FormatTenantGroup(tenantId, $"ha:{instanceId}");
68+
_instanceConnectionCounts.AddOrUpdate(groupKey, 0, (_, count) => Math.Max(0, count - 1));
69+
}
70+
}
71+
72+
await base.OnDisconnectedAsync(exception);
73+
}
74+
75+
/// <summary>
76+
/// Returns whether a specific HA instance currently has at least one active SignalR connection.
77+
/// Used by <see cref="Services.Alerts.Providers.HomeAssistantProvider"/> to decide whether to mark delivery as delivered.
78+
/// </summary>
79+
public static bool IsInstanceConnected(string tenantId, string instanceId)
80+
{
81+
var key = FormatTenantGroup(tenantId, $"ha:{instanceId}");
82+
return _instanceConnectionCounts.TryGetValue(key, out var count) && count > 0;
83+
}
84+
85+
/// <summary>
86+
/// Acknowledge a specific excursion from the Home Assistant side.
87+
/// Requires the "alerts.readwrite" OAuth scope and the channel's metadata must have allow_ack enabled.
88+
/// </summary>
89+
/// <param name="excursionId">The excursion to acknowledge.</param>
90+
/// <param name="acknowledgedBy">Display name or identifier of the person acknowledging.</param>
91+
public async Task Acknowledge(Guid excursionId, string acknowledgedBy)
92+
{
93+
var ct = Context.ConnectionAborted;
94+
95+
var tenantId = TenantContext?.TenantId
96+
?? throw new HubException("No tenant context resolved.");
97+
98+
// Gate 1: OAuth scope check — require "alerts.readwrite"
99+
var scopes = Context.User?.FindAll("scope")
100+
.SelectMany(c => c.Value.Split(' ', StringSplitOptions.RemoveEmptyEntries))
101+
.ToHashSet() ?? new HashSet<string>();
102+
103+
if (!scopes.Contains("alerts.readwrite"))
104+
throw new HubException("Insufficient permissions: alerts.readwrite scope required.");
105+
106+
// Gate 2: Channel config check — find HA channels for this excursion's rule and verify allow_ack
107+
var services = Context.GetHttpContext()!.RequestServices;
108+
var contextFactory = services.GetRequiredService<IDbContextFactory<NocturneDbContext>>();
109+
110+
await using var db = await contextFactory.CreateDbContextAsync(ct);
111+
db.TenantId = tenantId;
112+
113+
var excursion = await db.AlertExcursions
114+
.AsNoTracking()
115+
.Where(e => e.Id == excursionId && e.TenantId == tenantId)
116+
.Select(e => new { e.AlertRuleId })
117+
.FirstOrDefaultAsync(ct);
118+
119+
if (excursion is null)
120+
throw new HubException("Excursion not found.");
121+
122+
var haChannels = await db.AlertRuleChannels
123+
.AsNoTracking()
124+
.Where(c => c.AlertRuleId == excursion.AlertRuleId
125+
&& c.TenantId == tenantId
126+
&& c.ChannelType == ChannelType.HomeAssistant)
127+
.Select(c => c.Metadata)
128+
.ToListAsync(ct);
129+
130+
var allowAck = haChannels.Any(metadata =>
131+
{
132+
if (string.IsNullOrEmpty(metadata))
133+
return false;
134+
135+
try
136+
{
137+
using var doc = JsonDocument.Parse(metadata);
138+
return doc.RootElement.TryGetProperty("allow_ack", out var prop)
139+
&& prop.ValueKind == JsonValueKind.True;
140+
}
141+
catch (JsonException)
142+
{
143+
return false;
144+
}
145+
});
146+
147+
if (!allowAck)
148+
throw new HubException("Acknowledgement is not permitted for this alert channel.");
149+
150+
// Both gates passed — acknowledge
151+
var ackService = services.GetRequiredService<IAlertAcknowledgementService>();
152+
await ackService.AcknowledgeExcursionAsync(tenantId, excursionId, acknowledgedBy, broadcast: true, ct);
153+
}
154+
155+
private async Task CatchUpFailedDeliveriesAsync(Guid tenantId, string instanceId, CancellationToken ct)
156+
{
157+
var services = Context.GetHttpContext()!.RequestServices;
158+
var contextFactory = services.GetRequiredService<IDbContextFactory<NocturneDbContext>>();
159+
160+
await using var db = await contextFactory.CreateDbContextAsync(ct);
161+
db.TenantId = tenantId;
162+
163+
// Find failed HA deliveries for this instance that belong to open excursions
164+
var failedDeliveries = await db.AlertDeliveries
165+
.Include(d => d.AlertInstance)
166+
.ThenInclude(i => i!.AlertExcursion)
167+
.Include(d => d.AlertRuleChannel)
168+
.Where(d => d.TenantId == tenantId
169+
&& d.ChannelType == ChannelType.HomeAssistant
170+
&& d.Destination == instanceId
171+
&& d.Status == "failed"
172+
&& d.AlertInstance != null
173+
&& d.AlertInstance.AlertExcursion != null
174+
&& d.AlertInstance.AlertExcursion.EndedAt == null)
175+
.ToListAsync(ct);
176+
177+
foreach (var delivery in failedDeliveries)
178+
{
179+
try
180+
{
181+
// Re-dispatch the payload to the caller, including allow_ack from channel metadata
182+
var payload = JsonSerializer.Deserialize<AlertPayload>(delivery.Payload);
183+
if (payload is not null)
184+
{
185+
var allowAck = false;
186+
if (!string.IsNullOrEmpty(delivery.AlertRuleChannel?.Metadata))
187+
{
188+
try
189+
{
190+
using var doc = JsonDocument.Parse(delivery.AlertRuleChannel.Metadata);
191+
allowAck = doc.RootElement.TryGetProperty("allow_ack", out var prop)
192+
&& prop.ValueKind == JsonValueKind.True;
193+
}
194+
catch (JsonException) { }
195+
}
196+
197+
var channelMeta = new { allowAck };
198+
await Clients.Caller.SendCoreAsync("alert_dispatch", new object[] { payload, channelMeta }, ct);
199+
200+
// Mark as delivered
201+
delivery.Status = "delivered";
202+
delivery.DeliveredAt = DateTime.UtcNow;
203+
}
204+
}
205+
catch (JsonException)
206+
{
207+
// Payload is malformed — skip this delivery
208+
}
209+
}
210+
211+
if (failedDeliveries.Count > 0)
212+
{
213+
await db.SaveChangesAsync(ct);
214+
}
215+
}
216+
}

src/API/Nocturne.API/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@
390390
app.MapHub<AlarmHub>("/hubs/alarms");
391391
app.MapHub<AlertHub>("/hubs/alerts");
392392
app.MapHub<ConfigHub>("/hubs/config");
393+
app.MapHub<HomeAssistantHub>("/hubs/home-assistant");
393394

394395
// Serve OpenAPI specs at /openapi/{documentName}.json
395396
app.MapOpenApi();

src/API/Nocturne.API/Services/Alerts/AlertDeliveryService.cs

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,13 @@ public async Task DispatchAsync(
100100

101101
// Hand each persisted row to its provider. Provider failures are caught and recorded
102102
// on the delivery row; one bad webhook does not abort the rest of the batch.
103-
foreach (var delivery in deliveryRows)
103+
for (var i = 0; i < deliveryRows.Count; i++)
104104
{
105+
var delivery = deliveryRows[i];
106+
var channelMetadata = channels[i].Metadata;
105107
try
106108
{
107-
await DispatchToProviderAsync(delivery, payload, ct);
109+
await DispatchToProviderAsync(delivery, payload, channelMetadata, ct);
108110
}
109111
catch (Exception ex)
110112
{
@@ -208,11 +210,13 @@ public async Task TestFireAsync(
208210
}
209211
await db.SaveChangesAsync(ct);
210212

211-
foreach (var delivery in deliveryRows)
213+
for (var i = 0; i < deliveryRows.Count; i++)
212214
{
215+
var delivery = deliveryRows[i];
216+
var channelMetadata = channels[i].Metadata;
213217
try
214218
{
215-
await DispatchToProviderAsync(delivery, payload, ct);
219+
await DispatchToProviderAsync(delivery, payload, channelMetadata, ct);
216220
}
217221
catch (Exception ex)
218222
{
@@ -250,7 +254,7 @@ public async Task TestFireDryRunAsync(
250254
ChannelType = channel.ChannelType,
251255
Destination = channel.Destination,
252256
};
253-
await DispatchToProviderAsync(fauxDelivery, payload, ct);
257+
await DispatchToProviderAsync(fauxDelivery, payload, channel.Metadata, ct);
254258
}
255259
catch (Exception ex)
256260
{
@@ -286,7 +290,7 @@ public async Task MarkFailedAsync(Guid deliveryId, string error, CancellationTok
286290
await db.SaveChangesAsync(ct);
287291
}
288292

289-
private async Task DispatchToProviderAsync(AlertDeliveryEntity delivery, AlertPayload payload, CancellationToken ct)
293+
private async Task DispatchToProviderAsync(AlertDeliveryEntity delivery, AlertPayload payload, string? channelMetadata, CancellationToken ct)
290294
{
291295
switch (delivery.ChannelType)
292296
{
@@ -325,6 +329,38 @@ private async Task DispatchToProviderAsync(AlertDeliveryEntity delivery, AlertPa
325329
}
326330
break;
327331

332+
case ChannelType.HomeAssistant:
333+
var haProvider = serviceProvider.GetService<Providers.HomeAssistantProvider>();
334+
if (haProvider is not null)
335+
{
336+
object? channelMeta = null;
337+
if (!string.IsNullOrEmpty(channelMetadata))
338+
{
339+
try
340+
{
341+
using var doc = System.Text.Json.JsonDocument.Parse(channelMetadata);
342+
var allowAck = doc.RootElement.TryGetProperty("allow_ack", out var prop)
343+
&& prop.ValueKind == System.Text.Json.JsonValueKind.True;
344+
channelMeta = new { allowAck };
345+
}
346+
catch (System.Text.Json.JsonException)
347+
{
348+
channelMeta = new { allowAck = false };
349+
}
350+
}
351+
else
352+
{
353+
channelMeta = new { allowAck = false };
354+
}
355+
356+
var delivered = await haProvider.SendAsync(delivery.TenantId, delivery.Destination, payload, channelMeta, ct);
357+
if (delivered)
358+
await MarkDeliveredAsync(delivery.Id, null, null, ct);
359+
else
360+
await MarkFailedAsync(delivery.Id, "No Home Assistant instance connected", ct);
361+
}
362+
break;
363+
328364
default:
329365
logger.LogWarning("Unsupported channel type '{ChannelType}' for delivery {DeliveryId}",
330366
delivery.ChannelType, delivery.Id);
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using Microsoft.AspNetCore.SignalR;
2+
using Nocturne.API.Hubs;
3+
using Nocturne.Core.Models;
4+
5+
namespace Nocturne.API.Services.Alerts.Providers;
6+
7+
/// <summary>
8+
/// Delivers alert payloads to a specific Home Assistant instance via the
9+
/// HomeAssistantHub's per-instance SignalR group.
10+
/// </summary>
11+
internal sealed class HomeAssistantProvider(
12+
IHubContext<HomeAssistantHub> hubContext,
13+
ILogger<HomeAssistantProvider> logger)
14+
{
15+
/// <summary>
16+
/// Sends an alert payload to the HA instance identified by <paramref name="destination"/>
17+
/// (the OAuth client ID). Targets the SignalR group "ha:{destination}" within the tenant.
18+
/// Returns <c>true</c> if the instance is connected and the message was sent;
19+
/// <c>false</c> if no connection is registered for the instance.
20+
/// </summary>
21+
public async Task<bool> SendAsync(Guid tenantId, string destination, AlertPayload payload, object? channelMeta, CancellationToken ct)
22+
{
23+
if (!HomeAssistantHub.IsInstanceConnected(tenantId.ToString(), destination))
24+
{
25+
logger.LogDebug("HA instance {Destination} not connected, skipping dispatch", destination);
26+
return false;
27+
}
28+
29+
var group = TenantAwareHub.FormatTenantGroup(tenantId.ToString(), $"ha:{destination}");
30+
31+
try
32+
{
33+
await hubContext.Clients.Group(group)
34+
.SendCoreAsync("alert_dispatch", new object[] { payload, channelMeta ?? new { allowAck = false } }, ct);
35+
36+
logger.LogDebug(
37+
"HA alert dispatched to instance {Destination} for alert instance {InstanceId}",
38+
destination, payload.InstanceId);
39+
40+
return true;
41+
}
42+
catch (Exception ex)
43+
{
44+
logger.LogError(ex,
45+
"Failed to dispatch HA alert to instance {Destination} for alert instance {InstanceId}",
46+
destination, payload.InstanceId);
47+
throw;
48+
}
49+
}
50+
}

0 commit comments

Comments
 (0)