|
6 | 6 |
|
7 | 7 | from __future__ import annotations |
8 | 8 |
|
| 9 | +import asyncio |
9 | 10 | from unittest.mock import AsyncMock, MagicMock, patch |
10 | 11 |
|
11 | 12 | import pytest |
12 | 13 |
|
13 | 14 | from custom_components.govee.models import ( |
14 | 15 | GoveeDeviceState, |
| 16 | + PowerCommand, |
15 | 17 | RGBColor, |
16 | 18 | SegmentColorCommand, |
17 | 19 | ) |
@@ -181,3 +183,93 @@ async def test_different_segment_counts(self): |
181 | 183 | for segment_count in [1, 4, 8, 16]: |
182 | 184 | entity = _make_grouped_segment_entity(segment_count=segment_count) |
183 | 185 | assert entity._segment_indices == tuple(range(segment_count)) |
| 186 | + |
| 187 | + @pytest.mark.asyncio |
| 188 | + async def test_turn_off_yields_before_flag_check(self): |
| 189 | + """asyncio.sleep(0) is called before checking the power-off flag.""" |
| 190 | + entity = _make_grouped_segment_entity(power_state=True, power_off_pending=False) |
| 191 | + |
| 192 | + call_order: list[str] = [] |
| 193 | + original_sleep = asyncio.sleep |
| 194 | + |
| 195 | + async def tracking_sleep(delay: float, *args: object) -> None: |
| 196 | + if delay == 0: |
| 197 | + call_order.append("sleep_0") |
| 198 | + await original_sleep(delay) |
| 199 | + |
| 200 | + entity.coordinator.is_power_off_pending = MagicMock( |
| 201 | + side_effect=lambda _: (call_order.append("flag_check"), False)[1] |
| 202 | + ) |
| 203 | + |
| 204 | + with patch("asyncio.sleep", side_effect=tracking_sleep): |
| 205 | + await entity.async_turn_off() |
| 206 | + |
| 207 | + assert call_order == ["sleep_0", "flag_check"] |
| 208 | + |
| 209 | + @pytest.mark.asyncio |
| 210 | + async def test_concurrent_turn_off_with_main_entity(self): |
| 211 | + """Concurrent area turn_off: grouped segment defers to main entity's PowerCommand. |
| 212 | +
|
| 213 | + Simulates asyncio.gather(main_turn_off, grouped_segment_turn_off) and verifies |
| 214 | + the grouped segment skips its SegmentColorCommand because the main entity sets |
| 215 | + the power-off flag first. |
| 216 | + """ |
| 217 | + coordinator = MagicMock() |
| 218 | + coordinator.last_update_success = True |
| 219 | + |
| 220 | + state = GoveeDeviceState.create_empty("AA:BB:CC:DD:EE:FF:00:11") |
| 221 | + state.power_state = True |
| 222 | + coordinator.get_state = MagicMock(return_value=state) |
| 223 | + |
| 224 | + # Track commands sent and implement real pending-power-off logic |
| 225 | + pending_power_off: set[str] = set() |
| 226 | + commands_sent: list[object] = [] |
| 227 | + |
| 228 | + # Use an event so the mock API call holds the flag until the |
| 229 | + # segment has had a chance to check it (mirrors real API latency). |
| 230 | + api_done = asyncio.Event() |
| 231 | + |
| 232 | + async def mock_control(device_id: str, command: object) -> bool: |
| 233 | + is_power_off = isinstance(command, PowerCommand) and not command.power_on |
| 234 | + if is_power_off: |
| 235 | + pending_power_off.add(device_id) |
| 236 | + commands_sent.append(command) |
| 237 | + if is_power_off: |
| 238 | + await api_done.wait() |
| 239 | + pending_power_off.discard(device_id) |
| 240 | + return True |
| 241 | + |
| 242 | + coordinator.async_control_device = mock_control |
| 243 | + coordinator.is_power_off_pending = lambda did: did in pending_power_off |
| 244 | + |
| 245 | + # Build grouped segment entity |
| 246 | + with patch.object(GoveeGroupedSegmentEntity, "__init__", lambda self, *a, **kw: None): |
| 247 | + entity = GoveeGroupedSegmentEntity.__new__(GoveeGroupedSegmentEntity) |
| 248 | + entity.coordinator = coordinator |
| 249 | + entity._device_id = "AA:BB:CC:DD:EE:FF:00:11" |
| 250 | + entity._segment_indices = tuple(range(8)) |
| 251 | + entity._is_on = True |
| 252 | + entity._brightness = 255 |
| 253 | + entity._rgb_color = (255, 255, 255) |
| 254 | + entity.async_write_ha_state = MagicMock() |
| 255 | + |
| 256 | + # Simulate main entity turn_off (sends PowerCommand directly) |
| 257 | + async def main_turn_off() -> None: |
| 258 | + await coordinator.async_control_device( |
| 259 | + "AA:BB:CC:DD:EE:FF:00:11", |
| 260 | + PowerCommand(power_on=False), |
| 261 | + ) |
| 262 | + |
| 263 | + async def run_both() -> None: |
| 264 | + await asyncio.gather(main_turn_off(), entity.async_turn_off()) |
| 265 | + |
| 266 | + # Let both coroutines run, then release the API mock |
| 267 | + task = asyncio.create_task(run_both()) |
| 268 | + await asyncio.sleep(0) # let gather start both coroutines |
| 269 | + await asyncio.sleep(0) # let segment's sleep(0) yield |
| 270 | + api_done.set() |
| 271 | + await task |
| 272 | + |
| 273 | + # Only PowerCommand should have been sent, not SegmentColorCommand |
| 274 | + assert len(commands_sent) == 1 |
| 275 | + assert isinstance(commands_sent[0], PowerCommand) |
0 commit comments