Skip to content

Commit da41f5f

Browse files
authored
feat: tps pulse Phase 2 — prune + Flair publishing (ops-105) (#211)
Phase 2 additions to the PR review workflow engine: Prune: - pruneState(state, days) removes terminal instances (merged/closed) older than pruneAfterDays (default: 7) - Called at the start of every poll cycle before pollOnce - Returns count of pruned instances; logs if any pruned - PulseConfig.pruneAfterDays field (configurable in config.json) Flair publishing: - makeFlairPublisher(config) creates FlairPublisher if flairUrl + flairAgentId + flairAgentKey are set in config (opt-in, no-op otherwise) - On each state transition: publishEvent (OrgEvent) + writeMemory (persistent fact, stable id per PR key) - FlairPublisher injected into handleTransition/pollOnce as optional param - Publisher errors are caught and logged — never block the poll loop - PulseConfig gains flairUrl/flairAgentId/flairAgentKey fields Tests: +6 tests (3 pruneState, 3 FlairPublisher integration). 23 pulse tests total. 631 tests pass, 0 fail.
1 parent e998d63 commit da41f5f

2 files changed

Lines changed: 225 additions & 5 deletions

File tree

packages/cli/src/commands/pulse.ts

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { spawnSync, type SpawnSyncReturns } from "node:child_process";
99
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
1010
import { homedir } from "node:os";
1111
import { join } from "node:path";
12+
import { createFlairClient } from "../utils/flair-client.js";
1213

1314
// ---------------------------------------------------------------------------
1415
// Types
@@ -49,6 +50,10 @@ export interface PulseConfig {
4950
pollIntervalMs: number;
5051
remindAfterMs: number;
5152
ghAgent: string;
53+
pruneAfterDays: number;
54+
flairUrl?: string;
55+
flairAgentId?: string;
56+
flairAgentKey?: string;
5257
}
5358

5459
// Injectable runner type for testing
@@ -57,6 +62,14 @@ export type SyncRunner = (cmd: string, args: string[], opts?: { encoding?: Buffe
5762
// Injectable mail sender for testing
5863
export type MailSender = (to: string, body: string, agentId: string) => void;
5964

65+
// Injectable Flair publisher for testing (null = disabled)
66+
export type FlairPublisher = (
67+
key: string,
68+
from: PrState | null,
69+
to: PrState,
70+
instance: PrInstance,
71+
) => Promise<void>;
72+
6073
// ---------------------------------------------------------------------------
6174
// Defaults
6275
// ---------------------------------------------------------------------------
@@ -70,6 +83,7 @@ const DEFAULT_CONFIG: PulseConfig = {
7083
pollIntervalMs: 120000,
7184
remindAfterMs: 1800000,
7285
ghAgent: "flint",
86+
pruneAfterDays: 7,
7387
};
7488

7589
const PULSE_DIR = join(homedir(), ".tps", "pulse");
@@ -108,6 +122,27 @@ export function saveState(state: PulseState): void {
108122
writeFileSync(STATE_PATH, JSON.stringify(state, null, 2), "utf-8");
109123
}
110124

125+
/**
126+
* Remove terminal instances (merged/closed) older than pruneAfterDays.
127+
* Returns the number of instances pruned.
128+
*/
129+
export function pruneState(state: PulseState, pruneAfterDays: number): number {
130+
const cutoff = Date.now() - pruneAfterDays * 24 * 60 * 60 * 1000;
131+
const terminalStates = new Set<string>(["merged", "closed"]);
132+
let pruned = 0;
133+
for (const key of Object.keys(state.instances)) {
134+
const inst = state.instances[key];
135+
if (
136+
terminalStates.has(inst.state) &&
137+
new Date(inst.lastTransitionAt).getTime() < cutoff
138+
) {
139+
delete state.instances[key];
140+
pruned++;
141+
}
142+
}
143+
return pruned;
144+
}
145+
111146
// ---------------------------------------------------------------------------
112147
// GitHub API
113148
// ---------------------------------------------------------------------------
@@ -181,6 +216,7 @@ export function handleTransition(
181216
newState: PrState,
182217
config: PulseConfig,
183218
sender: MailSender,
219+
publisher?: FlairPublisher,
184220
): void {
185221
const oldState = instance.state;
186222
if (oldState === newState) return;
@@ -193,6 +229,13 @@ export function handleTransition(
193229

194230
console.log(`[pulse] ${key}: ${oldState}${newState}`);
195231

232+
// Publish to Flair (non-blocking, best-effort)
233+
if (publisher) {
234+
publisher(key, oldState, newState, instance).catch((e: unknown) => {
235+
console.warn(`[pulse] Flair publish failed: ${(e as Error).message}`);
236+
});
237+
}
238+
196239
// Determine mail targets based on transition
197240
switch (newState) {
198241
case "opened": {
@@ -305,6 +348,7 @@ export function pollOnce(
305348
state: PulseState,
306349
runner: SyncRunner = spawnSync as unknown as SyncRunner,
307350
sender: MailSender = defaultMailSender,
351+
publisher?: FlairPublisher,
308352
): void {
309353
const now = new Date().toISOString();
310354

@@ -363,12 +407,12 @@ export function pollOnce(
363407

364408
// If PR already has reviews, advance state
365409
if (computed !== "opened") {
366-
handleTransition(key, instance, computed, config, sender);
410+
handleTransition(key, instance, computed, config, sender, publisher);
367411
}
368412
} else {
369413
// Existing PR — check for state change
370414
existing.title = pr.title;
371-
handleTransition(key, existing, computed, config, sender);
415+
handleTransition(key, existing, computed, config, sender, publisher);
372416
}
373417
}
374418

@@ -380,7 +424,7 @@ export function pollOnce(
380424
try {
381425
const prData = ghApi(`repos/${repo}/pulls/${inst.prNumber}`, config.ghAgent, runner) as GhPr;
382426
if (prData.merged_at) {
383-
handleTransition(key, inst, "merged", config, sender);
427+
handleTransition(key, inst, "merged", config, sender, publisher);
384428
}
385429
} catch (e: unknown) {
386430
console.warn(`[pulse] Failed to check closed PR ${key}: ${(e as Error).message}`);
@@ -398,23 +442,55 @@ export function pollOnce(
398442
// Poll Loop (foreground)
399443
// ---------------------------------------------------------------------------
400444

445+
export function makeFlairPublisher(config: PulseConfig): FlairPublisher | undefined {
446+
if (!config.flairUrl || !config.flairAgentId || !config.flairAgentKey) return undefined;
447+
try {
448+
const client = createFlairClient(config.flairAgentId, config.flairUrl, config.flairAgentKey);
449+
return async (key: string, from: PrState | null, to: PrState, instance: PrInstance) => {
450+
// Publish OrgEvent for state transition
451+
await client.publishEvent({
452+
kind: `pr.${to}`,
453+
scope: key,
454+
summary: `PR #${instance.prNumber} (${instance.repo}): ${from ?? "new"}${to}`,
455+
detail: instance.title,
456+
refId: key,
457+
});
458+
// Store as persistent memory (stable id per PR key — same PUT overwrites)
459+
const memId = `${config.flairAgentId}-pulse-${key.replace(/[^a-z0-9]/gi, "-")}`;
460+
const content = `PR ${key} state: ${to}. Title: "${instance.title}". Transition: ${from ?? "new"}${to} at ${instance.lastTransitionAt}.`;
461+
await client.writeMemory(memId, content, {
462+
durability: "persistent",
463+
type: "fact",
464+
tags: ["pulse", "pr-lifecycle", to],
465+
});
466+
};
467+
} catch (e: unknown) {
468+
console.warn(`[pulse] Failed to create Flair publisher: ${(e as Error).message}`);
469+
return undefined;
470+
}
471+
}
472+
401473
export async function startPollLoop(
402474
config: PulseConfig,
403475
state: PulseState,
404-
opts: { dryRun?: boolean; runner?: SyncRunner; sender?: MailSender } = {},
476+
opts: { dryRun?: boolean; runner?: SyncRunner; sender?: MailSender; publisher?: FlairPublisher } = {},
405477
): Promise<void> {
406478
const runner = opts.runner ?? (spawnSync as unknown as SyncRunner);
407479
const sender = opts.dryRun
408480
? (to: string, body: string, _agentId: string) => {
409481
console.log(`[pulse/dry-run] would mail ${to}: ${body.slice(0, 80)}…`);
410482
}
411483
: (opts.sender ?? defaultMailSender);
484+
const publisher = opts.dryRun ? undefined : (opts.publisher ?? makeFlairPublisher(config));
412485

413486
console.log(`[pulse] Starting poll loop (interval=${config.pollIntervalMs}ms, repos=${config.repos.join(", ")})`);
414487

415488
const poll = () => {
416489
try {
417-
pollOnce(config, state, runner, sender);
490+
// Prune stale terminal instances before each poll
491+
const pruned = pruneState(state, config.pruneAfterDays);
492+
if (pruned > 0) console.log(`[pulse] Pruned ${pruned} completed instance(s) older than ${config.pruneAfterDays} days`);
493+
pollOnce(config, state, runner, sender, publisher);
418494
saveState(state);
419495
} catch (e: unknown) {
420496
console.error(`[pulse] Poll error: ${(e as Error).message}`);

packages/cli/test/pulse.test.ts

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ import {
44
handleTransition,
55
checkReminders,
66
pollOnce,
7+
pruneState,
78
type PrInstance,
89
type PrState,
910
type PulseConfig,
1011
type PulseState,
1112
type SyncRunner,
1213
type MailSender,
14+
type FlairPublisher,
1315
} from "../src/commands/pulse.js";
1416

1517
// ---------------------------------------------------------------------------
@@ -357,3 +359,145 @@ describe("pollOnce", () => {
357359
expect(calls.length).toBe(2); // mail for PR #11 to both reviewers
358360
});
359361
});
362+
363+
// ---------------------------------------------------------------------------
364+
// pruneState
365+
// ---------------------------------------------------------------------------
366+
367+
describe("pruneState", () => {
368+
function makeTerminalInstance(state: PrState, daysOld: number): PrInstance {
369+
const ts = new Date(Date.now() - daysOld * 24 * 60 * 60 * 1000).toISOString();
370+
return {
371+
key: `pr:tpsdev-ai/cli#99`,
372+
repo: "tpsdev-ai/cli",
373+
prNumber: 99,
374+
title: "Old PR",
375+
state,
376+
openedAt: ts,
377+
lastTransitionAt: ts,
378+
reminderSentAt: null,
379+
history: [],
380+
};
381+
}
382+
383+
test("removes merged instances older than pruneAfterDays", () => {
384+
const state: PulseState = {
385+
version: 1,
386+
lastPollAt: "",
387+
instances: {
388+
"pr:tpsdev-ai/cli#1": makeTerminalInstance("merged", 8),
389+
"pr:tpsdev-ai/cli#2": makeTerminalInstance("merged", 3),
390+
},
391+
};
392+
const pruned = pruneState(state, 7);
393+
expect(pruned).toBe(1);
394+
expect(state.instances["pr:tpsdev-ai/cli#1"]).toBeUndefined();
395+
expect(state.instances["pr:tpsdev-ai/cli#2"]).toBeDefined();
396+
});
397+
398+
test("keeps non-terminal instances regardless of age", () => {
399+
const state: PulseState = {
400+
version: 1,
401+
lastPollAt: "",
402+
instances: {
403+
"pr:tpsdev-ai/cli#10": makeTerminalInstance("reviewing" as PrState, 30),
404+
},
405+
};
406+
const pruned = pruneState(state, 7);
407+
expect(pruned).toBe(0);
408+
expect(state.instances["pr:tpsdev-ai/cli#10"]).toBeDefined();
409+
});
410+
411+
test("returns 0 when nothing to prune", () => {
412+
const state: PulseState = { version: 1, lastPollAt: "", instances: {} };
413+
expect(pruneState(state, 7)).toBe(0);
414+
});
415+
});
416+
417+
// ---------------------------------------------------------------------------
418+
// Flair publisher (handleTransition integration)
419+
// ---------------------------------------------------------------------------
420+
421+
describe("FlairPublisher integration", () => {
422+
test("publisher is called on transition", async () => {
423+
const calls: Array<{ key: string; from: PrState | null; to: PrState }> = [];
424+
const publisher: FlairPublisher = async (key, from, to) => {
425+
calls.push({ key, from, to });
426+
};
427+
428+
const instance: PrInstance = {
429+
key: "pr:tpsdev-ai/cli#42",
430+
repo: "tpsdev-ai/cli",
431+
prNumber: 42,
432+
title: "Test PR",
433+
state: "reviewing",
434+
openedAt: new Date().toISOString(),
435+
lastTransitionAt: new Date().toISOString(),
436+
reminderSentAt: null,
437+
history: [],
438+
};
439+
const config = makeConfig();
440+
const mailCalls: string[] = [];
441+
const sender: MailSender = (to) => { mailCalls.push(to); };
442+
443+
handleTransition("pr:tpsdev-ai/cli#42", instance, "approved", config, sender, publisher);
444+
445+
// Give microtask queue a tick
446+
await Promise.resolve();
447+
448+
expect(calls).toHaveLength(1);
449+
expect(calls[0].from).toBe("reviewing");
450+
expect(calls[0].to).toBe("approved");
451+
});
452+
453+
test("publisher errors are swallowed (non-fatal)", async () => {
454+
const publisher: FlairPublisher = async () => {
455+
throw new Error("Flair unavailable");
456+
};
457+
458+
const instance: PrInstance = {
459+
key: "pr:tpsdev-ai/cli#1",
460+
repo: "tpsdev-ai/cli",
461+
prNumber: 1,
462+
title: "T",
463+
state: "reviewing",
464+
openedAt: new Date().toISOString(),
465+
lastTransitionAt: new Date().toISOString(),
466+
reminderSentAt: null,
467+
history: [],
468+
};
469+
const config = makeConfig();
470+
const sender: MailSender = () => {};
471+
472+
// Should not throw
473+
expect(() => {
474+
handleTransition("pr:tpsdev-ai/cli#1", instance, "approved", config, sender, publisher);
475+
}).not.toThrow();
476+
477+
// Give microtask queue a tick — error is caught internally
478+
await Promise.resolve();
479+
});
480+
481+
test("publisher is not called when state unchanged", async () => {
482+
const calls: string[] = [];
483+
const publisher: FlairPublisher = async (_, _from, to) => { calls.push(to); };
484+
485+
const instance: PrInstance = {
486+
key: "pr:tpsdev-ai/cli#5",
487+
repo: "tpsdev-ai/cli",
488+
prNumber: 5,
489+
title: "T",
490+
state: "approved",
491+
openedAt: new Date().toISOString(),
492+
lastTransitionAt: new Date().toISOString(),
493+
reminderSentAt: null,
494+
history: [],
495+
};
496+
const config = makeConfig();
497+
const sender: MailSender = () => {};
498+
499+
handleTransition("pr:tpsdev-ai/cli#5", instance, "approved", config, sender, publisher);
500+
await Promise.resolve();
501+
expect(calls).toHaveLength(0);
502+
});
503+
});

0 commit comments

Comments
 (0)