Skip to content

Commit fcca8b8

Browse files
authored
Configurable f3 nodes (#540)
* feat: update downstream parsers to use a list of nodes * feat: update actors that use cache to use a list of nodes * feat: update actors cache and helper to use list of nodes * feat: update factory and v1 and v2 parsers to use a list of nodes * test: update tests * test: update mock expectations * fix: retain cache setup name * fix: use context to access nodes * fix: use context to access nodes * fix: node loop logic * feat: pass nodes to downstream parsers
1 parent d28af55 commit fcca8b8

37 files changed

Lines changed: 462 additions & 316 deletions

actors/address.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package actors
22

33
import (
4+
"context"
45
"fmt"
56

67
"github.com/filecoin-project/go-address"
@@ -13,7 +14,7 @@ import (
1314
// if the address is a zero address account actor, it returns the robust address of the zero address account actor
1415
// if the address is already a robust address, it returns the address
1516
// if the address is f2 evm, we consolidate f2 -> f0 -> f4
16-
func ConsolidateToRobustAddress(addr address.Address, h *helper.Helper, logger *logger.Logger, bestEffort bool, canonical bool) (string, error) {
17+
func ConsolidateToRobustAddress(ctx context.Context, addr address.Address, h *helper.Helper, logger *logger.Logger, bestEffort bool, canonical bool) (string, error) {
1718
actorCache := h.GetActorsCache()
1819
if ok, _, _ := h.IsZeroAddressAccountActor(addr); ok {
1920
return helper.ZeroAddressAccountActorRobust, nil
@@ -22,18 +23,18 @@ func ConsolidateToRobustAddress(addr address.Address, h *helper.Helper, logger *
2223
if isRobust, _ := common.IsRobustAddress(addr); isRobust {
2324
// we need to handle cases where a f2 address for evm actors is used
2425
// f2 -> f0 -> f4, as we want to consolidate the address to f4 style
25-
shortAddressStr, err := actorCache.GetShortAddress(addr, canonical)
26+
shortAddressStr, err := actorCache.GetShortAddress(ctx, addr, canonical)
2627
if err == nil {
2728
shortAddress, _ := address.NewFromString(shortAddressStr)
28-
addrStr, err := actorCache.GetRobustAddress(shortAddress, canonical)
29+
addrStr, err := actorCache.GetRobustAddress(ctx, shortAddress, canonical)
2930
if err == nil {
3031
addr, _ = address.NewFromString(addrStr)
3132
}
3233
}
3334
return addr.String(), nil
3435
}
3536

36-
robustAddress, err := actorCache.GetRobustAddress(addr, canonical)
37+
robustAddress, err := actorCache.GetRobustAddress(ctx, addr, canonical)
3738
if err != nil && !bestEffort {
3839
logger.Warnf("Error converting address %s to robust format: %v", addr, err)
3940
return "", fmt.Errorf("error converting address to robust format: %v", err) // Fallback

actors/cache/actors_cache.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,10 @@ func (a *ActorsCache) ClearBadAddressCache() {
126126
a.badAddress.Clear()
127127
}
128128

129-
func (a *ActorsCache) GetActorCode(add address.Address, key filTypes.TipSetKey, onChainOnly, canonical bool) (string, error) {
129+
func (a *ActorsCache) GetActorCode(ctx context.Context, add address.Address, key filTypes.TipSetKey, onChainOnly, canonical bool) (string, error) {
130130
addStr := add.String()
131131

132-
store, actorCode, err := a.getActorCode(add, key, onChainOnly, canonical)
132+
store, actorCode, err := a.getActorCode(ctx, add, key, onChainOnly, canonical)
133133
if err != nil {
134134
a.logger.Errorf("[ActorsCache] - Unable to retrieve actor code from node: %s", err.Error())
135135
if strings.Contains(err.Error(), "actor not found") {
@@ -144,7 +144,7 @@ func (a *ActorsCache) GetActorCode(add address.Address, key filTypes.TipSetKey,
144144
}
145145

146146
// Code is not cached, store it
147-
err = a.storeActorCode(add, types.AddressInfo{
147+
err = a.storeActorCode(ctx, add, types.AddressInfo{
148148
ActorCid: actorCode,
149149
IsCanonical: canonical,
150150
})
@@ -157,8 +157,8 @@ func (a *ActorsCache) GetActorCode(add address.Address, key filTypes.TipSetKey,
157157
return actorCode, nil
158158
}
159159

160-
func (a *ActorsCache) GetRobustAddress(add address.Address, canonical bool) (string, error) {
161-
store, robust, err := a.getRobustAddress(add, canonical)
160+
func (a *ActorsCache) GetRobustAddress(ctx context.Context, add address.Address, canonical bool) (string, error) {
161+
store, robust, err := a.getRobustAddress(ctx, add, canonical)
162162
if err != nil {
163163
return "", err
164164
}
@@ -168,7 +168,7 @@ func (a *ActorsCache) GetRobustAddress(add address.Address, canonical bool) (str
168168
}
169169

170170
// Robust address is not cached, store it
171-
err = a.storeRobustAddress(add, types.AddressInfo{
171+
err = a.storeRobustAddress(ctx, add, types.AddressInfo{
172172
Robust: robust,
173173
IsCanonical: canonical,
174174
})
@@ -181,8 +181,8 @@ func (a *ActorsCache) GetRobustAddress(add address.Address, canonical bool) (str
181181
return robust, nil
182182
}
183183

184-
func (a *ActorsCache) GetShortAddress(add address.Address, canonical bool) (string, error) {
185-
store, short, err := a.getShortAddress(add, canonical)
184+
func (a *ActorsCache) GetShortAddress(ctx context.Context, add address.Address, canonical bool) (string, error) {
185+
store, short, err := a.getShortAddress(ctx, add, canonical)
186186
if err != nil {
187187
return "", err
188188
}
@@ -191,7 +191,7 @@ func (a *ActorsCache) GetShortAddress(add address.Address, canonical bool) (stri
191191
return short, nil
192192
}
193193
// Robust address is not cached, store it
194-
err = a.storeShortAddress(add, types.AddressInfo{
194+
err = a.storeShortAddress(ctx, add, types.AddressInfo{
195195
Short: short,
196196
IsCanonical: canonical,
197197
})
@@ -242,10 +242,10 @@ func (a *ActorsCache) getEVMSelectorSig(ctx context.Context, selectorID string,
242242
return "", err
243243
}
244244

245-
func (a *ActorsCache) getShortAddress(add address.Address, canonical bool) (store bool, shortAddress string, err error) {
245+
func (a *ActorsCache) getShortAddress(ctx context.Context, add address.Address, canonical bool) (store bool, shortAddress string, err error) {
246246
addStr := add.String()
247247
// Try canonical cache
248-
short, err := a.offChainCache.GetShortAddress(add, canonical)
248+
short, err := a.offChainCache.GetShortAddress(ctx, add, canonical)
249249
if err == nil {
250250
return false, short, nil
251251
}
@@ -255,7 +255,7 @@ func (a *ActorsCache) getShortAddress(add address.Address, canonical bool) (stor
255255
if a.isBadAddress(add) {
256256
return false, "", fmt.Errorf("address %s is flagged as bad", addStr)
257257
}
258-
short, err = a.onChainCache.GetShortAddress(add, canonical)
258+
short, err = a.onChainCache.GetShortAddress(ctx, add, canonical)
259259
if err != nil {
260260
a.logger.Debugf("[ActorsCache] - Unable to retrieve short address from onchain cache for address %s.", addStr)
261261
return false, "", err
@@ -264,7 +264,7 @@ func (a *ActorsCache) getShortAddress(add address.Address, canonical bool) (stor
264264
return true, short, nil
265265
}
266266

267-
func (a *ActorsCache) getRobustAddress(add address.Address, canonical bool) (store bool, robustAddr string, err error) {
267+
func (a *ActorsCache) getRobustAddress(ctx context.Context, add address.Address, canonical bool) (store bool, robustAddr string, err error) {
268268
addStr := add.String()
269269
// check if the address is a system actor ( no robust address)
270270
if _, ok := SystemActorsId[addStr]; ok {
@@ -281,7 +281,7 @@ func (a *ActorsCache) getRobustAddress(add address.Address, canonical bool) (sto
281281
}
282282
}
283283

284-
robust, err := a.offChainCache.GetRobustAddress(add, canonical)
284+
robust, err := a.offChainCache.GetRobustAddress(ctx, add, canonical)
285285
if err == nil {
286286
return false, robust, nil
287287
}
@@ -290,7 +290,7 @@ func (a *ActorsCache) getRobustAddress(add address.Address, canonical bool) (sto
290290
if a.isBadAddress(add) {
291291
return false, "", fmt.Errorf("%w: address %s is flagged as bad", ErrBadAddress, addStr)
292292
}
293-
robust, err = a.onChainCache.GetRobustAddress(add, canonical)
293+
robust, err = a.onChainCache.GetRobustAddress(ctx, add, canonical)
294294
if err != nil {
295295
a.logger.Debugf("[ActorsCache] - Unable to retrieve robust address from onchain cache for address %s.", addStr)
296296
return false, "", err
@@ -299,9 +299,9 @@ func (a *ActorsCache) getRobustAddress(add address.Address, canonical bool) (sto
299299
return true, robust, nil
300300
}
301301

302-
func (a *ActorsCache) getActorCode(add address.Address, key filTypes.TipSetKey, onChainOnly, canonical bool) (store bool, actorCode string, err error) {
302+
func (a *ActorsCache) getActorCode(ctx context.Context, add address.Address, key filTypes.TipSetKey, onChainOnly, canonical bool) (store bool, actorCode string, err error) {
303303
addStr := add.String()
304-
actorCode, err = a.offChainCache.GetActorCode(add, key, onChainOnly, canonical)
304+
actorCode, err = a.offChainCache.GetActorCode(ctx, add, key, onChainOnly, canonical)
305305
if err == nil {
306306
return false, actorCode, nil
307307
}
@@ -311,7 +311,7 @@ func (a *ActorsCache) getActorCode(add address.Address, key filTypes.TipSetKey,
311311
return false, "", fmt.Errorf(" %w : %s is flagged as bad", ErrBadAddress, addStr)
312312
}
313313

314-
actorCode, err = a.onChainCache.GetActorCode(add, key, onChainOnly, canonical)
314+
actorCode, err = a.onChainCache.GetActorCode(ctx, add, key, onChainOnly, canonical)
315315
if err != nil {
316316
a.logger.Debugf("[ActorsCache] - Unable to retrieve actor code from onchain cache for address %s.", addStr)
317317
return false, "", err
@@ -324,8 +324,8 @@ func (a *ActorsCache) StoreEVMSelectorSig(ctx context.Context, selectorID string
324324
return a.offChainCache.StoreEVMSelectorSig(ctx, selectorID, sig, canonical)
325325
}
326326

327-
func (a *ActorsCache) storeActorCode(add address.Address, info types.AddressInfo) error {
328-
shortAddress, err := a.GetShortAddress(add, info.IsCanonical)
327+
func (a *ActorsCache) storeActorCode(ctx context.Context, add address.Address, info types.AddressInfo) error {
328+
shortAddress, err := a.GetShortAddress(ctx, add, info.IsCanonical)
329329
if err != nil {
330330
return err
331331
}
@@ -339,8 +339,8 @@ func (a *ActorsCache) storeActorCode(add address.Address, info types.AddressInfo
339339
return nil
340340
}
341341

342-
func (a *ActorsCache) storeShortAddress(add address.Address, info types.AddressInfo) error {
343-
_, robustAddress, err := a.getRobustAddress(add, info.IsCanonical)
342+
func (a *ActorsCache) storeShortAddress(ctx context.Context, add address.Address, info types.AddressInfo) error {
343+
_, robustAddress, err := a.getRobustAddress(ctx, add, info.IsCanonical)
344344
if err != nil {
345345
return err
346346
}
@@ -354,8 +354,8 @@ func (a *ActorsCache) storeShortAddress(add address.Address, info types.AddressI
354354
return nil
355355
}
356356

357-
func (a *ActorsCache) storeRobustAddress(add address.Address, info types.AddressInfo) error {
358-
_, shortAddress, err := a.getShortAddress(add, info.IsCanonical)
357+
func (a *ActorsCache) storeRobustAddress(ctx context.Context, add address.Address, info types.AddressInfo) error {
358+
_, shortAddress, err := a.getShortAddress(ctx, add, info.IsCanonical)
359359
if err != nil {
360360
return err
361361
}

actors/cache/impl/onchain.go

Lines changed: 56 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
filTypes "github.com/filecoin-project/lotus/chain/types"
1717
"github.com/ipfs/go-cid"
1818
"github.com/zondax/fil-parser/actors/cache/impl/common"
19+
parserContext "github.com/zondax/fil-parser/context"
20+
1921
golemBackoff "github.com/zondax/golem/pkg/zhttpclient/backoff"
2022

2123
cacheMetrics "github.com/zondax/fil-parser/actors/cache/metrics"
@@ -27,7 +29,6 @@ const OnChainImpl = "on-chain"
2729

2830
// OnChain implementation
2931
type OnChain struct {
30-
Node api.FullNode
3132
logger *logger.Logger
3233
backoff *golemBackoff.BackOff
3334
metrics *cacheMetrics.ActorsCacheMetricsClient
@@ -46,11 +47,7 @@ func (m *OnChain) BackFill() error {
4647
func (m *OnChain) NewImpl(source common.DataSource, logger *logger.Logger, metrics *cacheMetrics.ActorsCacheMetricsClient, backoff *golemBackoff.BackOff) error {
4748
// Node datastore is required
4849
m.logger = logger2.GetSafeLogger(logger)
49-
if source.Node == nil {
50-
m.logger.Panic("[ActorsCache] - Node ptr is nil")
51-
}
5250

53-
m.Node = source.Node
5451
m.metrics = metrics
5552
m.backoff = backoff
5653
m.httpClient = resty.New().SetTimeout(30 * time.Second)
@@ -62,16 +59,30 @@ func (m *OnChain) ImplementationType() string {
6259
return OnChainImpl
6360
}
6461

65-
func (m *OnChain) GetActorCode(address address.Address, key filTypes.TipSetKey, _, _ bool) (string, error) {
66-
actorCid, err := m.retrieveActorFromLotus(address, key)
62+
func (m *OnChain) GetActorCode(ctx context.Context, address address.Address, key filTypes.TipSetKey, _, _ bool) (string, error) {
63+
var actorCid cid.Cid
64+
var err error
65+
66+
nodes, err := parserContext.GetNodes(ctx)
6767
if err != nil {
68-
return cid.Undef.String(), err
68+
return "", err
69+
}
70+
if len(nodes) == 0 {
71+
return "", fmt.Errorf("no nodes available")
72+
}
73+
for _, node := range nodes {
74+
actorCid, err = m.retrieveActorFromLotus(node, address, key)
75+
if err == nil {
76+
break
77+
} else if !IsRetriableError(err) {
78+
return cid.Undef.String(), err
79+
}
6980
}
7081

7182
return actorCid.String(), nil
7283
}
7384

74-
func (m *OnChain) GetRobustAddress(address address.Address, _ bool) (string, error) {
85+
func (m *OnChain) GetRobustAddress(ctx context.Context, address address.Address, _ bool) (string, error) {
7586
isRobustAddress, err := common.IsRobustAddress(address)
7687
if err != nil {
7788
return "", err
@@ -82,16 +93,28 @@ func (m *OnChain) GetRobustAddress(address address.Address, _ bool) (string, err
8293
return address.String(), nil
8394
}
8495

85-
// Address is not in cache, get robust address from lotus
86-
robustAdd, err := m.retrieveActorPubKeyFromLotus(address, false)
96+
var robustAdd string
97+
nodes, err := parserContext.GetNodes(ctx)
8798
if err != nil {
8899
return "", err
89100
}
101+
if len(nodes) == 0 {
102+
return "", fmt.Errorf("no nodes available")
103+
}
104+
// Address is not in cache, get robust address from lotus
105+
for _, node := range nodes {
106+
robustAdd, err = m.retrieveActorPubKeyFromLotus(node, address, false)
107+
if err == nil {
108+
break
109+
} else if !IsRetriableError(err) {
110+
return "", err
111+
}
112+
}
90113

91114
return robustAdd, nil
92115
}
93116

94-
func (m *OnChain) GetShortAddress(address address.Address, _ bool) (string, error) {
117+
func (m *OnChain) GetShortAddress(ctx context.Context, address address.Address, _ bool) (string, error) {
95118
isRobustAddress, err := common.IsRobustAddress(address)
96119
if err != nil {
97120
return "", err
@@ -102,9 +125,21 @@ func (m *OnChain) GetShortAddress(address address.Address, _ bool) (string, erro
102125
return address.String(), nil
103126
}
104127

105-
shortAdd, err := m.retrieveActorPubKeyFromLotus(address, true)
128+
var shortAdd string
129+
nodes, err := parserContext.GetNodes(ctx)
106130
if err != nil {
107-
return "", common.ErrKeyNotFound
131+
return "", err
132+
}
133+
if len(nodes) == 0 {
134+
return "", fmt.Errorf("no nodes available")
135+
}
136+
for _, node := range nodes {
137+
shortAdd, err = m.retrieveActorPubKeyFromLotus(node, address, true)
138+
if err == nil {
139+
break
140+
} else if !IsRetriableError(err) {
141+
return "", common.ErrKeyNotFound
142+
}
108143
}
109144

110145
return shortAdd, nil
@@ -124,12 +159,12 @@ func (m *OnChain) IsGenesisActor(_ string) bool {
124159
return false
125160
}
126161

127-
func (m *OnChain) retrieveActorFromLotus(add address.Address, key filTypes.TipSetKey) (cid.Cid, error) {
162+
func (m *OnChain) retrieveActorFromLotus(node api.FullNode, add address.Address, key filTypes.TipSetKey) (cid.Cid, error) {
128163
nodeApiCallOptions := &NodeApiCallWithRetryOptions[*filTypes.Actor]{
129164
RequestName: "StateGetActorWithTipSetKey",
130165
BackOff: *m.backoff,
131166
Request: func() (*filTypes.Actor, error) {
132-
return m.Node.StateGetActor(context.Background(), add, key)
167+
return node.StateGetActor(context.Background(), add, key)
133168
},
134169
RetryErrStrings: []string{"ipld: could not find", "RPC client error", "503"},
135170
}
@@ -139,7 +174,7 @@ func (m *OnChain) retrieveActorFromLotus(add address.Address, key filTypes.TipSe
139174
// Try again but with an empty tipset Key
140175
nodeApiCallOptions.RequestName = "StateGetActor"
141176
nodeApiCallOptions.Request = func() (*filTypes.Actor, error) {
142-
return m.Node.StateGetActor(context.Background(), add, filTypes.EmptyTSK)
177+
return node.StateGetActor(context.Background(), add, filTypes.EmptyTSK)
143178
}
144179
actor, err = NodeApiCallWithRetry(nodeApiCallOptions, m.metrics)
145180
if err != nil {
@@ -151,7 +186,7 @@ func (m *OnChain) retrieveActorFromLotus(add address.Address, key filTypes.TipSe
151186
return actor.Code, nil
152187
}
153188

154-
func (m *OnChain) retrieveActorPubKeyFromLotus(add address.Address, reverse bool) (string, error) {
189+
func (m *OnChain) retrieveActorPubKeyFromLotus(node api.FullNode, add address.Address, reverse bool) (string, error) {
155190
var key address.Address
156191
var err error
157192

@@ -163,13 +198,13 @@ func (m *OnChain) retrieveActorPubKeyFromLotus(add address.Address, reverse bool
163198
if reverse {
164199
nodeApiCallOptions.RequestName = "StateLookupID"
165200
nodeApiCallOptions.Request = func() (address.Address, error) {
166-
return m.Node.StateLookupID(context.Background(), add, filTypes.EmptyTSK)
201+
return node.StateLookupID(context.Background(), add, filTypes.EmptyTSK)
167202
}
168203
key, err = NodeApiCallWithRetry(nodeApiCallOptions, m.metrics)
169204
} else {
170205
nodeApiCallOptions.RequestName = "StateAccountKey"
171206
nodeApiCallOptions.Request = func() (address.Address, error) {
172-
return m.Node.StateAccountKey(context.Background(), add, filTypes.EmptyTSK)
207+
return node.StateAccountKey(context.Background(), add, filTypes.EmptyTSK)
173208
}
174209
key, err = NodeApiCallWithRetry(nodeApiCallOptions, m.metrics)
175210
}
@@ -178,7 +213,7 @@ func (m *OnChain) retrieveActorPubKeyFromLotus(add address.Address, reverse bool
178213
if strings.Contains(err.Error(), "actor code is not account") {
179214
nodeApiCallOptions.RequestName = "StateLookupRobustAddress"
180215
nodeApiCallOptions.Request = func() (address.Address, error) {
181-
return m.Node.StateLookupRobustAddress(context.Background(), add, filTypes.EmptyTSK)
216+
return node.StateLookupRobustAddress(context.Background(), add, filTypes.EmptyTSK)
182217
}
183218
key, err = NodeApiCallWithRetry(nodeApiCallOptions, m.metrics)
184219
if err != nil {

0 commit comments

Comments
 (0)