This document breaks down how the system is designed and why certain architectural decisions were made.
┌────────────────────────────────────────────────────────────────┐
│                         Client Browser                         │
└───────────────────────────────┬────────────────────────────────┘
                                │ HTTP/HTTPS
                                ▼
┌────────────────────────────────────────────────────────────────┐
│                      Nginx Reverse Proxy                       │
│  - Routes /api/* → Backend (FastAPI)                           │
│  - Routes /* → Frontend (Static Files)                         │
│  - Handles CORS, compression, caching                          │
└─────────────┬─────────────────────────────────┬────────────────┘
              │                                 │
              ▼                                 ▼
┌───────────────────────────┐  ┌─────────────────────────────────┐
│     Backend (FastAPI)     │  │      Frontend (React/Vite)      │
│  ┌─────────────────────┐  │  │  ┌───────────────────────────┐  │
│  │ Routes              │  │  │  │ Components                │  │
│  │ /auth, /scans       │  │  │  │ Auth, Scan UI             │  │
│  └──────────┬──────────┘  │  │  └─────────────┬─────────────┘  │
│             ▼             │  │                ▼                │
│  ┌─────────────────────┐  │  │  ┌───────────────────────────┐  │
│  │ Services            │  │  │  │ TanStack Query            │  │
│  │ Business Logic      │  │  │  │ Server State Mgmt         │  │
│  └──────────┬──────────┘  │  │  └─────────────┬─────────────┘  │
│             ▼             │  │                ▼                │
│  ┌─────────────────────┐  │  │  ┌───────────────────────────┐  │
│  │ Repositories        │  │  │  │ Zustand Stores            │  │
│  │ Data Access         │  │  │  │ Local State Mgmt          │  │
│  └──────────┬──────────┘  │  │  └───────────────────────────┘  │
│             ▼             │  └─────────────────────────────────┘
│  ┌─────────────────────┐  │
│  │ Scanners            │  │
│  │ RateLimit, Auth,    │  │
│  │ SQLi, IDOR          │  │
│  └─────────────────────┘  │
└─────────────┬─────────────┘
              │
              ▼
┌────────────────────────────────────────────────────────────────┐
│                      PostgreSQL Database                       │
│               Tables: users, scans, test_results               │
└────────────────────────────────────────────────────────────────┘
Nginx Reverse Proxy
- Purpose: Single entry point for all HTTP traffic
- Responsibilities: Route requests based on path, serve static files, handle SSL termination (production), compress responses, cache static assets
- Interfaces: Exposes port 80 (HTTP) and optionally 443 (HTTPS), proxies to backend on internal port 8000
FastAPI Backend
- Purpose: REST API server providing authentication and scanning services
- Responsibilities: Validate requests, enforce authentication, execute security scans, store results, return JSON responses
- Interfaces: Exposes HTTP endpoints at /auth/* and /scans/*, connects to PostgreSQL for data persistence
React Frontend
- Purpose: User interface for creating scans and viewing results
- Responsibilities: Form validation, API communication, state management, result visualization
- Interfaces: Communicates with the backend via the /api prefix, renders in the browser
PostgreSQL Database
- Purpose: Persistent storage for users, scans, and test results
- Responsibilities: Data integrity, relationship enforcement, query optimization
- Interfaces: Accepts connections from backend on port 5432, enforces foreign key constraints
Scanner Modules
- Purpose: Execute security tests against target APIs
- Responsibilities: Send HTTP requests, analyze responses, detect vulnerabilities, collect evidence
- Interfaces: Inherit from BaseScanner, return TestResultCreate schemas
Step-by-step walkthrough of what happens when a user submits a new scan:
1. User submits form → Frontend validates (Zod schema)
Input: { targetUrl, authToken, testsToRun, maxRequests }
Validation happens at frontend/src/lib/validation.ts:42-52
2. Frontend → POST /api/scans/ → Nginx
Adds Authorization header with JWT from localStorage
Request routed based on /api prefix
3. Nginx → Backend (routes/scans.py:23-37)
Proxy passes to http://backend:8000/scans/
Preserves headers including Authorization
4. Route handler → Dependencies check auth
@limiter.limit("15/minute") - rate limits this endpoint
get_current_user() extracts JWT, validates, loads user from DB
Code at backend/core/dependencies.py:17-49
5. Route → Service layer (services/scan_service.py:23-65)
ScanService.run_scan(db, user_id, scan_request)
Creates Scan record in database via repository
6. Service → Scanner modules (scanners/*.py)
Loops through requested tests (rate_limit, auth, sqli, idor)
Instantiates appropriate scanner class for each test
Each scanner inherits from BaseScanner
7. Scanner → Target API
Makes HTTP requests using requests.Session
Implements retry logic, rate limiting, timeout handling
Base logic at backend/scanners/base_scanner.py:40-156
8. Scanner analyzes responses → Returns TestResultCreate
Detects vulnerabilities based on:
- Status codes (429 for rate limiting)
- Response content (SQL errors)
- Timing differences (blind SQLi)
- Header patterns (JWT algorithms)
9. Service saves results → Repository → Database
TestResultRepository.create_test_result() for each scanner output
Foreign key links results to scan
Code at backend/repositories/test_result_repository.py:19-50
10. Service → Route → JSON response
ScanResponse includes full scan with nested test_results
Frontend receives and redirects to /scans/{id}
11. Frontend fetches full scan → GET /api/scans/{id}
TanStack Query caches result
Renders TestResultCard for each result
Example with code references:
# Step 5: Route handler
@router.post("/", response_model=ScanResponse)
@limiter.limit(settings.API_RATE_LIMIT_SCAN) # "15/minute"
async def create_scan(
request: Request,
scan_request: ScanRequest,
db: Session = Depends(get_db),
current_user: UserResponse = Depends(get_current_user), # Auth check
) -> ScanResponse:
return ScanService.run_scan(db, current_user.id, scan_request)
# Step 6: Service orchestrates scanners
scanner_mapping: dict[TestType, type[BaseScanner]] = {
TestType.RATE_LIMIT: RateLimitScanner,
TestType.AUTH: AuthScanner,
TestType.SQLI: SQLiScanner,
TestType.IDOR: IDORScanner,
}
for test_type in scan_request.tests_to_run:
scanner_class = scanner_mapping.get(test_type)
scanner = scanner_class(
target_url=str(scan_request.target_url),
auth_token=scan_request.auth_token,
max_requests=scan_request.max_requests,
)
result = scanner.scan() # Execute the test
results.append(result)
# Step 9: Save to database
for result in results:
TestResultRepository.create_test_result(
db=db,
scan_id=scan.id,
test_name=result.test_name,
status=result.status,
severity=result.severity,
details=result.details,
evidence_json=result.evidence_json,
recommendations_json=result.recommendations_json,
)

Step-by-step walkthrough of user registration:
1. User fills registration form → Frontend validates
Zod schema checks: email format, password strength (8+ chars, uppercase, lowercase, number)
frontend/src/lib/validation.ts:17-31
2. POST /api/auth/register → Rate limited to 15/minute
Prevents automated account creation abuse
backend/routes/auth.py:25-42
3. Service checks if email exists → Repository query
UserRepository.get_by_email() checks for duplicates
Returns 400 if email already registered
4. Service hashes password with bcrypt
core/security.py:11-18 uses bcrypt.gensalt() and hashpw()
Salt automatically generated per password
5. Repository creates user record
UserRepository.create_user(email, hashed_password)
Sets is_active=True, created_at=UTC timestamp
6. Response returns user data (NOT password)
UserResponse schema excludes hashed_password field
Frontend redirects to login page
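Steps 4 and 6 hinge on salted one-way hashing. The flow can be sketched in a few lines; this is an illustration of the idea, not the project's core/security.py, and it uses the stdlib's pbkdf2_hmac as a stand-in for bcrypt so the snippet has no third-party dependency:

```python
import hashlib
import hmac
import os

def hash_password(password: str) -> str:
    # A fresh random salt per password, like bcrypt.gensalt()
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)
    return f"{salt.hex()}:{digest.hex()}"

def verify_password(password: str, stored: str) -> bool:
    salt_hex, digest_hex = stored.split(":")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), 100_000
    )
    # Constant-time comparison avoids leaking hash prefixes via timing
    return hmac.compare_digest(candidate.hex(), digest_hex)
```

Because the salt is random, hashing the same password twice yields different strings, yet both verify - which is why only verify_password(), never string equality, can check a login.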
What it is: Abstraction layer between business logic and data access. All database queries go through repository classes that provide clean interfaces.
Where we use it:
- repositories/user_repository.py - User CRUD operations
- repositories/scan_repository.py - Scan queries with eager loading
- repositories/test_result_repository.py - Test result operations
Why we chose it:
Keeps services clean and testable. Services call UserRepository.get_by_email(db, email) instead of writing raw queries. If we switch from SQLAlchemy to a different ORM or database entirely, we only change repository implementations, not service logic.
Trade-offs:
- Pros: Testable (mock repositories), maintainable (queries in one place), flexible (swap implementations)
- Cons: Extra layer of abstraction, more files to navigate, can feel like overkill for simple CRUD
Example implementation:
# repositories/user_repository.py:12-35
class UserRepository:
"""
Repository for User database operations
"""
@staticmethod
def get_by_id(db: Session, user_id: int) -> User | None:
return db.query(User).filter(User.id == user_id).first()
@staticmethod
def get_by_email(db: Session, email: str) -> User | None:
return db.query(User).filter(User.email == email).first()
@staticmethod
def create_user(
db: Session,
email: str,
hashed_password: str,
commit: bool = True
) -> User:
user = User(email=email, hashed_password=hashed_password)
db.add(user)
if commit:
db.commit()
db.refresh(user)
return user

Services use it:
# services/auth_service.py:24-32
existing_user = UserRepository.get_by_email(db, user_data.email)
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
hashed_password = hash_password(user_data.password)
user = UserRepository.create_user(
db=db,
email=user_data.email,
hashed_password=hashed_password,
)

What it is: FastAPI's dependency injection system automatically provides values to route handler parameters. Used for database sessions, authentication, rate limiting.
Where we use it:
Every route handler in routes/auth.py and routes/scans.py uses dependencies:
# routes/scans.py:23-37
@router.post("/", response_model=ScanResponse)
@limiter.limit(settings.API_RATE_LIMIT_SCAN)
async def create_scan(
request: Request, # Injected by FastAPI
scan_request: ScanRequest, # Parsed and validated from request body
db: Session = Depends(get_db), # Database session injected
current_user: UserResponse = Depends(get_current_user), # Auth check injected
) -> ScanResponse:
return ScanService.run_scan(db, current_user.id, scan_request)

Why we chose it: Clean separation of concerns. The route handler doesn't know how to:
- Get a database session (handled by get_db)
- Validate JWT tokens (handled by get_current_user)
- Parse request bodies (handled by Pydantic)
This makes testing easier - mock the dependencies, not the entire request cycle.
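The payoff shows up in unit tests. Because dependencies arrive as plain parameters, a test can call the handler logic directly with in-memory fakes; the sketch below uses hypothetical stand-ins for the real session, token, and user types rather than the project's actual code:

```python
# Hypothetical stand-ins: the dependency resolves a user from a token,
# and the handler only ever sees the resulting user object.
def get_current_user(token: str, user_lookup) -> dict:
    email = token.removeprefix("jwt:")   # stand-in for real JWT decoding
    user = user_lookup(email)
    if user is None:
        raise LookupError("User not found")
    return user

def scan_count_route(current_user: dict, db: dict) -> int:
    # Handler logic under test - no HTTP stack, no JWT library, no real DB
    return len(db.get(current_user["id"], []))

# Substitute both dependencies with in-memory fakes
fake_users = {"a@example.com": {"id": 7, "email": "a@example.com"}}
fake_db = {7: ["scan-1", "scan-2"]}
user = get_current_user("jwt:a@example.com", fake_users.get)
count = scan_count_route(user, fake_db)
```

In a full integration test, FastAPI's app.dependency_overrides mechanism achieves the same substitution through the framework itself.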
Trade-offs:
- Pros: Testable, reusable, explicit dependencies, automatic cleanup (session closing)
- Cons: "Magic" behavior for beginners, debugging can be tricky if dependency fails
The get_current_user dependency implementation (core/dependencies.py:17-49):
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> UserResponse:
"""
FastAPI dependency to extract and verify the current authenticated user
"""
try:
payload = decode_token(credentials.credentials)
email: str | None = payload.get("sub")
if email is None:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
user = UserRepository.get_by_email(db, email)
if not user:
raise HTTPException(status_code=401, detail="User not found")
return UserResponse.model_validate(user)
except ValueError:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")

What it is:
Define the skeleton of an algorithm in a base class, let subclasses override specific steps. All scanners share common HTTP logic but implement their own scan() method.
Where we use it:
scanners/base_scanner.py provides common functionality:
class BaseScanner(ABC):
def __init__(self, target_url: str, auth_token: str | None = None, max_requests: int | None = None):
self.target_url = target_url.rstrip("/")
self.auth_token = auth_token
self.max_requests = max_requests or settings.DEFAULT_MAX_REQUESTS
self.session = self._create_session()
self.last_request_time = 0.0
self.request_count = 0
def make_request(self, method: str, endpoint: str, **kwargs: Any) -> requests.Response:
"""Common HTTP request logic with retry and rate limiting"""
self._wait_before_request()
# ... retry logic, backoff, timeout handling
def get_baseline_timing(self, endpoint: str, samples: int | None = None) -> tuple[float, float]:
"""Statistical baseline for time-based detection"""
# ... takes samples, calculates mean and stdev
@abstractmethod
def scan(self) -> TestResultCreate:
"""Must be implemented by specific scanner classes"""

Subclasses implement scan():
# scanners/sqli_scanner.py:25-60
class SQLiScanner(BaseScanner):
def scan(self) -> TestResultCreate:
error_based_test = self._test_error_based_sqli()
if error_based_test["vulnerable"]:
return self._create_vulnerable_result(...)
boolean_based_test = self._test_boolean_based_sqli()
if boolean_based_test["vulnerable"]:
return self._create_vulnerable_result(...)
time_based_test = self._test_time_based_sqli()
if time_based_test["vulnerable"]:
return self._create_vulnerable_result(...)
return TestResultCreate(status=ScanStatus.SAFE, ...)

Why we chose it:
Eliminates code duplication. Request spacing, retry logic, session management - written once in BaseScanner, used by all four scanner types.
Trade-offs:
- Pros: DRY principle, consistent behavior, easy to add new scanners
- Cons: Tight coupling to base class, inheritance can be limiting
The backend uses a three-layer architecture:
┌────────────────────────────────────┐
│          Layer 1: Routes           │
│  - HTTP request/response           │
│  - Validation (Pydantic)           │
│  - Auth checks (dependencies)      │
│  - Rate limiting                   │
└─────────────────┬──────────────────┘
                  ▼
┌────────────────────────────────────┐
│         Layer 2: Services          │
│  - Business logic                  │
│  - Orchestration                   │
│  - Transaction management          │
│  - Error handling                  │
└─────────────────┬──────────────────┘
                  ▼
┌────────────────────────────────────┐
│       Layer 3: Repositories        │
│  - Database queries                │
│  - Data access only                │
│  - No business logic               │
└────────────────────────────────────┘
Separation makes each layer testable in isolation:
- Test routes with mocked services
- Test services with mocked repositories
- Test repositories against a real test database
It also enforces single responsibility. Routes don't write SQL. Services don't parse HTTP headers. Repositories don't implement business rules.
Layer 1: Routes (routes/auth.py, routes/scans.py)
- Files: Route handler functions decorated with @router.get/post/delete
- Imports: Can import from services, schemas, dependencies
- Forbidden: Direct database access, business logic, calling repositories directly
Example route:
# routes/scans.py:67-83
@router.get("/{scan_id}", response_model=ScanResponse)
@limiter.limit(settings.API_RATE_LIMIT_DEFAULT)
async def get_scan(
request: Request,
scan_id: int,
db: Session = Depends(get_db),
current_user: UserResponse = Depends(get_current_user),
) -> ScanResponse:
"""
Get a specific scan by ID
"""
return ScanService.get_scan_by_id(db, scan_id, current_user.id)

Layer 2: Services (services/auth_service.py, services/scan_service.py)
- Files: Service classes with static methods
- Imports: Repositories, models, schemas, utilities
- Forbidden: HTTP-specific code (requests, responses), direct SQL queries
Example service method:
# services/scan_service.py:23-65
@staticmethod
def run_scan(db: Session, user_id: int, scan_request: ScanRequest) -> ScanResponse:
# Create scan record
scan = ScanRepository.create_scan(
db=db,
user_id=user_id,
target_url=str(scan_request.target_url),
)
# Map test types to scanner classes
scanner_mapping: dict[TestType, type[BaseScanner]] = {
TestType.RATE_LIMIT: RateLimitScanner,
TestType.AUTH: AuthScanner,
TestType.SQLI: SQLiScanner,
TestType.IDOR: IDORScanner,
}
results: list[TestResultCreate] = []
# Execute each requested test
for test_type in scan_request.tests_to_run:
scanner_class = scanner_mapping.get(test_type)
scanner = scanner_class(...)
result = scanner.scan()
results.append(result)
# Save all results
for result in results:
TestResultRepository.create_test_result(db=db, scan_id=scan.id, ...)
db.refresh(scan)
return ScanResponse.model_validate(scan)

Layer 3: Repositories (repositories/user_repository.py, repositories/scan_repository.py, repositories/test_result_repository.py)
- Files: Repository classes with static methods for database operations
- Imports: Models, SQLAlchemy, config
- Forbidden: Business logic, HTTP handling, calling other repositories
Example repository method:
# repositories/scan_repository.py:48-71
@staticmethod
def get_by_user(
db: Session,
user_id: int,
skip: int = 0,
limit: int | None = None
) -> list[Scan]:
"""
Get all scans for a user with pagination
"""
if limit is None:
limit = settings.DEFAULT_PAGINATION_LIMIT
return (
db.query(Scan)
.options(joinedload(Scan.test_results)) # Eager load relationships
.filter(Scan.user_id == user_id)
.order_by(Scan.scan_date.desc())
.offset(skip)
.limit(limit)
.all()
)

# models/User.py:12-43
class User(BaseModel):
"""
Stores authentication credentials and user information
"""
__tablename__ = "users"
email = Column(
String(settings.EMAIL_MAX_LENGTH), # 255 chars
unique=True,
nullable=False,
index=True, # Fast lookups by email
)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True, nullable=False)

Fields explained:
- id: Auto-incrementing primary key (inherited from BaseModel)
- email: Unique identifier for login, indexed for fast authentication queries
- hashed_password: Bcrypt hash, never stored in plaintext, never returned in API responses
- is_active: Soft delete flag, allows disabling accounts without losing data
- created_at, updated_at: Timestamps inherited from BaseModel
Relationships:
- One-to-many with Scan: user.scans returns all scans created by this user
- Defined by the relationship in the Scan model: user = relationship("User", backref="scans")
# models/Scan.py:15-57
class Scan(BaseModel):
"""
Stores metadata about scans performed on target URLs
"""
__tablename__ = "scans"
user_id = Column(
Integer,
ForeignKey("users.id", ondelete="CASCADE"), # Delete scans when user deleted
nullable=False,
index=True,
)
target_url = Column(
String(settings.URL_MAX_LENGTH), # 2048 chars
nullable=False,
)
scan_date = Column(
DateTime(timezone=True),
default=lambda: datetime.now(UTC),
nullable=False,
)
user = relationship("User", backref="scans")
test_results = relationship(
"TestResult",
back_populates="scan",
cascade="all, delete-orphan", # Delete results when scan deleted
)

Fields explained:
- user_id: Foreign key to users table, indexed for filtering scans by user
- target_url: URL that was scanned, up to 2048 chars for long query strings
- scan_date: When the scan was initiated, timezone-aware datetime in UTC
- CASCADE: When a user is deleted, their scans are deleted. When a scan is deleted, its test results are deleted
Relationships:
- Many-to-one with User: scan.user gets the user who created it
- One-to-many with TestResult: scan.test_results gets all vulnerability findings
Properties (computed, not stored):
@property
def has_vulnerabilities(self) -> bool:
return any(result.status == "vulnerable" for result in self.test_results)
@property
def vulnerability_count(self) -> int:
return sum(1 for result in self.test_results if result.status == "vulnerable")

# models/TestResult.py:16-57
class TestResult(BaseModel):
"""
Stores individual test results for each security scan
"""
__tablename__ = "test_results"
scan_id = Column(
Integer,
ForeignKey("scans.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
test_name = Column(
Enum(TestType), # rate_limit, auth, sqli, idor
nullable=False,
index=True,
)
status = Column(
Enum(ScanStatus), # vulnerable, safe, error
nullable=False,
index=True,
)
severity = Column(
Enum(Severity), # critical, high, medium, low, info
nullable=False,
index=True,
)
details = Column(Text, nullable=False)
evidence_json = Column(JSON, nullable=False, default=dict)
recommendations_json = Column(JSON, nullable=False, default=list)

Fields explained:
- scan_id: Foreign key to scans table; which scan this result belongs to
- test_name: Enum constraining values to valid test types, indexed for filtering by test
- status: Enum for vulnerable/safe/error, indexed for finding all vulnerabilities
- severity: Enum for critical/high/medium/low/info, indexed for prioritization
- details: Text description of what was found
- evidence_json: JSON storing response codes, payloads, timings - varies by test type
- recommendations_json: Array of strings with remediation steps
Why JSON columns: Evidence varies by test type:
- Rate limit test: {"rate_limit_headers": {...}, "bypass_method": "IP spoofing"}
- SQLi test: {"database_type": "mysql", "payload": "' OR 1=1--", "response_time": "5.23s"}
- Auth test: {"algorithm_variant": "none", "status_code": 200}
JSON flexibility lets each scanner store relevant data without schema changes.
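As a sketch of that flexibility (hypothetical rows, with the payload shapes taken from the SQLi and auth examples above), differently shaped evidence serializes into the same column, and a new scanner just writes a new shape with zero migrations:

```python
import json

# Hypothetical rows - different evidence shapes, one evidence_json column
rows = [
    {"test_name": "sqli",
     "evidence_json": json.dumps({"database_type": "mysql",
                                  "payload": "' OR 1=1--",
                                  "response_time": "5.23s"})},
    {"test_name": "auth",
     "evidence_json": json.dumps({"algorithm_variant": "none",
                                  "status_code": 200})},
]

# Reading back: each consumer interprets the shape its own scanner wrote
sqli_row = next(r for r in rows if r["test_name"] == "sqli")
sqli_evidence = json.loads(sqli_row["evidence_json"])
```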
What we're protecting against:
- Unauthorized access to scan data - Users should only see their own scans. Attacker tries to view scan ID 123 when they only created scan ID 456. Defense: Authorization check in services/scan_service.py:77-80 verifies scan.user_id == user_id.
- Token theft and replay - Attacker steals JWT from network traffic or XSS. Defense: HTTPS in production (enforced by nginx), short token lifetime (24 hours from config.py:24), httpOnly cookies (not implemented but recommended).
- Brute force login attempts - Attacker tries common passwords against accounts. Defense: Rate limiting at routes/auth.py:49 limits login to 20/minute; bcrypt makes password verification slow.
- SQL injection in scanner payloads - Malicious user creates a scan with an SQLi payload as the target URL, hoping to exploit our database. Defense: All database access uses parameterized queries via SQLAlchemy, never string concatenation.
- Resource exhaustion - Attacker submits scans with max_requests=50 repeatedly to consume backend resources. Defense: Rate limiting on scan creation (15/minute), timeout limits on scanners, max_requests capped at 50.
What we're NOT protecting against (out of scope):
- DDoS attacks - Application-level rate limiting can't stop volumetric network floods. Requires infrastructure defenses (CloudFlare, AWS Shield).
- Database compromise - If attacker gains direct database access, they can read all data. Requires infrastructure hardening, encrypted columns for sensitive data.
- Server-side request forgery (SSRF) - Scanners make requests to user-provided URLs. This is intentional functionality. Mitigation: scanners run with limited network access, not on internal network.
Multiple layers of security create defense in depth:
Layer 1: Network (Nginx)
  → HTTPS, CORS headers, rate limits
Layer 2: Application (FastAPI)
  → JWT validation, endpoint rate limits, input validation
Layer 3: Business Logic (Services)
  → Authorization checks, transaction management
Layer 4: Data Access (Repositories)
  → Parameterized queries, row-level permissions
Why multiple layers?
If one defense fails, others catch the attack. Example: Nginx rate limit bypassed via IP spoofing, but application-level rate limit (by user ID) still protects. JWT validation bypassed somehow, but service layer still checks scan.user_id before returning data.
Complete JWT authentication cycle:
1. Registration (services/auth_service.py:19-41):
# Hash password with bcrypt
hashed_password = hash_password(user_data.password)
# Bcrypt automatically generates salt, 10 rounds by default
# Store hashed password
user = UserRepository.create_user(
db=db,
email=user_data.email,
hashed_password=hashed_password,
)

2. Login (services/auth_service.py:43-71):
# Verify password
if not verify_password(login_data.password, user.hashed_password):
raise HTTPException(status_code=401, detail="Invalid email or password")
# Create JWT with expiration
access_token = create_access_token(
data={"sub": user.email}, # Subject claim
expires_delta=timedelta(minutes=1440) # 24 hours
)
return TokenResponse(access_token=access_token, token_type="bearer")

3. Protected endpoint access (core/dependencies.py:17-49):
# Extract token from Authorization header
credentials: HTTPAuthorizationCredentials = Depends(security)
# security = HTTPBearer() from fastapi.security
# Decode and verify token
payload = decode_token(credentials.credentials)
email = payload.get("sub")
# Load user from database
user = UserRepository.get_by_email(db, email)
if not user:
raise HTTPException(status_code=401)
return UserResponse.model_validate(user)

4. Route handler receives authenticated user:
@router.post("/scans/")
async def create_scan(
current_user: UserResponse = Depends(get_current_user), # Authenticated
...
):
# current_user is guaranteed to be valid at this point
return ScanService.run_scan(db, current_user.id, ...)

Multiple rate limit implementations:
1. Nginx level - Not implemented in dev, but production nginx can use limit_req:
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=100r/m;
limit_req zone=api_limit burst=20 nodelay;

2. Application level - SlowAPI per-endpoint limits (backend/factory.py:34-36):
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

Applied to routes:
# routes/auth.py:25-28
@router.post("/register", ...)
@limiter.limit(settings.API_RATE_LIMIT_REGISTER) # "15/minute"
async def register(...):
# routes/auth.py:47-50
@router.post("/login", ...)
@limiter.limit(settings.API_RATE_LIMIT_LOGIN) # "20/minute"
async def login(...):
# routes/scans.py:23-26
@router.post("/", ...)
@limiter.limit(settings.API_RATE_LIMIT_SCAN) # "15/minute"
async def create_scan(...):

3. Scanner level - Outgoing requests to targets are spaced (base_scanner.py:64-90):
def _wait_before_request(self, jitter_ms: int | None = None) -> None:
    """
    Implement request spacing to avoid overwhelming the target
    """
    required_delay = 1.0 / (self.max_requests / settings.SCANNER_RATE_LIMIT_WINDOW_SECONDS)
    # If max_requests=100 and window=60s, delay = 1.0 / (100/60) = 0.6s between requests
    jitter = random.uniform(0, (jitter_ms or 0) / 1000.0)  # optional random variation
    elapsed = time.time() - self.last_request_time
    if elapsed < required_delay:
        time.sleep(required_delay - elapsed + jitter)

This prevents scanners from hammering target APIs and getting IP banned.
What we store:
- User accounts (email, hashed password)
- Scan metadata (target URL, timestamp)
- Test results (findings, evidence, recommendations)
Why PostgreSQL:
- Relational data with foreign keys (scans → users, test_results → scans)
- JSON column support for flexible evidence storage
- ACID transactions for data integrity
- Mature, well-documented, widely deployed
Alternatives considered:
- MongoDB: Better for schema-less data, but we have clear relationships and benefit from foreign key constraints
- SQLite: Simpler setup, but doesn't handle concurrent writes well (multiple scans running simultaneously)
Schema design:
-- Automatically generated by SQLAlchemy from models
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
hashed_password VARCHAR NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);
CREATE INDEX ix_users_email ON users(email);
CREATE TABLE scans (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
target_url VARCHAR(2048) NOT NULL,
scan_date TIMESTAMP WITH TIME ZONE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);
CREATE INDEX ix_scans_user_id ON scans(user_id);
CREATE TABLE test_results (
id SERIAL PRIMARY KEY,
scan_id INTEGER NOT NULL REFERENCES scans(id) ON DELETE CASCADE,
test_name VARCHAR NOT NULL, -- Enum: rate_limit, auth, sqli, idor
status VARCHAR NOT NULL, -- Enum: vulnerable, safe, error
severity VARCHAR NOT NULL, -- Enum: critical, high, medium, low, info
details TEXT NOT NULL,
evidence_json JSON NOT NULL,
recommendations_json JSON NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);
CREATE INDEX ix_test_results_scan_id ON test_results(scan_id);
CREATE INDEX ix_test_results_test_name ON test_results(test_name);
CREATE INDEX ix_test_results_status ON test_results(status);
CREATE INDEX ix_test_results_severity ON test_results(severity);

Indexes explained:
- users.email: Fast login lookups (WHERE email = ?)
- scans.user_id: Fast filtering (WHERE user_id = ?)
- test_results.scan_id: Fast joins (JOIN scans WHERE scan_id = ?)
- test_results.status: Fast vulnerability queries (WHERE status = 'vulnerable')
- test_results.severity: Fast filtering by severity (WHERE severity = 'critical')
Database connections are expensive to create. SQLAlchemy maintains a pool:
# core/database.py:12-17
engine = create_engine(
settings.DATABASE_URL,
pool_pre_ping=True, # Verify connections before use (handles DB restarts)
echo=settings.DEBUG, # Log all SQL when DEBUG=True
)

Default pool size: 5 connections, overflow: 10 (up to 15 total).
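Conceptually the pool is a bounded stash of reusable connections: idle ones are handed out first, overflow connections are opened on demand up to the cap, and acquirers block once the cap is reached. A toy stdlib model of that behavior (not SQLAlchemy's actual QueuePool), sized like the 5+10 defaults:

```python
import queue

class ToyPool:
    """Illustrative connection pool: persistent slots plus bounded overflow."""

    def __init__(self, size: int, max_overflow: int, connect):
        self._connect = connect
        self._free: queue.Queue = queue.Queue()
        self._created = 0
        self._cap = size + max_overflow        # 5 + 10 = 15 total, like the defaults
        for _ in range(size):                  # pre-open the persistent connections
            self._free.put(connect())
            self._created += 1

    def acquire(self):
        try:
            return self._free.get_nowait()     # reuse an idle connection
        except queue.Empty:
            if self._created < self._cap:      # open an overflow connection
                self._created += 1
                return self._connect()
            return self._free.get()            # at capacity: wait for a release

    def release(self, conn) -> None:
        self._free.put(conn)                   # return the connection for reuse

pool = ToyPool(size=5, max_overflow=10, connect=object)
conns = [pool.acquire() for _ in range(15)]    # a 16th acquire would block
```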
Session lifecycle managed by dependency:
# core/database.py:28-36
def get_db() -> Generator[Session, None, None]:
"""
FastAPI dependency for database sessions
"""
db = SessionLocal()
try:
yield db # Provide to route handler
finally:
db.close()  # Always close, even if an exception occurs

All configuration lives in .env and config.py:
# config.py:14-69
class Settings(BaseSettings):
# Application
APP_NAME: str = "API Security Tester"
VERSION: str = "1.0.0"
DEBUG: bool = False
# Database
DATABASE_URL: str # Required, no default
# Security - JWT
SECRET_KEY: str # Required, MUST be random in production
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 1440
# Rate limiting
API_RATE_LIMIT_LOGIN: str = "20/minute"
API_RATE_LIMIT_REGISTER: str = "15/minute"
API_RATE_LIMIT_SCAN: str = "15/minute"
# Scanner limits
SCANNER_MAX_CONCURRENT_REQUESTS: int = 50
SCANNER_CONNECTION_TIMEOUT: int = 180
DEFAULT_MAX_REQUESTS: int = 100

Why centralized config:
- Single source of truth for all constants
- Type validation with Pydantic
- Easy to change without touching code
- Different values for dev/test/prod
Configuration strategy:
Development uses .env file loaded by Docker Compose. Production uses environment variables set directly in Docker, Kubernetes, or cloud platform.
Critical settings that must be changed for production:
- SECRET_KEY - Generate with openssl rand -hex 32
- DEBUG - Must be false
- DATABASE_URL - Production database, not localhost
- CORS_ORIGINS - Actual frontend domain, not http://localhost
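Those checks can be enforced mechanically. A hypothetical startup guard (not part of the codebase) that fails fast instead of booting with unsafe values might look like:

```python
def production_config_problems(env: dict[str, str]) -> list[str]:
    """Return a list of misconfigurations; an empty list means safe to boot."""
    problems = []
    if env.get("DEBUG", "false").lower() != "false":
        problems.append("DEBUG must be false in production")
    if len(env.get("SECRET_KEY", "")) < 32:
        problems.append("SECRET_KEY missing or short - generate with: openssl rand -hex 32")
    if "localhost" in env.get("DATABASE_URL", ""):
        problems.append("DATABASE_URL still points at localhost")
    if "http://localhost" in env.get("CORS_ORIGINS", ""):
        problems.append("CORS_ORIGINS still allows http://localhost")
    return problems

# At startup, something like:
#   if problems := production_config_problems(dict(os.environ)):
#       raise SystemExit("\n".join(problems))
```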
Where this system gets slow under load:
- Scanner HTTP requests - Each scan makes 10-100 HTTP requests to external APIs. If the target is slow (5s response time), scans take minutes. Can't parallelize much without overwhelming targets. Mitigated by timeout limits (180s from config.py:51).
- Database queries with relationships - Loading scan.test_results triggers N+1 queries if not eager loaded. With 10 scans and 4 results each = 41 queries (1 for scans + 40 for results). Solved with joinedload() in repositories:
# repositories/scan_repository.py:50-59
return (
db.query(Scan)
.options(joinedload(Scan.test_results)) # Single query with JOIN
.filter(Scan.user_id == user_id)
.all()
)

- Password hashing on login - Bcrypt is intentionally slow (prevents brute force). Each login takes ~100ms. Under load, authentication becomes a bottleneck. Mitigated by rate limiting login attempts and using caching (not implemented, but Redis could be added for session tokens).
What we did to make it faster:
- Request pooling - BaseScanner reuses a requests.Session(), which maintains connection pools to targets. Avoids TCP handshake overhead on each request.
- Database indexes - Foreign keys and commonly queried columns (email, user_id, status) are indexed. Queries that would do table scans become index lookups.
- Response pagination - Scan list queries use LIMIT/OFFSET to avoid loading thousands of records:
# repositories/scan_repository.py:50-71
@staticmethod
def get_by_user(
db: Session,
user_id: int,
skip: int = 0,
limit: int | None = None # Default 100 from config
) -> list[Scan]:
if limit is None:
limit = settings.DEFAULT_PAGINATION_LIMIT
return (
db.query(Scan)
.offset(skip)
.limit(limit) # Only load requested page
.all()
)

- Enum columns - test_name, status, severity use Postgres ENUMs, not strings. Smaller storage, faster comparisons, enforced validity.
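The Python side of those enums can be sketched from the values listed in the column comments (the codebase's actual enum definitions are not shown here, so treat this as an assumed shape):

```python
import enum

class TestType(str, enum.Enum):
    RATE_LIMIT = "rate_limit"
    AUTH = "auth"
    SQLI = "sqli"
    IDOR = "idor"

class ScanStatus(str, enum.Enum):
    VULNERABLE = "vulnerable"
    SAFE = "safe"
    ERROR = "error"

class Severity(str, enum.Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

# Validity is enforced at the type level: TestType("sqli") succeeds,
# while TestType("xss") raises ValueError before anything reaches the DB.
```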
Vertical scaling (more CPU/RAM on single server):
- Database: Increase max_connections in the Postgres config
- Backend: Run more gunicorn workers (4 workers in the production Dockerfile)
- Frontend: Nginx already efficient, bottleneck is unlikely here
Current limits with single server:
- Database can handle ~100 concurrent connections
- Backend with 4 workers handles ~400 concurrent requests
- Scanners are the real limit - each scan is long-running (30-60s)
Horizontal scaling (more servers) brings new challenges:

- Scanners are stateless and can run on any backend instance ✓
- Database requires a connection pooling strategy (PgBouncer)
- Shared session state is needed (Redis), or stick to JWT (stateless) ✓
- A load balancer is required (nginx, AWS ALB)
What needs to change:
- Add load balancer in front of backend
- Configure shared session store or rely solely on JWT
- Database connection pool management (PgBouncer)
- Consider async task queue (Celery, RQ) for long-running scans
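As a rough sketch of the task-queue direction, here is an in-process thread pool standing in for a broker-backed queue like Celery or RQ. The `submit_scan`/`scan_status` names and the job-id scheme are hypothetical, not the project's API.

```python
# Minimal in-process sketch of moving scans off the request path. A
# ThreadPoolExecutor stands in for a real broker-backed queue (Celery,
# RQ); submit_scan/scan_status are hypothetical names, not project APIs.
import uuid
from concurrent.futures import Future, ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
jobs: dict[str, Future] = {}


def run_scan(target_url: str) -> dict:
    # Placeholder for the real scanner pipeline (long-running HTTP tests).
    return {"target": target_url, "status": "completed"}


def submit_scan(target_url: str) -> str:
    """Enqueue a scan and immediately return a job id the client can poll."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = executor.submit(run_scan, target_url)
    return job_id


def scan_status(job_id: str) -> str:
    return "completed" if jobs[job_id].done() else "running"
```

A real queue adds what this sketch lacks: persistence across restarts, worker processes on other machines, and retry semantics.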
What we chose:
Scanners use the synchronous `requests` library, not async `httpx` or `aiohttp`.
Alternatives considered:
- `httpx` async HTTP client - could run all tests concurrently
- `aiohttp` - similar benefits, different API

Trade-offs: Pros of sync:
- Simpler code, easier to reason about timing
- Time-based SQLi detection requires precise timing control
- Baseline timing calculation needs sequential requests
- The standard `requests` library is battle-tested
Cons of sync:
- Can't run multiple tests concurrently within a scan
- Blocks event loop (mitigated by running in thread pool)
- Slower for scans with many tests
Why we made this choice:
Accuracy over speed. Time-based blind SQL injection detection (sqli_scanner.py:183-251) requires:
- Establish baseline response time (multiple samples)
- Send delay payload
- Measure if response is slower by expected amount
Async concurrency would introduce timing noise. A delay of 5.1s vs 5.3s could be network jitter, not SQLi. Sequential requests with controlled spacing give cleaner signals.
What we chose:
Repository pattern - `UserRepository.get_by_email(db, email)` instead of `User.find_by_email(email)`.
Alternatives considered:
- Active Record (Django-style) - Models have class methods for queries
- Data Mapper (raw SQL) - Write SQL strings directly
Trade-offs: Pros of repository:
- Clear separation: models define structure, repositories define queries
- Testable: mock repositories in unit tests
- Flexible: swap ORM without changing service code
Cons of repository:
- More files, more navigation
- Extra abstraction layer
- Can feel like overkill for simple CRUD
Why we made this choice:
Testability and maintainability. Services like AuthService.login_user() call UserRepository.get_by_email(). In tests, mock the repository to return a fake user without touching the database.
If we later migrate from SQLAlchemy to another ORM, we only change repository implementations. Services remain unchanged.
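A minimal sketch of that testing benefit: the service and repository below are simplified stand-ins following the names in the text, and `login_user`'s real signature surely differs.

```python
# Sketch of mocking a repository in a service test. AuthService and
# UserRepository follow the names in the text, but these simplified
# signatures are assumptions, not the project's actual code.
from unittest.mock import MagicMock


class AuthService:
    def __init__(self, user_repo) -> None:
        self.user_repo = user_repo

    def login_user(self, db, email: str) -> bool:
        user = self.user_repo.get_by_email(db, email)
        return user is not None


def test_login_with_unknown_email() -> None:
    repo = MagicMock()
    repo.get_by_email.return_value = None  # no database touched
    service = AuthService(user_repo=repo)
    assert service.login_user(db=None, email="nobody@example.com") is False
    repo.get_by_email.assert_called_once_with(None, "nobody@example.com")
```

Because the service only knows the repository's interface, the test runs in microseconds with no Postgres container in sight.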
What we chose: Single long-lived JWT (24 hours), no refresh token mechanism.
Alternatives considered:
- Short access tokens (15 min) + refresh tokens (30 days)
- Session-based auth with server-side storage
Trade-offs: Pros of current approach:
- Simpler implementation, no refresh endpoint
- Stateless - no session storage needed
- Works across multiple backend instances immediately
Cons of current approach:
- Can't invalidate tokens before expiration
- If token stolen, attacker has 24 hours of access
- No way to force logout on all devices
Why we made this choice: Simplicity for an educational project. Adding refresh tokens requires:
- Refresh token storage (database or Redis)
- Refresh endpoint with rotation logic
- Token revocation tracking
- More complex frontend token management
For a production app, you'd implement refresh tokens. For learning how JWT works, this is clearer.
What we chose:
Run everything in Docker containers with `dev.compose.yml`, even for local development.
Alternatives considered:
- Local Postgres + local Python + local Node (no Docker)
- Docker for services, local for development
- Kubernetes locally (minikube, kind)
Trade-offs: Pros of Docker Compose:
- Identical environment for all developers
- Spin up entire stack with one command
- Hot reload still works with volume mounts
- Production architecture matches dev (Docker in both)
Cons of Docker Compose:
- Slower file system on Mac (volume mounts)
- Extra resource usage (containers overhead)
- Learning curve for Docker debugging
Why we made this choice:
"Works on my machine" is eliminated. Every developer gets Postgres 16, Python 3.11, Node 20 regardless of their host OS. New team member runs docker compose up and they're ready.
Volume mounts preserve hot reload:

```yaml
# dev.compose.yml:33-35
volumes:
  - ./backend:/app  # Maps local backend/ to container /app
# Changes to backend/*.py trigger uvicorn reload
```

Production deployment uses optimized containers:
```
                 ┌──────────────────┐
Internet ──────> │ Nginx (Port 80)  │
                 └────────┬─────────┘
                          │
                 ┌────────▼─────────┐
                 │ Backend (8000)   │
                 │ Gunicorn         │
                 │ 4 workers        │
                 └────────┬─────────┘
                          │
                 ┌────────▼─────────┐
                 │ PostgreSQL       │
                 │ (internal only)  │
                 └──────────────────┘
```
Components:

Nginx container - built from conf/docker/prod/vite.docker:
- Multi-stage build: compile React, then serve with nginx
- Serves static files from `/usr/share/nginx/html`
- Proxies `/api` to the backend
- Gzip compression, caching headers

Backend container - built from conf/docker/prod/fastapi.docker:
- Runs gunicorn with 4 uvicorn workers
- Non-root user for security
- No volume mounts (code baked into the image)

Database container - Postgres 16 Alpine:
- Not exposed to the host in production
- Data persists in a Docker volume
Infrastructure: Minimal production setup:
- Single VPS (DigitalOcean Droplet, AWS EC2)
- Docker Compose orchestration
- SSL via Let's Encrypt (certbot) or Cloudflare proxy
Scaling beyond single server:
- Backend: Multiple instances behind load balancer
- Database: Read replicas, connection pooling
- Static files: CDN (CloudFront, Cloudflare)
- Validation errors (422) - Pydantic catches bad input:

```python
# schemas/user_schemas.py:26-38
@field_validator("password")
@classmethod
def validate_password_strength(cls, v: str) -> str:
    if not re.search(r"[A-Z]", v):
        raise ValueError("Password must contain uppercase letter")
    # Pydantic converts the ValueError to HTTP 422 automatically
```

- Authentication errors (401) - JWT invalid or expired:

```python
# core/dependencies.py:33-36
if email is None:
    raise HTTPException(
        status_code=401,
        detail="Invalid authentication credentials"
    )
```

- Authorization errors (403) - valid user, wrong resource:

```python
# services/scan_service.py:77-80
if scan.user_id != user_id:
    raise HTTPException(
        status_code=403,
        detail="Not authorized to access this scan"
    )
```

- Not found errors (404) - resource doesn't exist:

```python
# services/scan_service.py:73-74
if not scan:
    raise HTTPException(status_code=404, detail="Scan not found")
```

- Scanner errors - caught and returned as `status="error"`:

```python
# services/scan_service.py:52-65
try:
    scanner = scanner_class(...)
    result = scanner.scan()
    results.append(result)
except Exception as e:
    results.append(
        TestResultCreate(
            test_name=test_type,
            status="error",
            severity="info",
            details=f"Scanner error: {str(e)}",
            ...
        )
    )
```

Database connection loss:
- Detection: `pool_pre_ping=True` tests connections before use
- Response: SQLAlchemy automatically reconnects
- Recovery: the failed transaction rolls back; the next request gets a new connection
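The pre-ping setting typically appears where the engine is created. A sketch with assumed pool sizes and DSN (the project's real values live in core/database.py and config.py):

```python
# Sketch of engine creation with pre-ping enabled. The DSN and pool
# numbers are assumptions, not the project's actual settings.
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql://app:secret@db:5432/app",  # assumed DSN
    pool_pre_ping=True,  # lightweight liveness check before reusing a connection
    pool_size=10,        # illustrative pool sizing
    max_overflow=20,
)
```

The pre-ping adds a tiny round trip per checkout, in exchange for never handing a dead connection to a request.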
Scanner timeout:
- Detection: `requests.Timeout` exception after `SCANNER_CONNECTION_TIMEOUT` seconds
- Response: retry with exponential backoff (up to 3 times)
- Recovery: if all retries fail, return an error result (the scan continues with other tests)
Rate limit exceeded (429 from target):
- Detection: HTTP 429 status code in the scanner response
- Response: read the `Retry-After` header, wait the specified duration
- Recovery: retry the request after waiting
Code from base_scanner.py:92-156:

```python
def make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
    # (url is assembled from the scan target and endpoint; elided in this excerpt)
    retry_count = 0
    backoff_factor = 2.0
    while retry_count <= settings.DEFAULT_RETRY_COUNT:
        try:
            response = self.session.request(method, url, **kwargs)
            if response.status_code == 429:
                retry_after = response.headers.get("Retry-After", "60")
                wait_time = int(retry_after) if retry_after.isdigit() else 60
                time.sleep(wait_time)
                retry_count += 1
                continue  # Try again
            if response.status_code >= 500:  # Server error
                wait_time = backoff_factor ** retry_count
                time.sleep(wait_time)
                retry_count += 1
                continue
            return response  # Success
        except (requests.Timeout, requests.ConnectionError):
            if retry_count < settings.DEFAULT_RETRY_COUNT:
                wait_time = backoff_factor ** retry_count
                time.sleep(wait_time)
                retry_count += 1
            else:
                raise  # Give up after retries are exhausted
```

Want to add a new security test (e.g. XSS detection)? Here's the process:
1. Create the scanner in backend/scanners/xss_scanner.py:

```python
from .base_scanner import BaseScanner
from core.enums import TestType, ScanStatus, Severity


class XSSScanner(BaseScanner):
    def scan(self) -> TestResultCreate:
        # Test for reflected XSS
        test_result = self._test_reflected_xss()
        if test_result["vulnerable"]:
            return TestResultCreate(
                test_name=TestType.XSS,  # Need to add to enum
                status=ScanStatus.VULNERABLE,
                severity=Severity.HIGH,
                details="Reflected XSS detected",
                evidence_json=test_result,
                recommendations_json=[...]
            )
        return TestResultCreate(test_name=TestType.XSS, status=ScanStatus.SAFE, ...)
```

2. Add to the enum in backend/core/enums.py:19-25:
```python
class TestType(str, Enum):
    RATE_LIMIT = "rate_limit"
    AUTH = "auth"
    SQLI = "sqli"
    IDOR = "idor"
    XSS = "xss"  # New test type
```

3. Register it in the service at backend/services/scan_service.py:32-37:
```python
scanner_mapping: dict[TestType, type[BaseScanner]] = {
    TestType.RATE_LIMIT: RateLimitScanner,
    TestType.AUTH: AuthScanner,
    TestType.SQLI: SQLiScanner,
    TestType.IDOR: IDORScanner,
    TestType.XSS: XSSScanner,  # Register new scanner
}
```

4. Update frontend constants in frontend/src/config/constants.ts:44-51:
```typescript
export const SCAN_TEST_TYPES = {
  RATE_LIMIT: 'rate_limit',
  AUTH: 'auth',
  SQLI: 'sqli',
  IDOR: 'idor',
  XSS: 'xss', // Add new test type
} as const;

export const TEST_TYPE_LABELS: Record<ScanTestType, string> = {
  // ... existing
  [SCAN_TEST_TYPES.XSS]: 'Cross-Site Scripting',
};
```

No changes needed to:
- Database schema (test_name is enum, migrations auto-update)
- Routes (they just pass through test types)
- Repositories (they store whatever test types are sent)
Current architectural limitations:

- No scan queueing - Scans run synchronously in the request handler. If 10 users submit scans simultaneously, they all block on scanner HTTP requests. The fix requires an async task queue (Celery with Redis, or RQ).
- No real-time scan progress - The frontend submits a scan and waits for the complete response, so long scans (2 minutes) show no progress. The fix requires a WebSocket connection or polling for progress updates.
- Single target per scan - Can't scan multiple URLs in one operation. The fix requires a loop in the service layer and UI for multiple target inputs.
- No historical comparison - Can't compare scan results over time ("Was this vulnerable last week?"). The fix requires additional queries and UI for trend visualization.
- Limited concurrency in scanners - Tests run sequentially within a scan. We could run all 4 tests simultaneously, but chose not to for timing accuracy: a trade-off between speed and precision.
These are not bugs - they're conscious trade-offs. Fixing them would require significant architectural changes.
Compared to Burp Suite, how we're different:
- Burp is a proxy, we're a standalone scanner
- Burp has GUI desktop app, we're web-based
- Burp is comprehensive (hundreds of tests), we focus on 4 core vulnerabilities
Why we made different choices: Educational focus. Burp is for professional pentesters. This project teaches how scanners work by implementing the core logic yourself.
Compared to OWASP ZAP, how we're different:
- ZAP is passive + active scanning, we're active only
- ZAP auto-discovers endpoints, we test provided URLs
- ZAP integrates with CI/CD, we're standalone
Why we made different choices: Simplicity. ZAP is powerful but complex. This project shows the fundamentals without overwhelming features.
Initial design focused on:
- Four core vulnerability types
- Synchronous scanners for accuracy
- Repository pattern for clean separation
- Docker-first development
Planned architectural changes:

- Async task queue - move scanning to background workers
  - Why: non-blocking API, better scalability
  - What it enables: real-time progress, scheduled scans
- Plugin system - load scanners dynamically
  - Why: extensibility without modifying core
  - What it enables: community contributions, custom tests
- Report generation - PDF/HTML export of results
  - Why: sharing findings with teams
  - What it enables: professional documentation
- Webhook notifications - alert when scans complete
  - Why: integration with other tools
  - What it enables: Slack/email notifications, CI/CD integration
Quick map of where to find things:

- `backend/factory.py` - Application factory, middleware setup, route registration
- `backend/config.py` - All environment variables and configuration
- `backend/core/database.py` - Database engine and session management
- `backend/core/security.py` - JWT creation, password hashing, token validation
- `backend/core/dependencies.py` - FastAPI dependencies (auth, database)
- `backend/models/` - SQLAlchemy models (User, Scan, TestResult)
- `backend/repositories/` - Database query functions
- `backend/services/` - Business logic orchestration
- `backend/routes/` - API endpoints
- `backend/scanners/base_scanner.py` - Common scanner functionality
- `backend/scanners/*_scanner.py` - Individual vulnerability tests
- `frontend/src/hooks/` - React Query hooks for API calls
- `frontend/src/services/` - API client functions
- `frontend/src/store/` - Zustand state management
- `conf/nginx/` - Nginx reverse proxy configuration
- `compose.yml` - Production Docker Compose
- `dev.compose.yml` - Development with volume mounts
Now that you understand the architecture:
- Read 03-IMPLEMENTATION.md for code walkthrough - see how each scanner detects vulnerabilities, how authentication flows work, and how data moves through the layers
- Try modifying scanners - change SQLi payloads, adjust timing thresholds, add new detection logic to understand the implementation details