// Copyright 2019-2025 PureStake Inc.
// This file is part of Moonbeam.

// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.

//! This module assembles the Moonbeam service components, executes them, and manages communication
//! between them. This is the backbone of the client-side node implementation.
//!
//! This module can assemble:
//! PartialComponents: For maintenance tasks without a complete node (eg import/export blocks, purge)
//! Full Service: A complete parachain node including the pool, rpc, network, embedded relay chain
//! Dev Service: A leaner service without the relay chain backing.
25
pub mod rpc;
26

            
27
use cumulus_client_cli::CollatorOptions;
28
use cumulus_client_collator::service::CollatorService;
29
use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport;
30
use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
31
use cumulus_client_service::{
32
	prepare_node_config, start_relay_chain_tasks, CollatorSybilResistance, DARecoveryProfile,
33
	ParachainHostFunctions, ParachainTracingExecuteBlock, StartRelayChainTasksParams,
34
};
35
use cumulus_primitives_core::{
36
	relay_chain::{self, well_known_keys, CollatorPair},
37
	CollectCollationInfo, ParaId,
38
};
39
use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
40
use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface, RelayChainResult};
41
use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
42
use fc_consensus::FrontierBlockImport as TFrontierBlockImport;
43
use fc_db::DatabaseSource;
44
use fc_rpc::StorageOverrideHandler;
45
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
46
use futures::{FutureExt, StreamExt};
47
use maplit::hashmap;
48
#[cfg(feature = "moonbase-native")]
49
pub use moonbase_runtime;
50
use moonbeam_cli_opt::{
51
	AuthoringPolicy, EthApi as EthApiCmd, FrontierBackendConfig, NodeExtraArgs, RpcConfig,
52
};
53
#[cfg(feature = "moonbeam-native")]
54
pub use moonbeam_runtime;
55
use moonbeam_vrf::VrfDigestsProvider;
56
#[cfg(feature = "moonriver-native")]
57
pub use moonriver_runtime;
58
use nimbus_consensus::collators::slot_based::SlotBasedBlockImportHandle;
59
use nimbus_consensus::{
60
	collators::slot_based::SlotBasedBlockImport, NimbusManualSealConsensusDataProvider,
61
};
62
use nimbus_primitives::{DigestsProvider, NimbusId};
63
use polkadot_primitives::{AbridgedHostConfiguration, AsyncBackingParams, Slot, UpgradeGoAhead};
64
use sc_client_api::{
65
	backend::{AuxStore, Backend, StateBackend, StorageProvider},
66
	ExecutorProvider,
67
};
68
use sc_consensus::{BlockImport, ImportQueue};
69
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
70
use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock, PeerId};
71
use sc_service::config::PrometheusConfig;
72
use sc_service::{
73
	error::Error as ServiceError, ChainSpec, Configuration, PartialComponents, TFullBackend,
74
	TFullClient, TaskManager,
75
};
76
use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
77
use sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool};
78
use session_keys_primitives::VrfApi;
79
use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
80
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
81
use sp_consensus::SyncOracle;
82
use sp_core::{ByteArray, Encode, H256};
83
use sp_keystore::{Keystore, KeystorePtr};
84
use std::str::FromStr;
85
use std::sync::atomic::{AtomicU64, Ordering};
86
use std::sync::Arc;
87
use std::{collections::BTreeMap, path::Path, sync::Mutex, time::Duration};
88
use substrate_prometheus_endpoint::Registry;
89

            
90
pub use client::*;
91
pub mod chain_spec;
92
mod client;
93
#[cfg(feature = "lazy-loading")]
94
pub mod lazy_loading;
95

            
96
/// Full Substrate client specialized to our `Block`, a runtime API, and the
/// wasm executor carrying the Moonbeam host functions.
type FullClient<RuntimeApi> = TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
/// Full backend specialized to our `Block` type.
type FullBackend = TFullBackend<Block>;

/// `Some(LongestChain)` only for dev chains (see `new_partial`); parachain
/// nodes do not run their own fork choice.
type MaybeSelectChain<Backend> = Option<sc_consensus::LongestChain<Backend, Block>>;
/// Frontier (Ethereum compatibility) block import wrapping the client.
type FrontierBlockImport<Client> = TFrontierBlockImport<Block, Arc<Client>, Client>;
/// Result of `new_partial`: the standard `PartialComponents` plus
/// Moonbeam-specific extras in the `other` slot (block import, Ethereum log
/// filter pool, telemetry handles, Frontier backend, fee-history cache).
type PartialComponentsResult<Client, Backend> = Result<
	PartialComponents<
		Client,
		Backend,
		MaybeSelectChain<Backend>,
		sc_consensus::DefaultImportQueue<Block>,
		sc_transaction_pool::TransactionPoolHandle<Block, Client>,
		(
			MoonbeamBlockImport<Client>,
			Option<FilterPool>,
			Option<Telemetry>,
			Option<TelemetryWorkerHandle>,
			Arc<fc_db::Backend<Block, Client>>,
			FeeHistoryCache,
		),
	>,
	ServiceError,
>;
119

            
120
/// Slot duration of the relay chain, in milliseconds (6 seconds).
const RELAY_CHAIN_SLOT_DURATION_MILLIS: u64 = 6_000;

/// Mock timestamp shared by all `MockTimestampInherentDataProvider` instances.
/// A value of 0 means "not yet initialized"; see `advance_timestamp`.
static TIMESTAMP: AtomicU64 = AtomicU64::new(0);

/// Provide a mock duration starting at 0 in millisecond for timestamp inherent.
/// Each call will increment timestamp by slot_duration making Aura think time has passed.
struct MockTimestampInherentDataProvider;
127

            
128
impl MockTimestampInherentDataProvider {
129
29082
	fn advance_timestamp(slot_duration: u64) {
130
29082
		if TIMESTAMP.load(Ordering::SeqCst) == 0 {
131
878
			// Initialize timestamp inherent provider
132
878
			TIMESTAMP.store(
133
878
				sp_timestamp::Timestamp::current().as_millis(),
134
878
				Ordering::SeqCst,
135
878
			);
136
28204
		} else {
137
28204
			TIMESTAMP.fetch_add(slot_duration, Ordering::SeqCst);
138
28204
		}
139
29082
	}
140
}
141

            
142
#[async_trait::async_trait]
impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
	/// Inject the current mock timestamp under the standard timestamp
	/// inherent identifier. This only reads `TIMESTAMP`; callers are expected
	/// to have advanced it via `advance_timestamp` beforehand.
	async fn provide_inherent_data(
		&self,
		inherent_data: &mut sp_inherents::InherentData,
	) -> Result<(), sp_inherents::Error> {
		inherent_data.put_data(
			sp_timestamp::INHERENT_IDENTIFIER,
			&TIMESTAMP.load(Ordering::SeqCst),
		)
	}

	/// Never recovers from inherent errors.
	async fn try_handle_error(
		&self,
		_identifier: &sp_inherents::InherentIdentifier,
		_error: &[u8],
	) -> Option<Result<(), sp_inherents::Error>> {
		// The pallet never reports error.
		None
	}
}
163

            
164
/// Host functions exposed to the runtime wasm executor.
/// With `runtime-benchmarks` enabled, the benchmarking host functions are
/// layered on top of the standard parachain + Moonbeam-specific ones.
#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
	frame_benchmarking::benchmarking::HostFunctions,
	ParachainHostFunctions,
	moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);
/// Host functions exposed to the runtime wasm executor (non-benchmark build).
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
	ParachainHostFunctions,
	moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);
175

            
176
/// Block import pipeline used by this node, selected from the node mode
/// (dev vs parachain) and the configured authoring policy (see `Self::new`).
pub enum MoonbeamBlockImport<Client> {
	/// Used in dev mode to import new blocks as best blocks.
	Dev(FrontierBlockImport<Client>),
	/// Used in parachain mode with lookahead authoring policy.
	ParachainLookahead(FrontierBlockImport<Client>),
	/// Used in parachain mode with slot-based authoring policy.
	/// Carries the auxiliary handle produced by `SlotBasedBlockImport::new`.
	ParachainSlotBased(
		SlotBasedBlockImport<Block, FrontierBlockImport<Client>, Client>,
		SlotBasedBlockImportHandle<Block>,
	),
}
187

            
188
impl<Client> MoonbeamBlockImport<Client>
189
where
190
	Client: Send + Sync + 'static,
191
	Client: ProvideRuntimeApi<Block>,
192
	Client::Api: sp_block_builder::BlockBuilder<Block> + fp_rpc::EthereumRuntimeRPCApi<Block>,
193
	FrontierBlockImport<Client>: BlockImport<Block>,
194
	SlotBasedBlockImport<Block, FrontierBlockImport<Client>, Client>: BlockImport<Block, Error = <FrontierBlockImport<Client> as BlockImport<Block>>::Error>
195
		+ 'static,
196
{
197
930
	fn new(
198
930
		frontier_block_import: FrontierBlockImport<Client>,
199
930
		client: Arc<Client>,
200
930
		authoring_policy: AuthoringPolicy,
201
930
		is_dev: bool,
202
930
	) -> Self {
203
930
		if is_dev {
204
930
			MoonbeamBlockImport::Dev(frontier_block_import)
205
		} else {
206
			match authoring_policy {
207
				AuthoringPolicy::Lookahead => {
208
					MoonbeamBlockImport::ParachainLookahead(frontier_block_import)
209
				}
210
				AuthoringPolicy::SlotBased => {
211
					let (block_import, block_import_auxiliary_data) =
212
						SlotBasedBlockImport::new(frontier_block_import, client);
213
					MoonbeamBlockImport::ParachainSlotBased(
214
						block_import,
215
						block_import_auxiliary_data,
216
					)
217
				}
218
			}
219
		}
220
930
	}
221
}
222

            
223
/// A trait that must be implemented by all moon* runtimes executors.
///
/// This feature allows, for instance, to customize the client extensions according to the type
/// of network.
/// For the moment, this feature is only used to specify the first block compatible with
/// ed25519-zebra, but it could be used for other things in the future.
///
/// Implementors: see the per-network `*Customizations` unit structs below.
pub trait ClientCustomizations {
	/// The host function ed25519_verify has changed its behavior in the substrate history,
	/// because of the change from lib ed25519-dalek to lib ed25519-zebra.
	/// Some networks may have old blocks that are not compatible with ed25519-zebra,
	/// for these networks this function should return the 1st block compatible with the new lib.
	/// If this function returns None (default behavior), it implies that all blocks are compatible
	/// with the new lib (ed25519-zebra).
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		None
	}
}
240

            
241
#[cfg(feature = "moonbeam-native")]
pub struct MoonbeamCustomizations;
#[cfg(feature = "moonbeam-native")]
impl ClientCustomizations for MoonbeamCustomizations {
	// Moonbeam blocks before #2_000_000 must be verified with the older
	// ed25519-dalek semantics (see `ClientCustomizations`).
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(2_000_000)
	}
}

#[cfg(feature = "moonriver-native")]
pub struct MoonriverCustomizations;
#[cfg(feature = "moonriver-native")]
impl ClientCustomizations for MoonriverCustomizations {
	// Moonriver: first block compatible with ed25519-zebra.
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(3_000_000)
	}
}

#[cfg(feature = "moonbase-native")]
pub struct MoonbaseCustomizations;
#[cfg(feature = "moonbase-native")]
impl ClientCustomizations for MoonbaseCustomizations {
	// Moonbase: first block compatible with ed25519-zebra.
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(3_000_000)
	}
}
267

            
268
/// Trivial enum representing runtime variant
#[derive(Clone)]
pub enum RuntimeVariant {
	#[cfg(feature = "moonbeam-native")]
	Moonbeam,
	#[cfg(feature = "moonriver-native")]
	Moonriver,
	#[cfg(feature = "moonbase-native")]
	Moonbase,
	/// The chain spec matched none of the compiled-in networks.
	Unrecognized,
}
279

            
280
impl RuntimeVariant {
	/// Derive the runtime variant from a chain spec's identifier.
	///
	/// Arms are guard-matched in order, and compile-time features gate which
	/// networks are recognized at all; anything that matches no enabled
	/// network maps to `Unrecognized`.
	pub fn from_chain_spec(chain_spec: &Box<dyn ChainSpec>) -> Self {
		match chain_spec {
			#[cfg(feature = "moonbeam-native")]
			spec if spec.is_moonbeam() => Self::Moonbeam,
			#[cfg(feature = "moonriver-native")]
			spec if spec.is_moonriver() => Self::Moonriver,
			#[cfg(feature = "moonbase-native")]
			spec if spec.is_moonbase() => Self::Moonbase,
			_ => Self::Unrecognized,
		}
	}
}
293

            
294
/// Can be called for a `Configuration` to check if it is a configuration for
/// the `Moonbeam` network.
pub trait IdentifyVariant {
	/// Returns `true` if this is a configuration for the `Moonbase` network.
	fn is_moonbase(&self) -> bool;

	/// Returns `true` if this is a configuration for the `Moonbeam` network.
	fn is_moonbeam(&self) -> bool;

	/// Returns `true` if this is a configuration for the `Moonriver` network.
	fn is_moonriver(&self) -> bool;

	/// Returns `true` if this is a configuration for a dev network.
	/// (In the provided impl this is keyed off the chain type, while the
	/// network checks above are keyed off the chain-spec id.)
	fn is_dev(&self) -> bool;
}
309

            
310
impl IdentifyVariant for Box<dyn ChainSpec> {
	// Network detection is by chain-spec id *prefix*, so variant ids such as
	// "moonbase_local" still match their network.
	fn is_moonbase(&self) -> bool {
		self.id().starts_with("moonbase")
	}

	fn is_moonbeam(&self) -> bool {
		self.id().starts_with("moonbeam")
	}

	fn is_moonriver(&self) -> bool {
		self.id().starts_with("moonriver")
	}

	// Dev-ness is decided by the chain *type*, not the id.
	fn is_dev(&self) -> bool {
		self.chain_type() == sc_chain_spec::ChainType::Development
	}
}
327

            
328
930
pub fn frontier_database_dir(config: &Configuration, path: &str) -> std::path::PathBuf {
329
930
	config
330
930
		.base_path
331
930
		.config_dir(config.chain_spec.id())
332
930
		.join("frontier")
333
930
		.join(path)
334
930
}
335

            
336
// TODO This is copied from frontier. It should be imported instead after
337
// https://github.com/paritytech/frontier/issues/333 is solved
338
930
pub fn open_frontier_backend<C, BE>(
339
930
	client: Arc<C>,
340
930
	config: &Configuration,
341
930
	rpc_config: &RpcConfig,
342
930
) -> Result<fc_db::Backend<Block, C>, String>
343
930
where
344
930
	C: ProvideRuntimeApi<Block> + StorageProvider<Block, BE> + AuxStore,
345
930
	C: HeaderBackend<Block> + HeaderMetadata<Block, Error = BlockChainError>,
346
930
	C: Send + Sync + 'static,
347
930
	C::Api: fp_rpc::EthereumRuntimeRPCApi<Block>,
348
930
	BE: Backend<Block> + 'static,
349
930
	BE::State: StateBackend<BlakeTwo256>,
350
{
351
930
	let frontier_backend = match rpc_config.frontier_backend_config {
352
		FrontierBackendConfig::KeyValue => {
353
930
			fc_db::Backend::KeyValue(Arc::new(fc_db::kv::Backend::<Block, C>::new(
354
930
				client,
355
				&fc_db::kv::DatabaseSettings {
356
930
					source: match config.database {
357
930
						DatabaseSource::RocksDb { .. } => DatabaseSource::RocksDb {
358
930
							path: frontier_database_dir(config, "db"),
359
930
							cache_size: 0,
360
930
						},
361
						DatabaseSource::ParityDb { .. } => DatabaseSource::ParityDb {
362
							path: frontier_database_dir(config, "paritydb"),
363
						},
364
						DatabaseSource::Auto { .. } => DatabaseSource::Auto {
365
							rocksdb_path: frontier_database_dir(config, "db"),
366
							paritydb_path: frontier_database_dir(config, "paritydb"),
367
							cache_size: 0,
368
						},
369
						_ => {
370
							return Err(
371
								"Supported db sources: `rocksdb` | `paritydb` | `auto`".to_string()
372
							)
373
						}
374
					},
375
				},
376
			)?))
377
		}
378
		FrontierBackendConfig::Sql {
379
			pool_size,
380
			num_ops_timeout,
381
			thread_count,
382
			cache_size,
383
		} => {
384
			let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
385
			let sqlite_db_path = frontier_database_dir(config, "sql");
386
			std::fs::create_dir_all(&sqlite_db_path).expect("failed creating sql db directory");
387
			let backend = futures::executor::block_on(fc_db::sql::Backend::new(
388
				fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
389
					path: Path::new("sqlite:///")
390
						.join(sqlite_db_path)
391
						.join("frontier.db3")
392
						.to_str()
393
						.expect("frontier sql path error"),
394
					create_if_missing: true,
395
					thread_count: thread_count,
396
					cache_size: cache_size,
397
				}),
398
				pool_size,
399
				std::num::NonZeroU32::new(num_ops_timeout),
400
				overrides.clone(),
401
			))
402
			.unwrap_or_else(|err| panic!("failed creating sql backend: {:?}", err));
403
			fc_db::Backend::Sql(Arc::new(backend))
404
		}
405
	};
406

            
407
930
	Ok(frontier_backend)
408
930
}
409

            
410
use sp_runtime::{traits::BlakeTwo256, DigestItem, Percent};

/// Soft deadline of 100%: let the block proposer keep pulling transactions
/// for the full proposal duration.
/// NOTE(review): the consumer is outside this chunk — presumably the
/// proposer's `set_soft_deadline`; confirm against the service builders below.
pub const SOFT_DEADLINE_PERCENT: Percent = Percent::from_percent(100);
413

            
414
/// Builds a new object suitable for chain operations.
///
/// Dispatches on the chain-spec id to the matching runtime. The arms are
/// guard-matched in order, and the moonbase arm doubles as the fallback for
/// any spec that is neither moonriver nor moonbeam; without the
/// `moonbase-native` feature an unmatched spec panics.
#[allow(clippy::type_complexity)]
pub fn new_chain_ops(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	node_extra_args: NodeExtraArgs,
) -> Result<
	(
		Arc<Client>,
		Arc<FullBackend>,
		sc_consensus::BasicQueue<Block>,
		TaskManager,
	),
	ServiceError,
> {
	match &config.chain_spec {
		#[cfg(feature = "moonriver-native")]
		spec if spec.is_moonriver() => new_chain_ops_inner::<
			moonriver_runtime::RuntimeApi,
			MoonriverCustomizations,
		>(config, rpc_config, node_extra_args),
		#[cfg(feature = "moonbeam-native")]
		spec if spec.is_moonbeam() => new_chain_ops_inner::<
			moonbeam_runtime::RuntimeApi,
			MoonbeamCustomizations,
		>(config, rpc_config, node_extra_args),
		#[cfg(feature = "moonbase-native")]
		_ => new_chain_ops_inner::<moonbase_runtime::RuntimeApi, MoonbaseCustomizations>(
			config,
			rpc_config,
			node_extra_args,
		),
		#[cfg(not(feature = "moonbase-native"))]
		_ => panic!("invalid chain spec"),
	}
}
450

            
451
/// Monomorphic helper for `new_chain_ops`: builds the partial components for
/// one runtime/customization pair and returns only the pieces chain
/// operations need (client, backend, import queue, task manager).
#[allow(clippy::type_complexity)]
fn new_chain_ops_inner<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	node_extra_args: NodeExtraArgs,
) -> Result<
	(
		Arc<Client>,
		Arc<FullBackend>,
		sc_consensus::BasicQueue<Block>,
		TaskManager,
	),
	ServiceError,
>
where
	Client: From<Arc<crate::FullClient<RuntimeApi>>>,
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	// Chain operations never sign anything; an in-memory keystore avoids
	// touching (or creating) the on-disk keystore.
	config.keystore = sc_service::config::KeystoreConfig::InMemory;
	let PartialComponents {
		client,
		backend,
		import_queue,
		task_manager,
		..
	} = new_partial::<RuntimeApi, Customizations>(config, rpc_config, node_extra_args)?;
	Ok((
		Arc::new(Client::from(client)),
		backend,
		import_queue,
		task_manager,
	))
}
486

            
487
// If we're using prometheus, use a registry with a prefix of `moonbeam`.
488
933
fn set_prometheus_registry(
489
933
	config: &mut Configuration,
490
933
	skip_prefix: bool,
491
933
) -> Result<(), ServiceError> {
492
933
	if let Some(PrometheusConfig { registry, .. }) = config.prometheus_config.as_mut() {
493
3
		let labels = hashmap! {
494
3
			"chain".into() => config.chain_spec.id().into(),
495
		};
496
3
		let prefix = if skip_prefix {
497
1
			None
498
		} else {
499
2
			Some("moonbeam".into())
500
		};
501

            
502
3
		*registry = Registry::new_custom(prefix, Some(labels))?;
503
930
	}
504

            
505
933
	Ok(())
506
933
}
507

            
508
/// Builds the PartialComponents for a parachain or development service
///
/// Use this function if you don't actually need the full service, but just the partial in order to
/// be able to perform chain operations.
#[allow(clippy::type_complexity)]
pub fn new_partial<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	node_extra_args: NodeExtraArgs,
) -> PartialComponentsResult<FullClient<RuntimeApi>, FullBackend>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;

	// Use ethereum style for subscription ids
	config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));

	// Telemetry is optional: only spin up a worker when endpoints are configured.
	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	// Build the wasm executor from the node's executor configuration.
	let heap_pages = config
		.executor
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
			extra_pages: h as _,
		});
	let mut wasm_builder = WasmExecutor::builder()
		.with_execution_method(config.executor.wasm_method)
		.with_onchain_heap_alloc_strategy(heap_pages)
		.with_offchain_heap_alloc_strategy(heap_pages)
		.with_ignore_onchain_heap_pages(true)
		.with_max_runtime_instances(config.executor.max_runtime_instances)
		.with_runtime_cache_size(config.executor.runtime_cache_size);

	if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
	}

	let executor = wasm_builder.build();

	let (client, backend, keystore_container, task_manager) =
		sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
			config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
			true,
		)?;

	// Blocks below this number must be verified with ed25519-dalek semantics
	// (see `ClientCustomizations`).
	if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(block_number));
	}

	let client = Arc::new(client);

	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());

	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager
			.spawn_handle()
			.spawn("telemetry", None, worker.run());
		telemetry
	});

	// Dev chains run their own longest-chain fork choice; parachain nodes don't.
	let maybe_select_chain = if config.chain_spec.is_dev() {
		Some(sc_consensus::LongestChain::new(backend.clone()))
	} else {
		None
	};

	let transaction_pool = sc_transaction_pool::Builder::new(
		task_manager.spawn_essential_handle(),
		client.clone(),
		config.role.is_authority().into(),
	)
	.with_options(config.transaction_pool.clone())
	.with_prometheus(config.prometheus_registry())
	.build();

	// Ethereum RPC state shared with the RPC layer: log filters + fee history.
	let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
	let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));

	let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
	let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
	let block_import = MoonbeamBlockImport::new(
		frontier_block_import.clone(),
		client.clone(),
		node_extra_args.authoring_policy,
		config.chain_spec.is_dev(),
	);

	// Import-queue verification only needs the wall-clock timestamp inherent.
	let create_inherent_data_providers = move |_, _| async move {
		let time = sp_timestamp::InherentDataProvider::from_system_time();
		Ok((time,))
	};

	let import_queue = {
		// Wrap the block import for the queue: dev nodes import directly,
		// parachain nodes go through the parachain block import, optionally
		// with the legacy delayed-best-block strategy.
		let block_import_for_queue: Box<
			dyn sc_consensus::BlockImport<Block, Error = sp_consensus::Error> + Send + Sync,
		> = match &block_import {
			MoonbeamBlockImport::Dev(bi) => Box::new(bi.clone()),
			MoonbeamBlockImport::ParachainLookahead(bi) => {
				if node_extra_args.legacy_block_import_strategy {
					Box::new(TParachainBlockImport::new_with_delayed_best_block(
						bi.clone(),
						backend.clone(),
					))
				} else {
					Box::new(TParachainBlockImport::new(bi.clone(), backend.clone()))
				}
			}
			MoonbeamBlockImport::ParachainSlotBased(bi, _handle) => {
				if node_extra_args.legacy_block_import_strategy {
					Box::new(TParachainBlockImport::new_with_delayed_best_block(
						bi.clone(),
						backend.clone(),
					))
				} else {
					Box::new(TParachainBlockImport::new(bi.clone(), backend.clone()))
				}
			}
		};

		nimbus_consensus::import_queue(
			client.clone(),
			block_import_for_queue,
			create_inherent_data_providers,
			&task_manager.spawn_essential_handle(),
			config.prometheus_registry(),
			node_extra_args.legacy_block_import_strategy,
			false,
		)?
	};

	Ok(PartialComponents {
		backend,
		client,
		import_queue,
		keystore_container,
		task_manager,
		transaction_pool: transaction_pool.into(),
		select_chain: maybe_select_chain,
		other: (
			block_import,
			filter_pool,
			telemetry,
			telemetry_worker_handle,
			frontier_backend,
			fee_history_cache,
		),
	})
}
675

            
676
/// Build the interface to the relay chain: a minimal RPC-backed node when an
/// external relay RPC is configured via `collator_options`, otherwise a full
/// in-process relay chain node.
///
/// Returns the relay-chain interface plus the collator key pair (which may be
/// `None`, e.g. for non-collating nodes).
async fn build_relay_chain_interface(
	polkadot_config: Configuration,
	parachain_config: &Configuration,
	telemetry_worker_handle: Option<TelemetryWorkerHandle>,
	task_manager: &mut TaskManager,
	collator_options: CollatorOptions,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> RelayChainResult<(
	Arc<(dyn RelayChainInterface + 'static)>,
	Option<CollatorPair>,
)> {
	let result = if let cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =
		collator_options.relay_chain_mode
	{
		build_minimal_relay_chain_node_with_rpc(
			polkadot_config,
			parachain_config.prometheus_registry(),
			task_manager,
			rpc_target_urls,
		)
		.await
	} else {
		build_inprocess_relay_chain(
			polkadot_config,
			parachain_config,
			telemetry_worker_handle,
			task_manager,
			hwbench,
		)
	};

	// Extract only the first two elements from the 4-tuple
	result
		.map(|(relay_chain_interface, collator_pair, _, _)| (relay_chain_interface, collator_pair))
}
711

            
712
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is abstract over the executor and the runtime api.
#[sc_tracing::logging::prefix_logs_with("🌗")]
async fn start_node_impl<RuntimeApi, Customizations, Net>(
	parachain_config: Configuration,
	polkadot_config: Configuration,
	collator_options: CollatorOptions,
	para_id: ParaId,
	rpc_config: RpcConfig,
	block_authoring_duration: Duration,
	hwbench: Option<sc_sysinfo::HwBench>,
	node_extra_args: NodeExtraArgs,
) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi:
		RuntimeApiCollection + cumulus_primitives_core::RelayParentOffsetApi<Block>,
	Customizations: ClientCustomizations + 'static,
	Net: NetworkBackend<Block, Hash>,
{
	// Apply parachain-specific defaults (e.g. announce-block behavior) to the config.
	let mut parachain_config = prepare_node_config(parachain_config);

	// Assemble the partial components (client, backend, import queue, pool, …).
	let params = new_partial::<RuntimeApi, Customizations>(
		&mut parachain_config,
		&rpc_config,
		node_extra_args.clone(),
	)?;
	// Unpack the `other` tuple produced by `new_partial`.
	let (
		block_import,
		filter_pool,
		mut telemetry,
		telemetry_worker_handle,
		frontier_backend,
		fee_history_cache,
	) = params.other;

	let client = params.client.clone();
	let backend = params.backend.clone();
	let mut task_manager = params.task_manager;

	// Connect to (or embed) the relay chain; errors are surfaced as service errors.
	let (relay_chain_interface, collator_key) = build_relay_chain_interface(
		polkadot_config,
		&parachain_config,
		telemetry_worker_handle,
		&mut task_manager,
		collator_options.clone(),
		hwbench.clone(),
	)
	.await
	.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

	let force_authoring = parachain_config.force_authoring;
	// `collator` is true when this node is configured as an authority (collating) node.
	let collator = parachain_config.role.is_authority();
	let prometheus_registry = parachain_config.prometheus_registry().cloned();
	let transaction_pool = params.transaction_pool.clone();
	let import_queue_service = params.import_queue.service();
	let net_config = FullNetworkConfiguration::<_, _, Net>::new(
		&parachain_config.network,
		prometheus_registry.clone(),
	);

	// Build the parachain networking stack; sybil resistance is `Resistant`
	// because collation implies an authority set.
	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
		cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
			parachain_config: &parachain_config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue: params.import_queue,
			para_id: para_id.clone(),
			relay_chain_interface: relay_chain_interface.clone(),
			net_config,
			sybil_resistance_level: CollatorSybilResistance::Resistant,
			metrics: Net::register_notification_metrics(
				parachain_config
					.prometheus_config
					.as_ref()
					.map(|config| &config.registry),
			),
		})
		.await?;

	// Ethereum storage-schema override handler for Frontier RPC.
	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
	let fee_history_limit = rpc_config.fee_history_limit;

	// Sinks for pubsub notifications.
	// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
	// notification to the subscriber on receiving a message through this channel.
	// This way we avoid race conditions when using native substrate block import notification
	// stream.
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
		fc_mapping_sync::EthereumBlockNotification<Block>,
	> = Default::default();
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

	// Spawn Frontier background tasks (mapping sync, filter pool maintenance, fee history).
	rpc::spawn_essential_tasks(
		rpc::SpawnTasksParams {
			task_manager: &task_manager,
			client: client.clone(),
			substrate_backend: backend.clone(),
			frontier_backend: frontier_backend.clone(),
			filter_pool: filter_pool.clone(),
			overrides: overrides.clone(),
			fee_history_limit,
			fee_history_cache: fee_history_cache.clone(),
		},
		sync_service.clone(),
		pubsub_notification_sinks.clone(),
	);

	// Only spin up the (expensive) tracing task pool when debug/trace RPC is enabled.
	let ethapi_cmd = rpc_config.ethapi.clone();
	let tracing_requesters =
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
			rpc::tracing::spawn_tracing_tasks(
				&rpc_config,
				prometheus_registry.clone(),
				rpc::SpawnTasksParams {
					task_manager: &task_manager,
					client: client.clone(),
					substrate_backend: backend.clone(),
					frontier_backend: frontier_backend.clone(),
					filter_pool: filter_pool.clone(),
					overrides: overrides.clone(),
					fee_history_limit,
					fee_history_cache: fee_history_cache.clone(),
				},
			)
		} else {
			rpc::tracing::RpcRequesters {
				debug: None,
				trace: None,
			}
		};

	// Shared cache of Ethereum block data used by the eth_* RPC handlers.
	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
		task_manager.spawn_handle(),
		overrides.clone(),
		rpc_config.eth_log_block_cache,
		rpc_config.eth_statuses_cache,
		prometheus_registry.clone(),
	));

	// Closure invoked per RPC connection to build the full RPC handler set.
	let rpc_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let network = network.clone();
		let sync = sync_service.clone();
		let filter_pool = filter_pool.clone();
		let frontier_backend = frontier_backend.clone();
		let backend = backend.clone();
		let ethapi_cmd = ethapi_cmd.clone();
		let max_past_logs = rpc_config.max_past_logs;
		let max_block_range = rpc_config.max_block_range;
		let overrides = overrides.clone();
		let fee_history_cache = fee_history_cache.clone();
		let block_data_cache = block_data_cache.clone();
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();

		let keystore = params.keystore_container.keystore();
		move |subscription_task_executor| {
			// Moonbase-only chain repair: force a known parent hash for a block
			// affected by the Frontier issue referenced below.
			#[cfg(feature = "moonbase-native")]
			let forced_parent_hashes = {
				let mut forced_parent_hashes = BTreeMap::new();
				// Fixes for https://github.com/paritytech/frontier/pull/570
				// #1648995
				forced_parent_hashes.insert(
					H256::from_str(
						"0xa352fee3eef9c554a31ec0612af887796a920613358abf3353727760ea14207b",
					)
					.expect("must be valid hash"),
					H256::from_str(
						"0x0d0fd88778aec08b3a83ce36387dbf130f6f304fc91e9a44c9605eaf8a80ce5d",
					)
					.expect("must be valid hash"),
				);
				Some(forced_parent_hashes)
			};
			#[cfg(not(feature = "moonbase-native"))]
			let forced_parent_hashes = None;

			let deps = rpc::FullDeps {
				backend: backend.clone(),
				client: client.clone(),
				// No manual-seal command sink in the parachain service (dev-only feature).
				command_sink: None,
				ethapi_cmd: ethapi_cmd.clone(),
				filter_pool: filter_pool.clone(),
				frontier_backend: match &*frontier_backend {
					fc_db::Backend::KeyValue(b) => b.clone(),
					fc_db::Backend::Sql(b) => b.clone(),
				},
				graph: pool.clone(),
				pool: pool.clone(),
				is_authority: collator,
				max_past_logs,
				max_block_range,
				fee_history_limit,
				fee_history_cache: fee_history_cache.clone(),
				network: network.clone(),
				sync: sync.clone(),
				// Dev RPC data (mock XCM channels) is dev-service only.
				dev_rpc_data: None,
				block_data_cache: block_data_cache.clone(),
				overrides: overrides.clone(),
				forced_parent_hashes,
			};
			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
				client.clone(),
				keystore.clone(),
			));
			// Attach the tracing config only when debug/trace RPC namespaces are enabled.
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
				rpc::create_full(
					deps,
					subscription_task_executor,
					Some(crate::rpc::TracingConfig {
						tracing_requesters: tracing_requesters.clone(),
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
						max_block_range: rpc_config.max_block_range,
					}),
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
					para_id,
				)
				.map_err(Into::into)
			} else {
				rpc::create_full(
					deps,
					subscription_task_executor,
					None,
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
					para_id,
				)
				.map_err(Into::into)
			}
		}
	};

	// Spawn the substrate service tasks (RPC server, telemetry, informant, …).
	sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		rpc_builder: Box::new(rpc_builder),
		client: client.clone(),
		transaction_pool: transaction_pool.clone(),
		task_manager: &mut task_manager,
		config: parachain_config,
		keystore: params.keystore_container.keystore(),
		backend: backend.clone(),
		network: network.clone(),
		sync_service: sync_service.clone(),
		system_rpc_tx,
		tx_handler_controller,
		telemetry: telemetry.as_mut(),
		tracing_execute_block: Some(Arc::new(ParachainTracingExecuteBlock::new(client.clone()))),
	})?;

	// Print the hardware benchmark results and, when telemetry is on, report them.
	if let Some(hwbench) = hwbench {
		sc_sysinfo::print_hwbench(&hwbench);

		if let Some(ref mut telemetry) = telemetry {
			let telemetry_handle = telemetry.handle();
			task_manager.spawn_handle().spawn(
				"telemetry_hwbench",
				None,
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
			);
		}
	}

	// Block-announcement callback handed to the relay-chain/consensus tasks.
	let announce_block = {
		let sync_service = sync_service.clone();
		Arc::new(move |hash, data| sync_service.announce_block(hash, data))
	};

	// Relay-chain slot duration is fixed at 6 seconds.
	let relay_chain_slot_duration = Duration::from_secs(6);
	let overseer_handle = relay_chain_interface
		.overseer_handle()
		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;

	// Start relay-chain-facing tasks (PoV recovery, parachain informant, …).
	// Collators use the more aggressive DA recovery profile.
	start_relay_chain_tasks(StartRelayChainTasksParams {
		client: client.clone(),
		announce_block: announce_block.clone(),
		para_id,
		relay_chain_interface: relay_chain_interface.clone(),
		task_manager: &mut task_manager,
		da_recovery_profile: if collator {
			DARecoveryProfile::Collator
		} else {
			DARecoveryProfile::FullNode
		},
		import_queue: import_queue_service,
		relay_chain_slot_duration,
		recovery_handle: Box::new(overseer_handle.clone()),
		sync_service: sync_service.clone(),
		prometheus_registry: prometheus_registry.as_ref(),
	})?;
	// The dev (manual-seal) block import must never reach the parachain path;
	// bail out early rather than failing deep inside consensus startup.
	if matches!(block_import, MoonbeamBlockImport::Dev(_)) {
		return Err(sc_service::Error::Other(
			"Block import pipeline is not for parachain".into(),
		));
	}
	// Only authorities run the collation/consensus task.
	if collator {
		start_consensus::<RuntimeApi, _>(
			backend.clone(),
			client.clone(),
			block_import,
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|t| t.handle()),
			&task_manager,
			relay_chain_interface.clone(),
			transaction_pool,
			params.keystore_container.keystore(),
			para_id,
			// A collator role guarantees a collator key was generated above.
			collator_key.expect("Command line arguments do not allow this. qed"),
			network.local_peer_id(),
			overseer_handle,
			announce_block,
			force_authoring,
			relay_chain_slot_duration,
			block_authoring_duration,
			sync_service.clone(),
			node_extra_args,
		)?;
	}
	Ok((task_manager, client))
}
/// Start the Nimbus-based collation task for a parachain collator.
///
/// Dispatches on the block-import strategy: `ParachainLookahead` runs the
/// lookahead collator, `ParachainSlotBased` runs the slot-based collator, and
/// the dev (`Dev`) variant is rejected because it must not be used for
/// parachain consensus.
fn start_consensus<RuntimeApi, SO>(
	backend: Arc<FullBackend>,
	client: Arc<FullClient<RuntimeApi>>,
	block_import: MoonbeamBlockImport<FullClient<RuntimeApi>>,
	prometheus_registry: Option<&Registry>,
	telemetry: Option<TelemetryHandle>,
	task_manager: &TaskManager,
	relay_chain_interface: Arc<dyn RelayChainInterface>,
	transaction_pool: Arc<
		sc_transaction_pool::TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
	>,
	keystore: KeystorePtr,
	para_id: ParaId,
	collator_key: CollatorPair,
	collator_peer_id: PeerId,
	overseer_handle: OverseerHandle,
	announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
	force_authoring: bool,
	relay_chain_slot_duration: Duration,
	block_authoring_duration: Duration,
	sync_oracle: SO,
	node_extra_args: NodeExtraArgs,
) -> Result<(), sc_service::Error>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi:
		RuntimeApiCollection + cumulus_primitives_core::RelayParentOffsetApi<Block>,
	sc_client_api::StateBackendFor<FullBackend, Block>: sc_client_api::StateBackend<BlakeTwo256>,
	SO: SyncOracle + Send + Sync + Clone + 'static,
{
	// Proposer records a storage proof so the block can be validated on the relay chain.
	let proposer = sc_basic_authorship::ProposerFactory::with_proof_recording(
		task_manager.spawn_handle(),
		client.clone(),
		transaction_pool,
		prometheus_registry,
		telemetry.clone(),
	);
	let collator_service = CollatorService::new(
		client.clone(),
		Arc::new(task_manager.spawn_handle()),
		announce_block,
		client.clone(),
	);
	// Inherent data providers for authored blocks: the Nimbus author inherent
	// and the session-keys (VRF randomness) inherent. The two closure arguments
	// (parent hash / extra data) are unused here.
	fn create_inherent_data_providers<A, B>(
		_: A,
		_: B,
	) -> impl futures::Future<
		Output = Result<
			(
				nimbus_primitives::InherentDataProvider,
				session_keys_primitives::InherentDataProvider,
			),
			Box<dyn std::error::Error + Send + Sync>,
		>,
	> {
		async move {
			let author = nimbus_primitives::InherentDataProvider;
			let randomness = session_keys_primitives::InherentDataProvider;
			Ok((author, randomness))
		}
	}
	// Optionally contribute a VRF pre-digest for the authored block, derived
	// from the author's Nimbus key and the parent hash.
	let client_clone = client.clone();
	let keystore_clone = keystore.clone();
	let maybe_provide_vrf_digest =
		move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
			moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
				&client_clone,
				&keystore_clone,
				nimbus_id,
				parent,
			)
		};
	log::info!("Collator started with asynchronous backing.");
	// Provides the hash of the currently deployed runtime code for a given block.
	let client_clone = client.clone();
	let code_hash_provider = move |block_hash| {
		client_clone
			.code_at(block_hash)
			.ok()
			.map(polkadot_primitives::ValidationCode)
			.map(|c| c.hash())
	};
	match block_import {
		MoonbeamBlockImport::ParachainLookahead(bi) => {
			// Legacy strategy delays updating the best block until the relay chain
			// confirms it; the default updates it immediately on import.
			let block_import = if node_extra_args.legacy_block_import_strategy {
				TParachainBlockImport::new_with_delayed_best_block(bi, backend.clone())
			} else {
				TParachainBlockImport::new(bi, backend.clone())
			};
			let params = nimbus_consensus::collators::lookahead::Params {
				additional_digests_provider: maybe_provide_vrf_digest,
				// The relay-chain epoch index is read as part of authoring state.
				additional_relay_state_keys: vec![
					relay_chain::well_known_keys::EPOCH_INDEX.to_vec()
				],
				authoring_duration: block_authoring_duration,
				block_import,
				code_hash_provider,
				collator_key,
				collator_peer_id,
				collator_service,
				create_inherent_data_providers,
				force_authoring,
				keystore,
				overseer_handle,
				para_backend: backend,
				para_client: client,
				para_id,
				proposer,
				relay_chain_slot_duration,
				relay_client: relay_chain_interface,
				slot_duration: None,
				sync_oracle,
				reinitialize: false,
				max_pov_percentage: node_extra_args.max_pov_percentage,
			};
			// The lookahead collator runs as an essential task: if it dies, the node stops.
			task_manager.spawn_essential_handle().spawn(
				"nimbus",
				None,
				nimbus_consensus::collators::lookahead::run::<
					Block,
					_,
					_,
					_,
					FullBackend,
					_,
					_,
					_,
					_,
					_,
					_,
				>(params),
			);
		}
		MoonbeamBlockImport::ParachainSlotBased(bi, handle) => {
			// Same legacy/default best-block behavior split as the lookahead branch.
			let block_import = if node_extra_args.legacy_block_import_strategy {
				TParachainBlockImport::new_with_delayed_best_block(bi, backend.clone())
			} else {
				TParachainBlockImport::new(bi, backend.clone())
			};
			// The slot-based collator spawns its own tasks via `spawner`.
			nimbus_consensus::collators::slot_based::run::<
				Block,
				nimbus_primitives::NimbusPair,
				_,
				_,
				_,
				FullBackend,
				_,
				_,
				_,
				_,
				_,
				_,
			>(nimbus_consensus::collators::slot_based::Params {
				additional_digests_provider: maybe_provide_vrf_digest,
				additional_relay_state_keys: vec![
					relay_chain::well_known_keys::EPOCH_INDEX.to_vec()
				],
				authoring_duration: block_authoring_duration,
				block_import,
				code_hash_provider,
				collator_key,
				collator_peer_id,
				collator_service,
				// Adapt the plain fn into the closure shape this API expects.
				create_inherent_data_providers: move |b, a| async move {
					create_inherent_data_providers(b, a).await
				},
				force_authoring,
				keystore,
				para_backend: backend,
				para_client: client,
				para_id,
				proposer,
				relay_chain_slot_duration,
				relay_client: relay_chain_interface,
				para_slot_duration: None,
				reinitialize: false,
				max_pov_percentage: node_extra_args.max_pov_percentage.map(|p| p as u32),
				export_pov: node_extra_args.export_pov,
				// Offset authoring within the slot by one second.
				slot_offset: Duration::from_secs(1),
				spawner: task_manager.spawn_essential_handle(),
				block_import_handle: handle,
			});
		}
		MoonbeamBlockImport::Dev(_) => {
			return Err(sc_service::Error::Other(
				"Dev block import should not be used in parachain consensus".into(),
			))
		}
	}
	Ok(())
}
/// Start a normal parachain node.
///
/// Thin dispatcher: selects the concrete network backend implementation from
/// the configuration, then delegates to [`start_node_impl`] with identical
/// arguments in both arms.
// Rustfmt wants to format the closure with space indentation.
#[rustfmt::skip]
pub async fn start_node<RuntimeApi, Customizations>(
	parachain_config: Configuration,
	polkadot_config: Configuration,
	collator_options: CollatorOptions,
	para_id: ParaId,
	rpc_config: RpcConfig,
	block_authoring_duration: Duration,
	hwbench: Option<sc_sysinfo::HwBench>,
	node_extra_args: NodeExtraArgs,
) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
where
	RuntimeApi:
		ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi:
		RuntimeApiCollection + cumulus_primitives_core::RelayParentOffsetApi<Block>,
	Customizations: ClientCustomizations + 'static,
{
	use sc_network::config::NetworkBackendType;

	match parachain_config.network.network_backend {
		NetworkBackendType::Libp2p => {
			start_node_impl::<RuntimeApi, Customizations, sc_network::NetworkWorker<_, _>>(
				parachain_config,
				polkadot_config,
				collator_options,
				para_id,
				rpc_config,
				block_authoring_duration,
				hwbench,
				node_extra_args,
			)
			.await
		}
		NetworkBackendType::Litep2p => {
			start_node_impl::<RuntimeApi, Customizations, sc_network::Litep2pNetworkBackend>(
				parachain_config,
				polkadot_config,
				collator_options,
				para_id,
				rpc_config,
				block_authoring_duration,
				hwbench,
				node_extra_args,
			)
			.await
		}
	}
}
/// Builds a new development service. This service uses manual seal, and mocks
/// the parachain inherent.
928
pub async fn new_dev<RuntimeApi, Customizations, Net>(
928
	mut config: Configuration,
928
	para_id: Option<u32>,
928
	_author_id: Option<NimbusId>,
928
	sealing: moonbeam_cli_opt::Sealing,
928
	rpc_config: RpcConfig,
928
	hwbench: Option<sc_sysinfo::HwBench>,
928
	node_extra_args: NodeExtraArgs,
928
) -> Result<TaskManager, ServiceError>
928
where
928
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
928
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
928
	Customizations: ClientCustomizations + 'static,
928
	Net: NetworkBackend<Block, Hash>,
928
{
	use async_io::Timer;
	use futures::Stream;
	use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams};
	let sc_service::PartialComponents {
928
		client,
928
		backend,
928
		mut task_manager,
928
		import_queue,
928
		keystore_container,
928
		select_chain: maybe_select_chain,
928
		transaction_pool,
		other:
			(
928
				block_import_pipeline,
928
				filter_pool,
928
				mut telemetry,
928
				_telemetry_worker_handle,
928
				frontier_backend,
928
				fee_history_cache,
			),
928
	} = new_partial::<RuntimeApi, Customizations>(&mut config, &rpc_config, node_extra_args)?;
928
	let block_import = if let MoonbeamBlockImport::Dev(block_import) = block_import_pipeline {
928
		block_import
	} else {
		return Err(ServiceError::Other(
			"Block import pipeline is not dev".to_string(),
		));
	};
928
	let prometheus_registry = config.prometheus_registry().cloned();
928
	let net_config =
928
		FullNetworkConfiguration::<_, _, Net>::new(&config.network, prometheus_registry.clone());
928
	let metrics = Net::register_notification_metrics(
928
		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
	);
928
	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
928
		sc_service::build_network(sc_service::BuildNetworkParams {
928
			config: &config,
928
			client: client.clone(),
928
			transaction_pool: transaction_pool.clone(),
928
			spawn_handle: task_manager.spawn_handle(),
928
			import_queue,
928
			block_announce_validator_builder: None,
928
			warp_sync_config: None,
928
			net_config,
928
			block_relay: None,
928
			metrics,
928
		})?;
928
	if config.offchain_worker.enabled {
928
		task_manager.spawn_handle().spawn(
			"offchain-workers-runner",
			"offchain-work",
928
			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
928
				runtime_api_provider: client.clone(),
928
				keystore: Some(keystore_container.keystore()),
928
				offchain_db: backend.offchain_storage(),
928
				transaction_pool: Some(OffchainTransactionPoolFactory::new(
928
					transaction_pool.clone(),
928
				)),
928
				network_provider: Arc::new(network.clone()),
928
				is_validator: config.role.is_authority(),
				enable_http_requests: true,
				custom_extensions: move |_| vec![],
			})?
928
			.run(client.clone(), task_manager.spawn_handle())
928
			.boxed(),
		);
	}
928
	let prometheus_registry = config.prometheus_registry().cloned();
928
	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
928
	let fee_history_limit = rpc_config.fee_history_limit;
928
	let mut command_sink = None;
928
	let mut dev_rpc_data = None;
928
	let collator = config.role.is_authority();
928
	let parachain_id: ParaId = para_id
928
		.expect("para ID should be specified for dev service")
928
		.into();
928
	if collator {
928
		let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
928
			task_manager.spawn_handle(),
928
			client.clone(),
928
			transaction_pool.clone(),
928
			prometheus_registry.as_ref(),
928
			telemetry.as_ref().map(|x| x.handle()),
		);
928
		env.set_soft_deadline(SOFT_DEADLINE_PERCENT);
928
		let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
928
			match sealing {
				moonbeam_cli_opt::Sealing::Instant => {
					Box::new(
						// This bit cribbed from the implementation of instant seal.
						transaction_pool.import_notification_stream().map(|_| {
							EngineCommand::SealNewBlock {
								create_empty: false,
								finalize: false,
								parent_hash: None,
								sender: None,
							}
						}),
					)
				}
				moonbeam_cli_opt::Sealing::Manual => {
928
					let (sink, stream) = futures::channel::mpsc::channel(1000);
					// Keep a reference to the other end of the channel. It goes to the RPC.
928
					command_sink = Some(sink);
928
					Box::new(stream)
				}
				moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
					Timer::interval(Duration::from_millis(millis)),
					|_| EngineCommand::SealNewBlock {
						create_empty: true,
						finalize: false,
						parent_hash: None,
						sender: None,
					},
				)),
			};
928
		let select_chain = maybe_select_chain.expect(
928
			"`new_partial` builds a `LongestChainRule` when building dev service.\
928
				We specified the dev service when calling `new_partial`.\
928
				Therefore, a `LongestChainRule` is present. qed.",
		);
		// Create channels for mocked XCM messages.
928
		let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
928
		let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
928
		let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
928
		dev_rpc_data = Some((
928
			downward_xcm_sender,
928
			hrmp_xcm_sender,
928
			additional_relay_offset.clone(),
928
		));
		// Need to clone it and store here to avoid moving of `client`
		// variable in closure below.
928
		let client_vrf = client.clone();
928
		let keystore_clone = keystore_container.keystore().clone();
928
		let maybe_provide_vrf_digest =
29082
			move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
29082
				moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
29082
					&client_vrf,
29082
					&keystore_clone,
29082
					nimbus_id,
29082
					parent,
				)
29082
			};
		// Need to clone it and store here to avoid moving of `client`
		// variable in closure below.
928
		let client_for_cidp = client.clone();
928
		task_manager.spawn_essential_handle().spawn_blocking(
			"authorship_task",
928
			Some("block-authoring"),
928
			run_manual_seal(ManualSealParams {
928
				block_import,
928
				env,
928
				client: client.clone(),
928
				pool: transaction_pool.clone(),
928
				commands_stream,
928
				select_chain,
928
				consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
928
					keystore: keystore_container.keystore(),
928
					client: client.clone(),
928
					additional_digests_provider: maybe_provide_vrf_digest,
928
					_phantom: Default::default(),
928
				})),
29082
				create_inherent_data_providers: move |block: H256, ()| {
29082
					let maybe_current_para_block = client_for_cidp.number(block);
29082
					let maybe_current_para_head = client_for_cidp.expect_header(block);
29082
					let downward_xcm_receiver = downward_xcm_receiver.clone();
29082
					let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
29082
					let additional_relay_offset = additional_relay_offset.clone();
29082
					let relay_slot_key = well_known_keys::CURRENT_SLOT.to_vec();
					// Need to clone it and store here to avoid moving of `client`
					// variable in closure below.
29082
					let client_for_xcm = client_for_cidp.clone();
29082
					async move {
29082
						MockTimestampInherentDataProvider::advance_timestamp(
							RELAY_CHAIN_SLOT_DURATION_MILLIS,
						);
29082
						let current_para_block = maybe_current_para_block?
29082
							.ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;
29082
						let current_para_block_head = Some(polkadot_primitives::HeadData(
29082
							maybe_current_para_head?.encode(),
						));
						// Get the mocked timestamp
29082
						let timestamp = TIMESTAMP.load(Ordering::SeqCst);
						// Calculate mocked slot number
29082
						let slot = timestamp.saturating_div(RELAY_CHAIN_SLOT_DURATION_MILLIS);
29082
						let additional_key_values = vec![
29082
							(relay_slot_key, Slot::from(slot).encode()),
29082
							(
29082
								relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
29082
								AbridgedHostConfiguration {
29082
									max_code_size: 3_145_728,
29082
									max_head_data_size: 20_480,
29082
									max_upward_queue_count: 174_762,
29082
									max_upward_queue_size: 1_048_576,
29082
									max_upward_message_size: 65_531,
29082
									max_upward_message_num_per_candidate: 16,
29082
									hrmp_max_message_num_per_candidate: 10,
29082
									validation_upgrade_cooldown: 6,
29082
									validation_upgrade_delay: 6,
29082
									async_backing_params: AsyncBackingParams {
29082
										max_candidate_depth: 3,
29082
										allowed_ancestry_len: 2,
29082
									},
29082
								}
29082
								.encode(),
29082
							),
						];
29082
						let current_para_head = client_for_xcm
29082
							.header(block)
29082
							.expect("Header lookup should succeed")
29082
							.expect("Header passed in as parent should be present in backend.");
29082
						let should_send_go_ahead = match client_for_xcm
29082
							.runtime_api()
29082
							.collect_collation_info(block, &current_para_head)
						{
29082
							Ok(info) => info.new_validation_code.is_some(),
							Err(e) => {
								log::error!("Failed to collect collation info: {:?}", e);
								false
							}
						};
29082
						let mocked_parachain = MockValidationDataInherentDataProvider {
29082
							current_para_block,
29082
							para_id: parachain_id,
29082
							upgrade_go_ahead: should_send_go_ahead.then(|| {
								log::info!(
									"Detected pending validation code, sending go-ahead signal."
								);
								UpgradeGoAhead::GoAhead
							}),
29082
							current_para_block_head,
29082
							relay_offset: additional_relay_offset.load(Ordering::SeqCst),
							relay_blocks_per_para_block: 1,
							para_blocks_per_relay_epoch: 10,
29082
							relay_randomness_config: (),
29082
							xcm_config: MockXcmConfig::new(
29082
								&*client_for_xcm,
29082
								block,
29082
								Default::default(),
							),
29082
							raw_downward_messages: downward_xcm_receiver.drain().collect(),
29082
							raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
29082
							additional_key_values: Some(additional_key_values),
						};
29082
						let randomness = session_keys_primitives::InherentDataProvider;
29082
						Ok((
29082
							MockTimestampInherentDataProvider,
29082
							mocked_parachain,
29082
							randomness,
29082
						))
29082
					}
29082
				},
			}),
		);
	}
	// Sinks for pubsub notifications.
	// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
	// notification to the subscriber on receiving a message through this channel.
	// This way we avoid race conditions when using native substrate block import notification
	// stream.
928
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
928
		fc_mapping_sync::EthereumBlockNotification<Block>,
928
	> = Default::default();
928
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
928
	rpc::spawn_essential_tasks(
928
		rpc::SpawnTasksParams {
928
			task_manager: &task_manager,
928
			client: client.clone(),
928
			substrate_backend: backend.clone(),
928
			frontier_backend: frontier_backend.clone(),
928
			filter_pool: filter_pool.clone(),
928
			overrides: overrides.clone(),
928
			fee_history_limit,
928
			fee_history_cache: fee_history_cache.clone(),
928
		},
928
		sync_service.clone(),
928
		pubsub_notification_sinks.clone(),
	);
928
	let ethapi_cmd = rpc_config.ethapi.clone();
928
	let tracing_requesters =
928
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
			rpc::tracing::spawn_tracing_tasks(
				&rpc_config,
				prometheus_registry.clone(),
				rpc::SpawnTasksParams {
					task_manager: &task_manager,
					client: client.clone(),
					substrate_backend: backend.clone(),
					frontier_backend: frontier_backend.clone(),
					filter_pool: filter_pool.clone(),
					overrides: overrides.clone(),
					fee_history_limit,
					fee_history_cache: fee_history_cache.clone(),
				},
			)
		} else {
928
			rpc::tracing::RpcRequesters {
928
				debug: None,
928
				trace: None,
928
			}
		};
928
	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
928
		task_manager.spawn_handle(),
928
		overrides.clone(),
928
		rpc_config.eth_log_block_cache,
928
		rpc_config.eth_statuses_cache,
928
		prometheus_registry,
	));
928
	let rpc_builder = {
928
		let client = client.clone();
928
		let pool = transaction_pool.clone();
928
		let backend = backend.clone();
928
		let network = network.clone();
928
		let sync = sync_service.clone();
928
		let ethapi_cmd = ethapi_cmd.clone();
928
		let max_past_logs = rpc_config.max_past_logs;
928
		let max_block_range = rpc_config.max_block_range;
928
		let overrides = overrides.clone();
928
		let fee_history_cache = fee_history_cache.clone();
928
		let block_data_cache = block_data_cache.clone();
928
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();
928
		let keystore = keystore_container.keystore();
1856
		move |subscription_task_executor| {
1856
			let deps = rpc::FullDeps {
1856
				backend: backend.clone(),
1856
				client: client.clone(),
1856
				command_sink: command_sink.clone(),
1856
				ethapi_cmd: ethapi_cmd.clone(),
1856
				filter_pool: filter_pool.clone(),
1856
				frontier_backend: match &*frontier_backend {
1856
					fc_db::Backend::KeyValue(b) => b.clone(),
					fc_db::Backend::Sql(b) => b.clone(),
				},
1856
				graph: pool.clone(),
1856
				pool: pool.clone(),
1856
				is_authority: collator,
1856
				max_past_logs,
1856
				max_block_range,
1856
				fee_history_limit,
1856
				fee_history_cache: fee_history_cache.clone(),
1856
				network: network.clone(),
1856
				sync: sync.clone(),
1856
				dev_rpc_data: dev_rpc_data.clone(),
1856
				overrides: overrides.clone(),
1856
				block_data_cache: block_data_cache.clone(),
1856
				forced_parent_hashes: None,
			};
1856
			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
1856
				client.clone(),
1856
				keystore.clone(),
			));
1856
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
				rpc::create_full(
					deps,
					subscription_task_executor,
					Some(crate::rpc::TracingConfig {
						tracing_requesters: tracing_requesters.clone(),
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
						max_block_range: rpc_config.max_block_range,
					}),
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
					parachain_id,
				)
				.map_err(Into::into)
			} else {
1856
				rpc::create_full(
1856
					deps,
1856
					subscription_task_executor,
1856
					None,
1856
					pubsub_notification_sinks.clone(),
1856
					pending_consensus_data_provider,
1856
					parachain_id,
				)
1856
				.map_err(Into::into)
			}
1856
		}
	};
928
	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
928
		network,
928
		client: client.clone(),
928
		keystore: keystore_container.keystore(),
928
		task_manager: &mut task_manager,
928
		transaction_pool,
928
		rpc_builder: Box::new(rpc_builder),
928
		backend,
928
		system_rpc_tx,
928
		sync_service: sync_service.clone(),
928
		config,
928
		tx_handler_controller,
928
		telemetry: None,
928
		tracing_execute_block: Some(Arc::new(ParachainTracingExecuteBlock::new(client.clone()))),
928
	})?;
928
	if let Some(hwbench) = hwbench {
		sc_sysinfo::print_hwbench(&hwbench);
		if let Some(ref mut telemetry) = telemetry {
			let telemetry_handle = telemetry.handle();
			task_manager.spawn_handle().spawn(
				"telemetry_hwbench",
				None,
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
			);
		}
928
	}
928
	log::info!("Development Service Ready");
928
	Ok(task_manager)
928
}
#[cfg(test)]
mod tests {
	use crate::chain_spec::moonbase::ChainSpec;
	use crate::chain_spec::Extensions;
	use jsonrpsee::server::BatchRequestConfig;
	use moonbase_runtime::AccountId;
	use prometheus::{proto::LabelPair, Counter};
	use sc_network::config::NetworkConfiguration;
	use sc_service::config::RpcConfiguration;
	use sc_service::ChainType;
	use sc_service::{
		config::{BasePath, DatabaseSource, KeystoreConfig},
		Configuration, Role,
	};
	use std::path::Path;
	use std::str::FromStr;
	use super::*;

	/// With the skip flag set to `false`, a metric registered on the node's
	/// registry must be renamed with the `moonbeam_` prefix.
	#[test]
	fn test_set_prometheus_registry_uses_moonbeam_prefix() {
		let counter_name = "my_counter";
		let expected_metric_name = "moonbeam_my_counter";
		let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
		// Start from a config that carries a default prometheus registry.
		let mut config = Configuration {
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
				"0.0.0.0:8080".parse().unwrap(),
				"".into(),
			)),
			..test_config("test")
		};
		set_prometheus_registry(&mut config, false).unwrap();
		// generate metric
		let reg = config.prometheus_registry().unwrap();
		reg.register(counter.clone()).unwrap();
		counter.inc();
		// Only one metric family was registered, so `first()` is it.
		let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
		assert_eq!(actual_metric_name.as_str(), expected_metric_name);
	}

	/// With the skip flag set to `true`, metric names must be left untouched
	/// (no `moonbeam_` prefix).
	#[test]
	fn test_set_prometheus_registry_skips_moonbeam_prefix() {
		let counter_name = "my_counter";
		let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
		let mut config = Configuration {
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
				"0.0.0.0:8080".parse().unwrap(),
				"".into(),
			)),
			..test_config("test")
		};
		set_prometheus_registry(&mut config, true).unwrap();
		// generate metric
		let reg = config.prometheus_registry().unwrap();
		reg.register(counter.clone()).unwrap();
		counter.inc();
		let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
		// Name must be exactly what was registered — no prefix applied.
		assert_eq!(actual_metric_name.as_str(), counter_name);
	}

	/// The registry must attach the chain id (taken from the chain spec) to every
	/// metric as a `chain` label.
	#[test]
	fn test_set_prometheus_registry_adds_chain_id_as_label() {
		let input_chain_id = "moonriver";
		// Build the label pair we expect to find on the gathered metric.
		let mut expected_label = LabelPair::default();
		expected_label.set_name("chain".to_owned());
		expected_label.set_value("moonriver".to_owned());
		let expected_chain_label = Some(expected_label);
		let counter = Box::new(Counter::new("foo", "foobar").unwrap());
		let mut config = Configuration {
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
				"0.0.0.0:8080".parse().unwrap(),
				"".into(),
			)),
			..test_config(input_chain_id)
		};
		set_prometheus_registry(&mut config, false).unwrap();
		// generate metric
		let reg = config.prometheus_registry().unwrap();
		reg.register(counter.clone()).unwrap();
		counter.inc();
		// Dig the `chain` label out of the first (only) gathered metric.
		let actual_chain_label = reg
			.gather()
			.first()
			.unwrap()
			.get_metric()
			.first()
			.unwrap()
			.get_label()
			.into_iter()
			.find(|x| x.get_name() == "chain")
			.cloned();
		assert_eq!(actual_chain_label, expected_chain_label);
	}

	/// Importing a block whose runtime verifies a malformed ed25519 signature
	/// must not panic: the `UseDalekExt` extension forces the (panic-free) dalek
	/// verifier for blocks at or below the configured height.
	#[test]
	fn dalek_does_not_panic() {
		use futures::executor::block_on;
		use sc_block_builder::BlockBuilderBuilder;
		use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, DatabaseSource, PruningMode};
		use sp_api::ProvideRuntimeApi;
		use sp_consensus::BlockOrigin;
		use substrate_test_runtime::TestAPI;
		use substrate_test_runtime_client::runtime::Block;
		use substrate_test_runtime_client::{
			ClientBlockImportExt, TestClientBuilder, TestClientBuilderExt,
		};

		// All-zero public key — any signature is invalid against it.
		fn zero_ed_pub() -> sp_core::ed25519::Public {
			sp_core::ed25519::Public::default()
		}
		// This is an invalid signature
		// this breaks after ed25519 1.3. It makes the signature panic at creation
		// This test ensures we should never panic
		fn invalid_sig() -> sp_core::ed25519::Signature {
			// NOTE: hex_literal::hex! strips the embedded whitespace; the two
			// string fragments together encode the 64 signature bytes.
			let signature = hex_literal::hex!(
				"a25b94f9c64270fdfffa673f11cfe961633e3e4972e6940a3cf
		7351dd90b71447041a83583a52cee1cf21b36ba7fd1d0211dca58b48d997fc78d9bc82ab7a38e"
			);
			sp_core::ed25519::Signature::from_raw(signature[0..64].try_into().unwrap())
		}

		// Throwaway on-disk backend; the tempdir is cleaned up on drop.
		let tmp = tempfile::tempdir().unwrap();
		let backend = Arc::new(
			Backend::new(
				DatabaseSettings {
					trie_cache_maximum_size: Some(1 << 20),
					state_pruning: Some(PruningMode::ArchiveAll),
					blocks_pruning: BlocksPruning::KeepAll,
					source: DatabaseSource::RocksDb {
						path: tmp.path().into(),
						cache_size: 1024,
					},
					metrics_registry: None,
				},
				u64::MAX,
			)
			.unwrap(),
		);
		let client = TestClientBuilder::with_backend(backend).build();
		// Force the dalek ed25519 host implementation for blocks before #1.
		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(1));
		let a1 = BlockBuilderBuilder::new(&client)
			.on_parent_block(client.chain_info().genesis_hash)
			.with_parent_block_number(0)
			// Enable proof recording if required. This call is optional.
			.enable_proof_recording()
			.build()
			.unwrap()
			.build()
			.unwrap()
			.block;
		block_on(client.import(BlockOrigin::NetworkInitialSync, a1.clone())).unwrap();
		// On block zero it will use dalek
		// shouldnt panic on importing invalid sig
		assert!(!client
			.runtime_api()
			.verify_ed25519(
				client.chain_info().genesis_hash,
				invalid_sig(),
				zero_ed_pub(),
				vec![]
			)
			.unwrap());
	}

	/// Build a minimal but complete node `Configuration` for the tests above,
	/// using a moonbase testnet chain spec whose id is `chain_id`.
	fn test_config(chain_id: &str) -> Configuration {
		let network_config = NetworkConfiguration::new("", "", Default::default(), None);
		// The Configuration needs a live tokio handle even though these tests
		// never drive async work through it.
		let runtime = tokio::runtime::Runtime::new().expect("failed creating tokio runtime");
		let spec = ChainSpec::builder(&[0u8], Extensions::default())
			.with_name("test")
			.with_id(chain_id)
			.with_chain_type(ChainType::Local)
			.with_genesis_config(moonbase_runtime::genesis_config_preset::testnet_genesis(
				AccountId::from_str("6Be02d1d3665660d22FF9624b7BE0551ee1Ac91b").unwrap(),
				vec![],
				vec![],
				vec![],
				vec![],
				vec![],
				ParaId::new(0),
				0,
			))
			.build();
		Configuration {
			impl_name: String::from("test-impl"),
			impl_version: String::from("0.1"),
			role: Role::Full,
			tokio_handle: runtime.handle().clone(),
			transaction_pool: Default::default(),
			network: network_config,
			keystore: KeystoreConfig::Path {
				path: "key".into(),
				password: None,
			},
			database: DatabaseSource::RocksDb {
				path: "db".into(),
				cache_size: 128,
			},
			trie_cache_maximum_size: Some(16777216),
			warm_up_trie_cache: None,
			state_pruning: Default::default(),
			blocks_pruning: sc_service::BlocksPruning::KeepAll,
			chain_spec: Box::new(spec),
			executor: Default::default(),
			wasm_runtime_overrides: Default::default(),
			rpc: RpcConfiguration {
				addr: None,
				max_connections: Default::default(),
				cors: None,
				methods: Default::default(),
				max_request_size: Default::default(),
				max_response_size: Default::default(),
				id_provider: None,
				max_subs_per_conn: Default::default(),
				port: Default::default(),
				message_buffer_capacity: Default::default(),
				batch_config: BatchRequestConfig::Unlimited,
				rate_limit: Default::default(),
				rate_limit_whitelisted_ips: vec![],
				rate_limit_trust_proxy_headers: false,
				request_logger_limit: 1024,
			},
			data_path: Default::default(),
			// No prometheus registry here; tests that need one override this
			// field with struct-update syntax.
			prometheus_config: None,
			telemetry_endpoints: None,
			offchain_worker: Default::default(),
			force_authoring: false,
			disable_grandpa: false,
			dev_key_seed: None,
			tracing_targets: None,
			tracing_receiver: Default::default(),
			announce_block: true,
			base_path: BasePath::new(Path::new("")),
		}
	}
}
/// Consensus data provider used by the Eth RPC layer when building a
/// speculative "pending" block: it produces the nimbus/VRF digests that
/// consensus would normally supply (see the
/// `fc_rpc::pending::ConsensusDataProvider` impl below).
struct PendingConsensusDataProvider<Client>
where
	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
	Client::Api: VrfApi<Block>,
{
	// Client used to read block headers and to reach the VRF runtime API.
	client: Arc<Client>,
	// Keystore handed to `VrfDigestsProvider` when producing the VRF digest.
	keystore: Arc<dyn Keystore>,
}
impl<Client> PendingConsensusDataProvider<Client>
where
	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
	Client::Api: VrfApi<Block>,
{
1856
	pub fn new(client: Arc<Client>, keystore: Arc<dyn Keystore>) -> Self {
1856
		Self { client, keystore }
1856
	}
}
impl<Client> fc_rpc::pending::ConsensusDataProvider<Block> for PendingConsensusDataProvider<Client>
where
	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
	Client::Api: VrfApi<Block>,
{
8
	fn create_digest(
8
		&self,
8
		parent: &Header,
8
		_data: &sp_inherents::InherentData,
8
	) -> Result<sp_runtime::Digest, sp_inherents::Error> {
8
		let hash = parent.hash();
		// Get the digest from the best block header.
8
		let mut digest = self
8
			.client
8
			.header(hash)
8
			.map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
8
			.ok_or(sp_inherents::Error::Application(
8
				"Best block header should be present".into(),
8
			))?
			.digest;
		// Get the nimbus id from the digest.
8
		let nimbus_id = digest
8
			.logs
8
			.iter()
8
			.find_map(|x| {
8
				if let DigestItem::PreRuntime(nimbus_primitives::NIMBUS_ENGINE_ID, nimbus_id) = x {
8
					Some(NimbusId::from_slice(nimbus_id.as_slice()).map_err(|_| {
						sp_inherents::Error::Application(
							"Nimbus pre-runtime digest should be valid".into(),
						)
					}))
				} else {
					None
				}
8
			})
8
			.ok_or(sp_inherents::Error::Application(
8
				"Nimbus pre-runtime digest should be present".into(),
8
			))??;
		// Remove the old VRF digest.
16
		let pos = digest.logs.iter().position(|x| {
8
			matches!(
16
				x,
				DigestItem::PreRuntime(session_keys_primitives::VRF_ENGINE_ID, _)
			)
16
		});
8
		if let Some(pos) = pos {
8
			digest.logs.remove(pos);
8
		}
		// Create the VRF digest.
8
		let vrf_digest = VrfDigestsProvider::new(self.client.clone(), self.keystore.clone())
8
			.provide_digests(nimbus_id, hash);
		// Append the VRF digest to the digest.
8
		digest.logs.extend(vrf_digest);
8
		Ok(digest)
8
	}
}