1
// Copyright 2024 Moonbeam foundation
2
// This file is part of Moonbeam.
3

            
4
// Moonbeam is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Moonbeam is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use crate::{
18
	lazy_loading, open_frontier_backend, rpc, set_prometheus_registry, BlockImportPipeline,
19
	ClientCustomizations, FrontierBlockImport, HostFunctions, PartialComponentsResult,
20
	PendingConsensusDataProvider, RuntimeApiCollection, SOFT_DEADLINE_PERCENT,
21
};
22
use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
23
use cumulus_primitives_core::{relay_chain, BlockT, CollectCollationInfo, ParaId};
24
use cumulus_primitives_parachain_inherent::ParachainInherentData;
25
use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
26
use fc_rpc::StorageOverrideHandler;
27
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
28
use futures::{FutureExt, StreamExt};
29
use moonbeam_cli_opt::{EthApi as EthApiCmd, LazyLoadingConfig, RpcConfig};
30
use moonbeam_core_primitives::{Block, Hash};
31
use nimbus_consensus::NimbusManualSealConsensusDataProvider;
32
use nimbus_primitives::NimbusId;
33
use parity_scale_codec::Encode;
34
use polkadot_primitives::{
35
	AbridgedHostConfiguration, AsyncBackingParams, PersistedValidationData, Slot, UpgradeGoAhead,
36
};
37
use sc_chain_spec::{get_extension, BuildGenesisBlock, GenesisBlockBuilder};
38
use sc_client_api::{Backend, BadBlocks, ExecutorProvider, ForkBlocks};
39
use sc_executor::{HeapAllocStrategy, RuntimeVersionOf, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
40
use sc_network::config::FullNetworkConfiguration;
41
use sc_network::NetworkBackend;
42
use sc_network_common::sync::SyncMode;
43
use sc_service::{
44
	error::Error as ServiceError, ClientConfig, Configuration, Error, KeystoreContainer,
45
	LocalCallExecutor, PartialComponents, TaskManager,
46
};
47
use sc_telemetry::{TelemetryHandle, TelemetryWorker};
48
use sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool};
49
use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
50
use sp_blockchain::HeaderBackend;
51
use sp_core::traits::CodeExecutor;
52
use sp_core::H256;
53
use sp_runtime::traits::NumberFor;
54
use std::collections::BTreeMap;
55
use std::str::FromStr;
56
use std::sync::{Arc, Mutex};
57
use std::time::Duration;
58

            
59
pub mod backend;
60
pub mod call_executor;
61
mod client;
62
mod helpers;
63
mod lock;
64
mod manual_sealing;
65
mod state_overrides;
66

            
67
pub const LAZY_LOADING_LOG_TARGET: &'static str = "lazy-loading";
68

            
69
/// Lazy loading client type.
70
pub type TLazyLoadingClient<TBl, TRtApi, TExec> = sc_service::client::Client<
71
	TLazyLoadingBackend<TBl>,
72
	TLazyLoadingCallExecutor<TBl, TExec>,
73
	TBl,
74
	TRtApi,
75
>;
76

            
77
/// Lazy loading client backend type.
78
pub type TLazyLoadingBackend<TBl> = backend::Backend<TBl>;
79

            
80
/// Lazy loading client call executor type.
81
pub type TLazyLoadingCallExecutor<TBl, TExec> = call_executor::LazyLoadingCallExecutor<
82
	TBl,
83
	LocalCallExecutor<TBl, TLazyLoadingBackend<TBl>, TExec>,
84
>;
85

            
86
/// Lazy loading parts type.
87
pub type TLazyLoadingParts<TBl, TRtApi, TExec> = (
88
	TLazyLoadingClient<TBl, TRtApi, TExec>,
89
	Arc<TLazyLoadingBackend<TBl>>,
90
	KeystoreContainer,
91
	TaskManager,
92
);
93

            
94
type LazyLoadingClient<RuntimeApi> =
95
	TLazyLoadingClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
96
type LazyLoadingBackend = TLazyLoadingBackend<Block>;
97

            
98
/// Create the initial parts of a lazy loading node.
99
pub fn new_lazy_loading_parts<TBl, TRtApi, TExec>(
100
	config: &mut Configuration,
101
	lazy_loading_config: &LazyLoadingConfig,
102
	telemetry: Option<TelemetryHandle>,
103
	executor: TExec,
104
) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
105
where
106
	TBl: BlockT + sp_runtime::DeserializeOwned,
107
	TBl::Hash: From<H256>,
108
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
109
{
110
	let backend = backend::new_lazy_loading_backend(config, &lazy_loading_config)?;
111

            
112
	let genesis_block_builder = GenesisBlockBuilder::new(
113
		config.chain_spec.as_storage_builder(),
114
		!config.no_genesis(),
115
		backend.clone(),
116
		executor.clone(),
117
	)?;
118

            
119
	new_lazy_loading_parts_with_genesis_builder(
120
		config,
121
		telemetry,
122
		executor,
123
		backend,
124
		genesis_block_builder,
125
	)
126
}
127

            
128
/// Create the initial parts of a lazy loading node.
129
pub fn new_lazy_loading_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
130
	config: &Configuration,
131
	telemetry: Option<TelemetryHandle>,
132
	executor: TExec,
133
	backend: Arc<TLazyLoadingBackend<TBl>>,
134
	genesis_block_builder: TBuildGenesisBlock,
135
) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
136
where
137
	TBl: BlockT + sp_runtime::DeserializeOwned,
138
	TBl::Hash: From<H256>,
139
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
140
	TBuildGenesisBlock:
141
		BuildGenesisBlock<
142
			TBl,
143
			BlockImportOperation = <TLazyLoadingBackend<TBl> as sc_client_api::backend::Backend<
144
				TBl,
145
			>>::BlockImportOperation,
146
		>,
147
{
148
	let keystore_container = KeystoreContainer::new(&config.keystore)?;
149

            
150
	let task_manager = {
151
		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
152
		TaskManager::new(config.tokio_handle.clone(), registry)?
153
	};
154

            
155
	let chain_spec = &config.chain_spec;
156
	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
157
		.cloned()
158
		.unwrap_or_default();
159

            
160
	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
161
		.cloned()
162
		.unwrap_or_default();
163

            
164
	let client = {
165
		let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
166
			None,
167
			Arc::new(executor.clone()),
168
		);
169

            
170
		let wasm_runtime_substitutes = config
171
			.chain_spec
172
			.code_substitutes()
173
			.into_iter()
174
			.map(|(n, c)| {
175
				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
176
					Error::Application(Box::from(format!(
177
						"Failed to parse `{}` as block number for code substitutes. \
178
						 In an old version the key for code substitute was a block hash. \
179
						 Please update the chain spec to a version that is compatible with your node.",
180
						n
181
					)))
182
				})?;
183
				Ok((number, c))
184
			})
185
			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
186

            
187
		let client = client::new_client(
188
			backend.clone(),
189
			executor,
190
			genesis_block_builder,
191
			fork_blocks,
192
			bad_blocks,
193
			extensions,
194
			Box::new(task_manager.spawn_handle()),
195
			config
196
				.prometheus_config
197
				.as_ref()
198
				.map(|config| config.registry.clone()),
199
			telemetry,
200
			ClientConfig {
201
				offchain_worker_enabled: config.offchain_worker.enabled,
202
				offchain_indexing_api: config.offchain_worker.indexing_enabled,
203
				wasmtime_precompiled: config.executor.wasmtime_precompiled.clone(),
204
				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
205
				no_genesis: matches!(
206
					config.network.sync_mode,
207
					SyncMode::LightState { .. } | SyncMode::Warp { .. }
208
				),
209
				wasm_runtime_substitutes,
210
				enable_import_proof_recording: true,
211
			},
212
		)?;
213

            
214
		client
215
	};
216

            
217
	Ok((client, backend, keystore_container, task_manager))
218
}
219

            
220
/// Builds the PartialComponents for a lazy loading node.
221
#[allow(clippy::type_complexity)]
222
pub fn new_lazy_loading_partial<RuntimeApi, Customizations>(
223
	config: &mut Configuration,
224
	rpc_config: &RpcConfig,
225
	lazy_loading_config: &LazyLoadingConfig,
226
) -> PartialComponentsResult<LazyLoadingClient<RuntimeApi>, LazyLoadingBackend>
227
where
228
	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
229
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
230
	Customizations: ClientCustomizations + 'static,
231
{
232
	set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;
233

            
234
	// Use ethereum style for subscription ids
235
	config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));
236

            
237
	let telemetry = config
238
		.telemetry_endpoints
239
		.clone()
240
		.filter(|x| !x.is_empty())
241
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
242
			let worker = TelemetryWorker::new(16)?;
243
			let telemetry = worker.handle().new_telemetry(endpoints);
244
			Ok((worker, telemetry))
245
		})
246
		.transpose()?;
247

            
248
	let heap_pages = config
249
		.executor
250
		.default_heap_pages
251
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
252
			extra_pages: h as _,
253
		});
254
	let mut wasm_builder = WasmExecutor::builder()
255
		.with_execution_method(config.executor.wasm_method)
256
		.with_onchain_heap_alloc_strategy(heap_pages)
257
		.with_offchain_heap_alloc_strategy(heap_pages)
258
		.with_ignore_onchain_heap_pages(true)
259
		.with_max_runtime_instances(config.executor.max_runtime_instances)
260
		.with_runtime_cache_size(config.executor.runtime_cache_size);
261

            
262
	if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
263
		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
264
	}
265

            
266
	let executor = wasm_builder.build();
267

            
268
	let (client, backend, keystore_container, task_manager) =
269
		new_lazy_loading_parts::<Block, RuntimeApi, _>(
270
			config,
271
			lazy_loading_config,
272
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
273
			executor,
274
		)?;
275

            
276
	if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
277
		client
278
			.execution_extensions()
279
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
280
			Block,
281
			sp_io::UseDalekExt,
282
		>::new(block_number));
283
	}
284

            
285
	let client = Arc::new(client);
286

            
287
	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
288

            
289
	let telemetry = telemetry.map(|(worker, telemetry)| {
290
		task_manager
291
			.spawn_handle()
292
			.spawn("telemetry", None, worker.run());
293
		telemetry
294
	});
295

            
296
	let maybe_select_chain = Some(sc_consensus::LongestChain::new(backend.clone()));
297

            
298
	let transaction_pool = sc_transaction_pool::Builder::new(
299
		task_manager.spawn_essential_handle(),
300
		client.clone(),
301
		config.role.is_authority().into(),
302
	)
303
	.with_options(config.transaction_pool.clone())
304
	.with_prometheus(config.prometheus_registry())
305
	.build();
306

            
307
	let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
308
	let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
309

            
310
	let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
311
	let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
312

            
313
	let create_inherent_data_providers = move |_, _| async move {
314
		let time = sp_timestamp::InherentDataProvider::from_system_time();
315
		// Create a dummy parachain inherent data provider which is required to pass
316
		// the checks by the para chain system. We use dummy values because in the 'pending context'
317
		// neither do we have access to the real values nor do we need them.
318
		let (relay_parent_storage_root, relay_chain_state) =
319
			RelayStateSproofBuilder::default().into_state_root_and_proof();
320
		let vfp = PersistedValidationData {
321
			// This is a hack to make `cumulus_pallet_parachain_system::RelayNumberStrictlyIncreases`
322
			// happy. Relay parent number can't be bigger than u32::MAX.
323
			relay_parent_number: u32::MAX,
324
			relay_parent_storage_root,
325
			..Default::default()
326
		};
327
		let parachain_inherent_data = ParachainInherentData {
328
			validation_data: vfp,
329
			relay_chain_state,
330
			downward_messages: Default::default(),
331
			horizontal_messages: Default::default(),
332
		};
333
		Ok((time, parachain_inherent_data))
334
	};
335

            
336
	let import_queue = nimbus_consensus::import_queue(
337
		client.clone(),
338
		frontier_block_import.clone(),
339
		create_inherent_data_providers,
340
		&task_manager.spawn_essential_handle(),
341
		config.prometheus_registry(),
342
		false,
343
	)?;
344
	let block_import = BlockImportPipeline::Dev(frontier_block_import);
345

            
346
	Ok(PartialComponents {
347
		backend,
348
		client,
349
		import_queue,
350
		keystore_container,
351
		task_manager,
352
		transaction_pool: transaction_pool.into(),
353
		select_chain: maybe_select_chain,
354
		other: (
355
			block_import,
356
			filter_pool,
357
			telemetry,
358
			telemetry_worker_handle,
359
			frontier_backend,
360
			fee_history_cache,
361
		),
362
	})
363
}
364

            
365
/// Builds a new lazy loading service. This service uses manual seal, and mocks
366
/// the parachain inherent.
367
#[sc_tracing::logging::prefix_logs_with("Lazy loading 🌗")]
368
pub async fn new_lazy_loading_service<RuntimeApi, Customizations, Net>(
369
	mut config: Configuration,
370
	_author_id: Option<NimbusId>,
371
	sealing: moonbeam_cli_opt::Sealing,
372
	rpc_config: RpcConfig,
373
	lazy_loading_config: LazyLoadingConfig,
374
	hwbench: Option<sc_sysinfo::HwBench>,
375
) -> Result<TaskManager, ServiceError>
376
where
377
	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
378
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
379
	Customizations: ClientCustomizations + 'static,
380
	Net: NetworkBackend<Block, Hash>,
381
{
382
	use async_io::Timer;
383
	use futures::Stream;
384
	use sc_consensus_manual_seal::{EngineCommand, ManualSealParams};
385

            
386
	let sc_service::PartialComponents {
387
		client,
388
		backend,
389
		mut task_manager,
390
		import_queue,
391
		keystore_container,
392
		select_chain: maybe_select_chain,
393
		transaction_pool,
394
		other:
395
			(
396
				block_import_pipeline,
397
				filter_pool,
398
				mut telemetry,
399
				_telemetry_worker_handle,
400
				frontier_backend,
401
				fee_history_cache,
402
			),
403
	} = lazy_loading::new_lazy_loading_partial::<RuntimeApi, Customizations>(
404
		&mut config,
405
		&rpc_config,
406
		&lazy_loading_config,
407
	)?;
408

            
409
	let start_delay = 10;
410
	let lazy_loading_startup_disclaimer = format!(
411
		r#"
412

            
413
		You are now running the Moonbeam client in lazy loading mode, where data is retrieved
414
		from a live RPC node on demand.
415

            
416
		Using remote state from: {rpc}
417
		Forking from block: {fork_block}
418

            
419
		To ensure the client works properly, please note the following:
420

            
421
		    1. *Avoid Throttling*: Ensure that the backing RPC node is not limiting the number of
422
		    requests, as this can prevent the lazy loading client from functioning correctly;
423

            
424
		    2. *Be Patient*: As the client may take approximately 20 times longer than normal to
425
		    retrieve and process the necessary data for the requested operation.
426

            
427

            
428
		The service will start in {start_delay} seconds...
429

            
430
		"#,
431
		rpc = lazy_loading_config.state_rpc,
432
		fork_block = backend.fork_checkpoint.number
433
	);
434

            
435
	log::warn!(
436
		"{}",
437
		ansi_term::Colour::Yellow.paint(lazy_loading_startup_disclaimer)
438
	);
439
	tokio::time::sleep(Duration::from_secs(start_delay)).await;
440

            
441
	let block_import = if let BlockImportPipeline::Dev(block_import) = block_import_pipeline {
442
		block_import
443
	} else {
444
		return Err(ServiceError::Other(
445
			"Block import pipeline is not dev".to_string(),
446
		));
447
	};
448

            
449
	let prometheus_registry = config.prometheus_registry().cloned();
450
	let net_config =
451
		FullNetworkConfiguration::<_, _, Net>::new(&config.network, prometheus_registry.clone());
452

            
453
	let metrics = Net::register_notification_metrics(
454
		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
455
	);
456

            
457
	let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
458
		sc_service::build_network(sc_service::BuildNetworkParams {
459
			config: &config,
460
			client: client.clone(),
461
			transaction_pool: transaction_pool.clone(),
462
			spawn_handle: task_manager.spawn_handle(),
463
			import_queue,
464
			block_announce_validator_builder: None,
465
			warp_sync_config: None,
466
			net_config,
467
			block_relay: None,
468
			metrics,
469
		})?;
470

            
471
	if config.offchain_worker.enabled {
472
		task_manager.spawn_handle().spawn(
473
			"offchain-workers-runner",
474
			"offchain-work",
475
			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
476
				runtime_api_provider: client.clone(),
477
				keystore: Some(keystore_container.keystore()),
478
				offchain_db: backend.offchain_storage(),
479
				transaction_pool: Some(OffchainTransactionPoolFactory::new(
480
					transaction_pool.clone(),
481
				)),
482
				network_provider: Arc::new(network.clone()),
483
				is_validator: config.role.is_authority(),
484
				enable_http_requests: true,
485
				custom_extensions: move |_| vec![],
486
			})?
487
			.run(client.clone(), task_manager.spawn_handle())
488
			.boxed(),
489
		);
490
	}
491

            
492
	let prometheus_registry = config.prometheus_registry().cloned();
493
	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
494
	let fee_history_limit = rpc_config.fee_history_limit;
495
	let mut command_sink = None;
496
	let mut dev_rpc_data = None;
497
	let collator = config.role.is_authority();
498

            
499
	if collator {
500
		let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
501
			task_manager.spawn_handle(),
502
			client.clone(),
503
			transaction_pool.clone(),
504
			prometheus_registry.as_ref(),
505
			telemetry.as_ref().map(|x| x.handle()),
506
		);
507
		env.set_soft_deadline(SOFT_DEADLINE_PERCENT);
508

            
509
		let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
510
			match sealing {
511
				moonbeam_cli_opt::Sealing::Instant => {
512
					Box::new(
513
						// This bit cribbed from the implementation of instant seal.
514
						transaction_pool.import_notification_stream().map(|_| {
515
							EngineCommand::SealNewBlock {
516
								create_empty: false,
517
								finalize: false,
518
								parent_hash: None,
519
								sender: None,
520
							}
521
						}),
522
					)
523
				}
524
				moonbeam_cli_opt::Sealing::Manual => {
525
					let (sink, stream) = futures::channel::mpsc::channel(1000);
526
					// Keep a reference to the other end of the channel. It goes to the RPC.
527
					command_sink = Some(sink);
528
					Box::new(stream)
529
				}
530
				moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
531
					Timer::interval(Duration::from_millis(millis)),
532
					|_| EngineCommand::SealNewBlock {
533
						create_empty: true,
534
						finalize: false,
535
						parent_hash: None,
536
						sender: None,
537
					},
538
				)),
539
			};
540

            
541
		let select_chain = maybe_select_chain.expect(
542
			"`new_lazy_loading_partial` builds a `LongestChainRule` when building dev service.\
543
				We specified the dev service when calling `new_partial`.\
544
				Therefore, a `LongestChainRule` is present. qed.",
545
		);
546

            
547
		// Create channels for mocked XCM messages.
548
		let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
549
		let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
550
		let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
551
		dev_rpc_data = Some((
552
			downward_xcm_sender,
553
			hrmp_xcm_sender,
554
			additional_relay_offset,
555
		));
556

            
557
		// Need to clone it and store here to avoid moving of `client`
558
		// variable in closure below.
559
		let client_vrf = client.clone();
560

            
561
		let keystore_clone = keystore_container.keystore().clone();
562
		let maybe_provide_vrf_digest =
563
			move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
564
				moonbeam_vrf::vrf_pre_digest::<Block, LazyLoadingClient<RuntimeApi>>(
565
					&client_vrf,
566
					&keystore_clone,
567
					nimbus_id,
568
					parent,
569
				)
570
			};
571

            
572
		let parachain_id = helpers::get_parachain_id(backend.rpc_client.clone())
573
			.unwrap_or_else(|| panic!("Could not get parachain identifier for lazy loading mode."));
574

            
575
		// Need to clone it and store here to avoid moving of `client`
576
		// variable in closure below.
577
		let client_for_cidp = client.clone();
578

            
579
		task_manager.spawn_essential_handle().spawn_blocking(
580
			"authorship_task",
581
			Some("block-authoring"),
582
			manual_sealing::run_manual_seal(ManualSealParams {
583
				block_import,
584
				env,
585
				client: client.clone(),
586
				pool: transaction_pool.clone(),
587
				commands_stream,
588
				select_chain,
589
				consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
590
					keystore: keystore_container.keystore(),
591
					client: client.clone(),
592
					additional_digests_provider: maybe_provide_vrf_digest,
593
					_phantom: Default::default(),
594
				})),
595
				create_inherent_data_providers: move |block: H256, ()| {
596
					let maybe_current_para_block = client_for_cidp.number(block);
597
					let maybe_current_para_head = client_for_cidp.expect_header(block);
598
					let downward_xcm_receiver = downward_xcm_receiver.clone();
599
					let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
600

            
601
					// Need to clone it and store here to avoid moving of `client`
602
					// variable in closure below.
603
					let client_for_xcm = client_for_cidp.clone();
604

            
605
					async move {
606
						let time = sp_timestamp::InherentDataProvider::from_system_time();
607

            
608
						let current_para_block = maybe_current_para_block?
609
							.ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;
610

            
611
						let current_para_block_head = Some(polkadot_primitives::HeadData(
612
							maybe_current_para_head?.encode(),
613
						));
614

            
615
						let additional_key_values = vec![
616
							(
617
								moonbeam_core_primitives::well_known_relay_keys::TIMESTAMP_NOW
618
									.to_vec(),
619
								sp_timestamp::Timestamp::current().encode(),
620
							),
621
							(
622
								relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
623
								AbridgedHostConfiguration {
624
									max_code_size: 3_145_728,
625
									max_head_data_size: 20_480,
626
									max_upward_queue_count: 174_762,
627
									max_upward_queue_size: 1_048_576,
628
									max_upward_message_size: 65_531,
629
									max_upward_message_num_per_candidate: 16,
630
									hrmp_max_message_num_per_candidate: 10,
631
									validation_upgrade_cooldown: 14_400,
632
									validation_upgrade_delay: 600,
633
									async_backing_params: AsyncBackingParams {
634
										max_candidate_depth: 3,
635
										allowed_ancestry_len: 2,
636
									},
637
								}
638
								.encode(),
639
							),
640
							// Override current slot number
641
							(
642
								relay_chain::well_known_keys::CURRENT_SLOT.to_vec(),
643
								Slot::from(u64::from(current_para_block)).encode(),
644
							),
645
						];
646

            
647
						let current_para_head = client_for_xcm
648
							.header(block)
649
							.expect("Header lookup should succeed")
650
							.expect("Header passed in as parent should be present in backend.");
651

            
652
						let should_send_go_ahead = match client_for_xcm
653
							.runtime_api()
654
							.collect_collation_info(block, &current_para_head)
655
						{
656
							Ok(info) => info.new_validation_code.is_some(),
657
							Err(e) => {
658
								log::error!("Failed to collect collation info: {:?}", e);
659
								false
660
							}
661
						};
662

            
663
						let mocked_parachain = MockValidationDataInherentDataProvider {
664
							current_para_block,
665
							para_id: ParaId::new(parachain_id),
666
							upgrade_go_ahead: should_send_go_ahead.then(|| {
667
								log::info!(
668
									"Detected pending validation code, sending go-ahead signal."
669
								);
670
								UpgradeGoAhead::GoAhead
671
							}),
672
							current_para_block_head,
673
							relay_offset: 1000,
674
							relay_blocks_per_para_block: 2,
675
							// TODO: Recheck
676
							para_blocks_per_relay_epoch: 10,
677
							relay_randomness_config: (),
678
							xcm_config: MockXcmConfig::new(
679
								&*client_for_xcm,
680
								block,
681
								Default::default(),
682
							),
683
							raw_downward_messages: downward_xcm_receiver.drain().collect(),
684
							raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
685
							additional_key_values: Some(additional_key_values),
686
						};
687

            
688
						let randomness = session_keys_primitives::InherentDataProvider;
689

            
690
						Ok((time, mocked_parachain, randomness))
691
					}
692
				},
693
			}),
694
		);
695
	}
696

            
697
	// Sinks for pubsub notifications.
698
	// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
699
	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
700
	// notification to the subscriber on receiving a message through this channel.
701
	// This way we avoid race conditions when using native substrate block import notification
702
	// stream.
703
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
704
		fc_mapping_sync::EthereumBlockNotification<Block>,
705
	> = Default::default();
706
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
707

            
708
	/* TODO: only enable this when frontier backend is compatible with lazy-loading
709
	rpc::spawn_essential_tasks(
710
		rpc::SpawnTasksParams {
711
			task_manager: &task_manager,
712
			client: client.clone(),
713
			substrate_backend: backend.clone(),
714
			frontier_backend: frontier_backend.clone(),
715
			filter_pool: filter_pool.clone(),
716
			overrides: overrides.clone(),
717
			fee_history_limit,
718
			fee_history_cache: fee_history_cache.clone(),
719
		},
720
		sync_service.clone(),
721
		pubsub_notification_sinks.clone(),
722
	);
723
	*/
724

            
725
	let ethapi_cmd = rpc_config.ethapi.clone();
726
	let tracing_requesters =
727
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
728
			rpc::tracing::spawn_tracing_tasks(
729
				&rpc_config,
730
				prometheus_registry.clone(),
731
				rpc::SpawnTasksParams {
732
					task_manager: &task_manager,
733
					client: client.clone(),
734
					substrate_backend: backend.clone(),
735
					frontier_backend: frontier_backend.clone(),
736
					filter_pool: filter_pool.clone(),
737
					overrides: overrides.clone(),
738
					fee_history_limit,
739
					fee_history_cache: fee_history_cache.clone(),
740
				},
741
			)
742
		} else {
743
			rpc::tracing::RpcRequesters {
744
				debug: None,
745
				trace: None,
746
			}
747
		};
748

            
749
	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
750
		task_manager.spawn_handle(),
751
		overrides.clone(),
752
		rpc_config.eth_log_block_cache,
753
		rpc_config.eth_statuses_cache,
754
		prometheus_registry,
755
	));
756

            
757
	let rpc_builder = {
758
		let client = client.clone();
759
		let pool = transaction_pool.clone();
760
		let backend = backend.clone();
761
		let network = network.clone();
762
		let sync = sync_service.clone();
763
		let ethapi_cmd = ethapi_cmd.clone();
764
		let max_past_logs = rpc_config.max_past_logs;
765
		let max_block_range = rpc_config.max_block_range;
766
		let overrides = overrides.clone();
767
		let fee_history_cache = fee_history_cache.clone();
768
		let block_data_cache = block_data_cache.clone();
769
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();
770

            
771
		let keystore = keystore_container.keystore();
772
		let command_sink_for_task = command_sink.clone();
773
		move |subscription_task_executor| {
774
			let deps = rpc::FullDeps {
775
				backend: backend.clone(),
776
				client: client.clone(),
777
				command_sink: command_sink_for_task.clone(),
778
				ethapi_cmd: ethapi_cmd.clone(),
779
				filter_pool: filter_pool.clone(),
780
				frontier_backend: match *frontier_backend {
781
					fc_db::Backend::KeyValue(ref b) => b.clone(),
782
					fc_db::Backend::Sql(ref b) => b.clone(),
783
				},
784
				graph: pool.clone(),
785
				pool: pool.clone(),
786
				is_authority: collator,
787
				max_past_logs,
788
				max_block_range,
789
				fee_history_limit,
790
				fee_history_cache: fee_history_cache.clone(),
791
				network: network.clone(),
792
				sync: sync.clone(),
793
				dev_rpc_data: dev_rpc_data.clone(),
794
				overrides: overrides.clone(),
795
				block_data_cache: block_data_cache.clone(),
796
				forced_parent_hashes: None,
797
			};
798

            
799
			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
800
				client.clone(),
801
				keystore.clone(),
802
			));
803
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
804
				rpc::create_full(
805
					deps,
806
					subscription_task_executor,
807
					Some(crate::rpc::TracingConfig {
808
						tracing_requesters: tracing_requesters.clone(),
809
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
810
					}),
811
					pubsub_notification_sinks.clone(),
812
					pending_consensus_data_provider,
813
				)
814
				.map_err(Into::into)
815
			} else {
816
				rpc::create_full(
817
					deps,
818
					subscription_task_executor,
819
					None,
820
					pubsub_notification_sinks.clone(),
821
					pending_consensus_data_provider,
822
				)
823
				.map_err(Into::into)
824
			}
825
		}
826
	};
827

            
828
	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
829
		network,
830
		client,
831
		keystore: keystore_container.keystore(),
832
		task_manager: &mut task_manager,
833
		transaction_pool,
834
		rpc_builder: Box::new(rpc_builder),
835
		backend,
836
		system_rpc_tx,
837
		sync_service: sync_service.clone(),
838
		config,
839
		tx_handler_controller,
840
		telemetry: None,
841
	})?;
842

            
843
	if let Some(hwbench) = hwbench {
844
		sc_sysinfo::print_hwbench(&hwbench);
845

            
846
		if let Some(ref mut telemetry) = telemetry {
847
			let telemetry_handle = telemetry.handle();
848
			task_manager.spawn_handle().spawn(
849
				"telemetry_hwbench",
850
				None,
851
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
852
			);
853
		}
854
	}
855

            
856
	network_starter.start_network();
857

            
858
	log::info!("Service Ready");
859

            
860
	Ok(task_manager)
861
}