-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
121 lines (99 loc) · 3.52 KB
/
Copy pathworker.py
File metadata and controls
121 lines (99 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
Ingestion worker (provider-aware).

The LLM and embedding model are constructed from the provider factory.
Everything else (RabbitMQ, idempotency, file handling) is unchanged.

Run from the project root so relative paths in messages resolve.

Usage:
    python worker.py
"""
import logging
import sys
from pathlib import Path
# Directory containing this file; relative task paths are resolved against it.
PROJECT_ROOT = Path(__file__).resolve().parent
# Put the project root at the front of sys.path (once) so the local
# packages imported below resolve.
if sys.path.count(str(PROJECT_ROOT)) == 0:
    sys.path.insert(0, str(PROJECT_ROOT))
from config import get_settings, get_provider
from app.idempotency import content_hash, is_processed, record_processed
from app.ingestion import ingest_files
from app.logging_config import setup_logging
from app.messaging import consume_ingest_tasks
# Configure logging at import time, before the module logger is first used.
# NOTE(review): what setup_logging() installs lives in app.logging_config and
# is not visible from this file.
setup_logging()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Lazily-created provider shared by every task handled in this process.
_provider = None


def _get_provider():
    """Return the shared provider, building it from settings on first use.

    The instance is stored in the module-level ``_provider`` so later calls
    reuse it instead of calling ``get_provider`` again.
    """
    global _provider
    if _provider is not None:
        return _provider
    _provider = get_provider(get_settings())
    return _provider
def _remove_task_file(path: Path, task_id: str) -> None:
    """Delete the task's source file; log instead of raising on failure."""
    try:
        path.unlink(missing_ok=True)
    except OSError:
        # missing_ok only covers FileNotFoundError; other OS errors (e.g.
        # permissions) must not escape once the message has been settled.
        logger.warning(
            "Could not remove task file",
            extra={"task_id": task_id, "path": str(path)},
            exc_info=True,
        )


def process_one_task(data: dict, channel, method) -> None:
    """Handle one ingestion message and settle it on the channel.

    Args:
        data: Decoded message body. Reads the keys ``task_id``,
            ``file_path`` and ``filename``; each defaults to ``""``.
        channel: Channel object providing ``basic_ack`` / ``basic_nack``.
        method: Delivery metadata providing ``delivery_tag``.

    Outcomes:
        * Missing ``file_path``, non-existent file, or hashing failure:
          the message is nacked with ``requeue=False`` and the reason is
          logged.
        * Content hash already recorded: the message is acked and the file
          is removed without re-ingesting.
        * Ingestion succeeds: the hash is recorded, the message is acked
          and the file is removed.
        * Ingestion (or recording) raises: the error is logged with its
          traceback and the message is nacked with ``requeue=False``; the
          file is left in place.
    """
    settings = get_settings()
    task_id = data.get("task_id", "")
    file_path = data.get("file_path", "")
    filename = data.get("filename", "")
    delivery_tag = method.delivery_tag

    if not file_path:
        logger.warning(
            "Rejecting task without file_path", extra={"task_id": task_id}
        )
        channel.basic_nack(delivery_tag, requeue=False)
        return

    # Relative paths in messages are resolved against the project root.
    path = Path(file_path)
    if not path.is_absolute():
        path = PROJECT_ROOT / path

    if not path.exists():
        logger.warning(
            "Rejecting task: file not found",
            extra={"task_id": task_id, "path": str(path)},
        )
        channel.basic_nack(delivery_tag, requeue=False)
        return

    try:
        file_hash = content_hash(path)
    except Exception:
        # Previously swallowed silently; keep the nack but record why.
        logger.exception(
            "Rejecting task: could not hash file",
            extra={"task_id": task_id, "path": str(path)},
        )
        channel.basic_nack(delivery_tag, requeue=False)
        return

    if is_processed(settings.processed_hashes_db, file_hash):
        logger.info("Skipping duplicate", extra={"task_id": task_id, "hash": file_hash})
        # Ack before cleanup so a failing unlink cannot leave the duplicate
        # message unacknowledged.
        channel.basic_ack(delivery_tag)
        _remove_task_file(path, task_id)
        return

    logger.info("Processing task", extra={"task_id": task_id, "file": filename})
    try:
        provider = _get_provider()
        llm = provider.get_fast_model()
        embedding_model = provider.get_embedding_model()
        result = ingest_files(
            file_paths=[path],
            persist_directory=settings.chroma_persist_dir,
            fallback_collection=settings.default_fallback_collection,
            llm=llm,
            embedding_model=embedding_model,
            chunk_size=settings.chunk_size,
            chunk_overlap=settings.chunk_overlap,
            use_semantic_chunking=settings.use_semantic_chunking,
        )
        # Only one file is submitted, so the first routing entry (if any)
        # is the one for this file.
        collection = ""
        if result.get("routing"):
            collection = result["routing"][0].get("collection", "")
        record_processed(settings.processed_hashes_db, file_hash, filename, collection)
        logger.info(
            "Ingestion complete",
            extra={
                "task_id": task_id,
                "collection": collection,
                "chunks": result.get("chunks_added", 0),
            },
        )
    except Exception:
        # logger.exception == logger.error(..., exc_info=True)
        logger.exception(
            "Ingestion failed",
            extra={"task_id": task_id, "file": filename},
        )
        channel.basic_nack(delivery_tag, requeue=False)
        return

    channel.basic_ack(delivery_tag)
    _remove_task_file(path, task_id)
def main() -> None:
    """Log worker startup and hand ``process_one_task`` to the consumer.

    The RabbitMQ URL and queue name are taken from the application settings.
    """
    cfg = get_settings()
    queue = cfg.ingestion_queue_name
    logger.info("Worker started", extra={"queue": queue})
    consume_ingest_tasks(
        process_one_task,
        rabbitmq_url=cfg.rabbitmq_url,
        queue_name=queue,
    )


# Script entry point: only start consuming when executed directly.
if __name__ == "__main__":
    main()