-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbreathcare_eco.py
More file actions
156 lines (131 loc) · 5.55 KB
/
Copy pathbreathcare_eco.py
File metadata and controls
156 lines (131 loc) · 5.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python3
import os
from pathlib import Path
import struct
from datetime import datetime, timedelta
from decimal import Decimal
from dataclasses import dataclass
from matplotlib import pyplot as plt
import matplotlib.dates as mdates
import numpy as np
"""
Read DJMed CPAP files Yuwell BreathCare ECO
"""
DATA = Path("data") / "YH550"
class InvalidCPAPFormat(Exception):
...
@dataclass
class CPAPLogLine:
logged_time: datetime
pressure: Decimal
leakage: Decimal
oai: int
hi: int
cai: int
u6: int
@dataclass
class CPAPFile:
start: datetime
end: datetime
mode: int
ramp_time: int
initial_pressure: Decimal
minimum_pressure: Decimal
maximum_pressure: Decimal
humidity: int
average_leak_volume: Decimal
average_pressure: Decimal
device_serial: str
logs: list[CPAPLogLine]
@classmethod
def from_file(cls, file_name: Path) -> "CPAPFile":
with open(file_name, "rb") as cpap_file:
start_year, start_month, start_day, start_hour, start_minute, start_second = struct.unpack("BBBBBB", cpap_file.read(6))
end_year, end_month, end_day, end_hour, end_minute, end_second = struct.unpack("BBBBBB", cpap_file.read(6))
mode, ramp_up_time, initial_pressure, minimum_pressure, maximum_pressure = struct.unpack("BBBBB", cpap_file.read(5))
unknown2 = cpap_file.read(1)
humidity, = struct.unpack("B", cpap_file.read(1))
unknown3 = cpap_file.read(7)
average_leak_volume, = struct.unpack("B", cpap_file.read(1))
unknown4 = cpap_file.read(1)
average_pressure, = struct.unpack("B", cpap_file.read(1))
unknown5 = cpap_file.read(1)
serial, record_count = struct.unpack("16sh", cpap_file.read(18))
unknown6 = cpap_file.read(2)
unknown7, = struct.unpack("B", cpap_file.read(1)) # Always 0xF9?
if unknown7 != 0xF9:
raise InvalidCPAPFormat("Invalid CPAP format")
log_start = datetime(2000 + start_year, start_month, start_day, start_hour, start_minute, start_second)
log_end = datetime(2000 + end_year, end_month, end_day, end_hour, end_minute, end_second)
log_lines = []
for minute in range(record_count):
pressure, = struct.unpack("B", cpap_file.read(1))
u1, = struct.unpack("B", cpap_file.read(1))
u2, = struct.unpack("B", cpap_file.read(1))
oai, = struct.unpack("B", cpap_file.read(1))
hi, = struct.unpack("B", cpap_file.read(1))
cai, = struct.unpack("B", cpap_file.read(1))
u6, = struct.unpack("B", cpap_file.read(1))
u7, = struct.unpack("B", cpap_file.read(1))
u8, = struct.unpack("B", cpap_file.read(1))
leakage, = struct.unpack("B", cpap_file.read(1))
log_time = log_start + timedelta(minutes=minute)
# Always zero, one is possibly SPo2, another is the pulse rate
# If we find values, we're interested to know
if u6 > 0:
print(f"u6 threshold {u6}, {u1}, {u2}, {u7}, {u8} - {pressure}, {oai}, {hi}, {cai}, {leakage} from {file_name}, {log_time}")
if u1 + u2 + u7 + u8 > 0:
raise InvalidCPAPFormat("Invalid CPAP format, unexpected data")
# Missing AH, HI events (possibly 1/0)
log_lines.append(CPAPLogLine(log_time, pressure / 10, leakage / 10, oai, hi, cai, u6))
return cls(
log_start,
log_end,
mode,
ramp_up_time,
initial_pressure / 10,
minimum_pressure / 10,
maximum_pressure / 10,
humidity,
average_leak_volume / 10,
average_pressure / 10,
serial,
log_lines
)
SHOW_CHARTS = False
def main():
"""
For the time being, this just charts the minute logs in matplotlib
"""
for file in os.scandir(DATA):
log_file = CPAPFile.from_file(file)
if SHOW_CHARTS:
print(log_file.logs)
times = np.array([log.logged_time for log in log_file.logs])
pressures = np.array([log.pressure for log in log_file.logs])
leakages = np.array([log.leakage for log in log_file.logs])
hi = np.array([log.hi for log in log_file.logs])
oai = np.array([log.oai for log in log_file.logs])
cai = np.array([log.cai for log in log_file.logs])
u6 = np.array([log.u6 for log in log_file.logs])
fig, ax = plt.subplots(figsize=(30, 15))
ax.plot(times, pressures)
ax.plot(times, leakages)
ax.plot(times, hi)
ax.plot(times, oai)
ax.plot(times, cai)
ax.plot(times, u6)
locator = mdates.SecondLocator(interval=500)
ax.xaxis.set_major_locator(locator)
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
fig.autofmt_xdate()
ax.set_xlabel("Time")
if log_file.mode == 0:
ax.set_ylabel("CPAP Pressure (cmH2O)")
else:
ax.set_ylabel("APAP Pressure (cmH2O)")
ax.set_title(f"FILE: {file.name}: Starting {log_file.start.strftime('%Y-%m-%d')} (Between: {log_file.start.strftime('%H:%M')} and {log_file.end.strftime('%H:%M')}, Duration: {log_file.end - log_file.start})")
plt.grid()
plt.show()
if __name__ == "__main__":
main()