Skip to content

Commit 22191a0

Browse files
committed
Fix squash query to handle concurrent updates
Widen the validity lookup from `= _now` to `>= _now` so that the squash logic finds history rows written by a concurrent transaction whose `lower(validity)` may be slightly ahead of the current transaction's start time. Also, added `upper_inf(validity)` filter for find only open-ended ranges. We can have only one open range by system design. After the new `FOR UPDATE` lock serializes concurrent updates, the second transaction's `_now` is set before it acquires the lock and can be earlier than the `lower(validity)` written by the first transaction. The strict equality missed that row, falling through to the `ELSE` branch and triggering a `PG::ExclusionViolation` or `RangeError` on overlapping validity ranges.
1 parent d169fc6 commit 22191a0

6 files changed

Lines changed: 302 additions & 2 deletions

File tree

README.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,12 @@ create or replace function chronomodel_countries_update() returns trigger as $$
111111
return new;
112112
end if;
113113

114+
perform 1 from only temporal.countries where id = old.id for update;
115+
114116
_now := timezone('UTC', now());
115117
_hid := null;
116118

117-
select hid into _hid from history.countries where id = old.id and lower(validity) = _now;
119+
select hid into _hid from history.countries where id = old.id and upper_inf(validity) and lower(validity) >= _now limit 1;
118120

119121
if _hid is not null then
120122
update history.countries set ( name, updated_at ) = ( new.name, new.updated_at ) where hid = _hid;

lib/chrono_model/adapter/ddl.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,12 @@ def chrono_create_UPDATE_trigger(table, pk, current, history, fields, values, op
154154
RETURN NEW;
155155
END IF;
156156
157+
PERFORM 1 FROM ONLY #{current} WHERE #{pk} = OLD.#{pk} FOR UPDATE;
158+
157159
_now := timezone('UTC', now());
158160
_hid := NULL;
159161
160-
#{"SELECT hid INTO _hid FROM #{history} WHERE #{pk} = OLD.#{pk} AND lower(validity) = _now;" unless ENV['CHRONOMODEL_NO_SQUASH']}
162+
#{"SELECT hid INTO _hid FROM #{history} WHERE #{pk} = OLD.#{pk} AND upper_inf(validity) AND lower(validity) >= _now LIMIT 1;" unless ENV['CHRONOMODEL_NO_SQUASH']}
161163
162164
IF _hid IS NOT NULL THEN
163165
UPDATE #{history} SET (#{fields}) = (#{values}) WHERE hid = _hid;

spec/chrono_model/history_models_spec.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
expected['sub_bars'] = SubBar::History if defined?(SubBar::History)
2121
expected['sub_sub_bars'] = SubSubBar::History if defined?(SubSubBar::History)
2222

23+
expected['overlappers'] = Overlapper::History if defined?(Overlapper::History)
24+
expected['overlap_bros'] = OverlapBro::History if defined?(OverlapBro::History)
25+
2326
# default_scope_spec
2427
expected['defoos'] = Defoo::History if defined?(Defoo::History)
2528

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# frozen_string_literal: true
2+
3+
require 'spec_helper'
4+
require 'support/time_machine/structure'
5+
6+
RSpec.describe ChronoModel::TimeMachine do
7+
include ChronoTest::TimeMachine::Helpers
8+
9+
describe 'concurrent updates' do
10+
it 'handles concurrent updates without PG::ExclusionViolation' do
11+
foo = Foo.create!(name: 'concurrency-test')
12+
13+
threads_count = 4
14+
iterations = 10
15+
errors = []
16+
mutex = Mutex.new
17+
18+
threads = Array.new(threads_count) do |i|
19+
Thread.new do
20+
iterations.times do |j|
21+
Foo.transaction do
22+
record = Foo.find(foo.id)
23+
record.update!(name: "iteration-#{i}-#{j}")
24+
end
25+
rescue StandardError => e
26+
mutex.synchronize do
27+
errors << "Thread #{i} Iteration #{j}: #{e.class} - #{e.message}"
28+
end
29+
end
30+
end
31+
end
32+
33+
threads.each(&:join)
34+
35+
expect(errors).to be_empty
36+
ensure
37+
# Clean up: delete the record and its history
38+
if foo
39+
foo.destroy
40+
Foo::History.where(id: foo.id).delete_all
41+
end
42+
end
43+
44+
it 'keeps the open history row aligned with the current row when an older transaction commits last' do
45+
foo = Foo.create!(name: 'orig')
46+
errors = []
47+
mutex = Mutex.new
48+
started = Queue.new
49+
proceed = Queue.new
50+
stale = nil
51+
stale_started = false
52+
53+
begin
54+
stale = Thread.new do
55+
Foo.connection_pool.with_connection do
56+
Foo.transaction do
57+
Foo.connection.execute("SELECT timezone('UTC', now())")
58+
stale_started = true
59+
started << true
60+
proceed.pop
61+
Foo.find(foo.id).update!(name: 'stale')
62+
end
63+
end
64+
rescue StandardError => e
65+
started << true unless stale_started
66+
mutex.synchronize { errors << e }
67+
end
68+
69+
started.pop
70+
sleep(0.05)
71+
Foo.find(foo.id).update!(name: 't1')
72+
sleep(0.05)
73+
Foo.find(foo.id).update!(name: 't2')
74+
75+
proceed << true
76+
stale.join
77+
78+
expect(errors).to be_empty
79+
expect(foo.reload.name).to eq('stale')
80+
expect(Foo::History.where(id: foo.id).where('upper_inf(validity)').pick(:name)).to eq(foo.name)
81+
ensure
82+
proceed << true if proceed && stale&.alive?
83+
stale&.join
84+
85+
if foo
86+
Foo.where(id: foo.id).delete_all
87+
Foo::History.where(id: foo.id).delete_all
88+
end
89+
end
90+
end
91+
end
92+
93+
describe 'concurrent deletions' do
94+
it 'handles concurrent deletions without error' do
95+
threads_count = 4
96+
foo_ids = Array.new(threads_count) { |i| Foo.create!(name: "test-delete-#{i}") }.map(&:id)
97+
errors = []
98+
mutex = Mutex.new
99+
100+
threads = Array.new(threads_count) do |i|
101+
Thread.new do
102+
Foo.where(id: foo_ids[i]).delete_all
103+
rescue StandardError => e
104+
mutex.synchronize do
105+
errors << "Thread #{i}: #{e.class} - #{e.message}"
106+
end
107+
end
108+
end
109+
110+
threads.each(&:join)
111+
112+
expect(errors).to be_empty
113+
foo_ids.each do |id|
114+
expect(Foo.exists?(id)).to be(false)
115+
end
116+
ensure
117+
# Clean up any remaining records and history
118+
foo_ids&.each do |id|
119+
Foo::History.where(id: id).delete_all
120+
Foo.where(id: id).delete_all
121+
end
122+
end
123+
end
124+
end
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
# frozen_string_literal: true
2+
3+
require 'spec_helper'
4+
require 'support/time_machine/structure'
5+
6+
RSpec.describe ChronoModel::TimeMachine do
7+
include ChronoTest::TimeMachine::Helpers
8+
9+
after do
10+
Overlapper::History.delete_all
11+
Overlapper.delete_all
12+
OverlapBro::History.delete_all
13+
OverlapBro.delete_all
14+
end
15+
16+
describe 'records created in the same transaction' do
17+
it 'are all visible in as_of() queries using the first record creation time' do
18+
first_record = nil
19+
second_record = nil
20+
third_record = nil
21+
22+
Overlapper.transaction do
23+
first_record = Overlapper.create!(name: 'first')
24+
second_record = Overlapper.create!(name: 'second')
25+
third_record = Overlapper.create!(name: 'third')
26+
end
27+
28+
Overlapper.transaction do
29+
Overlapper.create!(name: 'second transaction record')
30+
end
31+
32+
# Get the validity start time of the first record from history
33+
first_record_validity_start = first_record.history.last.validity.begin
34+
35+
# All records created in the same transaction should be visible
36+
# when querying as_of() with the first record's creation timestamp
37+
records_at_first_timestamp = Overlapper.as_of(first_record_validity_start)
38+
expect(records_at_first_timestamp.map(&:id)).to contain_exactly(first_record.id, second_record.id, third_record.id)
39+
end
40+
41+
it 'allows querying associated records created in the same transaction' do
42+
overlap = nil
43+
44+
Overlapper.transaction do
45+
bro = OverlapBro.create!(name: 'bro record')
46+
overlap = Overlapper.create!(name: 'child', overlap_bro: bro)
47+
end
48+
49+
overlap_validity_start = overlap.history.last.validity.begin
50+
overlap_at_creation = Overlapper.as_of(overlap_validity_start).find(overlap.id)
51+
# Child record should be visible through the association
52+
expect(overlap_at_creation.overlap_bro.name).to eq('bro record')
53+
end
54+
55+
it 'maintains consistent timestamps for records created together' do
56+
records = nil
57+
58+
Overlapper.transaction do
59+
records = Array.new(5) { |i| Overlapper.create!(name: "record-#{i}") }
60+
end
61+
62+
# All records should have the same validity start time (transaction time)
63+
validity_starts = records.map { |r| r.history.last.validity.begin }
64+
65+
expect(validity_starts.uniq.size).to eq(1),
66+
"Expected all records to have the same validity start time, but got: #{validity_starts.inspect}"
67+
end
68+
end
69+
70+
describe 'records updated in the same transaction' do
71+
# This test reproduces the issue where clock_timestamp() in UPDATE triggers
72+
# causes records updated within the same transaction to have different timestamps,
73+
# making them invisible in as_of() queries.
74+
#
75+
# Scenario:
76+
# 1. Create an "overlap_bro" record
77+
# 2. Create associated "overlappers" that reference this version
78+
# 3. Update the overlappers within the same transaction
79+
# 4. Query as_of(overlap_bro.created_at) should show all overlappers
80+
it 'updated records are visible in as_of() queries using the transaction start time' do
81+
bro = nil
82+
83+
OverlapBro.transaction do
84+
bro = OverlapBro.create!(name: 'bro_v1')
85+
3.times do |i|
86+
overlap = Overlapper.create!(name: "overlapper_#{i}", overlap_bro: bro)
87+
overlap.update!(name: "overlapper_#{i}_updated")
88+
end
89+
end
90+
91+
bro_created_at = bro.history.last.validity.begin
92+
93+
# Query all overlappers as of the bro's creation time
94+
# With clock_timestamp() in UPDATE trigger, updated overlappers would have
95+
# a later timestamp and would NOT be visible here
96+
overlappers_at_bro_time = Overlapper.as_of(bro_created_at).where(overlap_bro_id: bro.id)
97+
98+
expect(overlappers_at_bro_time.count).to eq(3),
99+
"Expected 3 overlappers visible at bro creation time, got #{overlappers_at_bro_time.count}"
100+
101+
expect(overlappers_at_bro_time.map(&:name)).to contain_exactly(
102+
'overlapper_0_updated',
103+
'overlapper_1_updated',
104+
'overlapper_2_updated'
105+
)
106+
end
107+
108+
it 'multiple updates in same transaction maintain consistent timestamp' do
109+
overlap = nil
110+
111+
Overlapper.transaction do
112+
overlap = Overlapper.create!(name: 'original')
113+
overlap.update!(name: 'update1')
114+
overlap.update!(name: 'update2')
115+
overlap.update!(name: 'final')
116+
end
117+
118+
# All history entries created in this transaction should share the same timestamp
119+
# (due to squashing or consistent now() usage)
120+
history_entries = overlap.history.order(:hid)
121+
122+
# The final state should be visible at the creation timestamp
123+
creation_time = history_entries.first.validity.begin
124+
overlap_at_creation = Overlapper.as_of(creation_time).find(overlap.id)
125+
expect(overlap_at_creation.name).to eq('final')
126+
end
127+
end
128+
129+
describe 'records updated after initial creation' do
130+
it 'handles updates without timestamp conflicts' do
131+
overlap = Overlapper.create!(name: 'original')
132+
133+
# Sleep a tiny bit to ensure different transaction time
134+
sleep(0.01)
135+
136+
overlap.update!(name: 'updated')
137+
138+
# The update should succeed and create a valid history entry
139+
overlap.reload
140+
expect(overlap.name).to eq('updated')
141+
142+
# History should have entries with non-overlapping validity ranges
143+
history_entries = overlap.history.order(:hid)
144+
expect(history_entries.size).to eq 2
145+
end
146+
end
147+
end

spec/support/time_machine/structure.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,16 @@ def up
8383
t.references :sub_bar
8484
end
8585

86+
adapter.create_table 'overlap_bros', temporal: true do |t|
87+
t.string :name
88+
end
89+
90+
adapter.create_table 'overlappers', temporal: true do |t|
91+
t.string :name
92+
93+
t.references :overlap_bro
94+
end
95+
8696
class ::Bar < ActiveRecord::Base
8797
include ChronoModel::TimeMachine
8898

@@ -162,6 +172,18 @@ class ::SubSubBar < ActiveRecord::Base
162172
belongs_to :sub_bar
163173
end
164174

175+
class ::OverlapBro < ActiveRecord::Base
176+
include ChronoModel::TimeMachine
177+
178+
has_many :overlappers
179+
end
180+
181+
class ::Overlapper < ActiveRecord::Base
182+
include ChronoModel::TimeMachine
183+
184+
belongs_to :overlap_bro, optional: true
185+
end
186+
165187
# Master timeline, used in multiple specs. It is defined here
166188
# as a global variable to be able to be shared across specs.
167189
#

0 commit comments

Comments
 (0)