Skip to content

Commit 15e3039

Browse files
committed
Switch cl-el comms to RpcChannel
Instead of using a socket (and having to open a fake connection between EL and CL), this pr switches to status-im/nim-json-rpc#254 for internal communication. Eventually, one could make this more efficient by skipping the JSON step, but like this, we at least no longer have to open an Engine API port which makes this setup more secure and easy to deploy (fewer open ports). There's also fewer potential errors to contend with and payloads don't have to travel across the OS buffers and instead stay internal to the process.
1 parent 1bc3bc8 commit 15e3039

10 files changed

Lines changed: 57 additions & 54 deletions

File tree

.gitmodules

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,7 @@
238238
path = vendor/nim-quic
239239
url = https://github.com/vacp2p/nim-quic
240240
branch = main
241+
[submodule "vendor/nim-async-channels"]
242+
path = vendor/nim-async-channels
243+
url = https://github.com/status-im/nim-async-channels
244+
branch = master

execution_chain/conf.nim

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,12 @@ type
447447
defaultValue: false
448448
name: "engine-api" .}: bool
449449

450+
engineApiChannelEnabled* {.
451+
hidden
452+
desc: "Enable the Engine API Channel"
453+
defaultValue: false
454+
name: "debug-engine-api-channel" .}: bool
455+
450456
engineApiPort* {.
451457
desc: "Listening port for the Engine API(http and ws)"
452458
defaultValue: defaultEngineApiPort
@@ -771,7 +777,7 @@ func getAllowedOrigins*(config: ExecutionClientConf): seq[Uri] =
771777
result.add parseUri(item)
772778

773779
func engineApiServerEnabled*(config: ExecutionClientConf): bool =
774-
config.engineApiEnabled or config.engineApiWsEnabled
780+
config.engineApiEnabled or config.engineApiWsEnabled or config.engineApiChannelEnabled
775781

776782
func shareServerWithEngineApi*(config: ExecutionClientConf): bool =
777783
config.engineApiServerEnabled and

execution_chain/el_sync.nim

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ import
1818
web3/[engine_api, primitives, conversions],
1919
beacon_chain/consensus_object_pools/blockchain_dag,
2020
beacon_chain/el/[el_manager, engine_api_conversions],
21-
beacon_chain/spec/[forks, presets, state_transition_block]
21+
beacon_chain/spec/[forks, presets, state_transition_block],
22+
json_rpc/client
2223

2324
logScope:
2425
topics = "elsync"
@@ -87,24 +88,15 @@ proc findSlot(
8788

8889
Opt.some importedSlot
8990

90-
proc syncToEngineApi*(dag: ChainDAGRef, url: EngineApiUrl) {.async.} =
91+
proc syncToEngineApi*(dag: ChainDAGRef, rpcClient: RpcClient) {.async.} =
9192
# Takes blocks from the CL and sends them to the EL - the attempt is made
9293
# optimistically until something unexpected happens (reorg etc) at which point
9394
# the process ends
9495

9596
let
9697
# Create the client for the engine api
97-
# And exchange the capabilities for a test communication
98-
web3 = await url.newWeb3()
99-
rpcClient = web3.provider
10098
(lastEra1Block, firstSlotAfterMerge) = dag.cfg.loadNetworkConfig()
10199

102-
defer:
103-
try:
104-
await web3.close()
105-
except:
106-
discard
107-
108100
# Load the EL state detials and create the beaconAPI client
109101
var elBlockNumber = uint64(await rpcClient.eth_blockNumber())
110102

execution_chain/nimbus.nim

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@ proc workaround*(): int {.exportc.} =
1616
return int(Future[Quantity]().internalValue)
1717

1818
import
19-
std/[os, net, options, strformat, terminal, typetraits],
19+
std/[os, net, options, terminal, typetraits],
2020
stew/io2,
2121
chronos/threadsync,
2222
chronicles,
2323
metrics,
2424
metrics/chronos_httpserver,
25-
nimcrypto/sysrand,
2625
eth/enr/enr,
2726
eth/net/nat,
27+
json_rpc/rpcchannels,
2828
eth/p2p/discoveryv5/random2,
2929
beacon_chain/spec/[engine_authentication],
3030
beacon_chain/validators/keystore_management,
@@ -36,7 +36,6 @@ import
3636
nimbus_binary_common,
3737
process_state,
3838
],
39-
./rpc/jwt_auth,
4039
./[
4140
constants,
4241
conf as ecconf,
@@ -170,13 +169,13 @@ type
170169
tcpPort: Port
171170
udpPort: Port
172171
elSync: bool
172+
channel: RpcChannelPtrs
173173

174174
ExecutionThreadConfig = object
175175
tsp: ThreadSignalPtr
176176
tcpPort: Port
177177
udpPort: Option[Port]
178-
179-
var jwtKey: JwtSharedKey
178+
channel: RpcChannelPtrs
180179

181180
proc dataDir*(config: NimbusConf): string =
182181
string config.dataDirFlag.get(
@@ -190,14 +189,14 @@ proc justWait(tsp: ThreadSignalPtr) {.async: (raises: [CancelledError]).} =
190189
notice "Waiting failed", err = exc.msg
191190

192191
proc elSyncLoop(
193-
dag: ChainDAGRef, url: EngineApiUrl
192+
dag: ChainDAGRef, elManager: ELManager
194193
) {.async: (raises: [CancelledError]).} =
195194
while true:
196195
await sleepAsync(12.seconds)
197196

198197
# TODO trigger only when the EL needs syncing
199198
try:
200-
await syncToEngineApi(dag, url)
199+
await syncToEngineApi(dag, elManager.channel())
201200
except CatchableError as exc:
202201
# This can happen when the EL is busy doing some work, specially on
203202
# startup
@@ -208,17 +207,8 @@ proc runBeaconNode(p: BeaconThreadConfig) {.thread.} =
208207
stderr.writeLine error # Logging not yet set up
209208
quit QuitFailure
210209

211-
let engineUrl = EngineApiUrl.init(
212-
&"http://127.0.0.1:{defaultEngineApiPort}/", Opt.some(@(distinctBase(jwtKey)))
213-
)
214-
215210
config.metricsEnabled = false
216-
config.elUrls =
217-
@[
218-
EngineApiUrlConfigValue(
219-
url: engineUrl.url, jwtSecret: some toHex(distinctBase(jwtKey))
220-
)
221-
]
211+
config.elUrls = @[EngineApiUrlConfigValue(channel: Opt.some(p.channel))]
222212
config.statusBarEnabled = false # Multi-threading issues due to logging
223213
config.tcpPort = p.tcpPort
224214
config.udpPort = p.udpPort
@@ -244,7 +234,7 @@ proc runBeaconNode(p: BeaconThreadConfig) {.thread.} =
244234
return
245235

246236
if p.elSync:
247-
discard elSyncLoop(node.dag, engineUrl)
237+
discard elSyncLoop(node.dag, node.elManager)
248238

249239
dynamicLogScope(comp = "bn"):
250240
if node.nickname != "":
@@ -259,11 +249,8 @@ proc runBeaconNode(p: BeaconThreadConfig) {.thread.} =
259249
proc runExecutionClient(p: ExecutionThreadConfig) {.thread.} =
260250
var config = makeConfig(ignoreUnknown = true)
261251
config.metricsEnabled = false
262-
config.engineApiEnabled = true
263-
config.engineApiPort = Port(defaultEngineApiPort)
264-
config.engineApiAddress = defaultAdminListenAddress
265-
config.jwtSecret.reset()
266-
config.jwtSecretValue = some toHex(distinctBase(jwtKey))
252+
config.engineApiEnabled = false
253+
config.engineApiChannelEnabled = true
267254
config.agentString = "nimbus"
268255
config.tcpPort = p.tcpPort
269256
config.udpPortFlag = p.udpPort
@@ -281,16 +268,14 @@ proc runExecutionClient(p: ExecutionThreadConfig) {.thread.} =
281268
let com = setupCommonRef(config)
282269

283270
dynamicLogScope(comp = "ec"):
284-
nimbus_execution_client.runExeClient(config, com, p.tsp.justWait())
271+
nimbus_execution_client.runExeClient(
272+
config, com, p.tsp.justWait(), channel = Opt.some p.channel
273+
)
285274

286275
# Stop the other thread as well, in case `runExeClient` stopped early
287276
waitFor p.tsp.fire()
288277

289278
proc runCombinedClient() =
290-
# Make it harder to connect to the (internal) engine - this will of course
291-
# go away
292-
discard randomBytes(distinctBase(jwtKey))
293-
294279
const banner = "Nimbus v0.0.1"
295280

296281
var config = NimbusConf.loadWithBanners(banner, copyright, [specBanner], true).valueOr:
@@ -329,6 +314,9 @@ proc runCombinedClient() =
329314
"Baked-in KZG setup is correct"
330315
)
331316

317+
var channel: RpcChannel
318+
let pairs = channel.open().expect("working channel")
319+
332320
var bnThread: Thread[BeaconThreadConfig]
333321
let bnStop = ThreadSignalPtr.new().expect("working ThreadSignalPtr")
334322
createThread(
@@ -339,6 +327,7 @@ proc runCombinedClient() =
339327
tcpPort: config.beaconTcpPort.get(config.tcpPort.get(Port defaultEth2TcpPort)),
340328
udpPort: config.beaconUdpPort.get(config.udpPort.get(Port defaultEth2TcpPort)),
341329
elSync: config.elSync,
330+
channel: pairs,
342331
),
343332
)
344333

@@ -361,6 +350,7 @@ proc runCombinedClient() =
361350
some(Port(uint16(config.udpPort.get()) + 1))
362351
else:
363352
none(Port),
353+
channel: pairs,
364354
),
365355
)
366356

execution_chain/nimbus_desc.nim

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import
2222
./sync/snap as snap_sync,
2323
./sync/wire_protocol,
2424
./beacon/beacon_engine,
25-
./common
25+
./common,
26+
json_rpc/rpcchannels
2627

2728
when enabledLogLevel == TRACE:
2829
import std/sequtils
@@ -44,6 +45,7 @@ type
4445
NimbusNode* = ref object
4546
httpServer*: NimbusHttpServerRef
4647
engineApiServer*: NimbusHttpServerRef
48+
engineApiChannel*: RpcChannelServer
4749
ethNode*: EthereumNode
4850
fc*: ForkedChainRef
4951
txPool*: TxPoolRef

execution_chain/nimbus_execution_client.nim

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,14 @@ proc setupP2P(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef) =
208208
nimbus.beaconSyncRef = BeaconSyncRef(nil)
209209
nimbus.snapSyncRef = SnapSyncRef(nil)
210210

211-
proc init*(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef) =
211+
proc init*(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef, channel: Opt[RpcChannelPtrs]) =
212212
nimbus.accountsManager = new AccountsManager
213213
nimbus.rng = newRng()
214214

215215
basicServices(nimbus, config, com)
216216
manageAccounts(nimbus, config)
217217
setupP2P(nimbus, config, com)
218-
setupRpc(nimbus, config, com)
218+
setupRpc(nimbus, config, com, channel)
219219

220220
# Not starting any syncer if there is definitely no way to run it. This
221221
# avoids polling (i.e. waiting for instructions) and some logging.
@@ -232,9 +232,9 @@ proc init*(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef) =
232232
nimbus.beaconSyncRef = BeaconSyncRef(nil)
233233
nimbus.snapSyncRef = SnapSyncRef(nil)
234234

235-
proc init*(T: type NimbusNode, config: ExecutionClientConf, com: CommonRef): T =
235+
proc init*(T: type NimbusNode, config: ExecutionClientConf, com: CommonRef, channel: Opt[RpcChannelPtrs]): T =
236236
let nimbus = T()
237-
nimbus.init(config, com)
237+
nimbus.init(config, com, channel)
238238
nimbus
239239

240240
proc preventLoadingDataDirForTheWrongNetwork(db: CoreDbRef; config: ExecutionClientConf) =
@@ -306,16 +306,17 @@ proc runExeClient*(
306306
com: CommonRef,
307307
stopper: StopFuture,
308308
nimbus = NimbusNode(nil),
309+
channel = Opt.none(RpcChannelPtrs),
309310
) =
310311
## Launches and runs the execution client for pre-configured `nimbus` and
311312
## `conf` argument descriptors.
312313
##
313314

314315
var nimbus = nimbus
315316
if nimbus.isNil:
316-
nimbus = NimbusNode.init(config, com)
317+
nimbus = NimbusNode.init(config, com, channel)
317318
else:
318-
nimbus.init(config, com)
319+
nimbus.init(config, com, channel)
319320

320321
defer:
321322
let

execution_chain/rpc.nim

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import
1313
chronicles,
1414
websock/websock,
15-
json_rpc/rpcserver,
15+
json_rpc/[rpcserver, rpcchannels],
1616
./rpc/[common, cors, debug, engine_api, jwt_auth, rpc_server, server_api],
1717
./[conf, nimbus_desc]
1818

@@ -23,7 +23,8 @@ export
2323
jwt_auth,
2424
cors,
2525
rpc_server,
26-
server_api
26+
server_api,
27+
rpcchannels
2728

2829
const DefaultChunkSize = 1024*1024
2930

@@ -53,7 +54,6 @@ func installRPC(server: RpcServer,
5354
if RpcFlag.Debug in flags:
5455
setupDebugRpc(com, nimbus.txPool, server)
5556

56-
5757
proc newRpcWebsocketHandler(): RpcWebSocketHandler =
5858
let rng = HmacDrbgContext.new()
5959
RpcWebSocketHandler(
@@ -198,8 +198,8 @@ proc addServices(handlers: var seq[RpcHandlerProc],
198198
handlers.addHandler(server)
199199

200200
proc setupRpc*(nimbus: NimbusNode, config: ExecutionClientConf,
201-
com: CommonRef) =
202-
if not config.engineApiEnabled:
201+
com: CommonRef, channel: Opt[RpcChannelPtrs]) =
202+
if not config.engineApiEnabled and channel.isNone():
203203
warn "Engine API disabled, the node will not respond to consensus client updates (enable with `--engine-api`)"
204204

205205
if not config.serverEnabled:
@@ -257,3 +257,10 @@ proc setupRpc*(nimbus: NimbusNode, config: ExecutionClientConf,
257257
quit(QuitFailure)
258258
nimbus.engineApiServer = res.get
259259
nimbus.engineApiServer.start()
260+
261+
if channel.isSome():
262+
nimbus.engineApiChannel = RpcChannelServer.new(channel[])
263+
264+
setupEngineAPI(nimbus.beaconEngine, nimbus.engineApiChannel)
265+
installRPC(nimbus.engineApiChannel, nimbus, config, com, serverApi, {RpcFlag.Eth})
266+
nimbus.engineApiChannel.start()

vendor/nim-async-channels

Submodule nim-async-channels added at e63ac36

0 commit comments

Comments
 (0)