Skip to content

Commit 0b755b7

Browse files
torsmcopybara-github
authored andcommitted
Add broadcast groups to reduce database load from frequent broadcasts with idempotent messages
PiperOrigin-RevId: 903242115
1 parent c1c31ce commit 0b755b7

8 files changed

Lines changed: 176 additions & 13 deletions

File tree

fleetspeak/src/server/db/store.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"errors"
3333
"fmt"
3434
"io"
35+
"maps"
3536
"math"
3637
"time"
3738

@@ -46,9 +47,14 @@ import (
4647
tspb "google.golang.org/protobuf/types/known/timestamppb"
4748
)
4849

49-
// ErrBroadcastDisabled is returned on an attempted operation on a disabled
50-
// broadcast or an associated allocation.
51-
var ErrBroadcastDisabled = errors.New("broadcast is disabled and allocation expired")
50+
var (
51+
// ErrBroadcastDisabled is returned on an attempted operation on a disabled
52+
// broadcast or an associated allocation.
53+
ErrBroadcastDisabled = errors.New("broadcast is disabled and allocation expired")
54+
55+
// ErrNotSupported is returned when a feature is not supported by the datastore.
56+
ErrNotSupported = errors.New("feature not supported")
57+
)
5258

5359
// A Store describes the full persistence mechanism required by the base
5460
// fleetspeak system. These operations must be thread safe. These must also be
@@ -318,9 +324,29 @@ func ComputeBroadcastAllocationCleanup(allocationLimit, allocated uint64) (uint6
318324
return allocated - allocationLimit, nil
319325
}
320326

327+
// LabelsEqual returns true if the two lists of labels are equivalent in the
328+
// context of required labels of broadcasts.
329+
func LabelsEqual(l1, l2 []*fspb.Label) bool {
330+
if len(l1) != len(l2) {
331+
return false
332+
}
333+
type label struct{ s, l string }
334+
toSet := func(ls []*fspb.Label) map[label]struct{} {
335+
m := make(map[label]struct{})
336+
for _, l := range ls {
337+
m[label{l.ServiceName, l.Label}] = struct{}{}
338+
}
339+
return m
340+
}
341+
return maps.Equal(toSet(l1), toSet(l2))
342+
}
343+
321344
// BroadcastStore provides methods to store and retrieve information about broadcasts.
322345
type BroadcastStore interface {
323346
// CreateBroadcast stores a new broadcast message.
347+
// If b.GroupName is non-empty, existing active broadcasts with the same
348+
// group, source, destination, and required_labels will be disabled
349+
// before inserting this new broadcast.
324350
CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit uint64) error
325351

326352
// SetBroadcastLimit adjusts the limit of an existing broadcast.

fleetspeak/src/server/dbtesting/broadcaststore_suite.go

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package dbtesting
22

33
import (
4-
"context"
4+
"bytes"
55
"errors"
66
"testing"
77
"time"
@@ -32,7 +32,7 @@ func broadcastStoreTest(t *testing.T, ds db.Store) {
3232
fin2 := sertesting.SetServerRetryTime(func(_ uint32) time.Time { return db.Now().Add(time.Minute) })
3333
defer fin2()
3434

35-
ctx := context.Background()
35+
ctx := t.Context()
3636

3737
var bid []ids.BroadcastID
3838

@@ -254,10 +254,110 @@ func testDisableBroadcasts(t *testing.T, ds db.Store, bid ids.BroadcastID, clien
254254
}
255255
}
256256

257+
func isBroadcastActive(t *testing.T, ds db.Store, bID ids.BroadcastID) bool {
258+
t.Helper()
259+
bs, err := ds.ListActiveBroadcasts(t.Context())
260+
if err != nil {
261+
t.Fatal(err)
262+
}
263+
for _, b := range bs {
264+
if bytes.Equal(b.Broadcast.BroadcastId, bID.Bytes()) {
265+
return true
266+
}
267+
}
268+
return false
269+
}
270+
271+
func testBroadcastReplacement(t *testing.T, ds db.Store) {
272+
ctx := t.Context()
273+
274+
createBC := func(grp, src, dst string, labels ...*fspb.Label) *spb.Broadcast {
275+
return &spb.Broadcast{
276+
GroupName: grp,
277+
Source: &fspb.Address{ServiceName: src},
278+
Destination: &fspb.Address{ServiceName: dst},
279+
RequiredLabels: labels,
280+
MessageType: "Empty",
281+
}
282+
}
283+
284+
l1 := &fspb.Label{ServiceName: "client", Label: "label1"}
285+
l2 := &fspb.Label{ServiceName: "client", Label: "label2"}
286+
287+
for _, tc := range []struct {
288+
desc string
289+
b1 *spb.Broadcast
290+
b2 *spb.Broadcast
291+
wantReplace bool
292+
}{
293+
{
294+
desc: "IdenticalMatchingProperties",
295+
b1: createBC("g1", "src1", "dst1", l1),
296+
b2: createBC("g1", "src1", "dst1", l1),
297+
wantReplace: true,
298+
},
299+
{
300+
desc: "DifferentSource",
301+
b1: createBC("g2", "src1", "dst1", l1),
302+
b2: createBC("g2", "src2", "dst1", l1),
303+
wantReplace: false,
304+
},
305+
{
306+
desc: "DifferentLabels",
307+
b1: createBC("g3", "src1", "dst1", l1),
308+
b2: createBC("g3", "src1", "dst1", l2),
309+
wantReplace: false,
310+
},
311+
{
312+
desc: "DifferentDestination",
313+
b1: createBC("g4", "src1", "dst1", l1),
314+
b2: createBC("g4", "src1", "dst2", l1),
315+
wantReplace: false,
316+
},
317+
{
318+
desc: "DifferentGroup",
319+
b1: createBC("g5", "src1", "dst1", l1),
320+
b2: createBC("g6", "src1", "dst1", l1),
321+
wantReplace: false,
322+
},
323+
} {
324+
t.Run(tc.desc, func(t *testing.T) {
325+
id1, _ := ids.RandomBroadcastID()
326+
id2, _ := ids.RandomBroadcastID()
327+
tc.b1.BroadcastId = id1.Bytes()
328+
tc.b2.BroadcastId = id2.Bytes()
329+
330+
if err := ds.CreateBroadcast(ctx, tc.b1, 10); err != nil {
331+
if errors.Is(err, db.ErrNotSupported) {
332+
t.Skip("Broadcast replacement feature not supported by this datastore")
333+
}
334+
t.Fatalf("Failed to create b1: %v", err)
335+
}
336+
if !isBroadcastActive(t, ds, id1) {
337+
t.Fatalf("b1 should be active upon creation")
338+
}
339+
340+
if err := ds.CreateBroadcast(ctx, tc.b2, 10); err != nil {
341+
t.Fatalf("Failed to create b2: %v", err)
342+
}
343+
344+
gotActive := isBroadcastActive(t, ds, id1)
345+
wantActive := !tc.wantReplace
346+
if gotActive != wantActive {
347+
t.Errorf("b1 got active status %v, want %v", gotActive, wantActive)
348+
}
349+
if !isBroadcastActive(t, ds, id2) {
350+
t.Errorf("b2 should always be active")
351+
}
352+
})
353+
}
354+
}
355+
257356
func broadcastStoreTestSuite(t *testing.T, env DbTestEnv) {
258357
t.Run("BroadcastStoreTestSuite", func(t *testing.T) {
259358
runTestSuite(t, env, map[string]func(*testing.T, db.Store){
260-
"BroadcastStoreTest": broadcastStoreTest,
359+
"BroadcastStoreTest": broadcastStoreTest,
360+
"BroadcastReplacementTest": testBroadcastReplacement,
261361
})
262362
})
263363
}

fleetspeak/src/server/mysql/broadcaststore.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ func (d *Datastore) CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit
114114
}
115115
dbB.messageLimit = limit
116116
return d.runInTx(ctx, false, func(tx *sql.Tx) error {
117+
if b.GetGroupName() != "" {
118+
// Failing broadcast requests with a groupname set is preferable to silently failing
119+
// to meet the expected behavior.
120+
return fmt.Errorf("broadcast replacement feature is not supported by the mysql datastore: %w", db.ErrNotSupported)
121+
}
122+
117123
if _, err := tx.ExecContext(ctx, "INSERT INTO broadcasts("+
118124
"broadcast_id, "+
119125
"source_service_name, "+

fleetspeak/src/server/proto/fleetspeak_server/broadcasts.pb.go

Lines changed: 21 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

fleetspeak/src/server/proto/fleetspeak_server/broadcasts.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,10 @@ message Broadcast {
3737

3838
// The payload of the broadcast.
3939
google.protobuf.Any data = 6;
40+
41+
// An optional group name that the broadcast gets categorized under.
42+
// If set, this broadcast will replace older broadcasts with the same group,
43+
// source, destination, and required labels on creation. This effectively
44+
// causes clients to only receive the latest broadcast in the same group.
45+
string group_name = 8;
4046
}

fleetspeak/src/server/spanner/broadcaststore.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ func (d *Datastore) CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit
4848
MessageLimit: int64(limit),
4949
}
5050
_, err := d.dbClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
51+
if b.GetGroupName() != "" {
52+
// Failing broadcast requests with a groupname set is preferable to silently failing
53+
// to meet the expected behavior.
54+
return fmt.Errorf("broadcast replacement feature is not supported by the spanner datastore: %w", db.ErrNotSupported)
55+
}
5156
return d.tryCreateBroadcast(txn, &br)
5257
})
5358
return err

fleetspeak/src/server/sqlite/broadcaststore.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ func (d *Datastore) CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit
114114
}
115115
dbB.messageLimit = limit
116116
return d.runInTx(func(tx *sql.Tx) error {
117+
if b.GetGroupName() != "" {
118+
// Failing broadcast requests with a groupname set is preferable to silently failing
119+
// to meet the expected behavior.
120+
return fmt.Errorf("broadcast replacement feature is not supported by the sqlite datastore: %w", db.ErrNotSupported)
121+
}
122+
117123
if _, err := tx.ExecContext(ctx, "INSERT INTO broadcasts("+
118124
"broadcast_id, "+
119125
"source_service_name, "+

spanner-setup/fleetspeak.pb

86 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)