Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 40 additions & 37 deletions jobs/conflictCleaner.js
Original file line number Diff line number Diff line change
@@ -1,54 +1,57 @@
const cron = require('node-cron');
const SyncConflict = require('../models/SyncConflict');
const jobOrchestrator = require('../services/jobOrchestrator');
const logger = require('../utils/structuredLogger');

/**
* Conflict Cleaner Job
* Issue #705: Periodically cleans up resolved or old conflict logs.
* Issue #705 & #719: Refactored for Resilient Orchestration.
*/
class ConflictCleaner {
constructor() {
this.name = 'ConflictCleaner';
this.name = 'CONFLICT_CLEANER';
}

/**
* Start the cleaner worker
* Now hooks into the resilient orchestrator
*/
start() {
console.log(`[${this.name}] Initializing conflict maintenance worker...`);

// Run every Sunday at 4:00 AM
cron.schedule('0 4 * * 0', async () => {
try {
console.log(`[${this.name}] Starting conflict cleanup cycle...`);

// 1. Delete resolved conflicts older than 30 days
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const resolvedResult = await SyncConflict.deleteMany({
status: 'resolved',
resolvedAt: { $lt: thirtyDaysAgo }
});

// 2. Identify stale open conflicts (older than 90 days) and mark as ignored
const ninetyDaysAgo = new Date(Date.now() - 90 * 24 * 60 * 60 * 1000);
const ignoredResult = await SyncConflict.updateMany(
{
status: 'open',
createdAt: { $lt: ninetyDaysAgo }
},
{
status: 'ignored',
resolutionStrategy: 'auto_merge' // Flagged as auto-discarded
}
);

console.log(`[${this.name}] Maintenance complete.
- Purged ${resolvedResult.deletedCount} old resolved conflicts.
- Ignored ${ignoredResult.modifiedCount} stale open conflicts.`);

} catch (error) {
console.error(`[${this.name}] Critical maintenance error:`, error);
jobOrchestrator.register(
this.name,
'0 4 * * 0', // Every Sunday at 4:00 AM
this.executeCleanup.bind(this),
{ retryLimit: 3, baseDelay: 10000 }
);
}

async executeCleanup() {
logger.info(`[${this.name}] Starting maintenance cycle...`);

// 1. Delete resolved conflicts older than 30 days
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const resolvedResult = await SyncConflict.deleteMany({
status: 'resolved',
resolvedAt: { $lt: thirtyDaysAgo }
});

// 2. Identify stale open conflicts (older than 90 days) and mark as ignored
const ninetyDaysAgo = new Date(Date.now() - 90 * 24 * 60 * 60 * 1000);
const ignoredResult = await SyncConflict.updateMany(
{
status: 'open',
createdAt: { $lt: ninetyDaysAgo }
},
{
status: 'ignored',
resolutionStrategy: 'auto_merge'
}
);

logger.info(`[${this.name}] Cleanup complete`, {
purgedCount: resolvedResult.deletedCount,
ignoredCount: ignoredResult.modifiedCount
});

return Promise.resolve();
}
}

Expand Down
54 changes: 29 additions & 25 deletions jobs/logRotator.js
Original file line number Diff line number Diff line change
@@ -1,50 +1,54 @@
const cron = require('node-cron');
const fs = require('fs');
const path = require('path');
const logger = require('../utils/structuredLogger');
const jobOrchestrator = require('../services/jobOrchestrator');

/**
* Log Rotator Job
* Issue #713: Prevents log files from consuming infinite disk space.
* Issue #713 & #719: Refactored for Resilient Orchestration.
*/
class LogRotator {
constructor() {
this.logDir = path.join(process.cwd(), 'logs');
if (!fs.existsSync(this.logDir)) {
fs.mkdirSync(this.logDir, { recursive: true });
}
}

/**
* Now hooks into the resilient orchestrator
*/
start() {
console.log('[LogRotator] Initializing log rotation worker...');

// Run every night at midnight
cron.schedule('0 0 * * *', () => {
this.rotate();
});
jobOrchestrator.register(
'LOG_ROTATION',
'0 0 * * *', // Nightly at midnight
this.rotate.bind(this),
{ retryLimit: 2, baseDelay: 5000 }
);
}

rotate() {
logger.info('Starting nightly log rotation sequence...');
async rotate() {
logger.info('[LogRotator] Sequence started...');

try {
const files = fs.readdirSync(this.logDir);
const timestamp = new Date().toISOString().replace(/:/g, '-');
const files = fs.readdirSync(this.logDir);
const timestamp = new Date().toISOString().replace(/:/g, '-');

for (const file of files) {
if (file.endsWith('.log') && !file.includes('_archive_')) {
const oldPath = path.join(this.logDir, file);
const newPath = path.join(this.logDir, `${file.replace('.log', '')}_archive_${timestamp}.log`);
for (const file of files) {
if (file.endsWith('.log') && !file.includes('_archive_')) {
const oldPath = path.join(this.logDir, file);
const newPath = path.join(this.logDir, `${file.replace('.log', '')}_archive_${timestamp}.log`);

// In a real system, we might GZIP this, or move it to S3
try {
fs.renameSync(oldPath, newPath);
} catch (err) {
// It's possible the file is locked by the logger itself
logger.warn(`Could not rotate busy log file: ${file}`);
}
}

// Also clean up archives older than 7 days
this.cleanupArchives();

logger.info('Log rotation completed successfully.');
} catch (err) {
logger.error('Log rotation failed', { error: err.message });
}

this.cleanupArchives();
return Promise.resolve();
}

cleanupArchives() {
Expand Down
33 changes: 33 additions & 0 deletions middleware/jobGuard.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const logger = require('../utils/structuredLogger');

/**
* Job Guard Middleware
* Issue #719: Prevents unauthorized manual triggers of sensitive background tasks.
* Validates admin status or specific system-to-system auth tokens.
*/
const jobGuard = (req, res, next) => {
// 1. Ensure user is authenticated
if (!req.user) {
return res.status(401).json({ success: false, error: 'Authentication required' });
}

// 2. Check for Admin role or specified System Token
const isAdmin = req.user.role === 'admin';
const isSystemTrigger = req.headers['x-system-token'] === process.env.SYSTEM_TASK_TOKEN;

if (!isAdmin && !isSystemTrigger) {
logger.warn('Forbidden manual job trigger attempt', {
userId: req.user._id,
job: req.params.jobName || req.body.jobName
});

return res.status(403).json({
success: false,
error: 'You do not have permission to manage background tasks.'
});
}

next();
};

module.exports = jobGuard;
54 changes: 54 additions & 0 deletions models/JobState.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
const mongoose = require('mongoose');

/**
* JobState Model
* Issue #719: Persistent state tracking for background jobs to ensure reliability.
*/
const jobStateSchema = new mongoose.Schema({
jobName: {
type: String,
required: true,
unique: true,
index: true
},
status: {
type: String,
enum: ['idle', 'running', 'failed', 'completed'],
default: 'idle'
},
lastRunAt: Date,
lastCompletedAt: Date,
nextRunAt: Date,
executionCount: {
type: Number,
default: 0
},
failureCount: {
type: Number,
default: 0
},
lastError: {
message: String,
stack: String,
timestamp: Date
},
averageDurationMs: {
type: Number,
default: 0
},
config: {
interval: String, // cron expression
retryLimit: { type: Number, default: 3 },
enabled: { type: Boolean, default: true }
},
history: [{
status: String,
durationMs: Number,
timestamp: { type: Date, default: Date.now },
errorMessage: String
}]
}, {
timestamps: true
});

module.exports = mongoose.model('JobState', jobStateSchema);
65 changes: 65 additions & 0 deletions routes/jobs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
const express = require('express');
const router = express.Router();
const auth = require('../middleware/auth');
const jobGuard = require('../middleware/jobGuard');
const jobOrchestrator = require('../services/jobOrchestrator');
const JobState = require('../models/JobState');
const ResponseFactory = require('../utils/ResponseFactory');

/**
* Background Job Management API
* Issue #719: Management dashboard endpoints for monitoring and controlling resilient tasks.
*/

/**
* @route GET /api/jobs/status
* @desc Get status of all background jobs
*/
router.get('/status', auth, jobGuard, async (req, res) => {
try {
const jobs = await JobState.find().sort({ jobName: 1 });
return ResponseFactory.success(res, jobs);
} catch (err) {
return res.status(500).json({ success: false, error: err.message });
}
});

/**
* @route POST /api/jobs/:jobName/trigger
* @desc Manually trigger a job execution
*/
router.post('/:jobName/trigger', auth, jobGuard, async (req, res) => {
try {
const { jobName } = req.params;

// Non-blocking trigger
jobOrchestrator.runJob(jobName);

return ResponseFactory.success(res, null, 202, `Execution of ${jobName} initiated.`);
} catch (err) {
return res.status(404).json({ success: false, error: err.message });
}
});

/**
* @route PATCH /api/jobs/:jobName/toggle
* @desc Pause or resume a job
*/
router.patch('/:jobName/toggle', auth, jobGuard, async (req, res) => {
try {
const { jobName } = req.params;
const { enabled } = req.body;

if (enabled) {
await jobOrchestrator.resume(jobName);
} else {
await jobOrchestrator.pause(jobName);
}

return ResponseFactory.success(res, { enabled }, 200, `Job ${jobName} updated successfully.`);
} catch (err) {
return res.status(500).json({ success: false, error: err.message });
}
});

module.exports = router;
5 changes: 4 additions & 1 deletion server.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ async function connectDatabase() {
require('./jobs/searchIndexer').start();


// Start resilient orchestrator
require('./services/jobOrchestrator').start();



console.log('✓ Cron jobs initialized');
Expand Down Expand Up @@ -111,7 +114,7 @@ app.use('/api/governance', require('./routes/governance'));
app.use('/api/taxonomy', require('./routes/taxonomy'));
app.use('/api/sync', require('./routes/syncManager'));
app.use('/api/telemetry', require('./routes/telemetry'));
app.use('/api/search', require('./routes/search'));
app.use('/api/jobs', require('./routes/jobs'));



Expand Down
Loading