1
// This file is part of Substrate.
2

            
3
// Copyright (C) Parity Technologies (UK) Ltd.
4
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5

            
6
// This program is free software: you can redistribute it and/or modify
7
// it under the terms of the GNU General Public License as published by
8
// the Free Software Foundation, either version 3 of the License, or
9
// (at your option) any later version.
10

            
11
// This program is distributed in the hope that it will be useful,
12
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
// GNU General Public License for more details.
15

            
16
// You should have received a copy of the GNU General Public License
17
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18

            
19
//! Client backend that is backed by a database.
20
//!
21
//! # Canonicality vs. Finality
22
//!
23
//! Finality indicates that a block will not be reverted, according to the consensus algorithm.
//! Canonicality indicates that the block might in principle still be reverted by consensus, but
//! this node has already discarded the heavy state that a chain reorganization would require,
//! and is therefore unable to revert it.
26
//!
27
//! Finality implies canonicality but not vice-versa.
28

            
29
#![warn(missing_docs)]
30

            
31
pub mod offchain;
32

            
33
pub mod bench;
34

            
35
mod children;
36
mod parity_db;
37
mod pinned_blocks_cache;
38
mod record_stats_state;
39
mod stats;
40
#[cfg(any(feature = "rocksdb", test))]
41
mod upgrade;
42
mod utils;
43

            
44
use linked_hash_map::LinkedHashMap;
45
use log::{debug, trace, warn};
46
use parking_lot::{Mutex, RwLock};
47
use std::{
48
	collections::{HashMap, HashSet},
49
	io,
50
	path::{Path, PathBuf},
51
	sync::Arc,
52
};
53

            
54
use crate::{
55
	pinned_blocks_cache::PinnedBlocksCache,
56
	record_stats_state::RecordStatsState,
57
	stats::StateUsageStats,
58
	utils::{meta_keys, read_db, read_meta, DatabaseType, Meta},
59
};
60
use codec::{Decode, Encode};
61
use hash_db::Prefix;
62
use sc_client_api::{
63
	backend::NewBlockState,
64
	leaves::{FinalizationOutcome, LeafSet},
65
	utils::is_descendent_of,
66
	IoInfo, MemoryInfo, MemorySize, UsageInfo,
67
};
68
use sc_state_db::{IsPruned, LastCanonicalized, StateDb};
69
use sp_arithmetic::traits::Saturating;
70
use sp_blockchain::{
71
	Backend as _, CachedHeaderMetadata, DisplacedLeavesAfterFinalization, Error as ClientError,
72
	HeaderBackend, HeaderMetadata, HeaderMetadataCache, Result as ClientResult,
73
};
74
use sp_core::{
75
	offchain::OffchainOverlayedChange,
76
	storage::{well_known_keys, ChildInfo},
77
};
78
use sp_database::Transaction;
79
use sp_runtime::{
80
	generic::BlockId,
81
	traits::{
82
		Block as BlockT, Hash, HashingFor, Header as HeaderT, NumberFor, One, SaturatedConversion,
83
		Zero,
84
	},
85
	Justification, Justifications, StateVersion, Storage,
86
};
87
use sp_state_machine::{
88
	backend::{AsTrieBackend, Backend as StateBackend},
89
	BackendTransaction, ChildStorageCollection, DBValue, IndexOperation, IterArgs,
90
	OffchainChangesCollection, StateMachineStats, StorageCollection, StorageIterator, StorageKey,
91
	StorageValue, UsageInfo as StateUsageInfo,
92
};
93
use sp_trie::{cache::SharedTrieCache, prefixed_key, MemoryDB, MerkleValue, PrefixedMemoryDB};
94

            
95
// Re-export the Database trait so that one can pass an implementation of it.
96
pub use sc_state_db::PruningMode;
97
pub use sp_database::Database;
98

            
99
pub use bench::BenchmarkingState;
100

            
101
/// Number of full headers kept in `BlockchainDb`'s in-memory header cache
/// (see `cache_header`).
const CACHE_HEADERS: usize = 8;
102

            
103
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
104
pub type DbState<H> = sp_state_machine::TrieBackend<Arc<dyn sp_state_machine::Storage<H>>, H>;
105

            
106
/// Builder for [`DbState`].
107
pub type DbStateBuilder<Hasher> =
108
	sp_state_machine::TrieBackendBuilder<Arc<dyn sp_state_machine::Storage<Hasher>>, Hasher>;
109

            
110
/// Length of a [`DbHash`].
111
const DB_HASH_LEN: usize = 32;
112

            
113
/// Hash type that this backend uses for the database.
114
pub type DbHash = sp_core::H256;
115

            
116
/// An extrinsic entry in the database.
///
/// Extrinsics carrying indexed data are split: the indexed payload lives in
/// the `TRANSACTION` column keyed by `hash`, while only the small `header`
/// part is stored inline (see `BlockchainDb::body_uncached`).
#[derive(Debug, Encode, Decode)]
enum DbExtrinsic<B: BlockT> {
	/// Extrinsic that contains indexed data.
	Indexed {
		/// Hash of the indexed part.
		hash: DbHash,
		/// Extrinsic header.
		header: Vec<u8>,
	},
	/// Complete extrinsic data.
	Full(B::Extrinsic),
}
129

            
130
/// A reference tracking state.
///
/// It makes sure that the hash we are using stays pinned in storage
/// until this structure is dropped.
pub struct RefTrackingState<Block: BlockT> {
	// The wrapped trie-backed state; all `StateBackend` calls delegate to it.
	state: DbState<HashingFor<Block>>,
	// Keeps the storage backend alive; its `state_db` pin is released on drop.
	storage: Arc<StorageDb<Block>>,
	// Hash unpinned in `Drop`; when `None`, nothing is unpinned.
	parent_hash: Option<Block::Hash>,
}
139

            
140
impl<B: BlockT> RefTrackingState<B> {
141
	fn new(
142
		state: DbState<HashingFor<B>>,
143
		storage: Arc<StorageDb<B>>,
144
		parent_hash: Option<B::Hash>,
145
	) -> Self {
146
		RefTrackingState { state, parent_hash, storage }
147
	}
148
}
149

            
150
impl<B: BlockT> Drop for RefTrackingState<B> {
	fn drop(&mut self) {
		// Release the pin that kept the parent state's trie nodes available.
		if let Some(hash) = self.parent_hash.as_ref() {
			self.storage.state_db.unpin(hash);
		}
	}
}
157

            
158
impl<Block: BlockT> std::fmt::Debug for RefTrackingState<Block> {
	// Identify the state by the parent block it was derived from.
	fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		write!(fmt, "Block {:?}", self.parent_hash)
	}
}
163

            
164
/// A raw iterator over the `RefTrackingState`.
pub struct RawIter<B: BlockT> {
	// Iterator of the underlying `DbState`; every call is forwarded to it.
	inner: <DbState<HashingFor<B>> as StateBackend<HashingFor<B>>>::RawIter,
}
168

            
169
// Pure delegation: iteration happens on the inner `DbState` iterator, with
// the wrapped state of the `RefTrackingState` backend passed through.
impl<B: BlockT> StorageIterator<HashingFor<B>> for RawIter<B> {
	type Backend = RefTrackingState<B>;
	type Error = <DbState<HashingFor<B>> as StateBackend<HashingFor<B>>>::Error;

	fn next_key(&mut self, backend: &Self::Backend) -> Option<Result<StorageKey, Self::Error>> {
		self.inner.next_key(&backend.state)
	}

	fn next_pair(
		&mut self,
		backend: &Self::Backend,
	) -> Option<Result<(StorageKey, StorageValue), Self::Error>> {
		self.inner.next_pair(&backend.state)
	}

	fn was_complete(&self) -> bool {
		self.inner.was_complete()
	}
}
188

            
189
// `RefTrackingState` adds no storage behavior of its own: every method below
// forwards verbatim to the wrapped `DbState`. Its sole purpose is the
// pinning/unpinning performed in `new`/`Drop`.
impl<B: BlockT> StateBackend<HashingFor<B>> for RefTrackingState<B> {
	type Error = <DbState<HashingFor<B>> as StateBackend<HashingFor<B>>>::Error;
	type TrieBackendStorage =
		<DbState<HashingFor<B>> as StateBackend<HashingFor<B>>>::TrieBackendStorage;
	type RawIter = RawIter<B>;

	fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
		self.state.storage(key)
	}

	fn storage_hash(&self, key: &[u8]) -> Result<Option<B::Hash>, Self::Error> {
		self.state.storage_hash(key)
	}

	fn child_storage(
		&self,
		child_info: &ChildInfo,
		key: &[u8],
	) -> Result<Option<Vec<u8>>, Self::Error> {
		self.state.child_storage(child_info, key)
	}

	fn child_storage_hash(
		&self,
		child_info: &ChildInfo,
		key: &[u8],
	) -> Result<Option<B::Hash>, Self::Error> {
		self.state.child_storage_hash(child_info, key)
	}

	fn closest_merkle_value(
		&self,
		key: &[u8],
	) -> Result<Option<MerkleValue<B::Hash>>, Self::Error> {
		self.state.closest_merkle_value(key)
	}

	fn child_closest_merkle_value(
		&self,
		child_info: &ChildInfo,
		key: &[u8],
	) -> Result<Option<MerkleValue<B::Hash>>, Self::Error> {
		self.state.child_closest_merkle_value(child_info, key)
	}

	fn exists_storage(&self, key: &[u8]) -> Result<bool, Self::Error> {
		self.state.exists_storage(key)
	}

	fn exists_child_storage(
		&self,
		child_info: &ChildInfo,
		key: &[u8],
	) -> Result<bool, Self::Error> {
		self.state.exists_child_storage(child_info, key)
	}

	fn next_storage_key(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
		self.state.next_storage_key(key)
	}

	fn next_child_storage_key(
		&self,
		child_info: &ChildInfo,
		key: &[u8],
	) -> Result<Option<Vec<u8>>, Self::Error> {
		self.state.next_child_storage_key(child_info, key)
	}

	fn storage_root<'a>(
		&self,
		delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
		state_version: StateVersion,
	) -> (B::Hash, BackendTransaction<HashingFor<B>>) {
		self.state.storage_root(delta, state_version)
	}

	fn child_storage_root<'a>(
		&self,
		child_info: &ChildInfo,
		delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
		state_version: StateVersion,
	) -> (B::Hash, bool, BackendTransaction<HashingFor<B>>) {
		self.state.child_storage_root(child_info, delta, state_version)
	}

	fn raw_iter(&self, args: IterArgs) -> Result<Self::RawIter, Self::Error> {
		// Wrap the inner iterator so `next_*` can re-supply our state.
		self.state.raw_iter(args).map(|inner| RawIter { inner })
	}

	fn register_overlay_stats(&self, stats: &StateMachineStats) {
		self.state.register_overlay_stats(stats);
	}

	fn usage_info(&self) -> StateUsageInfo {
		self.state.usage_info()
	}
}
287

            
288
impl<B: BlockT> AsTrieBackend<HashingFor<B>> for RefTrackingState<B> {
289
	type TrieBackendStorage =
290
		<DbState<HashingFor<B>> as StateBackend<HashingFor<B>>>::TrieBackendStorage;
291

            
292
	fn as_trie_backend(
293
		&self,
294
	) -> &sp_state_machine::TrieBackend<Self::TrieBackendStorage, HashingFor<B>> {
295
		&self.state.as_trie_backend()
296
	}
297
}
298

            
299
/// Database settings.
pub struct DatabaseSettings {
	/// The maximum trie cache size in bytes.
	///
	/// If `None` is given, the cache is disabled.
	pub trie_cache_maximum_size: Option<usize>,
	/// Requested state pruning mode.
	///
	/// NOTE(review): semantics of `None` (i.e. which default applies) are
	/// decided by the backend-opening code, which is outside this view.
	pub state_pruning: Option<PruningMode>,
	/// Where to find the database.
	pub source: DatabaseSource,
	/// Block pruning mode.
	///
	/// NOTE: only finalized blocks are subject for removal!
	pub blocks_pruning: BlocksPruning,
}
314

            
315
/// Block pruning settings.
///
/// Applies to block bodies/justifications; see
/// `DatabaseSettings::blocks_pruning` (only finalized blocks are removed).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BlocksPruning {
	/// Keep full block history, of every block that was ever imported.
	KeepAll,
	/// Keep full finalized block history.
	KeepFinalized,
	/// Keep N recent finalized blocks.
	Some(u32),
}
325

            
326
impl BlocksPruning {
327
	/// True if this is an archive pruning mode (either KeepAll or KeepFinalized).
328
	pub fn is_archive(&self) -> bool {
329
		match *self {
330
			BlocksPruning::KeepAll | BlocksPruning::KeepFinalized => true,
331
			BlocksPruning::Some(_) => false,
332
		}
333
	}
334
}
335

            
336
/// Where to find the database.
#[derive(Debug, Clone)]
pub enum DatabaseSource {
	/// Check given path, and see if there is an existing database there. If it's either `RocksDb`
	/// or `ParityDb`, use it. If there is none, create a new instance of `ParityDb`.
	Auto {
		/// Path to the paritydb database.
		paritydb_path: PathBuf,
		/// Path to the rocksdb database.
		rocksdb_path: PathBuf,
		/// Cache size in MiB. Used only by `RocksDb` variant of `DatabaseSource`.
		cache_size: usize,
	},
	/// Load a RocksDB database from a given path. Recommended for most uses.
	#[cfg(feature = "rocksdb")]
	RocksDb {
		/// Path to the database.
		path: PathBuf,
		/// Cache size in MiB.
		cache_size: usize,
	},

	/// Load a ParityDb database from a given path.
	ParityDb {
		/// Path to the database.
		path: PathBuf,
	},

	/// Use a custom already-open database.
	Custom {
		/// The handle to the custom storage.
		db: Arc<dyn Database<DbHash>>,

		/// If set, the `create` flag will be required to open such datasource.
		require_create_flag: bool,
	},
}
373

            
374
impl DatabaseSource {
375
	/// Return path for databases that are stored on disk.
376
	pub fn path(&self) -> Option<&Path> {
377
		match self {
378
			// as per https://github.com/paritytech/substrate/pull/9500#discussion_r684312550
379
			//
380
			// IIUC this is needed for polkadot to create its own dbs, so until it can use parity db
381
			// I would think rocksdb, but later parity-db.
382
			DatabaseSource::Auto { paritydb_path, .. } => Some(paritydb_path),
383
			#[cfg(feature = "rocksdb")]
384
			DatabaseSource::RocksDb { path, .. } => Some(path),
385
			DatabaseSource::ParityDb { path } => Some(path),
386
			DatabaseSource::Custom { .. } => None,
387
		}
388
	}
389

            
390
	/// Set path for databases that are stored on disk.
391
	pub fn set_path(&mut self, p: &Path) -> bool {
392
		match self {
393
			DatabaseSource::Auto { ref mut paritydb_path, .. } => {
394
				*paritydb_path = p.into();
395
				true
396
			},
397
			#[cfg(feature = "rocksdb")]
398
			DatabaseSource::RocksDb { ref mut path, .. } => {
399
				*path = p.into();
400
				true
401
			},
402
			DatabaseSource::ParityDb { ref mut path } => {
403
				*path = p.into();
404
				true
405
			},
406
			DatabaseSource::Custom { .. } => false,
407
		}
408
	}
409
}
410

            
411
impl std::fmt::Display for DatabaseSource {
412
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413
		let name = match self {
414
			DatabaseSource::Auto { .. } => "Auto",
415
			#[cfg(feature = "rocksdb")]
416
			DatabaseSource::RocksDb { .. } => "RocksDb",
417
			DatabaseSource::ParityDb { .. } => "ParityDb",
418
			DatabaseSource::Custom { .. } => "Custom",
419
		};
420
		write!(f, "{}", name)
421
	}
422
}
423

            
424
pub(crate) mod columns {
	/// Chain metadata (best/finalized hashes, leaf set, children index, …).
	pub const META: u32 = crate::utils::COLUMN_META;
	/// State trie nodes (read through `StorageDb`'s `NodeDb` impl).
	pub const STATE: u32 = 1;
	/// Meta column of the state database (read via `StateMetaDb`).
	pub const STATE_META: u32 = 2;
	/// maps hashes to lookup keys and numbers to canon hashes.
	pub const KEY_LOOKUP: u32 = 3;
	/// Block headers.
	pub const HEADER: u32 = 4;
	/// Block bodies (encoded extrinsics).
	pub const BODY: u32 = 5;
	/// Block justifications.
	pub const JUSTIFICATIONS: u32 = 6;
	/// Auxiliary key/value data set by higher layers.
	pub const AUX: u32 = 8;
	/// Offchain workers local storage
	pub const OFFCHAIN: u32 = 9;
	/// Transactions
	pub const TRANSACTION: u32 = 11;
	/// Per-block index describing which extrinsics are stored inline and
	/// which are indexed (see `DbExtrinsic`).
	pub const BODY_INDEX: u32 = 12;
	// NOTE(review): ids 7 and 10 are absent — presumably retired columns;
	// confirm against the upgrade code before reusing them.
}
440

            
441
/// A block, with its associated data, staged via
/// `BlockImportOperation::set_block_data` until the operation is committed.
struct PendingBlock<Block: BlockT> {
	header: Block::Header,
	justifications: Option<Justifications>,
	body: Option<Vec<Block::Extrinsic>>,
	indexed_body: Option<Vec<Vec<u8>>>,
	leaf_state: NewBlockState,
}
448

            
449
// wrapper that implements trait required for state_db
// (gives `sc_state_db` read access to the `STATE_META` column)
#[derive(Clone)]
struct StateMetaDb(Arc<dyn Database<DbHash>>);
452

            
453
impl sc_state_db::MetaDb for StateMetaDb {
	type Error = sp_database::error::DatabaseError;

	// Read a state-db meta entry from the `STATE_META` column; a missing key
	// is `Ok(None)`, not an error.
	fn get_meta(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
		Ok(self.0.get(columns::STATE_META, key))
	}
}
460

            
461
/// A single update applied to the cached chain meta
/// (see `BlockchainDb::update_meta`).
struct MetaUpdate<Block: BlockT> {
	pub hash: Block::Hash,
	pub number: NumberFor<Block>,
	// Whether this block becomes the new best block.
	pub is_best: bool,
	// Whether this block becomes the new finalized block.
	pub is_finalized: bool,
	// When finalizing: whether the block's state is available, updating
	// `finalized_state`.
	pub with_state: bool,
}
468

            
469
fn cache_header<Hash: std::cmp::Eq + std::hash::Hash, Header>(
470
	cache: &mut LinkedHashMap<Hash, Option<Header>>,
471
	hash: Hash,
472
	header: Option<Header>,
473
) {
474
	cache.insert(hash, header);
475
	while cache.len() > CACHE_HEADERS {
476
		cache.pop_front();
477
	}
478
}
479

            
480
/// Block database
pub struct BlockchainDb<Block: BlockT> {
	// The underlying key-value database.
	db: Arc<dyn Database<DbHash>>,
	// Cached chain metadata (best/finalized/genesis hashes, block gap).
	meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
	// Current chain leaves, loaded from `meta_keys::LEAF_PREFIX`.
	leaves: RwLock<LeafSet<Block::Hash, NumberFor<Block>>>,
	// Cache of derived header metadata, keyed by block hash.
	header_metadata_cache: Arc<HeaderMetadataCache<Block>>,
	// Small cache of full headers, bounded by `CACHE_HEADERS`.
	header_cache: Mutex<LinkedHashMap<Block::Hash, Option<Block::Header>>>,
	// Bodies/justifications kept available for pinned blocks.
	pinned_blocks_cache: Arc<RwLock<PinnedBlocksCache<Block>>>,
}
489

            
490
impl<Block: BlockT> BlockchainDb<Block> {
	/// Read chain meta and the persisted leaf set from `db` and wrap it.
	fn new(db: Arc<dyn Database<DbHash>>) -> ClientResult<Self> {
		let meta = read_meta::<Block>(&*db, columns::HEADER)?;
		let leaves = LeafSet::read_from_db(&*db, columns::META, meta_keys::LEAF_PREFIX)?;
		Ok(BlockchainDb {
			db,
			leaves: RwLock::new(leaves),
			meta: Arc::new(RwLock::new(meta)),
			header_metadata_cache: Arc::new(HeaderMetadataCache::default()),
			header_cache: Default::default(),
			pinned_blocks_cache: Arc::new(RwLock::new(PinnedBlocksCache::new())),
		})
	}

	/// Apply a best-block/finalization update to the in-memory meta.
	fn update_meta(&self, update: MetaUpdate<Block>) {
		let MetaUpdate { hash, number, is_best, is_finalized, with_state } = update;
		let mut meta = self.meta.write();
		// Block number zero is by definition the genesis block.
		if number.is_zero() {
			meta.genesis_hash = hash;
		}

		if is_best {
			meta.best_number = number;
			meta.best_hash = hash;
		}

		if is_finalized {
			if with_state {
				meta.finalized_state = Some((hash, number));
			}
			meta.finalized_number = number;
			meta.finalized_hash = hash;
		}
	}

	/// Record the current gap of missing blocks (`None` clears it).
	fn update_block_gap(&self, gap: Option<(NumberFor<Block>, NumberFor<Block>)>) {
		let mut meta = self.meta.write();
		meta.block_gap = gap;
	}

	/// Empty the cache of pinned items.
	fn clear_pinning_cache(&self) {
		self.pinned_blocks_cache.write().clear();
	}

	/// Load a justification into the cache of pinned items.
	/// Reference count of the item will not be increased. Use this
	/// to load values for items into the cache which have already been pinned.
	fn insert_justifications_if_pinned(&self, hash: Block::Hash, justification: Justification) {
		let mut cache = self.pinned_blocks_cache.write();
		if !cache.contains(hash) {
			return
		}

		let justifications = Justifications::from(justification);
		cache.insert_justifications(hash, Some(justifications));
	}

	/// Load a justification from the db into the cache of pinned items.
	/// Reference count of the item will not be increased. Use this
	/// to load values for items into the cache which have already been pinned.
	fn insert_persisted_justifications_if_pinned(&self, hash: Block::Hash) -> ClientResult<()> {
		let mut cache = self.pinned_blocks_cache.write();
		if !cache.contains(hash) {
			return Ok(())
		}

		let justifications = self.justifications_uncached(hash)?;
		cache.insert_justifications(hash, justifications);
		Ok(())
	}

	/// Load a block body from the db into the cache of pinned items.
	/// Reference count of the item will not be increased. Use this
	/// to load values for items into the cache which have already been pinned.
	fn insert_persisted_body_if_pinned(&self, hash: Block::Hash) -> ClientResult<()> {
		let mut cache = self.pinned_blocks_cache.write();
		if !cache.contains(hash) {
			return Ok(())
		}

		let body = self.body_uncached(hash)?;
		cache.insert_body(hash, body);
		Ok(())
	}

	/// Bump reference count for pinned item.
	fn bump_ref(&self, hash: Block::Hash) {
		self.pinned_blocks_cache.write().pin(hash);
	}

	/// Decrease reference count for pinned item and remove if reference count is 0.
	fn unpin(&self, hash: Block::Hash) {
		self.pinned_blocks_cache.write().unpin(hash);
	}

	/// Read justifications for `hash` directly from the database, bypassing
	/// the pinned-blocks cache.
	fn justifications_uncached(&self, hash: Block::Hash) -> ClientResult<Option<Justifications>> {
		match read_db(
			&*self.db,
			columns::KEY_LOOKUP,
			columns::JUSTIFICATIONS,
			BlockId::<Block>::Hash(hash),
		)? {
			Some(justifications) => match Decode::decode(&mut &justifications[..]) {
				Ok(justifications) => Ok(Some(justifications)),
				Err(err) =>
					return Err(sp_blockchain::Error::Backend(format!(
						"Error decoding justifications: {}",
						err
					))),
			},
			None => Ok(None),
		}
	}

	/// Read the body for `hash` directly from the database, bypassing the
	/// pinned-blocks cache. A body is stored either as a plain encoded list
	/// in `BODY`, or as a `BODY_INDEX` entry whose indexed extrinsics are
	/// reassembled from the `TRANSACTION` column.
	fn body_uncached(&self, hash: Block::Hash) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
		if let Some(body) =
			read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, BlockId::Hash::<Block>(hash))?
		{
			// Plain body
			match Decode::decode(&mut &body[..]) {
				Ok(body) => return Ok(Some(body)),
				Err(err) =>
					return Err(sp_blockchain::Error::Backend(format!(
						"Error decoding body: {}",
						err
					))),
			}
		}

		if let Some(index) = read_db(
			&*self.db,
			columns::KEY_LOOKUP,
			columns::BODY_INDEX,
			BlockId::Hash::<Block>(hash),
		)? {
			match Vec::<DbExtrinsic<Block>>::decode(&mut &index[..]) {
				Ok(index) => {
					let mut body = Vec::new();
					for ex in index {
						match ex {
							DbExtrinsic::Indexed { hash, header } => {
								match self.db.get(columns::TRANSACTION, hash.as_ref()) {
									Some(t) => {
										// Re-join the inline header with the
										// separately stored indexed payload.
										let mut input =
											utils::join_input(header.as_ref(), t.as_ref());
										let ex = Block::Extrinsic::decode(&mut input).map_err(
											|err| {
												sp_blockchain::Error::Backend(format!(
													"Error decoding indexed extrinsic: {}",
													err
												))
											},
										)?;
										body.push(ex);
									},
									None =>
										return Err(sp_blockchain::Error::Backend(format!(
											"Missing indexed transaction {:?}",
											hash
										))),
								};
							},
							DbExtrinsic::Full(ex) => {
								body.push(ex);
							},
						}
					}
					return Ok(Some(body))
				},
				Err(err) =>
					return Err(sp_blockchain::Error::Backend(format!(
						"Error decoding body list: {}",
						err
					))),
			}
		}
		Ok(None)
	}
}
670

            
671
impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for BlockchainDb<Block> {
	// Read a header, serving from (and refreshing) the bounded in-memory
	// header cache. Negative results (`None`) are cached as well.
	fn header(&self, hash: Block::Hash) -> ClientResult<Option<Block::Header>> {
		let mut cache = self.header_cache.lock();
		if let Some(result) = cache.get_refresh(&hash) {
			return Ok(result.clone())
		}
		let header = utils::read_header(
			&*self.db,
			columns::KEY_LOOKUP,
			columns::HEADER,
			BlockId::<Block>::Hash(hash),
		)?;
		cache_header(&mut cache, hash, header.clone());
		Ok(header)
	}

	// Snapshot of the cached chain meta; no database reads.
	fn info(&self) -> sc_client_api::blockchain::Info<Block> {
		let meta = self.meta.read();
		sc_client_api::blockchain::Info {
			best_hash: meta.best_hash,
			best_number: meta.best_number,
			genesis_hash: meta.genesis_hash,
			finalized_hash: meta.finalized_hash,
			finalized_number: meta.finalized_number,
			finalized_state: meta.finalized_state,
			number_leaves: self.leaves.read().count(),
			block_gap: meta.block_gap,
		}
	}

	fn status(&self, hash: Block::Hash) -> ClientResult<sc_client_api::blockchain::BlockStatus> {
		match self.header(hash)?.is_some() {
			true => Ok(sc_client_api::blockchain::BlockStatus::InChain),
			false => Ok(sc_client_api::blockchain::BlockStatus::Unknown),
		}
	}

	// NOTE: metadata lookup errors are mapped to `None` ("unknown block").
	fn number(&self, hash: Block::Hash) -> ClientResult<Option<NumberFor<Block>>> {
		Ok(self.header_metadata(hash).ok().map(|header_metadata| header_metadata.number))
	}

	// Resolve a block number to the canonical hash at that height.
	fn hash(&self, number: NumberFor<Block>) -> ClientResult<Option<Block::Hash>> {
		Ok(utils::read_header::<Block>(
			&*self.db,
			columns::KEY_LOOKUP,
			columns::HEADER,
			BlockId::Number(number),
		)?
		.map(|header| header.hash()))
	}
}
722

            
723
impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<Block> {
	// Serve the body from the pinned-blocks cache when present, otherwise
	// read it from the database.
	fn body(&self, hash: Block::Hash) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
		let cache = self.pinned_blocks_cache.read();
		if let Some(result) = cache.body(&hash) {
			return Ok(result.clone())
		}

		self.body_uncached(hash)
	}

	// Same caching strategy as `body` above.
	fn justifications(&self, hash: Block::Hash) -> ClientResult<Option<Justifications>> {
		let cache = self.pinned_blocks_cache.read();
		if let Some(result) = cache.justifications(&hash) {
			return Ok(result.clone())
		}

		self.justifications_uncached(hash)
	}

	fn last_finalized(&self) -> ClientResult<Block::Hash> {
		Ok(self.meta.read().finalized_hash)
	}

	fn leaves(&self) -> ClientResult<Vec<Block::Hash>> {
		Ok(self.leaves.read().hashes())
	}

	fn children(&self, parent_hash: Block::Hash) -> ClientResult<Vec<Block::Hash>> {
		children::read_children(&*self.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash)
	}

	fn indexed_transaction(&self, hash: Block::Hash) -> ClientResult<Option<Vec<u8>>> {
		Ok(self.db.get(columns::TRANSACTION, hash.as_ref()))
	}

	fn has_indexed_transaction(&self, hash: Block::Hash) -> ClientResult<bool> {
		Ok(self.db.contains(columns::TRANSACTION, hash.as_ref()))
	}

	// Collect the raw indexed payloads referenced by a block's body index.
	fn block_indexed_body(&self, hash: Block::Hash) -> ClientResult<Option<Vec<Vec<u8>>>> {
		let body = match read_db(
			&*self.db,
			columns::KEY_LOOKUP,
			columns::BODY_INDEX,
			BlockId::<Block>::Hash(hash),
		)? {
			Some(body) => body,
			None => return Ok(None),
		};
		match Vec::<DbExtrinsic<Block>>::decode(&mut &body[..]) {
			Ok(index) => {
				let mut transactions = Vec::new();
				for ex in index.into_iter() {
					if let DbExtrinsic::Indexed { hash, .. } = ex {
						match self.db.get(columns::TRANSACTION, hash.as_ref()) {
							Some(t) => transactions.push(t),
							None =>
								return Err(sp_blockchain::Error::Backend(format!(
									"Missing indexed transaction {:?}",
									hash
								))),
						}
					}
				}
				Ok(Some(transactions))
			},
			Err(err) =>
				Err(sp_blockchain::Error::Backend(format!("Error decoding body list: {}", err))),
		}
	}
}
794

            
795
impl<Block: BlockT> HeaderMetadata<Block> for BlockchainDb<Block> {
	type Error = sp_blockchain::Error;

	// Look up header metadata, serving from the metadata cache when possible.
	// On a miss, the full header is read and its derived metadata is cached.
	fn header_metadata(
		&self,
		hash: Block::Hash,
	) -> Result<CachedHeaderMetadata<Block>, Self::Error> {
		self.header_metadata_cache.header_metadata(hash).map_or_else(
			|| {
				self.header(hash)?
					.map(|header| {
						let header_metadata = CachedHeaderMetadata::from(&header);
						self.header_metadata_cache
							.insert_header_metadata(header_metadata.hash, header_metadata.clone());
						header_metadata
					})
					.ok_or_else(|| {
						ClientError::UnknownBlock(format!(
							"Header was not found in the database: {:?}",
							hash
						))
					})
			},
			Ok,
		)
	}

	fn insert_header_metadata(&self, hash: Block::Hash, metadata: CachedHeaderMetadata<Block>) {
		self.header_metadata_cache.insert_header_metadata(hash, metadata)
	}

	// Evict the block from both the header cache and the metadata cache.
	fn remove_header_metadata(&self, hash: Block::Hash) {
		self.header_cache.lock().remove(&hash);
		self.header_metadata_cache.remove_header_metadata(hash);
	}
}
831

            
832
/// Database transaction
pub struct BlockImportOperation<Block: BlockT> {
	// State at the parent of the block being imported.
	old_state: RecordStatsState<RefTrackingState<Block>, Block>,
	// Trie node changes staged for the state column.
	db_updates: PrefixedMemoryDB<HashingFor<Block>>,
	// Top-level key/value changes.
	storage_updates: StorageCollection,
	// Child-trie key/value changes.
	child_storage_updates: ChildStorageCollection,
	// Offchain changes, applied to the `OFFCHAIN` column (see `apply_offchain`).
	offchain_storage_updates: OffchainChangesCollection,
	// The block staged via `set_block_data`, if any.
	pending_block: Option<PendingBlock<Block>>,
	// Auxiliary operations; a `None` value removes the key (see `apply_aux`).
	aux_ops: Vec<(Vec<u8>, Option<Vec<u8>>)>,
	// Blocks to mark as finalized, each with an optional justification.
	finalized_blocks: Vec<(Block::Hash, Option<Justification>)>,
	// New best block, set by `mark_head`.
	set_head: Option<Block::Hash>,
	// Whether the staged state changes should be committed
	// (see `reset_storage` / `set_genesis_state`).
	commit_state: bool,
	// Transaction-index operations for this block.
	index_ops: Vec<IndexOperation>,
}
846

            
847
impl<Block: BlockT> BlockImportOperation<Block> {
	// Drain the queued offchain changes into the `OFFCHAIN` column.
	fn apply_offchain(&mut self, transaction: &mut Transaction<DbHash>) {
		let mut count = 0;
		for ((prefix, key), value_operation) in self.offchain_storage_updates.drain(..) {
			count += 1;
			let key = crate::offchain::concatenate_prefix_and_key(&prefix, &key);
			match value_operation {
				OffchainOverlayedChange::SetValue(val) =>
					transaction.set_from_vec(columns::OFFCHAIN, &key, val),
				OffchainOverlayedChange::Remove => transaction.remove(columns::OFFCHAIN, &key),
			}
		}

		if count > 0 {
			log::debug!(target: "sc_offchain", "Applied {} offchain indexing changes.", count);
		}
	}

	// Drain the queued auxiliary operations; `None` values remove the key.
	fn apply_aux(&mut self, transaction: &mut Transaction<DbHash>) {
		for (key, maybe_val) in self.aux_ops.drain(..) {
			match maybe_val {
				Some(val) => transaction.set_from_vec(columns::AUX, &key, val),
				None => transaction.remove(columns::AUX, &key),
			}
		}
	}

	// Compute the full storage root of `storage` against the parent state and
	// stage the resulting trie transaction in `db_updates`.
	fn apply_new_state(
		&mut self,
		storage: Storage,
		state_version: StateVersion,
	) -> ClientResult<Block::Hash> {
		// Child storage keys must never appear in the top trie.
		if storage.top.keys().any(|k| well_known_keys::is_child_storage_key(k)) {
			return Err(sp_blockchain::Error::InvalidState)
		}

		let child_delta = storage.children_default.values().map(|child_content| {
			(
				&child_content.child_info,
				child_content.data.iter().map(|(k, v)| (&k[..], Some(&v[..]))),
			)
		});

		let (root, transaction) = self.old_state.full_storage_root(
			storage.top.iter().map(|(k, v)| (&k[..], Some(&v[..]))),
			child_delta,
			state_version,
		);

		self.db_updates = transaction;
		Ok(root)
	}
}
900

            
901
impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block>
	for BlockImportOperation<Block>
{
	type State = RecordStatsState<RefTrackingState<Block>, Block>;

	fn state(&self) -> ClientResult<Option<&Self::State>> {
		Ok(Some(&self.old_state))
	}

	// Stage the block itself. Panics if called twice for one operation.
	fn set_block_data(
		&mut self,
		header: Block::Header,
		body: Option<Vec<Block::Extrinsic>>,
		indexed_body: Option<Vec<Vec<u8>>>,
		justifications: Option<Justifications>,
		leaf_state: NewBlockState,
	) -> ClientResult<()> {
		assert!(self.pending_block.is_none(), "Only one block per operation is allowed");
		self.pending_block =
			Some(PendingBlock { header, body, indexed_body, justifications, leaf_state });
		Ok(())
	}

	fn update_db_storage(
		&mut self,
		update: PrefixedMemoryDB<HashingFor<Block>>,
	) -> ClientResult<()> {
		self.db_updates = update;
		Ok(())
	}

	// Replace the whole state with `storage`; always commits the new state.
	fn reset_storage(
		&mut self,
		storage: Storage,
		state_version: StateVersion,
	) -> ClientResult<Block::Hash> {
		let root = self.apply_new_state(storage, state_version)?;
		self.commit_state = true;
		Ok(root)
	}

	// Like `reset_storage`, but the caller decides whether to commit.
	fn set_genesis_state(
		&mut self,
		storage: Storage,
		commit: bool,
		state_version: StateVersion,
	) -> ClientResult<Block::Hash> {
		let root = self.apply_new_state(storage, state_version)?;
		self.commit_state = commit;
		Ok(root)
	}

	fn insert_aux<I>(&mut self, ops: I) -> ClientResult<()>
	where
		I: IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
	{
		self.aux_ops.append(&mut ops.into_iter().collect());
		Ok(())
	}

	fn update_storage(
		&mut self,
		update: StorageCollection,
		child_update: ChildStorageCollection,
	) -> ClientResult<()> {
		self.storage_updates = update;
		self.child_storage_updates = child_update;
		Ok(())
	}

	fn update_offchain_storage(
		&mut self,
		offchain_update: OffchainChangesCollection,
	) -> ClientResult<()> {
		self.offchain_storage_updates = offchain_update;
		Ok(())
	}

	fn mark_finalized(
		&mut self,
		block: Block::Hash,
		justification: Option<Justification>,
	) -> ClientResult<()> {
		self.finalized_blocks.push((block, justification));
		Ok(())
	}

	// Stage the new best block. Panics if called twice for one operation.
	fn mark_head(&mut self, hash: Block::Hash) -> ClientResult<()> {
		assert!(self.set_head.is_none(), "Only one set head per operation is allowed");
		self.set_head = Some(hash);
		Ok(())
	}

	fn update_transaction_index(&mut self, index_ops: Vec<IndexOperation>) -> ClientResult<()> {
		self.index_ops = index_ops;
		Ok(())
	}
}
999

            
struct StorageDb<Block: BlockT> {
	pub db: Arc<dyn Database<DbHash>>,
	pub state_db: StateDb<Block::Hash, Vec<u8>, StateMetaDb>,
	prefix_keys: bool,
}
impl<Block: BlockT> sp_state_machine::Storage<HashingFor<Block>> for StorageDb<Block> {
	fn get(&self, key: &Block::Hash, prefix: Prefix) -> Result<Option<DBValue>, String> {
		if self.prefix_keys {
			let key = prefixed_key::<HashingFor<Block>>(key, prefix);
			self.state_db.get(&key, self)
		} else {
			self.state_db.get(key.as_ref(), self)
		}
		.map_err(|e| format!("Database backend error: {:?}", e))
	}
}
impl<Block: BlockT> sc_state_db::NodeDb for StorageDb<Block> {
	type Error = io::Error;
	type Key = [u8];
	fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
		Ok(self.db.get(columns::STATE, key))
	}
}
struct DbGenesisStorage<Block: BlockT> {
	root: Block::Hash,
	storage: PrefixedMemoryDB<HashingFor<Block>>,
}
impl<Block: BlockT> DbGenesisStorage<Block> {
	pub fn new(root: Block::Hash, storage: PrefixedMemoryDB<HashingFor<Block>>) -> Self {
		DbGenesisStorage { root, storage }
	}
}
impl<Block: BlockT> sp_state_machine::Storage<HashingFor<Block>> for DbGenesisStorage<Block> {
	fn get(&self, key: &Block::Hash, prefix: Prefix) -> Result<Option<DBValue>, String> {
		use hash_db::HashDB;
		Ok(self.storage.get(key, prefix))
	}
}
struct EmptyStorage<Block: BlockT>(pub Block::Hash);
impl<Block: BlockT> EmptyStorage<Block> {
	pub fn new() -> Self {
		let mut root = Block::Hash::default();
		let mut mdb = MemoryDB::<HashingFor<Block>>::default();
		// both triedbmut are the same on empty storage.
		sp_trie::trie_types::TrieDBMutBuilderV1::<HashingFor<Block>>::new(&mut mdb, &mut root)
			.build();
		EmptyStorage(root)
	}
}
impl<Block: BlockT> sp_state_machine::Storage<HashingFor<Block>> for EmptyStorage<Block> {
	fn get(&self, _key: &Block::Hash, _prefix: Prefix) -> Result<Option<DBValue>, String> {
		Ok(None)
	}
}
/// Frozen `value` at time `at`.
///
/// Used as inner structure under lock in `FrozenForDuration`.
struct Frozen<T: Clone> {
	at: std::time::Instant,
	value: Option<T>,
}
/// Some value frozen for period of time.
///
/// If time `duration` not passed since the value was instantiated,
/// current frozen value is returned. Otherwise, you have to provide
/// a new value which will be again frozen for `duration`.
pub(crate) struct FrozenForDuration<T: Clone> {
	duration: std::time::Duration,
	value: parking_lot::Mutex<Frozen<T>>,
}
impl<T: Clone> FrozenForDuration<T> {
	fn new(duration: std::time::Duration) -> Self {
		Self { duration, value: Frozen { at: std::time::Instant::now(), value: None }.into() }
	}
	fn take_or_else<F>(&self, f: F) -> T
	where
		F: FnOnce() -> T,
	{
		let mut lock = self.value.lock();
		let now = std::time::Instant::now();
		if now.saturating_duration_since(lock.at) > self.duration || lock.value.is_none() {
			let new_value = f();
			lock.at = now;
			lock.value = Some(new_value.clone());
			new_value
		} else {
			lock.value.as_ref().expect("Checked with in branch above; qed").clone()
		}
	}
}
/// Disk backend.
///
/// Disk backend keeps data in a key-value store. In archive mode, trie nodes are kept from all
/// blocks. Otherwise, trie nodes are kept only from some recent blocks.
pub struct Backend<Block: BlockT> {
	storage: Arc<StorageDb<Block>>,
	offchain_storage: offchain::LocalStorage,
	blockchain: BlockchainDb<Block>,
	canonicalization_delay: u64,
	import_lock: Arc<RwLock<()>>,
	is_archive: bool,
	blocks_pruning: BlocksPruning,
	io_stats: FrozenForDuration<(kvdb::IoStats, StateUsageInfo)>,
	state_usage: Arc<StateUsageStats>,
	genesis_state: RwLock<Option<Arc<DbGenesisStorage<Block>>>>,
	shared_trie_cache: Option<sp_trie::cache::SharedTrieCache<HashingFor<Block>>>,
}
impl<Block: BlockT> Backend<Block> {
	/// Create a new instance of database backend.
	///
	/// The pruning window is how old a block must be before the state is pruned.
	pub fn new(db_config: DatabaseSettings, canonicalization_delay: u64) -> ClientResult<Self> {
		use utils::OpenDbError;
		let db_source = &db_config.source;
		let (needs_init, db) =
			match crate::utils::open_database::<Block>(db_source, DatabaseType::Full, false) {
				Ok(db) => (false, db),
				Err(OpenDbError::DoesNotExist) => {
					let db =
						crate::utils::open_database::<Block>(db_source, DatabaseType::Full, true)?;
					(true, db)
				},
				Err(as_is) => return Err(as_is.into()),
			};
		Self::from_database(db as Arc<_>, canonicalization_delay, &db_config, needs_init)
	}
	/// Reset the shared trie cache.
	pub fn reset_trie_cache(&self) {
		if let Some(cache) = &self.shared_trie_cache {
			cache.reset();
		}
	}
	/// Create new memory-backed client backend for tests.
	#[cfg(any(test, feature = "test-helpers"))]
	pub fn new_test(blocks_pruning: u32, canonicalization_delay: u64) -> Self {
		Self::new_test_with_tx_storage(BlocksPruning::Some(blocks_pruning), canonicalization_delay)
	}
	/// Create new memory-backed client backend for tests.
	#[cfg(any(test, feature = "test-helpers"))]
	pub fn new_test_with_tx_storage(
		blocks_pruning: BlocksPruning,
		canonicalization_delay: u64,
	) -> Self {
		let db = kvdb_memorydb::create(crate::utils::NUM_COLUMNS);
		let db = sp_database::as_database(db);
		let state_pruning = match blocks_pruning {
			BlocksPruning::KeepAll => PruningMode::ArchiveAll,
			BlocksPruning::KeepFinalized => PruningMode::ArchiveCanonical,
			BlocksPruning::Some(n) => PruningMode::blocks_pruning(n),
		};
		let db_setting = DatabaseSettings {
			trie_cache_maximum_size: Some(16 * 1024 * 1024),
			state_pruning: Some(state_pruning),
			source: DatabaseSource::Custom { db, require_create_flag: true },
			blocks_pruning,
		};
		Self::new(db_setting, canonicalization_delay).expect("failed to create test-db")
	}
	/// Expose the Database that is used by this backend.
	/// The second argument is the Column that stores the State.
	///
	/// Should only be needed for benchmarking.
	#[cfg(any(feature = "runtime-benchmarks"))]
	pub fn expose_db(&self) -> (Arc<dyn sp_database::Database<DbHash>>, sp_database::ColumnId) {
		(self.storage.db.clone(), columns::STATE)
	}
	/// Expose the Storage that is used by this backend.
	///
	/// Should only be needed for benchmarking.
	#[cfg(any(feature = "runtime-benchmarks"))]
	pub fn expose_storage(&self) -> Arc<dyn sp_state_machine::Storage<HashingFor<Block>>> {
		self.storage.clone()
	}
	fn from_database(
		db: Arc<dyn Database<DbHash>>,
		canonicalization_delay: u64,
		config: &DatabaseSettings,
		should_init: bool,
	) -> ClientResult<Self> {
		let mut db_init_transaction = Transaction::new();
		let requested_state_pruning = config.state_pruning.clone();
		let state_meta_db = StateMetaDb(db.clone());
		let map_e = sp_blockchain::Error::from_state_db;
		let (state_db_init_commit_set, state_db) = StateDb::open(
			state_meta_db,
			requested_state_pruning,
			!db.supports_ref_counting(),
			should_init,
		)
		.map_err(map_e)?;
		apply_state_commit(&mut db_init_transaction, state_db_init_commit_set);
		let state_pruning_used = state_db.pruning_mode();
		let is_archive_pruning = state_pruning_used.is_archive();
		let blockchain = BlockchainDb::new(db.clone())?;
		let storage_db =
			StorageDb { db: db.clone(), state_db, prefix_keys: !db.supports_ref_counting() };
		let offchain_storage = offchain::LocalStorage::new(db.clone());
		let backend = Backend {
			storage: Arc::new(storage_db),
			offchain_storage,
			blockchain,
			canonicalization_delay,
			import_lock: Default::default(),
			is_archive: is_archive_pruning,
			io_stats: FrozenForDuration::new(std::time::Duration::from_secs(1)),
			state_usage: Arc::new(StateUsageStats::new()),
			blocks_pruning: config.blocks_pruning,
			genesis_state: RwLock::new(None),
			shared_trie_cache: config.trie_cache_maximum_size.map(|maximum_size| {
				SharedTrieCache::new(sp_trie::cache::CacheSize::new(maximum_size))
			}),
		};
		// Older DB versions have no last state key. Check if the state is available and set it.
		let info = backend.blockchain.info();
		if info.finalized_state.is_none() &&
			info.finalized_hash != Default::default() &&
			sc_client_api::Backend::have_state_at(
				&backend,
				info.finalized_hash,
				info.finalized_number,
			) {
			backend.blockchain.update_meta(MetaUpdate {
				hash: info.finalized_hash,
				number: info.finalized_number,
				is_best: info.finalized_hash == info.best_hash,
				is_finalized: true,
				with_state: true,
			});
		}
		db.commit(db_init_transaction)?;
		Ok(backend)
	}
	/// Handle setting head within a transaction. `route_to` should be the last
	/// block that existed in the database. `best_to` should be the best block
	/// to be set.
	///
	/// In the case where the new best block is a block to be imported, `route_to`
	/// should be the parent of `best_to`. In the case where we set an existing block
	/// to be best, `route_to` should equal to `best_to`.
	fn set_head_with_transaction(
		&self,
		transaction: &mut Transaction<DbHash>,
		route_to: Block::Hash,
		best_to: (NumberFor<Block>, Block::Hash),
	) -> ClientResult<(Vec<Block::Hash>, Vec<Block::Hash>)> {
		let mut enacted = Vec::default();
		let mut retracted = Vec::default();
		let (best_number, best_hash) = best_to;
		let meta = self.blockchain.meta.read();
		if meta.best_number.saturating_sub(best_number).saturated_into::<u64>() >
			self.canonicalization_delay
		{
			return Err(sp_blockchain::Error::SetHeadTooOld)
		}
		let parent_exists =
			self.blockchain.status(route_to)? == sp_blockchain::BlockStatus::InChain;
		// Cannot find tree route with empty DB or when imported a detached block.
		if meta.best_hash != Default::default() && parent_exists {
			let tree_route = sp_blockchain::tree_route(&self.blockchain, meta.best_hash, route_to)?;
			// uncanonicalize: check safety violations and ensure the numbers no longer
			// point to these block hashes in the key mapping.
			for r in tree_route.retracted() {
				if r.hash == meta.finalized_hash {
					warn!(
						"Potential safety failure: reverting finalized block {:?}",
						(&r.number, &r.hash)
					);
					return Err(sp_blockchain::Error::NotInFinalizedChain)
				}
				retracted.push(r.hash);
				utils::remove_number_to_key_mapping(transaction, columns::KEY_LOOKUP, r.number)?;
			}
			// canonicalize: set the number lookup to map to this block's hash.
			for e in tree_route.enacted() {
				enacted.push(e.hash);
				utils::insert_number_to_key_mapping(
					transaction,
					columns::KEY_LOOKUP,
					e.number,
					e.hash,
				)?;
			}
		}
		let lookup_key = utils::number_and_hash_to_lookup_key(best_number, &best_hash)?;
		transaction.set_from_vec(columns::META, meta_keys::BEST_BLOCK, lookup_key);
		utils::insert_number_to_key_mapping(
			transaction,
			columns::KEY_LOOKUP,
			best_number,
			best_hash,
		)?;
		Ok((enacted, retracted))
	}
	fn ensure_sequential_finalization(
		&self,
		header: &Block::Header,
		last_finalized: Option<Block::Hash>,
	) -> ClientResult<()> {
		let last_finalized =
			last_finalized.unwrap_or_else(|| self.blockchain.meta.read().finalized_hash);
		if last_finalized != self.blockchain.meta.read().genesis_hash &&
			*header.parent_hash() != last_finalized
		{
			return Err(sp_blockchain::Error::NonSequentialFinalization(format!(
				"Last finalized {:?} not parent of {:?}",
				last_finalized,
				header.hash()
			)))
		}
		Ok(())
	}
	/// `remove_displaced` can be set to `false` if this is not the last of many subsequent calls
	/// for performance reasons.
	fn finalize_block_with_transaction(
		&self,
		transaction: &mut Transaction<DbHash>,
		hash: Block::Hash,
		header: &Block::Header,
		last_finalized: Option<Block::Hash>,
		justification: Option<Justification>,
		current_transaction_justifications: &mut HashMap<Block::Hash, Justification>,
		remove_displaced: bool,
	) -> ClientResult<MetaUpdate<Block>> {
		// TODO: ensure best chain contains this block.
		let number = *header.number();
		self.ensure_sequential_finalization(header, last_finalized)?;
		let with_state = sc_client_api::Backend::have_state_at(self, hash, number);
		self.note_finalized(
			transaction,
			header,
			hash,
			with_state,
			current_transaction_justifications,
			remove_displaced,
		)?;
		if let Some(justification) = justification {
			transaction.set_from_vec(
				columns::JUSTIFICATIONS,
				&utils::number_and_hash_to_lookup_key(number, hash)?,
				Justifications::from(justification.clone()).encode(),
			);
			current_transaction_justifications.insert(hash, justification);
		}
		Ok(MetaUpdate { hash, number, is_best: false, is_finalized: true, with_state })
	}
	// performs forced canonicalization with a delay after importing a non-finalized block.
	fn force_delayed_canonicalize(
		&self,
		transaction: &mut Transaction<DbHash>,
	) -> ClientResult<()> {
		let best_canonical = match self.storage.state_db.last_canonicalized() {
			LastCanonicalized::None => 0,
			LastCanonicalized::Block(b) => b,
			// Nothing needs to be done when canonicalization is not happening.
			LastCanonicalized::NotCanonicalizing => return Ok(()),
		};
		let info = self.blockchain.info();
		let best_number: u64 = self.blockchain.info().best_number.saturated_into();
		for to_canonicalize in
			best_canonical + 1..=best_number.saturating_sub(self.canonicalization_delay)
		{
			let hash_to_canonicalize = sc_client_api::blockchain::HeaderBackend::hash(
				&self.blockchain,
				to_canonicalize.saturated_into(),
			)?
			.ok_or_else(|| {
				let best_hash = info.best_hash;
				sp_blockchain::Error::Backend(format!(
					"Can't canonicalize missing block number #{to_canonicalize} when for best block {best_hash:?} (#{best_number})",
				))
			})?;
			if !sc_client_api::Backend::have_state_at(
				self,
				hash_to_canonicalize,
				to_canonicalize.saturated_into(),
			) {
				return Ok(())
			}
			trace!(target: "db", "Canonicalize block #{} ({:?})", to_canonicalize, hash_to_canonicalize);
			let commit = self.storage.state_db.canonicalize_block(&hash_to_canonicalize).map_err(
				sp_blockchain::Error::from_state_db::<
					sc_state_db::Error<sp_database::error::DatabaseError>,
				>,
			)?;
			apply_state_commit(transaction, commit);
		}
		Ok(())
	}
	fn try_commit_operation(&self, mut operation: BlockImportOperation<Block>) -> ClientResult<()> {
		let mut transaction = Transaction::new();
		operation.apply_aux(&mut transaction);
		operation.apply_offchain(&mut transaction);
		let mut meta_updates = Vec::with_capacity(operation.finalized_blocks.len());
		let (best_num, mut last_finalized_hash, mut last_finalized_num, mut block_gap) = {
			let meta = self.blockchain.meta.read();
			(meta.best_number, meta.finalized_hash, meta.finalized_number, meta.block_gap)
		};
		let mut current_transaction_justifications: HashMap<Block::Hash, Justification> =
			HashMap::new();
		let mut finalized_blocks = operation.finalized_blocks.into_iter().peekable();
		while let Some((block_hash, justification)) = finalized_blocks.next() {
			let block_header = self.blockchain.expect_header(block_hash)?;
			meta_updates.push(self.finalize_block_with_transaction(
				&mut transaction,
				block_hash,
				&block_header,
				Some(last_finalized_hash),
				justification,
				&mut current_transaction_justifications,
				finalized_blocks.peek().is_none(),
			)?);
			last_finalized_hash = block_hash;
			last_finalized_num = *block_header.number();
		}
		let imported = if let Some(pending_block) = operation.pending_block {
			let hash = pending_block.header.hash();
			let parent_hash = *pending_block.header.parent_hash();
			let number = *pending_block.header.number();
			let highest_leaf = self
				.blockchain
				.leaves
				.read()
				.highest_leaf()
				.map(|(n, _)| n)
				.unwrap_or(Zero::zero());
			let existing_header = number <= highest_leaf && self.blockchain.header(hash)?.is_some();
			// blocks are keyed by number + hash.
			let lookup_key = utils::number_and_hash_to_lookup_key(number, hash)?;
			if pending_block.leaf_state.is_best() {
				self.set_head_with_transaction(&mut transaction, parent_hash, (number, hash))?;
			};
			utils::insert_hash_to_key_mapping(&mut transaction, columns::KEY_LOOKUP, number, hash)?;
			transaction.set_from_vec(columns::HEADER, &lookup_key, pending_block.header.encode());
			if let Some(body) = pending_block.body {
				// If we have any index operations we save block in the new format with indexed
				// extrinsic headers Otherwise we save the body as a single blob.
				if operation.index_ops.is_empty() {
					transaction.set_from_vec(columns::BODY, &lookup_key, body.encode());
				} else {
					let body =
						apply_index_ops::<Block>(&mut transaction, body, operation.index_ops);
					transaction.set_from_vec(columns::BODY_INDEX, &lookup_key, body);
				}
			}
			if let Some(body) = pending_block.indexed_body {
				apply_indexed_body::<Block>(&mut transaction, body);
			}
			if let Some(justifications) = pending_block.justifications {
				transaction.set_from_vec(
					columns::JUSTIFICATIONS,
					&lookup_key,
					justifications.encode(),
				);
			}
			if number.is_zero() {
				transaction.set(columns::META, meta_keys::GENESIS_HASH, hash.as_ref());
				if operation.commit_state {
					transaction.set_from_vec(columns::META, meta_keys::FINALIZED_STATE, lookup_key);
				} else {
					// When we don't want to commit the genesis state, we still preserve it in
					// memory to bootstrap consensus. It is queried for an initial list of
					// authorities, etc.
					*self.genesis_state.write() = Some(Arc::new(DbGenesisStorage::new(
						*pending_block.header.state_root(),
						operation.db_updates.clone(),
					)));
				}
			}
			let finalized = if operation.commit_state {
				let mut changeset: sc_state_db::ChangeSet<Vec<u8>> =
					sc_state_db::ChangeSet::default();
				let mut ops: u64 = 0;
				let mut bytes: u64 = 0;
				let mut removal: u64 = 0;
				let mut bytes_removal: u64 = 0;
				for (mut key, (val, rc)) in operation.db_updates.drain() {
					self.storage.db.sanitize_key(&mut key);
					if rc > 0 {
						ops += 1;
						bytes += key.len() as u64 + val.len() as u64;
						if rc == 1 {
							changeset.inserted.push((key, val.to_vec()));
						} else {
							changeset.inserted.push((key.clone(), val.to_vec()));
							for _ in 0..rc - 1 {
								changeset.inserted.push((key.clone(), Default::default()));
							}
						}
					} else if rc < 0 {
						removal += 1;
						bytes_removal += key.len() as u64;
						if rc == -1 {
							changeset.deleted.push(key);
						} else {
							for _ in 0..-rc {
								changeset.deleted.push(key.clone());
							}
						}
					}
				}
				self.state_usage.tally_writes_nodes(ops, bytes);
				self.state_usage.tally_removed_nodes(removal, bytes_removal);
				let mut ops: u64 = 0;
				let mut bytes: u64 = 0;
				for (key, value) in operation
					.storage_updates
					.iter()
					.chain(operation.child_storage_updates.iter().flat_map(|(_, s)| s.iter()))
				{
					ops += 1;
					bytes += key.len() as u64;
					if let Some(v) = value.as_ref() {
						bytes += v.len() as u64;
					}
				}
				self.state_usage.tally_writes(ops, bytes);
				let number_u64 = number.saturated_into::<u64>();
				let commit = self
					.storage
					.state_db
					.insert_block(&hash, number_u64, pending_block.header.parent_hash(), changeset)
					.map_err(|e: sc_state_db::Error<sp_database::error::DatabaseError>| {
						sp_blockchain::Error::from_state_db(e)
					})?;
				apply_state_commit(&mut transaction, commit);
				if number <= last_finalized_num {
					// Canonicalize in the db when re-importing existing blocks with state.
					let commit = self.storage.state_db.canonicalize_block(&hash).map_err(
						sp_blockchain::Error::from_state_db::<
							sc_state_db::Error<sp_database::error::DatabaseError>,
						>,
					)?;
					apply_state_commit(&mut transaction, commit);
					meta_updates.push(MetaUpdate {
						hash,
						number,
						is_best: false,
						is_finalized: true,
						with_state: true,
					});
				}
				// Check if need to finalize. Genesis is always finalized instantly.
				let finalized = number_u64 == 0 || pending_block.leaf_state.is_final();
				finalized
			} else {
				(number.is_zero() && last_finalized_num.is_zero()) ||
					pending_block.leaf_state.is_final()
			};
			let header = &pending_block.header;
			let is_best = pending_block.leaf_state.is_best();
			debug!(
				target: "db",
				"DB Commit {:?} ({}), best={}, state={}, existing={}, finalized={}",
				hash,
				number,
				is_best,
				operation.commit_state,
				existing_header,
				finalized,
			);
			self.state_usage.merge_sm(operation.old_state.usage_info());
			// release state reference so that it can be finalized
			// VERY IMPORTANT
			drop(operation.old_state);
			if finalized {
				// TODO: ensure best chain contains this block.
				self.ensure_sequential_finalization(header, Some(last_finalized_hash))?;
				let mut current_transaction_justifications = HashMap::new();
				self.note_finalized(
					&mut transaction,
					header,
					hash,
					operation.commit_state,
					&mut current_transaction_justifications,
					true,
				)?;
			} else {
				// canonicalize blocks which are old enough, regardless of finality.
				self.force_delayed_canonicalize(&mut transaction)?
			}
			if !existing_header {
				// Add a new leaf if the block has the potential to be finalized.
				if number > last_finalized_num || last_finalized_num.is_zero() {
					let mut leaves = self.blockchain.leaves.write();
					leaves.import(hash, number, parent_hash);
					leaves.prepare_transaction(
						&mut transaction,
						columns::META,
						meta_keys::LEAF_PREFIX,
					);
				}
				let mut children = children::read_children(
					&*self.storage.db,
					columns::META,
					meta_keys::CHILDREN_PREFIX,
					parent_hash,
				)?;
				if !children.contains(&hash) {
					children.push(hash);
					children::write_children(
						&mut transaction,
						columns::META,
						meta_keys::CHILDREN_PREFIX,
						parent_hash,
						children,
					);
				}
				if let Some((mut start, end)) = block_gap {
					if number == start {
						start += One::one();
						utils::insert_number_to_key_mapping(
							&mut transaction,
							columns::KEY_LOOKUP,
							number,
							hash,
						)?;
					}
					if start > end {
						transaction.remove(columns::META, meta_keys::BLOCK_GAP);
						block_gap = None;
						debug!(target: "db", "Removed block gap.");
					} else {
						block_gap = Some((start, end));
						debug!(target: "db", "Update block gap. {:?}", block_gap);
						transaction.set(
							columns::META,
							meta_keys::BLOCK_GAP,
							&(start, end).encode(),
						);
					}
				} else if number > best_num + One::one() &&
					number > One::one() && self.blockchain.header(parent_hash)?.is_none()
				{
					let gap = (best_num + One::one(), number - One::one());
					transaction.set(columns::META, meta_keys::BLOCK_GAP, &gap.encode());
					block_gap = Some(gap);
					debug!(target: "db", "Detected block gap {:?}", block_gap);
				}
			}
			meta_updates.push(MetaUpdate {
				hash,
				number,
				is_best: pending_block.leaf_state.is_best(),
				is_finalized: finalized,
				with_state: operation.commit_state,
			});
			Some((pending_block.header, hash))
		} else {
			None
		};
		if let Some(set_head) = operation.set_head {
			if let Some(header) =
				sc_client_api::blockchain::HeaderBackend::header(&self.blockchain, set_head)?
			{
				let number = header.number();
				let hash = header.hash();
				self.set_head_with_transaction(&mut transaction, hash, (*number, hash))?;
				meta_updates.push(MetaUpdate {
					hash,
					number: *number,
					is_best: true,
					is_finalized: false,
					with_state: false,
				});
			} else {
				return Err(sp_blockchain::Error::UnknownBlock(format!(
					"Cannot set head {:?}",
					set_head
				)))
			}
		}
		self.storage.db.commit(transaction)?;
		// Apply all in-memory state changes.
		// Code beyond this point can't fail.
		if let Some((header, hash)) = imported {
			trace!(target: "db", "DB Commit done {:?}", hash);
			let header_metadata = CachedHeaderMetadata::from(&header);
			self.blockchain.insert_header_metadata(header_metadata.hash, header_metadata);
			cache_header(&mut self.blockchain.header_cache.lock(), hash, Some(header));
		}
		for m in meta_updates {
			self.blockchain.update_meta(m);
		}
		self.blockchain.update_block_gap(block_gap);
		Ok(())
	}
	// Write stuff to a transaction after a new block is finalized. This canonicalizes finalized
	// blocks. Fails if called with a block which was not a child of the last finalized block.
	/// `remove_displaced` can be set to `false` if this is not the last of many subsequent calls
	/// for performance reasons.
	fn note_finalized(
		&self,
		transaction: &mut Transaction<DbHash>,
		f_header: &Block::Header,
		f_hash: Block::Hash,
		with_state: bool,
		current_transaction_justifications: &mut HashMap<Block::Hash, Justification>,
		remove_displaced: bool,
	) -> ClientResult<()> {
		let f_num = *f_header.number();
		let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash)?;
		if with_state {
			transaction.set_from_vec(columns::META, meta_keys::FINALIZED_STATE, lookup_key.clone());
		}
		transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key);
		let requires_canonicalization = match self.storage.state_db.last_canonicalized() {
			LastCanonicalized::None => true,
			LastCanonicalized::Block(b) => f_num.saturated_into::<u64>() > b,
			LastCanonicalized::NotCanonicalizing => false,
		};
		if requires_canonicalization && sc_client_api::Backend::have_state_at(self, f_hash, f_num) {
			let commit = self.storage.state_db.canonicalize_block(&f_hash).map_err(
				sp_blockchain::Error::from_state_db::<
					sc_state_db::Error<sp_database::error::DatabaseError>,
				>,
			)?;
			apply_state_commit(transaction, commit);
		}
		if remove_displaced {
			let new_displaced = self.blockchain.displaced_leaves_after_finalizing(f_hash, f_num)?;
			self.blockchain.leaves.write().remove_displaced_leaves(FinalizationOutcome::new(
				new_displaced.displaced_leaves.iter().copied(),
			));
			if !matches!(self.blocks_pruning, BlocksPruning::KeepAll) {
				self.prune_displaced_branches(transaction, &new_displaced)?;
			}
		}
		self.prune_blocks(transaction, f_num, current_transaction_justifications)?;
		Ok(())
	}
	fn prune_blocks(
		&self,
		transaction: &mut Transaction<DbHash>,
		finalized_number: NumberFor<Block>,
		current_transaction_justifications: &mut HashMap<Block::Hash, Justification>,
	) -> ClientResult<()> {
		if let BlocksPruning::Some(blocks_pruning) = self.blocks_pruning {
			// Always keep the last finalized block
			let keep = std::cmp::max(blocks_pruning, 1);
			if finalized_number >= keep.into() {
				let number = finalized_number.saturating_sub(keep.into());
				// Before we prune a block, check if it is pinned
				if let Some(hash) = self.blockchain.hash(number)? {
					self.blockchain.insert_persisted_body_if_pinned(hash)?;
					// If the block was finalized in this transaction, it will not be in the db
					// yet.
					if let Some(justification) = current_transaction_justifications.remove(&hash) {
						self.blockchain.insert_justifications_if_pinned(hash, justification);
					} else {
						self.blockchain.insert_persisted_justifications_if_pinned(hash)?;
					}
				};
				self.prune_block(transaction, BlockId::<Block>::number(number))?;
			}
		}
		Ok(())
	}
	fn prune_displaced_branches(
		&self,
		transaction: &mut Transaction<DbHash>,
		displaced: &DisplacedLeavesAfterFinalization<Block>,
	) -> ClientResult<()> {
		// Discard all blocks from displaced branches
		for &hash in displaced.displaced_blocks.iter() {
			self.blockchain.insert_persisted_body_if_pinned(hash)?;
			self.prune_block(transaction, BlockId::<Block>::hash(hash))?;
		}
		Ok(())
	}
	fn prune_block(
		&self,
		transaction: &mut Transaction<DbHash>,
		id: BlockId<Block>,
	) -> ClientResult<()> {
		debug!(target: "db", "Removing block #{}", id);
		utils::remove_from_db(
			transaction,
			&*self.storage.db,
			columns::KEY_LOOKUP,
			columns::BODY,
			id,
		)?;
		utils::remove_from_db(
			transaction,
			&*self.storage.db,
			columns::KEY_LOOKUP,
			columns::JUSTIFICATIONS,
			id,
		)?;
		if let Some(index) =
			read_db(&*self.storage.db, columns::KEY_LOOKUP, columns::BODY_INDEX, id)?
		{
			utils::remove_from_db(
				transaction,
				&*self.storage.db,
				columns::KEY_LOOKUP,
				columns::BODY_INDEX,
				id,
			)?;
			match Vec::<DbExtrinsic<Block>>::decode(&mut &index[..]) {
				Ok(index) =>
					for ex in index {
						if let DbExtrinsic::Indexed { hash, .. } = ex {
							transaction.release(columns::TRANSACTION, hash);
						}
					},
				Err(err) =>
					return Err(sp_blockchain::Error::Backend(format!(
						"Error decoding body list: {}",
						err
					))),
			}
		}
		Ok(())
	}
	fn empty_state(&self) -> RecordStatsState<RefTrackingState<Block>, Block> {
		let root = EmptyStorage::<Block>::new().0; // Empty trie
		let db_state = DbStateBuilder::<HashingFor<Block>>::new(self.storage.clone(), root)
			.with_optional_cache(self.shared_trie_cache.as_ref().map(|c| c.local_cache()))
			.build();
		let state = RefTrackingState::new(db_state, self.storage.clone(), None);
		RecordStatsState::new(state, None, self.state_usage.clone())
	}
}
fn apply_state_commit(
	transaction: &mut Transaction<DbHash>,
	commit: sc_state_db::CommitSet<Vec<u8>>,
) {
	for (key, val) in commit.data.inserted.into_iter() {
		transaction.set_from_vec(columns::STATE, &key[..], val);
	}
	for key in commit.data.deleted.into_iter() {
		transaction.remove(columns::STATE, &key[..]);
	}
	for (key, val) in commit.meta.inserted.into_iter() {
		transaction.set_from_vec(columns::STATE_META, &key[..], val);
	}
	for key in commit.meta.deleted.into_iter() {
		transaction.remove(columns::STATE_META, &key[..]);
	}
}
fn apply_index_ops<Block: BlockT>(
	transaction: &mut Transaction<DbHash>,
	body: Vec<Block::Extrinsic>,
	ops: Vec<IndexOperation>,
) -> Vec<u8> {
	let mut extrinsic_index: Vec<DbExtrinsic<Block>> = Vec::with_capacity(body.len());
	let mut index_map = HashMap::new();
	let mut renewed_map = HashMap::new();
	for op in ops {
		match op {
			IndexOperation::Insert { extrinsic, hash, size } => {
				index_map.insert(extrinsic, (hash, size));
			},
			IndexOperation::Renew { extrinsic, hash } => {
				renewed_map.insert(extrinsic, DbHash::from_slice(hash.as_ref()));
			},
		}
	}
	for (index, extrinsic) in body.into_iter().enumerate() {
		let db_extrinsic = if let Some(hash) = renewed_map.get(&(index as u32)) {
			// Bump ref counter
			let extrinsic = extrinsic.encode();
			transaction.reference(columns::TRANSACTION, DbHash::from_slice(hash.as_ref()));
			DbExtrinsic::Indexed { hash: *hash, header: extrinsic }
		} else {
			match index_map.get(&(index as u32)) {
				Some((hash, size)) => {
					let encoded = extrinsic.encode();
					if *size as usize <= encoded.len() {
						let offset = encoded.len() - *size as usize;
						transaction.store(
							columns::TRANSACTION,
							DbHash::from_slice(hash.as_ref()),
							encoded[offset..].to_vec(),
						);
						DbExtrinsic::Indexed {
							hash: DbHash::from_slice(hash.as_ref()),
							header: encoded[..offset].to_vec(),
						}
					} else {
						// Invalid indexed slice. Just store full data and don't index anything.
						DbExtrinsic::Full(extrinsic)
					}
				},
				_ => DbExtrinsic::Full(extrinsic),
			}
		};
		extrinsic_index.push(db_extrinsic);
	}
	debug!(
		target: "db",
		"DB transaction index: {} inserted, {} renewed, {} full",
		index_map.len(),
		renewed_map.len(),
		extrinsic_index.len() - index_map.len() - renewed_map.len(),
	);
	extrinsic_index.encode()
}
/// Store every raw indexed-body entry in the `TRANSACTION` column, keyed by
/// the Blake2-256 hash of its contents.
fn apply_indexed_body<Block: BlockT>(transaction: &mut Transaction<DbHash>, body: Vec<Vec<u8>>) {
	body.into_iter().for_each(|entry| {
		let digest = sp_runtime::traits::BlakeTwo256::hash(&entry);
		transaction.store(columns::TRANSACTION, DbHash::from_slice(digest.as_ref()), entry);
	});
}
/// Auxiliary-data storage, backed by the `AUX` column of the database.
impl<Block> sc_client_api::backend::AuxStore for Backend<Block>
where
	Block: BlockT,
{
	/// Atomically apply the given insertions and deletions to the `AUX`
	/// column in a single database transaction.
	fn insert_aux<
		'a,
		'b: 'a,
		'c: 'a,
		I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
		D: IntoIterator<Item = &'a &'b [u8]>,
	>(
		&self,
		insert: I,
		delete: D,
	) -> ClientResult<()> {
		let mut transaction = Transaction::new();
		for (k, v) in insert {
			transaction.set(columns::AUX, k, v);
		}
		for k in delete {
			transaction.remove(columns::AUX, k);
		}
		self.storage.db.commit(transaction)?;
		Ok(())
	}
	/// Read a single auxiliary value; `Ok(None)` if the key is absent.
	fn get_aux(&self, key: &[u8]) -> ClientResult<Option<Vec<u8>>> {
		Ok(self.storage.db.get(columns::AUX, key))
	}
}
/// The main client-backend implementation: block import, finalization,
/// justifications, state access, revert and block pinning.
impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
	type BlockImportOperation = BlockImportOperation<Block>;
	type Blockchain = BlockchainDb<Block>;
	type State = RecordStatsState<RefTrackingState<Block>, Block>;
	type OffchainStorage = offchain::LocalStorage;
	/// Start a fresh import operation. `old_state` defaults to the empty
	/// state until `begin_state_operation` selects a parent state.
	fn begin_operation(&self) -> ClientResult<Self::BlockImportOperation> {
		Ok(BlockImportOperation {
			pending_block: None,
			old_state: self.empty_state(),
			db_updates: PrefixedMemoryDB::default(),
			storage_updates: Default::default(),
			child_storage_updates: Default::default(),
			offchain_storage_updates: Default::default(),
			aux_ops: Vec::new(),
			finalized_blocks: Vec::new(),
			set_head: None,
			commit_state: false,
			index_ops: Default::default(),
		})
	}
	/// Attach the state of `block` to the operation; the default hash selects
	/// the empty (pre-genesis) state.
	fn begin_state_operation(
		&self,
		operation: &mut Self::BlockImportOperation,
		block: Block::Hash,
	) -> ClientResult<()> {
		if block == Default::default() {
			operation.old_state = self.empty_state();
		} else {
			operation.old_state = self.state_at(block)?;
		}
		operation.commit_state = true;
		Ok(())
	}
	/// Commit an import operation. On failure the in-memory `state_db` is
	/// reset from disk and the pinned-block cache cleared, so in-memory
	/// bookkeeping stays consistent with what was actually persisted.
	fn commit_operation(&self, operation: Self::BlockImportOperation) -> ClientResult<()> {
		let usage = operation.old_state.usage_info();
		self.state_usage.merge_sm(usage);
		if let Err(e) = self.try_commit_operation(operation) {
			let state_meta_db = StateMetaDb(self.storage.db.clone());
			self.storage
				.state_db
				.reset(state_meta_db)
				.map_err(sp_blockchain::Error::from_state_db)?;
			self.blockchain.clear_pinning_cache();
			Err(e)
		} else {
			self.storage.state_db.sync();
			Ok(())
		}
	}
	/// Finalize a single block (with optional justification) in its own
	/// database transaction, then update the in-memory chain metadata.
	fn finalize_block(
		&self,
		hash: Block::Hash,
		justification: Option<Justification>,
	) -> ClientResult<()> {
		let mut transaction = Transaction::new();
		let header = self.blockchain.expect_header(hash)?;
		let mut current_transaction_justifications = HashMap::new();
		let m = self.finalize_block_with_transaction(
			&mut transaction,
			hash,
			&header,
			None,
			justification,
			&mut current_transaction_justifications,
			true,
		)?;
		self.storage.db.commit(transaction)?;
		self.blockchain.update_meta(m);
		Ok(())
	}
	/// Append a justification to an already-finalized block. Fails with
	/// `NotInFinalizedChain` if the block is not on the finalized chain, and
	/// with `BadJustification` on a duplicate consensus engine ID.
	fn append_justification(
		&self,
		hash: Block::Hash,
		justification: Justification,
	) -> ClientResult<()> {
		let mut transaction: Transaction<DbHash> = Transaction::new();
		let header = self.blockchain.expect_header(hash)?;
		let number = *header.number();
		// Check if the block is finalized first.
		let is_descendent_of = is_descendent_of(&self.blockchain, None);
		let last_finalized = self.blockchain.last_finalized()?;
		// We can do a quick check first, before doing a proper but more expensive check
		if number > self.blockchain.info().finalized_number ||
			(hash != last_finalized && !is_descendent_of(&hash, &last_finalized)?)
		{
			return Err(ClientError::NotInFinalizedChain)
		}
		let justifications = if let Some(mut stored_justifications) =
			self.blockchain.justifications(hash)?
		{
			if !stored_justifications.append(justification) {
				return Err(ClientError::BadJustification("Duplicate consensus engine ID".into()))
			}
			stored_justifications
		} else {
			Justifications::from(justification)
		};
		transaction.set_from_vec(
			columns::JUSTIFICATIONS,
			&utils::number_and_hash_to_lookup_key(number, hash)?,
			justifications.encode(),
		);
		self.storage.db.commit(transaction)?;
		Ok(())
	}
	fn offchain_storage(&self) -> Option<Self::OffchainStorage> {
		Some(self.offchain_storage.clone())
	}
	/// Report I/O and memory usage statistics.
	fn usage_info(&self) -> Option<UsageInfo> {
		let (io_stats, state_stats) = self.io_stats.take_or_else(|| {
			(
				// TODO: implement DB stats and cache size retrieval
				kvdb::IoStats::empty(),
				self.state_usage.take(),
			)
		});
		let database_cache = MemorySize::from_bytes(0);
		let state_cache = MemorySize::from_bytes(
			self.shared_trie_cache.as_ref().map_or(0, |c| c.used_memory_size()),
		);
		Some(UsageInfo {
			memory: MemoryInfo { state_cache, database_cache },
			io: IoInfo {
				transactions: io_stats.transactions,
				bytes_read: io_stats.bytes_read,
				bytes_written: io_stats.bytes_written,
				writes: io_stats.writes,
				reads: io_stats.reads,
				average_transaction_size: io_stats.avg_transaction_size() as u64,
				state_reads: state_stats.reads.ops,
				state_writes: state_stats.writes.ops,
				state_writes_cache: state_stats.overlay_writes.ops,
				state_reads_cache: state_stats.cache_reads.ops,
				state_writes_nodes: state_stats.nodes_writes.ops,
			},
		})
	}
	/// Revert up to `n` blocks from the best chain, one state-db commit per
	/// block. Finalized blocks are only reverted when `revert_finalized` is
	/// set. Returns the number of blocks actually reverted together with the
	/// hashes of any reverted finalized blocks.
	fn revert(
		&self,
		n: NumberFor<Block>,
		revert_finalized: bool,
	) -> ClientResult<(NumberFor<Block>, HashSet<Block::Hash>)> {
		let mut reverted_finalized = HashSet::new();
		let info = self.blockchain.info();
		// Revert is performed from the highest leaf, which may sit above the
		// best block; the extra distance is added to `n` below.
		let highest_leaf = self
			.blockchain
			.leaves
			.read()
			.highest_leaf()
			.and_then(|(n, h)| h.last().map(|h| (n, *h)));
		let best_number = info.best_number;
		let best_hash = info.best_hash;
		let finalized = info.finalized_number;
		let revertible = best_number - finalized;
		// Clamp to the non-finalized distance unless finalized blocks may go.
		let n = if !revert_finalized && revertible < n { revertible } else { n };
		let (n, mut number_to_revert, mut hash_to_revert) = match highest_leaf {
			Some((l_n, l_h)) => (n + (l_n - best_number), l_n, l_h),
			None => (n, best_number, best_hash),
		};
		let mut revert_blocks = || -> ClientResult<NumberFor<Block>> {
			for c in 0..n.saturated_into::<u64>() {
				if number_to_revert.is_zero() {
					return Ok(c.saturated_into::<NumberFor<Block>>())
				}
				let mut transaction = Transaction::new();
				let removed = self.blockchain.header(hash_to_revert)?.ok_or_else(|| {
					sp_blockchain::Error::UnknownBlock(format!(
						"Error reverting to {}. Block header not found.",
						hash_to_revert,
					))
				})?;
				let removed_hash = removed.hash();
				let prev_number = number_to_revert.saturating_sub(One::one());
				let prev_hash =
					if prev_number == best_number { best_hash } else { *removed.parent_hash() };
				// Stop as soon as the new head would have no state.
				if !self.have_state_at(prev_hash, prev_number) {
					return Ok(c.saturated_into::<NumberFor<Block>>())
				}
				match self.storage.state_db.revert_one() {
					Some(commit) => {
						apply_state_commit(&mut transaction, commit);
						number_to_revert = prev_number;
						hash_to_revert = prev_hash;
						let update_finalized = number_to_revert < finalized;
						let key = utils::number_and_hash_to_lookup_key(
							number_to_revert,
							&hash_to_revert,
						)?;
						if update_finalized {
							// We reverted past the finalized block: move the
							// FINALIZED_BLOCK (and maybe FINALIZED_STATE) markers back.
							transaction.set_from_vec(
								columns::META,
								meta_keys::FINALIZED_BLOCK,
								key.clone(),
							);
							reverted_finalized.insert(removed_hash);
							if let Some((hash, _)) = self.blockchain.info().finalized_state {
								if hash == hash_to_revert {
									if !number_to_revert.is_zero() &&
										self.have_state_at(
											prev_hash,
											number_to_revert - One::one(),
										) {
										let lookup_key = utils::number_and_hash_to_lookup_key(
											number_to_revert - One::one(),
											prev_hash,
										)?;
										transaction.set_from_vec(
											columns::META,
											meta_keys::FINALIZED_STATE,
											lookup_key,
										);
									} else {
										transaction
											.remove(columns::META, meta_keys::FINALIZED_STATE);
									}
								}
							}
						}
						transaction.set_from_vec(columns::META, meta_keys::BEST_BLOCK, key);
						transaction.remove(columns::KEY_LOOKUP, removed.hash().as_ref());
						children::remove_children(
							&mut transaction,
							columns::META,
							meta_keys::CHILDREN_PREFIX,
							hash_to_revert,
						);
						self.storage.db.commit(transaction)?;
						let is_best = number_to_revert < best_number;
						self.blockchain.update_meta(MetaUpdate {
							hash: hash_to_revert,
							number: number_to_revert,
							is_best,
							is_finalized: update_finalized,
							with_state: false,
						});
					},
					None => return Ok(c.saturated_into::<NumberFor<Block>>()),
				}
			}
			Ok(n)
		};
		let reverted = revert_blocks()?;
		// Rebuild the leaf set to match the reverted head.
		let revert_leaves = || -> ClientResult<()> {
			let mut transaction = Transaction::new();
			let mut leaves = self.blockchain.leaves.write();
			leaves.revert(hash_to_revert, number_to_revert);
			leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX);
			self.storage.db.commit(transaction)?;
			Ok(())
		};
		revert_leaves()?;
		Ok((reverted, reverted_finalized))
	}
	/// Remove a single leaf block (never the best block) from the database,
	/// updating the children index and leaf set accordingly.
	fn remove_leaf_block(&self, hash: Block::Hash) -> ClientResult<()> {
		let best_hash = self.blockchain.info().best_hash;
		if best_hash == hash {
			return Err(sp_blockchain::Error::Backend(format!("Can't remove best block {:?}", hash)))
		}
		let hdr = self.blockchain.header_metadata(hash)?;
		if !self.have_state_at(hash, hdr.number) {
			return Err(sp_blockchain::Error::UnknownBlock(format!(
				"State already discarded for {:?}",
				hash
			)))
		}
		let mut leaves = self.blockchain.leaves.write();
		if !leaves.contains(hdr.number, hash) {
			return Err(sp_blockchain::Error::Backend(format!(
				"Can't remove non-leaf block {:?}",
				hash
			)))
		}
		let mut transaction = Transaction::new();
		if let Some(commit) = self.storage.state_db.remove(&hash) {
			apply_state_commit(&mut transaction, commit);
		}
		transaction.remove(columns::KEY_LOOKUP, hash.as_ref());
		// Remaining siblings of the removed block.
		let children: Vec<_> = self
			.blockchain()
			.children(hdr.parent)?
			.into_iter()
			.filter(|child_hash| *child_hash != hash)
			.collect();
		let parent_leaf = if children.is_empty() {
			// Parent has no other children left, so it becomes a leaf again.
			children::remove_children(
				&mut transaction,
				columns::META,
				meta_keys::CHILDREN_PREFIX,
				hdr.parent,
			);
			Some(hdr.parent)
		} else {
			children::write_children(
				&mut transaction,
				columns::META,
				meta_keys::CHILDREN_PREFIX,
				hdr.parent,
				children,
			);
			None
		};
		let remove_outcome = leaves.remove(hash, hdr.number, parent_leaf);
		leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX);
		if let Err(e) = self.storage.db.commit(transaction) {
			// Commit failed: roll back the in-memory leaf-set change.
			if let Some(outcome) = remove_outcome {
				leaves.undo().undo_remove(outcome);
			}
			return Err(e.into())
		}
		self.blockchain().remove_header_metadata(hash);
		Ok(())
	}
	fn blockchain(&self) -> &BlockchainDb<Block> {
		&self.blockchain
	}
	/// Obtain the state at the given block, pinning it in the state db for
	/// the lifetime of the returned handle.
	fn state_at(&self, hash: Block::Hash) -> ClientResult<Self::State> {
		// Special case for genesis: its state may still be held in memory
		// before the first block is imported.
		if hash == self.blockchain.meta.read().genesis_hash {
			if let Some(genesis_state) = &*self.genesis_state.read() {
				let root = genesis_state.root;
				let db_state =
					DbStateBuilder::<HashingFor<Block>>::new(genesis_state.clone(), root)
						.with_optional_cache(
							self.shared_trie_cache.as_ref().map(|c| c.local_cache()),
						)
						.build();
				let state = RefTrackingState::new(db_state, self.storage.clone(), None);
				return Ok(RecordStatsState::new(state, None, self.state_usage.clone()))
			}
		}
		match self.blockchain.header_metadata(hash) {
			Ok(ref hdr) => {
				// Hint for the state db: does the state root exist on disk?
				let hint = || {
					sc_state_db::NodeDb::get(self.storage.as_ref(), hdr.state_root.as_ref())
						.unwrap_or(None)
						.is_some()
				};
				if let Ok(()) =
					self.storage.state_db.pin(&hash, hdr.number.saturated_into::<u64>(), hint)
				{
					let root = hdr.state_root;
					let db_state =
						DbStateBuilder::<HashingFor<Block>>::new(self.storage.clone(), root)
							.with_optional_cache(
								self.shared_trie_cache.as_ref().map(|c| c.local_cache()),
							)
							.build();
					let state = RefTrackingState::new(db_state, self.storage.clone(), Some(hash));
					Ok(RecordStatsState::new(state, Some(hash), self.state_usage.clone()))
				} else {
					Err(sp_blockchain::Error::UnknownBlock(format!(
						"State already discarded for {:?}",
						hash
					)))
				}
			},
			Err(e) => Err(e),
		}
	}
	/// Check whether the state for the given block is still available.
	/// Archive nodes check the trie root directly; pruned nodes consult the
	/// state db first and fall back to a trie lookup when the answer is
	/// ambiguous.
	fn have_state_at(&self, hash: Block::Hash, number: NumberFor<Block>) -> bool {
		if self.is_archive {
			match self.blockchain.header_metadata(hash) {
				Ok(header) => sp_state_machine::Storage::get(
					self.storage.as_ref(),
					&header.state_root,
					(&[], None),
				)
				.unwrap_or(None)
				.is_some(),
				_ => false,
			}
		} else {
			match self.storage.state_db.is_pruned(&hash, number.saturated_into::<u64>()) {
				IsPruned::Pruned => false,
				IsPruned::NotPruned => true,
				IsPruned::MaybePruned => match self.blockchain.header_metadata(hash) {
					Ok(header) => sp_state_machine::Storage::get(
						self.storage.as_ref(),
						&header.state_root,
						(&[], None),
					)
					.unwrap_or(None)
					.is_some(),
					_ => false,
				},
			}
		}
	}
	fn get_import_lock(&self) -> &RwLock<()> {
		&self.import_lock
	}
	/// Archive pruning modes need every block, so they require full sync.
	fn requires_full_sync(&self) -> bool {
		matches!(
			self.storage.state_db.pruning_mode(),
			PruningMode::ArchiveAll | PruningMode::ArchiveCanonical
		)
	}
	/// Pin a block so its state and body survive pruning until unpinned.
	fn pin_block(&self, hash: <Block as BlockT>::Hash) -> sp_blockchain::Result<()> {
		let hint = || {
			let header_metadata = self.blockchain.header_metadata(hash);
			header_metadata
				.map(|hdr| {
					sc_state_db::NodeDb::get(self.storage.as_ref(), hdr.state_root.as_ref())
						.unwrap_or(None)
						.is_some()
				})
				.unwrap_or(false)
		};
		if let Some(number) = self.blockchain.number(hash)? {
			self.storage.state_db.pin(&hash, number.saturated_into::<u64>(), hint).map_err(
				|_| {
					sp_blockchain::Error::UnknownBlock(format!(
						"Unable to pin: state already discarded for `{:?}`",
						hash
					))
				},
			)?;
		} else {
			return Err(ClientError::UnknownBlock(format!(
				"Can not pin block with hash `{:?}`. Block not found.",
				hash
			)))
		}
		if self.blocks_pruning != BlocksPruning::KeepAll {
			// Only increase reference count for this hash. Value is loaded once we prune.
			self.blockchain.bump_ref(hash);
		}
		Ok(())
	}
	/// Release a pin taken by `pin_block`.
	fn unpin_block(&self, hash: <Block as BlockT>::Hash) {
		self.storage.state_db.unpin(&hash);
		if self.blocks_pruning != BlocksPruning::KeepAll {
			self.blockchain.unpin(hash);
		}
	}
}
impl<Block: BlockT> sc_client_api::backend::LocalBackend<Block> for Backend<Block> {}
#[cfg(test)]
pub(crate) mod tests {
	use super::*;
	use crate::columns;
	use hash_db::{HashDB, EMPTY_PREFIX};
	use sc_client_api::{
		backend::{Backend as BTrait, BlockImportOperation as Op},
		blockchain::Backend as BLBTrait,
	};
	use sp_blockchain::{lowest_common_ancestor, tree_route};
	use sp_core::H256;
	use sp_runtime::{
		testing::{Block as RawBlock, ExtrinsicWrapper, Header},
		traits::{BlakeTwo256, Hash},
		ConsensusEngineId, StateVersion,
	};
	const CONS0_ENGINE_ID: ConsensusEngineId = *b"CON0";
	const CONS1_ENGINE_ID: ConsensusEngineId = *b"CON1";
	pub(crate) type Block = RawBlock<ExtrinsicWrapper<u64>>;
	/// Test helper: insert a header with an empty body as the new best block,
	/// panicking on failure. Thin wrapper over `insert_block`.
	pub fn insert_header(
		backend: &Backend<Block>,
		number: u64,
		parent_hash: H256,
		changes: Option<Vec<(Vec<u8>, Vec<u8>)>>,
		extrinsics_root: H256,
	) -> H256 {
		insert_block(backend, number, parent_hash, changes, extrinsics_root, Vec::new(), None)
			.unwrap()
	}
	/// Test helper: build and import a block as the new best block, returning
	/// its hash. Writes a dummy storage entry so the block gets a unique,
	/// discoverable state root.
	pub fn insert_block(
		backend: &Backend<Block>,
		number: u64,
		parent_hash: H256,
		_changes: Option<Vec<(Vec<u8>, Vec<u8>)>>,
		extrinsics_root: H256,
		body: Vec<ExtrinsicWrapper<u64>>,
		transaction_index: Option<Vec<IndexOperation>>,
	) -> Result<H256, sp_blockchain::Error> {
		use sp_runtime::testing::Digest;
		let digest = Digest::default();
		let mut header =
			Header { number, parent_hash, state_root: Default::default(), digest, extrinsics_root };
		// Genesis builds on the empty (default-hash) state; others on the parent.
		let block_hash = if number == 0 { Default::default() } else { parent_hash };
		let mut op = backend.begin_operation().unwrap();
		backend.begin_state_operation(&mut op, block_hash).unwrap();
		if let Some(index) = transaction_index {
			op.update_transaction_index(index).unwrap();
		}
		// Insert some fake data to ensure that the block can be found in the state column.
		let (root, overlay) = op.old_state.storage_root(
			vec![(block_hash.as_ref(), Some(block_hash.as_ref()))].into_iter(),
			StateVersion::V1,
		);
		op.update_db_storage(overlay).unwrap();
		header.state_root = root.into();
		op.set_block_data(header.clone(), Some(body), None, None, NewBlockState::Best)
			.unwrap();
		backend.commit_operation(op)?;
		Ok(header.hash())
	}
	/// Test helper: import a header without attaching any state (no
	/// `begin_state_operation`), simulating gaps such as after warp sync.
	pub fn insert_disconnected_header(
		backend: &Backend<Block>,
		number: u64,
		parent_hash: H256,
		extrinsics_root: H256,
		best: bool,
	) -> H256 {
		use sp_runtime::testing::Digest;
		let digest = Digest::default();
		let header =
			Header { number, parent_hash, state_root: Default::default(), digest, extrinsics_root };
		let mut op = backend.begin_operation().unwrap();
		op.set_block_data(
			header.clone(),
			Some(vec![]),
			None,
			None,
			if best { NewBlockState::Best } else { NewBlockState::Normal },
		)
		.unwrap();
		backend.commit_operation(op).unwrap();
		header.hash()
	}
	/// Test helper: import a header as `Normal` (without moving the best
	/// block), deriving its state root from the parent's state.
	pub fn insert_header_no_head(
		backend: &Backend<Block>,
		number: u64,
		parent_hash: H256,
		extrinsics_root: H256,
	) -> H256 {
		use sp_runtime::testing::Digest;
		let digest = Digest::default();
		let mut header =
			Header { number, parent_hash, state_root: Default::default(), digest, extrinsics_root };
		let mut op = backend.begin_operation().unwrap();
		// Fall back to the empty state for a default (pre-genesis) parent.
		let root = backend
			.state_at(parent_hash)
			.unwrap_or_else(|_| {
				if parent_hash == Default::default() {
					backend.empty_state()
				} else {
					panic!("Unknown block: {parent_hash:?}")
				}
			})
			.storage_root(
				vec![(parent_hash.as_ref(), Some(parent_hash.as_ref()))].into_iter(),
				StateVersion::V1,
			)
			.0;
		header.state_root = root.into();
		op.set_block_data(header.clone(), None, None, None, NewBlockState::Normal)
			.unwrap();
		backend.commit_operation(op).unwrap();
		header.hash()
	}
	/// Block-number -> hash mappings must be persisted and still resolvable
	/// after reopening the database with a different pruning configuration.
	#[test]
	fn block_hash_inserted_correctly() {
		let backing = {
			let db = Backend::<Block>::new_test(1, 0);
			for i in 0..10 {
				assert!(db.blockchain().hash(i).unwrap().is_none());
				{
					let hash = if i == 0 {
						Default::default()
					} else {
						db.blockchain.hash(i - 1).unwrap().unwrap()
					};
					let mut op = db.begin_operation().unwrap();
					db.begin_state_operation(&mut op, hash).unwrap();
					let header = Header {
						number: i,
						parent_hash: hash,
						state_root: Default::default(),
						digest: Default::default(),
						extrinsics_root: Default::default(),
					};
					op.set_block_data(header, Some(vec![]), None, None, NewBlockState::Best)
						.unwrap();
					db.commit_operation(op).unwrap();
				}
				assert!(db.blockchain().hash(i).unwrap().is_some())
			}
			db.storage.db.clone()
		};
		// Reopen the same underlying database through a fresh backend.
		let backend = Backend::<Block>::new(
			DatabaseSettings {
				trie_cache_maximum_size: Some(16 * 1024 * 1024),
				state_pruning: Some(PruningMode::blocks_pruning(1)),
				source: DatabaseSource::Custom { db: backing, require_create_flag: false },
				blocks_pruning: BlocksPruning::KeepFinalized,
			},
			0,
		)
		.unwrap();
		assert_eq!(backend.blockchain().info().best_number, 9);
		for i in 0..10 {
			assert!(backend.blockchain().hash(i).unwrap().is_some())
		}
	}
	/// Exercise state insertion and lookup for both trie state versions.
	#[test]
	fn set_state_data() {
		for version in [StateVersion::V0, StateVersion::V1] {
			set_state_data_inner(version);
		}
	}
	/// Insert genesis storage, then apply updates (including a deletion) in a
	/// child block, checking that `state_at` reflects each state correctly.
	fn set_state_data_inner(state_version: StateVersion) {
		let db = Backend::<Block>::new_test(2, 0);
		let hash = {
			let mut op = db.begin_operation().unwrap();
			let mut header = Header {
				number: 0,
				parent_hash: Default::default(),
				state_root: Default::default(),
				digest: Default::default(),
				extrinsics_root: Default::default(),
			};
			let storage = vec![(vec![1, 3, 5], vec![2, 4, 6]), (vec![1, 2, 3], vec![9, 9, 9])];
			header.state_root = op
				.old_state
				.storage_root(storage.iter().map(|(x, y)| (&x[..], Some(&y[..]))), state_version)
				.0
				.into();
			let hash = header.hash();
			op.reset_storage(
				Storage {
					top: storage.into_iter().collect(),
					children_default: Default::default(),
				},
				state_version,
			)
			.unwrap();
			op.set_block_data(header.clone(), Some(vec![]), None, None, NewBlockState::Best)
				.unwrap();
			db.commit_operation(op).unwrap();
			// Genesis state must expose exactly the inserted keys.
			let state = db.state_at(hash).unwrap();
			assert_eq!(state.storage(&[1, 3, 5]).unwrap(), Some(vec![2, 4, 6]));
			assert_eq!(state.storage(&[1, 2, 3]).unwrap(), Some(vec![9, 9, 9]));
			assert_eq!(state.storage(&[5, 5, 5]).unwrap(), None);
			hash
		};
		{
			let mut op = db.begin_operation().unwrap();
			db.begin_state_operation(&mut op, hash).unwrap();
			let mut header = Header {
				number: 1,
				parent_hash: hash,
				state_root: Default::default(),
				digest: Default::default(),
				extrinsics_root: Default::default(),
			};
			// Delete [1,3,5] and insert [5,5,5] in block 1.
			let storage = vec![(vec![1, 3, 5], None), (vec![5, 5, 5], Some(vec![4, 5, 6]))];
			let (root, overlay) = op.old_state.storage_root(
				storage.iter().map(|(k, v)| (k.as_slice(), v.as_ref().map(|v| &v[..]))),
				state_version,
			);
			op.update_db_storage(overlay).unwrap();
			header.state_root = root.into();
			op.update_storage(storage, Vec::new()).unwrap();
			op.set_block_data(header.clone(), Some(vec![]), None, None, NewBlockState::Best)
				.unwrap();
			db.commit_operation(op).unwrap();
			let state = db.state_at(header.hash()).unwrap();
			assert_eq!(state.storage(&[1, 3, 5]).unwrap(), None);
			assert_eq!(state.storage(&[1, 2, 3]).unwrap(), Some(vec![9, 9, 9]));
			assert_eq!(state.storage(&[5, 5, 5]).unwrap(), Some(vec![4, 5, 6]));
		}
	}
	/// A trie node inserted in block 0 and referenced again in block 1 must
	/// survive a single deletion (ref count drops but stays non-negative) and
	/// only disappear from the STATE column after the second deletion has
	/// been finalized past the pruning window.
	#[test]
	fn delete_only_when_negative_rc() {
		sp_tracing::try_init_simple();
		let state_version = StateVersion::default();
		let key;
		let backend = Backend::<Block>::new_test(1, 0);
		// Block 0: insert the node, ref count 1.
		let hash = {
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, Default::default()).unwrap();
			let mut header = Header {
				number: 0,
				parent_hash: Default::default(),
				state_root: Default::default(),
				digest: Default::default(),
				extrinsics_root: Default::default(),
			};
			header.state_root =
				op.old_state.storage_root(std::iter::empty(), state_version).0.into();
			let hash = header.hash();
			op.reset_storage(
				Storage { top: Default::default(), children_default: Default::default() },
				state_version,
			)
			.unwrap();
			key = op.db_updates.insert(EMPTY_PREFIX, b"hello");
			op.set_block_data(header, Some(vec![]), None, None, NewBlockState::Best)
				.unwrap();
			backend.commit_operation(op).unwrap();
			assert_eq!(
				backend
					.storage
					.db
					.get(columns::STATE, &sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX))
					.unwrap(),
				&b"hello"[..]
			);
			hash
		};
		// Block 1: insert again (+1) and remove once (-1); net count stays positive.
		let hashof1 = {
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, hash).unwrap();
			let mut header = Header {
				number: 1,
				parent_hash: hash,
				state_root: Default::default(),
				digest: Default::default(),
				extrinsics_root: Default::default(),
			};
			let storage: Vec<(_, _)> = vec![];
			header.state_root = op
				.old_state
				.storage_root(storage.iter().cloned().map(|(x, y)| (x, Some(y))), state_version)
				.0
				.into();
			let hash = header.hash();
			op.db_updates.insert(EMPTY_PREFIX, b"hello");
			op.db_updates.remove(&key, EMPTY_PREFIX);
			op.set_block_data(header, Some(vec![]), None, None, NewBlockState::Best)
				.unwrap();
			backend.commit_operation(op).unwrap();
			assert_eq!(
				backend
					.storage
					.db
					.get(columns::STATE, &sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX))
					.unwrap(),
				&b"hello"[..]
			);
			hash
		};
		// Block 2: remove again; node is still present until pruning kicks in.
		let hashof2 = {
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, hashof1).unwrap();
			let mut header = Header {
				number: 2,
				parent_hash: hashof1,
				state_root: Default::default(),
				digest: Default::default(),
				extrinsics_root: Default::default(),
			};
			let storage: Vec<(_, _)> = vec![];
			header.state_root = op
				.old_state
				.storage_root(storage.iter().cloned().map(|(x, y)| (x, Some(y))), state_version)
				.0
				.into();
			let hash = header.hash();
			op.db_updates.remove(&key, EMPTY_PREFIX);
			op.set_block_data(header, Some(vec![]), None, None, NewBlockState::Best)
				.unwrap();
			backend.commit_operation(op).unwrap();
			assert!(backend
				.storage
				.db
				.get(columns::STATE, &sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX))
				.is_some());
			hash
		};
		// Block 3: no changes; just extends the chain.
		let hashof3 = {
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, hashof2).unwrap();
			let mut header = Header {
				number: 3,
				parent_hash: hashof2,
				state_root: Default::default(),
				digest: Default::default(),
				extrinsics_root: Default::default(),
			};
			let storage: Vec<(_, _)> = vec![];
			header.state_root = op
				.old_state
				.storage_root(storage.iter().cloned().map(|(x, y)| (x, Some(y))), state_version)
				.0
				.into();
			let hash = header.hash();
			op.set_block_data(header, Some(vec![]), None, None, NewBlockState::Best)
				.unwrap();
			backend.commit_operation(op).unwrap();
			hash
		};
		// Block 4: after this import the node is gone from the STATE column.
		let hashof4 = {
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, hashof3).unwrap();
			let mut header = Header {
				number: 4,
				parent_hash: hashof3,
				state_root: Default::default(),
				digest: Default::default(),
				extrinsics_root: Default::default(),
			};
			let storage: Vec<(_, _)> = vec![];
			header.state_root = op
				.old_state
				.storage_root(storage.iter().cloned().map(|(x, y)| (x, Some(y))), state_version)
				.0
				.into();
			let hash = header.hash();
			op.set_block_data(header, Some(vec![]), None, None, NewBlockState::Best)
				.unwrap();
			backend.commit_operation(op).unwrap();
			assert!(backend
				.storage
				.db
				.get(columns::STATE, &sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX))
				.is_none());
			hash
		};
		// Finalizing the chain must not resurrect the deleted node.
		backend.finalize_block(hashof1, None).unwrap();
		backend.finalize_block(hashof2, None).unwrap();
		backend.finalize_block(hashof3, None).unwrap();
		backend.finalize_block(hashof4, None).unwrap();
		assert!(backend
			.storage
			.db
			.get(columns::STATE, &sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX))
			.is_none());
	}
	/// `tree_route` must report the correct common ancestor plus the
	/// retracted/enacted paths across a two-branch fork from genesis.
	#[test]
	fn tree_route_works() {
		let backend = Backend::<Block>::new_test(1000, 100);
		let blockchain = backend.blockchain();
		let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
		// fork from genesis: 3 prong.
		let a1 = insert_header(&backend, 1, block0, None, Default::default());
		let a2 = insert_header(&backend, 2, a1, None, Default::default());
		let a3 = insert_header(&backend, 3, a2, None, Default::default());
		// fork from genesis: 2 prong.
		let b1 = insert_header(&backend, 1, block0, None, H256::from([1; 32]));
		let b2 = insert_header(&backend, 2, b1, None, Default::default());
		// Route from a block to itself is empty.
		{
			let tree_route = tree_route(blockchain, a1, a1).unwrap();
			assert_eq!(tree_route.common_block().hash, a1);
			assert!(tree_route.retracted().is_empty());
			assert!(tree_route.enacted().is_empty());
		}
		// Cross-fork route: retract the a-branch, enact the b-branch.
		{
			let tree_route = tree_route(blockchain, a3, b2).unwrap();
			assert_eq!(tree_route.common_block().hash, block0);
			assert_eq!(
				tree_route.retracted().iter().map(|r| r.hash).collect::<Vec<_>>(),
				vec![a3, a2, a1]
			);
			assert_eq!(
				tree_route.enacted().iter().map(|r| r.hash).collect::<Vec<_>>(),
				vec![b1, b2]
			);
		}
		// Descending the same branch only enacts.
		{
			let tree_route = tree_route(blockchain, a1, a3).unwrap();
			assert_eq!(tree_route.common_block().hash, a1);
			assert!(tree_route.retracted().is_empty());
			assert_eq!(
				tree_route.enacted().iter().map(|r| r.hash).collect::<Vec<_>>(),
				vec![a2, a3]
			);
		}
		// Ascending the same branch only retracts.
		{
			let tree_route = tree_route(blockchain, a3, a1).unwrap();
			assert_eq!(tree_route.common_block().hash, a1);
			assert_eq!(
				tree_route.retracted().iter().map(|r| r.hash).collect::<Vec<_>>(),
				vec![a3, a2]
			);
			assert!(tree_route.enacted().is_empty());
		}
		{
			let tree_route = tree_route(blockchain, a2, a2).unwrap();
			assert_eq!(tree_route.common_block().hash, a2);
			assert!(tree_route.retracted().is_empty());
			assert!(tree_route.enacted().is_empty());
		}
	}
	/// A route from a block to its direct child enacts exactly that child.
	#[test]
	fn tree_route_child() {
		let backend = Backend::<Block>::new_test(1000, 100);
		let blockchain = backend.blockchain();
		let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
		let block1 = insert_header(&backend, 1, block0, None, Default::default());
		{
			let tree_route = tree_route(blockchain, block0, block1).unwrap();
			assert_eq!(tree_route.common_block().hash, block0);
			assert!(tree_route.retracted().is_empty());
			assert_eq!(
				tree_route.enacted().iter().map(|r| r.hash).collect::<Vec<_>>(),
				vec![block1]
			);
		}
	}
	/// `lowest_common_ancestor` must resolve the LCA for same-branch,
	/// cross-fork, and identical-block queries on a forked chain.
	#[test]
	fn lowest_common_ancestor_works() {
		let backend = Backend::<Block>::new_test(1000, 100);
		let blockchain = backend.blockchain();
		let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
		// fork from genesis: 3 prong.
		let a1 = insert_header(&backend, 1, block0, None, Default::default());
		let a2 = insert_header(&backend, 2, a1, None, Default::default());
		let a3 = insert_header(&backend, 3, a2, None, Default::default());
		// fork from genesis: 2 prong.
		let b1 = insert_header(&backend, 1, block0, None, H256::from([1; 32]));
		let b2 = insert_header(&backend, 2, b1, None, Default::default());
		// Blocks on different forks meet at genesis.
		{
			let lca = lowest_common_ancestor(blockchain, a3, b2).unwrap();
			assert_eq!(lca.hash, block0);
			assert_eq!(lca.number, 0);
		}
		// Ancestor/descendant pairs resolve to the ancestor, in either order.
		{
			let lca = lowest_common_ancestor(blockchain, a1, a3).unwrap();
			assert_eq!(lca.hash, a1);
			assert_eq!(lca.number, 1);
		}
		{
			let lca = lowest_common_ancestor(blockchain, a3, a1).unwrap();
			assert_eq!(lca.hash, a1);
			assert_eq!(lca.number, 1);
		}
		{
			let lca = lowest_common_ancestor(blockchain, a2, a3).unwrap();
			assert_eq!(lca.hash, a2);
			assert_eq!(lca.number, 2);
		}
		{
			let lca = lowest_common_ancestor(blockchain, a2, a1).unwrap();
			assert_eq!(lca.hash, a1);
			assert_eq!(lca.number, 1);
		}
		// A block is its own LCA.
		{
			let lca = lowest_common_ancestor(blockchain, a2, a2).unwrap();
			assert_eq!(lca.hash, a2);
			assert_eq!(lca.number, 2);
		}
	}
	#[test]
	fn displaced_leaves_after_finalizing_works_with_disconnect() {
		// In this test we will create a situation that can typically happen after warp sync.
		// The situation looks like this:
		// g -> <unimported> -> a3 -> a4
		// Basically there is a gap of unimported blocks at some point in the chain.
		let backend = Backend::<Block>::new_test(1000, 100);
		let blockchain = backend.blockchain();
		let genesis_number = 0;
		let genesis_hash =
			insert_header(&backend, genesis_number, Default::default(), None, Default::default());
		// a3 is inserted with an unknown parent, i.e. disconnected from genesis.
		let a3_number = 3;
		let a3_hash = insert_disconnected_header(
			&backend,
			a3_number,
			H256::from([200; 32]),
			H256::from([1; 32]),
			true,
		);
		let a4_number = 4;
		let a4_hash =
			insert_disconnected_header(&backend, a4_number, a3_hash, H256::from([2; 32]), true);
		{
			// Finalizing a3 reports genesis as a displaced leaf, but no displaced
			// blocks, matching the asserts below.
			let displaced =
				blockchain.displaced_leaves_after_finalizing(a3_hash, a3_number).unwrap();
			assert_eq!(blockchain.leaves().unwrap(), vec![a4_hash, genesis_hash]);
			assert_eq!(displaced.displaced_leaves, vec![(genesis_number, genesis_hash)]);
			assert_eq!(displaced.displaced_blocks, vec![]);
		}
		{
			// Same result when finalizing the tip of the disconnected branch.
			let displaced =
				blockchain.displaced_leaves_after_finalizing(a4_hash, a4_number).unwrap();
			assert_eq!(blockchain.leaves().unwrap(), vec![a4_hash, genesis_hash]);
			assert_eq!(displaced.displaced_leaves, vec![(genesis_number, genesis_hash)]);
			assert_eq!(displaced.displaced_blocks, vec![]);
		}
		// Import block a1 which has the genesis block as parent.
		// g -> a1 -> <unimported> -> a3(f) -> a4
		let a1_number = 1;
		let a1_hash = insert_disconnected_header(
			&backend,
			a1_number,
			genesis_hash,
			H256::from([123; 32]),
			false,
		);
		{
			// a1 replaces genesis as the leaf of the connected sub-tree; nothing is displaced.
			let displaced =
				blockchain.displaced_leaves_after_finalizing(a3_hash, a3_number).unwrap();
			assert_eq!(blockchain.leaves().unwrap(), vec![a4_hash, a1_hash]);
			assert_eq!(displaced.displaced_leaves, vec![]);
			assert_eq!(displaced.displaced_blocks, vec![]);
		}
		// Import block b1 which has the genesis block as parent.
		// g -> a1 -> <unimported> -> a3(f) -> a4
		//  \-> b1
		let b1_number = 1;
		let b1_hash = insert_disconnected_header(
			&backend,
			b1_number,
			genesis_hash,
			H256::from([124; 32]),
			false,
		);
		{
			let displaced =
				blockchain.displaced_leaves_after_finalizing(a3_hash, a3_number).unwrap();
			assert_eq!(blockchain.leaves().unwrap(), vec![a4_hash, a1_hash, b1_hash]);
			assert_eq!(displaced.displaced_leaves, vec![]);
			assert_eq!(displaced.displaced_blocks, vec![]);
		}
		// If branch of b blocks is higher in number than a branch, we
		// should still not prune disconnected leaves.
		// g -> a1 -> <unimported> -> a3(f) -> a4
		//  \-> b1 -> b2 ----------> b3 ----> b4 -> b5
		let b2_number = 2;
		let b2_hash =
			insert_disconnected_header(&backend, b2_number, b1_hash, H256::from([40; 32]), false);
		let b3_number = 3;
		let b3_hash =
			insert_disconnected_header(&backend, b3_number, b2_hash, H256::from([41; 32]), false);
		let b4_number = 4;
		let b4_hash =
			insert_disconnected_header(&backend, b4_number, b3_hash, H256::from([42; 32]), false);
		let b5_number = 5;
		let b5_hash =
			insert_disconnected_header(&backend, b5_number, b4_hash, H256::from([43; 32]), false);
		{
			let displaced =
				blockchain.displaced_leaves_after_finalizing(a3_hash, a3_number).unwrap();
			assert_eq!(blockchain.leaves().unwrap(), vec![b5_hash, a4_hash, a1_hash]);
			assert_eq!(displaced.displaced_leaves, vec![]);
			assert_eq!(displaced.displaced_blocks, vec![]);
		}
		// Even though there is a disconnect, displaced-leaf detection should still
		// find branches above the block gap.
		//                              /-> c4
		// g -> a1 -> <unimported> -> a3 -> a4(f)
		//  \-> b1 -> b2 ----------> b3 -> b4 -> b5
		let c4_number = 4;
		let c4_hash =
			insert_disconnected_header(&backend, c4_number, a3_hash, H256::from([44; 32]), false);
		{
			// c4 forks off the finalized chain above the gap, so it is displaced.
			let displaced =
				blockchain.displaced_leaves_after_finalizing(a4_hash, a4_number).unwrap();
			assert_eq!(blockchain.leaves().unwrap(), vec![b5_hash, a4_hash, c4_hash, a1_hash]);
			assert_eq!(displaced.displaced_leaves, vec![(c4_number, c4_hash)]);
			assert_eq!(displaced.displaced_blocks, vec![c4_hash]);
		}
	}
	#[test]
	fn displaced_leaves_after_finalizing_works() {
		// Exercise `displaced_leaves_after_finalizing` over a three-pronged tree:
		// finalizing on one branch must report all leaves (and the full block sets)
		// of the competing branches.
		let backend = Backend::<Block>::new_test(1000, 100);
		let blockchain = backend.blockchain();
		let genesis_number = 0;
		let genesis_hash =
			insert_header(&backend, genesis_number, Default::default(), None, Default::default());
		// fork from genesis: 3 prong.
		// block 0 -> a1 -> a2 -> a3
		//        \
		//         -> b1 -> b2 -> c1 -> c2
		//              \
		//               -> d1 -> d2
		let a1_number = 1;
		let a1_hash = insert_header(&backend, a1_number, genesis_hash, None, Default::default());
		let a2_number = 2;
		let a2_hash = insert_header(&backend, a2_number, a1_hash, None, Default::default());
		let a3_number = 3;
		let a3_hash = insert_header(&backend, a3_number, a2_hash, None, Default::default());
		{
			// Finalizing genesis displaces nothing.
			let displaced = blockchain
				.displaced_leaves_after_finalizing(genesis_hash, genesis_number)
				.unwrap();
			assert_eq!(displaced.displaced_leaves, vec![]);
			assert_eq!(displaced.displaced_blocks, vec![]);
		}
		{
			// With only the `a` branch imported, finalizing any of its blocks
			// displaces nothing.
			let displaced_a1 =
				blockchain.displaced_leaves_after_finalizing(a1_hash, a1_number).unwrap();
			assert_eq!(displaced_a1.displaced_leaves, vec![]);
			assert_eq!(displaced_a1.displaced_blocks, vec![]);
			// Fix: pass `a2_number` with `a2_hash` — the previous code passed
			// `a3_number`, mismatching the block being finalized.
			let displaced_a2 =
				blockchain.displaced_leaves_after_finalizing(a2_hash, a2_number).unwrap();
			assert_eq!(displaced_a2.displaced_leaves, vec![]);
			assert_eq!(displaced_a2.displaced_blocks, vec![]);
			let displaced_a3 =
				blockchain.displaced_leaves_after_finalizing(a3_hash, a3_number).unwrap();
			assert_eq!(displaced_a3.displaced_leaves, vec![]);
			assert_eq!(displaced_a3.displaced_blocks, vec![]);
		}
		{
			// Finalized block is above leaves and not imported yet.
			// We will not be able to make a connection,
			// nothing can be marked as displaced.
			let displaced =
				blockchain.displaced_leaves_after_finalizing(H256::from([57; 32]), 10).unwrap();
			assert_eq!(displaced.displaced_leaves, vec![]);
			assert_eq!(displaced.displaced_blocks, vec![]);
		}
		// fork from genesis: 2 prong.
		let b1_number = 1;
		let b1_hash = insert_header(&backend, b1_number, genesis_hash, None, H256::from([1; 32]));
		let b2_number = 2;
		let b2_hash = insert_header(&backend, b2_number, b1_hash, None, Default::default());
		// fork from b2.
		let c1_number = 3;
		let c1_hash = insert_header(&backend, c1_number, b2_hash, None, H256::from([2; 32]));
		let c2_number = 4;
		let c2_hash = insert_header(&backend, c2_number, c1_hash, None, Default::default());
		// fork from b1.
		let d1_number = 2;
		let d1_hash = insert_header(&backend, d1_number, b1_hash, None, H256::from([3; 32]));
		let d2_number = 3;
		let d2_hash = insert_header(&backend, d2_number, d1_hash, None, Default::default());
		{
			// Finalizing anywhere on the `a` branch displaces the leaves of the
			// `b`/`c`/`d` branches and all of their blocks.
			let displaced_a1 =
				blockchain.displaced_leaves_after_finalizing(a1_hash, a1_number).unwrap();
			assert_eq!(
				displaced_a1.displaced_leaves,
				vec![(c2_number, c2_hash), (d2_number, d2_hash)]
			);
			let mut displaced_blocks = vec![b1_hash, b2_hash, c1_hash, c2_hash, d1_hash, d2_hash];
			displaced_blocks.sort();
			assert_eq!(displaced_a1.displaced_blocks, displaced_blocks);
			// The result is independent of how deep on the branch we finalize.
			let displaced_a2 =
				blockchain.displaced_leaves_after_finalizing(a2_hash, a2_number).unwrap();
			assert_eq!(displaced_a1.displaced_leaves, displaced_a2.displaced_leaves);
			assert_eq!(displaced_a1.displaced_blocks, displaced_a2.displaced_blocks);
			let displaced_a3 =
				blockchain.displaced_leaves_after_finalizing(a3_hash, a3_number).unwrap();
			assert_eq!(displaced_a1.displaced_leaves, displaced_a3.displaced_leaves);
			assert_eq!(displaced_a1.displaced_blocks, displaced_a3.displaced_blocks);
		}
		{
			// Finalizing b1 displaces only the `a` branch (c and d descend from b1).
			let displaced =
				blockchain.displaced_leaves_after_finalizing(b1_hash, b1_number).unwrap();
			assert_eq!(displaced.displaced_leaves, vec![(a3_number, a3_hash)]);
			let mut displaced_blocks = vec![a1_hash, a2_hash, a3_hash];
			displaced_blocks.sort();
			assert_eq!(displaced.displaced_blocks, displaced_blocks);
		}
		{
			// Finalizing b2 additionally displaces the `d` branch.
			let displaced =
				blockchain.displaced_leaves_after_finalizing(b2_hash, b2_number).unwrap();
			assert_eq!(
				displaced.displaced_leaves,
				vec![(a3_number, a3_hash), (d2_number, d2_hash)]
			);
			let mut displaced_blocks = vec![a1_hash, a2_hash, a3_hash, d1_hash, d2_hash];
			displaced_blocks.sort();
			assert_eq!(displaced.displaced_blocks, displaced_blocks);
		}
		{
			// Finalizing c2 displaces the `a` and `d` branches (c is on the b chain).
			let displaced =
				blockchain.displaced_leaves_after_finalizing(c2_hash, c2_number).unwrap();
			assert_eq!(
				displaced.displaced_leaves,
				vec![(a3_number, a3_hash), (d2_number, d2_hash)]
			);
			let mut displaced_blocks = vec![a1_hash, a2_hash, a3_hash, d1_hash, d2_hash];
			displaced_blocks.sort();
			assert_eq!(displaced.displaced_blocks, displaced_blocks);
		}
	}
	#[test]
	fn test_tree_route_regression() {
		// NOTE: regression test for #3665. `tree_route` was computed erroneously
		// because the `ancestor` field of `CachedHeaderMetadata` was taken into
		// account for the comparison. The triggering side-effect is eviction of a
		// previously fetched record from the header-metadata LRU cache, so this
		// test depends on that cache holding 5000 elements.
		let backend = Backend::<Block>::new_test(10000, 10000);
		let blockchain = backend.blockchain();
		let genesis = insert_header(&backend, 0, Default::default(), None, Default::default());
		// Extend the chain to block #100, then onwards to block #7000.
		let mut parent = genesis;
		for number in 1..=100 {
			parent = insert_header(&backend, number, parent, None, Default::default());
		}
		let block100 = parent;
		for number in 101..=7000 {
			parent = insert_header(&backend, number, parent, None, Default::default());
		}
		let block7000 = parent;
		// This will cause the ancestor of `block100` to be set to `genesis` as a side-effect.
		lowest_common_ancestor(blockchain, genesis, block100).unwrap();
		// Traversing the tree performs 6900 `header_metadata` calls, exhausting the
		// 5000-entry cache; in particular the `CachedHeaderMetadata` entry for block
		// #100 is evicted and refreshed (with its ancestor set to its parent).
		let tree_route = tree_route(blockchain, block100, block7000).unwrap();
		assert!(tree_route.retracted().is_empty());
	}
	#[test]
	fn test_leaves_with_complex_block_tree() {
		// Run the shared backend conformance test for leaf tracking.
		let backend =
			Arc::new(Backend::<substrate_test_runtime_client::runtime::Block>::new_test(20, 20));
		substrate_test_runtime_client::trait_tests::test_leaves_for_backend(backend);
	}
	#[test]
	fn test_children_with_complex_block_tree() {
		// Run the shared backend conformance test for child tracking.
		let backend =
			Arc::new(Backend::<substrate_test_runtime_client::runtime::Block>::new_test(20, 20));
		substrate_test_runtime_client::trait_tests::test_children_for_backend(backend);
	}
	#[test]
	fn test_blockchain_query_by_number_gets_canonical() {
		// Run the shared conformance test: lookups by number must resolve to the
		// canonical chain.
		let backend =
			Arc::new(Backend::<substrate_test_runtime_client::runtime::Block>::new_test(20, 20));
		substrate_test_runtime_client::trait_tests::test_blockchain_query_by_number_gets_canonical(
			backend,
		);
	}
	#[test]
	fn test_leaves_pruned_on_finality() {
		// Build a small tree with three branches off genesis:
		//   / 1b - 2b - 3b
		// 0 - 1a - 2a
		//   \ 1c
		let backend: Backend<Block> = Backend::new_test(10, 10);
		let genesis = insert_header(&backend, 0, Default::default(), None, Default::default());
		let a1 = insert_header(&backend, 1, genesis, None, Default::default());
		let b1 = insert_header(&backend, 1, genesis, None, [1; 32].into());
		let c1 = insert_header(&backend, 1, genesis, None, [2; 32].into());
		assert_eq!(backend.blockchain().leaves().unwrap(), vec![a1, b1, c1]);
		let a2 = insert_header(&backend, 2, a1, None, Default::default());
		let b2 = insert_header(&backend, 2, b1, None, Default::default());
		let b3 = insert_header(&backend, 3, b2, None, [3; 32].into());
		assert_eq!(backend.blockchain().leaves().unwrap(), vec![b3, a2, c1]);
		backend.finalize_block(a1, None).unwrap();
		backend.finalize_block(a2, None).unwrap();
		// All leaves are pruned that are known to not belong to canonical branch
		assert_eq!(backend.blockchain().leaves().unwrap(), vec![a2]);
	}
	#[test]
	fn test_aux() {
		// Round-trip an auxiliary key: absent -> inserted -> read back -> deleted.
		let backend: Backend<substrate_test_runtime_client::runtime::Block> =
			Backend::new_test(0, 0);
		let key: &[u8] = b"test";
		assert!(backend.get_aux(key).unwrap().is_none());
		backend.insert_aux(&[(key, &b"hello"[..])], &[]).unwrap();
		assert_eq!(b"hello", &backend.get_aux(key).unwrap().unwrap()[..]);
		backend.insert_aux(&[], &[key]).unwrap();
		assert!(backend.get_aux(key).unwrap().is_none());
	}
	#[test]
	fn test_finalize_block_with_justification() {
		use sc_client_api::blockchain::Backend as BlockChainBackend;
		// A justification supplied at finalization time must be readable afterwards.
		let backend = Backend::<Block>::new_test(10, 10);
		let genesis = insert_header(&backend, 0, Default::default(), None, Default::default());
		let block1 = insert_header(&backend, 1, genesis, None, Default::default());
		let justification = Some((CONS0_ENGINE_ID, vec![1, 2, 3]));
		backend.finalize_block(block1, justification.clone()).unwrap();
		let stored = backend.blockchain().justifications(block1).unwrap();
		assert_eq!(stored, justification.map(Justifications::from));
	}
	#[test]
	fn test_append_justification_to_finalized_block() {
		use sc_client_api::blockchain::Backend as BlockChainBackend;
		// Appending a justification from a new consensus engine to a finalized
		// block works; a second justification for an engine that already has one
		// is rejected.
		let backend = Backend::<Block>::new_test(10, 10);
		let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
		let block1 = insert_header(&backend, 1, block0, None, Default::default());
		let just0 = (CONS0_ENGINE_ID, vec![1, 2, 3]);
		backend.finalize_block(block1, Some(just0.clone().into())).unwrap();
		// Different engine id: append succeeds.
		let just1 = (CONS1_ENGINE_ID, vec![4, 5]);
		backend.append_justification(block1, just1.clone()).unwrap();
		// Same engine id as `just1`: append fails with `BadJustification`.
		let just2 = (CONS1_ENGINE_ID, vec![6, 7]);
		assert!(matches!(
			backend.append_justification(block1, just2),
			Err(ClientError::BadJustification(_))
		));
		// Exactly `just0` followed by `just1` must be stored.
		let justifications = {
			let mut just = Justifications::from(just0);
			just.append(just1);
			just
		};
		assert_eq!(backend.blockchain().justifications(block1).unwrap(), Some(justifications),);
	}
	#[test]
	fn test_finalize_multiple_blocks_in_single_op() {
		// Several blocks can be marked finalized within one commit operation.
		let backend = Backend::<Block>::new_test(10, 10);
		// Build a linear chain: genesis plus four descendants.
		let mut hashes = Vec::new();
		let mut parent = Default::default();
		for number in 0..5 {
			let hash = insert_header(&backend, number, parent, None, Default::default());
			hashes.push(hash);
			parent = hash;
		}
		// Finalize blocks 1 and 2 together.
		{
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, hashes[0]).unwrap();
			op.mark_finalized(hashes[1], None).unwrap();
			op.mark_finalized(hashes[2], None).unwrap();
			backend.commit_operation(op).unwrap();
		}
		// Then blocks 3 and 4 together.
		{
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, hashes[2]).unwrap();
			op.mark_finalized(hashes[3], None).unwrap();
			op.mark_finalized(hashes[4], None).unwrap();
			backend.commit_operation(op).unwrap();
		}
	}
	#[test]
	fn storage_hash_is_cached_correctly() {
		// Two blocks with different values under the `test` key must report
		// different storage hashes, i.e. the cached hash for block 0's state must
		// not leak into queries against block 1's state.
		let state_version = StateVersion::default();
		let backend = Backend::<Block>::new_test(10, 10);
		// Import genesis with `test => test` and mark it best.
		let hash0 = {
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, Default::default()).unwrap();
			let mut header = Header {
				number: 0,
				parent_hash: Default::default(),
				state_root: Default::default(),
				digest: Default::default(),
				extrinsics_root: Default::default(),
			};
			let storage = vec![(b"test".to_vec(), b"test".to_vec())];
			// Compute the state root over the initial storage before hashing the header.
			header.state_root = op
				.old_state
				.storage_root(storage.iter().map(|(x, y)| (&x[..], Some(&y[..]))), state_version)
				.0
				.into();
			let hash = header.hash();
			op.reset_storage(
				Storage {
					top: storage.into_iter().collect(),
					children_default: Default::default(),
				},
				state_version,
			)
			.unwrap();
			op.set_block_data(header.clone(), Some(vec![]), None, None, NewBlockState::Best)
				.unwrap();
			backend.commit_operation(op).unwrap();
			hash
		};
		let block0_hash = backend.state_at(hash0).unwrap().storage_hash(&b"test"[..]).unwrap();
		// Import block 1 updating `test => test2`, initially as `Normal` (not best).
		let hash1 = {
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, hash0).unwrap();
			let mut header = Header {
				number: 1,
				parent_hash: hash0,
				state_root: Default::default(),
				digest: Default::default(),
				extrinsics_root: Default::default(),
			};
			let storage = vec![(b"test".to_vec(), Some(b"test2".to_vec()))];
			let (root, overlay) = op.old_state.storage_root(
				storage.iter().map(|(k, v)| (k.as_slice(), v.as_ref().map(|v| &v[..]))),
				state_version,
			);
			op.update_db_storage(overlay).unwrap();
			header.state_root = root.into();
			let hash = header.hash();
			op.update_storage(storage, Vec::new()).unwrap();
			op.set_block_data(header, Some(vec![]), None, None, NewBlockState::Normal)
				.unwrap();
			backend.commit_operation(op).unwrap();
			hash
		};
		// Re-import block 1's header, this time marking it best.
		{
			let header = backend.blockchain().header(hash1).unwrap().unwrap();
			let mut op = backend.begin_operation().unwrap();
			op.set_block_data(header, None, None, None, NewBlockState::Best).unwrap();
			backend.commit_operation(op).unwrap();
		}
		// The hash of `test` at block 1 must differ from the one at block 0.
		let block1_hash = backend.state_at(hash1).unwrap().storage_hash(&b"test"[..]).unwrap();
		assert_ne!(block0_hash, block1_hash);
	}
	#[test]
	fn test_finalize_non_sequential() {
		// Finalizing block 2 while block 1 is still unfinalized must make the
		// commit fail.
		let backend = Backend::<Block>::new_test(10, 10);
		let genesis = insert_header(&backend, 0, Default::default(), None, Default::default());
		let first = insert_header(&backend, 1, genesis, None, Default::default());
		let second = insert_header(&backend, 2, first, None, Default::default());
		{
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, genesis).unwrap();
			op.mark_finalized(second, None).unwrap();
			backend.commit_operation(op).unwrap_err();
		}
	}
	#[test]
	fn prune_blocks_on_finalize() {
		// Check body pruning on finalization for each pruning mode: with
		// `BlocksPruning::Some(2)` only the two most recent bodies survive, while
		// the other modes retain all of them.
		let pruning_modes =
			vec![BlocksPruning::Some(2), BlocksPruning::KeepFinalized, BlocksPruning::KeepAll];
		for pruning_mode in pruning_modes {
			let backend = Backend::<Block>::new_test_with_tx_storage(pruning_mode, 0);
			let mut blocks = Vec::new();
			let mut prev_hash = Default::default();
			// Build a linear chain of five blocks, each with one extrinsic.
			for i in 0..5 {
				let hash = insert_block(
					&backend,
					i,
					prev_hash,
					None,
					Default::default(),
					vec![i.into()],
					None,
				)
				.unwrap();
				blocks.push(hash);
				prev_hash = hash;
			}
			{
				// Finalize blocks 1..=4 in a single operation.
				let mut op = backend.begin_operation().unwrap();
				backend.begin_state_operation(&mut op, blocks[4]).unwrap();
				for i in 1..5 {
					op.mark_finalized(blocks[i], None).unwrap();
				}
				backend.commit_operation(op).unwrap();
			}
			let bc = backend.blockchain();
			if matches!(pruning_mode, BlocksPruning::Some(_)) {
				// Only the last two bodies remain under `Some(2)`.
				assert_eq!(None, bc.body(blocks[0]).unwrap());
				assert_eq!(None, bc.body(blocks[1]).unwrap());
				assert_eq!(None, bc.body(blocks[2]).unwrap());
				assert_eq!(Some(vec![3.into()]), bc.body(blocks[3]).unwrap());
				assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap());
			} else {
				// `KeepFinalized`/`KeepAll`: every body is still available.
				for i in 0..5 {
					assert_eq!(Some(vec![(i as u64).into()]), bc.body(blocks[i]).unwrap());
				}
			}
		}
	}
	#[test]
	fn prune_blocks_on_finalize_with_fork() {
		sp_tracing::try_init_simple();
		// Like `prune_blocks_on_finalize`, but with a fork at block 2: once
		// finality passes the fork, its bodies are removed except under `KeepAll`.
		let pruning_modes =
			vec![BlocksPruning::Some(2), BlocksPruning::KeepFinalized, BlocksPruning::KeepAll];
		for pruning in pruning_modes {
			let backend = Backend::<Block>::new_test_with_tx_storage(pruning, 10);
			let mut blocks = Vec::new();
			let mut prev_hash = Default::default();
			// Canonical chain: five blocks with one extrinsic each.
			for i in 0..5 {
				let hash = insert_block(
					&backend,
					i,
					prev_hash,
					None,
					Default::default(),
					vec![i.into()],
					None,
				)
				.unwrap();
				blocks.push(hash);
				prev_hash = hash;
			}
			// insert a fork at block 2
			let fork_hash_root =
				insert_block(&backend, 2, blocks[1], None, H256::random(), vec![2.into()], None)
					.unwrap();
			insert_block(
				&backend,
				3,
				fork_hash_root,
				None,
				H256::random(),
				vec![3.into(), 11.into()],
				None,
			)
			.unwrap();
			// Mark the canonical tip as head before finalizing.
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, blocks[4]).unwrap();
			op.mark_head(blocks[4]).unwrap();
			backend.commit_operation(op).unwrap();
			let bc = backend.blockchain();
			assert_eq!(Some(vec![2.into()]), bc.body(fork_hash_root).unwrap());
			// Finalize the canonical chain block by block.
			for i in 1..5 {
				let mut op = backend.begin_operation().unwrap();
				backend.begin_state_operation(&mut op, blocks[4]).unwrap();
				op.mark_finalized(blocks[i], None).unwrap();
				backend.commit_operation(op).unwrap();
			}
			if matches!(pruning, BlocksPruning::Some(_)) {
				// Only the last two canonical bodies remain under `Some(2)`.
				assert_eq!(None, bc.body(blocks[0]).unwrap());
				assert_eq!(None, bc.body(blocks[1]).unwrap());
				assert_eq!(None, bc.body(blocks[2]).unwrap());
				assert_eq!(Some(vec![3.into()]), bc.body(blocks[3]).unwrap());
				assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap());
			} else {
				for i in 0..5 {
					assert_eq!(Some(vec![(i as u64).into()]), bc.body(blocks[i]).unwrap());
				}
			}
			// Fork bodies survive only when everything is kept.
			if matches!(pruning, BlocksPruning::KeepAll) {
				assert_eq!(Some(vec![2.into()]), bc.body(fork_hash_root).unwrap());
			} else {
				assert_eq!(None, bc.body(fork_hash_root).unwrap());
			}
			assert_eq!(bc.info().best_number, 4);
			// Canonical number-to-hash mappings remain available in every mode.
			for i in 0..5 {
				assert!(bc.hash(i).unwrap().is_some());
			}
		}
	}
	#[test]
	fn prune_blocks_on_finalize_and_reorg() {
		//	0 - 1b
		//	\ - 1a - 2a - 3a
		//	     \ - 2b
		let backend = Backend::<Block>::new_test_with_tx_storage(BlocksPruning::Some(10), 10);
		// Helper: insert a block carrying a single distinguishing extrinsic.
		let make_block = |index, parent, val: u64| {
			insert_block(&backend, index, parent, None, H256::random(), vec![val.into()], None)
				.unwrap()
		};
		let block_0 = make_block(0, Default::default(), 0x00);
		let block_1a = make_block(1, block_0, 0x1a);
		let block_1b = make_block(1, block_0, 0x1b);
		let block_2a = make_block(2, block_1a, 0x2a);
		let block_2b = make_block(2, block_1a, 0x2b);
		let block_3a = make_block(3, block_2a, 0x3a);
		// Make sure 1b is head
		let mut op = backend.begin_operation().unwrap();
		backend.begin_state_operation(&mut op, block_0).unwrap();
		op.mark_head(block_1b).unwrap();
		backend.commit_operation(op).unwrap();
		// Finalize 3a
		let mut op = backend.begin_operation().unwrap();
		backend.begin_state_operation(&mut op, block_0).unwrap();
		op.mark_head(block_3a).unwrap();
		op.mark_finalized(block_1a, None).unwrap();
		op.mark_finalized(block_2a, None).unwrap();
		op.mark_finalized(block_3a, None).unwrap();
		backend.commit_operation(op).unwrap();
		// The reorged-away branches (1b, 2b) are pruned; the `a` chain survives.
		let bc = backend.blockchain();
		assert_eq!(None, bc.body(block_1b).unwrap());
		assert_eq!(None, bc.body(block_2b).unwrap());
		assert_eq!(Some(vec![0x00.into()]), bc.body(block_0).unwrap());
		assert_eq!(Some(vec![0x1a.into()]), bc.body(block_1a).unwrap());
		assert_eq!(Some(vec![0x2a.into()]), bc.body(block_2a).unwrap());
		assert_eq!(Some(vec![0x3a.into()]), bc.body(block_3a).unwrap());
	}
	#[test]
	fn indexed_data_block_body() {
		// Transaction-indexed data must be retrievable after import and cleared
		// once the containing block's body is pruned.
		let backend = Backend::<Block>::new_test_with_tx_storage(BlocksPruning::Some(1), 10);
		let x0 = ExtrinsicWrapper::from(0u64).encode();
		let x1 = ExtrinsicWrapper::from(1u64).encode();
		// Index the payload without its first encoded byte.
		let x0_hash = <HashingFor<Block> as sp_core::Hasher>::hash(&x0[1..]);
		let x1_hash = <HashingFor<Block> as sp_core::Hasher>::hash(&x1[1..]);
		let index = vec![
			IndexOperation::Insert {
				extrinsic: 0,
				hash: x0_hash.as_ref().to_vec(),
				size: (x0.len() - 1) as u32,
			},
			IndexOperation::Insert {
				extrinsic: 1,
				hash: x1_hash.as_ref().to_vec(),
				size: (x1.len() - 1) as u32,
			},
		];
		let hash = insert_block(
			&backend,
			0,
			Default::default(),
			None,
			Default::default(),
			vec![0u64.into(), 1u64.into()],
			Some(index),
		)
		.unwrap();
		let bc = backend.blockchain();
		assert_eq!(bc.indexed_transaction(x0_hash).unwrap().unwrap(), &x0[1..]);
		assert_eq!(bc.indexed_transaction(x1_hash).unwrap().unwrap(), &x1[1..]);
		let hashof0 = bc.info().genesis_hash;
		// Push one more block and make sure block 0 is pruned and its transaction
		// index is cleared.
		let block1 =
			insert_block(&backend, 1, hash, None, Default::default(), vec![], None).unwrap();
		backend.finalize_block(block1, None).unwrap();
		assert_eq!(bc.body(hashof0).unwrap(), None);
		assert_eq!(bc.indexed_transaction(x0_hash).unwrap(), None);
		assert_eq!(bc.indexed_transaction(x1_hash).unwrap(), None);
	}
	#[test]
	fn index_invalid_size() {
		// Index entries whose declared `size` does not match the extrinsic payload
		// must not be indexed.
		let backend = Backend::<Block>::new_test_with_tx_storage(BlocksPruning::Some(1), 10);
		let x0 = ExtrinsicWrapper::from(0u64).encode();
		let x1 = ExtrinsicWrapper::from(1u64).encode();
		let x0_hash = <HashingFor<Block> as sp_core::Hasher>::hash(&x0[..]);
		let x1_hash = <HashingFor<Block> as sp_core::Hasher>::hash(&x1[..]);
		let index = vec![
			// Size equals the full encoded extrinsic.
			IndexOperation::Insert {
				extrinsic: 0,
				hash: x0_hash.as_ref().to_vec(),
				size: (x0.len()) as u32,
			},
			// Size exceeds the encoded extrinsic by one byte.
			IndexOperation::Insert {
				extrinsic: 1,
				hash: x1_hash.as_ref().to_vec(),
				size: (x1.len() + 1) as u32,
			},
		];
		insert_block(
			&backend,
			0,
			Default::default(),
			None,
			Default::default(),
			vec![0u64.into(), 1u64.into()],
			Some(index),
		)
		.unwrap();
		let bc = backend.blockchain();
		// Only the correctly sized entry is retrievable.
		assert_eq!(bc.indexed_transaction(x0_hash).unwrap().unwrap(), &x0[..]);
		assert_eq!(bc.indexed_transaction(x1_hash).unwrap(), None);
	}
	#[test]
	fn renew_transaction_storage() {
		// Indexed data inserted at block 0 and renewed through block 4 must stay
		// available while within the pruning window, and disappear afterwards.
		let backend = Backend::<Block>::new_test_with_tx_storage(BlocksPruning::Some(2), 10);
		let mut blocks = Vec::new();
		let mut prev_hash = Default::default();
		let x1 = ExtrinsicWrapper::from(0u64).encode();
		let x1_hash = <HashingFor<Block> as sp_core::Hasher>::hash(&x1[1..]);
		for i in 0..10 {
			let mut index = Vec::new();
			if i == 0 {
				// Initial insert at block 0.
				index.push(IndexOperation::Insert {
					extrinsic: 0,
					hash: x1_hash.as_ref().to_vec(),
					size: (x1.len() - 1) as u32,
				});
			} else if i < 5 {
				// keep renewing 1st
				index.push(IndexOperation::Renew { extrinsic: 0, hash: x1_hash.as_ref().to_vec() });
			} // else stop renewing
			let hash = insert_block(
				&backend,
				i,
				prev_hash,
				None,
				Default::default(),
				vec![i.into()],
				Some(index),
			)
			.unwrap();
			blocks.push(hash);
			prev_hash = hash;
		}
		// Finalize one block at a time and track when the entry disappears.
		for i in 1..10 {
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, blocks[4]).unwrap();
			op.mark_finalized(blocks[i], None).unwrap();
			backend.commit_operation(op).unwrap();
			let bc = backend.blockchain();
			// Last renewal happened in block 4; per the asserts the entry survives
			// finalization up to and including block 5.
			if i < 6 {
				assert!(bc.indexed_transaction(x1_hash).unwrap().is_some());
			} else {
				assert!(bc.indexed_transaction(x1_hash).unwrap().is_none());
			}
		}
	}
	#[test]
	fn remove_leaf_block_works() {
		// Removing a leaf must drop its state, header, leaf entry and child link;
		// removing the best block must fail.
		let backend = Backend::<Block>::new_test_with_tx_storage(BlocksPruning::Some(2), 10);
		let mut blocks = Vec::new();
		let mut prev_hash = Default::default();
		// Base chain: blocks[0] (genesis) -> blocks[1].
		for i in 0..2 {
			let hash = insert_block(
				&backend,
				i,
				prev_hash,
				None,
				Default::default(),
				vec![i.into()],
				None,
			)
			.unwrap();
			blocks.push(hash);
			prev_hash = hash;
		}
		// Two competing children of blocks[1] at number 2: blocks[2] and blocks[3].
		for i in 0..2 {
			let hash = insert_block(
				&backend,
				2,
				blocks[1],
				None,
				sp_core::H256::random(),
				vec![i.into()],
				None,
			)
			.unwrap();
			blocks.push(hash);
		}
		// insert a fork at block 1, which becomes best block
		let best_hash = insert_block(
			&backend,
			1,
			blocks[0],
			None,
			sp_core::H256::random(),
			vec![42.into()],
			None,
		)
		.unwrap();
		assert_eq!(backend.blockchain().info().best_hash, best_hash);
		// Removing the current best block must fail.
		assert!(backend.remove_leaf_block(best_hash).is_err());
		assert_eq!(backend.blockchain().leaves().unwrap(), vec![blocks[2], blocks[3], best_hash]);
		assert_eq!(backend.blockchain().children(blocks[1]).unwrap(), vec![blocks[2], blocks[3]]);
		// Remove blocks[3]: state, header, leaf entry and child link all go.
		assert!(backend.have_state_at(blocks[3], 2));
		assert!(backend.blockchain().header(blocks[3]).unwrap().is_some());
		backend.remove_leaf_block(blocks[3]).unwrap();
		assert!(!backend.have_state_at(blocks[3], 2));
		assert!(backend.blockchain().header(blocks[3]).unwrap().is_none());
		assert_eq!(backend.blockchain().leaves().unwrap(), vec![blocks[2], best_hash]);
		assert_eq!(backend.blockchain().children(blocks[1]).unwrap(), vec![blocks[2]]);
		// Remove blocks[2] the same way.
		assert!(backend.have_state_at(blocks[2], 2));
		assert!(backend.blockchain().header(blocks[2]).unwrap().is_some());
		backend.remove_leaf_block(blocks[2]).unwrap();
		assert!(!backend.have_state_at(blocks[2], 2));
		assert!(backend.blockchain().header(blocks[2]).unwrap().is_none());
		// With both children gone, blocks[1] becomes a leaf again and can itself
		// be removed.
		assert_eq!(backend.blockchain().leaves().unwrap(), vec![best_hash, blocks[1]]);
		assert_eq!(backend.blockchain().children(blocks[1]).unwrap(), vec![]);
		assert!(backend.have_state_at(blocks[1], 1));
		assert!(backend.blockchain().header(blocks[1]).unwrap().is_some());
		backend.remove_leaf_block(blocks[1]).unwrap();
		assert!(!backend.have_state_at(blocks[1], 1));
		assert!(backend.blockchain().header(blocks[1]).unwrap().is_none());
		assert_eq!(backend.blockchain().leaves().unwrap(), vec![best_hash]);
		assert_eq!(backend.blockchain().children(blocks[0]).unwrap(), vec![best_hash]);
	}
	#[test]
	fn test_import_existing_block_as_new_head() {
		let backend: Backend<Block> = Backend::new_test(10, 3);
		let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
		let block1 = insert_header(&backend, 1, block0, None, Default::default());
		let block2 = insert_header(&backend, 2, block1, None, Default::default());
		let block3 = insert_header(&backend, 3, block2, None, Default::default());
		let block4 = insert_header(&backend, 4, block3, None, Default::default());
		let block5 = insert_header(&backend, 5, block4, None, Default::default());
		assert_eq!(backend.blockchain().info().best_hash, block5);
		// Re-import block 1 as best. This must fail with `SetHeadTooOld` because
		// canonicalization_delay == 3 while best == 5.
		let header = Header {
			number: 1,
			parent_hash: block0,
			state_root: BlakeTwo256::trie_root(Vec::new(), StateVersion::V1),
			digest: Default::default(),
			extrinsics_root: Default::default(),
		};
		let mut op = backend.begin_operation().unwrap();
		op.set_block_data(header, None, None, None, NewBlockState::Best).unwrap();
		assert!(matches!(backend.commit_operation(op), Err(sp_blockchain::Error::SetHeadTooOld)));
		// Re-import block 2 as best: this succeeds and block 2 becomes the head.
		let header = backend.blockchain().header(block2).unwrap().unwrap();
		let mut op = backend.begin_operation().unwrap();
		op.set_block_data(header, None, None, None, NewBlockState::Best).unwrap();
		backend.commit_operation(op).unwrap();
		assert_eq!(backend.blockchain().info().best_hash, block2);
	}
	#[test]
	fn test_import_existing_block_as_final() {
		// Re-importing an already-known block marked `Final` must finalize it.
		let backend: Backend<Block> = Backend::new_test(10, 10);
		let genesis = insert_header(&backend, 0, Default::default(), None, Default::default());
		let block1 = insert_header(&backend, 1, genesis, None, Default::default());
		let _block2 = insert_header(&backend, 2, block1, None, Default::default());
		// Only genesis is finalized automatically on import.
		assert_eq!(backend.blockchain().info().finalized_hash, genesis);
		// Import block 1 again, this time as `Final`.
		let header = backend.blockchain().header(block1).unwrap().unwrap();
		let mut op = backend.begin_operation().unwrap();
		op.set_block_data(header, None, None, None, NewBlockState::Final).unwrap();
		backend.commit_operation(op).unwrap();
		assert_eq!(backend.blockchain().info().finalized_hash, block1);
	}
	#[test]
	fn test_import_existing_state_fails() {
		// Importing a block whose state already exists in the state db must be
		// rejected with a "Block already exists" state database error.
		let backend: Backend<Block> = Backend::new_test(10, 10);
		let genesis =
			insert_block(&backend, 0, Default::default(), None, Default::default(), vec![], None)
				.unwrap();
		insert_block(&backend, 1, genesis, None, Default::default(), vec![], None).unwrap();
		// Inserting the very same block a second time must fail.
		let err = insert_block(&backend, 1, genesis, None, Default::default(), vec![], None)
			.err()
			.unwrap();
		match err {
			sp_blockchain::Error::StateDatabase(m) if m == "Block already exists" => (),
			// A plain binding already matches everything; `e @ _` was redundant
			// (clippy::redundant_pattern).
			e => panic!("Unexpected error {:?}", e),
		}
	}
	#[test]
	fn test_leaves_not_created_for_ancient_blocks() {
		let backend: Backend<Block> = Backend::new_test(10, 10);
		let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
		let block1_a = insert_header(&backend, 1, block0, None, Default::default());
		let block2_a = insert_header(&backend, 2, block1_a, None, Default::default());
		backend.finalize_block(block1_a, None).unwrap();
		assert_eq!(backend.blockchain().leaves().unwrap(), vec![block2_a]);
		// Insert a fork prior to the finalization point. A leaf should not be created.
		insert_header_no_head(&backend, 1, block0, [1; 32].into());
		assert_eq!(backend.blockchain().leaves().unwrap(), vec![block2_a]);
	}
	// Reverting must unwind non-best blocks (including forks) above the revert
	// target, and the leaf set must shrink accordingly.
	#[test]
	fn revert_non_best_blocks() {
		let backend = Backend::<Block>::new_test(10, 10);
		let genesis =
			insert_block(&backend, 0, Default::default(), None, Default::default(), vec![], None)
				.unwrap();
		let block1 =
			insert_block(&backend, 1, genesis, None, Default::default(), vec![], None).unwrap();
		let block2 =
			insert_block(&backend, 2, block1, None, Default::default(), vec![], None).unwrap();
		// Import a block as `Normal` (non-best), building on the state at
		// `state_at` while claiming `parent_hash` as its parent.
		let make_block = |state_at, parent_hash, number, extrinsics_root| {
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, state_at).unwrap();
			let header = Header {
				number,
				parent_hash,
				state_root: BlakeTwo256::trie_root(Vec::new(), StateVersion::V1),
				digest: Default::default(),
				extrinsics_root,
			};
			op.set_block_data(header.clone(), Some(Vec::new()), None, None, NewBlockState::Normal)
				.unwrap();
			backend.commit_operation(op).unwrap();
			header.hash()
		};
		let block3 = make_block(block1, block2, 3, Default::default());
		let block4 = make_block(block2, block3, 4, Default::default());
		// A competing block at height 3, distinguished by its extrinsics root.
		let block3_fork = make_block(block2, block2, 3, H256::from_low_u64_le(42));
		assert!(backend.have_state_at(block1, 1));
		assert!(backend.have_state_at(block2, 2));
		assert!(backend.have_state_at(block3, 3));
		assert!(backend.have_state_at(block4, 4));
		assert!(backend.have_state_at(block3_fork, 3));
		assert_eq!(backend.blockchain.leaves().unwrap(), vec![block4, block3_fork]);
		assert_eq!(4, backend.blockchain.leaves.read().highest_leaf().unwrap().0);
		// Revert down to block #1: three blocks get unwound.
		assert_eq!(3, backend.revert(1, false).unwrap().0);
		assert!(backend.have_state_at(block1, 1));
		assert!(!backend.have_state_at(block2, 2));
		assert!(!backend.have_state_at(block3, 3));
		assert!(!backend.have_state_at(block4, 4));
		assert!(!backend.have_state_at(block3_fork, 3));
		assert_eq!(backend.blockchain.leaves().unwrap(), vec![block1]);
		assert_eq!(1, backend.blockchain.leaves.read().highest_leaf().unwrap().0);
	}
	// Build a chain of 11 blocks, finalize block #8, then attempt to revert 5
	// blocks. Whether finalized blocks can be unwound depends on pruning mode.
	#[test]
	fn revert_finalized_blocks() {
		for pruning_mode in [BlocksPruning::Some(10), BlocksPruning::KeepAll] {
			let backend = Backend::<Block>::new_test_with_tx_storage(pruning_mode, 1);
			let mut parent = Default::default();
			for number in 0..=10 {
				parent =
					insert_block(&backend, number, parent, None, Default::default(), vec![], None)
						.unwrap();
			}
			assert_eq!(backend.blockchain().info().best_number, 10);
			let block8 = backend.blockchain().hash(8).unwrap().unwrap();
			backend.finalize_block(block8, None).unwrap();
			backend.revert(5, true).unwrap();
			match pruning_mode {
				// With pruning enabled we can only revert to blocks whose state
				// is still available, i.e. down to the last finalized block.
				BlocksPruning::Some(_) =>
					assert_eq!(backend.blockchain().info().finalized_number, 8),
				// Without state pruning we may revert past finalized blocks.
				_ => assert_eq!(backend.blockchain().info().finalized_number, 5),
			}
		}
	}
	// Re-importing a known block as best must not leave a duplicate leaf entry.
	#[test]
	fn test_no_duplicated_leaves_allowed() {
		let backend: Backend<Block> = Backend::new_test(10, 10);
		let genesis = insert_header(&backend, 0, Default::default(), None, Default::default());
		let block1 = insert_header(&backend, 1, genesis, None, Default::default());
		// Import block #2 without making it the best block.
		let block2 = insert_header_no_head(&backend, 2, block1, Default::default());
		assert_eq!(backend.blockchain().leaves().unwrap(), vec![block2]);
		assert_eq!(backend.blockchain().info().best_hash, block1);
		// Import the same block #2 again, this time as the best block.
		let block2 = insert_header(&backend, 2, block1, None, Default::default());
		assert_eq!(backend.blockchain().leaves().unwrap(), vec![block2]);
		assert_eq!(backend.blockchain().info().best_hash, block2);
	}
	// Forced (delayed) canonicalization must not run until the database has
	// actually seen best blocks past the canonicalization delay.
	#[test]
	fn force_delayed_canonicalize_waiting_for_blocks_to_be_finalized() {
		let pruning_modes =
			[BlocksPruning::Some(10), BlocksPruning::KeepAll, BlocksPruning::KeepFinalized];
		for pruning_mode in pruning_modes {
			eprintln!("Running with pruning mode: {:?}", pruning_mode);
			let backend = Backend::<Block>::new_test_with_tx_storage(pruning_mode, 1);
			let genesis = insert_block(
				&backend,
				0,
				Default::default(),
				None,
				Default::default(),
				vec![],
				None,
			)
			.unwrap();
			// Import a child of `parent_hash`, applying the given `storage`
			// changes and tagging the block with `block_state`.
			let import_block =
				|number, parent_hash, storage: Vec<(Vec<u8>, Option<Vec<u8>>)>, block_state| {
					let mut op = backend.begin_operation().unwrap();
					backend.begin_state_operation(&mut op, parent_hash).unwrap();
					let mut header = Header {
						number,
						parent_hash,
						state_root: Default::default(),
						digest: Default::default(),
						extrinsics_root: Default::default(),
					};
					let (root, overlay) = op.old_state.storage_root(
						storage.iter().map(|(k, v)| (k.as_slice(), v.as_ref().map(|v| &v[..]))),
						StateVersion::V1,
					);
					op.update_db_storage(overlay).unwrap();
					header.state_root = root.into();
					op.update_storage(storage, Vec::new()).unwrap();
					op.set_block_data(header.clone(), Some(Vec::new()), None, None, block_state)
						.unwrap();
					backend.commit_operation(op).unwrap();
					header.hash()
				};
			let block1 = import_block(
				1,
				genesis,
				vec![(vec![1, 3, 5], None), (vec![5, 5, 5], Some(vec![4, 5, 6]))],
				NewBlockState::Normal,
			);
			if matches!(pruning_mode, BlocksPruning::Some(_)) {
				assert_eq!(
					LastCanonicalized::Block(0),
					backend.storage.state_db.last_canonicalized()
				);
			}
			// This should not trigger any forced canonicalization as we didn't have imported any
			// best block by now.
			let block2 = import_block(
				2,
				block1,
				vec![(vec![5, 5, 5], Some(vec![4, 5, 6, 2]))],
				NewBlockState::Normal,
			);
			if matches!(pruning_mode, BlocksPruning::Some(_)) {
				assert_eq!(
					LastCanonicalized::Block(0),
					backend.storage.state_db.last_canonicalized()
				);
			}
			// This should also not trigger it yet, because we import a best block, but the best
			// block from the POV of the db is still at `0`.
			let block3 = import_block(
				3,
				block2,
				vec![(vec![5, 5, 5], Some(vec![4, 5, 6, 3]))],
				NewBlockState::Best,
			);
			// Now it should kick in.
			let block4 = import_block(
				4,
				block3,
				vec![(vec![5, 5, 5], Some(vec![4, 5, 6, 4]))],
				NewBlockState::Best,
			);
			if matches!(pruning_mode, BlocksPruning::Some(_)) {
				assert_eq!(
					LastCanonicalized::Block(2),
					backend.storage.state_db.last_canonicalized()
				);
			}
			assert_eq!(block1, backend.blockchain().hash(1).unwrap().unwrap());
			assert_eq!(block2, backend.blockchain().hash(2).unwrap().unwrap());
			assert_eq!(block3, backend.blockchain().hash(3).unwrap().unwrap());
			assert_eq!(block4, backend.blockchain().hash(4).unwrap().unwrap());
		}
	}
	// Exercises the pinned-blocks cache: bodies and justifications of pruned
	// blocks must stay accessible while a block holds at least one pin, and
	// must be dropped once the last pin is released.
	#[test]
	fn test_pinned_blocks_on_finalize() {
		// Keep only one finalized block's data around so pruning kicks in quickly.
		let backend = Backend::<Block>::new_test_with_tx_storage(BlocksPruning::Some(1), 10);
		let mut blocks = Vec::new();
		let mut prev_hash = Default::default();
		// Deterministic per-block justification: engine id plus block number payload.
		let build_justification = |i: u64| ([0, 0, 0, 0], vec![i.try_into().unwrap()]);
		// Block tree:
		//   0 -> 1 -> 2 -> 3 -> 4
		for i in 0..5 {
			let hash = insert_block(
				&backend,
				i,
				prev_hash,
				None,
				Default::default(),
				vec![i.into()],
				None,
			)
			.unwrap();
			blocks.push(hash);
			// Avoid block pruning.
			backend.pin_block(blocks[i as usize]).unwrap();
			prev_hash = hash;
		}
		let bc = backend.blockchain();
		// Check that we can properly access values when there is reference count
		// but no value.
		assert_eq!(Some(vec![1.into()]), bc.body(blocks[1]).unwrap());
		// Block 1 gets pinned three times in total (once in the loop, twice here).
		backend.pin_block(blocks[1]).unwrap();
		backend.pin_block(blocks[1]).unwrap();
		// Finalize all blocks. This will trigger pruning.
		let mut op = backend.begin_operation().unwrap();
		backend.begin_state_operation(&mut op, blocks[4]).unwrap();
		for i in 1..5 {
			op.mark_finalized(blocks[i], Some(build_justification(i.try_into().unwrap())))
				.unwrap();
		}
		backend.commit_operation(op).unwrap();
		// Block 0, 1, 2, 3 are pinned, so all values should be cached.
		// Block 4 is inside the pruning window, its value is in db.
		assert_eq!(Some(vec![0.into()]), bc.body(blocks[0]).unwrap());
		assert_eq!(Some(vec![1.into()]), bc.body(blocks[1]).unwrap());
		assert_eq!(
			Some(Justifications::from(build_justification(1))),
			bc.justifications(blocks[1]).unwrap()
		);
		assert_eq!(Some(vec![2.into()]), bc.body(blocks[2]).unwrap());
		assert_eq!(
			Some(Justifications::from(build_justification(2))),
			bc.justifications(blocks[2]).unwrap()
		);
		assert_eq!(Some(vec![3.into()]), bc.body(blocks[3]).unwrap());
		assert_eq!(
			Some(Justifications::from(build_justification(3))),
			bc.justifications(blocks[3]).unwrap()
		);
		assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap());
		assert_eq!(
			Some(Justifications::from(build_justification(4))),
			bc.justifications(blocks[4]).unwrap()
		);
		// Unpin all blocks once. Values should be removed from the cache,
		// except for block 1 which still holds two extra pins.
		for block in &blocks {
			backend.unpin_block(*block);
		}
		assert!(bc.body(blocks[0]).unwrap().is_none());
		// Block 1 was pinned twice more, we expect it to be still cached
		assert!(bc.body(blocks[1]).unwrap().is_some());
		assert!(bc.justifications(blocks[1]).unwrap().is_some());
		// Headers should also be available while pinned
		assert!(bc.header(blocks[1]).ok().flatten().is_some());
		assert!(bc.body(blocks[2]).unwrap().is_none());
		assert!(bc.justifications(blocks[2]).unwrap().is_none());
		assert!(bc.body(blocks[3]).unwrap().is_none());
		assert!(bc.justifications(blocks[3]).unwrap().is_none());
		// After these two unpins, block 1 should also be removed.
		backend.unpin_block(blocks[1]);
		// One pin is still held, so the cached values must survive this unpin.
		assert!(bc.body(blocks[1]).unwrap().is_some());
		assert!(bc.justifications(blocks[1]).unwrap().is_some());
		backend.unpin_block(blocks[1]);
		// The last pin is gone; the cached values are dropped.
		assert!(bc.body(blocks[1]).unwrap().is_none());
		assert!(bc.justifications(blocks[1]).unwrap().is_none());
		// Block 4 is inside the pruning window and still kept
		assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap());
		assert_eq!(
			Some(Justifications::from(build_justification(4))),
			bc.justifications(blocks[4]).unwrap()
		);
		// Block tree:
		//   0 -> 1 -> 2 -> 3 -> 4 -> 5
		let hash =
			insert_block(&backend, 5, prev_hash, None, Default::default(), vec![5.into()], None)
				.unwrap();
		blocks.push(hash);
		// Pin block 4 so its data survives the pruning triggered by finalizing 5.
		backend.pin_block(blocks[4]).unwrap();
		// Mark block 5 as finalized.
		let mut op = backend.begin_operation().unwrap();
		backend.begin_state_operation(&mut op, blocks[5]).unwrap();
		op.mark_finalized(blocks[5], Some(build_justification(5))).unwrap();
		backend.commit_operation(op).unwrap();
		assert!(bc.body(blocks[0]).unwrap().is_none());
		assert!(bc.body(blocks[1]).unwrap().is_none());
		assert!(bc.body(blocks[2]).unwrap().is_none());
		assert!(bc.body(blocks[3]).unwrap().is_none());
		// Block 4 is pinned, block 5 is the newly finalized block in the db.
		assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap());
		assert_eq!(
			Some(Justifications::from(build_justification(4))),
			bc.justifications(blocks[4]).unwrap()
		);
		assert_eq!(Some(vec![5.into()]), bc.body(blocks[5]).unwrap());
		assert!(bc.header(blocks[5]).ok().flatten().is_some());
		backend.unpin_block(blocks[4]);
		assert!(bc.body(blocks[4]).unwrap().is_none());
		assert!(bc.justifications(blocks[4]).unwrap().is_none());
		// Append a justification to block 5.
		backend.append_justification(blocks[5], ([0, 0, 0, 1], vec![42])).unwrap();
		let hash =
			insert_block(&backend, 6, blocks[5], None, Default::default(), vec![6.into()], None)
				.unwrap();
		blocks.push(hash);
		// Pin block 5 so it gets loaded into the cache on prune
		backend.pin_block(blocks[5]).unwrap();
		// Finalize block 6 so block 5 gets pruned. Since it is pinned both justifications should be
		// in memory.
		let mut op = backend.begin_operation().unwrap();
		backend.begin_state_operation(&mut op, blocks[6]).unwrap();
		op.mark_finalized(blocks[6], None).unwrap();
		backend.commit_operation(op).unwrap();
		assert_eq!(Some(vec![5.into()]), bc.body(blocks[5]).unwrap());
		assert!(bc.header(blocks[5]).ok().flatten().is_some());
		// Both the original and the appended justification must be cached.
		let mut expected = Justifications::from(build_justification(5));
		expected.append(([0, 0, 0, 1], vec![42]));
		assert_eq!(Some(expected), bc.justifications(blocks[5]).unwrap());
	}
	// Pinned fork blocks must survive displacement pruning when the competing
	// chain is finalized; unpinned ones must be dropped.
	#[test]
	fn test_pinned_blocks_on_finalize_with_fork() {
		let backend = Backend::<Block>::new_test_with_tx_storage(BlocksPruning::Some(1), 10);
		let mut blocks = Vec::new();
		let mut parent = Default::default();
		// Block tree:
		//   0 -> 1 -> 2 -> 3 -> 4
		for number in 0..5 {
			let hash = insert_block(
				&backend,
				number,
				parent,
				None,
				Default::default(),
				vec![number.into()],
				None,
			)
			.unwrap();
			blocks.push(hash);
			// Pin every block so pruning cannot evict it yet.
			backend.pin_block(hash).unwrap();
			parent = hash;
		}
		// Insert a fork at the second block (random extrinsics roots make the
		// fork hashes distinct from the canonical chain).
		// Block tree:
		//   0 -> 1 -> 2 -> 3 -> 4
		//        \ -> 2 -> 3
		let fork_hash_root =
			insert_block(&backend, 2, blocks[1], None, H256::random(), vec![2.into()], None)
				.unwrap();
		let fork_hash_3 = insert_block(
			&backend,
			3,
			fork_hash_root,
			None,
			H256::random(),
			vec![3.into(), 11.into()],
			None,
		)
		.unwrap();
		// Keep the fork tip alive across pruning; its root stays unpinned.
		backend.pin_block(fork_hash_3).unwrap();
		// Make the canonical chain the head ...
		let mut op = backend.begin_operation().unwrap();
		backend.begin_state_operation(&mut op, blocks[4]).unwrap();
		op.mark_head(blocks[4]).unwrap();
		backend.commit_operation(op).unwrap();
		// ... and finalize it block by block, triggering pruning.
		for block in &blocks[1..] {
			let mut op = backend.begin_operation().unwrap();
			backend.begin_state_operation(&mut op, blocks[4]).unwrap();
			op.mark_finalized(*block, None).unwrap();
			backend.commit_operation(op).unwrap();
		}
		let bc = backend.blockchain();
		// Every pinned canonical block is still readable.
		assert_eq!(Some(vec![0.into()]), bc.body(blocks[0]).unwrap());
		assert_eq!(Some(vec![1.into()]), bc.body(blocks[1]).unwrap());
		assert_eq!(Some(vec![2.into()]), bc.body(blocks[2]).unwrap());
		assert_eq!(Some(vec![3.into()]), bc.body(blocks[3]).unwrap());
		assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap());
		// The unpinned fork root was pruned; the pinned fork tip remains.
		assert_eq!(None, bc.body(fork_hash_root).unwrap());
		assert_eq!(Some(vec![3.into(), 11.into()]), bc.body(fork_hash_3).unwrap());
		// Releasing the pins drops the cached bodies.
		for block in &blocks {
			backend.unpin_block(*block);
		}
		assert!(bc.body(blocks[0]).unwrap().is_none());
		assert!(bc.body(blocks[1]).unwrap().is_none());
		assert!(bc.body(blocks[2]).unwrap().is_none());
		assert!(bc.body(blocks[3]).unwrap().is_none());
		assert!(bc.body(fork_hash_3).unwrap().is_some());
		backend.unpin_block(fork_hash_3);
		assert!(bc.body(fork_hash_3).unwrap().is_none());
	}
}