Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ jobs:
with:
gh-token: ${{ secrets.GH_TOKEN }}

- name: Build
env:
GH_TOKEN: ${{ secrets.GH_TOKEN }}
run: pnpm build

- name: Check
run: pnpm check

Expand Down
39 changes: 27 additions & 12 deletions benchmark/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ export function benchmark(type: string, options: any): void {
);
}

export function generateTestData(count: number, keySize: number = 20, valueSize: number = 100) {
export function generateTestData(
count: number,
keySize: number = 20,
valueSize: number = 100
): Array<{ key: string; value: string }> {
const data: Array<{ key: string; value: string }> = [];

for (let i = 0; i < count; i++) {
Expand Down Expand Up @@ -244,7 +248,7 @@ let workerCurrentSuites: WorkerSuite[] = [];
* Runs on the main thread and the worker thread. It discovers nested
* `describe()` calls and groups them into suites.
*/
function describeShim(name: string, fn: () => void) {
function describeShim(name: string, fn: () => void): void {
const suites =
workerCurrentSuites.length > 0
? workerCurrentSuites[workerCurrentSuites.length - 1].suites
Expand All @@ -261,23 +265,30 @@ function describeShim(name: string, fn: () => void) {
workerCurrentSuites.pop();
}

type WorkerDescribe = {
(name: string, fn: () => void): void;
only(name: string, fn: () => void): void;
skip(name: string, fn: () => void): void;
todo(name: string, fn: () => void): void;
};

/**
* This is the main `workerDescribe()` function that is exported. It has two
* code paths:
*
* 1. The main thread, which does discovery and calls `vitest.describe()`
* 2. The worker thread, which does discovery only
*/
export const workerDescribe = await (async () => {
export const workerDescribe: WorkerDescribe = await (async () => {
if (workerData?.benchmarkWorkerId) {
return Object.assign(describeShim, {
only(name: string, fn: () => void) {
only(name: string, fn: () => void): void {
describeShim(name, fn);
},
skip() {
skip(_name: string, _fn: () => void): void {
throw new Error('skip not supported in worker');
},
todo() {
todo(_name: string, _fn: () => void): void {
throw new Error('todo not supported in worker');
},
});
Expand All @@ -286,7 +297,7 @@ export const workerDescribe = await (async () => {
// main thread
const { describe } = await import('vitest');
return Object.assign(
(name: string, fn: () => void) => {
(name: string, fn: () => void): void => {
describeShim(name, () => {
// snapshot the worker current suites in the closure because vitest
// fires callbacks once all describes()'s have been discovered
Expand All @@ -300,7 +311,7 @@ export const workerDescribe = await (async () => {
});
},
{
only(name: string, fn: () => void) {
only(name: string, fn: () => void): void {
describeShim(name, () => {
const state = [...workerCurrentSuites];
describe.only(name, () => {
Expand Down Expand Up @@ -492,7 +503,7 @@ export function workerBenchmark(type: string, options: any): void {
* Runs on the worker thread, opens the database and wires up the message
* listeners.
*/
export async function workerInit() {
export async function workerInit(): Promise<void> {
if (!parentPort) {
throw new Error('Failed to initialize worker: parentPort is not available');
}
Expand Down Expand Up @@ -545,7 +556,7 @@ export async function workerInit() {
/**
* Runs on the worker thread and attempts to find the requested benchmark.
*/
function workerFindBenchmark() {
function workerFindBenchmark(): WorkerBenchmark {
let suite: WorkerSuite | undefined;
let suites: Record<string, WorkerSuite> | undefined = workerSuites;

Expand All @@ -567,7 +578,7 @@ function workerFindBenchmark() {
/**
* Runs on the main thread and launches a worker thread.
*/
export function workerLaunch(workerData: Record<string, any> = {}) {
export function workerLaunch(workerData: Record<string, any> = {}): Worker {
// Node.js 18 and older doesn't properly eval ESM code
const majorVersion = parseInt(process.versions.node.split('.')[0]);
const script =
Expand All @@ -589,7 +600,11 @@ export function workerLaunch(workerData: Record<string, any> = {}) {
return new Worker(script, { eval: true, workerData });
}

function withResolvers<T>() {
function withResolvers<T>(): {
resolve: (value: T) => void;
reject: (reason?: any) => void;
promise: Promise<T>;
} {
let resolve, reject;
const promise = new Promise<T>((res, rej) => {
resolve = res;
Expand Down
74 changes: 74 additions & 0 deletions stress-test/db-instances.stress.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { registryStatus } from '../src/index.js';
import { dbRunner } from '../test/lib/util.js';
import { createWorkerBootstrapScript } from '../test/lib/util.js';
import { setTimeout as delay } from 'node:timers/promises';
import { Worker } from 'node:worker_threads';
import { describe, expect, it } from 'vitest';

// Node.js 18 and older doesn't properly eval ESM code
const bootstrapScript = createWorkerBootstrapScript(
'./stress-test/workers/stress-db-instances-worker.mts'
);

describe('Stress DB Instances', () => {
it.skipIf(!globalThis.gc)(
'should create 10 worker threads and each open 500 databases with 25 column families',
() =>
dbRunner(async ({ dbPath }) => {
const promises: Promise<void>[] = [];
const workers: Worker[] = [];

const [initial] = registryStatus();

const workerThreads = 10;
const dbInstances = 500;
const numColumnFamilies = 25;

for (let i = 0; i < workerThreads; i++) {
const worker = new Worker(bootstrapScript, {
eval: true,
workerData: { path: dbPath, dbInstances, numColumnFamilies },
});
workers.push(worker);

promises.push(
new Promise<void>((resolve, reject) => {
worker.on('error', reject);
worker.on('message', (event) => {
if (event.done) {
resolve();
} else if (event.closed) {
resolve();
}
});
})
);
}

await Promise.all(promises);
promises.length = 0;

const [before] = registryStatus();

for (const worker of workers) {
worker.postMessage({ close: true });
}
await Promise.all(promises);

if (globalThis.gc) {
globalThis.gc();
await delay(100);
// Second gc() to finalize external buffers (napi_create_external_buffer
// C++ destructors may be deferred past the first gc cycle on some Node versions)
globalThis.gc();
await delay(50);
}

const [after] = registryStatus();

// +1 for the default column family
expect(Object.keys(before?.columnFamilies).length).toBe(numColumnFamilies + 1);
expect(initial.refCount).toBe(after.refCount);
})
);
});
21 changes: 4 additions & 17 deletions stress-test/transactions.stress.test.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,12 @@
import { dbRunner } from '../test/lib/util.js';
import { createWorkerBootstrapScript } from '../test/lib/util.js';
import { Worker } from 'node:worker_threads';
import { describe, it } from 'vitest';

// Node.js 18 and older doesn't properly eval ESM code
const majorVersion = parseInt(process.versions.node.split('.')[0]);
const bootstrapScript =
process.versions.deno || process.versions.bun
? `
import { pathToFileURL } from 'node:url';
import(pathToFileURL('./stress-test/workers/stress-transaction-put-worker.mts'));
`
: majorVersion < 20
? `
const tsx = require('tsx/cjs/api');
tsx.require('./stress-test/workers/stress-transaction-put-worker.mts', __dirname);
`
: `
import { register } from 'tsx/esm/api';
register();
import('./stress-test/workers/stress-transaction-put-worker.mts');
`;
const bootstrapScript = createWorkerBootstrapScript(
'./stress-test/workers/stress-transaction-put-worker.mts'
);

describe('Stress Transactions', () => {
it('should create 30 worker threads and commit 10k transactions', () =>
Expand Down
25 changes: 25 additions & 0 deletions stress-test/workers/stress-db-instances-worker.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { RocksDatabase } from '../../src/index.js';
import { parentPort, threadId, workerData } from 'node:worker_threads';

const dbs: RocksDatabase[] = [];

for (let i = 0; i < workerData.dbInstances; i++) {
for (let j = 0; j < workerData.numColumnFamilies; j++) {
const db = RocksDatabase.open(workerData.path, { name: `table-${j}` });
await db.transaction(async (transaction) => {
await transaction.put(`key-${i}-${j}-${threadId}`, 'value');
});
dbs.push(db);
}
}

parentPort?.postMessage({ done: true });

parentPort?.on('message', (event) => {
if (event.close) {
for (const db of dbs) {
db.close();
}
parentPort?.postMessage({ closed: true });
}
});
2 changes: 1 addition & 1 deletion test/transaction-log.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ describe('Transaction Log', () => {
db.removeListener('new-transaction-log', listener);
}));

(globalThis.gc ? it : it.skip)('should cleanup transaction log instance on GC', () =>
it.skipIf(!globalThis.gc)('should cleanup transaction log instance on GC', () =>
dbRunner(async ({ db }) => {
let weakRef: WeakRef<TransactionLog> | undefined;

Expand Down
2 changes: 1 addition & 1 deletion test/user-shared-buffer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ describe('User Shared Buffer', () => {
});
}));

(globalThis.gc ? it : it.skip)(
it.skipIf(!globalThis.gc)(
'should cleanup callbacks on GC',
() =>
dbRunner(async ({ db }) => {
Expand Down
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@
"typeRoots": ["./node_modules/@types"],
"types": ["node"]
},
"include": ["./tsdown.config.ts", "./src", "./scripts", "./test"]
"include": ["./benchmark", "./scripts", "./src", "./stress-test", "./test", "./tsdown.config.ts"]
}
Loading