-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsubscribe_redis.py
More file actions
35 lines (28 loc) · 846 Bytes
/
Copy pathsubscribe_redis.py
File metadata and controls
35 lines (28 loc) · 846 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import redis
import uuid
r = redis.Redis(host="localhost", port=6379)
exit_condition = False
stream = "temperature"
group = f"group_{str(uuid.uuid4())}"
consumer = "subscriber_uuid"
try:
r.xgroup_create(stream, group, id="0", mkstream=True)
except redis.exceptions.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise
print("Waiting...")
while not exit_condition:
entries = r.xreadgroup(
groupname=group,
consumername=consumer,
streams={stream: '>'},
block=0
)
for _, msgs in entries:
for msg_id, data in msgs:
value = data[b'value'].decode()
print(f"Received value: {value}")
r.xack(stream, group, msg_id)
if value == "exit":
exit_condition = True
break