Skip to content

Commit 7569aa9

Browse files
committed
feat: background job overhaul with bullmq
1 parent 2985929 commit 7569aa9

37 files changed

Lines changed: 969 additions & 486 deletions

admin/.env.example

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ LOG_LEVEL=info
44
APP_KEY=some_random_key
55
NODE_ENV=development
66
SESSION_DRIVER=cookie
7-
DRIVE_DISK=fs
87
DB_HOST=localhost
98
DB_PORT=3306
109
DB_USER=root

admin/adonisrc.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ export default defineConfig({
5252
() => import('@adonisjs/cors/cors_provider'),
5353
() => import('@adonisjs/lucid/database_provider'),
5454
() => import('@adonisjs/inertia/inertia_provider'),
55-
() => import('@adonisjs/drive/drive_provider'),
56-
() => import('@adonisjs/transmit/transmit_provider')
55+
() => import('@adonisjs/transmit/transmit_provider'),
56+
() => import('#providers/map_static_provider')
5757
],
5858

5959
/*
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import type { HttpContext } from '@adonisjs/core/http'
2+
import { DownloadService } from '#services/download_service'
3+
import { downloadJobsByFiletypeSchema } from '#validators/download'
4+
import { inject } from '@adonisjs/core'
5+
6+
@inject()
7+
export default class DownloadsController {
8+
constructor(private downloadService: DownloadService) {}
9+
10+
async index() {
11+
return this.downloadService.listDownloadJobs()
12+
}
13+
14+
async filetype({ request }: HttpContext) {
15+
const payload = await request.validateUsing(downloadJobsByFiletypeSchema)
16+
return this.downloadService.listDownloadJobs(payload.params.filetype)
17+
}
18+
}

admin/app/controllers/maps_controller.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { MapService } from '#services/map_service'
22
import {
3-
filenameValidator,
3+
filenameParamValidator,
44
remoteDownloadValidator,
55
remoteDownloadValidatorOptional,
66
} from '#validators/common'
@@ -53,14 +53,14 @@ export default class MapsController {
5353
}
5454

5555
async delete({ request, response }: HttpContext) {
56-
const payload = await request.validateUsing(filenameValidator)
56+
const payload = await request.validateUsing(filenameParamValidator)
5757

5858
try {
59-
await this.mapService.delete(payload.filename)
59+
await this.mapService.delete(payload.params.filename)
6060
} catch (error) {
6161
if (error.message === 'not_found') {
6262
return response.status(404).send({
63-
message: `Map file with key ${payload.filename} not found`,
63+
message: `Map file with key ${payload.params.filename} not found`,
6464
})
6565
}
6666
throw error // Re-throw any other errors and let the global error handler catch

admin/app/controllers/zim_controller.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { ZimService } from '#services/zim_service'
22
import {
33
downloadCollectionValidator,
4-
filenameValidator,
4+
filenameParamValidator,
55
remoteDownloadValidator,
66
} from '#validators/common'
77
import { listRemoteZimValidator } from '#validators/zim'
@@ -24,11 +24,12 @@ export default class ZimController {
2424

2525
async downloadRemote({ request }: HttpContext) {
2626
const payload = await request.validateUsing(remoteDownloadValidator)
27-
const filename = await this.zimService.downloadRemote(payload.url)
27+
const { filename, jobId } = await this.zimService.downloadRemote(payload.url)
2828

2929
return {
3030
message: 'Download started successfully',
3131
filename,
32+
jobId,
3233
url: payload.url,
3334
}
3435
}
@@ -44,10 +45,6 @@ export default class ZimController {
4445
}
4546
}
4647

47-
async listActiveDownloads({}: HttpContext) {
48-
return this.zimService.listActiveDownloads()
49-
}
50-
5148
async listCuratedCollections({}: HttpContext) {
5249
return this.zimService.listCuratedCollections()
5350
}
@@ -58,14 +55,14 @@ export default class ZimController {
5855
}
5956

6057
async delete({ request, response }: HttpContext) {
61-
const payload = await request.validateUsing(filenameValidator)
58+
const payload = await request.validateUsing(filenameParamValidator)
6259

6360
try {
64-
await this.zimService.delete(payload.filename)
61+
await this.zimService.delete(payload.params.filename)
6562
} catch (error) {
6663
if (error.message === 'not_found') {
6764
return response.status(404).send({
68-
message: `ZIM file with key ${payload.filename} not found`,
65+
message: `ZIM file with key ${payload.params.filename} not found`,
6966
})
7067
}
7168
throw error // Re-throw any other errors and let the global error handler catch

admin/app/jobs/run_download_job.ts

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import { Job } from 'bullmq'
2+
import { RunDownloadJobParams } from '../../types/downloads.js'
3+
import { QueueService } from '#services/queue_service'
4+
import { doResumableDownload } from '../utils/downloads.js'
5+
import { createHash } from 'crypto'
6+
import { DockerService } from '#services/docker_service'
7+
import { ZimService } from '#services/zim_service'
8+
9+
export class RunDownloadJob {
10+
static get queue() {
11+
return 'downloads'
12+
}
13+
14+
static get key() {
15+
return 'run-download'
16+
}
17+
18+
static getJobId(url: string): string {
19+
return createHash('sha256').update(url).digest('hex').slice(0, 16)
20+
}
21+
22+
async handle(job: Job) {
23+
const { url, filepath, timeout, allowedMimeTypes, forceNew, filetype } =
24+
job.data as RunDownloadJobParams
25+
26+
// console.log("Simulating delay for job for URL:", url)
27+
// await new Promise((resolve) => setTimeout(resolve, 30000)) // Simulate initial delay
28+
// console.log("Starting download for URL:", url)
29+
30+
// // simulate progress updates for demonstration
31+
// for (let progress = 0; progress <= 100; progress += 10) {
32+
// await new Promise((resolve) => setTimeout(resolve, 20000)) // Simulate time taken for each progress step
33+
// job.updateProgress(progress)
34+
// console.log(`Job progress for URL ${url}: ${progress}%`)
35+
// }
36+
37+
await doResumableDownload({
38+
url,
39+
filepath,
40+
timeout,
41+
allowedMimeTypes,
42+
forceNew,
43+
onProgress(progress) {
44+
const progressPercent = (progress.downloadedBytes / (progress.totalBytes || 1)) * 100
45+
job.updateProgress(Math.floor(progressPercent))
46+
},
47+
async onComplete(url) {
48+
if (filetype === 'zim') {
49+
try {
50+
const dockerService = new DockerService()
51+
const zimService = new ZimService(dockerService)
52+
await zimService.downloadRemoteSuccessCallback([url], true)
53+
} catch (error) {
54+
console.error(
55+
`[RunDownloadJob] Error in ZIM download success callback for URL ${url}:`,
56+
error
57+
)
58+
}
59+
}
60+
job.updateProgress(100)
61+
},
62+
})
63+
64+
return {
65+
url,
66+
filepath,
67+
}
68+
}
69+
70+
static async getByUrl(url: string): Promise<Job | undefined> {
71+
const queueService = new QueueService()
72+
const queue = queueService.getQueue(this.queue)
73+
const jobId = this.getJobId(url)
74+
return await queue.getJob(jobId)
75+
}
76+
77+
static async dispatch(params: RunDownloadJobParams) {
78+
const queueService = new QueueService()
79+
const queue = queueService.getQueue(this.queue)
80+
const jobId = this.getJobId(params.url)
81+
82+
try {
83+
const job = await queue.add(this.key, params, {
84+
jobId,
85+
attempts: 3,
86+
backoff: { type: 'exponential', delay: 2000 },
87+
removeOnComplete: true,
88+
})
89+
90+
return {
91+
job,
92+
created: true,
93+
message: `Dispatched download job for URL ${params.url}`,
94+
}
95+
} catch (error) {
96+
if (error.message.includes('job already exists')) {
97+
const existing = await queue.getJob(jobId)
98+
return {
99+
job: existing,
100+
created: false,
101+
message: `Job already exists for URL ${params.url}`,
102+
}
103+
}
104+
throw error
105+
}
106+
}
107+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import type { HttpContext } from '@adonisjs/core/http'
2+
import type { NextFn } from '@adonisjs/core/types/http'
3+
import StaticMiddleware from '@adonisjs/static/static_middleware'
4+
import { AssetsConfig } from '@adonisjs/static/types'
5+
6+
/**
7+
* See #providers/map_static_provider.ts for explanation
8+
* of why this middleware exists.
9+
*/
10+
export default class MapsStaticMiddleware {
11+
constructor(
12+
private path: string,
13+
private config: AssetsConfig
14+
) {}
15+
16+
async handle(ctx: HttpContext, next: NextFn) {
17+
const staticMiddleware = new StaticMiddleware(this.path, this.config)
18+
return staticMiddleware.handle(ctx, next)
19+
}
20+
}

admin/app/services/docker_service.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import logger from '@adonisjs/core/services/logger'
44
import { inject } from '@adonisjs/core'
55
import { ServiceStatus } from '../../types/services.js'
66
import transmit from '@adonisjs/transmit/services/main'
7-
import { doSimpleDownload } from '../utils/downloads.js'
7+
import { doResumableDownloadWithRetry } from '../utils/downloads.js'
88
import path from 'path'
99

1010
@inject()
@@ -347,10 +347,15 @@ export class DockerService {
347347
)
348348

349349
try {
350-
await doSimpleDownload({
350+
await doResumableDownloadWithRetry({
351351
url: WIKIPEDIA_ZIM_URL,
352352
filepath,
353353
timeout: 60000,
354+
allowedMimeTypes: [
355+
'application/x-zim',
356+
'application/x-openzim',
357+
'application/octet-stream',
358+
],
354359
})
355360

356361
this._broadcast(

0 commit comments

Comments
 (0)