1
// This file is part of Substrate.
2

            
3
// Copyright (C) Parity Technologies (UK) Ltd.
4
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5

            
6
// This program is free software: you can redistribute it and/or modify
7
// it under the terms of the GNU General Public License as published by
8
// the Free Software Foundation, either version 3 of the License, or
9
// (at your option) any later version.
10

            
11
// This program is distributed in the hope that it will be useful,
12
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
// GNU General Public License for more details.
15

            
16
// You should have received a copy of the GNU General Public License
17
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18

            
19
//! Substrate's client telemetry is a part of substrate that allows ingesting telemetry data
20
//! with for example [Polkadot telemetry](https://github.com/paritytech/substrate-telemetry).
21
//!
22
//! It works using Tokio's [tracing](https://github.com/tokio-rs/tracing/) library. The telemetry
23
//! information uses tracing's logging to report the telemetry data which is then retrieved by a
24
//! tracing `Layer`. This layer will then send the data through an asynchronous channel to a
25
//! background task called [`TelemetryWorker`] which will send the information to the configured
26
//! remote telemetry servers.
27
//!
28
//! If multiple substrate nodes are running in the same process, it uses a `tracing::Span` to
29
//! identify which substrate node is reporting the telemetry. Every task spawned using sc-service's
30
//! `TaskManager` automatically inherits this span.
31
//!
32
//! Substrate's nodes initialize/register with the [`TelemetryWorker`] using a
33
//! [`TelemetryWorkerHandle`]. This handle can be cloned and passed around. It uses an asynchronous
34
//! channel to communicate with the running [`TelemetryWorker`] dedicated to registration.
35
//! Registering can happen at any point in time during the process execution.
36

            
37
#![warn(missing_docs)]
38

            
39
use futures::{channel::mpsc, prelude::*};
40
use libp2p::Multiaddr;
41
use log::{error, warn};
42
use parking_lot::Mutex;
43
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
44
use serde::Serialize;
45
use std::{
46
	collections::{
47
		hash_map::Entry::{Occupied, Vacant},
48
		HashMap,
49
	},
50
	sync::{atomic, Arc},
51
};
52

            
53
pub use log;
54
pub use serde_json;
55

            
56
mod endpoints;
57
mod error;
58
mod node;
59
mod transport;
60

            
61
pub use endpoints::*;
62
pub use error::*;
63
use node::*;
64
use transport::*;
65

            
66
/// Telemetry message verbosity.
///
/// A message is forwarded to an endpoint only when its verbosity is less than or equal to the
/// maximum verbosity configured for that endpoint.
pub type VerbosityLevel = u8;

// Substrate-wide levels.

/// Substrate INFO log level.
pub const SUBSTRATE_INFO: VerbosityLevel = 0;
/// Substrate DEBUG log level.
pub const SUBSTRATE_DEBUG: VerbosityLevel = 9;

// Consensus-specific levels, from least to most verbose.

/// Consensus INFO log level.
pub const CONSENSUS_INFO: VerbosityLevel = 1;
/// Consensus WARN log level.
pub const CONSENSUS_WARN: VerbosityLevel = 4;
/// Consensus DEBUG log level.
pub const CONSENSUS_DEBUG: VerbosityLevel = 5;
/// Consensus TRACE log level.
pub const CONSENSUS_TRACE: VerbosityLevel = 9;
82

            
83
pub(crate) type Id = u64;
84
pub(crate) type TelemetryPayload = serde_json::Map<String, serde_json::Value>;
85
pub(crate) type TelemetryMessage = (Id, VerbosityLevel, TelemetryPayload);
86

            
87
/// Message sent when the connection (re-)establishes.
///
/// The worker serializes this into a JSON object, adds `"msg": "system.connected"` to it, and
/// sends it as the `payload` of an object that also carries the telemetry `id`.
// NOTE: derived `Serialize`/`Debug` follow field declaration order; keep it stable.
#[derive(Debug, Serialize)]
pub struct ConnectionMessage {
	/// Node's name.
	pub name: String,
	/// Node's implementation.
	pub implementation: String,
	/// Node's version.
	pub version: String,
	/// Node's configuration.
	pub config: String,
	/// Node's chain.
	pub chain: String,
	/// Node's genesis hash.
	pub genesis_hash: String,
	/// Whether the node is an authority.
	pub authority: bool,
	/// Node's startup time.
	pub startup_time: String,
	/// Node's network ID.
	pub network_id: String,

	/// Node's OS.
	pub target_os: String,

	/// Node's ISA (instruction set architecture).
	pub target_arch: String,

	/// Node's target platform ABI or libc.
	pub target_env: String,

	/// Node's software and hardware information, when it could be gathered.
	pub sysinfo: Option<SysInfo>,
}
121

            
122
/// Hardware and software information for the node.
///
/// Sent as the `sysinfo` field of a [`ConnectionMessage`].
///
/// Gathering most of this information is highly OS-specific,
/// so most of the fields here are optional (`None` when unknown).
#[derive(Debug, Serialize)]
pub struct SysInfo {
	/// The exact CPU model.
	pub cpu: Option<String>,
	/// The total amount of memory, in bytes.
	pub memory: Option<u64>,
	/// The number of physical CPU cores.
	pub core_count: Option<u32>,
	/// The Linux kernel version.
	pub linux_kernel: Option<String>,
	/// The exact Linux distribution used.
	pub linux_distro: Option<String>,
	/// Whether the node's running under a virtual machine.
	pub is_virtual_machine: Option<bool>,
}
141

            
142
/// Telemetry worker.
///
/// It should run as a background task using the [`TelemetryWorker::run`] method, which consumes
/// the worker. Handles obtained from [`TelemetryWorker::handle`] keep working while it runs:
/// telemetries can be registered and messages sent at any time. Once the running worker is
/// dropped, registration attempts through its handles fail (without being fatal).
#[derive(Debug)]
pub struct TelemetryWorker {
	/// Receiving end of the bounded channel carrying messages from [`TelemetryHandle`]s.
	message_receiver: mpsc::Receiver<TelemetryMessage>,
	/// Sender cloned into new handles; holding it also keeps the message stream open.
	message_sender: mpsc::Sender<TelemetryMessage>,
	/// Receiving end of the registration channel (telemetries and connection notifiers).
	register_receiver: TracingUnboundedReceiver<Register>,
	/// Sender cloned into new handles; holding it also keeps the registration stream open.
	register_sender: TracingUnboundedSender<Register>,
	/// Shared counter used to allocate telemetry ids (starts at 1, see `new`).
	id_counter: Arc<atomic::AtomicU64>,
}
155

            
156
impl TelemetryWorker {
157
	/// Instantiate a new [`TelemetryWorker`] which can run in background.
158
	///
159
	/// Only one is needed per process.
160
	pub fn new(buffer_size: usize) -> Result<Self> {
161
		// Let's try to initialize a transport to get an early return.
162
		// Later transport will be initialized multiple times in
163
		// `::process_register`, so it's a convenient way to get an
164
		// error as early as possible.
165
		let _transport = initialize_transport()?;
166
		let (message_sender, message_receiver) = mpsc::channel(buffer_size);
167
		let (register_sender, register_receiver) =
168
			tracing_unbounded("mpsc_telemetry_register", 10_000);
169

            
170
		Ok(Self {
171
			message_receiver,
172
			message_sender,
173
			register_receiver,
174
			register_sender,
175
			id_counter: Arc::new(atomic::AtomicU64::new(1)),
176
		})
177
	}
178

            
179
	/// Get a new [`TelemetryWorkerHandle`].
180
	///
181
	/// This is used when you want to register with the [`TelemetryWorker`].
182
	pub fn handle(&self) -> TelemetryWorkerHandle {
183
		TelemetryWorkerHandle {
184
			message_sender: self.message_sender.clone(),
185
			register_sender: self.register_sender.clone(),
186
			id_counter: self.id_counter.clone(),
187
		}
188
	}
189

            
190
	/// Run the telemetry worker.
191
	///
192
	/// This should be run in a background task.
193
	pub async fn run(mut self) {
194
		let mut node_map: HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>> = HashMap::new();
195
		let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
196
		let mut pending_connection_notifications: Vec<_> = Vec::new();
197

            
198
		loop {
199
			futures::select! {
200
				message = self.message_receiver.next() => Self::process_message(
201
					message,
202
					&mut node_pool,
203
					&node_map,
204
				).await,
205
				init_payload = self.register_receiver.next() => Self::process_register(
206
					init_payload,
207
					&mut node_pool,
208
					&mut node_map,
209
					&mut pending_connection_notifications,
210
				).await,
211
			}
212
		}
213
	}
214

            
215
	async fn process_register(
216
		input: Option<Register>,
217
		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
218
		node_map: &mut HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
219
		pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>,
220
	) {
221
		let input = input.expect("the stream is never closed; qed");
222

            
223
		match input {
224
			Register::Telemetry { id, endpoints, connection_message } => {
225
				let endpoints = endpoints.0;
226

            
227
				let connection_message = match serde_json::to_value(&connection_message) {
228
					Ok(serde_json::Value::Object(mut value)) => {
229
						value.insert("msg".into(), "system.connected".into());
230
						let mut obj = serde_json::Map::new();
231
						obj.insert("id".to_string(), id.into());
232
						obj.insert("payload".to_string(), value.into());
233
						Some(obj)
234
					},
235
					Ok(_) => {
236
						unreachable!("ConnectionMessage always serialize to an object; qed")
237
					},
238
					Err(err) => {
239
						log::error!(
240
							target: "telemetry",
241
							"Could not serialize connection message: {}",
242
							err,
243
						);
244
						None
245
					},
246
				};
247

            
248
				for (addr, verbosity) in endpoints {
249
					log::trace!(
250
						target: "telemetry",
251
						"Initializing telemetry for: {:?}",
252
						addr,
253
					);
254
					node_map.entry(id).or_default().push((verbosity, addr.clone()));
255

            
256
					let node = match node_pool.entry(addr.clone()) {
257
						Occupied(entry) => entry.into_mut(),
258
						Vacant(entry) => {
259
							let transport = initialize_transport();
260
							let transport = match transport {
261
								Ok(t) => t,
262
								Err(err) => {
263
									log::error!(
264
										target: "telemetry",
265
										"Could not initialise transport: {}",
266
										err,
267
									);
268
									continue
269
								},
270
							};
271
							entry.insert(Node::new(transport, addr.clone(), Vec::new(), Vec::new()))
272
						},
273
					};
274

            
275
					node.connection_messages.extend(connection_message.clone());
276

            
277
					pending_connection_notifications.retain(|(addr_b, connection_message)| {
278
						if *addr_b == addr {
279
							node.telemetry_connection_notifier.push(connection_message.clone());
280
							false
281
						} else {
282
							true
283
						}
284
					});
285
				}
286
			},
287
			Register::Notifier { addresses, connection_notifier } => {
288
				for addr in addresses {
289
					// If the Node has been initialized, we directly push the connection_notifier.
290
					// Otherwise we push it to a queue that will be consumed when the connection
291
					// initializes, thus ensuring that the connection notifier will be sent to the
292
					// Node when it becomes available.
293
					if let Some(node) = node_pool.get_mut(&addr) {
294
						node.telemetry_connection_notifier.push(connection_notifier.clone());
295
					} else {
296
						pending_connection_notifications.push((addr, connection_notifier.clone()));
297
					}
298
				}
299
			},
300
		}
301
	}
302

            
303
	// dispatch messages to the telemetry nodes
304
	async fn process_message(
305
		input: Option<TelemetryMessage>,
306
		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
307
		node_map: &HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
308
	) {
309
		let (id, verbosity, payload) = input.expect("the stream is never closed; qed");
310

            
311
		let ts = chrono::Local::now().to_rfc3339();
312
		let mut message = serde_json::Map::new();
313
		message.insert("id".into(), id.into());
314
		message.insert("ts".into(), ts.into());
315
		message.insert("payload".into(), payload.into());
316

            
317
		let nodes = if let Some(nodes) = node_map.get(&id) {
318
			nodes
319
		} else {
320
			// This is a normal error because the telemetry ID exists before the telemetry is
321
			// initialized.
322
			log::trace!(
323
				target: "telemetry",
324
				"Received telemetry log for unknown id ({:?}): {}",
325
				id,
326
				serde_json::to_string(&message)
327
					.unwrap_or_else(|err| format!(
328
						"could not be serialized ({}): {:?}",
329
						err,
330
						message,
331
					)),
332
			);
333
			return
334
		};
335

            
336
		for (node_max_verbosity, addr) in nodes {
337
			if verbosity > *node_max_verbosity {
338
				continue
339
			}
340

            
341
			if let Some(node) = node_pool.get_mut(addr) {
342
				let _ = node.send(message.clone()).await;
343
			} else {
344
				log::debug!(
345
					target: "telemetry",
346
					"Received message for unknown node ({}). This is a bug. \
347
					Message sent: {}",
348
					addr,
349
					serde_json::to_string(&message)
350
						.unwrap_or_else(|err| format!(
351
							"could not be serialized ({}): {:?}",
352
							err,
353
							message,
354
						)),
355
				);
356
			}
357
		}
358
	}
359
}
360

            
361
/// Handle to the [`TelemetryWorker`] that allows initializing the telemetry for a Substrate node.
#[derive(Debug, Clone)]
pub struct TelemetryWorkerHandle {
	/// Sender into the worker's message channel, cloned into each new [`Telemetry`].
	message_sender: mpsc::Sender<TelemetryMessage>,
	/// Sender into the worker's registration channel.
	register_sender: TracingUnboundedSender<Register>,
	/// Counter shared with the worker and all its handles; used to allocate telemetry ids.
	id_counter: Arc<atomic::AtomicU64>,
}
368

            
369
impl TelemetryWorkerHandle {
370
	/// Instantiate a new [`Telemetry`] object.
371
	pub fn new_telemetry(&mut self, endpoints: TelemetryEndpoints) -> Telemetry {
372
		let addresses = endpoints.0.iter().map(|(addr, _)| addr.clone()).collect();
373

            
374
		Telemetry {
375
			message_sender: self.message_sender.clone(),
376
			register_sender: self.register_sender.clone(),
377
			id: self.id_counter.fetch_add(1, atomic::Ordering::Relaxed),
378
			connection_notifier: TelemetryConnectionNotifier {
379
				register_sender: self.register_sender.clone(),
380
				addresses,
381
			},
382
			endpoints: Some(endpoints),
383
		}
384
	}
385
}
386

            
387
/// A telemetry instance that can be used to send telemetry messages.
#[derive(Debug)]
pub struct Telemetry {
	/// Sender into the worker's message channel; cloned into each [`TelemetryHandle`].
	message_sender: mpsc::Sender<TelemetryMessage>,
	/// Sender into the worker's registration channel, used by `start_telemetry`.
	register_sender: TracingUnboundedSender<Register>,
	/// Id of this telemetry, allocated by [`TelemetryWorkerHandle::new_telemetry`].
	id: Id,
	/// Notifier bound to this telemetry's endpoint addresses.
	connection_notifier: TelemetryConnectionNotifier,
	/// `Some` until `start_telemetry` takes the endpoints; `None` afterwards.
	endpoints: Option<TelemetryEndpoints>,
}
396

            
397
impl Telemetry {
398
	/// Initialize the telemetry with the endpoints provided in argument for the current substrate
399
	/// node.
400
	///
401
	/// This method must be called during the substrate node initialization.
402
	///
403
	/// The `endpoints` argument is a collection of telemetry WebSocket servers with a corresponding
404
	/// verbosity level.
405
	///
406
	/// The `connection_message` argument is a JSON object that is sent every time the connection
407
	/// (re-)establishes.
408
	pub fn start_telemetry(&mut self, connection_message: ConnectionMessage) -> Result<()> {
409
		let endpoints = self.endpoints.take().ok_or(Error::TelemetryAlreadyInitialized)?;
410

            
411
		self.register_sender
412
			.unbounded_send(Register::Telemetry { id: self.id, endpoints, connection_message })
413
			.map_err(|_| Error::TelemetryWorkerDropped)
414
	}
415

            
416
	/// Make a new clonable handle to this [`Telemetry`]. This is used for reporting telemetries.
417
	pub fn handle(&self) -> TelemetryHandle {
418
		TelemetryHandle {
419
			message_sender: Arc::new(Mutex::new(self.message_sender.clone())),
420
			id: self.id,
421
			connection_notifier: self.connection_notifier.clone(),
422
		}
423
	}
424
}
425

            
426
/// Handle to a [`Telemetry`].
///
/// Used to report telemetry messages.
#[derive(Debug, Clone)]
pub struct TelemetryHandle {
	/// Sender into the worker's message channel. The lock lets `send_telemetry` call
	/// `try_send` through a shared `&self`, and the `Arc` makes clones share one sender.
	message_sender: Arc<Mutex<mpsc::Sender<TelemetryMessage>>>,
	/// Id of the [`Telemetry`] this handle reports for.
	id: Id,
	/// Notifier bound to the telemetry's endpoint addresses.
	connection_notifier: TelemetryConnectionNotifier,
}
435

            
436
impl TelemetryHandle {
437
	/// Send telemetry messages.
438
	pub fn send_telemetry(&self, verbosity: VerbosityLevel, payload: TelemetryPayload) {
439
		match self.message_sender.lock().try_send((self.id, verbosity, payload)) {
440
			Ok(()) => {},
441
			Err(err) if err.is_full() => log::trace!(
442
				target: "telemetry",
443
				"Telemetry channel full.",
444
			),
445
			Err(_) => log::trace!(
446
				target: "telemetry",
447
				"Telemetry channel closed.",
448
			),
449
		}
450
	}
451

            
452
	/// Get event stream for telemetry connection established events.
453
	///
454
	/// This function will return an error if the telemetry has already been started by
455
	/// [`Telemetry::start_telemetry`].
456
	pub fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
457
		self.connection_notifier.on_connect_stream()
458
	}
459
}
460

            
461
/// Used to create streams of telemetry connection events. The only kind of event is a
/// telemetry connection (re-)establishing.
#[derive(Clone, Debug)]
pub struct TelemetryConnectionNotifier {
	/// Sender into the worker's registration channel, used to register new notifiers.
	register_sender: TracingUnboundedSender<Register>,
	/// Endpoint addresses the notifiers are attached to.
	addresses: Vec<Multiaddr>,
}
468

            
469
impl TelemetryConnectionNotifier {
470
	fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
471
		let (message_sender, message_receiver) = connection_notifier_channel();
472
		if let Err(err) = self.register_sender.unbounded_send(Register::Notifier {
473
			addresses: self.addresses.clone(),
474
			connection_notifier: message_sender,
475
		}) {
476
			error!(
477
				target: "telemetry",
478
				"Could not create a telemetry connection notifier: \
479
				the telemetry is probably already running: {}",
480
				err,
481
			);
482
		}
483
		message_receiver
484
	}
485
}
486

            
487
/// Registration request sent to the [`TelemetryWorker`] over its registration channel.
#[derive(Debug)]
enum Register {
	/// Start telemetry `id` on `endpoints`; `connection_message` is queued on each endpoint's node.
	Telemetry { id: Id, endpoints: TelemetryEndpoints, connection_message: ConnectionMessage },
	/// Attach `connection_notifier` to the nodes for `addresses`, or queue it until they exist.
	Notifier { addresses: Vec<Multiaddr>, connection_notifier: ConnectionNotifierSender },
}
492

            
493
/// Report a telemetry.
///
/// If `$telemetry` (an `Option<TelemetryHandle>`) is `Some`, the key-value fields are serialized
/// into a JSON object, and its `"msg"` key is set to `$msg`. The object is then sent with
/// [`TelemetryHandle::send_telemetry`] at the given verbosity level. The verbosity is passed
/// alongside the payload, not inserted into it. If `$telemetry` is `None`, nothing is evaluated
/// beyond `$telemetry` itself.
///
/// Fields are written as `"key" => value`, serialized with `serde`, or `"key" => ?value`,
/// formatted with `Debug`. If a value fails to serialize, the message is dropped and the error
/// is logged at debug level under the `telemetry` target.
///
/// # Example
///
/// ```no_run
/// # use sc_telemetry::*;
/// # let authority_id = 42_u64;
/// # let set_id = (43_u64, 44_u64);
/// # let authorities = vec![45_u64];
/// # let telemetry: Option<TelemetryHandle> = None;
/// telemetry!(
///     telemetry;      // an `Option<TelemetryHandle>`
///     CONSENSUS_INFO;
///     "afg.authority_set";
///     "authority_id" => authority_id.to_string(),
///     "authority_set_id" => ?set_id,
///     "authorities" => authorities,
/// );
/// ```
#[macro_export(local_inner_macros)]
macro_rules! telemetry {
	( $telemetry:expr; $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
		if let Some(telemetry) = $telemetry.as_ref() {
			let verbosity: $crate::VerbosityLevel = $verbosity;
			match format_fields_to_json!($($t)*) {
				Err(err) => {
					$crate::log::debug!(
						target: "telemetry",
						"Could not serialize value for telemetry: {}",
						err,
					);
				},
				Ok(mut json) => {
					json.insert("msg".into(), $msg.into());
					telemetry.send_telemetry(verbosity, json);
				},
			}
		}
	}};
}
537

            
538
#[macro_export(local_inner_macros)]
#[doc(hidden)]
macro_rules! format_fields_to_json {
	// No fields at all: an empty JSON object. Lets `telemetry!` be used with only a `msg`.
	// Recursive calls below always pass at least one token, so they never reach this arm.
	() => {{
		$crate::serde_json::Result::Ok($crate::serde_json::Map::new())
	}};
	// `"key" => value`: serialize `value` with serde, then merge the remaining fields.
	( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		$crate::serde_json::to_value(&$v)
			.map(|value| {
				let mut map = $crate::serde_json::Map::new();
				map.insert($k.into(), value);
				map
			})
			$(
				.and_then(|mut prev_map| {
					format_fields_to_json!($($t)*)
						.map(move |mut other_map| {
							prev_map.append(&mut other_map);
							prev_map
						})
				})
			)*
	}};
	// `"key" => ?value`: store `value`'s `Debug` representation as a string.
	// `std::format!` is spelled out because `local_inner_macros` would otherwise resolve
	// `format!` relative to `$crate`.
	( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		let mut map = $crate::serde_json::Map::new();
		map.insert($k.into(), std::format!("{:?}", &$v).into());
		$crate::serde_json::Result::Ok(map)
		$(
			.and_then(|mut prev_map| {
				format_fields_to_json!($($t)*)
					.map(move |mut other_map| {
						prev_map.append(&mut other_map);
						prev_map
					})
			})
		)*
	}};
}