77
88{.push raises : [].}
99
10- import std/ [deques, heapqueue, tables, strutils, sequtils, math, typetraits]
10+ import std/ [
11+ deques, heapqueue, sets, tables, strutils, sequtils, math, typetraits]
1112import stew/ base10, chronos, chronicles, results
1213import
1314 ../ spec/ [helpers, forks, column_map],
5354
5455 ColumnCompleteness * = object
5556 map: ColumnMap
57+ keys: HashSet [string ]
5658 done: bool
5759
5860 SyncRequest * [T] = object
@@ -334,6 +336,7 @@ func isComplete[M, N](
334336 peer: M,
335337 criteria: var N
336338): bool =
339+ mixin getKey
337340 if criteria.done:
338341 return true
339342 when N is BlockCompleteness :
@@ -343,6 +346,10 @@ func isComplete[M, N](
343346 # If criteria's map is empty, it means that we do not need columns for
344347 # the range.
345348 return true
349+ if $ (peer.getKey ()) in criteria.keys:
350+ # Columns was already downloaded from this peer for the range, there is no
351+ # reason to request it one more time.
352+ return true
346353 # If the peer has columns that we are still missing, we should return
347354 # `false`, so that peer will get request for that range, but if the peer
348355 # does not have the columns we need, we should return `true`.
@@ -377,13 +384,20 @@ func fillCompleteness[M](
377384 peer: M,
378385 missingMap: Opt [ColumnMap ],
379386 done: bool ,
387+ storePeer: bool ,
380388 criteria: var ColumnCompleteness
381389) =
390+ mixin getKey
391+
382392 if done:
383393 criteria.done = true
384394 criteria.map = ColumnMap ()
385395 return
386396
397+ if storePeer:
398+ let key = peer.getKey ()
399+ criteria.keys.incl ($ key)
400+
387401 if missingMap.isSome ():
388402 criteria.map = missingMap.get ()
389403
@@ -1028,7 +1042,7 @@ proc push*[M, N](sq: SyncQueue[M, N], requests: openArray[SyncRequest[M]]) =
10281042 elif N is ColumnCompleteness :
10291043 sq.fillCompleteness (
10301044 sq.requests[pos.qindex].data, request.item, Opt .none (ColumnMap ),
1031- done = false , sq.requests[pos.qindex].completeness)
1045+ done = false , storePeer = false , sq.requests[pos.qindex].completeness)
10321046 sq.del (pos)
10331047
10341048proc push * [M, N](sq: SyncQueue [M, N], sr: SyncRequest [M]) =
@@ -1141,7 +1155,7 @@ proc push*[M, N](
11411155 return SyncPushResponse (
11421156 code: SyncProcessError .NoRelevant , count: 0 'i64 )
11431157
1144- template fillCompleteness (pdone, pblck: untyped ) =
1158+ template fillCompleteness (pdone, pblck, pstore : untyped ) =
11451159 when N is BlockCompleteness :
11461160 sq.fillCompleteness (
11471161 sq.requests[position.qindex].data, sr.item, done = pdone,
@@ -1151,11 +1165,13 @@ proc push*[M, N](
11511165 let map = sq.getMissingMap (data, pblck.get ().root)
11521166 sq.fillCompleteness (
11531167 sq.requests[position.qindex].data, sr.item, Opt .some (map),
1154- done = pdone, sq.requests[position.qindex].completeness)
1168+ done = pdone, storePeer = pstore,
1169+ sq.requests[position.qindex].completeness)
11551170 else :
11561171 sq.fillCompleteness (
11571172 sq.requests[position.qindex].data, sr.item, Opt .none (ColumnMap ),
1158- done = pdone, sq.requests[position.qindex].completeness)
1173+ done = pdone, storePeer = pstore,
1174+ sq.requests[position.qindex].completeness)
11591175
11601176 # This is backpressure handling algorithm, this algorithm is blocking
11611177 # all pending `push` requests if `request` is not in range.
@@ -1234,18 +1250,18 @@ proc push*[M, N](
12341250 # peers returns empty response for the same range.
12351251 if sq.requests[position.qindex].voidsCount >= sq.requestsCount:
12361252 when N is BlockCompleteness :
1237- fillCompleteness (true , Opt .none (BlockId ))
1253+ fillCompleteness (true , Opt .none (BlockId ), false )
12381254 sq.advanceQueue (res)
12391255 elif N is ColumnCompleteness :
12401256 let localMap = sq.cbGetLocalColumnMap ()
12411257 # If completeness map was changed it proves that specific range is
12421258 # not actually empty and we should not move forward.
12431259 if sq.requests[position.qindex].completeness.map != localMap:
1244- fillCompleteness (false , Opt .none (BlockId ))
1260+ fillCompleteness (false , Opt .none (BlockId ), false )
12451261 else :
1246- fillCompleteness (true , Opt .none (BlockId ))
1262+ fillCompleteness (true , Opt .none (BlockId ), false )
12471263 else :
1248- fillCompleteness (false , Opt .none (BlockId ))
1264+ fillCompleteness (false , Opt .none (BlockId ), false )
12491265
12501266 of SyncProcessError .Duplicate :
12511267 # Duplicate responses does not affect failures count
@@ -1260,7 +1276,7 @@ proc push*[M, N](
12601276 topics = " sync"
12611277
12621278 sq.gapList.reset ()
1263- fillCompleteness (true , Opt .none (BlockId ))
1279+ fillCompleteness (true , Opt .none (BlockId ), false )
12641280 sq.advanceQueue (res)
12651281
12661282 of SyncProcessError .MissingSidecars :
@@ -1275,7 +1291,7 @@ proc push*[M, N](
12751291 topics = " sync"
12761292
12771293 inc (sq.requests[position.qindex].failuresCount)
1278- fillCompleteness (false , pres.blck)
1294+ fillCompleteness (false , pres.blck, true )
12791295 sq.del (position)
12801296 res = 0 'i64
12811297
@@ -1292,7 +1308,7 @@ proc push*[M, N](
12921308 topics = " sync"
12931309
12941310 inc (sq.requests[position.qindex].failuresCount)
1295- fillCompleteness (false , pres.blck)
1311+ fillCompleteness (false , pres.blck, false )
12961312 sq.del (position)
12971313 res = 0 'i64
12981314
@@ -1310,7 +1326,7 @@ proc push*[M, N](
13101326
13111327 sr.item.updateScore (PeerScoreUnviableFork )
13121328 inc (sq.requests[position.qindex].failuresCount)
1313- fillCompleteness (false , pres.blck)
1329+ fillCompleteness (false , pres.blck, false )
13141330 sq.del (position)
13151331 res = 0 'i64
13161332
@@ -1331,7 +1347,7 @@ proc push*[M, N](
13311347 sq.rewardForGaps (PeerScoreMissingValues )
13321348 sq.gapList.reset ()
13331349 inc (sq.requests[position.qindex].failuresCount)
1334- fillCompleteness (false , pres.blck)
1350+ fillCompleteness (false , pres.blck, false )
13351351 sq.del (position)
13361352 res = 0 'i64
13371353
@@ -1351,7 +1367,7 @@ proc push*[M, N](
13511367 topics = " sync"
13521368
13531369 sr.item.updateScore (PeerScoreMissingValues )
1354- fillCompleteness (false , pres.blck)
1370+ fillCompleteness (false , pres.blck, false )
13551371 sq.del (position)
13561372 res = 0 'i64
13571373
@@ -1364,7 +1380,7 @@ proc push*[M, N](
13641380 if sr.hasEndGap (data):
13651381 sq.gapList.add (GapItem .init (sr))
13661382
1367- fillCompleteness (true , Opt .none (BlockId ))
1383+ fillCompleteness (true , Opt .none (BlockId ), false )
13681384 sq.advanceQueue (res)
13691385 of SyncProcessError .NoRelevant :
13701386 raiseAssert " Processor should not return this error code"
@@ -1392,7 +1408,7 @@ proc push*[M, N](
13921408 except CancelledError as exc:
13931409 let pos = sq.find (sr)
13941410 if pos.isSome ():
1395- fillCompleteness (false , Opt .none (BlockId ))
1411+ fillCompleteness (false , Opt .none (BlockId ), false )
13961412 sq.del (pos.get ())
13971413 raise exc
13981414 finally :
0 commit comments