1
// Copyright 2025 Moonbeam foundation
2
// This file is part of Moonbeam.
3

            
4
// Moonbeam is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Moonbeam is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use core::marker::PhantomData;
18
use cumulus_primitives_core::AggregateMessageOrigin;
19
use frame_support::pallet_prelude::Get;
20
use frame_support::traits::{EnqueueMessage, ProcessMessage};
21
use frame_support::{ensure, BoundedVec};
22
use pallet_xcm_bridge::BridgeId;
23
use parity_scale_codec::{Decode, Encode};
24
use sp_std::vec::Vec;
25
use xcm::latest::{InteriorLocation, Location, SendError, SendResult, SendXcm, Xcm, XcmHash};
26
use xcm::{VersionedLocation, VersionedXcm};
27
use xcm_builder::{BridgeMessage, DispatchBlob, DispatchBlobError, InspectMessageQueues};
28

            
29
/// Threshold for determining if the message queue is congested.
30
/// Based on XcmpQueue pallet's QueueConfigData default (64KiB * 32 = 2MiB).
31
/// It should be a good heuristic to determine if the queue is congested.
32
const MESSAGE_QUEUE_CONGESTION_THRESHOLD: u32 = 32;
33

            
34
/// The target that will be used when publishing logs related to this component.
35
pub const LOG_TARGET: &str = "moonbeam-bridge";
36

            
37
pub struct BridgeXcmRouter<MessageExporter>(PhantomData<MessageExporter>);
38

            
39
// This struct acts as the `SendXcm` to the local instance of pallet_bridge_messages instead of
40
// regular XCMP/DMP transport.
41
impl<MessageExporter: SendXcm> SendXcm for BridgeXcmRouter<MessageExporter> {
42
	type Ticket = MessageExporter::Ticket;
43

            
44
15
	fn validate(
45
15
		dest: &mut Option<Location>,
46
15
		xcm: &mut Option<Xcm<()>>,
47
15
	) -> SendResult<Self::Ticket> {
48
15
		log::trace!(target: LOG_TARGET, "validate - msg: {xcm:?}, destination: {dest:?}");
49

            
50
15
		MessageExporter::validate(dest, xcm)
51
15
	}
52

            
53
2
	fn deliver(ticket: Self::Ticket) -> Result<XcmHash, SendError> {
54
2
		MessageExporter::deliver(ticket)
55
2
	}
56
}
57

            
58
/// This router needs to implement `InspectMessageQueues` but doesn't have to
59
/// return any messages, since it just reuses the `XcmpQueue` router.
60
impl<MessageExporter> InspectMessageQueues for BridgeXcmRouter<MessageExporter> {
61
	fn clear_messages() {}
62

            
63
	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
64
		Vec::new()
65
	}
66
}
67

            
68
pub struct LocalBlobDispatcher<MQ, OurPlace, OurPlaceBridgeInstance>(
69
	PhantomData<(MQ, OurPlace, OurPlaceBridgeInstance)>,
70
);
71
impl<
72
		MQ: EnqueueMessage<AggregateMessageOrigin>,
73
		OurPlace: Get<InteriorLocation>,
74
		OurPlaceBridgeInstance: Get<Option<InteriorLocation>>,
75
	> DispatchBlob for LocalBlobDispatcher<MQ, OurPlace, OurPlaceBridgeInstance>
76
{
77
2
	fn dispatch_blob(blob: Vec<u8>) -> Result<(), DispatchBlobError> {
78
2
		let our_universal = OurPlace::get();
79
2
		let our_global = our_universal
80
2
			.global_consensus()
81
2
			.map_err(|()| DispatchBlobError::Unbridgable)?;
82
		let BridgeMessage {
83
2
			universal_dest,
84
2
			message,
85
2
		} = Decode::decode(&mut &blob[..]).map_err(|_| DispatchBlobError::InvalidEncoding)?;
86
2
		let universal_dest: InteriorLocation = universal_dest
87
2
			.try_into()
88
2
			.map_err(|_| DispatchBlobError::UnsupportedLocationVersion)?;
89
		// `universal_dest` is the desired destination within the universe: first we need to check
90
		// we're in the right global consensus.
91
2
		let intended_global = universal_dest
92
2
			.global_consensus()
93
2
			.map_err(|()| DispatchBlobError::NonUniversalDestination)?;
94
2
		ensure!(
95
2
			intended_global == our_global,
96
			DispatchBlobError::WrongGlobal
97
		);
98
2
		let xcm: Xcm<()> = message
99
2
			.try_into()
100
2
			.map_err(|_| DispatchBlobError::UnsupportedXcmVersion)?;
101

            
102
2
		let msg: BoundedVec<u8, MQ::MaxMessageLen> = xcm::opaque::VersionedXcm::V5(xcm)
103
2
			.encode()
104
2
			.try_into()
105
2
			.map_err(|_| DispatchBlobError::InvalidEncoding)?;
106

            
107
2
		MQ::enqueue_message(
108
2
			msg.as_bounded_slice(),
109
2
			AggregateMessageOrigin::Here, // The message came from the para-chain itself.
110
2
		);
111
2

            
112
2
		Ok(())
113
2
	}
114
}
115

            
116
/// Implementation of `bp_xcm_bridge_hub::LocalXcmChannelManager` for congestion management.
117
pub struct CongestionManager<Runtime>(PhantomData<Runtime>);
118
impl<Runtime: pallet_message_queue::Config> pallet_xcm_bridge::LocalXcmChannelManager
119
	for CongestionManager<Runtime>
120
where
121
	<Runtime as pallet_message_queue::Config>::MessageProcessor:
122
		ProcessMessage<Origin = AggregateMessageOrigin>,
123
{
124
	type Error = SendError;
125

            
126
6
	fn is_congested(_with: &Location) -> bool {
127
6
		let book_state =
128
6
			pallet_message_queue::Pallet::<Runtime>::footprint(AggregateMessageOrigin::Here);
129
6

            
130
6
		book_state.ready_pages >= MESSAGE_QUEUE_CONGESTION_THRESHOLD
131
6
	}
132

            
133
	fn suspend_bridge(_local_origin: &Location, _bridge: BridgeId) -> Result<(), Self::Error> {
134
		// Currently, we send a suspend message, but we reject inbound
135
		// messages when the queue is congested.
136
		Ok(())
137
	}
138

            
139
	fn resume_bridge(_local_origin: &Location, _bridge: BridgeId) -> Result<(), Self::Error> {
140
		Ok(())
141
	}
142
}