1
// Copyright 2024 Moonbeam foundation
2
// This file is part of Moonbeam.
3

            
4
// Moonbeam is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Moonbeam is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use crate::{
18
	lazy_loading, open_frontier_backend, rpc, set_prometheus_registry, BlockImportPipeline,
19
	ClientCustomizations, FrontierBlockImport, HostFunctions, PartialComponentsResult,
20
	PendingConsensusDataProvider, RuntimeApiCollection, SOFT_DEADLINE_PERCENT,
21
};
22
use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
23
use cumulus_primitives_core::{relay_chain, BlockT, ParaId};
24
use cumulus_primitives_parachain_inherent::ParachainInherentData;
25
use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
26
use fc_rpc::StorageOverrideHandler;
27
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
28
use futures::{FutureExt, StreamExt};
29
use moonbeam_cli_opt::{EthApi as EthApiCmd, LazyLoadingConfig, RpcConfig};
30
use moonbeam_core_primitives::{Block, Hash};
31
use nimbus_consensus::NimbusManualSealConsensusDataProvider;
32
use nimbus_primitives::NimbusId;
33
use parity_scale_codec::Encode;
34
use polkadot_primitives::{
35
	AbridgedHostConfiguration, AsyncBackingParams, PersistedValidationData, Slot, UpgradeGoAhead,
36
};
37
use sc_chain_spec::{get_extension, BuildGenesisBlock, GenesisBlockBuilder};
38
use sc_client_api::{Backend, BadBlocks, ExecutorProvider, ForkBlocks, StorageProvider};
39
use sc_executor::{HeapAllocStrategy, RuntimeVersionOf, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
40
use sc_network::config::FullNetworkConfiguration;
41
use sc_network::NetworkBackend;
42
use sc_network_common::sync::SyncMode;
43
use sc_service::{
44
	error::Error as ServiceError, ClientConfig, Configuration, Error, KeystoreContainer,
45
	PartialComponents, TaskManager,
46
};
47
use sc_telemetry::{TelemetryHandle, TelemetryWorker};
48
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
49
use sp_api::ConstructRuntimeApi;
50
use sp_blockchain::HeaderBackend;
51
use sp_core::traits::CodeExecutor;
52
use sp_core::{twox_128, H256};
53
use sp_runtime::traits::NumberFor;
54
use sp_storage::StorageKey;
55
use std::collections::BTreeMap;
56
use std::str::FromStr;
57
use std::sync::{Arc, Mutex};
58
use std::time::Duration;
59

            
60
pub mod backend;
61
pub mod call_executor;
62
mod client;
63
mod helpers;
64
mod lock;
65
mod state_overrides;
66
mod wasm_override;
67
mod wasm_substitutes;
68

            
69
pub const LAZY_LOADING_LOG_TARGET: &'static str = "lazy-loading";
70

            
71
/// Lazy loading client type.
72
pub type TLazyLoadingClient<TBl, TRtApi, TExec> = sc_service::client::Client<
73
	TLazyLoadingBackend<TBl>,
74
	TLazyLoadingCallExecutor<TBl, TExec>,
75
	TBl,
76
	TRtApi,
77
>;
78

            
79
/// Lazy loading client backend type.
80
pub type TLazyLoadingBackend<TBl> = backend::Backend<TBl>;
81

            
82
/// Lazy loading client call executor type.
83
pub type TLazyLoadingCallExecutor<TBl, TExec> =
84
	call_executor::LazyLoadingCallExecutor<TBl, TLazyLoadingBackend<TBl>, TExec>;
85

            
86
/// Lazy loading parts type.
87
pub type TLazyLoadingParts<TBl, TRtApi, TExec> = (
88
	TLazyLoadingClient<TBl, TRtApi, TExec>,
89
	Arc<TLazyLoadingBackend<TBl>>,
90
	KeystoreContainer,
91
	TaskManager,
92
);
93

            
94
type LazyLoadingClient<RuntimeApi> =
95
	TLazyLoadingClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
96
type LazyLoadingBackend = TLazyLoadingBackend<Block>;
97

            
98
/// Create the initial parts of a lazy loading node.
99
pub fn new_lazy_loading_parts<TBl, TRtApi, TExec>(
100
	config: &mut Configuration,
101
	lazy_loading_config: &LazyLoadingConfig,
102
	telemetry: Option<TelemetryHandle>,
103
	executor: TExec,
104
) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
105
where
106
	TBl: BlockT + sp_runtime::DeserializeOwned,
107
	TBl::Hash: From<H256>,
108
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
109
{
110
	let backend = backend::new_lazy_loading_backend(config, &lazy_loading_config)?;
111

            
112
	let genesis_block_builder = GenesisBlockBuilder::new(
113
		config.chain_spec.as_storage_builder(),
114
		!config.no_genesis(),
115
		backend.clone(),
116
		executor.clone(),
117
	)?;
118

            
119
	new_lazy_loading_parts_with_genesis_builder(
120
		config,
121
		lazy_loading_config,
122
		telemetry,
123
		executor,
124
		backend,
125
		genesis_block_builder,
126
	)
127
}
128

            
129
/// Create the initial parts of a lazy loading node.
130
pub fn new_lazy_loading_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
131
	config: &Configuration,
132
	lazy_loading_config: &LazyLoadingConfig,
133
	telemetry: Option<TelemetryHandle>,
134
	executor: TExec,
135
	backend: Arc<TLazyLoadingBackend<TBl>>,
136
	genesis_block_builder: TBuildGenesisBlock,
137
) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
138
where
139
	TBl: BlockT + sp_runtime::DeserializeOwned,
140
	TBl::Hash: From<H256>,
141
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
142
	TBuildGenesisBlock:
143
		BuildGenesisBlock<
144
			TBl,
145
			BlockImportOperation = <TLazyLoadingBackend<TBl> as sc_client_api::backend::Backend<
146
				TBl,
147
			>>::BlockImportOperation,
148
		>,
149
{
150
	let keystore_container = KeystoreContainer::new(&config.keystore)?;
151

            
152
	let task_manager = {
153
		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
154
		TaskManager::new(config.tokio_handle.clone(), registry)?
155
	};
156

            
157
	let chain_spec = &config.chain_spec;
158
	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
159
		.cloned()
160
		.unwrap_or_default();
161

            
162
	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
163
		.cloned()
164
		.unwrap_or_default();
165

            
166
	let client = {
167
		let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
168
			None,
169
			Arc::new(executor.clone()),
170
		);
171

            
172
		let wasm_runtime_substitutes = config
173
			.chain_spec
174
			.code_substitutes()
175
			.into_iter()
176
			.map(|(n, c)| {
177
				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
178
					Error::Application(Box::from(format!(
179
						"Failed to parse `{}` as block number for code substitutes. \
180
						 In an old version the key for code substitute was a block hash. \
181
						 Please update the chain spec to a version that is compatible with your node.",
182
						n
183
					)))
184
				})?;
185
				Ok((number, c))
186
			})
187
			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
188

            
189
		let client = client::new_client(
190
			backend.clone(),
191
			executor,
192
			genesis_block_builder,
193
			fork_blocks,
194
			bad_blocks,
195
			extensions,
196
			Box::new(task_manager.spawn_handle()),
197
			config
198
				.prometheus_config
199
				.as_ref()
200
				.map(|config| config.registry.clone()),
201
			telemetry,
202
			ClientConfig {
203
				offchain_worker_enabled: config.offchain_worker.enabled,
204
				offchain_indexing_api: config.offchain_worker.indexing_enabled,
205
				wasmtime_precompiled: config.wasmtime_precompiled.clone(),
206
				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
207
				no_genesis: matches!(
208
					config.network.sync_mode,
209
					SyncMode::LightState { .. } | SyncMode::Warp { .. }
210
				),
211
				wasm_runtime_substitutes,
212
				enable_import_proof_recording: false,
213
			},
214
			lazy_loading_config,
215
		)?;
216

            
217
		client
218
	};
219

            
220
	Ok((client, backend, keystore_container, task_manager))
221
}
222

            
223
/// Builds the PartialComponents for a lazy loading node.
224
#[allow(clippy::type_complexity)]
225
pub fn new_lazy_loading_partial<RuntimeApi, Customizations>(
226
	config: &mut Configuration,
227
	rpc_config: &RpcConfig,
228
	lazy_loading_config: &LazyLoadingConfig,
229
) -> PartialComponentsResult<LazyLoadingClient<RuntimeApi>, LazyLoadingBackend>
230
where
231
	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
232
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
233
	Customizations: ClientCustomizations + 'static,
234
{
235
	set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;
236

            
237
	// Use ethereum style for subscription ids
238
	config.rpc_id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));
239

            
240
	let telemetry = config
241
		.telemetry_endpoints
242
		.clone()
243
		.filter(|x| !x.is_empty())
244
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
245
			let worker = TelemetryWorker::new(16)?;
246
			let telemetry = worker.handle().new_telemetry(endpoints);
247
			Ok((worker, telemetry))
248
		})
249
		.transpose()?;
250

            
251
	let heap_pages = config
252
		.default_heap_pages
253
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
254
			extra_pages: h as _,
255
		});
256
	let mut wasm_builder = WasmExecutor::builder()
257
		.with_execution_method(config.wasm_method)
258
		.with_onchain_heap_alloc_strategy(heap_pages)
259
		.with_offchain_heap_alloc_strategy(heap_pages)
260
		.with_ignore_onchain_heap_pages(true)
261
		.with_max_runtime_instances(config.max_runtime_instances)
262
		.with_runtime_cache_size(config.runtime_cache_size);
263

            
264
	if let Some(ref wasmtime_precompiled_path) = config.wasmtime_precompiled {
265
		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
266
	}
267

            
268
	let executor = wasm_builder.build();
269

            
270
	let (client, backend, keystore_container, task_manager) =
271
		new_lazy_loading_parts::<Block, RuntimeApi, _>(
272
			config,
273
			lazy_loading_config,
274
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
275
			executor,
276
		)?;
277

            
278
	if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
279
		client
280
			.execution_extensions()
281
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
282
			Block,
283
			sp_io::UseDalekExt,
284
		>::new(block_number));
285
	}
286

            
287
	let client = Arc::new(client);
288

            
289
	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
290

            
291
	let telemetry = telemetry.map(|(worker, telemetry)| {
292
		task_manager
293
			.spawn_handle()
294
			.spawn("telemetry", None, worker.run());
295
		telemetry
296
	});
297

            
298
	let maybe_select_chain = Some(sc_consensus::LongestChain::new(backend.clone()));
299

            
300
	let transaction_pool = sc_transaction_pool::BasicPool::new_full(
301
		config.transaction_pool.clone(),
302
		config.role.is_authority().into(),
303
		config.prometheus_registry(),
304
		task_manager.spawn_essential_handle(),
305
		client.clone(),
306
	);
307

            
308
	let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
309
	let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
310

            
311
	let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
312
	let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
313

            
314
	let create_inherent_data_providers = move |_, _| async move {
315
		let time = sp_timestamp::InherentDataProvider::from_system_time();
316
		// Create a dummy parachain inherent data provider which is required to pass
317
		// the checks by the para chain system. We use dummy values because in the 'pending context'
318
		// neither do we have access to the real values nor do we need them.
319
		let (relay_parent_storage_root, relay_chain_state) =
320
			RelayStateSproofBuilder::default().into_state_root_and_proof();
321
		let vfp = PersistedValidationData {
322
			// This is a hack to make `cumulus_pallet_parachain_system::RelayNumberStrictlyIncreases`
323
			// happy. Relay parent number can't be bigger than u32::MAX.
324
			relay_parent_number: u32::MAX,
325
			relay_parent_storage_root,
326
			..Default::default()
327
		};
328
		let parachain_inherent_data = ParachainInherentData {
329
			validation_data: vfp,
330
			relay_chain_state,
331
			downward_messages: Default::default(),
332
			horizontal_messages: Default::default(),
333
		};
334
		Ok((time, parachain_inherent_data))
335
	};
336

            
337
	let import_queue = nimbus_consensus::import_queue(
338
		client.clone(),
339
		frontier_block_import.clone(),
340
		create_inherent_data_providers,
341
		&task_manager.spawn_essential_handle(),
342
		config.prometheus_registry(),
343
		false,
344
	)?;
345
	let block_import = BlockImportPipeline::Dev(frontier_block_import);
346

            
347
	Ok(PartialComponents {
348
		backend,
349
		client,
350
		import_queue,
351
		keystore_container,
352
		task_manager,
353
		transaction_pool,
354
		select_chain: maybe_select_chain,
355
		other: (
356
			block_import,
357
			filter_pool,
358
			telemetry,
359
			telemetry_worker_handle,
360
			frontier_backend,
361
			fee_history_cache,
362
		),
363
	})
364
}
365

            
366
/// Builds a new lazy loading service. This service uses manual seal, and mocks
367
/// the parachain inherent.
368
#[sc_tracing::logging::prefix_logs_with("Lazy loading 🌗")]
369
pub async fn new_lazy_loading_service<RuntimeApi, Customizations, Net>(
370
	mut config: Configuration,
371
	_author_id: Option<NimbusId>,
372
	sealing: moonbeam_cli_opt::Sealing,
373
	rpc_config: RpcConfig,
374
	lazy_loading_config: LazyLoadingConfig,
375
	hwbench: Option<sc_sysinfo::HwBench>,
376
) -> Result<TaskManager, ServiceError>
377
where
378
	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
379
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
380
	Customizations: ClientCustomizations + 'static,
381
	Net: NetworkBackend<Block, Hash>,
382
{
383
	use async_io::Timer;
384
	use futures::Stream;
385
	use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams};
386

            
387
	let sc_service::PartialComponents {
388
		client,
389
		backend,
390
		mut task_manager,
391
		import_queue,
392
		keystore_container,
393
		select_chain: maybe_select_chain,
394
		transaction_pool,
395
		other:
396
			(
397
				block_import_pipeline,
398
				filter_pool,
399
				mut telemetry,
400
				_telemetry_worker_handle,
401
				frontier_backend,
402
				fee_history_cache,
403
			),
404
	} = lazy_loading::new_lazy_loading_partial::<RuntimeApi, Customizations>(
405
		&mut config,
406
		&rpc_config,
407
		&lazy_loading_config,
408
	)?;
409

            
410
	let start_delay = 10;
411
	let lazy_loading_startup_disclaimer = format!(
412
		r#"
413

            
414
		You are now running the Moonbeam client in lazy loading mode, where data is retrieved
415
		from a live RPC node on demand.
416

            
417
		Using remote state from: {rpc}
418
		Forking from block: {fork_block}
419

            
420
		To ensure the client works properly, please note the following:
421

            
422
		    1. *Avoid Throttling*: Ensure that the backing RPC node is not limiting the number of
423
		    requests, as this can prevent the lazy loading client from functioning correctly;
424

            
425
		    2. *Be Patient*: As the client may take approximately 20 times longer than normal to
426
		    retrieve and process the necessary data for the requested operation.
427

            
428

            
429
		The service will start in {start_delay} seconds...
430

            
431
		"#,
432
		rpc = lazy_loading_config.state_rpc,
433
		fork_block = backend.fork_checkpoint.number
434
	);
435

            
436
	log::warn!(
437
		"{}",
438
		ansi_term::Colour::Yellow.paint(lazy_loading_startup_disclaimer)
439
	);
440
	tokio::time::sleep(Duration::from_secs(start_delay)).await;
441

            
442
	let block_import = if let BlockImportPipeline::Dev(block_import) = block_import_pipeline {
443
		block_import
444
	} else {
445
		return Err(ServiceError::Other(
446
			"Block import pipeline is not dev".to_string(),
447
		));
448
	};
449

            
450
	let net_config = FullNetworkConfiguration::<_, _, Net>::new(&config.network);
451

            
452
	let metrics = Net::register_notification_metrics(
453
		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
454
	);
455

            
456
	let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
457
		sc_service::build_network(sc_service::BuildNetworkParams {
458
			config: &config,
459
			client: client.clone(),
460
			transaction_pool: transaction_pool.clone(),
461
			spawn_handle: task_manager.spawn_handle(),
462
			import_queue,
463
			block_announce_validator_builder: None,
464
			warp_sync_params: None,
465
			net_config,
466
			block_relay: None,
467
			metrics,
468
		})?;
469

            
470
	if config.offchain_worker.enabled {
471
		task_manager.spawn_handle().spawn(
472
			"offchain-workers-runner",
473
			"offchain-work",
474
			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
475
				runtime_api_provider: client.clone(),
476
				keystore: Some(keystore_container.keystore()),
477
				offchain_db: backend.offchain_storage(),
478
				transaction_pool: Some(OffchainTransactionPoolFactory::new(
479
					transaction_pool.clone(),
480
				)),
481
				network_provider: Arc::new(network.clone()),
482
				is_validator: config.role.is_authority(),
483
				enable_http_requests: true,
484
				custom_extensions: move |_| vec![],
485
			})
486
			.run(client.clone(), task_manager.spawn_handle())
487
			.boxed(),
488
		);
489
	}
490

            
491
	let prometheus_registry = config.prometheus_registry().cloned();
492
	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
493
	let fee_history_limit = rpc_config.fee_history_limit;
494
	let mut command_sink = None;
495
	let mut dev_rpc_data = None;
496
	let collator = config.role.is_authority();
497

            
498
	if collator {
499
		let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
500
			task_manager.spawn_handle(),
501
			client.clone(),
502
			transaction_pool.clone(),
503
			prometheus_registry.as_ref(),
504
			telemetry.as_ref().map(|x| x.handle()),
505
		);
506
		env.set_soft_deadline(SOFT_DEADLINE_PERCENT);
507

            
508
		let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
509
			match sealing {
510
				moonbeam_cli_opt::Sealing::Instant => {
511
					Box::new(
512
						// This bit cribbed from the implementation of instant seal.
513
						transaction_pool
514
							.pool()
515
							.validated_pool()
516
							.import_notification_stream()
517
							.map(|_| EngineCommand::SealNewBlock {
518
								create_empty: false,
519
								finalize: false,
520
								parent_hash: None,
521
								sender: None,
522
							}),
523
					)
524
				}
525
				moonbeam_cli_opt::Sealing::Manual => {
526
					let (sink, stream) = futures::channel::mpsc::channel(1000);
527
					// Keep a reference to the other end of the channel. It goes to the RPC.
528
					command_sink = Some(sink);
529
					Box::new(stream)
530
				}
531
				moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
532
					Timer::interval(Duration::from_millis(millis)),
533
					|_| EngineCommand::SealNewBlock {
534
						create_empty: true,
535
						finalize: false,
536
						parent_hash: None,
537
						sender: None,
538
					},
539
				)),
540
			};
541

            
542
		let select_chain = maybe_select_chain.expect(
543
			"`new_lazy_loading_partial` builds a `LongestChainRule` when building dev service.\
544
				We specified the dev service when calling `new_partial`.\
545
				Therefore, a `LongestChainRule` is present. qed.",
546
		);
547

            
548
		let client_set_aside_for_cidp = client.clone();
549

            
550
		// Create channels for mocked XCM messages.
551
		let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
552
		let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
553
		let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
554
		dev_rpc_data = Some((
555
			downward_xcm_sender,
556
			hrmp_xcm_sender,
557
			additional_relay_offset,
558
		));
559

            
560
		let client_clone = client.clone();
561
		let keystore_clone = keystore_container.keystore().clone();
562
		let maybe_provide_vrf_digest =
563
			move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
564
				moonbeam_vrf::vrf_pre_digest::<Block, LazyLoadingClient<RuntimeApi>>(
565
					&client_clone,
566
					&keystore_clone,
567
					nimbus_id,
568
					parent,
569
				)
570
			};
571

            
572
		let parachain_id = helpers::get_parachain_id(backend.rpc_client.clone())
573
			.unwrap_or_else(|| panic!("Could not get parachain identifier for lazy loading mode."));
574

            
575
		task_manager.spawn_essential_handle().spawn_blocking(
576
			"authorship_task",
577
			Some("block-authoring"),
578
			run_manual_seal(ManualSealParams {
579
				block_import,
580
				env,
581
				client: client.clone(),
582
				pool: transaction_pool.clone(),
583
				commands_stream,
584
				select_chain,
585
				consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
586
					keystore: keystore_container.keystore(),
587
					client: client.clone(),
588
					additional_digests_provider: maybe_provide_vrf_digest,
589
					_phantom: Default::default(),
590
				})),
591
				create_inherent_data_providers: move |block: H256, ()| {
592
					let maybe_current_para_block = client_set_aside_for_cidp.number(block);
593
					let maybe_current_para_head = client_set_aside_for_cidp.expect_header(block);
594
					let downward_xcm_receiver = downward_xcm_receiver.clone();
595
					let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
596

            
597
					let client_for_cidp = client_set_aside_for_cidp.clone();
598
					async move {
599
						let time = sp_timestamp::InherentDataProvider::from_system_time();
600

            
601
						let current_para_block = maybe_current_para_block?
602
							.ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;
603

            
604
						let current_para_block_head = Some(polkadot_primitives::HeadData(
605
							maybe_current_para_head?.encode(),
606
						));
607

            
608
						let mut additional_key_values = vec![
609
							(
610
								moonbeam_core_primitives::well_known_relay_keys::TIMESTAMP_NOW
611
									.to_vec(),
612
								sp_timestamp::Timestamp::current().encode(),
613
							),
614
							(
615
								relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
616
								AbridgedHostConfiguration {
617
									max_code_size: 3_145_728,
618
									max_head_data_size: 20_480,
619
									max_upward_queue_count: 174_762,
620
									max_upward_queue_size: 1_048_576,
621
									max_upward_message_size: 65_531,
622
									max_upward_message_num_per_candidate: 16,
623
									hrmp_max_message_num_per_candidate: 10,
624
									validation_upgrade_cooldown: 14_400,
625
									validation_upgrade_delay: 600,
626
									async_backing_params: AsyncBackingParams {
627
										max_candidate_depth: 3,
628
										allowed_ancestry_len: 2,
629
									},
630
								}
631
								.encode(),
632
							),
633
							// Override current slot number
634
							(
635
								relay_chain::well_known_keys::CURRENT_SLOT.to_vec(),
636
								Slot::from(u64::from(current_para_block)).encode(),
637
							),
638
						];
639

            
640
						// If there is a pending upgrade, lets mimic a GoAhead
641
						// signal from the relay
642

            
643
						let storage_key = [
644
							twox_128(b"ParachainSystem"),
645
							twox_128(b"PendingValidationCode"),
646
						]
647
						.concat();
648
						let has_pending_upgrade = client_for_cidp
649
							.storage(block, &StorageKey(storage_key))
650
							.map_or(false, |ok| ok.map_or(false, |some| !some.0.is_empty()));
651
						if has_pending_upgrade {
652
							additional_key_values.push((
653
								relay_chain::well_known_keys::upgrade_go_ahead_signal(ParaId::new(
654
									parachain_id,
655
								)),
656
								Some(UpgradeGoAhead::GoAhead).encode(),
657
							));
658
						}
659

            
660
						let mocked_parachain = MockValidationDataInherentDataProvider {
661
							current_para_block,
662
							para_id: ParaId::new(parachain_id),
663
							current_para_block_head,
664
							relay_offset: 1000,
665
							relay_blocks_per_para_block: 2,
666
							// TODO: Recheck
667
							para_blocks_per_relay_epoch: 10,
668
							relay_randomness_config: (),
669
							xcm_config: MockXcmConfig::new(
670
								&*client_for_cidp,
671
								block,
672
								Default::default(),
673
							),
674
							raw_downward_messages: downward_xcm_receiver.drain().collect(),
675
							raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
676
							additional_key_values: Some(additional_key_values),
677
						};
678

            
679
						let randomness = session_keys_primitives::InherentDataProvider;
680

            
681
						Ok((time, mocked_parachain, randomness))
682
					}
683
				},
684
			}),
685
		);
686
	}
687

            
688
	// Sinks for pubsub notifications.
689
	// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
690
	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
691
	// notification to the subscriber on receiving a message through this channel.
692
	// This way we avoid race conditions when using native substrate block import notification
693
	// stream.
694
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
695
		fc_mapping_sync::EthereumBlockNotification<Block>,
696
	> = Default::default();
697
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
698

            
699
	rpc::spawn_essential_tasks(
700
		rpc::SpawnTasksParams {
701
			task_manager: &task_manager,
702
			client: client.clone(),
703
			substrate_backend: backend.clone(),
704
			frontier_backend: frontier_backend.clone(),
705
			filter_pool: filter_pool.clone(),
706
			overrides: overrides.clone(),
707
			fee_history_limit,
708
			fee_history_cache: fee_history_cache.clone(),
709
		},
710
		sync_service.clone(),
711
		pubsub_notification_sinks.clone(),
712
	);
713
	let ethapi_cmd = rpc_config.ethapi.clone();
714
	let tracing_requesters =
715
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
716
			rpc::tracing::spawn_tracing_tasks(
717
				&rpc_config,
718
				prometheus_registry.clone(),
719
				rpc::SpawnTasksParams {
720
					task_manager: &task_manager,
721
					client: client.clone(),
722
					substrate_backend: backend.clone(),
723
					frontier_backend: frontier_backend.clone(),
724
					filter_pool: filter_pool.clone(),
725
					overrides: overrides.clone(),
726
					fee_history_limit,
727
					fee_history_cache: fee_history_cache.clone(),
728
				},
729
			)
730
		} else {
731
			rpc::tracing::RpcRequesters {
732
				debug: None,
733
				trace: None,
734
			}
735
		};
736

            
737
	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
738
		task_manager.spawn_handle(),
739
		overrides.clone(),
740
		rpc_config.eth_log_block_cache,
741
		rpc_config.eth_statuses_cache,
742
		prometheus_registry,
743
	));
744

            
745
	let rpc_builder = {
746
		let client = client.clone();
747
		let pool = transaction_pool.clone();
748
		let backend = backend.clone();
749
		let network = network.clone();
750
		let sync = sync_service.clone();
751
		let ethapi_cmd = ethapi_cmd.clone();
752
		let max_past_logs = rpc_config.max_past_logs;
753
		let overrides = overrides.clone();
754
		let fee_history_cache = fee_history_cache.clone();
755
		let block_data_cache = block_data_cache.clone();
756
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();
757

            
758
		let keystore = keystore_container.keystore();
759
		let command_sink_for_task = command_sink.clone();
760
		move |deny_unsafe, subscription_task_executor| {
761
			let deps = rpc::FullDeps {
762
				backend: backend.clone(),
763
				client: client.clone(),
764
				command_sink: command_sink_for_task.clone(),
765
				deny_unsafe,
766
				ethapi_cmd: ethapi_cmd.clone(),
767
				filter_pool: filter_pool.clone(),
768
				frontier_backend: match *frontier_backend {
769
					fc_db::Backend::KeyValue(ref b) => b.clone(),
770
					fc_db::Backend::Sql(ref b) => b.clone(),
771
				},
772
				graph: pool.pool().clone(),
773
				pool: pool.clone(),
774
				is_authority: collator,
775
				max_past_logs,
776
				fee_history_limit,
777
				fee_history_cache: fee_history_cache.clone(),
778
				network: network.clone(),
779
				sync: sync.clone(),
780
				dev_rpc_data: dev_rpc_data.clone(),
781
				overrides: overrides.clone(),
782
				block_data_cache: block_data_cache.clone(),
783
				forced_parent_hashes: None,
784
			};
785

            
786
			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
787
				client.clone(),
788
				keystore.clone(),
789
			));
790
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
791
				rpc::create_full(
792
					deps,
793
					subscription_task_executor,
794
					Some(crate::rpc::TracingConfig {
795
						tracing_requesters: tracing_requesters.clone(),
796
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
797
					}),
798
					pubsub_notification_sinks.clone(),
799
					pending_consensus_data_provider,
800
				)
801
				.map_err(Into::into)
802
			} else {
803
				rpc::create_full(
804
					deps,
805
					subscription_task_executor,
806
					None,
807
					pubsub_notification_sinks.clone(),
808
					pending_consensus_data_provider,
809
				)
810
				.map_err(Into::into)
811
			}
812
		}
813
	};
814

            
815
	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
816
		network,
817
		client,
818
		keystore: keystore_container.keystore(),
819
		task_manager: &mut task_manager,
820
		transaction_pool,
821
		rpc_builder: Box::new(rpc_builder),
822
		backend,
823
		system_rpc_tx,
824
		sync_service: sync_service.clone(),
825
		config,
826
		tx_handler_controller,
827
		telemetry: None,
828
	})?;
829

            
830
	if let Some(hwbench) = hwbench {
831
		sc_sysinfo::print_hwbench(&hwbench);
832

            
833
		if let Some(ref mut telemetry) = telemetry {
834
			let telemetry_handle = telemetry.handle();
835
			task_manager.spawn_handle().spawn(
836
				"telemetry_hwbench",
837
				None,
838
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
839
			);
840
		}
841
	}
842

            
843
	network_starter.start_network();
844

            
845
	log::info!("Service Ready");
846

            
847
	Ok(task_manager)
848
}