Skip to content

Commit 341a09b

Browse files
Increase mailbox worker capacity
Make mailbox worker capacity configurable, raise Fly defaults, and quiet normal subscription lifecycle logs.
1 parent 48e6e33 commit 341a09b

5 files changed

Lines changed: 89 additions & 5 deletions

File tree

fly/mainnet.fly.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ primary_region = "iad"
1616
[env]
1717
LNURL_DOMAIN = "noahwallet.io"
1818
SERVER_NETWORK = "bitcoin"
19+
MAILBOX_WORKER_CONCURRENCY_LIMIT = "300"
20+
POSTGRES_MAX_CONNECTIONS = "20"
21+
REDIS_POOL_SIZE = "64"
1922

2023
[http_service]
2124
internal_port = 3000

fly/signet.fly.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ primary_region = 'iad'
1313
strategy = "bluegreen"
1414
wait_timeout = "10m"
1515

16+
[env]
17+
MAILBOX_WORKER_CONCURRENCY_LIMIT = "300"
18+
POSTGRES_MAX_CONNECTIONS = "20"
19+
REDIS_POOL_SIZE = "64"
20+
1621
[http_service]
1722
internal_port = 3000
1823
force_https = true

server/src/bin/mailbox_worker.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ async fn main() -> Result<()> {
2222

2323
let app_state = build_app_state(config).await?;
2424
let transport = Arc::new(Beta8MailboxTransport);
25-
let worker = MailboxWorker::new(app_state, transport, MailboxWorkerConfig::default());
25+
let mailbox_worker_config = MailboxWorkerConfig::from_env();
26+
mailbox_worker_config.log();
27+
let worker = MailboxWorker::new(app_state, transport, mailbox_worker_config);
2628

2729
worker.run().await
2830
}

server/src/mailbox_worker.rs

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub struct MailboxWorkerConfig {
4444
impl Default for MailboxWorkerConfig {
4545
fn default() -> Self {
4646
Self {
47-
concurrency_limit: 50,
47+
concurrency_limit: 250,
4848
scan_interval: Duration::from_secs(15),
4949
batch_size: 100,
5050
base_retry_delay: Duration::from_secs(5),
@@ -56,6 +56,77 @@ impl Default for MailboxWorkerConfig {
5656
}
5757
}
5858

59+
impl MailboxWorkerConfig {
60+
pub fn from_env() -> Self {
61+
let default = Self::default();
62+
Self {
63+
concurrency_limit: env_usize(
64+
"MAILBOX_WORKER_CONCURRENCY_LIMIT",
65+
default.concurrency_limit,
66+
),
67+
scan_interval: env_duration_secs(
68+
"MAILBOX_WORKER_SCAN_INTERVAL_SECS",
69+
default.scan_interval,
70+
),
71+
batch_size: env_i64("MAILBOX_WORKER_BATCH_SIZE", default.batch_size),
72+
base_retry_delay: env_duration_secs(
73+
"MAILBOX_WORKER_BASE_RETRY_DELAY_SECS",
74+
default.base_retry_delay,
75+
),
76+
max_retry_delay: env_duration_secs(
77+
"MAILBOX_WORKER_MAX_RETRY_DELAY_SECS",
78+
default.max_retry_delay,
79+
),
80+
claim_ttl: env_duration_secs("MAILBOX_WORKER_CLAIM_TTL_SECS", default.claim_ttl),
81+
claim_renew_interval: env_duration_secs(
82+
"MAILBOX_WORKER_CLAIM_RENEW_INTERVAL_SECS",
83+
default.claim_renew_interval,
84+
),
85+
stream_idle_reconnect: env_duration_secs(
86+
"MAILBOX_WORKER_STREAM_IDLE_RECONNECT_SECS",
87+
default.stream_idle_reconnect,
88+
),
89+
}
90+
}
91+
92+
pub fn log(&self) {
93+
tracing::info!(
94+
service = "mailbox_worker",
95+
concurrency_limit = self.concurrency_limit,
96+
scan_interval_secs = self.scan_interval.as_secs(),
97+
batch_size = self.batch_size,
98+
base_retry_delay_secs = self.base_retry_delay.as_secs(),
99+
max_retry_delay_secs = self.max_retry_delay.as_secs(),
100+
claim_ttl_secs = self.claim_ttl.as_secs(),
101+
claim_renew_interval_secs = self.claim_renew_interval.as_secs(),
102+
stream_idle_reconnect_secs = self.stream_idle_reconnect.as_secs(),
103+
"mailbox worker config loaded"
104+
);
105+
}
106+
}
107+
108+
fn env_usize(key: &str, default: usize) -> usize {
109+
std::env::var(key)
110+
.ok()
111+
.and_then(|value| value.parse().ok())
112+
.unwrap_or(default)
113+
}
114+
115+
fn env_i64(key: &str, default: i64) -> i64 {
116+
std::env::var(key)
117+
.ok()
118+
.and_then(|value| value.parse().ok())
119+
.unwrap_or(default)
120+
}
121+
122+
fn env_duration_secs(key: &str, default: Duration) -> Duration {
123+
std::env::var(key)
124+
.ok()
125+
.and_then(|value| value.parse().ok())
126+
.map(Duration::from_secs)
127+
.unwrap_or(default)
128+
}
129+
59130
#[derive(Debug, Clone)]
60131
pub struct MailboxSessionContext {
61132
pub worker_id: String,
@@ -436,7 +507,7 @@ impl MailboxTransport for Beta8MailboxTransport {
436507
Err(status) => return Ok(map_tonic_status(status)),
437508
};
438509

439-
tracing::info!(
510+
tracing::trace!(
440511
service = "mailbox_worker",
441512
pubkey = %mailbox.pubkey,
442513
checkpoint,
@@ -459,7 +530,7 @@ impl MailboxTransport for Beta8MailboxTransport {
459530
}
460531
}
461532
_ = &mut idle_reconnect => {
462-
tracing::debug!(
533+
tracing::trace!(
463534
service = "mailbox_worker",
464535
pubkey = %mailbox.pubkey,
465536
checkpoint,

server/src/main.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,10 +211,13 @@ async fn start_server(config: Config) -> anyhow::Result<()> {
211211
if run_mailbox_worker {
212212
let mailbox_worker_app_state = app_state.clone();
213213
tokio::spawn(async move {
214+
let mailbox_worker_config = MailboxWorkerConfig::from_env();
215+
mailbox_worker_config.log();
216+
214217
let worker = MailboxWorker::new(
215218
mailbox_worker_app_state,
216219
Arc::new(Beta8MailboxTransport),
217-
MailboxWorkerConfig::default(),
220+
mailbox_worker_config,
218221
);
219222

220223
if let Err(e) = worker.run().await {

0 commit comments

Comments
 (0)