diff --git a/pkg/cortexpb/timeseriesv2_test.go b/pkg/cortexpb/timeseriesv2_test.go
index 3f06bd4c092..e71f6b9f155 100644
--- a/pkg/cortexpb/timeseriesv2_test.go
+++ b/pkg/cortexpb/timeseriesv2_test.go
@@ -1,6 +1,9 @@
 package cortexpb
 
 import (
+	"fmt"
+	"runtime"
+	"sync"
 	"testing"
 
 	"github.com/gogo/protobuf/proto"
@@ -183,3 +186,144 @@ func BenchmarkMarshallWriteRequestV2(b *testing.B) {
 		})
 	}
 }
+
+// BenchmarkWriteRequestV2Pool_CompareFixedSymbolCapaWithDynamic compares the
+// pool behavior when the Symbols slice capacity is adjusted dynamically (EMA)
+// versus fixed at initialSymbolsCapacity or at a hard-coded 2048, across
+// several symbol-count ranges (including one that exceeds maxSymbolsCapacity).
+func BenchmarkWriteRequestV2Pool_CompareFixedSymbolCapaWithDynamic(b *testing.B) {
+	const (
+		numVariants = 10
+		gcInterval  = 100
+	)
+
+	scenarios := []struct {
+		name                 string
+		symbolMin, symbolMax int
+	}{
+		{"symbol_range_200_2000", 200, 2000},
+		{"symbol_range_1000_6000", 1000, 6000},
+		{"symbol_range_1000_20000 (exceed maxSymbolsCapacity)", 1000, 20000},
+	}
+
+	for _, sc := range scenarios {
+		// Build numVariants symbol slices linearly interpolated between
+		// symbolMin and symbolMax so the pool sees a spread of sizes.
+		variants := make([][]string, numVariants)
+		for i := range numVariants {
+			size := sc.symbolMin + (sc.symbolMax-sc.symbolMin)*i/(numVariants-1)
+			syms := make([]string, size)
+			for j := range syms {
+				syms[j] = fmt.Sprintf("label_%04d", j)
+			}
+			variants[i] = syms
+		}
+
+		b.Run(sc.name, func(b *testing.B) {
+			b.Run("dynamic_capacity", func(b *testing.B) {
+				saved := dynamicSymbolsCapacity.Load()
+				dynamicSymbolsCapacity.Store(int64(initialSymbolsCapacity))
+				defer dynamicSymbolsCapacity.Store(saved)
+
+				// Converge EMA to ensure dynamic capacity is adjusted to the range of variants.
+				for i := range 100 * numVariants {
+					req := PreallocWriteRequestV2FromPool()
+					req.Symbols = append(req.Symbols, variants[i%numVariants]...)
+					ReuseWriteRequestV2(req)
+				}
+
+				b.ReportAllocs()
+				var idx int
+				for b.Loop() {
+					if idx > 0 && idx%gcInterval == 0 {
+						runtime.GC() // Periodically run GC to simulate real-world conditions where the pool may be emptied.
+					}
+					req := PreallocWriteRequestV2FromPool()
+					req.Symbols = append(req.Symbols, variants[idx%numVariants]...)
+					idx++
+					ReuseWriteRequestV2(req)
+				}
+			})
+
+			b.Run("fixed_capacity", func(b *testing.B) {
+				fixedPool := sync.Pool{
+					New: func() any {
+						return &PreallocWriteRequestV2{
+							WriteRequestV2: WriteRequestV2{
+								Symbols: make([]string, 0, initialSymbolsCapacity),
+							},
+						}
+					},
+				}
+
+				b.ReportAllocs()
+				var idx int
+				for b.Loop() {
+					if idx > 0 && idx%gcInterval == 0 {
+						runtime.GC()
+					}
+					req := fixedPool.Get().(*PreallocWriteRequestV2)
+					req.Symbols = append(req.Symbols, variants[idx%numVariants]...)
+					idx++
+
+					// Same cleanup as ReuseWriteRequestV2, minus the EMA update.
+					req.Source = 0
+					symbolsCap := int64(cap(req.Symbols))
+					if symbolsCap > maxSymbolsCapacity {
+						if req.Timeseries != nil {
+							ReuseSliceV2(req.Timeseries)
+							req.Timeseries = nil
+						}
+						continue
+					}
+					for i := range req.Symbols {
+						req.Symbols[i] = ""
+					}
+					req.Symbols = req.Symbols[:0]
+					if req.Timeseries != nil {
+						ReuseSliceV2(req.Timeseries)
+						req.Timeseries = nil
+					}
+					fixedPool.Put(req)
+				}
+			})
+
+			b.Run("fixed_capacity_2048", func(b *testing.B) {
+				fixedPool2048 := sync.Pool{
+					New: func() any {
+						return &PreallocWriteRequestV2{
+							WriteRequestV2: WriteRequestV2{
+								Symbols: make([]string, 0, 2048),
+							},
+						}
+					},
+				}
+
+				b.ReportAllocs()
+				var idx int
+				for b.Loop() {
+					if idx > 0 && idx%gcInterval == 0 {
+						runtime.GC()
+					}
+					req := fixedPool2048.Get().(*PreallocWriteRequestV2)
+					req.Symbols = append(req.Symbols, variants[idx%numVariants]...)
+					idx++
+
+					// Same cleanup as ReuseWriteRequestV2, minus the EMA update.
+					req.Source = 0
+					symbolsCap := int64(cap(req.Symbols))
+					if symbolsCap > maxSymbolsCapacity {
+						if req.Timeseries != nil {
+							ReuseSliceV2(req.Timeseries)
+							req.Timeseries = nil
+						}
+						continue
+					}
+					for i := range req.Symbols {
+						req.Symbols[i] = ""
+					}
+					req.Symbols = req.Symbols[:0]
+					if req.Timeseries != nil {
+						ReuseSliceV2(req.Timeseries)
+						req.Timeseries = nil
+					}
+					fixedPool2048.Put(req)
+				}
+			})
+		})
+	}
+}