1
// This file is part of Substrate.
2

            
3
// Copyright (C) Parity Technologies (UK) Ltd.
4
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5

            
6
// This program is free software: you can redistribute it and/or modify
7
// it under the terms of the GNU General Public License as published by
8
// the Free Software Foundation, either version 3 of the License, or
9
// (at your option) any later version.
10

            
11
// This program is distributed in the hope that it will be useful,
12
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
// GNU General Public License for more details.
15

            
16
// You should have received a copy of the GNU General Public License
17
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18

            
19
use crate::{
20
	build_network_future, build_system_rpc_future,
21
	client::{Client, ClientConfig},
22
	config::{Configuration, KeystoreConfig, PrometheusConfig},
23
	error::Error,
24
	metrics::MetricsService,
25
	start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
26
	TaskManager, TransactionPoolAdapter,
27
};
28
use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
29
use jsonrpsee::RpcModule;
30
use log::info;
31
use prometheus_endpoint::Registry;
32
use sc_chain_spec::get_extension;
33
use sc_client_api::{
34
	execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
35
	BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, StorageProvider, UsageProvider,
36
};
37
use sc_client_db::{Backend, DatabaseSettings};
38
use sc_consensus::import_queue::ImportQueue;
39
use sc_executor::{
40
	sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
41
	WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
42
};
43
use sc_keystore::LocalKeystore;
44
use sc_network::{
45
	config::{FullNetworkConfiguration, SyncMode},
46
	service::{
47
		traits::{PeerStore, RequestResponseConfig},
48
		NotificationMetrics,
49
	},
50
	NetworkBackend, NetworkStateInfo,
51
};
52
use sc_network_common::role::Roles;
53
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
54
use sc_network_sync::{
55
	block_relay_protocol::BlockRelayParams, block_request_handler::BlockRequestHandler,
56
	engine::SyncingEngine, service::network::NetworkServiceProvider,
57
	state_request_handler::StateRequestHandler,
58
	warp_request_handler::RequestHandler as WarpSyncRequestHandler, SyncingService, WarpSyncParams,
59
};
60
use sc_rpc::{
61
	author::AuthorApiServer,
62
	chain::ChainApiServer,
63
	offchain::OffchainApiServer,
64
	state::{ChildStateApiServer, StateApiServer},
65
	system::SystemApiServer,
66
	DenyUnsafe, SubscriptionTaskExecutor,
67
};
68
use sc_rpc_spec_v2::{
69
	archive::ArchiveApiServer,
70
	chain_head::ChainHeadApiServer,
71
	transaction::{TransactionApiServer, TransactionBroadcastApiServer},
72
};
73
use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
74
use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
75
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
76
use sp_api::{CallApiAt, ProvideRuntimeApi};
77
use sp_blockchain::{HeaderBackend, HeaderMetadata};
78
use sp_consensus::block_validation::{
79
	BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
80
};
81
use sp_core::traits::{CodeExecutor, SpawnNamed};
82
use sp_keystore::KeystorePtr;
83
use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
84
use std::{str::FromStr, sync::Arc, time::SystemTime};
85

            
86
/// Full client type.
87
pub type TFullClient<TBl, TRtApi, TExec> =
88
	Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
89

            
90
/// Full client backend type.
91
pub type TFullBackend<TBl> = Backend<TBl>;
92

            
93
/// Full client call executor type.
94
pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;
95

            
96
type TFullParts<TBl, TRtApi, TExec> =
97
	(TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
98

            
99
/// Construct a local keystore shareable container
100
pub struct KeystoreContainer(Arc<LocalKeystore>);
101

            
102
impl KeystoreContainer {
103
	/// Construct KeystoreContainer
104
	pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
105
		let keystore = Arc::new(match config {
106
			KeystoreConfig::Path { path, password } =>
107
				LocalKeystore::open(path.clone(), password.clone())?,
108
			KeystoreConfig::InMemory => LocalKeystore::in_memory(),
109
		});
110

            
111
		Ok(Self(keystore))
112
	}
113

            
114
	/// Returns a shared reference to a dynamic `Keystore` trait implementation.
115
	pub fn keystore(&self) -> KeystorePtr {
116
		self.0.clone()
117
	}
118

            
119
	/// Returns a shared reference to the local keystore .
120
	pub fn local_keystore(&self) -> Arc<LocalKeystore> {
121
		self.0.clone()
122
	}
123
}
124

            
125
/// Creates a new full client for the given config.
126
pub fn new_full_client<TBl, TRtApi, TExec>(
127
	config: &Configuration,
128
	telemetry: Option<TelemetryHandle>,
129
	executor: TExec,
130
) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
131
where
132
	TBl: BlockT,
133
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
134
{
135
	new_full_parts(config, telemetry, executor).map(|parts| parts.0)
136
}
137

            
138
/// Create the initial parts of a full node with the default genesis block builder.
139
pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
140
	config: &Configuration,
141
	telemetry: Option<TelemetryHandle>,
142
	executor: TExec,
143
	enable_import_proof_recording: bool,
144
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
145
where
146
	TBl: BlockT,
147
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
148
{
149
	let backend = new_db_backend(config.db_config())?;
150

            
151
	let genesis_block_builder = GenesisBlockBuilder::new(
152
		config.chain_spec.as_storage_builder(),
153
		!config.no_genesis(),
154
		backend.clone(),
155
		executor.clone(),
156
	)?;
157

            
158
	new_full_parts_with_genesis_builder(
159
		config,
160
		telemetry,
161
		executor,
162
		backend,
163
		genesis_block_builder,
164
		enable_import_proof_recording,
165
	)
166
}
167
/// Create the initial parts of a full node with the default genesis block builder.
168
pub fn new_full_parts<TBl, TRtApi, TExec>(
169
	config: &Configuration,
170
	telemetry: Option<TelemetryHandle>,
171
	executor: TExec,
172
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
173
where
174
	TBl: BlockT,
175
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
176
{
177
	new_full_parts_record_import(config, telemetry, executor, false)
178
}
179

            
180
/// Create the initial parts of a full node.
181
pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
182
	config: &Configuration,
183
	telemetry: Option<TelemetryHandle>,
184
	executor: TExec,
185
	backend: Arc<TFullBackend<TBl>>,
186
	genesis_block_builder: TBuildGenesisBlock,
187
	enable_import_proof_recording: bool,
188
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
189
where
190
	TBl: BlockT,
191
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
192
	TBuildGenesisBlock: BuildGenesisBlock<
193
		TBl,
194
		BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
195
	>,
196
{
197
	let keystore_container = KeystoreContainer::new(&config.keystore)?;
198

            
199
	let task_manager = {
200
		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
201
		TaskManager::new(config.tokio_handle.clone(), registry)?
202
	};
203

            
204
	let chain_spec = &config.chain_spec;
205
	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
206
		.cloned()
207
		.unwrap_or_default();
208

            
209
	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
210
		.cloned()
211
		.unwrap_or_default();
212

            
213
	let client = {
214
		let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
215
			None,
216
			Arc::new(executor.clone()),
217
		);
218

            
219
		let wasm_runtime_substitutes = config
220
			.chain_spec
221
			.code_substitutes()
222
			.into_iter()
223
			.map(|(n, c)| {
224
				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
225
					Error::Application(Box::from(format!(
226
						"Failed to parse `{}` as block number for code substitutes. \
227
						 In an old version the key for code substitute was a block hash. \
228
						 Please update the chain spec to a version that is compatible with your node.",
229
						n
230
					)))
231
				})?;
232
				Ok((number, c))
233
			})
234
			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
235

            
236
		let client = new_client(
237
			backend.clone(),
238
			executor,
239
			genesis_block_builder,
240
			fork_blocks,
241
			bad_blocks,
242
			extensions,
243
			Box::new(task_manager.spawn_handle()),
244
			config.prometheus_config.as_ref().map(|config| config.registry.clone()),
245
			telemetry,
246
			ClientConfig {
247
				offchain_worker_enabled: config.offchain_worker.enabled,
248
				offchain_indexing_api: config.offchain_worker.indexing_enabled,
249
				wasmtime_precompiled: config.wasmtime_precompiled.clone(),
250
				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
251
				no_genesis: matches!(
252
					config.network.sync_mode,
253
					SyncMode::LightState { .. } | SyncMode::Warp { .. }
254
				),
255
				wasm_runtime_substitutes,
256
				enable_import_proof_recording,
257
			},
258
		)?;
259

            
260
		client
261
	};
262

            
263
	Ok((client, backend, keystore_container, task_manager))
264
}
265

            
266
/// Creates a [`NativeElseWasmExecutor`](sc_executor::NativeElseWasmExecutor) according to
267
/// [`Configuration`].
268
#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
269
#[allow(deprecated)]
270
pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
271
	config: &Configuration,
272
) -> sc_executor::NativeElseWasmExecutor<D> {
273
	#[allow(deprecated)]
274
	sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(config))
275
}
276

            
277
/// Creates a [`WasmExecutor`] according to [`Configuration`].
278
pub fn new_wasm_executor<H: HostFunctions>(config: &Configuration) -> WasmExecutor<H> {
279
	let strategy = config
280
		.default_heap_pages
281
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
282
	let mut wasm_builder = WasmExecutor::<H>::builder()
283
		.with_execution_method(config.wasm_method)
284
		.with_onchain_heap_alloc_strategy(strategy)
285
		.with_offchain_heap_alloc_strategy(strategy)
286
		.with_max_runtime_instances(config.max_runtime_instances)
287
		.with_runtime_cache_size(config.runtime_cache_size);
288

            
289
	if let Some(ref wasmtime_precompiled_path) = config.wasmtime_precompiled {
290
		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
291
	}
292
	
293
	wasm_builder.build()
294
}
295

            
296
/// Create an instance of default DB-backend backend.
297
pub fn new_db_backend<Block>(
298
	settings: DatabaseSettings,
299
) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
300
where
301
	Block: BlockT,
302
{
303
	const CANONICALIZATION_DELAY: u64 = 4096;
304

            
305
	Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
306
}
307

            
308
/// Create an instance of client backed by given backend.
309
pub fn new_client<E, Block, RA, G>(
310
	backend: Arc<Backend<Block>>,
311
	executor: E,
312
	genesis_block_builder: G,
313
	fork_blocks: ForkBlocks<Block>,
314
	bad_blocks: BadBlocks<Block>,
315
	execution_extensions: ExecutionExtensions<Block>,
316
	spawn_handle: Box<dyn SpawnNamed>,
317
	prometheus_registry: Option<Registry>,
318
	telemetry: Option<TelemetryHandle>,
319
	config: ClientConfig<Block>,
320
) -> Result<
321
	Client<
322
		Backend<Block>,
323
		crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
324
		Block,
325
		RA,
326
	>,
327
	sp_blockchain::Error,
328
>
329
where
330
	Block: BlockT,
331
	E: CodeExecutor + RuntimeVersionOf,
332
	G: BuildGenesisBlock<
333
		Block,
334
		BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
335
	>,
336
{
337
	let executor = crate::client::LocalCallExecutor::new(
338
		backend.clone(),
339
		executor,
340
		config.clone(),
341
		execution_extensions,
342
	)?;
343

            
344
	Client::new(
345
		backend,
346
		executor,
347
		spawn_handle,
348
		genesis_block_builder,
349
		fork_blocks,
350
		bad_blocks,
351
		prometheus_registry,
352
		telemetry,
353
		config,
354
	)
355
}
356

            
357
/// Parameters to pass into `build`.
358
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
359
	/// The service configuration.
360
	pub config: Configuration,
361
	/// A shared client returned by `new_full_parts`.
362
	pub client: Arc<TCl>,
363
	/// A shared backend returned by `new_full_parts`.
364
	pub backend: Arc<Backend>,
365
	/// A task manager returned by `new_full_parts`.
366
	pub task_manager: &'a mut TaskManager,
367
	/// A shared keystore returned by `new_full_parts`.
368
	pub keystore: KeystorePtr,
369
	/// A shared transaction pool.
370
	pub transaction_pool: Arc<TExPool>,
371
	/// Builds additional [`RpcModule`]s that should be added to the server
372
	pub rpc_builder:
373
		Box<dyn Fn(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
374
	/// A shared network instance.
375
	pub network: Arc<dyn sc_network::service::traits::NetworkService>,
376
	/// A Sender for RPC requests.
377
	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
378
	/// Controller for transactions handlers
379
	pub tx_handler_controller:
380
		sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
381
	/// Syncing service.
382
	pub sync_service: Arc<SyncingService<TBl>>,
383
	/// Telemetry instance for this node.
384
	pub telemetry: Option<&'a mut Telemetry>,
385
}
386

            
387
/// Spawn the tasks that are required to run a node.
388
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
389
	params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
390
) -> Result<RpcHandlers, Error>
391
where
392
	TCl: ProvideRuntimeApi<TBl>
393
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
394
		+ Chain<TBl>
395
		+ BlockBackend<TBl>
396
		+ BlockIdTo<TBl, Error = sp_blockchain::Error>
397
		+ ProofProvider<TBl>
398
		+ HeaderBackend<TBl>
399
		+ BlockchainEvents<TBl>
400
		+ ExecutorProvider<TBl>
401
		+ UsageProvider<TBl>
402
		+ StorageProvider<TBl, TBackend>
403
		+ CallApiAt<TBl>
404
		+ Send
405
		+ 'static,
406
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
407
		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
408
		+ sp_session::SessionKeys<TBl>
409
		+ sp_api::ApiExt<TBl>,
410
	TBl: BlockT,
411
	TBl::Hash: Unpin,
412
	TBl::Header: Unpin,
413
	TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
414
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
415
{
416
	let SpawnTasksParams {
417
		mut config,
418
		task_manager,
419
		client,
420
		backend,
421
		keystore,
422
		transaction_pool,
423
		rpc_builder,
424
		network,
425
		system_rpc_tx,
426
		tx_handler_controller,
427
		sync_service,
428
		telemetry,
429
	} = params;
430

            
431
	let chain_info = client.usage_info().chain;
432

            
433
	sp_session::generate_initial_session_keys(
434
		client.clone(),
435
		chain_info.best_hash,
436
		config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
437
		keystore.clone(),
438
	)
439
	.map_err(|e| Error::Application(Box::new(e)))?;
440

            
441
	let sysinfo = sc_sysinfo::gather_sysinfo();
442
	sc_sysinfo::print_sysinfo(&sysinfo);
443

            
444
	let telemetry = telemetry
445
		.map(|telemetry| {
446
			init_telemetry(&mut config, network.clone(), client.clone(), telemetry, Some(sysinfo))
447
		})
448
		.transpose()?;
449

            
450
	info!("📦 Highest known block at #{}", chain_info.best_number);
451

            
452
	let spawn_handle = task_manager.spawn_handle();
453

            
454
	// Inform the tx pool about imported and finalized blocks.
455
	spawn_handle.spawn(
456
		"txpool-notifications",
457
		Some("transaction-pool"),
458
		sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
459
	);
460

            
461
	spawn_handle.spawn(
462
		"on-transaction-imported",
463
		Some("transaction-pool"),
464
		propagate_transaction_notifications(
465
			transaction_pool.clone(),
466
			tx_handler_controller,
467
			telemetry.clone(),
468
		),
469
	);
470

            
471
	// Prometheus metrics.
472
	let metrics_service =
473
		if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
474
			// Set static metrics.
475
			let metrics = MetricsService::with_prometheus(telemetry, &registry, &config)?;
476
			spawn_handle.spawn(
477
				"prometheus-endpoint",
478
				None,
479
				prometheus_endpoint::init_prometheus(port, registry).map(drop),
480
			);
481

            
482
			metrics
483
		} else {
484
			MetricsService::new(telemetry)
485
		};
486

            
487
	// Periodically updated metrics and telemetry updates.
488
	spawn_handle.spawn(
489
		"telemetry-periodic-send",
490
		None,
491
		metrics_service.run(
492
			client.clone(),
493
			transaction_pool.clone(),
494
			network.clone(),
495
			sync_service.clone(),
496
		),
497
	);
498

            
499
	let rpc_id_provider = config.rpc_id_provider.take();
500

            
501
	// jsonrpsee RPC
502
	let gen_rpc_module = |deny_unsafe: DenyUnsafe| {
503
		gen_rpc_module(
504
			deny_unsafe,
505
			task_manager.spawn_handle(),
506
			client.clone(),
507
			transaction_pool.clone(),
508
			keystore.clone(),
509
			system_rpc_tx.clone(),
510
			&config,
511
			backend.clone(),
512
			&*rpc_builder,
513
		)
514
	};
515

            
516
	let rpc = start_rpc_servers(&config, gen_rpc_module, rpc_id_provider)?;
517
	let rpc_handlers = RpcHandlers(Arc::new(gen_rpc_module(sc_rpc::DenyUnsafe::No)?.into()));
518

            
519
	// Spawn informant task
520
	spawn_handle.spawn(
521
		"informant",
522
		None,
523
		sc_informant::build(
524
			client.clone(),
525
			network,
526
			sync_service.clone(),
527
			config.informant_output_format,
528
		),
529
	);
530

            
531
	task_manager.keep_alive((config.base_path, rpc));
532

            
533
	Ok(rpc_handlers)
534
}
535

            
536
/// Returns a future that forwards imported transactions to the transaction networking protocol.
537
pub async fn propagate_transaction_notifications<Block, ExPool>(
538
	transaction_pool: Arc<ExPool>,
539
	tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
540
		<Block as BlockT>::Hash,
541
	>,
542
	telemetry: Option<TelemetryHandle>,
543
) where
544
	Block: BlockT,
545
	ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
546
{
547
	// transaction notifications
548
	transaction_pool
549
		.import_notification_stream()
550
		.for_each(move |hash| {
551
			tx_handler_controller.propagate_transaction(hash);
552
			let status = transaction_pool.status();
553
			telemetry!(
554
				telemetry;
555
				SUBSTRATE_INFO;
556
				"txpool.import";
557
				"ready" => status.ready,
558
				"future" => status.future,
559
			);
560
			ready(())
561
		})
562
		.await;
563
}
564

            
565
/// Initialize telemetry with provided configuration and return telemetry handle
566
pub fn init_telemetry<Block, Client, Network>(
567
	config: &mut Configuration,
568
	network: Network,
569
	client: Arc<Client>,
570
	telemetry: &mut Telemetry,
571
	sysinfo: Option<sc_telemetry::SysInfo>,
572
) -> sc_telemetry::Result<TelemetryHandle>
573
where
574
	Block: BlockT,
575
	Client: BlockBackend<Block>,
576
	Network: NetworkStateInfo,
577
{
578
	let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
579
	let connection_message = ConnectionMessage {
580
		name: config.network.node_name.to_owned(),
581
		implementation: config.impl_name.to_owned(),
582
		version: config.impl_version.to_owned(),
583
		target_os: sc_sysinfo::TARGET_OS.into(),
584
		target_arch: sc_sysinfo::TARGET_ARCH.into(),
585
		target_env: sc_sysinfo::TARGET_ENV.into(),
586
		config: String::new(),
587
		chain: config.chain_spec.name().to_owned(),
588
		genesis_hash: format!("{:?}", genesis_hash),
589
		authority: config.role.is_authority(),
590
		startup_time: SystemTime::UNIX_EPOCH
591
			.elapsed()
592
			.map(|dur| dur.as_millis())
593
			.unwrap_or(0)
594
			.to_string(),
595
		network_id: network.local_peer_id().to_base58(),
596
		sysinfo,
597
	};
598

            
599
	telemetry.start_telemetry(connection_message)?;
600

            
601
	Ok(telemetry.handle())
602
}
603

            
604
/// Generate RPC module using provided configuration
605
pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
606
	deny_unsafe: DenyUnsafe,
607
	spawn_handle: SpawnTaskHandle,
608
	client: Arc<TCl>,
609
	transaction_pool: Arc<TExPool>,
610
	keystore: KeystorePtr,
611
	system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
612
	config: &Configuration,
613
	backend: Arc<TBackend>,
614
	rpc_builder: &(dyn Fn(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
615
) -> Result<RpcModule<()>, Error>
616
where
617
	TBl: BlockT,
618
	TCl: ProvideRuntimeApi<TBl>
619
		+ BlockchainEvents<TBl>
620
		+ HeaderBackend<TBl>
621
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
622
		+ ExecutorProvider<TBl>
623
		+ CallApiAt<TBl>
624
		+ ProofProvider<TBl>
625
		+ StorageProvider<TBl, TBackend>
626
		+ BlockBackend<TBl>
627
		+ Send
628
		+ Sync
629
		+ 'static,
630
	TBackend: sc_client_api::backend::Backend<TBl> + 'static,
631
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
632
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
633
	TBl::Hash: Unpin,
634
	TBl::Header: Unpin,
635
{
636
	let system_info = sc_rpc::system::SystemInfo {
637
		chain_name: config.chain_spec.name().into(),
638
		impl_name: config.impl_name.clone(),
639
		impl_version: config.impl_version.clone(),
640
		properties: config.chain_spec.properties(),
641
		chain_type: config.chain_spec.chain_type(),
642
	};
643

            
644
	let mut rpc_api = RpcModule::new(());
645
	let task_executor = Arc::new(spawn_handle);
646

            
647
	let (chain, state, child_state) = {
648
		let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
649
		let (state, child_state) =
650
			sc_rpc::state::new_full(client.clone(), task_executor.clone(), deny_unsafe);
651
		let state = state.into_rpc();
652
		let child_state = child_state.into_rpc();
653

            
654
		(chain, state, child_state)
655
	};
656

            
657
	const MAX_TRANSACTION_PER_CONNECTION: usize = 16;
658

            
659
	let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
660
		client.clone(),
661
		transaction_pool.clone(),
662
		task_executor.clone(),
663
		MAX_TRANSACTION_PER_CONNECTION,
664
	)
665
	.into_rpc();
666

            
667
	let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
668
		client.clone(),
669
		transaction_pool.clone(),
670
		task_executor.clone(),
671
	)
672
	.into_rpc();
673

            
674
	let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
675
		client.clone(),
676
		backend.clone(),
677
		task_executor.clone(),
678
		// Defaults to sensible limits for the `ChainHead`.
679
		sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
680
	)
681
	.into_rpc();
682

            
683
	// Part of the RPC v2 spec.
684
	// An archive node that can respond to the `archive` RPC-v2 queries is a node with:
685
	// - state pruning in archive mode: The storage of blocks is kept around
686
	// - block pruning in archive mode: The block's body is kept around
687
	let is_archive_node = config.state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
688
		config.blocks_pruning.is_archive();
689
	if is_archive_node {
690
		let genesis_hash =
691
			client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
692
		let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
693
			client.clone(),
694
			backend.clone(),
695
			genesis_hash,
696
			// Defaults to sensible limits for the `Archive`.
697
			sc_rpc_spec_v2::archive::ArchiveConfig::default(),
698
		)
699
		.into_rpc();
700
		rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
701
	}
702

            
703
	let author = sc_rpc::author::Author::new(
704
		client.clone(),
705
		transaction_pool,
706
		keystore,
707
		deny_unsafe,
708
		task_executor.clone(),
709
	)
710
	.into_rpc();
711

            
712
	let system = sc_rpc::system::System::new(system_info, system_rpc_tx, deny_unsafe).into_rpc();
713

            
714
	if let Some(storage) = backend.offchain_storage() {
715
		let offchain = sc_rpc::offchain::Offchain::new(storage, deny_unsafe).into_rpc();
716

            
717
		rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
718
	}
719

            
720
	// Part of the RPC v2 spec.
721
	rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
722
	rpc_api
723
		.merge(transaction_broadcast_rpc_v2)
724
		.map_err(|e| Error::Application(e.into()))?;
725
	rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
726

            
727
	// Part of the old RPC spec.
728
	rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
729
	rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
730
	rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
731
	rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
732
	rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
733
	// Additional [`RpcModule`]s defined in the node to fit the specific blockchain
734
	let extra_rpcs = rpc_builder(deny_unsafe, task_executor.clone())?;
735
	rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;
736

            
737
	Ok(rpc_api)
738
}
739

            
740
/// Parameters to pass into `build_network`.
741
pub struct BuildNetworkParams<
742
	'a,
743
	TBl: BlockT,
744
	TNet: NetworkBackend<TBl, <TBl as BlockT>::Hash>,
745
	TExPool,
746
	TImpQu,
747
	TCl,
748
> {
749
	/// The service configuration.
750
	pub config: &'a Configuration,
751
	/// Full network configuration.
752
	pub net_config: FullNetworkConfiguration<TBl, <TBl as BlockT>::Hash, TNet>,
753
	/// A shared client returned by `new_full_parts`.
754
	pub client: Arc<TCl>,
755
	/// A shared transaction pool.
756
	pub transaction_pool: Arc<TExPool>,
757
	/// A handle for spawning tasks.
758
	pub spawn_handle: SpawnTaskHandle,
759
	/// An import queue.
760
	pub import_queue: TImpQu,
761
	/// A block announce validator builder.
762
	pub block_announce_validator_builder:
763
		Option<Box<dyn FnOnce(Arc<TCl>) -> Box<dyn BlockAnnounceValidator<TBl> + Send> + Send>>,
764
	/// Optional warp sync params.
765
	pub warp_sync_params: Option<WarpSyncParams<TBl>>,
766
	/// User specified block relay params. If not specified, the default
767
	/// block request handler will be used.
768
	pub block_relay: Option<BlockRelayParams<TBl, TNet>>,
769
	/// Metrics.
770
	pub metrics: NotificationMetrics,
771
}
772

            
773
/// Build the network service, the network status sinks and an RPC sender.
774
pub fn build_network<TBl, TNet, TExPool, TImpQu, TCl>(
775
	params: BuildNetworkParams<TBl, TNet, TExPool, TImpQu, TCl>,
776
) -> Result<
777
	(
778
		Arc<dyn sc_network::service::traits::NetworkService>,
779
		TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
780
		sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
781
		NetworkStarter,
782
		Arc<SyncingService<TBl>>,
783
	),
784
	Error,
785
>
786
where
787
	TBl: BlockT,
788
	TCl: ProvideRuntimeApi<TBl>
789
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
790
		+ Chain<TBl>
791
		+ BlockBackend<TBl>
792
		+ BlockIdTo<TBl, Error = sp_blockchain::Error>
793
		+ ProofProvider<TBl>
794
		+ HeaderBackend<TBl>
795
		+ BlockchainEvents<TBl>
796
		+ 'static,
797
	TExPool: TransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
798
	TImpQu: ImportQueue<TBl> + 'static,
799
	TNet: NetworkBackend<TBl, <TBl as BlockT>::Hash>,
800
{
801
	let BuildNetworkParams {
802
		config,
803
		mut net_config,
804
		client,
805
		transaction_pool,
806
		spawn_handle,
807
		import_queue,
808
		block_announce_validator_builder,
809
		warp_sync_params,
810
		block_relay,
811
		metrics,
812
	} = params;
813

            
814
	if warp_sync_params.is_none() && config.network.sync_mode.is_warp() {
815
		return Err("Warp sync enabled, but no warp sync provider configured.".into())
816
	}
817

            
818
	if client.requires_full_sync() {
819
		match config.network.sync_mode {
820
			SyncMode::LightState { .. } =>
821
				return Err("Fast sync doesn't work for archive nodes".into()),
822
			SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
823
			SyncMode::Full => {},
824
		}
825
	}
826

            
827
	let protocol_id = config.protocol_id();
828
	let genesis_hash = client
829
		.block_hash(0u32.into())
830
		.ok()
831
		.flatten()
832
		.expect("Genesis block exists; qed");
833

            
834
	let block_announce_validator = if let Some(f) = block_announce_validator_builder {
835
		f(client.clone())
836
	} else {
837
		Box::new(DefaultBlockAnnounceValidator)
838
	};
839

            
840
	let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new();
841
	let (mut block_server, block_downloader, block_request_protocol_config) = match block_relay {
842
		Some(params) => (params.server, params.downloader, params.request_response_config),
843
		None => {
844
			// Custom protocol was not specified, use the default block handler.
845
			// Allow both outgoing and incoming requests.
846
			let params = BlockRequestHandler::new::<TNet>(
847
				chain_sync_network_handle.clone(),
848
				&protocol_id,
849
				config.chain_spec.fork_id(),
850
				client.clone(),
851
				config.network.default_peers_set.in_peers as usize +
852
					config.network.default_peers_set.out_peers as usize,
853
			);
854
			(params.server, params.downloader, params.request_response_config)
855
		},
856
	};
857
	spawn_handle.spawn("block-request-handler", Some("networking"), async move {
858
		block_server.run().await;
859
	});
860

            
861
	let (state_request_protocol_config, state_request_protocol_name) = {
862
		let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
863
			net_config.network_config.default_peers_set.reserved_nodes.len();
864
		// Allow both outgoing and incoming requests.
865
		let (handler, protocol_config) = StateRequestHandler::new::<TNet>(
866
			&protocol_id,
867
			config.chain_spec.fork_id(),
868
			client.clone(),
869
			num_peer_hint,
870
		);
871
		let config_name = protocol_config.protocol_name().clone();
872

            
873
		spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
874
		(protocol_config, config_name)
875
	};
876

            
877
	let (warp_sync_protocol_config, warp_request_protocol_name) = match warp_sync_params.as_ref() {
878
		Some(WarpSyncParams::WithProvider(warp_with_provider)) => {
879
			// Allow both outgoing and incoming requests.
880
			let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, TNet>(
881
				protocol_id.clone(),
882
				genesis_hash,
883
				config.chain_spec.fork_id(),
884
				warp_with_provider.clone(),
885
			);
886
			let config_name = protocol_config.protocol_name().clone();
887

            
888
			spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
889
			(Some(protocol_config), Some(config_name))
890
		},
891
		_ => (None, None),
892
	};
893

            
894
	let light_client_request_protocol_config = {
895
		// Allow both outgoing and incoming requests.
896
		let (handler, protocol_config) = LightClientRequestHandler::new::<TNet>(
897
			&protocol_id,
898
			config.chain_spec.fork_id(),
899
			client.clone(),
900
		);
901
		spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
902
		protocol_config
903
	};
904

            
905
	// install request handlers to `FullNetworkConfiguration`
906
	net_config.add_request_response_protocol(block_request_protocol_config);
907
	net_config.add_request_response_protocol(state_request_protocol_config);
908
	net_config.add_request_response_protocol(light_client_request_protocol_config);
909

            
910
	if let Some(config) = warp_sync_protocol_config {
911
		net_config.add_request_response_protocol(config);
912
	}
913

            
914
	let bitswap_config = config.network.ipfs_server.then(|| {
915
		let (handler, config) = TNet::bitswap_server(client.clone());
916
		spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);
917

            
918
		config
919
	});
920

            
921
	// create transactions protocol and add it to the list of supported protocols of
922
	let peer_store_handle = net_config.peer_store_handle();
923
	let (transactions_handler_proto, transactions_config) =
924
		sc_network_transactions::TransactionsHandlerPrototype::new::<_, TBl, TNet>(
925
			protocol_id.clone(),
926
			genesis_hash,
927
			config.chain_spec.fork_id(),
928
			metrics.clone(),
929
			Arc::clone(&peer_store_handle),
930
		);
931
	net_config.add_notification_protocol(transactions_config);
932

            
933
	// Start task for `PeerStore`
934
	let peer_store = net_config.take_peer_store();
935
	let peer_store_handle = peer_store.handle();
936
	spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
937

            
938
	let (engine, sync_service, block_announce_config) = SyncingEngine::new(
939
		Roles::from(&config.role),
940
		client.clone(),
941
		config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(),
942
		metrics.clone(),
943
		&net_config,
944
		protocol_id.clone(),
945
		&config.chain_spec.fork_id().map(ToOwned::to_owned),
946
		block_announce_validator,
947
		warp_sync_params,
948
		chain_sync_network_handle,
949
		import_queue.service(),
950
		block_downloader,
951
		state_request_protocol_name,
952
		warp_request_protocol_name,
953
		Arc::clone(&peer_store_handle),
954
	)?;
955
	let sync_service_import_queue = sync_service.clone();
956
	let sync_service = Arc::new(sync_service);
957

            
958
	let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
959
	let network_params = sc_network::config::Params::<TBl, <TBl as BlockT>::Hash, TNet> {
960
		role: config.role.clone(),
961
		executor: {
962
			let spawn_handle = Clone::clone(&spawn_handle);
963
			Box::new(move |fut| {
964
				spawn_handle.spawn("libp2p-node", Some("networking"), fut);
965
			})
966
		},
967
		network_config: net_config,
968
		genesis_hash,
969
		protocol_id: protocol_id.clone(),
970
		fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned),
971
		metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
972
		block_announce_config,
973
		bitswap_config,
974
		notification_metrics: metrics,
975
	};
976

            
977
	let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
978
	let network_mut = TNet::new(network_params)?;
979
	let network = network_mut.network_service().clone();
980

            
981
	let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
982
		network.clone(),
983
		sync_service.clone(),
984
		Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
985
		config.prometheus_config.as_ref().map(|config| &config.registry),
986
	)?;
987
	spawn_handle.spawn_blocking(
988
		"network-transactions-handler",
989
		Some("networking"),
990
		tx_handler.run(),
991
	);
992

            
993
	spawn_handle.spawn_blocking(
994
		"chain-sync-network-service-provider",
995
		Some("networking"),
996
		chain_sync_network_provider.run(Arc::new(network.clone())),
997
	);
998
	spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(sync_service_import_queue)));
999
	spawn_handle.spawn_blocking("syncing", None, engine.run());
	let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
	spawn_handle.spawn(
		"system-rpc-handler",
		Some("networking"),
		build_system_rpc_future::<_, _, <TBl as BlockT>::Hash>(
			config.role.clone(),
			network_mut.network_service(),
			sync_service.clone(),
			client.clone(),
			system_rpc_rx,
			has_bootnodes,
		),
	);
	let future = build_network_future::<_, _, <TBl as BlockT>::Hash, _>(
		network_mut,
		client,
		sync_service.clone(),
		config.announce_block,
	);
	// TODO: Normally, one is supposed to pass a list of notifications protocols supported by the
	// node through the `NetworkConfiguration` struct. But because this function doesn't know in
	// advance which components, such as GrandPa or Polkadot, will be plugged on top of the
	// service, it is unfortunately not possible to do so without some deep refactoring. To
	// bypass this problem, the `NetworkService` provides a `register_notifications_protocol`
	// method that can be called even after the network has been initialized. However, we want to
	// avoid the situation where `register_notifications_protocol` is called *after* the network
	// actually connects to other peers. For this reason, we delay the process of the network
	// future until the user calls `NetworkStarter::start_network`.
	//
	// This entire hack should eventually be removed in favour of passing the list of protocols
	// through the configuration.
	//
	// See also https://github.com/paritytech/substrate/issues/6827
	let (network_start_tx, network_start_rx) = oneshot::channel();
	// The network worker is responsible for gathering all network messages and processing
	// them. This is quite a heavy task, and at the time of the writing of this comment it
	// frequently happens that this future takes several seconds or in some situations
	// even more than a minute until it has processed its entire queue. This is clearly an
	// issue, and ideally we would like to fix the network future to take as little time as
	// possible, but we also take the extra harm-prevention measure to execute the networking
	// future using `spawn_blocking`.
	spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
		if network_start_rx.await.is_err() {
			log::warn!(
				"The NetworkStart returned as part of `build_network` has been silently dropped"
			);
			// This `return` might seem unnecessary, but we don't want to make it look like
			// everything is working as normal even though the user is clearly misusing the API.
			return
		}
		future.await
	});
	Ok((
		network,
		system_rpc_tx,
		tx_handler_controller,
		NetworkStarter(network_start_tx),
		sync_service.clone(),
	))
}
/// Object used to start the network.
#[must_use]
pub struct NetworkStarter(oneshot::Sender<()>);
impl NetworkStarter {
	/// Create a new NetworkStarter
	pub fn new(sender: oneshot::Sender<()>) -> Self {
		NetworkStarter(sender)
	}
	/// Start the network. Call this after all sub-components have been initialized.
	///
	/// > **Note**: If you don't call this function, the networking will not work.
	pub fn start_network(self) {
		let _ = self.0.send(());
	}
}