Skip to content

Commit 641c324

Browse files
belimawr and Copilot
authored
Fix container ignoring max bytes when parsing CRI partial messages (#49743)
GenAI-Assisted: Yes Human-Reviewed: Yes Tool: Claude-CLI, Model: Sonnet 4.6 --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent e982b4f commit 641c324

5 files changed

Lines changed: 116 additions & 9 deletions

File tree

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: bug-fix
13+
14+
# REQUIRED for all kinds
15+
# Change summary; an 80ish-character-long description of the change.
16+
summary: Fix container input not respecting max bytes when parsing CRI partial lines
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodates a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: filebeat
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link to the PR that added the changeset.
36+
# If not present, it is automatically filled by the tooling by finding the PR where this changelog fragment was added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
# pr: https://github.com/owner/repo/1234
40+
41+
# AUTOMATED
42+
# OPTIONAL to manually add other issue URLs
43+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
44+
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
45+
issue: https://github.com/elastic/beats/issues/49259

filebeat/input/log/harvester.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ func (h *Harvester) openFile() error {
529529

530530
f, err := file_helper.ReadOpen(h.state.Source)
531531
if err != nil {
532-
return fmt.Errorf("Failed opening %s: %w", h.state.Source, err)
532+
return fmt.Errorf("failed opening %s: %w", h.state.Source, err)
533533
}
534534

535535
harvesterOpenFiles.Add(1)
@@ -551,11 +551,11 @@ func (h *Harvester) validateFile(f *os.File) error {
551551

552552
info, err := f.Stat()
553553
if err != nil {
554-
return fmt.Errorf("Failed getting stats for file %s: %w", h.state.Source, err)
554+
return fmt.Errorf("failed getting stats for file %s: %w", h.state.Source, err)
555555
}
556556

557557
if !info.Mode().IsRegular() {
558-
return fmt.Errorf("Tried to open non regular file: %q %s", info.Mode(), info.Name())
558+
return fmt.Errorf("tried to open non regular file: %q %s", info.Mode(), info.Name())
559559
}
560560

561561
// Compares the stat of the opened file to the state given by the input. Abort if not match.
@@ -687,7 +687,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
687687

688688
if h.config.DockerJSON != nil {
689689
// Docker json-file format, add custom parsing to the pipeline
690-
r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.Format, h.config.DockerJSON.CRIFlags, h.logger)
690+
r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.Format, h.config.DockerJSON.CRIFlags, h.config.MaxBytes, h.logger)
691691
}
692692

693693
if h.config.JSON != nil {

libbeat/reader/parser/parser.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func (c *Config) Create(in reader.Reader, log *logp.Logger) Parser {
173173
if err != nil {
174174
return p
175175
}
176-
p = readjson.NewContainerParser(p, &config, log)
176+
p = readjson.NewContainerParser(p, &config, int(c.pCfg.MaxBytes), log)
177177
case "syslog":
178178
config := syslog.DefaultConfig()
179179
cfg := ns.Config()

libbeat/reader/readjson/docker_json.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ type DockerJSONReader struct {
4343
// parse CRI flags
4444
criflags bool
4545

46+
// maximum number of bytes to use when reassembling partial CRI/docker log lines;
47+
// limits growth while joining fragments but does not cap the size of the initial chunk.
48+
// A value of 0 means no limit is applied during reassembly.
49+
maxBytes int
50+
4651
parseLine func(message *reader.Message, msg *logLine) error
4752

4853
stripNewLine func(msg *reader.Message)
@@ -60,12 +65,13 @@ type logLine struct {
6065
}
6166

6267
// New creates a new reader renaming a field
63-
func New(r reader.Reader, stream string, partial bool, format string, CRIFlags bool, logger *logp.Logger) *DockerJSONReader {
68+
func New(r reader.Reader, stream string, partial bool, format string, CRIFlags bool, maxBytes int, logger *logp.Logger) *DockerJSONReader {
6469
reader := DockerJSONReader{
6570
stream: stream,
6671
partial: partial,
6772
reader: r,
6873
criflags: CRIFlags,
74+
maxBytes: maxBytes,
6975
logger: logger.Named("reader_docker_json"),
7076
}
7177

@@ -87,12 +93,13 @@ func New(r reader.Reader, stream string, partial bool, format string, CRIFlags b
8793
return &reader
8894
}
8995

90-
func NewContainerParser(r reader.Reader, config *ContainerJSONConfig, logger *logp.Logger) *DockerJSONReader {
96+
func NewContainerParser(r reader.Reader, config *ContainerJSONConfig, maxBytes int, logger *logp.Logger) *DockerJSONReader {
9197
reader := DockerJSONReader{
9298
stream: config.Stream.String(),
9399
partial: true,
94100
reader: r,
95101
criflags: true,
102+
maxBytes: maxBytes,
96103
logger: logger.Named("parser_container"),
97104
}
98105

@@ -233,6 +240,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
233240
}
234241

235242
// Handle multiline messages, join partial lines
243+
truncated := false
236244
for p.partial && logLine.Partial {
237245
next, err := p.reader.Next()
238246

@@ -248,7 +256,24 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
248256
p.logger.Errorf("Parse line error: %v", err)
249257
continue
250258
}
251-
message.Content = append(message.Content, next.Content...)
259+
260+
// Enforce max_bytes during partial line reassembly to prevent unbounded
261+
// memory growth. Once the limit is reached, drain remaining partial
262+
// chunks (updating the byte counter only) so the reader stays aligned
263+
// to logical line boundaries for the next Next() call.
264+
if truncated {
265+
continue
266+
}
267+
if p.maxBytes > 0 && len(message.Content)+len(next.Content) > p.maxBytes {
268+
remaining := p.maxBytes - len(message.Content)
269+
if remaining > 0 {
270+
message.Content = append(message.Content, next.Content[:remaining]...)
271+
}
272+
_ = message.AddFlagsWithKey("log.flags", "truncated")
273+
truncated = true
274+
} else {
275+
message.Content = append(message.Content, next.Content...)
276+
}
252277
}
253278

254279
if p.stream != "all" && p.stream != logLine.Stream {

libbeat/reader/readjson/docker_json_test.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
package readjson
1919

2020
import (
21+
"fmt"
2122
"io"
2223
"testing"
2324
"time"
2425

2526
"github.com/stretchr/testify/assert"
2627

2728
"github.com/elastic/beats/v7/libbeat/reader"
29+
"github.com/elastic/elastic-agent-libs/logp"
2830
"github.com/elastic/elastic-agent-libs/logp/logptest"
2931
"github.com/elastic/elastic-agent-libs/mapstr"
3032
)
@@ -351,7 +353,7 @@ func TestDockerJSON(t *testing.T) {
351353
for _, test := range tests {
352354
t.Run(test.name, func(t *testing.T) {
353355
r := &mockReader{messages: test.input}
354-
json := New(r, test.stream, test.partial, test.format, test.criflags, logger)
356+
json := New(r, test.stream, test.partial, test.format, test.criflags, 0, logger)
355357
message, err := json.Next()
356358

357359
if test.expectedError != nil {
@@ -370,6 +372,41 @@ func TestDockerJSON(t *testing.T) {
370372
}
371373
}
372374

375+
func TestDockerJSONMaxBytes(t *testing.T) {
376+
// Simulate many CRI partial chunks that would exceed max_bytes in aggregate.
377+
// The reader must truncate the assembled message and drain remaining partials
378+
// without allocating unbounded memory.
379+
chunkContent := "abcdefghij" // 10 bytes per chunk
380+
numChunks := 5
381+
maxBytes := 25 // limit is less than 5*10 = 50 bytes
382+
383+
var inputs [][]byte
384+
for i := range numChunks {
385+
flag := "P"
386+
if i == numChunks-1 {
387+
flag = "F"
388+
}
389+
line := fmt.Sprintf("2017-10-12T13:32:21.232861448Z stdout %s %s", flag, chunkContent)
390+
inputs = append(inputs, []byte(line))
391+
}
392+
393+
r := &mockReader{messages: inputs}
394+
json := New(r, "stdout", true, "cri", true, maxBytes, logp.NewNopLogger())
395+
message, err := json.Next()
396+
397+
assert.NoError(t, err)
398+
assert.Len(t, message.Content, maxBytes, "content should be capped at maxBytes")
399+
400+
flags, err := message.Fields.GetValue("log.flags")
401+
assert.NoError(t, err, "'log.flags' not present in event")
402+
assert.Contains(t, flags, "truncated", "truncated flag should be set")
403+
404+
// All partial chunks up to and including the final F line must have been
405+
// consumed by this single Next() call. If any remain, subsequent calls
406+
// would emit orphaned partial chunks as separate events, breaking alignment.
407+
assert.Empty(t, r.messages, "all partial chunks must be drained before returning")
408+
}
409+
373410
type mockReader struct {
374411
messages [][]byte
375412
}

0 commit comments

Comments
 (0)