// Copyright 2024 Moonbeam foundation
// This file is part of Moonbeam.

// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
            
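//! Lazy loading service for Moonbeam: a development-style node that forks from a
//! live network at a given block, fetches any state it does not already have on
//! demand from a remote RPC endpoint, and authors blocks locally through manual seal.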
use crate::chain_spec::Extensions;
use crate::{
	lazy_loading, open_frontier_backend, rpc, set_prometheus_registry, BlockImportPipeline,
	ClientCustomizations, FrontierBlockImport, HostFunctions, MockTimestampInherentDataProvider,
	PartialComponentsResult, PendingConsensusDataProvider, RuntimeApiCollection,
	RELAY_CHAIN_SLOT_DURATION_MILLIS, SOFT_DEADLINE_PERCENT, TIMESTAMP,
};
use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
use cumulus_primitives_core::{relay_chain, BlockT, CollectCollationInfo, ParaId};
use fc_rpc::StorageOverrideHandler;
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
use frontier_backend::LazyLoadingFrontierBackend;
use futures::{FutureExt, StreamExt};
use moonbeam_cli_opt::{EthApi as EthApiCmd, LazyLoadingConfig, RpcConfig};
use moonbeam_core_primitives::{Block, Hash};
use nimbus_consensus::NimbusManualSealConsensusDataProvider;
use nimbus_primitives::NimbusId;
use parity_scale_codec::Encode;
use polkadot_primitives::{AbridgedHostConfiguration, AsyncBackingParams, Slot, UpgradeGoAhead};
use sc_chain_spec::{get_extension, BuildGenesisBlock, ChainType, GenesisBlockBuilder};
use sc_client_api::{Backend, BadBlocks, ExecutorProvider, ForkBlocks};
use sc_executor::{HeapAllocStrategy, RuntimeVersionOf, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sc_network::config::FullNetworkConfiguration;
use sc_network::NetworkBackend;
use sc_network_common::sync::SyncMode;
use sc_service::{
	error::Error as ServiceError, ClientConfig, Configuration, Error, KeystoreContainer,
	LocalCallExecutor, PartialComponents, TaskManager,
};
use sc_telemetry::{TelemetryHandle, TelemetryWorker};
use sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool};
use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_core::traits::CodeExecutor;
use sp_core::H256;
use sp_runtime::traits::NumberFor;
use std::collections::BTreeMap;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::Duration;
            
pub mod call_executor;
mod client;
pub mod frontier_backend;
mod helpers;
mod lock;
mod manual_sealing;
mod rpc_client;
mod state_overrides;
pub mod substrate_backend;
            
pub const LAZY_LOADING_LOG_TARGET: &'static str = "lazy-loading";

/// Lazy loading client type.
pub type TLazyLoadingClient<TBl, TRtApi, TExec> = sc_service::client::Client<
	TLazyLoadingBackend<TBl>,
	TLazyLoadingCallExecutor<TBl, TExec>,
	TBl,
	TRtApi,
>;

/// Lazy loading client backend type.
pub type TLazyLoadingBackend<TBl> = substrate_backend::Backend<TBl>;

/// Lazy loading client call executor type.
pub type TLazyLoadingCallExecutor<TBl, TExec> = call_executor::LazyLoadingCallExecutor<
	TBl,
	LocalCallExecutor<TBl, TLazyLoadingBackend<TBl>, TExec>,
>;

/// Lazy loading parts type.
pub type TLazyLoadingParts<TBl, TRtApi, TExec> = (
	TLazyLoadingClient<TBl, TRtApi, TExec>,
	Arc<TLazyLoadingBackend<TBl>>,
	KeystoreContainer,
	TaskManager,
);

type LazyLoadingClient<RuntimeApi> =
	TLazyLoadingClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
type LazyLoadingBackend = TLazyLoadingBackend<Block>;
            
/// Create the initial parts of a lazy loading node.
pub fn new_lazy_loading_parts<TBl, TRtApi, TExec>(
	config: &mut Configuration,
	lazy_loading_config: &LazyLoadingConfig,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT + sp_runtime::DeserializeOwned,
	TBl::Hash: From<H256>,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	let backend = substrate_backend::new_backend(config, &lazy_loading_config)?;

	let genesis_block_builder = GenesisBlockBuilder::new(
		config.chain_spec.as_storage_builder(),
		!config.no_genesis(),
		backend.clone(),
		executor.clone(),
	)?;

	new_lazy_loading_parts_with_genesis_builder(
		config,
		telemetry,
		executor,
		backend,
		genesis_block_builder,
	)
}
            
/// Create the initial parts of a lazy loading node, using the provided backend and
/// genesis block builder.
pub fn new_lazy_loading_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
	backend: Arc<TLazyLoadingBackend<TBl>>,
	genesis_block_builder: TBuildGenesisBlock,
) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT + sp_runtime::DeserializeOwned,
	TBl::Hash: From<H256>,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
	TBuildGenesisBlock:
		BuildGenesisBlock<
			TBl,
			BlockImportOperation = <TLazyLoadingBackend<TBl> as sc_client_api::backend::Backend<
				TBl,
			>>::BlockImportOperation,
		>,
{
	let keystore_container = KeystoreContainer::new(&config.keystore)?;

	let task_manager = {
		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
		TaskManager::new(config.tokio_handle.clone(), registry)?
	};

	let chain_spec = &config.chain_spec;
	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let client = {
		let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
			None,
			Arc::new(executor.clone()),
		);

		let wasm_runtime_substitutes = config
			.chain_spec
			.code_substitutes()
			.into_iter()
			.map(|(n, c)| {
				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
					Error::Application(Box::from(format!(
						"Failed to parse `{}` as block number for code substitutes. \
						 In an old version the key for code substitute was a block hash. \
						 Please update the chain spec to a version that is compatible with your node.",
						n
					)))
				})?;
				Ok((number, c))
			})
			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;

		let client = client::new_client(
			backend.clone(),
			executor,
			genesis_block_builder,
			fork_blocks,
			bad_blocks,
			extensions,
			Box::new(task_manager.spawn_handle()),
			config
				.prometheus_config
				.as_ref()
				.map(|config| config.registry.clone()),
			telemetry,
			ClientConfig {
				offchain_worker_enabled: config.offchain_worker.enabled,
				offchain_indexing_api: config.offchain_worker.indexing_enabled,
				wasmtime_precompiled: config.executor.wasmtime_precompiled.clone(),
				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
				no_genesis: matches!(
					config.network.sync_mode,
					SyncMode::LightState { .. } | SyncMode::Warp { .. }
				),
				wasm_runtime_substitutes,
				enable_import_proof_recording: true,
			},
		)?;

		client
	};

	Ok((client, backend, keystore_container, task_manager))
}
            
/// Builds the PartialComponents for a lazy loading node.
#[allow(clippy::type_complexity)]
pub fn new_lazy_loading_partial<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	lazy_loading_config: &LazyLoadingConfig,
) -> PartialComponentsResult<LazyLoadingClient<RuntimeApi>, LazyLoadingBackend>
where
	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;

	// Use ethereum style for subscription ids
	config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));

	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	let heap_pages = config
		.executor
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
			extra_pages: h as _,
		});
	let mut wasm_builder = WasmExecutor::builder()
		.with_execution_method(config.executor.wasm_method)
		.with_onchain_heap_alloc_strategy(heap_pages)
		.with_offchain_heap_alloc_strategy(heap_pages)
		.with_ignore_onchain_heap_pages(true)
		.with_max_runtime_instances(config.executor.max_runtime_instances)
		.with_runtime_cache_size(config.executor.runtime_cache_size);

	if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
	}

	let executor = wasm_builder.build();

	let (client, backend, keystore_container, task_manager) =
		new_lazy_loading_parts::<Block, RuntimeApi, _>(
			config,
			lazy_loading_config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
		)?;

	if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(block_number));
	}

	let client = Arc::new(client);

	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());

	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager
			.spawn_handle()
			.spawn("telemetry", None, worker.run());
		telemetry
	});

	let maybe_select_chain = Some(sc_consensus::LongestChain::new(backend.clone()));

	let transaction_pool = sc_transaction_pool::Builder::new(
		task_manager.spawn_essential_handle(),
		client.clone(),
		config.role.is_authority().into(),
	)
	.with_options(config.transaction_pool.clone())
	.with_prometheus(config.prometheus_registry())
	.build();

	let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
	let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));

	let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
	let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
            
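	// The import queue does not need real inherent data providers here; blocks are
	// authored locally via manual seal, which supplies its own inherent data in
	// `new_lazy_loading_service`.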
	let create_inherent_data_providers = move |_, _| async move { Ok(()) };

	let import_queue = nimbus_consensus::import_queue(
		client.clone(),
		frontier_block_import.clone(),
		create_inherent_data_providers,
		&task_manager.spawn_essential_handle(),
		config.prometheus_registry(),
		false,
		false,
	)?;
	let block_import = BlockImportPipeline::Dev(frontier_block_import);

	Ok(PartialComponents {
		backend,
		client,
		import_queue,
		keystore_container,
		task_manager,
		transaction_pool: transaction_pool.into(),
		select_chain: maybe_select_chain,
		other: (
			block_import,
			filter_pool,
			telemetry,
			telemetry_worker_handle,
			frontier_backend,
			fee_history_cache,
		),
	})
}
            
/// Builds a new lazy loading service. This service uses manual seal, and mocks
/// the parachain inherent.
#[sc_tracing::logging::prefix_logs_with("Lazy loading 🌗")]
pub async fn new_lazy_loading_service<RuntimeApi, Customizations, Net>(
	mut config: Configuration,
	_author_id: Option<NimbusId>,
	sealing: moonbeam_cli_opt::Sealing,
	rpc_config: RpcConfig,
	lazy_loading_config: LazyLoadingConfig,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<TaskManager, ServiceError>
where
	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
	Net: NetworkBackend<Block, Hash>,
{
	use async_io::Timer;
	use futures::Stream;
	use sc_consensus_manual_seal::{EngineCommand, ManualSealParams};

	let sc_service::PartialComponents {
		client,
		backend,
		mut task_manager,
		import_queue,
		keystore_container,
		select_chain: maybe_select_chain,
		transaction_pool,
		other:
			(
				block_import_pipeline,
				filter_pool,
				mut telemetry,
				_telemetry_worker_handle,
				frontier_backend,
				fee_history_cache,
			),
	} = lazy_loading::new_lazy_loading_partial::<RuntimeApi, Customizations>(
		&mut config,
		&rpc_config,
		&lazy_loading_config,
	)?;
            
	let start_delay = 10;
	let lazy_loading_startup_disclaimer = format!(
		r#"

		You are now running the Moonbeam client in lazy loading mode, where data is retrieved
		from a live RPC node on demand.

		Using remote state from: {rpc}
		Forking from block: {fork_block}

		To ensure the client works properly, please note the following:

		    1. *Avoid Throttling*: Ensure that the backing RPC node is not limiting the number of
		    requests, as this can prevent the lazy loading client from functioning correctly;

		    2. *Be Patient*: The client may take approximately 20 times longer than normal to
		    retrieve and process the necessary data for the requested operation.


		The service will start in {start_delay} seconds...

		"#,
		rpc = lazy_loading_config.state_rpc,
		fork_block = backend.fork_checkpoint.number
	);

	log::warn!(
		"{}",
		ansi_term::Colour::Yellow.paint(lazy_loading_startup_disclaimer)
	);
	tokio::time::sleep(Duration::from_secs(start_delay)).await;

	let block_import = if let BlockImportPipeline::Dev(block_import) = block_import_pipeline {
		block_import
	} else {
		return Err(ServiceError::Other(
			"Block import pipeline is not dev".to_string(),
		));
	};
            
	let prometheus_registry = config.prometheus_registry().cloned();
	let net_config =
		FullNetworkConfiguration::<_, _, Net>::new(&config.network, prometheus_registry.clone());

	let metrics = Net::register_notification_metrics(
		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
	);

	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
		sc_service::build_network(sc_service::BuildNetworkParams {
			config: &config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue,
			block_announce_validator_builder: None,
			warp_sync_config: None,
			net_config,
			block_relay: None,
			metrics,
		})?;

	if config.offchain_worker.enabled {
		task_manager.spawn_handle().spawn(
			"offchain-workers-runner",
			"offchain-work",
			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
				runtime_api_provider: client.clone(),
				keystore: Some(keystore_container.keystore()),
				offchain_db: backend.offchain_storage(),
				transaction_pool: Some(OffchainTransactionPoolFactory::new(
					transaction_pool.clone(),
				)),
				network_provider: Arc::new(network.clone()),
				is_validator: config.role.is_authority(),
				enable_http_requests: true,
				custom_extensions: move |_| vec![],
			})?
			.run(client.clone(), task_manager.spawn_handle())
			.boxed(),
		);
	}
            
	let prometheus_registry = config.prometheus_registry().cloned();
	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
	let fee_history_limit = rpc_config.fee_history_limit;
	let mut command_sink = None;
	let mut dev_rpc_data = None;
	let collator = config.role.is_authority();
            
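	// The parachain id is not taken from the local configuration; it is fetched
	// from the remote node through the lazy loading RPC client.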
	let parachain_id: ParaId = helpers::get_parachain_id(backend.rpc_client.clone())
		.unwrap_or_else(|| panic!("Could not get parachain identifier for lazy loading mode."))
		.into();
            
	if collator {
		let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|x| x.handle()),
		);
		env.set_soft_deadline(SOFT_DEADLINE_PERCENT);
            
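		// Choose how blocks are sealed: instant seal reacts to every new transaction,
		// manual seal waits for commands sent over the RPC, and interval seal produces
		// a block (possibly empty) on a fixed timer.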
		let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
			match sealing {
				moonbeam_cli_opt::Sealing::Instant => {
					Box::new(
						// This bit cribbed from the implementation of instant seal.
						transaction_pool.import_notification_stream().map(|_| {
							EngineCommand::SealNewBlock {
								create_empty: false,
								finalize: false,
								parent_hash: None,
								sender: None,
							}
						}),
					)
				}
				moonbeam_cli_opt::Sealing::Manual => {
					let (sink, stream) = futures::channel::mpsc::channel(1000);
					// Keep a reference to the other end of the channel. It goes to the RPC.
					command_sink = Some(sink);
					Box::new(stream)
				}
				moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
					Timer::interval(Duration::from_millis(millis)),
					|_| EngineCommand::SealNewBlock {
						create_empty: true,
						finalize: false,
						parent_hash: None,
						sender: None,
					},
				)),
			};
            
		let select_chain = maybe_select_chain.expect(
			"`new_lazy_loading_partial` builds a `LongestChainRule` when building dev service.\
				We specified the dev service when calling `new_partial`.\
				Therefore, a `LongestChainRule` is present. qed.",
		);

		// Create channels for mocked XCM messages.
		let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
		let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
		let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
		dev_rpc_data = Some((
			downward_xcm_sender,
			hrmp_xcm_sender,
			additional_relay_offset.clone(),
		));

		// Need to clone it and store here to avoid moving of `client`
		// variable in closure below.
		let client_vrf = client.clone();

		let keystore_clone = keystore_container.keystore().clone();
		let maybe_provide_vrf_digest =
			move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
				moonbeam_vrf::vrf_pre_digest::<Block, LazyLoadingClient<RuntimeApi>>(
					&client_vrf,
					&keystore_clone,
					nimbus_id,
					parent,
				)
			};
            
		// Need to clone it and store here to avoid moving of `client`
		// variable in closure below.
		let client_for_cidp = client.clone();

		task_manager.spawn_essential_handle().spawn_blocking(
			"authorship_task",
			Some("block-authoring"),
			manual_sealing::run_manual_seal(ManualSealParams {
				block_import,
				env,
				client: client.clone(),
				pool: transaction_pool.clone(),
				commands_stream,
				select_chain,
				consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
					keystore: keystore_container.keystore(),
					client: client.clone(),
					additional_digests_provider: maybe_provide_vrf_digest,
					_phantom: Default::default(),
				})),
				create_inherent_data_providers: move |block: H256, ()| {
					let maybe_current_para_block = client_for_cidp.number(block);
					let maybe_current_para_head = client_for_cidp.expect_header(block);
					let downward_xcm_receiver = downward_xcm_receiver.clone();
					let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
					let additional_relay_offset = additional_relay_offset.clone();

					// Need to clone it and store here to avoid moving of `client`
					// variable in closure below.
					let client_for_xcm = client_for_cidp.clone();
            
					async move {
						MockTimestampInherentDataProvider::advance_timestamp(
							RELAY_CHAIN_SLOT_DURATION_MILLIS,
						);

						// Get the mocked timestamp
						let timestamp = TIMESTAMP.load(Ordering::SeqCst);
						// Calculate mocked slot number
						let slot = timestamp.saturating_div(RELAY_CHAIN_SLOT_DURATION_MILLIS);

						let current_para_block = maybe_current_para_block?
							.ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;

						let current_para_block_head = Some(polkadot_primitives::HeadData(
							maybe_current_para_head?.encode(),
						));
            
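						// Extra relay chain storage entries for the mocked validation data:
						// the relay timestamp, the current relay slot and the active host
						// configuration.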
						let additional_key_values = vec![
							// TODO: TIMESTAMP_NOW can be removed after runtime 4000
							(
								#[allow(deprecated)]
								moonbeam_core_primitives::well_known_relay_keys::TIMESTAMP_NOW
									.to_vec(),
								timestamp.encode(),
							),
							// Override current slot number
							(
								relay_chain::well_known_keys::CURRENT_SLOT.to_vec(),
								Slot::from(slot).encode(),
							),
							(
								relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
								AbridgedHostConfiguration {
									max_code_size: 3_145_728,
									max_head_data_size: 20_480,
									max_upward_queue_count: 174_762,
									max_upward_queue_size: 1_048_576,
									max_upward_message_size: 65_531,
									max_upward_message_num_per_candidate: 16,
									hrmp_max_message_num_per_candidate: 10,
									validation_upgrade_cooldown: 14_400,
									validation_upgrade_delay: 600,
									async_backing_params: AsyncBackingParams {
										max_candidate_depth: 3,
										allowed_ancestry_len: 2,
									},
								}
								.encode(),
							),
						];
            
						let current_para_head = client_for_xcm
							.header(block)
							.expect("Header lookup should succeed")
							.expect("Header passed in as parent should be present in backend.");
            
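						// If the collation info reports a pending validation code upgrade,
						// mock the relay chain go-ahead signal so the upgrade can be enacted.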
						let should_send_go_ahead = match client_for_xcm
							.runtime_api()
							.collect_collation_info(block, &current_para_head)
						{
							Ok(info) => info.new_validation_code.is_some(),
							Err(e) => {
								log::error!("Failed to collect collation info: {:?}", e);
								false
							}
						};

						let mocked_parachain = MockValidationDataInherentDataProvider {
							current_para_block,
							para_id: parachain_id,
							upgrade_go_ahead: should_send_go_ahead.then(|| {
								log::info!(
									"Detected pending validation code, sending go-ahead signal."
								);
								UpgradeGoAhead::GoAhead
							}),
							current_para_block_head,
							relay_offset: additional_relay_offset.load(Ordering::SeqCst),
							relay_blocks_per_para_block: 1,
							para_blocks_per_relay_epoch: 10,
							relay_randomness_config: (),
							xcm_config: MockXcmConfig::new(
								&*client_for_xcm,
								block,
								Default::default(),
							),
							raw_downward_messages: downward_xcm_receiver.drain().collect(),
							raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
							additional_key_values: Some(additional_key_values),
						};

						let randomness = session_keys_primitives::InherentDataProvider;

						Ok((
							MockTimestampInherentDataProvider,
							mocked_parachain,
							randomness,
						))
					}
				},
			}),
		);
	}
            
	// Sinks for pubsub notifications.
	// Every time a new subscription is created, a new mpsc channel is added to the sink pool.
	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
	// notification to the subscriber on receiving a message through this channel.
	// This way we avoid race conditions when using the native Substrate block import notification
	// stream.
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
		fc_mapping_sync::EthereumBlockNotification<Block>,
	> = Default::default();
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

	/* TODO: only enable this when frontier backend is compatible with lazy-loading
	rpc::spawn_essential_tasks(
		rpc::SpawnTasksParams {
			task_manager: &task_manager,
			client: client.clone(),
			substrate_backend: backend.clone(),
			frontier_backend: frontier_backend.clone(),
			filter_pool: filter_pool.clone(),
			overrides: overrides.clone(),
			fee_history_limit,
			fee_history_cache: fee_history_cache.clone(),
		},
		sync_service.clone(),
		pubsub_notification_sinks.clone(),
	);
	*/
            
	let ethapi_cmd = rpc_config.ethapi.clone();
	let tracing_requesters =
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
			rpc::tracing::spawn_tracing_tasks(
				&rpc_config,
				prometheus_registry.clone(),
				rpc::SpawnTasksParams {
					task_manager: &task_manager,
					client: client.clone(),
					substrate_backend: backend.clone(),
					frontier_backend: frontier_backend.clone(),
					filter_pool: filter_pool.clone(),
					overrides: overrides.clone(),
					fee_history_limit,
					fee_history_cache: fee_history_cache.clone(),
				},
			)
		} else {
			rpc::tracing::RpcRequesters {
				debug: None,
				trace: None,
			}
		};

	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
		task_manager.spawn_handle(),
		overrides.clone(),
		rpc_config.eth_log_block_cache,
		rpc_config.eth_statuses_cache,
		prometheus_registry,
	));
            
	let rpc_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let backend = backend.clone();
		let network = network.clone();
		let sync = sync_service.clone();
		let ethapi_cmd = ethapi_cmd.clone();
		let max_past_logs = rpc_config.max_past_logs;
		let max_block_range = rpc_config.max_block_range;
		let overrides = overrides.clone();
		let fee_history_cache = fee_history_cache.clone();
		let block_data_cache = block_data_cache.clone();
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();

		let keystore = keystore_container.keystore();
		let command_sink_for_task = command_sink.clone();
		move |subscription_task_executor| {
			let deps = rpc::FullDeps {
				backend: backend.clone(),
				client: client.clone(),
				command_sink: command_sink_for_task.clone(),
				ethapi_cmd: ethapi_cmd.clone(),
				filter_pool: filter_pool.clone(),
				frontier_backend: Arc::new(LazyLoadingFrontierBackend {
					rpc_client: backend.clone().rpc_client.clone(),
					frontier_backend: match *frontier_backend {
						fc_db::Backend::KeyValue(ref b) => b.clone(),
						fc_db::Backend::Sql(ref b) => b.clone(),
					},
				}),
				graph: pool.clone(),
				pool: pool.clone(),
				is_authority: collator,
				max_past_logs,
				max_block_range,
				fee_history_limit,
				fee_history_cache: fee_history_cache.clone(),
				network: network.clone(),
				sync: sync.clone(),
				dev_rpc_data: dev_rpc_data.clone(),
				overrides: overrides.clone(),
				block_data_cache: block_data_cache.clone(),
				forced_parent_hashes: None,
			};

			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
				client.clone(),
				keystore.clone(),
			));
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
				rpc::create_full(
					deps,
					subscription_task_executor,
					Some(crate::rpc::TracingConfig {
						tracing_requesters: tracing_requesters.clone(),
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
					}),
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
					parachain_id,
				)
				.map_err(Into::into)
			} else {
				rpc::create_full(
					deps,
					subscription_task_executor,
					None,
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
					parachain_id,
				)
				.map_err(Into::into)
			}
		}
	};
            
	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		network,
		client,
		keystore: keystore_container.keystore(),
		task_manager: &mut task_manager,
		transaction_pool,
		rpc_builder: Box::new(rpc_builder),
		backend,
		system_rpc_tx,
		sync_service: sync_service.clone(),
		config,
		tx_handler_controller,
		telemetry: None,
	})?;

	if let Some(hwbench) = hwbench {
		sc_sysinfo::print_hwbench(&hwbench);

		if let Some(ref mut telemetry) = telemetry {
			let telemetry_handle = telemetry.handle();
			task_manager.spawn_handle().spawn(
				"telemetry_hwbench",
				None,
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
			);
		}
	}

	log::info!("Service Ready");

	Ok(task_manager)
}
            
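/// Chain spec builder for the lazy loading node, based on the Moonbeam runtime
/// with a development chain type and GLMR token properties.
///
/// Illustrative usage (a sketch; the surrounding CLI wiring is assumed):
/// ```ignore
/// let chain_spec = lazy_loading::spec_builder().build();
/// ```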
pub fn spec_builder() -> sc_chain_spec::ChainSpecBuilder<Extensions, HostFunctions> {
	crate::chain_spec::moonbeam::ChainSpec::builder(
		moonbeam_runtime::WASM_BINARY.expect("WASM binary was not built, please build it!"),
		Default::default(),
	)
	.with_name("Lazy Loading")
	.with_id("lazy_loading")
	.with_chain_type(ChainType::Development)
	.with_properties(
		serde_json::from_str(
			"{\"tokenDecimals\": 18, \"tokenSymbol\": \"GLMR\", \"SS58Prefix\": 1284}",
		)
		.expect("Provided valid json map"),
	)
	.with_genesis_config_preset_name(sp_genesis_builder::DEV_RUNTIME_PRESET)
}