Skip to content

Commit fe219ae

Browse files
authored
Add purge-queue command (#41)
1 parent 11843a6 commit fe219ae

9 files changed

Lines changed: 204 additions & 21 deletions

README.md

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ brew upgrade dlq-utils
2222

2323
- Invoke a function using messages from a queue
2424
- Move or copy messages from one queue to another
25+
- Delete messages from a queue based on a regular expression
2526
- Template a message before sending it to a queue or invoking a function
26-
- (soon) Filter messages before sending them to a queue or invoking a function
2727
- Save messages from a queue to a text file
2828
- Read lines from a text file and send them as messages to a queue
2929

@@ -34,28 +34,35 @@ It's necessary to specify the environment variable `AWS_PROFILE` with the [named
3434
Invoke an AWS Lambda function with all messages from an Amazon SQS queue, being able to transform them before invoking the function.
3535

3636
```shell
37-
AWS_PROFILE=configured-profile dlq-utils queue-to-lambda --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --function-name "some-lambda-function"
37+
AWS_PROFILE=configured-profile dlq-utils queue-to-lambda -s "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" -d "some-lambda-function"
3838
```
3939

4040
#### `file-to-queue`
4141
Read a text file to send each line as a message to an Amazon SQS queue.
4242

4343
```shell
44-
AWS_PROFILE=configured-profile dlq-utils file-to-queue --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --file "/Users/myuser/Documents/some-file.txt"
44+
AWS_PROFILE=configured-profile dlq-utils file-to-queue -s "/Users/myuser/Documents/some-file.txt" -d "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue"
4545
```
4646

4747
#### `queue-to-file`
4848
Consume all messages from an Amazon SQS queue to save them in a text file.
4949

5050
```shell
51-
AWS_PROFILE=configured-profile dlq-utils queue-to-file --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --file "/Users/myuser/Documents/some-file.txt"
51+
AWS_PROFILE=configured-profile dlq-utils queue-to-file -s "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" -d "/Users/myuser/Documents/some-file.txt"
5252
```
5353

5454
#### `queue-to-queue`
5555
Move all messages from an Amazon SQS queue to another one, being able to transform them.
5656

5757
```shell
58-
AWS_PROFILE=configured-profile dlq-utils queue-to-queue --source-queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/source-queue" --dest-queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/dest-queue"
58+
AWS_PROFILE=configured-profile dlq-utils queue-to-queue -s "https://sqs.us-east-1.amazonaws.com/000000000000/source-queue" -d "https://sqs.us-east-1.amazonaws.com/000000000000/dest-queue"
59+
```
60+
61+
#### `purge-queue`
62+
Purge a queue conditionally based on a regular expression tested on the message body.
63+
64+
```shell
65+
AWS_PROFILE=configured-profile dlq-utils purge-queue --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --regex ".foo"
5966
```
6067
For full documentation run `dlq-utils help [command]`.
6168

@@ -74,12 +81,15 @@ Next, you need to run the command below inside the repository folder to locally
7481
npx link .
7582
```
7683

77-
After that, every time you make a change in the code base you need to rebuild the project to update the CLI behavior.
84+
After that, every time you make a change in the code base you need to rebuild the project to update the CLI behavior. To execute commands use the prefix `npx` and do not forget the parameter `--endpoint-url`:
85+
86+
```shell
87+
npx dlq-utils queue-to-lambda -s "http://localhost:9324/000000000000/some-queue" -d "some-lambda-function" --endpoint-url "http://localhost:9324"
88+
```
7889

7990
## Roadmap
8091

8192
Here you will find a list of features I want to include in the project:
8293

83-
- ✨ Add the ability to filter out messages with a regex
8494
- 🔧 Add tooling to facilitate local testing
8595
- 🔧 Add hot reload to automatically rebuild the project and improve the development experience

src/index.js

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import fileToQueue from "./file-to-queue";
77
import queueToFile from "./queue-to-file";
88
import queueToLambda from "./queue-to-lambda";
99
import queueToQueue from "./queue-to-queue";
10+
import purgeQueue from "./purge-queue";
1011

1112
const program = new Command();
1213

@@ -53,8 +54,8 @@ program
5354
"Consume all messages from a queue (without deleting) to save them in a text file. " +
5455
"If the file already exists it will be overwritten"
5556
)
56-
.requiredOption("-s --file <string>", "The full name of the text file where the messages should be saved")
57-
.requiredOption("-d --queue-url <string>", "The URL of the queue that contains the messages")
57+
.requiredOption("-s --queue-url <string>", "The URL of the queue that contains the messages")
58+
.requiredOption("-d --file <string>", "The full name of the text file where the messages should be saved")
5859
.option(
5960
"--endpoint-url <string>",
6061
"Just like in aws-cli commands, this is only required when using a local version of SQS"
@@ -78,4 +79,15 @@ program
7879
)
7980
.action(queueToQueue);
8081

82+
program
83+
.command("purge-queue")
84+
.description("Clear queue conditionally based on a regular expression")
85+
.requiredOption("-r --regex <string>", "The regex to select the messages that should be deleted")
86+
.requiredOption("-q --queue-url <string>", "The URL of the queue that contains the messages")
87+
.option(
88+
"--endpoint-url <string>",
89+
"Just like in aws-cli commands, this is only required when using a local version of SQS and Lambda (e.g. LocalStack)"
90+
)
91+
.action(purgeQueue);
92+
8193
program.parse();

src/purge-queue.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import logger from "./logger";
2+
import { SQSClient } from "@aws-sdk/client-sqs";
3+
import { consumeMessages } from "./sqs-consumer";
4+
import resourceValidator from "./resource-validator";
5+
6+
export default async ({ regex: condition, queueUrl, endpointUrl: endpoint }) => {
7+
const sqsClient = new SQSClient({ endpoint });
8+
9+
if (!(await resourceValidator.validateQueue(sqsClient, queueUrl))) {
10+
return;
11+
}
12+
const regex = new RegExp(condition);
13+
14+
let totalMessages = 0;
15+
let totalMessagesDeleted = 0;
16+
const messageConsumer = async (message) => {
17+
totalMessages++;
18+
const messageBody = message.body;
19+
if (regex.test(messageBody)) {
20+
totalMessagesDeleted++;
21+
return true;
22+
}
23+
return false;
24+
};
25+
await consumeMessages(sqsClient, queueUrl, messageConsumer);
26+
logger.success(
27+
`Finished purge-queue successfully. ${totalMessagesDeleted} of ${totalMessages} messages were deleted`
28+
);
29+
};

src/queue-to-file.js

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import resourceValidator from "./resource-validator";
77
export default async ({ file, queueUrl, endpointUrl: endpoint }) => {
88
const sqsClient = new SQSClient({ endpoint });
99

10-
if (!(await isQueueValid(sqsClient, queueUrl))) {
10+
if (!(await resourceValidator.validateQueue(sqsClient, queueUrl))) {
1111
return;
1212
}
1313

@@ -23,8 +23,3 @@ export default async ({ file, queueUrl, endpointUrl: endpoint }) => {
2323
await consumeMessages(sqsClient, queueUrl, messageConsumer);
2424
logger.success(`Finished queue-to-file successfully. ${totalMessagesSaved} messages saved to file`);
2525
};
26-
27-
async function isQueueValid(sqsClient, queueUrl) {
28-
const resourcesToValidate = [{ type: "queue", value: queueUrl }];
29-
return await resourceValidator.validate(resourcesToValidate, sqsClient);
30-
}

src/resource-validator.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ async function validateQueue(sqsClient, queueUrl) {
5757
if (await isExistingQueue(sqsClient, queueName)) {
5858
return true;
5959
}
60-
logger.error("(ERROR) Some of the specified queues do not exist or are not accessible");
60+
logger.error(`(ERROR) Queue ${queueUrl} does not exist or is not accessible`);
6161
return false;
6262
}
6363

@@ -76,4 +76,4 @@ function mapPermissionToFsMode(permissionFlag) {
7676
throw new Error("Unknown file permission");
7777
}
7878

79-
export default { validate };
79+
export default { validate, validateQueue };
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import purgeQueue from "purge-queue";
2+
3+
const QUEUE_NAME = 'events-queue';
4+
5+
let sqsContainer;
6+
let sqsClient;
7+
8+
beforeAll(async () => {
9+
sqsContainer = await setUpSqsService();
10+
sqsClient = createSqsClient();
11+
12+
await createQueue(sqsClient, QUEUE_NAME);
13+
});
14+
15+
afterAll(async () => {
16+
await sqsContainer.stop();
17+
});
18+
19+
it('should consume messages from queue and delete only those that match the regex', async () => {
20+
const queueUrl = getQueueUrl(QUEUE_NAME);
21+
const messages = await sendTestMessages(sqsClient, QUEUE_NAME);
22+
23+
const messagesThatShouldBeDeleted = messages.filter(message => message.slice(-1) % 2);
24+
const messagesThatShouldBeKept = messages.filter(message => !(message.slice(-1) % 2));
25+
26+
await purgeQueue({
27+
endpointUrl: SQS_ENDPOINT_URL,
28+
regex: `^(${messagesThatShouldBeDeleted.join("|")})$`,
29+
queueUrl
30+
});
31+
32+
await waitVisibilityTimeout();
33+
await assertQueueContainsMessages(sqsClient, QUEUE_NAME, messagesThatShouldBeKept);
34+
});
35+
36+
it('should not throw exception when queue does not exist', async () => {
37+
const queueUrl = getQueueUrl("nonexistent");
38+
39+
await purgeQueue({
40+
endpointUrl: SQS_ENDPOINT_URL,
41+
regex: ".",
42+
queueUrl
43+
});
44+
});

test/purge-queue.test.js

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import purgeQueue from "purge-queue";
2+
import { consumeMessages } from "sqs-consumer";
3+
import resourceValidator from "resource-validator";
4+
jest.mock('sqs-consumer', () => ({
5+
consumeMessages: jest.fn()
6+
}));
7+
jest.mock('resource-validator', () => ({
8+
validateQueue: jest.fn()
9+
}));
10+
11+
it('should not consume messages when queue is not valid', async () => {
12+
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue';
13+
14+
resourceValidator.validateQueue.mockReturnValueOnce(false);
15+
16+
await purgeQueue({ condition: ".", queueUrl });
17+
18+
expect(consumeMessages.mock.calls.length).toBe(0);
19+
});
20+
21+
it('should use consumer that returns true when message matches condition', async () => {
22+
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue';
23+
const endpointUrl = 'http://localhost:4566';
24+
25+
resourceValidator.validateQueue.mockReturnValueOnce(true);
26+
27+
await purgeQueue({ regex: "\"field\":\\s*\"value\"", queueUrl, endpointUrl });
28+
29+
expect(consumeMessages.mock.calls.length).toBe(1);
30+
expect(consumeMessages.mock.calls[0][1]).toEqual(queueUrl);
31+
32+
const createdSqsClient = consumeMessages.mock.calls[0][0];
33+
const resolvedEndpoint = await createdSqsClient.config.endpoint();
34+
35+
expect(resolvedEndpoint.protocol).toBe('http:');
36+
expect(resolvedEndpoint.hostname).toBe('localhost');
37+
expect(resolvedEndpoint.port).toBe(4566);
38+
39+
const messageBody = '{ "field": "value", "other": 2 }';
40+
const shouldDeleteMessage = await consumeMessages.mock.calls[0][2]({ body: messageBody });
41+
expect(shouldDeleteMessage).toBe(true);
42+
});
43+
44+
it('should use consumer that returns false when message does not match condition', async () => {
45+
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue';
46+
const endpointUrl = 'http://localhost:4566';
47+
48+
resourceValidator.validateQueue.mockReturnValueOnce(true);
49+
50+
await purgeQueue({ regex: "\"field\":\\s*\"value\"", queueUrl, endpointUrl });
51+
52+
expect(consumeMessages.mock.calls.length).toBe(1);
53+
expect(consumeMessages.mock.calls[0][1]).toEqual(queueUrl);
54+
55+
const createdSqsClient = consumeMessages.mock.calls[0][0];
56+
const resolvedEndpoint = await createdSqsClient.config.endpoint();
57+
58+
expect(resolvedEndpoint.protocol).toBe('http:');
59+
expect(resolvedEndpoint.hostname).toBe('localhost');
60+
expect(resolvedEndpoint.port).toBe(4566);
61+
62+
const messageBody = '{ "field": "potato", "other": 2 }';
63+
const shouldDeleteMessage = await consumeMessages.mock.calls[0][2]({ body: messageBody });
64+
expect(shouldDeleteMessage).toBe(false);
65+
});

test/queue-to-file.test.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ jest.mock('fs', () => ({
1010
createWriteStream: jest.fn()
1111
}));
1212
jest.mock('resource-validator', () => ({
13-
validate: jest.fn()
13+
validateQueue: jest.fn()
1414
}));
1515

1616
it('should consume messages from queue and save them in file', async () => {
@@ -20,7 +20,7 @@ it('should consume messages from queue and save them in file', async () => {
2020
const lineWriter = { write: jest.fn() };
2121

2222
fs.createWriteStream.mockReturnValueOnce(lineWriter);
23-
resourceValidator.validate.mockReturnValueOnce(true);
23+
resourceValidator.validateQueue.mockReturnValueOnce(true);
2424

2525
await queueToFile({ queueUrl, file, endpointUrl });
2626

@@ -46,7 +46,7 @@ it('should create consumer that returns false', async () => {
4646
const lineWriter = { write: jest.fn() };
4747

4848
fs.createWriteStream.mockReturnValueOnce(lineWriter);
49-
resourceValidator.validate.mockReturnValueOnce(true);
49+
resourceValidator.validateQueue.mockReturnValueOnce(true);
5050

5151
await queueToFile({ queueUrl, file });
5252

@@ -60,7 +60,7 @@ it('should not consume messages when queue is not valid', async () => {
6060
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue';
6161
const file = 'path/filename.csv';
6262

63-
resourceValidator.validate.mockReturnValueOnce(false);
63+
resourceValidator.validateQueue.mockReturnValueOnce(false);
6464

6565
await queueToFile({ queueUrl, file });
6666

test/resource-validator.test.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,31 @@ describe('validate', () => {
144144
expect(isExistingFunction.mock.calls.length).toBe(0);
145145
});
146146
});
147+
148+
describe('validateQueue', () => {
149+
it('should extract queue name from queue URL and return true when queue exists', async () => {
150+
const sqsClient = { send: jest.fn() };
151+
isExistingQueue.mockReturnValueOnce(true);
152+
153+
const valid = await resourceValidator.validateQueue(
154+
sqsClient,
155+
"https://sqs.us-east-1.amazonaws.com/00000000/test-queue"
156+
);
157+
158+
expect(isExistingQueue).toBeCalledWith(sqsClient, "test-queue");
159+
expect(valid).toBe(true);
160+
});
161+
162+
it('should extract queue name from queue URL and return false when queue does not exist', async () => {
163+
const sqsClient = { send: jest.fn() };
164+
isExistingQueue.mockReturnValueOnce(false);
165+
166+
const valid = await resourceValidator.validateQueue(
167+
sqsClient,
168+
"https://sqs.us-east-1.amazonaws.com/00000000/test-queue"
169+
);
170+
171+
expect(isExistingQueue).toBeCalledWith(sqsClient, "test-queue");
172+
expect(valid).toBe(false);
173+
});
174+
});

0 commit comments

Comments
 (0)