1
// Copyright (C) Parity Technologies (UK) Ltd.
2
// This file is part of Polkadot.
3

            
4
// Polkadot is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Polkadot is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use crate::{
18
	configuration::{self, HostConfiguration},
19
	dmp, ensure_parachain, initializer, paras,
20
};
21
use alloc::{
22
	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
23
	vec,
24
	vec::Vec,
25
};
26
use codec::{Decode, Encode};
27
use core::{fmt, mem};
28
use frame_support::{pallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
29
use frame_system::pallet_prelude::*;
30
use polkadot_parachain_primitives::primitives::{HorizontalMessages, IsSystem};
31
use polkadot_primitives::{
32
	Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
33
	SessionIndex,
34
};
35
use scale_info::TypeInfo;
36
use sp_runtime::{
37
	traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
38
	ArithmeticError,
39
};
40

            
41
pub use pallet::*;
42

            
43
/// Maximum bound that can be set for inbound channels.
44
///
45
/// If inaccurate, the weighing of this pallet might become inaccurate. It is expected from the
46
/// `configurations` pallet to check these values before setting
47
pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
48
/// Same as [`HRMP_MAX_INBOUND_CHANNELS_BOUND`], but for outbound channels.
49
pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
50

            
51
#[cfg(test)]
52
pub(crate) mod tests;
53

            
54
#[cfg(feature = "runtime-benchmarks")]
55
mod benchmarking;
56

            
57
/// Weight functions for the extrinsics of this pallet.
///
/// Each method returns the weight charged for the extrinsic of the same name; the `u32`
/// parameters are the size components the weight is scaled by.
pub trait WeightInfo {
58
	/// Weight of `hrmp_init_open_channel`.
	fn hrmp_init_open_channel() -> Weight;
59
	/// Weight of `hrmp_accept_open_channel`.
	fn hrmp_accept_open_channel() -> Weight;
60
	/// Weight of `hrmp_close_channel`.
	fn hrmp_close_channel() -> Weight;
61
	/// Weight of `force_clean_hrmp`; the extrinsic passes its `num_inbound` witness as `i` and
	/// its `num_outbound` witness as `e`.
	fn force_clean_hrmp(i: u32, e: u32) -> Weight;
62
	/// Weight of `force_process_hrmp_open`; `c` is the `channels` witness of the extrinsic.
	fn force_process_hrmp_open(c: u32) -> Weight;
63
	/// Weight of `force_process_hrmp_close`; `c` is the `channels` witness of the extrinsic.
	fn force_process_hrmp_close(c: u32) -> Weight;
64
	/// Weight of `hrmp_cancel_open_request`; `c` is the `open_requests` witness of the
	/// extrinsic.
	fn hrmp_cancel_open_request(c: u32) -> Weight;
65
	// NOTE(review): the caller of this function is not visible in this part of the file;
	// presumably `c` is the number of open channel requests being cleaned - confirm.
	fn clean_open_channel_requests(c: u32) -> Weight;
66
	/// Weight of `force_open_hrmp_channel`; the extrinsic charges `c = 1` up front and reports
	/// `c = 1` or `c = 0` afterwards depending on whether an existing open request had to be
	/// cancelled first.
	fn force_open_hrmp_channel(c: u32) -> Weight;
67
	/// Weight of `establish_system_channel`.
	fn establish_system_channel() -> Weight;
68
	/// Weight of `poke_channel_deposits`.
	fn poke_channel_deposits() -> Weight;
69
	// NOTE(review): the matching extrinsic is not visible in this part of the file; presumably
	// this is the weight of an `establish_channel_with_system` call - confirm.
	fn establish_channel_with_system() -> Weight;
70
}
71

            
72
/// A weight info that is only suitable for testing.
73
pub struct TestWeightInfo;
74

            
75
impl WeightInfo for TestWeightInfo {
76
	fn hrmp_accept_open_channel() -> Weight {
77
		Weight::MAX
78
	}
79
	fn force_clean_hrmp(_: u32, _: u32) -> Weight {
80
		Weight::MAX
81
	}
82
	fn force_process_hrmp_close(_: u32) -> Weight {
83
		Weight::MAX
84
	}
85
	fn force_process_hrmp_open(_: u32) -> Weight {
86
		Weight::MAX
87
	}
88
	fn hrmp_cancel_open_request(_: u32) -> Weight {
89
		Weight::MAX
90
	}
91
	fn hrmp_close_channel() -> Weight {
92
		Weight::MAX
93
	}
94
	fn hrmp_init_open_channel() -> Weight {
95
		Weight::MAX
96
	}
97
	fn clean_open_channel_requests(_: u32) -> Weight {
98
		Weight::MAX
99
	}
100
	fn force_open_hrmp_channel(_: u32) -> Weight {
101
		Weight::MAX
102
	}
103
	fn establish_system_channel() -> Weight {
104
		Weight::MAX
105
	}
106
	fn poke_channel_deposits() -> Weight {
107
		Weight::MAX
108
	}
109
	fn establish_channel_with_system() -> Weight {
110
		Weight::MAX
111
	}
112
}
113

            
114
/// A description of a request to open an HRMP channel.
// NOTE(review): the struct derives `Encode`/`Decode`, so the declaration order of the fields
// presumably defines the stored SCALE layout - confirm before reordering or removing a field
// (this would also explain why the deprecated `_age` is still present).
115
#[derive(Encode, Decode, TypeInfo)]
116
pub struct HrmpOpenChannelRequest {
117
	/// Indicates if this request was confirmed by the recipient.
118
	pub confirmed: bool,
119
	/// NOTE: this field is deprecated. Channel open requests became non-expiring and this value
120
	/// became unused.
121
	pub _age: SessionIndex,
122
	/// The amount that the sender supplied at the time of creation of this request.
123
	pub sender_deposit: Balance,
124
	/// The maximum message size that could be put into the channel.
125
	pub max_message_size: u32,
126
	/// The maximum number of messages that can be pending in the channel at once.
127
	pub max_capacity: u32,
128
	/// The maximum total size of the messages that can be pending in the channel at once.
129
	pub max_total_size: u32,
130
}
131

            
132
/// Metadata of an HRMP channel.
133
#[derive(Encode, Decode, TypeInfo)]
134
#[cfg_attr(test, derive(Debug))]
135
pub struct HrmpChannel {
136
	// NOTE: This structure is used by parachains via merkle proofs. Therefore, this struct
137
	// requires special treatment.
138
	//
139
	// A parachain that requests this struct can only depend on a subset of it.
140
	// Specifically, only the first few fields can be depended upon (see `AbridgedHrmpChannel`).
141
	// These fields cannot be changed without a corresponding migration of parachains.
142
	/// The maximum number of messages that can be pending in the channel at once.
143
	pub max_capacity: u32,
144
	/// The maximum total size of the messages that can be pending in the channel at once.
145
	pub max_total_size: u32,
146
	/// The maximum message size that could be put into the channel.
147
	pub max_message_size: u32,
148
	/// The current number of messages pending in the channel.
149
	/// Invariant: should be less or equal to `max_capacity`.
150
	pub msg_count: u32,
151
	/// The total size in bytes of all message payloads in the channel.
152
	/// Invariant: should be less or equal to `max_total_size`.
153
	pub total_size: u32,
154
	/// A head of the Message Queue Chain for this channel. Each link in this chain has a form:
155
	/// `(prev_head, B, H(M))`, where
156
	/// - `prev_head`: is the previous value of `mqc_head` or zero if none.
157
	/// - `B`: is the [relay-chain] block number in which a message was appended
158
	/// - `H(M)`: is the hash of the message being appended.
159
	/// This value is initialized to a special value that consists of all zeroes which indicates
160
	/// that no messages were previously added.
	// NOTE(review): the field type is `Option<Hash>`; presumably `None` is what actually marks
	// "no messages added yet" rather than an all-zero hash - confirm against the code that
	// creates channels.
161
	pub mqc_head: Option<Hash>,
162
	/// The amount that the sender supplied as a deposit when opening this channel.
163
	pub sender_deposit: Balance,
164
	/// The amount that the recipient supplied as a deposit when accepting opening this channel.
165
	pub recipient_deposit: Balance,
166
}
167

            
168
/// An error returned by [`Pallet::check_hrmp_watermark`] that indicates an acceptance criteria
169
/// check didn't pass.
170
pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
171
	/// The new watermark is not advanced relative to the last watermark.
	AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
172
	/// The new watermark is ahead of the relay-parent block number.
	AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
173
	/// The new watermark doesn't land on a block in which messages were received.
	LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
174
}
175

            
176
/// An error returned by [`Pallet::check_outbound_hrmp`] that indicates an acceptance criteria check
177
/// didn't pass.
// In every variant that has it, `idx` is the index of the offending message.
178
pub(crate) enum OutboundHrmpAcceptanceErr {
179
	/// More messages were sent than the configuration permits.
	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
180
	/// The messages are not sorted; `idx` is the first unsorted one.
	NotSorted { idx: u32 },
181
	/// The message is addressed to a channel that doesn't exist.
	NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
182
	/// The message is larger than the negotiated maximum message size of the channel.
	MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
183
	/// Sending the message would exceed the negotiated total size of the channel.
	TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
184
	/// Sending the message would exceed the negotiated capacity of the channel.
	CapacityExceeded { idx: u32, count: u32, limit: u32 },
185
}
186

            
187
impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
188
where
189
	BlockNumber: fmt::Debug,
190
{
191
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
192
		use HrmpWatermarkAcceptanceErr::*;
193
		match self {
194
			AdvancementRule { new_watermark, last_watermark } => write!(
195
				fmt,
196
				"the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
197
				new_watermark, last_watermark,
198
			),
199
			AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
200
				fmt,
201
				"the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
202
				new_watermark, relay_chain_parent_number
203
			),
204
			LandsOnBlockWithNoMessages { new_watermark } => write!(
205
				fmt,
206
				"the HRMP watermark ({:?}) doesn't land on a block with messages received",
207
				new_watermark
208
			),
209
		}
210
	}
211
}
212

            
213
impl fmt::Debug for OutboundHrmpAcceptanceErr {
214
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215
		use OutboundHrmpAcceptanceErr::*;
216
		match self {
217
			MoreMessagesThanPermitted { sent, permitted } => write!(
218
				fmt,
219
				"more HRMP messages than permitted by config ({} > {})",
220
				sent, permitted,
221
			),
222
			NotSorted { idx } => {
223
				write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
224
			},
225
			NoSuchChannel { idx, channel_id } => write!(
226
				fmt,
227
				"the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
228
				idx, channel_id.sender, channel_id.recipient,
229
			),
230
			MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
231
				fmt,
232
				"the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
233
				idx, msg_size, max_size,
234
			),
235
			TotalSizeExceeded { idx, total_size, limit } => write!(
236
				fmt,
237
				"sending the HRMP message at index {} would exceed the negotiated channel total size  ({} > {})",
238
				idx, total_size, limit,
239
			),
240
			CapacityExceeded { idx, count, limit } => write!(
241
				fmt,
242
				"sending the HRMP message at index {} would exceed the negotiated channel capacity  ({} > {})",
243
				idx, count, limit,
244
			),
245
		}
246
	}
247
}
248

            
249
1948
#[frame_support::pallet]
250
pub mod pallet {
251
	use super::*;
252

            
253
554790
	#[pallet::pallet]
254
	#[pallet::without_storage_info]
255
	pub struct Pallet<T>(_);
256

            
257
	#[pallet::config]
258
	pub trait Config:
259
		frame_system::Config + configuration::Config + paras::Config + dmp::Config
260
	{
261
		/// The outer event type.
262
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
263

            
264
		type RuntimeOrigin: From<crate::Origin>
265
			+ From<<Self as frame_system::Config>::RuntimeOrigin>
266
			+ Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;
267

            
268
		/// The origin that can perform "force" actions on channels.
269
		type ChannelManager: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;
270

            
271
		/// An interface for reserving deposits for opening channels.
272
		///
273
		/// NOTE that this Currency instance will be charged with the amounts defined in the
274
		/// `Configuration` pallet. Specifically, that means that the `Balance` of the `Currency`
275
		/// implementation should be the same as `Balance` as used in the `Configuration`.
276
		type Currency: ReservableCurrency<Self::AccountId>;
277

            
278
		/// The default channel size and capacity to use when opening a channel to a system
279
		/// parachain.
280
		type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;
281

            
282
		/// Means of converting an `Xcm` into a `VersionedXcm`. This pallet sends HRMP XCM
283
		/// notifications to the channel-related parachains, while the `WrapVersion` implementation
284
		/// attempts to wrap them into the most suitable XCM version for the destination parachain.
285
		///
286
		/// NOTE: For example, `pallet_xcm` provides an accurate implementation (recommended), or
287
		/// the default `()` implementation uses the latest XCM version for all parachains.
288
		type VersionWrapper: xcm::WrapVersion;
289

            
290
		/// Something that provides the weight of this pallet.
291
		type WeightInfo: WeightInfo;
292
	}
293

            
294
	#[pallet::event]
295
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
296
	pub enum Event<T: Config> {
297
		/// Open HRMP channel requested.
298
		OpenChannelRequested {
299
			sender: ParaId,
300
			recipient: ParaId,
301
			proposed_max_capacity: u32,
302
			proposed_max_message_size: u32,
303
		},
304
		/// An HRMP channel request sent by the receiver was canceled by either party.
305
		OpenChannelCanceled { by_parachain: ParaId, channel_id: HrmpChannelId },
306
		/// Open HRMP channel accepted.
307
		OpenChannelAccepted { sender: ParaId, recipient: ParaId },
308
		/// HRMP channel closed.
309
		ChannelClosed { by_parachain: ParaId, channel_id: HrmpChannelId },
310
		/// An HRMP channel was opened via Root origin.
311
		HrmpChannelForceOpened {
312
			sender: ParaId,
313
			recipient: ParaId,
314
			proposed_max_capacity: u32,
315
			proposed_max_message_size: u32,
316
		},
317
		/// An HRMP channel was opened with a system chain.
318
		HrmpSystemChannelOpened {
319
			sender: ParaId,
320
			recipient: ParaId,
321
			proposed_max_capacity: u32,
322
			proposed_max_message_size: u32,
323
		},
324
		/// An HRMP channel's deposits were updated.
325
		OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
326
	}
327

            
328
1692
	#[pallet::error]
329
	pub enum Error<T> {
330
		/// The sender tried to open a channel to themselves.
331
		OpenHrmpChannelToSelf,
332
		/// The recipient is not a valid para.
333
		OpenHrmpChannelInvalidRecipient,
334
		/// The requested capacity is zero.
335
		OpenHrmpChannelZeroCapacity,
336
		/// The requested capacity exceeds the global limit.
337
		OpenHrmpChannelCapacityExceedsLimit,
338
		/// The requested maximum message size is 0.
339
		OpenHrmpChannelZeroMessageSize,
340
		/// The open request requested the message size that exceeds the global limit.
341
		OpenHrmpChannelMessageSizeExceedsLimit,
342
		/// The channel already exists
343
		OpenHrmpChannelAlreadyExists,
344
		/// There is already a request to open the same channel.
345
		OpenHrmpChannelAlreadyRequested,
346
		/// The sender already has the maximum number of allowed outbound channels.
347
		OpenHrmpChannelLimitExceeded,
348
		/// The channel from the sender to the origin doesn't exist.
349
		AcceptHrmpChannelDoesntExist,
350
		/// The channel is already confirmed.
351
		AcceptHrmpChannelAlreadyConfirmed,
352
		/// The recipient already has the maximum number of allowed inbound channels.
353
		AcceptHrmpChannelLimitExceeded,
354
		/// The origin tries to close a channel where it is neither the sender nor the recipient.
355
		CloseHrmpChannelUnauthorized,
356
		/// The channel to be closed doesn't exist.
357
		CloseHrmpChannelDoesntExist,
358
		/// The channel close request is already requested.
359
		CloseHrmpChannelAlreadyUnderway,
360
		/// Canceling is requested by neither the sender nor recipient of the open channel request.
361
		CancelHrmpOpenChannelUnauthorized,
362
		/// The open request doesn't exist.
363
		OpenHrmpChannelDoesntExist,
364
		/// Cannot cancel an HRMP open channel request because it is already confirmed.
365
		OpenHrmpChannelAlreadyConfirmed,
366
		/// The provided witness data is wrong.
367
		WrongWitness,
368
		/// The channel between these two chains cannot be authorized.
369
		ChannelCreationNotAuthorized,
370
	}
371

            
372
	/// The set of pending HRMP open channel requests.
373
	///
374
	/// The set is accompanied by a list for iteration.
375
	///
376
	/// Invariant:
377
	/// - There are no channels that exist in the list but not in the set and vice versa.
378
	#[pallet::storage]
379
	pub type HrmpOpenChannelRequests<T: Config> =
380
		StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;
381

            
382
	// NOTE: could become bounded, but we don't have a global maximum for this.
383
	// `HRMP_MAX_INBOUND_CHANNELS_BOUND` are per parachain, while this storage tracks the
384
	// global state.
385
811410
	#[pallet::storage]
386
	pub type HrmpOpenChannelRequestsList<T: Config> =
387
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
388

            
389
	/// This mapping tracks how many open channel requests are initiated by a given sender para.
390
	/// Invariant: `HrmpOpenChannelRequests` should contain the same number of items that have
391
	/// `(X, _)` as the number of `HrmpOpenChannelRequestCount` for `X`.
392
	#[pallet::storage]
393
	pub type HrmpOpenChannelRequestCount<T: Config> =
394
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
395

            
396
	/// This mapping tracks how many open channel requests were accepted by a given recipient para.
397
	/// Invariant: `HrmpOpenChannelRequests` should contain the same number of items `(_, X)` with
398
	/// `confirmed` set to true, as the number of `HrmpAcceptedChannelRequestCount` for `X`.
399
	#[pallet::storage]
400
	pub type HrmpAcceptedChannelRequestCount<T: Config> =
401
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
402

            
403
	/// A set of pending HRMP close channel requests that are going to be closed during the session
404
	/// change. Used for checking if a given channel is registered for closure.
405
	///
406
	/// The set is accompanied by a list for iteration.
407
	///
408
	/// Invariant:
409
	/// - There are no channels that exist in the list but not in the set and vice versa.
410
	#[pallet::storage]
411
	pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;
412

            
413
270468
	#[pallet::storage]
414
	pub type HrmpCloseChannelRequestsList<T: Config> =
415
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
416

            
417
	/// The HRMP watermark associated with each para.
418
	/// Invariant:
419
	/// - each para `P` used here as a key should satisfy `Paras::is_valid_para(P)` within a
420
	///   session.
421
	#[pallet::storage]
422
	pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;
423

            
424
	/// HRMP channel data associated with each para.
425
	/// Invariant:
426
	/// - each participant in the channel should satisfy `Paras::is_valid_para(P)` within a session.
427
162
	#[pallet::storage]
428
	pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;
429

            
430
	/// Ingress/egress indexes allow to find all the senders and receivers given the opposite side.
431
	/// I.e.
432
	///
433
	/// (a) ingress index allows to find all the senders for a given recipient.
434
	/// (b) egress index allows to find all the recipients for a given sender.
435
	///
436
	/// Invariants:
437
	/// - for each ingress index entry for `P` each item `I` in the index should present in
438
	///   `HrmpChannels` as `(I, P)`.
439
	/// - for each egress index entry for `P` each item `E` in the index should present in
440
	///   `HrmpChannels` as `(P, E)`.
441
	/// - there should be no other dangling channels in `HrmpChannels`.
442
	/// - the vectors are sorted.
443
	#[pallet::storage]
444
	pub type HrmpIngressChannelsIndex<T: Config> =
445
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
446

            
447
	// NOTE that this field is used by parachains via merkle storage proofs, therefore changing
448
	// the format will require migration of parachains.
449
	#[pallet::storage]
450
	pub type HrmpEgressChannelsIndex<T: Config> =
451
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
452

            
453
	/// Storage for the messages for each channel.
454
	/// Invariant: cannot be non-empty if the corresponding channel in `HrmpChannels` is `None`.
455
	#[pallet::storage]
456
	pub type HrmpChannelContents<T: Config> = StorageMap<
457
		_,
458
		Twox64Concat,
459
		HrmpChannelId,
460
		Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
461
		ValueQuery,
462
	>;
463

            
464
	/// Maintains a mapping that can be used to answer the question: What paras sent a message at
465
	/// the given block number for a given receiver. Invariants:
466
	/// - The inner `Vec<ParaId>` is never empty.
467
	/// - The inner `Vec<ParaId>` cannot store two same `ParaId`.
468
	/// - The outer vector is sorted ascending by block number and cannot store two items with the
469
	///   same block number.
470
	#[pallet::storage]
471
	pub type HrmpChannelDigests<T: Config> =
472
		StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
473

            
474
	/// Preopen the given HRMP channels.
475
	///
476
	/// The values in the tuple corresponds to
477
	/// `(sender, recipient, max_capacity, max_message_size)`, i.e. similar to `init_open_channel`.
478
	/// In fact, the initialization is performed as if the `init_open_channel` and
479
	/// `accept_open_channel` were called with the respective parameters and the session change took
480
	///  place.
481
	///
482
	/// As such, each channel initializer should satisfy the same constraints, namely:
483
	///
484
	/// 1. `max_capacity` and `max_message_size` should be within the limits set by the
485
	///    configuration pallet.
486
	/// 2. `sender` and `recipient` must be valid paras.
487
	#[pallet::genesis_config]
488
	#[derive(DefaultNoBound)]
489
	pub struct GenesisConfig<T: Config> {
490
		#[serde(skip)]
491
		_config: core::marker::PhantomData<T>,
492
		preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
493
	}
494

            
495
	#[pallet::genesis_build]
496
	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
497
3
		/// Builds the genesis state of this pallet by handing the configured
		/// `preopen_hrmp_channels` list to `initialize_storage`.
		fn build(&self) {
498
3
			initialize_storage::<T>(&self.preopen_hrmp_channels);
499
3
		}
500
	}
501

            
502
11802
	#[pallet::call]
503
	impl<T: Config> Pallet<T> {
504
		/// Initiate opening a channel from a parachain to a given recipient with given channel
505
		/// parameters.
506
		///
507
		/// - `proposed_max_capacity` - specifies how many messages can be in the channel at once.
508
		/// - `proposed_max_message_size` - specifies the maximum size of the messages.
509
		///
510
		/// These numbers are a subject to the relay-chain configuration limits.
511
		///
512
		/// The channel can be opened only after the recipient confirms it and only on a session
513
		/// change.
514
		#[pallet::call_index(0)]
515
		#[pallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
516
		pub fn hrmp_init_open_channel(
517
			origin: OriginFor<T>,
518
			recipient: ParaId,
519
			proposed_max_capacity: u32,
520
			proposed_max_message_size: u32,
521
297
		) -> DispatchResult {
522
297
			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
523
			Self::init_open_channel(
524
				origin,
525
				recipient,
526
				proposed_max_capacity,
527
				proposed_max_message_size,
528
			)?;
529
			Self::deposit_event(Event::OpenChannelRequested {
530
				sender: origin,
531
				recipient,
532
				proposed_max_capacity,
533
				proposed_max_message_size,
534
			});
535
			Ok(())
536
		}
537

            
538
		/// Accept a pending open channel request from the given sender.
539
		///
540
		/// The channel will be opened only on the next session boundary.
541
		#[pallet::call_index(1)]
542
		#[pallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
543
63
		pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
544
63
			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
545
			Self::accept_open_channel(origin, sender)?;
546
			Self::deposit_event(Event::OpenChannelAccepted { sender, recipient: origin });
547
			Ok(())
548
		}
549

            
550
		/// Initiate unilateral closing of a channel. The origin must be either the sender or the
551
		/// recipient in the channel being closed.
552
		///
553
		/// The closure can only happen on a session change.
554
		#[pallet::call_index(2)]
555
		#[pallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
556
		pub fn hrmp_close_channel(
557
			origin: OriginFor<T>,
558
			channel_id: HrmpChannelId,
559
72
		) -> DispatchResult {
560
72
			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
561
			Self::close_channel(origin, channel_id.clone())?;
562
			Self::deposit_event(Event::ChannelClosed { by_parachain: origin, channel_id });
563
			Ok(())
564
		}
565

            
566
		/// This extrinsic triggers the cleanup of all the HRMP storage items that a para may have.
567
		/// Normally this happens once per session, but this allows you to trigger the cleanup
568
		/// immediately for a specific parachain.
569
		///
570
		/// Number of inbound and outbound channels for `para` must be provided as witness data.
571
		///
572
		/// Origin must be the `ChannelManager`.
573
		#[pallet::call_index(3)]
574
		#[pallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
575
		pub fn force_clean_hrmp(
576
			origin: OriginFor<T>,
577
			para: ParaId,
578
			num_inbound: u32,
579
			num_outbound: u32,
580
18
		) -> DispatchResult {
581
18
			T::ChannelManager::ensure_origin(origin)?;
582

            
583
			ensure!(
584
				HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
585
					num_inbound as usize,
586
				Error::<T>::WrongWitness
587
			);
588
			ensure!(
589
				HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
590
					num_outbound as usize,
591
				Error::<T>::WrongWitness
592
			);
593

            
594
			Self::clean_hrmp_after_outgoing(&para);
595
			Ok(())
596
		}
597

            
598
		/// Force process HRMP open channel requests.
599
		///
600
		/// If there are pending HRMP open channel requests, you can use this function to process
601
		/// all of those requests immediately.
602
		///
603
		/// Total number of opening channels must be provided as witness data.
604
		///
605
		/// Origin must be the `ChannelManager`.
606
		#[pallet::call_index(4)]
607
		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
608
12
		pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
609
12
			T::ChannelManager::ensure_origin(origin)?;
610

            
611
			ensure!(
612
				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
613
					channels,
614
				Error::<T>::WrongWitness
615
			);
616

            
617
			let host_config = configuration::ActiveConfig::<T>::get();
618
			Self::process_hrmp_open_channel_requests(&host_config);
619
			Ok(())
620
		}
621

            
622
		/// Force process HRMP close channel requests.
623
		///
624
		/// If there are pending HRMP close channel requests, you can use this function to process
625
		/// all of those requests immediately.
626
		///
627
		/// Total number of closing channels must be provided as witness data.
628
		///
629
		/// Origin must be the `ChannelManager`.
630
		#[pallet::call_index(5)]
631
		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
632
9
		pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
633
9
			T::ChannelManager::ensure_origin(origin)?;
634

            
635
			ensure!(
636
				HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
637
					channels,
638
				Error::<T>::WrongWitness
639
			);
640

            
641
			Self::process_hrmp_close_channel_requests();
642
			Ok(())
643
		}
644

            
645
		/// This cancels a pending open channel request. It can be canceled by either of the sender
646
		/// or the recipient for that request. The origin must be either of those.
647
		///
648
		/// The cancellation happens immediately. It is not possible to cancel the request if it is
649
		/// already accepted.
650
		///
651
		/// Total number of open requests (i.e. `HrmpOpenChannelRequestsList`) must be provided as
652
		/// witness data.
653
		#[pallet::call_index(6)]
654
		#[pallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
655
		pub fn hrmp_cancel_open_request(
656
			origin: OriginFor<T>,
657
			channel_id: HrmpChannelId,
658
			open_requests: u32,
659
348
		) -> DispatchResult {
660
348
			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
661
			ensure!(
662
				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
663
					open_requests,
664
				Error::<T>::WrongWitness
665
			);
666
			Self::cancel_open_request(origin, channel_id.clone())?;
667
			Self::deposit_event(Event::OpenChannelCanceled { by_parachain: origin, channel_id });
668
			Ok(())
669
		}
670

            
671
		/// Open a channel from a `sender` to a `recipient` `ParaId`. Although opened by governance,
672
		/// the `max_capacity` and `max_message_size` are still subject to the Relay Chain's
673
		/// configured limits.
674
		///
675
		/// Expected use is when one (and only one) of the `ParaId`s involved in the channel is
676
		/// governed by the system, e.g. a system parachain.
677
		///
678
		/// Origin must be the `ChannelManager`.
679
		#[pallet::call_index(7)]
680
		#[pallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
681
		pub fn force_open_hrmp_channel(
682
			origin: OriginFor<T>,
683
			sender: ParaId,
684
			recipient: ParaId,
685
			max_capacity: u32,
686
			max_message_size: u32,
687
42
		) -> DispatchResultWithPostInfo {
688
42
			T::ChannelManager::ensure_origin(origin)?;
689

            
690
			// Guard against a common footgun where someone makes a channel request to a system
691
			// parachain and then makes a proposal to open the channel via governance, which fails
692
			// because `init_open_channel` fails if there is an existing request. This check will
693
			// clear an existing request such that `init_open_channel` should otherwise succeed.
694
			let channel_id = HrmpChannelId { sender, recipient };
695
			let cancel_request: u32 =
696
				if let Some(_open_channel) = HrmpOpenChannelRequests::<T>::get(&channel_id) {
697
					Self::cancel_open_request(sender, channel_id)?;
698
					1
699
				} else {
700
					0
701
				};
702

            
703
			// Now we proceed with normal init/accept, except that we set `no_deposit` to true such
704
			// that it will not require deposits from either member.
705
			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
706
			Self::accept_open_channel(recipient, sender)?;
707
			Self::deposit_event(Event::HrmpChannelForceOpened {
708
				sender,
709
				recipient,
710
				proposed_max_capacity: max_capacity,
711
				proposed_max_message_size: max_message_size,
712
			});
713

            
714
			Ok(Some(<T as Config>::WeightInfo::force_open_hrmp_channel(cancel_request)).into())
715
		}
716

            
717
		/// Establish an HRMP channel between two system chains. If the channel does not already
718
		/// exist, the transaction fees will be refunded to the caller. The system does not take
719
		/// deposits for channels between system chains, and automatically sets the message number
720
		/// and size limits to the maximum allowed by the network's configuration.
721
		///
722
		/// Arguments:
723
		///
724
		/// - `sender`: A system chain, `ParaId`.
725
		/// - `recipient`: A system chain, `ParaId`.
726
		///
727
		/// Any signed origin can call this function, but _both_ inputs MUST be system chains. If
728
		/// the channel does not exist yet, there is no fee.
729
		#[pallet::call_index(8)]
730
		#[pallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
731
		pub fn establish_system_channel(
732
			origin: OriginFor<T>,
733
			sender: ParaId,
734
			recipient: ParaId,
735
684
		) -> DispatchResultWithPostInfo {
736
684
			let _caller = ensure_signed(origin)?;
737

            
738
			// both chains must be system
739
684
			ensure!(
740
684
				sender.is_system() && recipient.is_system(),
741
222
				Error::<T>::ChannelCreationNotAuthorized
742
			);
743

            
744
462
			let config = configuration::ActiveConfig::<T>::get();
745
462
			let max_message_size = config.hrmp_channel_max_message_size;
746
462
			let max_capacity = config.hrmp_channel_max_capacity;
747
462

            
748
462
			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
749
			Self::accept_open_channel(recipient, sender)?;
750

            
751
			Self::deposit_event(Event::HrmpSystemChannelOpened {
752
				sender,
753
				recipient,
754
				proposed_max_capacity: max_capacity,
755
				proposed_max_message_size: max_message_size,
756
			});
757

            
758
			Ok(Pays::No.into())
759
		}
760

            
761
		/// Update the deposits held for an HRMP channel to the latest `Configuration`. Channels
762
		/// with system chains do not require a deposit.
763
		///
764
		/// Arguments:
765
		///
766
		/// - `sender`: A chain, `ParaId`.
767
		/// - `recipient`: A chain, `ParaId`.
768
		///
769
		/// Any signed origin can call this function.
770
		#[pallet::call_index(9)]
771
		#[pallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
772
		pub fn poke_channel_deposits(
773
			origin: OriginFor<T>,
774
			sender: ParaId,
775
			recipient: ParaId,
776
162
		) -> DispatchResult {
777
162
			let _caller = ensure_signed(origin)?;
778
162
			let channel_id = HrmpChannelId { sender, recipient };
779
162
			let is_system = sender.is_system() || recipient.is_system();
780

            
781
162
			let config = configuration::ActiveConfig::<T>::get();
782

            
783
			// Channels with and amongst the system do not require a deposit.
784
162
			let (new_sender_deposit, new_recipient_deposit) = if is_system {
785
102
				(0, 0)
786
			} else {
787
60
				(config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
788
			};
789

            
790
216
			let _ = HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
791
162
				if let Some(ref mut channel) = channel {
792
					let current_sender_deposit = channel.sender_deposit;
793
					let current_recipient_deposit = channel.recipient_deposit;
794

            
795
					// nothing to update
796
					if current_sender_deposit == new_sender_deposit &&
797
						current_recipient_deposit == new_recipient_deposit
798
					{
799
						return Ok(())
800
					}
801

            
802
					// sender
803
					if current_sender_deposit > new_sender_deposit {
804
						// Can never underflow, but be paranoid.
805
						let amount = current_sender_deposit
806
							.checked_sub(new_sender_deposit)
807
							.ok_or(ArithmeticError::Underflow)?;
808
						T::Currency::unreserve(
809
							&channel_id.sender.into_account_truncating(),
810
							// The difference should always be convertible into `Balance`, but be
811
							// paranoid and do nothing in case.
812
							amount.try_into().unwrap_or(Zero::zero()),
813
						);
814
					} else if current_sender_deposit < new_sender_deposit {
815
						let amount = new_sender_deposit
816
							.checked_sub(current_sender_deposit)
817
							.ok_or(ArithmeticError::Underflow)?;
818
						T::Currency::reserve(
819
							&channel_id.sender.into_account_truncating(),
820
							amount.try_into().unwrap_or(Zero::zero()),
821
						)?;
822
					}
823

            
824
					// recipient
825
					if current_recipient_deposit > new_recipient_deposit {
826
						let amount = current_recipient_deposit
827
							.checked_sub(new_recipient_deposit)
828
							.ok_or(ArithmeticError::Underflow)?;
829
						T::Currency::unreserve(
830
							&channel_id.recipient.into_account_truncating(),
831
							amount.try_into().unwrap_or(Zero::zero()),
832
						);
833
					} else if current_recipient_deposit < new_recipient_deposit {
834
						let amount = new_recipient_deposit
835
							.checked_sub(current_recipient_deposit)
836
							.ok_or(ArithmeticError::Underflow)?;
837
						T::Currency::reserve(
838
							&channel_id.recipient.into_account_truncating(),
839
							amount.try_into().unwrap_or(Zero::zero()),
840
						)?;
841
					}
842

            
843
					// update storage
844
					channel.sender_deposit = new_sender_deposit;
845
					channel.recipient_deposit = new_recipient_deposit;
846
				} else {
847
162
					return Err(Error::<T>::OpenHrmpChannelDoesntExist.into())
848
				}
849
				Ok(())
850
216
			})?;
851

            
852
			Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
853

            
854
			Ok(())
855
		}
856

            
857
		/// Establish a bidirectional HRMP channel between a parachain and a system chain.
858
		///
859
		/// Arguments:
860
		///
861
		/// - `target_system_chain`: A system chain, `ParaId`.
862
		///
863
		/// The origin needs to be the parachain origin.
864
		#[pallet::call_index(10)]
865
		#[pallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
866
		pub fn establish_channel_with_system(
867
			origin: OriginFor<T>,
868
			target_system_chain: ParaId,
869
237
		) -> DispatchResultWithPostInfo {
870
237
			let sender = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
871

            
872
			ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);
873

            
874
			let (max_message_size, max_capacity) =
875
				T::DefaultChannelSizeAndCapacityWithSystem::get();
876

            
877
			// create bidirectional channel
878
			Self::init_open_channel(sender, target_system_chain, max_capacity, max_message_size)?;
879
			Self::accept_open_channel(target_system_chain, sender)?;
880

            
881
			Self::init_open_channel(target_system_chain, sender, max_capacity, max_message_size)?;
882
			Self::accept_open_channel(sender, target_system_chain)?;
883

            
884
			Self::deposit_event(Event::HrmpSystemChannelOpened {
885
				sender,
886
				recipient: target_system_chain,
887
				proposed_max_capacity: max_capacity,
888
				proposed_max_message_size: max_message_size,
889
			});
890

            
891
			Self::deposit_event(Event::HrmpSystemChannelOpened {
892
				sender: target_system_chain,
893
				recipient: sender,
894
				proposed_max_capacity: max_capacity,
895
				proposed_max_message_size: max_message_size,
896
			});
897

            
898
			Ok(Pays::No.into())
899
		}
900
	}
901
}
902

            
903
3
fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
904
3
	let host_config = configuration::ActiveConfig::<T>::get();
905
3
	for &(sender, recipient, max_capacity, max_message_size) in preopen_hrmp_channels {
906
		if let Err(err) =
907
			preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size)
908
		{
909
			panic!("failed to initialize the genesis storage: {:?}", err);
910
		}
911
	}
912
3
	Pallet::<T>::process_hrmp_open_channel_requests(&host_config);
913
3
}
914

            
915
fn preopen_hrmp_channel<T: Config>(
916
	sender: ParaId,
917
	recipient: ParaId,
918
	max_capacity: u32,
919
	max_message_size: u32,
920
) -> DispatchResult {
921
	Pallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
922
	Pallet::<T>::accept_open_channel(recipient, sender)?;
923
	Ok(())
924
}
925

            
926
/// Routines and getters related to HRMP.
927
impl<T: Config> Pallet<T> {
928
	/// Block initialization logic, called by initializer.
929
194763
	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
930
194763
		Weight::zero()
931
194763
	}
932

            
933
	/// Per-block finalization hook invoked by the initializer. HRMP has nothing to do here.
	pub(crate) fn initializer_finalize() {}
935

            
936
	/// Called by the initializer to note that a new session has started.
937
135234
	pub(crate) fn initializer_on_new_session(
938
135234
		notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
939
135234
		outgoing_paras: &[ParaId],
940
135234
	) -> Weight {
941
135234
		let w1 = Self::perform_outgoing_para_cleanup(&notification.prev_config, outgoing_paras);
942
135234
		Self::process_hrmp_open_channel_requests(&notification.prev_config);
943
135234
		Self::process_hrmp_close_channel_requests();
944
135234
		w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
945
135234
			outgoing_paras.len() as u32
946
135234
		))
947
135234
		.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
948
135234
			outgoing_paras.len() as u32
949
135234
		))
950
135234
	}
951

            
952
	/// Iterate over all paras that were noted for offboarding and remove all the data
953
	/// associated with them.
954
135234
	fn perform_outgoing_para_cleanup(
955
135234
		config: &HostConfiguration<BlockNumberFor<T>>,
956
135234
		outgoing: &[ParaId],
957
135234
	) -> Weight {
958
135234
		let mut w = Self::clean_open_channel_requests(config, outgoing);
959
135234
		for outgoing_para in outgoing {
960
			Self::clean_hrmp_after_outgoing(outgoing_para);
961

            
962
			// we need a few extra bits of data to weigh this -- all of this is read internally
963
			// anyways, so no overhead.
964
			let ingress_count =
965
				HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
966
			let egress_count =
967
				HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
968
			w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
969
				ingress_count,
970
				egress_count,
971
			));
972
		}
973
135234
		w
974
135234
	}
975

            
976
	// Go over the HRMP open channel requests and remove all in which offboarding paras participate.
977
	//
978
	// This will also perform the refunds for the counterparty if it doesn't offboard.
979
135234
	pub(crate) fn clean_open_channel_requests(
980
135234
		config: &HostConfiguration<BlockNumberFor<T>>,
981
135234
		outgoing: &[ParaId],
982
135234
	) -> Weight {
983
135234
		// First collect all the channel ids of the open requests in which there is at least one
984
135234
		// party presents in the outgoing list.
985
135234
		//
986
135234
		// Both the open channel request list and outgoing list are expected to be small enough.
987
135234
		// In the most common case there will be only single outgoing para.
988
135234
		let open_channel_reqs = HrmpOpenChannelRequestsList::<T>::get();
989
135234
		let (go, stay): (Vec<HrmpChannelId>, Vec<HrmpChannelId>) = open_channel_reqs
990
135234
			.into_iter()
991
135234
			.partition(|req_id| outgoing.iter().any(|id| req_id.is_participant(*id)));
992
135234
		HrmpOpenChannelRequestsList::<T>::put(stay);
993

            
994
		// Then iterate over all open requests to be removed, pull them out of the set and perform
995
		// the refunds if applicable.
996
135234
		for req_id in go {
997
			let req_data = match HrmpOpenChannelRequests::<T>::take(&req_id) {
998
				Some(req_data) => req_data,
999
				None => {
					// Can't normally happen but no need to panic.
					continue
				},
			};
			// Return the deposit of the sender, but only if it is not the para being offboarded.
			if !outgoing.contains(&req_id.sender) {
				T::Currency::unreserve(
					&req_id.sender.into_account_truncating(),
					req_data.sender_deposit.unique_saturated_into(),
				);
			}
			// If the request was confirmed, then it means it was confirmed in the finished session.
			// Therefore, the config's hrmp_recipient_deposit represents the actual value of the
			// deposit.
			//
			// We still want to refund the deposit only if the para is not being offboarded.
			if req_data.confirmed {
				if !outgoing.contains(&req_id.recipient) {
					T::Currency::unreserve(
						&req_id.recipient.into_account_truncating(),
						config.hrmp_recipient_deposit.unique_saturated_into(),
					);
				}
				Self::decrease_accepted_channel_request_count(req_id.recipient);
			}
		}
135234
		<T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
135234
	}
	/// Remove all storage entries associated with the given para.
	fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
		HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
		HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
		let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
			.into_iter()
			.map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
		let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
			.into_iter()
			.map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
		let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
		to_close.sort();
		to_close.dedup();
		for channel in to_close {
			Self::close_hrmp_channel(&channel);
		}
	}
	/// Iterate over all open channel requests and:
	///
	/// - prune the stale requests
	/// - enact the confirmed requests
135237
	fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
135237
		let mut open_req_channels = HrmpOpenChannelRequestsList::<T>::get();
135237
		if open_req_channels.is_empty() {
135237
			return
		}
		// iterate the vector starting from the end making our way to the beginning. This way we
		// can leverage `swap_remove` to efficiently remove an item during iteration.
		let mut idx = open_req_channels.len();
		loop {
			// bail if we've iterated over all items.
			if idx == 0 {
				break
			}
			idx -= 1;
			let channel_id = open_req_channels[idx].clone();
			let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
				"can't be `None` due to the invariant that the list contains the same items as the set; qed",
			);
			let system_channel = channel_id.sender.is_system() || channel_id.recipient.is_system();
			let sender_deposit = request.sender_deposit;
			let recipient_deposit = if system_channel { 0 } else { config.hrmp_recipient_deposit };
			if request.confirmed {
				if paras::Pallet::<T>::is_valid_para(channel_id.sender) &&
					paras::Pallet::<T>::is_valid_para(channel_id.recipient)
				{
					HrmpChannels::<T>::insert(
						&channel_id,
						HrmpChannel {
							sender_deposit,
							recipient_deposit,
							max_capacity: request.max_capacity,
							max_total_size: request.max_total_size,
							max_message_size: request.max_message_size,
							msg_count: 0,
							total_size: 0,
							mqc_head: None,
						},
					);
					HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
						if let Err(i) = v.binary_search(&channel_id.sender) {
							v.insert(i, channel_id.sender);
						}
					});
					HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
						if let Err(i) = v.binary_search(&channel_id.recipient) {
							v.insert(i, channel_id.recipient);
						}
					});
				}
				Self::decrease_open_channel_request_count(channel_id.sender);
				Self::decrease_accepted_channel_request_count(channel_id.recipient);
				let _ = open_req_channels.swap_remove(idx);
				HrmpOpenChannelRequests::<T>::remove(&channel_id);
			}
		}
		HrmpOpenChannelRequestsList::<T>::put(open_req_channels);
135237
	}
	/// Iterate over all close channel requests unconditionally closing the channels.
135234
	fn process_hrmp_close_channel_requests() {
135234
		let close_reqs = HrmpCloseChannelRequestsList::<T>::take();
135234
		for condemned_ch_id in close_reqs {
			HrmpCloseChannelRequests::<T>::remove(&condemned_ch_id);
			Self::close_hrmp_channel(&condemned_ch_id);
		}
135234
	}
	/// Tear down the given HRMP channel: release both deposits, drop any queued messages and
	/// unlink the channel from the egress/ingress indices of its two parties.
	///
	/// The deposits are released only if the channel metadata still exists, so applying this
	/// function a second time has no further effect (i.e. deposits are not returned twice).
	fn close_hrmp_channel(channel_id: &HrmpChannelId) {
		let (sender, recipient) = (channel_id.sender, channel_id.recipient);

		if let Some(closed) = HrmpChannels::<T>::take(channel_id) {
			T::Currency::unreserve(
				&sender.into_account_truncating(),
				closed.sender_deposit.unique_saturated_into(),
			);
			T::Currency::unreserve(
				&recipient.into_account_truncating(),
				closed.recipient_deposit.unique_saturated_into(),
			);
		}

		HrmpChannelContents::<T>::remove(channel_id);

		HrmpEgressChannelsIndex::<T>::mutate(&sender, |egress| {
			if let Ok(pos) = egress.binary_search(&recipient) {
				egress.remove(pos);
			}
		});
		HrmpIngressChannelsIndex::<T>::mutate(&recipient, |ingress| {
			if let Ok(pos) = ingress.binary_search(&sender) {
				ingress.remove(pos);
			}
		});
	}
	/// Validate the HRMP watermark a candidate of `recipient` wants to advance to.
	///
	/// A watermark equal to the relay parent number is always accepted. Otherwise it must
	/// not be ahead of the relay parent, must be strictly greater than the previously stored
	/// watermark (if any), and must land on a block recorded in the recipient's digest.
	pub(crate) fn check_hrmp_watermark(
		recipient: ParaId,
		relay_chain_parent_number: BlockNumberFor<T>,
		new_hrmp_watermark: BlockNumberFor<T>,
	) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
		// Landing exactly on the relay parent is always fine, even if that repeats the
		// previous watermark.
		if new_hrmp_watermark == relay_chain_parent_number {
			return Ok(())
		}

		// A parachain cannot read into "the future": the watermark may not pass the
		// relay-chain context block the parablock refers to.
		if new_hrmp_watermark > relay_chain_parent_number {
			return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
				new_watermark: new_hrmp_watermark,
				relay_chain_parent_number,
			})
		}

		// To ensure messages are eventually processed, each watermark has to be greater than
		// the last one.
		match HrmpWatermarks::<T>::get(&recipient) {
			Some(last_watermark) if new_hrmp_watermark <= last_watermark =>
				return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
					new_watermark: new_hrmp_watermark,
					last_watermark,
				}),
			_ => {},
		}

		// Finally, the watermark has to point at a relay-chain block in which this para
		// received at least one message.
		let lands_on_message_block = HrmpChannelDigests::<T>::get(&recipient)
			.binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
			.is_ok();
		if lands_on_message_block {
			Ok(())
		} else {
			Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
				new_watermark: new_hrmp_watermark,
			})
		}
	}
	/// Lists the block numbers recorded in the channel digest of `recipient`, i.e. the blocks
	/// in which messages were sent to it, in digest order.
	pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
		let digest = HrmpChannelDigests::<T>::get(&recipient);
		let mut watermarks = Vec::with_capacity(digest.len());
		for (block_no, _senders) in digest {
			watermarks.push(block_no);
		}
		watermarks
	}
	/// Check that the outbound HRMP messages of a candidate from `sender` are acceptable.
	///
	/// The messages are rejected if:
	///
	/// - there are more of them than `config.hrmp_max_message_num_per_candidate`,
	/// - they are not sorted by recipient in strictly ascending order,
	/// - a message targets a channel that does not exist,
	/// - a message is larger than the channel's `max_message_size`,
	/// - a message would push the channel over its `max_total_size` or `max_capacity`.
	///
	/// Every message is checked against the currently stored state of its channel; nothing is
	/// written to storage.
	pub(crate) fn check_outbound_hrmp(
		config: &HostConfiguration<BlockNumberFor<T>>,
		sender: ParaId,
		out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
	) -> Result<(), OutboundHrmpAcceptanceErr> {
		let sent = out_hrmp_msgs.len() as u32;
		if sent > config.hrmp_max_message_num_per_candidate {
			return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
				sent,
				permitted: config.hrmp_max_message_num_per_candidate,
			})
		}

		let mut last_recipient = None::<ParaId>;

		for (idx, out_msg) in
			out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
		{
			match last_recipient {
				// the messages must be sorted in ascending order and there must be no two messages
				// sent to the same recipient. Thus we can check that every recipient is strictly
				// greater than the previous one.
				Some(last_recipient) if out_msg.recipient <= last_recipient =>
					return Err(OutboundHrmpAcceptanceErr::NotSorted { idx }),
				_ => last_recipient = Some(out_msg.recipient),
			}

			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };

			let channel = match HrmpChannels::<T>::get(&channel_id) {
				Some(channel) => channel,
				None => return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx }),
			};

			let msg_size = out_msg.data.len() as u32;
			if msg_size > channel.max_message_size {
				return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
					idx,
					msg_size,
					max_size: channel.max_message_size,
				})
			}

			// Saturate rather than wrap: a wrapped sum could slip under the limit and let an
			// oversized message through.
			let new_total_size = channel.total_size.saturating_add(msg_size);
			if new_total_size > channel.max_total_size {
				return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
					idx,
					total_size: new_total_size,
					limit: channel.max_total_size,
				})
			}

			let new_msg_count = channel.msg_count.saturating_add(1);
			if new_msg_count > channel.max_capacity {
				return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
					idx,
					count: new_msg_count,
					limit: channel.max_capacity,
				})
			}
		}

		Ok(())
	}
	/// For every recipient in the egress index of `sender`, reports how much room is left in
	/// the channel as `(recipient, (messages, bytes))`. Index entries without channel metadata
	/// are skipped.
	pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
		HrmpEgressChannelsIndex::<T>::get(&sender)
			.into_iter()
			.filter_map(|recipient| {
				let channel = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient })?;
				let free_msgs = channel.max_capacity - channel.msg_count;
				let free_bytes = channel.max_total_size - channel.total_size;
				Some((recipient, (free_msgs, free_bytes)))
			})
			.collect()
	}
	/// Prune the inbound HRMP queues of `recipient` up to and including `new_hrmp_watermark`
	/// and store that value as the recipient's new watermark.
	///
	/// Digest entries and queued messages with a block number less than or equal to the new
	/// watermark are dropped, and the `msg_count`/`total_size` of every affected channel are
	/// reduced accordingly.
	///
	/// Returns the database weight accounted for by this function.
	pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) -> Weight {
		let mut weight = Weight::zero();
		// sift through the incoming messages digest to collect the paras that sent at least one
		// message to this parachain between the old and new watermarks.
		let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
			let mut senders = BTreeSet::new();
			// entries newer than the watermark are kept and written back as the new digest.
			let mut leftover = Vec::with_capacity(digest.len());
			for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
				if block_no <= new_hrmp_watermark {
					senders.extend(paras_sent_msg);
				} else {
					leftover.push((block_no, paras_sent_msg));
				}
			}
			*digest = leftover;
			senders
		});
		// one read and one write for the digest.
		weight += T::DbWeight::get().reads_writes(1, 1);
		// having all senders we can trivially find out the channels which we need to prune.
		let channels_to_prune =
			senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
		for channel_id in channels_to_prune {
			// prune each channel up to the new watermark keeping track how many messages we removed
			// and what is the total byte size of them.
			let (mut pruned_cnt, mut pruned_size) = (0, 0);
			let contents = HrmpChannelContents::<T>::get(&channel_id);
			let mut leftover = Vec::with_capacity(contents.len());
			for msg in contents {
				if msg.sent_at <= new_hrmp_watermark {
					pruned_cnt += 1;
					pruned_size += msg.data.len();
				} else {
					leftover.push(msg);
				}
			}
			// remove the storage entry altogether rather than storing an empty queue.
			if !leftover.is_empty() {
				HrmpChannelContents::<T>::insert(&channel_id, leftover);
			} else {
				HrmpChannelContents::<T>::remove(&channel_id);
			}
			// update the channel metadata.
			HrmpChannels::<T>::mutate(&channel_id, |channel| {
				if let Some(ref mut channel) = channel {
					channel.msg_count -= pruned_cnt as u32;
					channel.total_size -= pruned_size as u32;
				}
			});
			// per channel: contents and metadata are each read and written once.
			weight += T::DbWeight::get().reads_writes(2, 2);
		}
		HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
		// one write for the watermark.
		weight += T::DbWeight::get().reads_writes(0, 1);
		weight
	}
	/// Process the outbound HRMP messages by putting them into the appropriate recipient queues.
	///
	/// For every message whose channel still exists this:
	///
	/// - bumps the channel's `msg_count` and `total_size`,
	/// - advances the channel's MQC head,
	/// - appends the message, stamped with the current block number, to the channel contents,
	/// - records `sender` in the recipient's digest entry for the current block.
	///
	/// Messages addressed to a channel that no longer exists are silently dropped.
	///
	/// Returns the amount of weight consumed.
	pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) -> Weight {
		let mut weight = Weight::zero();
		// every message queued by this call is stamped with the current block number.
		let now = frame_system::Pallet::<T>::block_number();
		for out_msg in out_hrmp_msgs {
			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
			let mut channel = match HrmpChannels::<T>::get(&channel_id) {
				Some(channel) => channel,
				None => {
					// apparently, that since acceptance of this candidate the recipient was
					// offboarded and the channel no longer exists.
					continue
				},
			};
			let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };
			// book keeping
			channel.msg_count += 1;
			channel.total_size += inbound.data.len() as u32;
			// compute the new MQC head of the channel
			// (a channel without a head yet starts from the default hash value).
			let prev_head = channel.mqc_head.unwrap_or(Default::default());
			let new_head = BlakeTwo256::hash_of(&(
				prev_head,
				inbound.sent_at,
				T::Hashing::hash_of(&inbound.data),
			));
			channel.mqc_head = Some(new_head);
			HrmpChannels::<T>::insert(&channel_id, channel);
			HrmpChannelContents::<T>::append(&channel_id, inbound);
			// The digests are sorted in ascending by block number order. There are only two
			// possible scenarios here ("the current" is the block of candidate's inclusion):
			//
			// (a) It's the first time anybody sends a message to this recipient within this block.
			//     In this case, the digest vector would be empty or the block number of the latest
			//     entry is smaller than the current.
			//
			// (b) Somebody has already sent a message within the current block. That means that
			//     the block number of the latest entry is equal to the current.
			//
			// Note that having the latest entry greater than the current block number is a logical
			// error.
			let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
			if let Some(cur_block_digest) = recipient_digest
				.last_mut()
				.filter(|(block_no, _)| *block_no == now)
				.map(|(_, ref mut d)| d)
			{
				// case (b): extend the entry of the current block.
				cur_block_digest.push(sender);
			} else {
				// case (a): open a new entry for the current block.
				recipient_digest.push((now, vec![sender]));
			}
			HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);
			weight += T::DbWeight::get().reads_writes(2, 2);
		}
		weight
	}
	/// Initiate opening a channel from a parachain to a given recipient with given channel
	/// parameters. If neither chain is part of the system, then a deposit from the `Configuration`
	/// will be required for `origin` (the sender) upon opening the request and the `recipient` upon
	/// accepting it.
	///
	/// Basically the same as [`hrmp_init_open_channel`](Pallet::hrmp_init_open_channel) but
	/// intended for calling directly from other pallets rather than dispatched.
462
	pub fn init_open_channel(
462
		origin: ParaId,
462
		recipient: ParaId,
462
		proposed_max_capacity: u32,
462
		proposed_max_message_size: u32,
462
	) -> DispatchResult {
462
		ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
42
		ensure!(
42
			paras::Pallet::<T>::is_valid_para(recipient),
42
			Error::<T>::OpenHrmpChannelInvalidRecipient,
		);
		let config = configuration::ActiveConfig::<T>::get();
		ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
		ensure!(
			proposed_max_capacity <= config.hrmp_channel_max_capacity,
			Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
		);
		ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
		ensure!(
			proposed_max_message_size <= config.hrmp_channel_max_message_size,
			Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
		);
		let channel_id = HrmpChannelId { sender: origin, recipient };
		ensure!(
			HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
			Error::<T>::OpenHrmpChannelAlreadyRequested,
		);
		ensure!(
			HrmpChannels::<T>::get(&channel_id).is_none(),
			Error::<T>::OpenHrmpChannelAlreadyExists,
		);
		let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
		let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
		let channel_num_limit = config.hrmp_max_parachain_outbound_channels;
		ensure!(
			egress_cnt + open_req_cnt < channel_num_limit,
			Error::<T>::OpenHrmpChannelLimitExceeded,
		);
		// Do not require deposits for channels with or amongst the system.
		let is_system = origin.is_system() || recipient.is_system();
		let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
		if !deposit.is_zero() {
			T::Currency::reserve(
				&origin.into_account_truncating(),
				deposit.unique_saturated_into(),
			)?;
		}
		// mutating storage directly now -- shall not bail henceforth.
		HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
		HrmpOpenChannelRequests::<T>::insert(
			&channel_id,
			HrmpOpenChannelRequest {
				confirmed: false,
				_age: 0,
				sender_deposit: deposit,
				max_capacity: proposed_max_capacity,
				max_message_size: proposed_max_message_size,
				max_total_size: config.hrmp_channel_max_total_size,
			},
		);
		HrmpOpenChannelRequestsList::<T>::append(channel_id);
		Self::send_to_para(
			"init_open_channel",
			&config,
			recipient,
			Self::wrap_notification(|| {
				use xcm::opaque::latest::{prelude::*, Xcm};
				Xcm(vec![HrmpNewChannelOpenRequest {
					sender: origin.into(),
					max_capacity: proposed_max_capacity,
					max_message_size: proposed_max_message_size,
				}])
			}),
		);
		Ok(())
462
	}
	/// Accept a pending open channel request from the given sender.
	///
	/// Basically the same as [`hrmp_accept_open_channel`](Pallet::hrmp_accept_open_channel) but
	/// intended for calling directly from other pallets rather than dispatched.
	pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
		let channel_id = HrmpChannelId { sender, recipient: origin };
		let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
			.ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
		ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
		// check if by accepting this open channel request, this parachain would exceed the
		// number of inbound channels.
		let config = configuration::ActiveConfig::<T>::get();
		let channel_num_limit = config.hrmp_max_parachain_inbound_channels;
		let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
		let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
		ensure!(
			ingress_cnt + accepted_cnt < channel_num_limit,
			Error::<T>::AcceptHrmpChannelLimitExceeded,
		);
		// Do not require deposits for channels with or amongst the system.
		let is_system = origin.is_system() || sender.is_system();
		let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
		if !deposit.is_zero() {
			T::Currency::reserve(
				&origin.into_account_truncating(),
				deposit.unique_saturated_into(),
			)?;
		}
		// persist the updated open channel request and then increment the number of accepted
		// channels.
		channel_req.confirmed = true;
		HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
		HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
		Self::send_to_para(
			"accept_open_channel",
			&config,
			sender,
			Self::wrap_notification(|| {
				use xcm::opaque::latest::{prelude::*, Xcm};
				Xcm(vec![HrmpChannelAccepted { recipient: origin.into() }])
			}),
		);
		Ok(())
	}
	fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
		// check if the origin is allowed to close the channel.
		ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
		let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
			.ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
		ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
		// Remove the request by the channel id and sync the accompanying list with the set.
		HrmpOpenChannelRequests::<T>::remove(&channel_id);
		HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
			if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
				open_req_channels.swap_remove(pos);
			}
		});
		Self::decrease_open_channel_request_count(channel_id.sender);
		// Don't decrease `HrmpAcceptedChannelRequestCount` because we don't consider confirmed
		// requests here.
		// Unreserve the sender's deposit. The recipient could not have left their deposit because
		// we ensured that the request is not confirmed.
		T::Currency::unreserve(
			&channel_id.sender.into_account_truncating(),
			open_channel_req.sender_deposit.unique_saturated_into(),
		);
		Ok(())
	}
	fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
		// check if the origin is allowed to close the channel.
		ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
		// check if the channel requested to close does exist.
		ensure!(
			HrmpChannels::<T>::get(&channel_id).is_some(),
			Error::<T>::CloseHrmpChannelDoesntExist,
		);
		// check that there is no outstanding close request for this channel
		ensure!(
			HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
			Error::<T>::CloseHrmpChannelAlreadyUnderway,
		);
		HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
		HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());
		let config = configuration::ActiveConfig::<T>::get();
		let opposite_party =
			if origin == channel_id.sender { channel_id.recipient } else { channel_id.sender };
		Self::send_to_para(
			"close_channel",
			&config,
			opposite_party,
			Self::wrap_notification(|| {
				use xcm::opaque::latest::{prelude::*, Xcm};
				Xcm(vec![HrmpChannelClosing {
					initiator: origin.into(),
					sender: channel_id.sender.into(),
					recipient: channel_id.recipient.into(),
				}])
			}),
		);
		Ok(())
	}
	/// Collects, for every inbound channel of `recipient`, the sender para id together with the
	/// channel's MQC head. A channel without metadata or without a recorded head contributes the
	/// default (all-zero) hash.
	///
	/// The result follows the order of `HrmpIngressChannelsIndex`, which is expected to be
	/// sorted ascending by para id and free of duplicate senders.
	#[cfg(test)]
	fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
		HrmpIngressChannelsIndex::<T>::get(&recipient)
			.into_iter()
			.map(|sender| {
				let head = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient })
					.and_then(|channel| channel.mqc_head)
					.unwrap_or_default();
				(sender, head)
			})
			.collect()
	}
	/// Gathers the queued messages of every channel whose recipient is `recipient`, keyed by
	/// the sender para id. Channels that currently hold no messages still get an entry in the
	/// returned map.
	pub(crate) fn inbound_hrmp_channels_contents(
		recipient: ParaId,
	) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
		HrmpIngressChannelsIndex::<T>::get(&recipient)
			.into_iter()
			.map(|sender| {
				let channel = HrmpChannelId { sender, recipient };
				(sender, HrmpChannelContents::<T>::get(&channel))
			})
			.collect()
	}
}
impl<T: Config> Pallet<T> {
	/// Decreases the open channel request count for the given sender. If the value reaches zero
	/// it is removed completely.
	fn decrease_open_channel_request_count(sender: ParaId) {
		HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
			// A missing entry stays missing; a count that drops to zero is deleted rather than
			// stored as `0`.
			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
				0 => None,
				n => Some(n),
			});
		});
	}
	/// Decreases the accepted channel request count for the given recipient. If the value
	/// reaches zero it is removed completely.
	fn decrease_accepted_channel_request_count(recipient: ParaId) {
		HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
			// A missing entry stays missing; a count that drops to zero is deleted rather than
			// stored as `0`.
			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
				0 => None,
				n => Some(n),
			});
		});
	}
	/// Exhaustively cross-checks the HRMP storage items against each other and panics on the
	/// first violated invariant.
	///
	/// Only compiled for tests and runtime benchmarks: it iterates over entire storage maps.
	#[cfg(any(feature = "runtime-benchmarks", test))]
	fn assert_storage_consistency_exhaustive() {
		fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
			assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
		}
		let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
			for para in paras {
				assert!(
					crate::paras::Pallet::<T>::is_valid_para(para),
					"{}: {:?} para is offboarded",
					cause,
					para
				);
			}
		};
		// The open request list must mirror the key set of the open request map.
		assert_eq!(
			HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
			HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
		);
		// verify that the set of keys in `HrmpOpenChannelRequestCount` corresponds to the set
		// of _senders_ in `HrmpOpenChannelRequests`.
		//
		// having ensured that, we can go ahead and go over all counts and verify that they match.
		assert_eq!(
			HrmpOpenChannelRequestCount::<T>::iter()
				.map(|(k, _)| k)
				.collect::<BTreeSet<_>>(),
			HrmpOpenChannelRequests::<T>::iter()
				.map(|(k, _)| k.sender)
				.collect::<BTreeSet<_>>(),
		);
		for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
			let actual_num = HrmpOpenChannelRequests::<T>::iter()
				.filter(|(ch, _)| ch.sender == open_channel_initiator)
				.count() as u32;
			assert_eq!(expected_num, actual_num);
		}
		// The same as above, but for accepted channel request count. Note that we are interested
		// only in confirmed open requests.
		assert_eq!(
			HrmpAcceptedChannelRequestCount::<T>::iter()
				.map(|(k, _)| k)
				.collect::<BTreeSet<_>>(),
			HrmpOpenChannelRequests::<T>::iter()
				.filter(|(_, v)| v.confirmed)
				.map(|(k, _)| k.recipient)
				.collect::<BTreeSet<_>>(),
		);
		for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
			let actual_num = HrmpOpenChannelRequests::<T>::iter()
				.filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
				.count() as u32;
			assert_eq!(expected_num, actual_num);
		}
		// The close request list must mirror the key set of the close request map.
		assert_eq!(
			HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
			HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
		);
		// A HRMP watermark can be None for an onboarded parachain. However, an offboarded parachain
		// cannot have an HRMP watermark: it should've been cleaned up.
		assert_contains_only_onboarded(
			HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
			"HRMP watermarks should contain only onboarded paras",
		);
		// An entry in `HrmpChannels` indicates that the channel is open. Only open channels can
		// have contents.
		for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
			assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));
			// pedantic check: there should be no empty vectors in storage, those should be modeled
			// by a removed kv pair.
			assert!(!contents.is_empty());
		}
		// Senders and recipients must be onboarded. Otherwise, all channels associated with them
		// are removed.
		assert_contains_only_onboarded(
			HrmpChannels::<T>::iter()
				.flat_map(|(k, _)| vec![k.sender, k.recipient])
				.collect::<Vec<_>>(),
			"senders and recipients in all channels should be onboarded",
		);
		// Check the docs for `HrmpIngressChannelsIndex` and `HrmpEgressChannelsIndex` in decl
		// storage to get an idea of what the channel index mappings are.
		//
		// Here, from indexes.
		//
		// ingress         egress
		//
		// a -> [x, y]     x -> [a, b]
		// b -> [x, z]     y -> [a]
		//                 z -> [b]
		//
		// we derive a list of channels they represent.
		//
		//   (a, x)         (a, x)
		//   (a, y)         (a, y)
		//   (b, x)         (b, x)
		//   (b, z)         (b, z)
		//
		// and then we compare that to the channel list in the `HrmpChannels`.
		let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
			.flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
			.collect::<BTreeSet<_>>();
		let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
			.flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
			.collect::<BTreeSet<_>>();
		let channel_set_ground_truth = HrmpChannels::<T>::iter()
			.map(|(k, _)| (k.sender, k.recipient))
			.collect::<BTreeSet<_>>();
		assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
		assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);
		// Both index vectors are expected to be kept sorted. Each assertion carries the name of
		// the storage item it checks so a failure points at the right index.
		HrmpIngressChannelsIndex::<T>::iter()
			.map(|(_, v)| v)
			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
		HrmpEgressChannelsIndex::<T>::iter()
			.map(|(_, v)| v)
			.for_each(|v| assert_is_sorted(&v, "HrmpEgressChannelsIndex"));
		assert_contains_only_onboarded(
			HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
			"HRMP channel digests should contain only onboarded paras",
		);
		for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
			// Assert that items are in **strictly** ascending order. The strictness also implies
			// there are no duplicates.
			assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));
			for (_, mut senders) in digest {
				assert!(!senders.is_empty());
				// check for duplicates. For that we sort the vector, then perform deduplication.
				// if the vector stayed the same, there are no duplicates.
				senders.sort();
				let orig_senders = senders.clone();
				senders.dedup();
				assert_eq!(
					orig_senders, senders,
					"duplicates removed implies existence of duplicates"
				);
			}
		}
	}
}
impl<T: Config> Pallet<T> {
	/// Turns a producer of an HRMP XCM notification into a closure that, given the destination
	/// para, yields the SCALE-encoded versioned message.
	///
	/// The message is wrapped via `T::VersionWrapper` for the destination. If that fails, the
	/// notification is produced a second time and wrapped with the latest XCM version as a best
	/// effort.
	fn wrap_notification(
		mut notification: impl FnMut() -> xcm::opaque::latest::opaque::Xcm,
	) -> impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage {
		use xcm::{
			opaque::VersionedXcm,
			prelude::{Junction, Location},
			WrapVersion,
		};
		move |dest| {
			// The destination parachain as seen from the relay chain.
			let location = Location::new(0, [Junction::Parachain(dest.into())]);
			let versioned: VersionedXcm =
				match T::VersionWrapper::wrap_version(&location, notification()) {
					Ok(wrapped) => wrapped,
					// The version could not be resolved: fall back to the latest one.
					Err(_) => VersionedXcm::from(notification()),
				};
			versioned.encode()
		}
	}
	/// Builds the notification for `dest` and tries to enqueue it as a downward message.
	///
	/// A failure caused by the message exceeding the maximum size is logged under
	/// `runtime::hrmp` (tagged with `log_label`) and trips a `debug_assert!`; any other outcome
	/// is ignored.
	fn send_to_para(
		log_label: &str,
		config: &HostConfiguration<BlockNumberFor<T>>,
		dest: ParaId,
		notification_bytes_for: impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage,
	) {
		let payload = notification_bytes_for(dest);
		let outcome = dmp::Pallet::<T>::queue_downward_message(config, dest, payload);
		if matches!(outcome, Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize)) {
			// this should never happen unless the max downward message size is configured to a
			// jokingly small number.
			log::error!(
				target: "runtime::hrmp",
				"sending '{log_label}::notification_bytes' failed."
			);
			debug_assert!(false);
		}
	}
}