@@ -2,24 +2,29 @@ package io.libp2p.core.dsl
22
33import identify.pb.IdentifyOuterClass
44import io.libp2p.core.AddressBook
5+ import io.libp2p.core.ChannelVisitor
6+ import io.libp2p.core.Connection
57import io.libp2p.core.ConnectionHandler
68import io.libp2p.core.Host
7- import io.libp2p.core.StreamHandler
9+ import io.libp2p.core.P2PChannel
10+ import io.libp2p.core.Stream
811import io.libp2p.core.crypto.KEY_TYPE
912import io.libp2p.core.crypto.PrivKey
1013import io.libp2p.core.crypto.generateKeyPair
1114import io.libp2p.core.multiformats.Multiaddr
12- import io.libp2p.core.multistream.Multistream
15+ import io.libp2p.core.multistream.MultistreamProtocol
16+ import io.libp2p.core.multistream.MultistreamProtocolDebug
17+ import io.libp2p.core.multistream.MultistreamProtocolV1
1318import io.libp2p.core.multistream.ProtocolBinding
1419import io.libp2p.core.mux.StreamMuxer
1520import io.libp2p.core.mux.StreamMuxerDebug
21+ import io.libp2p.core.mux.StreamMuxerProtocol
1622import io.libp2p.core.security.SecureChannel
1723import io.libp2p.core.transport.Transport
1824import io.libp2p.etc.types.lazyVar
1925import io.libp2p.etc.types.toProtobuf
2026import io.libp2p.host.HostImpl
2127import io.libp2p.host.MemoryAddressBook
22- import io.libp2p.mux.mplex.MplexStreamMuxer
2328import io.libp2p.network.NetworkImpl
2429import io.libp2p.protocol.IdentifyBinding
2530import io.libp2p.security.secio.SecIoSecureChannel
@@ -31,8 +36,6 @@ import io.netty.handler.logging.LoggingHandler
3136
3237typealias TransportCtor = (ConnectionUpgrader ) -> Transport
3338typealias SecureChannelCtor = (PrivKey ) -> SecureChannel
34- typealias StreamMuxerCtor = () -> StreamMuxer
35- typealias ProtocolCtor = () -> ProtocolBinding <* >
3639typealias IdentityFactory = () -> PrivKey
3740
3841class HostConfigurationException (message : String ) : RuntimeException(message)
@@ -58,6 +61,10 @@ open class Builder {
5861 protected open val connectionHandlers = ConnectionHandlerBuilder ()
5962 protected open val network = NetworkConfigBuilder ()
6063 protected open val debug = DebugBuilder ()
64+ var multistreamProtocol: MultistreamProtocol = MultistreamProtocolV1
65+ var secureMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol }
66+ var muxerMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol }
67+ var streamMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol }
6168
6269 /* *
6370 * Sets an identity for this host. If unset, libp2p will default to a random identity.
@@ -125,23 +132,35 @@ open class Builder {
125132 if (identity.factory == null ) identity.random()
126133 if (transports.values.isEmpty()) transports { add(::TcpTransport ) }
127134 if (secureChannels.values.isEmpty()) secureChannels { add(::SecIoSecureChannel ) }
128- if (muxers.values.isEmpty()) muxers { add(:: MplexStreamMuxer ) }
135+ if (muxers.values.isEmpty()) muxers { add(StreamMuxerProtocol . Mplex ) }
129136 }
130137
131- val privKey = identity.factory!! ()
138+ if (debug.beforeSecureHandler.handlers.isNotEmpty()) {
139+ (secureMultistreamProtocol as ? MultistreamProtocolDebug )?.also {
140+ val broadcast = ChannelVisitor .createBroadcast(* debug.beforeSecureHandler.handlers.toTypedArray())
141+ secureMultistreamProtocol = it.copyWithHandlers(preHandler = broadcast.toChannelHandler())
142+ } ? : throw IllegalStateException (" beforeSecureHandler can't be installed as MultistreamProtocol doesn't support debugging interface: ${secureMultistreamProtocol.javaClass} " )
143+ }
132144
133- val secureChannels = secureChannels.values.map { it(privKey) }
134- val muxers = muxers.values.map { it() }
145+ if (debug.afterSecureHandler.handlers.isNotEmpty()) {
146+ (muxerMultistreamProtocol as ? MultistreamProtocolDebug )?.also {
147+ val broadcast = ChannelVisitor .createBroadcast(* debug.afterSecureHandler.handlers.toTypedArray())
148+ muxerMultistreamProtocol = it.copyWithHandlers(preHandler = broadcast.toChannelHandler())
149+ } ? : throw IllegalStateException (" afterSecureHandler can't be installed as MultistreamProtocol doesn't support debugging interface: ${muxerMultistreamProtocol.javaClass} " )
150+ }
135151
136- muxers.mapNotNull { it as ? StreamMuxerDebug }.forEach { it.muxFramesDebugHandler = debug.muxFramesHandler.handler }
152+ val streamVisitors = ChannelVisitor .createBroadcast<Stream >()
153+ (streamMultistreamProtocol as ? MultistreamProtocolDebug )?.also {
154+ val broadcastPre =
155+ ChannelVisitor .createBroadcast(* (debug.streamPreHandler.handlers + (streamVisitors as ChannelVisitor <Stream >)).toTypedArray())
156+ val broadcast = ChannelVisitor .createBroadcast(* debug.streamHandler.handlers.toTypedArray())
157+ streamMultistreamProtocol =
158+ it.copyWithHandlers(broadcastPre.toChannelHandler(), broadcast.toChannelHandler())
159+ } ? : throw IllegalStateException (" streamPreHandler or streamHandler can't be installed as MultistreamProtocol doesn't support debugging interface: ${streamMultistreamProtocol.javaClass} " )
137160
138- val upgrader = ConnectionUpgrader (secureChannels, muxers).apply {
139- beforeSecureHandler = debug.beforeSecureHandler.handler
140- afterSecureHandler = debug.afterSecureHandler.handler
141- }
161+ val privKey = identity.factory!! ()
142162
143- val transports = transports.values.map { it(upgrader) }
144- val addressBook = addressBook.impl
163+ val secureChannels = secureChannels.values.map { it(privKey) }
145164
146165 protocols.values.mapNotNull { (it as ? IdentifyBinding ) }.map { it.protocol }.find { it.idMessage == null }?.apply {
147166 // initializing Identify with appropriate values
@@ -156,16 +175,23 @@ open class Builder {
156175 }
157176 }
158177
159- val protocolsMultistream: Multistream <Any > = Multistream .create(protocols.values)
160- val broadcastStreamHandler = StreamHandler .createBroadcast()
161- val allStreamHandlers = StreamHandler .createBroadcast(
162- protocolsMultistream.toStreamHandler(), broadcastStreamHandler
163- )
178+ val muxers = muxers.map { it.createMuxer(streamMultistreamProtocol, protocols.values) }
179+
180+ if (debug.muxFramesHandler.handlers.isNotEmpty()) {
181+ val broadcast = ChannelVisitor .createBroadcast(* debug.muxFramesHandler.handlers.toTypedArray())
182+ muxers.mapNotNull { it as ? StreamMuxerDebug }.forEach {
183+ it.muxFramesDebugHandler = broadcast
184+ }
185+ }
186+
187+ val upgrader = ConnectionUpgrader (secureMultistreamProtocol, secureChannels, muxerMultistreamProtocol, muxers)
188+
189+ val transports = transports.values.map { it(upgrader) }
190+ val addressBook = addressBook.impl
164191
165192 val connHandlerProtocols = protocols.values.mapNotNull { it as ? ConnectionHandler }
166193 val broadcastConnHandler = ConnectionHandler .createBroadcast(
167- listOf (ConnectionHandler .createStreamHandlerInitializer(allStreamHandlers)) +
168- connHandlerProtocols +
194+ connHandlerProtocols +
169195 connectionHandlers.values
170196 )
171197 val networkImpl = NetworkImpl (transports, broadcastConnHandler)
@@ -175,9 +201,9 @@ open class Builder {
175201 networkImpl,
176202 addressBook,
177203 network.listen.map { Multiaddr (it) },
178- protocolsMultistream ,
204+ protocols.values ,
179205 broadcastConnHandler,
180- broadcastStreamHandler
206+ streamVisitors
181207 )
182208 }
183209}
@@ -203,7 +229,7 @@ class AddressBookBuilder {
203229
204230class TransportsBuilder : Enumeration <TransportCtor >()
205231class SecureChannelsBuilder : Enumeration <SecureChannelCtor >()
206- class MuxersBuilder : Enumeration <StreamMuxerCtor >()
232+ class MuxersBuilder : Enumeration <StreamMuxerProtocol >()
207233class ProtocolsBuilder : Enumeration <ProtocolBinding <Any >>()
208234class ConnectionHandlerBuilder : Enumeration <ConnectionHandler >()
209235
@@ -212,24 +238,36 @@ class DebugBuilder {
212238 * Injects the [ChannelHandler] to the wire closest point.
213239 * Could be primarily useful for security handshake debugging/monitoring
214240 */
215- val beforeSecureHandler = DebugHandlerBuilder (" wire.sec.before" )
241+ val beforeSecureHandler = DebugHandlerBuilder < Connection > (" wire.sec.before" )
216242 /* *
217243 * Injects the [ChannelHandler] right after the connection cipher
218244 * to handle plain wire messages
219245 */
220- val afterSecureHandler = DebugHandlerBuilder (" wire.sec.after" )
246+ val afterSecureHandler = DebugHandlerBuilder < Connection > (" wire.sec.after" )
221247 /* *
222248 * Injects the [ChannelHandler] right after the [StreamMuxer] pipeline handler
223249 * It intercepts [io.libp2p.mux.MuxFrame] instances
224250 */
225- val muxFramesHandler = DebugHandlerBuilder (" wire.mux.frames" )
251+ val muxFramesHandler = DebugHandlerBuilder <Connection >(" wire.mux.frames" )
252+
253+ val streamPreHandler = DebugHandlerBuilder <Stream >(" wire.stream.pre" )
254+
255+ val streamHandler = DebugHandlerBuilder <Stream >(" wire.stream" )
226256}
227257
228- class DebugHandlerBuilder (var name : String ) {
229- var handler: ChannelHandler ? = null
258+ class DebugHandlerBuilder <TChannel : P2PChannel >(var name : String ) {
259+ val handlers = mutableListOf<ChannelVisitor <TChannel >>()
260+
261+ fun addHandler (handler : ChannelVisitor <TChannel >) {
262+ handlers + = handler
263+ }
264+
265+ fun addNettyHandler (handler : ChannelHandler ) {
266+ addHandler { it.pushHandler(handler) }
267+ }
230268
231- fun setLogger (level : LogLevel , loggerName : String = name) {
232- handler = LoggingHandler (loggerName, level)
269+ fun addLogger (level : LogLevel , loggerName : String = name) {
270+ addNettyHandler( LoggingHandler (loggerName, level) )
233271 }
234272}
235273
0 commit comments