-
Notifications
You must be signed in to change notification settings - Fork 78
Expand file tree
/
Copy pathcommand-queue.js
More file actions
167 lines (144 loc) · 4.79 KB
/
command-queue.js
File metadata and controls
167 lines (144 loc) · 4.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/**
* Command Queue Module
* Prevents overlapping execution of commands of the same type.
*/
const constants = require('./constants');
// State: commands currently executing, keyed by target type.
// Maps target -> commandId, so at most one command is tracked per target.
const executingCommands = new Map();
// Hooks module handed to initialize(); stays null until initialize() has run.
let hooks = null;
/**
* Initialize command queue with hooks.
* @param {Object} hooksModule - Hooks module for listening to action events
*/
exports.initialize = (hooksModule) => {
if (hooks) return; // Already initialized
hooks = hooksModule;
// Listen for action completion events
hooks.on('action', (event, actionId) => {
if (event === 'stopped' || event === 'failed') {
// Find and remove the completed action from executingCommands
for (const [target, commandId] of executingCommands.entries()) {
if (commandId === actionId) {
executingCommands.delete(target);
break;
}
}
}
});
};
/**
 * Resolve the key used to group commands of the same kind.
 *
 * Lookup order: `command.body.target`, then `command.target`; falsy values
 * are skipped and the literal 'unknown' is the final fallback.
 * @param {Object} command - Command object
 * @returns {string} - Target key (e.g., "alert", "lock")
 */
const getCommandTarget = (command) => {
  const nestedTarget = command.body && command.body.target;
  return nestedTarget || command.target || 'unknown';
};
/**
* Process command for execution.
* @param {Object} command - Command object
* @param {EventEmitter} emitter - Event emitter
* @param {Object} logger - Logger instance
* @returns {boolean} - Always true (ACK is always sent for received commands)
*/
exports.enqueueCommand = (command, emitter, logger) => {
const target = getCommandTarget(command);
const commandId = command.id;
const commandType = command.body ? command.body.command : command.command;
// Providers ('get', 'report', 'cancel') and 'stop' commands don't need tracking
// - Providers execute immediately and return data
// - 'stop' commands are instant operations that just stop other actions
// - 'triggers' target doesn't emit standard action completion events
const isProvider = ['get', 'report', 'cancel'].includes(commandType);
const isStopCommand = commandType === 'stop';
const isTriggersTarget = target === 'triggers';
const skipTracking = isProvider || isStopCommand || isTriggersTarget;
// Check if a command of this type is currently executing (only for tracked commands)
if (!skipTracking && executingCommands.has(target)) {
// Duplicate command - send ACK but do NOT execute
const idMsg = commandId ? ` (id: ${commandId})` : '';
logger.warn(`Command of type '${target}' already executing, ignoring duplicate${idMsg}`);
// Emit event for monitoring purposes (command was received but not executed)
emitter.emit('command_rejected', {
command,
reason: `Already running: ${target}`,
});
return true; // ACK sent to confirm reception, but command will NOT execute
}
// Execute immediately
const idMsg = commandId ? ` (id: ${commandId})` : '';
logger.info(`Executing command of type '${target}'${idMsg}`);
// Track executing command (only for actions that need tracking)
if (!skipTracking) {
executingCommands.set(target, commandId);
}
emitter.emit('command', command);
return true; // ACK will be sent
};
/**
* Process array of commands through the queue.
* @param {Array} commands - Array of command objects
* @param {EventEmitter} emitter - Event emitter
* @param {Object} logger - Logger instance
* @returns {Array} - Array of accepted commands
*/
exports.processCommands = (commands, emitter, logger) => {
if (!Array.isArray(commands)) {
logger.error('processCommands expects an array');
return [];
}
const acceptedCommands = [];
commands.forEach((command) => {
const accepted = exports.enqueueCommand(command, emitter, logger);
if (accepted) {
acceptedCommands.push(command);
}
});
return acceptedCommands;
};
/**
* Get current execution status for a target type.
* @param {string} target - Target type
* @returns {Object} - Execution status
*/
exports.getQueueStatus = (target) => {
return {
isExecuting: executingCommands.has(target),
executingId: executingCommands.has(target) ? executingCommands.get(target) : null,
};
};
/**
* Clear executing command for a target type.
* @param {string} target - Target type
*/
exports.clearQueue = (target) => {
executingCommands.delete(target);
};
/**
* Clear all executing commands.
*/
exports.clearAllQueues = () => {
executingCommands.clear();
};
/**
* Get all execution status for debugging.
* @returns {Object} - All execution status
*/
exports.getAllQueuesStatus = () => {
const status = {};
for (const [target, commandId] of executingCommands.entries()) {
status[target] = {
isExecuting: true,
executingId: commandId,
};
}
return status;
};