// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Implementation of the libp2p [`libp2p_core::Transport`] trait for TCP/IP.
//!
//! # Usage
//!
//! This crate provides an [`async_io::Transport`] and a [`tokio::Transport`], depending on
//! the enabled features, which implement the [`libp2p_core::Transport`] trait for use as a
//! transport with `libp2p-core` or `libp2p-swarm`.
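//!
//! # Example
//!
//! A minimal sketch, assuming the `async-io` feature is enabled, that constructs the
//! transport and starts listening on an OS-assigned TCP port:
//!
//! ```no_run
//! # use libp2p_core::{transport::ListenerId, Transport};
//! # #[cfg(not(feature = "async-io"))]
//! # fn main() {}
//! #
//! # #[cfg(feature = "async-io")]
//! # fn main() {
//! let mut transport = libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::new());
//! transport
//!     .listen_on(ListenerId::next(), "/ip4/0.0.0.0/tcp/0".parse().unwrap())
//!     .expect("listen on a valid TCP multiaddr");
//! // Poll the transport to drive the listener and obtain `TransportEvent`s.
//! # }
//! ```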

#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod provider;

#[cfg(feature = "async-io")]
pub use provider::async_io;

#[cfg(feature = "tokio")]
pub use provider::tokio;

use futures::{
    future::{self, Ready},
    prelude::*,
    stream::SelectAll,
};
use futures_timer::Delay;
use if_watch::IfEvent;
use libp2p_core::{
    address_translation,
    multiaddr::{Multiaddr, Protocol},
    transport::{ListenerId, TransportError, TransportEvent},
};
use provider::{Incoming, Provider};
use socket2::{Domain, Socket, Type};
use std::{
    collections::{HashSet, VecDeque},
    io,
    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener},
    pin::Pin,
    sync::{Arc, RwLock},
    task::{Context, Poll, Waker},
    time::Duration,
};

/// The configuration for a TCP/IP transport capability for libp2p.
#[derive(Clone, Debug)]
pub struct Config {
    /// TTL to set for opened sockets, or `None` to keep default.
    ttl: Option<u32>,
    /// `TCP_NODELAY` to set for opened sockets, or `None` to keep default.
    nodelay: Option<bool>,
    /// Size of the listen backlog for listen sockets.
    backlog: u32,
    /// Whether port reuse should be enabled.
    enable_port_reuse: bool,
}

type Port = u16;

/// The configuration for port reuse of listening sockets.
#[derive(Debug, Clone)]
enum PortReuse {
    /// Port reuse is disabled, i.e. ephemeral local ports are
    /// used for outgoing TCP connections.
    Disabled,
    /// Port reuse when dialing is enabled, i.e. the local
    /// address and port that a new socket for an outgoing
    /// connection is bound to are chosen from an existing
    /// listening socket, if available.
    Enabled {
        /// The addresses and ports of the listening sockets
        /// registered as eligible for port reuse when dialing.
        listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>,
    },
}

impl PortReuse {
    /// Registers a socket address for port reuse.
    ///
    /// Has no effect if port reuse is disabled.
    fn register(&mut self, ip: IpAddr, port: Port) {
        if let PortReuse::Enabled { listen_addrs } = self {
            log::trace!("Registering for port reuse: {}:{}", ip, port);
            listen_addrs
                .write()
                .expect("`register()` and `unregister()` never panic while holding the lock")
                .insert((ip, port));
        }
    }

    /// Unregisters a socket address for port reuse.
    ///
    /// Has no effect if port reuse is disabled.
    fn unregister(&mut self, ip: IpAddr, port: Port) {
        if let PortReuse::Enabled { listen_addrs } = self {
            log::trace!("Unregistering for port reuse: {}:{}", ip, port);
            listen_addrs
                .write()
                .expect("`register()` and `unregister()` never panic while holding the lock")
                .remove(&(ip, port));
        }
    }

    /// Selects a listening socket address suitable for use
    /// as the local socket address when dialing.
    ///
    /// If multiple listening sockets are registered for port
    /// reuse, one is chosen whose IP protocol version and
    /// loopback status is the same as that of `remote_ip`.
    ///
    /// Returns `None` if port reuse is disabled or no suitable
    /// listening socket address is found.
    fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
        if let PortReuse::Enabled { listen_addrs } = self {
            for (ip, port) in listen_addrs
                .read()
                .expect("`local_dial_addr` never panics while holding the lock")
                .iter()
            {
                if ip.is_ipv4() == remote_ip.is_ipv4()
                    && ip.is_loopback() == remote_ip.is_loopback()
                {
                    if remote_ip.is_ipv4() {
                        return Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port));
                    } else {
                        return Some(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), *port));
                    }
                }
            }
        }

        None
    }
}

impl Config {
    /// Creates a new configuration for a TCP/IP transport:
    ///
    ///   * `TCP_NODELAY` is not set, i.e. Nagle's algorithm is left _enabled_
    ///     (the OS default). See [`Config::nodelay`].
    ///   * Reuse of listening ports is _disabled_.
    ///     See [`Config::port_reuse`].
    ///   * No custom `IP_TTL` is set. The default of the OS TCP stack applies.
    ///     See [`Config::ttl`].
    ///   * The size of the listen backlog for new listening sockets is `1024`.
    ///     See [`Config::listen_backlog`].
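    ///
    /// A minimal sketch of overriding these defaults with the builder methods
    /// below (the concrete values are only illustrative):
    ///
    /// ```
    /// let config = libp2p_tcp::Config::new()
    ///     .ttl(64)
    ///     .nodelay(true)
    ///     .listen_backlog(256)
    ///     .port_reuse(false);
    /// # let _ = config;
    /// ```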
    pub fn new() -> Self {
        Self {
            ttl: None,
            nodelay: None,
            backlog: 1024,
            enable_port_reuse: false,
        }
    }

    /// Configures the `IP_TTL` option for new sockets.
    pub fn ttl(mut self, value: u32) -> Self {
        self.ttl = Some(value);
        self
    }

    /// Configures the `TCP_NODELAY` option for new sockets.
    pub fn nodelay(mut self, value: bool) -> Self {
        self.nodelay = Some(value);
        self
    }

    /// Configures the listen backlog for new listen sockets.
    pub fn listen_backlog(mut self, backlog: u32) -> Self {
        self.backlog = backlog;
        self
    }

    /// Configures port reuse for local sockets, which implies
    /// reuse of listening ports for outgoing connections to
    /// enhance NAT traversal capabilities.
    ///
    /// Please refer to e.g. [RFC 4787](https://tools.ietf.org/html/rfc4787)
    /// sections 4 and 5 for some of the NAT terminology used here.
    ///
    /// There are two main use-cases for port reuse among local
    /// sockets:
    ///
    ///   1. Creating multiple listening sockets for the same address
    ///      and port to allow accepting connections on multiple threads
    ///      without having to synchronise access to a single listen socket.
    ///
    ///   2. Creating outgoing connections whose local socket is bound to
    ///      the same address and port as a listening socket. In the rare
    ///      case of simple NATs with both endpoint-independent mapping and
    ///      endpoint-independent filtering, this can on its own already
    ///      permit NAT traversal by other nodes sharing the observed
    ///      external address of the local node. For the common case of
    ///      NATs with address-dependent or address and port-dependent
    ///      filtering, port reuse for outgoing connections can facilitate
    ///      further TCP hole punching techniques for NATs that perform
    ///      endpoint-independent mapping. Port reuse cannot facilitate
    ///      NAT traversal in the presence of "symmetric" NATs that employ
    ///      both address/port-dependent mapping and filtering, unless
    ///      there is some means of port prediction.
    ///
    /// Both use-cases are enabled when port reuse is enabled, with port reuse
    /// for outgoing connections (`2.` above) always being implied.
    ///
    /// > **Note**: Due to the identification of a TCP socket by a 4-tuple
    /// > of source IP address, source port, destination IP address and
    /// > destination port, with port reuse enabled there can be only
    /// > a single outgoing connection to a particular address and port
    /// > of a peer per local listening socket address.
    ///
    /// [`Transport`] keeps track of the listen socket addresses as they
    /// are reported by polling it. It is possible to listen on multiple
    /// addresses, enabling port reuse for each, knowing exactly which listen
    /// address is reused when dialing with a specific [`Transport`], as in the
    /// following example:
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use libp2p_core::transport::{ListenerId, TransportEvent};
    /// # use libp2p_core::{Multiaddr, Transport};
    /// # use std::pin::Pin;
    /// # #[cfg(not(feature = "async-io"))]
    /// # fn main() {}
    /// #
    /// #[cfg(feature = "async-io")]
    /// #[async_std::main]
    /// async fn main() -> std::io::Result<()> {
    ///
    /// let listen_addr1: Multiaddr = "/ip4/127.0.0.1/tcp/9001".parse().unwrap();
    /// let listen_addr2: Multiaddr = "/ip4/127.0.0.1/tcp/9002".parse().unwrap();
    ///
    /// let mut tcp1 = libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::new().port_reuse(true)).boxed();
    /// tcp1.listen_on(ListenerId::next(), listen_addr1.clone()).expect("listener");
    /// match tcp1.select_next_some().await {
    ///     TransportEvent::NewAddress { listen_addr, .. } => {
    ///         println!("Listening on {:?}", listen_addr);
    ///         let mut stream = tcp1.dial(listen_addr2.clone()).unwrap().await?;
    ///         // `stream` has `listen_addr1` as its local socket address.
    ///     }
    ///     _ => {}
    /// }
    ///
    /// let mut tcp2 = libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::new().port_reuse(true)).boxed();
    /// tcp2.listen_on(ListenerId::next(), listen_addr2).expect("listener");
    /// match tcp2.select_next_some().await {
    ///     TransportEvent::NewAddress { listen_addr, .. } => {
    ///         println!("Listening on {:?}", listen_addr);
    ///         let mut stream = tcp2.dial(listen_addr1).unwrap().await?;
    ///         // `stream` has `listen_addr2` as its local socket address.
    ///     }
    ///     _ => {}
    /// }
    /// Ok(())
    /// }
    /// ```
    ///
    /// If a wildcard listen socket address is used to listen on any interface,
    /// there can be multiple such addresses registered for port reuse. In this
    /// case, one is chosen whose IP protocol version and loopback status is the
    /// same as that of the remote address. Consequently, for maximum control of
    /// the local listening addresses and ports that are used for outgoing
    /// connections, a new [`Transport`] should be created for each listening
    /// socket, avoiding the use of wildcard addresses which bind a socket to
    /// all network interfaces.
    ///
    /// When this option is enabled on a unix system, the socket
    /// option `SO_REUSEPORT` is set, if available, to permit
    /// reuse of listening ports for multiple sockets.
    pub fn port_reuse(mut self, port_reuse: bool) -> Self {
        self.enable_port_reuse = port_reuse;
        self
    }
}

impl Default for Config {
    fn default() -> Self {
        Self::new()
    }
}

/// An abstract [`libp2p_core::Transport`] implementation.
///
/// You shouldn't need to use this type directly. Use one of the following instead:
///
/// - [`tokio::Transport`]
/// - [`async_io::Transport`]
pub struct Transport<T>
where
    T: Provider + Send,
{
    config: Config,

    /// The configuration of port reuse when dialing.
    port_reuse: PortReuse,
    /// All the active listeners.
    /// The [`ListenStream`] struct contains a stream that we want to be pinned. Since the `VecDeque`
    /// can be resized, the only way is to use a `Pin<Box<>>`.
    listeners: SelectAll<ListenStream<T>>,
    /// Pending transport events to return from [`libp2p_core::Transport::poll`].
    pending_events:
        VecDeque<TransportEvent<<Self as libp2p_core::Transport>::ListenerUpgrade, io::Error>>,
}

impl<T> Transport<T>
where
    T: Provider + Send,
{
    /// Create a new instance of [`Transport`].
    ///
    /// If you don't want to specify a [`Config`], use [`Transport::default`].
    ///
    /// It is best to call this function through one of the type-aliases of this type:
    ///
    /// - [`tokio::Transport::new`]
    /// - [`async_io::Transport::new`]
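    ///
    /// A minimal sketch, assuming the `tokio` feature is enabled:
    ///
    /// ```no_run
    /// # #[cfg(not(feature = "tokio"))]
    /// # fn main() {}
    /// #
    /// # #[cfg(feature = "tokio")]
    /// # fn main() {
    /// let transport = libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default());
    /// # let _ = transport;
    /// # }
    /// ```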
    pub fn new(config: Config) -> Self {
        let port_reuse = if config.enable_port_reuse {
            PortReuse::Enabled {
                listen_addrs: Arc::new(RwLock::new(HashSet::new())),
            }
        } else {
            PortReuse::Disabled
        };
        Transport {
            config,
            port_reuse,
            ..Default::default()
        }
    }

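    /// Creates a new, not yet bound `socket2::Socket` for `socket_addr`, applying
    /// the options from this transport's [`Config`]: `IPV6_V6ONLY` for IPv6
    /// addresses, the configured TTL and `TCP_NODELAY` values, `SO_REUSEADDR`,
    /// and, on Unix with port reuse enabled, `SO_REUSEPORT`.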
    fn create_socket(&self, socket_addr: SocketAddr) -> io::Result<Socket> {
        let socket = Socket::new(
            Domain::for_address(socket_addr),
            Type::STREAM,
            Some(socket2::Protocol::TCP),
        )?;
        if socket_addr.is_ipv6() {
            socket.set_only_v6(true)?;
        }
        if let Some(ttl) = self.config.ttl {
            socket.set_ttl(ttl)?;
        }
        if let Some(nodelay) = self.config.nodelay {
            socket.set_nodelay(nodelay)?;
        }
        socket.set_reuse_address(true)?;
        #[cfg(unix)]
        if let PortReuse::Enabled { .. } = &self.port_reuse {
            socket.set_reuse_port(true)?;
        }
        Ok(socket)
    }

    fn do_listen(
        &mut self,
        id: ListenerId,
        socket_addr: SocketAddr,
    ) -> io::Result<ListenStream<T>> {
        let socket = self.create_socket(socket_addr)?;
        socket.bind(&socket_addr.into())?;
        socket.listen(self.config.backlog as _)?;
        socket.set_nonblocking(true)?;
        let listener: TcpListener = socket.into();
        let local_addr = listener.local_addr()?;

        if local_addr.ip().is_unspecified() {
            return ListenStream::<T>::new(
                id,
                listener,
                Some(T::new_if_watcher()?),
                self.port_reuse.clone(),
            );
        }

        self.port_reuse.register(local_addr.ip(), local_addr.port());
        let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
        self.pending_events.push_back(TransportEvent::NewAddress {
            listener_id: id,
            listen_addr,
        });
        ListenStream::<T>::new(id, listener, None, self.port_reuse.clone())
    }
}

impl<T> Default for Transport<T>
where
    T: Provider + Send,
{
    /// Creates a [`Transport`] with reasonable defaults.
    ///
    /// This transport will have port-reuse disabled.
    fn default() -> Self {
        let config = Config::default();
        let port_reuse = if config.enable_port_reuse {
            PortReuse::Enabled {
                listen_addrs: Arc::new(RwLock::new(HashSet::new())),
            }
        } else {
            PortReuse::Disabled
        };
        Transport {
            port_reuse,
            config,
            listeners: SelectAll::new(),
            pending_events: VecDeque::new(),
        }
    }
}

impl<T> libp2p_core::Transport for Transport<T>
where
    T: Provider + Send + 'static,
    T::Listener: Unpin,
    T::Stream: Unpin,
{
    type Output = T::Stream;
    type Error = io::Error;
    type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
    type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;

    fn listen_on(
        &mut self,
        id: ListenerId,
        addr: Multiaddr,
    ) -> Result<(), TransportError<Self::Error>> {
        let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(addr.clone()) {
            sa
        } else {
            return Err(TransportError::MultiaddrNotSupported(addr));
        };
        log::debug!("listening on {}", socket_addr);
        let listener = self
            .do_listen(id, socket_addr)
            .map_err(TransportError::Other)?;
        self.listeners.push(listener);
        Ok(())
    }

    fn remove_listener(&mut self, id: ListenerId) -> bool {
        if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
            listener.close(Ok(()));
            true
        } else {
            false
        }
    }

    fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
        let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
            if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
                return Err(TransportError::MultiaddrNotSupported(addr));
            }
            socket_addr
        } else {
            return Err(TransportError::MultiaddrNotSupported(addr));
        };
        log::debug!("dialing {}", socket_addr);

        let socket = self
            .create_socket(socket_addr)
            .map_err(TransportError::Other)?;

        if let Some(addr) = self.port_reuse.local_dial_addr(&socket_addr.ip()) {
            log::trace!("Binding dial socket to listen socket {}", addr);
            socket.bind(&addr.into()).map_err(TransportError::Other)?;
        }

        socket
            .set_nonblocking(true)
            .map_err(TransportError::Other)?;

        Ok(async move {
            // [`Transport::dial`] should do no work unless the returned [`Future`] is polled. Thus
            // do the `connect` call within the [`Future`].
            match socket.connect(&socket_addr.into()) {
                Ok(()) => {}
                Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                Err(err) => return Err(err),
            };

            let stream = T::new_stream(socket.into()).await?;
            Ok(stream)
        }
        .boxed())
    }

    fn dial_as_listener(
        &mut self,
        addr: Multiaddr,
    ) -> Result<Self::Dial, TransportError<Self::Error>> {
        self.dial(addr)
    }

    /// When port reuse is disabled and hence ephemeral local ports are
    /// used for outgoing connections, the returned address is the
    /// `observed` address with the port replaced by the port of the
    /// `listen` address.
    ///
    /// If port reuse is enabled, `Some(observed)` is returned, as there
    /// is a chance that the `observed` address _and_ port are reachable
    /// for other peers if there is a NAT in the way that does endpoint-
    /// independent filtering. Furthermore, even if that is not the case
    /// and TCP hole punching techniques must be used for NAT traversal,
    /// the `observed` address is still the one that a remote should connect
    /// to for the purpose of the hole punching procedure, as it represents
    /// the mapped IP and port of the NAT device in front of the local
    /// node.
    ///
    /// `None` is returned if one of the given addresses is not a TCP/IP
    /// address.
    fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
        if !is_tcp_addr(listen) || !is_tcp_addr(observed) {
            return None;
        }
        match &self.port_reuse {
            PortReuse::Disabled => address_translation(listen, observed),
            PortReuse::Enabled { .. } => Some(observed.clone()),
        }
    }

    /// Poll all listeners.
    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
        // Return pending events from closed listeners.
        if let Some(event) = self.pending_events.pop_front() {
            return Poll::Ready(event);
        }

        match self.listeners.poll_next_unpin(cx) {
            Poll::Ready(Some(transport_event)) => Poll::Ready(transport_event),
            _ => Poll::Pending,
        }
    }
}

/// A stream of incoming connections on one or more interfaces.
struct ListenStream<T>
where
    T: Provider,
{
    /// The ID of this listener.
    listener_id: ListenerId,
    /// The socket address that the listening socket is bound to,
    /// which may be a "wildcard address" like `INADDR_ANY` or `IN6ADDR_ANY`
    /// when listening on all interfaces for IPv4 or IPv6 connections, respectively.
    listen_addr: SocketAddr,
    /// The async listening socket for incoming connections.
    listener: T::Listener,
    /// Watcher for network interface changes.
    /// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces
    /// become or stop being available.
    ///
    /// `None` if the socket is only listening on a single interface.
    if_watcher: Option<T::IfWatcher>,
    /// The port reuse configuration for outgoing connections.
    ///
    /// If enabled, all IP addresses on which this listening stream
    /// is accepting connections are registered for reuse
    /// as local addresses for the sockets of outgoing connections. They are
    /// unregistered when the stream encounters an error or is dropped.
    port_reuse: PortReuse,
    /// How long to sleep after a (non-fatal) error while trying
    /// to accept a new connection.
    sleep_on_error: Duration,
    /// The current pause, if any.
    pause: Option<Delay>,
    /// Pending event to be reported.
    pending_event: Option<<Self as Stream>::Item>,
    /// The listener can be manually closed with [`Transport::remove_listener`](libp2p_core::Transport::remove_listener).
    is_closed: bool,
    /// The stream must be woken after it has been closed to deliver the last event.
    close_listener_waker: Option<Waker>,
}

impl<T> ListenStream<T>
where
    T: Provider,
{
    /// Constructs a [`ListenStream`] for incoming connections around
    /// the given [`TcpListener`].
    fn new(
        listener_id: ListenerId,
        listener: TcpListener,
        if_watcher: Option<T::IfWatcher>,
        port_reuse: PortReuse,
    ) -> io::Result<Self> {
        let listen_addr = listener.local_addr()?;
        let listener = T::new_listener(listener)?;

        Ok(ListenStream {
            port_reuse,
            listener,
            listener_id,
            listen_addr,
            if_watcher,
            pause: None,
            sleep_on_error: Duration::from_millis(100),
            pending_event: None,
            is_closed: false,
            close_listener_waker: None,
        })
    }

    /// Disables port reuse for any listen address of this stream.
    ///
    /// This is done when the [`ListenStream`] encounters a fatal
    /// error (for the stream) or is dropped.
    ///
    /// Has no effect if port reuse is disabled.
    fn disable_port_reuse(&mut self) {
        match &self.if_watcher {
            Some(if_watcher) => {
                for ip_net in T::addrs(if_watcher) {
                    self.port_reuse
                        .unregister(ip_net.addr(), self.listen_addr.port());
                }
            }
            None => self
                .port_reuse
                .unregister(self.listen_addr.ip(), self.listen_addr.port()),
        }
    }

    /// Close the listener.
    ///
    /// This will create a [`TransportEvent::ListenerClosed`] and
    /// terminate the stream once the event has been reported.
    fn close(&mut self, reason: Result<(), io::Error>) {
        if self.is_closed {
            return;
        }
        self.pending_event = Some(TransportEvent::ListenerClosed {
            listener_id: self.listener_id,
            reason,
        });
        self.is_closed = true;

        // Wake the stream to deliver the last event.
        if let Some(waker) = self.close_listener_waker.take() {
            waker.wake();
        }
    }

    /// Poll for the next [`IfEvent`].
    fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
        let if_watcher = match self.if_watcher.as_mut() {
            Some(if_watcher) => if_watcher,
            None => return Poll::Pending,
        };

        let my_listen_addr_port = self.listen_addr.port();

        while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) {
            match event {
                Ok(IfEvent::Up(inet)) => {
                    let ip = inet.addr();
                    if self.listen_addr.is_ipv4() == ip.is_ipv4() {
                        let ma = ip_to_multiaddr(ip, my_listen_addr_port);
                        log::debug!("New listen address: {}", ma);
                        self.port_reuse.register(ip, my_listen_addr_port);
                        return Poll::Ready(TransportEvent::NewAddress {
                            listener_id: self.listener_id,
                            listen_addr: ma,
                        });
                    }
                }
                Ok(IfEvent::Down(inet)) => {
                    let ip = inet.addr();
                    if self.listen_addr.is_ipv4() == ip.is_ipv4() {
                        let ma = ip_to_multiaddr(ip, my_listen_addr_port);
                        log::debug!("Expired listen address: {}", ma);
                        self.port_reuse.unregister(ip, my_listen_addr_port);
                        return Poll::Ready(TransportEvent::AddressExpired {
                            listener_id: self.listener_id,
                            listen_addr: ma,
                        });
                    }
                }
                Err(error) => {
                    self.pause = Some(Delay::new(self.sleep_on_error));
                    return Poll::Ready(TransportEvent::ListenerError {
                        listener_id: self.listener_id,
                        error,
                    });
                }
            }
        }

        Poll::Pending
    }
}

impl<T> Drop for ListenStream<T>
where
    T: Provider,
{
    fn drop(&mut self) {
        self.disable_port_reuse();
    }
}

impl<T> Stream for ListenStream<T>
where
    T: Provider,
    T::Listener: Unpin,
    T::Stream: Unpin,
{
    type Item = TransportEvent<Ready<Result<T::Stream, io::Error>>, io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        if let Some(mut pause) = self.pause.take() {
            match pause.poll_unpin(cx) {
                Poll::Ready(_) => {}
                Poll::Pending => {
                    self.pause = Some(pause);
                    return Poll::Pending;
                }
            }
        }

        if let Some(event) = self.pending_event.take() {
            return Poll::Ready(Some(event));
        }

        if self.is_closed {
            // Terminate the stream if the listener closed and all remaining events have been reported.
            return Poll::Ready(None);
        }

        if let Poll::Ready(event) = self.poll_if_addr(cx) {
            return Poll::Ready(Some(event));
        }

        // Take the pending connection from the backlog.
        match T::poll_accept(&mut self.listener, cx) {
            Poll::Ready(Ok(Incoming {
                local_addr,
                remote_addr,
                stream,
            })) => {
                let local_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
                let remote_addr = ip_to_multiaddr(remote_addr.ip(), remote_addr.port());

                log::debug!("Incoming connection from {} at {}", remote_addr, local_addr);

                return Poll::Ready(Some(TransportEvent::Incoming {
                    listener_id: self.listener_id,
                    upgrade: future::ok(stream),
                    local_addr,
                    send_back_addr: remote_addr,
                }));
            }
            Poll::Ready(Err(error)) => {
                // These errors are non-fatal for the listener stream.
                self.pause = Some(Delay::new(self.sleep_on_error));
                return Poll::Ready(Some(TransportEvent::ListenerError {
                    listener_id: self.listener_id,
                    error,
                }));
            }
            Poll::Pending => {}
        }

        self.close_listener_waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Extracts a `SocketAddr` from a given `Multiaddr`.
///
/// Fails if the given `Multiaddr` does not begin with an IP
/// protocol encapsulating a TCP port.
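///
/// For example, `/ip4/127.0.0.1/tcp/4001` yields `127.0.0.1:4001`, whereas
/// `/ip4/127.0.0.1/udp/4001` is rejected.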
fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result<SocketAddr, ()> {
    // "Pop" the IP address and TCP port from the end of the address,
    // ignoring a `/p2p/...` suffix as well as any prefix of possibly
    // outer protocols, if present.
    let mut port = None;
    while let Some(proto) = addr.pop() {
        match proto {
            Protocol::Ip4(ipv4) => match port {
                Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)),
                None => return Err(()),
            },
            Protocol::Ip6(ipv6) => match port {
                Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)),
                None => return Err(()),
            },
            Protocol::Tcp(portnum) => match port {
                Some(_) => return Err(()),
                None => port = Some(portnum),
            },
            Protocol::P2p(_) => {}
            _ => return Err(()),
        }
    }
    Err(())
}

// Create a [`Multiaddr`] from the given IP address and port number.
fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
    Multiaddr::empty().with(ip.into()).with(Protocol::Tcp(port))
}

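/// Returns `true` if the given [`Multiaddr`] starts with an IP or DNS protocol
/// immediately followed by `/tcp`, e.g. `/ip4/127.0.0.1/tcp/4001` or
/// `/dns4/example.com/tcp/443`.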
fn is_tcp_addr(addr: &Multiaddr) -> bool {
    use Protocol::*;

    let mut iter = addr.iter();

    let first = match iter.next() {
        None => return false,
        Some(p) => p,
    };
    let second = match iter.next() {
        None => return false,
        Some(p) => p,
    };

    matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_)) && matches!(second, Tcp(_))
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::{
        channel::{mpsc, oneshot},
        future::poll_fn,
    };
    use libp2p_core::Transport as _;
    use libp2p_identity::PeerId;

    #[test]
    fn multiaddr_to_tcp_conversion() {
        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

        assert!(
            multiaddr_to_socketaddr("/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
                .is_err()
        );

        assert_eq!(
            multiaddr_to_socketaddr("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
            Ok(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                12345,
            ))
        );
        assert_eq!(
            multiaddr_to_socketaddr(
                "/ip4/255.255.255.255/tcp/8080"
                    .parse::<Multiaddr>()
                    .unwrap()
            ),
            Ok(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
                8080,
            ))
        );
        assert_eq!(
            multiaddr_to_socketaddr("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
            Ok(SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
                12345,
            ))
        );
        assert_eq!(
            multiaddr_to_socketaddr(
                "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
                    .parse::<Multiaddr>()
                    .unwrap()
            ),
            Ok(SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(
                    65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
                )),
                8080,
            ))
        );
    }

    #[test]
    fn communicating_between_dialer_and_listener() {
        env_logger::try_init().ok();

        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
            let mut tcp = Transport::<T>::default().boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            loop {
                match tcp.select_next_some().await {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        ready_tx.send(listen_addr).await.unwrap();
                    }
                    TransportEvent::Incoming { upgrade, .. } => {
                        let mut upgrade = upgrade.await.unwrap();
                        let mut buf = [0u8; 3];
                        upgrade.read_exact(&mut buf).await.unwrap();
                        assert_eq!(buf, [1, 2, 3]);
                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
                        return;
                    }
                    e => panic!("Unexpected transport event: {e:?}"),
                }
            }
        }

        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let addr = ready_rx.next().await.unwrap();
            let mut tcp = Transport::<T>::default();

            // Obtain a future socket through dialing
            let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
            socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();

            let mut buf = [0u8; 3];
            socket.read_exact(&mut buf).await.unwrap();
            assert_eq!(buf, [4, 5, 6]);
        }

        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<async_io::Tcp>(ready_rx);
                let listener = async_std::task::spawn(listener);
                async_std::task::block_on(dialer);
                async_std::task::block_on(listener);
            }

            #[cfg(feature = "tokio")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<tokio::Tcp>(addr, ready_tx);
                let dialer = dialer::<tokio::Tcp>(ready_rx);
                let rt = ::tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .build()
                    .unwrap();
                let tasks = ::tokio::task::LocalSet::new();
                let listener = tasks.spawn_local(listener);
                tasks.block_on(&rt, dialer);
                tasks.block_on(&rt, listener).unwrap();
            }
        }

        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }

    #[test]
    fn wildcard_expansion() {
        env_logger::try_init().ok();

        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
            let mut tcp = Transport::<T>::default().boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();

            loop {
                match tcp.select_next_some().await {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        let mut iter = listen_addr.iter();
                        match iter.next().expect("ip address") {
                            Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
                            Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
                            other => panic!("Unexpected protocol: {other}"),
                        }
                        if let Protocol::Tcp(port) = iter.next().expect("port") {
                            assert_ne!(0, port)
                        } else {
                            panic!("No TCP port in address: {listen_addr}")
                        }
                        ready_tx.send(listen_addr).await.ok();
                    }
                    TransportEvent::Incoming { .. } => {
                        return;
                    }
                    _ => {}
                }
            }
        }
        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let dest_addr = ready_rx.next().await.unwrap();
            let mut tcp = Transport::<T>::default();
            tcp.dial(dest_addr).unwrap().await.unwrap();
        }
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<async_io::Tcp>(ready_rx);
                let listener = async_std::task::spawn(listener);
                async_std::task::block_on(dialer);
                async_std::task::block_on(listener);
            }
            #[cfg(feature = "tokio")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<tokio::Tcp>(addr, ready_tx);
                let dialer = dialer::<tokio::Tcp>(ready_rx);
                let rt = ::tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .build()
                    .unwrap();
                let tasks = ::tokio::task::LocalSet::new();
                let listener = tasks.spawn_local(listener);
                tasks.block_on(&rt, dialer);
                tasks.block_on(&rt, listener).unwrap();
            }
        }
        test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }
    #[test]
    fn port_reuse_dialing() {
        env_logger::try_init().ok();
        async fn listener<T: Provider>(
            addr: Multiaddr,
            mut ready_tx: mpsc::Sender<Multiaddr>,
            port_reuse_rx: oneshot::Receiver<Protocol<'_>>,
        ) {
            let mut tcp = Transport::<T>::new(Config::new()).boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            loop {
                match tcp.select_next_some().await {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        ready_tx.send(listen_addr).await.ok();
                    }
                    TransportEvent::Incoming {
                        upgrade,
                        mut send_back_addr,
                        ..
                    } => {
                        // Receive the dialer tcp port reuse
                        let remote_port_reuse = port_reuse_rx.await.unwrap();
                        // And check it is the same as the remote port used for upgrade
                        assert_eq!(send_back_addr.pop().unwrap(), remote_port_reuse);
                        let mut upgrade = upgrade.await.unwrap();
                        let mut buf = [0u8; 3];
                        upgrade.read_exact(&mut buf).await.unwrap();
                        assert_eq!(buf, [1, 2, 3]);
                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
                        return;
                    }
                    e => panic!("Unexpected event: {e:?}"),
                }
            }
        }
        async fn dialer<T: Provider>(
            addr: Multiaddr,
            mut ready_rx: mpsc::Receiver<Multiaddr>,
            port_reuse_tx: oneshot::Sender<Protocol<'_>>,
        ) {
            let dest_addr = ready_rx.next().await.unwrap();
            let mut tcp = Transport::<T>::new(Config::new().port_reuse(true));
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                TransportEvent::NewAddress { .. } => {
                    // Check that tcp and listener share the same port reuse SocketAddr
                    let listener = tcp.listeners.iter().next().unwrap();
                    let port_reuse_tcp = tcp.port_reuse.local_dial_addr(&listener.listen_addr.ip());
                    let port_reuse_listener = listener
                        .port_reuse
                        .local_dial_addr(&listener.listen_addr.ip());
                    assert!(port_reuse_tcp.is_some());
                    assert_eq!(port_reuse_tcp, port_reuse_listener);
                    // Send the dialer tcp port reuse to the listener
                    port_reuse_tx
                        .send(Protocol::Tcp(port_reuse_tcp.unwrap().port()))
                        .ok();
                    // Obtain a future socket through dialing
                    let mut socket = tcp.dial(dest_addr).unwrap().await.unwrap();
                    socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
                    // socket.flush().await;
                    let mut buf = [0u8; 3];
                    socket.read_exact(&mut buf).await.unwrap();
                    assert_eq!(buf, [4, 5, 6]);
                }
                e => panic!("Unexpected transport event: {e:?}"),
            }
        }
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
                let dialer = dialer::<async_io::Tcp>(addr.clone(), ready_rx, port_reuse_tx);
                let listener = async_std::task::spawn(listener);
                async_std::task::block_on(dialer);
                async_std::task::block_on(listener);
            }
            #[cfg(feature = "tokio")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
                let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
                let dialer = dialer::<tokio::Tcp>(addr, ready_rx, port_reuse_tx);
                let rt = ::tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .build()
                    .unwrap();
                let tasks = ::tokio::task::LocalSet::new();
                let listener = tasks.spawn_local(listener);
                tasks.block_on(&rt, dialer);
                tasks.block_on(&rt, listener).unwrap();
            }
        }
        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }
    #[test]
    fn port_reuse_listening() {
        env_logger::try_init().ok();
        async fn listen_twice<T: Provider>(addr: Multiaddr) {
            let mut tcp = Transport::<T>::new(Config::new().port_reuse(true));
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                TransportEvent::NewAddress {
                    listen_addr: addr1, ..
                } => {
                    let listener1 = tcp.listeners.iter().next().unwrap();
                    let port_reuse_tcp =
                        tcp.port_reuse.local_dial_addr(&listener1.listen_addr.ip());
                    let port_reuse_listener1 = listener1
                        .port_reuse
                        .local_dial_addr(&listener1.listen_addr.ip());
                    assert!(port_reuse_tcp.is_some());
                    assert_eq!(port_reuse_tcp, port_reuse_listener1);
                    // Listen on the same address a second time.
                    tcp.listen_on(ListenerId::next(), addr1.clone()).unwrap();
                    match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                        TransportEvent::NewAddress {
                            listen_addr: addr2, ..
                        } => assert_eq!(addr1, addr2),
                        e => panic!("Unexpected transport event: {e:?}"),
                    }
                }
                e => panic!("Unexpected transport event: {e:?}"),
            }
        }
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let listener = listen_twice::<async_io::Tcp>(addr.clone());
                async_std::task::block_on(listener);
            }
            #[cfg(feature = "tokio")]
            {
                let listener = listen_twice::<tokio::Tcp>(addr);
                let rt = ::tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .build()
                    .unwrap();
                rt.block_on(listener);
            }
        }
        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
    }
    #[test]
    fn listen_port_0() {
        env_logger::try_init().ok();
        async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
            let mut tcp = Transport::<T>::default().boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            tcp.select_next_some()
                .await
                .into_new_address()
                .expect("listen address")
        }
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let new_addr = async_std::task::block_on(listen::<async_io::Tcp>(addr.clone()));
                assert!(!new_addr.to_string().contains("tcp/0"));
            }
            #[cfg(feature = "tokio")]
            {
                let rt = ::tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .build()
                    .unwrap();
                let new_addr = rt.block_on(listen::<tokio::Tcp>(addr));
                assert!(!new_addr.to_string().contains("tcp/0"));
            }
        }
        test("/ip6/::1/tcp/0".parse().unwrap());
        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
    }
    #[test]
    fn listen_invalid_addr() {
        env_logger::try_init().ok();
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let mut tcp = async_io::Transport::default();
                assert!(tcp.listen_on(ListenerId::next(), addr.clone()).is_err());
            }
            #[cfg(feature = "tokio")]
            {
                let mut tcp = tokio::Transport::default();
                assert!(tcp.listen_on(ListenerId::next(), addr).is_err());
            }
        }
        test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
    }
    #[cfg(feature = "async-io")]
    #[test]
    fn test_address_translation_async_io() {
        test_address_translation::<async_io::Transport>()
    }
    #[cfg(feature = "tokio")]
    #[test]
    fn test_address_translation_tokio() {
        test_address_translation::<tokio::Transport>()
    }
    fn test_address_translation<T>()
    where
        T: Default + libp2p_core::Transport,
    {
        let transport = T::default();
        let port = 42;
        let tcp_listen_addr = Multiaddr::empty()
            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
            .with(Protocol::Tcp(port));
        let observed_ip = Ipv4Addr::new(123, 45, 67, 8);
        let tcp_observed_addr = Multiaddr::empty()
            .with(Protocol::Ip4(observed_ip))
            .with(Protocol::Tcp(1))
            .with(Protocol::P2p(PeerId::random()));
        let translated = transport
            .address_translation(&tcp_listen_addr, &tcp_observed_addr)
            .unwrap();
        let mut iter = translated.iter();
        assert_eq!(iter.next(), Some(Protocol::Ip4(observed_ip)));
        assert_eq!(iter.next(), Some(Protocol::Tcp(port)));
        assert_eq!(iter.next(), None);
        let quic_addr = Multiaddr::empty()
            .with(Protocol::Ip4(Ipv4Addr::new(87, 65, 43, 21)))
            .with(Protocol::Udp(1))
            .with(Protocol::QuicV1);
        assert!(transport
            .address_translation(&tcp_listen_addr, &quic_addr)
            .is_none());
        assert!(transport
            .address_translation(&quic_addr, &tcp_observed_addr)
            .is_none());
    }
    #[test]
    fn test_remove_listener() {
        env_logger::try_init().ok();
        async fn cycle_listeners<T: Provider>() -> bool {
            let mut tcp = Transport::<T>::default().boxed();
            let listener_id = ListenerId::next();
            tcp.listen_on(listener_id, "/ip4/127.0.0.1/tcp/0".parse().unwrap())
                .unwrap();
            tcp.remove_listener(listener_id)
        }
        #[cfg(feature = "async-io")]
        {
            assert!(async_std::task::block_on(cycle_listeners::<async_io::Tcp>()));
        }
        #[cfg(feature = "tokio")]
        {
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            assert!(rt.block_on(cycle_listeners::<tokio::Tcp>()));
        }
    }
    #[test]
    fn test_listens_ipv4_ipv6_separately() {
        fn test<T: Provider>() {
            let port = {
                let listener = TcpListener::bind("127.0.0.1:0").unwrap();
                listener.local_addr().unwrap().port()
            };
            let mut tcp = Transport::<T>::default().boxed();
            let listener_id = ListenerId::next();
            tcp.listen_on(
                listener_id,
                format!("/ip4/0.0.0.0/tcp/{port}").parse().unwrap(),
            )
            .unwrap();
            tcp.listen_on(
                ListenerId::next(),
                format!("/ip6/::/tcp/{port}").parse().unwrap(),
            )
            .unwrap();
        }
        #[cfg(feature = "async-io")]
        {
            async_std::task::block_on(async {
                test::<async_io::Tcp>();
            })
        }
        #[cfg(feature = "tokio")]
        {
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            rt.block_on(async {
                test::<tokio::Tcp>();
            });
        }
    }
}