Impact
Beats running as OTel receivers ignore pipeline.Settings.InputQueueSize and always create the intake queue with size 0 (which memqueue normalizes to 20). This can cause avoidable backpressure/drops for receiver workloads that are explicitly configured with a larger input queue capacity.
Reproduction Steps
- Create libbeat/publisher/pipeline/receiver_input_queue_repro_test.go with this test:
package pipeline

import (
	"reflect"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/elastic/beats/v7/libbeat/beat"
	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp/logptest"
	"github.com/elastic/elastic-agent-libs/monitoring"
)

func TestReceiverPipelineUsesConfiguredInputQueueSize(t *testing.T) {
	logger := logptest.NewTestingLogger(t, "")
	const configuredInputQueueSize = 123

	// Build a receiver pipeline with an explicit input queue size.
	p, err := NewForReceiver(
		beat.Info{Logger: logger},
		Monitors{
			Logger:  logger,
			Metrics: monitoring.NewRegistry(),
		},
		conf.Namespace{},
		Settings{InputQueueSize: configuredInputQueueSize},
		"",
	)
	require.NoError(t, err, "creating receiver pipeline should succeed")
	defer p.Close()

	controller, ok := p.outputController.(*otelOutputController)
	require.True(t, ok, "receiver pipeline should use otelOutputController")

	// pushChan is an unexported field of the memqueue broker, so its
	// capacity is read through reflection.
	pushChan := reflect.ValueOf(controller.queue).Elem().FieldByName("pushChan")
	require.True(t, pushChan.IsValid(), "memqueue broker should contain pushChan")
	assert.Equal(t, configuredInputQueueSize, pushChan.Cap(), "receiver queue channel capacity should honor Settings.InputQueueSize")
}
- Run:
go test ./libbeat/publisher/pipeline -run TestReceiverPipelineUsesConfiguredInputQueueSize -count=1
Expected vs Actual
Expected: receiver pipeline queue capacity honors Settings.InputQueueSize (123 in test).
Actual: test fails; queue cap is 20.
--- FAIL: TestReceiverPipelineUsesConfiguredInputQueueSize (0.00s)
    output_otel_test.go:83:
        Error:      Not equal:
                    expected: 123
                    actual  : 20
        Messages:   receiver queue channel capacity should honor Settings.InputQueueSize
FAIL
FAIL    github.com/elastic/beats/v7/libbeat/publisher/pipeline    0.007s
Failing Test
TestReceiverPipelineUsesConfiguredInputQueueSize above.
Evidence
x-pack/libbeat/cmd/instance/beat.go:282-289 sets pipeline.Settings{InputQueueSize: b.InputQueueSize} and calls pipeline.NewForReceiver(...).
libbeat/publisher/pipeline/pipeline.go:197-226 receives settings Settings in NewForReceiver, but then calls newOTelOutputController(...) without forwarding settings.InputQueueSize.
libbeat/publisher/pipeline/output_otel.go:70 hardcodes queue creation as queueFactory(monitors.Logger, queueObserver, 0, nil).
I also checked for existing tracking items with issue/PR searches (InputQueueSize receiver otel queue, newOTelOutputController queueFactory(..., 0, nil)) and did not find an open duplicate.
What is this? | From workflow: Bug Hunter