// Copyright 2024 Moonbeam foundation
// This file is part of Moonbeam.

// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.

17
use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata};
18
use sp_core::storage::well_known_keys;
19
use sp_runtime::{
20
	generic::BlockId,
21
	traits::{Block as BlockT, HashingFor, Header as HeaderT, NumberFor, Zero},
22
	Justification, Justifications, StateVersion, Storage,
23
};
24
use sp_state_machine::{
25
	BackendTransaction, ChildStorageCollection, IndexOperation, StorageCollection, TrieBackend,
26
};
27
use std::future::Future;
28
use std::marker::PhantomData;
29
use std::time::Duration;
30
use std::{
31
	collections::{HashMap, HashSet},
32
	ptr,
33
	sync::{
34
		atomic::{AtomicU64, Ordering},
35
		Arc,
36
	},
37
};
38

            
39
use sc_client_api::{
40
	backend::{self, NewBlockState},
41
	blockchain::{self, BlockStatus, HeaderBackend},
42
	leaves::LeafSet,
43
	UsageInfo,
44
};
45

            
46
use jsonrpsee::http_client::HttpClient;
47
use sp_runtime::generic::SignedBlock;
48

            
49
use crate::chain_spec;
50
use crate::lazy_loading::lock::ReadWriteLock;
51
use crate::lazy_loading::state_overrides::StateEntry;
52
use crate::lazy_loading::{helpers, state_overrides};
53
use moonbeam_cli_opt::LazyLoadingConfig;
54
use moonbeam_core_primitives::BlockNumber;
55
use sc_client_api::StorageKey;
56
use sc_service::{Configuration, Error};
57
use serde::de::DeserializeOwned;
58
use sp_core::offchain::storage::InMemOffchainStorage;
59
use sp_core::{twox_128, H256};
60
use sp_rpc::list::ListOrValue;
61
use sp_rpc::number::NumberOrHex;
62
use sp_storage::{ChildInfo, StorageData};
63
use sp_trie::PrefixedMemoryDB;
64
use tokio_retry::strategy::FixedInterval;
65
use tokio_retry::Retry;
66

            
67
/// A block queued inside a [`BlockImportOperation`], together with the
/// chain state (best / final / normal) it should be imported as.
struct PendingBlock<B: BlockT> {
	/// Header (and optionally body/justifications) of the block to import.
	block: StoredBlock<B>,
	/// How the block affects the chain head once imported.
	state: NewBlockState,
}
71

            
72
/// Locally cached block data: either only the header (body unknown) or the
/// full block. Both variants carry optional justifications.
#[derive(PartialEq, Eq, Clone)]
enum StoredBlock<B: BlockT> {
	Header(B::Header, Option<Justifications>),
	Full(B, Option<Justifications>),
}
77

            
78
impl<B: BlockT> StoredBlock<B> {
79
	fn new(
80
		header: B::Header,
81
		body: Option<Vec<B::Extrinsic>>,
82
		just: Option<Justifications>,
83
	) -> Self {
84
		match body {
85
			Some(body) => StoredBlock::Full(B::new(header, body), just),
86
			None => StoredBlock::Header(header, just),
87
		}
88
	}
89

            
90
	fn header(&self) -> &B::Header {
91
		match *self {
92
			StoredBlock::Header(ref h, _) => h,
93
			StoredBlock::Full(ref b, _) => b.header(),
94
		}
95
	}
96

            
97
	fn justifications(&self) -> Option<&Justifications> {
98
		match *self {
99
			StoredBlock::Header(_, ref j) | StoredBlock::Full(_, ref j) => j.as_ref(),
100
		}
101
	}
102

            
103
	fn extrinsics(&self) -> Option<&[B::Extrinsic]> {
104
		match *self {
105
			StoredBlock::Header(_, _) => None,
106
			StoredBlock::Full(ref b, _) => Some(b.extrinsics()),
107
		}
108
	}
109

            
110
	fn into_inner(self) -> (B::Header, Option<Vec<B::Extrinsic>>, Option<Justifications>) {
111
		match self {
112
			StoredBlock::Header(header, just) => (header, None, just),
113
			StoredBlock::Full(block, just) => {
114
				let (header, body) = block.deconstruct();
115
				(header, Some(body), just)
116
			}
117
		}
118
	}
119
}
120

            
121
/// The mutable, in-memory portion of the chain database: cached blocks,
/// number→hash index, chain-head markers and auxiliary data. Always accessed
/// through the `ReadWriteLock` held by [`Blockchain`].
#[derive(Clone)]
struct BlockchainStorage<Block: BlockT> {
	/// All locally cached blocks, keyed by hash.
	blocks: HashMap<Block::Hash, StoredBlock<Block>>,
	/// Canonical number → hash index (also lazily filled from RPC).
	hashes: HashMap<NumberFor<Block>, Block::Hash>,
	/// Hash of the current best block.
	best_hash: Block::Hash,
	/// Number of the current best block.
	best_number: NumberFor<Block>,
	/// Hash of the last finalized block.
	finalized_hash: Block::Hash,
	/// Number of the last finalized block.
	finalized_number: NumberFor<Block>,
	/// Hash of the genesis block (set on import of block number zero).
	genesis_hash: Block::Hash,
	/// Header CHT roots, keyed by block number.
	header_cht_roots: HashMap<NumberFor<Block>, Block::Hash>,
	/// Set of chain leaves (blocks without known children).
	leaves: LeafSet<Block::Hash, NumberFor<Block>>,
	/// Auxiliary key/value data (consensus engines etc.).
	aux: HashMap<Vec<u8>, Vec<u8>>,
}
134

            
135
/// In-memory blockchain. Supports concurrent reads.
///
/// Data missing from the local cache is fetched lazily from a live node via
/// `rpc_client` and then cached in `storage`.
#[derive(Clone)]
pub struct Blockchain<Block: BlockT> {
	/// Client used to fetch block hashes, blocks and headers from a live node.
	rpc_client: Arc<RPC>,
	/// Locally cached chain data, shared behind a read/write lock.
	storage: Arc<ReadWriteLock<BlockchainStorage<Block>>>,
}
141

            
142
impl<Block: BlockT + DeserializeOwned> Blockchain<Block> {
	/// Get header hash of given block.
	///
	/// A number-based lookup first consults the local `hashes` index and, on a
	/// miss, falls back to the RPC client, caching any hash it finds.
	pub fn id(&self, id: BlockId<Block>) -> Option<Block::Hash> {
		match id {
			BlockId::Hash(h) => Some(h),
			BlockId::Number(n) => {
				let block_hash = self.storage.read().hashes.get(&n).cloned();
				match block_hash {
					None => {
						// Cache miss: ask the live node for the hash. RPC
						// errors are swallowed and treated as "unknown".
						let block_hash =
							self.rpc_client.block_hash::<Block>(Some(n)).ok().flatten();

						// Cache the hash for subsequent lookups (takes the
						// write lock; the earlier read guard is already gone).
						block_hash.clone().map(|h| {
							self.storage.write().hashes.insert(n, h);
						});

						block_hash
					}
					block_hash => block_hash,
				}
			}
		}
	}

	/// Create new in-memory blockchain storage.
	fn new(rpc_client: Arc<RPC>) -> Blockchain<Block> {
		let storage = Arc::new(ReadWriteLock::new(BlockchainStorage {
			blocks: HashMap::new(),
			hashes: HashMap::new(),
			best_hash: Default::default(),
			best_number: Zero::zero(),
			finalized_hash: Default::default(),
			finalized_number: Zero::zero(),
			genesis_hash: Default::default(),
			header_cht_roots: HashMap::new(),
			leaves: LeafSet::new(),
			aux: HashMap::new(),
		}));
		Blockchain {
			rpc_client,
			storage,
		}
	}

	/// Insert a block header and associated data.
	///
	/// For block number zero only `genesis_hash` is recorded — the genesis
	/// block itself is not added to the `blocks` map. For all other blocks the
	/// leaf set is updated and the block cached; `NewBlockState::Final` also
	/// advances the finalized markers.
	pub fn insert(
		&self,
		hash: Block::Hash,
		header: <Block as BlockT>::Header,
		justifications: Option<Justifications>,
		body: Option<Vec<<Block as BlockT>::Extrinsic>>,
		new_state: NewBlockState,
	) -> sp_blockchain::Result<()> {
		let number = *header.number();
		// `apply_head` takes the write lock itself, so it must run before we
		// acquire the lock below.
		if new_state.is_best() {
			self.apply_head(&header)?;
		}

		let mut storage = self.storage.write();
		if number.is_zero() {
			storage.genesis_hash = hash;
		} else {
			storage.leaves.import(hash, number, *header.parent_hash());
			storage
				.blocks
				.insert(hash, StoredBlock::new(header, body, justifications));

			if let NewBlockState::Final = new_state {
				storage.finalized_hash = hash;
				storage.finalized_number = number;
			}
		}

		Ok(())
	}

	/// Get total number of blocks.
	pub fn blocks_count(&self) -> usize {
		let count = self.storage.read().blocks.len();

		log::debug!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"Total number of blocks: {:?}",
			count
		);

		count
	}

	/// Compare this blockchain with another in-mem blockchain
	pub fn equals_to(&self, other: &Self) -> bool {
		// Check ptr equality first to avoid double read locks.
		if ptr::eq(self, other) {
			return true;
		}
		self.canon_equals_to(other) && self.storage.read().blocks == other.storage.read().blocks
	}

	/// Compare canonical chain to other canonical chain.
	pub fn canon_equals_to(&self, other: &Self) -> bool {
		// Check ptr equality first to avoid double read locks.
		if ptr::eq(self, other) {
			return true;
		}
		let this = self.storage.read();
		let other = other.storage.read();
		this.hashes == other.hashes
			&& this.best_hash == other.best_hash
			&& this.best_number == other.best_number
			&& this.genesis_hash == other.genesis_hash
	}

	/// Insert header CHT root.
	pub fn insert_cht_root(&self, block: NumberFor<Block>, cht_root: Block::Hash) {
		self.storage
			.write()
			.header_cht_roots
			.insert(block, cht_root);
	}

	/// Set an existing block as head.
	///
	/// Fails with `UnknownBlock` if the header cannot be found locally or via
	/// RPC.
	pub fn set_head(&self, hash: Block::Hash) -> sp_blockchain::Result<()> {
		let header = self
			.header(hash)?
			.ok_or_else(|| sp_blockchain::Error::UnknownBlock(format!("{}", hash)))?;

		self.apply_head(&header)
	}

	/// Mark `header` as the new best block and index its number → hash
	/// mapping. Takes the storage write lock — callers must not hold it.
	fn apply_head(&self, header: &<Block as BlockT>::Header) -> sp_blockchain::Result<()> {
		let mut storage = self.storage.write();

		let hash = header.hash();
		let number = header.number();

		storage.best_hash = hash;
		storage.best_number = *number;
		storage.hashes.insert(*number, hash);

		Ok(())
	}

	/// Mark `block` as finalized; when a justification is supplied it
	/// replaces any justifications already stored for that block.
	fn finalize_header(
		&self,
		block: Block::Hash,
		justification: Option<Justification>,
	) -> sp_blockchain::Result<()> {
		let mut storage = self.storage.write();
		storage.finalized_hash = block;

		if justification.is_some() {
			let block = storage
				.blocks
				.get_mut(&block)
				.expect("hash was fetched from a block in the db; qed");

			let block_justifications = match block {
				StoredBlock::Header(_, ref mut j) | StoredBlock::Full(_, ref mut j) => j,
			};

			*block_justifications = justification.map(Justifications::from);
		}

		Ok(())
	}

	/// Add a justification to a locally stored block. Fails with
	/// `BadJustification` when one from the same consensus engine already
	/// exists. Panics if the block is not in the local cache.
	fn append_justification(
		&self,
		hash: Block::Hash,
		justification: Justification,
	) -> sp_blockchain::Result<()> {
		let mut storage = self.storage.write();

		let block = storage
			.blocks
			.get_mut(&hash)
			.expect("hash was fetched from a block in the db; qed");

		let block_justifications = match block {
			StoredBlock::Header(_, ref mut j) | StoredBlock::Full(_, ref mut j) => j,
		};

		if let Some(stored_justifications) = block_justifications {
			if !stored_justifications.append(justification) {
				return Err(sp_blockchain::Error::BadJustification(
					"Duplicate consensus engine ID".into(),
				));
			}
		} else {
			*block_justifications = Some(Justifications::from(justification));
		};

		Ok(())
	}

	/// Apply a batch of auxiliary-data writes; a `None` value deletes the key.
	fn write_aux(&self, ops: Vec<(Vec<u8>, Option<Vec<u8>>)>) {
		let mut storage = self.storage.write();
		for (k, v) in ops {
			match v {
				Some(v) => storage.aux.insert(k, v),
				None => storage.aux.remove(&k),
			};
		}
	}
}
347

            
348
impl<Block: BlockT + DeserializeOwned> HeaderBackend<Block> for Blockchain<Block> {
	/// Look up a header: first in the local cache, then over RPC. A block
	/// fetched remotely is cached (as `StoredBlock::Full`) before returning.
	fn header(
		&self,
		hash: Block::Hash,
	) -> sp_blockchain::Result<Option<<Block as BlockT>::Header>> {
		// First, try to get the header from local storage
		if let Some(header) = self
			.storage
			.read()
			.blocks
			.get(&hash)
			.map(|b| b.header().clone())
		{
			return Ok(Some(header));
		}

		// If not found in local storage, fetch from RPC client
		let header = self
			.rpc_client
			.block::<Block, _>(Some(hash))
			.ok()
			.flatten()
			.map(|full_block| {
				// Cache block header
				let block = full_block.block.clone();
				self.storage.write().blocks.insert(
					hash,
					StoredBlock::Full(block.clone(), full_block.justifications),
				);

				block.header().clone()
			});

		// A miss on both paths is unexpected for callers of this API, so log
		// it — but still return `Ok(None)` rather than an error.
		if header.is_none() {
			log::warn!(
				target: super::LAZY_LOADING_LOG_TARGET,
				"Expected block {:x?} to exist.",
				&hash
			);
		}

		Ok(header)
	}

	/// A snapshot of the chain-head bookkeeping, taken under a single read
	/// lock.
	fn info(&self) -> blockchain::Info<Block> {
		let storage = self.storage.read();
		blockchain::Info {
			best_hash: storage.best_hash,
			best_number: storage.best_number,
			genesis_hash: storage.genesis_hash,
			finalized_hash: storage.finalized_hash,
			finalized_number: storage.finalized_number,
			finalized_state: Some((storage.finalized_hash, storage.finalized_number)),
			number_leaves: storage.leaves.count(),
			block_gap: None,
		}
	}

	/// Block status based on the local cache only — no RPC fallback here, so
	/// a block may be reported `Unknown` even though the live node has it.
	fn status(&self, hash: Block::Hash) -> sp_blockchain::Result<BlockStatus> {
		match self.storage.read().blocks.contains_key(&hash) {
			true => Ok(BlockStatus::InChain),
			false => Ok(BlockStatus::Unknown),
		}
	}

	/// Block number for `hash`, via the local cache or RPC. Unlike
	/// [`Self::header`], an RPC miss here is an `UnknownBlock` error.
	fn number(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<NumberFor<Block>>> {
		let number = match self.storage.read().blocks.get(&hash) {
			Some(block) => *block.header().number(),
			_ => match self.rpc_client.block::<Block, _>(Some(hash)) {
				Ok(Some(block)) => *block.block.header().number(),
				err => {
					return Err(sp_blockchain::Error::UnknownBlock(
						format!("Failed to fetch block number from RPC: {:?}", err).into(),
					));
				}
			},
		};

		Ok(Some(number))
	}

	/// Block hash for `number`; delegates to [`Blockchain::id`], which also
	/// handles the RPC fallback and caching.
	fn hash(
		&self,
		number: <<Block as BlockT>::Header as HeaderT>::Number,
	) -> sp_blockchain::Result<Option<Block::Hash>> {
		Ok(self.id(BlockId::Number(number)))
	}
}
436

            
437
impl<Block: BlockT + DeserializeOwned> HeaderMetadata<Block> for Blockchain<Block> {
438
	type Error = sp_blockchain::Error;
439

            
440
	fn header_metadata(
441
		&self,
442
		hash: Block::Hash,
443
	) -> Result<CachedHeaderMetadata<Block>, Self::Error> {
444
		self.header(hash)?
445
			.map(|header| CachedHeaderMetadata::from(&header))
446
			.ok_or_else(|| {
447
				sp_blockchain::Error::UnknownBlock(format!("header not found: {}", hash))
448
			})
449
	}
450

            
451
	fn insert_header_metadata(&self, _hash: Block::Hash, _metadata: CachedHeaderMetadata<Block>) {
452
		// No need to implement.
453
		unimplemented!("insert_header_metadata")
454
	}
455
	fn remove_header_metadata(&self, _hash: Block::Hash) {
456
		// No need to implement.
457
		unimplemented!("remove_header_metadata")
458
	}
459
}
460

            
461
impl<Block: BlockT + DeserializeOwned> blockchain::Backend<Block> for Blockchain<Block> {
	/// Block body for `hash`: local cache first, then RPC. Note that unlike
	/// [`HeaderBackend::header`], a remotely fetched body is NOT cached here.
	fn body(
		&self,
		hash: Block::Hash,
	) -> sp_blockchain::Result<Option<Vec<<Block as BlockT>::Extrinsic>>> {
		// First, try to get the block body from local storage
		if let Some(extrinsics) = self
			.storage
			.read()
			.blocks
			.get(&hash)
			.and_then(|b| b.extrinsics().map(|x| x.to_vec()))
		{
			return Ok(Some(extrinsics));
		}
		// Fall back to the live node; RPC errors degrade to `None`.
		let extrinsics = self
			.rpc_client
			.block::<Block, Block::Hash>(Some(hash))
			.ok()
			.flatten()
			.map(|b| b.block.extrinsics().to_vec());

		Ok(extrinsics)
	}

	/// Justifications from the local cache only (no RPC fallback).
	fn justifications(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Justifications>> {
		Ok(self
			.storage
			.read()
			.blocks
			.get(&hash)
			.and_then(|b| b.justifications().cloned()))
	}

	/// Hash of the last block marked finalized locally.
	fn last_finalized(&self) -> sp_blockchain::Result<Block::Hash> {
		let last_finalized = self.storage.read().finalized_hash;

		Ok(last_finalized)
	}

	/// Hashes of all current chain leaves.
	fn leaves(&self) -> sp_blockchain::Result<Vec<Block::Hash>> {
		let leaves = self.storage.read().leaves.hashes();

		Ok(leaves)
	}

	fn children(&self, _parent_hash: Block::Hash) -> sp_blockchain::Result<Vec<Block::Hash>> {
		unimplemented!("Not supported by the `lazy-loading` backend.")
	}

	fn indexed_transaction(&self, _hash: Block::Hash) -> sp_blockchain::Result<Option<Vec<u8>>> {
		unimplemented!("Not supported by the `lazy-loading` backend.")
	}

	fn block_indexed_body(
		&self,
		_hash: Block::Hash,
	) -> sp_blockchain::Result<Option<Vec<Vec<u8>>>> {
		unimplemented!("Not supported by the `lazy-loading` backend.")
	}
}
522

            
523
impl<Block: BlockT + DeserializeOwned> backend::AuxStore for Blockchain<Block> {
524
	fn insert_aux<
525
		'a,
526
		'b: 'a,
527
		'c: 'a,
528
		I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
529
		D: IntoIterator<Item = &'a &'b [u8]>,
530
	>(
531
		&self,
532
		insert: I,
533
		delete: D,
534
	) -> sp_blockchain::Result<()> {
535
		let mut storage = self.storage.write();
536
		for (k, v) in insert {
537
			storage.aux.insert(k.to_vec(), v.to_vec());
538
		}
539
		for k in delete {
540
			storage.aux.remove(*k);
541
		}
542
		Ok(())
543
	}
544

            
545
	fn get_aux(&self, key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
546
		Ok(self.storage.read().aux.get(key).cloned())
547
	}
548
}
549

            
550
/// Accumulates the effects of importing one block before they are committed
/// to the backend.
pub struct BlockImportOperation<Block: BlockT> {
	/// The (single) block queued by `set_block_data`.
	pending_block: Option<PendingBlock<Block>>,
	/// State the block is being built/imported on top of.
	old_state: ForkedLazyBackend<Block>,
	/// State transaction produced by executing/applying the block.
	new_state: Option<BackendTransaction<HashingFor<Block>>>,
	/// Queued auxiliary-data writes (`None` value = delete).
	aux: Vec<(Vec<u8>, Option<Vec<u8>>)>,
	/// Top-level storage changes recorded for this import.
	storage_updates: StorageCollection,
	/// Blocks to mark finalized, with optional justifications.
	finalized_blocks: Vec<(Block::Hash, Option<Justification>)>,
	/// Block to mark as new head, if any.
	set_head: Option<Block::Hash>,
	/// Whether this operation refers to a block before the fork point.
	pub(crate) before_fork: bool,
}
560

            
561
impl<Block: BlockT + DeserializeOwned> BlockImportOperation<Block> {
	/// Compute the state root for a full (genesis-like) `storage` set on top
	/// of `old_state`; when `commit` is set, also record the resulting
	/// transaction and the flat list of top-level changes.
	fn apply_storage(
		&mut self,
		storage: Storage,
		commit: bool,
		state_version: StateVersion,
	) -> sp_blockchain::Result<Block::Hash> {
		use sp_state_machine::Backend;
		check_genesis_storage(&storage)?;

		// Per-child-trie deltas, borrowed from `storage` (all inserts).
		let child_delta = storage.children_default.values().map(|child_content| {
			(
				&child_content.child_info,
				child_content
					.data
					.iter()
					.map(|(k, v)| (k.as_ref(), Some(v.as_ref()))),
			)
		});

		let (root, transaction) = self.old_state.full_storage_root(
			storage
				.top
				.iter()
				.map(|(k, v)| (k.as_ref(), Some(v.as_ref()))),
			child_delta,
			state_version,
		);

		if commit {
			self.new_state = Some(transaction);
			// NOTE(review): empty values are recorded as deletions (`None`)
			// in `storage_updates` — confirm this is the intended mapping.
			self.storage_updates = storage
				.top
				.iter()
				.map(|(k, v)| {
					if v.is_empty() {
						(k.clone(), None)
					} else {
						(k.clone(), Some(v.clone()))
					}
				})
				.collect();
		}
		Ok(root)
	}
}
607

            
608
impl<Block: BlockT + DeserializeOwned> backend::BlockImportOperation<Block>
609
	for BlockImportOperation<Block>
610
{
611
	type State = ForkedLazyBackend<Block>;
612

            
613
	fn state(&self) -> sp_blockchain::Result<Option<&Self::State>> {
614
		Ok(Some(&self.old_state))
615
	}
616

            
617
	fn set_block_data(
618
		&mut self,
619
		header: <Block as BlockT>::Header,
620
		body: Option<Vec<<Block as BlockT>::Extrinsic>>,
621
		_indexed_body: Option<Vec<Vec<u8>>>,
622
		justifications: Option<Justifications>,
623
		state: NewBlockState,
624
	) -> sp_blockchain::Result<()> {
625
		assert!(
626
			self.pending_block.is_none(),
627
			"Only one block per operation is allowed"
628
		);
629
		self.pending_block = Some(PendingBlock {
630
			block: StoredBlock::new(header, body, justifications),
631
			state,
632
		});
633
		Ok(())
634
	}
635

            
636
	fn update_db_storage(
637
		&mut self,
638
		update: BackendTransaction<HashingFor<Block>>,
639
	) -> sp_blockchain::Result<()> {
640
		self.new_state = Some(update);
641
		Ok(())
642
	}
643

            
644
	fn set_genesis_state(
645
		&mut self,
646
		storage: Storage,
647
		commit: bool,
648
		state_version: StateVersion,
649
	) -> sp_blockchain::Result<Block::Hash> {
650
		self.apply_storage(storage, commit, state_version)
651
	}
652

            
653
	fn reset_storage(
654
		&mut self,
655
		storage: Storage,
656
		state_version: StateVersion,
657
	) -> sp_blockchain::Result<Block::Hash> {
658
		self.apply_storage(storage, true, state_version)
659
	}
660

            
661
	fn insert_aux<I>(&mut self, ops: I) -> sp_blockchain::Result<()>
662
	where
663
		I: IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
664
	{
665
		self.aux.append(&mut ops.into_iter().collect());
666
		Ok(())
667
	}
668

            
669
	fn update_storage(
670
		&mut self,
671
		update: StorageCollection,
672
		_child_update: ChildStorageCollection,
673
	) -> sp_blockchain::Result<()> {
674
		self.storage_updates = update.clone();
675
		Ok(())
676
	}
677

            
678
	fn mark_finalized(
679
		&mut self,
680
		hash: Block::Hash,
681
		justification: Option<Justification>,
682
	) -> sp_blockchain::Result<()> {
683
		self.finalized_blocks.push((hash, justification));
684
		Ok(())
685
	}
686

            
687
	fn mark_head(&mut self, hash: Block::Hash) -> sp_blockchain::Result<()> {
688
		assert!(
689
			self.pending_block.is_none(),
690
			"Only one set block per operation is allowed"
691
		);
692
		self.set_head = Some(hash);
693
		Ok(())
694
	}
695

            
696
	fn update_transaction_index(
697
		&mut self,
698
		_index: Vec<IndexOperation>,
699
	) -> sp_blockchain::Result<()> {
700
		Ok(())
701
	}
702

            
703
	fn set_create_gap(&mut self, _create_gap: bool) {
704
		// This implementation can be left empty or implemented as needed
705
		// For now, we're just implementing the trait method with no functionality
706
	}
707
}
708

            
709
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
/// (A `TrieBackend` whose storage is any boxed `sp_state_machine::Storage`.)
pub type DbState<B> = TrieBackend<Arc<dyn sp_state_machine::Storage<HashingFor<B>>>, HashingFor<B>>;
711

            
712
/// A struct containing arguments for iterating over the storage.
#[derive(Default)]
pub struct RawIterArgs {
	/// The prefix of the keys over which to iterate.
	pub prefix: Option<Vec<u8>>,

	/// The prefix from which to start the iteration from.
	///
	/// This is inclusive and the iteration will include the key which is specified here.
	pub start_at: Option<Vec<u8>>,

	/// If this is `true` then the iteration will *not* include
	/// the key specified in `start_at`, if there is such a key.
	pub start_at_exclusive: bool,
}
727

            
728
/// A raw key/value iterator over a [`ForkedLazyBackend`] (its
/// `StorageIterator::Backend`), advanced by mutating `args.start_at`.
pub struct RawIter<Block: BlockT> {
	/// Iteration window; `start_at` is advanced as keys are yielded.
	pub(crate) args: RawIterArgs,
	/// Set once the iteration has run past the last matching key.
	complete: bool,
	_phantom: PhantomData<Block>,
}
734

            
735
impl<Block: BlockT + DeserializeOwned> sp_state_machine::StorageIterator<HashingFor<Block>>
	for RawIter<Block>
{
	type Backend = ForkedLazyBackend<Block>;
	type Error = String;

	/// Yield the next storage key.
	///
	/// Before the fork point all keys come from the remote node; after it,
	/// the local in-memory db is consulted first and the remote node is used
	/// as a fallback, filtered against `removed_keys`. Yielding a key
	/// advances `args.start_at`; a key outside `prefix` ends the iteration.
	fn next_key(
		&mut self,
		backend: &Self::Backend,
	) -> Option<Result<sp_state_machine::StorageKey, Self::Error>> {
		use sp_state_machine::Backend;

		// Ask the remote node for one page of keys (page size 5) and keep
		// only the first entry; RPC errors degrade to `None`.
		let remote_fetch =
			|key: Option<StorageKey>, start_key: Option<StorageKey>, block: Option<Block::Hash>| {
				let result = backend
					.rpc_client
					.storage_keys_paged(key, 5, start_key, block);

				match result {
					Ok(keys) => keys.first().map(|key| key.clone()),
					Err(err) => {
						log::trace!(
							target: super::LAZY_LOADING_LOG_TARGET,
							"Failed to fetch `next key` from RPC: {:?}",
							err
						);

						None
					}
				}
			};

		let prefix = self.args.prefix.clone().map(|k| StorageKey(k));
		let start_key = self.args.start_at.clone().map(|k| StorageKey(k));

		let maybe_next_key = if backend.before_fork {
			remote_fetch(prefix, start_key, backend.block_hash)
		} else {
			// NOTE(review): `start_at_exclusive` is forced to `true` here,
			// ignoring `self.args.start_at_exclusive` — confirm intended.
			let mut iter_args = sp_state_machine::backend::IterArgs::default();
			iter_args.prefix = self.args.prefix.as_deref();
			iter_args.start_at = self.args.start_at.as_deref();
			iter_args.start_at_exclusive = true;
			iter_args.stop_on_incomplete_database = true;

			let readable_db = backend.db.read();
			let next_storage_key = readable_db
				.raw_iter(iter_args)
				.map(|mut iter| iter.next_key(&readable_db))
				.map(|op| op.and_then(|result| result.ok()))
				.ok()
				.flatten();

			// IMPORTANT: free storage read lock
			drop(readable_db);

			// Skip the remote fallback when the cursor key itself is known
			// to have been removed locally.
			let removed_key = start_key
				.clone()
				.or(prefix.clone())
				.map(|key| backend.removed_keys.read().contains_key(&key.0))
				.unwrap_or(false);
			if next_storage_key.is_none() && !removed_key {
				let maybe_next_key = remote_fetch(prefix, start_key, Some(backend.fork_block));
				match maybe_next_key {
					Some(key) if !backend.removed_keys.read().contains_key(&key) => Some(key),
					_ => None,
				}
			} else {
				next_storage_key
			}
		};

		log::trace!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"next_key: (prefix: {:?}, start_at: {:?}, next_key: {:?})",
			self.args.prefix.clone().map(|key| hex::encode(key)),
			self.args.start_at.clone().map(|key| hex::encode(key)),
			maybe_next_key.clone().map(|key| hex::encode(key))
		);

		if let Some(next_key) = maybe_next_key {
			// Only keys under `prefix` are yielded; anything else marks the
			// iterator complete.
			if self
				.args
				.prefix
				.clone()
				.map(|filter_key| next_key.starts_with(&filter_key))
				.unwrap_or(false)
			{
				self.args.start_at = Some(next_key.clone());
				Some(Ok(next_key))
			} else {
				self.complete = true;
				None
			}
		} else {
			self.complete = true;
			None
		}
	}

	/// Yield the next `(key, value)` pair. Key discovery mirrors
	/// [`Self::next_key`]; the value is then read through the backend's own
	/// `storage` (which caches remote values).
	fn next_pair(
		&mut self,
		backend: &Self::Backend,
	) -> Option<Result<(sp_state_machine::StorageKey, sp_state_machine::StorageValue), Self::Error>>
	{
		use sp_state_machine::Backend;

		// Same single-key remote lookup as in `next_key`.
		let remote_fetch =
			|key: Option<StorageKey>, start_key: Option<StorageKey>, block: Option<Block::Hash>| {
				let result = backend
					.rpc_client
					.storage_keys_paged(key, 5, start_key, block);

				match result {
					Ok(keys) => keys.first().map(|key| key.clone()),
					Err(err) => {
						log::trace!(
							target: super::LAZY_LOADING_LOG_TARGET,
							"Failed to fetch `next key` from RPC: {:?}",
							err
						);

						None
					}
				}
			};

		let prefix = self.args.prefix.clone().map(|k| StorageKey(k));
		let start_key = self.args.start_at.clone().map(|k| StorageKey(k));

		let maybe_next_key = if backend.before_fork {
			remote_fetch(prefix, start_key, backend.block_hash)
		} else {
			// NOTE(review): `start_at_exclusive` forced to `true`, as in
			// `next_key` — confirm intended.
			let mut iter_args = sp_state_machine::backend::IterArgs::default();
			iter_args.prefix = self.args.prefix.as_deref();
			iter_args.start_at = self.args.start_at.as_deref();
			iter_args.start_at_exclusive = true;
			iter_args.stop_on_incomplete_database = true;

			let readable_db = backend.db.read();
			let next_storage_key = readable_db
				.raw_iter(iter_args)
				.map(|mut iter| iter.next_key(&readable_db))
				.map(|op| op.and_then(|result| result.ok()))
				.ok()
				.flatten();

			// IMPORTANT: free storage read lock
			drop(readable_db);

			let removed_key = start_key
				.clone()
				.or(prefix.clone())
				.map(|key| backend.removed_keys.read().contains_key(&key.0))
				.unwrap_or(false);
			if next_storage_key.is_none() && !removed_key {
				let maybe_next_key = remote_fetch(prefix, start_key, Some(backend.fork_block));
				match maybe_next_key {
					Some(key) if !backend.removed_keys.read().contains_key(&key) => Some(key),
					_ => None,
				}
			} else {
				next_storage_key
			}
		};

		log::trace!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"next_pair: (prefix: {:?}, start_at: {:?}, next_key: {:?})",
			self.args.prefix.clone().map(|key| hex::encode(key)),
			self.args.start_at.clone().map(|key| hex::encode(key)),
			maybe_next_key.clone().map(|key| hex::encode(key))
		);

		let maybe_value = maybe_next_key
			.clone()
			.and_then(|key| (*backend).storage(key.as_slice()).ok())
			.flatten();

		if let Some(next_key) = maybe_next_key {
			if self
				.args
				.prefix
				.clone()
				.map(|filter_key| next_key.starts_with(&filter_key))
				.unwrap_or(false)
			{
				self.args.start_at = Some(next_key.clone());

				// NOTE(review): when the value lookup fails for a matching
				// key, `None` is returned without setting `complete`, yet
				// `start_at` has advanced — confirm this is intended.
				match maybe_value {
					Some(value) => Some(Ok((next_key, value))),
					_ => None,
				}
			} else {
				self.complete = true;
				None
			}
		} else {
			self.complete = true;
			None
		}
	}

	/// Whether the iteration finished by exhausting the key space (as
	/// opposed to stopping on an incomplete database).
	fn was_complete(&self) -> bool {
		self.complete
	}
}
941

            
942
/// A state backend that overlays a local in-memory trie on top of remote
/// state: reads hit the local db first and lazily fall back to the remote
/// node (at `fork_block`), caching what they fetch.
#[derive(Debug, Clone)]
pub struct ForkedLazyBackend<Block: BlockT> {
	/// Client used to fetch storage values/hashes/keys from the live node.
	rpc_client: Arc<RPC>,
	/// Block to query remotely while still before the fork point.
	block_hash: Option<Block::Hash>,
	/// Block at which the local fork diverges from the live chain; remote
	/// fallback reads after the fork are pinned to this block.
	fork_block: Block::Hash,
	/// Local in-memory trie holding cached and locally written state.
	pub(crate) db: Arc<ReadWriteLock<sp_state_machine::InMemoryBackend<HashingFor<Block>>>>,
	/// Keys deleted locally; suppresses the remote fallback for those keys.
	pub(crate) removed_keys: Arc<ReadWriteLock<HashMap<Vec<u8>, ()>>>,
	/// Whether this backend refers to a block before the fork point.
	before_fork: bool,
}
951

            
952
impl<Block: BlockT> ForkedLazyBackend<Block> {
953
	fn update_storage(&self, key: &[u8], value: &Option<Vec<u8>>) {
954
		if let Some(ref val) = value {
955
			let mut entries: HashMap<Option<ChildInfo>, StorageCollection> = Default::default();
956
			entries.insert(None, vec![(key.to_vec(), Some(val.clone()))]);
957

            
958
			self.db.write().insert(entries, StateVersion::V1);
959
		}
960
	}
961
}
962

            
963
impl<Block: BlockT + DeserializeOwned> sp_state_machine::Backend<HashingFor<Block>>
964
	for ForkedLazyBackend<Block>
965
{
966
	type Error = <DbState<Block> as sp_state_machine::Backend<HashingFor<Block>>>::Error;
967
	type TrieBackendStorage = PrefixedMemoryDB<HashingFor<Block>>;
968
	type RawIter = RawIter<Block>;
969

            
970
	/// Read a storage value: local db first, then the remote node, caching
	/// remote hits locally. Keys recorded in `removed_keys` never fall back
	/// to the remote node.
	fn storage(&self, key: &[u8]) -> Result<Option<sp_state_machine::StorageValue>, Self::Error> {
		// Remote read at `block`; RPC errors are logged and degrade to `None`.
		let remote_fetch = |block: Option<Block::Hash>| {
			let result = self.rpc_client.storage(StorageKey(key.to_vec()), block);

			match result {
				Ok(data) => data.map(|v| v.0),
				Err(err) => {
					log::debug!(
						target: super::LAZY_LOADING_LOG_TARGET,
						"Failed to fetch storage from live network: {:?}",
						err
					);
					None
				}
			}
		};

		if self.before_fork {
			return Ok(remote_fetch(self.block_hash));
		}

		let readable_db = self.db.read();
		let maybe_storage = readable_db.storage(key);
		let value = match maybe_storage {
			Ok(Some(data)) => Some(data),
			_ if !self.removed_keys.read().contains_key(key) => {
				let result = remote_fetch(Some(self.fork_block));

				// Cache state
				// The read guard must be released before `update_storage`
				// takes the write lock on the same `db`.
				drop(readable_db);
				self.update_storage(key, &result);
				result
			}
			_ => None,
		};
		Ok(value)
	}
	fn storage_hash(
		&self,
		key: &[u8],
	) -> Result<Option<<HashingFor<Block> as sp_core::Hasher>::Out>, Self::Error> {
		let remote_fetch = |block: Option<Block::Hash>| {
			let result = self
				.rpc_client
				.storage_hash(StorageKey(key.to_vec()), block);
			match result {
				Ok(hash) => Ok(hash),
				Err(err) => Err(format!("Failed to fetch storage hash from RPC: {:?}", err).into()),
			}
		};
		if self.before_fork {
			return remote_fetch(self.block_hash);
		}
		let storage_hash = self.db.read().storage_hash(key);
		match storage_hash {
			Ok(Some(hash)) => Ok(Some(hash)),
			_ if !self.removed_keys.read().contains_key(key) => remote_fetch(Some(self.fork_block)),
			_ => Ok(None),
		}
	}
	fn closest_merkle_value(
		&self,
		_key: &[u8],
	) -> Result<
		Option<sp_trie::MerkleValue<<HashingFor<Block> as sp_core::Hasher>::Out>>,
		Self::Error,
	> {
		unimplemented!("closest_merkle_value: unsupported feature for lazy loading")
	}
	fn child_closest_merkle_value(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<
		Option<sp_trie::MerkleValue<<HashingFor<Block> as sp_core::Hasher>::Out>>,
		Self::Error,
	> {
		unimplemented!("child_closest_merkle_value: unsupported feature for lazy loading")
	}
	fn child_storage(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<sp_state_machine::StorageValue>, Self::Error> {
		unimplemented!("child_storage: unsupported feature for lazy loading");
	}
	fn child_storage_hash(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<<HashingFor<Block> as sp_core::Hasher>::Out>, Self::Error> {
		unimplemented!("child_storage_hash: unsupported feature for lazy loading");
	}
	fn next_storage_key(
		&self,
		key: &[u8],
	) -> Result<Option<sp_state_machine::StorageKey>, Self::Error> {
		let remote_fetch = |block: Option<Block::Hash>| {
			let start_key = Some(StorageKey(key.to_vec()));
			let result = self
				.rpc_client
				.storage_keys_paged(start_key.clone(), 2, None, block);
			match result {
				Ok(keys) => keys.last().cloned(),
				Err(err) => {
					log::trace!(
						target: super::LAZY_LOADING_LOG_TARGET,
						"Failed to fetch `next storage key` from RPC: {:?}",
						err
					);
					None
				}
			}
		};
		let maybe_next_key = if self.before_fork {
			remote_fetch(self.block_hash)
		} else {
			let next_storage_key = self.db.read().next_storage_key(key);
			match next_storage_key {
				Ok(Some(key)) => Some(key),
				_ if !self.removed_keys.read().contains_key(key) => {
					remote_fetch(Some(self.fork_block))
				}
				_ => None,
			}
		};
		log::trace!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"next_storage_key: (key: {:?}, next_key: {:?})",
			hex::encode(key),
			maybe_next_key.clone().map(|key| hex::encode(key))
		);
		Ok(maybe_next_key)
	}
	fn next_child_storage_key(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<sp_state_machine::StorageKey>, Self::Error> {
		unimplemented!("next_child_storage_key: unsupported feature for lazy loading");
	}
	fn storage_root<'a>(
		&self,
		delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
		state_version: StateVersion,
	) -> (
		<HashingFor<Block> as sp_core::Hasher>::Out,
		BackendTransaction<HashingFor<Block>>,
	)
	where
		<HashingFor<Block> as sp_core::Hasher>::Out: Ord,
	{
		self.db.read().storage_root(delta, state_version)
	}
	fn child_storage_root<'a>(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
		_state_version: StateVersion,
	) -> (
		<HashingFor<Block> as sp_core::Hasher>::Out,
		bool,
		BackendTransaction<HashingFor<Block>>,
	)
	where
		<HashingFor<Block> as sp_core::Hasher>::Out: Ord,
	{
		unimplemented!("child_storage_root: unsupported in lazy loading")
	}
	fn raw_iter(&self, args: sp_state_machine::IterArgs) -> Result<Self::RawIter, Self::Error> {
		let mut clone: RawIterArgs = Default::default();
		clone.start_at_exclusive = args.start_at_exclusive.clone();
		clone.prefix = args.prefix.map(|v| v.to_vec());
		clone.start_at = args.start_at.map(|v| v.to_vec());
		Ok(RawIter::<Block> {
			args: clone,
			complete: false,
			_phantom: Default::default(),
		})
	}
	fn register_overlay_stats(&self, stats: &sp_state_machine::StateMachineStats) {
		self.db.read().register_overlay_stats(stats)
	}
	fn usage_info(&self) -> sp_state_machine::UsageInfo {
		self.db.read().usage_info()
	}
}
// Lazy loading state is assembled on demand (overlay + RPC), so it cannot be
// exposed as a single concrete trie backend.
impl<B: BlockT> sp_state_machine::backend::AsTrieBackend<HashingFor<B>> for ForkedLazyBackend<B> {
	type TrieBackendStorage = PrefixedMemoryDB<HashingFor<B>>;
	fn as_trie_backend(
		&self,
	) -> &sp_state_machine::TrieBackend<Self::TrieBackendStorage, HashingFor<B>> {
		unimplemented!("`as_trie_backend` is not supported in lazy loading mode.")
	}
}
/// Lazy loading (In-memory) backend. Keeps all states and blocks in memory.
pub struct Backend<Block: BlockT> {
	// Client used to fetch anything missing locally from the live network.
	pub(crate) rpc_client: Arc<RPC>,
	// Per-block state overlays, keyed by block hash.
	states: ReadWriteLock<HashMap<Block::Hash, ForkedLazyBackend<Block>>>,
	// In-memory header/body/justification store.
	pub(crate) blockchain: Blockchain<Block>,
	// Lock serializing block imports (see `get_import_lock`).
	import_lock: parking_lot::RwLock<()>,
	// Pin reference counts per block; `unpin_block` may drive a count negative.
	pinned_blocks: ReadWriteLock<HashMap<Block::Hash, i64>>,
	// Header of the block this lazy-loading instance forked from.
	pub(crate) fork_checkpoint: Block::Header,
}
impl<Block: BlockT + DeserializeOwned> Backend<Block> {
	/// Build a fresh backend that forks off `fork_checkpoint`, resolving any
	/// state that is missing locally through `rpc_client`.
	fn new(rpc_client: Arc<RPC>, fork_checkpoint: Block::Header) -> Self {
		let blockchain = Blockchain::new(rpc_client.clone());
		Self {
			rpc_client,
			states: Default::default(),
			blockchain,
			import_lock: Default::default(),
			pinned_blocks: Default::default(),
			fork_checkpoint,
		}
	}
}
// Auxiliary storage is not available in lazy loading mode; both operations
// are hard errors if reached.
impl<Block: BlockT + DeserializeOwned> backend::AuxStore for Backend<Block> {
	fn insert_aux<
		'a,
		'b: 'a,
		'c: 'a,
		I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
		D: IntoIterator<Item = &'a &'b [u8]>,
	>(
		&self,
		_insert: I,
		_delete: D,
	) -> sp_blockchain::Result<()> {
		unimplemented!("`insert_aux` is not supported in lazy loading mode.")
	}
	fn get_aux(&self, _key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
		unimplemented!("`get_aux` is not supported in lazy loading mode.")
	}
}
/// Client backend implementation backed by in-memory state overlays plus the
/// live network for anything not yet materialized locally.
impl<Block: BlockT + DeserializeOwned> backend::Backend<Block> for Backend<Block> {
	type BlockImportOperation = BlockImportOperation<Block>;
	type Blockchain = Blockchain<Block>;
	type State = ForkedLazyBackend<Block>;
	type OffchainStorage = InMemOffchainStorage;
	/// Start a new import operation, initially based on the default-hash
	/// (pre-fork, remote-only) state; `begin_state_operation` re-bases it.
	fn begin_operation(&self) -> sp_blockchain::Result<Self::BlockImportOperation> {
		let old_state = self.state_at(Default::default())?;
		Ok(BlockImportOperation {
			pending_block: None,
			old_state,
			new_state: None,
			aux: Default::default(),
			storage_updates: Default::default(),
			finalized_blocks: Default::default(),
			set_head: None,
			before_fork: false,
		})
	}
	/// Re-base the operation on the state of `block`.
	fn begin_state_operation(
		&self,
		operation: &mut Self::BlockImportOperation,
		block: Block::Hash,
	) -> sp_blockchain::Result<()> {
		operation.old_state = self.state_at(block)?;
		Ok(())
	}
	/// Apply a completed import operation: finalize queued blocks, materialize
	/// the pending block's state, and update aux/head as requested.
	fn commit_operation(&self, operation: Self::BlockImportOperation) -> sp_blockchain::Result<()> {
		for (block, justification) in operation.finalized_blocks {
			self.blockchain.finalize_header(block, justification)?;
		}
		if let Some(pending_block) = operation.pending_block {
			let old_state = &operation.old_state;
			let (header, body, justification) = pending_block.block.into_inner();
			let hash = header.hash();
			// NOTE(review): `removed_keys.clone()` / `db.clone()` clone the
			// `Arc`, not the contents — the writes below are visible through
			// the parent state's handles as well, so each block builds on the
			// same shared overlay. Confirm this sharing is intended.
			let new_removed_keys = old_state.removed_keys.clone();
			for (key, value) in operation.storage_updates.clone() {
				// A written value revives the key; a `None` records a deletion
				// so later reads do not fall back to the remote network.
				if value.is_some() {
					new_removed_keys.write().remove(&key.clone());
				} else {
					new_removed_keys.write().insert(key.clone(), ());
				}
			}
			let new_db = old_state.db.clone();
			new_db.write().insert(
				vec![(None::<ChildInfo>, operation.storage_updates)],
				StateVersion::V1,
			);
			let new_state = ForkedLazyBackend {
				rpc_client: self.rpc_client.clone(),
				block_hash: Some(hash.clone()),
				fork_block: self.fork_checkpoint.hash(),
				db: new_db,
				removed_keys: new_removed_keys,
				before_fork: operation.before_fork,
			};
			self.states.write().insert(hash, new_state);
			self.blockchain
				.insert(hash, header, justification, body, pending_block.state)?;
		}
		if !operation.aux.is_empty() {
			self.blockchain.write_aux(operation.aux);
		}
		if let Some(set_head) = operation.set_head {
			self.blockchain.set_head(set_head)?;
		}
		Ok(())
	}
	fn finalize_block(
		&self,
		hash: Block::Hash,
		justification: Option<Justification>,
	) -> sp_blockchain::Result<()> {
		self.blockchain.finalize_header(hash, justification)
	}
	fn append_justification(
		&self,
		hash: Block::Hash,
		justification: Justification,
	) -> sp_blockchain::Result<()> {
		self.blockchain.append_justification(hash, justification)
	}
	fn blockchain(&self) -> &Self::Blockchain {
		&self.blockchain
	}
	// No usage statistics are collected for the in-memory backend.
	fn usage_info(&self) -> Option<UsageInfo> {
		None
	}
	// Offchain storage is not provided in lazy loading mode.
	fn offchain_storage(&self) -> Option<Self::OffchainStorage> {
		None
	}
	/// Return (and lazily construct) the state overlay for `hash`.
	///
	/// - The default hash maps to a pure pre-fork state (remote reads only).
	/// - A cached state is returned as-is.
	/// - Otherwise the header is fetched: blocks above the checkpoint share
	///   their parent's db/removed-keys handles; blocks at or below it get a
	///   fresh pre-fork state. Newly built states are cached.
	fn state_at(&self, hash: Block::Hash) -> sp_blockchain::Result<Self::State> {
		if hash == Default::default() {
			return Ok(ForkedLazyBackend::<Block> {
				rpc_client: self.rpc_client.clone(),
				block_hash: Some(hash),
				fork_block: self.fork_checkpoint.hash(),
				db: Default::default(),
				removed_keys: Default::default(),
				before_fork: true,
			});
		}
		let (backend, should_write) = self
			.states
			.read()
			.get(&hash)
			.cloned()
			.map(|state| Ok((state, false)))
			.unwrap_or_else(|| {
				self.rpc_client
					.header::<Block>(Some(hash))
					.ok()
					.flatten()
					.ok_or(sp_blockchain::Error::UnknownBlock(
						format!("Failed to fetch block header: {:?}", hash).into(),
					))
					.map(|header| {
						let checkpoint = self.fork_checkpoint.clone();
						let state = if header.number().gt(checkpoint.number()) {
							// Post-fork block: recursively resolve the parent
							// and share its overlay handles.
							let parent = self.state_at(*header.parent_hash()).ok();
							ForkedLazyBackend::<Block> {
								rpc_client: self.rpc_client.clone(),
								block_hash: Some(hash),
								fork_block: checkpoint.hash(),
								db: parent.clone().map_or(Default::default(), |p| p.db),
								removed_keys: parent.map_or(Default::default(), |p| p.removed_keys),
								before_fork: false,
							}
						} else {
							// At or before the checkpoint: remote reads only.
							ForkedLazyBackend::<Block> {
								rpc_client: self.rpc_client.clone(),
								block_hash: Some(hash),
								fork_block: checkpoint.hash(),
								db: Default::default(),
								removed_keys: Default::default(),
								before_fork: true,
							}
						};
						(state, true)
					})
			})?;
		if should_write {
			self.states.write().insert(hash, backend.clone());
		}
		Ok(backend)
	}
	// Reverting is a no-op: nothing is reverted, zero blocks reported.
	fn revert(
		&self,
		_n: NumberFor<Block>,
		_revert_finalized: bool,
	) -> sp_blockchain::Result<(NumberFor<Block>, HashSet<Block::Hash>)> {
		Ok((Zero::zero(), HashSet::new()))
	}
	// Leaf removal is accepted but ignored.
	fn remove_leaf_block(&self, _hash: Block::Hash) -> sp_blockchain::Result<()> {
		Ok(())
	}
	fn get_import_lock(&self) -> &parking_lot::RwLock<()> {
		&self.import_lock
	}
	fn requires_full_sync(&self) -> bool {
		false
	}
	/// Increment the pin count for `hash`.
	fn pin_block(&self, hash: <Block as BlockT>::Hash) -> blockchain::Result<()> {
		let mut blocks = self.pinned_blocks.write();
		*blocks.entry(hash).or_default() += 1;
		Ok(())
	}
	/// Decrement the pin count for `hash`.
	// NOTE(review): an unpin without a matching pin inserts -1 rather than
	// erroring; counts are never pruned from the map.
	fn unpin_block(&self, hash: <Block as BlockT>::Hash) {
		let mut blocks = self.pinned_blocks.write();
		blocks
			.entry(hash)
			.and_modify(|counter| *counter -= 1)
			.or_insert(-1);
	}
}
// Marker impl: the lazy loading backend lives entirely in-process.
impl<Block: BlockT + DeserializeOwned> backend::LocalBackend<Block> for Backend<Block> {}
/// Check that genesis storage is valid.
pub fn check_genesis_storage(storage: &Storage) -> sp_blockchain::Result<()> {
	if storage
		.top
		.iter()
		.any(|(k, _)| well_known_keys::is_child_storage_key(k))
	{
		return Err(sp_blockchain::Error::InvalidState);
	}
	if storage
		.children_default
		.keys()
		.any(|child_key| !well_known_keys::is_child_storage_key(child_key))
	{
		return Err(sp_blockchain::Error::InvalidState);
	}
	Ok(())
}
/// Blocking wrapper around a jsonrpsee HTTP client, used to query the live
/// network that lazy loading forks from.
#[derive(Debug, Clone)]
pub struct RPC {
	http_client: HttpClient,
	// Delay applied before every request, to avoid 429 (rate-limit) errors.
	delay_between_requests_ms: u32,
	// Maximum retry attempts per request, at the same fixed interval.
	max_retries_per_request: u32,
	// Monotonic request id, used only to correlate log lines.
	counter: Arc<AtomicU64>,
}
impl RPC {
	pub fn new(
		http_client: HttpClient,
		delay_between_requests_ms: u32,
		max_retries_per_request: u32,
	) -> Self {
		Self {
			http_client,
			delay_between_requests_ms,
			max_retries_per_request,
			counter: Default::default(),
		}
	}
	pub fn system_chain(&self) -> Result<String, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_chain(&self.http_client)
		};
		self.block_on(request)
	}
	pub fn system_properties(
		&self,
	) -> Result<sc_chain_spec::Properties, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_properties(
				&self.http_client,
			)
		};
		self.block_on(request)
	}
	pub fn system_name(&self) -> Result<String, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_name(&self.http_client)
		};
		self.block_on(request)
	}
	pub fn block<Block, Hash: Clone>(
		&self,
		hash: Option<Hash>,
	) -> Result<Option<SignedBlock<Block>>, jsonrpsee::core::ClientError>
	where
		Block: BlockT + DeserializeOwned,
		Hash: 'static + Send + Sync + sp_runtime::Serialize + DeserializeOwned,
	{
		let request = &|| {
			substrate_rpc_client::ChainApi::<
				BlockNumber,
				Hash,
				Block::Header,
				SignedBlock<Block>,
			>::block(&self.http_client, hash.clone())
		};
		self.block_on(request)
	}
	pub fn block_hash<Block: BlockT + DeserializeOwned>(
		&self,
		block_number: Option<<Block::Header as HeaderT>::Number>,
	) -> Result<Option<Block::Hash>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::ChainApi::<
				<Block::Header as HeaderT>::Number,
				Block::Hash,
				Block::Header,
				SignedBlock<Block>,
			>::block_hash(
				&self.http_client,
				block_number.map(|n| ListOrValue::Value(NumberOrHex::Hex(n.into()))),
			)
		};
		self.block_on(request).map(|ok| match ok {
			ListOrValue::List(v) => v.get(0).map_or(None, |some| *some),
			ListOrValue::Value(v) => v,
		})
	}
	pub fn header<Block: BlockT + DeserializeOwned>(
		&self,
		hash: Option<Block::Hash>,
	) -> Result<Option<Block::Header>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::ChainApi::<
				BlockNumber,
				Block::Hash,
				Block::Header,
				SignedBlock<Block>,
			>::header(&self.http_client, hash)
		};
		self.block_on(request)
	}
	pub fn storage_hash<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
	>(
		&self,
		key: StorageKey,
		at: Option<Hash>,
	) -> Result<Option<Hash>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::storage_hash(
				&self.http_client,
				key.clone(),
				at.clone(),
			)
		};
		self.block_on(request)
	}
	pub fn storage<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize + core::fmt::Debug,
	>(
		&self,
		key: StorageKey,
		at: Option<Hash>,
	) -> Result<Option<StorageData>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::storage(
				&self.http_client,
				key.clone(),
				at.clone(),
			)
		};
		self.block_on(request)
	}
	pub fn storage_keys_paged<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
	>(
		&self,
		key: Option<StorageKey>,
		count: u32,
		start_key: Option<StorageKey>,
		at: Option<Hash>,
	) -> Result<Vec<sp_state_machine::StorageKey>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::storage_keys_paged(
				&self.http_client,
				key.clone(),
				count.clone(),
				start_key.clone(),
				at.clone(),
			)
		};
		let result = self.block_on(request);
		match result {
			Ok(result) => Ok(result.iter().map(|item| item.0.clone()).collect()),
			Err(err) => Err(err),
		}
	}
	pub fn query_storage_at<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
	>(
		&self,
		keys: Vec<StorageKey>,
		from_block: Option<Hash>,
	) -> Result<Vec<(StorageKey, Option<StorageData>)>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::query_storage_at(
				&self.http_client,
				keys.clone(),
				from_block.clone(),
			)
		};
		let result = self.block_on(request);
		match result {
			Ok(result) => Ok(result
				.iter()
				.flat_map(|item| item.changes.clone())
				.collect()),
			Err(err) => Err(err),
		}
	}
	fn block_on<F, T, E>(&self, f: &dyn Fn() -> F) -> Result<T, E>
	where
		F: Future<Output = Result<T, E>>,
	{
		use tokio::runtime::Handle;
		let id = self.counter.fetch_add(1, Ordering::SeqCst);
		let start = std::time::Instant::now();
		tokio::task::block_in_place(move || {
			Handle::current().block_on(async move {
				let delay_between_requests =
					Duration::from_millis(self.delay_between_requests_ms.into());
				let start_req = std::time::Instant::now();
				log::debug!(
					target: super::LAZY_LOADING_LOG_TARGET,
					"Sending request: {}",
					id
				);
				// Explicit request delay, to avoid getting 429 errors
				let _ = tokio::time::sleep(delay_between_requests).await;
				// Retry request in case of failure
				// The maximum number of retries is specified by `self.max_retries_per_request`
				let retry_strategy = FixedInterval::new(delay_between_requests)
					.take(self.max_retries_per_request as usize);
				let result = Retry::spawn(retry_strategy, f).await;
				log::debug!(
					target: super::LAZY_LOADING_LOG_TARGET,
					"Completed request (id: {}, successful: {}, elapsed_time: {:?}, query_time: {:?})",
					id,
					result.is_ok(),
					start.elapsed(),
					start_req.elapsed()
				);
				result
			})
		})
	}
}
/// Create an instance of a lazy loading memory backend.
pub fn new_lazy_loading_backend<Block>(
	config: &mut Configuration,
	lazy_loading_config: &LazyLoadingConfig,
) -> Result<Arc<Backend<Block>>, Error>
where
	Block: BlockT + DeserializeOwned,
	Block::Hash: From<H256>,
{
	let http_client = jsonrpsee::http_client::HttpClientBuilder::default()
		.max_request_size(u32::MAX)
		.max_response_size(u32::MAX)
		.request_timeout(Duration::from_secs(10))
		.build(lazy_loading_config.state_rpc.clone())
		.map_err(|e| {
			sp_blockchain::Error::Backend(
				format!("failed to build http client: {:?}", e).to_string(),
			)
		})?;
	let rpc = RPC::new(
		http_client,
		lazy_loading_config.delay_between_requests,
		lazy_loading_config.max_retries_per_request,
	);
	let block_hash = lazy_loading_config
		.from_block
		.map(|block| Into::<Block::Hash>::into(block));
	let checkpoint: Block = rpc
		.block::<Block, _>(block_hash)
		.ok()
		.flatten()
		.expect("Fetching fork checkpoint")
		.block;
	let backend = Arc::new(Backend::new(Arc::new(rpc), checkpoint.header().clone()));
	let chain_name = backend
		.rpc_client
		.system_chain()
		.expect("Should fetch chain id");
	let chain_properties = backend
		.rpc_client
		.system_properties()
		.expect("Should fetch chain properties");
	let spec_builder = chain_spec::test_spec::lazy_loading_spec_builder()
		.with_name(chain_name.as_str())
		.with_properties(chain_properties);
	config.chain_spec = Box::new(spec_builder.build());
	let base_overrides =
		state_overrides::base_state_overrides(lazy_loading_config.runtime_override.clone());
	let custom_overrides = if let Some(path) = lazy_loading_config.state_overrides_path.clone() {
		state_overrides::read(path)?
	} else {
		Default::default()
	};
	let state_overrides: Vec<(Vec<u8>, Vec<u8>)> = [base_overrides, custom_overrides]
		.concat()
		.iter()
		.map(|entry| match entry {
			StateEntry::Concrete(v) => {
				let key = [
					&twox_128(v.pallet.as_bytes()),
					&twox_128(v.storage.as_bytes()),
					v.key.clone().unwrap_or(Vec::new()).as_slice(),
				]
				.concat();
				(key, v.value.clone())
			}
			StateEntry::Raw(raw) => (raw.key.clone(), raw.value.clone()),
		})
		.collect();
	// Produce first block after the fork
	let _ = helpers::produce_first_block(backend.clone(), checkpoint, state_overrides)?;
	Ok(backend)
}