1
// Copyright (C) Parity Technologies (UK) Ltd.
2
// This file is part of Cumulus.
3

            
4
// Substrate is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Substrate is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
//! A pallet which uses the XCMP transport layer to handle both incoming and outgoing XCM message
18
//! sending and dispatch, queuing, signalling and backpressure. To do so, it implements:
19
//! * `XcmpMessageHandler`
20
//! * `XcmpMessageSource`
21
//!
22
//! Also provides an implementation of `SendXcm` which can be placed in a router tuple for relaying
23
//! XCM over XCMP if the destination is `Parent/Parachain`. It requires an implementation of
24
//! `XcmExecutor` for dispatching incoming XCM messages.
25
//!
26
//! To prevent out of memory errors on the `OutboundXcmpMessages` queue, an exponential fee factor
27
//! (`DeliveryFeeFactor`) is set, much like the one used in DMP.
28
//! The fee factor increases whenever the total size of messages in a particular channel passes a
29
//! threshold. This threshold is defined as a percentage of the maximum total size the channel can
30
//! have. More concretely, the threshold is `max_total_size` / `THRESHOLD_FACTOR`, where:
31
//! - `max_total_size` is the maximum size, in bytes, of the channel, not number of messages.
32
//! It is defined in the channel configuration.
33
//! - `THRESHOLD_FACTOR` just declares which percentage of the max size is the actual threshold.
34
//! If it's 2, then the threshold is half of the max size, if it's 4, it's a quarter, and so on.
35

            
36
#![cfg_attr(not(feature = "std"), no_std)]
37

            
38
pub mod migration;
39

            
40
#[cfg(test)]
41
mod mock;
42

            
43
#[cfg(test)]
44
mod tests;
45

            
46
#[cfg(feature = "runtime-benchmarks")]
47
mod benchmarking;
48
#[cfg(feature = "bridging")]
49
pub mod bridging;
50
pub mod weights;
51
pub use weights::WeightInfo;
52

            
53
extern crate alloc;
54

            
55
use alloc::vec::Vec;
56
use bounded_collections::BoundedBTreeSet;
57
use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
58
use cumulus_primitives_core::{
59
	relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
60
	ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
61
};
62

            
63
use frame_support::{
64
	defensive, defensive_assert,
65
	traits::{Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueuePausedQuery},
66
	weights::{Weight, WeightMeter},
67
	BoundedVec,
68
};
69
use pallet_message_queue::OnQueueChanged;
70
use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
71
use polkadot_runtime_parachains::FeeTracker;
72
use scale_info::TypeInfo;
73
use sp_core::MAX_POSSIBLE_ALLOCATION;
74
use sp_runtime::{FixedU128, RuntimeDebug, Saturating, WeakBoundedVec};
75
use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
76
use xcm_builder::InspectMessageQueues;
77
use xcm_executor::traits::ConvertOrigin;
78

            
79
pub use pallet::*;
80

            
81
/// Index used to identify overweight XCMs.
82
pub type OverweightIndex = u64;
83
/// The max length of an XCMP message.
84
pub type MaxXcmpMessageLenOf<T> =
85
	<<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
86

            
87
const LOG_TARGET: &str = "xcmp_queue";
88
const DEFAULT_POV_SIZE: u64 = 64 * 1024; // 64 KB
89

            
90
/// Constants related to delivery fee calculation
91
pub mod delivery_fee_constants {
92
	use super::FixedU128;
93

            
94
	/// Fees will start increasing when queue is half full
95
	pub const THRESHOLD_FACTOR: u32 = 2;
96
	/// The base number the delivery fee factor gets multiplied by every time it is increased.
97
	/// Also, the number it gets divided by when decreased.
98
	pub const EXPONENTIAL_FEE_BASE: FixedU128 = FixedU128::from_rational(105, 100); // 1.05
99
	/// The contribution of each KB to a fee factor increase
100
	pub const MESSAGE_SIZE_FEE_BASE: FixedU128 = FixedU128::from_rational(1, 1000); // 0.001
101
}
102

            
103
#[frame_support::pallet]
104
pub mod pallet {
105
	use super::*;
106
	use frame_support::{pallet_prelude::*, Twox64Concat};
107
	use frame_system::pallet_prelude::*;
108

            
109
	#[pallet::pallet]
110
	#[pallet::storage_version(migration::STORAGE_VERSION)]
111
	pub struct Pallet<T>(_);
112

            
113
	#[pallet::config]
114
	pub trait Config: frame_system::Config {
115
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
116

            
117
		/// Information on the available XCMP channels.
118
		type ChannelInfo: GetChannelInfo;
119

            
120
		/// Means of converting an `Xcm` into a `VersionedXcm`.
121
		type VersionWrapper: WrapVersion;
122

            
123
		/// Enqueue an inbound horizontal message for later processing.
124
		///
125
		/// This defines the maximal message length via [`crate::MaxXcmpMessageLenOf`]. The pallet
126
		/// assumes that this hook will eventually process all the pushed messages.
127
		type XcmpQueue: EnqueueMessage<ParaId>;
128

            
129
		/// The maximum number of inbound XCMP channels that can be suspended simultaneously.
130
		///
131
		/// Any further channel suspensions will fail and messages may get dropped without further
132
		/// notice. Choosing a high value (1000) is okay; the trade-off that is described in
133
		/// [`InboundXcmpSuspended`] still applies at that scale.
134
		#[pallet::constant]
135
		type MaxInboundSuspended: Get<u32>;
136

            
137
		/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
138
		///
139
		/// If this is reached, then no further messages can be sent to channels that do not yet
140
		/// have a message queued. This should be set to the expected maximum of outbound channels
141
		/// which is determined by [`Self::ChannelInfo`]. It is important to set this large enough,
142
		/// since otherwise the congestion control protocol will not work as intended and messages
143
		/// may be dropped. This value increases the PoV and should therefore not be picked too
144
		/// high. Governance needs to pay attention to not open more channels than this value.
145
		#[pallet::constant]
146
		type MaxActiveOutboundChannels: Get<u32>;
147

            
148
		/// The maximal page size for HRMP message pages.
149
		///
150
		/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
151
		/// benchmarking. The limit for the size of a message is slightly below this, since some
152
		/// overhead is incurred for encoding the format.
153
		#[pallet::constant]
154
		type MaxPageSize: Get<u32>;
155

            
156
		/// The origin that is allowed to resume or suspend the XCMP queue.
157
		type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
158

            
159
		/// The conversion function used to attempt to convert an XCM `Location` origin to a
160
		/// superuser origin.
161
		type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
162

            
163
		/// The price for delivering an XCM to a sibling parachain destination.
164
		type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;
165

            
166
		/// The weight information of this pallet.
167
		type WeightInfo: WeightInfo;
168
	}
169

            
170
	#[pallet::call]
171
	impl<T: Config> Pallet<T> {
172
		/// Suspends all XCM executions for the XCMP queue, regardless of the sender's origin.
173
		///
174
		/// - `origin`: Must pass `ControllerOrigin`.
175
		#[pallet::call_index(1)]
176
		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
177
		pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
178
			T::ControllerOrigin::ensure_origin(origin)?;
179

            
180
			QueueSuspended::<T>::try_mutate(|suspended| {
181
				if *suspended {
182
					Err(Error::<T>::AlreadySuspended.into())
183
				} else {
184
					*suspended = true;
185
					Ok(())
186
				}
187
			})
188
		}
189

            
190
		/// Resumes all XCM executions for the XCMP queue.
191
		///
192
		/// Note that this function doesn't change the status of the in/out bound channels.
193
		///
194
		/// - `origin`: Must pass `ControllerOrigin`.
195
		#[pallet::call_index(2)]
196
		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
197
		pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
198
			T::ControllerOrigin::ensure_origin(origin)?;
199

            
200
			QueueSuspended::<T>::try_mutate(|suspended| {
201
				if !*suspended {
202
					Err(Error::<T>::AlreadyResumed.into())
203
				} else {
204
					*suspended = false;
205
					Ok(())
206
				}
207
			})
208
		}
209

            
210
		/// Overwrites the number of pages which must be in the queue for the other side to be
211
		/// told to suspend their sending.
212
		///
213
		/// - `origin`: Must pass `Root`.
214
		/// - `new`: Desired value for `QueueConfigData.suspend_value`
215
		#[pallet::call_index(3)]
216
		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
217
		pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
218
			ensure_root(origin)?;
219

            
220
			QueueConfig::<T>::try_mutate(|data| {
221
				data.suspend_threshold = new;
222
				data.validate::<T>()
223
			})
224
		}
225

            
226
		/// Overwrites the number of pages which must be in the queue after which we drop any
227
		/// further messages from the channel.
228
		///
229
		/// - `origin`: Must pass `Root`.
230
		/// - `new`: Desired value for `QueueConfigData.drop_threshold`
231
		#[pallet::call_index(4)]
232
		#[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
233
		pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
234
			ensure_root(origin)?;
235

            
236
			QueueConfig::<T>::try_mutate(|data| {
237
				data.drop_threshold = new;
238
				data.validate::<T>()
239
			})
240
		}
241

            
242
		/// Overwrites the number of pages which the queue must be reduced to before it signals
243
		/// that message sending may recommence after it has been suspended.
244
		///
245
		/// - `origin`: Must pass `Root`.
246
		/// - `new`: Desired value for `QueueConfigData.resume_threshold`
247
		#[pallet::call_index(5)]
248
		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
249
		pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
250
			ensure_root(origin)?;
251

            
252
			QueueConfig::<T>::try_mutate(|data| {
253
				data.resume_threshold = new;
254
				data.validate::<T>()
255
			})
256
		}
257
	}
258

            
259
	#[pallet::hooks]
260
	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
261
		fn integrity_test() {
262
			let w = Self::on_idle_weight();
263
			assert!(w != Weight::zero());
264
			assert!(w.all_lte(T::BlockWeights::get().max_block));
265
		}
266

            
267
		fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
268
			let mut meter = WeightMeter::with_limit(limit);
269

            
270
			if meter.try_consume(Self::on_idle_weight()).is_err() {
271
				log::debug!(
272
					"Not enough weight for on_idle. {} < {}",
273
					Self::on_idle_weight(),
274
					limit
275
				);
276
				return meter.consumed()
277
			}
278

            
279
			migration::v3::lazy_migrate_inbound_queue::<T>();
280

            
281
			meter.consumed()
282
		}
283
	}
284

            
285
	#[pallet::event]
286
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
287
	pub enum Event<T: Config> {
288
		/// An HRMP message was sent to a sibling parachain.
289
		XcmpMessageSent { message_hash: XcmHash },
290
	}
291

            
292
	#[pallet::error]
293
	pub enum Error<T> {
294
		/// Setting the queue config failed since one of its values was invalid.
295
		BadQueueConfig,
296
		/// The execution is already suspended.
297
		AlreadySuspended,
298
		/// The execution is already resumed.
299
		AlreadyResumed,
300
		/// There are too many active outbound channels.
301
		TooManyActiveOutboundChannels,
302
		/// The message is too big.
303
		TooBig,
304
	}
305

            
306
	/// The suspended inbound XCMP channels. All others are not suspended.
307
	///
308
	/// This is a `StorageValue` instead of a `StorageMap` since we expect multiple reads per block
309
	/// to different keys with a one byte payload. The access to `BoundedBTreeSet` will be cached
310
	/// within the block and therefore only included once in the proof size.
311
	///
312
	/// NOTE: The PoV benchmarking cannot know this and will over-estimate, but the actual proof
313
	/// will be smaller.
314
	#[pallet::storage]
315
	pub type InboundXcmpSuspended<T: Config> =
316
		StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
317

            
318
	/// The non-empty XCMP channels in order of becoming non-empty, and the index of the first
319
	/// and last outbound message. If the two indices are equal, then it indicates an empty
320
	/// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater
321
	/// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in
322
	/// case of the need to send a high-priority signal message this block.
323
	/// The bool is true if there is a signal message waiting to be sent.
324
	#[pallet::storage]
325
	pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
326
		_,
327
		BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
328
		ValueQuery,
329
	>;
330

            
331
	/// The messages outbound in a given XCMP channel.
332
	#[pallet::storage]
333
	pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
334
		_,
335
		Blake2_128Concat,
336
		ParaId,
337
		Twox64Concat,
338
		u16,
339
		WeakBoundedVec<u8, T::MaxPageSize>,
340
		ValueQuery,
341
	>;
342

            
343
	/// Any signal messages waiting to be sent.
344
	#[pallet::storage]
345
	pub(super) type SignalMessages<T: Config> =
346
		StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
347

            
348
	/// The configuration which controls the dynamics of the outbound queue.
349
	#[pallet::storage]
350
	pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
351

            
352
	/// Whether or not the XCMP queue is suspended from executing incoming XCMs or not.
353
	#[pallet::storage]
354
	pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
355

            
356
	/// Initialization value for the DeliveryFee factor.
357
	#[pallet::type_value]
358
	pub fn InitialFactor() -> FixedU128 {
359
		FixedU128::from_u32(1)
360
	}
361

            
362
	/// The factor to multiply the base delivery fee by.
363
	#[pallet::storage]
364
	pub(super) type DeliveryFeeFactor<T: Config> =
365
		StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>;
366
}
367

            
368
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
369
pub enum OutboundState {
370
	Ok,
371
	Suspended,
372
}
373

            
374
/// Struct containing detailed information about the outbound channel.
375
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
376
pub struct OutboundChannelDetails {
377
	/// The `ParaId` of the parachain that this channel is connected with.
378
	recipient: ParaId,
379
	/// The state of the channel.
380
	state: OutboundState,
381
	/// Whether or not any signals exist in this channel.
382
	signals_exist: bool,
383
	/// The index of the first outbound message.
384
	first_index: u16,
385
	/// The index of the last outbound message.
386
	last_index: u16,
387
}
388

            
389
impl OutboundChannelDetails {
390
	pub fn new(recipient: ParaId) -> OutboundChannelDetails {
391
		OutboundChannelDetails {
392
			recipient,
393
			state: OutboundState::Ok,
394
			signals_exist: false,
395
			first_index: 0,
396
			last_index: 0,
397
		}
398
	}
399

            
400
	pub fn with_signals(mut self) -> OutboundChannelDetails {
401
		self.signals_exist = true;
402
		self
403
	}
404

            
405
	pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
406
		self.state = OutboundState::Suspended;
407
		self
408
	}
409
}
410

            
411
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
412
pub struct QueueConfigData {
413
	/// The number of pages which must be in the queue for the other side to be told to suspend
414
	/// their sending.
415
	suspend_threshold: u32,
416
	/// The number of pages which must be in the queue after which we drop any further messages
417
	/// from the channel. This should normally not happen since the `suspend_threshold` can be used
418
	/// to suspend the channel.
419
	drop_threshold: u32,
420
	/// The number of pages which the queue must be reduced to before it signals that
421
	/// message sending may recommence after it has been suspended.
422
	resume_threshold: u32,
423
}
424

            
425
impl Default for QueueConfigData {
426
	fn default() -> Self {
427
		// NOTE that these default values are only used on genesis. They should give a rough idea of
428
		// what to set these values to, but is in no way a requirement.
429
		Self {
430
			drop_threshold: 48,    // 64KiB * 48 = 3MiB
431
			suspend_threshold: 32, // 64KiB * 32 = 2MiB
432
			resume_threshold: 8,   // 64KiB * 8 = 512KiB
433
		}
434
	}
435
}
436

            
437
impl QueueConfigData {
438
	/// Validate all assumptions about `Self`.
439
	///
440
	/// Should be called prior to accepting this as new config.
441
	pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
442
		if self.resume_threshold < self.suspend_threshold &&
443
			self.suspend_threshold <= self.drop_threshold &&
444
			self.resume_threshold > 0
445
		{
446
			Ok(())
447
		} else {
448
			Err(Error::<T>::BadQueueConfig.into())
449
		}
450
	}
451
}
452

            
453
#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
454
pub enum ChannelSignal {
455
	Suspend,
456
	Resume,
457
}
458

            
459
impl<T: Config> Pallet<T> {
460
	/// Place a message `fragment` on the outgoing XCMP queue for `recipient`.
461
	///
462
	/// Format is the type of aggregate message that the `fragment` may be safely encoded and
463
	/// appended onto. Whether earlier unused space is used for the fragment at the risk of sending
464
	/// it out of order is determined with `qos`. NOTE: For any two messages to be guaranteed to be
465
	/// dispatched in order, then both must be sent with `ServiceQuality::Ordered`.
466
	///
467
	/// ## Background
468
	///
469
	/// For our purposes, one HRMP "message" is actually an aggregated block of XCM "messages".
470
	///
471
	/// For the sake of clarity, we distinguish between them as message AGGREGATEs versus
472
	/// message FRAGMENTs.
473
	///
474
	/// So each AGGREGATE is comprised of one or more concatenated SCALE-encoded `Vec<u8>`
475
	/// FRAGMENTs. Though each fragment is already probably a SCALE-encoded Xcm, we can't be
476
	/// certain, so we SCALE encode each `Vec<u8>` fragment in order to ensure we have the
477
	/// length prefixed and can thus decode each fragment from the aggregate stream. With this,
478
	/// we can concatenate them into a single aggregate blob without needing to be concerned
479
	/// about encoding fragment boundaries.
480
	///
481
	/// If successful, returns the number of pages in the outbound queue after enqueuing the new
482
	/// fragment.
483
	fn send_fragment<Fragment: Encode>(
484
		recipient: ParaId,
485
		format: XcmpMessageFormat,
486
		fragment: Fragment,
487
	) -> Result<u32, MessageSendError> {
488
		let encoded_fragment = fragment.encode();
489

            
490
		// Optimization note: `max_message_size` could potentially be stored in
491
		// `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed.
492

            
493
		let channel_info =
494
			T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
495
		// Max message size refers to aggregates, or pages. Not to individual fragments.
496
		let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
497
		let format_size = format.encoded_size();
498
		// We check the encoded fragment length plus the format size against the max message size
499
		// because the format is concatenated if a new page is needed.
500
		let size_to_check = encoded_fragment
501
			.len()
502
			.checked_add(format_size)
503
			.ok_or(MessageSendError::TooBig)?;
504
		if size_to_check > max_message_size {
505
			return Err(MessageSendError::TooBig)
506
		}
507

            
508
		let mut all_channels = <OutboundXcmpStatus<T>>::get();
509
		let channel_details = if let Some(details) =
510
			all_channels.iter_mut().find(|channel| channel.recipient == recipient)
511
		{
512
			details
513
		} else {
514
			all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| {
515
				log::error!("Failed to activate HRMP channel: {:?}", e);
516
				MessageSendError::TooManyChannels
517
			})?;
518
			all_channels
519
				.last_mut()
520
				.expect("can't be empty; a new element was just pushed; qed")
521
		};
522
		let have_active = channel_details.last_index > channel_details.first_index;
523
		// Try to append fragment to the last page, if there is enough space.
524
		// We return the size of the last page inside of the option, to not calculate it again.
525
		let appended_to_last_page = have_active
526
			.then(|| {
527
				<OutboundXcmpMessages<T>>::try_mutate(
528
					recipient,
529
					channel_details.last_index - 1,
530
					|page| {
531
						if XcmpMessageFormat::decode_with_depth_limit(
532
							MAX_XCM_DECODE_DEPTH,
533
							&mut &page[..],
534
						) != Ok(format)
535
						{
536
							defensive!("Bad format in outbound queue; dropping message");
537
							return Err(())
538
						}
539
						if page.len() + encoded_fragment.len() > max_message_size {
540
							return Err(())
541
						}
542
						for frag in encoded_fragment.iter() {
543
							page.try_push(*frag)?;
544
						}
545
						Ok(page.len())
546
					},
547
				)
548
				.ok()
549
			})
550
			.flatten();
551

            
552
		let (number_of_pages, last_page_size) = if let Some(size) = appended_to_last_page {
553
			let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
554
			(number_of_pages, size)
555
		} else {
556
			// Need to add a new page.
557
			let page_index = channel_details.last_index;
558
			channel_details.last_index += 1;
559
			let mut new_page = format.encode();
560
			new_page.extend_from_slice(&encoded_fragment[..]);
561
			let last_page_size = new_page.len();
562
			let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
563
			let bounded_page = BoundedVec::<u8, T::MaxPageSize>::try_from(new_page)
564
				.map_err(|_| MessageSendError::TooBig)?;
565
			let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None);
566
			<OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
567
			<OutboundXcmpStatus<T>>::put(all_channels);
568
			(number_of_pages, last_page_size)
569
		};
570

            
571
		// We have to count the total size here since `channel_info.total_size` is not updated at
572
		// this point in time. We assume all previous pages are filled, which, in practice, is not
573
		// always the case.
574
		let total_size =
575
			number_of_pages.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
576
		let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
577
		if total_size > threshold {
578
			let message_size_factor = FixedU128::from((encoded_fragment.len() / 1024) as u128)
579
				.saturating_mul(delivery_fee_constants::MESSAGE_SIZE_FEE_BASE);
580
			Self::increase_fee_factor(recipient, message_size_factor);
581
		}
582

            
583
		Ok(number_of_pages)
584
	}
585

            
586
	/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
587
	/// block.
588
	fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
589
		let mut s = <OutboundXcmpStatus<T>>::get();
590
		if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
591
			details.signals_exist = true;
592
		} else {
593
			s.try_push(OutboundChannelDetails::new(dest).with_signals())
594
				.map_err(|_| Error::<T>::TooManyActiveOutboundChannels)?;
595
		}
596

            
597
		let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
598
			(XcmpMessageFormat::Signals, signal).encode(),
599
		)
600
		.map_err(|_| Error::<T>::TooBig)?;
601
		let page = WeakBoundedVec::force_from(page.into_inner(), None);
602

            
603
		<SignalMessages<T>>::insert(dest, page);
604
		<OutboundXcmpStatus<T>>::put(s);
605
		Ok(())
606
	}
607

            
608
	fn suspend_channel(target: ParaId) {
609
		<OutboundXcmpStatus<T>>::mutate(|s| {
610
			if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
611
				let ok = details.state == OutboundState::Ok;
612
				defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
613
				details.state = OutboundState::Suspended;
614
			} else {
615
				if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
616
					defensive!("Cannot pause channel; too many outbound channels");
617
				}
618
			}
619
		});
620
	}
621

            
622
	fn resume_channel(target: ParaId) {
623
		<OutboundXcmpStatus<T>>::mutate(|s| {
624
			if let Some(index) = s.iter().position(|item| item.recipient == target) {
625
				let suspended = s[index].state == OutboundState::Suspended;
626
				defensive_assert!(
627
					suspended,
628
					"WARNING: Attempt to resume channel that was not suspended."
629
				);
630
				if s[index].first_index == s[index].last_index {
631
					s.remove(index);
632
				} else {
633
					s[index].state = OutboundState::Ok;
634
				}
635
			} else {
636
				defensive!("WARNING: Attempt to resume channel that was not suspended.");
637
			}
638
		});
639
	}
640

            
641
	fn enqueue_xcmp_message(
642
		sender: ParaId,
643
		xcm: BoundedVec<u8, MaxXcmpMessageLenOf<T>>,
644
		meter: &mut WeightMeter,
645
	) -> Result<(), ()> {
646
		if meter.try_consume(T::WeightInfo::enqueue_xcmp_message()).is_err() {
647
			defensive!("Out of weight: cannot enqueue XCMP messages; dropping msg");
648
			return Err(())
649
		}
650

            
651
		let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
652
		let fp = T::XcmpQueue::footprint(sender);
653
		// Assume that it will not fit into the current page:
654
		let new_pages = fp.ready_pages.saturating_add(1);
655
		if new_pages > drop_threshold {
656
			// This should not happen since the channel should have been suspended in
657
			// [`on_queue_changed`].
658
			log::error!("XCMP queue for sibling {:?} is full; dropping messages.", sender);
659
			return Err(())
660
		}
661

            
662
		T::XcmpQueue::enqueue_message(xcm.as_bounded_slice(), sender);
663
		Ok(())
664
	}
665

            
666
	/// Split concatenated encoded `VersionedXcm`s or `MaybeDoubleEncodedVersionedXcm`s into
667
	/// individual items.
668
	///
669
	/// We directly encode them again since that is needed later on.
670
	pub(crate) fn take_first_concatenated_xcm(
671
		data: &mut &[u8],
672
		meter: &mut WeightMeter,
673
	) -> Result<BoundedVec<u8, MaxXcmpMessageLenOf<T>>, ()> {
674
		if data.is_empty() {
675
			return Err(())
676
		}
677

            
678
		if meter.try_consume(T::WeightInfo::take_first_concatenated_xcm()).is_err() {
679
			defensive!("Out of weight; could not decode all; dropping");
680
			return Err(())
681
		}
682

            
683
		let xcm = VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, data)
684
			.map_err(|_| ())?;
685
		xcm.encode().try_into().map_err(|_| ())
686
	}
687

            
688
	/// The worst-case weight of `on_idle`.
689
	pub fn on_idle_weight() -> Weight {
690
		<T as crate::Config>::WeightInfo::on_idle_good_msg()
691
			.max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
692
	}
693

            
694
	#[cfg(feature = "bridging")]
695
	fn is_inbound_channel_suspended(sender: ParaId) -> bool {
696
		<InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
697
	}
698

            
699
	#[cfg(feature = "bridging")]
700
	/// Returns tuple of `OutboundState` and number of queued pages.
701
	fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
702
		<OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
703
			let queued_pages = c.last_index.saturating_sub(c.first_index);
704
			(c.state, queued_pages)
705
		})
706
	}
707
}
708

            
709
impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
710
	// Suspends/Resumes the queue when certain thresholds are reached.
711
	fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
712
		let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
713

            
714
		let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
715
		let suspended = suspended_channels.contains(&para);
716

            
717
		if suspended && fp.ready_pages <= resume_threshold {
718
			if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
719
				log::error!("defensive: Could not send resumption signal to inbound channel of sibling {:?}: {:?}; channel remains suspended.", para, err);
720
			} else {
721
				suspended_channels.remove(&para);
722
				<InboundXcmpSuspended<T>>::put(suspended_channels);
723
			}
724
		} else if !suspended && fp.ready_pages >= suspend_threshold {
725
			log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
726

            
727
			if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
728
				// It will retry if `drop_threshold` is not reached, but it could be too late.
729
				log::error!(
730
					"defensive: Could not send suspension signal; future messages may be dropped: {:?}", err
731
				);
732
			} else if let Err(err) = suspended_channels.try_insert(para) {
733
				log::error!("Too many channels suspended; cannot suspend sibling {:?}: {:?}; further messages may be dropped.", para, err);
734
			} else {
735
				<InboundXcmpSuspended<T>>::put(suspended_channels);
736
			}
737
		}
738
	}
739
}
740

            
741
impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
742
	fn is_paused(para: &ParaId) -> bool {
743
		if !QueueSuspended::<T>::get() {
744
			return false
745
		}
746

            
747
		// Make an exception for the superuser queue:
748
		let sender_origin = T::ControllerOriginConverter::convert_origin(
749
			(Parent, Parachain((*para).into())),
750
			OriginKind::Superuser,
751
		);
752
		let is_controller =
753
			sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
754

            
755
		!is_controller
756
	}
757
}
758

            
759
impl<T: Config> XcmpMessageHandler for Pallet<T> {
760
	fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
761
		iter: I,
762
		max_weight: Weight,
763
	) -> Weight {
764
		let mut meter = WeightMeter::with_limit(max_weight);
765

            
766
		for (sender, _sent_at, mut data) in iter {
767
			let format = match XcmpMessageFormat::decode(&mut data) {
768
				Ok(f) => f,
769
				Err(_) => {
770
					defensive!("Unknown XCMP message format - dropping");
771
					continue
772
				},
773
			};
774

            
775
			match format {
776
				XcmpMessageFormat::Signals =>
777
					while !data.is_empty() {
778
						if meter
779
							.try_consume(
780
								T::WeightInfo::suspend_channel()
781
									.max(T::WeightInfo::resume_channel()),
782
							)
783
							.is_err()
784
						{
785
							defensive!("Not enough weight to process signals - dropping");
786
							break
787
						}
788

            
789
						match ChannelSignal::decode(&mut data) {
790
							Ok(ChannelSignal::Suspend) => Self::suspend_channel(sender),
791
							Ok(ChannelSignal::Resume) => Self::resume_channel(sender),
792
							Err(_) => {
793
								defensive!("Undecodable channel signal - dropping");
794
								break
795
							},
796
						}
797
					},
798
				XcmpMessageFormat::ConcatenatedVersionedXcm =>
799
					while !data.is_empty() {
800
						let Ok(xcm) = Self::take_first_concatenated_xcm(&mut data, &mut meter)
801
						else {
802
							defensive!("HRMP inbound decode stream broke; page will be dropped.",);
803
							break
804
						};
805

            
806
						if let Err(()) = Self::enqueue_xcmp_message(sender, xcm, &mut meter) {
807
							defensive!(
808
								"Could not enqueue XCMP messages. Used weight: ",
809
								meter.consumed_ratio()
810
							);
811
							break
812
						}
813
					},
814
				XcmpMessageFormat::ConcatenatedEncodedBlob => {
815
					defensive!("Blob messages are unhandled - dropping");
816
					continue
817
				},
818
			}
819
		}
820

            
821
		meter.consumed()
822
	}
823
}
824

            
825
impl<T: Config> XcmpMessageSource for Pallet<T> {
826
	fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
827
		let mut statuses = <OutboundXcmpStatus<T>>::get();
828
		let old_statuses_len = statuses.len();
829
		let max_message_count = statuses.len().min(maximum_channels);
830
		let mut result = Vec::with_capacity(max_message_count);
831

            
832
		for status in statuses.iter_mut() {
833
			let OutboundChannelDetails {
834
				recipient: para_id,
835
				state: outbound_state,
836
				mut signals_exist,
837
				mut first_index,
838
				mut last_index,
839
			} = *status;
840

            
841
			let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
842
				ChannelStatus::Closed => {
843
					// This means that there is no such channel anymore. Nothing to be done but
844
					// swallow the messages and discard the status.
845
					for i in first_index..last_index {
846
						<OutboundXcmpMessages<T>>::remove(para_id, i);
847
					}
848
					if signals_exist {
849
						<SignalMessages<T>>::remove(para_id);
850
					}
851
					*status = OutboundChannelDetails::new(para_id);
852
					continue
853
				},
854
				ChannelStatus::Full => continue,
855
				ChannelStatus::Ready(n, e) => (n, e),
856
			};
857

            
858
			// This is a hard limit from the host config; not even signals can bypass it.
859
			if result.len() == max_message_count {
860
				// We check this condition in the beginning of the loop so that we don't include
861
				// a message where the limit is 0.
862
				break
863
			}
864

            
865
			let page = if signals_exist {
866
				let page = <SignalMessages<T>>::get(para_id);
867
				defensive_assert!(!page.is_empty(), "Signals must exist");
868

            
869
				if page.len() < max_size_now {
870
					<SignalMessages<T>>::remove(para_id);
871
					signals_exist = false;
872
					page
873
				} else {
874
					defensive!("Signals should fit into a single page");
875
					continue
876
				}
877
			} else if outbound_state == OutboundState::Suspended {
878
				// Signals are exempt from suspension.
879
				continue
880
			} else if last_index > first_index {
881
				let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
882
				if page.len() < max_size_now {
883
					<OutboundXcmpMessages<T>>::remove(para_id, first_index);
884
					first_index += 1;
885
					page
886
				} else {
887
					continue
888
				}
889
			} else {
890
				continue
891
			};
892
			if first_index == last_index {
893
				first_index = 0;
894
				last_index = 0;
895
			}
896

            
897
			if page.len() > max_size_ever {
898
				// TODO: #274 This means that the channel's max message size has changed since
899
				//   the message was sent. We should parse it and split into smaller messages but
900
				//   since it's so unlikely then for now we just drop it.
901
				defensive!("WARNING: oversize message in queue - dropping");
902
			} else {
903
				result.push((para_id, page.into_inner()));
904
			}
905

            
906
			let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
907
				Some(channel_info) => channel_info.max_total_size,
908
				None => {
909
					log::warn!("calling `get_channel_info` with no RelevantMessagingState?!");
910
					MAX_POSSIBLE_ALLOCATION // We use this as a fallback in case the messaging state is not present
911
				},
912
			};
913
			let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
914
			let remaining_total_size: usize = (first_index..last_index)
915
				.map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
916
				.sum();
917
			if remaining_total_size <= threshold as usize {
918
				Self::decrease_fee_factor(para_id);
919
			}
920

            
921
			*status = OutboundChannelDetails {
922
				recipient: para_id,
923
				state: outbound_state,
924
				signals_exist,
925
				first_index,
926
				last_index,
927
			};
928
		}
929
		debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
930

            
931
		// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
932
		// criteria requirement.
933
		result.sort_by_key(|m| m.0);
934

            
935
		// Prune hrmp channels that became empty. Additionally, because it may so happen that we
936
		// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
937
		// change the order. Otherwise, the next `on_finalize` we will again give attention
938
		// only to those channels that happen to be in the beginning, until they are emptied.
939
		// This leads to "starvation" of the channels near to the end.
940
		//
941
		// To mitigate this we shift all processed elements towards the end of the vector using
942
		// `rotate_left`. To get intuition how it works see the examples in its rustdoc.
943
		statuses.retain(|x| {
944
			x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
945
		});
946

            
947
		// old_status_len must be >= status.len() since we never add anything to status.
948
		let pruned = old_statuses_len - statuses.len();
949
		// removing an item from status implies a message being sent, so the result messages must
950
		// be no less than the pruned channels.
951
		let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
952
			"Could not store HRMP channels config. Some HRMP channels may be broken.",
953
		);
954

            
955
		<OutboundXcmpStatus<T>>::put(statuses);
956

            
957
		result
958
	}
959
}
960

            
961
/// Xcm sender for sending to a sibling parachain.
962
impl<T: Config> SendXcm for Pallet<T> {
963
	type Ticket = (ParaId, VersionedXcm<()>);
964

            
965
	fn validate(
966
		dest: &mut Option<Location>,
967
		msg: &mut Option<Xcm<()>>,
968
	) -> SendResult<(ParaId, VersionedXcm<()>)> {
969
		let d = dest.take().ok_or(SendError::MissingArgument)?;
970

            
971
		match d.unpack() {
972
			// An HRMP message for a sibling parachain.
973
			(1, [Parachain(id)]) => {
974
				let xcm = msg.take().ok_or(SendError::MissingArgument)?;
975
				let id = ParaId::from(*id);
976
				let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
977
				let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
978
					.map_err(|()| SendError::DestinationUnsupported)?;
979
				versioned_xcm
980
					.validate_xcm_nesting()
981
					.map_err(|()| SendError::ExceedsMaxMessageSize)?;
982

            
983
				Ok(((id, versioned_xcm), price))
984
			},
985
			_ => {
986
				// Anything else is unhandled. This includes a message that is not meant for us.
987
				// We need to make sure that dest/msg is not consumed here.
988
				*dest = Some(d);
989
				Err(SendError::NotApplicable)
990
			},
991
		}
992
	}
993

            
994
	fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
995
		let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
996

            
997
		match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
998
			Ok(_) => {
999
				Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
				Ok(hash)
			},
			Err(e) => {
				log::error!(target: LOG_TARGET, "Deliver error: {e:?}");
				Err(SendError::Transport(e.into()))
			},
		}
	}
}
impl<T: Config> InspectMessageQueues for Pallet<T> {
	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
		use xcm::prelude::*;
		OutboundXcmpMessages::<T>::iter()
			.map(|(para_id, _, messages)| {
				let mut data = &messages[..];
				let decoded_format =
					XcmpMessageFormat::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut data)
						.unwrap();
				if decoded_format != XcmpMessageFormat::ConcatenatedVersionedXcm {
					panic!("Unexpected format.")
				}
				let mut decoded_messages = Vec::new();
				while !data.is_empty() {
					let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
						MAX_XCM_DECODE_DEPTH,
						&mut data,
					)
					.unwrap();
					decoded_messages.push(decoded_message);
				}
				(
					VersionedLocation::V4((Parent, Parachain(para_id.into())).into()),
					decoded_messages,
				)
			})
			.collect()
	}
}
impl<T: Config> FeeTracker for Pallet<T> {
	type Id = ParaId;
	fn get_fee_factor(id: Self::Id) -> FixedU128 {
		<DeliveryFeeFactor<T>>::get(id)
	}
	fn increase_fee_factor(id: Self::Id, message_size_factor: FixedU128) -> FixedU128 {
		<DeliveryFeeFactor<T>>::mutate(id, |f| {
			*f = f.saturating_mul(
				delivery_fee_constants::EXPONENTIAL_FEE_BASE.saturating_add(message_size_factor),
			);
			*f
		})
	}
	fn decrease_fee_factor(id: Self::Id) -> FixedU128 {
		<DeliveryFeeFactor<T>>::mutate(id, |f| {
			*f = InitialFactor::get().max(*f / delivery_fee_constants::EXPONENTIAL_FEE_BASE);
			*f
		})
	}
}