-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathkeepalive.py
More file actions
146 lines (126 loc) · 3.84 KB
/
Copy pathkeepalive.py
File metadata and controls
146 lines (126 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3
"""
Keepalive script - maintains last setpoint during main process restart.
Runs for a short time, sending cached setpoint to prevent Victron standby.
"""
import subprocess
import time
import sys
import os
# Total run time in seconds; the KEEPALIVE_DURATION environment variable
# overrides the 30-second default.
DURATION = int(os.getenv("KEEPALIVE_DURATION", "30"))
# Pause between keepalive iterations, in seconds.
INTERVAL = 1.0
def dbus_get(service, path):
    """Read a numeric value from a D-Bus item via ``dbus-send``.

    Calls ``com.victronenergy.BusItem.GetValue`` on *path* of *service* over
    the system bus and parses the first reply line that mentions ``int32``
    or ``double``.

    Args:
        service: D-Bus destination name to query.
        path: Object path of the item to read.

    Returns:
        The value as an ``int`` (a ``double`` reply is rounded to the nearest
        integer), or ``None`` when the command cannot be run, times out,
        exits non-zero, or the reply holds no parseable number.
    """
    command = [
        "dbus-send",
        "--system",
        "--print-reply",
        "--dest=" + service,
        path,
        "com.victronenergy.BusItem.GetValue",
    ]
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=2,
        )
    except Exception:
        # Best effort: a missing binary or a timeout simply means "no value".
        return None

    if result.returncode != 0:
        return None

    for line in result.stdout.splitlines():
        if "int32" in line or "double" in line:
            token = line.split()[-1]
            try:
                # Go through float() so a reply such as "double 123.5" is
                # accepted; int("123.5") would raise and lose the value.
                return round(float(token))
            except (ValueError, OverflowError):
                # Non-numeric token, NaN or infinity: nothing usable.
                return None
    return None
def dbus_set(service, path, value):
    """Write an int32 value to a D-Bus item via ``dbus-send``.

    Calls ``com.victronenergy.BusItem.SetValue`` on *path* of *service* over
    the system bus, passing *value* as ``variant:int32``.

    Args:
        service: D-Bus destination name to write to.
        path: Object path of the item to set.
        value: Value formatted into the ``variant:int32:`` argument.

    Returns:
        ``True`` only when ``dbus-send`` ran and exited with status 0;
        ``False`` when it could not be run, timed out, or reported an error.
    """
    command = [
        "dbus-send",
        "--system",
        "--print-reply",
        "--dest=" + service,
        path,
        "com.victronenergy.BusItem.SetValue",
        f"variant:int32:{value}",
    ]
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            timeout=2,
        )
    except Exception:
        # Missing binary or timeout: report the write as failed.
        return False
    # A non-zero exit means the write was rejected or never delivered, so it
    # must not be reported as a successful send.
    return result.returncode == 0
def find_vebus_service():
    """Return the name of a vebus D-Bus service.

    Asks ``com.victronenergy.dbusmonitor`` for its service list through
    ``dbus-send`` and returns the first name of the form
    ``com.victronenergy.vebus.<word>`` found in the output. When the command
    fails or no such name appears, the hard-coded default
    ``com.victronenergy.vebus.ttyUSB2`` is returned instead.
    """
    import re

    fallback = "com.victronenergy.vebus.ttyUSB2"
    command = [
        "dbus-send",
        "--system",
        "--print-reply",
        "--dest=com.victronenergy.dbusmonitor",
        "/",
        "com.victronenergy.dbusmonitor.GetServices",
    ]
    try:
        reply = subprocess.run(command, capture_output=True, text=True, timeout=2)
        # The pattern cannot span a newline, so searching the whole output
        # yields the same first hit as scanning it line by line.
        found = re.search(r"(com\.victronenergy\.vebus\.\w+)", reply.stdout)
    except Exception:
        return fallback
    return found.group(1) if found else fallback
def _main_process_is_back():
    """Return True when the local API at port 8080 answers with HTTP 200."""
    try:
        probe = subprocess.run(
            [
                "curl",
                "-s",
                "-o",
                "/dev/null",
                "-w",
                "%{http_code}",
                "http://localhost:8080/api/state",  # nosec B310 — local API
            ],
            capture_output=True,
            text=True,
            timeout=2,
        )
    except Exception:
        # curl missing or timed out: treat the main process as still down.
        return False
    return probe.stdout.strip() == "200"


def main():
    """Re-send the current AC power setpoint until the main process returns.

    Reads ``/Hub4/L1/AcPowerSetpoint`` once from the vebus service (falling
    back to 0 when it cannot be read), then loops for up to ``DURATION``
    seconds: each iteration first checks whether the main process API is
    answering again and, if not, writes the setpoint back and sleeps for
    ``INTERVAL`` seconds.

    Returns:
        0 when the main process was detected as running again, 1 when the
        loop ran for the full duration without seeing it.
    """
    print(f"[Keepalive] Starting, will run for {DURATION}s")
    vebus = find_vebus_service()
    print(f"[Keepalive] Using vebus: {vebus}")

    # Capture the setpoint once; the same value is repeated for the whole run.
    setpoint = dbus_get(vebus, "/Hub4/L1/AcPowerSetpoint")
    if setpoint is None:
        setpoint = 0
        print("[Keepalive] Could not read setpoint, using 0")
    else:
        print(f"[Keepalive] Current setpoint: {setpoint}W")

    # Use the monotonic clock for elapsed time: a wall-clock adjustment
    # (e.g. an NTP step) must not shorten or stretch the keepalive window.
    started = time.monotonic()
    sent = 0
    while time.monotonic() - started < DURATION:
        if _main_process_is_back():
            print("[Keepalive] Main process is back, exiting")
            return 0

        if dbus_set(vebus, "/Hub4/L1/AcPowerSetpoint", setpoint):
            sent += 1
            # Log only every fifth successful send to keep output short.
            if sent % 5 == 0:
                print(f"[Keepalive] Sent setpoint {setpoint}W ({sent} times)")
        time.sleep(INTERVAL)

    print(f"[Keepalive] Timeout after {DURATION}s, exiting")
    return 1
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)