// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{error::Error as CliError, Result, Signals, SubstrateCli};
use chrono::prelude::*;
use futures::{future::FutureExt, Future};
use log::info;
use sc_service::{Configuration, Error as ServiceError, TaskManager};
use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use std::{marker::PhantomData, time::Duration};

/// Build a tokio runtime with all features.
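///
/// # Example:
///
/// A minimal usage sketch; the async block is a stand-in for real work driven on
/// the runtime:
///
/// ```ignore
/// let runtime = build_runtime().expect("failed to build tokio runtime");
/// runtime.block_on(async {
/// 	// Drive node futures to completion here.
/// });
/// ```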
pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::Error> {
	tokio::runtime::Builder::new_multi_thread()
		.on_thread_start(|| {
			TOKIO_THREADS_ALIVE.inc();
			TOKIO_THREADS_TOTAL.inc();
		})
		.on_thread_stop(|| {
			TOKIO_THREADS_ALIVE.dec();
		})
		.enable_all()
		.build()
}

/// A Substrate CLI runner that can be used to run a node or a command.
pub struct Runner<C: SubstrateCli> {
	config: Configuration,
	tokio_runtime: tokio::runtime::Runtime,
	signals: Signals,
	phantom: PhantomData<C>,
}

impl<C: SubstrateCli> Runner<C> {
	/// Create a new runner with the given configuration, tokio runtime and signal handler.
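	///
	/// # Example:
	///
	/// A minimal sketch, assuming a `Cli` type implementing [`SubstrateCli`] and an
	/// already-built [`Configuration`] named `config`:
	///
	/// ```ignore
	/// let tokio_runtime = build_runtime()?;
	/// let signals = Signals::capture()?;
	/// let runner = Runner::<Cli>::new(config, tokio_runtime, signals)?;
	/// ```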
	pub fn new(
		config: Configuration,
		tokio_runtime: tokio::runtime::Runtime,
		signals: Signals,
	) -> Result<Runner<C>> {
		Ok(Runner { config, tokio_runtime, signals, phantom: PhantomData })
	}

	/// Log information about the node itself.
	///
	/// # Example:
	///
	/// ```text
	/// 2020-06-03 16:14:21 Substrate Node
	/// 2020-06-03 16:14:21 ✌️  version 2.0.0-rc3-f4940588c-x86_64-linux-gnu
	/// 2020-06-03 16:14:21 ❤️  by Parity Technologies <admin@parity.io>, 2017-2020
	/// 2020-06-03 16:14:21 📋 Chain specification: Flaming Fir
	/// 2020-06-03 16:14:21 🏷  Node name: jolly-rod-7462
	/// 2020-06-03 16:14:21 👤 Role: FULL
	/// 2020-06-03 16:14:21 💾 Database: RocksDb at /tmp/c/chains/flamingfir7/db
	/// 2020-06-03 16:14:21 ⛓  Native runtime: node-251 (substrate-node-1.tx1.au10)
	/// ```
	fn print_node_infos(&self) {
		print_node_infos::<C>(self.config())
	}

	/// A helper function that runs a node with tokio and stops if the process receives the signal
	/// `SIGTERM` or `SIGINT`.
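	///
	/// # Example:
	///
	/// A hypothetical sketch; `new_full` stands in for a service builder that returns
	/// a [`TaskManager`] wrapped in a `Result`:
	///
	/// ```ignore
	/// runner.run_node_until_exit(|config| async move { new_full(config) })?;
	/// ```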
	pub fn run_node_until_exit<F, E>(
		self,
		initialize: impl FnOnce(Configuration) -> F,
	) -> std::result::Result<(), E>
	where
		F: Future<Output = std::result::Result<TaskManager, E>>,
		E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
	{
		self.print_node_infos();

		let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;

		let res = self
			.tokio_runtime
			.block_on(self.signals.run_until_signal(task_manager.future().fuse()));
		// We need to drop the task manager here to inform all tasks that they should shut down.
		//
		// This must happen before we instruct the tokio runtime to shut down. Otherwise,
		// the tokio runtime will wait the full 60 seconds for all tasks to stop.
		let task_registry = task_manager.into_task_registry();

		// Give all futures 60 seconds to shut down, before tokio "leaks" them.
		let shutdown_timeout = Duration::from_secs(60);
		self.tokio_runtime.shutdown_timeout(shutdown_timeout);

		let running_tasks = task_registry.running_tasks();

		if !running_tasks.is_empty() {
			log::error!("Detected running (potentially stalled) tasks on shutdown:");
			running_tasks.iter().for_each(|(task, count)| {
				let instances_desc =
					if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };

				if task.is_default_group() {
					log::error!(
						"Task \"{}\" was still running {}after waiting {} seconds to finish.",
						task.name,
						instances_desc,
						shutdown_timeout.as_secs(),
					);
				} else {
					log::error!(
						"Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
						task.name,
						task.group,
						instances_desc,
						shutdown_timeout.as_secs(),
					);
				}
			});
		}

		res.map_err(Into::into)
	}

	/// A helper function that runs a command with the configuration of this node.
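	///
	/// # Example:
	///
	/// A minimal sketch; `export_chain_spec` is a hypothetical synchronous command:
	///
	/// ```ignore
	/// runner.sync_run(|config| export_chain_spec(&config))?;
	/// ```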
	pub fn sync_run<E>(
		self,
		runner: impl FnOnce(Configuration) -> std::result::Result<(), E>,
	) -> std::result::Result<(), E>
	where
		E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
	{
		runner(self.config)
	}

	/// A helper function that runs a future with tokio and stops if the process receives
	/// the signal `SIGTERM` or `SIGINT`.
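	///
	/// # Example:
	///
	/// A hypothetical sketch; `import_blocks_command` stands in for a command that
	/// returns the future to drive together with the [`TaskManager`] that owns its
	/// background tasks:
	///
	/// ```ignore
	/// runner.async_run(|config| import_blocks_command(config))?;
	/// ```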
	pub fn async_run<F, E>(
		self,
		runner: impl FnOnce(Configuration) -> std::result::Result<(F, TaskManager), E>,
	) -> std::result::Result<(), E>
	where
		F: Future<Output = std::result::Result<(), E>>,
		E: std::error::Error + Send + Sync + 'static + From<ServiceError> + From<CliError>,
	{
		let (future, task_manager) = runner(self.config)?;
		self.tokio_runtime.block_on(self.signals.run_until_signal(future.fuse()))?;
		// Drop the task manager before dropping the rest, to ensure that all futures were informed
		// about the shutdown.
		drop(task_manager);
		Ok(())
	}

	/// Get an immutable reference to the node Configuration
	pub fn config(&self) -> &Configuration {
		&self.config
	}

	/// Get a mutable reference to the node Configuration
	pub fn config_mut(&mut self) -> &mut Configuration {
		&mut self.config
	}
}

/// Log information about the node itself.
pub fn print_node_infos<C: SubstrateCli>(config: &Configuration) {
	info!("{}", C::impl_name());
	info!("✌️  version {}", C::impl_version());
	info!("❤️  by {}, {}-{}", C::author(), C::copyright_start_year(), Local::now().year());
	info!("📋 Chain specification: {}", config.chain_spec.name());
	info!("🏷  Node name: {}", config.network.node_name);
	info!("👤 Role: {}", config.display_role());
	info!(
		"💾 Database: {} at {}",
		config.database,
		config
			.database
			.path()
			.map_or_else(|| "<unknown>".to_owned(), |p| p.display().to_string())
	);
}

#[cfg(test)]
mod tests {
	use super::*;
	use sc_network::config::NetworkConfiguration;
	use sc_service::{Arc, ChainType, GenericChainSpec, NoExtension};
	use std::{
		path::PathBuf,
		sync::atomic::{AtomicU64, Ordering},
	};

	struct Cli;

	impl SubstrateCli for Cli {
		fn author() -> String {
			"test".into()
		}

		fn impl_name() -> String {
			"yep".into()
		}

		fn impl_version() -> String {
			"version".into()
		}

		fn description() -> String {
			"desc".into()
		}

		fn support_url() -> String {
			"no.pe".into()
		}

		fn copyright_start_year() -> i32 {
			2042
		}

		fn load_spec(
			&self,
			_: &str,
		) -> std::result::Result<Box<dyn sc_service::ChainSpec>, String> {
			Err("nope".into())
		}
	}

	fn create_runner() -> Runner<Cli> {
		let runtime = build_runtime().unwrap();

		let root = PathBuf::from("db");
		let runner = Runner::new(
			Configuration {
				impl_name: "spec".into(),
				impl_version: "3".into(),
				role: sc_service::Role::Authority,
				tokio_handle: runtime.handle().clone(),
				transaction_pool: Default::default(),
				network: NetworkConfiguration::new_memory(),
				keystore: sc_service::config::KeystoreConfig::InMemory,
				database: sc_client_db::DatabaseSource::ParityDb { path: root.clone() },
				trie_cache_maximum_size: None,
				state_pruning: None,
				blocks_pruning: sc_client_db::BlocksPruning::KeepAll,
				chain_spec: Box::new(
					GenericChainSpec::<NoExtension, ()>::builder(
						Default::default(),
						NoExtension::None,
					)
					.with_name("test")
					.with_id("test_id")
					.with_chain_type(ChainType::Development)
					.with_genesis_config_patch(Default::default())
					.build(),
				),
				wasm_method: Default::default(),
				wasmtime_precompiled: None,
				wasm_runtime_overrides: None,
				rpc_addr: None,
				rpc_max_connections: Default::default(),
				rpc_cors: None,
				rpc_methods: Default::default(),
				rpc_max_request_size: Default::default(),
				rpc_max_response_size: Default::default(),
				rpc_id_provider: Default::default(),
				rpc_max_subs_per_conn: Default::default(),
				rpc_message_buffer_capacity: Default::default(),
				rpc_port: 9944,
				rpc_batch_config: sc_service::config::RpcBatchRequestConfig::Unlimited,
				rpc_rate_limit: None,
				rpc_rate_limit_whitelisted_ips: Default::default(),
				rpc_rate_limit_trust_proxy_headers: Default::default(),
				prometheus_config: None,
				telemetry_endpoints: None,
				default_heap_pages: None,
				offchain_worker: Default::default(),
				force_authoring: false,
				disable_grandpa: false,
				dev_key_seed: None,
				tracing_targets: None,
				tracing_receiver: Default::default(),
				max_runtime_instances: 8,
				announce_block: true,
				base_path: sc_service::BasePath::new(root.clone()),
				data_path: root,
				informant_output_format: Default::default(),
				runtime_cache_size: 2,
			},
			runtime,
			Signals::dummy(),
		)
		.unwrap();

		runner
	}

	#[test]
	fn ensure_run_until_exit_informs_tasks_to_end() {
		let runner = create_runner();

		let counter = Arc::new(AtomicU64::new(0));
		let counter2 = counter.clone();

		runner
			.run_node_until_exit(move |cfg| async move {
				let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
				let (sender, receiver) = futures::channel::oneshot::channel();

				// We need to use `spawn_blocking` here so that we get a dedicated thread for our
				// future. This is important for this test, as otherwise tokio can just "drop" the
				// future.
				task_manager.spawn_handle().spawn_blocking("test", None, async move {
					let _ = sender.send(());
					loop {
						counter2.fetch_add(1, Ordering::Relaxed);
						futures_timer::Delay::new(Duration::from_millis(50)).await;
					}
				});

				task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
					// Stop this essential task as soon as our other task has started. Ending it
					// signals that the task manager should shut down.
					let _ = receiver.await;
				});

				Ok::<_, sc_service::Error>(task_manager)
			})
			.unwrap_err();

		let count = counter.load(Ordering::Relaxed);

		// Ensure that our counting task ran for less than 30 seconds. It should be killed right
		// away, but we are a little more "relaxed" to accommodate CI machines and other slow
		// environments.
		assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50));
	}

	fn run_test_in_another_process(
		test_name: &str,
		test_body: impl FnOnce(),
	) -> Option<std::process::Output> {
		if std::env::var("RUN_FORKED_TEST").is_ok() {
			test_body();
			None
		} else {
			let output = std::process::Command::new(std::env::current_exe().unwrap())
				.arg(test_name)
				.env("RUN_FORKED_TEST", "1")
				.output()
				.unwrap();

			assert!(output.status.success());
			Some(output)
		}
	}

	/// This test ensures that `run_node_until_exit` aborts waiting for "stuck" tasks after 60
	/// seconds, i.e. it does not wait until they are finished (which may never happen).
	#[test]
	fn ensure_run_until_exit_is_not_blocking_indefinitely() {
		let output = run_test_in_another_process(
			"ensure_run_until_exit_is_not_blocking_indefinitely",
			|| {
				sp_tracing::try_init_simple();

				let runner = create_runner();

				runner
					.run_node_until_exit(move |cfg| async move {
						let task_manager =
							TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
						let (sender, receiver) = futures::channel::oneshot::channel();

						// We need to use `spawn_blocking` here so that we get a dedicated thread
						// for our future. This future runs blocking code that will never end.
						task_manager.spawn_handle().spawn_blocking("test", None, async move {
							let _ = sender.send(());
							loop {
								std::thread::sleep(Duration::from_secs(30));
							}
						});

						task_manager.spawn_essential_handle().spawn_blocking(
							"test2",
							None,
							async {
								// Stop this essential task as soon as our other task has
								// started. Ending it signals that the task manager should end.
								let _ = receiver.await;
							},
						);

						Ok::<_, sc_service::Error>(task_manager)
					})
					.unwrap_err();
			},
		);

		let Some(output) = output else { return };

		let stderr = dbg!(String::from_utf8(output.stderr).unwrap());

		assert!(
			stderr.contains("Task \"test\" was still running after waiting 60 seconds to finish.")
		);
		assert!(!stderr
			.contains("Task \"test2\" was still running after waiting 60 seconds to finish."));
	}
}