-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess_id.go
More file actions
185 lines (158 loc) · 4.6 KB
/
process_id.go
File metadata and controls
185 lines (158 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package comet
import (
"os"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"syscall"
"unsafe"
)
// processIDMutex serializes GetProcessIDWithFile and ReleaseProcessID. It guards
// processIDCache and the slot-file accesses those functions perform.
var processIDMutex sync.Mutex

// processIDCache maps a slot-file path to the slot index this process acquired
// from that file. Only successful (non-negative) acquisitions are stored.
var processIDCache = map[string]int{}
// GetProcessID returns a unique process ID (0 to N-1) for multi-process deployments.
// It coordinates ID assignment across processes that may start in any order through
// a small memory-mapped slot file that all of them open.
//
// This is useful for deployments where you don't have explicit control over process
// startup order, such as:
//   - systemd with multiple service instances
//   - Container orchestration (Kubernetes, Docker Swarm)
//   - Process managers (PM2, Supervisor)
//   - Manual process spawning
//
// shmFile optionally names the slot file; only its first value is used. When it is
// omitted, the file "comet-worker-slots" inside os.TempDir() is used. Processes that
// are meant to share one ID space must use the same file.
//
// The function will return:
//   - 0 to (NumCPU-1): Successfully acquired process ID
//   - -1: Failed to acquire a process ID (all slots taken or error)
//
// Example usage:
//
//	processID := comet.GetProcessID()
//	if processID < 0 {
//		log.Fatal("Failed to acquire process ID")
//	}
//	config := comet.MultiProcessConfig(processID, runtime.NumCPU())
//	client, err := comet.NewClient(dataDir, config)
func GetProcessID(shmFile ...string) int {
	if len(shmFile) == 0 {
		// No override supplied: fall back to the shared default slot file.
		return GetProcessIDWithFile(filepath.Join(os.TempDir(), "comet-worker-slots"))
	}
	return GetProcessIDWithFile(shmFile[0])
}
// GetProcessIDWithFile is like GetProcessID but allows specifying a custom shared memory file.
// This is useful when running multiple independent Comet deployments on the same machine.
//
// An empty shmFile selects "comet-worker-slots" in os.TempDir(), the same file
// GetProcessID uses by default. The first successful acquisition for a file is
// cached for this process, so repeated calls return the same ID without rescanning.
// A later ReleaseProcessID for that file removes the cache entry. Failures (-1)
// are not cached, so a later call retries.
func GetProcessIDWithFile(shmFile string) int {
	// Normalize before touching the cache. doGetProcessID maps "" to the default
	// path, so keying the cache on "" would give GetProcessIDWithFile("") and
	// GetProcessID() separate entries for the same file. That risks claiming a
	// second slot and leaves an entry ReleaseProcessID cannot release.
	if shmFile == "" {
		shmFile = filepath.Join(os.TempDir(), "comet-worker-slots")
	}

	processIDMutex.Lock()
	defer processIDMutex.Unlock()

	if id, ok := processIDCache[shmFile]; ok {
		return id
	}

	id := doGetProcessID(shmFile)
	// Only cache successful acquisitions so that failures are retried.
	if id >= 0 {
		processIDCache[shmFile] = id
	}
	return id
}
func doGetProcessID(shmFile string) int {
if shmFile == "" {
shmFile = filepath.Join(os.TempDir(), "comet-worker-slots")
}
maxWorkers := runtime.NumCPU()
slotSize := 8 // 8 bytes for PID (uint32) + 4 bytes padding
file, err := os.OpenFile(shmFile, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return -1
}
defer file.Close()
// Only truncate if file is too small
if stat, _ := file.Stat(); stat.Size() < int64(maxWorkers*slotSize) {
file.Truncate(int64(maxWorkers * slotSize))
}
data, err := syscall.Mmap(int(file.Fd()), 0, maxWorkers*slotSize,
syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
if err != nil {
return -1
}
defer syscall.Munmap(data)
myPID := uint32(os.Getpid())
// Try to acquire a slot
for i := range maxWorkers {
offset := i * slotSize
pidPtr := (*uint32)(unsafe.Pointer(&data[offset]))
// Try to claim an empty slot
if atomic.CompareAndSwapUint32(pidPtr, 0, myPID) {
return i
}
// Check if existing PID is still alive
existingPID := atomic.LoadUint32(pidPtr)
if existingPID != 0 && !isProcessAlive(int(existingPID)) {
// Process is dead, try to claim its slot
if atomic.CompareAndSwapUint32(pidPtr, existingPID, myPID) {
return i
}
}
// If this is our PID, we already have this slot
if existingPID == myPID {
return i
}
}
return -1
}
// isProcessAlive reports whether a process with the given PID appears to exist.
// It probes with signal 0, which delivers nothing and only performs the existence
// and permission checks.
//
// Non-positive PIDs are reported as not alive without probing; POSIX kill treats
// 0 and negative values as process-group targets rather than a single process.
// Only ESRCH ("no such process") counts as dead. Any other failure, notably EPERM
// (the process exists but we may not signal it), is reported as alive so that a
// slot is never taken from an owner that may still be running.
func isProcessAlive(pid int) bool {
	if pid <= 0 {
		return false
	}
	switch err := syscall.Kill(pid, 0); err {
	case nil:
		return true
	case syscall.ESRCH:
		return false
	default:
		return true
	}
}
// ReleaseProcessID releases the process ID when shutting down gracefully.
// This is optional but helps with faster slot reuse. Without it, the slot is only
// reclaimed once another process finds that this PID is no longer alive.
//
// shmFile optionally names the slot file; only its first value is used. When it is
// omitted or empty, "comet-worker-slots" in os.TempDir() is used, matching
// GetProcessID. The call is best-effort and silent. If this process holds no
// cached ID for the file, or the file cannot be opened, inspected, or mapped,
// it does nothing.
func ReleaseProcessID(shmFile ...string) {
	path := filepath.Join(os.TempDir(), "comet-worker-slots")
	if len(shmFile) > 0 && shmFile[0] != "" {
		path = shmFile[0]
	}

	processIDMutex.Lock()
	defer processIDMutex.Unlock()

	id, ok := processIDCache[path]
	if !ok || id < 0 {
		return // No slot to release
	}

	const slotSize = 8 // 4 bytes for PID (uint32) + 4 bytes padding
	end := (id + 1) * slotSize // map only up to and including our slot

	file, err := os.OpenFile(path, os.O_RDWR, 0644)
	if err != nil {
		return
	}
	defer file.Close()

	// Touching a mapped page past EOF raises SIGBUS, so make sure our slot is
	// actually backed by the file (it may have been truncated or replaced).
	if stat, err := file.Stat(); err != nil || stat.Size() < int64(end) {
		return
	}

	data, err := syscall.Mmap(int(file.Fd()), 0, end,
		syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
	if err != nil {
		return
	}
	defer syscall.Munmap(data)

	pidPtr := (*uint32)(unsafe.Pointer(&data[id*slotSize]))
	// Clear the slot only if it still holds our PID. Either way the cached ID is
	// invalid afterwards: we just gave the slot up, or another process has
	// already taken it. A later GetProcessID must therefore acquire afresh.
	atomic.CompareAndSwapUint32(pidPtr, uint32(os.Getpid()), 0)
	delete(processIDCache, path)
}