// Copyright 2024 Moonbeam foundation
// This file is part of Moonbeam.

// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.

use crate::chain_spec::Extensions;
use crate::{
	lazy_loading, open_frontier_backend, rpc, set_prometheus_registry, BlockImportPipeline,
	ClientCustomizations, FrontierBlockImport, HostFunctions, PartialComponentsResult,
	PendingConsensusDataProvider, RuntimeApiCollection, SOFT_DEADLINE_PERCENT,
};
use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
use cumulus_primitives_core::{relay_chain, BlockT, CollectCollationInfo, ParaId};
use cumulus_primitives_parachain_inherent::ParachainInherentData;
use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
use fc_rpc::StorageOverrideHandler;
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
use frontier_backend::LazyLoadingFrontierBackend;
use futures::{FutureExt, StreamExt};
use moonbeam_cli_opt::{EthApi as EthApiCmd, LazyLoadingConfig, RpcConfig};
use moonbeam_core_primitives::{Block, Hash};
use nimbus_consensus::NimbusManualSealConsensusDataProvider;
use nimbus_primitives::NimbusId;
use parity_scale_codec::Encode;
use polkadot_primitives::{
	AbridgedHostConfiguration, AsyncBackingParams, PersistedValidationData, Slot, UpgradeGoAhead,
};
use sc_chain_spec::{get_extension, BuildGenesisBlock, ChainType, GenesisBlockBuilder};
use sc_client_api::{Backend, BadBlocks, ExecutorProvider, ForkBlocks};
use sc_executor::{HeapAllocStrategy, RuntimeVersionOf, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sc_network::config::FullNetworkConfiguration;
use sc_network::NetworkBackend;
use sc_network_common::sync::SyncMode;
use sc_service::{
	error::Error as ServiceError, ClientConfig, Configuration, Error, KeystoreContainer,
	LocalCallExecutor, PartialComponents, TaskManager,
};
use sc_telemetry::{TelemetryHandle, TelemetryWorker};
use sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool};
use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_core::traits::CodeExecutor;
use sp_core::H256;
use sp_runtime::traits::NumberFor;
use std::collections::BTreeMap;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub mod call_executor;
mod client;
pub mod frontier_backend;
mod helpers;
mod lock;
mod manual_sealing;
mod rpc_client;
mod state_overrides;
pub mod substrate_backend;

pub const LAZY_LOADING_LOG_TARGET: &'static str = "lazy-loading";
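// Illustrative usage: logs emitted under this target can be enabled selectively with the
// standard CLI log filter (e.g. `-l lazy-loading=trace`) and produced via
// `log::debug!(target: LAZY_LOADING_LOG_TARGET, "...")`.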

/// Lazy loading client type.
pub type TLazyLoadingClient<TBl, TRtApi, TExec> = sc_service::client::Client<
	TLazyLoadingBackend<TBl>,
	TLazyLoadingCallExecutor<TBl, TExec>,
	TBl,
	TRtApi,
>;

/// Lazy loading client backend type.
pub type TLazyLoadingBackend<TBl> = substrate_backend::Backend<TBl>;

/// Lazy loading client call executor type.
pub type TLazyLoadingCallExecutor<TBl, TExec> = call_executor::LazyLoadingCallExecutor<
	TBl,
	LocalCallExecutor<TBl, TLazyLoadingBackend<TBl>, TExec>,
>;

/// Lazy loading parts type.
pub type TLazyLoadingParts<TBl, TRtApi, TExec> = (
	TLazyLoadingClient<TBl, TRtApi, TExec>,
	Arc<TLazyLoadingBackend<TBl>>,
	KeystoreContainer,
	TaskManager,
);

type LazyLoadingClient<RuntimeApi> =
	TLazyLoadingClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
type LazyLoadingBackend = TLazyLoadingBackend<Block>;

/// Create the initial parts of a lazy loading node.
pub fn new_lazy_loading_parts<TBl, TRtApi, TExec>(
	config: &mut Configuration,
	lazy_loading_config: &LazyLoadingConfig,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT + sp_runtime::DeserializeOwned,
	TBl::Hash: From<H256>,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	let backend = substrate_backend::new_backend(config, &lazy_loading_config)?;

	let genesis_block_builder = GenesisBlockBuilder::new(
		config.chain_spec.as_storage_builder(),
		!config.no_genesis(),
		backend.clone(),
		executor.clone(),
	)?;

	new_lazy_loading_parts_with_genesis_builder(
		config,
		telemetry,
		executor,
		backend,
		genesis_block_builder,
	)
}

/// Create the initial parts of a lazy loading node.
pub fn new_lazy_loading_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
	backend: Arc<TLazyLoadingBackend<TBl>>,
	genesis_block_builder: TBuildGenesisBlock,
) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT + sp_runtime::DeserializeOwned,
	TBl::Hash: From<H256>,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
	TBuildGenesisBlock:
		BuildGenesisBlock<
			TBl,
			BlockImportOperation = <TLazyLoadingBackend<TBl> as sc_client_api::backend::Backend<
				TBl,
			>>::BlockImportOperation,
		>,
{
	let keystore_container = KeystoreContainer::new(&config.keystore)?;

	let task_manager = {
		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
		TaskManager::new(config.tokio_handle.clone(), registry)?
	};

	let chain_spec = &config.chain_spec;
	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let client = {
		let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
			None,
			Arc::new(executor.clone()),
		);

		let wasm_runtime_substitutes = config
			.chain_spec
			.code_substitutes()
			.into_iter()
			.map(|(n, c)| {
				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
					Error::Application(Box::from(format!(
						"Failed to parse `{}` as block number for code substitutes. \
						 In an old version the key for code substitute was a block hash. \
						 Please update the chain spec to a version that is compatible with your node.",
						n
					)))
				})?;
				Ok((number, c))
			})
			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;

		let client = client::new_client(
			backend.clone(),
			executor,
			genesis_block_builder,
			fork_blocks,
			bad_blocks,
			extensions,
			Box::new(task_manager.spawn_handle()),
			config
				.prometheus_config
				.as_ref()
				.map(|config| config.registry.clone()),
			telemetry,
			ClientConfig {
				offchain_worker_enabled: config.offchain_worker.enabled,
				offchain_indexing_api: config.offchain_worker.indexing_enabled,
				wasmtime_precompiled: config.executor.wasmtime_precompiled.clone(),
				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
				no_genesis: matches!(
					config.network.sync_mode,
					SyncMode::LightState { .. } | SyncMode::Warp { .. }
				),
				wasm_runtime_substitutes,
				enable_import_proof_recording: true,
			},
		)?;

		client
	};

	Ok((client, backend, keystore_container, task_manager))
}

/// Builds the PartialComponents for a lazy loading node.
#[allow(clippy::type_complexity)]
pub fn new_lazy_loading_partial<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	lazy_loading_config: &LazyLoadingConfig,
) -> PartialComponentsResult<LazyLoadingClient<RuntimeApi>, LazyLoadingBackend>
where
	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;

	// Use Ethereum-style subscription ids
	config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));

	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	let heap_pages = config
		.executor
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
			extra_pages: h as _,
		});
	let mut wasm_builder = WasmExecutor::builder()
		.with_execution_method(config.executor.wasm_method)
		.with_onchain_heap_alloc_strategy(heap_pages)
		.with_offchain_heap_alloc_strategy(heap_pages)
		.with_ignore_onchain_heap_pages(true)
		.with_max_runtime_instances(config.executor.max_runtime_instances)
		.with_runtime_cache_size(config.executor.runtime_cache_size);

	if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
	}

	let executor = wasm_builder.build();

	let (client, backend, keystore_container, task_manager) =
		new_lazy_loading_parts::<Block, RuntimeApi, _>(
			config,
			lazy_loading_config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
		)?;

	if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(block_number));
	}

	let client = Arc::new(client);

	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());

	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager
			.spawn_handle()
			.spawn("telemetry", None, worker.run());
		telemetry
	});

	let maybe_select_chain = Some(sc_consensus::LongestChain::new(backend.clone()));

	let transaction_pool = sc_transaction_pool::Builder::new(
		task_manager.spawn_essential_handle(),
		client.clone(),
		config.role.is_authority().into(),
	)
	.with_options(config.transaction_pool.clone())
	.with_prometheus(config.prometheus_registry())
	.build();

	let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
	let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));

	let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
	let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());

	let create_inherent_data_providers = move |_, _| async move {
		let time = sp_timestamp::InherentDataProvider::from_system_time();
		// Create a dummy parachain inherent data provider, which is required to pass
		// the checks of the parachain system. We use dummy values because, in the 'pending
		// context', we neither have access to the real values nor need them.
		let (relay_parent_storage_root, relay_chain_state) =
			RelayStateSproofBuilder::default().into_state_root_and_proof();
		let vfp = PersistedValidationData {
			// This is a hack to make `cumulus_pallet_parachain_system::RelayNumberStrictlyIncreases`
			// happy. Relay parent number can't be bigger than u32::MAX.
			relay_parent_number: u32::MAX,
			relay_parent_storage_root,
			..Default::default()
		};
		let parachain_inherent_data = ParachainInherentData {
			validation_data: vfp,
			relay_chain_state,
			downward_messages: Default::default(),
			horizontal_messages: Default::default(),
		};
		Ok((time, parachain_inherent_data))
	};
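	// The dummy providers above are only consumed by the nimbus import queue below when
	// verifying blocks; block authoring in `new_lazy_loading_service` supplies its own,
	// fully mocked inherent data providers.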

	let import_queue = nimbus_consensus::import_queue(
		client.clone(),
		frontier_block_import.clone(),
		create_inherent_data_providers,
		&task_manager.spawn_essential_handle(),
		config.prometheus_registry(),
		false,
		false,
	)?;
	let block_import = BlockImportPipeline::Dev(frontier_block_import);

	Ok(PartialComponents {
		backend,
		client,
		import_queue,
		keystore_container,
		task_manager,
		transaction_pool: transaction_pool.into(),
		select_chain: maybe_select_chain,
		other: (
			block_import,
			filter_pool,
			telemetry,
			telemetry_worker_handle,
			frontier_backend,
			fee_history_cache,
		),
	})
}

/// Builds a new lazy loading service. This service uses manual seal, and mocks
/// the parachain inherent.
#[sc_tracing::logging::prefix_logs_with("Lazy loading 🌗")]
pub async fn new_lazy_loading_service<RuntimeApi, Customizations, Net>(
	mut config: Configuration,
	_author_id: Option<NimbusId>,
	sealing: moonbeam_cli_opt::Sealing,
	rpc_config: RpcConfig,
	lazy_loading_config: LazyLoadingConfig,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<TaskManager, ServiceError>
where
	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
	Net: NetworkBackend<Block, Hash>,
{
	use async_io::Timer;
	use futures::Stream;
	use sc_consensus_manual_seal::{EngineCommand, ManualSealParams};

	let sc_service::PartialComponents {
		client,
		backend,
		mut task_manager,
		import_queue,
		keystore_container,
		select_chain: maybe_select_chain,
		transaction_pool,
		other:
			(
				block_import_pipeline,
				filter_pool,
				mut telemetry,
				_telemetry_worker_handle,
				frontier_backend,
				fee_history_cache,
			),
	} = lazy_loading::new_lazy_loading_partial::<RuntimeApi, Customizations>(
		&mut config,
		&rpc_config,
		&lazy_loading_config,
	)?;
413

            
414
	let start_delay = 10;
415
	let lazy_loading_startup_disclaimer = format!(
416
		r#"
417

            
418
		You are now running the Moonbeam client in lazy loading mode, where data is retrieved
419
		from a live RPC node on demand.
420

            
421
		Using remote state from: {rpc}
422
		Forking from block: {fork_block}
423

            
424
		To ensure the client works properly, please note the following:
425

            
426
		    1. *Avoid Throttling*: Ensure that the backing RPC node is not limiting the number of
427
		    requests, as this can prevent the lazy loading client from functioning correctly;
428

            
429
		    2. *Be Patient*: As the client may take approximately 20 times longer than normal to
430
		    retrieve and process the necessary data for the requested operation.
431

            
432

            
433
		The service will start in {start_delay} seconds...
434

            
435
		"#,
436
		rpc = lazy_loading_config.state_rpc,
437
		fork_block = backend.fork_checkpoint.number
438
	);
439

            
440
	log::warn!(
441
		"{}",
442
		ansi_term::Colour::Yellow.paint(lazy_loading_startup_disclaimer)
443
	);
	tokio::time::sleep(Duration::from_secs(start_delay)).await;

	let block_import = if let BlockImportPipeline::Dev(block_import) = block_import_pipeline {
		block_import
	} else {
		return Err(ServiceError::Other(
			"Block import pipeline is not dev".to_string(),
		));
	};

	let prometheus_registry = config.prometheus_registry().cloned();
	let net_config =
		FullNetworkConfiguration::<_, _, Net>::new(&config.network, prometheus_registry.clone());

	let metrics = Net::register_notification_metrics(
		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
	);

	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
		sc_service::build_network(sc_service::BuildNetworkParams {
			config: &config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue,
			block_announce_validator_builder: None,
			warp_sync_config: None,
			net_config,
			block_relay: None,
			metrics,
		})?;

	if config.offchain_worker.enabled {
		task_manager.spawn_handle().spawn(
			"offchain-workers-runner",
			"offchain-work",
			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
				runtime_api_provider: client.clone(),
				keystore: Some(keystore_container.keystore()),
				offchain_db: backend.offchain_storage(),
				transaction_pool: Some(OffchainTransactionPoolFactory::new(
					transaction_pool.clone(),
				)),
				network_provider: Arc::new(network.clone()),
				is_validator: config.role.is_authority(),
				enable_http_requests: true,
				custom_extensions: move |_| vec![],
			})?
			.run(client.clone(), task_manager.spawn_handle())
			.boxed(),
		);
	}

	let prometheus_registry = config.prometheus_registry().cloned();
	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
	let fee_history_limit = rpc_config.fee_history_limit;
	let mut command_sink = None;
	let mut dev_rpc_data = None;
	let collator = config.role.is_authority();

	if collator {
		let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|x| x.handle()),
		);
		env.set_soft_deadline(SOFT_DEADLINE_PERCENT);

		let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
			match sealing {
				moonbeam_cli_opt::Sealing::Instant => {
					Box::new(
						// This bit is cribbed from the implementation of instant seal.
						transaction_pool.import_notification_stream().map(|_| {
							EngineCommand::SealNewBlock {
								create_empty: false,
								finalize: false,
								parent_hash: None,
								sender: None,
							}
						}),
					)
				}
				moonbeam_cli_opt::Sealing::Manual => {
					let (sink, stream) = futures::channel::mpsc::channel(1000);
					// Keep a reference to the other end of the channel. It goes to the RPC.
					command_sink = Some(sink);
					Box::new(stream)
				}
				moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
					Timer::interval(Duration::from_millis(millis)),
					|_| EngineCommand::SealNewBlock {
						create_empty: true,
						finalize: false,
						parent_hash: None,
						sender: None,
					},
				)),
			};

		let select_chain = maybe_select_chain.expect(
			"`new_lazy_loading_partial` builds a `LongestChainRule` when building dev service.\
				We specified the dev service when calling `new_partial`.\
				Therefore, a `LongestChainRule` is present. qed.",
		);

		// Create channels for mocked XCM messages.
		let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
		let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
		let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
		dev_rpc_data = Some((
			downward_xcm_sender,
			hrmp_xcm_sender,
			additional_relay_offset,
		));

		// Clone the client and store it here to avoid moving the `client`
		// variable into the closure below.
		let client_vrf = client.clone();

		let keystore_clone = keystore_container.keystore().clone();
		let maybe_provide_vrf_digest =
			move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
				moonbeam_vrf::vrf_pre_digest::<Block, LazyLoadingClient<RuntimeApi>>(
					&client_vrf,
					&keystore_clone,
					nimbus_id,
					parent,
				)
			};

		let parachain_id = helpers::get_parachain_id(backend.rpc_client.clone())
			.unwrap_or_else(|| panic!("Could not get parachain identifier for lazy loading mode."));

		// Clone the client and store it here to avoid moving the `client`
		// variable into the closure below.
		let client_for_cidp = client.clone();

		task_manager.spawn_essential_handle().spawn_blocking(
			"authorship_task",
			Some("block-authoring"),
			manual_sealing::run_manual_seal(ManualSealParams {
				block_import,
				env,
				client: client.clone(),
				pool: transaction_pool.clone(),
				commands_stream,
				select_chain,
				consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
					keystore: keystore_container.keystore(),
					client: client.clone(),
					additional_digests_provider: maybe_provide_vrf_digest,
					_phantom: Default::default(),
				})),
				create_inherent_data_providers: move |block: H256, ()| {
					let maybe_current_para_block = client_for_cidp.number(block);
					let maybe_current_para_head = client_for_cidp.expect_header(block);
					let downward_xcm_receiver = downward_xcm_receiver.clone();
					let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();

					// Clone the client and store it here to avoid moving the `client`
					// variable into the closure below.
					let client_for_xcm = client_for_cidp.clone();

					async move {
						let time = sp_timestamp::InherentDataProvider::from_system_time();

						let current_para_block = maybe_current_para_block?
							.ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;

						let current_para_block_head = Some(polkadot_primitives::HeadData(
							maybe_current_para_head?.encode(),
						));

						let additional_key_values = vec![
							(
								moonbeam_core_primitives::well_known_relay_keys::TIMESTAMP_NOW
									.to_vec(),
								sp_timestamp::Timestamp::current().encode(),
							),
							(
								relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
								AbridgedHostConfiguration {
									max_code_size: 3_145_728,
									max_head_data_size: 20_480,
									max_upward_queue_count: 174_762,
									max_upward_queue_size: 1_048_576,
									max_upward_message_size: 65_531,
									max_upward_message_num_per_candidate: 16,
									hrmp_max_message_num_per_candidate: 10,
									validation_upgrade_cooldown: 14_400,
									validation_upgrade_delay: 600,
									async_backing_params: AsyncBackingParams {
										max_candidate_depth: 3,
										allowed_ancestry_len: 2,
									},
								}
								.encode(),
							),
							// Override the current slot number
							(
								relay_chain::well_known_keys::CURRENT_SLOT.to_vec(),
								Slot::from(u64::from(current_para_block)).encode(),
							),
						];

						let current_para_head = client_for_xcm
							.header(block)
							.expect("Header lookup should succeed")
							.expect("Header passed in as parent should be present in backend.");

						let should_send_go_ahead = match client_for_xcm
							.runtime_api()
							.collect_collation_info(block, &current_para_head)
						{
							Ok(info) => info.new_validation_code.is_some(),
							Err(e) => {
								log::error!("Failed to collect collation info: {:?}", e);
								false
							}
						};

						let mocked_parachain = MockValidationDataInherentDataProvider {
							current_para_block,
							para_id: ParaId::new(parachain_id),
							upgrade_go_ahead: should_send_go_ahead.then(|| {
								log::info!(
									"Detected pending validation code, sending go-ahead signal."
								);
								UpgradeGoAhead::GoAhead
							}),
							current_para_block_head,
							relay_offset: 1000,
							relay_blocks_per_para_block: 2,
							// TODO: Recheck
							para_blocks_per_relay_epoch: 10,
							relay_randomness_config: (),
							xcm_config: MockXcmConfig::new(
								&*client_for_xcm,
								block,
								Default::default(),
							),
							raw_downward_messages: downward_xcm_receiver.drain().collect(),
							raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
							additional_key_values: Some(additional_key_values),
						};

						let randomness = session_keys_primitives::InherentDataProvider;

						Ok((time, mocked_parachain, randomness))
					}
				},
			}),
		);
	}

	// Sinks for pubsub notifications.
	// Every time a new subscription is created, a new mpsc channel is added to the sink pool.
	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
	// notification to the subscriber on receiving a message through this channel.
	// This way we avoid race conditions when using the native Substrate block import notification
	// stream.
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
		fc_mapping_sync::EthereumBlockNotification<Block>,
	> = Default::default();
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

	/* TODO: only enable this when frontier backend is compatible with lazy-loading
	rpc::spawn_essential_tasks(
		rpc::SpawnTasksParams {
			task_manager: &task_manager,
			client: client.clone(),
			substrate_backend: backend.clone(),
			frontier_backend: frontier_backend.clone(),
			filter_pool: filter_pool.clone(),
			overrides: overrides.clone(),
			fee_history_limit,
			fee_history_cache: fee_history_cache.clone(),
		},
		sync_service.clone(),
		pubsub_notification_sinks.clone(),
	);
	*/

	let ethapi_cmd = rpc_config.ethapi.clone();
	let tracing_requesters =
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
			rpc::tracing::spawn_tracing_tasks(
				&rpc_config,
				prometheus_registry.clone(),
				rpc::SpawnTasksParams {
					task_manager: &task_manager,
					client: client.clone(),
					substrate_backend: backend.clone(),
					frontier_backend: frontier_backend.clone(),
					filter_pool: filter_pool.clone(),
					overrides: overrides.clone(),
					fee_history_limit,
					fee_history_cache: fee_history_cache.clone(),
				},
			)
		} else {
			rpc::tracing::RpcRequesters {
				debug: None,
				trace: None,
			}
		};

	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
		task_manager.spawn_handle(),
		overrides.clone(),
		rpc_config.eth_log_block_cache,
		rpc_config.eth_statuses_cache,
		prometheus_registry,
	));

	let rpc_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let backend = backend.clone();
		let network = network.clone();
		let sync = sync_service.clone();
		let ethapi_cmd = ethapi_cmd.clone();
		let max_past_logs = rpc_config.max_past_logs;
		let max_block_range = rpc_config.max_block_range;
		let overrides = overrides.clone();
		let fee_history_cache = fee_history_cache.clone();
		let block_data_cache = block_data_cache.clone();
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();

		let keystore = keystore_container.keystore();
		let command_sink_for_task = command_sink.clone();
		move |subscription_task_executor| {
			let deps = rpc::FullDeps {
				backend: backend.clone(),
				client: client.clone(),
				command_sink: command_sink_for_task.clone(),
				ethapi_cmd: ethapi_cmd.clone(),
				filter_pool: filter_pool.clone(),
				frontier_backend: Arc::new(LazyLoadingFrontierBackend {
					rpc_client: backend.clone().rpc_client.clone(),
					frontier_backend: match *frontier_backend {
						fc_db::Backend::KeyValue(ref b) => b.clone(),
						fc_db::Backend::Sql(ref b) => b.clone(),
					},
				}),
				graph: pool.clone(),
				pool: pool.clone(),
				is_authority: collator,
				max_past_logs,
				max_block_range,
				fee_history_limit,
				fee_history_cache: fee_history_cache.clone(),
				network: network.clone(),
				sync: sync.clone(),
				dev_rpc_data: dev_rpc_data.clone(),
				overrides: overrides.clone(),
				block_data_cache: block_data_cache.clone(),
				forced_parent_hashes: None,
			};

			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
				client.clone(),
				keystore.clone(),
			));
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
				rpc::create_full(
					deps,
					subscription_task_executor,
					Some(crate::rpc::TracingConfig {
						tracing_requesters: tracing_requesters.clone(),
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
					}),
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
				)
				.map_err(Into::into)
			} else {
				rpc::create_full(
					deps,
					subscription_task_executor,
					None,
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
				)
				.map_err(Into::into)
			}
		}
	};

	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		network,
		client,
		keystore: keystore_container.keystore(),
		task_manager: &mut task_manager,
		transaction_pool,
		rpc_builder: Box::new(rpc_builder),
		backend,
		system_rpc_tx,
		sync_service: sync_service.clone(),
		config,
		tx_handler_controller,
		telemetry: None,
	})?;

	if let Some(hwbench) = hwbench {
		sc_sysinfo::print_hwbench(&hwbench);

		if let Some(ref mut telemetry) = telemetry {
			let telemetry_handle = telemetry.handle();
			task_manager.spawn_handle().spawn(
				"telemetry_hwbench",
				None,
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
			);
		}
	}

	log::info!("Service Ready");

	Ok(task_manager)
}

/// Builds the development chain spec used by lazy loading mode, based on the Moonbeam runtime.
pub fn spec_builder() -> sc_chain_spec::ChainSpecBuilder<Extensions, HostFunctions> {
	crate::chain_spec::moonbeam::ChainSpec::builder(
		moonbeam_runtime::WASM_BINARY.expect("WASM binary was not built, please build it!"),
		Default::default(),
	)
	.with_name("Lazy Loading")
	.with_id("lazy_loading")
	.with_chain_type(ChainType::Development)
	.with_properties(
		serde_json::from_str(
			"{\"tokenDecimals\": 18, \"tokenSymbol\": \"GLMR\", \"SS58Prefix\": 1284}",
		)
		.expect("Provided valid json map"),
	)
	.with_genesis_config_preset_name(sp_genesis_builder::DEV_RUNTIME_PRESET)
}
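// Example (hypothetical call site): the lazy loading command can turn the builder above into a
// concrete chain spec with `spec_builder().build()` before handing it to the node configuration.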