Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ __debug_*
agent/lhm/obj
agent/lhm/bin
dockerfile_agent_dev
.DS_Store
102 changes: 89 additions & 13 deletions agent/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ type dockerManager struct {
containerStatsMutex sync.RWMutex // Mutex to prevent concurrent access to containerStatsMap
apiContainerList []*container.ApiInfo // List of containers from Docker API
containerStatsMap map[string]*container.Stats // Keeps track of container stats
validIds map[string]struct{} // Map of valid container ids, used to prune invalid containers from containerStatsMap
goodDockerVersion bool // Whether docker version is at least 25.0.0 (one-shot works correctly)
isWindows bool // Whether the Docker Engine API is running on Windows
buf *bytes.Buffer // Buffer to store and read response bodies
decoder *json.Decoder // Reusable JSON decoder that reads from buf
apiStats *container.ApiStats // Reusable API stats object
excludeContainers []string // Patterns to exclude containers by name
usingPodman bool // Whether the Docker Engine API is running on Podman
containerLabelsMap map[string]map[string]string
validIds map[string]struct{} // Map of valid container ids, used to prune invalid containers from containerStatsMap
goodDockerVersion bool // Whether docker version is at least 25.0.0 (one-shot works correctly)
isWindows bool // Whether the Docker Engine API is running on Windows
buf *bytes.Buffer // Buffer to store and read response bodies
decoder *json.Decoder // Reusable JSON decoder that reads from buf
apiStats *container.ApiStats // Reusable API stats object
excludeContainers []string // Patterns to exclude containers by name
usingPodman bool // Whether the Docker Engine API is running on Podman

// Cache-time-aware tracking for CPU stats (similar to cpu.go)
// Maps cache time intervals to container-specific CPU usage tracking
Expand Down Expand Up @@ -196,6 +197,7 @@ func (dm *dockerManager) getDockerStats(cacheTimeMs uint16) ([]*container.Stats,
for id, v := range dm.containerStatsMap {
if _, exists := dm.validIds[id]; !exists {
delete(dm.containerStatsMap, id)
delete(dm.containerLabelsMap, id)
} else {
stats = append(stats, v)
}
Expand Down Expand Up @@ -383,6 +385,7 @@ func parseDockerStatus(status string) (string, container.DockerHealth) {
// Updates stats for individual container with cache-time-aware delta tracking
func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeMs uint16) error {
name := ctr.Names[0][1:]
labels := dm.getContainerLabels(ctr)

resp, err := dm.client.Get(fmt.Sprintf("http://localhost/containers/%s/stats?stream=0&one-shot=1", ctr.IdShort))
if err != nil {
Expand All @@ -400,6 +403,9 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeM
}

stats.Id = ctr.IdShort
stats.Name = name
stats.Image = ctr.Image
stats.Labels = labels

statusText, health := parseDockerStatus(ctr.Status)
stats.Status = statusText
Expand Down Expand Up @@ -474,6 +480,7 @@ func (dm *dockerManager) deleteContainerStatsSync(id string) {
dm.containerStatsMutex.Lock()
defer dm.containerStatsMutex.Unlock()
delete(dm.containerStatsMap, id)
delete(dm.containerLabelsMap, id)
for ct := range dm.lastCpuContainer {
delete(dm.lastCpuContainer[ct], id)
}
Expand Down Expand Up @@ -556,11 +563,12 @@ func newDockerManager() *dockerManager {
Timeout: timeout,
Transport: userAgentTransport,
},
containerStatsMap: make(map[string]*container.Stats),
sem: make(chan struct{}, 5),
apiContainerList: []*container.ApiInfo{},
apiStats: &container.ApiStats{},
excludeContainers: excludeContainers,
containerStatsMap: make(map[string]*container.Stats),
containerLabelsMap: make(map[string]map[string]string),
sem: make(chan struct{}, 5),
apiContainerList: []*container.ApiInfo{},
apiStats: &container.ApiStats{},
excludeContainers: excludeContainers,

// Initialize cache-time-aware tracking structures
lastCpuContainer: make(map[uint16]map[string]uint64),
Expand All @@ -587,6 +595,74 @@ func newDockerManager() *dockerManager {
return manager
}

// copyLabels returns a new map holding every key/value pair of src.
// The result is never nil: a nil or empty src yields an empty, writable map,
// so callers can add entries without affecting src.
func copyLabels(src map[string]string) map[string]string {
	// make with a zero length hint covers the nil/empty case as well.
	dup := make(map[string]string, len(src))
	for key, value := range src {
		dup[key] = value
	}
	return dup
}

// mergeLabels overlays extra on top of base and returns the combined labels.
// Keys present in both maps take their value from extra.
//
// When extra is empty, base itself is returned (not a copy); otherwise a new
// map is built and neither input is modified.
func mergeLabels(base, extra map[string]string) map[string]string {
	if len(extra) > 0 {
		combined := copyLabels(base)
		for key, value := range extra {
			combined[key] = value
		}
		return combined
	}
	// Nothing to overlay: hand back the caller's map unchanged.
	return base
}

// getContainerLabels returns the labels for ctr, consulting containerLabelsMap
// (keyed by ctr.IdShort) before doing any work.
//
// On a cache miss the labels from the container list entry are copied and, if
// the inspect request succeeds, merged with the labels it reports (inspect
// values win on key conflicts). The result is stored in the cache either way,
// so a failed inspect is not retried until the entry is deleted.
//
// The returned map is the cached instance and must be treated as read-only.
func (dm *dockerManager) getContainerLabels(ctr *container.ApiInfo) map[string]string {
	// Check the cache first so a hit does not pay for copying ctr.Labels.
	dm.containerStatsMutex.RLock()
	cached, ok := dm.containerLabelsMap[ctr.IdShort]
	dm.containerStatsMutex.RUnlock()
	if ok {
		return cached
	}

	// Copy so the cached map does not alias the API list entry.
	labels := copyLabels(ctr.Labels)
	// The inspect call is made without holding the mutex. On failure the
	// list labels alone are used (best effort).
	if inspectLabels, err := dm.fetchInspectLabels(ctr.IdShort); err == nil {
		labels = mergeLabels(labels, inspectLabels)
	}

	dm.containerStatsMutex.Lock()
	dm.containerLabelsMap[ctr.IdShort] = labels
	dm.containerStatsMutex.Unlock()
	return labels
}

// fetchInspectLabels requests /containers/{id}/json and returns the labels
// found under Config.Labels in the response.
//
// It returns an error if the request fails, if the response status is not
// 2xx, or if the body cannot be decoded. The returned map may be nil when the
// response carries no labels.
func (dm *dockerManager) fetchInspectLabels(containerID string) (map[string]string, error) {
	// Only the Config.Labels portion of the inspect payload is decoded.
	type inspectResponse struct {
		Config struct {
			Labels map[string]string `json:"Labels"`
		} `json:"Config"`
	}

	resp, err := dm.client.Get(fmt.Sprintf("http://localhost/containers/%s/json", containerID))
	if err != nil {
		return nil, fmt.Errorf("inspecting container %s: %w", containerID, err)
	}
	defer resp.Body.Close()

	// A non-2xx response with a JSON body would decode into the zero value
	// without error and be mistaken for a container with no labels, so reject
	// it explicitly. (Numeric bounds are used to avoid requiring an import
	// that is not visible in this part of the file.)
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("inspecting container %s: unexpected status %s", containerID, resp.Status)
	}

	var info inspectResponse
	if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
		return nil, fmt.Errorf("decoding inspect response for container %s: %w", containerID, err)
	}

	return info.Config.Labels, nil
}

// checkDockerVersion checks Docker version and sets goodDockerVersion if at least 25.0.0.
// Versions before 25.0.0 have a bug with one-shot which requires all requests to be made in one batch.
func (dm *dockerManager) checkDockerVersion() {
Expand Down
11 changes: 6 additions & 5 deletions internal/entities/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ type ApiInfo struct {
Status string
State string
Image string
Labels map[string]string
// ImageID string
// Command string
// Created int64
// Ports []Port
// SizeRw int64 `json:",omitempty"`
// SizeRootFs int64 `json:",omitempty"`
// Labels map[string]string
// HostConfig struct {
// NetworkMode string `json:",omitempty"`
// Annotations map[string]string `json:",omitempty"`
Expand Down Expand Up @@ -136,10 +136,11 @@ type Stats struct {
NetworkRecv float64 `json:"nr,omitzero" cbor:"4,keyasint,omitzero"` // deprecated 0.18.3 (MB) - keep field for old agents/records
Bandwidth [2]uint64 `json:"b,omitzero" cbor:"9,keyasint,omitzero"` // [sent bytes, recv bytes]

Health DockerHealth `json:"-" cbor:"5,keyasint"`
Status string `json:"-" cbor:"6,keyasint"`
Id string `json:"-" cbor:"7,keyasint"`
Image string `json:"-" cbor:"8,keyasint"`
Health DockerHealth `json:"-" cbor:"5,keyasint"`
Status string `json:"-" cbor:"6,keyasint"`
Id string `json:"-" cbor:"7,keyasint"`
Image string `json:"-" cbor:"8,keyasint"`
Labels map[string]string `json:"-" cbor:"10,keyasint,omitempty"`
// PrevCpu [2]uint64 `json:"-"`
CpuSystem uint64 `json:"-"`
CpuContainer uint64 `json:"-"`
Expand Down
2 changes: 2 additions & 0 deletions internal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ func setCollectionAuthSettings(app core.App) error {
return err
}
containersListRule := strings.Replace(systemsReadRule, "users.id", "system.users.id", 1)
containersUpdateRule := containersListRule + " && @request.auth.role != \"readonly\""
containersCollection.ListRule = &containersListRule
containersCollection.UpdateRule = &containersUpdateRule
if err := app.Save(containersCollection); err != nil {
return err
}
Expand Down
37 changes: 33 additions & 4 deletions internal/hub/systems/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"hash/fnv"
"math/rand"
"net"
"regexp"
"strings"
"sync/atomic"
"time"
Expand All @@ -31,6 +32,8 @@ import (
"golang.org/x/crypto/ssh"
)

var autoContainerAliasVarPattern = regexp.MustCompile(`\$([A-Za-z0-9_.]+)`)

type System struct {
Id string `db:"id"`
Host string `db:"host"`
Expand Down Expand Up @@ -192,7 +195,8 @@ func (sys *System) createRecords(data *system.CombinedData) (*core.Record, error
// add containers and container_stats records
if len(data.Containers) > 0 {
if data.Containers[0].Id != "" {
if err := createContainerRecords(txApp, data.Containers, sys.Id); err != nil {
fmt.Printf("RECORDS - %+v", systemRecord.Collection())
if err := createContainerRecords(txApp, data.Containers, sys.Id, systemRecord.GetString("auto_container_alias")); err != nil {
return err
}
}
Expand Down Expand Up @@ -297,7 +301,7 @@ func createSystemdStatsRecords(app core.App, data []*systemd.Service, systemId s
}

// createContainerRecords creates container records
func createContainerRecords(app core.App, data []*container.Stats, systemId string) error {
func createContainerRecords(app core.App, data []*container.Stats, systemId string, autoContainerAliasTemplate string) error {
if len(data) == 0 {
return nil
}
Expand All @@ -309,7 +313,7 @@ func createContainerRecords(app core.App, data []*container.Stats, systemId stri
valueStrings := make([]string, 0, len(data))
for i, container := range data {
suffix := fmt.Sprintf("%d", i)
valueStrings = append(valueStrings, fmt.Sprintf("({:id%[1]s}, {:system}, {:name%[1]s}, {:image%[1]s}, {:status%[1]s}, {:health%[1]s}, {:cpu%[1]s}, {:memory%[1]s}, {:net%[1]s}, {:updated})", suffix))
valueStrings = append(valueStrings, fmt.Sprintf("({:id%[1]s}, {:system}, {:name%[1]s}, {:image%[1]s}, {:status%[1]s}, {:health%[1]s}, {:cpu%[1]s}, {:memory%[1]s}, {:net%[1]s}, {:alias%[1]s}, {:updated})", suffix))
params["id"+suffix] = container.Id
params["name"+suffix] = container.Name
params["image"+suffix] = container.Image
Expand All @@ -322,15 +326,40 @@ func createContainerRecords(app core.App, data []*container.Stats, systemId stri
netBytes = uint64((container.NetworkSent + container.NetworkRecv) * 1024 * 1024)
}
params["net"+suffix] = netBytes
params["alias"+suffix] = resolveAutoContainerAlias(autoContainerAliasTemplate, container.Labels)
}
queryString := fmt.Sprintf(
"INSERT INTO containers (id, system, name, image, status, health, cpu, memory, net, updated) VALUES %s ON CONFLICT(id) DO UPDATE SET system = excluded.system, name = excluded.name, image = excluded.image, status = excluded.status, health = excluded.health, cpu = excluded.cpu, memory = excluded.memory, net = excluded.net, updated = excluded.updated",
"INSERT INTO containers (id, system, name, image, status, health, cpu, memory, net, alias, updated) VALUES %s ON CONFLICT(id) DO UPDATE SET system = excluded.system, name = excluded.name, image = excluded.image, status = excluded.status, health = excluded.health, cpu = excluded.cpu, memory = excluded.memory, net = excluded.net, alias = CASE WHEN (containers.alias IS NULL OR containers.alias = '') AND (excluded.alias IS NOT NULL AND excluded.alias != '') THEN excluded.alias ELSE containers.alias END, updated = excluded.updated",
strings.Join(valueStrings, ","),
)
_, err := app.DB().NewQuery(queryString).Bind(params).Execute()
return err
}

// resolveAutoContainerAlias expands a template such as "$SERVICE-$ENV" using
// container labels. Each variable matched by autoContainerAliasVarPattern is
// replaced with the whitespace-trimmed value of the label of the same name.
//
// It returns "" when the template is blank, when labels is empty, when the
// template contains no variables, or when any referenced label is missing or
// blank. Otherwise it returns the expanded template with surrounding
// whitespace trimmed.
//
// Substitution is done in a single left-to-right pass over the original
// template. This keeps a variable whose name is a prefix of another
// (e.g. "$A" and "$AB") from corrupting the longer one, and ensures text
// introduced by a label value is never expanded again.
func resolveAutoContainerAlias(template string, labels map[string]string) string {
	template = strings.TrimSpace(template)
	if template == "" || len(labels) == 0 {
		return ""
	}

	// Each entry is [matchStart, matchEnd, nameStart, nameEnd].
	locs := autoContainerAliasVarPattern.FindAllStringSubmatchIndex(template, -1)
	if len(locs) == 0 {
		return ""
	}

	var b strings.Builder
	prev := 0
	for _, loc := range locs {
		// A missing key yields "", which is handled like a blank value.
		value := strings.TrimSpace(labels[template[loc[2]:loc[3]]])
		if value == "" {
			return ""
		}
		b.WriteString(template[prev:loc[0]]) // literal text before the variable
		b.WriteString(value)
		prev = loc[1]
	}
	b.WriteString(template[prev:]) // literal text after the last variable

	return strings.TrimSpace(b.String())
}

// getRecord retrieves the system record from the database.
// If the record is not found, it removes the system from the manager.
func (sys *System) getRecord() (*core.Record, error) {
Expand Down
36 changes: 36 additions & 0 deletions internal/hub/systems/system_alias_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package systems

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestResolveAutoContainerAlias checks template expansion against labels:
// a successful substitution plus the cases that must yield an empty alias.
func TestResolveAutoContainerAlias(t *testing.T) {
	cases := []struct {
		name     string
		template string
		labels   map[string]string
		want     string // empty means the alias must be empty
	}{
		{
			name:     "replaces variables from labels",
			template: "$SERVICE-$ENV",
			labels:   map[string]string{"SERVICE": "api", "ENV": "prod"},
			want:     "api-prod",
		},
		{
			name:     "returns empty when a label is missing",
			template: "$SERVICE-$ENV",
			labels:   map[string]string{"SERVICE": "api"},
		},
		{
			name:     "returns empty when template has no variables",
			template: "plain-text",
			labels:   map[string]string{"SERVICE": "api"},
		},
		{
			name:     "returns empty when labels are missing",
			template: "$SERVICE",
			labels:   nil,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := resolveAutoContainerAlias(tc.template, tc.labels)
			if tc.want == "" {
				assert.Empty(t, got)
				return
			}
			assert.Equal(t, tc.want, got)
		})
	}
}
Loading