The aggregation service consumes validated votes from RabbitMQ, batches them for efficiency, and updates PostgreSQL with aggregated vote counts.
- Batch Processing: Collects votes in batches (100 votes or 1 second timeout)
- Efficient Database Updates: Uses UPSERT operations with INSERT ... ON CONFLICT
- Retry Logic: Automatic retry with exponential backoff for failed batches
- Prometheus Metrics: Comprehensive metrics for monitoring
- Auto-Reconnect: Automatic reconnection to RabbitMQ on connection loss
- Graceful Shutdown: Processes remaining votes before shutdown
- Connection Pooling: PostgreSQL connection pool for performance
RabbitMQ (votes.aggregation)
↓
Aggregator (batching)
↓
PostgreSQL (vote_results table)
↓
Prometheus Metrics
Primary aggregation table storing vote counts per law:
- law_id (PK): Law identifier
- oui_count: Count of "oui" votes
- non_count: Count of "non" votes
- updated_at: Last update timestamp
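
As a rough illustration of how a batch could be folded into this table with INSERT ... ON CONFLICT, the sketch below first collapses the batch into per-law counts and then issues one UPSERT per law. It is not the service's actual code: the helper names (`aggregate`, `apply_batch`, `demo`) and the SQL text are assumptions, and an in-memory SQLite database stands in for PostgreSQL so the example runs without a server. With a PostgreSQL driver the placeholders would typically be `%s` instead of `?`.

```python
import sqlite3
from collections import Counter

# Hypothetical statement; the service's real SQL may differ.
UPSERT_SQL = """
INSERT INTO vote_results (law_id, oui_count, non_count, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT (law_id) DO UPDATE SET
    oui_count = vote_results.oui_count + excluded.oui_count,
    non_count = vote_results.non_count + excluded.non_count,
    updated_at = CURRENT_TIMESTAMP
"""


def aggregate(votes):
    """Collapse a batch into {law_id: Counter({'oui': n, 'non': m})}."""
    totals = {}
    for vote in votes:
        totals.setdefault(vote["law_id"], Counter())[vote["choice"]] += 1
    return totals


def apply_batch(conn, votes):
    """Write one row per law in the batch instead of one row per vote."""
    rows = [
        (law_id, counts["oui"], counts["non"])
        for law_id, counts in aggregate(votes).items()
    ]
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT_SQL, rows)


def demo():
    # SQLite stand-in for PostgreSQL (assumption made for runnability).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE vote_results ("
        "law_id TEXT PRIMARY KEY, "
        "oui_count INTEGER NOT NULL DEFAULT 0, "
        "non_count INTEGER NOT NULL DEFAULT 0, "
        "updated_at TEXT)"
    )
    apply_batch(conn, [
        {"law_id": "law-001", "choice": "oui"},
        {"law_id": "law-001", "choice": "non"},
        {"law_id": "law-002", "choice": "oui"},
    ])
    # A second batch increments the existing row rather than inserting a new one.
    apply_batch(conn, [{"law_id": "law-001", "choice": "oui"}])
    return conn.execute(
        "SELECT law_id, oui_count, non_count FROM vote_results ORDER BY law_id"
    ).fetchall()
```

Aggregating before writing means a batch of 100 votes for the same law touches a single row once, which is the main reason batching reduces database load.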
Audit log of individual votes:
- vote_hash (UNIQUE): Hash of the vote
- citizen_id: Voter identifier
- law_id: Law identifier
- choice: Vote choice (oui/non)
- timestamp: Vote timestamp
Tracks duplicate vote attempts:
- vote_hash: Reference to original vote
- citizen_id: Voter who attempted
- attempt_timestamp: When attempt occurred

- votes_aggregated_total{law_id, choice}: Total votes aggregated (counter)
- current_vote_totals{law_id, choice}: Current vote counts (gauge)
- batch_processing_duration_seconds: Batch processing time (gauge)
- batch_size_processed_total: Total votes in processed batches (counter)
- aggregation_errors_total{error_type}: Aggregation errors (counter)

Environment variables (see .env.example):
- RABBITMQ_HOST: RabbitMQ host (default: localhost)
- RABBITMQ_PORT: RabbitMQ port (default: 5672)
- RABBITMQ_USER: Username (default: guest)
- RABBITMQ_PASSWORD: Password (default: guest)
- RABBITMQ_QUEUE: Queue name (default: votes.aggregation)
- RABBITMQ_PREFETCH_COUNT: Prefetch count (default: 100)

- POSTGRES_HOST: PostgreSQL host (default: localhost)
- POSTGRES_PORT: PostgreSQL port (default: 5432)
- POSTGRES_DB: Database name (default: election_votes)
- POSTGRES_USER: Username (default: postgres)
- POSTGRES_PASSWORD: Password
- POSTGRES_MIN_CONNECTIONS: Min pool size (default: 2)
- POSTGRES_MAX_CONNECTIONS: Max pool size (default: 10)

- BATCH_SIZE: Votes per batch (default: 100)
- BATCH_TIMEOUT_SECONDS: Batch timeout (default: 1.0)

- MAX_RETRY_ATTEMPTS: Max retries (default: 3)
- RETRY_DELAY_SECONDS: Initial retry delay (default: 1.0)

- PROMETHEUS_PORT: Metrics port (default: 8001)
- LOG_LEVEL: Logging level (default: INFO)
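
One way these variables might be read into a typed object is sketched below. The `Settings` class and `load_settings` function are hypothetical names, not taken from the service; only the variable names and defaults come from the list above. Credentials are left out for brevity, and the two sanity checks at the end are an added assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    rabbitmq_host: str
    rabbitmq_port: int
    rabbitmq_queue: str
    rabbitmq_prefetch_count: int
    postgres_host: str
    postgres_port: int
    postgres_db: str
    postgres_min_connections: int
    postgres_max_connections: int
    batch_size: int
    batch_timeout_seconds: float
    max_retry_attempts: int
    retry_delay_seconds: float
    prometheus_port: int
    log_level: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from a mapping (os.environ by default), applying the documented defaults."""
    env = os.environ if env is None else env
    settings = Settings(
        rabbitmq_host=env.get("RABBITMQ_HOST", "localhost"),
        rabbitmq_port=int(env.get("RABBITMQ_PORT", "5672")),
        rabbitmq_queue=env.get("RABBITMQ_QUEUE", "votes.aggregation"),
        rabbitmq_prefetch_count=int(env.get("RABBITMQ_PREFETCH_COUNT", "100")),
        postgres_host=env.get("POSTGRES_HOST", "localhost"),
        postgres_port=int(env.get("POSTGRES_PORT", "5432")),
        postgres_db=env.get("POSTGRES_DB", "election_votes"),
        postgres_min_connections=int(env.get("POSTGRES_MIN_CONNECTIONS", "2")),
        postgres_max_connections=int(env.get("POSTGRES_MAX_CONNECTIONS", "10")),
        batch_size=int(env.get("BATCH_SIZE", "100")),
        batch_timeout_seconds=float(env.get("BATCH_TIMEOUT_SECONDS", "1.0")),
        max_retry_attempts=int(env.get("MAX_RETRY_ATTEMPTS", "3")),
        retry_delay_seconds=float(env.get("RETRY_DELAY_SECONDS", "1.0")),
        prometheus_port=int(env.get("PROMETHEUS_PORT", "8001")),
        log_level=env.get("LOG_LEVEL", "INFO"),
    )
    # Sanity checks (assumed, not documented behavior of the service).
    if settings.batch_size < 1:
        raise ValueError("BATCH_SIZE must be at least 1")
    if settings.postgres_min_connections > settings.postgres_max_connections:
        raise ValueError("POSTGRES_MIN_CONNECTIONS exceeds POSTGRES_MAX_CONNECTIONS")
    return settings
```

Passing a plain dict, for example `load_settings({"BATCH_SIZE": "250"})`, makes the parsing easy to exercise in tests without touching the real environment.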
- Install dependencies:
pip install -r requirements.txt
- Configure environment:
cp .env.example .env
# Edit .env with your settings
- Initialize database:
psql -U postgres -d election_votes -f init_db.sql
- Run the service:
python aggregator.py

- Build image:
docker build -t vote-aggregator .
- Run container:
docker run -d \
--name vote-aggregator \
-e RABBITMQ_HOST=rabbitmq \
-e POSTGRES_HOST=postgres \
-e POSTGRES_PASSWORD=secret \
-p 8001:8001 \
vote-aggregator

version: '3.8'
services:
  aggregator:
    build: .
    environment:
      RABBITMQ_HOST: rabbitmq
      POSTGRES_HOST: postgres
      POSTGRES_PASSWORD: secret
    ports:
      - "8001:8001"
    depends_on:
      - rabbitmq
      - postgres

The service expects JSON messages from RabbitMQ:
{
  "vote_hash": "abc123...",
  "citizen_id": "citizen-123",
  "law_id": "law-001",
  "choice": "oui",
  "timestamp": "2025-11-14T12:00:00Z"
}

-- Get vote counts for a specific law
SELECT * FROM vote_results WHERE law_id = 'law-001';
-- Get vote statistics
SELECT * FROM vote_statistics;
-- Find top voted laws
SELECT law_id, total_votes
FROM vote_statistics
ORDER BY total_votes DESC
LIMIT 10;

Access Prometheus metrics:
curl http://localhost:8001/metrics

The service uses intelligent batching:
- Size-based: Processes when batch reaches 100 votes
- Time-based: Processes every 1 second if votes are pending
- Shutdown: Processes all remaining votes on graceful shutdown
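With the default settings, a pending vote waits at most about one second before it is written, while busy periods still amortize each database round trip over up to 100 votes.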
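
The three flush triggers above can be sketched as a small class. This is an illustration under assumptions, not the service's implementation: `VoteBatcher`, `tick`, and the injectable `clock` are hypothetical names, and the timeout here is measured from the arrival of the oldest pending vote, which is one reasonable reading of "every 1 second if votes are pending".

```python
import time


class VoteBatcher:
    """Collects votes and hands them to `flush` by size, by age, or on demand."""

    def __init__(self, flush, batch_size=100, timeout=1.0, clock=time.monotonic):
        self._flush = flush            # callable that receives a list of votes
        self._batch_size = batch_size
        self._timeout = timeout
        self._clock = clock            # injectable so tests need not sleep
        self._pending = []
        self._first_at = None          # arrival time of the oldest pending vote

    def add(self, vote):
        if not self._pending:
            self._first_at = self._clock()
        self._pending.append(vote)
        # Size-based trigger.
        if len(self._pending) >= self._batch_size:
            self.flush()

    def tick(self):
        """Call periodically; time-based trigger for a partially filled batch."""
        if self._pending and self._clock() - self._first_at >= self._timeout:
            self.flush()

    def flush(self):
        """Flush whatever is pending; also the hook for graceful shutdown."""
        if not self._pending:
            return
        batch, self._pending = self._pending, []
        self._first_at = None
        self._flush(batch)
```

A consumer loop would call `add` for each delivered message, `tick` whenever it is idle, and `flush` once more before exiting.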
- Failed batches retry up to 3 times
- Exponential backoff: 1s, 2s, 4s
- After max retries, errors are logged and tracked
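
A minimal sketch of this retry policy follows, assuming one initial attempt plus up to three retries with delays that double each time. The function name `process_with_retry` and its parameters are hypothetical; `sleep` is injectable only so the behavior can be checked without waiting.

```python
import logging
import time

logger = logging.getLogger("aggregator.retry")


def process_with_retry(process_batch, batch, max_retries=3,
                       initial_delay=1.0, sleep=time.sleep):
    """Run process_batch(batch); return True on success, False once retries are exhausted."""
    attempt = 0
    while True:
        try:
            process_batch(batch)
            return True
        except Exception as exc:
            if attempt >= max_retries:
                # Give up: log the failure so it can be tracked.
                logger.error("batch failed after %d retries: %s", max_retries, exc)
                return False
            delay = initial_delay * (2 ** attempt)  # 1s, 2s, 4s with the defaults
            logger.warning("batch failed (%s); retrying in %.1fs", exc, delay)
            sleep(delay)
            attempt += 1
```

With the defaults, a batch that never succeeds is attempted four times in total and spends seven seconds waiting between attempts.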
- RabbitMQ: Auto-reconnect on connection loss
- PostgreSQL: Connection pool handles reconnection
- Malformed JSON: Rejected without requeue
- Processing errors: Requeued for retry
- Successful processing: Acknowledged in batch
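
The three outcomes above can be expressed as a small decision function. The sketch is independent of any RabbitMQ client library, and the names `parse_vote`, `disposition`, and `MalformedVote` are hypothetical. Checking for required fields and for a valid choice goes beyond "malformed JSON" and is an assumption based on the message format shown earlier.

```python
import json

REQUIRED_FIELDS = ("vote_hash", "citizen_id", "law_id", "choice", "timestamp")
VALID_CHOICES = {"oui", "non"}


class MalformedVote(ValueError):
    """The message body can never be processed, so retrying is pointless."""


def parse_vote(body):
    """Decode a message body into a vote dict or raise MalformedVote."""
    try:
        vote = json.loads(body)
    except ValueError as exc:  # covers JSONDecodeError and UnicodeDecodeError
        raise MalformedVote("invalid JSON") from exc
    if not isinstance(vote, dict):
        raise MalformedVote("expected a JSON object")
    # Field and choice validation is an assumption, not documented behavior.
    missing = [name for name in REQUIRED_FIELDS if name not in vote]
    if missing:
        raise MalformedVote("missing fields: " + ", ".join(missing))
    if vote["choice"] not in VALID_CHOICES:
        raise MalformedVote("unknown choice")
    return vote


def disposition(body, process):
    """Return 'reject' (no requeue), 'requeue', or 'ack' for one message."""
    try:
        vote = parse_vote(body)
    except MalformedVote:
        return "reject"
    try:
        process(vote)
    except Exception:
        return "requeue"
    return "ack"
```

Keeping the decision separate from the broker calls makes it straightforward to unit test; the consumer then maps each result onto the acknowledge, reject, or requeue operation of whichever client library is in use.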
- Connection pooling (PostgreSQL)
- Batch UPSERT operations
- Prefetch count of 100
- Execute batch with page size 100
- Efficient indexing
- Processes ~10,000 votes/second (depending on hardware)
- Batch size tunable for throughput vs latency
- Horizontal scaling supported (multiple instances)
The service handles SIGINT and SIGTERM:
- Stops accepting new messages
- Processes remaining votes in batch
- Closes RabbitMQ connection
- Closes database connections
- Exits cleanly
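
The shutdown sequence above might be wired up roughly as follows. This is a sketch, not the service's code: `GracefulShutdown` and `run` are hypothetical names, and the four callables passed to `run` stand in for the real consumer, batcher, and connection objects.

```python
import signal
import threading


class GracefulShutdown:
    """Turns SIGINT/SIGTERM into a flag that the main loop polls."""

    def __init__(self):
        self._stop = threading.Event()

    def install(self):
        # signal.signal must be called from the main thread.
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self._handle)

    def _handle(self, signum, frame):
        # Only set a flag here; the real work happens in the main loop.
        self._stop.set()

    @property
    def requested(self):
        return self._stop.is_set()


def run(shutdown, consume_once, flush_pending, close_rabbitmq, close_database):
    """Consume until shutdown is requested, then drain and close in order."""
    while not shutdown.requested:
        consume_once()       # stops accepting new messages once the flag is set
    flush_pending()          # process remaining votes in the current batch
    close_rabbitmq()
    close_database()
```

Doing nothing but setting a flag inside the handler keeps the handler safe to run at any point, and the drain-then-close order mirrors the list above.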
# Graceful shutdown
kill -TERM <pid>
# Or with Docker
docker stop vote-aggregator

- Reduce BATCH_TIMEOUT_SECONDS
- Increase database connection pool size

- Increase BATCH_SIZE
- Scale horizontally (multiple instances)

- Check RabbitMQ/PostgreSQL availability
- Verify credentials and network connectivity

- Reduce BATCH_SIZE
- Reduce POSTGRES_MAX_CONNECTIONS

pytest tests/
black .
flake8 .
mypy *.py

MIT License