# brel_serial.py
import serial
import time
import threading
from typing import Optional
# Default serial device path and line speed.
SERIAL_PORT = "/dev/ttyUSB0"
BAUD_RATE = 38_400

# Lookup from device MAC address (lower-case hex, as listed in devices.json)
# to the pair (BlindsT21 unit byte, DDxxxx unit code) for that device.
MAC_TO_UNIT = {
    "500291f770d50002": (0x42, 0x02),  # Roller shutter 1 ("Rolluik 1")
    "500291f770d50006": (0x43, 0x03),  # Roller shutter 2 ("Rolluik 2")
    "500291f770d50005": (0x41, 0x01),  # Screen
}

# Command name -> command byte placed in the outgoing packet.
COMMANDS = {
    "up": 0x00,
    "down": 0x01,
    "stop": 0x02,
}
class BrelSerialController:
    """Send blind commands as fixed-format packets over a serial port.

    The port is opened when the object is constructed and is reopened on
    demand before every send. Sends are serialised through an internal lock,
    and a send that fails with a serial/OS error is retried once after the
    port has been reopened.

    Release the port deterministically with ``close()`` or by using the
    object as a context manager; ``__del__`` is kept only as a fallback.
    """

    # Seconds to wait after writing a packet before reading the reply.
    _RESPONSE_DELAY_S = 0.1
    # Seconds to wait between closing and reopening the port on reconnect.
    _RECONNECT_DELAY_S = 0.1
    # Upper bound on the number of reply bytes read after each packet.
    _ACK_MAX_BYTES = 10

    def __init__(self, port: str = SERIAL_PORT, baud: int = BAUD_RATE, timeout: float = 0.5):
        """Store the connection settings and open the port immediately.

        Args:
            port: Serial device path handed to ``serial.Serial``.
            baud: Line speed handed to ``serial.Serial``.
            timeout: Read timeout in seconds handed to ``serial.Serial``.

        Any exception raised by ``serial.Serial`` while opening the port
        propagates to the caller.
        """
        self.port = port
        self.baud = baud
        self.timeout = timeout
        self.ser: Optional[serial.Serial] = None
        self._lock = threading.Lock()
        self._connect()

    def __enter__(self) -> "BrelSerialController":
        """Return the controller itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the port when the ``with`` block ends; exceptions are not suppressed."""
        self.close()

    def __del__(self):
        """Best-effort fallback that closes the port on garbage collection."""
        self._disconnect()

    def close(self) -> None:
        """Close the serial port, waiting for any send in progress to finish.

        Safe to call more than once. A later send reopens the port.
        """
        with self._lock:
            self._disconnect()

    def _connect(self) -> None:
        """Open the serial port unless an open handle already exists."""
        if self.ser is not None and self.ser.is_open:
            return
        self.ser = serial.Serial(self.port, self.baud, timeout=self.timeout)

    def _disconnect(self) -> None:
        """Close the port if it is open and always drop the handle."""
        # getattr: __del__ may run on an instance whose __init__ never
        # assigned ``ser`` (e.g. one created via __new__).
        ser = getattr(self, "ser", None)
        if ser is None:
            return
        try:
            if ser.is_open:
                ser.close()
        finally:
            # Forget the handle even when close() raises so the next
            # _connect() starts from a clean state.
            self.ser = None

    def _reconnect(self) -> None:
        """Close the port, pause briefly, and open it again."""
        self._disconnect()
        time.sleep(self._RECONNECT_DELAY_S)
        self._connect()

    def _send_packet(self, packet: bytes) -> bytes:
        """Write ``packet`` and return up to ``_ACK_MAX_BYTES`` reply bytes.

        The reply may be shorter (or empty) if the read timeout expires first.
        """
        self._connect()
        ser = self.ser
        # Explicit check instead of ``assert``, which is removed under -O.
        if ser is None:
            raise RuntimeError("serial port is not open")
        # Discard stale input so the bytes read below belong to this packet.
        ser.reset_input_buffer()
        ser.write(packet)
        ser.flush()
        time.sleep(self._RESPONSE_DELAY_S)
        return ser.read(self._ACK_MAX_BYTES)

    def _send_packet_with_retry(self, packet: bytes) -> bytes:
        """Send ``packet`` under the lock, retrying once after a reconnect.

        A failure of the reconnect or of the second attempt propagates.
        """
        with self._lock:
            try:
                return self._send_packet(packet)
            except (serial.SerialException, OSError):
                self._reconnect()
                return self._send_packet(packet)

    @staticmethod
    def _format_hex(data: bytes) -> str:
        """Render ``data`` as space-separated, two-digit upper-case hex."""
        return ' '.join(f'{b:02X}' for b in data)

    def send_command(self, mac: str, command: str) -> Optional[str]:
        """Send an up/down/stop command to the device registered under ``mac``.

        Args:
            mac: Device MAC address; matched case-insensitively against
                ``MAC_TO_UNIT``.
            command: A key of ``COMMANDS`` (matched exactly).

        Returns:
            A human-readable summary including the reply bytes in hex, or
            ``None`` if ``mac`` or ``command`` is unknown (nothing is sent).
        """
        unit_tuple = MAC_TO_UNIT.get(mac.lower())
        if unit_tuple is None or command not in COMMANDS:
            return None
        unit_byte, _ = unit_tuple
        cmd_byte = COMMANDS[command]
        # 10-byte frame; the leading 0x09 equals the number of bytes that follow.
        # NOTE(review): the fixed header (0x19 0x15 0x00) and ID bytes
        # (0x10 0x20 0x30) are sent as-is -- confirm their meaning against the
        # transceiver's protocol documentation.
        packet = bytes([
            0x09, 0x19, 0x15, 0x00,
            0x10, 0x20, 0x30,
            unit_byte,
            cmd_byte,
            0x00,
        ])
        response = self._send_packet_with_retry(packet)
        return f"Sent {command.upper()} to {mac} (unit {unit_byte:02X}), ACK: {self._format_hex(response)}"

    def set_position(self, mac: str, pos: int) -> Optional[str]:
        """Send a "move to percentage" packet to the device registered under ``mac``.

        Args:
            mac: Device MAC address; matched case-insensitively against
                ``MAC_TO_UNIT``.
            pos: Target position, 0 to 100 inclusive. A fractional value is
                truncated toward zero before it is sent.

        Returns:
            A human-readable summary including the reply bytes in hex, or
            ``None`` if ``mac`` is unknown or ``pos`` is out of range
            (nothing is sent).
        """
        unit_tuple = MAC_TO_UNIT.get(mac.lower())
        if unit_tuple is None or not (0 <= pos <= 100):
            return None
        _, unit_code = unit_tuple
        # The range check above already bounds the value, so no clamp is needed.
        percent = int(pos)
        # 13-byte frame; the leading 0x0C equals the number of bytes that follow.
        # NOTE(review): the fixed header (0x31 0x00 0x00), ID bytes
        # (0x10 0x20 0x30 0x40) and the 0x04 command byte are sent as-is --
        # confirm their meaning against the transceiver's protocol documentation.
        packet = bytes([
            0x0C, 0x31, 0x00, 0x00,
            0x10, 0x20, 0x30, 0x40,
            unit_code,
            0x04,
            percent,
            0x00, 0x00,
        ])
        response = self._send_packet_with_retry(packet)
        return f"Sent POSITION {percent}% to {mac} (unit {unit_code:02X}), ACK: {self._format_hex(response)}"