1- import { existsSync , mkdirSync , readFileSync , readdirSync , renameSync , writeFileSync } from "node:fs" ;
1+ import { existsSync , mkdirSync , readFileSync , readdirSync , renameSync , rmSync , statSync , writeFileSync } from "node:fs" ;
22import { join } from "node:path" ;
33import { homedir } from "node:os" ;
44import { randomUUID } from "node:crypto" ;
@@ -12,11 +12,26 @@ export interface MailMessage {
1212 body : string ;
1313 timestamp : string ;
1414 read : boolean ;
15+ ackedAt ?: string ;
16+ nackedAt ?: string ;
17+ nackReason ?: string ;
18+ nackType ?: "transient" | "agent" | "permanent" ;
19+ checkedOutAt ?: string ;
20+ checkedOutBy ?: string ;
21+ deliveryAttempts ?: number ;
22+ retryAfter ?: string ;
23+ prNumber ?: number ;
1524 headers ?: Record < string , string > ;
1625}
1726
1827const MAX_BODY_BYTES = 64 * 1024 ;
1928export const MAX_INBOX_MESSAGES = 100 ;
29+ const LEASE_TIMEOUT_MS = 30 * 60 * 1000 ;
30+
31+ const VALID_ID = / ^ [ a - z A - Z 0 - 9 . _ - ] + $ / ;
32+ export function validateMessageId ( id : string ) : void {
33+ if ( ! VALID_ID . test ( id ) ) throw new Error ( `Invalid message ID: ${ id } ` ) ;
34+ }
2035
2136function assertValidAgentId ( agent : string ) : void {
2237 const safe = sanitizeIdentifier ( agent ) ;
@@ -41,16 +56,18 @@ export function getMailDir(): string {
4156 return dir ;
4257}
4358
44- export function getInbox ( agent : string ) : { root : string ; tmp : string ; fresh : string ; cur : string } {
59+ export function getInbox ( agent : string ) : { root : string ; tmp : string ; fresh : string ; cur : string ; dlq : string } {
4560 assertValidAgentId ( agent ) ;
4661 const root = join ( getMailDir ( ) , agent ) ;
4762 const tmp = join ( root , "tmp" ) ;
4863 const fresh = join ( root , "new" ) ;
4964 const cur = join ( root , "cur" ) ;
65+ const dlq = join ( root , "dlq" ) ;
5066 mkdirSync ( tmp , { recursive : true } ) ;
5167 mkdirSync ( fresh , { recursive : true } ) ;
5268 mkdirSync ( cur , { recursive : true } ) ;
53- return { root, tmp, fresh, cur } ;
69+ mkdirSync ( dlq , { recursive : true } ) ;
70+ return { root, tmp, fresh, cur, dlq } ;
5471}
5572
5673function readMessagesFromDir ( dir : string , read : boolean ) : MailMessage [ ] {
@@ -66,6 +83,46 @@ function readMessagesFromDir(dir: string, read: boolean): MailMessage[] {
6683 . sort ( ( a , b ) => ( a . timestamp < b . timestamp ? 1 : - 1 ) ) ;
6784}
6885
86+ function listMessageFiles ( dir : string ) : string [ ] {
87+ if ( ! existsSync ( dir ) ) return [ ] ;
88+ return readdirSync ( dir ) . filter ( ( f ) => f . endsWith ( ".json" ) ) ;
89+ }
90+
91+ function readMessageFile ( path : string ) : MailMessage {
92+ return JSON . parse ( readFileSync ( path , "utf-8" ) ) as MailMessage ;
93+ }
94+
95+ function writeMessageFile ( path : string , msg : MailMessage ) : void {
96+ writeFileSync ( path , JSON . stringify ( msg , null , 2 ) , "utf-8" ) ;
97+ }
98+
99+ function isLeaseExpired ( msg : MailMessage , now = Date . now ( ) ) : boolean {
100+ if ( ! msg . checkedOutAt ) return true ;
101+ return ( now - Date . parse ( msg . checkedOutAt ) ) > LEASE_TIMEOUT_MS ;
102+ }
103+
104+ function parseDurationMs ( raw ?: string , fallbackMs = 24 * 60 * 60 * 1000 ) : number {
105+ if ( ! raw ) return fallbackMs ;
106+ const m = raw . match ( / ^ ( \d + ) ( m s | s | m | h | d ) $ / ) ;
107+ if ( ! m ) return fallbackMs ;
108+ const n = Number ( m [ 1 ] ) ;
109+ const unit = m [ 2 ] ;
110+ return n * ( unit === "ms" ? 1 : unit === "s" ? 1000 : unit === "m" ? 60_000 : unit === "h" ? 3_600_000 : 86_400_000 ) ;
111+ }
112+
113+ function messagePathById ( agent : string , id : string ) : string | null {
114+ validateMessageId ( id ) ;
115+ const inbox = getInbox ( agent ) ;
116+ for ( const dir of [ inbox . fresh , inbox . cur , inbox . dlq ] ) {
117+ for ( const file of listMessageFiles ( dir ) ) {
118+ const full = join ( dir , file ) ;
119+ const msg = readMessageFile ( full ) ;
120+ if ( msg . id === id || msg . id . startsWith ( id ) ) return full ;
121+ }
122+ }
123+ return null ;
124+ }
125+
69126export function countInboxMessages ( agent : string ) : number {
70127 const inbox = getInbox ( agent ) ;
71128 return readdirSync ( inbox . fresh ) . filter ( ( f ) => f . endsWith ( ".json" ) ) . length +
@@ -108,30 +165,116 @@ export function sendMessage(to: string, body: string, from?: string): MailMessag
108165 return { ...message , filePath : newPath } ;
109166}
110167
111- export function checkMessages ( agent : string ) : MailMessage [ ] {
168+ export function checkMessages ( agent : string , checkedOutBy = agent ) : MailMessage [ ] {
112169 assertValidAgentId ( agent ) ;
170+ assertValidAgentId ( checkedOutBy ) ;
113171 const inbox = getInbox ( agent ) ;
114- const files = readdirSync ( inbox . fresh ) . filter ( ( f ) => f . endsWith ( ".json" ) ) ;
115172 const messages : MailMessage [ ] = [ ] ;
173+ const nowIso = new Date ( ) . toISOString ( ) ;
174+ const nowMs = Date . now ( ) ;
116175
117- for ( const f of files ) {
176+ for ( const f of listMessageFiles ( inbox . fresh ) ) {
118177 const from = join ( inbox . fresh , f ) ;
119178 const to = join ( inbox . cur , f ) ;
120179 renameSync ( from , to ) ;
121- const raw = readFileSync ( to , "utf-8" ) ;
122- const msg = JSON . parse ( raw ) as MailMessage ;
123- msg . read = true ;
180+ const msg = readMessageFile ( to ) ;
181+ msg . read = false ;
182+ msg . checkedOutAt = nowIso ;
183+ msg . checkedOutBy = checkedOutBy ;
184+ msg . deliveryAttempts = ( msg . deliveryAttempts ?? 0 ) + 1 ;
185+ writeMessageFile ( to , msg ) ;
124186 messages . push ( msg ) ;
125187 logEvent ( { event : "read" , from : msg . from , to : agent , messageId : msg . id } , msg . body ) ;
126188 }
127189
190+ for ( const f of listMessageFiles ( inbox . cur ) ) {
191+ const full = join ( inbox . cur , f ) ;
192+ const msg = readMessageFile ( full ) ;
193+ if ( msg . read || msg . nackedAt ) continue ;
194+ if ( msg . retryAfter && Date . parse ( msg . retryAfter ) > nowMs ) continue ;
195+ if ( msg . checkedOutBy && ! isLeaseExpired ( msg , nowMs ) ) continue ;
196+ msg . checkedOutAt = nowIso ;
197+ msg . checkedOutBy = checkedOutBy ;
198+ writeMessageFile ( full , msg ) ;
199+ messages . push ( msg ) ;
200+ }
201+
128202 return messages . sort ( ( a , b ) => ( a . timestamp < b . timestamp ? 1 : - 1 ) ) ;
129203}
130204
131205export function listMessages ( agent : string ) : MailMessage [ ] {
132206 assertValidAgentId ( agent ) ;
133207 const inbox = getInbox ( agent ) ;
134208 const unread = readMessagesFromDir ( inbox . fresh , false ) ;
135- const read = readMessagesFromDir ( inbox . cur , true ) ;
136- return [ ...unread , ...read ] . sort ( ( a , b ) => ( a . timestamp < b . timestamp ? 1 : - 1 ) ) ;
209+ const cur = readMessagesFromDir ( inbox . cur , true ) ;
210+ const dlq = readMessagesFromDir ( inbox . dlq , true ) ;
211+ return [ ...unread , ...cur , ...dlq ] . sort ( ( a , b ) => ( a . timestamp < b . timestamp ? 1 : - 1 ) ) ;
212+ }
213+
214+ export function ackMessage ( agent : string , id : string ) : MailMessage | null {
215+ const path = messagePathById ( agent , id ) ;
216+ if ( ! path ) return null ;
217+ const msg = readMessageFile ( path ) ;
218+ msg . read = true ;
219+ msg . ackedAt = new Date ( ) . toISOString ( ) ;
220+ delete msg . nackedAt ;
221+ delete msg . nackReason ;
222+ delete msg . nackType ;
223+ delete msg . checkedOutAt ;
224+ delete msg . checkedOutBy ;
225+ delete msg . retryAfter ;
226+ writeMessageFile ( path , msg ) ;
227+ return msg ;
228+ }
229+
230+ export function nackMessage ( agent : string , id : string , reason : string , type : "transient" | "agent" | "permanent" = "transient" , retryAfter ?: string ) : MailMessage | null {
231+ const path = messagePathById ( agent , id ) ;
232+ if ( ! path ) return null ;
233+ const msg = readMessageFile ( path ) ;
234+ msg . read = false ;
235+ msg . nackedAt = new Date ( ) . toISOString ( ) ;
236+ msg . nackReason = reason ;
237+ msg . nackType = type ;
238+ msg . checkedOutAt = undefined ;
239+ msg . checkedOutBy = undefined ;
240+ if ( type === "transient" && retryAfter ) {
241+ msg . retryAfter = new Date ( Date . now ( ) + parseDurationMs ( retryAfter , 60_000 ) ) . toISOString ( ) ;
242+ } else {
243+ delete msg . retryAfter ;
244+ }
245+ if ( type === "permanent" ) {
246+ const inbox = getInbox ( agent ) ;
247+ const target = join ( inbox . dlq , path . split ( "/" ) . pop ( ) ! ) ;
248+ writeMessageFile ( path , msg ) ;
249+ renameSync ( path , target ) ;
250+ return msg ;
251+ }
252+ writeMessageFile ( path , msg ) ;
253+ return msg ;
254+ }
255+
256+ export function gcMessages ( agent ?: string , maxAge = "24h" , prNumber ?: number , hardTtl = "48h" ) : number {
257+ const agents = agent ? [ agent ] : ( existsSync ( getMailDir ( ) ) ? readdirSync ( getMailDir ( ) ) . filter ( ( d ) => existsSync ( join ( getMailDir ( ) , d , "cur" ) ) ) : [ ] ) ;
258+ let removed = 0 ;
259+ const doneCutoff = Date . now ( ) - parseDurationMs ( maxAge , 24 * 60 * 60 * 1000 ) ;
260+ const hardCutoff = Date . now ( ) - parseDurationMs ( hardTtl , 48 * 60 * 60 * 1000 ) ;
261+ for ( const a of agents ) {
262+ const inbox = getInbox ( a ) ;
263+ for ( const dir of [ inbox . fresh , inbox . cur , inbox . dlq ] ) {
264+ for ( const file of listMessageFiles ( dir ) ) {
265+ const full = join ( dir , file ) ;
266+ const msg = readMessageFile ( full ) ;
267+ const ts = Date . parse ( msg . ackedAt ?? msg . timestamp ) ;
268+ const hardTs = Date . parse ( msg . timestamp ) ;
269+ const done = msg . read && ! ! msg . ackedAt ;
270+ const prMatch = prNumber == null || msg . prNumber === prNumber || msg . body . includes ( `#${ prNumber } ` ) || msg . body . includes ( `PR #${ prNumber } ` ) ;
271+ if ( ! prMatch ) continue ;
272+ if ( ( done && ts < doneCutoff ) || hardTs < hardCutoff ) {
273+ rmSync ( full , { force : true } ) ;
274+ removed ++ ;
275+ }
276+ }
277+ }
278+ }
279+ return removed ;
137280}
0 commit comments