|
23 | 23 | from std/algorithm import sort |
24 | 24 | from std/sequtils import toSeq |
25 | 25 | from stew/staticfor import staticFor |
26 | | -from system/ansi_c import c_malloc, c_free |
27 | 26 |
|
28 | 27 | type |
29 | 28 | CellBytes = array[fulu.CELLS_PER_EXT_BLOB, Cell] |
@@ -154,156 +153,56 @@ proc recover_cells_and_proofs_parallel*( |
154 | 153 | tp: Taskpool, |
155 | 154 | dataColumns: seq[ref fulu.DataColumnSidecar]): |
156 | 155 | Result[seq[CellsAndProofs], cstring] = |
157 | | - ## Recover blobs from data column sidecars in parallel. |
158 | | - ## - Uses unmanaged C buffers for worker inputs so no Nim GC objects |
159 | | - ## - Bounds in-flight tasks to limit peak memory/alloc pressure. |
160 | | - ## - Ensures all spawned tasks are awaited (drained) on any early return. |
161 | | - |
| 156 | + ## This helper recovers blobs from the data column sidecars in parallel |
162 | 157 | if dataColumns.len == 0: |
163 | 158 | return err("DataColumnSidecar: Length should not be 0") |
164 | | - if dataColumns.len > NUMBER_OF_COLUMNS: |
165 | | - return err("DataColumnSidecar: Length exceeds NUMBER_OF_COLUMNS") |
166 | 159 |
|
167 | 160 | let |
168 | 161 | columnCount = dataColumns.len |
169 | 162 | blobCount = dataColumns[0].column.len |
170 | 163 |
|
171 | 164 | for column in dataColumns: |
172 | | - if blobCount != column.column.len: |
| 165 | + if not (blobCount == column.column.len): |
173 | 166 | return err("DataColumns do not have the same length") |
174 | 167 |
|
175 | | - # Worker that runs on a taskpool thread. It only sees raw pointers and |
176 | | - # constructs its own worker-local seqs (on the worker's heap) before calling |
177 | | - # the KZG recovery routine. Keeps GC objects thread-local. |
178 | | - proc workerRecover(idxPtr: ptr CellIndex, cellsPtr: ptr Cell, |
179 | | - columnCount: int): Result[CellsAndProofs, void] = |
180 | | - ## Worker runs on a taskpool thread. It receives raw C buffers (ptr) and |
181 | | - ## converts them into worker-local seqs before calling the KZG recovery |
182 | | - ## routine, so no Nim GC objects cross thread-local heaps. |
183 | | - var |
184 | | - localIndices = newSeq[CellIndex](columnCount) |
185 | | - localCells = newSeq[Cell](columnCount) |
186 | | - let |
187 | | - idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr) |
188 | | - cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr) |
189 | | - for j in 0 ..< columnCount: |
190 | | - localIndices[j] = idxArr[j] |
191 | | - localCells[j] = cellsArr[j] |
192 | | - # use the task wrapper which maps string errors to void |
193 | | - recoverCellsAndKzgProofsTask(localIndices, localCells) |
194 | | - |
195 | 168 | var |
196 | | - pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]] = @[] |
197 | | - pendingIdxPtrs: seq[ptr CellIndex] = @[] |
198 | | - pendingCellsPtrs: seq[ptr Cell] = @[] |
| 169 | + pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]] |
199 | 170 | res = newSeq[CellsAndProofs](blobCount) |
200 | 171 |
|
201 | | - # pre-size sequences so we can index-assign without reallocs |
202 | | - pendingFuts.setLen(blobCount) |
203 | | - pendingIdxPtrs.setLen(blobCount) |
204 | | - pendingCellsPtrs.setLen(blobCount) |
205 | | - |
206 | | - # track how many we've actually spawned |
207 | | - var spawned = 0 |
208 | | - |
209 | | - # Choose a sane limit for concurrent tasks to reduce peak memory/alloc pressure. |
210 | | - let maxInFlight = min(blobCount, 9) |
211 | | - |
212 | 172 | let startTime = Moment.now() |
213 | 173 | const reconstructionTimeout = 2.seconds |
214 | 174 |
|
215 | | - proc freePendingPtrPair(idxPtr: ptr CellIndex, cellsPtr: ptr Cell) = |
216 | | - c_free(idxPtr) |
217 | | - c_free(cellsPtr) |
218 | | - |
219 | | - proc drainPending(startIdx: int) = |
220 | | - for j in startIdx ..< spawned: |
221 | | - discard sync pendingFuts[j] |
222 | | - freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) |
223 | | - pendingIdxPtrs[j] = nil |
224 | | - pendingCellsPtrs[j] = nil |
225 | | - |
226 | | - var completed = 0 |
227 | | - |
228 | | - # ---- Spawn + bounded-await loop ---- |
| 175 | + # ---- Spawn phase with time limit ---- |
229 | 176 | for blobIdx in 0 ..< blobCount: |
230 | 177 | let now = Moment.now() |
231 | 178 | if (now - startTime) > reconstructionTimeout: |
232 | | - trace "PeerDAS reconstruction timed out while preparing columns", |
233 | | - spawned = spawned, total = blobCount |
234 | | - drainPending(0) |
235 | | - return err("Data column reconstruction timed out") |
| 179 | + debug "PeerDAS reconstruction timed out while preparing columns", |
| 180 | + spawned = pendingFuts.len, total = blobCount |
| 181 | + break # Stop spawning new tasks |
236 | 182 |
|
237 | | - # Allocate unmanaged C buffers and copy data into them |
238 | | - let |
239 | | - idxBytes = csize_t(columnCount) * csize_t(sizeof(CellIndex)) |
240 | | - cellsBytes = csize_t(columnCount) * csize_t(sizeof(Cell)) |
241 | | - idxPtr = cast[ptr CellIndex](c_malloc(idxBytes)) |
242 | | - if idxPtr == nil: |
243 | | - drainPending(0) |
244 | | - return err("Failed to allocate memory for cell indices during reconstruction") |
245 | | - let cellsPtr = cast[ptr Cell](c_malloc(cellsBytes)) |
246 | | - if cellsPtr == nil: |
247 | | - c_free(idxPtr) |
248 | | - drainPending(0) |
249 | | - return err("Failed to allocate memory for cell data during reconstruction") |
250 | | - |
251 | | - # populate C buffers via UncheckedArray casts |
252 | | - let |
253 | | - idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr) |
254 | | - cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr) |
| 183 | + var |
| 184 | + cellIndices = newSeq[CellIndex](columnCount) |
| 185 | + cells = newSeq[Cell](columnCount) |
255 | 186 | for i in 0 ..< dataColumns.len: |
256 | | - idxArr[i] = dataColumns[i][].index |
257 | | - cellsArr[i] = dataColumns[i][].column[blobIdx] |
258 | | - |
259 | | - # store into pre-sized arrays by index and spawn worker |
260 | | - pendingIdxPtrs[spawned] = idxPtr |
261 | | - pendingCellsPtrs[spawned] = cellsPtr |
262 | | - pendingFuts[spawned] = tp.spawn workerRecover(idxPtr, cellsPtr, columnCount) |
263 | | - inc spawned |
264 | | - |
265 | | - # If too many in-flight tasks, await the oldest one |
266 | | - while spawned - completed >= maxInFlight: |
267 | | - let now2 = Moment.now() |
268 | | - if (now2 - startTime) > reconstructionTimeout: |
269 | | - trace "PeerDAS reconstruction timed out while awaiting tasks", |
270 | | - completed = completed, totalSpawned = spawned |
271 | | - drainPending(completed) |
272 | | - return err("Data column reconstruction timed out") |
273 | | - |
274 | | - let futRes = sync pendingFuts[completed] |
275 | | - freePendingPtrPair(pendingIdxPtrs[completed], pendingCellsPtrs[completed]) |
276 | | - pendingIdxPtrs[completed] = nil |
277 | | - pendingCellsPtrs[completed] = nil |
278 | | - |
279 | | - if futRes.isErr: |
280 | | - drainPending(completed + 1) |
281 | | - return err("KZG cells and proofs recovery failed") |
282 | | - res[completed] = futRes.get |
283 | | - inc completed |
284 | | - |
285 | | - # ---- Wait for remaining spawned tasks ---- |
286 | | - for i in completed ..< spawned: |
| 187 | + cellIndices[i] = dataColumns[i][].index |
| 188 | + cells[i] = dataColumns[i][].column[blobIdx] |
| 189 | + pendingFuts.add(tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells)) |
| 190 | + |
| 191 | + # ---- Sync phase ---- |
| 192 | + for i in 0 ..< pendingFuts.len: |
287 | 193 | let now = Moment.now() |
288 | 194 | if (now - startTime) > reconstructionTimeout: |
289 | | - trace "PeerDAS reconstruction timed out during final sync", |
290 | | - completed = i, totalSpawned = spawned |
291 | | - drainPending(i) |
| 195 | + debug "PeerDAS reconstruction timed out", |
| 196 | + completed = i, totalSpawned = pendingFuts.len |
292 | 197 | return err("Data column reconstruction timed out") |
293 | 198 |
|
294 | 199 | let futRes = sync pendingFuts[i] |
295 | | - freePendingPtrPair(pendingIdxPtrs[i], pendingCellsPtrs[i]) |
296 | | - pendingIdxPtrs[i] = nil |
297 | | - pendingCellsPtrs[i] = nil |
298 | | - |
299 | 200 | if futRes.isErr: |
300 | | - drainPending(i + 1) |
301 | 201 | return err("KZG cells and proofs recovery failed") |
| 202 | + |
302 | 203 | res[i] = futRes.get |
303 | 204 |
|
304 | | - # If we spawned fewer than blobCount, spawn-phase timed out earlier |
305 | | - if spawned < blobCount: |
306 | | - drainPending(0) |
| 205 | + if pendingFuts.len < blobCount: |
307 | 206 | return err("Data column reconstruction timed out") |
308 | 207 |
|
309 | 208 | ok(res) |
|
0 commit comments