Skip to content

Commit 5ea9716

Browse files
Implement allow-newer-head and worker status metadata
Co-authored-by: Andrew <andrewxhill@gmail.com>
1 parent a71418b commit 5ea9716

13 files changed

Lines changed: 236 additions & 35 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ and the repo-specific guardrails live in
159159
[AGENTS.md](/Users/devrel/Projects/recallnet/mainline/AGENTS.md).
160160
Machine-readable JSON contracts and their compatibility policy are documented in
161161
[JSON_CONTRACTS.md](/Users/devrel/Projects/recallnet/mainline/docs/JSON_CONTRACTS.md).
162+
That document, not the internal Go structs, is the public automation contract.
162163

163164
## Install
164165

@@ -212,6 +213,7 @@ Submit and land:
212213
```bash
213214
cd /path/to/topic-worktree
214215
mq submit --check-only --json
216+
mq submit --allow-newer-head --wait --timeout 15m --json
215217
mq submit --wait --timeout 15m --json
216218
mq land --json --timeout 30m
217219
```

SPEC.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,9 @@ MVP checks:
298298
- worktree clean
299299
- protected branch worktree clean
300300
- branch not already integrated
301-
- branch HEAD still matches submitted SHA unless `--allow-newer-head` is set
301+
- branch HEAD still matches submitted SHA unless `--allow-newer-head` is set;
302+
when that flag is set, newer queued branch heads are allowed only if the new
303+
tip is still a descendant of the submitted SHA
302304

303305
Phase 2 checks:
304306

docs/FLOWS.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,17 @@ cd /path/to/topic-worktree
9898
mq submit --wait --timeout 10m --json
9999
```
100100

101+
If a factory keeps appending commits to the same queued branch and wants the
102+
newest descendant tip instead of a hard failure on head drift, submit with:
103+
104+
```bash
105+
cd /path/to/topic-worktree
106+
mq submit --allow-newer-head --wait --timeout 10m --json
107+
```
108+
109+
That only permits forward movement. If the queued branch rewinds or moves to a
110+
non-descendant tip, `mq` still fails the submission and asks for a resubmit.
111+
101112
Exit codes:
102113

103114
- `0`: integrated

docs/JSON_CONTRACTS.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ Optional top-level keys:
4040
- `latest_publish`
4141
- `active_submissions`
4242
- `active_publishes`
43+
- `integration_worker`
44+
- `publish_worker`
4345

4446
`protected_upstream` is a `git.BranchStatus` object with:
4547

@@ -57,11 +59,22 @@ upstream ref, not boolean-like drift flags.
5759
`latest_submission` and entries in `active_submissions` extend the durable
5860
submission record with optional blocked-state diagnostics:
5961

62+
- `allow_newer_head`
6063
- `blocked_reason`
6164
- `conflict_files`
6265
- `protected_tip_sha`
6366
- `retry_hint`
6467

68+
`integration_worker` and `publish_worker` mirror the active lock metadata when a
69+
worker is currently holding that lease:
70+
71+
- `domain`
72+
- `repo_root`
73+
- `owner`
74+
- `request_id`
75+
- `pid`
76+
- `created_at`
77+
6578
## `mq events --json`
6679

6780
Returns newline-delimited JSON. Each line is one durable event record with these

internal/app/app_test.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,11 @@ func TestStatusJSONContractContainsStableTopLevelFields(t *testing.T) {
201201
"repository_root",
202202
"state_path",
203203
}
204-
if gotKeys := sortedJSONKeys(payload); !slices.Equal(gotKeys, wantKeys) {
205-
t.Fatalf("expected exact status json keys %v, got %v", wantKeys, gotKeys)
204+
gotKeys := sortedJSONKeys(payload)
205+
for _, key := range wantKeys {
206+
if !slices.Contains(gotKeys, key) {
207+
t.Fatalf("expected status json to contain key %q, got %v", key, gotKeys)
208+
}
206209
}
207210
}
208211

@@ -505,7 +508,7 @@ func TestCLIAcceptsSubcommandFlagsForPlannedCommands(t *testing.T) {
505508
if !strings.Contains(output, "land submit status confidence run-once retry cancel publish") {
506509
t.Fatalf("expected completion script to include land and confidence, got %q", output)
507510
}
508-
if !strings.Contains(output, "--repo --branch --sha --worktree --requested-by --priority --json --check --check-only --wait --timeout --poll-interval") {
511+
if !strings.Contains(output, "--repo --branch --sha --worktree --requested-by --priority --allow-newer-head --json --check --check-only --wait --timeout --poll-interval") {
509512
t.Fatalf("expected submit completion flags, got %q", output)
510513
}
511514
if !strings.Contains(output, "retry cancel publish") {
@@ -555,9 +558,15 @@ func TestCLIAcceptsSubcommandFlagsForPlannedCommands(t *testing.T) {
555558
if !strings.Contains(output, "__fish_seen_subcommand_from land\" -l sha") {
556559
t.Fatalf("expected fish completion to include land sha flag, got %q", output)
557560
}
561+
if !strings.Contains(output, "__fish_seen_subcommand_from land\" -l allow-newer-head") {
562+
t.Fatalf("expected fish completion to include land allow-newer-head flag, got %q", output)
563+
}
558564
if !strings.Contains(output, "__fish_seen_subcommand_from submit\" -l sha") {
559565
t.Fatalf("expected fish completion to include submit sha flag, got %q", output)
560566
}
567+
if !strings.Contains(output, "__fish_seen_subcommand_from submit\" -l allow-newer-head") {
568+
t.Fatalf("expected fish completion to include submit allow-newer-head flag, got %q", output)
569+
}
561570
if !strings.Contains(output, "__fish_seen_subcommand_from submit\" -l check") {
562571
t.Fatalf("expected fish completion to include submit check flag, got %q", output)
563572
}
@@ -810,8 +819,8 @@ func TestStatusUpgradesExistingLegacyStateSchema(t *testing.T) {
810819
if err := db.QueryRow(`PRAGMA user_version;`).Scan(&version); err != nil {
811820
t.Fatalf("read user_version: %v", err)
812821
}
813-
if version != 4 {
814-
t.Fatalf("expected schema version 4 after status upgrade, got %d", version)
822+
if version != 5 {
823+
t.Fatalf("expected schema version 5 after status upgrade, got %d", version)
815824
}
816825
}
817826

internal/app/completion.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,10 @@ _mainline_completions()
8383
8484
case "${words[1]}" in
8585
land)
86-
COMPREPLY=( $(compgen -W "--repo --branch --sha --worktree --requested-by --priority --json --timeout --poll-interval" -- "$cur") )
86+
COMPREPLY=( $(compgen -W "--repo --branch --sha --worktree --requested-by --priority --allow-newer-head --json --timeout --poll-interval" -- "$cur") )
8787
;;
8888
submit)
89-
COMPREPLY=( $(compgen -W "--repo --branch --sha --worktree --requested-by --priority --json --check --check-only --wait --timeout --poll-interval" -- "$cur") )
89+
COMPREPLY=( $(compgen -W "--repo --branch --sha --worktree --requested-by --priority --allow-newer-head --json --check --check-only --wait --timeout --poll-interval" -- "$cur") )
9090
;;
9191
status)
9292
COMPREPLY=( $(compgen -W "--repo --json --events" -- "$cur") )
@@ -187,11 +187,11 @@ _mainline() {
187187
return
188188
;;
189189
land)
190-
_arguments '--repo[source worktree path]:path:_files -/' '--branch[branch to submit]:branch:' '--sha[detached commit to submit]:sha:' '--worktree[source worktree override]:path:_files -/' '--requested-by[submitter identity]:identity:' '--priority[submission priority]:priority:(high normal low)' '--json[json output]' '--timeout[maximum wait time]:duration:' '--poll-interval[wait interval between worker checks]:duration:'
190+
_arguments '--repo[source worktree path]:path:_files -/' '--branch[branch to submit]:branch:' '--sha[detached commit to submit]:sha:' '--worktree[source worktree override]:path:_files -/' '--requested-by[submitter identity]:identity:' '--priority[submission priority]:priority:(high normal low)' '--allow-newer-head[allow the queued branch tip to advance before integration if it stays descended from the submitted sha]' '--json[json output]' '--timeout[maximum wait time]:duration:' '--poll-interval[wait interval between worker checks]:duration:'
191191
return
192192
;;
193193
submit)
194-
_arguments '--repo[repository path]:path:_files -/' '--branch[branch name]:branch:' '--sha[detached commit to submit]:sha:' '--worktree[source worktree]:path:_files -/' '--requested-by[submitter identity]:identity:' '--priority[submission priority]:priority:(high normal low)' '--json[json output]' '--check[validate submission without queueing]' '--check-only[validate submission without queueing]' '--wait[wait for the submission to integrate]' '--timeout[maximum integration wait time]:duration:' '--poll-interval[wait interval between worker checks]:duration:'
194+
_arguments '--repo[repository path]:path:_files -/' '--branch[branch name]:branch:' '--sha[detached commit to submit]:sha:' '--worktree[source worktree]:path:_files -/' '--requested-by[submitter identity]:identity:' '--priority[submission priority]:priority:(high normal low)' '--allow-newer-head[allow the queued branch tip to advance before integration if it stays descended from the submitted sha]' '--json[json output]' '--check[validate submission without queueing]' '--check-only[validate submission without queueing]' '--wait[wait for the submission to integrate]' '--timeout[maximum integration wait time]:duration:' '--poll-interval[wait interval between worker checks]:duration:'
195195
return
196196
;;
197197
status)
@@ -307,6 +307,8 @@ complete -c mainline -n "__fish_seen_subcommand_from land" -l requested-by
307307
complete -c mq -n "__fish_seen_subcommand_from land" -l requested-by
308308
complete -c mainline -n "__fish_seen_subcommand_from land" -l priority
309309
complete -c mq -n "__fish_seen_subcommand_from land" -l priority
310+
complete -c mainline -n "__fish_seen_subcommand_from land" -l allow-newer-head
311+
complete -c mq -n "__fish_seen_subcommand_from land" -l allow-newer-head
310312
complete -c mainline -n "__fish_seen_subcommand_from land" -l json
311313
complete -c mq -n "__fish_seen_subcommand_from land" -l json
312314
complete -c mainline -n "__fish_seen_subcommand_from land" -l timeout
@@ -323,6 +325,8 @@ complete -c mainline -n "__fish_seen_subcommand_from submit" -l requested-by
323325
complete -c mq -n "__fish_seen_subcommand_from submit" -l requested-by
324326
complete -c mainline -n "__fish_seen_subcommand_from submit" -l priority
325327
complete -c mq -n "__fish_seen_subcommand_from submit" -l priority
328+
complete -c mainline -n "__fish_seen_subcommand_from submit" -l allow-newer-head
329+
complete -c mq -n "__fish_seen_subcommand_from submit" -l allow-newer-head
326330
complete -c mainline -n "__fish_seen_subcommand_from submit" -l json
327331
complete -c mq -n "__fish_seen_subcommand_from submit" -l json
328332
complete -c mainline -n "__fish_seen_subcommand_from submit" -l check

internal/app/land.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type landResult struct {
2020
RefKind string `json:"ref_kind,omitempty"`
2121
SourceWorktree string `json:"source_worktree"`
2222
SourceSHA string `json:"source_sha"`
23+
AllowNewerHead bool `json:"allow_newer_head,omitempty"`
2324
Priority string `json:"priority,omitempty"`
2425
RepositoryRoot string `json:"repository_root"`
2526
MainWorktree string `json:"main_worktree"`
@@ -46,6 +47,7 @@ final outcome out.
4647
4748
Examples:
4849
mq land --json --timeout 30m
50+
mq land --allow-newer-head --json --timeout 30m
4951
mq land --repo /path/to/topic-worktree --json
5052
5153
Flags:
@@ -60,13 +62,15 @@ Flags:
6062
var asJSON bool
6163
var timeout time.Duration
6264
var pollInterval time.Duration
65+
var allowNewerHead bool
6366

6467
fs.StringVar(&repoPath, "repo", ".", "source worktree path")
6568
fs.StringVar(&branch, "branch", "", "branch to submit")
6669
fs.StringVar(&sha, "sha", "", "detached commit to submit")
6770
fs.StringVar(&worktreePath, "worktree", "", "source worktree path override")
6871
fs.StringVar(&requestedBy, "requested-by", "", "submitter identity")
6972
fs.StringVar(&priority, "priority", submissionPriorityNormal, "submission priority: high, normal, or low")
73+
fs.BoolVar(&allowNewerHead, "allow-newer-head", false, "allow a queued branch head to advance before integration if it remains a descendant of the submitted sha")
7074
fs.BoolVar(&asJSON, "json", false, "output json")
7175
fs.DurationVar(&timeout, "timeout", 30*time.Minute, "maximum time to wait for integrate+publish")
7276
fs.DurationVar(&pollInterval, "poll-interval", 500*time.Millisecond, "wait interval between worker checks")
@@ -95,6 +99,7 @@ Flags:
9599
worktreePath: worktreePath,
96100
requestedBy: requestedBy,
97101
priority: priority,
102+
allowNewer: allowNewerHead,
98103
})
99104
if err != nil {
100105
return err
@@ -177,6 +182,7 @@ func waitForLandedPublish(queued queuedSubmission, timeout time.Duration, pollIn
177182
RefKind: queued.Submission.RefKind,
178183
SourceWorktree: queued.Submission.SourceWorktree,
179184
SourceSHA: queued.Submission.SourceSHA,
185+
AllowNewerHead: queued.Submission.AllowNewerHead,
180186
Priority: queued.Submission.Priority,
181187
RepositoryRoot: queued.RepoRoot,
182188
MainWorktree: queued.Config.Repo.MainWorktree,

internal/app/run_once.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,16 @@ func processIntegrationSubmission(ctx context.Context, store state.Store, repoRe
222222
return failIntegrationSubmissionWithSync(ctx, store, repoRecord.ID, submission, syncResult, fmt.Errorf("resolve branch head for %q: %w", submission.SourceRef, err))
223223
}
224224
if headSHA != submission.SourceSHA {
225-
return failIntegrationSubmissionWithSync(ctx, store, repoRecord.ID, submission, syncResult, fmt.Errorf("branch %q moved from submitted SHA %s to %s; resubmit the branch", submission.SourceRef, submission.SourceSHA, headSHA))
225+
if !submission.AllowNewerHead {
226+
return failIntegrationSubmissionWithSync(ctx, store, repoRecord.ID, submission, syncResult, fmt.Errorf("branch %q moved from submitted SHA %s to %s; resubmit the branch or submit with --allow-newer-head", submission.SourceRef, submission.SourceSHA, headSHA))
227+
}
228+
descends, descendsErr := sourceEngine.IsAncestor(submission.SourceSHA, headSHA)
229+
if descendsErr != nil {
230+
return failIntegrationSubmissionWithSync(ctx, store, repoRecord.ID, submission, syncResult, descendsErr)
231+
}
232+
if !descends {
233+
return failIntegrationSubmissionWithSync(ctx, store, repoRecord.ID, submission, syncResult, fmt.Errorf("branch %q moved from submitted SHA %s to non-descendant SHA %s; resubmit the branch", submission.SourceRef, submission.SourceSHA, headSHA))
234+
}
226235
}
227236
} else {
228237
headSHA = worktree.HeadSHA

internal/app/run_once_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,38 @@ func TestRunOnceFailsWhenQueuedBranchHeadDriftsAfterSubmit(t *testing.T) {
646646
}
647647
}
648648

649+
func TestRunOnceAllowsQueuedBranchHeadAdvanceWhenSubmittedWithAllowNewerHead(t *testing.T) {
650+
repoRoot, _ := createTestRepo(t)
651+
initRepoForWorker(t, repoRoot)
652+
653+
featurePath := filepath.Join(t.TempDir(), "feature-head-drift-allowed")
654+
runTestCommand(t, repoRoot, "git", "worktree", "add", "-b", "feature/head-drift-allowed", featurePath)
655+
writeFileAndCommit(t, featurePath, "head-drift.txt", "one\n", "head drift one")
656+
submitBranchWithArgs(t, featurePath, "--allow-newer-head")
657+
submittedSHA := trimNewline(runTestCommand(t, featurePath, "git", "rev-parse", "HEAD"))
658+
659+
writeFileAndCommit(t, featurePath, "head-drift.txt", "two\n", "head drift two")
660+
driftedSHA := trimNewline(runTestCommand(t, featurePath, "git", "rev-parse", "HEAD"))
661+
if driftedSHA == submittedSHA {
662+
t.Fatalf("expected branch head to move after second commit")
663+
}
664+
665+
runOnce(t, repoRoot)
666+
667+
protectedAfter := trimNewline(runTestCommand(t, repoRoot, "git", "rev-parse", "HEAD"))
668+
if protectedAfter != driftedSHA {
669+
t.Fatalf("expected protected branch to land newer head %q, got %q", driftedSHA, protectedAfter)
670+
}
671+
672+
status := readStatusJSON(t, repoRoot)
673+
if status.LatestSubmission == nil || status.LatestSubmission.Status != "succeeded" {
674+
t.Fatalf("expected succeeded latest submission, got %+v", status.LatestSubmission)
675+
}
676+
if !status.LatestSubmission.AllowNewerHead {
677+
t.Fatalf("expected allow_newer_head on latest submission, got %+v", status.LatestSubmission)
678+
}
679+
}
680+
649681
func TestRunOnceSyncsExternalProtectedAdvanceBeforeNextQueuedSubmission(t *testing.T) {
650682
repoRoot, remoteDir := createTestRepoWithRemote(t)
651683
initRepoForWorker(t, repoRoot)

internal/app/status.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"flag"
77
"fmt"
88
"io"
9+
"os"
910
"strings"
1011
"time"
1112

@@ -39,6 +40,8 @@ type statusResult struct {
3940
LatestPublish *state.PublishRequest `json:"latest_publish,omitempty"`
4041
ActiveSubmissions []statusSubmission `json:"active_submissions,omitempty"`
4142
ActivePublishes []state.PublishRequest `json:"active_publishes,omitempty"`
43+
IntegrationWorker *state.LeaseMetadata `json:"integration_worker,omitempty"`
44+
PublishWorker *state.LeaseMetadata `json:"publish_worker,omitempty"`
4245
RecentEvents []state.EventRecord `json:"recent_events"`
4346
}
4447

@@ -163,6 +166,13 @@ func collectStatus(repoPath string, limit int) (statusResult, error) {
163166
ActivePublishes: activePublishes(requests),
164167
RecentEvents: events,
165168
}
169+
lockManager := state.NewLockManager(layout.RepositoryRoot, layout.GitDir)
170+
if metadata, ok := readActiveLease(lockManager, state.IntegrationLock); ok {
171+
result.IntegrationWorker = &metadata
172+
}
173+
if metadata, ok := readActiveLease(lockManager, state.PublishLock); ok {
174+
result.PublishWorker = &metadata
175+
}
166176
if len(enrichedSubmissions) > 0 {
167177
latest := enrichedSubmissions[len(enrichedSubmissions)-1]
168178
result.LatestSubmission = &latest
@@ -231,6 +241,22 @@ func renderStatus(stdout io.Writer, result statusResult) error {
231241
} else {
232242
fmt.Fprintln(stdout, "Latest publish: none")
233243
}
244+
if result.IntegrationWorker != nil {
245+
fmt.Fprintf(stdout, "Integration worker: owner=%s request=%d pid=%d started=%s\n",
246+
result.IntegrationWorker.Owner,
247+
result.IntegrationWorker.RequestID,
248+
result.IntegrationWorker.PID,
249+
result.IntegrationWorker.CreatedAt.UTC().Format(time.RFC3339),
250+
)
251+
}
252+
if result.PublishWorker != nil {
253+
fmt.Fprintf(stdout, "Publish worker: owner=%s request=%d pid=%d started=%s\n",
254+
result.PublishWorker.Owner,
255+
result.PublishWorker.RequestID,
256+
result.PublishWorker.PID,
257+
result.PublishWorker.CreatedAt.UTC().Format(time.RFC3339),
258+
)
259+
}
234260
if len(result.ActiveSubmissions) > 0 {
235261
fmt.Fprintln(stdout, "Active submissions:")
236262
for _, submission := range result.ActiveSubmissions {
@@ -260,6 +286,17 @@ func renderStatus(stdout io.Writer, result statusResult) error {
260286
return nil
261287
}
262288

289+
func readActiveLease(lockManager state.LockManager, domain string) (state.LeaseMetadata, bool) {
290+
metadata, err := lockManager.Metadata(domain)
291+
if err != nil {
292+
if os.IsNotExist(err) {
293+
return state.LeaseMetadata{}, false
294+
}
295+
return state.LeaseMetadata{}, false
296+
}
297+
return metadata, true
298+
}
299+
263300
func activeSubmissions(submissions []statusSubmission) []statusSubmission {
264301
var active []statusSubmission
265302
for _, submission := range submissions {

0 commit comments

Comments
 (0)