1
// Copyright 2024 Moonbeam foundation
2
// This file is part of Moonbeam.
3

            
4
// Moonbeam is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Moonbeam is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use crate::chain_spec::Extensions;
18
use crate::{
19
	lazy_loading, open_frontier_backend, rpc, set_prometheus_registry, ClientCustomizations,
20
	FrontierBlockImport, HostFunctions, MockTimestampInherentDataProvider, MoonbeamBlockImport,
21
	PartialComponentsResult, PendingConsensusDataProvider, RuntimeApiCollection,
22
	RELAY_CHAIN_SLOT_DURATION_MILLIS, SOFT_DEADLINE_PERCENT, TIMESTAMP,
23
};
24
use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
25
use cumulus_client_service::ParachainTracingExecuteBlock;
26
use cumulus_primitives_core::{relay_chain, BlockT, CollectCollationInfo, ParaId};
27
use fc_rpc::StorageOverrideHandler;
28
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
29
use frontier_backend::LazyLoadingFrontierBackend;
30
use futures::{FutureExt, StreamExt};
31
use moonbeam_cli_opt::{EthApi as EthApiCmd, LazyLoadingConfig, RpcConfig};
32
use moonbeam_core_primitives::{Block, Hash};
33
use nimbus_consensus::NimbusManualSealConsensusDataProvider;
34
use nimbus_primitives::NimbusId;
35
use parity_scale_codec::Encode;
36
use polkadot_primitives::{AbridgedHostConfiguration, AsyncBackingParams, Slot, UpgradeGoAhead};
37
use sc_chain_spec::{get_extension, BuildGenesisBlock, ChainType, GenesisBlockBuilder};
38
use sc_client_api::{Backend, BadBlocks, ExecutorProvider, ForkBlocks};
39
use sc_executor::{HeapAllocStrategy, RuntimeVersionOf, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
40
use sc_network::config::FullNetworkConfiguration;
41
use sc_network::NetworkBackend;
42
use sc_network_common::sync::SyncMode;
43
use sc_service::{
44
	error::Error as ServiceError, ClientConfig, Configuration, Error, KeystoreContainer,
45
	LocalCallExecutor, PartialComponents, TaskManager,
46
};
47
use sc_telemetry::{TelemetryHandle, TelemetryWorker};
48
use sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool};
49
use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
50
use sp_blockchain::HeaderBackend;
51
use sp_core::traits::CodeExecutor;
52
use sp_core::H256;
53
use sp_runtime::traits::NumberFor;
54
use std::collections::BTreeMap;
55
use std::str::FromStr;
56
use std::sync::atomic::Ordering;
57
use std::sync::{Arc, Mutex};
58
use std::time::Duration;
59

            
60
// Sub-modules implementing the lazy loading client plumbing.
pub mod call_executor; // `LazyLoadingCallExecutor` (see `TLazyLoadingCallExecutor` below)
mod client; // `new_client`, used by `new_lazy_loading_parts_with_genesis_builder`
pub mod frontier_backend; // `LazyLoadingFrontierBackend`, wrapped for the RPC layer
mod helpers; // e.g. `get_parachain_id`, queried from the remote RPC client
mod lock;
mod manual_sealing; // `run_manual_seal`, drives block authoring in this service
mod rpc_client; // remote RPC client stored on the backend (`backend.rpc_client`)
mod state_overrides;
pub mod substrate_backend; // lazy loading `Backend` and its `new_backend` constructor
69

            
70
/// Log target used by the lazy loading client.
// `'static` is implied for const references; spelling it out trips clippy's
// `redundant_static_lifetimes` lint.
pub const LAZY_LOADING_LOG_TARGET: &str = "lazy-loading";
71

            
72
/// Lazy loading client type.
pub type TLazyLoadingClient<TBl, TRtApi, TExec> = sc_service::client::Client<
	TLazyLoadingBackend<TBl>,
	TLazyLoadingCallExecutor<TBl, TExec>,
	TBl,
	TRtApi,
>;

/// Lazy loading client backend type.
pub type TLazyLoadingBackend<TBl> = substrate_backend::Backend<TBl>;

/// Lazy loading client call executor type.
pub type TLazyLoadingCallExecutor<TBl, TExec> = call_executor::LazyLoadingCallExecutor<
	TBl,
	LocalCallExecutor<TBl, TLazyLoadingBackend<TBl>, TExec>,
>;

/// Lazy loading parts type.
pub type TLazyLoadingParts<TBl, TRtApi, TExec> = (
	TLazyLoadingClient<TBl, TRtApi, TExec>,
	Arc<TLazyLoadingBackend<TBl>>,
	KeystoreContainer,
	TaskManager,
);

/// Convenience alias: lazy loading client instantiated for the Moonbeam
/// `Block` type with a WASM-only executor.
type LazyLoadingClient<RuntimeApi> =
	TLazyLoadingClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
/// Convenience alias: lazy loading backend instantiated for the Moonbeam `Block`.
type LazyLoadingBackend = TLazyLoadingBackend<Block>;
100

            
101
/// Create the initial parts of a lazy loading node.
102
pub fn new_lazy_loading_parts<TBl, TRtApi, TExec>(
103
	config: &mut Configuration,
104
	lazy_loading_config: &LazyLoadingConfig,
105
	telemetry: Option<TelemetryHandle>,
106
	executor: TExec,
107
) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
108
where
109
	TBl: BlockT + sp_runtime::DeserializeOwned,
110
	TBl::Hash: From<H256>,
111
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
112
{
113
	let backend = substrate_backend::new_backend(config, &lazy_loading_config)?;
114

            
115
	let genesis_block_builder = GenesisBlockBuilder::new(
116
		config.chain_spec.as_storage_builder(),
117
		!config.no_genesis(),
118
		backend.clone(),
119
		executor.clone(),
120
	)?;
121

            
122
	new_lazy_loading_parts_with_genesis_builder(
123
		config,
124
		telemetry,
125
		executor,
126
		backend,
127
		genesis_block_builder,
128
	)
129
}
130

            
131
/// Create the initial parts of a lazy loading node from a pre-built backend
/// and genesis block builder.
///
/// Sets up the keystore and task manager, resolves fork/bad-block overrides
/// and WASM code substitutes from the chain spec, then constructs the lazy
/// loading client. Returns `(client, backend, keystore_container, task_manager)`.
pub fn new_lazy_loading_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
	backend: Arc<TLazyLoadingBackend<TBl>>,
	genesis_block_builder: TBuildGenesisBlock,
) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT + sp_runtime::DeserializeOwned,
	TBl::Hash: From<H256>,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
	// The genesis builder must produce import operations compatible with the
	// lazy loading backend.
	TBuildGenesisBlock:
		BuildGenesisBlock<
			TBl,
			BlockImportOperation = <TLazyLoadingBackend<TBl> as sc_client_api::backend::Backend<
				TBl,
			>>::BlockImportOperation,
		>,
{
	let keystore_container = KeystoreContainer::new(&config.keystore)?;

	let task_manager = {
		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
		TaskManager::new(config.tokio_handle.clone(), registry)?
	};

	// Fork/bad-block overrides are optional chain-spec extensions; default to
	// empty when absent.
	let chain_spec = &config.chain_spec;
	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let client = {
		let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
			None,
			Arc::new(executor.clone()),
		);

		// Chain-spec code substitutes are keyed by block *number*; older spec
		// formats used block hashes, which no longer parse — hence the error
		// message below.
		let wasm_runtime_substitutes = config
			.chain_spec
			.code_substitutes()
			.into_iter()
			.map(|(n, c)| {
				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
					Error::Application(Box::from(format!(
						"Failed to parse `{}` as block number for code substitutes. \
						 In an old version the key for code substitute was a block hash. \
						 Please update the chain spec to a version that is compatible with your node.",
						n
					)))
				})?;
				Ok((number, c))
			})
			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;

		let client = client::new_client(
			backend.clone(),
			executor,
			genesis_block_builder,
			fork_blocks,
			bad_blocks,
			extensions,
			Box::new(task_manager.spawn_handle()),
			config
				.prometheus_config
				.as_ref()
				.map(|config| config.registry.clone()),
			telemetry,
			ClientConfig {
				offchain_worker_enabled: config.offchain_worker.enabled,
				offchain_indexing_api: config.offchain_worker.indexing_enabled,
				wasmtime_precompiled: config.executor.wasmtime_precompiled.clone(),
				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
				// Light-state and warp sync start without a local genesis state.
				no_genesis: matches!(
					config.network.sync_mode,
					SyncMode::LightState { .. } | SyncMode::Warp { .. }
				),
				wasm_runtime_substitutes,
				enable_import_proof_recording: true,
			},
		)?;

		client
	};

	Ok((client, backend, keystore_container, task_manager))
}
222

            
223
/// Builds the PartialComponents for a lazy loading node.
///
/// Configures Prometheus and telemetry, builds the WASM executor, creates the
/// lazy loading client/backend via [`new_lazy_loading_parts`], then assembles
/// the transaction pool, Frontier backend/block import and the nimbus import
/// queue. The `other` tuple carries everything the full service needs that
/// does not fit `PartialComponents`' named fields.
#[allow(clippy::type_complexity)]
pub fn new_lazy_loading_partial<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	lazy_loading_config: &LazyLoadingConfig,
) -> PartialComponentsResult<LazyLoadingClient<RuntimeApi>, LazyLoadingBackend>
where
	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;

	// Use ethereum style for subscription ids
	config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));

	// Telemetry worker + handle are only created when endpoints are configured.
	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	// Fixed heap pages override the default allocation strategy when set.
	let heap_pages = config
		.executor
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
			extra_pages: h as _,
		});
	let mut wasm_builder = WasmExecutor::builder()
		.with_execution_method(config.executor.wasm_method)
		.with_onchain_heap_alloc_strategy(heap_pages)
		.with_offchain_heap_alloc_strategy(heap_pages)
		.with_ignore_onchain_heap_pages(true)
		.with_max_runtime_instances(config.executor.max_runtime_instances)
		.with_runtime_cache_size(config.executor.runtime_cache_size);

	if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
	}

	let executor = wasm_builder.build();

	let (client, backend, keystore_container, task_manager) =
		new_lazy_loading_parts::<Block, RuntimeApi, _>(
			config,
			lazy_loading_config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
		)?;

	// Switch ed25519 host verification to zebra only from the block number the
	// runtime customization declares compatible.
	if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(block_number));
	}

	let client = Arc::new(client);

	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());

	// Spawn the telemetry worker; keep only the handle from here on.
	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager
			.spawn_handle()
			.spawn("telemetry", None, worker.run());
		telemetry
	});

	// Longest-chain rule, consumed later by the manual-seal authoring task.
	let maybe_select_chain = Some(sc_consensus::LongestChain::new(backend.clone()));

	let transaction_pool = sc_transaction_pool::Builder::new(
		task_manager.spawn_essential_handle(),
		client.clone(),
		config.role.is_authority().into(),
	)
	.with_options(config.transaction_pool.clone())
	.with_prometheus(config.prometheus_registry())
	.build();

	let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
	let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));

	let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
	let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());

	// No inherent data providers are needed for the import queue itself;
	// inherents are mocked in the authoring path instead.
	let create_inherent_data_providers = move |_, _| async move { Ok(()) };

	let import_queue = nimbus_consensus::import_queue(
		client.clone(),
		frontier_block_import.clone(),
		create_inherent_data_providers,
		&task_manager.spawn_essential_handle(),
		config.prometheus_registry(),
		false,
		false,
	)?;
	// Lazy loading always runs the dev (manual seal) import pipeline.
	let block_import = MoonbeamBlockImport::Dev(frontier_block_import);

	Ok(PartialComponents {
		backend,
		client,
		import_queue,
		keystore_container,
		task_manager,
		transaction_pool: transaction_pool.into(),
		select_chain: maybe_select_chain,
		other: (
			block_import,
			filter_pool,
			telemetry,
			telemetry_worker_handle,
			frontier_backend,
			fee_history_cache,
		),
	})
}
347

            
348
/// Builds a new lazy loading service. This service uses manual seal, and mocks
/// the parachain inherent.
///
/// High-level flow: build partial components, print the lazy-loading
/// disclaimer and wait, start networking and (optionally) offchain workers,
/// spawn the manual-seal authoring task when running as authority, wire up the
/// Ethereum-compatible RPC, then spawn the remaining service tasks.
#[sc_tracing::logging::prefix_logs_with("Lazy loading 🌗")]
pub async fn new_lazy_loading_service<RuntimeApi, Customizations, Net>(
	mut config: Configuration,
	_author_id: Option<NimbusId>,
	sealing: moonbeam_cli_opt::Sealing,
	rpc_config: RpcConfig,
	lazy_loading_config: LazyLoadingConfig,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<TaskManager, ServiceError>
where
	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
	Net: NetworkBackend<Block, Hash>,
{
	use async_io::Timer;
	use futures::Stream;
	use sc_consensus_manual_seal::{EngineCommand, ManualSealParams};

	let sc_service::PartialComponents {
		client,
		backend,
		mut task_manager,
		import_queue,
		keystore_container,
		select_chain: maybe_select_chain,
		transaction_pool,
		other:
			(
				block_import_pipeline,
				filter_pool,
				mut telemetry,
				_telemetry_worker_handle,
				frontier_backend,
				fee_history_cache,
			),
	} = lazy_loading::new_lazy_loading_partial::<RuntimeApi, Customizations>(
		&mut config,
		&rpc_config,
		&lazy_loading_config,
	)?;

	// Warn the operator about lazy-loading caveats, then pause before startup.
	let start_delay = 10;
	let lazy_loading_startup_disclaimer = format!(
		r#"

		You are now running the Moonbeam client in lazy loading mode, where data is retrieved
		from a live RPC node on demand.

		Using remote state from: {rpc}
		Forking from block: {fork_block}

		To ensure the client works properly, please note the following:

		    1. *Avoid Throttling*: Ensure that the backing RPC node is not limiting the number of
		    requests, as this can prevent the lazy loading client from functioning correctly;

		    2. *Be Patient*: As the client may take approximately 20 times longer than normal to
		    retrieve and process the necessary data for the requested operation.


		The service will start in {start_delay} seconds...

		"#,
		rpc = lazy_loading_config.state_rpc,
		fork_block = backend.fork_checkpoint.number
	);

	log::warn!(
		"{}",
		ansi_term::Colour::Yellow.paint(lazy_loading_startup_disclaimer)
	);
	tokio::time::sleep(Duration::from_secs(start_delay)).await;

	// Lazy loading only supports the dev (manual seal) import pipeline built
	// by `new_lazy_loading_partial`.
	let block_import = if let MoonbeamBlockImport::Dev(block_import) = block_import_pipeline {
		block_import
	} else {
		return Err(ServiceError::Other(
			"Block import pipeline is not dev".to_string(),
		));
	};

	let prometheus_registry = config.prometheus_registry().cloned();
	let net_config =
		FullNetworkConfiguration::<_, _, Net>::new(&config.network, prometheus_registry.clone());

	let metrics = Net::register_notification_metrics(
		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
	);

	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
		sc_service::build_network(sc_service::BuildNetworkParams {
			config: &config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue,
			block_announce_validator_builder: None,
			// No relay chain to warp-sync from in lazy loading mode.
			warp_sync_config: None,
			net_config,
			block_relay: None,
			metrics,
		})?;

	if config.offchain_worker.enabled {
		task_manager.spawn_handle().spawn(
			"offchain-workers-runner",
			"offchain-work",
			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
				runtime_api_provider: client.clone(),
				keystore: Some(keystore_container.keystore()),
				offchain_db: backend.offchain_storage(),
				transaction_pool: Some(OffchainTransactionPoolFactory::new(
					transaction_pool.clone(),
				)),
				network_provider: Arc::new(network.clone()),
				is_validator: config.role.is_authority(),
				enable_http_requests: true,
				custom_extensions: move |_| vec![],
			})?
			.run(client.clone(), task_manager.spawn_handle())
			.boxed(),
		);
	}

	let prometheus_registry = config.prometheus_registry().cloned();
	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
	let fee_history_limit = rpc_config.fee_history_limit;
	// Filled in below only for `Sealing::Manual` / authority mode; both are
	// handed to the RPC layer.
	let mut command_sink = None;
	let mut dev_rpc_data = None;
	let collator = config.role.is_authority();

	// The para id is fetched from the remote node backing the lazy loading
	// backend; without it the mocked inherents cannot be built.
	let parachain_id: ParaId = helpers::get_parachain_id(backend.rpc_client.clone())
		.unwrap_or_else(|| panic!("Could not get parachain identifier for lazy loading mode."))
		.into();

	if collator {
		let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|x| x.handle()),
		);
		env.set_soft_deadline(SOFT_DEADLINE_PERCENT);

		// Source of seal commands depends on the chosen sealing mode:
		// instant (on tx import), manual (via RPC sink), or timed interval.
		let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
			match sealing {
				moonbeam_cli_opt::Sealing::Instant => {
					Box::new(
						// This bit cribbed from the implementation of instant seal.
						transaction_pool.import_notification_stream().map(|_| {
							EngineCommand::SealNewBlock {
								create_empty: false,
								finalize: false,
								parent_hash: None,
								sender: None,
							}
						}),
					)
				}
				moonbeam_cli_opt::Sealing::Manual => {
					let (sink, stream) = futures::channel::mpsc::channel(1000);
					// Keep a reference to the other end of the channel. It goes to the RPC.
					command_sink = Some(sink);
					Box::new(stream)
				}
				moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
					Timer::interval(Duration::from_millis(millis)),
					|_| EngineCommand::SealNewBlock {
						create_empty: true,
						finalize: false,
						parent_hash: None,
						sender: None,
					},
				)),
			};

		let select_chain = maybe_select_chain.expect(
			"`new_lazy_loading_partial` builds a `LongestChainRule` when building dev service.\
				We specified the dev service when calling `new_partial`.\
				Therefore, a `LongestChainRule` is present. qed.",
		);

		// Create channels for mocked XCM messages.
		let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
		let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
		let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
		dev_rpc_data = Some((
			downward_xcm_sender,
			hrmp_xcm_sender,
			additional_relay_offset.clone(),
		));

		// Need to clone it and store here to avoid moving of `client`
		// variable in closure below.
		let client_vrf = client.clone();

		let keystore_clone = keystore_container.keystore().clone();
		let maybe_provide_vrf_digest =
			move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
				moonbeam_vrf::vrf_pre_digest::<Block, LazyLoadingClient<RuntimeApi>>(
					&client_vrf,
					&keystore_clone,
					nimbus_id,
					parent,
				)
			};

		// Need to clone it and store here to avoid moving of `client`
		// variable in closure below.
		let client_for_cidp = client.clone();

		task_manager.spawn_essential_handle().spawn_blocking(
			"authorship_task",
			Some("block-authoring"),
			manual_sealing::run_manual_seal(ManualSealParams {
				block_import,
				env,
				client: client.clone(),
				pool: transaction_pool.clone(),
				commands_stream,
				select_chain,
				consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
					keystore: keystore_container.keystore(),
					client: client.clone(),
					additional_digests_provider: maybe_provide_vrf_digest,
					_phantom: Default::default(),
				})),
				// Builds the mocked timestamp, parachain-validation and
				// randomness inherents for each authored block.
				create_inherent_data_providers: move |block: H256, ()| {
					let maybe_current_para_block = client_for_cidp.number(block);
					let maybe_current_para_head = client_for_cidp.expect_header(block);
					let downward_xcm_receiver = downward_xcm_receiver.clone();
					let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
					let additional_relay_offset = additional_relay_offset.clone();

					// Need to clone it and store here to avoid moving of `client`
					// variable in closure below.
					let client_for_xcm = client_for_cidp.clone();

					async move {
						// Advance the shared mock clock by one relay slot per block.
						MockTimestampInherentDataProvider::advance_timestamp(
							RELAY_CHAIN_SLOT_DURATION_MILLIS,
						);

						// Get the mocked timestamp
						let timestamp = TIMESTAMP.load(Ordering::SeqCst);
						// Calculate mocked slot number
						let slot = timestamp.saturating_div(RELAY_CHAIN_SLOT_DURATION_MILLIS);

						let current_para_block = maybe_current_para_block?
							.ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;

						let current_para_block_head = Some(polkadot_primitives::HeadData(
							maybe_current_para_head?.encode(),
						));

						// Extra relay-state entries injected into the mocked
						// validation data proof.
						let additional_key_values = vec![
							(
								// TIMESTAMP_NOW was deprecated in runtime 4000, but should
								// be kept for backwards compatibility with old runtimes
								pallet_timestamp::Now::<moonbeam_runtime::Runtime>::hashed_key()
									.to_vec(),
								timestamp.encode(),
							),
							// Override current slot number
							(
								relay_chain::well_known_keys::CURRENT_SLOT.to_vec(),
								Slot::from(slot).encode(),
							),
							(
								relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
								AbridgedHostConfiguration {
									max_code_size: 3_145_728,
									max_head_data_size: 20_480,
									max_upward_queue_count: 174_762,
									max_upward_queue_size: 1_048_576,
									max_upward_message_size: 65_531,
									max_upward_message_num_per_candidate: 16,
									hrmp_max_message_num_per_candidate: 10,
									validation_upgrade_cooldown: 14_400,
									validation_upgrade_delay: 600,
									async_backing_params: AsyncBackingParams {
										max_candidate_depth: 3,
										allowed_ancestry_len: 2,
									},
								}
								.encode(),
							),
						];

						let current_para_head = client_for_xcm
							.header(block)
							.expect("Header lookup should succeed")
							.expect("Header passed in as parent should be present in backend.");

						// Send the go-ahead signal when the runtime reports a
						// pending validation code upgrade; errors degrade to
						// "no upgrade" rather than aborting authoring.
						let should_send_go_ahead = match client_for_xcm
							.runtime_api()
							.collect_collation_info(block, &current_para_head)
						{
							Ok(info) => info.new_validation_code.is_some(),
							Err(e) => {
								log::error!("Failed to collect collation info: {:?}", e);
								false
							}
						};

						let mocked_parachain = MockValidationDataInherentDataProvider {
							current_para_block,
							para_id: parachain_id,
							upgrade_go_ahead: should_send_go_ahead.then(|| {
								log::info!(
									"Detected pending validation code, sending go-ahead signal."
								);
								UpgradeGoAhead::GoAhead
							}),
							current_para_block_head,
							relay_offset: additional_relay_offset.load(Ordering::SeqCst),
							relay_blocks_per_para_block: 1,
							para_blocks_per_relay_epoch: 10,
							relay_randomness_config: (),
							xcm_config: MockXcmConfig::new(
								&*client_for_xcm,
								block,
								Default::default(),
							),
							raw_downward_messages: downward_xcm_receiver.drain().collect(),
							raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
							additional_key_values: Some(additional_key_values),
						};

						let randomness = session_keys_primitives::InherentDataProvider;

						Ok((
							MockTimestampInherentDataProvider,
							mocked_parachain,
							randomness,
						))
					}
				},
			}),
		);
	}

	// Sinks for pubsub notifications.
	// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
	// notification to the subscriber on receiving a message through this channel.
	// This way we avoid race conditions when using native substrate block import notification
	// stream.
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
		fc_mapping_sync::EthereumBlockNotification<Block>,
	> = Default::default();
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

	// Debug/trace requesters are only spawned when the corresponding eth APIs
	// are enabled; otherwise empty requesters are passed through.
	let ethapi_cmd = rpc_config.ethapi.clone();
	let tracing_requesters =
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
			rpc::tracing::spawn_tracing_tasks(
				&rpc_config,
				prometheus_registry.clone(),
				rpc::SpawnTasksParams {
					task_manager: &task_manager,
					client: client.clone(),
					substrate_backend: backend.clone(),
					frontier_backend: frontier_backend.clone(),
					filter_pool: filter_pool.clone(),
					overrides: overrides.clone(),
					fee_history_limit,
					fee_history_cache: fee_history_cache.clone(),
				},
			)
		} else {
			rpc::tracing::RpcRequesters {
				debug: None,
				trace: None,
			}
		};

	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
		task_manager.spawn_handle(),
		overrides.clone(),
		rpc_config.eth_log_block_cache,
		rpc_config.eth_statuses_cache,
		prometheus_registry,
	));

	// Clone everything the RPC builder closure needs so it is self-contained.
	let rpc_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let backend = backend.clone();
		let network = network.clone();
		let sync = sync_service.clone();
		let ethapi_cmd = ethapi_cmd.clone();
		let max_past_logs = rpc_config.max_past_logs;
		let max_block_range = rpc_config.max_block_range;
		let overrides = overrides.clone();
		let fee_history_cache = fee_history_cache.clone();
		let block_data_cache = block_data_cache.clone();
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();

		let keystore = keystore_container.keystore();
		let command_sink_for_task = command_sink.clone();
		move |subscription_task_executor| {
			let deps = rpc::FullDeps {
				backend: backend.clone(),
				client: client.clone(),
				command_sink: command_sink_for_task.clone(),
				ethapi_cmd: ethapi_cmd.clone(),
				filter_pool: filter_pool.clone(),
				// Wrap the Frontier backend so eth RPC reads can also be
				// resolved through the remote rpc client.
				frontier_backend: Arc::new(LazyLoadingFrontierBackend {
					// NOTE(review): `backend.clone()` before `.rpc_client` looks
					// redundant (the field is cloned anyway) — confirm and simplify.
					rpc_client: backend.clone().rpc_client.clone(),
					frontier_backend: match *frontier_backend {
						fc_db::Backend::KeyValue(ref b) => b.clone(),
						fc_db::Backend::Sql(ref b) => b.clone(),
					},
				}),
				graph: pool.clone(),
				pool: pool.clone(),
				is_authority: collator,
				max_past_logs,
				max_block_range,
				fee_history_limit,
				fee_history_cache: fee_history_cache.clone(),
				network: network.clone(),
				sync: sync.clone(),
				dev_rpc_data: dev_rpc_data.clone(),
				overrides: overrides.clone(),
				block_data_cache: block_data_cache.clone(),
				forced_parent_hashes: None,
			};

			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
				client.clone(),
				keystore.clone(),
			));
			// Tracing config is attached only when debug/trace APIs are on.
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
				rpc::create_full(
					deps,
					subscription_task_executor,
					Some(crate::rpc::TracingConfig {
						tracing_requesters: tracing_requesters.clone(),
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
						max_block_range: rpc_config.max_block_range,
					}),
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
					parachain_id,
				)
				.map_err(Into::into)
			} else {
				rpc::create_full(
					deps,
					subscription_task_executor,
					None,
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
					parachain_id,
				)
				.map_err(Into::into)
			}
		}
	};

	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		network,
		client: client.clone(),
		keystore: keystore_container.keystore(),
		task_manager: &mut task_manager,
		transaction_pool,
		rpc_builder: Box::new(rpc_builder),
		backend,
		system_rpc_tx,
		sync_service: sync_service.clone(),
		config,
		tx_handler_controller,
		telemetry: None,
		tracing_execute_block: Some(Arc::new(ParachainTracingExecuteBlock::new(client.clone()))),
	})?;

	// Print (and optionally report via telemetry) the hardware benchmark.
	if let Some(hwbench) = hwbench {
		sc_sysinfo::print_hwbench(&hwbench);

		if let Some(ref mut telemetry) = telemetry {
			let telemetry_handle = telemetry.handle();
			task_manager.spawn_handle().spawn(
				"telemetry_hwbench",
				None,
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
			);
		}
	}

	log::info!("Service Ready");

	Ok(task_manager)
}
847

            
848
pub fn spec_builder() -> sc_chain_spec::ChainSpecBuilder<Extensions, HostFunctions> {
849
	crate::chain_spec::moonbeam::ChainSpec::builder(
850
		moonbeam_runtime::WASM_BINARY.expect("WASM binary was not build, please build it!"),
851
		Default::default(),
852
	)
853
	.with_name("Lazy Loading")
854
	.with_id("lazy_loading")
855
	.with_chain_type(ChainType::Development)
856
	.with_properties(
857
		serde_json::from_str(
858
			"{\"tokenDecimals\": 18, \"tokenSymbol\": \"GLMR\", \"SS58Prefix\": 1284}",
859
		)
860
		.expect("Provided valid json map"),
861
	)
862
	.with_genesis_config_preset_name(sp_genesis_builder::DEV_RUNTIME_PRESET)
863
}