// Copyright 2019-2025 PureStake Inc.
// This file is part of Moonbeam.

// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.

//! This module assembles the Moonbeam service components, executes them, and manages communication
//! between them. This is the backbone of the client-side node implementation.
//!
//! This module can assemble:
//! PartialComponents: For maintenance tasks without a complete node (e.g. import/export blocks, purge)
//! Full Service: A complete parachain node including the pool, rpc, network, embedded relay chain
//! Dev Service: A leaner service without the relay chain backing.
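//!
//! A rough sketch of how these entry points relate (illustrative only, not a full setup;
//! the CLI is expected to provide the `Configuration` and option values):
//!
//! ```ignore
//! // Chain operations only need the partial components:
//! let (client, backend, import_queue, task_manager) =
//! 	new_chain_ops(&mut config, &rpc_config, legacy_block_import_strategy)?;
//!
//! // A complete parachain node, embedded relay chain included:
//! let (task_manager, client) = start_node::<RuntimeApi, Customizations>(
//! 	parachain_config, polkadot_config, collator_options, para_id, rpc_config,
//! 	async_backing, block_authoring_duration, hwbench,
//! 	legacy_block_import_strategy, nimbus_full_pov,
//! ).await?;
//!
//! // A development node with manual seal and a mocked relay chain:
//! let task_manager = new_dev::<RuntimeApi, Customizations, sc_network::NetworkWorker<_, _>>(
//! 	config, para_id, None, sealing, rpc_config, hwbench,
//! ).await?;
//! ```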

pub mod rpc;

use cumulus_client_cli::CollatorOptions;
use cumulus_client_collator::service::CollatorService;
use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport;
use cumulus_client_consensus_proposer::Proposer;
use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
use cumulus_client_service::{
	prepare_node_config, start_relay_chain_tasks, CollatorSybilResistance, DARecoveryProfile,
	ParachainHostFunctions, StartRelayChainTasksParams,
};
use cumulus_primitives_core::{
	relay_chain,
	relay_chain::{well_known_keys, CollatorPair},
	ParaId,
};
use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
use fc_consensus::FrontierBlockImport as TFrontierBlockImport;
use fc_db::DatabaseSource;
use fc_rpc::StorageOverrideHandler;
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
use futures::{FutureExt, StreamExt};
use maplit::hashmap;
#[cfg(feature = "moonbase-native")]
pub use moonbase_runtime;
use moonbeam_cli_opt::{EthApi as EthApiCmd, FrontierBackendConfig, RpcConfig};
#[cfg(feature = "moonbeam-native")]
pub use moonbeam_runtime;
use moonbeam_vrf::VrfDigestsProvider;
#[cfg(feature = "moonriver-native")]
pub use moonriver_runtime;
use nimbus_consensus::NimbusManualSealConsensusDataProvider;
use nimbus_primitives::{DigestsProvider, NimbusId};
use polkadot_primitives::{AbridgedHostConfiguration, AsyncBackingParams, Slot};
use sc_client_api::{
	backend::{AuxStore, Backend, StateBackend, StorageProvider},
	ExecutorProvider,
};
use sc_consensus::ImportQueue;
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock};
use sc_service::config::PrometheusConfig;
use sc_service::{
	error::Error as ServiceError, ChainSpec, Configuration, PartialComponents, TFullBackend,
	TFullClient, TaskManager,
};
use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use session_keys_primitives::VrfApi;
use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_consensus::SyncOracle;
use sp_core::{twox_128, ByteArray, Encode, H256};
use sp_keystore::{Keystore, KeystorePtr};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::{collections::BTreeMap, path::Path, sync::Mutex, time::Duration};
use substrate_prometheus_endpoint::Registry;

pub use client::*;
pub mod chain_spec;
mod client;
#[cfg(feature = "lazy-loading")]
pub mod lazy_loading;

type FullClient<RuntimeApi> = TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
type FullBackend = TFullBackend<Block>;

type MaybeSelectChain<Backend> = Option<sc_consensus::LongestChain<Backend, Block>>;
type FrontierBlockImport<Client> = TFrontierBlockImport<Block, Arc<Client>, Client>;
type ParachainBlockImport<Client, Backend> =
	TParachainBlockImport<Block, FrontierBlockImport<Client>, Backend>;
type PartialComponentsResult<Client, Backend> = Result<
	PartialComponents<
		Client,
		Backend,
		MaybeSelectChain<Backend>,
		sc_consensus::DefaultImportQueue<Block>,
		sc_transaction_pool::FullPool<Block, Client>,
		(
			BlockImportPipeline<FrontierBlockImport<Client>, ParachainBlockImport<Client, Backend>>,
			Option<FilterPool>,
			Option<Telemetry>,
			Option<TelemetryWorkerHandle>,
			Arc<fc_db::Backend<Block, Client>>,
			FeeHistoryCache,
		),
	>,
	ServiceError,
>;

const RELAY_CHAIN_SLOT_DURATION_MILLIS: u64 = 6_000;
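
// Mocked timestamp in milliseconds; shared between the inherent data provider below and the
// dev service's relay slot computation.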
static TIMESTAMP: AtomicU64 = AtomicU64::new(0);

/// Provides a mock timestamp for the timestamp inherent, starting at 0 and expressed in
/// milliseconds. Each call increments the timestamp by one relay chain slot duration, making
/// Aura think time has passed.
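///
/// A sketch of the resulting arithmetic (illustrative only):
///
/// ```ignore
/// // 1st call: TIMESTAMP = 6_000  -> relay slot 1
/// // 2nd call: TIMESTAMP = 12_000 -> relay slot 2
/// let slot = TIMESTAMP.load(Ordering::SeqCst) / RELAY_CHAIN_SLOT_DURATION_MILLIS;
/// ```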
struct MockTimestampInherentDataProvider;
#[async_trait::async_trait]
impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
	async fn provide_inherent_data(
		&self,
		inherent_data: &mut sp_inherents::InherentData,
	) -> Result<(), sp_inherents::Error> {
		TIMESTAMP.fetch_add(RELAY_CHAIN_SLOT_DURATION_MILLIS, Ordering::SeqCst);
		inherent_data.put_data(
			sp_timestamp::INHERENT_IDENTIFIER,
			&TIMESTAMP.load(Ordering::SeqCst),
		)
	}

	async fn try_handle_error(
		&self,
		_identifier: &sp_inherents::InherentIdentifier,
		_error: &[u8],
	) -> Option<Result<(), sp_inherents::Error>> {
		// The pallet never reports an error.
		None
	}
}
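
// Host functions exposed to the WASM executor; the benchmarking host functions are only
// included when the `runtime-benchmarks` feature is enabled.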
#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
	frame_benchmarking::benchmarking::HostFunctions,
	ParachainHostFunctions,
	moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
	ParachainHostFunctions,
	moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);

/// Block Import Pipeline used.
pub enum BlockImportPipeline<T, E> {
	/// Used in dev mode to import new blocks as best blocks.
	Dev(T),
	/// Used in parachain mode.
	Parachain(E),
}

/// A trait that must be implemented by all moon* runtime executors.
///
/// It allows, for instance, customizing the client extensions according to the type
/// of network.
/// For the moment, it is only used to specify the first block compatible with
/// ed25519-zebra, but it could be used for other things in the future.
pub trait ClientCustomizations {
	/// The behavior of the host function `ed25519_verify` has changed over Substrate's history,
	/// because of the switch from the ed25519-dalek library to ed25519-zebra.
	/// Some networks may have old blocks that are not compatible with ed25519-zebra;
	/// for these networks, this function should return the first block compatible with the new
	/// library. If this function returns `None` (the default behavior), all blocks are assumed
	/// to be compatible with the new library (ed25519-zebra).
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		None
	}
}

#[cfg(feature = "moonbeam-native")]
pub struct MoonbeamCustomizations;
#[cfg(feature = "moonbeam-native")]
impl ClientCustomizations for MoonbeamCustomizations {
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(2_000_000)
	}
}

#[cfg(feature = "moonriver-native")]
pub struct MoonriverCustomizations;
#[cfg(feature = "moonriver-native")]
impl ClientCustomizations for MoonriverCustomizations {
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(3_000_000)
	}
}

#[cfg(feature = "moonbase-native")]
pub struct MoonbaseCustomizations;
#[cfg(feature = "moonbase-native")]
impl ClientCustomizations for MoonbaseCustomizations {
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(3_000_000)
	}
}

/// Trivial enum representing the runtime variant.
#[derive(Clone)]
pub enum RuntimeVariant {
	#[cfg(feature = "moonbeam-native")]
	Moonbeam,
	#[cfg(feature = "moonriver-native")]
	Moonriver,
	#[cfg(feature = "moonbase-native")]
	Moonbase,
	Unrecognized,
}

impl RuntimeVariant {
	pub fn from_chain_spec(chain_spec: &Box<dyn ChainSpec>) -> Self {
		match chain_spec {
			#[cfg(feature = "moonbeam-native")]
			spec if spec.is_moonbeam() => Self::Moonbeam,
			#[cfg(feature = "moonriver-native")]
			spec if spec.is_moonriver() => Self::Moonriver,
			#[cfg(feature = "moonbase-native")]
			spec if spec.is_moonbase() => Self::Moonbase,
			_ => Self::Unrecognized,
		}
	}
}

/// Can be called on a chain spec to check which Moonbeam network it is a
/// configuration for.
pub trait IdentifyVariant {
	/// Returns `true` if this is a configuration for the `Moonbase` network.
	fn is_moonbase(&self) -> bool;

	/// Returns `true` if this is a configuration for the `Moonbeam` network.
	fn is_moonbeam(&self) -> bool;

	/// Returns `true` if this is a configuration for the `Moonriver` network.
	fn is_moonriver(&self) -> bool;

	/// Returns `true` if this is a configuration for a dev network.
	fn is_dev(&self) -> bool;
}

impl IdentifyVariant for Box<dyn ChainSpec> {
	fn is_moonbase(&self) -> bool {
		self.id().starts_with("moonbase")
	}

	fn is_moonbeam(&self) -> bool {
		self.id().starts_with("moonbeam")
	}

	fn is_moonriver(&self) -> bool {
		self.id().starts_with("moonriver")
	}

	fn is_dev(&self) -> bool {
		self.chain_type() == sc_chain_spec::ChainType::Development
	}
}
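
/// Returns the directory holding the Frontier database for this chain: the chain's config
/// directory (as computed by `BasePath::config_dir`) joined with `frontier/<path>`.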
pub fn frontier_database_dir(config: &Configuration, path: &str) -> std::path::PathBuf {
	config
		.base_path
		.config_dir(config.chain_spec.id())
		.join("frontier")
		.join(path)
}

// TODO This is copied from frontier. It should be imported instead after
// https://github.com/paritytech/frontier/issues/333 is solved
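/// Opens the Frontier backend that maps Ethereum data onto Substrate blocks, backed either by a
/// key-value database (RocksDb/ParityDb) or by SQL, depending on `rpc_config`.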
pub fn open_frontier_backend<C, BE>(
	client: Arc<C>,
	config: &Configuration,
	rpc_config: &RpcConfig,
) -> Result<fc_db::Backend<Block, C>, String>
where
	C: ProvideRuntimeApi<Block> + StorageProvider<Block, BE> + AuxStore,
	C: HeaderBackend<Block> + HeaderMetadata<Block, Error = BlockChainError>,
	C: Send + Sync + 'static,
	C::Api: fp_rpc::EthereumRuntimeRPCApi<Block>,
	BE: Backend<Block> + 'static,
	BE::State: StateBackend<BlakeTwo256>,
{
	let frontier_backend = match rpc_config.frontier_backend_config {
		FrontierBackendConfig::KeyValue => {
			fc_db::Backend::KeyValue(Arc::new(fc_db::kv::Backend::<Block, C>::new(
				client,
				&fc_db::kv::DatabaseSettings {
					source: match config.database {
						DatabaseSource::RocksDb { .. } => DatabaseSource::RocksDb {
							path: frontier_database_dir(config, "db"),
							cache_size: 0,
						},
						DatabaseSource::ParityDb { .. } => DatabaseSource::ParityDb {
							path: frontier_database_dir(config, "paritydb"),
						},
						DatabaseSource::Auto { .. } => DatabaseSource::Auto {
							rocksdb_path: frontier_database_dir(config, "db"),
							paritydb_path: frontier_database_dir(config, "paritydb"),
							cache_size: 0,
						},
						_ => {
							return Err(
								"Supported db sources: `rocksdb` | `paritydb` | `auto`".to_string()
							)
						}
					},
				},
			)?))
		}
		FrontierBackendConfig::Sql {
			pool_size,
			num_ops_timeout,
			thread_count,
			cache_size,
		} => {
			let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
			let sqlite_db_path = frontier_database_dir(config, "sql");
			std::fs::create_dir_all(&sqlite_db_path).expect("failed creating sql db directory");
			let backend = futures::executor::block_on(fc_db::sql::Backend::new(
				fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
					path: Path::new("sqlite:///")
						.join(sqlite_db_path)
						.join("frontier.db3")
						.to_str()
						.expect("frontier sql path error"),
					create_if_missing: true,
					thread_count: thread_count,
					cache_size: cache_size,
				}),
				pool_size,
				std::num::NonZeroU32::new(num_ops_timeout),
				overrides.clone(),
			))
			.unwrap_or_else(|err| panic!("failed creating sql backend: {:?}", err));
			fc_db::Backend::Sql(Arc::new(backend))
		}
	};

	Ok(frontier_backend)
}

use sp_runtime::{traits::BlakeTwo256, DigestItem, Percent};
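
/// Soft deadline for the dev service's block proposer, as a share of the remaining proposal
/// time: at 100%, the soft deadline coincides with the hard deadline.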
pub const SOFT_DEADLINE_PERCENT: Percent = Percent::from_percent(100);

/// Builds a new object suitable for chain operations.
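/// It dispatches on the chain spec to the matching runtime (Moonriver, Moonbeam, or Moonbase)
/// and returns the client, backend, import queue, and task manager needed for block
/// import/export and purge operations.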
#[allow(clippy::type_complexity)]
pub fn new_chain_ops(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	legacy_block_import_strategy: bool,
) -> Result<
	(
		Arc<Client>,
		Arc<FullBackend>,
		sc_consensus::BasicQueue<Block>,
		TaskManager,
	),
	ServiceError,
> {
	match &config.chain_spec {
		#[cfg(feature = "moonriver-native")]
		spec if spec.is_moonriver() => new_chain_ops_inner::<
			moonriver_runtime::RuntimeApi,
			MoonriverCustomizations,
		>(config, rpc_config, legacy_block_import_strategy),
		#[cfg(feature = "moonbeam-native")]
		spec if spec.is_moonbeam() => new_chain_ops_inner::<
			moonbeam_runtime::RuntimeApi,
			MoonbeamCustomizations,
		>(config, rpc_config, legacy_block_import_strategy),
		#[cfg(feature = "moonbase-native")]
		_ => new_chain_ops_inner::<moonbase_runtime::RuntimeApi, MoonbaseCustomizations>(
			config,
			rpc_config,
			legacy_block_import_strategy,
		),
		#[cfg(not(feature = "moonbase-native"))]
		_ => panic!("invalid chain spec"),
	}
}

#[allow(clippy::type_complexity)]
fn new_chain_ops_inner<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	legacy_block_import_strategy: bool,
) -> Result<
	(
		Arc<Client>,
		Arc<FullBackend>,
		sc_consensus::BasicQueue<Block>,
		TaskManager,
	),
	ServiceError,
>
where
	Client: From<Arc<crate::FullClient<RuntimeApi>>>,
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	config.keystore = sc_service::config::KeystoreConfig::InMemory;
	let PartialComponents {
		client,
		backend,
		import_queue,
		task_manager,
		..
	} = new_partial::<RuntimeApi, Customizations>(
		config,
		rpc_config,
		config.chain_spec.is_dev(),
		legacy_block_import_strategy,
	)?;
	Ok((
		Arc::new(Client::from(client)),
		backend,
		import_queue,
		task_manager,
	))
}

// If we're using Prometheus, use a registry with a prefix of `moonbeam`.
fn set_prometheus_registry(
	config: &mut Configuration,
	skip_prefix: bool,
) -> Result<(), ServiceError> {
	if let Some(PrometheusConfig { registry, .. }) = config.prometheus_config.as_mut() {
		let labels = hashmap! {
			"chain".into() => config.chain_spec.id().into(),
		};
		let prefix = if skip_prefix {
			None
		} else {
			Some("moonbeam".into())
		};

		*registry = Registry::new_custom(prefix, Some(labels))?;
	}

	Ok(())
}

/// Builds the PartialComponents for a parachain or development service
///
/// Use this function if you don't actually need the full service, but just the partial in order to
/// be able to perform chain operations.
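///
/// For example (illustrative only; any of the `*Customizations` types above can stand in):
///
/// ```ignore
/// let PartialComponents { client, backend, import_queue, task_manager, .. } =
/// 	new_partial::<moonbase_runtime::RuntimeApi, MoonbaseCustomizations>(
/// 		&mut config, &rpc_config, /* dev_service */ false, /* legacy import */ true,
/// 	)?;
/// ```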
#[allow(clippy::type_complexity)]
pub fn new_partial<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	dev_service: bool,
	legacy_block_import_strategy: bool,
) -> PartialComponentsResult<FullClient<RuntimeApi>, FullBackend>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;

	// Use ethereum style for subscription ids
	config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));

	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	let heap_pages = config
		.executor
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
			extra_pages: h as _,
		});
	let mut wasm_builder = WasmExecutor::builder()
		.with_execution_method(config.executor.wasm_method)
		.with_onchain_heap_alloc_strategy(heap_pages)
		.with_offchain_heap_alloc_strategy(heap_pages)
		.with_ignore_onchain_heap_pages(true)
		.with_max_runtime_instances(config.executor.max_runtime_instances)
		.with_runtime_cache_size(config.executor.runtime_cache_size);

	if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
	}

	let executor = wasm_builder.build();

	let (client, backend, keystore_container, task_manager) =
		sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
			config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
			true,
		)?;

	if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(block_number));
	}

	let client = Arc::new(client);

	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());

	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager
			.spawn_handle()
			.spawn("telemetry", None, worker.run());
		telemetry
	});

	let maybe_select_chain = if dev_service {
		Some(sc_consensus::LongestChain::new(backend.clone()))
	} else {
		None
	};

	let transaction_pool = sc_transaction_pool::BasicPool::new_full(
		config.transaction_pool.clone(),
		config.role.is_authority().into(),
		config.prometheus_registry(),
		task_manager.spawn_essential_handle(),
		client.clone(),
	);

	let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
	let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));

	let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
	let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());

	let create_inherent_data_providers = move |_, _| async move {
		let time = sp_timestamp::InherentDataProvider::from_system_time();
		Ok((time,))
	};

	let (import_queue, block_import) = if dev_service {
		(
			nimbus_consensus::import_queue(
				client.clone(),
				frontier_block_import.clone(),
				create_inherent_data_providers,
				&task_manager.spawn_essential_handle(),
				config.prometheus_registry(),
				legacy_block_import_strategy,
			)?,
			BlockImportPipeline::Dev(frontier_block_import),
		)
	} else {
		let parachain_block_import = if legacy_block_import_strategy {
			ParachainBlockImport::new_with_delayed_best_block(
				frontier_block_import,
				backend.clone(),
			)
		} else {
			ParachainBlockImport::new(frontier_block_import, backend.clone())
		};
		(
			nimbus_consensus::import_queue(
				client.clone(),
				parachain_block_import.clone(),
				create_inherent_data_providers,
				&task_manager.spawn_essential_handle(),
				config.prometheus_registry(),
				legacy_block_import_strategy,
			)?,
			BlockImportPipeline::Parachain(parachain_block_import),
		)
	};

	Ok(PartialComponents {
		backend,
		client,
		import_queue,
		keystore_container,
		task_manager,
		transaction_pool,
		select_chain: maybe_select_chain,
		other: (
			block_import,
			filter_pool,
			telemetry,
			telemetry_worker_handle,
			frontier_backend,
			fee_history_cache,
		),
	})
}
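
/// Builds the relay chain interface: a minimal relay chain node driven over RPC when external
/// relay chain RPC URLs are supplied, otherwise a full in-process relay chain node.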
async fn build_relay_chain_interface(
	polkadot_config: Configuration,
	parachain_config: &Configuration,
	telemetry_worker_handle: Option<TelemetryWorkerHandle>,
	task_manager: &mut TaskManager,
	collator_options: CollatorOptions,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> RelayChainResult<(
	Arc<(dyn RelayChainInterface + 'static)>,
	Option<CollatorPair>,
)> {
	if let cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =
		collator_options.relay_chain_mode
	{
		build_minimal_relay_chain_node_with_rpc(polkadot_config, task_manager, rpc_target_urls)
			.await
	} else {
		build_inprocess_relay_chain(
			polkadot_config,
			parachain_config,
			telemetry_worker_handle,
			task_manager,
			hwbench,
		)
	}
}

/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that abstracts over the executor and the runtime API.
#[sc_tracing::logging::prefix_logs_with("🌗")]
async fn start_node_impl<RuntimeApi, Customizations, Net>(
	parachain_config: Configuration,
	polkadot_config: Configuration,
	collator_options: CollatorOptions,
	para_id: ParaId,
	rpc_config: RpcConfig,
	async_backing: bool,
	block_authoring_duration: Duration,
	hwbench: Option<sc_sysinfo::HwBench>,
	legacy_block_import_strategy: bool,
	nimbus_full_pov: bool,
) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
	Net: NetworkBackend<Block, Hash>,
{
	let mut parachain_config = prepare_node_config(parachain_config);

	let params = new_partial::<RuntimeApi, Customizations>(
		&mut parachain_config,
		&rpc_config,
		false,
		legacy_block_import_strategy,
	)?;
	let (
		block_import,
		filter_pool,
		mut telemetry,
		telemetry_worker_handle,
		frontier_backend,
		fee_history_cache,
	) = params.other;

	let client = params.client.clone();
	let backend = params.backend.clone();
	let mut task_manager = params.task_manager;

	let (relay_chain_interface, collator_key) = build_relay_chain_interface(
		polkadot_config,
		&parachain_config,
		telemetry_worker_handle,
		&mut task_manager,
		collator_options.clone(),
		hwbench.clone(),
	)
	.await
	.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

	let force_authoring = parachain_config.force_authoring;
	let collator = parachain_config.role.is_authority();
	let prometheus_registry = parachain_config.prometheus_registry().cloned();
	let transaction_pool = params.transaction_pool.clone();
	let import_queue_service = params.import_queue.service();
	let net_config = FullNetworkConfiguration::<_, _, Net>::new(
		&parachain_config.network,
		prometheus_registry.clone(),
	);

	let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
		cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
			parachain_config: &parachain_config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue: params.import_queue,
			para_id,
			relay_chain_interface: relay_chain_interface.clone(),
			net_config,
			sybil_resistance_level: CollatorSybilResistance::Resistant,
		})
		.await?;

	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
	let fee_history_limit = rpc_config.fee_history_limit;

	// Sinks for pubsub notifications.
	// Every time a new subscription is created, a new mpsc channel is added to the sink pool.
	// The MappingSyncWorker sends through the channel on block import, and the subscription emits a
	// notification to the subscriber on receiving a message through this channel.
	// This way we avoid race conditions when using the native Substrate block import notification
	// stream.
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
		fc_mapping_sync::EthereumBlockNotification<Block>,
	> = Default::default();
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

	rpc::spawn_essential_tasks(
		rpc::SpawnTasksParams {
			task_manager: &task_manager,
			client: client.clone(),
			substrate_backend: backend.clone(),
			frontier_backend: frontier_backend.clone(),
			filter_pool: filter_pool.clone(),
			overrides: overrides.clone(),
			fee_history_limit,
			fee_history_cache: fee_history_cache.clone(),
		},
		sync_service.clone(),
		pubsub_notification_sinks.clone(),
	);

	let ethapi_cmd = rpc_config.ethapi.clone();
	let tracing_requesters =
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
			rpc::tracing::spawn_tracing_tasks(
				&rpc_config,
				prometheus_registry.clone(),
				rpc::SpawnTasksParams {
					task_manager: &task_manager,
					client: client.clone(),
					substrate_backend: backend.clone(),
					frontier_backend: frontier_backend.clone(),
					filter_pool: filter_pool.clone(),
					overrides: overrides.clone(),
					fee_history_limit,
					fee_history_cache: fee_history_cache.clone(),
				},
			)
		} else {
			rpc::tracing::RpcRequesters {
				debug: None,
				trace: None,
			}
		};

	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
		task_manager.spawn_handle(),
		overrides.clone(),
		rpc_config.eth_log_block_cache,
		rpc_config.eth_statuses_cache,
		prometheus_registry.clone(),
	));

	let rpc_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let network = network.clone();
		let sync = sync_service.clone();
		let filter_pool = filter_pool.clone();
		let frontier_backend = frontier_backend.clone();
		let backend = backend.clone();
		let ethapi_cmd = ethapi_cmd.clone();
		let max_past_logs = rpc_config.max_past_logs;
		let max_block_range = rpc_config.max_block_range;
		let overrides = overrides.clone();
		let fee_history_cache = fee_history_cache.clone();
		let block_data_cache = block_data_cache.clone();
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();

		let keystore = params.keystore_container.keystore();
		move |subscription_task_executor| {
			#[cfg(feature = "moonbase-native")]
			let forced_parent_hashes = {
				let mut forced_parent_hashes = BTreeMap::new();
				// Fixes for https://github.com/paritytech/frontier/pull/570
				// #1648995
				forced_parent_hashes.insert(
					H256::from_str(
						"0xa352fee3eef9c554a31ec0612af887796a920613358abf3353727760ea14207b",
					)
					.expect("must be valid hash"),
					H256::from_str(
						"0x0d0fd88778aec08b3a83ce36387dbf130f6f304fc91e9a44c9605eaf8a80ce5d",
					)
					.expect("must be valid hash"),
				);
				Some(forced_parent_hashes)
			};
			#[cfg(not(feature = "moonbase-native"))]
			let forced_parent_hashes = None;

			let deps = rpc::FullDeps {
				backend: backend.clone(),
				client: client.clone(),
				command_sink: None,
				ethapi_cmd: ethapi_cmd.clone(),
				filter_pool: filter_pool.clone(),
				frontier_backend: match &*frontier_backend {
					fc_db::Backend::KeyValue(b) => b.clone(),
					fc_db::Backend::Sql(b) => b.clone(),
				},
				graph: pool.pool().clone(),
				pool: pool.clone(),
				is_authority: collator,
				max_past_logs,
				max_block_range,
				fee_history_limit,
				fee_history_cache: fee_history_cache.clone(),
				network: network.clone(),
				sync: sync.clone(),
				dev_rpc_data: None,
				block_data_cache: block_data_cache.clone(),
				overrides: overrides.clone(),
				forced_parent_hashes,
			};
			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
				client.clone(),
				keystore.clone(),
			));
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
				rpc::create_full(
					deps,
					subscription_task_executor,
					Some(crate::rpc::TracingConfig {
						tracing_requesters: tracing_requesters.clone(),
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
					}),
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
				)
				.map_err(Into::into)
			} else {
				rpc::create_full(
					deps,
					subscription_task_executor,
					None,
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
				)
				.map_err(Into::into)
			}
		}
	};

	sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		rpc_builder: Box::new(rpc_builder),
		client: client.clone(),
		transaction_pool: transaction_pool.clone(),
		task_manager: &mut task_manager,
		config: parachain_config,
		keystore: params.keystore_container.keystore(),
		backend: backend.clone(),
		network: network.clone(),
		sync_service: sync_service.clone(),
		system_rpc_tx,
		tx_handler_controller,
		telemetry: telemetry.as_mut(),
	})?;

	if let Some(hwbench) = hwbench {
		sc_sysinfo::print_hwbench(&hwbench);

		if let Some(ref mut telemetry) = telemetry {
			let telemetry_handle = telemetry.handle();
			task_manager.spawn_handle().spawn(
				"telemetry_hwbench",
				None,
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
			);
		}
	}

	let announce_block = {
		let sync_service = sync_service.clone();
		Arc::new(move |hash, data| sync_service.announce_block(hash, data))
	};

	let relay_chain_slot_duration = Duration::from_secs(6);
	let overseer_handle = relay_chain_interface
		.overseer_handle()
		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;

	start_relay_chain_tasks(StartRelayChainTasksParams {
		client: client.clone(),
		announce_block: announce_block.clone(),
		para_id,
		relay_chain_interface: relay_chain_interface.clone(),
		task_manager: &mut task_manager,
		da_recovery_profile: if collator {
			DARecoveryProfile::Collator
		} else {
			DARecoveryProfile::FullNode
		},
		import_queue: import_queue_service,
		relay_chain_slot_duration,
		recovery_handle: Box::new(overseer_handle.clone()),
		sync_service: sync_service.clone(),
	})?;

	let BlockImportPipeline::Parachain(block_import) = block_import else {
		return Err(sc_service::Error::Other(
			"Block import pipeline is not for parachain".into(),
		));
	};

	if collator {
		start_consensus::<RuntimeApi, _>(
			async_backing,
			backend.clone(),
			client.clone(),
			block_import,
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|t| t.handle()),
			&task_manager,
			relay_chain_interface.clone(),
			transaction_pool,
			params.keystore_container.keystore(),
			para_id,
			collator_key.expect("Command line arguments do not allow this. qed"),
			overseer_handle,
			announce_block,
			force_authoring,
			relay_chain_slot_duration,
			block_authoring_duration,
			sync_service.clone(),
			nimbus_full_pov,
		)?;
		/*let parachain_consensus = build_consensus(
			client.clone(),
			backend,
			block_import,
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|t| t.handle()),
			&task_manager,
			relay_chain_interface.clone(),
			transaction_pool,
			sync_service.clone(),
			params.keystore_container.keystore(),
			force_authoring,
		)?;

		let spawner = task_manager.spawn_handle();

		let params = StartCollatorParams {
			para_id,
			block_status: client.clone(),
			announce_block,
			client: client.clone(),
			task_manager: &mut task_manager,
			relay_chain_interface,
			spawner,
			parachain_consensus,
			import_queue: import_queue_service,
			recovery_handle: Box::new(overseer_handle),
			collator_key: collator_key.ok_or(sc_service::error::Error::Other(
				"Collator Key is None".to_string(),
			))?,
			relay_chain_slot_duration,
			sync_service,
		};

		#[allow(deprecated)]
		start_collator(params).await?;*/
	}

	start_network.start_network();

	Ok((task_manager, client))
}
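
/// Runs the Nimbus parachain consensus task for a collator: the lookahead collator when async
/// backing is enabled, the basic collator otherwise.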
fn start_consensus<RuntimeApi, SO>(
	async_backing: bool,
	backend: Arc<FullBackend>,
	client: Arc<FullClient<RuntimeApi>>,
	block_import: ParachainBlockImport<FullClient<RuntimeApi>, FullBackend>,
	prometheus_registry: Option<&Registry>,
	telemetry: Option<TelemetryHandle>,
	task_manager: &TaskManager,
	relay_chain_interface: Arc<dyn RelayChainInterface>,
	transaction_pool: Arc<sc_transaction_pool::FullPool<Block, FullClient<RuntimeApi>>>,
	keystore: KeystorePtr,
	para_id: ParaId,
	collator_key: CollatorPair,
	overseer_handle: OverseerHandle,
	announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
	force_authoring: bool,
	relay_chain_slot_duration: Duration,
	block_authoring_duration: Duration,
	sync_oracle: SO,
	nimbus_full_pov: bool,
) -> Result<(), sc_service::Error>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	sc_client_api::StateBackendFor<FullBackend, Block>: sc_client_api::StateBackend<BlakeTwo256>,
	SO: SyncOracle + Send + Sync + Clone + 'static,
{
	let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
		task_manager.spawn_handle(),
		client.clone(),
		transaction_pool,
		prometheus_registry,
		telemetry.clone(),
	);
	let proposer = Proposer::new(proposer_factory);
	let collator_service = CollatorService::new(
		client.clone(),
		Arc::new(task_manager.spawn_handle()),
		announce_block,
		client.clone(),
	);
	let create_inherent_data_providers = |_, _| async move {
		let time = sp_timestamp::InherentDataProvider::from_system_time();
		let author = nimbus_primitives::InherentDataProvider;
		let randomness = session_keys_primitives::InherentDataProvider;
		Ok((time, author, randomness))
	};
	let client_clone = client.clone();
	let keystore_clone = keystore.clone();
	let maybe_provide_vrf_digest =
		move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
			moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
				&client_clone,
				&keystore_clone,
				nimbus_id,
				parent,
			)
		};
	if async_backing {
		log::info!("Collator started with asynchronous backing.");
		let client_clone = client.clone();
		let code_hash_provider = move |block_hash| {
			client_clone
				.code_at(block_hash)
				.ok()
				.map(polkadot_primitives::ValidationCode)
				.map(|c| c.hash())
		};
		task_manager.spawn_essential_handle().spawn(
			"nimbus",
			None,
			nimbus_consensus::collators::lookahead::run::<
				Block,
				_,
				_,
				_,
				FullBackend,
				_,
				_,
				_,
				_,
				_,
				_,
			>(nimbus_consensus::collators::lookahead::Params {
				additional_digests_provider: maybe_provide_vrf_digest,
				additional_relay_keys: vec![
					moonbeam_core_primitives::well_known_relay_keys::TIMESTAMP_NOW.to_vec(),
				],
				authoring_duration: block_authoring_duration,
				block_import,
				code_hash_provider,
				collator_key,
				collator_service,
				create_inherent_data_providers,
				force_authoring,
				keystore,
				overseer_handle,
				para_backend: backend,
				para_client: client,
				para_id,
				proposer,
				relay_chain_slot_duration,
				relay_client: relay_chain_interface,
				slot_duration: None,
				sync_oracle,
				reinitialize: false,
				full_pov_size: nimbus_full_pov,
			}),
		);
	} else {
		log::info!("Collator started without asynchronous backing.");
		task_manager.spawn_essential_handle().spawn(
			"nimbus",
			None,
			nimbus_consensus::collators::basic::run::<Block, _, _, FullBackend, _, _, _, _, _>(
				nimbus_consensus::collators::basic::Params {
					additional_digests_provider: maybe_provide_vrf_digest,
					additional_relay_keys: vec![
						moonbeam_core_primitives::well_known_relay_keys::TIMESTAMP_NOW.to_vec(),
					],
					//authoring_duration: Duration::from_millis(500),
					block_import,
					collator_key,
					collator_service,
					create_inherent_data_providers,
					force_authoring,
					keystore,
					overseer_handle,
					para_id,
					para_client: client,
					proposer,
					relay_client: relay_chain_interface,
					full_pov_size: nimbus_full_pov,
				},
			),
		);
	};
	Ok(())
}

/// Start a normal parachain node.
// Rustfmt wants to format the closure with space indentation.
#[rustfmt::skip]
pub async fn start_node<RuntimeApi, Customizations>(
	parachain_config: Configuration,
	polkadot_config: Configuration,
	collator_options: CollatorOptions,
	para_id: ParaId,
	rpc_config: RpcConfig,
	async_backing: bool,
	block_authoring_duration: Duration,
	hwbench: Option<sc_sysinfo::HwBench>,
	legacy_block_import_strategy: bool,
	nimbus_full_pov: bool,
) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
where
	RuntimeApi:
		ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi:
		RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	start_node_impl::<RuntimeApi, Customizations, sc_network::NetworkWorker<_, _>>(
		parachain_config,
		polkadot_config,
		collator_options,
		para_id,
		rpc_config,
		async_backing,
		block_authoring_duration,
		hwbench,
		legacy_block_import_strategy,
		nimbus_full_pov,
	)
	.await
}

/// Builds a new development service. This service uses manual seal, and mocks
/// the parachain inherent.
pub async fn new_dev<RuntimeApi, Customizations, Net>(
	mut config: Configuration,
	para_id: Option<u32>,
	_author_id: Option<NimbusId>,
	sealing: moonbeam_cli_opt::Sealing,
	rpc_config: RpcConfig,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<TaskManager, ServiceError>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
	Net: NetworkBackend<Block, Hash>,
{
	use async_io::Timer;
	use futures::Stream;
	use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams};
	let sc_service::PartialComponents {
		client,
		backend,
		mut task_manager,
		import_queue,
		keystore_container,
		select_chain: maybe_select_chain,
		transaction_pool,
		other:
			(
				block_import_pipeline,
				filter_pool,
				mut telemetry,
				_telemetry_worker_handle,
				frontier_backend,
				fee_history_cache,
			),
	} = new_partial::<RuntimeApi, Customizations>(&mut config, &rpc_config, true, true)?;
	let block_import = if let BlockImportPipeline::Dev(block_import) = block_import_pipeline {
		block_import
	} else {
		return Err(ServiceError::Other(
			"Block import pipeline is not dev".to_string(),
		));
	};
	let prometheus_registry = config.prometheus_registry().cloned();
	let net_config =
		FullNetworkConfiguration::<_, _, Net>::new(&config.network, prometheus_registry.clone());

	let metrics = Net::register_notification_metrics(
		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
	);
	let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
		sc_service::build_network(sc_service::BuildNetworkParams {
			config: &config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue,
			block_announce_validator_builder: None,
			warp_sync_config: None,
			net_config,
			block_relay: None,
			metrics,
		})?;
	if config.offchain_worker.enabled {
		task_manager.spawn_handle().spawn(
			"offchain-workers-runner",
			"offchain-work",
			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
				runtime_api_provider: client.clone(),
				keystore: Some(keystore_container.keystore()),
				offchain_db: backend.offchain_storage(),
				transaction_pool: Some(OffchainTransactionPoolFactory::new(
					transaction_pool.clone(),
				)),
				network_provider: Arc::new(network.clone()),
				is_validator: config.role.is_authority(),
				enable_http_requests: true,
				custom_extensions: move |_| vec![],
			})
			.run(client.clone(), task_manager.spawn_handle())
			.boxed(),
		);
	}
	let prometheus_registry = config.prometheus_registry().cloned();
	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
	let fee_history_limit = rpc_config.fee_history_limit;
	let mut command_sink = None;
	let mut dev_rpc_data = None;
	let collator = config.role.is_authority();

	if collator {
		let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|x| x.handle()),
		);
		env.set_soft_deadline(SOFT_DEADLINE_PERCENT);
		// TODO: Need to cherry-pick
		//
		// https://github.com/moonbeam-foundation/substrate/commit/
		// d59476b362e38071d44d32c98c32fb35fd280930#diff-a1c022c97c7f9200cab161864c
		// 06d204f0c8b689955e42177731e232115e9a6f
		//
		// env.enable_ensure_proof_size_limit_after_each_extrinsic();
		let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
			match sealing {
				moonbeam_cli_opt::Sealing::Instant => {
					Box::new(
						// This bit cribbed from the implementation of instant seal.
						transaction_pool
							.pool()
							.validated_pool()
							.import_notification_stream()
							.map(|_| EngineCommand::SealNewBlock {
								create_empty: false,
								finalize: false,
								parent_hash: None,
								sender: None,
							}),
					)
				}
				moonbeam_cli_opt::Sealing::Manual => {
					let (sink, stream) = futures::channel::mpsc::channel(1000);
					// Keep a reference to the other end of the channel. It goes to the RPC.
					command_sink = Some(sink);
					Box::new(stream)
				}
				moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
					Timer::interval(Duration::from_millis(millis)),
					|_| EngineCommand::SealNewBlock {
						create_empty: true,
						finalize: false,
						parent_hash: None,
						sender: None,
					},
				)),
			};
		let select_chain = maybe_select_chain.expect(
			"`new_partial` builds a `LongestChainRule` when building dev service.\
				We specified the dev service when calling `new_partial`.\
				Therefore, a `LongestChainRule` is present. qed.",
		);

		let client_set_aside_for_cidp = client.clone();

		// Create channels for mocked XCM messages.
		let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
		let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
		let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
		dev_rpc_data = Some((
			downward_xcm_sender,
			hrmp_xcm_sender,
			additional_relay_offset.clone(),
		));

		let client_clone = client.clone();
		let keystore_clone = keystore_container.keystore().clone();
		let maybe_provide_vrf_digest =
			move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
				moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
					&client_clone,
					&keystore_clone,
					nimbus_id,
					parent,
				)
			};
		task_manager.spawn_essential_handle().spawn_blocking(
			"authorship_task",
			Some("block-authoring"),
			run_manual_seal(ManualSealParams {
				block_import,
				env,
				client: client.clone(),
				pool: transaction_pool.clone(),
				commands_stream,
				select_chain,
				consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
					keystore: keystore_container.keystore(),
					client: client.clone(),
					additional_digests_provider: maybe_provide_vrf_digest,
					_phantom: Default::default(),
				})),
				create_inherent_data_providers: move |block: H256, ()| {
					let maybe_current_para_block = client_set_aside_for_cidp.number(block);
					let maybe_current_para_head = client_set_aside_for_cidp.expect_header(block);
					let downward_xcm_receiver = downward_xcm_receiver.clone();
					let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
					let additional_relay_offset = additional_relay_offset.clone();
					let relay_slot_key = well_known_keys::CURRENT_SLOT.to_vec();

					let client_for_xcm = client_set_aside_for_cidp.clone();
					async move {
						let time = MockTimestampInherentDataProvider;
						let current_para_block = maybe_current_para_block?
							.ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;
						let current_para_block_head = Some(polkadot_primitives::HeadData(
							maybe_current_para_head?.encode(),
						));

						// Get the mocked timestamp
						let timestamp = TIMESTAMP.load(Ordering::SeqCst);
						// Calculate mocked slot number (should be consecutively 1, 2, ...)
						let slot = timestamp.saturating_div(RELAY_CHAIN_SLOT_DURATION_MILLIS);

						let mut additional_key_values = vec![
							(
								moonbeam_core_primitives::well_known_relay_keys::TIMESTAMP_NOW
									.to_vec(),
								sp_timestamp::Timestamp::current().encode(),
							),
							(relay_slot_key, Slot::from(slot).encode()),
							(
								relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
								AbridgedHostConfiguration {
									max_code_size: 3_145_728,
									max_head_data_size: 20_480,
									max_upward_queue_count: 174_762,
									max_upward_queue_size: 1_048_576,
									max_upward_message_size: 65_531,
									max_upward_message_num_per_candidate: 16,
									hrmp_max_message_num_per_candidate: 10,
									validation_upgrade_cooldown: 6,
									validation_upgrade_delay: 6,
									async_backing_params: AsyncBackingParams {
										max_candidate_depth: 3,
										allowed_ancestry_len: 2,
									},
								}
								.encode(),
							),
						];

						let storage_key = [
							twox_128(b"ParachainSystem"),
							twox_128(b"PendingValidationCode"),
						]
						.concat();
						let has_pending_upgrade = client_for_xcm
							.storage(block, &sp_storage::StorageKey(storage_key))
							.map_or(false, |ok| ok.map_or(false, |some| !some.0.is_empty()));
						if has_pending_upgrade {
							additional_key_values.push((
								relay_chain::well_known_keys::upgrade_go_ahead_signal(ParaId::new(
									para_id.unwrap(),
								)),
								Some(relay_chain::UpgradeGoAhead::GoAhead).encode(),
							));
						}
						let mocked_parachain = MockValidationDataInherentDataProvider {
							current_para_block,
							para_id: para_id.unwrap().into(),
							current_para_block_head,
							relay_offset: 1000
								+ additional_relay_offset.load(std::sync::atomic::Ordering::SeqCst),
							relay_blocks_per_para_block: 2,
							// TODO: Recheck
							para_blocks_per_relay_epoch: 10,
							relay_randomness_config: (),
							xcm_config: MockXcmConfig::new(
								&*client_for_xcm,
								block,
								Default::default(),
							),
							raw_downward_messages: downward_xcm_receiver.drain().collect(),
							raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
							additional_key_values: Some(additional_key_values),
						};

						let randomness = session_keys_primitives::InherentDataProvider;

						Ok((time, mocked_parachain, randomness))
					}
				},
			}),
		);
	}
	// Sinks for pubsub notifications.
	// Every time a new subscription is created, a new mpsc channel is added to the sink pool.
	// The MappingSyncWorker sends through the channel on block import, and the subscription emits a
	// notification to the subscriber on receiving a message through this channel.
	// This way we avoid race conditions when using the native Substrate block import notification
	// stream.
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
		fc_mapping_sync::EthereumBlockNotification<Block>,
	> = Default::default();
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

	rpc::spawn_essential_tasks(
		rpc::SpawnTasksParams {
			task_manager: &task_manager,
			client: client.clone(),
			substrate_backend: backend.clone(),
			frontier_backend: frontier_backend.clone(),
			filter_pool: filter_pool.clone(),
			overrides: overrides.clone(),
			fee_history_limit,
			fee_history_cache: fee_history_cache.clone(),
		},
		sync_service.clone(),
		pubsub_notification_sinks.clone(),
	);
	let ethapi_cmd = rpc_config.ethapi.clone();
	let tracing_requesters =
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
			rpc::tracing::spawn_tracing_tasks(
				&rpc_config,
				prometheus_registry.clone(),
				rpc::SpawnTasksParams {
					task_manager: &task_manager,
					client: client.clone(),
					substrate_backend: backend.clone(),
					frontier_backend: frontier_backend.clone(),
					filter_pool: filter_pool.clone(),
					overrides: overrides.clone(),
					fee_history_limit,
					fee_history_cache: fee_history_cache.clone(),
				},
			)
		} else {
			rpc::tracing::RpcRequesters {
				debug: None,
				trace: None,
			}
		};
	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
		task_manager.spawn_handle(),
		overrides.clone(),
		rpc_config.eth_log_block_cache,
		rpc_config.eth_statuses_cache,
		prometheus_registry,
	));

	let rpc_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let backend = backend.clone();
		let network = network.clone();
		let sync = sync_service.clone();
		let ethapi_cmd = ethapi_cmd.clone();
		let max_past_logs = rpc_config.max_past_logs;
		let max_block_range = rpc_config.max_block_range;
		let overrides = overrides.clone();
		let fee_history_cache = fee_history_cache.clone();
		let block_data_cache = block_data_cache.clone();
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();

		let keystore = keystore_container.keystore();
		move |subscription_task_executor| {
1872
			let deps = rpc::FullDeps {
1872
				backend: backend.clone(),
1872
				client: client.clone(),
1872
				command_sink: command_sink.clone(),
1872
				ethapi_cmd: ethapi_cmd.clone(),
1872
				filter_pool: filter_pool.clone(),
1872
				frontier_backend: match &*frontier_backend {
1872
					fc_db::Backend::KeyValue(b) => b.clone(),
					fc_db::Backend::Sql(b) => b.clone(),
				},
1872
				graph: pool.pool().clone(),
1872
				pool: pool.clone(),
1872
				is_authority: collator,
1872
				max_past_logs,
1872
				max_block_range,
1872
				fee_history_limit,
1872
				fee_history_cache: fee_history_cache.clone(),
1872
				network: network.clone(),
1872
				sync: sync.clone(),
1872
				dev_rpc_data: dev_rpc_data.clone(),
1872
				overrides: overrides.clone(),
1872
				block_data_cache: block_data_cache.clone(),
1872
				forced_parent_hashes: None,
1872
			};

			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
				client.clone(),
				keystore.clone(),
			));
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
				rpc::create_full(
					deps,
					subscription_task_executor,
					Some(crate::rpc::TracingConfig {
						tracing_requesters: tracing_requesters.clone(),
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
					}),
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
				)
				.map_err(Into::into)
			} else {
				rpc::create_full(
					deps,
					subscription_task_executor,
					None,
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
				)
				.map_err(Into::into)
			}
		}
	};
	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		network,
		client,
		keystore: keystore_container.keystore(),
		task_manager: &mut task_manager,
		transaction_pool,
		rpc_builder: Box::new(rpc_builder),
		backend,
		system_rpc_tx,
		sync_service: sync_service.clone(),
		config,
		tx_handler_controller,
		telemetry: None,
	})?;
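
	// If hardware benchmark results are available, print them and, when telemetry is
	// enabled, forward them to the telemetry stream.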
	if let Some(hwbench) = hwbench {
		sc_sysinfo::print_hwbench(&hwbench);
		if let Some(ref mut telemetry) = telemetry {
			let telemetry_handle = telemetry.handle();
			task_manager.spawn_handle().spawn(
				"telemetry_hwbench",
				None,
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
			);
		}
	}
	log::info!("Development Service Ready");
	network_starter.start_network();
	Ok(task_manager)
}
#[cfg(test)]
mod tests {
	use crate::chain_spec::moonbase::ChainSpec;
	use crate::chain_spec::Extensions;
	use jsonrpsee::server::BatchRequestConfig;
	use moonbase_runtime::{currency::UNIT, AccountId};
	use prometheus::{proto::LabelPair, Counter};
	use sc_network::config::NetworkConfiguration;
	use sc_service::config::RpcConfiguration;
	use sc_service::ChainType;
	use sc_service::{
		config::{BasePath, DatabaseSource, KeystoreConfig},
		Configuration, Role,
	};
	use std::path::Path;
	use std::str::FromStr;
	use super::*;
	#[test]
	fn test_set_prometheus_registry_uses_moonbeam_prefix() {
		let counter_name = "my_counter";
		let expected_metric_name = "moonbeam_my_counter";
		let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
		let mut config = Configuration {
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
				"0.0.0.0:8080".parse().unwrap(),
				"".into(),
			)),
			..test_config("test")
		};

		set_prometheus_registry(&mut config, false).unwrap();
		// generate metric
		let reg = config.prometheus_registry().unwrap();
		reg.register(counter.clone()).unwrap();
		counter.inc();

		let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
		assert_eq!(actual_metric_name.as_str(), expected_metric_name);
	}
	#[test]
	fn test_set_prometheus_registry_skips_moonbeam_prefix() {
		let counter_name = "my_counter";
		let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
		let mut config = Configuration {
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
				"0.0.0.0:8080".parse().unwrap(),
				"".into(),
			)),
			..test_config("test")
		};

		set_prometheus_registry(&mut config, true).unwrap();
		// generate metric
		let reg = config.prometheus_registry().unwrap();
		reg.register(counter.clone()).unwrap();
		counter.inc();

		let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
		assert_eq!(actual_metric_name.as_str(), counter_name);
	}
	#[test]
	fn test_set_prometheus_registry_adds_chain_id_as_label() {
		let input_chain_id = "moonriver";

		let mut expected_label = LabelPair::default();
		expected_label.set_name("chain".to_owned());
		expected_label.set_value("moonriver".to_owned());
		let expected_chain_label = Some(expected_label);

		let counter = Box::new(Counter::new("foo", "foobar").unwrap());
		let mut config = Configuration {
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
				"0.0.0.0:8080".parse().unwrap(),
				"".into(),
			)),
			..test_config(input_chain_id)
		};

		set_prometheus_registry(&mut config, false).unwrap();
		// generate metric
		let reg = config.prometheus_registry().unwrap();
		reg.register(counter.clone()).unwrap();
		counter.inc();

		let actual_chain_label = reg
			.gather()
			.first()
			.unwrap()
			.get_metric()
			.first()
			.unwrap()
			.get_label()
			.into_iter()
			.find(|x| x.get_name() == "chain")
			.cloned();

		assert_eq!(actual_chain_label, expected_chain_label);
	}
	#[test]
	fn dalek_does_not_panic() {
		use futures::executor::block_on;
		use sc_block_builder::BlockBuilderBuilder;
		use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, DatabaseSource, PruningMode};
		use sp_api::ProvideRuntimeApi;
		use sp_consensus::BlockOrigin;
		use substrate_test_runtime::TestAPI;
		use substrate_test_runtime_client::runtime::Block;
		use substrate_test_runtime_client::{
			ClientBlockImportExt, TestClientBuilder, TestClientBuilderExt,
		};

		fn zero_ed_pub() -> sp_core::ed25519::Public {
			sp_core::ed25519::Public::default()
		}

		// This is an invalid signature: after ed25519 1.3 it makes signature construction
		// panic. This test ensures we never panic on it.
		fn invalid_sig() -> sp_core::ed25519::Signature {
			let signature = hex_literal::hex!(
				"a25b94f9c64270fdfffa673f11cfe961633e3e4972e6940a3cf
		7351dd90b71447041a83583a52cee1cf21b36ba7fd1d0211dca58b48d997fc78d9bc82ab7a38e"
			);
			sp_core::ed25519::Signature::from_raw(signature[0..64].try_into().unwrap())
		}

		let tmp = tempfile::tempdir().unwrap();
		let backend = Arc::new(
			Backend::new(
				DatabaseSettings {
					trie_cache_maximum_size: Some(1 << 20),
					state_pruning: Some(PruningMode::ArchiveAll),
					blocks_pruning: BlocksPruning::KeepAll,
					source: DatabaseSource::RocksDb {
						path: tmp.path().into(),
						cache_size: 1024,
					},
				},
				u64::MAX,
			)
			.unwrap(),
		);
		let client = TestClientBuilder::with_backend(backend).build();

		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(1));

		let a1 = BlockBuilderBuilder::new(&client)
			.on_parent_block(client.chain_info().genesis_hash)
			.with_parent_block_number(0)
			// Enable proof recording if required. This call is optional.
			.enable_proof_recording()
			.build()
			.unwrap()
			.build()
			.unwrap()
			.block;

		block_on(client.import(BlockOrigin::NetworkInitialSync, a1.clone())).unwrap();

		// On block zero the dalek host function is used; verifying the invalid signature
		// must not panic and should simply return false.
		assert!(!client
			.runtime_api()
			.verify_ed25519(
				client.chain_info().genesis_hash,
				invalid_sig(),
				zero_ed_pub(),
				vec![]
			)
			.unwrap());
	}
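
	/// Builds a minimal node `Configuration` for the tests above; only the fields the
	/// Prometheus-registry logic inspects (notably the chain spec id) matter here.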
	fn test_config(chain_id: &str) -> Configuration {
		let network_config = NetworkConfiguration::new("", "", Default::default(), None);
		let runtime = tokio::runtime::Runtime::new().expect("failed creating tokio runtime");
		let spec = ChainSpec::builder(&[0u8], Extensions::default())
			.with_name("test")
			.with_id(chain_id)
			.with_chain_type(ChainType::Local)
			.with_genesis_config(moonbase_runtime::genesis_config_preset::testnet_genesis(
				AccountId::from_str("6Be02d1d3665660d22FF9624b7BE0551ee1Ac91b").unwrap(),
				vec![],
				vec![],
				vec![],
				vec![],
				vec![],
				1000 * UNIT,
				ParaId::new(0),
				0,
			))
			.build();

		Configuration {
			impl_name: String::from("test-impl"),
			impl_version: String::from("0.1"),
			role: Role::Full,
			tokio_handle: runtime.handle().clone(),
			transaction_pool: Default::default(),
			network: network_config,
			keystore: KeystoreConfig::Path {
				path: "key".into(),
				password: None,
			},
			database: DatabaseSource::RocksDb {
				path: "db".into(),
				cache_size: 128,
			},
			trie_cache_maximum_size: Some(16777216),
			state_pruning: Default::default(),
			blocks_pruning: sc_service::BlocksPruning::KeepAll,
			chain_spec: Box::new(spec),
			executor: Default::default(),
			wasm_runtime_overrides: Default::default(),
			rpc: RpcConfiguration {
				addr: None,
				max_connections: Default::default(),
				cors: None,
				methods: Default::default(),
				max_request_size: Default::default(),
				max_response_size: Default::default(),
				id_provider: None,
				max_subs_per_conn: Default::default(),
				port: Default::default(),
				message_buffer_capacity: Default::default(),
				batch_config: BatchRequestConfig::Unlimited,
				rate_limit: Default::default(),
				rate_limit_whitelisted_ips: vec![],
				rate_limit_trust_proxy_headers: false,
			},
			data_path: Default::default(),
			prometheus_config: None,
			telemetry_endpoints: None,
			offchain_worker: Default::default(),
			force_authoring: false,
			disable_grandpa: false,
			dev_key_seed: None,
			tracing_targets: None,
			tracing_receiver: Default::default(),
			announce_block: true,
			base_path: BasePath::new(Path::new("")),
		}
	}
}
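
/// Consensus data provider for Frontier's "pending" block state.
///
/// When an Ethereum RPC request targets the pending block, Frontier builds that block
/// on the fly; this provider supplies the consensus digests it needs by reusing the
/// nimbus pre-runtime digest of the best block and regenerating the VRF digest from
/// the local keystore.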
struct PendingConsensusDataProvider<Client>
where
	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
	Client::Api: VrfApi<Block>,
{
	client: Arc<Client>,
	keystore: Arc<dyn Keystore>,
}
impl<Client> PendingConsensusDataProvider<Client>
where
	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
	Client::Api: VrfApi<Block>,
{
	pub fn new(client: Arc<Client>, keystore: Arc<dyn Keystore>) -> Self {
		Self { client, keystore }
	}
}
impl<Client> fc_rpc::pending::ConsensusDataProvider<Block> for PendingConsensusDataProvider<Client>
where
	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
	Client::Api: VrfApi<Block>,
{
	fn create_digest(
		&self,
		parent: &Header,
		_data: &sp_inherents::InherentData,
	) -> Result<sp_runtime::Digest, sp_inherents::Error> {
		let hash = parent.hash();
		// Get the digest from the best block header.
		let mut digest = self
			.client
			.header(hash)
			.map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
			.ok_or(sp_inherents::Error::Application(
				"Best block header should be present".into(),
			))?
			.digest;
		// Get the nimbus id from the digest.
		let nimbus_id = digest
			.logs
			.iter()
			.find_map(|x| {
				if let DigestItem::PreRuntime(nimbus_primitives::NIMBUS_ENGINE_ID, nimbus_id) = x {
					Some(NimbusId::from_slice(nimbus_id.as_slice()).map_err(|_| {
						sp_inherents::Error::Application(
							"Nimbus pre-runtime digest should be valid".into(),
						)
					}))
				} else {
					None
				}
			})
			.ok_or(sp_inherents::Error::Application(
				"Nimbus pre-runtime digest should be present".into(),
			))??;
		// Remove the old VRF digest.
		let pos = digest.logs.iter().position(|x| {
			matches!(
				x,
				DigestItem::PreRuntime(session_keys_primitives::VRF_ENGINE_ID, _)
			)
		});
		if let Some(pos) = pos {
			digest.logs.remove(pos);
		}
		// Create a fresh VRF digest and append it to the header digest.
		let vrf_digest = VrfDigestsProvider::new(self.client.clone(), self.keystore.clone())
			.provide_digests(nimbus_id, hash);
		digest.logs.extend(vrf_digest);
		Ok(digest)
	}
}