Merged
45 changes: 45 additions & 0 deletions CONSENSUS_ARCH.md
@@ -0,0 +1,45 @@
# Distributed Transaction Integrity & Vector-Clock Reconciler

## 🚀 Overview
Issue #730 upgrades the application's synchronization layer from a naive "last-write-wins" model to a **Distributed Consensus System**. By implementing **Vector Clocks**, the system can distinguish causal updates (where one change strictly follows another) from concurrent updates (true conflicts).

## 🏗️ Core Components

### 1. Vector Clocks (`utils/vectorClockUtils.js`)
Every transaction record now carries a "Version Map" representing its history across all devices.
- **Causality Tracking**: Allows the server to know if an incoming update is built upon the latest server state or if the client missed some changes.
- **Partial Ordering**: Enables mathematically sound conflict detection without needing a global synchronized clock.
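
The partial ordering described above can be sketched as a pairwise clock comparison. This is an illustrative sketch, not the actual `utils/vectorClockUtils.js` API — `compareClocks` and the plain-object clock shape (`deviceId -> counter`) are assumptions:

```javascript
// Compare two vector clocks (plain objects: deviceId -> counter).
// Returns 'after' if a causally follows b, 'before' if a is older,
// 'concurrent' if neither dominates (a true conflict), 'equal' otherwise.
function compareClocks(a, b) {
  const devices = new Set([...Object.keys(a), ...Object.keys(b)]);
  let aAhead = false;
  let bAhead = false;
  for (const d of devices) {
    const av = a[d] || 0; // missing entries count as zero
    const bv = b[d] || 0;
    if (av > bv) aAhead = true;
    if (bv > av) bAhead = true;
  }
  if (aAhead && bAhead) return 'concurrent';
  if (aAhead) return 'after';
  if (bAhead) return 'before';
  return 'equal';
}
```

Because each side only needs the two clocks themselves, no globally synchronized wall clock is required.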

### 2. The Consensus Engine (`services/consensusEngine.js`)
The brain of the sync layer. It evaluates incoming updates against three scenarios:
- **Causal Update**: The client is strictly ahead of the server. The update is applied immediately.
- **Stale Update**: The client is sending data the server has already superseded. The update is ignored.
- **Concurrent Conflict**: Both server and client have modified the record independently. The system prevents data loss by moving the state into the **Conflict Graveyard**.

### 3. Conflict Graveyard (`models/SyncConflict.js`)
When a conflict occurs, the system doesn't guess a winner. Instead:
- It preserves the `ServerState`, `ClientState`, and their respective `VectorClocks`.
- It sets the transaction's sync status to `conflict`.
- It exposes a manual resolution API for the user to choose `client_wins`, `server_wins`, or a `merge`.
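
A minimal sketch of how a resolution strategy might select the surviving state — the function name and conflict shape are hypothetical, but the strategy values match the API above:

```javascript
// Pick the winning state for a conflict record given a resolution strategy.
// For 'merge', the caller supplies the manually merged document.
function pickResolvedState(conflict, strategy, mergedState) {
  switch (strategy) {
    case 'client_wins': return conflict.clientState;
    case 'server_wins': return conflict.serverState;
    case 'merge':       return mergedState;
    default: throw new Error(`Unsupported strategy: ${strategy}`);
  }
}
```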

### 4. Data Integrity Guard (`utils/hashGenerator.js`)
To prevent "Ghost Updates," every state snapshot is hashed. Even when vector clocks appear aligned, the system verifies the checksum to catch bit-rot or tampered payloads before they are accepted.

## 🔄 The Sync Workflow
1. **Request Received**: `SyncInterceptor` extracts device identity and the client's vector clock.
2. **Reconciliation**: `ConsensusEngine` compares the client clock against the database clock.
3. **Action Execution**:
- If Causal: Database is updated, and the server clock is incremented.
- If Concurrent: A record is created in `SyncConflict` for the user to resolve later.
4. **Maintenance**: `ConflictPruner` periodically purges old resolved conflicts to keep the database lean.
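
From the client's side, step 1 implies every mutating request carries the device identity and clock as headers. A sketch of building that request envelope, matching the `x-device-id` / `x-vector-clock` headers the interceptor reads (the helper name is illustrative):

```javascript
// Build the sync headers a client would attach to a push request.
// The interceptor expects the clock as a JSON string in 'x-vector-clock'
// and falls back to an empty clock if the header is missing or malformed.
function buildSyncHeaders(deviceId, clock) {
  return {
    'Content-Type': 'application/json',
    'x-device-id': deviceId,
    'x-vector-clock': JSON.stringify(clock)
  };
}
```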

## ✅ Benefits
- **Zero Data Loss**: Offline edits no longer overwrite newer online changes.
- **Causal Consistency**: The system guarantees that users always see a logically consistent history of their finances.
- **Device Agnostic**: Seamlessly scales from web to mobile to offline-first desktop apps.

## 🧪 Testing
Verify the logic with:
```bash
npx mocha tests/consensusEngine.test.js
```
31 changes: 31 additions & 0 deletions jobs/conflictPruner.js
@@ -0,0 +1,31 @@
const cron = require('node-cron');
const SyncConflict = require('../models/SyncConflict');
const logger = require('../utils/structuredLogger');

/**
* Conflict Pruner Job
* Issue #730: Automatically removes old resolved conflicts to save storage.
*/
class ConflictPruner {
start() {
// Run every day at 3 AM
cron.schedule('0 3 * * *', async () => {
logger.info('[ConflictPruner] Starting cleanup of resolved conflicts...');

try {
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);

const result = await SyncConflict.deleteMany({
status: 'resolved',
resolvedAt: { $lt: thirtyDaysAgo }
});

logger.info(`[ConflictPruner] Purged ${result.deletedCount} old conflict records.`);
} catch (err) {
logger.error('[ConflictPruner] Cleanup failure', { error: err.message });
}
});
}
}

module.exports = new ConflictPruner();
51 changes: 19 additions & 32 deletions middleware/syncInterceptor.js
@@ -1,41 +1,28 @@
const syncManager = require('../services/syncManager');

/**
* Sync Interceptor Middleware
* Issue #660: Automatically hooks into POST/PUT/DELETE requests to log mutations
* Issue #730: Extracts device identity and vector clocks from headers.
* Essential for distributed consensus tracking.
*/
const syncInterceptor = async (req, res, next) => {
// Only capture mutations targeting supported syncing entities
const syncableRoutes = ['/api/expenses', '/api/budgets', '/api/workspaces'];
const isMutation = ['POST', 'PUT', 'PATCH', 'DELETE'].includes(req.method);

if (!isMutation || !syncableRoutes.some(route => req.originalUrl.startsWith(route))) {
return next();
}

const deviceId = req.headers['x-device-id'] || 'web-default';

// Intercept response to log success mutations
const originalJson = res.json;
res.json = function (data) {
if ((res.statusCode === 200 || res.statusCode === 201) && data.success && data.data) {
// Asynchronously log the mutation
const operation = req.method === 'POST' ? 'CREATE' : (req.method === 'DELETE' ? 'DELETE' : 'UPDATE');
const syncInterceptor = (req, res, next) => {
// 1. Extract Device ID
const deviceId = req.headers['x-device-id'] || 'web-browser';

// Handle both single objects and arrays
const entities = Array.isArray(data.data) ? data.data : [data.data];

entities.forEach(entity => {
syncManager.logMutation(
req.user._id,
deviceId,
entity,
operation,
req.body
).catch(err => console.error('[SyncInterceptor] Log failed:', err));
});
// 2. Extract Vector Clock (expected as JSON string in header)
let clientClock = {};
try {
const clockHeader = req.headers['x-vector-clock'];
if (clockHeader) {
clientClock = JSON.parse(clockHeader);
}
return originalJson.call(this, data);
} catch (err) {
console.warn('[SyncInterceptor] Invalid vector clock header format');
}

// Attach to request object for use in controllers/services
req.syncContext = {
deviceId,
clientClock
};

next();
49 changes: 26 additions & 23 deletions models/SyncConflict.js
@@ -2,49 +2,52 @@ const mongoose = require('mongoose');

/**
* SyncConflict Model
* Issue #705: Stores conflicting edits detected via vector clocks.
* Allows for manual or automatic field-level resolution.
* Issue #730: Stores conflicting state snapshots for distributed transaction reconciliation.
* Acts as a "Conflict Graveyard" for manual or automatic resolution.
*/
const syncConflictSchema = new mongoose.Schema({
entityId: {
transactionId: {
type: mongoose.Schema.Types.ObjectId,
ref: 'Transaction',
required: true,
index: true
},
entityType: {
type: String,
required: true, // e.g., 'Transaction'
index: true
},
userId: {
type: mongoose.Schema.Types.ObjectId,
ref: 'User',
required: true,
index: true
},
workspaceId: {
type: mongoose.Schema.Types.ObjectId,
ref: 'Workspace',
index: true
},
baseState: mongoose.Schema.Types.Mixed,
conflictingStates: [{
deviceId: String,
state: mongoose.Schema.Types.Mixed,
vectorClock: {
type: Map,
of: Number
},
timestamp: { type: Date, default: Date.now }
}],
resolvedState: mongoose.Schema.Types.Mixed,
resolvedAt: Date,
resolutionStrategy: {
serverState: mongoose.Schema.Types.Mixed,
clientState: mongoose.Schema.Types.Mixed,
vectorClocks: {
server: { type: Map, of: Number },
client: { type: Map, of: Number }
},
conflictType: {
type: String,
enum: ['manual', 'auto_merge', 'last_write_wins', 'source_wins'],
default: 'manual'
enum: ['concurrent_update', 'delete_update_collision', 'logic_violation'],
default: 'concurrent_update'
},
status: {
type: String,
enum: ['open', 'resolved', 'ignored'],
default: 'open',
index: true
}
},
resolutionStrategy: String, // 'client_wins', 'server_wins', 'merge', 'manual'
resolvedAt: Date,
resolvedBy: {
type: mongoose.Schema.Types.ObjectId,
ref: 'User'
},
checksum: String
}, {
timestamps: true
});
9 changes: 4 additions & 5 deletions models/Transaction.js
@@ -177,13 +177,12 @@ const transactionSchema = new mongoose.Schema({
},
syncMetadata: {
lastDeviceId: String,
lastModifiedByDevice: String,
checksum: String,
isDeleted: { type: Boolean, default: false },
deletedAt: Date,
conflicts: [{
deviceId: String,
timestamp: Date,
data: mongoose.Schema.Types.Mixed
}]
syncStatus: { type: String, enum: ['synced', 'pending', 'conflict'], default: 'synced' },
conflictsCount: { type: Number, default: 0 }
},
// NEW: Features for Event Sourcing
lastEventId: {
89 changes: 42 additions & 47 deletions routes/sync.js
@@ -1,71 +1,66 @@
const express = require('express');
const router = express.Router();
const auth = require('../middleware/auth');
const syncManager = require('../services/syncManager');
const syncInterceptor = require('../middleware/syncInterceptor');
const transactionService = require('../services/transactionService');
const consensusEngine = require('../services/consensusEngine');
const SyncConflict = require('../models/SyncConflict');
const ResponseFactory = require('../utils/ResponseFactory');

/**
* @route GET /api/sync/delta
* @desc Fetch differential updates since a specific version
* Distributed Sync API (Vector Clock Enhanced)
* Issue #730: Handles high-integrity multi-device data synchronization.
*/
router.get('/delta', auth, async (req, res) => {
try {
const lastVersion = parseInt(req.query.v) || 0;
const changes = await syncManager.getDifferentialUpdates(req.user._id, lastVersion);

const latestVersion = changes.length > 0 ? changes[changes.length - 1].version : lastVersion;

res.json({
success: true,
v: latestVersion,
count: changes.length,
changes
});
} catch (error) {
res.status(500).json({ success: false, error: error.message });
}
});

/**
* @route POST /api/sync/push
* @desc Push local changes from a client with conflict resolution
* @route POST /api/sync/transactions/:id
* @desc Sync a specific transaction with consensus reconciliation
*/
router.post('/push', auth, async (req, res) => {
router.post('/transactions/:id', auth, syncInterceptor, async (req, res) => {
try {
const deviceId = req.headers['x-device-id'] || 'web-default';
const { entityType, data } = req.body;
const { id } = req.params;
const result = await transactionService.syncUpdate(id, req.body, req.syncContext);

if (!entityType || !data) {
return res.status(400).json({ success: false, error: 'Missing sync payload' });
if (result.status === 'synced') {
return ResponseFactory.success(res, result.transaction, 200, 'State synchronized');
}

const result = await syncManager.applyIncomingUpdate(req.user._id, deviceId, entityType, data);
if (result.status === 'conflict') {
return ResponseFactory.success(res, null, 409, 'Conflict detected. Captured in graveyard.');
}

res.json({
success: true,
action: result.action,
entity: result.entity,
logs: result.logs
});
} catch (error) {
console.error('[SyncRoute] Push failed:', error);
res.status(500).json({ success: false, error: error.message });
return res.status(200).json({ success: true, ...result });
} catch (err) {
return res.status(500).json({ success: false, error: err.message });
}
});

/**
* @route POST /api/sync/delete
* @desc Propagate a hard/soft delete across devices
* @route GET /api/sync/conflicts
* @desc List all captured conflicts for the user
*/
router.post('/delete', auth, async (req, res) => {
try {
const deviceId = req.headers['x-device-id'] || 'web-default';
const { entityType, entityId } = req.body;
router.get('/conflicts', auth, async (req, res) => {
const conflicts = await SyncConflict.find({
userId: req.user._id,
status: 'open'
}).sort({ createdAt: -1 });

res.json({ success: true, count: conflicts.length, conflicts });
});

await syncManager.softDelete(req.user._id, deviceId, entityType, entityId);
/**
* @route POST /api/sync/conflicts/:id/resolve
* @desc Resolve a conflict using a specific strategy
*/
router.post('/conflicts/:id/resolve', auth, async (req, res) => {
try {
const { id } = req.params;
const { strategy, resolvedData } = req.body;

res.json({ success: true, message: 'Delete captured and synced' });
} catch (error) {
res.status(500).json({ success: false, error: error.message });
const result = await consensusEngine.resolveConflict(id, strategy, resolvedData);
res.json(result);
} catch (err) {
res.status(500).json({ success: false, error: err.message });
}
});

5 changes: 3 additions & 2 deletions server.js
@@ -81,6 +81,8 @@ async function connectDatabase() {
require('./jobs/conflictCleaner').start();
require('./jobs/logRotator').start();
require('./jobs/searchIndexer').start();
require('./jobs/conflictPruner').start();
// Start resilient orchestrator
@@ -114,8 +116,7 @@ app.use('/api/export', require('./routes/export'));
app.use('/api/forecasting', require('./routes/forecasting'));
app.use('/api/governance', require('./routes/governance'));
app.use('/api/taxonomy', require('./routes/taxonomy'));
app.use('/api/sync', require('./routes/syncManager'));
app.use('/api/audit', require('./routes/audit'));
app.use('/api/sync', require('./routes/sync'));

app.use('/api/telemetry', require('./routes/telemetry'));
app.use('/api/jobs', require('./routes/jobs'));