-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
executable file
·363 lines (307 loc) · 12.3 KB
/
server.py
File metadata and controls
executable file
·363 lines (307 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
#!/usr/bin/env -S uv run
# /// script
# requires-python = ">=3.11"
# dependencies = ["starlette", "uvicorn"]
# ///
"""Local dev API server: serves archive data from SQLite.
Endpoints mirror what D1 Workers would provide in prod.
"""
from starlette.applications import Starlette
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
from starlette.staticfiles import StaticFiles
import sqlite3
import sys
from pathlib import Path
# Database location: first CLI argument wins, otherwise archive.db next to this script.
_default_db = Path(__file__).parent / "archive.db"
DB_PATH = sys.argv[1] if len(sys.argv) > 1 else str(_default_db)
def get_db():
    """Open a fresh connection to the archive database.

    Rows come back as sqlite3.Row so they support dict-style access.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def rows_to_dicts(rows):
    """Convert an iterable of sqlite3.Row objects into a list of plain dicts."""
    return list(map(dict, rows))
async def get_channels(request: Request):
    """List all non-thread channels with message counts.

    Each entry carries the channel's id/name/type/position plus its total
    message count and oldest/newest message timestamps, sorted by name.
    """
    conn = get_db()
    channel_rows = conn.execute("""
        SELECT c.id, c.name, c.type, c.position, COUNT(m.id) as message_count,
               MIN(m.timestamp) as oldest, MAX(m.timestamp) as newest
        FROM channels c
        LEFT JOIN messages m ON m.channel_id = c.id
        WHERE c.type != 11
        GROUP BY c.id
        ORDER BY c.name
    """).fetchall()
    conn.close()
    return JSONResponse(rows_to_dicts(channel_rows))
def _related_by_message(db, table, placeholders, msg_ids):
    """Fetch all rows of `table` belonging to msg_ids, grouped by message_id.

    `table` is always an internal constant ("attachments"/"reactions"/"embeds"),
    never user input, so interpolating it into the SQL is safe.
    """
    grouped = {}
    for row in db.execute(
        f"SELECT * FROM {table} WHERE message_id IN ({placeholders})", msg_ids
    ).fetchall():
        grouped.setdefault(row["message_id"], []).append(dict(row))
    return grouped
async def get_channel_messages(request: Request):
    """Get messages for a channel, paginated by cursor.

    Query params:
        before: message ID cursor (fetch older messages)
        after: message ID cursor (fetch newer messages)
        around: message ID to center on (returns limit/2 before + limit/2 after)
        limit: max messages to return (default 50, max 200)

    Returns a JSON array of message dicts, each enriched with author fields
    and its attachments/reactions/embeds. Order is newest-first except for
    `after` queries, which return oldest-first.
    """
    channel_id = request.path_params["channel_id"]
    before = request.query_params.get("before")
    after = request.query_params.get("after")
    around = request.query_params.get("around")
    try:
        limit = min(int(request.query_params.get("limit", 50)), 200)
    except ValueError:
        # A non-numeric limit used to crash with a 500; reject it explicitly.
        return JSONResponse({"error": "invalid limit"}, status_code=400)
    db = get_db()
    try:
        # Message IDs are snowflakes stored as text; CAST gives numeric ordering.
        if around:
            half = limit // 2
            # "<=" so the anchor message itself lands in the older half.
            before_rows = db.execute("""
                SELECT m.*, u.username, u.global_name, u.avatar
                FROM messages m
                LEFT JOIN users u ON m.author_id = u.id
                WHERE m.channel_id = ? AND CAST(m.id AS INTEGER) <= CAST(? AS INTEGER)
                ORDER BY CAST(m.id AS INTEGER) DESC
                LIMIT ?
            """, (channel_id, around, half + 1)).fetchall()
            after_rows = db.execute("""
                SELECT m.*, u.username, u.global_name, u.avatar
                FROM messages m
                LEFT JOIN users u ON m.author_id = u.id
                WHERE m.channel_id = ? AND CAST(m.id AS INTEGER) > CAST(? AS INTEGER)
                ORDER BY CAST(m.id AS INTEGER) ASC
                LIMIT ?
            """, (channel_id, around, half)).fetchall()
            # Combine: before_rows is newest-first, reverse it; after_rows is oldest-first
            rows_list = list(reversed(before_rows)) + list(after_rows)
            # Return in newest-first order for consistency
            rows = list(reversed(rows_list))
        elif after:
            rows = db.execute("""
                SELECT m.*, u.username, u.global_name, u.avatar
                FROM messages m
                LEFT JOIN users u ON m.author_id = u.id
                WHERE m.channel_id = ? AND CAST(m.id AS INTEGER) > CAST(? AS INTEGER)
                ORDER BY CAST(m.id AS INTEGER) ASC
                LIMIT ?
            """, (channel_id, after, limit)).fetchall()
        elif before:
            rows = db.execute("""
                SELECT m.*, u.username, u.global_name, u.avatar
                FROM messages m
                LEFT JOIN users u ON m.author_id = u.id
                WHERE m.channel_id = ? AND CAST(m.id AS INTEGER) < CAST(? AS INTEGER)
                ORDER BY CAST(m.id AS INTEGER) DESC
                LIMIT ?
            """, (channel_id, before, limit)).fetchall()
        else:
            # Default: most recent messages
            rows = db.execute("""
                SELECT m.*, u.username, u.global_name, u.avatar
                FROM messages m
                LEFT JOIN users u ON m.author_id = u.id
                WHERE m.channel_id = ?
                ORDER BY CAST(m.id AS INTEGER) DESC
                LIMIT ?
            """, (channel_id, limit)).fetchall()
        messages = rows_to_dicts(rows)
        # Batch-fetch related rows for the whole page in one query per table.
        if messages:
            msg_ids = [m["id"] for m in messages]
            placeholders = ",".join("?" * len(msg_ids))
            att_by_msg = _related_by_message(db, "attachments", placeholders, msg_ids)
            rxn_by_msg = _related_by_message(db, "reactions", placeholders, msg_ids)
            emb_by_msg = _related_by_message(db, "embeds", placeholders, msg_ids)
            for m in messages:
                m["attachments"] = att_by_msg.get(m["id"], [])
                m["reactions"] = rxn_by_msg.get(m["id"], [])
                m["embeds"] = emb_by_msg.get(m["id"], [])
        return JSONResponse(messages)
    finally:
        # Close the connection even when a query raises (the original leaked it).
        db.close()
async def get_message(request: Request):
    """Get a single message by ID with its related data.

    Returns the message dict (with author fields, attachments, reactions,
    embeds) or a 404 JSON error when no such message exists.
    """
    message_id = request.path_params["message_id"]
    db = get_db()
    try:
        row = db.execute("""
            SELECT m.*, u.username, u.global_name, u.avatar
            FROM messages m
            LEFT JOIN users u ON m.author_id = u.id
            WHERE m.id = ?
        """, (message_id,)).fetchone()
        if not row:
            return JSONResponse({"error": "not found"}, status_code=404)
        msg = dict(row)
        # One-to-many related tables, fetched per message.
        msg["attachments"] = rows_to_dicts(
            db.execute("SELECT * FROM attachments WHERE message_id = ?", (message_id,)).fetchall()
        )
        msg["reactions"] = rows_to_dicts(
            db.execute("SELECT * FROM reactions WHERE message_id = ?", (message_id,)).fetchall()
        )
        msg["embeds"] = rows_to_dicts(
            db.execute("SELECT * FROM embeds WHERE message_id = ?", (message_id,)).fetchall()
        )
        return JSONResponse(msg)
    finally:
        # Single close point for every exit path; the original leaked the
        # connection if any query raised.
        db.close()
async def search_messages(request: Request):
    """Full-text search across all messages.

    Query params:
        q: search query. A leading "#" searches channel mentions, a leading
           "@" searches message authors/user mentions; anything else is a
           plain-text search (FTS plus LIKE fallback plus mention resolution).
        limit: max results (default 50, max 100)

    Returns a JSON array of matching messages with author and channel names.
    """
    query = request.query_params.get("q", "").strip()
    if not query:
        return JSONResponse([])
    limit = min(int(request.query_params.get("limit", 50)), 100)
    # NOTE(review): the connection below is leaked if any query raises, and a
    # non-numeric `limit` raises ValueError — consider try/finally + validation.
    db = get_db()
    # The prefix character selects the search mode; strip it for name matching.
    is_channel_search = query.startswith("#")
    is_user_search = query.startswith("@")
    name_query = query.lstrip("#").lstrip("@")
    q_like = f"%{name_query}%"
    # Shared SELECT prologue reused by every branch below.
    base_select = """
        SELECT m.id, m.channel_id, m.content, m.timestamp,
               u.username, u.global_name, u.avatar,
               c.name as channel_name
        FROM messages m
        LEFT JOIN users u ON m.author_id = u.id
        LEFT JOIN channels c ON m.channel_id = c.id
    """
    if is_channel_search:
        if not name_query:
            # Bare "#" — return recent messages containing any channel mention
            rows = db.execute(
                f"{base_select} WHERE m.content LIKE '%<#%>%' ORDER BY m.timestamp DESC LIMIT ?",
                (limit,),
            ).fetchall()
            db.close()
            return JSONResponse([dict(r) for r in rows])
        # #channel: only find messages that mention matching channels
        matching_ids = [
            row["id"] for row in
            db.execute("SELECT id FROM channels WHERE name LIKE ?", (q_like,)).fetchall()
        ]
        if not matching_ids:
            db.close()
            return JSONResponse([])
        # One LIKE clause per matching channel; IDs are interpolated into the
        # bound parameter values, never into the SQL text itself.
        conditions = " OR ".join(f"m.content LIKE ?" for _ in matching_ids)
        params = [f"%<#{cid}>%" for cid in matching_ids]
        rows = db.execute(
            f"{base_select} WHERE {conditions} ORDER BY m.timestamp DESC LIMIT ?",
            params + [limit],
        ).fetchall()
    elif is_user_search:
        # @user: find messages by or mentioning matching users
        matching_ids = [
            row["id"] for row in
            db.execute(
                "SELECT id FROM users WHERE username LIKE ? OR global_name LIKE ?",
                (q_like, q_like),
            ).fetchall()
        ]
        if not matching_ids:
            db.close()
            return JSONResponse([])
        mention_conds = []
        params: list = []
        for uid in matching_ids:
            # Match both mention syntaxes (<@id> and <@!id>) plus authorship.
            mention_conds.append("m.content LIKE ?")
            params.append(f"%<@{uid}>%")
            mention_conds.append("m.content LIKE ?")
            params.append(f"%<@!{uid}>%")
            mention_conds.append("m.author_id = ?")
            params.append(uid)
        conditions = " OR ".join(mention_conds)
        rows = db.execute(
            f"{base_select} WHERE {conditions} ORDER BY m.timestamp DESC LIMIT ?",
            params + [limit],
        ).fetchall()
    else:
        # Plain text: FTS + mention resolution
        # First, find channel/user IDs matching the query text
        mention_conds = []
        mention_params: list = []
        for row in db.execute("SELECT id FROM channels WHERE name LIKE ? LIMIT 10", (q_like,)).fetchall():
            mention_conds.append("m.content LIKE ?")
            mention_params.append(f"%<#{row['id']}>%")
        for row in db.execute(
            "SELECT id FROM users WHERE (username LIKE ? OR global_name LIKE ?) LIMIT 10",
            (q_like, q_like),
        ).fetchall():
            mention_conds.append("m.content LIKE ?")
            mention_params.append(f"%<@{row['id']}>%")
        # FTS for content — only attempted when the query has no characters
        # that FTS5 MATCH would treat as syntax (quotes, colons, etc.).
        fts_safe = all(c.isalnum() or c in ' _-' for c in name_query)
        parts = []
        if fts_safe and name_query:
            # Rewrite base_select to join through the messages_fts shadow table.
            parts.append(db.execute(f"""
                {base_select.replace('FROM messages m', 'FROM messages_fts f JOIN messages m ON m.rowid = f.rowid')}
                WHERE messages_fts MATCH ?
                ORDER BY rank LIMIT ?
            """, (name_query, limit)).fetchall())
        # Content LIKE fallback
        parts.append(db.execute(
            f"{base_select} WHERE m.content LIKE ? ORDER BY m.timestamp DESC LIMIT ?",
            (f"%{name_query}%", limit),
        ).fetchall())
        if mention_conds:
            conditions = " OR ".join(mention_conds)
            parts.append(db.execute(
                f"{base_select} WHERE {conditions} ORDER BY m.timestamp DESC LIMIT ?",
                mention_params + [limit],
            ).fetchall())
        # Merge, deduplicate by message id; earlier parts (FTS first) win.
        seen = set()
        rows = []
        for part in parts:
            for row in part:
                if row["id"] not in seen:
                    seen.add(row["id"])
                    rows.append(row)
        rows = rows[:limit]
    db.close()
    return JSONResponse(rows_to_dicts(rows))
async def get_threads(request: Request):
    """Return every thread whose parent is the given channel, newest first."""
    parent_id = request.path_params["channel_id"]
    conn = get_db()
    thread_rows = conn.execute("""
        SELECT * FROM threads WHERE parent_channel_id = ?
        ORDER BY id DESC
    """, (parent_id,)).fetchall()
    conn.close()
    return JSONResponse(rows_to_dicts(thread_rows))
async def get_users(request: Request):
    """Dump the entire users table (for avatar/name resolution client-side)."""
    conn = get_db()
    user_rows = conn.execute("SELECT * FROM users").fetchall()
    conn.close()
    return JSONResponse(rows_to_dicts(user_rows))
# Routing table: all endpoints are read-only API GETs.
routes = [
    Route("/api/channels", get_channels),
    Route("/api/channels/{channel_id}/messages", get_channel_messages),
    Route("/api/channels/{channel_id}/threads", get_threads),
    Route("/api/messages/{message_id}", get_message),
    Route("/api/search", search_messages),
    Route("/api/users", get_users),
]
# Serve archived attachment files statically when the directory exists
# next to the database.
att_dir = Path(DB_PATH).parent / "archive" / "attachments"
if att_dir.is_dir():
    routes.append(Mount("/attachments", app=StaticFiles(directory=str(att_dir))))
app = Starlette(routes=routes)
# Permissive CORS so a dev frontend on any origin can call the API (GET only).
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["GET"])
if __name__ == "__main__":
    import uvicorn
    # Startup banner goes to stderr so stdout stays clean.
    print(f"Serving {DB_PATH} on http://localhost:5273/api/", file=sys.stderr)
    uvicorn.run(app, host="0.0.0.0", port=5273, log_level="info")