// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18

            
19
use crate::TelemetryPayload;
20
use futures::{channel::mpsc, prelude::*};
21
use libp2p::{core::transport::Transport, Multiaddr};
22
use rand::Rng as _;
23
use std::{
24
	fmt, mem,
25
	pin::Pin,
26
	task::{Context, Poll},
27
	time::Duration,
28
};
29
use wasm_timer::Delay;
30

            
31
pub(crate) type ConnectionNotifierSender = mpsc::Sender<()>;
32
pub(crate) type ConnectionNotifierReceiver = mpsc::Receiver<()>;
33

            
34
pub(crate) fn connection_notifier_channel() -> (ConnectionNotifierSender, ConnectionNotifierReceiver)
35
{
36
	mpsc::channel(0)
37
}
38

            
39
/// Handler for a single telemetry node.
40
///
41
/// This is a wrapper `Sink` around a network `Sink` with 3 particularities:
42
///  - It is infallible: if the connection stops, it will reconnect automatically when the server
43
///    becomes available again.
44
///  - It holds a list of "connection messages" which are sent automatically when the connection is
45
///    (re-)established. This is used for the "system.connected" message that needs to be send for
46
///    every substrate node that connects.
47
///  - It doesn't stay in pending while waiting for connection. Instead, it moves data into the void
48
///    if the connection could not be established. This is important for the `Dispatcher` `Sink`
49
///    which we don't want to block if one connection is broken.
50
#[derive(Debug)]
51
pub(crate) struct Node<TTrans: Transport> {
52
	/// Address of the node.
53
	addr: Multiaddr,
54
	/// State of the connection.
55
	socket: NodeSocket<TTrans>,
56
	/// Transport used to establish new connections.
57
	transport: TTrans,
58
	/// Messages that are sent when the connection (re-)establishes.
59
	pub(crate) connection_messages: Vec<TelemetryPayload>,
60
	/// Notifier for when the connection (re-)establishes.
61
	pub(crate) telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
62
}
63

            
64
enum NodeSocket<TTrans: Transport> {
65
	/// We're connected to the node. This is the normal state.
66
	Connected(NodeSocketConnected<TTrans>),
67
	/// We are currently dialing the node.
68
	Dialing(TTrans::Dial),
69
	/// A new connection should be started as soon as possible.
70
	ReconnectNow,
71
	/// Waiting before attempting to dial again.
72
	WaitingReconnect(Delay),
73
	/// Temporary transition state.
74
	Poisoned,
75
}
76

            
77
impl<TTrans: Transport> NodeSocket<TTrans> {
78
	fn wait_reconnect() -> NodeSocket<TTrans> {
79
		let random_delay = rand::thread_rng().gen_range(10..20);
80
		let delay = Delay::new(Duration::from_secs(random_delay));
81
		log::trace!(target: "telemetry", "Pausing for {} secs before reconnecting", random_delay);
82
		NodeSocket::WaitingReconnect(delay)
83
	}
84
}
85

            
86
struct NodeSocketConnected<TTrans: Transport> {
87
	/// Where to send data.
88
	sink: TTrans::Output,
89
	/// Queue of packets to send before accepting new packets.
90
	buf: Vec<Vec<u8>>,
91
}
92

            
93
impl<TTrans: Transport> Node<TTrans> {
94
	/// Builds a new node handler.
95
	pub(crate) fn new(
96
		transport: TTrans,
97
		addr: Multiaddr,
98
		connection_messages: Vec<serde_json::Map<String, serde_json::Value>>,
99
		telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
100
	) -> Self {
101
		Node {
102
			addr,
103
			socket: NodeSocket::ReconnectNow,
104
			transport,
105
			connection_messages,
106
			telemetry_connection_notifier,
107
		}
108
	}
109
}
110

            
111
impl<TTrans: Transport, TSinkErr> Node<TTrans>
112
where
113
	TTrans::Dial: Unpin,
114
	TTrans::Output:
115
		Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
116
	TSinkErr: fmt::Debug,
117
{
118
	// NOTE: this code has been inspired from `Buffer` (`futures_util::sink::Buffer`).
119
	//       https://docs.rs/futures-util/0.3.8/src/futures_util/sink/buffer.rs.html#32
120
	fn try_send_connection_messages(
121
		self: Pin<&mut Self>,
122
		cx: &mut Context<'_>,
123
		conn: &mut NodeSocketConnected<TTrans>,
124
	) -> Poll<Result<(), TSinkErr>> {
125
		while let Some(item) = conn.buf.pop() {
126
			if let Err(e) = conn.sink.start_send_unpin(item) {
127
				return Poll::Ready(Err(e))
128
			}
129
			futures::ready!(conn.sink.poll_ready_unpin(cx))?;
130
		}
131
		Poll::Ready(Ok(()))
132
	}
133
}
134

            
135
/// Error type with no variants: no value of it can ever be constructed, so a `Result` using it
/// as error can only be `Ok`.
pub(crate) enum Infallible {}
136

            
137
impl<TTrans: Transport, TSinkErr> Sink<TelemetryPayload> for Node<TTrans>
138
where
139
	TTrans: Unpin,
140
	TTrans::Dial: Unpin,
141
	TTrans::Output:
142
		Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
143
	TSinkErr: fmt::Debug,
144
{
145
	type Error = Infallible;
146

            
147
	fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
148
		let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned);
149
		self.socket = loop {
150
			match socket {
151
				NodeSocket::Connected(mut conn) => match conn.sink.poll_ready_unpin(cx) {
152
					Poll::Ready(Ok(())) => {
153
						match self.as_mut().try_send_connection_messages(cx, &mut conn) {
154
							Poll::Ready(Err(err)) => {
155
								log::warn!(target: "telemetry", "⚠️  Disconnected from {}: {:?}", self.addr, err);
156
								socket = NodeSocket::wait_reconnect();
157
							},
158
							Poll::Ready(Ok(())) => {
159
								self.socket = NodeSocket::Connected(conn);
160
								return Poll::Ready(Ok(()))
161
							},
162
							Poll::Pending => {
163
								self.socket = NodeSocket::Connected(conn);
164
								return Poll::Pending
165
							},
166
						}
167
					},
168
					Poll::Ready(Err(err)) => {
169
						log::warn!(target: "telemetry", "⚠️  Disconnected from {}: {:?}", self.addr, err);
170
						socket = NodeSocket::wait_reconnect();
171
					},
172
					Poll::Pending => {
173
						self.socket = NodeSocket::Connected(conn);
174
						return Poll::Pending
175
					},
176
				},
177
				NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) {
178
					Poll::Ready(Ok(sink)) => {
179
						log::debug!(target: "telemetry", "✅ Connected to {}", self.addr);
180

            
181
						{
182
							let mut index = 0;
183
							while index < self.telemetry_connection_notifier.len() {
184
								let sender = &mut self.telemetry_connection_notifier[index];
185
								if let Err(error) = sender.try_send(()) {
186
									if !error.is_disconnected() {
187
										log::debug!(target: "telemetry", "Failed to send a telemetry connection notification: {}", error);
188
									} else {
189
										self.telemetry_connection_notifier.swap_remove(index);
190
										continue
191
									}
192
								}
193
								index += 1;
194
							}
195
						}
196

            
197
						let buf = self
198
							.connection_messages
199
							.iter()
200
							.map(|json| {
201
								let mut json = json.clone();
202
								json.insert(
203
									"ts".to_string(),
204
									chrono::Local::now().to_rfc3339().into(),
205
								);
206
								json
207
							})
208
							.filter_map(|json| match serde_json::to_vec(&json) {
209
								Ok(message) => Some(message),
210
								Err(err) => {
211
									log::error!(
212
										target: "telemetry",
213
										"An error occurred while generating new connection \
214
										messages: {}",
215
										err,
216
									);
217
									None
218
								},
219
							})
220
							.collect();
221

            
222
						socket = NodeSocket::Connected(NodeSocketConnected { sink, buf });
223
					},
224
					Poll::Pending => break NodeSocket::Dialing(s),
225
					Poll::Ready(Err(err)) => {
226
						log::warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
227
						socket = NodeSocket::wait_reconnect();
228
					},
229
				},
230
				NodeSocket::ReconnectNow => {
231
					let addr = self.addr.clone();
232
					match self.transport.dial(addr) {
233
						Ok(d) => {
234
							log::trace!(target: "telemetry", "Re-dialing {}", self.addr);
235
							socket = NodeSocket::Dialing(d);
236
						},
237
						Err(err) => {
238
							log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err);
239
							socket = NodeSocket::wait_reconnect();
240
						},
241
					}
242
				},
243
				NodeSocket::WaitingReconnect(mut s) => {
244
					if Future::poll(Pin::new(&mut s), cx).is_ready() {
245
						socket = NodeSocket::ReconnectNow;
246
					} else {
247
						break NodeSocket::WaitingReconnect(s)
248
					}
249
				},
250
				NodeSocket::Poisoned => {
251
					log::error!(target: "telemetry", "‼️ Poisoned connection with {}", self.addr);
252
					break NodeSocket::Poisoned
253
				},
254
			}
255
		};
256

            
257
		// The Dispatcher blocks when the Node syncs blocks. This is why it is important that the
258
		// Node sinks don't go into "Pending" state while waiting for reconnection but rather
259
		// discard the excess of telemetry messages.
260
		Poll::Ready(Ok(()))
261
	}
262

            
263
	fn start_send(mut self: Pin<&mut Self>, item: TelemetryPayload) -> Result<(), Self::Error> {
264
		// Any buffered outgoing telemetry messages are discarded while (re-)connecting.
265
		match &mut self.socket {
266
			NodeSocket::Connected(conn) => match serde_json::to_vec(&item) {
267
				Ok(data) => {
268
					log::trace!(target: "telemetry", "Sending {} bytes", data.len());
269
					let _ = conn.sink.start_send_unpin(data);
270
				},
271
				Err(err) => log::debug!(
272
					target: "telemetry",
273
					"Could not serialize payload: {}",
274
					err,
275
				),
276
			},
277
			// We are currently dialing the node.
278
			NodeSocket::Dialing(_) => log::trace!(target: "telemetry", "Dialing"),
279
			// A new connection should be started as soon as possible.
280
			NodeSocket::ReconnectNow => log::trace!(target: "telemetry", "Reconnecting"),
281
			// Waiting before attempting to dial again.
282
			NodeSocket::WaitingReconnect(_) => {},
283
			// Temporary transition state.
284
			NodeSocket::Poisoned => log::trace!(target: "telemetry", "Poisoned"),
285
		}
286
		Ok(())
287
	}
288

            
289
	fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
290
		match &mut self.socket {
291
			NodeSocket::Connected(conn) => match conn.sink.poll_flush_unpin(cx) {
292
				Poll::Ready(Err(e)) => {
293
					// When `telemetry` closes the websocket connection we end
294
					// up here, which is sub-optimal. See
295
					// https://github.com/libp2p/rust-libp2p/issues/2021 for
296
					// what we could do to improve this.
297
					log::trace!(target: "telemetry", "[poll_flush] Error: {:?}", e);
298
					self.socket = NodeSocket::wait_reconnect();
299
					Poll::Ready(Ok(()))
300
				},
301
				Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
302
				Poll::Pending => Poll::Pending,
303
			},
304
			_ => Poll::Ready(Ok(())),
305
		}
306
	}
307

            
308
	fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
309
		match &mut self.socket {
310
			NodeSocket::Connected(conn) => conn.sink.poll_close_unpin(cx).map(|_| Ok(())),
311
			_ => Poll::Ready(Ok(())),
312
		}
313
	}
314
}
315

            
316
impl<TTrans: Transport> fmt::Debug for NodeSocket<TTrans> {
	/// Writes only the name of the current variant; variant payloads are not printed.
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		let variant = match self {
			Self::Connected(_) => "Connected",
			Self::Dialing(_) => "Dialing",
			Self::ReconnectNow => "ReconnectNow",
			Self::WaitingReconnect(_) => "WaitingReconnect",
			Self::Poisoned => "Poisoned",
		};
		f.write_str(variant)
	}
}