// Copyright 2019-2025 PureStake Inc.
// This file is part of Moonbeam.

// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.

//! This module assembles the Moonbeam service components, executes them, and manages communication
//! between them. This is the backbone of the client-side node implementation.
//!
//! This module can assemble:
//! PartialComponents: For maintenance tasks without a complete node (e.g. import/export blocks, purge)
//! Full Service: A complete parachain node including the pool, rpc, network, embedded relay chain
//! Dev Service: A leaner service without the relay chain backing.
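//!
//! A minimal sketch of how these entry points relate (illustrative only; the real
//! dispatch lives in the CLI and threads through many more options):
//!
//! ```ignore
//! if config.chain_spec.is_dev() {
//!     // Dev Service: manual seal, no embedded relay chain.
//!     new_dev::<RuntimeApi, Customizations, Net>(
//!         config, para_id, None, sealing, rpc_config, hwbench, extra_args,
//!     ).await?;
//! } else {
//!     // Full Service: complete parachain node.
//!     start_node::<RuntimeApi, Customizations>(
//!         config, polkadot_config, collator_options, para_id, rpc_config,
//!         authoring_duration, hwbench, extra_args,
//!     ).await?;
//! }
//! ```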

pub mod rpc;

use cumulus_client_cli::CollatorOptions;
use cumulus_client_collator::service::CollatorService;
use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport;
use cumulus_client_consensus_proposer::Proposer;
use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
use cumulus_client_service::{
	prepare_node_config, start_relay_chain_tasks, CollatorSybilResistance, DARecoveryProfile,
	ParachainHostFunctions, StartRelayChainTasksParams,
};
use cumulus_primitives_core::{
	relay_chain::{self, well_known_keys, CollatorPair},
	CollectCollationInfo, ParaId,
};
use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
use fc_consensus::FrontierBlockImport as TFrontierBlockImport;
use fc_db::DatabaseSource;
use fc_rpc::StorageOverrideHandler;
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
use futures::{FutureExt, StreamExt};
use maplit::hashmap;
#[cfg(feature = "moonbase-native")]
pub use moonbase_runtime;
use moonbeam_cli_opt::{
	AuthoringPolicy, EthApi as EthApiCmd, FrontierBackendConfig, NodeExtraArgs, RpcConfig,
};
#[cfg(feature = "moonbeam-native")]
pub use moonbeam_runtime;
use moonbeam_vrf::VrfDigestsProvider;
#[cfg(feature = "moonriver-native")]
pub use moonriver_runtime;
use nimbus_consensus::collators::slot_based::SlotBasedBlockImportHandle;
use nimbus_consensus::{
	collators::slot_based::SlotBasedBlockImport, NimbusManualSealConsensusDataProvider,
};
use nimbus_primitives::{DigestsProvider, NimbusId};
use polkadot_primitives::{AbridgedHostConfiguration, AsyncBackingParams, Slot, UpgradeGoAhead};
use sc_client_api::{
	backend::{AuxStore, Backend, StateBackend, StorageProvider},
	ExecutorProvider,
};
use sc_consensus::{BlockImport, ImportQueue};
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock};
use sc_service::config::PrometheusConfig;
use sc_service::{
	error::Error as ServiceError, ChainSpec, Configuration, PartialComponents, TFullBackend,
	TFullClient, TaskManager,
};
use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
use sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool};
use session_keys_primitives::VrfApi;
use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_consensus::SyncOracle;
use sp_core::{ByteArray, Encode, H256};
use sp_keystore::{Keystore, KeystorePtr};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::{collections::BTreeMap, path::Path, sync::Mutex, time::Duration};
use substrate_prometheus_endpoint::Registry;

pub use client::*;
pub mod chain_spec;
mod client;
#[cfg(feature = "lazy-loading")]
pub mod lazy_loading;

type FullClient<RuntimeApi> = TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
type FullBackend = TFullBackend<Block>;

type MaybeSelectChain<Backend> = Option<sc_consensus::LongestChain<Backend, Block>>;
type FrontierBlockImport<Client> = TFrontierBlockImport<Block, Arc<Client>, Client>;
type PartialComponentsResult<Client, Backend> = Result<
	PartialComponents<
		Client,
		Backend,
		MaybeSelectChain<Backend>,
		sc_consensus::DefaultImportQueue<Block>,
		sc_transaction_pool::TransactionPoolHandle<Block, Client>,
		(
			MoonbeamBlockImport<Client>,
			Option<FilterPool>,
			Option<Telemetry>,
			Option<TelemetryWorkerHandle>,
			Arc<fc_db::Backend<Block, Client>>,
			FeeHistoryCache,
		),
	>,
	ServiceError,
>;

const RELAY_CHAIN_SLOT_DURATION_MILLIS: u64 = 6_000;

static TIMESTAMP: AtomicU64 = AtomicU64::new(0);

/// Provides a mock timestamp, in milliseconds, for the timestamp inherent.
/// The backing value starts at 0; the first call seeds it from system time, and each
/// subsequent call increments it by `slot_duration`, making consensus think time has passed.
struct MockTimestampInherentDataProvider;

impl MockTimestampInherentDataProvider {
	fn advance_timestamp(slot_duration: u64) {
		if TIMESTAMP.load(Ordering::SeqCst) == 0 {
			// Initialize timestamp inherent provider
			TIMESTAMP.store(
				sp_timestamp::Timestamp::current().as_millis(),
				Ordering::SeqCst,
			);
		} else {
			TIMESTAMP.fetch_add(slot_duration, Ordering::SeqCst);
		}
	}
}

#[async_trait::async_trait]
impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
	async fn provide_inherent_data(
		&self,
		inherent_data: &mut sp_inherents::InherentData,
	) -> Result<(), sp_inherents::Error> {
		inherent_data.put_data(
			sp_timestamp::INHERENT_IDENTIFIER,
			&TIMESTAMP.load(Ordering::SeqCst),
		)
	}

	async fn try_handle_error(
		&self,
		_identifier: &sp_inherents::InherentIdentifier,
		_error: &[u8],
	) -> Option<Result<(), sp_inherents::Error>> {
		// The pallet never reports an error.
		None
	}
}
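
// Illustrative behavior of the mocked clock (values are examples, not defaults):
// the first `advance_timestamp` call seeds `TIMESTAMP` from system time, and every
// subsequent call advances it by one slot:
//
//   MockTimestampInherentDataProvider::advance_timestamp(6_000); // t = now
//   MockTimestampInherentDataProvider::advance_timestamp(6_000); // t = now + 6_000
//   MockTimestampInherentDataProvider::advance_timestamp(6_000); // t = now + 12_000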

#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
	frame_benchmarking::benchmarking::HostFunctions,
	ParachainHostFunctions,
	moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
	ParachainHostFunctions,
	moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);

pub enum MoonbeamBlockImport<Client> {
	/// Used in dev mode to import new blocks as best blocks.
	Dev(FrontierBlockImport<Client>),
	/// Used in parachain mode with the lookahead authoring policy.
	ParachainLookahead(FrontierBlockImport<Client>),
	/// Used in parachain mode with the slot-based authoring policy.
	ParachainSlotBased(
		SlotBasedBlockImport<Block, FrontierBlockImport<Client>, Client>,
		SlotBasedBlockImportHandle<Block>,
	),
}

impl<Client> MoonbeamBlockImport<Client>
where
	Client: Send + Sync + 'static,
	Client: ProvideRuntimeApi<Block>,
	Client::Api: sp_block_builder::BlockBuilder<Block> + fp_rpc::EthereumRuntimeRPCApi<Block>,
	FrontierBlockImport<Client>: BlockImport<Block>,
	SlotBasedBlockImport<Block, FrontierBlockImport<Client>, Client>: BlockImport<Block, Error = <FrontierBlockImport<Client> as BlockImport<Block>>::Error>
		+ 'static,
{
	fn new(
		frontier_block_import: FrontierBlockImport<Client>,
		client: Arc<Client>,
		authoring_policy: AuthoringPolicy,
		is_dev: bool,
	) -> Self {
		if is_dev {
			MoonbeamBlockImport::Dev(frontier_block_import)
		} else {
			match authoring_policy {
				AuthoringPolicy::Lookahead => {
					MoonbeamBlockImport::ParachainLookahead(frontier_block_import)
				}
				AuthoringPolicy::SlotBased => {
					let (block_import, block_import_auxiliary_data) =
						SlotBasedBlockImport::new(frontier_block_import, client);
					MoonbeamBlockImport::ParachainSlotBased(
						block_import,
						block_import_auxiliary_data,
					)
				}
			}
		}
	}
}
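
// Illustrative construction (the bindings are hypothetical): a dev node always gets
// the plain Frontier import, regardless of the authoring policy.
//
//   let import = MoonbeamBlockImport::new(frontier_import, client, AuthoringPolicy::Lookahead, true);
//   assert!(matches!(import, MoonbeamBlockImport::Dev(_)));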

/// A trait that must be implemented by all moon* runtime executors.
///
/// This allows, for instance, customizing the client extensions according to the type
/// of network.
/// For the moment, it is only used to specify the first block compatible with
/// ed25519-zebra, but it could be used for other things in the future.
pub trait ClientCustomizations {
	/// The host function `ed25519_verify` changed its behavior over Substrate's history
	/// when the implementation switched from the ed25519-dalek library to ed25519-zebra.
	/// Some networks may have old blocks that are not compatible with ed25519-zebra;
	/// for those networks, this function should return the first block compatible with
	/// the new library.
	/// If this function returns None (the default behavior), all blocks are assumed to be
	/// compatible with the new library (ed25519-zebra).
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		None
	}
}
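
// Sketch of a custom implementation (`MyNetworkCustomizations` is hypothetical): a
// network whose whole history is ed25519-zebra compatible can rely on the default.
//
//   struct MyNetworkCustomizations;
//   impl ClientCustomizations for MyNetworkCustomizations {}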

#[cfg(feature = "moonbeam-native")]
pub struct MoonbeamCustomizations;
#[cfg(feature = "moonbeam-native")]
impl ClientCustomizations for MoonbeamCustomizations {
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(2_000_000)
	}
}

#[cfg(feature = "moonriver-native")]
pub struct MoonriverCustomizations;
#[cfg(feature = "moonriver-native")]
impl ClientCustomizations for MoonriverCustomizations {
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(3_000_000)
	}
}

#[cfg(feature = "moonbase-native")]
pub struct MoonbaseCustomizations;
#[cfg(feature = "moonbase-native")]
impl ClientCustomizations for MoonbaseCustomizations {
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(3_000_000)
	}
}

/// Trivial enum representing the runtime variant.
#[derive(Clone)]
pub enum RuntimeVariant {
	#[cfg(feature = "moonbeam-native")]
	Moonbeam,
	#[cfg(feature = "moonriver-native")]
	Moonriver,
	#[cfg(feature = "moonbase-native")]
	Moonbase,
	Unrecognized,
}

impl RuntimeVariant {
	pub fn from_chain_spec(chain_spec: &Box<dyn ChainSpec>) -> Self {
		match chain_spec {
			#[cfg(feature = "moonbeam-native")]
			spec if spec.is_moonbeam() => Self::Moonbeam,
			#[cfg(feature = "moonriver-native")]
			spec if spec.is_moonriver() => Self::Moonriver,
			#[cfg(feature = "moonbase-native")]
			spec if spec.is_moonbase() => Self::Moonbase,
			_ => Self::Unrecognized,
		}
	}
}

/// Can be called for a chain spec to check which network it is a configuration for.
pub trait IdentifyVariant {
	/// Returns `true` if this is a configuration for the `Moonbase` network.
	fn is_moonbase(&self) -> bool;

	/// Returns `true` if this is a configuration for the `Moonbeam` network.
	fn is_moonbeam(&self) -> bool;

	/// Returns `true` if this is a configuration for the `Moonriver` network.
	fn is_moonriver(&self) -> bool;

	/// Returns `true` if this is a configuration for a dev network.
	fn is_dev(&self) -> bool;
}

impl IdentifyVariant for Box<dyn ChainSpec> {
	fn is_moonbase(&self) -> bool {
		self.id().starts_with("moonbase")
	}

	fn is_moonbeam(&self) -> bool {
		self.id().starts_with("moonbeam")
	}

	fn is_moonriver(&self) -> bool {
		self.id().starts_with("moonriver")
	}

	fn is_dev(&self) -> bool {
		self.chain_type() == sc_chain_spec::ChainType::Development
	}
}
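
// Note: identification is prefix-based on the spec id, so e.g. spec ids like
// "moonbase_alpha" and "moonbase_local" both identify as Moonbase.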

pub fn frontier_database_dir(config: &Configuration, path: &str) -> std::path::PathBuf {
	config
		.base_path
		.config_dir(config.chain_spec.id())
		.join("frontier")
		.join(path)
}
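
// For example (path shown for illustration; the exact prefix comes from `config_dir`),
// a base path of `~/.local/share/moonbeam` with chain id `moonbase_alpha` yields:
//
//   ~/.local/share/moonbeam/chains/moonbase_alpha/frontier/db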

// TODO This is copied from frontier. It should be imported instead after
// https://github.com/paritytech/frontier/issues/333 is solved
pub fn open_frontier_backend<C, BE>(
	client: Arc<C>,
	config: &Configuration,
	rpc_config: &RpcConfig,
) -> Result<fc_db::Backend<Block, C>, String>
where
	C: ProvideRuntimeApi<Block> + StorageProvider<Block, BE> + AuxStore,
	C: HeaderBackend<Block> + HeaderMetadata<Block, Error = BlockChainError>,
	C: Send + Sync + 'static,
	C::Api: fp_rpc::EthereumRuntimeRPCApi<Block>,
	BE: Backend<Block> + 'static,
	BE::State: StateBackend<BlakeTwo256>,
{
	let frontier_backend = match rpc_config.frontier_backend_config {
		FrontierBackendConfig::KeyValue => {
			fc_db::Backend::KeyValue(Arc::new(fc_db::kv::Backend::<Block, C>::new(
				client,
				&fc_db::kv::DatabaseSettings {
					source: match config.database {
						DatabaseSource::RocksDb { .. } => DatabaseSource::RocksDb {
							path: frontier_database_dir(config, "db"),
							cache_size: 0,
						},
						DatabaseSource::ParityDb { .. } => DatabaseSource::ParityDb {
							path: frontier_database_dir(config, "paritydb"),
						},
						DatabaseSource::Auto { .. } => DatabaseSource::Auto {
							rocksdb_path: frontier_database_dir(config, "db"),
							paritydb_path: frontier_database_dir(config, "paritydb"),
							cache_size: 0,
						},
						_ => {
							return Err(
								"Supported db sources: `rocksdb` | `paritydb` | `auto`".to_string()
							)
						}
					},
				},
			)?))
		}
		FrontierBackendConfig::Sql {
			pool_size,
			num_ops_timeout,
			thread_count,
			cache_size,
		} => {
			let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
			let sqlite_db_path = frontier_database_dir(config, "sql");
			std::fs::create_dir_all(&sqlite_db_path).expect("failed creating sql db directory");
			let backend = futures::executor::block_on(fc_db::sql::Backend::new(
				fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
					path: Path::new("sqlite:///")
						.join(sqlite_db_path)
						.join("frontier.db3")
						.to_str()
						.expect("frontier sql path error"),
					create_if_missing: true,
					thread_count,
					cache_size,
				}),
				pool_size,
				std::num::NonZeroU32::new(num_ops_timeout),
				overrides.clone(),
			))
			.unwrap_or_else(|err| panic!("failed creating sql backend: {:?}", err));
			fc_db::Backend::Sql(Arc::new(backend))
		}
	};

	Ok(frontier_backend)
}

use sp_runtime::{traits::BlakeTwo256, DigestItem, Percent};

pub const SOFT_DEADLINE_PERCENT: Percent = Percent::from_percent(100);

/// Builds a new object suitable for chain operations.
#[allow(clippy::type_complexity)]
pub fn new_chain_ops(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	node_extra_args: NodeExtraArgs,
) -> Result<
	(
		Arc<Client>,
		Arc<FullBackend>,
		sc_consensus::BasicQueue<Block>,
		TaskManager,
	),
	ServiceError,
> {
	match &config.chain_spec {
		#[cfg(feature = "moonriver-native")]
		spec if spec.is_moonriver() => new_chain_ops_inner::<
			moonriver_runtime::RuntimeApi,
			MoonriverCustomizations,
		>(config, rpc_config, node_extra_args),
		#[cfg(feature = "moonbeam-native")]
		spec if spec.is_moonbeam() => new_chain_ops_inner::<
			moonbeam_runtime::RuntimeApi,
			MoonbeamCustomizations,
		>(config, rpc_config, node_extra_args),
		#[cfg(feature = "moonbase-native")]
		_ => new_chain_ops_inner::<moonbase_runtime::RuntimeApi, MoonbaseCustomizations>(
			config,
			rpc_config,
			node_extra_args,
		),
		#[cfg(not(feature = "moonbase-native"))]
		_ => panic!("invalid chain spec"),
	}
}

#[allow(clippy::type_complexity)]
fn new_chain_ops_inner<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	node_extra_args: NodeExtraArgs,
) -> Result<
	(
		Arc<Client>,
		Arc<FullBackend>,
		sc_consensus::BasicQueue<Block>,
		TaskManager,
	),
	ServiceError,
>
where
	Client: From<Arc<crate::FullClient<RuntimeApi>>>,
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	config.keystore = sc_service::config::KeystoreConfig::InMemory;
	let PartialComponents {
		client,
		backend,
		import_queue,
		task_manager,
		..
	} = new_partial::<RuntimeApi, Customizations>(config, rpc_config, node_extra_args)?;
	Ok((
		Arc::new(Client::from(client)),
		backend,
		import_queue,
		task_manager,
	))
}

// If we're using Prometheus, use a registry with a prefix of `moonbeam`
// (unless `skip_prefix` is set).
fn set_prometheus_registry(
	config: &mut Configuration,
	skip_prefix: bool,
) -> Result<(), ServiceError> {
	if let Some(PrometheusConfig { registry, .. }) = config.prometheus_config.as_mut() {
		let labels = hashmap! {
			"chain".into() => config.chain_spec.id().into(),
		};
		let prefix = if skip_prefix {
			None
		} else {
			Some("moonbeam".into())
		};

		*registry = Registry::new_custom(prefix, Some(labels))?;
	}

	Ok(())
}
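
// With the prefix applied, a metric registered as e.g. `block_height` is exported as
// `moonbeam_block_height`, and every metric carries a `chain=<spec id>` label
// (metric name chosen for illustration only).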

/// Builds the PartialComponents for a parachain or development service.
///
/// Use this function if you don't actually need the full service, but just the partial in order to
/// be able to perform chain operations.
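///
/// A minimal sketch (illustrative only) of extracting just the pieces needed for chain
/// operations, as `new_chain_ops_inner` does above:
///
/// ```ignore
/// let PartialComponents { client, backend, import_queue, task_manager, .. } =
///     new_partial::<RuntimeApi, Customizations>(&mut config, &rpc_config, extra_args)?;
/// ```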
#[allow(clippy::type_complexity)]
pub fn new_partial<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	node_extra_args: NodeExtraArgs,
) -> PartialComponentsResult<FullClient<RuntimeApi>, FullBackend>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;

	// Use Ethereum-style subscription ids
	config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));

	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	let heap_pages = config
		.executor
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
			extra_pages: h as _,
		});
	let mut wasm_builder = WasmExecutor::builder()
		.with_execution_method(config.executor.wasm_method)
		.with_onchain_heap_alloc_strategy(heap_pages)
		.with_offchain_heap_alloc_strategy(heap_pages)
		.with_ignore_onchain_heap_pages(true)
		.with_max_runtime_instances(config.executor.max_runtime_instances)
		.with_runtime_cache_size(config.executor.runtime_cache_size);

	if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
	}

	let executor = wasm_builder.build();

	let (client, backend, keystore_container, task_manager) =
		sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
			config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
			true,
		)?;

	if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(block_number));
	}

	let client = Arc::new(client);

	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());

	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager
			.spawn_handle()
			.spawn("telemetry", None, worker.run());
		telemetry
	});

	let maybe_select_chain = if config.chain_spec.is_dev() {
		Some(sc_consensus::LongestChain::new(backend.clone()))
	} else {
		None
	};

	let transaction_pool = sc_transaction_pool::Builder::new(
		task_manager.spawn_essential_handle(),
		client.clone(),
		config.role.is_authority().into(),
	)
	.with_options(config.transaction_pool.clone())
	.with_prometheus(config.prometheus_registry())
	.build();

	let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
	let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));

	let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
	let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
	let block_import = MoonbeamBlockImport::new(
		frontier_block_import.clone(),
		client.clone(),
		node_extra_args.authoring_policy,
		config.chain_spec.is_dev(),
	);

	let create_inherent_data_providers = move |_, _| async move {
		let time = sp_timestamp::InherentDataProvider::from_system_time();
		Ok((time,))
	};

	let import_queue = {
		let block_import_for_queue: Box<
			dyn sc_consensus::BlockImport<Block, Error = sp_consensus::Error> + Send + Sync,
		> = match &block_import {
			MoonbeamBlockImport::Dev(bi) => Box::new(bi.clone()),
			MoonbeamBlockImport::ParachainLookahead(bi) => {
				if node_extra_args.legacy_block_import_strategy {
					Box::new(TParachainBlockImport::new_with_delayed_best_block(
						bi.clone(),
						backend.clone(),
					))
				} else {
					Box::new(TParachainBlockImport::new(bi.clone(), backend.clone()))
				}
			}
			MoonbeamBlockImport::ParachainSlotBased(bi, _handle) => {
				if node_extra_args.legacy_block_import_strategy {
					Box::new(TParachainBlockImport::new_with_delayed_best_block(
						bi.clone(),
						backend.clone(),
					))
				} else {
					Box::new(TParachainBlockImport::new(bi.clone(), backend.clone()))
				}
			}
		};

		nimbus_consensus::import_queue(
			client.clone(),
			block_import_for_queue,
			create_inherent_data_providers,
			&task_manager.spawn_essential_handle(),
			config.prometheus_registry(),
			node_extra_args.legacy_block_import_strategy,
			false,
		)?
	};

	Ok(PartialComponents {
		backend,
		client,
		import_queue,
		keystore_container,
		task_manager,
		transaction_pool: transaction_pool.into(),
		select_chain: maybe_select_chain,
		other: (
			block_import,
			filter_pool,
			telemetry,
			telemetry_worker_handle,
			frontier_backend,
			fee_history_cache,
		),
	})
}

async fn build_relay_chain_interface(
	polkadot_config: Configuration,
	parachain_config: &Configuration,
	telemetry_worker_handle: Option<TelemetryWorkerHandle>,
	task_manager: &mut TaskManager,
	collator_options: CollatorOptions,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> RelayChainResult<(
	Arc<(dyn RelayChainInterface + 'static)>,
	Option<CollatorPair>,
)> {
	let result = if let cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =
		collator_options.relay_chain_mode
	{
		build_minimal_relay_chain_node_with_rpc(
			polkadot_config,
			parachain_config.prometheus_registry(),
			task_manager,
			rpc_target_urls,
		)
		.await
	} else {
		build_inprocess_relay_chain(
			polkadot_config,
			parachain_config,
			telemetry_worker_handle,
			task_manager,
			hwbench,
		)
	};

	// Extract only the first two elements from the 4-tuple
	result
		.map(|(relay_chain_interface, collator_pair, _, _)| (relay_chain_interface, collator_pair))
}
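
// Illustrative: passing e.g. `--relay-chain-rpc-urls ws://localhost:9944` selects the
// minimal RPC-backed relay chain node; omitting it embeds a full relay chain node
// in-process.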

/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is abstracted over the executor and the runtime api.
#[sc_tracing::logging::prefix_logs_with("🌗")]
async fn start_node_impl<RuntimeApi, Customizations, Net>(
	parachain_config: Configuration,
	polkadot_config: Configuration,
	collator_options: CollatorOptions,
	para_id: ParaId,
	rpc_config: RpcConfig,
	block_authoring_duration: Duration,
	hwbench: Option<sc_sysinfo::HwBench>,
	node_extra_args: NodeExtraArgs,
) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection
		+ cumulus_primitives_core::GetCoreSelectorApi<Block>
		+ cumulus_primitives_core::RelayParentOffsetApi<Block>,
	Customizations: ClientCustomizations + 'static,
	Net: NetworkBackend<Block, Hash>,
{
	let mut parachain_config = prepare_node_config(parachain_config);

	let params = new_partial::<RuntimeApi, Customizations>(
		&mut parachain_config,
		&rpc_config,
		node_extra_args.clone(),
	)?;
	let (
		block_import,
		filter_pool,
		mut telemetry,
		telemetry_worker_handle,
		frontier_backend,
		fee_history_cache,
	) = params.other;

	let client = params.client.clone();
	let backend = params.backend.clone();
	let mut task_manager = params.task_manager;

	let (relay_chain_interface, collator_key) = build_relay_chain_interface(
		polkadot_config,
		&parachain_config,
		telemetry_worker_handle,
		&mut task_manager,
		collator_options.clone(),
		hwbench.clone(),
	)
	.await
	.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

	let force_authoring = parachain_config.force_authoring;
	let collator = parachain_config.role.is_authority();
	let prometheus_registry = parachain_config.prometheus_registry().cloned();
	let transaction_pool = params.transaction_pool.clone();
	let import_queue_service = params.import_queue.service();
	let net_config = FullNetworkConfiguration::<_, _, Net>::new(
		&parachain_config.network,
		prometheus_registry.clone(),
	);

	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
		cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
			parachain_config: &parachain_config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue: params.import_queue,
			para_id: para_id.clone(),
			relay_chain_interface: relay_chain_interface.clone(),
			net_config,
			sybil_resistance_level: CollatorSybilResistance::Resistant,
			metrics: Net::register_notification_metrics(
				parachain_config
					.prometheus_config
					.as_ref()
					.map(|config| &config.registry),
			),
		})
		.await?;

	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
	let fee_history_limit = rpc_config.fee_history_limit;

	// Sinks for pubsub notifications.
	// Every time a new subscription is created, a new mpsc channel is added to the sink pool.
	// The MappingSyncWorker sends through the channel on block import, and the subscription emits a
	// notification to the subscriber on receiving a message through this channel.
	// This way we avoid race conditions when using the native substrate block import notification
	// stream.
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
		fc_mapping_sync::EthereumBlockNotification<Block>,
	> = Default::default();
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

	rpc::spawn_essential_tasks(
		rpc::SpawnTasksParams {
			task_manager: &task_manager,
			client: client.clone(),
			substrate_backend: backend.clone(),
			frontier_backend: frontier_backend.clone(),
			filter_pool: filter_pool.clone(),
			overrides: overrides.clone(),
			fee_history_limit,
			fee_history_cache: fee_history_cache.clone(),
		},
		sync_service.clone(),
		pubsub_notification_sinks.clone(),
	);

	let ethapi_cmd = rpc_config.ethapi.clone();
	let tracing_requesters =
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
			rpc::tracing::spawn_tracing_tasks(
				&rpc_config,
				prometheus_registry.clone(),
				rpc::SpawnTasksParams {
					task_manager: &task_manager,
					client: client.clone(),
					substrate_backend: backend.clone(),
					frontier_backend: frontier_backend.clone(),
					filter_pool: filter_pool.clone(),
					overrides: overrides.clone(),
					fee_history_limit,
					fee_history_cache: fee_history_cache.clone(),
				},
			)
		} else {
			rpc::tracing::RpcRequesters {
				debug: None,
				trace: None,
			}
		};

	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
		task_manager.spawn_handle(),
		overrides.clone(),
		rpc_config.eth_log_block_cache,
		rpc_config.eth_statuses_cache,
		prometheus_registry.clone(),
	));

	let rpc_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let network = network.clone();
		let sync = sync_service.clone();
		let filter_pool = filter_pool.clone();
		let frontier_backend = frontier_backend.clone();
		let backend = backend.clone();
		let ethapi_cmd = ethapi_cmd.clone();
		let max_past_logs = rpc_config.max_past_logs;
		let max_block_range = rpc_config.max_block_range;
		let overrides = overrides.clone();
		let fee_history_cache = fee_history_cache.clone();
		let block_data_cache = block_data_cache.clone();
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();

		let keystore = params.keystore_container.keystore();
		move |subscription_task_executor| {
			#[cfg(feature = "moonbase-native")]
			let forced_parent_hashes = {
				let mut forced_parent_hashes = BTreeMap::new();
				// Fixes for https://github.com/paritytech/frontier/pull/570
				// #1648995
				forced_parent_hashes.insert(
					H256::from_str(
						"0xa352fee3eef9c554a31ec0612af887796a920613358abf3353727760ea14207b",
					)
					.expect("must be valid hash"),
					H256::from_str(
						"0x0d0fd88778aec08b3a83ce36387dbf130f6f304fc91e9a44c9605eaf8a80ce5d",
					)
					.expect("must be valid hash"),
				);
				Some(forced_parent_hashes)
			};
			#[cfg(not(feature = "moonbase-native"))]
			let forced_parent_hashes = None;

			let deps = rpc::FullDeps {
				backend: backend.clone(),
				client: client.clone(),
				command_sink: None,
				ethapi_cmd: ethapi_cmd.clone(),
				filter_pool: filter_pool.clone(),
				frontier_backend: match &*frontier_backend {
					fc_db::Backend::KeyValue(b) => b.clone(),
					fc_db::Backend::Sql(b) => b.clone(),
				},
				graph: pool.clone(),
				pool: pool.clone(),
				is_authority: collator,
				max_past_logs,
				max_block_range,
				fee_history_limit,
				fee_history_cache: fee_history_cache.clone(),
				network: network.clone(),
				sync: sync.clone(),
				dev_rpc_data: None,
				block_data_cache: block_data_cache.clone(),
				overrides: overrides.clone(),
				forced_parent_hashes,
			};
			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
				client.clone(),
				keystore.clone(),
			));
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
				rpc::create_full(
					deps,
					subscription_task_executor,
					Some(crate::rpc::TracingConfig {
						tracing_requesters: tracing_requesters.clone(),
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
					}),
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
					para_id,
				)
				.map_err(Into::into)
			} else {
				rpc::create_full(
					deps,
					subscription_task_executor,
					None,
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
					para_id,
				)
				.map_err(Into::into)
			}
		}
	};

	sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		rpc_builder: Box::new(rpc_builder),
		client: client.clone(),
		transaction_pool: transaction_pool.clone(),
		task_manager: &mut task_manager,
		config: parachain_config,
		keystore: params.keystore_container.keystore(),
		backend: backend.clone(),
		network: network.clone(),
		sync_service: sync_service.clone(),
		system_rpc_tx,
		tx_handler_controller,
		telemetry: telemetry.as_mut(),
	})?;

	if let Some(hwbench) = hwbench {
		sc_sysinfo::print_hwbench(&hwbench);

		if let Some(ref mut telemetry) = telemetry {
			let telemetry_handle = telemetry.handle();
			task_manager.spawn_handle().spawn(
				"telemetry_hwbench",
				None,
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
			);
		}
	}

	let announce_block = {
		let sync_service = sync_service.clone();
		Arc::new(move |hash, data| sync_service.announce_block(hash, data))
	};

	let relay_chain_slot_duration = Duration::from_secs(6);
	let overseer_handle = relay_chain_interface
		.overseer_handle()
		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;

	start_relay_chain_tasks(StartRelayChainTasksParams {
		client: client.clone(),
		announce_block: announce_block.clone(),
		para_id,
		relay_chain_interface: relay_chain_interface.clone(),
		task_manager: &mut task_manager,
		da_recovery_profile: if collator {
			DARecoveryProfile::Collator
		} else {
			DARecoveryProfile::FullNode
		},
		import_queue: import_queue_service,
		relay_chain_slot_duration,
		recovery_handle: Box::new(overseer_handle.clone()),
		sync_service: sync_service.clone(),
		prometheus_registry: prometheus_registry.as_ref(),
	})?;

	if matches!(block_import, MoonbeamBlockImport::Dev(_)) {
		return Err(sc_service::Error::Other(
			"Block import pipeline is not for parachain".into(),
		));
	}

	if collator {
		start_consensus::<RuntimeApi, _>(
			backend.clone(),
			client.clone(),
			block_import,
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|t| t.handle()),
			&task_manager,
			relay_chain_interface.clone(),
			transaction_pool,
			params.keystore_container.keystore(),
			para_id,
			collator_key.expect("Command line arguments do not allow this. qed"),
			overseer_handle,
			announce_block,
			force_authoring,
			relay_chain_slot_duration,
			block_authoring_duration,
			sync_service.clone(),
			node_extra_args,
		)?;
	}

	Ok((task_manager, client))
}

fn start_consensus<RuntimeApi, SO>(
	backend: Arc<FullBackend>,
	client: Arc<FullClient<RuntimeApi>>,
	block_import: MoonbeamBlockImport<FullClient<RuntimeApi>>,
	prometheus_registry: Option<&Registry>,
	telemetry: Option<TelemetryHandle>,
	task_manager: &TaskManager,
	relay_chain_interface: Arc<dyn RelayChainInterface>,
	transaction_pool: Arc<
		sc_transaction_pool::TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
	>,
	keystore: KeystorePtr,
	para_id: ParaId,
	collator_key: CollatorPair,
	overseer_handle: OverseerHandle,
	announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
	force_authoring: bool,
	relay_chain_slot_duration: Duration,
	block_authoring_duration: Duration,
	sync_oracle: SO,
	node_extra_args: NodeExtraArgs,
) -> Result<(), sc_service::Error>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection
		+ cumulus_primitives_core::GetCoreSelectorApi<Block>
		+ cumulus_primitives_core::RelayParentOffsetApi<Block>,
	sc_client_api::StateBackendFor<FullBackend, Block>: sc_client_api::StateBackend<BlakeTwo256>,
	SO: SyncOracle + Send + Sync + Clone + 'static,
{
	let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
		task_manager.spawn_handle(),
		client.clone(),
		transaction_pool,
		prometheus_registry,
		telemetry.clone(),
	);
	let proposer = Proposer::new(proposer_factory);
	let collator_service = CollatorService::new(
		client.clone(),
		Arc::new(task_manager.spawn_handle()),
		announce_block,
		client.clone(),
	);
	fn create_inherent_data_providers<A, B>(
		_: A,
		_: B,
	) -> impl futures::Future<
		Output = Result<
			(
				nimbus_primitives::InherentDataProvider,
				session_keys_primitives::InherentDataProvider,
			),
			Box<dyn std::error::Error + Send + Sync>,
		>,
	> {
		async move {
			let author = nimbus_primitives::InherentDataProvider;
			let randomness = session_keys_primitives::InherentDataProvider;
			Ok((author, randomness))
		}
	}
	let client_clone = client.clone();
	let keystore_clone = keystore.clone();
	let maybe_provide_vrf_digest =
		move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
			moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
				&client_clone,
				&keystore_clone,
				nimbus_id,
				parent,
			)
		};
	log::info!("Collator started with asynchronous backing.");
	let client_clone = client.clone();
	let code_hash_provider = move |block_hash| {
		client_clone
			.code_at(block_hash)
			.ok()
			.map(polkadot_primitives::ValidationCode)
			.map(|c| c.hash())
	};
	match block_import {
		MoonbeamBlockImport::ParachainLookahead(bi) => {
			let block_import = if node_extra_args.legacy_block_import_strategy {
				TParachainBlockImport::new_with_delayed_best_block(bi, backend.clone())
			} else {
				TParachainBlockImport::new(bi, backend.clone())
			};
			let params = nimbus_consensus::collators::lookahead::Params {
				additional_digests_provider: maybe_provide_vrf_digest,
				additional_relay_keys: vec![relay_chain::well_known_keys::EPOCH_INDEX.to_vec()],
				authoring_duration: block_authoring_duration,
				block_import,
				code_hash_provider,
				collator_key,
				collator_service,
				create_inherent_data_providers,
				force_authoring,
				keystore,
				overseer_handle,
				para_backend: backend,
				para_client: client,
				para_id,
				proposer,
				relay_chain_slot_duration,
				relay_client: relay_chain_interface,
				slot_duration: None,
				sync_oracle,
				reinitialize: false,
				max_pov_percentage: node_extra_args.max_pov_percentage,
			};
			task_manager.spawn_essential_handle().spawn(
				"nimbus",
				None,
				nimbus_consensus::collators::lookahead::run::<
					Block,
					_,
					_,
					_,
					FullBackend,
					_,
					_,
					_,
					_,
					_,
					_,
				>(params),
			);
		}
		MoonbeamBlockImport::ParachainSlotBased(bi, handle) => {
			let block_import = if node_extra_args.legacy_block_import_strategy {
				TParachainBlockImport::new_with_delayed_best_block(bi, backend.clone())
			} else {
				TParachainBlockImport::new(bi, backend.clone())
			};
			nimbus_consensus::collators::slot_based::run::<
				Block,
				nimbus_primitives::NimbusPair,
				_,
				_,
				_,
				FullBackend,
				_,
				_,
				_,
				_,
				_,
				_,
			>(nimbus_consensus::collators::slot_based::Params {
				additional_digests_provider: maybe_provide_vrf_digest,
				additional_relay_state_keys: vec![
					relay_chain::well_known_keys::EPOCH_INDEX.to_vec()
				],
				authoring_duration: block_authoring_duration,
				block_import,
				code_hash_provider,
				collator_key,
				collator_service,
				create_inherent_data_providers: move |b, a| async move {
					create_inherent_data_providers(b, a).await
				},
				force_authoring,
				keystore,
				para_backend: backend,
				para_client: client,
				para_id,
				proposer,
				relay_chain_slot_duration,
				relay_client: relay_chain_interface,
				para_slot_duration: None,
				reinitialize: false,
				max_pov_percentage: node_extra_args.max_pov_percentage.map(|p| p as u32),
				export_pov: node_extra_args.export_pov,
				slot_offset: Duration::from_secs(1),
				spawner: task_manager.spawn_essential_handle(),
				block_import_handle: handle,
			});
		}
		MoonbeamBlockImport::Dev(_) => {
			return Err(sc_service::Error::Other(
				"Dev block import should not be used in parachain consensus".into(),
			))
		}
	}
	Ok(())
}

/// Start a normal parachain node.
// Rustfmt wants to format the closure with space indentation.
#[rustfmt::skip]
pub async fn start_node<RuntimeApi, Customizations>(
	parachain_config: Configuration,
	polkadot_config: Configuration,
	collator_options: CollatorOptions,
	para_id: ParaId,
	rpc_config: RpcConfig,
	block_authoring_duration: Duration,
	hwbench: Option<sc_sysinfo::HwBench>,
	node_extra_args: NodeExtraArgs,
) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
where
	RuntimeApi:
		ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi:
		RuntimeApiCollection + cumulus_primitives_core::GetCoreSelectorApi<Block>
		+ cumulus_primitives_core::RelayParentOffsetApi<Block>,
	Customizations: ClientCustomizations + 'static,
{
	start_node_impl::<RuntimeApi, Customizations, sc_network::NetworkWorker<_, _>>(
		parachain_config,
		polkadot_config,
		collator_options,
		para_id,
		rpc_config,
		block_authoring_duration,
		hwbench,
		node_extra_args
	)
	.await
}

/// Builds a new development service. This service uses manual seal, and mocks
/// the parachain inherent.
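///
/// A minimal sketch of spawning the dev service (illustrative; the parameter values
/// are assumptions, not defaults):
///
/// ```ignore
/// let task_manager = new_dev::<RuntimeApi, Customizations, Net>(
///     config,
///     Some(1000),                        // para id to mock
///     None,                              // author id (unused)
///     moonbeam_cli_opt::Sealing::Manual, // seal blocks via the `engine_createBlock` RPC
///     rpc_config,
///     None,                              // no hardware benchmark
///     node_extra_args,
/// ).await?;
/// ```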
pub async fn new_dev<RuntimeApi, Customizations, Net>(
	mut config: Configuration,
	para_id: Option<u32>,
	_author_id: Option<NimbusId>,
	sealing: moonbeam_cli_opt::Sealing,
	rpc_config: RpcConfig,
	hwbench: Option<sc_sysinfo::HwBench>,
	node_extra_args: NodeExtraArgs,
) -> Result<TaskManager, ServiceError>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
	Net: NetworkBackend<Block, Hash>,
{
	use async_io::Timer;
	use futures::Stream;
	use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams};

	let sc_service::PartialComponents {
		client,
		backend,
		mut task_manager,
		import_queue,
		keystore_container,
		select_chain: maybe_select_chain,
		transaction_pool,
		other:
			(
				block_import_pipeline,
				filter_pool,
				mut telemetry,
				_telemetry_worker_handle,
				frontier_backend,
				fee_history_cache,
			),
	} = new_partial::<RuntimeApi, Customizations>(&mut config, &rpc_config, node_extra_args)?;

	let block_import = if let MoonbeamBlockImport::Dev(block_import) = block_import_pipeline {
		block_import
	} else {
		return Err(ServiceError::Other(
			"Block import pipeline is not dev".to_string(),
		));
	};

	let prometheus_registry = config.prometheus_registry().cloned();
	let net_config =
		FullNetworkConfiguration::<_, _, Net>::new(&config.network, prometheus_registry.clone());
	let metrics = Net::register_notification_metrics(
		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
	);

	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
		sc_service::build_network(sc_service::BuildNetworkParams {
			config: &config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue,
			block_announce_validator_builder: None,
			warp_sync_config: None,
			net_config,
			block_relay: None,
			metrics,
		})?;

	if config.offchain_worker.enabled {
		task_manager.spawn_handle().spawn(
			"offchain-workers-runner",
			"offchain-work",
			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
				runtime_api_provider: client.clone(),
				keystore: Some(keystore_container.keystore()),
				offchain_db: backend.offchain_storage(),
				transaction_pool: Some(OffchainTransactionPoolFactory::new(
					transaction_pool.clone(),
				)),
				network_provider: Arc::new(network.clone()),
				is_validator: config.role.is_authority(),
				enable_http_requests: true,
				custom_extensions: move |_| vec![],
			})?
			.run(client.clone(), task_manager.spawn_handle())
			.boxed(),
		);
	}

	let prometheus_registry = config.prometheus_registry().cloned();
	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
	let fee_history_limit = rpc_config.fee_history_limit;
	let mut command_sink = None;
	let mut dev_rpc_data = None;
	let collator = config.role.is_authority();
	let parachain_id: ParaId = para_id
		.expect("para ID should be specified for dev service")
		.into();

	if collator {
		let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|x| x.handle()),
		);
		env.set_soft_deadline(SOFT_DEADLINE_PERCENT);

		let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
			match sealing {
				moonbeam_cli_opt::Sealing::Instant => {
					Box::new(
						// This bit cribbed from the implementation of instant seal.
						transaction_pool.import_notification_stream().map(|_| {
							EngineCommand::SealNewBlock {
								create_empty: false,
								finalize: false,
								parent_hash: None,
								sender: None,
							}
						}),
					)
				}
				moonbeam_cli_opt::Sealing::Manual => {
					let (sink, stream) = futures::channel::mpsc::channel(1000);
					// Keep a reference to the other end of the channel. It goes to the RPC.
					command_sink = Some(sink);
					Box::new(stream)
				}
				moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
					Timer::interval(Duration::from_millis(millis)),
					|_| EngineCommand::SealNewBlock {
						create_empty: true,
						finalize: false,
						parent_hash: None,
						sender: None,
					},
				)),
			};

		let select_chain = maybe_select_chain.expect(
			"`new_partial` builds a `LongestChainRule` when building dev service.\
				We specified the dev service when calling `new_partial`.\
				Therefore, a `LongestChainRule` is present. qed.",
		);

		// Create channels for mocked XCM messages.
		let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
		let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
		let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
		dev_rpc_data = Some((
			downward_xcm_sender,
			hrmp_xcm_sender,
			additional_relay_offset.clone(),
		));

		// Need to clone it and store here to avoid moving of `client`
		// variable in closure below.
		let client_vrf = client.clone();
		let keystore_clone = keystore_container.keystore().clone();
		let maybe_provide_vrf_digest =
			move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
				moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
					&client_vrf,
					&keystore_clone,
					nimbus_id,
					parent,
				)
			};

		// Need to clone it and store here to avoid moving of `client`
		// variable in closure below.
		let client_for_cidp = client.clone();
		task_manager.spawn_essential_handle().spawn_blocking(
			"authorship_task",
			Some("block-authoring"),
			run_manual_seal(ManualSealParams {
				block_import,
				env,
				client: client.clone(),
				pool: transaction_pool.clone(),
				commands_stream,
				select_chain,
				consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
					keystore: keystore_container.keystore(),
					client: client.clone(),
					additional_digests_provider: maybe_provide_vrf_digest,
					_phantom: Default::default(),
				})),
				create_inherent_data_providers: move |block: H256, ()| {
					let maybe_current_para_block = client_for_cidp.number(block);
					let maybe_current_para_head = client_for_cidp.expect_header(block);
					let downward_xcm_receiver = downward_xcm_receiver.clone();
					let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
					let additional_relay_offset = additional_relay_offset.clone();
					let relay_slot_key = well_known_keys::CURRENT_SLOT.to_vec();
					// Need to clone it and store here to avoid moving of `client`
					// variable in closure below.
					let client_for_xcm = client_for_cidp.clone();
					async move {
						MockTimestampInherentDataProvider::advance_timestamp(
							RELAY_CHAIN_SLOT_DURATION_MILLIS,
						);
						let current_para_block = maybe_current_para_block?
							.ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;
						let current_para_block_head = Some(polkadot_primitives::HeadData(
							maybe_current_para_head?.encode(),
						));
						// Get the mocked timestamp
						let timestamp = TIMESTAMP.load(Ordering::SeqCst);
						// Calculate mocked slot number
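						// (e.g. a mocked timestamp of 12_000 ms maps to relay slot 2,
						// given 6_000 ms relay slots; values shown for illustration)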
29090
						let slot = timestamp.saturating_div(RELAY_CHAIN_SLOT_DURATION_MILLIS);
29090
						let additional_key_values = vec![
29090
							(relay_slot_key, Slot::from(slot).encode()),
29090
							(
29090
								relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
29090
								AbridgedHostConfiguration {
29090
									max_code_size: 3_145_728,
29090
									max_head_data_size: 20_480,
29090
									max_upward_queue_count: 174_762,
29090
									max_upward_queue_size: 1_048_576,
29090
									max_upward_message_size: 65_531,
29090
									max_upward_message_num_per_candidate: 16,
29090
									hrmp_max_message_num_per_candidate: 10,
29090
									validation_upgrade_cooldown: 6,
29090
									validation_upgrade_delay: 6,
29090
									async_backing_params: AsyncBackingParams {
29090
										max_candidate_depth: 3,
29090
										allowed_ancestry_len: 2,
29090
									},
29090
								}
29090
								.encode(),
29090
							),
						];
29090
						let current_para_head = client_for_xcm
29090
							.header(block)
29090
							.expect("Header lookup should succeed")
29090
							.expect("Header passed in as parent should be present in backend.");
29090
						let should_send_go_ahead = match client_for_xcm
29090
							.runtime_api()
29090
							.collect_collation_info(block, &current_para_head)
						{
29090
							Ok(info) => info.new_validation_code.is_some(),
							Err(e) => {
								log::error!("Failed to collect collation info: {:?}", e);
								false
							}
						};
29090
						let mocked_parachain = MockValidationDataInherentDataProvider {
29090
							current_para_block,
29090
							para_id: parachain_id,
29090
							upgrade_go_ahead: should_send_go_ahead.then(|| {
								log::info!(
									"Detected pending validation code, sending go-ahead signal."
								);
								UpgradeGoAhead::GoAhead
							}),
29090
							current_para_block_head,
29090
							relay_offset: additional_relay_offset.load(Ordering::SeqCst),
							relay_blocks_per_para_block: 1,
							para_blocks_per_relay_epoch: 10,
29090
							relay_randomness_config: (),
29090
							xcm_config: MockXcmConfig::new(
29090
								&*client_for_xcm,
29090
								block,
29090
								Default::default(),
							),
29090
							raw_downward_messages: downward_xcm_receiver.drain().collect(),
29090
							raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
29090
							additional_key_values: Some(additional_key_values),
						};
29090
						let randomness = session_keys_primitives::InherentDataProvider;
29090
						Ok((
29090
							MockTimestampInherentDataProvider,
29090
							mocked_parachain,
29090
							randomness,
29090
						))
29090
					}
29090
				},
			}),
		);
	}
	// Sinks for pubsub notifications.
	// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
	// notification to the subscriber on receiving a message through this channel.
	// This way we avoid race conditions when using native substrate block import notification
	// stream.
926
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
926
		fc_mapping_sync::EthereumBlockNotification<Block>,
926
	> = Default::default();
926
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
	rpc::spawn_essential_tasks(
		rpc::SpawnTasksParams {
			task_manager: &task_manager,
			client: client.clone(),
			substrate_backend: backend.clone(),
			frontier_backend: frontier_backend.clone(),
			filter_pool: filter_pool.clone(),
			overrides: overrides.clone(),
			fee_history_limit,
			fee_history_cache: fee_history_cache.clone(),
		},
		sync_service.clone(),
		pubsub_notification_sinks.clone(),
	);
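	// Spawn the `debug`/`trace` tracing tasks only when the corresponding Eth APIs
	// are enabled; otherwise leave the requesters empty.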
	let ethapi_cmd = rpc_config.ethapi.clone();
	let tracing_requesters =
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
			rpc::tracing::spawn_tracing_tasks(
				&rpc_config,
				prometheus_registry.clone(),
				rpc::SpawnTasksParams {
					task_manager: &task_manager,
					client: client.clone(),
					substrate_backend: backend.clone(),
					frontier_backend: frontier_backend.clone(),
					filter_pool: filter_pool.clone(),
					overrides: overrides.clone(),
					fee_history_limit,
					fee_history_cache: fee_history_cache.clone(),
				},
			)
		} else {
			rpc::tracing::RpcRequesters {
				debug: None,
				trace: None,
			}
		};
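	// Cache Ethereum block data so repeated RPC lookups avoid re-deriving it.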
	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
		task_manager.spawn_handle(),
		overrides.clone(),
		rpc_config.eth_log_block_cache,
		rpc_config.eth_statuses_cache,
		prometheus_registry,
	));
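	// The RPC builder captures clones of every handle it needs so the closure can
	// be invoked once per incoming RPC connection.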
	let rpc_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let backend = backend.clone();
		let network = network.clone();
		let sync = sync_service.clone();
		let ethapi_cmd = ethapi_cmd.clone();
		let max_past_logs = rpc_config.max_past_logs;
		let max_block_range = rpc_config.max_block_range;
		let overrides = overrides.clone();
		let fee_history_cache = fee_history_cache.clone();
		let block_data_cache = block_data_cache.clone();
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();
		let keystore = keystore_container.keystore();
		move |subscription_task_executor| {
			let deps = rpc::FullDeps {
				backend: backend.clone(),
				client: client.clone(),
				command_sink: command_sink.clone(),
				ethapi_cmd: ethapi_cmd.clone(),
				filter_pool: filter_pool.clone(),
				frontier_backend: match &*frontier_backend {
					fc_db::Backend::KeyValue(b) => b.clone(),
					fc_db::Backend::Sql(b) => b.clone(),
				},
				graph: pool.clone(),
				pool: pool.clone(),
				is_authority: collator,
				max_past_logs,
				max_block_range,
				fee_history_limit,
				fee_history_cache: fee_history_cache.clone(),
				network: network.clone(),
				sync: sync.clone(),
				dev_rpc_data: dev_rpc_data.clone(),
				overrides: overrides.clone(),
				block_data_cache: block_data_cache.clone(),
				forced_parent_hashes: None,
			};
			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
				client.clone(),
				keystore.clone(),
			));
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
				rpc::create_full(
					deps,
					subscription_task_executor,
					Some(crate::rpc::TracingConfig {
						tracing_requesters: tracing_requesters.clone(),
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
					}),
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
					parachain_id,
				)
				.map_err(Into::into)
			} else {
				rpc::create_full(
					deps,
					subscription_task_executor,
					None,
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
					parachain_id,
				)
				.map_err(Into::into)
			}
		}
	};
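	// Spawn the standard Substrate service tasks (RPC servers, informant, etc.).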
	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		network,
		client,
		keystore: keystore_container.keystore(),
		task_manager: &mut task_manager,
		transaction_pool,
		rpc_builder: Box::new(rpc_builder),
		backend,
		system_rpc_tx,
		sync_service: sync_service.clone(),
		config,
		tx_handler_controller,
		telemetry: None,
	})?;
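	// Print hardware bench results and, when telemetry is available, forward them.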
	if let Some(hwbench) = hwbench {
		sc_sysinfo::print_hwbench(&hwbench);
		if let Some(ref mut telemetry) = telemetry {
			let telemetry_handle = telemetry.handle();
			task_manager.spawn_handle().spawn(
				"telemetry_hwbench",
				None,
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
			);
		}
	}
	log::info!("Development Service Ready");
	Ok(task_manager)
}
#[cfg(test)]
mod tests {
	use crate::chain_spec::moonbase::ChainSpec;
	use crate::chain_spec::Extensions;
	use jsonrpsee::server::BatchRequestConfig;
	use moonbase_runtime::AccountId;
	use prometheus::{proto::LabelPair, Counter};
	use sc_network::config::NetworkConfiguration;
	use sc_service::config::RpcConfiguration;
	use sc_service::ChainType;
	use sc_service::{
		config::{BasePath, DatabaseSource, KeystoreConfig},
		Configuration, Role,
	};
	use std::path::Path;
	use std::str::FromStr;
	use super::*;
	#[test]
	fn test_set_prometheus_registry_uses_moonbeam_prefix() {
		let counter_name = "my_counter";
		let expected_metric_name = "moonbeam_my_counter";
		let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
		let mut config = Configuration {
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
				"0.0.0.0:8080".parse().unwrap(),
				"".into(),
			)),
			..test_config("test")
		};
		set_prometheus_registry(&mut config, false).unwrap();
		// generate metric
		let reg = config.prometheus_registry().unwrap();
		reg.register(counter.clone()).unwrap();
		counter.inc();
		let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
		assert_eq!(actual_metric_name.as_str(), expected_metric_name);
	}
	#[test]
	fn test_set_prometheus_registry_skips_moonbeam_prefix() {
		let counter_name = "my_counter";
		let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
		let mut config = Configuration {
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
				"0.0.0.0:8080".parse().unwrap(),
				"".into(),
			)),
			..test_config("test")
		};
		set_prometheus_registry(&mut config, true).unwrap();
		// generate metric
		let reg = config.prometheus_registry().unwrap();
		reg.register(counter.clone()).unwrap();
		counter.inc();
		let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
		assert_eq!(actual_metric_name.as_str(), counter_name);
	}
	#[test]
	fn test_set_prometheus_registry_adds_chain_id_as_label() {
		let input_chain_id = "moonriver";
		let mut expected_label = LabelPair::default();
		expected_label.set_name("chain".to_owned());
		expected_label.set_value("moonriver".to_owned());
		let expected_chain_label = Some(expected_label);
		let counter = Box::new(Counter::new("foo", "foobar").unwrap());
		let mut config = Configuration {
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
				"0.0.0.0:8080".parse().unwrap(),
				"".into(),
			)),
			..test_config(input_chain_id)
		};
		set_prometheus_registry(&mut config, false).unwrap();
		// generate metric
		let reg = config.prometheus_registry().unwrap();
		reg.register(counter.clone()).unwrap();
		counter.inc();
		let actual_chain_label = reg
			.gather()
			.first()
			.unwrap()
			.get_metric()
			.first()
			.unwrap()
			.get_label()
			.into_iter()
			.find(|x| x.get_name() == "chain")
			.cloned();
		assert_eq!(actual_chain_label, expected_chain_label);
	}
	#[test]
	fn dalek_does_not_panic() {
		use futures::executor::block_on;
		use sc_block_builder::BlockBuilderBuilder;
		use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, DatabaseSource, PruningMode};
		use sp_api::ProvideRuntimeApi;
		use sp_consensus::BlockOrigin;
		use substrate_test_runtime::TestAPI;
		use substrate_test_runtime_client::runtime::Block;
		use substrate_test_runtime_client::{
			ClientBlockImportExt, TestClientBuilder, TestClientBuilderExt,
		};
		fn zero_ed_pub() -> sp_core::ed25519::Public {
			sp_core::ed25519::Public::default()
		}
		// This is an invalid signature.
		// It breaks after ed25519 1.3, which panics when the signature is created.
		// This test ensures we never panic on such input.
		fn invalid_sig() -> sp_core::ed25519::Signature {
			let signature = hex_literal::hex!(
				"a25b94f9c64270fdfffa673f11cfe961633e3e4972e6940a3cf7351dd90b71447041a83583a52cee1cf21b36ba7fd1d0211dca58b48d997fc78d9bc82ab7a38e"
			);
			sp_core::ed25519::Signature::from_raw(signature[0..64].try_into().unwrap())
		}
		let tmp = tempfile::tempdir().unwrap();
		let backend = Arc::new(
			Backend::new(
				DatabaseSettings {
					trie_cache_maximum_size: Some(1 << 20),
					state_pruning: Some(PruningMode::ArchiveAll),
					blocks_pruning: BlocksPruning::KeepAll,
					source: DatabaseSource::RocksDb {
						path: tmp.path().into(),
						cache_size: 1024,
					},
					metrics_registry: None,
				},
				u64::MAX,
			)
			.unwrap(),
		);
		let client = TestClientBuilder::with_backend(backend).build();
		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(1));
		let a1 = BlockBuilderBuilder::new(&client)
			.on_parent_block(client.chain_info().genesis_hash)
			.with_parent_block_number(0)
			// Enable proof recording if required. This call is optional.
			.enable_proof_recording()
			.build()
			.unwrap()
			.build()
			.unwrap()
			.block;
		block_on(client.import(BlockOrigin::NetworkInitialSync, a1.clone())).unwrap();
		// On block zero it will use dalek.
		// Verifying the invalid signature shouldn't panic; it should just return false.
		assert!(!client
			.runtime_api()
			.verify_ed25519(
				client.chain_info().genesis_hash,
				invalid_sig(),
				zero_ed_pub(),
				vec![]
			)
			.unwrap());
	}
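	/// Builds a minimal development `Configuration` for the tests above, using
	/// `chain_id` as the chain-spec id.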
	fn test_config(chain_id: &str) -> Configuration {
		let network_config = NetworkConfiguration::new("", "", Default::default(), None);
		let runtime = tokio::runtime::Runtime::new().expect("failed creating tokio runtime");
		let spec = ChainSpec::builder(&[0u8], Extensions::default())
			.with_name("test")
			.with_id(chain_id)
			.with_chain_type(ChainType::Local)
			.with_genesis_config(moonbase_runtime::genesis_config_preset::testnet_genesis(
				AccountId::from_str("6Be02d1d3665660d22FF9624b7BE0551ee1Ac91b").unwrap(),
				vec![],
				vec![],
				vec![],
				vec![],
				vec![],
				ParaId::new(0),
				0,
			))
			.build();
		Configuration {
			impl_name: String::from("test-impl"),
			impl_version: String::from("0.1"),
			role: Role::Full,
			tokio_handle: runtime.handle().clone(),
			transaction_pool: Default::default(),
			network: network_config,
			keystore: KeystoreConfig::Path {
				path: "key".into(),
				password: None,
			},
			database: DatabaseSource::RocksDb {
				path: "db".into(),
				cache_size: 128,
			},
			trie_cache_maximum_size: Some(16777216),
			warm_up_trie_cache: None,
			state_pruning: Default::default(),
			blocks_pruning: sc_service::BlocksPruning::KeepAll,
			chain_spec: Box::new(spec),
			executor: Default::default(),
			wasm_runtime_overrides: Default::default(),
			rpc: RpcConfiguration {
				addr: None,
				max_connections: Default::default(),
				cors: None,
				methods: Default::default(),
				max_request_size: Default::default(),
				max_response_size: Default::default(),
				id_provider: None,
				max_subs_per_conn: Default::default(),
				port: Default::default(),
				message_buffer_capacity: Default::default(),
				batch_config: BatchRequestConfig::Unlimited,
				rate_limit: Default::default(),
				rate_limit_whitelisted_ips: vec![],
				rate_limit_trust_proxy_headers: false,
			},
			data_path: Default::default(),
			prometheus_config: None,
			telemetry_endpoints: None,
			offchain_worker: Default::default(),
			force_authoring: false,
			disable_grandpa: false,
			dev_key_seed: None,
			tracing_targets: None,
			tracing_receiver: Default::default(),
			announce_block: true,
			base_path: BasePath::new(Path::new("")),
		}
	}
}
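/// Consensus data provider used to answer "pending" Ethereum RPC requests: it
/// rebuilds the Nimbus and VRF pre-runtime digests that a real block author
/// would produce, so a pending block can be executed on top of the best block.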
struct PendingConsensusDataProvider<Client>
where
	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
	Client::Api: VrfApi<Block>,
{
	client: Arc<Client>,
	keystore: Arc<dyn Keystore>,
}
impl<Client> PendingConsensusDataProvider<Client>
where
	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
	Client::Api: VrfApi<Block>,
{
	pub fn new(client: Arc<Client>, keystore: Arc<dyn Keystore>) -> Self {
		Self { client, keystore }
	}
}
impl<Client> fc_rpc::pending::ConsensusDataProvider<Block> for PendingConsensusDataProvider<Client>
where
	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
	Client::Api: VrfApi<Block>,
{
	fn create_digest(
		&self,
		parent: &Header,
		_data: &sp_inherents::InherentData,
	) -> Result<sp_runtime::Digest, sp_inherents::Error> {
		let hash = parent.hash();
		// Get the digest from the parent block header (the current best block).
		let mut digest = self
			.client
			.header(hash)
			.map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
			.ok_or(sp_inherents::Error::Application(
				"Best block header should be present".into(),
			))?
			.digest;
		// Get the nimbus id from the digest.
		let nimbus_id = digest
			.logs
			.iter()
			.find_map(|x| {
				if let DigestItem::PreRuntime(nimbus_primitives::NIMBUS_ENGINE_ID, nimbus_id) = x {
					Some(NimbusId::from_slice(nimbus_id.as_slice()).map_err(|_| {
						sp_inherents::Error::Application(
							"Nimbus pre-runtime digest should be valid".into(),
						)
					}))
				} else {
					None
				}
			})
			.ok_or(sp_inherents::Error::Application(
				"Nimbus pre-runtime digest should be present".into(),
			))??;
		// Remove the old VRF digest.
		let pos = digest.logs.iter().position(|x| {
			matches!(
				x,
				DigestItem::PreRuntime(session_keys_primitives::VRF_ENGINE_ID, _)
			)
		});
		if let Some(pos) = pos {
			digest.logs.remove(pos);
		}
		// Create the VRF digest.
		let vrf_digest = VrfDigestsProvider::new(self.client.clone(), self.keystore.clone())
			.provide_digests(nimbus_id, hash);
		// Append the VRF digest to the digest.
		digest.logs.extend(vrf_digest);
		Ok(digest)
	}
}