Commit 309d103

update low request accept
1 parent fb20b38 commit 309d103

2 files changed

Lines changed: 35 additions & 30 deletions


src/zinc/engine.zig

Lines changed: 28 additions & 28 deletions
@@ -158,7 +158,7 @@ pub const Worker = struct {
         while (self.write_queue.pop()) |write_op| {
             allocator.free(write_op.data);
         }
-
+
         // Free any remaining queued requests
         while (self.request_queue.pop()) |ctx| {
             allocator.free(ctx.data);
@@ -230,7 +230,6 @@ workers: []Worker = undefined,
 worker_threads: ?[]std.Thread = null, // null if threads haven't been started yet
 threads_started: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
 
-
 /// Create a new engine.
 fn create(conf: Config.Engine) anyerror!*Engine {
     const allocator = conf.allocator;
@@ -317,28 +316,25 @@ fn create(conf: Config.Engine) anyerror!*Engine {
         // Calculate reasonable buffer counts to avoid excessive memory usage
         // For high thread counts, use smaller initial pools that can grow as needed
         const estimated_conns_per_thread = @max(engine.max_conn / conf.num_threads, 100);
-
+
         // Calculate initial read buffers per thread
         const initial_read_buffers = if (conf.initial_read_buffers_per_thread) |count|
             count
         else
             @min(estimated_conns_per_thread, conf.max_initial_read_buffers);
         const max_read_buffers = initial_read_buffers * conf.max_read_buffers_multiplier;
-
+
         worker.read_buffer_pool = try BufferPool.init(
             allocator,
             conf.read_buffer_len,
             initial_read_buffers,
             max_read_buffers,
         );
-
+
         // Write buffer pool: calculate based on read buffers ratio
-        const initial_write_buffers = @min(
-            @as(usize, @intFromFloat(@as(f64, @floatFromInt(initial_read_buffers)) * conf.initial_write_buffers_ratio)),
-            conf.max_initial_write_buffers
-        );
+        const initial_write_buffers = @min(@as(usize, @intFromFloat(@as(f64, @floatFromInt(initial_read_buffers)) * conf.initial_write_buffers_ratio)), conf.max_initial_write_buffers);
         const max_write_buffers = initial_write_buffers * conf.max_write_buffers_multiplier;
-
+
         worker.write_buffer_pool = try BufferPool.init(
             allocator,
             conf.write_buffer_size,
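A rough worked example of the sizing math in this hunk, with illustrative values only (the real numbers come from Config.Engine at runtime): each worker estimates its share of connections, caps the initial read-buffer count, and derives the write pool from the configured ratio.

```zig
const std = @import("std");

test "buffer pool sizing sketch" {
    // Illustrative config values, not zinc defaults.
    const max_conn: usize = 4096;
    const num_threads: usize = 8;
    const max_initial_read_buffers: usize = 256;
    const initial_write_buffers_ratio: f64 = 0.5;
    const max_initial_write_buffers: usize = 128;

    // Same arithmetic as the hunk above, with the config fields inlined.
    const estimated_conns_per_thread = @max(max_conn / num_threads, 100);
    const initial_read_buffers = @min(estimated_conns_per_thread, max_initial_read_buffers);
    const initial_write_buffers = @min(
        @as(usize, @intFromFloat(@as(f64, @floatFromInt(initial_read_buffers)) * initial_write_buffers_ratio)),
        max_initial_write_buffers,
    );

    // 4096 / 8 = 512 estimated connections, capped at 256 read buffers,
    // and 256 * 0.5 = 128 write buffers.
    try std.testing.expectEqual(@as(usize, 256), initial_read_buffers);
    try std.testing.expectEqual(@as(usize, 128), initial_write_buffers);
}
```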
@@ -472,11 +468,15 @@ fn workerThread(worker: *Worker) void {
             startWriteWorker(worker, write_op.connection, write_op.data) catch |write_err| {
                 std.log.err("Failed to start queued write: {}", .{write_err});
                 engine.allocator.free(write_op.data);
+                engine.allocator.destroy(write_op);
+                continue;
             };
+            // Free WriteOp structure after queuing the write operation
+            // The data buffer will be freed in handleWriteCompletionWorker after write completes
+            engine.allocator.destroy(write_op);
             writes_processed += 1;
         }
-
-
+
         // Process request queue in batches to reduce Thread.Pool spawn overhead
         // Process ALL available requests immediately, even if less than batch size
         // This ensures requests don't wait for batch to fill
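The two destroy calls added here pin down the WriteOp lifecycle: the struct dies as soon as the write is handed to the event loop (or the submit fails), while the copied data buffer lives on until handleWriteCompletionWorker frees it. A minimal, self-contained sketch of that ownership split; submitWrite is a hypothetical stand-in, not the real zinc API.

```zig
const std = @import("std");

const WriteOp = struct {
    data: []u8,
};

fn submitWrite(op: *WriteOp) !void {
    // Stand-in for startWriteWorker: fail on an empty payload so the error path is real.
    if (op.data.len == 0) return error.EmptyWrite;
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Producer side (queueWriteToWorker): copy the payload, wrap it in a heap WriteOp.
    const data_copy = try allocator.dupe(u8, "hello");
    const write_op = try allocator.create(WriteOp);
    write_op.* = .{ .data = data_copy };

    // Consumer side (worker thread): hand the write to the event loop, then drop the WriteOp.
    submitWrite(write_op) catch {
        allocator.free(write_op.data); // failed submit: free both allocations here
        allocator.destroy(write_op);
        return;
    };
    allocator.destroy(write_op); // WriteOp is no longer needed once the write is queued

    // In the engine the write-completion callback frees data_copy later; the sketch
    // frees it directly so the example itself does not leak.
    allocator.free(data_copy);
}
```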
@@ -486,12 +486,12 @@ fn workerThread(worker: *Worker) void {
                 std.log.err("Failed to submit request batch: {}", .{err});
             };
         }
-
+
         // Run AIO event loop with balanced timeout
         // Use reasonable timeout to balance latency and CPU usage
         // Process writes immediately, but don't spin CPU with very short timeouts
         const timeout = engine.event_wait_timeout_ns;
-
+
         worker.aio_io.run_for_ns(timeout) catch |err| {
             if (err != error.TimeoutTooBig and !engine.stopping.isSet()) {
                 std.log.err("AIO run_for_ns error: {}", .{err});
@@ -561,7 +561,7 @@ pub fn startWrite(self: *Engine, connection: *Connection, data: []const u8) !voi
         }
         return error.ConnectionNotFound;
     };
-
+
     return queueWriteToWorker(worker, connection, data);
 }
 
@@ -575,7 +575,7 @@ fn queueWriteToWorker(worker: *Worker, connection: *Connection, data: []const u8
         return err; // Fast failure path
     };
     @memcpy(data_copy, data);
-
+
     // Create write operation - use single allocation for WriteOp
     const write_op = worker.engine.allocator.create(WriteOp) catch |err| {
         worker.engine.allocator.free(data_copy); // Cleanup on failure
@@ -585,7 +585,7 @@ fn queueWriteToWorker(worker: *Worker, connection: *Connection, data: []const u8
         .connection = connection,
         .data = data_copy,
     };
-
+
     // Queue write operation (worker thread will process it in event loop)
     // QueueType.push is lock-free and very fast
     worker.write_queue.push(write_op);
@@ -771,7 +771,7 @@ fn handleReadCompletionWorker(worker: *Worker, connection: *Connection, data: []
     // Worker thread will batch requests and submit them together
     worker.request_queue.push(ctx);
     const queue_size = worker.request_queue_size.fetchAdd(1, .monotonic) + 1;
-
+
     // Submit batch if queue reaches batch size OR if this is the first request in queue
     // This ensures requests are processed immediately even if batch doesn't fill
     const batch_size = 20;
@@ -794,7 +794,7 @@ fn submitRequestBatch(worker: *Worker, engine: *Engine) !void {
     const batch_size = 20; // Optimal batch size for reducing spawn overhead
     var batch: [batch_size]*RequestContext = undefined;
     var batch_count: usize = 0;
-
+
     // Collect up to batch_size requests, but process whatever is available
     // This ensures requests don't wait for batch to fill
     while (batch_count < batch_size) {
@@ -806,7 +806,7 @@ fn submitRequestBatch(worker: *Worker, engine: *Engine) !void {
             break; // No more requests
         }
     }
-
+
     // Submit batch if we have requests (even if less than batch_size)
     // This ensures requests are processed immediately, not waiting for batch to fill
     if (batch_count > 0) {
@@ -842,12 +842,12 @@ fn processRequestBatch(batch: []*RequestContext) void {
     if (batch.len == 0) {
         return;
     }
-
+
     // Process all requests in batch
     for (batch) |ctx| {
         processRequestAsync(ctx);
     }
-
+
     // Free batch array
     const engine = batch[0].worker.engine;
     engine.allocator.free(batch);
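These hunks only touch whitespace, but the batching pattern they sit in is worth restating: submitRequestBatch drains up to 20 queued requests into a stack array and submits whatever it collected rather than waiting for a full batch. A condensed, standalone sketch of that drain loop, with a plain slice standing in for the worker's lock-free request_queue:

```zig
const std = @import("std");

// Drain up to batch_size pending requests and process whatever was collected,
// even a partial batch, mirroring submitRequestBatch / processRequestBatch.
fn drainBatch(pending: []const u32) void {
    const batch_size = 20;
    var batch: [batch_size]u32 = undefined;
    var batch_count: usize = 0;
    var i: usize = 0;

    // Collect up to batch_size items; stop early if the queue runs dry.
    while (batch_count < batch_size and i < pending.len) : (i += 1) {
        batch[batch_count] = pending[i];
        batch_count += 1;
    }

    // Submit immediately, without waiting for the batch to fill.
    if (batch_count > 0) {
        for (batch[0..batch_count]) |item| {
            std.debug.print("processing request {d}\n", .{item});
        }
    }
}

pub fn main() void {
    drainBatch(&[_]u32{ 1, 2, 3 }); // a partial batch is processed right away
}
```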
@@ -909,7 +909,7 @@ fn closeConnectionWorker(worker: *Worker, connection: *Connection) void {
     const engine = worker.engine;
     if (connection.fd != -1) {
         const fd = connection.fd;
-
+
         // Close socket first - this will cancel any pending operations
         // Note: close_socket should handle cleanup of pending operations
         worker.aio_io.close_socket(fd);
@@ -1123,7 +1123,7 @@ fn readCallbackWorker(
         std.log.err("readCallbackWorker: completion operation is not recv", .{});
         return;
     }
-
+
     const bytes_read = result catch {
         // Handle read error
         const connection = getConnectionWorker(worker, completion.operation.recv.socket);
@@ -1174,15 +1174,15 @@ fn writeCallbackWorker(
         std.log.err("writeCallbackWorker: completion operation is not send", .{});
         return;
     }
-
+
     const bytes_written = result catch {
         // Handle write error
         const connection = getConnectionWorker(worker, completion.operation.send.socket);
         if (connection) |conn| {
             if (conn.write_buffer) |buffer| {
-                // Return buffer to pool (release always succeeds)
-                // If buffer is not from pool, it will be freed by the pool's release logic
-                worker.write_buffer_pool.release(buffer);
+                // Free buffer allocated in queueWriteToWorker (not from pool)
+                // Buffer was allocated with engine.allocator, not from write_buffer_pool
+                worker.engine.allocator.free(buffer);
                 conn.write_buffer = null;
             }
             closeConnectionWorker(worker, conn);
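The substantive fix in this hunk is the allocator pairing: write buffers now come from engine.allocator in queueWriteToWorker, so the error path must free them through that same allocator instead of handing foreign memory to write_buffer_pool.release. A standalone miniature of that rule, using only a plain allocator (not zinc's BufferPool API):

```zig
const std = @import("std");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // queueWriteToWorker-style buffer: allocated directly from the allocator ...
    const write_buffer = try allocator.alloc(u8, 64);
    @memset(write_buffer, 0);

    // ... so the completion/error path frees it through that same allocator,
    // never via a pool release, which would corrupt the pool's free list or leak.
    allocator.free(write_buffer);
}
```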

src/zinc/response.zig

Lines changed: 7 additions & 2 deletions
@@ -263,18 +263,23 @@ fn sendAsync(self: *Self, content: []const u8, options: RespondOptions, engine_p
     // and BufferPool is not thread-safe (designed for single-threaded event loop)
     const response_data = try engine.allocator.alloc(u8, total_size);
     errdefer engine.allocator.free(response_data);
-
+
     @memcpy(response_data[0..header_size], h.items);
     if (body_size > 0) {
         @memcpy(response_data[header_size..][0..body_size], content);
     }
 
     // Use AIO to send the response asynchronously
-    // Buffer will be freed in handleWriteCompletionWorker after write completes
+    // queueWriteToWorker will copy the data, so we can free the original buffer here
+    // The copied buffer will be freed in handleWriteCompletionWorker after write completes
     engine.startWrite(connection, response_data) catch |err| {
         engine.allocator.free(response_data);
         return err;
     };
+
+    // Free the original buffer since queueWriteToWorker has copied it
+    // The copied buffer will be managed by the write queue and freed after completion
+    engine.allocator.free(response_data);
 }
 
 pub fn write(self: *Self, content: []const u8, options: RespondOptions) anyerror!void {
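The response.zig change makes sendAsync free its own buffer as soon as the hand-off succeeds, since queueWriteToWorker now takes a private copy that the write-completion path frees later. A reduced, standalone sketch of that flow; queueWrite is a hypothetical stand-in for the real queueing call:

```zig
const std = @import("std");

// Stand-in for queueWriteToWorker: takes its own copy of the payload.
fn queueWrite(allocator: std.mem.Allocator, data: []const u8) ![]u8 {
    return try allocator.dupe(u8, data);
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // sendAsync side: build the response in its own buffer.
    const response_data = try allocator.alloc(u8, 16);
    @memset(response_data, 'x');

    // Hand off: the queue layer copies the data.
    const queued_copy = queueWrite(allocator, response_data) catch |err| {
        allocator.free(response_data); // failed hand-off: caller still owns the buffer
        return err;
    };
    allocator.free(response_data); // successful hand-off: the original is freed at once

    // In the engine the write-completion callback frees the queued copy later;
    // freeing it here keeps the sketch leak-free.
    allocator.free(queued_copy);
}
```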
