Skip to content

Commit 289c11c

Browse files
Supersede stale blocked submissions by reachability
Co-authored-by: Andrew <andrewxhill@gmail.com>
1 parent 6d57f91 commit 289c11c

7 files changed

Lines changed: 177 additions & 0 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ Recent hardening coverage now explicitly exercises the adoption-critical paths:
244244

245245
- concurrent multi-worktree submit, integrate, and publish flows
246246
- deleted, moved, and dirtied source worktrees after submit
247+
- stale queued or blocked submissions whose source SHA is already reachable from
248+
protected `main`, including after the original worktree was dropped
247249
- queued branch head drift after submit
248250
- external protected-branch advancement while queued work waits
249251
- inherited `pre-push` hook success and failure-plus-retry publish paths

SPEC.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,9 @@ These are current design constraints, not hidden bugs:
168168
rebase and push
169169
- factories should currently integrate through the CLI/daemon contract, not by
170170
importing `internal/`
171+
- queued or blocked submissions whose `source_sha` is already reachable from
172+
protected `main` are obsolete and may be auto-superseded even if their
173+
original source worktree has been deleted
171174

172175
## Forward Product Direction
173176

docs/FLOWS.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,11 @@ without human branch surgery.
281281
`mq cancel --blocked` bulk-cancels blocked submissions when they are obsolete
282282
and you want to clear them out of the active queue surface.
283283

284+
`mq run-once` and `mq doctor --fix` also auto-supersede queued or blocked
285+
submissions whose `source_sha` is already reachable from protected `main`, even
286+
if the original source worktree was deleted after that commit effectively
287+
landed.
288+
284289
`mq status --json` now projects publish correlation back onto succeeded
285290
submissions through `publish_request_id`, `publish_status`, and `outcome`, so a
286291
factory can answer “did this submission fully land?” from one status surface.

internal/app/repo.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,6 +1205,11 @@ func runDoctorFix(ctx context.Context, engine git.Engine, cfg policy.File, lockM
12051205
return nil, nil, err
12061206
}
12071207
}
1208+
if superseded, err := supersedeProtectedReachableSubmissions(ctx, store, repoRecord.ID, engine, cfg.Repo.ProtectedBranch); err != nil {
1209+
return nil, nil, err
1210+
} else if superseded > 0 {
1211+
applied = append(applied, fmt.Sprintf("superseded %d obsolete submission(s) already reachable from %s", superseded, cfg.Repo.ProtectedBranch))
1212+
}
12081213

12091214
if cfg.Repo.MainWorktree != "" && engine.BranchExists(cfg.Repo.ProtectedBranch) {
12101215
if _, err := os.Stat(cfg.Repo.MainWorktree); err == nil {

internal/app/repo_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,61 @@ func TestDoctorJSONListsConcreteUnfinishedQueueItems(t *testing.T) {
920920
}
921921
}
922922

923+
func TestDoctorFixSupersedesBlockedSubmissionAlreadyReachableFromProtectedBranch(t *testing.T) {
924+
repoRoot, _ := createTestRepoWithRemote(t)
925+
initRepoForWorker(t, repoRoot)
926+
927+
featurePath := filepath.Join(t.TempDir(), "feature-doctor-reachable")
928+
runTestCommand(t, repoRoot, "git", "worktree", "add", "-b", "feature/doctor-reachable", featurePath)
929+
writeFileAndCommit(t, featurePath, "doctor.txt", "doctor\n", "doctor reachable feature")
930+
sourceSHA := trimNewline(runTestCommand(t, featurePath, "git", "rev-parse", "HEAD"))
931+
runTestCommand(t, repoRoot, "git", "merge", "--ff-only", "feature/doctor-reachable")
932+
933+
layout, err := git.DiscoverRepositoryLayout(repoRoot)
934+
if err != nil {
935+
t.Fatalf("DiscoverRepositoryLayout: %v", err)
936+
}
937+
store := state.NewStore(state.DefaultPath(layout.GitDir))
938+
repoRecord, err := store.GetRepositoryByPath(context.Background(), layout.RepositoryRoot)
939+
if err != nil {
940+
t.Fatalf("GetRepositoryByPath: %v", err)
941+
}
942+
submission, err := store.CreateIntegrationSubmission(context.Background(), state.IntegrationSubmission{
943+
RepoID: repoRecord.ID,
944+
BranchName: "feature/doctor-reachable",
945+
SourceRef: "feature/doctor-reachable",
946+
RefKind: submissionRefKindBranch,
947+
SourceWorktree: featurePath,
948+
SourceSHA: sourceSHA,
949+
Status: domain.SubmissionStatusBlocked,
950+
LastError: "waiting on dropped worktree",
951+
})
952+
if err != nil {
953+
t.Fatalf("CreateIntegrationSubmission: %v", err)
954+
}
955+
956+
if err := os.RemoveAll(featurePath); err != nil {
957+
t.Fatalf("RemoveAll(featurePath): %v", err)
958+
}
959+
960+
var out bytes.Buffer
961+
var errOut bytes.Buffer
962+
if err := runDoctor([]string{"--repo", repoRoot, "--fix", "--json"}, newStepPrinter(&out), &errOut); err != nil {
963+
t.Fatalf("runDoctor --fix returned error: %v", err)
964+
}
965+
966+
updated, err := store.GetIntegrationSubmission(context.Background(), submission.ID)
967+
if err != nil {
968+
t.Fatalf("GetIntegrationSubmission: %v", err)
969+
}
970+
if updated.Status != domain.SubmissionStatusSuperseded {
971+
t.Fatalf("expected superseded blocked submission, got %+v", updated)
972+
}
973+
if !strings.Contains(updated.LastError, "already reachable from protected branch") {
974+
t.Fatalf("expected reachability supersede reason, got %+v", updated)
975+
}
976+
}
977+
923978
func TestStatusJSONWorksWhenCanonicalRootIsDirty(t *testing.T) {
924979
repoRoot, _ := createTestRepoWithRemote(t)
925980
initRepoForWorker(t, repoRoot)

internal/app/run_once.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ func runOneCycle(repoPath string) (string, error) {
110110
lease.Release()
111111
return "", err
112112
}
113+
recoveredObsolete, err := supersedeProtectedReachableSubmissions(ctx, store, repoRecord.ID, git.NewEngine(mainLayout.WorktreeRoot), cfg.Repo.ProtectedBranch)
114+
if err != nil {
115+
lease.Release()
116+
return "", err
117+
}
113118

114119
submission, err := store.NextQueuedIntegrationSubmission(ctx, repoRecord.ID)
115120
if err != nil {
@@ -159,6 +164,9 @@ func runOneCycle(repoPath string) (string, error) {
159164
if err != nil {
160165
return "", err
161166
}
167+
if result == "No queued publish requests." && recoveredObsolete > 0 {
168+
return fmt.Sprintf("Superseded %d obsolete submission(s) already reachable from %s", recoveredObsolete, cfg.Repo.ProtectedBranch), nil
169+
}
162170
return result, nil
163171
}
164172

@@ -791,6 +799,46 @@ func supersedeObsoleteSubmissions(ctx context.Context, store state.Store, repoID
791799
return nil
792800
}
793801

802+
func supersedeProtectedReachableSubmissions(ctx context.Context, store state.Store, repoID int64, engine git.Engine, protectedBranch string) (int, error) {
803+
protectedSHA, err := engine.BranchHeadSHA(protectedBranch)
804+
if err != nil {
805+
return 0, err
806+
}
807+
submissions, err := store.ListIntegrationSubmissions(ctx, repoID)
808+
if err != nil {
809+
return 0, err
810+
}
811+
superseded := 0
812+
for _, submission := range submissions {
813+
if submission.Status != domain.SubmissionStatusQueued && submission.Status != domain.SubmissionStatusBlocked {
814+
continue
815+
}
816+
if submission.SourceSHA == "" {
817+
continue
818+
}
819+
reachable, err := engine.IsAncestor(submission.SourceSHA, protectedBranch)
820+
if err != nil || !reachable {
821+
continue
822+
}
823+
reason := fmt.Sprintf("superseded because source sha %s is already reachable from protected branch %q at %s", submission.SourceSHA, protectedBranch, protectedSHA)
824+
if _, err := store.UpdateIntegrationSubmissionStatus(ctx, submission.ID, domain.SubmissionStatusSuperseded, reason); err != nil {
825+
return superseded, err
826+
}
827+
if err := appendSubmissionEvent(ctx, store, repoID, submission.ID, domain.EventType("submission.superseded"), map[string]string{
828+
"branch": submissionDisplayRef(submission),
829+
"source_ref": submission.SourceRef,
830+
"ref_kind": string(submission.RefKind),
831+
"source_sha": submission.SourceSHA,
832+
"superseded_protected": protectedSHA,
833+
"reason": reason,
834+
}); err != nil {
835+
return superseded, err
836+
}
837+
superseded++
838+
}
839+
return superseded, nil
840+
}
841+
794842
func recoverInterruptedPublishRequests(ctx context.Context, store state.Store, repoID int64) (int, error) {
795843
running, err := store.ListPublishRequestsByStatus(ctx, repoID, "running")
796844
if err != nil {

internal/app/run_once_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"strings"
1212
"testing"
1313

14+
"github.com/recallnet/mainline/internal/domain"
1415
"github.com/recallnet/mainline/internal/git"
1516
"github.com/recallnet/mainline/internal/policy"
1617
"github.com/recallnet/mainline/internal/state"
@@ -750,6 +751,64 @@ func TestRunOnceFailsWhenSubmittedSourceWorktreeTurnsDirtyAfterSubmit(t *testing
750751
}
751752
}
752753

754+
func TestRunOnceSupersedesBlockedSubmissionAlreadyReachableAfterWorktreeDrop(t *testing.T) {
755+
repoRoot, _ := createTestRepo(t)
756+
initRepoForWorker(t, repoRoot)
757+
758+
featurePath := filepath.Join(t.TempDir(), "feature-reachable-dropped")
759+
runTestCommand(t, repoRoot, "git", "worktree", "add", "-b", "feature/reachable-dropped", featurePath)
760+
writeFileAndCommit(t, featurePath, "reachable.txt", "reachable\n", "reachable feature")
761+
sourceSHA := trimNewline(runTestCommand(t, featurePath, "git", "rev-parse", "HEAD"))
762+
runTestCommand(t, repoRoot, "git", "merge", "--ff-only", "feature/reachable-dropped")
763+
764+
layout, err := git.DiscoverRepositoryLayout(repoRoot)
765+
if err != nil {
766+
t.Fatalf("DiscoverRepositoryLayout: %v", err)
767+
}
768+
store := state.NewStore(state.DefaultPath(layout.GitDir))
769+
repoRecord, err := store.GetRepositoryByPath(context.Background(), layout.RepositoryRoot)
770+
if err != nil {
771+
t.Fatalf("GetRepositoryByPath: %v", err)
772+
}
773+
submission, err := store.CreateIntegrationSubmission(context.Background(), state.IntegrationSubmission{
774+
RepoID: repoRecord.ID,
775+
BranchName: "feature/reachable-dropped",
776+
SourceRef: "feature/reachable-dropped",
777+
RefKind: submissionRefKindBranch,
778+
SourceWorktree: featurePath,
779+
SourceSHA: sourceSHA,
780+
Status: domain.SubmissionStatusBlocked,
781+
LastError: "rebase conflict",
782+
})
783+
if err != nil {
784+
t.Fatalf("CreateIntegrationSubmission: %v", err)
785+
}
786+
787+
if err := os.RemoveAll(featurePath); err != nil {
788+
t.Fatalf("RemoveAll(featurePath): %v", err)
789+
}
790+
791+
var runOut bytes.Buffer
792+
var runErr bytes.Buffer
793+
if err := runRunOnce([]string{"--repo", repoRoot}, newStepPrinter(&runOut), &runErr); err != nil {
794+
t.Fatalf("runRunOnce returned error: %v", err)
795+
}
796+
if !strings.Contains(runOut.String(), "Superseded 1 obsolete submission") {
797+
t.Fatalf("expected supersede summary, got %q", runOut.String())
798+
}
799+
800+
updated, err := store.GetIntegrationSubmission(context.Background(), submission.ID)
801+
if err != nil {
802+
t.Fatalf("GetIntegrationSubmission: %v", err)
803+
}
804+
if updated.Status != domain.SubmissionStatusSuperseded {
805+
t.Fatalf("expected superseded blocked submission, got %+v", updated)
806+
}
807+
if !strings.Contains(updated.LastError, "already reachable from protected branch") {
808+
t.Fatalf("expected reachability supersede reason, got %+v", updated)
809+
}
810+
}
811+
753812
func TestRunOnceFailsWhenQueuedBranchHeadDriftsAfterSubmit(t *testing.T) {
754813
repoRoot, _ := createTestRepo(t)
755814
initRepoForWorker(t, repoRoot)

0 commit comments

Comments
 (0)