A systematic Apache Pulsar load testing framework using OpenMessaging Benchmark (OMB) with automated test execution, comprehensive reporting, and cost tracking.
What it does: Run performance benchmarks against your Pulsar cluster.
What it doesn't do: Deploy or manage Pulsar infrastructure (your cluster must already exist).
⚠️ IMPORTANT: This framework assumes you have a running Pulsar cluster accessible via kubectl. For infrastructure setup, deployment guides, and troubleshooting, see CLAUDE.md.
One-time setup (macOS/Linux):

- Install Nix (package manager):

  ```bash
  curl --proto '=https' --tlsv1.2 -sSf -L https://install.determinate.systems/nix | sh -s -- install
  ```

- Install direnv:

  ```bash
  # macOS
  brew install direnv nix-direnv

  # Add to ~/.zshrc (or ~/.bashrc):
  eval "$(direnv hook zsh)"
  ```

- Restart your shell after adding the direnv hook.

First time in a repo: when you `cd` into a directory with a new `.envrc`, direnv blocks it for security. Run `direnv allow` once to approve it.
Cluster access:
- Running Pulsar cluster (EKS, self-hosted, etc.) accessible via kubectl
- AWS credentials (for cost tracking only)
```bash
# Quick proof-of-concept test (20k msgs/sec, 2 minutes)
python scripts/orchestrator.py run \
  --test-plan config/test-plans/poc.yaml

# View results
python scripts/orchestrator.py list
python scripts/orchestrator.py report --experiment-id latest
```

Results are saved to `results/latest/` with detailed metrics, latency percentiles, and an HTML report.
- Clone the repository:

  ```bash
  git clone https://github.com/klaviyo/pulsar-aws-lab.git
  cd pulsar-aws-lab
  ```

- Allow direnv (first time only):

  ```bash
  direnv allow
  ```

  This will automatically:
  - Download and cache Python 3.13.11
  - Create `.venv` with all dependencies
  - Provide kubectl, helm, aws CLI in PATH

- Verify setup:

  ```bash
  which python      # Should show: .../pulsar-aws-lab/.venv/bin/python
  python --version  # Should show: Python 3.13.11
  kubectl version   # Should work (provided by Nix)
  ```

- Configure kubectl (for your Pulsar cluster):

  ```bash
  # For EKS clusters
  aws eks update-kubeconfig --region <region> --name <cluster-name>

  # Verify connection
  kubectl cluster-info
  kubectl get pods -n pulsar
  ```

- Configure AWS credentials (optional, for cost tracking):

  ```bash
  aws configure

  # Or export environment variables
  export AWS_ACCESS_KEY_ID=your_access_key
  export AWS_SECRET_ACCESS_KEY=your_secret_key
  export AWS_DEFAULT_REGION=us-east-1
  ```

Note for pyenv users: the Nix environment automatically removes pyenv shims from PATH to avoid conflicts. Your global pyenv setup is unaffected outside this repository.
The orchestrator provides three main commands:
Execute a test plan against your Pulsar cluster:
```bash
# Run proof-of-concept test (2 minutes, 20k msgs/sec)
python scripts/orchestrator.py run \
  --test-plan config/test-plans/poc.yaml

# Run baseline tests (5 test variations, ~15 minutes total)
python scripts/orchestrator.py run \
  --test-plan config/test-plans/baseline.yaml

# Custom experiment ID for organization
python scripts/orchestrator.py run \
  --test-plan config/test-plans/my-tests.yaml \
  --experiment-id latency-tuning-v3
```

What happens during a test run:
- Validates that the Pulsar cluster is accessible
- Creates OMB producer/consumer Jobs in Kubernetes
- Runs each test variation sequentially
- Collects metrics: throughput, latency (p50/p95/p99/p99.9/max), errors
- Saves results to `results/<experiment-id>/`
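As a point of reference for the latency columns in those metrics, the p50/p95/p99 figures are percentiles of the observed latency samples. A minimal nearest-rank sketch (for illustration only; this is not the orchestrator's or OMB's actual metrics code):

```python
import math

def percentile(samples: list[float], p: float) -> float:
    """Nearest-rank percentile: the smallest sample >= p percent of all samples."""
    ordered = sorted(samples)
    rank = math.ceil(p * len(ordered) / 100)  # 1-based rank
    return ordered[max(rank, 1) - 1]

# 1000 synthetic latency samples: 1.0 ms .. 1000.0 ms
latencies = [float(i) for i in range(1, 1001)]
print(percentile(latencies, 50))    # → 500.0 (median)
print(percentile(latencies, 99))    # → 990.0
print(percentile(latencies, 99.9))  # → 999.0
```

The tail percentiles (p99.9, max) are usually the interesting ones for a message bus: they capture the worst latencies a small fraction of messages actually experience.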
Create comprehensive HTML reports with charts and analysis:
```bash
# Report for latest experiment
python scripts/orchestrator.py report --experiment-id latest

# Report for specific experiment
python scripts/orchestrator.py report --experiment-id my-experiment-123
```

Reports include:
- Throughput trends (msgs/sec, MB/sec)
- Latency percentiles and distribution
- Cost breakdown per test variation
- Configuration details and metadata
- Raw data files for further analysis

View all experiments and their results:

```bash
python scripts/orchestrator.py list
```

Output shows experiment IDs, timestamps, test plans used, and result locations.
Test plans define what benchmarks to run. Examples in config/test-plans/:
poc.yaml - Quick validation (2 minutes):

```yaml
name: "proof-of-concept"
description: "Quick test to validate cluster"

base_workload:
  topics: 1
  partitions_per_topic: 8
  message_size: 1024
  test_duration_minutes: 2

test_runs:
  - name: "baseline"
    type: "fixed_rate"
    producer_rate: 20000  # 20k msgs/sec
```

baseline.yaml - Systematic exploration (5 tests, ~15 minutes):

```yaml
name: "baseline"
description: "Standard performance baseline"

variations:
  - message_size: [256, 1024, 4096]
  - producer_rate: [10000, 50000]
  - partitions_per_topic: [8, 16]
```

Create custom test plans:
```yaml
name: "my-custom-tests"
description: "Explore high-throughput scenarios"

base_workload:
  topics: 1
  partitions_per_topic: 16
  message_size: 1024
  producers_per_topic: 4
  consumers_per_topic: 4
  test_duration_minutes: 5

test_runs:
  - name: "moderate-load"
    type: "fixed_rate"
    producer_rate: 50000
  - name: "high-load"
    type: "fixed_rate"
    producer_rate: 100000
  - name: "max-throughput"
    type: "ramp_up"
    initial_rate: 50000
    rate_increment: 10000
    increment_interval_minutes: 1
```

Test types:
- `fixed_rate`: Constant throughput test
- `ramp_up`: Gradually increase load to find limits
- `scale_to_failure`: Push until cluster saturates (future)
- `latency_sensitivity`: Measure latency under varying loads (future)
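To make the `ramp_up` parameters concrete, the target rate at a given minute follows from `initial_rate`, `rate_increment`, and `increment_interval_minutes`. A sketch of that arithmetic (an illustration of the schedule, not the orchestrator's implementation):

```python
def ramp_rate(minute: int, initial_rate: int, rate_increment: int,
              increment_interval_minutes: int) -> int:
    """Target producer rate (msgs/sec) after `minute` whole minutes of a ramp_up run."""
    completed_steps = minute // increment_interval_minutes
    return initial_rate + completed_steps * rate_increment

# The "max-throughput" example above: start at 50k, +10k msgs/sec every minute
for m in range(4):
    print(m, ramp_rate(m, 50000, 10000, 1))  # 50000, 60000, 70000, 80000
```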
Workload parameters are defined within test plans. The base_workload section sets defaults, and individual test runs can override specific values:
```yaml
# config/test-plans/custom.yaml
base_workload:
  name: "custom-baseline"
  topics: 10
  partitions_per_topic: 16
  message_size: 2048
  subscriptions_per_topic: 1
  consumers_per_topic: 4
  producers_per_topic: 4
  consumer_backlog_size_gb: 0
  test_duration_minutes: 10
  warmup_duration_minutes: 1

test_runs:
  - name: "high-load"
    type: "fixed_rate"
    producer_rate: 50000
```

All experiment data is saved to `results/<experiment-id>/`:
```
results/exp-20241013-120000/
├── test_report.html           # Interactive HTML report
├── benchmark_results/
│   ├── poc-20k.log            # OMB test results
│   └── ...
├── workload_poc-20k.yaml      # Generated workload ConfigMap
├── omb_workers_poc-20k.yaml   # Generated workers StatefulSet
├── omb_job_poc-20k.yaml       # Generated driver Job
└── orchestrator.log           # Execution log
```

Quick access to latest results:

```bash
# Latest experiment is symlinked
cd results/latest

# Open report in browser
open test_report.html      # macOS
xdg-open test_report.html  # Linux
```

Compare performance before and after a configuration change:

```bash
# Baseline before changes
python scripts/orchestrator.py run \
  --test-plan config/test-plans/baseline.yaml \
  --experiment-id before-tuning

# Make config changes to your Pulsar cluster
# (e.g., update JVM settings, replication factor, etc.)

# Test after changes
python scripts/orchestrator.py run \
  --test-plan config/test-plans/baseline.yaml \
  --experiment-id after-tuning

# Compare reports
python scripts/orchestrator.py report --experiment-id before-tuning
python scripts/orchestrator.py report --experiment-id after-tuning
```

Find the cluster's saturation point:

```bash
# Use ramp-up test to find saturation point
python scripts/orchestrator.py run \
  --test-plan config/test-plans/max-throughput.yaml
```

Explore message-size sensitivity:

```bash
# Create test plan with message size variations
cat > config/test-plans/message-sizes.yaml <<EOF
name: "message-size-exploration"
variations:
  message_size: [128, 512, 1024, 4096, 16384]
  producer_rate: [20000]
EOF

python scripts/orchestrator.py run \
  --test-plan config/test-plans/message-sizes.yaml
```

Optional configuration via environment variables:
```bash
# Custom results directory
export PULSAR_LAB_HOME="/data/benchmarks"

# Kubernetes namespace (default: pulsar)
export PULSAR_NAMESPACE="my-pulsar-namespace"

# AWS region for cost tracking
export AWS_DEFAULT_REGION="us-west-2"

# Experiment tags for cost allocation
export EXPERIMENT_TAGS="team=data-platform,owner=jane.doe"
```

For comprehensive documentation, see CLAUDE.md:
- Architecture: How OMB runs in Kubernetes
- Infrastructure Setup: Deploying Pulsar on EKS (if needed)
- Development: Building custom OMB Docker images
- Troubleshooting: Common issues and debugging
- Advanced Usage: Helm operations, custom configurations
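As an aside on the `EXPERIMENT_TAGS` variable shown earlier: it uses a comma-separated `key=value` format, which could be parsed like this (an illustrative sketch, not the framework's actual code):

```python
import os

def parse_tags(raw: str) -> dict[str, str]:
    """Split 'k1=v1,k2=v2' into a dict; empty input yields no tags."""
    return dict(pair.split("=", 1) for pair in raw.split(",") if pair)

# Reads the tags from the environment; an unset variable yields {}
tags = parse_tags(os.environ.get("EXPERIMENT_TAGS", ""))
print(tags)
```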
Quick troubleshooting commands:
```bash
# Check OMB Job status
kubectl get jobs -n pulsar

# View OMB logs
kubectl logs -n pulsar job/omb-<test-name>

# Verify Pulsar broker connectivity
kubectl exec -n pulsar pulsar-broker-0 -- bin/pulsar-admin brokers list pulsar-cluster

# Check cluster health
kubectl get pods -n pulsar
```

- CLAUDE.md - Complete technical documentation
- Test Plans - Example test configurations and workload definitions
Contributions welcome! Please:
- Check existing Jira tickets in the PLAT project or create a new one
- Create a feature branch
- Submit a pull request with tests
MIT License - see LICENSE file
- Issues: Jira (PLAT project)
- Pulsar Community: Slack | Mailing List