1
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2
// This file is part of Parity Bridges Common.
3

            
4
// Parity Bridges Common is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Parity Bridges Common is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
//! The module contains utilities for handling congestion between the bridge hub and routers.
18

            
19
use crate::{Bridges, Config, DispatchChannelStatusProvider, LOG_TARGET};
20
use bp_xcm_bridge::{BridgeId, LocalXcmChannelManager, Receiver};
21
use parity_scale_codec::{Decode, Encode};
22
use sp_runtime::traits::Convert;
23
use sp_std::{marker::PhantomData, vec::Vec};
24
use xcm::latest::{send_xcm, Location, SendXcm, Xcm};
25
use xcm_builder::{DispatchBlob, DispatchBlobError};
26

            
27
/// Limits for handling congestion.
28
#[derive(Debug, Decode, Encode)]
29
pub struct CongestionLimits {
30
	/// Maximal number of messages in the outbound bridge queue. Once we reach this limit, we
31
	/// suspend a bridge.
32
	pub outbound_lane_congested_threshold: bp_messages::MessageNonce,
33
	/// After we have suspended the bridge, we wait until number of messages in the outbound bridge
34
	/// queue drops to this count, before sending resuming the bridge.
35
	pub outbound_lane_uncongested_threshold: bp_messages::MessageNonce,
36
	/// Maximal number of messages in the outbound bridge queue after we have suspended the bridge.
37
	/// Once we reach this limit, we stop exporting more messages.
38
	pub outbound_lane_stop_threshold: bp_messages::MessageNonce,
39
}
40

            
41
impl CongestionLimits {
42
	/// Checks if limits are valid.
43
1
	pub fn is_valid(&self) -> bool {
44
1
		self.outbound_lane_uncongested_threshold < self.outbound_lane_congested_threshold
45
1
			&& self.outbound_lane_stop_threshold > self.outbound_lane_congested_threshold
46
1
	}
47
}
48

            
49
impl Default for CongestionLimits {
50
57389
	fn default() -> Self {
51
57389
		Self {
52
57389
			outbound_lane_congested_threshold: 8_192,
53
57389
			outbound_lane_uncongested_threshold: 1_024,
54
57389
			outbound_lane_stop_threshold: 12_288,
55
57389
		}
56
57389
	}
57
}
58

            
59
/// Switches the implementation of [`LocalXcmChannelManager`] based on the `local_origin`.
60
///
61
/// - `HereXcmChannelManager` is applied when the origin is `Here`.
62
/// - Otherwise, `LocalConsensusXcmChannelManager` is used.
63
///
64
/// This is useful when the `pallet-xcm-bridge` needs to support both:
65
/// - A local router deployed on the same chain as the `pallet-xcm-bridge`.
66
/// - A remote router deployed on a different chain than the `pallet-xcm-bridge`.
67
pub struct HereOrLocalConsensusXcmChannelManager<
68
	Bridge,
69
	HereXcmChannelManager,
70
	LocalConsensusXcmChannelManager,
71
>(
72
	PhantomData<(
73
		Bridge,
74
		HereXcmChannelManager,
75
		LocalConsensusXcmChannelManager,
76
	)>,
77
);
78
impl<
79
		Bridge: Encode + sp_std::fmt::Debug + Copy,
80
		HereXcmChannelManager: LocalXcmChannelManager<Bridge>,
81
		LocalConsensusXcmChannelManager: LocalXcmChannelManager<Bridge>,
82
	> LocalXcmChannelManager<Bridge>
83
	for HereOrLocalConsensusXcmChannelManager<
84
		Bridge,
85
		HereXcmChannelManager,
86
		LocalConsensusXcmChannelManager,
87
	>
88
{
89
	type Error = ();
90

            
91
5
	fn suspend_bridge(local_origin: &Location, bridge: Bridge) -> Result<(), Self::Error> {
92
5
		if local_origin.eq(&Location::here()) {
93
1
			HereXcmChannelManager::suspend_bridge(local_origin, bridge).map_err(|e| {
94
				log::error!(
95
					target: LOG_TARGET,
96
					"HereXcmChannelManager::suspend_bridge error: {e:?} for local_origin: {:?} and bridge: {:?}",
97
					local_origin,
98
					bridge,
99
				);
100
				()
101
1
			})
102
		} else {
103
4
			LocalConsensusXcmChannelManager::suspend_bridge(local_origin, bridge).map_err(|e| {
104
                log::error!(
105
                    target: LOG_TARGET,
106
					"LocalConsensusXcmChannelManager::suspend_bridge error: {e:?} for local_origin: {:?} and bridge: {:?}",
107
					local_origin,
108
					bridge,
109
				);
110
                ()
111
4
            })
112
		}
113
5
	}
114

            
115
4
	fn resume_bridge(local_origin: &Location, bridge: Bridge) -> Result<(), Self::Error> {
116
4
		if local_origin.eq(&Location::here()) {
117
1
			HereXcmChannelManager::resume_bridge(local_origin, bridge).map_err(|e| {
118
				log::error!(
119
					target: LOG_TARGET,
120
					"HereXcmChannelManager::resume_bridge error: {e:?} for local_origin: {:?} and bridge: {:?}",
121
					local_origin,
122
					bridge,
123
				);
124
				()
125
1
			})
126
		} else {
127
3
			LocalConsensusXcmChannelManager::resume_bridge(local_origin, bridge).map_err(|e| {
128
                log::error!(
129
                    target: LOG_TARGET,
130
					"LocalConsensusXcmChannelManager::resume_bridge error: {e:?} for local_origin: {:?} and bridge: {:?}",
131
					local_origin,
132
					bridge,
133
				);
134
                ()
135
3
            })
136
		}
137
4
	}
138
}
139

            
140
/// Manages the local XCM channels by sending XCM messages with the `update_bridge_status` extrinsic
141
/// to the `local_origin`. The `XcmProvider` type converts the encoded call to `XCM`, which is then
142
/// sent by `XcmSender` to the `local_origin`. This is useful, for example, when a router with
143
/// [`xcm::prelude::ExportMessage`] is deployed on a different chain, and we want to control
144
/// congestion by sending XCMs.
145
pub struct UpdateBridgeStatusXcmChannelManager<T, I, XcmProvider, XcmSender>(
146
	PhantomData<(T, I, XcmProvider, XcmSender)>,
147
);
148
impl<T: Config<I>, I: 'static, XcmProvider: Convert<Vec<u8>, Xcm<()>>, XcmSender: SendXcm>
149
	UpdateBridgeStatusXcmChannelManager<T, I, XcmProvider, XcmSender>
150
{
151
7
	fn update_bridge_status(
152
7
		local_origin: &Location,
153
7
		bridge_id: BridgeId,
154
7
		is_congested: bool,
155
7
	) -> Result<(), ()> {
156
		// check the bridge and get `maybe_notify` callback.
157
7
		let bridge = Bridges::<T, I>::get(&bridge_id).ok_or(())?;
158
		let Some(Receiver {
159
2
			pallet_index,
160
2
			call_index,
161
7
		}) = bridge.maybe_notify
162
		else {
163
			// `local_origin` did not set `maybe_notify`, so nothing to notify, so it is ok.
164
5
			return Ok(());
165
		};
166

            
167
		// constructing expected call
168
2
		let remote_runtime_call = (pallet_index, call_index, bridge_id, is_congested);
169
2
		// construct XCM
170
2
		let xcm = XcmProvider::convert(remote_runtime_call.encode());
171
2
		log::trace!(
172
			target: LOG_TARGET,
173
			"UpdateBridgeStatusXcmChannelManager is going to send status with is_congested: {:?} to the local_origin: {:?} and bridge: {:?} as xcm: {:?}",
174
			is_congested,
175
			local_origin,
176
			bridge,
177
			xcm,
178
		);
179

            
180
		// send XCM
181
2
		send_xcm::<XcmSender>(local_origin.clone(), xcm)
182
2
            .map(|result| {
183
2
                log::warn!(
184
                    target: LOG_TARGET,
185
					"UpdateBridgeStatusXcmChannelManager successfully sent status with is_congested: {:?} to the local_origin: {:?} and bridge: {:?} with result: {:?}",
186
                    is_congested,
187
					local_origin,
188
					bridge,
189
                    result,
190
				);
191
2
                ()
192
2
            })
193
2
            .map_err(|e| {
194
                log::error!(
195
                    target: LOG_TARGET,
196
					"UpdateBridgeStatusXcmChannelManager failed to send status with is_congested: {:?} to the local_origin: {:?} and bridge: {:?} with error: {:?}",
197
                    is_congested,
198
					local_origin,
199
					bridge,
200
                    e,
201
				);
202
                ()
203
2
            })
204
7
	}
205
}
206
impl<T: Config<I>, I: 'static, XcmProvider: Convert<Vec<u8>, Xcm<()>>, XcmSender: SendXcm>
207
	LocalXcmChannelManager<BridgeId>
208
	for UpdateBridgeStatusXcmChannelManager<T, I, XcmProvider, XcmSender>
209
{
210
	type Error = ();
211

            
212
4
	fn suspend_bridge(local_origin: &Location, bridge: BridgeId) -> Result<(), Self::Error> {
213
4
		Self::update_bridge_status(local_origin, bridge, true)
214
4
	}
215

            
216
3
	fn resume_bridge(local_origin: &Location, bridge: BridgeId) -> Result<(), Self::Error> {
217
3
		Self::update_bridge_status(local_origin, bridge, false)
218
3
	}
219
}
220

            
221
/// Adapter that ties together the [`DispatchBlob`] trait with the [`DispatchChannelStatusProvider`]
222
/// trait. The idea is that [`DispatchBlob`] triggers message dispatch/delivery on the receiver
223
/// side, while [`DispatchChannelStatusProvider`] provides a status check to ensure the dispatch
224
/// channel is active (not congested).
225
pub struct BlobDispatcherWithChannelStatus<ChannelDispatch, ChannelStatus>(
226
	PhantomData<(ChannelDispatch, ChannelStatus)>,
227
);
228
impl<ChannelDispatch: DispatchBlob, ChannelStatus> DispatchBlob
229
	for BlobDispatcherWithChannelStatus<ChannelDispatch, ChannelStatus>
230
{
231
1
	fn dispatch_blob(blob: Vec<u8>) -> Result<(), DispatchBlobError> {
232
1
		ChannelDispatch::dispatch_blob(blob)
233
1
	}
234
}
235
impl<ChannelDispatch, ChannelStatus: DispatchChannelStatusProvider> DispatchChannelStatusProvider
236
	for BlobDispatcherWithChannelStatus<ChannelDispatch, ChannelStatus>
237
{
238
2
	fn is_congested(with: &Location) -> bool {
239
2
		ChannelStatus::is_congested(with)
240
2
	}
241
}