
Phase 1: Single Raft group (leader + followers)


Phase 1: Single Raft group (leader + followers) that fits your setup:

  • core vector engine (vectorlite) implemented in Rust, exposing a gRPC control API to apply writes, export/import index, and do consistent SQLite backups.
  • a Go control-plane process co-located with each Rust node that runs a Raft instance (HashiCorp Raft in the examples below). When a log entry is applied, the Raft FSM invokes the local Rust gRPC service to perform the SQL + index update on that node (so DB and index remain consistent together).
  • clients talk to any Go node via gRPC/HTTP; followers redirect to leader or forward requests.
  • snapshots are exported via Rust gRPC (a consistent SQLite DB backup + index archive) and carried by Raft snapshot mechanism.

Below I provide:

  1. Architecture summary (short).
  2. Protobuf (command + control RPCs) that you should implement in Rust and use from Go.
  3. Go implementation blueprint for Raft node: FSM.Apply, Snapshot, Restore, starting Raft, client handling, leader forwarding.
  4. Snapshot & restore flow (how Rust side participates).
  5. Operational notes, tuning, and test checklist.

If you want, I can expand any snippet into a runnable repo — say which piece you want first.


1) Architecture summary — single Raft group

  • Each physical node runs:

    • Rust vectorlite service (listens on local gRPC socket, manages SQLite file + HNSW index).
    • Go Raft control-plane (runs HashiCorp Raft, persists raft logs locally, implements FSM). The Go process calls the local Rust gRPC to apply commands and to take/restore snapshots.
  • Client flow for a write:

    1. Client -> any Go node (gRPC/HTTP). If node is a follower, it responds with leader address (or forwards).
    2. Request accepted by leader: leader serializes a Command and calls raft.Apply(...).
    3. Once the entry is replicated to a majority and committed, FSM.Apply runs on each node and invokes the local Rust gRPC to execute the command inside a local SQLite transaction (ensuring the DB and index update are atomic locally).
    4. Leader returns success to client after commit and local apply.
  • Snapshot: leader uses Rust gRPC ExportSnapshot to get a consistent DB+index archive and provides it to Raft snapshot store; followers restore snapshots by passing archive to Rust gRPC ImportSnapshot.

Why this:

  • Raft provides ordering & durability. The state machine remains the Rust + SQLite engine, preserving your existing extension.
  • Doing actual DB writes only from FSM.Apply (not pre-apply) prevents divergence.

2) Protobuf: command + control RPCs (Rust ↔ Go)

Create a small .proto describing commands and snapshot endpoints. Minimal example:

syntax = "proto3";
package vectorlite;

import "google/protobuf/empty.proto";

message InsertVector {
  string id = 1;
  bytes vector = 2; // float32 bytes (little-endian contiguous)
  string meta_json = 3; // optional metadata for SQL columns
}

message DeleteVector {
  string id = 1;
}

message Command {
  oneof op {
    InsertVector insert = 1;
    DeleteVector delete = 2;
  }
  string table = 10; // virtual table name
}

message ApplyReply {
  bool ok = 1;
  string error = 2;
}

service VectorLiteService {
  // Execute an insert/delete inside a single SQLite transaction.
  rpc ApplyCommand (Command) returns (ApplyReply);

  // Export a consistent snapshot (sqlite db + index archive) as a stream
  // Caller receives a byte stream (chunks).
  rpc ExportSnapshot (google.protobuf.Empty) returns (stream SnapshotChunk);

  // Import a snapshot: caller streams the snapshot bytes, server restores.
  rpc ImportSnapshot (stream SnapshotChunk) returns (ApplyReply);

  // Optional liveness check. (Leader hints come from the Go Raft layer,
  // since the Rust process holds no Raft state.)
  rpc Ping(google.protobuf.Empty) returns (ApplyReply);
}

message SnapshotChunk {
  bytes data = 1;
}

Implement these RPCs in the Rust process. ApplyCommand must:

  • Begin a SQLite transaction,
  • Perform the SQL insert/delete via your vectorlite extension (which updates both b-tree and HNSW),
  • Commit and return success/failure.

ExportSnapshot should:

  • Use sqlite3_backup or a WAL checkpoint to produce a consistent DB copy, and export the serialized index file(s) as a tar/zip stream.

ImportSnapshot should:

  • Stop accepting queries (the Rust side can coordinate with the Go service), write the DB + index files atomically, and reopen the DB.
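
On the Go side, building a Command and packing the vector field as little-endian float32 bytes might look like the sketch below. The oneof wrapper name (pb.Command_Insert) follows standard protoc-gen-go output; encodeVector and buildInsert are illustrative helpers, not part of the proto.

package raftnode

import (
    "encoding/binary"
    "math"

    pb "your/proto/package/vectorlite"
)

// encodeVector packs []float32 as contiguous little-endian bytes,
// matching the InsertVector.vector field comment in the proto.
func encodeVector(v []float32) []byte {
    buf := make([]byte, 4*len(v))
    for i, f := range v {
        binary.LittleEndian.PutUint32(buf[i*4:], math.Float32bits(f))
    }
    return buf
}

// buildInsert assembles an insert Command for the given virtual table.
func buildInsert(table, id string, vec []float32, metaJSON string) *pb.Command {
    return &pb.Command{
        Table: table,
        Op: &pb.Command_Insert{
            Insert: &pb.InsertVector{
                Id:       id,
                Vector:   encodeVector(vec),
                MetaJson: metaJSON,
            },
        },
    }
}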

3) Go control-plane: key pieces

Below are the most important code excerpts. They use github.com/hashicorp/raft and github.com/hashicorp/raft-boltdb (or any other stable store).

Note: these are blueprint snippets — you should adapt imports, error handling, TLS, and config for production.

a) Command serialization helper (protobuf -> bytes)

Assume you compiled the .proto and have Go types.

// cmd_serial.go
package raftnode

import (
    "google.golang.org/protobuf/proto"

    pb "your/proto/package/vectorlite"
)

func EncodeCommand(cmd *pb.Command) ([]byte, error) {
    return proto.Marshal(cmd)
}

func DecodeCommand(b []byte) (*pb.Command, error) {
    var cmd pb.Command
    if err := proto.Unmarshal(b, &cmd); err != nil {
        return nil, err
    }
    return &cmd, nil
}

b) FSM implementation (Apply / Snapshot / Restore)

// fsm.go
package raftnode

import (
    "context"
    "errors"
    "fmt"
    "io"
    "sync"
    "time"

    raft "github.com/hashicorp/raft"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/protobuf/types/known/emptypb"

    pb "your/proto/package/vectorlite"
)

type FSM struct {
    mu sync.Mutex

    // gRPC client to local Rust service
    rustClient pb.VectorLiteServiceClient
    // local address/ID for logging, etc.
    nodeID string
}

func NewFSM(rustAddr string, nodeID string) (*FSM, error) {
    // NOTE: plaintext is fine for local loopback traffic; use real TLS
    // credentials in production.
    conn, err := grpc.Dial(rustAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil { return nil, err }
    client := pb.NewVectorLiteServiceClient(conn)
    return &FSM{rustClient: client, nodeID: nodeID}, nil
}

// Apply is invoked on every node once a log entry is committed. Its return
// value is delivered to the proposer via raft.ApplyFuture.Response(), so the
// leader should check Response() in addition to Error().
func (f *FSM) Apply(logEntry *raft.Log) interface{} {
    // Deserialize command
    cmd, err := DecodeCommand(logEntry.Data)
    if err != nil {
        return err
    }

    // Call Rust gRPC service to apply inside local SQLite transaction.
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    resp, err := f.rustClient.ApplyCommand(ctx, cmd)
    if err != nil {
        return err
    }
    if !resp.Ok {
        return errors.New(resp.Error)
    }
    return nil
}

// Snapshot - create a snapshot stream by asking Rust to ExportSnapshot
func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
    return &grpcSnapshot{rustClient: f.rustClient}, nil
}

// Restore - receive snapshot bytes from Raft and ask Rust to ImportSnapshot
func (f *FSM) Restore(rc io.ReadCloser) error {
    defer rc.Close()
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    stream, err := f.rustClient.ImportSnapshot(ctx)
    if err != nil {
        return err
    }

    // stream data from rc into gRPC stream
    buf := make([]byte, 32*1024)
    for {
        n, err := rc.Read(buf)
        if n > 0 {
            chunk := &pb.SnapshotChunk{Data: buf[:n]}
            if err := stream.Send(chunk); err != nil {
                stream.CloseSend()
                return err
            }
        }
        if err == io.EOF {
            break
        }
        if err != nil {
            stream.CloseSend()
            return err
        }
    }
    // finish and receive final ApplyReply
    reply, err := stream.CloseAndRecv()
    if err != nil {
        return err
    }
    if !reply.Ok {
        return fmt.Errorf("import snapshot failed: %s", reply.Error)
    }
    return nil
}

// grpcSnapshot implements raft.FSMSnapshot by streaming snapshot bytes from Rust ExportSnapshot
type grpcSnapshot struct {
    rustClient pb.VectorLiteServiceClient
}

func (s *grpcSnapshot) Persist(sink raft.SnapshotSink) error {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()
    stream, err := s.rustClient.ExportSnapshot(ctx, &emptypb.Empty{})
    if err != nil { sink.Cancel(); return err }

    for {
        chunk, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            sink.Cancel()
            return err
        }
        if _, werr := sink.Write(chunk.Data); werr != nil {
            sink.Cancel()
            return werr
        }
    }
    if err := sink.Close(); err != nil {
        sink.Cancel()
        return err
    }
    return nil
}

func (s *grpcSnapshot) Release() {}

Notes:

  • Persist streams snapshot bytes received from Rust to Raft snapshot sink. Raft will ship that snapshot to followers when needed.
  • Restore streams bytes from Raft into Rust ImportSnapshot.

c) Starting Raft and wiring FSM

// node.go (abridged)
package raftnode

import (
    "os"
    "path/filepath"
    "time"

    raft "github.com/hashicorp/raft"
    raftboltdb "github.com/hashicorp/raft-boltdb"
)

func StartNode(nodeID, bindAddr, dataDir, rustGrpcAddr string, peers []raft.Server) (*raft.Raft, *FSM, error) {
    config := raft.DefaultConfig()
    config.LocalID = raft.ServerID(nodeID)
    // tune timeouts for your environment:
    config.HeartbeatTimeout = 500 * time.Millisecond
    config.ElectionTimeout = 1500 * time.Millisecond
    config.LeaderLeaseTimeout = 500 * time.Millisecond
    config.CommitTimeout = 50 * time.Millisecond

    // Transport (TCP)
    transport, err := raft.NewTCPTransport(bindAddr, nil, 3, 10*time.Second, os.Stderr)
    if err != nil { return nil, nil, err }

    // Stable store
    boltDBPath := filepath.Join(dataDir, "raft.db")
    boltStore, err := raftboltdb.NewBoltStore(boltDBPath)
    if err != nil { return nil, nil, err }

    // Snapshot store
    snapshotStore, err := raft.NewFileSnapshotStore(dataDir, 2, os.Stderr)
    if err != nil { return nil, nil, err }

    fsm, err := NewFSM(rustGrpcAddr, nodeID)
    if err != nil { return nil, nil, err }

    r, err := raft.NewRaft(config, fsm, boltStore, boltStore, snapshotStore, transport)
    if err != nil { return nil, nil, err }

    // Bootstrap cluster if needed; peers param should include self and others as raft.Server
    hasState, err := raft.HasExistingState(boltStore, boltStore, snapshotStore)
    if err != nil { return nil, nil, err }
    if !hasState {
        configuration := raft.Configuration{Servers: peers}
        f := r.BootstrapCluster(configuration)
        if err := f.Error(); err != nil {
            return nil, nil, err
        }
    }

    return r, fsm, nil
}
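
For orientation, a sketch of how StartNode might be invoked to bootstrap a local 3-node cluster; the node IDs, ports, and directories are placeholders:

peers := []raft.Server{
    {Suffrage: raft.Voter, ID: raft.ServerID("node1"), Address: raft.ServerAddress("127.0.0.1:7001")},
    {Suffrage: raft.Voter, ID: raft.ServerID("node2"), Address: raft.ServerAddress("127.0.0.1:7002")},
    {Suffrage: raft.Voter, ID: raft.ServerID("node3"), Address: raft.ServerAddress("127.0.0.1:7003")},
}

// On node1; node2 and node3 call StartNode with their own IDs/addresses
// and the same peers list, so bootstrap produces an identical configuration.
r, fsm, err := StartNode("node1", "127.0.0.1:7001", "/data/node1", "127.0.0.1:50051", peers)
if err != nil {
    log.Fatalf("start raft node: %v", err)
}
_ = r
_ = fsm // wire these into the client-facing handlers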

d) Client write handling & leader forwarding

  • Client sends write to any Go node via HTTP/gRPC.
  • If the receiving node is leader: encode command and call raft.Apply(cmdBytes, timeout).
  • If not leader: return leader address (via raft.Leader()), or the node can forward the request to the leader automatically.

Example apply path:

// on the leader
cmdBytes, _ := EncodeCommand(cmdProto)
future := raftNode.Apply(cmdBytes, 5*time.Second)
if err := future.Error(); err != nil {
    // not committed: timeout or lost leadership
}
if resp := future.Response(); resp != nil {
    // committed, but FSM.Apply reported an error (e.g., a SQLite failure)
}

If follower:

leader := raftNode.Leader()
if leader == "" {
    // no leader known; return TryAgain
}
// redirect client to leader's client endpoint

Forwarding variant: follower opens connection to leader and proxies the client request.
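
Putting the two paths together, a minimal client-facing HTTP write handler with leader redirection might look like this sketch. decodeClientRequest is a hypothetical helper that parses the HTTP body into a *pb.Command; everything else uses only the hashicorp/raft calls shown above.

// server.go (sketch)
type Server struct {
    raft *raft.Raft
}

func (s *Server) handleWrite(w http.ResponseWriter, r *http.Request) {
    if s.raft.State() != raft.Leader {
        // Follower: hand back a leader hint so the client can retry there.
        leader := string(s.raft.Leader())
        if leader == "" {
            http.Error(w, "no leader elected, retry later", http.StatusServiceUnavailable)
            return
        }
        w.Header().Set("X-Raft-Leader", leader)
        http.Error(w, "not leader", http.StatusTemporaryRedirect)
        return
    }

    cmd, err := decodeClientRequest(r) // hypothetical: HTTP body -> *pb.Command
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    b, err := EncodeCommand(cmd)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    future := s.raft.Apply(b, 5*time.Second)
    if err := future.Error(); err != nil {
        // Not committed: timeout or lost leadership.
        http.Error(w, err.Error(), http.StatusServiceUnavailable)
        return
    }
    if resp := future.Response(); resp != nil {
        // Committed, but FSM.Apply reported an error from the Rust side.
        http.Error(w, fmt.Sprint(resp), http.StatusInternalServerError)
        return
    }
    w.WriteHeader(http.StatusOK)
}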


4) Snapshot & restore flow (detailed)

Leader snapshot creation:

  • Leader triggers snapshot (periodic or when Raft requests compaction).

  • Leader calls Rust gRPC ExportSnapshot, which:

    • ensures a consistent SQLite backup: use sqlite3_backup API or WAL checkpoint + copy. Ideally flush WAL to get a consistent DB copy.
    • collects index files (HNSW files) and packs them (tar.gz or custom format).
    • streams the archive to Go (ExportSnapshot streaming RPC).
  • Go FSM Persist writes the streamed bytes into Raft snapshot sink (Raft stores snapshot in its snapshot store).

Follower restore:

  • When follower needs to restore snapshot, Raft provides the snapshot bytes to FSM.Restore (as an io.ReadCloser).

  • FSM.Restore streams bytes into Rust gRPC ImportSnapshot, which:

    • stops queries (or applies in safe mode),
    • writes DB file + index files atomically (e.g., write to temp files, fsync, rename),
    • re-opens DB and index, then resumes serving.

Important: after ImportSnapshot completes, the node's on-disk state must correspond exactly to the snapshot, so that the log entries following the snapshot index apply cleanly on top of it.
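
The write-to-temp / fsync / rename step belongs to the Rust side, but the pattern is language-neutral; here it is sketched in Go for illustration:

// writeFileAtomic replaces path with the contents of r such that readers
// only ever observe the old file or the complete new one.
func writeFileAtomic(path string, r io.Reader) error {
    tmp, err := os.CreateTemp(filepath.Dir(path), ".restore-*")
    if err != nil {
        return err
    }
    defer os.Remove(tmp.Name()) // no-op after a successful rename

    if _, err := io.Copy(tmp, r); err != nil {
        tmp.Close()
        return err
    }
    if err := tmp.Sync(); err != nil { // flush to disk before the rename
        tmp.Close()
        return err
    }
    if err := tmp.Close(); err != nil {
        return err
    }
    // rename(2) is atomic within a filesystem on POSIX.
    return os.Rename(tmp.Name(), path)
}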


5) Operational notes & production considerations

  • Atomicity: Ensure Rust ApplyCommand executes SQL + index update in a single SQLite transaction. The vectorlite extension should update HNSW while the transaction is active so both DB and index are consistent together.
  • Idempotence: in steady state Raft applies each committed entry to the FSM once, but after a crash or snapshot restore the log can replay entries your durable Rust state has already seen. Make operations idempotent (upserts, unique constraints) or record the last applied log index in SQLite and skip older entries.
  • Command size: Avoid huge raft entries. If vectors are large (3KB each), consider storing vector payloads in local DB via Rust but passing only references in the Raft command — however phase 1 can accept full vector bytes if throughput is moderate. For 10M vectors, snapshots handle bulk transfer.
  • Time budgets/timeouts: Apply should timeout after a reasonable period. Keep raft.Apply timeout slightly longer than Rust gRPC command timeout.
  • Leader redirection: For performance/operational simplicity, have followers return leader hint; clients then POST to leader.
  • TLS & auth: Secure Raft transport and gRPC with TLS and mutual auth.
  • Monitoring: export raft metrics (term, leader, commitIndex, lastApplied), snapshot durations, Rust export/import durations, and apply latencies; see the sketch after this list.
  • Testing: Integration tests: single-node apply, 3-node commit and failover (kill leader), snapshot restore, concurrent writes, partition tests.
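
For the monitoring bullet above, hashicorp/raft already surfaces the listed values (term, state, commit_index, last_applied, snapshot info) through Stats(). A minimal debug endpoint, reusing the hypothetical Server type sketched earlier:

// handleRaftStats dumps raft internals as JSON for debugging/monitoring.
func (s *Server) handleRaftStats(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    // Stats() returns a map[string]string of raft internals.
    _ = json.NewEncoder(w).Encode(s.raft.Stats())
}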

6) Testing checklist (Phase 1)

  1. Basic correctness

    • Start 3 nodes locally. Insert 1000 vectors via the leader; verify followers eventually return the same search results.
  2. Leader failover

    • Kill leader mid-ingest. Ensure a new leader is elected and subsequent writes continue. Reintroduce old node; ensure it catches up.
  3. Snapshot/restore

    • Trigger snapshot on leader; force follower to restore snapshot; validate search parity.
  4. Network partition

    • Partition cluster into 2 vs 1; verify majority side continues, single node cannot commit.
  5. Performance

    • Measure write latency (client → leader apply → commit acknowledged). Measure search latency on followers and leader.
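
Most of these tests need to wait for leader election before proceeding. A small helper, sketched under the assumption that the three *raft.Raft handles are reachable in-process from the test harness:

// waitForLeader polls until some node reports a leader or the timeout expires.
func waitForLeader(nodes []*raft.Raft, timeout time.Duration) (raft.ServerAddress, error) {
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        for _, n := range nodes {
            if addr := n.Leader(); addr != "" {
                return addr, nil
            }
        }
        time.Sleep(100 * time.Millisecond)
    }
    return "", errors.New("no leader elected within timeout")
}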

7) Example minimal file list to build (starter repo skeleton)

  • proto/vectorlite.proto → compile to Go + Rust.

  • rust/vectorlite_service/ → Rust gRPC server: ApplyCommand, ExportSnapshot, ImportSnapshot. Uses your existing vectorlite & SQLite extension.

  • go/raftnode/:

    • cmd_serial.go, fsm.go, node.go, server.go (client-facing gRPC/HTTP handlers + leader detection/forwarding).
  • deploy/ → docker-compose with 3 nodes (Rust + Go processes on each node).

  • tests/ → python/go test harness for failover and snapshot tests.


Quick checklist & recommended immediate tasks (what to implement first)

  1. Rust: implement ApplyCommand and ExportSnapshot/ImportSnapshot gRPC stubs. ExportSnapshot must be able to create a consistent archive quickly.
  2. Go: implement FSM that calls ApplyCommand. Start a local raft cluster (3 nodes) and test a simple Insert flow end-to-end.
  3. Test: run insert → restart follower → verify it restores snapshot and continues.

If you’d like, I’ll produce one of the following next (pick one) and implement it immediately here:

A) Full Go FSM + Raft node example (complete Go files you can run locally against a stub Rust gRPC server).

B) Rust gRPC server stubs (proto + Rust service skeleton showing how to apply commands to SQLite and export/import snapshots).

C) A small end-to-end prototype where both sides are implemented (Go + Rust stubs) so you can run 3 nodes locally and test Raft applies & snapshots.

Tell me which piece you want me to generate now and I’ll produce the code (A, B, or C). If you want me to pick, I’ll start with A: the Go Raft FSM + node (it’s the fastest to validate Phase 1 and you can plug it into your existing Rust service).