Resumable execution for Python. One decorator. Zero retry loops.
You wrote a Python script that loops through 10,000 things: sending welcome emails, downloading files, calling an API for each user in your database, resizing images, scraping URLs. Somewhere around item 6,432 the network blips, a rate limit kicks in, or someone unplugs your laptop. Everything dies, and you have no idea what was done and what wasn't.
The usual fix is a thicket of `try`/`except` blocks, manual retry loops, a "last processed ID" column in some side database, and a `--resume-from` CLI flag. `safe-state` deletes all of that:
```python
from safe_state import safe_state

@safe_state
def send_welcome_emails(users, mailer):
    for user in users:
        mailer.send(user.email, "Welcome!", render_template(user))

send_welcome_emails(load_users(), open_mailer())
# Crashes at user 6,432? Just run the script again. It skips the first 6,431
# and picks up at 6,432. No code changes needed.
```

Python's built-in `pickle` can serialize dictionaries, lists, integers, and most plain objects. It cannot serialize:

- Open network sockets
- Live database connections (`sqlite3`, `psycopg2`, `pymongo`)
- Open file handles
- `requests.Session` objects with active TCP keep-alives
- Any object holding a C-level resource
So a naive "just pickle everything" checkpointer crashes the moment your script
holds anything useful. safe-state solves this with a reconnect registry:
when it finds a live object, it serializes a small metadata record describing
how to recreate the object, then rebuilds a fresh one on resume.
Built-in handlers ship for sqlite3.Connection, socket.socket,
requests.Session, and file handles. Custom types are a five-line
register_reconnector() call away.
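To make the reconnect-registry idea concrete, here is a minimal sketch built on the standard library's `pickle` persistent-ID hooks (`Pickler.persistent_id` and `Unpickler.persistent_load`) instead of `dill`. It is not safe-state's implementation: `DbHandle`, `REGISTRY`, `freeze`, and `thaw` are hypothetical names. When freezing, a registered live object is replaced by a small `(type name, metadata)` record; when thawing, a fresh object is built from that record.

```python
import io
import pickle
import sqlite3


class DbHandle:
    """Hypothetical wrapper: a live sqlite3 connection plus the path needed to reopen it."""

    def __init__(self, path):
        self.path = path
        self.conn = sqlite3.connect(path)  # plain pickle cannot serialize this


# type -> (extract, reconnect): extract turns a live object into plain metadata,
# reconnect turns that metadata back into a fresh live object.
REGISTRY = {
    DbHandle: (lambda h: {"path": h.path}, lambda meta: DbHandle(meta["path"])),
}


class RegistryPickler(pickle.Pickler):
    def persistent_id(self, obj):
        entry = REGISTRY.get(type(obj))
        if entry is None:
            return None  # not registered: pickle it normally
        extract, _ = entry
        return (type(obj).__qualname__, extract(obj))  # metadata record instead of the object


class RegistryUnpickler(pickle.Unpickler):
    def persistent_load(self, pid):
        type_name, meta = pid
        for cls, (_, reconnect) in REGISTRY.items():
            if cls.__qualname__ == type_name:
                return reconnect(meta)  # rebuild a fresh live object
        raise pickle.UnpicklingError(f"no reconnector registered for {type_name}")


def freeze(obj) -> bytes:
    buf = io.BytesIO()
    RegistryPickler(buf).dump(obj)
    return buf.getvalue()


def thaw(data: bytes):
    return RegistryUnpickler(io.BytesIO(data)).load()
```

The rebuilt connection is a new one. For an on-disk database it reopens the same file, but an `:memory:` database comes back empty, which is why this approach suits connections and sessions rather than in-memory data.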
```bash
pip install safe-state
```

Requires Python 3.9+ and `dill` (the only runtime dependency; `pickle` isn't powerful enough on its own).

Optional extras:

```bash
pip install "safe-state[redis]"  # adds Redis-backed shared state
```

`@safe_state` does three things to the function it wraps:

- Intercepts the iterable argument (the first positional argument by default; see `iterable_arg`). The function still sees a normal iterable, but `safe-state` is silently tracking which items have completed.
- Persists progress after every item (or every N items, configurable via `save_every`) to a `.safestate` file on disk (or Redis, SQL, or any custom backend) via a durable atomic write.
- Captures locals on failure. When an exception escapes the function, `safe-state` walks the traceback, grabs the local variables from the failing frame, freezes them with `dill` plus the reconnect registry, and writes them to the checkpoint. The exception then re-raises as normal; `safe-state` never silently swallows errors.
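The third step can be sketched with nothing but the standard library. This hypothetical decorator (`save_locals_on_failure` and `frame_locals_for` are our names, not safe-state's) walks the traceback, copies the locals of the decorated function's own frame, hands them to a `save` callback, and re-raises. It assumes "the failing frame" means the wrapped function's frame rather than the innermost library frame, and it skips the `dill` freezing step.

```python
import functools


def frame_locals_for(exc: BaseException, code) -> dict:
    """Return a copy of the locals from the last traceback frame running `code`."""
    found = {}
    tb = exc.__traceback__
    while tb is not None:
        if tb.tb_frame.f_code is code:
            found = dict(tb.tb_frame.f_locals)  # snapshot, not a live view
        tb = tb.tb_next
    return found


def save_locals_on_failure(save):
    """Hypothetical decorator: pass the failing call's locals to `save`, then re-raise."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as exc:
                save(frame_locals_for(exc, fn.__code__))
                raise  # never swallow the error
        return wrapper
    return decorator
```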
On the next invocation with the same job name, the checkpoint is loaded, already-completed indices are skipped, and the iteration resumes from where it stopped.
On successful completion, the checkpoint file is deleted.
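A stripped-down version of that load, skip, save, delete cycle is sketched below. It is illustrative only: it stores completed indices as JSON (safe-state uses `dill` and its own `.safestate` format), and `run_resumable` and `atomic_write_json` are hypothetical helpers. Each save goes to a temporary file in the same directory, is flushed and `fsync`ed, and is then swapped in with `os.replace`, so a crash mid-write leaves the previous checkpoint intact.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, data: dict) -> None:
    """Write to a temp file next to `path`, fsync it, then rename it over `path`."""
    fd, tmp = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)  # atomic rename on POSIX
    except BaseException:
        os.unlink(tmp)
        raise


def run_resumable(items, work, state_path: Path) -> None:
    done = set()
    if state_path.exists():  # resume: load indices finished on earlier runs
        done = set(json.loads(state_path.read_text())["completed"])
    for index, item in enumerate(items):
        if index in done:
            continue  # already completed, skip it
        work(item)  # an exception here leaves the last checkpoint untouched
        done.add(index)
        atomic_write_json(state_path, {"completed": sorted(done)})  # save after every item
    state_path.unlink()  # success: clear the checkpoint
```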
```python
from pathlib import Path

import requests
from safe_state import safe_state

@safe_state(name="image-scrape", verbose=True)
def download_all(urls, session):
    for url in urls:
        filename = url.split("/")[-1]
        response = session.get(url, timeout=10)
        response.raise_for_status()
        with open(f"downloads/{filename}", "wb") as f:
            f.write(response.content)

if __name__ == "__main__":
    Path("downloads").mkdir(exist_ok=True)  # open(..., "wb") fails if the folder is missing
    with open("urls.txt") as f:
        urls = f.read().splitlines()
    download_all(urls, requests.Session())
```

Run 1 (the connection times out on file 234, index 233):
```text
[safe_state] starting fresh job 'image-scrape'
[safe_state] 'image-scrape' failed at item 233:
ConnectionError: HTTPSConnectionPool... Read timed out.
Progress 233/500 saved to .safe_state/image-scrape.safestate
Traceback (most recent call last): ...
```

Run 2 (same command, no flags, no edits):

```text
[safe_state] resuming 'image-scrape': 233/500 done (run #2)
[safe_state] skip index 0 (done)
...
[safe_state] skip index 232 (done)
# resumes at item 233, completes through 499
[✓] Job complete. Checkpoint cleared.
```
A checkpoint only records completed iterations ("iteration 14 done"). If a crash happens in the middle of iteration 15, its side effect (file written, email sent, row inserted) may already be partly or fully done, but the checkpoint doesn't know that. On resume, iteration 15 runs again and the side effect happens twice.
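When the side effect is itself a database write, a standard way to make a repeated iteration harmless is to record an idempotency key in the same transaction as the write. The sketch below is our own illustration with `sqlite3`; the `processed` and `ledger` tables and the `open_ledger` and `insert_once` helpers are hypothetical, not part of safe-state. If the key is already present, an earlier run committed the write and it is skipped; if the crash came before the commit, the key and the write rolled back together, so redoing the work is safe.

```python
import sqlite3


def open_ledger(path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    with conn:
        conn.execute("CREATE TABLE IF NOT EXISTS processed (key TEXT PRIMARY KEY)")
        conn.execute("CREATE TABLE IF NOT EXISTS ledger (amount INTEGER NOT NULL)")
    return conn


def insert_once(conn: sqlite3.Connection, key: str, amount: int) -> bool:
    """Insert a ledger row at most once per key. Returns False if the key was already done."""
    with conn:  # one transaction: the key and the row commit together or not at all
        cur = conn.execute("INSERT OR IGNORE INTO processed (key) VALUES (?)", (key,))
        if cur.rowcount == 0:
            return False  # key already recorded by an earlier run
        conn.execute("INSERT INTO ledger (amount) VALUES (?)", (amount,))
    return True
```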
Two helpers protect against these duplicates:

```python
from pathlib import Path

import requests
from safe_state import safe_state, skip_if_exists, idempotent

# Pattern 1: one-line check inside the loop, for filesystem artifacts.
@safe_state
def download_all(urls, dest_dir: Path):
    for url in urls:
        target = dest_dir / url.split("/")[-1]
        if skip_if_exists(target):
            continue
        download(url, target)

# Pattern 2: declarative decorator on the side-effect function.
@idempotent(check=lambda url, dest: dest.exists() and dest)
def download(url, dest):
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # don't save an error page as if it were the file
    dest.write_bytes(response.content)
```

For side effects with no natural artifact (an API call, a webhook), use `IdempotencyCache` to track which keys were processed:
```python
from safe_state import safe_state, IdempotencyCache

cache = IdempotencyCache()

@safe_state
def send_messages(messages, client):
    for msg in messages:
        if cache.seen(msg.id):
            continue
        client.send(msg)
        cache.mark(msg.id)
```

When an iteration does multiple writes that must succeed or fail together, wrap them in `transaction()`. On exception, every connection rolls back, the iteration is not marked complete, and the next run retries it cleanly.
```python
from safe_state import safe_state, transaction

@safe_state
def process(items, db):
    for item in items:
        with transaction(db):
            db.execute("INSERT INTO ledger ...", item)
            db.execute("UPDATE balances ...", item)
            external_api_call(item)  # if this raises, both DB writes roll back
```

Works with any PEP 249 DB-API connection (`sqlite3`, `psycopg`, `psycopg2`, `mysqlclient`) and with SQLAlchemy connections. Pass multiple connections to commit or roll back together:

```python
with transaction(primary_db, replica_db, cache_conn):
    ...
```

Note: this is not a distributed two-phase commit (2PC). Each connection commits independently. If the commit succeeds on one connection but fails on another, the first is already committed. For true cross-system atomicity, use a saga pattern.
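The commit and rollback behavior described above, including its caveat, fits in a few lines. This is our own sketch (`commit_together` is a hypothetical name, not safe-state's `transaction()`), relying only on the `commit()` and `rollback()` methods every PEP 249 connection provides:

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def commit_together(*conns):
    """Commit every connection if the block succeeds; roll all of them back if it raises."""
    try:
        yield conns
    except BaseException:
        for conn in conns:
            conn.rollback()  # undo uncommitted writes on every connection
        raise
    else:
        # Commits run one after another, not as a 2PC: if the second commit
        # fails, the first is already durable.
        for conn in conns:
            conn.commit()
```

The `else` branch is where the non-2PC limitation lives: each `commit()` is independent, so nothing can undo an earlier commit once a later one fails.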
The default FileBackend writes one .safestate file per job to disk. For
containers, distributed workers, or environments without persistent local
storage, swap it for one of these:
```python
from safe_state import safe_state, FileBackend, MemoryBackend, RedisBackend, SQLBackend

# Local files (default).
@safe_state(state_dir=".safe_state")
def f(items): ...

# In-memory: useful for tests or short-lived workers.
@safe_state(backend=MemoryBackend())
def f(items): ...

# Redis: shared state across containers and workers.
# Requires: pip install "safe-state[redis]"
@safe_state(backend=RedisBackend.from_url("redis://localhost:6379/0"))
def f(items): ...

# Any SQL database via a DB-API connection.
import psycopg
conn = psycopg.connect("postgresql://user:pass@host/db")
@safe_state(backend=SQLBackend(conn, dialect="postgres"))
def f(items): ...

import sqlite3
conn = sqlite3.connect("checkpoints.db")
@safe_state(backend=SQLBackend(conn))  # sqlite is the default dialect
def f(items): ...
```

Custom backends are a four-method subclass (`load`, `save`, `delete`, `exists`). See `safe_state/backends.py` for the reference implementations.
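As an illustration of the four-method shape only, here is a standalone store that keeps one blob per job in SQLite. The method names come from the paragraph above; the class name `SqliteBlobStore` and the signatures (a job name in, `bytes` out, `None` when missing) are our assumptions, so check `safe_state/backends.py` for the real `StateBackend` interface before subclassing.

```python
import sqlite3
from typing import Optional


class SqliteBlobStore:
    """Hypothetical four-method store: one BLOB per job name."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        with self.conn:
            self.conn.execute(
                "CREATE TABLE IF NOT EXISTS checkpoints (job TEXT PRIMARY KEY, data BLOB NOT NULL)"
            )

    def load(self, job: str) -> Optional[bytes]:
        row = self.conn.execute("SELECT data FROM checkpoints WHERE job = ?", (job,)).fetchone()
        return None if row is None else row[0]

    def save(self, job: str, data: bytes) -> None:
        with self.conn:  # commit on success, roll back on error
            self.conn.execute(
                "INSERT OR REPLACE INTO checkpoints (job, data) VALUES (?, ?)", (job, data)
            )

    def delete(self, job: str) -> None:
        with self.conn:
            self.conn.execute("DELETE FROM checkpoints WHERE job = ?", (job,))

    def exists(self, job: str) -> bool:
        return self.load(job) is not None
```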
By default, if an item raises, the loop stops and you rerun the script to retry. That's fine for transient failures (rate limits, network blips) but useless for permanent failures from bad data — you'd be stuck rerunning forever. v0.3.0 adds two ways to keep going:
```python
from safe_state import safe_state

# Try each item up to 3 times in a single run; after that, dead-letter it
# and continue with the next item. The job completes even with broken inputs.
@safe_state(max_attempts=3)
def process(items):
    for item in items:
        do_work(item)

# Fine-grained per-error policy.
def policy(item, exc, attempt):
    if isinstance(exc, PermissionError):
        return "dead_letter"  # never going to work, give up immediately
    if attempt >= 5:
        return "dead_letter"
    return "retry"  # transient: try again

@safe_state(on_error=policy)
def process(items): ...
```

The four strategies the `on_error` callback can return:
| Strategy | Meaning |
|---|---|
| `"retry"` | Re-raise the exception. With `max_attempts`, retries in-run; with `on_error` alone, the user reruns. Default. |
| `"skip"` | Mark the item complete and continue with the next, as if it succeeded. |
| `"dead_letter"` | Record the failure in `checkpoint.dead_letters` and continue with the next item. The item is never retried on later runs. |
| `"fail"` | Record in dead-letters AND re-raise. Stops the job. |
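One way to read the table is as a small dispatch loop. The sketch below is a hypothetical interpretation, not safe-state's code: `run_with_policy` retries in-run while the callback says `"retry"` and attempts remain, dead-letters the item once they run out (matching the `max_attempts=3` description), and records dead letters with the same keys used in the inspection example that follows (`index`, `exception_type`, `message`, `attempts`).

```python
def run_with_policy(items, work, on_error, max_attempts=3):
    """Return (completed_indices, dead_letters) after applying the error policy to each item."""
    completed, dead_letters = [], []
    for index, item in enumerate(items):
        for attempt in range(1, max_attempts + 1):
            try:
                work(item)
            except Exception as exc:
                action = on_error(item, exc, attempt)
                if action == "retry" and attempt < max_attempts:
                    continue  # try the same item again in this run
                if action == "skip":
                    completed.append(index)  # treated as if it succeeded
                    break
                # "dead_letter", "fail", or "retry" with no attempts left.
                dead_letters.append({
                    "index": index,
                    "exception_type": type(exc).__name__,
                    "message": str(exc),
                    "attempts": attempt,
                })
                if action == "fail":
                    raise  # record it, then stop the job
                break
            else:
                completed.append(index)  # success
                break
    return completed, dead_letters
```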
Dead-lettered items are inspectable after the job:
```python
cp = process.peek_checkpoint()
if cp is not None:  # peek_checkpoint() returns None when no checkpoint exists
    for dl in cp.dead_letters:
        print(dl["index"], dl["exception_type"], dl["message"], dl["attempts"])
```

Every resume now prints a one-line summary:

```text
[safe_state] Resumed 'newsletter-blast': skipped 4,521/10,000 previously completed items. Estimated time saved (based on 4521 measurements): ~2h 14m.
```
The estimate uses the median of actual per-item durations recorded during
the original run — robust to outliers, never fabricated. With fewer than 3
measurements, only the skipped count is shown; no time figure is invented.
Set SAFE_STATE_QUIET=1 to suppress.
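The estimate itself is simple arithmetic: the median seconds per item times the number of skipped items. A sketch of that calculation follows; `time_saved_summary` is a hypothetical helper, and multiplying the median by the skipped count is our reading of how the median is applied.

```python
import statistics


def time_saved_summary(job: str, skipped: int, total: int, durations: list) -> str:
    """Build a resume summary; add a time estimate only with 3 or more measurements."""
    base = f"Resumed '{job}': skipped {skipped:,}/{total:,} previously completed items."
    if len(durations) < 3:
        return base  # too few measurements: report the count only
    seconds = statistics.median(durations) * skipped  # median resists outliers
    hours, rem = divmod(int(round(seconds)), 3600)
    minutes = rem // 60
    return (
        f"{base} Estimated time saved (based on {len(durations)} measurements): "
        f"~{hours}h {minutes}m."
    )
```

With durations `[1.0, 2.0, 2.0, 3.0, 100.0]` the median is 2.0 s, so 4,521 skipped items give 9,042 s, printed as `~2h 30m`; the 100 s outlier does not move the figure.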
```python
@safe_state(progress_bar=True)
def process(items):
    for item in items:
        do_work(item)
```

On resume, the `tqdm` bar visibly jumps to the skip point and continues from there: instant proof to the user that their previous progress is preserved. Requires `pip install "safe-state[progress]"`.
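Showing earlier progress on resume comes down to starting the bar's counter at the number of already-completed items instead of zero. The stdlib-only sketch below takes the same four parameters as the custom progress-bar callable (`iterable`, `initial`, `total`, `desc`); `text_bar` and its extra `stream` and `width` parameters are hypothetical, and we assume `iterable` yields only the remaining items.

```python
import sys


def text_bar(iterable, initial, total, desc, stream=sys.stderr, width=30):
    """Yield items unchanged while redrawing a text bar that starts at `initial`."""
    count = initial

    def draw():
        filled = width * count // total if total else width
        stream.write(f"\r{desc} [{'#' * filled}{'.' * (width - filled)}] {count}/{total}")
        stream.flush()

    draw()  # a resumed bar is drawn already partly full
    for item in iterable:
        yield item
        count += 1
        draw()
    stream.write("\n")
```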
To use a different progress-bar library (rich, alive_progress, anything), pass a callable:

```python
@safe_state(progress_bar=lambda iterable, initial, total, desc: my_bar(iterable, ...))
def process(items): ...
```

After a crash, inspect the checkpoint file from the command line without writing any code:

```bash
safe-state inspect state/myjob.safestate
safe-state inspect state/myjob.safestate --traceback
safe-state inspect state/myjob.safestate --locals
```

Output includes job progress, last failure (type, message, optionally traceback), dead-letter queue contents, frozen frame locals (opt-in), and median per-item timing. The command is read-only: it never modifies the file or attempts to resume.
Anything that loops through a batch of work benefits from this:
```python
from PIL import Image  # Pillow, for the thumbnail example
from safe_state import safe_state

# Bulk database backfill
@safe_state(name="backfill-2026")
def backfill(user_ids, conn):
    for uid in user_ids:
        new_value = expensive_computation(uid)
        conn.execute("UPDATE users SET score = ? WHERE id = ?", (new_value, uid))
        conn.commit()

# Processing a giant CSV
# Open the output file in append mode ("a"): mode "w" would truncate the
# rows written before the crash when you rerun the script.
@safe_state(name="csv-cleanup")
def clean_rows(rows, output_writer):
    for row in rows:
        cleaned = normalize(row)
        output_writer.writerow(cleaned)

# Calling an API for every record
# With save_every=10, up to 9 finished leads may be looked up again after a crash.
@safe_state(name="enrich-leads", save_every=10)
def enrich(leads, api_client):
    for lead in leads:
        data = api_client.lookup(lead.email)
        lead.enriched_data = data
        lead.save()

# Resizing thousands of images
@safe_state(name="thumbnails")
def make_thumbs(image_paths):
    for path in image_paths:
        with Image.open(path) as img:
            img.thumbnail((256, 256))
            img.save(path.replace(".jpg", "_thumb.jpg"))
```

In every case, if the script crashes partway, you just rerun it. No retry logic, no progress columns, no resume flags.
```python
@safe_state(
    name=None,                # job identifier; defaults to {module}.{qualname}.{path_hash}
    state_dir=".safe_state",  # checkpoint directory (when using FileBackend)
    backend=None,             # custom StateBackend (Redis, SQL, Memory, ...)
    iterable_arg=0,           # which arg is the iterable (int index or kwarg name)
    save_every=1,             # persist every N completed items
    store_results=False,      # also store each item's value (must be serializable)
    keep_on_success=False,    # keep checkpoint after successful completion
    verbose=False,            # print progress to stderr
    auto_iterate=True,        # set False for manual checkpoint() mode
)
def my_job(items): ...
```

Every decorated function exposes:
```python
my_job.peek_checkpoint()   # -> Checkpoint object, or None
my_job.clear_checkpoint()  # -> deletes the checkpoint
my_job.checkpoint_path     # -> Path to the .safestate file (None for non-file backends)
my_job.job_name            # -> the resolved job name
```

A `Checkpoint` holds `completed_indices`, `total_items`, `last_failure`, `dead_letters`, `frozen_state` (dill-serialized locals), `run_count`, and a `progress()` method.
Built-in handlers cover sqlite3.Connection, socket.socket,
requests.Session, and io.IOBase. To add your own:
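```python
from safe_state import register_reconnector

# Whatever extract returns is written into the checkpoint, so a secret
# like this token ends up in the checkpoint file or backend.
register_reconnector(
    MyApiClient,
    extract=lambda c: {"host": c.host, "token": c.token},
    reconnect=lambda meta: MyApiClient(meta["host"], meta["token"]),
)
```

For work that isn't shaped like an iteration, set `auto_iterate=False` and call `checkpoint()` manually: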
```python
from safe_state import safe_state, checkpoint

@safe_state(auto_iterate=False)
def big_job(graph):
    visited = set()
    for node in graph.walk():
        process(node)
        visited.add(node.id)
        checkpoint(visited=visited)
```

What `safe-state` is not:

- Not a distributed task queue. For multi-machine job dispatch, use Celery, Dramatiq, or RQ. `safe-state` solves the much smaller problem of "this one process crashed; let me rerun the same script and resume."
- Not a transaction manager. `transaction()` wraps commit/rollback on one or more DB connections, each committing independently; it's not a 2PC coordinator. For cross-system atomicity, use a saga pattern.
- Not magic. It doesn't freeze CPython frames mid-instruction. The iteration boundary is the resume granularity. If a single item's work is itself a long pipeline, decompose it into smaller items.
- Not for generator functions. The decorated function must process items inside a regular `for` loop and return normally. If your function itself uses `yield` (i.e. is a generator function), calling it just constructs a generator object: the body never runs, and `safe-state` never sees any iterations complete. Use a normal function with a `for` loop instead.
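The generator pitfall is easy to detect before anything runs. This hypothetical guard (`reject_generator_functions` is our name; the README doesn't say whether safe-state performs such a check) uses `inspect.isgeneratorfunction` to fail at decoration time instead of silently doing nothing:

```python
import functools
import inspect


def reject_generator_functions(fn):
    """Hypothetical guard: refuse generator functions, whose bodies don't run when called."""
    if inspect.isgeneratorfunction(fn):
        raise TypeError(f"{fn.__qualname__} is a generator function; use a plain for loop")

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapper
```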
safe-state overlaps with several existing tools, but occupies a distinct
point in the design space. Honest comparison:
tenacity / backoff / retrying — these wrap a single function call
with retry logic. They don't know about iteration, so if your script crashes
inside item 6,432 of 10,000, they can't pick up from there on the next run.
safe-state is about whole-loop resumption, not single-call retries. You
can absolutely use both together: tenacity for transient per-item retries,
safe-state for "the whole script died, let me resume."
joblib.Memory / diskcache — function-result memoization. Great for
caching expensive pure computations. Doesn't help with side effects (sending
emails, writing files, calling APIs) where you specifically want the work to
happen, just not twice.
celery / dramatiq / rq / huey — distributed task queues. Run
workers across machines, persist task state in a broker, retry failed tasks.
The right tool when you need horizontal scaling, isolation between tasks, or
async job dispatch. Way too heavy if you just have a single script that
processes a list. safe-state is the answer for "this one Python script
needs to survive crashes" without standing up Redis, RabbitMQ, and a worker
fleet. (Though you can use safe-state's RedisBackend to share checkpoint
state between processes if you want a lighter-weight middle ground.)
prefect / dagster / airflow — workflow orchestrators. Resume,
retry, and observability for multi-step DAGs. Same story as Celery: massively
more capability, massively more setup. Use them when the workflow is the
product. Use safe-state when the workflow is just a for loop.
pickle / dill / cloudpickle — serialization libraries. safe-state
uses dill internally and adds the reconnect registry on top so live
objects (sockets, DB connections) survive freeze-thaw. Plain pickle has no
built-in handling for live connections.
In short, safe-state is the smallest useful tool for the one specific
problem of "this batch script crashed, I want to rerun it and have it skip
what it already did." If that's not your problem, one of the tools above is
probably a better fit.
MIT. See LICENSE.
Issues and pull requests welcome. Run the test suite with:
```bash
pip install -e ".[dev]"
pytest
```

Test count by feature area:
- Core resume + serialization: 36 tests
- Storage backends (File, Memory, SQL): 11 tests
- Idempotency utilities: 10 tests
- Transaction manager: 9 tests
- Total: 66 tests