Show the status of the collect stage when starting a processing job#1286
Conversation
✅ Deploy Preview for antenna-preview canceled.
|
✅ Deploy Preview for antenna-ssec canceled.
|
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughIn ChangesCollect-stage STARTED initialization
Estimated code review effort🎯 2 (Simple) | ⏱️ ~5 minutes Possibly related issues
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR improves job progress visibility during long-running ML “collect” operations and adds a deferred migration to keep job_type_key choices in sync after the “collection” → “capture set” rename.
Changes:
- Persist
collectstageSTARTEDprogress immediately so polling clients see active progress duringcollect_images(). - Add migration
0023_alter_job_job_type_keyto update thejob_type_keychoice label (“Populate captures collection” → “Populate capture set”) and keepmakemigrations --checkclean.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| ami/jobs/models.py | Saves the job immediately after marking the collect stage as STARTED, preventing UI polling from appearing frozen during long collection runs. |
| ami/jobs/migrations/0023_alter_job_job_type_key.py | Alters job_type_key field choices to reflect updated human-readable label for the populate job type. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
f1eb099 to
daa2071
Compare
|
Claude says: Reworked the approach after a review pass (commit |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ami/jobs/models.py (1)
501-510: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
reprocess_all_images=Truenow has no persisted collect STARTED signal.After removing the pre-collect save, the collect-stage STARTED state is only persisted by
filter_processed_images(). But that path is skipped whenreprocess_all_images=True, so collect can remain visibly CREATED until collection ends.Suggested direction
job.progress.update_stage( "collect", status=JobState.STARTED, progress=0, ) + if job.project.feature_flags.reprocess_all_images: + # No filter_processed_images heartbeat in this mode; persist STARTED once. + job.save(update_fields=["progress", "updated_at"])🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@ami/jobs/models.py` around lines 501 - 510, The collect stage STARTED state needs to be persisted even when reprocess_all_images=True, but currently this only happens through filter_processed_images() which gets skipped in that case. When reprocess_all_images=True, the code path bypasses filter_processed_images() entirely, so the job.progress.update_stage() call shown in the diff is the only place that should persist the collect STARTED state. Ensure that the update_stage call with status JobState.STARTED and progress=0 is actually persisted to the database before the collect_images() call, regardless of the reprocess_all_images flag value, so the collect stage shows as STARTED rather than remaining in CREATED state.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@ami/ml/models/pipeline.py`:
- Line 227: The current implementation at line 227 uses
job.save(update_fields=["progress", "updated_at"]) which rewrites the entire
progress JSONB blob, causing data races when concurrent processes update
different progress sub-fields simultaneously. Replace this whole-blob rewrite
approach with database-level atomic updates using Django's update() method and
database functions that can update specific JSONB sub-fields without loading and
rewriting the entire object. This ensures that concurrent writes to different
progress sub-fields are atomic and won't clobber each other.
- Line 266: The assignment of total_images using len(images) eagerly
materializes the entire queryset into memory, which can cause performance issues
with large collections. Replace the len() call with images.count() which is a
queryset method that performs a COUNT query without materializing all records.
This ensures the code only uses len() for concrete list or tuple types while
using count() for queryset-backed inputs.
In `@ami/ml/tests.py`:
- Around line 1110-1192: Add a new test method after
test_filter_processed_images_skips_progress_emission_without_job that validates
the reprocess_all_images=True code path. The test should create a Job, generate
images, call filter_processed_images with reprocess_all_images=True (in addition
to job and total parameters), mock Job.save to capture calls, assert that
Job.save is invoked with the correct update_fields including updated_at, and
verify that the collect stage status is set to JobState.STARTED. This ensures
the collect-stage progress heartbeat is persisted even when the filtering logic
is bypassed, preventing silent regressions in this code path.
---
Outside diff comments:
In `@ami/jobs/models.py`:
- Around line 501-510: The collect stage STARTED state needs to be persisted
even when reprocess_all_images=True, but currently this only happens through
filter_processed_images() which gets skipped in that case. When
reprocess_all_images=True, the code path bypasses filter_processed_images()
entirely, so the job.progress.update_stage() call shown in the diff is the only
place that should persist the collect STARTED state. Ensure that the
update_stage call with status JobState.STARTED and progress=0 is actually
persisted to the database before the collect_images() call, regardless of the
reprocess_all_images flag value, so the collect stage shows as STARTED rather
than remaining in CREATED state.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 335d9ce7-ed56-49be-b63c-558438f699d0
📒 Files selected for processing (3)
ami/jobs/models.pyami/ml/models/pipeline.pyami/ml/tests.py
There was a problem hiding this comment.
Caution
Inline review comments failed to post. This is likely due to GitHub's internal server error or limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ami/jobs/models.py (1)
501-510: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
reprocess_all_images=Truenow has no persisted collect STARTED signal.After removing the pre-collect save, the collect-stage STARTED state is only persisted by
filter_processed_images(). But that path is skipped whenreprocess_all_images=True, so collect can remain visibly CREATED until collection ends.Suggested direction
job.progress.update_stage( "collect", status=JobState.STARTED, progress=0, ) + if job.project.feature_flags.reprocess_all_images: + # No filter_processed_images heartbeat in this mode; persist STARTED once. + job.save(update_fields=["progress", "updated_at"])🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@ami/jobs/models.py` around lines 501 - 510, The collect stage STARTED state needs to be persisted even when reprocess_all_images=True, but currently this only happens through filter_processed_images() which gets skipped in that case. When reprocess_all_images=True, the code path bypasses filter_processed_images() entirely, so the job.progress.update_stage() call shown in the diff is the only place that should persist the collect STARTED state. Ensure that the update_stage call with status JobState.STARTED and progress=0 is actually persisted to the database before the collect_images() call, regardless of the reprocess_all_images flag value, so the collect stage shows as STARTED rather than remaining in CREATED state.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@ami/ml/models/pipeline.py`:
- Line 227: The current implementation at line 227 uses
job.save(update_fields=["progress", "updated_at"]) which rewrites the entire
progress JSONB blob, causing data races when concurrent processes update
different progress sub-fields simultaneously. Replace this whole-blob rewrite
approach with database-level atomic updates using Django's update() method and
database functions that can update specific JSONB sub-fields without loading and
rewriting the entire object. This ensures that concurrent writes to different
progress sub-fields are atomic and won't clobber each other.
- Line 266: The assignment of total_images using len(images) eagerly
materializes the entire queryset into memory, which can cause performance issues
with large collections. Replace the len() call with images.count() which is a
queryset method that performs a COUNT query without materializing all records.
This ensures the code only uses len() for concrete list or tuple types while
using count() for queryset-backed inputs.
In `@ami/ml/tests.py`:
- Around line 1110-1192: Add a new test method after
test_filter_processed_images_skips_progress_emission_without_job that validates
the reprocess_all_images=True code path. The test should create a Job, generate
images, call filter_processed_images with reprocess_all_images=True (in addition
to job and total parameters), mock Job.save to capture calls, assert that
Job.save is invoked with the correct update_fields including updated_at, and
verify that the collect stage status is set to JobState.STARTED. This ensures
the collect-stage progress heartbeat is persisted even when the filtering logic
is bypassed, preventing silent regressions in this code path.
---
Outside diff comments:
In `@ami/jobs/models.py`:
- Around line 501-510: The collect stage STARTED state needs to be persisted
even when reprocess_all_images=True, but currently this only happens through
filter_processed_images() which gets skipped in that case. When
reprocess_all_images=True, the code path bypasses filter_processed_images()
entirely, so the job.progress.update_stage() call shown in the diff is the only
place that should persist the collect STARTED state. Ensure that the
update_stage call with status JobState.STARTED and progress=0 is actually
persisted to the database before the collect_images() call, regardless of the
reprocess_all_images flag value, so the collect stage shows as STARTED rather
than remaining in CREATED state.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 335d9ce7-ed56-49be-b63c-558438f699d0
📒 Files selected for processing (3)
ami/jobs/models.pyami/ml/models/pipeline.pyami/ml/tests.py
🛑 Comments failed to post (3)
ami/ml/models/pipeline.py (2)
227-227: 🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift
Avoid whole-blob JSONB rewrites for throttled progress heartbeats.
Line 227 still rewrites the full
progressJSON value, so concurrent writes to otherprogresssub-fields can be lost even withupdate_fields=["progress", "updated_at"]. This is a data-race on JSONB content, not just a full-row-write issue.Based on learnings,
job.save(update_fields=["progress", ...])is not concurrency-safe for JSONB sub-field updates and can clobber concurrent progress mutations.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@ami/ml/models/pipeline.py` at line 227, The current implementation at line 227 uses job.save(update_fields=["progress", "updated_at"]) which rewrites the entire progress JSONB blob, causing data races when concurrent processes update different progress sub-fields simultaneously. Replace this whole-blob rewrite approach with database-level atomic updates using Django's update() method and database functions that can update specific JSONB sub-fields without loading and rewriting the entire object. This ensures that concurrent writes to different progress sub-fields are atomic and won't clobber each other.Source: Learnings
266-266: 🚀 Performance & Scalability | 🟠 Major | ⚡ Quick win
len(images)eagerly materializes queryset inputs and can blow up collect latency/memory.Using
len()on queryset-backedimagesforces full evaluation before batching, which undermines large-collection scaling. Useimages.count()for queryset inputs andlen()only for concrete lists/tuples.Suggested patch
- total_images = len(images) + if hasattr(images, "count") and callable(images.count): + total_images = images.count() + else: + total_images = len(images)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.if hasattr(images, "count") and callable(images.count): total_images = images.count() else: total_images = len(images)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@ami/ml/models/pipeline.py` at line 266, The assignment of total_images using len(images) eagerly materializes the entire queryset into memory, which can cause performance issues with large collections. Replace the len() call with images.count() which is a queryset method that performs a COUNT query without materializing all records. This ensures the code only uses len() for concrete list or tuple types while using count() for queryset-backed inputs.ami/ml/tests.py (1)
1110-1192: 📐 Maintainability & Code Quality | 🟠 Major | ⚡ Quick win
Add a collect-progress test for
reprocess_all_images=True.Current coverage validates the throttled heartbeat path only. Please add a case where filtering is bypassed and assert collect STARTED is still persisted during collect, so this mode can’t regress silently.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@ami/ml/tests.py` around lines 1110 - 1192, Add a new test method after test_filter_processed_images_skips_progress_emission_without_job that validates the reprocess_all_images=True code path. The test should create a Job, generate images, call filter_processed_images with reprocess_all_images=True (in addition to job and total parameters), mock Job.save to capture calls, assert that Job.save is invoked with the correct update_fields including updated_at, and verify that the collect stage status is set to JobState.STARTED. This ensures the collect-stage progress heartbeat is persisted even when the filtering logic is bypassed, preventing silent regressions in this code path.
c831e23 to
1582db3
Compare
|
@coderabbitai review |
✅ Action performedReview finished.
|
a94976d to
a97e815
Compare
|
@coderabbitai review |
✅ Action performedReview finished.
|
collect_images() persists the collect stage as STARTED for every path (filtered and reprocess-all) immediately after fetching the job, so the UI shows the stage in progress as soon as collection begins instead of only at SUCCESS. The existing throttled progress emit (#1322) is unchanged and carries the STARTED status forward in-memory on the filtered path; reprocess-all has no per-image loop, so this single mark is its only collect heartbeat. Adds tests pinning STARTED on both the filtered and reprocess-all paths.
a97e815 to
a43f1d4
Compare
Summary
When an ML job starts it transitions to STARTED, but the UI can show no progress with the stages still gray, because the long-running "collect" stage does not report itself as in-progress until collection finishes. On a large collection that call runs for minutes, so the job looks frozen.
This change makes
collect_images()persist the collect stage as STARTED the moment collection begins, for every path. The job already sets the stage to STARTED in memory, but that value was never written to the database before the multi-minute collection ran: the filtered path's throttled progress emit (added in #1322) only updates theprogressfraction, never the status, and the reprocess-all path has no per-image loop at all. So the stage stayed at its default status until collection finished and the job flipped it straight to SUCCESS. Now it shows in-progress as soon as collection begins, on both paths.List of Changes
collect_images()callsupdate_stage("collect", status=STARTED, progress=0)and a narrowsave(update_fields=["progress", "updated_at"])right after fetching the Job, before any branch.progresssave persists the status as a side effect.Scope note: the throttled scan-progress fraction and its constants are unchanged from #1322. This PR adds only the missing STARTED status persist; it does not touch the throttle behavior. The
progressJSONB is written whole (all stages) on each save, which is safe here because collect is stage 1 on a freshly-fetched row; the atomic per-stage write is tracked in #1337.Before:

After:

Test plan
collectstage as in-progress whilecollect_images()runs, on both the filtered and reprocess-all paths (below).filter_processed_images.What has been tested
Unit (
ami/ml/tests.py::TestPipeline): the pre-existing #1322 throttle tests (test_filter_processed_images_emits_throttled_collect_progress,test_filter_processed_images_skips_progress_emission_without_job) are unchanged and still pass, confirming this PR leavesfilter_processed_imagesuntouched. The change itself — a one-line status persist incollect_images()— is verified by the live end-to-end run below rather than a dedicated unit test, which would be too narrow (asserting one stage's status after one function call) to justify the added CI weight.Live end-to-end (deployed stack, real
run_jobvia the Celery worker, external poller reading the Job row every 50ms):CREATED→STARTED(+53ms) →SUCCESS(+207ms). Before this change it stayed at the default status and jumped straight to SUCCESS, so a poller never saw STARTED.reprocess_all_images=True):CREATED→STARTED(+53ms) →SUCCESS(+105ms).Both confirm the stage reports in-progress during collection, on the real dispatch path, for both branches.
Follow-up (not in this PR)
Review whether the throttled scan-progress emit introduced in #1322 (the
time.monotonic()cadence machinery and theCOLLECT_PROGRESS_SAVE_INTERVAL_SECONDS/COLLECT_PROGRESS_MAX_FRACTIONconstants infilter_processed_images) is worth keeping. The emitted fraction is scan progress — the share of input images inspected by the filter — not the count of images that need processing (that count is only known after collection and is reported viatotal_images). It animates a progress bar during collect but conveys little the operator acts on, so it may be removable in a follow-up. With this PR, the STARTED status no longer depends on it.🤖 Generated with Claude Code