1
// This file is part of Substrate.
2

            
3
// Copyright (C) Parity Technologies (UK) Ltd.
4
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5

            
6
// This program is free software: you can redistribute it and/or modify
7
// it under the terms of the GNU General Public License as published by
8
// the Free Software Foundation, either version 3 of the License, or
9
// (at your option) any later version.
10

            
11
// This program is distributed in the hope that it will be useful,
12
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
// GNU General Public License for more details.
15

            
16
// You should have received a copy of the GNU General Public License
17
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18

            
19
//! Substrate service. Starts a thread that spins up the network, client, and extrinsic pool.
20
//! Manages communication between them.
21

            
22
#![warn(missing_docs)]
23
#![recursion_limit = "1024"]
24

            
25
pub mod chain_ops;
26
pub mod config;
27
pub mod error;
28

            
29
mod builder;
30
#[cfg(feature = "test-helpers")]
31
pub mod client;
32
#[cfg(not(feature = "test-helpers"))]
33
mod client;
34
mod metrics;
35
mod task_manager;
36

            
37
use std::{collections::HashMap, net::SocketAddr};
38

            
39
use codec::{Decode, Encode};
40
use futures::{pin_mut, FutureExt, StreamExt};
41
use jsonrpsee::RpcModule;
42
use log::{debug, error, warn};
43
use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
44
use sc_network::{
45
	config::MultiaddrWithPeerId, service::traits::NetworkService, NetworkBackend, NetworkBlock,
46
	NetworkPeers, NetworkStateInfo,
47
};
48
use sc_network_sync::SyncingService;
49
use sc_network_types::PeerId;
50
use sc_utils::mpsc::TracingUnboundedReceiver;
51
use sp_blockchain::HeaderMetadata;
52
use sp_consensus::SyncOracle;
53
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
54

            
55
pub use self::{
56
	builder::{
57
		build_network, gen_rpc_module, init_telemetry, new_client, new_db_backend, new_full_client,
58
		new_full_parts, new_full_parts_record_import, new_full_parts_with_genesis_builder,
59
		new_wasm_executor, propagate_transaction_notifications, spawn_tasks, BuildNetworkParams,
60
		KeystoreContainer, NetworkStarter, SpawnTasksParams, TFullBackend, TFullCallExecutor,
61
		TFullClient,
62
	},
63
	client::{ClientConfig, LocalCallExecutor},
64
	error::Error,
65
};
66
#[allow(deprecated)]
67
pub use builder::new_native_or_wasm_executor;
68

            
69
pub use sc_chain_spec::{
70
	construct_genesis_block, resolve_state_version_from_wasm, BuildGenesisBlock,
71
	GenesisBlockBuilder,
72
};
73

            
74
pub use config::{
75
	BasePath, BlocksPruning, Configuration, DatabaseSource, PruningMode, Role, RpcMethods, TaskType,
76
};
77
pub use sc_chain_spec::{
78
	ChainSpec, ChainType, Extension as ChainSpecExtension, GenericChainSpec, NoExtension,
79
	Properties,
80
};
81

            
82
pub use sc_consensus::ImportQueue;
83
pub use sc_executor::NativeExecutionDispatch;
84
pub use sc_network_sync::WarpSyncParams;
85
#[doc(hidden)]
86
pub use sc_network_transactions::config::{TransactionImport, TransactionImportFuture};
87
pub use sc_rpc::{
88
	RandomIntegerSubscriptionId, RandomStringSubscriptionId, RpcSubscriptionIdProvider,
89
};
90
pub use sc_tracing::TracingReceiver;
91
pub use sc_transaction_pool::Options as TransactionPoolOptions;
92
pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
93
#[doc(hidden)]
94
pub use std::{ops::Deref, result::Result, sync::Arc};
95
pub use task_manager::{SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT_GROUP_NAME};
96

            
97
// Default protocol identifier string.
// NOTE(review): not referenced anywhere in the visible part of this file; presumably a
// fallback for when no protocol ID is configured — confirm against its users.
const DEFAULT_PROTOCOL_ID: &str = "sup";
98

            
99
/// RPC handlers that can perform RPC queries.
100
#[derive(Clone)]
101
pub struct RpcHandlers(Arc<RpcModule<()>>);
102

            
103
impl RpcHandlers {
104
	/// Starts an RPC query.
105
	///
106
	/// The query is passed as a string and must be valid JSON-RPC request object.
107
	///
108
	/// Returns a response and a stream if the call successful, fails if the
109
	/// query could not be decoded as a JSON-RPC request object.
110
	///
111
	/// If the request subscribes you to events, the `stream` can be used to
112
	/// retrieve the events.
113
	pub async fn rpc_query(
114
		&self,
115
		json_query: &str,
116
	) -> Result<(String, tokio::sync::mpsc::Receiver<String>), serde_json::Error> {
117
		// Because `tokio::sync::mpsc::channel` is used under the hood
118
		// it will panic if it's set to usize::MAX.
119
		//
120
		// This limit is used to prevent panics and is large enough.
121
		const TOKIO_MPSC_MAX_SIZE: usize = tokio::sync::Semaphore::MAX_PERMITS;
122

            
123
		self.0.raw_json_request(json_query, TOKIO_MPSC_MAX_SIZE).await
124
	}
125

            
126
	/// Provides access to the underlying `RpcModule`
127
	pub fn handle(&self) -> Arc<RpcModule<()>> {
128
		self.0.clone()
129
	}
130
}
131

            
132
/// An incomplete set of chain components, but enough to run the chain ops subcommands.
133
pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, TransactionPool, Other> {
134
	/// A shared client instance.
135
	pub client: Arc<Client>,
136
	/// A shared backend instance.
137
	pub backend: Arc<Backend>,
138
	/// The chain task manager.
139
	pub task_manager: TaskManager,
140
	/// A keystore container instance.
141
	pub keystore_container: KeystoreContainer,
142
	/// A chain selection algorithm instance.
143
	pub select_chain: SelectChain,
144
	/// An import queue.
145
	pub import_queue: ImportQueue,
146
	/// A shared transaction pool.
147
	pub transaction_pool: Arc<TransactionPool>,
148
	/// Everything else that needs to be passed into the main build function.
149
	pub other: Other,
150
}
151

            
152
/// Builds a future that continuously polls the network.
153
async fn build_network_future<
154
	B: BlockT,
155
	C: BlockchainEvents<B>
156
		+ HeaderBackend<B>
157
		+ BlockBackend<B>
158
		+ HeaderMetadata<B, Error = sp_blockchain::Error>
159
		+ ProofProvider<B>
160
		+ Send
161
		+ Sync
162
		+ 'static,
163
	H: sc_network_common::ExHashT,
164
	N: NetworkBackend<B, <B as BlockT>::Hash>,
165
>(
166
	network: N,
167
	client: Arc<C>,
168
	sync_service: Arc<SyncingService<B>>,
169
	announce_imported_blocks: bool,
170
) {
171
	let mut imported_blocks_stream = client.import_notification_stream().fuse();
172

            
173
	// Stream of finalized blocks reported by the client.
174
	let mut finality_notification_stream = client.finality_notification_stream().fuse();
175

            
176
	let network_run = network.run().fuse();
177
	pin_mut!(network_run);
178

            
179
	loop {
180
		futures::select! {
181
			// List of blocks that the client has imported.
182
			notification = imported_blocks_stream.next() => {
183
				let notification = match notification {
184
					Some(n) => n,
185
					// If this stream is shut down, that means the client has shut down, and the
186
					// most appropriate thing to do for the network future is to shut down too.
187
					None => {
188
						debug!("Block import stream has terminated, shutting down the network future.");
189
						return
190
					},
191
				};
192

            
193
				if announce_imported_blocks {
194
					sync_service.announce_block(notification.hash, None);
195
				}
196

            
197
				if notification.is_new_best {
198
					sync_service.new_best_block_imported(
199
						notification.hash,
200
						*notification.header.number(),
201
					);
202
				}
203
			}
204

            
205
			// List of blocks that the client has finalized.
206
			notification = finality_notification_stream.select_next_some() => {
207
				sync_service.on_block_finalized(notification.hash, notification.header);
208
			}
209

            
210
			// Drive the network. Shut down the network future if `NetworkWorker` has terminated.
211
			_ = network_run => {
212
				debug!("`NetworkWorker` has terminated, shutting down the network future.");
213
				return
214
			}
215
		}
216
	}
217
}
218

            
219
/// Builds a future that processes system RPC requests.
220
pub async fn build_system_rpc_future<
221
	B: BlockT,
222
	C: BlockchainEvents<B>
223
		+ HeaderBackend<B>
224
		+ BlockBackend<B>
225
		+ HeaderMetadata<B, Error = sp_blockchain::Error>
226
		+ ProofProvider<B>
227
		+ Send
228
		+ Sync
229
		+ 'static,
230
	H: sc_network_common::ExHashT,
231
>(
232
	role: Role,
233
	network_service: Arc<dyn NetworkService>,
234
	sync_service: Arc<SyncingService<B>>,
235
	client: Arc<C>,
236
	mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
237
	should_have_peers: bool,
238
) {
239
	// Current best block at initialization, to report to the RPC layer.
240
	let starting_block = client.info().best_number;
241

            
242
	loop {
243
		// Answer incoming RPC requests.
244
		let Some(req) = rpc_rx.next().await else {
245
			debug!("RPC requests stream has terminated, shutting down the system RPC future.");
246
			return
247
		};
248

            
249
		match req {
250
			sc_rpc::system::Request::Health(sender) => match sync_service.peers_info().await {
251
				Ok(info) => {
252
					let _ = sender.send(sc_rpc::system::Health {
253
						peers: info.len(),
254
						is_syncing: sync_service.is_major_syncing(),
255
						should_have_peers,
256
					});
257
				},
258
				Err(_) => log::error!("`SyncingEngine` shut down"),
259
			},
260
			sc_rpc::system::Request::LocalPeerId(sender) => {
261
				let _ = sender.send(network_service.local_peer_id().to_base58());
262
			},
263
			sc_rpc::system::Request::LocalListenAddresses(sender) => {
264
				let peer_id = (network_service.local_peer_id()).into();
265
				let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id);
266
				let addresses = network_service
267
					.listen_addresses()
268
					.iter()
269
					.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
270
					.collect();
271
				let _ = sender.send(addresses);
272
			},
273
			sc_rpc::system::Request::Peers(sender) => match sync_service.peers_info().await {
274
				Ok(info) => {
275
					let _ = sender.send(
276
						info.into_iter()
277
							.map(|(peer_id, p)| sc_rpc::system::PeerInfo {
278
								peer_id: peer_id.to_base58(),
279
								roles: format!("{:?}", p.roles),
280
								best_hash: p.best_hash,
281
								best_number: p.best_number,
282
							})
283
							.collect(),
284
					);
285
				},
286
				Err(_) => log::error!("`SyncingEngine` shut down"),
287
			},
288
			sc_rpc::system::Request::NetworkState(sender) => {
289
				let network_state = network_service.network_state().await;
290
				if let Ok(network_state) = network_state {
291
					if let Ok(network_state) = serde_json::to_value(network_state) {
292
						let _ = sender.send(network_state);
293
					}
294
				} else {
295
					break
296
				}
297
			},
298
			sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => {
299
				let result = match MultiaddrWithPeerId::try_from(peer_addr) {
300
					Ok(peer) => network_service.add_reserved_peer(peer),
301
					Err(err) => Err(err.to_string()),
302
				};
303
				let x = result.map_err(sc_rpc::system::error::Error::MalformattedPeerArg);
304
				let _ = sender.send(x);
305
			},
306
			sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => {
307
				let _ = match peer_id.parse::<PeerId>() {
308
					Ok(peer_id) => {
309
						network_service.remove_reserved_peer(peer_id);
310
						sender.send(Ok(()))
311
					},
312
					Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg(
313
						e.to_string(),
314
					))),
315
				};
316
			},
317
			sc_rpc::system::Request::NetworkReservedPeers(sender) => {
318
				let Ok(reserved_peers) = network_service.reserved_peers().await else {
319
					break;
320
				};
321

            
322
				let _ =
323
					sender.send(reserved_peers.iter().map(|peer_id| peer_id.to_base58()).collect());
324
			},
325
			sc_rpc::system::Request::NodeRoles(sender) => {
326
				use sc_rpc::system::NodeRole;
327

            
328
				let node_role = match role {
329
					Role::Authority { .. } => NodeRole::Authority,
330
					Role::Full => NodeRole::Full,
331
				};
332

            
333
				let _ = sender.send(vec![node_role]);
334
			},
335
			sc_rpc::system::Request::SyncState(sender) => {
336
				use sc_rpc::system::SyncState;
337

            
338
				match sync_service.best_seen_block().await {
339
					Ok(best_seen_block) => {
340
						let best_number = client.info().best_number;
341
						let _ = sender.send(SyncState {
342
							starting_block,
343
							current_block: best_number,
344
							highest_block: best_seen_block.unwrap_or(best_number),
345
						});
346
					},
347
					Err(_) => log::error!("`SyncingEngine` shut down"),
348
				}
349
			},
350
		}
351
	}
352

            
353
	debug!("`NetworkWorker` has terminated, shutting down the system RPC future.");
354
}
355

            
356
// Wrapper for HTTP and WS servers that makes sure they are properly shut down.
357
mod waiting {
358
	pub struct Server(pub Option<sc_rpc_server::Server>);
359

            
360
	impl Drop for Server {
361
		fn drop(&mut self) {
362
			if let Some(server) = self.0.take() {
363
				// This doesn't not wait for the server to be stopped but fires the signal.
364
				let _ = server.stop();
365
			}
366
		}
367
	}
368
}
369

            
370
/// Starts RPC servers.
371
pub fn start_rpc_servers<R>(
372
	config: &Configuration,
373
	gen_rpc_module: R,
374
	rpc_id_provider: Option<Box<dyn RpcSubscriptionIdProvider>>,
375
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error>
376
where
377
	R: Fn(sc_rpc::DenyUnsafe) -> Result<RpcModule<()>, Error>,
378
{
379
	fn deny_unsafe(addr: SocketAddr, methods: &RpcMethods) -> sc_rpc::DenyUnsafe {
380
		let is_exposed_addr = !addr.ip().is_loopback();
381
		match (is_exposed_addr, methods) {
382
			| (_, RpcMethods::Unsafe) | (false, RpcMethods::Auto) => sc_rpc::DenyUnsafe::No,
383
			_ => sc_rpc::DenyUnsafe::Yes,
384
		}
385
	}
386

            
387
	// if binding the specified port failed then a random port is assigned by the OS.
388
	let backup_port = |mut addr: SocketAddr| {
389
		addr.set_port(0);
390
		addr
391
	};
392

            
393
	let addr = config.rpc_addr.unwrap_or_else(|| ([127, 0, 0, 1], config.rpc_port).into());
394
	let backup_addr = backup_port(addr);
395
	let metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?;
396

            
397
	let server_config = sc_rpc_server::Config {
398
		addrs: [addr, backup_addr],
399
		batch_config: config.rpc_batch_config,
400
		max_connections: config.rpc_max_connections,
401
		max_payload_in_mb: config.rpc_max_request_size,
402
		max_payload_out_mb: config.rpc_max_response_size,
403
		max_subs_per_conn: config.rpc_max_subs_per_conn,
404
		message_buffer_capacity: config.rpc_message_buffer_capacity,
405
		rpc_api: gen_rpc_module(deny_unsafe(addr, &config.rpc_methods))?,
406
		metrics,
407
		id_provider: rpc_id_provider,
408
		cors: config.rpc_cors.as_ref(),
409
		tokio_handle: config.tokio_handle.clone(),
410
		rate_limit: config.rpc_rate_limit,
411
		rate_limit_whitelisted_ips: config.rpc_rate_limit_whitelisted_ips.clone(),
412
		rate_limit_trust_proxy_headers: config.rpc_rate_limit_trust_proxy_headers,
413
	};
414

            
415
	// TODO: https://github.com/paritytech/substrate/issues/13773
416
	//
417
	// `block_in_place` is a hack to allow callers to call `block_on` prior to
418
	// calling `start_rpc_servers`.
419
	match tokio::task::block_in_place(|| {
420
		config.tokio_handle.block_on(sc_rpc_server::start_server(server_config))
421
	}) {
422
		Ok(server) => Ok(Box::new(waiting::Server(Some(server)))),
423
		Err(e) => Err(Error::Application(e)),
424
	}
425
}
426

            
427
/// Adapter bundling a shared transaction pool with a shared client.
pub struct TransactionPoolAdapter<C, P> {
	pool: Arc<P>,
	client: Arc<C>,
}

impl<C, P> TransactionPoolAdapter<C, P> {
	/// Creates a [`TransactionPoolAdapter`] from the given pool and client handles.
	pub fn new(pool: Arc<P>, client: Arc<C>) -> Self {
		TransactionPoolAdapter { pool, client }
	}
}
439

            
440
/// Get transactions for propagation.
441
///
442
/// Function extracted to simplify the test and prevent creating `ServiceFactory`.
443
fn transactions_to_propagate<Pool, B, H, E>(pool: &Pool) -> Vec<(H, B::Extrinsic)>
444
where
445
	Pool: TransactionPool<Block = B, Hash = H, Error = E>,
446
	B: BlockT,
447
	H: std::hash::Hash + Eq + sp_runtime::traits::Member + sp_runtime::traits::MaybeSerialize,
448
	E: IntoPoolError + From<sc_transaction_pool_api::error::Error>,
449
{
450
	pool.ready()
451
		.filter(|t| t.is_propagable())
452
		.map(|t| {
453
			let hash = t.hash().clone();
454
			let ex: B::Extrinsic = t.data().clone();
455
			(hash, ex)
456
		})
457
		.collect()
458
}
459

            
460
impl<B, H, C, Pool, E> sc_network_transactions::config::TransactionPool<H, B>
461
	for TransactionPoolAdapter<C, Pool>
462
where
463
	C: HeaderBackend<B>
464
		+ BlockBackend<B>
465
		+ HeaderMetadata<B, Error = sp_blockchain::Error>
466
		+ ProofProvider<B>
467
		+ Send
468
		+ Sync
469
		+ 'static,
470
	Pool: 'static + TransactionPool<Block = B, Hash = H, Error = E>,
471
	B: BlockT,
472
	H: std::hash::Hash + Eq + sp_runtime::traits::Member + sp_runtime::traits::MaybeSerialize,
473
	E: 'static + IntoPoolError + From<sc_transaction_pool_api::error::Error>,
474
{
475
	fn transactions(&self) -> Vec<(H, B::Extrinsic)> {
476
		transactions_to_propagate(&*self.pool)
477
	}
478

            
479
	fn hash_of(&self, transaction: &B::Extrinsic) -> H {
480
		self.pool.hash_of(transaction)
481
	}
482

            
483
	fn import(&self, transaction: B::Extrinsic) -> TransactionImportFuture {
484
		let encoded = transaction.encode();
485
		let uxt = match Decode::decode(&mut &encoded[..]) {
486
			Ok(uxt) => uxt,
487
			Err(e) => {
488
				debug!("Transaction invalid: {:?}", e);
489
				return Box::pin(futures::future::ready(TransactionImport::Bad))
490
			},
491
		};
492

            
493
		let import_future = self.pool.submit_one(
494
			self.client.info().best_hash,
495
			sc_transaction_pool_api::TransactionSource::External,
496
			uxt,
497
		);
498
		Box::pin(async move {
499
			match import_future.await {
500
				Ok(_) => TransactionImport::NewGood,
501
				Err(e) => match e.into_pool_error() {
502
					Ok(sc_transaction_pool_api::error::Error::AlreadyImported(_)) =>
503
						TransactionImport::KnownGood,
504
					Ok(e) => {
505
						debug!("Error adding transaction to the pool: {:?}", e);
506
						TransactionImport::Bad
507
					},
508
					Err(e) => {
509
						debug!("Error converting pool error: {}", e);
510
						// it is not bad at least, just some internal node logic error, so peer is
511
						// innocent.
512
						TransactionImport::KnownGood
513
					},
514
				},
515
			}
516
		})
517
	}
518

            
519
	fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>) {
520
		self.pool.on_broadcasted(propagations)
521
	}
522

            
523
	fn transaction(&self, hash: &H) -> Option<B::Extrinsic> {
524
		self.pool.ready_transaction(hash).and_then(
525
			// Only propagable transactions should be resolved for network service.
526
			|tx| if tx.is_propagable() { Some(tx.data().clone()) } else { None },
527
		)
528
	}
529
}
530

            
531
#[cfg(test)]
mod tests {
	use super::*;
	use futures::executor::block_on;
	use sc_transaction_pool::BasicPool;
	use sp_consensus::SelectChain;
	use substrate_test_runtime_client::{
		prelude::*,
		runtime::{ExtrinsicBuilder, Transfer, TransferData},
	};

	/// A ready transaction marked as "do not propagate" must not be returned by
	/// `transactions_to_propagate`, while a regular transfer must.
	#[test]
	fn should_not_propagate_transactions_that_are_marked_as_such() {
		// given: a pool on top of a test client
		let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
		let client = Arc::new(client);
		let pool = BasicPool::new_full(
			Default::default(),
			true.into(),
			None,
			sp_core::testing::TaskExecutor::new(),
			client.clone(),
		);
		let source = sp_runtime::transaction_validity::TransactionSource::External;
		let best_hash = block_on(longest_chain.best_chain()).unwrap().hash();

		// one regular transfer and one extrinsic that must not be propagated
		let transfer = Transfer {
			amount: 5,
			nonce: 0,
			from: AccountKeyring::Alice.into(),
			to: AccountKeyring::Bob.into(),
		}
		.into_unchecked_extrinsic();
		let not_propagable = ExtrinsicBuilder::new_call_do_not_propagate().nonce(1).build();

		for extrinsic in [transfer, not_propagable] {
			block_on(pool.submit_one(best_hash, source, extrinsic)).unwrap();
		}
		assert_eq!(pool.status().ready, 2);

		// when
		let transactions = transactions_to_propagate(&*pool);

		// then: only the transfer is left
		assert_eq!(transactions.len(), 1);
		assert!(TransferData::try_from(&transactions[0].1).is_ok());
	}
}