Skip to content

Latest commit

 

History

History
358 lines (268 loc) · 10.1 KB

File metadata and controls

358 lines (268 loc) · 10.1 KB

CyberCrew v2.0 — Plugin Development Guide

This guide walks you through creating a new forensics module for CyberCrew v2.0. All modules follow the same plugin contract defined by BaseForensicsModule.


Overview

A CyberCrew module is a Python class that:

  1. Inherits from BaseForensicsModule
  2. Implements 5 abstract methods
  3. Uses InputSanitizer for all user input validation
  4. Never uses shell=True — always builds commands as list[str]
  5. Connects to the shared ForensicsEngine and ArtifactHasher

Step-by-Step: Creating a New Module

Step 1 — Create the file

Create src/modules/my_module.py.

Step 2 — Define tool constants

# Identifiers for the tools this module exposes.
TOOL_HASH_FILE: str = "hash_file"
TOOL_COMPARE: str = "compare_files"

# Every tool identifier offered by the module, in declaration order.
ALL_TOOLS: list[str] = [
    TOOL_HASH_FILE,
    TOOL_COMPARE,
]

Step 3 — Subclass BaseForensicsModule

from src.modules.base_module import BaseForensicsModule, ModuleInputError
from src.security.sanitizer import InputSanitizer
from src.core.engine import ForensicsEngine
from src.core.hasher import ArtifactHasher
from pathlib import Path
from typing import Any, Optional
from PyQt6.QtCore import QObject

class MyModule(BaseForensicsModule):
    """Skeleton forensics module built on the BaseForensicsModule contract."""

    def __init__(
        self,
        engine: ForensicsEngine,
        hasher: ArtifactHasher,
        case_id: str,
        evidence_dir: Path,
        parent: Optional[QObject] = None,
    ) -> None:
        """Forward every constructor argument to the base class unchanged.

        Args:
            engine: ForensicsEngine instance handed to the base class.
            hasher: ArtifactHasher instance handed to the base class.
            case_id: Case identifier handed to the base class.
            evidence_dir: Evidence directory handed to the base class.
            parent: Optional Qt parent object.
        """
        super().__init__(
            engine,
            hasher,
            case_id,
            evidence_dir,
            parent,
        )

Step 4 — Implement get_tools()

    def get_tools(self) -> list[str]:
        """Return a new list holding every tool identifier in ALL_TOOLS."""
        return [*ALL_TOOLS]

Step 5 — Implement get_required_binaries()

    def get_required_binaries(self) -> dict[str, str]:
        """Map each external binary this module invokes to a short note on how to obtain it."""
        binaries: dict[str, str] = {}
        binaries["sha256sum"] = "Built-in on Linux; part of coreutils"
        binaries["diff"] = "sudo apt install diffutils"
        return binaries

Step 6 — Implement validate_input()

Always use InputSanitizer. Never pass raw user input to the command.

    def validate_input(self, tool_name: str, params: dict[str, Any]) -> None:
        """Check the parameters for a tool before any command is built.

        Args:
            tool_name: Identifier of the tool being run.
            params: User-supplied parameters for that tool.

        Raises:
            ModuleInputError: If a required path parameter is missing or empty.
                InputSanitizer.validate_file_path may raise as well for paths
                it rejects (exception type not visible here — TODO confirm).
        """
        if tool_name == TOOL_HASH_FILE:
            candidate = params.get("file_path", "")
            if candidate:
                InputSanitizer.validate_file_path(candidate)
                return
            raise ModuleInputError("file_path", "File path is required.")

        if tool_name == TOOL_COMPARE:
            # Both sides of the comparison are mandatory and checked in order.
            for key in ("file_a", "file_b"):
                supplied = params.get(key)
                if not supplied:
                    raise ModuleInputError(key, f"{key} is required.")
                InputSanitizer.validate_file_path(supplied)

Step 7 — Implement build_command()

Return a list[str] only. Never use shell=True.

    def build_command(self, tool_name: str, params: dict[str, Any]) -> list[str]:
        """Build the argument list for the requested tool.

        Args:
            tool_name: Identifier of the tool to run.
            params: Parameters for the tool; the path keys read here are the
                same ones validate_input() checks.

        Returns:
            The command as a list of arguments, never a shell string.

        Raises:
            ValueError: If tool_name is not one of this module's tools.
            KeyError: If a path parameter the tool needs is absent from params.
        """
        if tool_name == TOOL_HASH_FILE:
            return ["sha256sum", params["file_path"]]

        if tool_name == TOOL_COMPARE:
            # diff writes the comparison to stdout, so no output path is
            # computed here.
            return ["diff", "--unified", params["file_a"], params["file_b"]]

        raise ValueError(f"Unknown tool: {tool_name}")

Step 8 — Implement parse_output()

Parse one stdout line at a time into a metrics dict; return None for lines that carry no metric.

    def parse_output(self, tool_name: str, line: str) -> dict[str, Any] | None:
        """Turn one line of tool output into a metrics dict.

        Args:
            tool_name: Identifier of the tool that produced the line.
            line: A single line of the tool's stdout.

        Returns:
            For hash_file, a dict with "sha256" and "filename" when the line
            looks like sha256sum output. For compare_files, a dict flagging an
            added or removed line of a unified diff. None for any other line.
        """
        if tool_name == TOOL_HASH_FILE:
            # sha256sum output: "abc123...  filename"
            # Split only once so a filename that itself contains whitespace
            # stays in one piece instead of making the line unrecognisable.
            parts = line.strip().split(maxsplit=1)
            if len(parts) == 2 and len(parts[0]) == 64:
                return {
                    "sha256": parts[0],
                    "filename": parts[1],
                }
        elif tool_name == TOOL_COMPARE:
            # "+++" / "---" are the unified-diff file headers, not content lines.
            if line.startswith("+") and not line.startswith("+++"):
                return {"added_lines": "1"}
            if line.startswith("-") and not line.startswith("---"):
                return {"removed_lines": "1"}
        return None

Step 9 — Register in main_window.py (optional)

If you want a dedicated page, add the following to MainWindow.__init__():

# In src/ui/main_window.py
from src.modules.my_module import MyModule

# In MainWindow.__init__, after other module pages:
# Create the page, remember it under the "my_module" key, then add it to the
# self._pages container.
# NOTE(review): MyModule is imported above but not referenced in this excerpt —
# confirm where the module instance is meant to be created.
page = ModulePage("my_module", self._pages)
self._module_pages["my_module"] = page
self._pages.addWidget(page)

And in src/ui/titlebar.py, add a nav tab:

# In TitleBar — add to the tabs list:
("MY MODULE", "my_module"),

Complete Example: FileHasher Module

This is a minimal but complete working module that computes SHA-256 hashes of a single file or of every file under a directory.

"""CyberCrew Plugin — FileHasher Module

Provides two tools:
    1. hash_file — SHA-256 hash a single file
    2. hash_directory — Hash all files in a directory recursively
"""

from __future__ import annotations

import re
from pathlib import Path
from typing import Any, Optional

from PyQt6.QtCore import QObject

from src.core.engine import ForensicsEngine
from src.core.hasher import ArtifactHasher
from src.modules.base_module import BaseForensicsModule, ModuleInputError
from src.security.sanitizer import InputSanitizer

# Identifiers for the two tools this module exposes.
TOOL_HASH_FILE: str = "hash_file"
TOOL_HASH_DIR: str = "hash_directory"
# Every tool identifier offered by the module, in declaration order.
ALL_TOOLS: list[str] = [
    TOOL_HASH_FILE,
    TOOL_HASH_DIR,
]


class FileHasherModule(BaseForensicsModule):
    """File hashing module — SHA-256 checksum verification.

    Exposes two tools: hash_file runs sha256sum on one path, and
    hash_directory runs sha256sum on every regular file found under a path.

    Args:
        engine: Shared ForensicsEngine instance.
        hasher: Shared ArtifactHasher instance.
        case_id: Active case UUID.
        evidence_dir: Root evidence directory.
        parent: Optional Qt parent.
    """

    # One line of sha256sum output: "<64 lowercase hex chars>  <filename>".
    _HASH_LINE_RE = re.compile(r"^([0-9a-f]{64})\s+(.+)$")

    def __init__(
        self,
        engine: ForensicsEngine,
        hasher: ArtifactHasher,
        case_id: str,
        evidence_dir: Path,
        parent: Optional[QObject] = None,
    ) -> None:
        """Initialise the base module and reset the hash counter."""
        super().__init__(engine, hasher, case_id, evidence_dir, parent)
        # Number of hash lines recognised by parse_output() on this instance.
        self._hashes_found: int = 0

    def get_tools(self) -> list[str]:
        """Return a new list of the tool identifiers this module provides."""
        return list(ALL_TOOLS)

    def get_required_binaries(self) -> dict[str, str]:
        """Map each required external binary to its install command."""
        return {
            "sha256sum": "sudo apt install coreutils",
            "find":      "sudo apt install findutils",
        }

    def validate_input(self, tool_name: str, params: dict[str, Any]) -> None:
        """Require a non-empty "target" path and pass it through InputSanitizer.

        Both tools take the same single parameter, so tool_name is not
        inspected here.

        Args:
            tool_name: Identifier of the tool being run.
            params: User-supplied parameters; must contain "target".

        Raises:
            ModuleInputError: If "target" is missing or empty.
        """
        target = params.get("target", "")
        if not target:
            raise ModuleInputError("target", "File or directory path is required.")
        InputSanitizer.validate_file_path(target)

    def build_command(self, tool_name: str, params: dict[str, Any]) -> list[str]:
        """Build the argument list for the requested tool.

        Args:
            tool_name: TOOL_HASH_FILE or TOOL_HASH_DIR.
            params: Parameters containing the "target" path.

        Returns:
            The command as a list of arguments, never a shell string.

        Raises:
            ValueError: If tool_name is not one of this module's tools.
            KeyError: If "target" is absent from params.
        """
        target = params["target"]

        if tool_name == TOOL_HASH_FILE:
            return ["sha256sum", target]

        if tool_name == TOOL_HASH_DIR:
            # A single argv list cannot express a "find | xargs" pipeline, so
            # find runs sha256sum itself; "{} +" passes many files per call.
            return [
                "find", target,
                "-type", "f",
                "-exec", "sha256sum", "{}", "+",
            ]

        raise ValueError(f"Unknown tool: {tool_name}")

    def parse_output(self, tool_name: str, line: str) -> dict[str, Any] | None:
        """Turn one line of sha256sum output into a metrics dict.

        Both tools emit sha256sum-formatted lines, so tool_name is not
        inspected.

        Args:
            tool_name: Identifier of the tool that produced the line.
            line: A single line of stdout.

        Returns:
            A dict with the hash shortened to its first 16 characters plus
            "...", the base name of the hashed file, and the running count of
            hashes seen by this instance (as a string); None if the line is
            not a hash line.
        """
        match = self._HASH_LINE_RE.match(line.strip())
        if match is None:
            return None

        digest, path_text = match.groups()
        self._hashes_found += 1
        return {
            "sha256": digest[:16] + "...",
            "filename": Path(path_text).name,
            "hashes_computed": str(self._hashes_found),
        }

    def on_complete(self, instance_id: str, exit_code: int) -> None:
        """Handle completion by delegating to the base class.

        No extra work is done here yet; this override is the place to add
        post-run steps such as saving a hash manifest as an artifact.

        Args:
            instance_id: Passed through to the base class unchanged.
            exit_code: Passed through to the base class unchanged.
        """
        super().on_complete(instance_id, exit_code)

Testing Your Plugin

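Create tests/test_my_module.py. The example below tests the FileHasherModule from the complete example above, assumed to be saved as src/modules/file_hasher.py; adjust the import to match your own module: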

"""Tests for FileHasherModule."""

import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from src.modules.file_hasher import FileHasherModule, TOOL_HASH_FILE
from src.modules.base_module import ModuleInputError


@pytest.fixture
def module(tmp_path):
    """Provide a FileHasherModule wired to mock collaborators and a temp evidence dir."""
    return FileHasherModule(
        engine=MagicMock(),
        hasher=MagicMock(),
        case_id="test-case-uuid",
        evidence_dir=tmp_path / "evidence",
    )


def test_get_tools(module):
    """Both tool identifiers are reported by get_tools()."""
    advertised = module.get_tools()
    for expected in ("hash_file", "hash_directory"):
        assert expected in advertised


def test_validate_input_missing_target(module):
    """An empty parameter dict is rejected with the offending field set to "target"."""
    no_params = {}
    with pytest.raises(ModuleInputError) as raised:
        module.validate_input(TOOL_HASH_FILE, no_params)
    assert raised.value.field == "target"


def test_validate_input_path_traversal(module):
    """A relative path that climbs out of the tree is rejected by validation."""
    from src.security.sanitizer import PathTraversalError

    rejected_with = (ModuleInputError, PathTraversalError)
    traversal_params = {"target": "../../etc/passwd"}
    with pytest.raises(rejected_with):
        module.validate_input(TOOL_HASH_FILE, traversal_params)


def test_build_command(module, tmp_path):
    """hash_file builds a two-element argv list: the binary and the target path."""
    sample = tmp_path / "test.txt"
    sample.write_text("hello world")
    sample_path = str(sample)

    argv = module.build_command(TOOL_HASH_FILE, {"target": sample_path})

    assert isinstance(argv, list)  # NEVER a string
    assert argv == ["sha256sum", sample_path]


def test_parse_output(module):
    """A well-formed sha256sum line yields a dict carrying the expected keys."""
    digest = "a9f2e3b1" + "0" * 56
    parsed = module.parse_output(TOOL_HASH_FILE, digest + "  /path/to/file.txt")
    assert parsed is not None
    for key in ("sha256", "hashes_computed"):
        assert key in parsed


def test_parse_output_garbage(module):
    """A line that is not sha256sum output produces no metrics."""
    parsed = module.parse_output(TOOL_HASH_FILE, "not a hash line")
    assert parsed is None

Run your tests:

pytest tests/test_my_module.py -v

Submitting a Plugin Pull Request

  1. Follow code standards:

    • Type hints on every function
    • Google-style docstrings on every class and method
    • No shell=True anywhere
    • All inputs validated via InputSanitizer
  2. Include tests: minimum 5 test cases covering:

    • get_tools() returns correct list
    • validate_input() rejects empty required fields
    • validate_input() rejects path traversal
    • build_command() produces a list[str]
    • parse_output() handles valid and invalid lines
  3. Update README.md feature matrix with your module's tools

  4. Submit PR with:

    • Description of what the module does
    • Which system binary it wraps
    • Which operating systems it supports
    • Any root/permission requirements
  5. Label your PR: module/new-module