From 9c5683437a1e2e13de20e15918230113201c54ac Mon Sep 17 00:00:00 2001 From: Ian Ker-Seymer Date: Thu, 9 Apr 2026 17:55:41 -0400 Subject: [PATCH 1/4] Add heartbeat_max_test_duration to cap per-test heartbeat and allow lost-test reclamation A stuck test would heartbeat forever (since PR #384 removed the countdown), preventing other workers from reclaiming it via reserve_lost. Add --heartbeat-max-test-duration N (defaults to timeout*10) so the heartbeat thread stops ticking after N seconds per test. Once ticking stops the ZSET score goes stale and reserve_lost can steal the entry. The started_at timestamp is passed through the tick state from with_heartbeat so elapsed is measured from when the test actually started, not from when the heartbeat thread woke up (which could be up to 1 second late due to the @cond.wait(1) timeout causing a skewed stale threshold). Fixes #395 --- ruby/lib/ci/queue/configuration.rb | 8 ++- ruby/lib/ci/queue/redis/base.rb | 22 +++++- ruby/lib/minitest/queue/runner.rb | 10 +++ ruby/test/ci/queue/redis_test.rb | 34 +++++++++ ruby/test/integration/minitest_redis_test.rb | 72 ++++++++++++++++++++ 5 files changed, 142 insertions(+), 4 deletions(-) diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb index 73341a6b..d097f507 100644 --- a/ruby/lib/ci/queue/configuration.rb +++ b/ruby/lib/ci/queue/configuration.rb @@ -6,6 +6,7 @@ class Configuration attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds + attr_writer :heartbeat_max_test_duration attr_accessor :lazy_load, :lazy_load_stream_batch_size attr_writer :lazy_load_streaming_timeout attr_accessor :lazy_load_test_helpers @@ -57,7 +58,7 @@ def initialize( grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil, 
max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil, queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil, - export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil, + export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil, heartbeat_max_test_duration: nil, lazy_load: false, lazy_load_stream_batch_size: nil, lazy_load_streaming_timeout: nil, lazy_load_test_helpers: nil, skip_stale_tests: false) @build_id = build_id @@ -86,6 +87,7 @@ def initialize( @warnings_file = warnings_file @debug_log = debug_log @max_missed_heartbeat_seconds = max_missed_heartbeat_seconds + @heartbeat_max_test_duration = heartbeat_max_test_duration @lazy_load = lazy_load @lazy_load_stream_batch_size = lazy_load_stream_batch_size || 5_000 @lazy_load_streaming_timeout = lazy_load_streaming_timeout @@ -153,6 +155,10 @@ def inactive_workers_timeout @inactive_workers_timeout || timeout end + def heartbeat_max_test_duration + @heartbeat_max_test_duration || (timeout * 10 if max_missed_heartbeat_seconds) + end + def max_consecutive_failures=(max) if max @circuit_breakers << CircuitBreaker::MaxConsecutiveFailures.new(max_consecutive_failures: max) diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 0ead2884..1efb2866 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -63,7 +63,7 @@ def reconnect_attempts def with_heartbeat(id, lease: nil) if heartbeat_enabled? ensure_heartbeat_thread_alive! 
- heartbeat_state.set(:tick, id, lease) + heartbeat_state.set(:tick, id, lease, Process.clock_gettime(Process::CLOCK_MONOTONIC)) end yield @@ -386,16 +386,32 @@ def heartbeat Thread.current.name = "CI::Queue#heartbeat" Thread.current.abort_on_exception = true + capped = false + loop do command = heartbeat_state.wait(1) # waits for max 1 second but wakes up immediately if we receive a command case command&.first when :tick - # command = [:tick, entry_id, lease_id] + next if capped + + max_duration = config.heartbeat_max_test_duration + if max_duration + # command = [:tick, entry_id, lease_id, started_at] + # Use the absolute start time from when with_heartbeat was called so that + # the elapsed calculation is not skewed by heartbeat thread startup delay. + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - command[3] + if elapsed >= max_duration + capped = true + next + end + end + + # command = [:tick, entry_id, lease_id, started_at] heartbeat_process.tick!(command[1], command[2]) when :reset # Test finished, stop ticking until next test starts - nil + capped = false when :stop break end diff --git a/ruby/lib/minitest/queue/runner.rb b/ruby/lib/minitest/queue/runner.rb index 9fa58611..2a405ff1 100644 --- a/ruby/lib/minitest/queue/runner.rb +++ b/ruby/lib/minitest/queue/runner.rb @@ -736,6 +736,16 @@ def parser queue_config.max_missed_heartbeat_seconds = time || 30 end + help = <<~EOS + Maximum duration in seconds that the heartbeat will tick for a single test. + If a test runs longer than this, the heartbeat stops and the test entry becomes + eligible for reclamation by another worker. + Defaults to timeout * 10 when heartbeat is enabled. + EOS + opts.on("--heartbeat-max-test-duration SECONDS", Float, help) do |seconds| + queue_config.heartbeat_max_test_duration = seconds + end + opts.on("-v", "--verbose", "Verbose. 
Show progress processing files.") do self.verbose = true diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index 18339a55..663aac4b 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -496,6 +496,40 @@ def test_heartbeat_only_checks_lease assert_nil result end + def test_heartbeat_max_test_duration_stops_heartbeat + queue = worker(1, max_missed_heartbeat_seconds: 2, heartbeat_max_test_duration: 1, tests: [TEST_LIST.first], build_id: 'hb-cap') + queue.boot_heartbeat_process! + + entry = nil + lease = nil + queue.poll do |test| + entry = test.queue_entry + lease = queue.lease_for(entry) + + # Score should be updating while heartbeat ticks + queue.with_heartbeat(entry, lease: lease) do + sleep 0.5 + score_while_ticking = @redis.zscore(queue.send(:key, 'running'), entry) + refute_nil score_while_ticking, "Entry should be in running set while heartbeat ticks" + + # Sleep past the heartbeat cap (1s) + extra buffer + sleep 1.5 + + # After cap, score should have stopped updating. + # The entry should now be stale enough for reserve_lost to reclaim. + score_after_cap = @redis.zscore(queue.send(:key, 'running'), entry) + # Score should be frozen (not updated for >1s since cap at ~1s) + assert score_after_cap < CI::Queue.time_now.to_f - 1, "Score should be stale after heartbeat cap" + end + + queue.acknowledge(entry) + end + + refute_nil entry, "Test should have been reserved" + ensure + queue&.stop_heartbeat! 
+ end + def test_resolve_entry_falls_back_to_resolver queue = worker(1, populate: false) queue.instance_variable_set(:@index, { 'ATest#test_foo' => :ok }) diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb index ea56442d..db79d36d 100644 --- a/ruby/test/integration/minitest_redis_test.rb +++ b/ruby/test/integration/minitest_redis_test.rb @@ -97,6 +97,78 @@ def test_lost_test_with_heartbeat_monitor end end + def test_lost_test_with_heartbeat_max_duration + # Start worker 0 first so it claims the test before worker 1 starts polling. + # Worker 0 heartbeat caps at 0.3s → entry stale at ~t=2 → worker 1 steals at ~t=2. + # lost_test sleeps 3s, giving a ~1s window for the steal before the test finishes. + _, err = capture_subprocess_io do + t0 = Thread.start do + system( + { 'BUILDKITE' => '1' }, + @exe, 'run', + '--queue', @redis_url, + '--seed', 'foobar', + '--build', '1', + '--worker', '0', + '--timeout', '1', + '--max-requeues', '1', + '--requeue-tolerance', '1', + '--heartbeat', '2', + '--heartbeat-max-test-duration', '0.3', + '-Itest', + 'test/lost_test.rb', + chdir: 'test/fixtures/', + ) + end + + # Give worker 0 time to claim the test before worker 1 starts polling. 
+ sleep 0.5 + + t1 = Thread.start do + system( + { 'BUILDKITE' => '1' }, + @exe, 'run', + '--queue', @redis_url, + '--seed', 'foobar', + '--build', '1', + '--worker', '1', + '--timeout', '1', + '--max-requeues', '1', + '--requeue-tolerance', '1', + '--heartbeat', '2', + '--heartbeat-max-test-duration', '0.3', + '-Itest', + 'test/lost_test.rb', + chdir: 'test/fixtures/', + ) + end + + [t0, t1].each(&:join) + end + + assert_empty filter_deprecation_warnings(err) + + Tempfile.open('warnings') do |warnings_file| + out, err = capture_subprocess_io do + system( + @exe, 'report', + '--queue', @redis_url, + '--build', '1', + '--timeout', '1', + '--warnings-file', warnings_file.path, + '--heartbeat', + chdir: 'test/fixtures/', + ) + end + + assert_empty filter_deprecation_warnings(err) + warnings = warnings_file.read.lines.map { |line| JSON.parse(line) } + # Worker 0's heartbeat caps at 0.3s; the entry goes stale ~2s after the last tick + # (before lost_test finishes at t=3). Worker 1 steals it, generating a warning. 
+ assert warnings.size >= 1, "Expected at least 1 RESERVED_LOST_TEST warning, got #{warnings.size}" + end + end + def test_lazy_loading_streaming out, err = capture_subprocess_io do threads = 2.times.map do |i| From 71f68a0270937a585eeefc6bf81fbbf1526df2bd Mon Sep 17 00:00:00 2001 From: Ian Ker-Seymer Date: Thu, 9 Apr 2026 18:06:51 -0400 Subject: [PATCH 2/4] Add edge-case tests for heartbeat_max_test_duration - test_heartbeat_cap_resets_between_tests: verifies :reset clears capped between consecutive tests so subsequent stuck tests are still reclaimable - test_heartbeat_cap_doesnt_affect_fast_test: verifies cap is a no-op when the test finishes before max_duration fires - test_heartbeat_max_test_duration_defaults: pins the default value logic (timeout*10 when heartbeat enabled, nil when disabled, explicit overrides) --- ruby/test/ci/queue/configuration_test.rb | 14 +++++++ ruby/test/ci/queue/redis_test.rb | 53 ++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/ruby/test/ci/queue/configuration_test.rb b/ruby/test/ci/queue/configuration_test.rb index b39bc427..a6728577 100644 --- a/ruby/test/ci/queue/configuration_test.rb +++ b/ruby/test/ci/queue/configuration_test.rb @@ -200,5 +200,19 @@ def test_new_lazy_load_test_helpers_env assert_equal ["test/test_helper.rb", "test/support/helper.rb"], config.lazy_load_test_helper_paths end + def test_heartbeat_max_test_duration_defaults + # defaults to timeout*10 when heartbeat is enabled + config = Configuration.new(timeout: 5, max_missed_heartbeat_seconds: 1) + assert_equal 50, config.heartbeat_max_test_duration + + # nil when heartbeat is disabled (no max_missed_heartbeat_seconds) + config = Configuration.new(timeout: 5) + assert_nil config.heartbeat_max_test_duration + + # explicit value overrides the default + config = Configuration.new(timeout: 5, max_missed_heartbeat_seconds: 1, heartbeat_max_test_duration: 3) + assert_equal 3, config.heartbeat_max_test_duration + end + end end diff --git 
a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index 663aac4b..a5445bb4 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -530,6 +530,59 @@ def test_heartbeat_max_test_duration_stops_heartbeat queue&.stop_heartbeat! end + def test_heartbeat_cap_resets_between_tests + two_tests = TEST_LIST.first(2) + queue = worker(1, max_missed_heartbeat_seconds: 2, heartbeat_max_test_duration: 1, tests: two_tests, build_id: 'hb-reset') + queue.boot_heartbeat_process! + + tests_run = [] + queue.poll do |test| + entry = test.queue_entry + lease = queue.lease_for(entry) + tests_run << entry + + queue.with_heartbeat(entry, lease: lease) do + if tests_run.size == 1 + # First test: sleep past the cap so capped=true inside the thread + sleep 2 + score = @redis.zscore(queue.send(:key, 'running'), entry) + assert score < CI::Queue.time_now.to_f - 1, "First test score should be stale after cap" + else + # Second test: cap should have been reset; heartbeat should be ticking + sleep 0.5 + score = @redis.zscore(queue.send(:key, 'running'), entry) + assert score > CI::Queue.time_now.to_f - 2, "Second test score should be fresh after reset" + end + end + + queue.acknowledge(entry) + end + + assert_equal 2, tests_run.size, "Both tests should have run" + ensure + queue&.stop_heartbeat! + end + + def test_heartbeat_cap_doesnt_affect_fast_test + queue = worker(1, max_missed_heartbeat_seconds: 2, heartbeat_max_test_duration: 10, tests: [TEST_LIST.first], build_id: 'hb-fast') + queue.boot_heartbeat_process! + + queue.poll do |test| + entry = test.queue_entry + lease = queue.lease_for(entry) + + queue.with_heartbeat(entry, lease: lease) do + sleep 0.5 # well under the 10s cap + score = @redis.zscore(queue.send(:key, 'running'), entry) + assert score > CI::Queue.time_now.to_f - 2, "Score should be fresh -- cap should not have fired" + end + + queue.acknowledge(entry) + end + ensure + queue&.stop_heartbeat! 
+ end + def test_resolve_entry_falls_back_to_resolver queue = worker(1, populate: false) queue.instance_variable_set(:@index, { 'ATest#test_foo' => :ok }) From d95044996c86c7693231bffd3216ffd1df9d4980 Mon Sep 17 00:00:00 2001 From: Ian Ker-Seymer Date: Thu, 9 Apr 2026 18:18:34 -0400 Subject: [PATCH 3/4] Add edge case tests for heartbeat_max_test_duration - test_heartbeat_cap_resets_between_tests: verifies that the :reset command clears the capped flag between consecutive tests, so the second test's heartbeat starts fresh - test_heartbeat_cap_doesnt_affect_fast_tests: integration test confirming the cap is a no-op for fast tests (no warnings, correct count) - test_heartbeat_max_test_duration_defaults: unit test for the timeout*10 default and nil-when-disabled behavior --- ruby/test/ci/queue/redis_test.rb | 41 +++++------------ ruby/test/integration/minitest_redis_test.rb | 47 ++++++++++++++++++++ 2 files changed, 59 insertions(+), 29 deletions(-) diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index a5445bb4..f3139582 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -531,54 +531,37 @@ def test_heartbeat_max_test_duration_stops_heartbeat end def test_heartbeat_cap_resets_between_tests - two_tests = TEST_LIST.first(2) - queue = worker(1, max_missed_heartbeat_seconds: 2, heartbeat_max_test_duration: 1, tests: two_tests, build_id: 'hb-reset') + # Two slow tests; cap fires after 1s so the first one goes stale. + # After the first test finishes, :reset is sent and capped becomes false, + # so the heartbeat should resume ticking for the second test. + tests = TEST_LIST.first(2) + queue = worker(1, max_missed_heartbeat_seconds: 3, heartbeat_max_test_duration: 1, tests: tests, build_id: 'hb-reset') queue.boot_heartbeat_process! 
- tests_run = [] + polled = [] queue.poll do |test| entry = test.queue_entry lease = queue.lease_for(entry) - tests_run << entry + polled << entry queue.with_heartbeat(entry, lease: lease) do - if tests_run.size == 1 - # First test: sleep past the cap so capped=true inside the thread + if polled.size == 1 + # Sleep past cap for first test — heartbeat stops ticking sleep 2 score = @redis.zscore(queue.send(:key, 'running'), entry) assert score < CI::Queue.time_now.to_f - 1, "First test score should be stale after cap" else - # Second test: cap should have been reset; heartbeat should be ticking + # For second test, sleep briefly then verify score is fresh — reset worked sleep 0.5 score = @redis.zscore(queue.send(:key, 'running'), entry) - assert score > CI::Queue.time_now.to_f - 2, "Second test score should be fresh after reset" + assert score >= CI::Queue.time_now.to_f - 2, "Second test score should be fresh after cap reset" end end queue.acknowledge(entry) end - assert_equal 2, tests_run.size, "Both tests should have run" - ensure - queue&.stop_heartbeat! - end - - def test_heartbeat_cap_doesnt_affect_fast_test - queue = worker(1, max_missed_heartbeat_seconds: 2, heartbeat_max_test_duration: 10, tests: [TEST_LIST.first], build_id: 'hb-fast') - queue.boot_heartbeat_process! - - queue.poll do |test| - entry = test.queue_entry - lease = queue.lease_for(entry) - - queue.with_heartbeat(entry, lease: lease) do - sleep 0.5 # well under the 10s cap - score = @redis.zscore(queue.send(:key, 'running'), entry) - assert score > CI::Queue.time_now.to_f - 2, "Score should be fresh -- cap should not have fired" - end - - queue.acknowledge(entry) - end + assert_equal 2, polled.size, "Both tests should have been polled" ensure queue&.stop_heartbeat! 
end diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb index db79d36d..97cc4163 100644 --- a/ruby/test/integration/minitest_redis_test.rb +++ b/ruby/test/integration/minitest_redis_test.rb @@ -169,6 +169,53 @@ def test_lost_test_with_heartbeat_max_duration end end + def test_heartbeat_cap_doesnt_affect_fast_tests + # With cap enabled, fast-passing tests should complete normally with no entries + # going stale. The heartbeat cap should be a no-op when tests finish quickly. + _, err = capture_subprocess_io do + 2.times.map do |i| + Thread.start do + system( + { 'BUILDKITE' => '1' }, + @exe, 'run', + '--queue', @redis_url, + '--seed', 'foobar', + '--build', '1', + '--worker', i.to_s, + '--timeout', '1', + '--heartbeat', '5', + '--heartbeat-max-test-duration', '60', + '-Itest', + 'test/passing_test.rb', + chdir: 'test/fixtures/', + ) + end + end.each(&:join) + end + + assert_empty filter_deprecation_warnings(err) + + Tempfile.open('warnings') do |warnings_file| + out, err = capture_subprocess_io do + system( + @exe, 'report', + '--queue', @redis_url, + '--build', '1', + '--timeout', '1', + '--warnings-file', warnings_file.path, + '--heartbeat', + chdir: 'test/fixtures/', + ) + end + + assert_empty filter_deprecation_warnings(err) + result = normalize(out.lines[1].strip) + assert_equal "Ran 100 tests, 100 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)", result + warnings = warnings_file.read.lines.map { |line| JSON.parse(line) } + assert_equal 0, warnings.size, "No tests should be stolen -- heartbeat cap should not have fired" + end + end + def test_lazy_loading_streaming out, err = capture_subprocess_io do threads = 2.times.map do |i| From 344a97cdcc836e02cfe69df2fecd33449f82ee2d Mon Sep 17 00:00:00 2001 From: Ian Ker-Seymer Date: Thu, 9 Apr 2026 18:34:16 -0400 Subject: [PATCH 4/4] Add integration test for heartbeat cap reset between consecutive tests MIME-Version: 1.0 Content-Type: 
text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds test_heartbeat_cap_resets_between_tests as an integration test: - Worker 0 runs test_alpha (cap fires but test completes normally), then :reset clears the capped flag and worker 0 picks up test_beta - A thief (worker 1) starts only after test_beta is in `running` so it cannot grab it from the queue; it can only steal if test_beta goes stale - With reset working: test_beta heartbeat ticks until cap, finishes before stale → 0 warnings - With broken reset: test_beta has no heartbeat ticks, goes stale → stolen → 1 warning Also adds the consecutive_capped_tests.rb fixture (test_alpha=2s, test_beta=2.5s) sized for the cap=1s/heartbeat=2s parameter window, and the two_lost_tests.rb fixture (test_alpha=3s, test_beta=3s). --- .../fixtures/test/consecutive_capped_tests.rb | 23 ++++++ ruby/test/fixtures/test/two_lost_tests.rb | 16 ++++ ruby/test/integration/minitest_redis_test.rb | 77 +++++++++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 ruby/test/fixtures/test/consecutive_capped_tests.rb create mode 100644 ruby/test/fixtures/test/two_lost_tests.rb diff --git a/ruby/test/fixtures/test/consecutive_capped_tests.rb b/ruby/test/fixtures/test/consecutive_capped_tests.rb new file mode 100644 index 00000000..ee9e8dad --- /dev/null +++ b/ruby/test/fixtures/test/consecutive_capped_tests.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true +require 'test_helper' + +CI::Queue::Redis.max_sleep_time = 0.05 + +# Fixture for test_heartbeat_cap_resets_between_tests. +# +# test_alpha fires the heartbeat cap (sleep 2 > cap 1s) but finishes before going stale +# (sleep 2 < cap 1 + heartbeat 2 = 3s). This sets capped=true in the heartbeat thread. +# After test_alpha, :reset is sent and capped should be false. 
+# +# test_beta sleeps in the range (heartbeat=2, heartbeat+cap=3): +# - Without reset: no ticks, stale at t_B + 2s, finishes at t_B + 2.5s → STOLEN +# - With reset: ticks until cap at t_B + 1s, stale at t_B + 3s, finishes at t_B + 2.5s → NOT stolen +class ConsecutiveCappedTests < Minitest::Test + def test_alpha + sleep 2 + end + + def test_beta + sleep 2.5 + end +end diff --git a/ruby/test/fixtures/test/two_lost_tests.rb b/ruby/test/fixtures/test/two_lost_tests.rb new file mode 100644 index 00000000..8875e87d --- /dev/null +++ b/ruby/test/fixtures/test/two_lost_tests.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true +require 'test_helper' + +CI::Queue::Redis.max_sleep_time = 0.05 + +class TwoLostTests < Minitest::Test + + def test_alpha + sleep 3 + end + + def test_beta + sleep 3 + end + +end diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb index 97cc4163..b0cc163c 100644 --- a/ruby/test/integration/minitest_redis_test.rb +++ b/ruby/test/integration/minitest_redis_test.rb @@ -216,6 +216,83 @@ def test_heartbeat_cap_doesnt_affect_fast_tests end end + def test_heartbeat_cap_resets_between_tests + # Worker 0 is the sole run worker: it processes test_alpha first (cap fires at 1s, + # test completes at 2s — not stolen), then :reset clears capped and it picks up + # test_beta. A thief (worker 1) starts only after test_beta is in `running` so it + # cannot grab it from the queue; it can only steal if test_beta goes stale. + # + # With reset working: test_beta heartbeat ticks until cap at t_B+1s, stale at t_B+3s, + # finishes at t_B+2.5s → NOT stolen → 0 warnings. + # With broken reset: no ticks for test_beta, stale at t_B+2s, finishes at t_B+2.5s + # → stolen by the thief → 1 warning. 
+ _, err = capture_subprocess_io do + t0 = Thread.start do + system( + { 'BUILDKITE' => '1' }, + @exe, 'run', + '--queue', @redis_url, + '--seed', 'foobar', + '--build', '1', + '--worker', '0', + '--timeout', '1', + '--max-requeues', '1', + '--requeue-tolerance', '1', + '--heartbeat', '2', + '--heartbeat-max-test-duration', '1', + '-Itest', + 'test/consecutive_capped_tests.rb', + chdir: 'test/fixtures/', + ) + end + + # Wait for worker 0 to finish test_alpha (2s sleep + up to ~2s startup) and claim + # test_beta. Once test_beta is in `running`, the thief cannot grab it from the queue. + sleep 5 + + t1 = Thread.start do + system( + { 'BUILDKITE' => '1' }, + @exe, 'run', + '--queue', @redis_url, + '--seed', 'foobar', + '--build', '1', + '--worker', '1', + '--timeout', '1', + '--max-requeues', '1', + '--requeue-tolerance', '1', + '--heartbeat', '2', + '--heartbeat-max-test-duration', '1', + '-Itest', + 'test/consecutive_capped_tests.rb', + chdir: 'test/fixtures/', + ) + end + + [t0, t1].each(&:join) + end + + assert_empty filter_deprecation_warnings(err) + + Tempfile.open('warnings') do |warnings_file| + out, err = capture_subprocess_io do + system( + @exe, 'report', + '--queue', @redis_url, + '--build', '1', + '--timeout', '1', + '--warnings-file', warnings_file.path, + '--heartbeat', + chdir: 'test/fixtures/', + ) + end + + assert_empty filter_deprecation_warnings(err) + warnings = warnings_file.read.lines.map { |line| JSON.parse(line) } + assert_equal 0, warnings.size, "No tests should be stolen — heartbeat cap must reset between consecutive tests" + end + end + def test_lazy_loading_streaming out, err = capture_subprocess_io do threads = 2.times.map do |i|