1
// Copyright 2024 Moonbeam foundation
2
// This file is part of Moonbeam.
3

            
4
// Moonbeam is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Moonbeam is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata};
18
use sp_core::storage::well_known_keys;
19
use sp_runtime::{
20
	generic::BlockId,
21
	traits::{Block as BlockT, HashingFor, Header as HeaderT, NumberFor, Zero},
22
	Justification, Justifications, StateVersion, Storage,
23
};
24
use sp_state_machine::{
25
	BackendTransaction, ChildStorageCollection, IndexOperation, StorageCollection, TrieBackend,
26
};
27
use std::marker::PhantomData;
28
use std::time::Duration;
29
use std::{
30
	collections::{HashMap, HashSet},
31
	ptr,
32
	sync::Arc,
33
};
34

            
35
use sc_client_api::{
36
	backend::{self, NewBlockState},
37
	blockchain::{self, BlockStatus, HeaderBackend},
38
	leaves::LeafSet,
39
	UsageInfo,
40
};
41

            
42
use crate::lazy_loading;
43
use crate::lazy_loading::lock::ReadWriteLock;
44
use crate::lazy_loading::state_overrides::StateEntry;
45
use crate::lazy_loading::{helpers, state_overrides};
46
use moonbeam_cli_opt::LazyLoadingConfig;
47
use sc_client_api::StorageKey;
48
use sc_service::{Configuration, Error};
49
use serde::de::DeserializeOwned;
50
use sp_core::offchain::storage::InMemOffchainStorage;
51
use sp_core::{twox_128, H256};
52
use sp_storage::ChildInfo;
53
use sp_trie::PrefixedMemoryDB;
54

            
55
/// A block queued inside a `BlockImportOperation`, waiting to be committed
/// to the backend together with the chain-state it should be stored under.
struct PendingBlock<B: BlockT> {
	// Header (and optionally body/justifications) of the block to import.
	block: StoredBlock<B>,
	// Whether the block becomes best / final / normal once imported.
	state: NewBlockState,
}
59

            
60
/// A block as kept in the in-memory store: either just the header, or the
/// full block — in both cases with optional justifications.
#[derive(PartialEq, Eq, Clone)]
enum StoredBlock<B: BlockT> {
	/// Header-only entry (no extrinsics available locally).
	Header(B::Header, Option<Justifications>),
	/// Full block (header + extrinsics).
	Full(B, Option<Justifications>),
}
65

            
66
impl<B: BlockT> StoredBlock<B> {
67
	fn new(
68
		header: B::Header,
69
		body: Option<Vec<B::Extrinsic>>,
70
		just: Option<Justifications>,
71
	) -> Self {
72
		match body {
73
			Some(body) => StoredBlock::Full(B::new(header, body), just),
74
			None => StoredBlock::Header(header, just),
75
		}
76
	}
77

            
78
	fn header(&self) -> &B::Header {
79
		match *self {
80
			StoredBlock::Header(ref h, _) => h,
81
			StoredBlock::Full(ref b, _) => b.header(),
82
		}
83
	}
84

            
85
	fn justifications(&self) -> Option<&Justifications> {
86
		match *self {
87
			StoredBlock::Header(_, ref j) | StoredBlock::Full(_, ref j) => j.as_ref(),
88
		}
89
	}
90

            
91
	fn extrinsics(&self) -> Option<&[B::Extrinsic]> {
92
		match *self {
93
			StoredBlock::Header(_, _) => None,
94
			StoredBlock::Full(ref b, _) => Some(b.extrinsics()),
95
		}
96
	}
97

            
98
	fn into_inner(self) -> (B::Header, Option<Vec<B::Extrinsic>>, Option<Justifications>) {
99
		match self {
100
			StoredBlock::Header(header, just) => (header, None, just),
101
			StoredBlock::Full(block, just) => {
102
				let (header, body) = block.deconstruct();
103
				(header, Some(body), just)
104
			}
105
		}
106
	}
107
}
108

            
109
/// Mutable inner state of [`Blockchain`], kept behind a `ReadWriteLock`.
#[derive(Clone)]
struct BlockchainStorage<Block: BlockT> {
	// Locally cached blocks, keyed by hash.
	blocks: HashMap<Block::Hash, StoredBlock<Block>>,
	// Number -> hash mapping for blocks known locally.
	hashes: HashMap<NumberFor<Block>, Block::Hash>,
	// Hash of the current best block.
	best_hash: Block::Hash,
	// Number of the current best block.
	best_number: NumberFor<Block>,
	// Hash of the latest finalized block.
	finalized_hash: Block::Hash,
	// Number of the latest finalized block.
	finalized_number: NumberFor<Block>,
	// Hash of the genesis block (set when block number zero is inserted).
	genesis_hash: Block::Hash,
	// CHT roots per block number (populated via `insert_cht_root`).
	header_cht_roots: HashMap<NumberFor<Block>, Block::Hash>,
	// Current chain leaves (blocks with no known children).
	leaves: LeafSet<Block::Hash, NumberFor<Block>>,
	// Auxiliary key-value storage.
	aux: HashMap<Vec<u8>, Vec<u8>>,
}
122

            
123
/// In-memory blockchain. Supports concurrent reads.
///
/// Data not present locally is lazily fetched from a live network via
/// `rpc_client` and cached into `storage`.
#[derive(Clone)]
pub struct Blockchain<Block: BlockT> {
	// RPC connection used to lazily fetch chain data from the remote network.
	rpc_client: Arc<super::rpc_client::RPC>,
	// Locally cached chain data; shared between clones of this handle.
	storage: Arc<ReadWriteLock<BlockchainStorage<Block>>>,
}
129

            
130
impl<Block: BlockT + DeserializeOwned> Blockchain<Block> {
	/// Get header hash of given block.
	///
	/// For `BlockId::Number`, the local cache is consulted first; on a miss
	/// the hash is fetched over RPC and cached on success. RPC errors are
	/// mapped to `None`.
	pub fn id(&self, id: BlockId<Block>) -> Option<Block::Hash> {
		match id {
			BlockId::Hash(h) => Some(h),
			BlockId::Number(n) => {
				// Read lock is released before the RPC call / write below.
				let block_hash = self.storage.read().hashes.get(&n).cloned();
				match block_hash {
					None => {
						let block_hash =
							self.rpc_client.block_hash::<Block>(Some(n)).ok().flatten();

						// Cache the number -> hash mapping for later lookups.
						block_hash.clone().map(|h| {
							self.storage.write().hashes.insert(n, h);
						});

						block_hash
					}
					block_hash => block_hash,
				}
			}
		}
	}

	/// Create new in-memory blockchain storage.
	fn new(rpc_client: Arc<super::rpc_client::RPC>) -> Blockchain<Block> {
		let storage = Arc::new(ReadWriteLock::new(BlockchainStorage {
			blocks: HashMap::new(),
			hashes: HashMap::new(),
			best_hash: Default::default(),
			best_number: Zero::zero(),
			finalized_hash: Default::default(),
			finalized_number: Zero::zero(),
			genesis_hash: Default::default(),
			header_cht_roots: HashMap::new(),
			leaves: LeafSet::new(),
			aux: HashMap::new(),
		}));
		Blockchain {
			rpc_client,
			storage,
		}
	}

	/// Insert a block header and associated data.
	///
	/// Block number zero only records the genesis hash; any other number is
	/// added to the leaf set and block map, and updates the finalized
	/// pointers when `new_state` is `Final`.
	pub fn insert(
		&self,
		hash: Block::Hash,
		header: <Block as BlockT>::Header,
		justifications: Option<Justifications>,
		body: Option<Vec<<Block as BlockT>::Extrinsic>>,
		new_state: NewBlockState,
	) -> sp_blockchain::Result<()> {
		let number = *header.number();
		// `apply_head` takes the write lock itself, so it must run before we
		// acquire the write lock below.
		if new_state.is_best() {
			self.apply_head(&header)?;
		}

		let mut storage = self.storage.write();
		if number.is_zero() {
			storage.genesis_hash = hash;
		} else {
			storage.leaves.import(hash, number, *header.parent_hash());
			storage
				.blocks
				.insert(hash, StoredBlock::new(header, body, justifications));

			if let NewBlockState::Final = new_state {
				storage.finalized_hash = hash;
				storage.finalized_number = number;
			}
		}

		Ok(())
	}

	/// Get total number of blocks.
	pub fn blocks_count(&self) -> usize {
		let count = self.storage.read().blocks.len();

		log::debug!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"Total number of blocks: {:?}",
			count
		);

		count
	}

	/// Compare this blockchain with another in-mem blockchain
	pub fn equals_to(&self, other: &Self) -> bool {
		// Check ptr equality first to avoid double read locks.
		if ptr::eq(self, other) {
			return true;
		}
		self.canon_equals_to(other) && self.storage.read().blocks == other.storage.read().blocks
	}

	/// Compare canonical chain to other canonical chain.
	pub fn canon_equals_to(&self, other: &Self) -> bool {
		// Check ptr equality first to avoid double read locks.
		if ptr::eq(self, other) {
			return true;
		}
		let this = self.storage.read();
		let other = other.storage.read();
		this.hashes == other.hashes
			&& this.best_hash == other.best_hash
			&& this.best_number == other.best_number
			&& this.genesis_hash == other.genesis_hash
	}

	/// Insert header CHT root.
	pub fn insert_cht_root(&self, block: NumberFor<Block>, cht_root: Block::Hash) {
		self.storage
			.write()
			.header_cht_roots
			.insert(block, cht_root);
	}

	/// Set an existing block as head.
	///
	/// Errors with `UnknownBlock` when the header cannot be found locally or
	/// over RPC.
	pub fn set_head(&self, hash: Block::Hash) -> sp_blockchain::Result<()> {
		let header = self
			.header(hash)?
			.ok_or_else(|| sp_blockchain::Error::UnknownBlock(format!("{}", hash)))?;

		self.apply_head(&header)
	}

	/// Update best-block pointers and the number -> hash index to `header`.
	fn apply_head(&self, header: &<Block as BlockT>::Header) -> sp_blockchain::Result<()> {
		let mut storage = self.storage.write();

		let hash = header.hash();
		let number = header.number();

		storage.best_hash = hash;
		storage.best_number = *number;
		storage.hashes.insert(*number, hash);

		Ok(())
	}

	/// Mark `block` as finalized, replacing its stored justifications when a
	/// new justification is supplied.
	fn finalize_header(
		&self,
		block: Block::Hash,
		justification: Option<Justification>,
	) -> sp_blockchain::Result<()> {
		let mut storage = self.storage.write();
		storage.finalized_hash = block;

		if justification.is_some() {
			// Panics if the block is not in the local map; callers are
			// expected to only finalize locally known blocks.
			let block = storage
				.blocks
				.get_mut(&block)
				.expect("hash was fetched from a block in the db; qed");

			let block_justifications = match block {
				StoredBlock::Header(_, ref mut j) | StoredBlock::Full(_, ref mut j) => j,
			};

			*block_justifications = justification.map(Justifications::from);
		}

		Ok(())
	}

	/// Append a justification to a stored block.
	///
	/// Errors with `BadJustification` if a justification for the same
	/// consensus engine is already present.
	fn append_justification(
		&self,
		hash: Block::Hash,
		justification: Justification,
	) -> sp_blockchain::Result<()> {
		let mut storage = self.storage.write();

		let block = storage
			.blocks
			.get_mut(&hash)
			.expect("hash was fetched from a block in the db; qed");

		let block_justifications = match block {
			StoredBlock::Header(_, ref mut j) | StoredBlock::Full(_, ref mut j) => j,
		};

		if let Some(stored_justifications) = block_justifications {
			if !stored_justifications.append(justification) {
				return Err(sp_blockchain::Error::BadJustification(
					"Duplicate consensus engine ID".into(),
				));
			}
		} else {
			*block_justifications = Some(Justifications::from(justification));
		};

		Ok(())
	}

	/// Apply a batch of auxiliary-storage writes; `None` values delete keys.
	fn write_aux(&self, ops: Vec<(Vec<u8>, Option<Vec<u8>>)>) {
		let mut storage = self.storage.write();
		for (k, v) in ops {
			match v {
				Some(v) => storage.aux.insert(k, v),
				None => storage.aux.remove(&k),
			};
		}
	}
}
335

            
336
impl<Block: BlockT + DeserializeOwned> HeaderBackend<Block> for Blockchain<Block> {
	/// Return the header for `hash`, consulting the local cache first and
	/// falling back to the RPC client (caching the fetched block locally).
	fn header(
		&self,
		hash: Block::Hash,
	) -> sp_blockchain::Result<Option<<Block as BlockT>::Header>> {
		// First, try to get the header from local storage
		if let Some(header) = self
			.storage
			.read()
			.blocks
			.get(&hash)
			.map(|b| b.header().clone())
		{
			return Ok(Some(header));
		}

		// If not found in local storage, fetch from RPC client
		let header = self
			.rpc_client
			.block::<Block, _>(Some(hash))
			.ok()
			.flatten()
			.map(|full_block| {
				// Cache block header
				let block = full_block.block.clone();
				self.storage.write().blocks.insert(
					hash,
					StoredBlock::Full(block.clone(), full_block.justifications),
				);

				block.header().clone()
			});

		// A miss on both the cache and the remote node is unexpected; log it
		// but still return `Ok(None)` per the trait contract.
		if header.is_none() {
			log::warn!(
				target: super::LAZY_LOADING_LOG_TARGET,
				"Expected block {:x?} to exist.",
				&hash
			);
		}

		Ok(header)
	}

	/// Snapshot of the chain info from locally tracked pointers.
	fn info(&self) -> blockchain::Info<Block> {
		let storage = self.storage.read();
		blockchain::Info {
			best_hash: storage.best_hash,
			best_number: storage.best_number,
			genesis_hash: storage.genesis_hash,
			finalized_hash: storage.finalized_hash,
			finalized_number: storage.finalized_number,
			finalized_state: Some((storage.finalized_hash, storage.finalized_number)),
			number_leaves: storage.leaves.count(),
			block_gap: None,
		}
	}

	/// Block status based on the local cache only — blocks that exist on the
	/// remote network but were never fetched report `Unknown`.
	fn status(&self, hash: Block::Hash) -> sp_blockchain::Result<BlockStatus> {
		match self.storage.read().blocks.contains_key(&hash) {
			true => Ok(BlockStatus::InChain),
			false => Ok(BlockStatus::Unknown),
		}
	}

	/// Block number for `hash`, falling back to an RPC fetch on cache miss.
	///
	/// NOTE(review): unlike `header`, a remote miss here is an error, not
	/// `Ok(None)` — confirm this asymmetry is intended.
	fn number(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<NumberFor<Block>>> {
		let number = match self.storage.read().blocks.get(&hash) {
			Some(block) => *block.header().number(),
			_ => match self.rpc_client.block::<Block, _>(Some(hash)) {
				Ok(Some(block)) => *block.block.header().number(),
				err => {
					return Err(sp_blockchain::Error::UnknownBlock(
						format!("Failed to fetch block number from RPC: {:?}", err).into(),
					));
				}
			},
		};

		Ok(Some(number))
	}

	/// Hash for a block number, delegating to `Blockchain::id`.
	fn hash(
		&self,
		number: <<Block as BlockT>::Header as HeaderT>::Number,
	) -> sp_blockchain::Result<Option<Block::Hash>> {
		Ok(self.id(BlockId::Number(number)))
	}
}
424

            
425
impl<Block: BlockT + DeserializeOwned> HeaderMetadata<Block> for Blockchain<Block> {
426
	type Error = sp_blockchain::Error;
427

            
428
	fn header_metadata(
429
		&self,
430
		hash: Block::Hash,
431
	) -> Result<CachedHeaderMetadata<Block>, Self::Error> {
432
		self.header(hash)?
433
			.map(|header| CachedHeaderMetadata::from(&header))
434
			.ok_or_else(|| {
435
				sp_blockchain::Error::UnknownBlock(format!("header not found: {}", hash))
436
			})
437
	}
438

            
439
	fn insert_header_metadata(&self, _hash: Block::Hash, _metadata: CachedHeaderMetadata<Block>) {
440
		// No need to implement.
441
		unimplemented!("insert_header_metadata")
442
	}
443
	fn remove_header_metadata(&self, _hash: Block::Hash) {
444
		// No need to implement.
445
		unimplemented!("remove_header_metadata")
446
	}
447
}
448

            
449
impl<Block: BlockT + DeserializeOwned> blockchain::Backend<Block> for Blockchain<Block> {
	/// Return the block body (extrinsics), consulting the local cache first
	/// and falling back to an RPC fetch. The RPC result is not cached here.
	fn body(
		&self,
		hash: Block::Hash,
	) -> sp_blockchain::Result<Option<Vec<<Block as BlockT>::Extrinsic>>> {
		// First, try to get the extrinsics from local storage
		if let Some(extrinsics) = self
			.storage
			.read()
			.blocks
			.get(&hash)
			.and_then(|b| b.extrinsics().map(|x| x.to_vec()))
		{
			return Ok(Some(extrinsics));
		}
		// Not cached (or header-only entry): fetch the full block over RPC.
		let extrinsics = self
			.rpc_client
			.block::<Block, Block::Hash>(Some(hash))
			.ok()
			.flatten()
			.map(|b| b.block.extrinsics().to_vec());

		Ok(extrinsics)
	}

	/// Justifications come from the local cache only; no RPC fallback.
	fn justifications(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Justifications>> {
		Ok(self
			.storage
			.read()
			.blocks
			.get(&hash)
			.and_then(|b| b.justifications().cloned()))
	}

	fn last_finalized(&self) -> sp_blockchain::Result<Block::Hash> {
		let last_finalized = self.storage.read().finalized_hash;

		Ok(last_finalized)
	}

	fn leaves(&self) -> sp_blockchain::Result<Vec<Block::Hash>> {
		let leaves = self.storage.read().leaves.hashes();

		Ok(leaves)
	}

	fn children(&self, _parent_hash: Block::Hash) -> sp_blockchain::Result<Vec<Block::Hash>> {
		unimplemented!("Not supported by the `lazy-loading` backend.")
	}

	fn indexed_transaction(&self, _hash: Block::Hash) -> sp_blockchain::Result<Option<Vec<u8>>> {
		unimplemented!("Not supported by the `lazy-loading` backend.")
	}

	fn block_indexed_body(
		&self,
		_hash: Block::Hash,
	) -> sp_blockchain::Result<Option<Vec<Vec<u8>>>> {
		unimplemented!("Not supported by the `lazy-loading` backend.")
	}
}
510

            
511
impl<Block: BlockT + DeserializeOwned> backend::AuxStore for Blockchain<Block> {
512
	fn insert_aux<
513
		'a,
514
		'b: 'a,
515
		'c: 'a,
516
		I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
517
		D: IntoIterator<Item = &'a &'b [u8]>,
518
	>(
519
		&self,
520
		insert: I,
521
		delete: D,
522
	) -> sp_blockchain::Result<()> {
523
		let mut storage = self.storage.write();
524
		for (k, v) in insert {
525
			storage.aux.insert(k.to_vec(), v.to_vec());
526
		}
527
		for k in delete {
528
			storage.aux.remove(*k);
529
		}
530
		Ok(())
531
	}
532

            
533
	fn get_aux(&self, key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
534
		Ok(self.storage.read().aux.get(key).cloned())
535
	}
536
}
537

            
538
/// Pending changes accumulated during a block import, committed atomically
/// by the backend.
pub struct BlockImportOperation<Block: BlockT> {
	// Block queued via `set_block_data` (at most one per operation).
	pending_block: Option<PendingBlock<Block>>,
	// State the operation was opened over.
	old_state: ForkedLazyBackend<Block>,
	// State transaction produced by `update_db_storage` / `apply_storage`.
	new_state: Option<BackendTransaction<HashingFor<Block>>>,
	// Auxiliary-storage writes (`None` value = delete).
	aux: Vec<(Vec<u8>, Option<Vec<u8>>)>,
	// Top-level storage changes to apply on commit.
	storage_updates: StorageCollection,
	// Blocks to mark finalized (with optional justification) on commit.
	finalized_blocks: Vec<(Block::Hash, Option<Justification>)>,
	// New head to set on commit, if any.
	set_head: Option<Block::Hash>,
	// Whether this operation targets a block before the fork point.
	pub(crate) before_fork: bool,
}
548

            
549
impl<Block: BlockT + DeserializeOwned> BlockImportOperation<Block> {
	/// Compute the state root for `storage` over the current state and, when
	/// `commit` is set, stage the resulting transaction and the top-level
	/// storage updates for the commit step.
	fn apply_storage(
		&mut self,
		storage: Storage,
		commit: bool,
		state_version: StateVersion,
	) -> sp_blockchain::Result<Block::Hash> {
		use sp_state_machine::Backend;
		check_genesis_storage(&storage)?;

		// Deltas for every default child trie.
		let child_delta = storage.children_default.values().map(|child_content| {
			(
				&child_content.child_info,
				child_content
					.data
					.iter()
					.map(|(k, v)| (k.as_ref(), Some(v.as_ref()))),
			)
		});

		let (root, transaction) = self.old_state.full_storage_root(
			storage
				.top
				.iter()
				.map(|(k, v)| (k.as_ref(), Some(v.as_ref()))),
			child_delta,
			state_version,
		);

		if commit {
			self.new_state = Some(transaction);
			// Empty values are treated as deletions in the staged updates.
			self.storage_updates = storage
				.top
				.iter()
				.map(|(k, v)| {
					if v.is_empty() {
						(k.clone(), None)
					} else {
						(k.clone(), Some(v.clone()))
					}
				})
				.collect();
		}
		Ok(root)
	}
}
595

            
596
impl<Block: BlockT + DeserializeOwned> backend::BlockImportOperation<Block>
597
	for BlockImportOperation<Block>
598
{
599
	type State = ForkedLazyBackend<Block>;
600

            
601
	fn state(&self) -> sp_blockchain::Result<Option<&Self::State>> {
602
		Ok(Some(&self.old_state))
603
	}
604

            
605
	fn set_block_data(
606
		&mut self,
607
		header: <Block as BlockT>::Header,
608
		body: Option<Vec<<Block as BlockT>::Extrinsic>>,
609
		_indexed_body: Option<Vec<Vec<u8>>>,
610
		justifications: Option<Justifications>,
611
		state: NewBlockState,
612
	) -> sp_blockchain::Result<()> {
613
		assert!(
614
			self.pending_block.is_none(),
615
			"Only one block per operation is allowed"
616
		);
617
		self.pending_block = Some(PendingBlock {
618
			block: StoredBlock::new(header, body, justifications),
619
			state,
620
		});
621
		Ok(())
622
	}
623

            
624
	fn update_db_storage(
625
		&mut self,
626
		update: BackendTransaction<HashingFor<Block>>,
627
	) -> sp_blockchain::Result<()> {
628
		self.new_state = Some(update);
629
		Ok(())
630
	}
631

            
632
	fn set_genesis_state(
633
		&mut self,
634
		storage: Storage,
635
		commit: bool,
636
		state_version: StateVersion,
637
	) -> sp_blockchain::Result<Block::Hash> {
638
		self.apply_storage(storage, commit, state_version)
639
	}
640

            
641
	fn reset_storage(
642
		&mut self,
643
		storage: Storage,
644
		state_version: StateVersion,
645
	) -> sp_blockchain::Result<Block::Hash> {
646
		self.apply_storage(storage, true, state_version)
647
	}
648

            
649
	fn insert_aux<I>(&mut self, ops: I) -> sp_blockchain::Result<()>
650
	where
651
		I: IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
652
	{
653
		self.aux.append(&mut ops.into_iter().collect());
654
		Ok(())
655
	}
656

            
657
	fn update_storage(
658
		&mut self,
659
		update: StorageCollection,
660
		_child_update: ChildStorageCollection,
661
	) -> sp_blockchain::Result<()> {
662
		self.storage_updates = update.clone();
663
		Ok(())
664
	}
665

            
666
	fn mark_finalized(
667
		&mut self,
668
		hash: Block::Hash,
669
		justification: Option<Justification>,
670
	) -> sp_blockchain::Result<()> {
671
		self.finalized_blocks.push((hash, justification));
672
		Ok(())
673
	}
674

            
675
	fn mark_head(&mut self, hash: Block::Hash) -> sp_blockchain::Result<()> {
676
		assert!(
677
			self.pending_block.is_none(),
678
			"Only one set block per operation is allowed"
679
		);
680
		self.set_head = Some(hash);
681
		Ok(())
682
	}
683

            
684
	fn update_transaction_index(
685
		&mut self,
686
		_index: Vec<IndexOperation>,
687
	) -> sp_blockchain::Result<()> {
688
		Ok(())
689
	}
690

            
691
	fn set_create_gap(&mut self, _create_gap: bool) {
692
		// This implementation can be left empty or implemented as needed
693
		// For now, we're just implementing the trait method with no functionality
694
	}
695
}
696

            
697
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
///
/// Alias over `TrieBackend` with a dynamically dispatched storage backend.
pub type DbState<B> = TrieBackend<Arc<dyn sp_state_machine::Storage<HashingFor<B>>>, HashingFor<B>>;
699

            
700
/// A struct containing arguments for iterating over the storage.
#[derive(Default)]
pub struct RawIterArgs {
	/// The prefix of the keys over which to iterate.
	pub prefix: Option<Vec<u8>>,

	/// The prefix from which to start the iteration from.
	///
	/// This is inclusive and the iteration will include the key which is specified here.
	pub start_at: Option<Vec<u8>>,

	/// If this is `true` then the iteration will *not* include
	/// the key specified in `start_at`, if there is such a key.
	///
	/// NOTE(review): the `RawIter` implementation below always iterates
	/// exclusively regardless of this flag — confirm whether it should be
	/// honored there.
	pub start_at_exclusive: bool,
}
715

            
716
/// A raw iterator over the [`ForkedLazyBackend`] storage.
pub struct RawIter<Block: BlockT> {
	// Current iteration cursor: `start_at` is advanced to the last key
	// yielded, so the next call resumes after it.
	pub(crate) args: RawIterArgs,
	// Set once iteration has ended (no more matching keys).
	complete: bool,
	// Ties the iterator to a block type without storing one.
	_phantom: PhantomData<Block>,
}
722

            
723
impl<Block: BlockT + DeserializeOwned> sp_state_machine::StorageIterator<HashingFor<Block>>
	for RawIter<Block>
{
	type Backend = ForkedLazyBackend<Block>;
	type Error = String;

	/// Advance to the next storage key.
	///
	/// Before the fork point every key comes from RPC; after it, the local
	/// in-memory DB is consulted first and RPC (pinned to the fork block) is
	/// the fallback, filtered against `removed_keys`. The cursor
	/// (`args.start_at`) is advanced only when a key is yielded.
	fn next_key(
		&mut self,
		backend: &Self::Backend,
	) -> Option<Result<sp_state_machine::StorageKey, Self::Error>> {
		use sp_state_machine::Backend;

		// Fetch one key (page size 5, first entry used) from the remote node.
		let remote_fetch =
			|key: Option<StorageKey>, start_key: Option<StorageKey>, block: Option<Block::Hash>| {
				let result = backend
					.rpc_client
					.storage_keys_paged(key, 5, start_key, block);

				match result {
					Ok(keys) => keys.first().map(|key| key.clone()),
					Err(err) => {
						log::trace!(
							target: super::LAZY_LOADING_LOG_TARGET,
							"Failed to fetch `next key` from RPC: {:?}",
							err
						);

						None
					}
				}
			};

		let prefix = self.args.prefix.clone().map(|k| StorageKey(k));
		let start_key = self.args.start_at.clone().map(|k| StorageKey(k));

		let maybe_next_key = if backend.before_fork {
			remote_fetch(prefix, start_key, backend.block_hash)
		} else {
			let mut iter_args = sp_state_machine::backend::IterArgs::default();
			iter_args.prefix = self.args.prefix.as_deref();
			iter_args.start_at = self.args.start_at.as_deref();
			// NOTE(review): always exclusive here, ignoring
			// `self.args.start_at_exclusive` — confirm intended.
			iter_args.start_at_exclusive = true;
			iter_args.stop_on_incomplete_database = true;

			let readable_db = backend.db.read();
			let next_storage_key = readable_db
				.raw_iter(iter_args)
				.map(|mut iter| iter.next_key(&readable_db))
				.map(|op| op.and_then(|result| result.ok()))
				.ok()
				.flatten();

			// IMPORTANT: free storage read lock
			drop(readable_db);

			// Skip the RPC fallback when the cursor key itself was deleted
			// locally after the fork.
			let removed_key = start_key
				.clone()
				.or(prefix.clone())
				.map(|key| backend.removed_keys.read().contains_key(&key.0))
				.unwrap_or(false);
			if next_storage_key.is_none() && !removed_key {
				let maybe_next_key = remote_fetch(prefix, start_key, Some(backend.fork_block));
				match maybe_next_key {
					Some(key) if !backend.removed_keys.read().contains_key(&key) => Some(key),
					_ => None,
				}
			} else {
				next_storage_key
			}
		};

		log::trace!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"next_key: (prefix: {:?}, start_at: {:?}, next_key: {:?})",
			self.args.prefix.clone().map(|key| hex::encode(key)),
			self.args.start_at.clone().map(|key| hex::encode(key)),
			maybe_next_key.clone().map(|key| hex::encode(key))
		);

		// Only yield keys that still match the prefix filter; anything else
		// terminates the iteration.
		if let Some(next_key) = maybe_next_key {
			if self
				.args
				.prefix
				.clone()
				.map(|filter_key| next_key.starts_with(&filter_key))
				.unwrap_or(false)
			{
				self.args.start_at = Some(next_key.clone());
				Some(Ok(next_key))
			} else {
				self.complete = true;
				None
			}
		} else {
			self.complete = true;
			None
		}
	}

	/// Advance to the next storage key and fetch its value.
	///
	/// Key discovery mirrors `next_key` (the two bodies are duplicated —
	/// candidates for a shared helper); the value is then read through the
	/// backend's `storage`, which may itself hit RPC and cache the result.
	fn next_pair(
		&mut self,
		backend: &Self::Backend,
	) -> Option<Result<(sp_state_machine::StorageKey, sp_state_machine::StorageValue), Self::Error>>
	{
		use sp_state_machine::Backend;

		// Fetch one key (page size 5, first entry used) from the remote node.
		let remote_fetch =
			|key: Option<StorageKey>, start_key: Option<StorageKey>, block: Option<Block::Hash>| {
				let result = backend
					.rpc_client
					.storage_keys_paged(key, 5, start_key, block);

				match result {
					Ok(keys) => keys.first().map(|key| key.clone()),
					Err(err) => {
						log::trace!(
							target: super::LAZY_LOADING_LOG_TARGET,
							"Failed to fetch `next key` from RPC: {:?}",
							err
						);

						None
					}
				}
			};

		let prefix = self.args.prefix.clone().map(|k| StorageKey(k));
		let start_key = self.args.start_at.clone().map(|k| StorageKey(k));

		let maybe_next_key = if backend.before_fork {
			remote_fetch(prefix, start_key, backend.block_hash)
		} else {
			let mut iter_args = sp_state_machine::backend::IterArgs::default();
			iter_args.prefix = self.args.prefix.as_deref();
			iter_args.start_at = self.args.start_at.as_deref();
			// NOTE(review): always exclusive here, ignoring
			// `self.args.start_at_exclusive` — confirm intended.
			iter_args.start_at_exclusive = true;
			iter_args.stop_on_incomplete_database = true;

			let readable_db = backend.db.read();
			let next_storage_key = readable_db
				.raw_iter(iter_args)
				.map(|mut iter| iter.next_key(&readable_db))
				.map(|op| op.and_then(|result| result.ok()))
				.ok()
				.flatten();

			// IMPORTANT: free storage read lock
			drop(readable_db);

			let removed_key = start_key
				.clone()
				.or(prefix.clone())
				.map(|key| backend.removed_keys.read().contains_key(&key.0))
				.unwrap_or(false);
			if next_storage_key.is_none() && !removed_key {
				let maybe_next_key = remote_fetch(prefix, start_key, Some(backend.fork_block));
				match maybe_next_key {
					Some(key) if !backend.removed_keys.read().contains_key(&key) => Some(key),
					_ => None,
				}
			} else {
				next_storage_key
			}
		};

		log::trace!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"next_pair: (prefix: {:?}, start_at: {:?}, next_key: {:?})",
			self.args.prefix.clone().map(|key| hex::encode(key)),
			self.args.start_at.clone().map(|key| hex::encode(key)),
			maybe_next_key.clone().map(|key| hex::encode(key))
		);

		let maybe_value = maybe_next_key
			.clone()
			.and_then(|key| (*backend).storage(key.as_slice()).ok())
			.flatten();

		if let Some(next_key) = maybe_next_key {
			if self
				.args
				.prefix
				.clone()
				.map(|filter_key| next_key.starts_with(&filter_key))
				.unwrap_or(false)
			{
				self.args.start_at = Some(next_key.clone());

				// NOTE(review): a key with no readable value yields `None`
				// without setting `self.complete`, unlike every other exit
				// path — confirm this is intended.
				match maybe_value {
					Some(value) => Some(Ok((next_key, value))),
					_ => None,
				}
			} else {
				self.complete = true;
				None
			}
		} else {
			self.complete = true;
			None
		}
	}

	fn was_complete(&self) -> bool {
		self.complete
	}
}
929

            
930
/// State backend that serves reads from a local in-memory overlay and lazily
/// falls back to a remote node (pinned at `fork_block`) for anything missing.
#[derive(Debug, Clone)]
pub struct ForkedLazyBackend<Block: BlockT> {
	// RPC connection used for remote storage reads.
	rpc_client: Arc<super::rpc_client::RPC>,
	// Block to query remotely while operating before the fork point.
	block_hash: Option<Block::Hash>,
	// Fork point: remote reads after the fork are pinned to this block.
	fork_block: Block::Hash,
	// Local in-memory state overlay (also caches remote reads).
	pub(crate) db: Arc<ReadWriteLock<sp_state_machine::InMemoryBackend<HashingFor<Block>>>>,
	// Keys deleted locally after the fork; suppresses the remote fallback.
	pub(crate) removed_keys: Arc<ReadWriteLock<HashMap<Vec<u8>, ()>>>,
	// Whether this backend represents a state prior to the fork point.
	before_fork: bool,
}
939

            
940
impl<Block: BlockT> ForkedLazyBackend<Block> {
941
	fn update_storage(&self, key: &[u8], value: &Option<Vec<u8>>) {
942
		if let Some(ref val) = value {
943
			let mut entries: HashMap<Option<ChildInfo>, StorageCollection> = Default::default();
944
			entries.insert(None, vec![(key.to_vec(), Some(val.clone()))]);
945

            
946
			self.db.write().insert(entries, StateVersion::V1);
947
		}
948
	}
949
}
950

            
951
impl<Block: BlockT + DeserializeOwned> sp_state_machine::Backend<HashingFor<Block>>
952
	for ForkedLazyBackend<Block>
953
{
954
	type Error = <DbState<Block> as sp_state_machine::Backend<HashingFor<Block>>>::Error;
955
	type TrieBackendStorage = PrefixedMemoryDB<HashingFor<Block>>;
956
	type RawIter = RawIter<Block>;
957

            
958
	fn storage(&self, key: &[u8]) -> Result<Option<sp_state_machine::StorageValue>, Self::Error> {
959
		let remote_fetch = |block: Option<Block::Hash>| {
960
			let result = self.rpc_client.storage(StorageKey(key.to_vec()), block);
961

            
962
			match result {
963
				Ok(data) => data.map(|v| v.0),
964
				Err(err) => {
965
					log::debug!(
966
						target: super::LAZY_LOADING_LOG_TARGET,
967
						"Failed to fetch storage from live network: {:?}",
968
						err
969
					);
970
					None
971
				}
972
			}
973
		};
974

            
975
		if self.before_fork {
976
			return Ok(remote_fetch(self.block_hash));
977
		}
978

            
979
		let readable_db = self.db.read();
980
		let maybe_storage = readable_db.storage(key);
981
		let value = match maybe_storage {
982
			Ok(Some(data)) => Some(data),
983
			_ if !self.removed_keys.read().contains_key(key) => {
984
				let result = remote_fetch(Some(self.fork_block));
985

            
986
				// Cache state
987
				drop(readable_db);
988
				self.update_storage(key, &result);
989

            
990
				result
991
			}
992
			_ => None,
993
		};
994

            
995
		Ok(value)
996
	}
997

            
998
	fn storage_hash(
999
		&self,
		key: &[u8],
	) -> Result<Option<<HashingFor<Block> as sp_core::Hasher>::Out>, Self::Error> {
		let remote_fetch = |block: Option<Block::Hash>| {
			let result = self
				.rpc_client
				.storage_hash(StorageKey(key.to_vec()), block);
			match result {
				Ok(hash) => Ok(hash),
				Err(err) => Err(format!("Failed to fetch storage hash from RPC: {:?}", err).into()),
			}
		};
		if self.before_fork {
			return remote_fetch(self.block_hash);
		}
		let storage_hash = self.db.read().storage_hash(key);
		match storage_hash {
			Ok(Some(hash)) => Ok(Some(hash)),
			_ if !self.removed_keys.read().contains_key(key) => remote_fetch(Some(self.fork_block)),
			_ => Ok(None),
		}
	}
	fn closest_merkle_value(
		&self,
		_key: &[u8],
	) -> Result<
		Option<sp_trie::MerkleValue<<HashingFor<Block> as sp_core::Hasher>::Out>>,
		Self::Error,
	> {
		unimplemented!("closest_merkle_value: unsupported feature for lazy loading")
	}
	fn child_closest_merkle_value(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<
		Option<sp_trie::MerkleValue<<HashingFor<Block> as sp_core::Hasher>::Out>>,
		Self::Error,
	> {
		unimplemented!("child_closest_merkle_value: unsupported feature for lazy loading")
	}
	fn child_storage(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<sp_state_machine::StorageValue>, Self::Error> {
		unimplemented!("child_storage: unsupported feature for lazy loading");
	}
	fn child_storage_hash(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<<HashingFor<Block> as sp_core::Hasher>::Out>, Self::Error> {
		unimplemented!("child_storage_hash: unsupported feature for lazy loading");
	}
	fn next_storage_key(
		&self,
		key: &[u8],
	) -> Result<Option<sp_state_machine::StorageKey>, Self::Error> {
		let remote_fetch = |block: Option<Block::Hash>| {
			let start_key = Some(StorageKey(key.to_vec()));
			let result = self
				.rpc_client
				.storage_keys_paged(start_key.clone(), 2, None, block);
			match result {
				Ok(keys) => keys.last().cloned(),
				Err(err) => {
					log::trace!(
						target: super::LAZY_LOADING_LOG_TARGET,
						"Failed to fetch `next storage key` from RPC: {:?}",
						err
					);
					None
				}
			}
		};
		let maybe_next_key = if self.before_fork {
			// Before the fork checkpoint, always fetch remotely
			remote_fetch(self.block_hash)
		} else {
			// Try to get the next storage key from the local DB
			let next_storage_key = self.db.read().next_storage_key(key);
			match next_storage_key {
				Ok(Some(next_key)) => Some(next_key),
				// If not found locally and key is not marked as removed, fetch remotely
				_ if !self.removed_keys.read().contains_key(key) => {
					remote_fetch(Some(self.fork_block))
				}
				// Otherwise, there's no next key
				_ => None,
			}
		}
		.filter(|next_key| next_key != key);
		log::trace!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"next_storage_key: (key: {:?}, next_key: {:?})",
			hex::encode(key),
			maybe_next_key.clone().map(|key| hex::encode(key))
		);
		Ok(maybe_next_key)
	}
	fn next_child_storage_key(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<sp_state_machine::StorageKey>, Self::Error> {
		unimplemented!("next_child_storage_key: unsupported feature for lazy loading");
	}
	fn storage_root<'a>(
		&self,
		delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
		state_version: StateVersion,
	) -> (
		<HashingFor<Block> as sp_core::Hasher>::Out,
		BackendTransaction<HashingFor<Block>>,
	)
	where
		<HashingFor<Block> as sp_core::Hasher>::Out: Ord,
	{
		self.db.read().storage_root(delta, state_version)
	}
	fn child_storage_root<'a>(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
		_state_version: StateVersion,
	) -> (
		<HashingFor<Block> as sp_core::Hasher>::Out,
		bool,
		BackendTransaction<HashingFor<Block>>,
	)
	where
		<HashingFor<Block> as sp_core::Hasher>::Out: Ord,
	{
		unimplemented!("child_storage_root: unsupported in lazy loading")
	}
	fn raw_iter(&self, args: sp_state_machine::IterArgs) -> Result<Self::RawIter, Self::Error> {
		let mut clone: RawIterArgs = Default::default();
		clone.start_at_exclusive = args.start_at_exclusive.clone();
		clone.prefix = args.prefix.map(|v| v.to_vec());
		clone.start_at = args.start_at.map(|v| v.to_vec());
		Ok(RawIter::<Block> {
			args: clone,
			complete: false,
			_phantom: Default::default(),
		})
	}
	fn register_overlay_stats(&self, stats: &sp_state_machine::StateMachineStats) {
		self.db.read().register_overlay_stats(stats)
	}
	fn usage_info(&self) -> sp_state_machine::UsageInfo {
		self.db.read().usage_info()
	}
}
// The lazy-loading state is an overlay over a remote chain rather than a
// locally materialized trie, so it cannot be exposed as a `TrieBackend`.
impl<B: BlockT> sp_state_machine::backend::AsTrieBackend<HashingFor<B>> for ForkedLazyBackend<B> {
	type TrieBackendStorage = PrefixedMemoryDB<HashingFor<B>>;

	/// Not supported in lazy loading mode; always panics.
	fn as_trie_backend(
		&self,
	) -> &sp_state_machine::TrieBackend<Self::TrieBackendStorage, HashingFor<B>> {
		unimplemented!("`as_trie_backend` is not supported in lazy loading mode.")
	}
}
/// Lazy loading (In-memory) backend. Keeps all states and blocks in memory.
pub struct Backend<Block: BlockT> {
	// Client used to fetch blocks and state from the live network.
	pub(crate) rpc_client: Arc<super::rpc_client::RPC>,
	// Per-block state backends, created lazily by `state_at`.
	states: ReadWriteLock<HashMap<Block::Hash, ForkedLazyBackend<Block>>>,
	// In-memory block store (headers, bodies, justifications).
	pub(crate) blockchain: Blockchain<Block>,
	// Lock handed out to the client during block import.
	import_lock: parking_lot::RwLock<()>,
	// Pin reference counts per block; `unpin_block` may drive a count
	// negative if it is called before the matching `pin_block`.
	pinned_blocks: ReadWriteLock<HashMap<Block::Hash, i64>>,
	// Header of the block at which execution forks off the live network.
	pub(crate) fork_checkpoint: Block::Header,
}
impl<Block: BlockT + DeserializeOwned> Backend<Block> {
	/// Build a backend that forks off the live chain at `fork_checkpoint`,
	/// sharing the given RPC client with its in-memory blockchain.
	fn new(rpc_client: Arc<super::rpc_client::RPC>, fork_checkpoint: Block::Header) -> Self {
		let blockchain = Blockchain::new(rpc_client.clone());

		Backend {
			rpc_client,
			states: Default::default(),
			blockchain,
			import_lock: Default::default(),
			pinned_blocks: Default::default(),
			fork_checkpoint,
		}
	}
}
// Auxiliary storage is not supported by the lazy loading backend; both
// operations panic if reached.
impl<Block: BlockT + DeserializeOwned> backend::AuxStore for Backend<Block> {
	/// Not supported in lazy loading mode; always panics.
	fn insert_aux<
		'a,
		'b: 'a,
		'c: 'a,
		I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
		D: IntoIterator<Item = &'a &'b [u8]>,
	>(
		&self,
		_insert: I,
		_delete: D,
	) -> sp_blockchain::Result<()> {
		unimplemented!("`insert_aux` is not supported in lazy loading mode.")
	}

	/// Not supported in lazy loading mode; always panics.
	fn get_aux(&self, _key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
		unimplemented!("`get_aux` is not supported in lazy loading mode.")
	}
}
impl<Block: BlockT + DeserializeOwned> backend::Backend<Block> for Backend<Block> {
	type BlockImportOperation = BlockImportOperation<Block>;
	type Blockchain = Blockchain<Block>;
	type State = ForkedLazyBackend<Block>;
	type OffchainStorage = InMemOffchainStorage;
	fn begin_operation(&self) -> sp_blockchain::Result<Self::BlockImportOperation> {
		let old_state = self.state_at(Default::default())?;
		Ok(BlockImportOperation {
			pending_block: None,
			old_state,
			new_state: None,
			aux: Default::default(),
			storage_updates: Default::default(),
			finalized_blocks: Default::default(),
			set_head: None,
			before_fork: false,
		})
	}
	fn begin_state_operation(
		&self,
		operation: &mut Self::BlockImportOperation,
		block: Block::Hash,
	) -> sp_blockchain::Result<()> {
		operation.old_state = self.state_at(block)?;
		Ok(())
	}
	fn commit_operation(&self, operation: Self::BlockImportOperation) -> sp_blockchain::Result<()> {
		for (block, justification) in operation.finalized_blocks {
			self.blockchain.finalize_header(block, justification)?;
		}
		if let Some(pending_block) = operation.pending_block {
			let old_state = &operation.old_state;
			let (header, body, justification) = pending_block.block.into_inner();
			let hash = header.hash();
			let new_removed_keys = old_state.removed_keys.clone();
			for (key, value) in operation.storage_updates.clone() {
				if value.is_some() {
					new_removed_keys.write().remove(&key.clone());
				} else {
					new_removed_keys.write().insert(key.clone(), ());
				}
			}
			let new_db = old_state.db.clone();
			new_db.write().insert(
				vec![(None::<ChildInfo>, operation.storage_updates)],
				StateVersion::V1,
			);
			let new_state = ForkedLazyBackend {
				rpc_client: self.rpc_client.clone(),
				block_hash: Some(hash.clone()),
				fork_block: self.fork_checkpoint.hash(),
				db: new_db,
				removed_keys: new_removed_keys,
				before_fork: operation.before_fork,
			};
			self.states.write().insert(hash, new_state);
			self.blockchain
				.insert(hash, header, justification, body, pending_block.state)?;
		}
		if !operation.aux.is_empty() {
			self.blockchain.write_aux(operation.aux);
		}
		if let Some(set_head) = operation.set_head {
			self.blockchain.set_head(set_head)?;
		}
		Ok(())
	}
	fn finalize_block(
		&self,
		hash: Block::Hash,
		justification: Option<Justification>,
	) -> sp_blockchain::Result<()> {
		self.blockchain.finalize_header(hash, justification)
	}
	fn append_justification(
		&self,
		hash: Block::Hash,
		justification: Justification,
	) -> sp_blockchain::Result<()> {
		self.blockchain.append_justification(hash, justification)
	}
	fn blockchain(&self) -> &Self::Blockchain {
		&self.blockchain
	}
	fn usage_info(&self) -> Option<UsageInfo> {
		None
	}
	fn offchain_storage(&self) -> Option<Self::OffchainStorage> {
		None
	}
	fn state_at(&self, hash: Block::Hash) -> sp_blockchain::Result<Self::State> {
		if hash == Default::default() {
			return Ok(ForkedLazyBackend::<Block> {
				rpc_client: self.rpc_client.clone(),
				block_hash: Some(hash),
				fork_block: self.fork_checkpoint.hash(),
				db: Default::default(),
				removed_keys: Default::default(),
				before_fork: true,
			});
		}
		let (backend, should_write) = self
			.states
			.read()
			.get(&hash)
			.cloned()
			.map(|state| Ok((state, false)))
			.unwrap_or_else(|| {
				self.rpc_client
					.header::<Block>(Some(hash))
					.ok()
					.flatten()
					.ok_or(sp_blockchain::Error::UnknownBlock(
						format!("Failed to fetch block header: {:?}", hash).into(),
					))
					.map(|header| {
						let checkpoint = self.fork_checkpoint.clone();
						let state = if header.number().gt(checkpoint.number()) {
							let parent = self.state_at(*header.parent_hash()).ok();
							ForkedLazyBackend::<Block> {
								rpc_client: self.rpc_client.clone(),
								block_hash: Some(hash),
								fork_block: checkpoint.hash(),
								db: parent.clone().map_or(Default::default(), |p| p.db),
								removed_keys: parent.map_or(Default::default(), |p| p.removed_keys),
								before_fork: false,
							}
						} else {
							ForkedLazyBackend::<Block> {
								rpc_client: self.rpc_client.clone(),
								block_hash: Some(hash),
								fork_block: checkpoint.hash(),
								db: Default::default(),
								removed_keys: Default::default(),
								before_fork: true,
							}
						};
						(state, true)
					})
			})?;
		if should_write {
			self.states.write().insert(hash, backend.clone());
		}
		Ok(backend)
	}
	fn revert(
		&self,
		_n: NumberFor<Block>,
		_revert_finalized: bool,
	) -> sp_blockchain::Result<(NumberFor<Block>, HashSet<Block::Hash>)> {
		Ok((Zero::zero(), HashSet::new()))
	}
	fn remove_leaf_block(&self, _hash: Block::Hash) -> sp_blockchain::Result<()> {
		Ok(())
	}
	fn get_import_lock(&self) -> &parking_lot::RwLock<()> {
		&self.import_lock
	}
	fn requires_full_sync(&self) -> bool {
		false
	}
	fn pin_block(&self, hash: <Block as BlockT>::Hash) -> blockchain::Result<()> {
		let mut blocks = self.pinned_blocks.write();
		*blocks.entry(hash).or_default() += 1;
		Ok(())
	}
	fn unpin_block(&self, hash: <Block as BlockT>::Hash) {
		let mut blocks = self.pinned_blocks.write();
		blocks
			.entry(hash)
			.and_modify(|counter| *counter -= 1)
			.or_insert(-1);
	}
}
// Marker impl: all state and blocks are kept locally in memory, so this
// backend qualifies as a `LocalBackend`.
impl<Block: BlockT + DeserializeOwned> backend::LocalBackend<Block> for Backend<Block> {}
/// Check that genesis storage is valid.
pub fn check_genesis_storage(storage: &Storage) -> sp_blockchain::Result<()> {
	if storage
		.top
		.iter()
		.any(|(k, _)| well_known_keys::is_child_storage_key(k))
	{
		return Err(sp_blockchain::Error::InvalidState);
	}
	if storage
		.children_default
		.keys()
		.any(|child_key| !well_known_keys::is_child_storage_key(child_key))
	{
		return Err(sp_blockchain::Error::InvalidState);
	}
	Ok(())
}
/// Create an instance of a lazy loading memory backend.
pub fn new_backend<Block>(
	config: &mut Configuration,
	lazy_loading_config: &LazyLoadingConfig,
) -> Result<Arc<Backend<Block>>, Error>
where
	Block: BlockT + DeserializeOwned,
	Block::Hash: From<H256>,
{
	let http_client = jsonrpsee::http_client::HttpClientBuilder::default()
		.max_request_size(u32::MAX)
		.max_response_size(u32::MAX)
		.request_timeout(Duration::from_secs(10))
		.build(lazy_loading_config.state_rpc.clone())
		.map_err(|e| {
			sp_blockchain::Error::Backend(
				format!("failed to build http client: {:?}", e).to_string(),
			)
		})?;
	let rpc = super::rpc_client::RPC::new(
		http_client,
		lazy_loading_config.delay_between_requests,
		lazy_loading_config.max_retries_per_request,
	);
	let block_hash = lazy_loading_config
		.from_block
		.map(|block| Into::<Block::Hash>::into(block));
	let checkpoint: Block = rpc
		.block::<Block, _>(block_hash)
		.ok()
		.flatten()
		.expect("Fetching fork checkpoint")
		.block;
	let backend = Arc::new(Backend::new(Arc::new(rpc), checkpoint.header().clone()));
	let chain_name = backend
		.rpc_client
		.system_chain()
		.expect("Should fetch chain id");
	let chain_properties = backend
		.rpc_client
		.system_properties()
		.expect("Should fetch chain properties");
	let spec_builder = lazy_loading::spec_builder()
		.with_name(chain_name.as_str())
		.with_properties(chain_properties);
	config.chain_spec = Box::new(spec_builder.build());
	let base_overrides =
		state_overrides::base_state_overrides(lazy_loading_config.runtime_override.clone());
	let custom_overrides = if let Some(path) = lazy_loading_config.state_overrides_path.clone() {
		state_overrides::read(path)?
	} else {
		Default::default()
	};
	let state_overrides: Vec<(Vec<u8>, Vec<u8>)> = [base_overrides, custom_overrides]
		.concat()
		.iter()
		.map(|entry| match entry {
			StateEntry::Concrete(v) => {
				let key = [
					&twox_128(v.pallet.as_bytes()),
					&twox_128(v.storage.as_bytes()),
					v.key.clone().unwrap_or(Vec::new()).as_slice(),
				]
				.concat();
				(key, v.value.clone())
			}
			StateEntry::Raw(raw) => (raw.key.clone(), raw.value.clone()),
		})
		.collect();
	// Produce first block after the fork
	let _ = helpers::produce_first_block(backend.clone(), checkpoint, state_overrides)?;
	Ok(backend)
}