diff --git a/CMakeLists.txt b/CMakeLists.txt index 246187ebbda..52bd87327d4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,6 +31,10 @@ option(USE_UBSAN "Build with UndefinedBehaviorSanitizer" OFF) option(UBSAN_HALT_ON_ERROR "Halt on error when building with UBSan" ON) option(USE_GPERFTOOLS "Build with gperftools CPU profiler support" OFF) option(USE_FRAME_POINTER "Build with frame pointers for accurate profiling" OFF) +option(FALCO_MULTI_THREAD "Enable multi-threaded event processing (requires modern eBPF)" OFF) +if(FALCO_MULTI_THREAD) + add_definitions(-DFALCO_MULTI_THREAD) +endif() # Enable frame pointers by default when using gperftools for accurate stack traces if(USE_GPERFTOOLS AND NOT USE_FRAME_POINTER) diff --git a/cmake/modules/CompilerFlags.cmake b/cmake/modules/CompilerFlags.cmake index 56388a0260c..97abfbba503 100644 --- a/cmake/modules/CompilerFlags.cmake +++ b/cmake/modules/CompilerFlags.cmake @@ -68,6 +68,11 @@ if(NOT MSVC) endif() endif() + if(USE_TSAN) + set(FALCO_SECURITY_FLAGS "${FALCO_SECURITY_FLAGS} -fsanitize=thread") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread") + endif() + if(USE_FRAME_POINTER) set(FALCO_SECURITY_FLAGS "${FALCO_SECURITY_FLAGS} -fno-omit-frame-pointer") endif() diff --git a/cmake/modules/falcosecurity-libs.cmake b/cmake/modules/falcosecurity-libs.cmake index 3872d187df5..c0e95619403 100644 --- a/cmake/modules/falcosecurity-libs.cmake +++ b/cmake/modules/falcosecurity-libs.cmake @@ -157,6 +157,20 @@ else() message(STATUS "No strlcpy and strlcat found, will use local definition") endif() +# Keep libs in sync with Falco's multi-thread option. Use FORCE so toggling FALCO_MULTI_THREAD +# updates a previously cached ENABLE_MULTI_THREAD value. 
+if(FALCO_MULTI_THREAD) + set(ENABLE_MULTI_THREAD + ON + CACHE BOOL "Multi-threaded libsinsp (from FALCO_MULTI_THREAD)" FORCE + ) +else() + set(ENABLE_MULTI_THREAD + OFF + CACHE BOOL "Multi-threaded libsinsp (from FALCO_MULTI_THREAD)" FORCE + ) +endif() + if(CMAKE_SYSTEM_NAME MATCHES "Linux") include(driver) endif() diff --git a/falco.yaml b/falco.yaml index e71222ba258..e65cf0dbce3 100644 --- a/falco.yaml +++ b/falco.yaml @@ -438,6 +438,12 @@ engine: cpus_for_each_buffer: 2 buf_size_preset: 4 drop_failed_exit: false + # -- [EXPERIMENTAL] Parallel event processing is optional. When Falco and + # falcosecurity/libs are built with multi-thread support, you may add: + # num_worker_threads: + # with N > 1 for TGID-partitioned ring buffers (one worker per buffer); omit + # this key or use 0 for single-threaded mode. If libs lack multi-thread support, + # do not set this key (schema rejects it in those builds). # -- Engine-specific configuration for replay engine which replays a capture file. replay: # -- Path to the capture file to replay (eg: /path/to/file.scap) diff --git a/proposals/20251205-multi-thread-falco-design-review.md b/proposals/20251205-multi-thread-falco-design-review.md new file mode 100644 index 00000000000..80900ffd16e --- /dev/null +++ b/proposals/20251205-multi-thread-falco-design-review.md @@ -0,0 +1,257 @@ +# Review of Multi-Threaded Falco Design Proposal + +## Overall Assessment + +**Status**: Strong foundation with some areas needing clarification and expansion + +The proposal provides a solid high-level design for multi-threading in Falco. The analysis of partitioning strategies is thorough, and the choice of TGID partitioning is well-justified. However, several areas need more detail or clarification before implementation. + +--- + +## Strengths + +1. **Clear Problem Statement**: The goal of addressing single CPU core saturation is well-defined +2. **Comprehensive Trade-off Analysis**: Good comparison of TGID, TID, and pipelining approaches +3. 
**Honest Risk Assessment**: Acknowledges complexity and potential pitfalls +4. **Structured Approach**: Well-organized sections that build logically + +--- + +## Critical Issues & Recommendations + +### 1. TGID Routing Implementation Details + +**Issue**: The proposal states routing happens "at the kernel driver level" but doesn't specify: +- Whether routing occurs in eBPF code or userspace +- How the hash function is implemented +- Whether `num_workers` is fixed at initialization or dynamic +- How ring buffers are allocated/managed per partition + +**Recommendation**: Add a section clarifying: +- Routing mechanism (eBPF vs userspace) +- Ring buffer allocation strategy +- Hash function selection (e.g., Jenkins hash, simple modulo) +- Dynamic worker scaling considerations + +### 2. Shared State Synchronization Design + +**Issue**: Line 32 mentions "lightweight synchronization mechanisms" but defers to a future document. This is critical for correctness. + +**Recommendation**: At minimum, specify: +- Which data structures need synchronization (thread table, file descriptor tables, etc.) +- Locking strategy (fine-grained locks, lock-free structures, RCU) +- Performance targets (e.g., "synchronization overhead < 5% of event processing time") +- Deadlock prevention strategy + +### 3. Load Imbalance Mitigation + +**Issue**: The "hot process" problem is acknowledged but mitigation strategies are limited. + +**Recommendation**: Consider adding: +- **Work Stealing**: Allow idle workers to steal from busy partitions +- **Dynamic Rebalancing**: Periodically reassign TGIDs to different partitions +- **Metrics**: Define how to measure and detect load imbalance +- **Thresholds**: Define when load imbalance becomes problematic + +### 4. Temporal Consistency Guarantees + +**Issue**: The synchronization point discussion (lines 88-90) is good but incomplete. + +**Recommendation**: Expand to cover: +- **Maximum Wait Time**: What happens if parent event never arrives? 
+- **Timeout Strategy**: How long to wait before falling back to last-resort fetching? +- **Event Ordering Guarantees**: Document what ordering guarantees are provided vs. what's required +- **Race Condition Examples**: Provide concrete examples of problematic scenarios + +### 5. Output Handling Thread Safety + +**Issue**: Line 5 mentions "output handling" but the proposal doesn't discuss how outputs (gRPC, files, stdout) are handled in a multi-threaded context. + +**Recommendation**: Add section covering: +- Output queue design (per-thread vs. shared) +- Output ordering guarantees (if any) +- Thread-safe output mechanisms +- Rate limiting in multi-threaded context + +### 6. Plugin Thread Safety + +**Issue**: Line 33 mentions plugins aren't thread-safe and defers to a future document, but this affects the entire design. + +**Recommendation**: At minimum, specify: +- Whether plugins will be per-thread instances or shared with locks +- Migration strategy for existing plugins +- Timeline for plugin thread-safety requirements +- Backward compatibility considerations + +### 7. Error Handling and Recovery + +**Issue**: No discussion of error handling when: +- A worker thread crashes +- Ring buffer overflows +- Synchronization deadlocks occur +- Parent thread information is permanently unavailable + +**Recommendation**: Add section on: +- Worker thread failure recovery +- Ring buffer overflow handling +- Deadlock detection/prevention +- Graceful degradation strategies + +### 8. Performance Metrics and Success Criteria + +**Issue**: No clear definition of success metrics. + +**Recommendation**: Define: +- Target throughput improvement (e.g., "2x with 4 threads") +- Acceptable synchronization overhead +- Maximum acceptable event reordering window +- Drop rate reduction targets + +### 9. Migration Path + +**Issue**: No discussion of how to transition from single-threaded to multi-threaded. 
+ +**Recommendation**: Add section on: +- Feature flag/configuration option +- Backward compatibility +- Rollback strategy +- Testing approach + +### 10. Testing Strategy + +**Issue**: No mention of how to test multi-threaded correctness. + +**Recommendation**: Add section on: +- Concurrency testing approaches +- Race condition detection +- Load imbalance testing +- Temporal consistency validation + +--- + +## Technical Concerns + +### 1. Ring Buffer Design + +**Question**: `BPF_MAP_TYPE_RINGBUF` doesn't have per-CPU variants. How will per-TGID ring buffers be implemented? +- Multiple ring buffer maps? +- Single ring buffer with routing in userspace? +- Custom eBPF map type? + +**Recommendation**: Clarify the ring buffer architecture. + +### 2. Hash Function Quality + +**Issue**: Simple modulo on TGID may cause poor distribution if TGIDs are sequential or clustered. + +**Recommendation**: Specify a proper hash function (e.g., Jenkins hash, xxHash) to ensure good distribution. + +### 3. vfork() Handling + +**Issue**: Line 90 mentions vfork() as a special case but doesn't provide a solution. + +**Recommendation**: Either: +- Specify the alternative synchronization point +- Document that vfork() will use last-resort fetching +- Consider adding clone enter parent event back + +### 4. Signal Handling + +**Issue**: Current codebase shows signal handling (SIGUSR1, SIGINT, SIGHUP) in the event loop. How will this work with multiple threads? + +**Recommendation**: Document signal handling strategy for multi-threaded context. + +--- + +## Minor Issues + +1. **Line 19**: Image reference - ensure `images/falco-architecture.png` exists +2. **Line 28**: Image reference - ensure `images/falco-multi-thread-architecture.png` exists +3. **Line 30**: Typo: "writes event into" should be "writes events into" +4. **Line 49**: "ring-buffer" vs "ring buffer" - be consistent with terminology +5. **Line 63**: Consider adding a concrete example of a "hot process" scenario +6. 
**Line 88**: "clone exit parent event" - consider adding a brief explanation of what this event represents + +--- + +## Missing Sections + +1. **Architecture Diagram Details**: The proposed architecture diagram should show: + - Worker thread pool + - Shared state with synchronization points + - Output handling + - Event flow + +2. **Configuration**: How will users configure: + - Number of worker threads + - Synchronization strategy (blocking vs. deferring) + - Load balancing parameters + +3. **Monitoring**: What metrics will be exposed: + - Per-thread event rates + - Load imbalance metrics + - Synchronization wait times + - Deferred event queue sizes + +4. **Limitations**: Document known limitations: + - Maximum number of supported threads + - Performance degradation scenarios + - Unsupported use cases + +--- + +## Suggestions for Improvement + +### 1. Add Implementation Phases + +Consider breaking the implementation into phases: +- **Phase 1**: Basic TGID partitioning with blocking synchronization +- **Phase 2**: Add deferring mechanism +- **Phase 3**: Add signaling-based synchronization +- **Phase 4**: Optimize with work stealing + +### 2. Add Performance Modeling + +Include expected performance characteristics: +- Linear scaling up to N threads +- Synchronization overhead estimates +- Memory overhead per thread + +### 3. Add Comparison with Alternatives + +Consider briefly comparing with: +- User-space event batching +- Kernel-level improvements +- Hardware offloading + +### 4. Add References + +Include references to: +- Related work on multi-threaded event processing +- BPF ring buffer documentation +- Lock-free data structure papers (if applicable) + +--- + +## Questions for Authors + +1. What is the expected timeline for the shared state synchronization design document? +2. How will this interact with existing multi-source support (line 544-598 in process_events.cpp shows multi-source threading)? +3. Will this be opt-in initially or default behavior? +4. 
What is the minimum kernel version requirement for BPF_MAP_TYPE_RINGBUF? +5. How will this affect capture mode (offline trace file processing)? + +--- + +## Conclusion + +This is a solid high-level design that addresses a real performance problem. The TGID partitioning approach is well-reasoned, but the proposal needs more detail on: + +1. **Implementation specifics** (routing, ring buffers, synchronization) +2. **Error handling and recovery** +3. **Performance targets and metrics** +4. **Migration and testing strategies** + +I recommend expanding the document with the sections above before proceeding to detailed design documents. The foundation is strong, but these details are critical for successful implementation. + +**Recommendation**: ✅ **Approve with revisions** - Address critical issues before implementation begins. diff --git a/scripts/validate-tsan.sh b/scripts/validate-tsan.sh new file mode 100755 index 00000000000..0907219446b --- /dev/null +++ b/scripts/validate-tsan.sh @@ -0,0 +1,270 @@ +#!/usr/bin/env bash +# SPDX-License-Identifier: Apache-2.0 +# +# Validate Falco in multi-threaded mode with ThreadSanitizer (TSAN): 8 workers, +# no plugins (table API not thread-safe), default ruleset without container +# plugin requirement. Enables Prometheus /metrics on the webserver and scrapes it +# periodically to stress metrics collection under concurrent workers. Runs 10 +# minutes under load (event-generator + stress-ng). +# Uses TSAN suppressions from build-tsan/tsan_suppressions_falco.txt and +# halt_on_error=1 to fail-fast on any data race. +# +# Prerequisites: +# - Falco built with TSAN in build-tsan/ (see .cursor/skills/falco-build-local-libs/SKILL.md) +# - stress-ng, Docker (for event-generator) +# +# Usage (from Falco repo root): +# ./scripts/validate-tsan.sh +# +# Success: 10-minute run under load with no TSAN violation. + +set -e + +FALCO_ROOT="$(cd "$(dirname "$0")/.." 
&& pwd)" +cd "$FALCO_ROOT" + +BUILD_TSAN="${FALCO_ROOT}/build-tsan" +FALCO_BIN="${BUILD_TSAN}/userspace/falco/falco" +CONFIG="${FALCO_ROOT}/falco.yaml" +RULES_NO_CONTAINER="${BUILD_TSAN}/falco_rules_no_container.yaml" +SUPPRESSIONS="${BUILD_TSAN}/tsan_suppressions_falco.txt" +TSAN_LOG="${BUILD_TSAN}/tsan_validation.log" +RUN_DURATION_SEC="${RUN_DURATION_SEC:-600}" + +# Default ruleset URL (remove container plugin requirement for no-plugin run) +RULES_URL="${FALCO_RULES_URL:-https://raw.githubusercontent.com/falcosecurity/rules/main/rules/falco_rules.yaml}" + +LIBS_SUPPRESSIONS="${LIBS_SUPPRESSIONS:-/workspaces/libs/userspace/libsinsp/test/tsan_suppressions.txt}" +FALCO_TSAN_EXTRA="${FALCO_ROOT}/scripts/tsan_suppressions_falco.txt" + +if [[ ! -x "$FALCO_BIN" ]]; then + echo "Falco TSAN binary not found. Build with:" + echo " cmake -B build-tsan -DUSE_JEMALLOC=OFF -DCMAKE_CXX_FLAGS=\"-fsanitize=thread\" -DCMAKE_EXE_LINKER_FLAGS=\"-fsanitize=thread\" ... && cmake --build build-tsan --target falco" + exit 1 +fi + +mkdir -p "$BUILD_TSAN" +{ + [[ -f "$FALCO_TSAN_EXTRA" ]] && cat "$FALCO_TSAN_EXTRA" + [[ -f "$LIBS_SUPPRESSIONS" ]] && cat "$LIBS_SUPPRESSIONS" +} > "$SUPPRESSIONS" +if [[ ! -s "$SUPPRESSIONS" ]]; then + echo "No TSAN suppressions assembled. Add $FALCO_TSAN_EXTRA and/or point LIBS_SUPPRESSIONS at an existing file." + exit 1 +fi +echo "TSAN suppressions: $SUPPRESSIONS (Falco: ${FALCO_TSAN_EXTRA}, libs: ${LIBS_SUPPRESSIONS})" + +if [[ ! 
-f "$CONFIG" ]]; then + echo "Config not found: $CONFIG" + exit 1 +fi + +# Prepare config and rules: no plugins (avoid container plugin TSAN races), no config_files merge +mkdir -p "$BUILD_TSAN" +CONFIG_TSAN="${BUILD_TSAN}/falco_tsan.yaml" +cp "$CONFIG" "$CONFIG_TSAN" +# Force no plugins and no config_files so /etc/falco/config.d cannot add container plugin +sed -i '/^load_plugins:/s/.*/load_plugins: []/' "$CONFIG_TSAN" +sed -i '/^config_files:/s/.*/config_files: []/' "$CONFIG_TSAN" +# Remove the default config_files list entry so the inline config_files: [] takes effect +sed -i '\| - /etc/falco/config.d|d' "$CONFIG_TSAN" +CONFIG="$CONFIG_TSAN" +echo "Using TSAN config $CONFIG (no plugins, no config_files merge)" + +echo "Fetching default rules and removing container plugin requirement..." +if ! curl -sSLf "$RULES_URL" -o "${RULES_NO_CONTAINER}.tmp"; then + echo "Failed to fetch rules from $RULES_URL" + exit 1 +fi +# Remove required_plugin_versions block so Falco runs without container plugin +sed -e '/required_plugin_versions/,/version: 0\.4\.0/d' \ + "${RULES_NO_CONTAINER}.tmp" > "${RULES_NO_CONTAINER}.tmp2" + +# Adapt or comment out all rules/macros using container fields (no container plugin): +# - Adapt any macro that uses container. to condition (never_true) so rules still load. +# - Comment out only rules that use container. (macros are adapted, not removed). +# - When adapting a macro, replace the whole condition (including multi-line "condition: >" blocks). +awk ' +BEGIN { n=0; has_container_ref=0 } +/^\- (macro|rule|list):/ { + if (n>0) { + if (has_container_ref && (block[0] ~ /^\- macro:/)) { + for (i=0; i0) { + if (has_container_ref && (block[0] ~ /^\- macro:/)) { + for (i=0; i "$RULES_NO_CONTAINER" +rm -f "${RULES_NO_CONTAINER}.tmp" "${RULES_NO_CONTAINER}.tmp2" +echo "Rules written to $RULES_NO_CONTAINER (container-field macros adapted to never_true, container-dependent rules commented out)" + +# TSAN: suppressions + fail-fast on first data race. 
+# report_atomic_races=0: Folly hazptr / thread-local StaticMeta can report atomic-vs-mutex-init +# races where one stack is only pthread_mutex_lock; our race: suppressions cannot match both sides. +export TSAN_OPTIONS="suppressions=${SUPPRESSIONS} halt_on_error=1 report_atomic_races=0" +echo "TSAN_OPTIONS=$TSAN_OPTIONS" +echo "Starting Falco under TSAN with 8 workers, Prometheus /metrics, no plugins (log: $TSAN_LOG) ..." + +sudo env TSAN_OPTIONS="$TSAN_OPTIONS" "$FALCO_BIN" -c "$CONFIG" -r "$RULES_NO_CONTAINER" \ + -o engine.modern_ebpf.num_worker_threads=8 \ + -o time_format_iso_8601=true \ + -o json_output=true \ + -o metrics.enabled=true \ + -o metrics.interval=5s \ + -o webserver.prometheus_metrics_enabled=true \ + > "$TSAN_LOG" 2>&1 & +FALCO_PID=$! + +cleanup() { + echo "Stopping load generators and Falco..." + if [[ -n "${PROM_SCRAPE_PID:-}" ]]; then + kill "$PROM_SCRAPE_PID" 2>/dev/null || true + wait "$PROM_SCRAPE_PID" 2>/dev/null || true + fi + sudo pkill -P $$ 2>/dev/null || true + sudo kill "$FALCO_PID" 2>/dev/null || true + wait "$FALCO_PID" 2>/dev/null || true +} +trap cleanup EXIT + +# Wait for Falco to open capture and health endpoint +for i in $(seq 1 30); do + if curl -s -o /dev/null -w "%{http_code}" http://127.0.0.1:8765/healthz 2>/dev/null | grep -q 200; then + echo "Falco is up (health check OK)." + break + fi + if ! kill -0 "$FALCO_PID" 2>/dev/null; then + echo "Falco exited early. Last lines of $TSAN_LOG:" + tail -80 "$TSAN_LOG" + exit 1 + fi + sleep 1 +done + +if ! kill -0 "$FALCO_PID" 2>/dev/null; then + echo "Falco exited during startup. Last lines of $TSAN_LOG:" + tail -80 "$TSAN_LOG" + exit 1 +fi + +# /metrics is registered only after inspectors open (later than /healthz) +METRICS_OK=0 +for i in $(seq 1 60); do + if curl -s -o /dev/null -w "%{http_code}" http://127.0.0.1:8765/metrics 2>/dev/null | grep -q 200; then + METRICS_OK=1 + break + fi + if ! 
kill -0 "$FALCO_PID" 2>/dev/null; then + echo "Falco exited before /metrics became available. Last lines of $TSAN_LOG:" + tail -80 "$TSAN_LOG" + exit 1 + fi + sleep 1 +done +if [[ "$METRICS_OK" -ne 1 ]]; then + echo "Prometheus /metrics did not return HTTP 200 within 60s. Last lines of $TSAN_LOG:" + tail -80 "$TSAN_LOG" + exit 1 +fi +echo "Prometheus /metrics OK (scraping in background for ${RUN_DURATION_SEC}s)." + +# Scrape /metrics concurrently with event workers to exercise metrics under TSAN +PROM_SCRAPE_LOG="${BUILD_TSAN}/prometheus_scrape.log" +( + end=$((SECONDS + RUN_DURATION_SEC)) + while [[ $SECONDS -lt $end ]]; do + curl -sS -o /dev/null --max-time 5 http://127.0.0.1:8765/metrics 2>/dev/null || true + sleep 2 + done +) >> "$PROM_SCRAPE_LOG" 2>&1 & +PROM_SCRAPE_PID=$! + +echo "Running load for ${RUN_DURATION_SEC}s (event-generator + stress-ng + /metrics scrapes)..." + +# stress-ng for full duration +STRESS_PID="" +if command -v stress-ng &>/dev/null; then + stress-ng --cpu 2 --timeout "${RUN_DURATION_SEC}s" >> "${BUILD_TSAN}/stress-ng.log" 2>&1 & + STRESS_PID=$! +fi + +# event-generator syscall actions in a loop for full duration +EG_PID="" +if command -v docker &>/dev/null; then + ( + echo "Starting event-generator (Docker) syscall loop..." + end=$((SECONDS + RUN_DURATION_SEC)) + while [[ $SECONDS -lt $end ]]; do + docker run --rm --pid=host falcosecurity/event-generator run syscall --sleep 50ms 2>/dev/null || true + done + ) >> "${BUILD_TSAN}/event-generator.log" 2>&1 & + EG_PID=$! +else + echo "Docker not found; skipping event-generator (stress-ng only)." +fi + +# Wait for the full run duration; if Falco exits early (e.g. TSAN halt_on_error), we detect it +elapsed=0 +while [[ $elapsed -lt $RUN_DURATION_SEC ]]; do + sleep 10 + elapsed=$((elapsed + 10)) + if ! kill -0 "$FALCO_PID" 2>/dev/null; then + echo "Falco exited after ${elapsed}s. Last lines of $TSAN_LOG:" + tail -120 "$TSAN_LOG" + exit 1 + fi + printf "\r %ds / %ds ..." 
"$elapsed" "$RUN_DURATION_SEC" +done +echo "" + +# Optional: wait for load generators to finish (they may already be done) +[[ -n "$STRESS_PID" ]] && wait "$STRESS_PID" 2>/dev/null || true +[[ -n "$EG_PID" ]] && wait "$EG_PID" 2>/dev/null || true + +# Cleanup will stop Falco +sleep 2 + +if grep -q "ThreadSanitizer: data race" "$TSAN_LOG" 2>/dev/null; then + echo "--- TSAN reported data race(s). Summary from $TSAN_LOG ---" + grep -A 1 "ThreadSanitizer: data race" "$TSAN_LOG" || true + echo "Full log: $TSAN_LOG" + exit 1 +fi + +echo "Validation finished: ${RUN_DURATION_SEC}s run under load with no TSAN data races. See $TSAN_LOG for full output." diff --git a/userspace/engine/formats.cpp b/userspace/engine/formats.cpp index c266e0311f5..ec8d9174db3 100644 --- a/userspace/engine/formats.cpp +++ b/userspace/engine/formats.cpp @@ -97,8 +97,9 @@ std::string falco_formats::format_event(sinsp_evt *evt, char time_sec[20]; // sizeof "YYYY-MM-DDTHH:MM:SS" char time_ns[12]; // sizeof ".sssssssssZ" std::string iso8601evttime; + struct tm tm_buf; - strftime(time_sec, sizeof(time_sec), "%FT%T", gmtime(&evttime)); + strftime(time_sec, sizeof(time_sec), "%FT%T", gmtime_r(&evttime, &tm_buf)); snprintf(time_ns, sizeof(time_ns), ".%09luZ", evt->get_ts() % 1000000000); iso8601evttime = time_sec; iso8601evttime += time_ns; diff --git a/userspace/falco/app/actions/configure_syscall_buffer_num.cpp b/userspace/falco/app/actions/configure_syscall_buffer_num.cpp index 5e671be0c15..9d4f6615910 100644 --- a/userspace/falco/app/actions/configure_syscall_buffer_num.cpp +++ b/userspace/falco/app/actions/configure_syscall_buffer_num.cpp @@ -32,14 +32,30 @@ falco::app::run_result falco::app::actions::configure_syscall_buffer_num( return run_result::fatal("cannot get the number of online CPUs from the system\n"); } - if(s.config->m_modern_ebpf.m_cpus_for_each_buffer > online_cpus) { - falco_logger::log(falco_logger::level::WARNING, - "you required a buffer every '" + - 
std::to_string(s.config->m_modern_ebpf.m_cpus_for_each_buffer) + - "' CPUs but there are only '" + std::to_string(online_cpus) + - "' online CPUs. Falco changed the config to: one buffer every '" + - std::to_string(online_cpus) + "' CPUs\n"); - s.config->m_modern_ebpf.m_cpus_for_each_buffer = online_cpus; + auto num_workers = s.config->m_modern_ebpf.m_num_worker_threads; + if(num_workers > 1) { + if(num_workers > online_cpus) { + falco_logger::log(falco_logger::level::WARNING, + "[EXPERIMENTAL] Requested " + std::to_string(num_workers) + + " worker threads but only " + + std::to_string(online_cpus) + + " CPUs are online. Performance may degrade.\n"); + } + falco_logger::log(falco_logger::level::INFO, + "[EXPERIMENTAL] Multi-threaded mode with " + + std::to_string(num_workers) + + " worker threads (TGID-partitioned ring buffers).\n"); + } else { + if(s.config->m_modern_ebpf.m_cpus_for_each_buffer > online_cpus) { + falco_logger::log( + falco_logger::level::WARNING, + "you required a buffer every '" + + std::to_string(s.config->m_modern_ebpf.m_cpus_for_each_buffer) + + "' CPUs but there are only '" + std::to_string(online_cpus) + + "' online CPUs. 
Falco changed the config to: one buffer every '" + + std::to_string(online_cpus) + "' CPUs\n"); + s.config->m_modern_ebpf.m_cpus_for_each_buffer = online_cpus; + } } #endif return run_result::ok(); diff --git a/userspace/falco/app/actions/helpers_inspector.cpp b/userspace/falco/app/actions/helpers_inspector.cpp index c809dcd819c..713c65788ff 100644 --- a/userspace/falco/app/actions/helpers_inspector.cpp +++ b/userspace/falco/app/actions/helpers_inspector.cpp @@ -101,15 +101,29 @@ falco::app::run_result falco::app::actions::open_live_inspector(falco::app::stat { falco_logger::log(falco_logger::level::INFO, "Opening '" + source + "' source with modern BPF probe."); - falco_logger::log( - falco_logger::level::INFO, - "One ring buffer every '" + - std::to_string(s.config->m_modern_ebpf.m_cpus_for_each_buffer) + - "' CPUs."); - inspector->open_modern_bpf(s.syscall_buffer_bytes_size, - s.config->m_modern_ebpf.m_cpus_for_each_buffer, - true, - s.selected_sc_set); + auto num_workers = s.config->m_modern_ebpf.m_num_worker_threads; + if(num_workers > 1) { + falco_logger::log( + falco_logger::level::INFO, + "[EXPERIMENTAL] Multi-threaded mode: " + std::to_string(num_workers) + + " worker threads with TGID-partitioned ring buffers."); + inspector->open_modern_bpf(s.syscall_buffer_bytes_size, + num_workers, + num_workers, + true, + s.selected_sc_set); + } else { + falco_logger::log( + falco_logger::level::INFO, + "One ring buffer every '" + + std::to_string(s.config->m_modern_ebpf.m_cpus_for_each_buffer) + + "' CPUs."); + inspector->open_modern_bpf(s.syscall_buffer_bytes_size, + s.config->m_modern_ebpf.m_cpus_for_each_buffer, + DEFAULT_ITERS_NUM, + true, + s.selected_sc_set); + } } else /* Kernel module (default). 
*/ { try { diff --git a/userspace/falco/app/actions/process_events.cpp b/userspace/falco/app/actions/process_events.cpp index 515a25d4a98..4a47be79fba 100644 --- a/userspace/falco/app/actions/process_events.cpp +++ b/userspace/falco/app/actions/process_events.cpp @@ -39,6 +39,8 @@ limitations under the License. #include #include +#include "formats.h" + using namespace falco::app; using namespace falco::app::actions; @@ -364,6 +366,227 @@ static falco::app::run_result do_inspect( return run_result::ok(); } +#ifdef FALCO_MULTI_THREAD + +struct worker_context { + std::shared_ptr engine; + std::unique_ptr formats; + size_t source_engine_idx; +}; + +static worker_context create_worker_context(falco::app::state& s, + std::shared_ptr inspector, + const std::string& source) { + worker_context ctx; + auto src_info = s.source_infos.at(source); + auto& filterchecks = *src_info->filterchecks; + + ctx.engine = std::make_shared(); + auto filter_factory = std::make_shared(inspector.get(), filterchecks); + auto formatter_factory = + std::make_shared(inspector.get(), filterchecks); + if(s.config->m_json_output) { + formatter_factory->set_output_format(sinsp_evt_formatter::OF_JSON); + } + ctx.source_engine_idx = ctx.engine->add_source(source, filter_factory, formatter_factory); + + std::vector rules_contents; + falco::load_result::rules_contents_t rc; + read_files(s.config->m_loaded_rules_filenames.begin(), + s.config->m_loaded_rules_filenames.end(), + rules_contents, + rc); + + for(auto& filename : s.config->m_loaded_rules_filenames) { + auto res = ctx.engine->load_rules(rc.at(filename), filename); + if(!res->successful()) { + throw falco_exception("Worker engine failed to load rules from " + filename + ": " + + res->as_string(true, rc)); + } + } + + for(const auto& sel : s.config->m_rules_selection) { + bool enable = sel.m_op == falco_configuration::rule_selection_operation::enable; + if(!sel.m_rule.empty()) { + ctx.engine->enable_rule_wildcard(sel.m_rule, enable); + } + 
if(!sel.m_tag.empty()) { + ctx.engine->enable_rule_by_tag(std::set{sel.m_tag}, enable); + } + } + + ctx.engine->set_min_priority(s.config->m_min_priority); + ctx.engine->complete_rule_loading(); + + ctx.formats = std::make_unique(ctx.engine, + s.config->m_json_include_output_property, + s.config->m_json_include_tags_property, + s.config->m_json_include_message_property, + s.config->m_json_include_output_fields_property, + s.config->m_time_format_iso_8601); + return ctx; +} + +static falco::app::run_result do_inspect_multi_thread(falco::app::state& s, + std::shared_ptr inspector, + const std::string& source, + uint64_t duration_to_tot_ns) { + auto num_workers = s.config->m_modern_ebpf.m_num_worker_threads; + falco_logger::log(falco_logger::level::INFO, + "[EXPERIMENTAL] Creating " + std::to_string(num_workers) + + " worker contexts for multi-threaded event processing.\n"); + + std::vector worker_ctxs; + worker_ctxs.reserve(num_workers); + for(uint16_t i = 0; i < num_workers; i++) { + worker_ctxs.push_back(create_worker_context(s, inspector, source)); + } + + inspector->start_capture(); + + std::vector worker_results(num_workers); + std::vector threads; + std::vector worker_evt_counts(num_workers, 0); + std::string hostname; + { + char buf[256]; + if(gethostname(buf, sizeof(buf)) == 0) { + hostname = buf; + } + } + + for(uint16_t i = 0; i < num_workers; i++) { + threads.emplace_back([&s, + &inspector, + &worker_ctxs, + &worker_results, + &worker_evt_counts, + &hostname, + i, + duration_to_tot_ns]() { + auto& wctx = worker_ctxs[i]; + auto& result = worker_results[i]; + auto& num_evts = worker_evt_counts[i]; + result = run_result::ok(); + + try { + sinsp_buffer_t buffer_h = inspector->reserve_buffer_handle(); + falco_logger::log(falco_logger::level::DEBUG, + "[EXPERIMENTAL] Worker " + std::to_string(i) + + " reserved buffer handle " + std::to_string(buffer_h) + + "\n"); + + sinsp_evt* ev = nullptr; + uint64_t duration_start = 0; + + while(true) { + int32_t rc = 
inspector->next(&ev, buffer_h); + + if(falco::app::g_terminate_signal.triggered()) { + break; + } + if(falco::app::g_restart_signal.triggered()) { + break; + } + + if(rc == SCAP_TIMEOUT || rc == SCAP_FILTERED_EVENT) { + continue; + } + if(rc == SCAP_EOF) { + break; + } + if(rc != SCAP_SUCCESS) { + result = run_result::fatal("Worker " + std::to_string(i) + ": " + + inspector->getlasterr()); + break; + } + + if(duration_start == 0) { + duration_start = ev->get_ts(); + } else if(duration_to_tot_ns > 0) { + const int64_t diff = ev->get_ts() - duration_start; + if(diff >= (int64_t)duration_to_tot_ns) { + break; + } + } + + auto res = wctx.engine->process_event(wctx.source_engine_idx, + ev, + s.config->m_rule_matching); + if(res != nullptr) { + for(auto& rule_res : *res) { + std::string formatted_msg = wctx.formats->format_event( + rule_res.evt, + rule_res.rule, + rule_res.source, + falco_common::format_priority(rule_res.priority_num), + rule_res.format, + rule_res.tags, + hostname, + rule_res.extra_output_fields); + + auto fields = wctx.formats->get_field_values(rule_res.evt, + rule_res.source, + rule_res.format); + for(auto const& ef : rule_res.extra_output_fields) { + std::string fformat = ef.second.first; + if(fformat.empty()) { + continue; + } + if(fformat[0] != '*') { + fformat = "*" + fformat; + } + fields[ef.first] = wctx.formats->format_string(rule_res.evt, + fformat, + rule_res.source); + } + + nlohmann::json json_fields; + for(auto& f : fields) { + json_fields[f.first] = f.second; + } + + s.outputs->handle_event_formatted(rule_res.evt->get_ts(), + rule_res.priority_num, + formatted_msg, + rule_res.rule, + rule_res.source, + json_fields, + rule_res.tags); + } + } + + num_evts++; + } + } catch(const std::exception& e) { + result = run_result::fatal("Worker " + std::to_string(i) + ": " + e.what()); + } + }); + } + + for(uint16_t i = 0; i < num_workers; i++) { + threads[i].join(); + } + + inspector->stop_capture(); + + uint64_t total_evts = 0; + auto merged = 
run_result::ok(); + for(uint16_t i = 0; i < num_workers; i++) { + total_evts += worker_evt_counts[i]; + merged = run_result::merge(merged, worker_results[i]); + falco_logger::log(falco_logger::level::INFO, + "[EXPERIMENTAL] Worker " + std::to_string(i) + " processed " + + std::to_string(worker_evt_counts[i]) + " events.\n"); + } + falco_logger::log(falco_logger::level::INFO, + "[EXPERIMENTAL] Total events processed across all workers: " + + std::to_string(total_evts) + "\n"); + return merged; +} + +#endif // FALCO_MULTI_THREAD + static void process_inspector_events( falco::app::state& s, std::shared_ptr inspector, @@ -566,13 +789,28 @@ falco::app::run_result falco::app::actions::process_events(falco::app::state& s) s.on_inspectors_opened(); } - // optimization: with only one source we don't spawn additional threads - process_inspector_events(s, - src_info->inspector, - statsw, - source, - ctx.sync.get(), - &ctx.res); +#ifdef FALCO_MULTI_THREAD + bool use_multi_thread = s.is_modern_ebpf() && + s.config->m_modern_ebpf.m_num_worker_threads > 1 && + source == falco_common::syscall_source; + + if(use_multi_thread) { + ctx.res = do_inspect_multi_thread( + s, + src_info->inspector, + source, + uint64_t(s.options.duration_to_tot * ONE_SECOND_IN_NS)); + ctx.sync->finish(); + } else +#endif // FALCO_MULTI_THREAD + { + process_inspector_events(s, + src_info->inspector, + statsw, + source, + ctx.sync.get(), + &ctx.res); + } } else { auto res_ptr = &ctx.res; auto sync_ptr = ctx.sync.get(); diff --git a/userspace/falco/config_json_schema.h b/userspace/falco/config_json_schema.h index 2d21529310b..0827e64c763 100644 --- a/userspace/falco/config_json_schema.h +++ b/userspace/falco/config_json_schema.h @@ -17,7 +17,21 @@ limitations under the License. #pragma once -#define LONG_STRING_CONST(...) #__VA_ARGS__ +/* Omitted from config JSON schema when libsinsp is built without ENABLE_MULTI_THREAD. 
*/ +#if defined(ENABLE_MULTI_THREAD) +#define FALCO_CONFIG_SCHEMA_MODERN_EBPF_EXTRA_PROPS \ + , \ + "num_worker_threads": { \ + "type": "integer" \ + } +#else +#define FALCO_CONFIG_SCHEMA_MODERN_EBPF_EXTRA_PROPS +#endif + +/* Two-step stringify so nested macros (e.g. FALCO_CONFIG_SCHEMA_MODERN_EBPF_EXTRA_PROPS) expand + * before #; a single #__VA_ARGS__ can stringify the macro name instead of its replacement. */ +#define CONFIG_SCHEMA_STR_HELPER(...) #__VA_ARGS__ +#define LONG_STRING_CONST(...) CONFIG_SCHEMA_STR_HELPER(__VA_ARGS__) const char config_schema_string[] = LONG_STRING_CONST( @@ -400,6 +414,7 @@ const char config_schema_string[] = LONG_STRING_CONST( "drop_failed_exit": { "type": "boolean" } + FALCO_CONFIG_SCHEMA_MODERN_EBPF_EXTRA_PROPS }, "title": "ModernEbpf" }, diff --git a/userspace/falco/configuration.cpp b/userspace/falco/configuration.cpp index b0ff990c6f1..7b91cc9de00 100644 --- a/userspace/falco/configuration.cpp +++ b/userspace/falco/configuration.cpp @@ -264,6 +264,26 @@ void falco_configuration::load_engine_config(const std::string &config_name) { m_modern_ebpf.m_drop_failed_exit = m_config.get_scalar("engine.modern_ebpf.drop_failed_exit", DEFAULT_DROP_FAILED_EXIT); +#if defined(ENABLE_MULTI_THREAD) + m_modern_ebpf.m_num_worker_threads = + m_config.get_scalar("engine.modern_ebpf.num_worker_threads", 0); +#else + { + /* Schema may warn (not fail) on unknown keys; accept 0 as the default from stock + * falco.yaml but reject a non-zero request. */ + uint16_t requested = + m_config.get_scalar("engine.modern_ebpf.num_worker_threads", 0); + if(requested != 0) { + throw std::logic_error( + "Error reading config file (" + config_name + + "): engine.modern_ebpf.num_worker_threads=" + std::to_string(requested) + + " is set, but falcosecurity/libs was built without multi-thread support " + "(ENABLE_MULTI_THREAD is off). 
Remove this setting or rebuild libs with "
+					"-DENABLE_MULTI_THREAD=ON.");
+		}
+		m_modern_ebpf.m_num_worker_threads = 0;
+	}
+#endif
 		break;
 	case engine_kind_t::REPLAY:
 		m_replay.m_capture_file =
diff --git a/userspace/falco/configuration.h b/userspace/falco/configuration.h
index 68baea7dce0..c353e0a2395 100644
--- a/userspace/falco/configuration.h
+++ b/userspace/falco/configuration.h
@@ -70,6 +70,7 @@ class falco_configuration {
 		uint16_t m_cpus_for_each_buffer;
 		int16_t m_buf_size_preset;
 		bool m_drop_failed_exit;
+		uint16_t m_num_worker_threads;
 	};
 
 	struct replay_config {
diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp
index aed55e19c20..2373bd1d071 100644
--- a/userspace/falco/falco_outputs.cpp
+++ b/userspace/falco/falco_outputs.cpp
@@ -156,6 +156,25 @@ void falco_outputs::handle_event(sinsp_evt *evt,
 	this->push(cmsg);
 }
 
+void falco_outputs::handle_event_formatted(uint64_t ts,
+                                           falco_common::priority_type priority,
+                                           const std::string &msg,
+                                           const std::string &rule,
+                                           const std::string &source,
+                                           const nlohmann::json &fields,
+                                           const std::set<std::string> &tags) {
+	falco_outputs::ctrl_msg cmsg = {};
+	cmsg.ts = ts;
+	cmsg.priority = priority;
+	cmsg.msg = msg;
+	cmsg.rule = rule;
+	cmsg.source = source;
+	cmsg.fields = fields;
+	cmsg.tags = tags;
+	cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT;
+	this->push(cmsg);
+}
+
 void falco_outputs::handle_msg(uint64_t ts,
                                falco_common::priority_type priority,
                                const std::string &msg,
@@ -180,8 +199,9 @@ void falco_outputs::handle_msg(uint64_t ts,
 		char time_sec[20];  // sizeof "YYYY-MM-DDTHH:MM:SS"
 		char time_ns[12];   // sizeof ".sssssssssZ"
 		std::string iso8601evttime;
+		struct tm tm_buf;
 
-		strftime(time_sec, sizeof(time_sec), "%FT%T", gmtime(&evttime));
+		strftime(time_sec, sizeof(time_sec), "%FT%T", gmtime_r(&evttime, &tm_buf));
 		snprintf(time_ns, sizeof(time_ns), ".%09luZ", ts % 1000000000);
 		iso8601evttime = time_sec;
 		iso8601evttime += time_ns;
diff --git a/userspace/falco/falco_outputs.h 
b/userspace/falco/falco_outputs.h
index adfe0f6edba..a17b1f302fd 100644
--- a/userspace/falco/falco_outputs.h
+++ b/userspace/falco/falco_outputs.h
@@ -77,6 +77,19 @@ class falco_outputs {
 	                  const std::string &rule,
 	                  nlohmann::json &output_fields);
 
+	/*!
+	    \brief Push a pre-formatted event message to the output queue.
+	    Used by multi-threaded workers that format events with their own
+	    per-worker falco_formats/engine instances.
+	*/
+	void handle_event_formatted(uint64_t ts,
+	                            falco_common::priority_type priority,
+	                            const std::string &msg,
+	                            const std::string &rule,
+	                            const std::string &source,
+	                            const nlohmann::json &fields,
+	                            const std::set<std::string> &tags);
+
 	/*!
 	    \brief Sends a cleanup message to all outputs.
 	    Each output can have an implementation-specific behavior.
diff --git a/userspace/falco/webserver.cpp b/userspace/falco/webserver.cpp
index 5b83162a37a..ee0ce504200 100644
--- a/userspace/falco/webserver.cpp
+++ b/userspace/falco/webserver.cpp
@@ -58,6 +58,23 @@ void falco_webserver::start(const falco::app::state &state,
 		res.set_content(versions_json_str, "application/json");
 	});
 
+	// Register /metrics before listen(): dynamic Get() after listen() races with httplib workers
+	// serving concurrent requests (TSAN). Handlers stay gated until inspectors are open. 
+	if(state.config->m_metrics_enabled &&
+	   state.config->m_webserver_config.m_prometheus_metrics_enabled) {
+		m_prometheus_metrics_ready.store(false, std::memory_order_release);
+		m_server->Get("/metrics", [this, &state](const httplib::Request &, httplib::Response &res) {
+			if(!m_prometheus_metrics_ready.load(std::memory_order_acquire)) {
+				res.status = 503;
+				res.set_content("# Falco metrics not ready (inspectors not open yet)\n",
+				                falco_metrics::content_type_prometheus);
+				return;
+			}
+			res.set_content(falco_metrics::to_text_prometheus(state),
+			                falco_metrics::content_type_prometheus);
+		});
+	}
+
 	// run server in a separate thread
 	if(!m_server->is_valid()) {
 		m_server = nullptr;
@@ -104,9 +121,6 @@ void falco_webserver::stop() {
 void falco_webserver::enable_prometheus_metrics(const falco::app::state &state) {
 	if(state.config->m_metrics_enabled &&
 	   state.config->m_webserver_config.m_prometheus_metrics_enabled) {
-		m_server->Get("/metrics", [&state](const httplib::Request &, httplib::Response &res) {
-			res.set_content(falco_metrics::to_text_prometheus(state),
-			                falco_metrics::content_type_prometheus);
-		});
+		m_prometheus_metrics_ready.store(true, std::memory_order_release);
 	}
 }
diff --git a/userspace/falco/webserver.h b/userspace/falco/webserver.h
index faf204be464..163e2fb93f8 100644
--- a/userspace/falco/webserver.h
+++ b/userspace/falco/webserver.h
@@ -22,6 +22,7 @@ limitations under the License.
 #include
+#include <atomic>
 #include
 #include
@@ -47,4 +48,7 @@ class falco_webserver {
 	std::unique_ptr<httplib::Server> m_server = nullptr;
 	std::thread m_server_thread;
 	std::atomic<bool> m_failed;
+	/// Set after inspectors are open; /metrics is registered before listen() to avoid racing
+	/// httplib route registration with worker threads handling other endpoints.
+	std::atomic<bool> m_prometheus_metrics_ready{false};
 };