// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # Generalized Message Queue Pallet
//!
//! Provides generalized message queuing and processing capabilities on a per-queue basis for
//! arbitrary use-cases.
//!
//! # Design Goals
//!
//! 1. Minimal assumptions about `Message`s and `MessageOrigin`s. Both should be MEL bounded blobs.
//!  This ensures the generality and reusability of the pallet.
//! 2. Well known and tightly limited pre-dispatch PoV weights, especially for message execution.
//!  This is paramount for the success of the pallet since message execution is done in
//!  `on_initialize` which must _never_ under-estimate its PoV weight. It also needs a frugal PoV
//!  footprint since PoV is scarce and this is (possibly) done in every block. This must also hold
//!  in the presence of unpredictable message size distributions.
//! 3. Usable as XCMP, DMP and UMP message/dispatch queue - possibly through adapter types.
//!
//! # Design
//!
//! The pallet has means to enqueue, store and process messages. This is implemented by having
//! *queues* which store enqueued messages and can be *served* to process said messages. A queue is
//! identified by its origin in the `BookStateFor`. Each message has an origin which defines into
//! which queue it will be stored. Messages are stored by being appended to the last [`Page`] of a
//! book. Each book keeps track of its pages by indexing `Pages`. The `ReadyRing` contains all
//! queues which hold at least one unprocessed message and are thereby *ready* to be serviced. The
//! `ServiceHead` indicates which *ready* queue is the next to be serviced.
//! The pallet implements [`frame_support::traits::EnqueueMessage`],
//! [`frame_support::traits::ServiceQueues`] and has [`frame_support::traits::ProcessMessage`] and
//! [`OnQueueChanged`] hooks to communicate with the outside world.
//!
//! NOTE: The storage items are not linked since they are not public.
//!
//! **Message Execution**
//!
//! Executing a message is offloaded to the [`Config::MessageProcessor`] which contains the actual
//! logic of how to handle the message, since messages are opaque blobs to this pallet. Storage
//! changes are not rolled back on error.
//!
//! A failed message can be temporarily or permanently overweight. The pallet will perpetually try
//! to execute a temporarily overweight message. A permanently overweight message is skipped and
//! must be executed manually.
//!
//! **Reentrancy**
//!
//! This pallet has two entry points for executing (possibly recursive) logic:
//! [`Pallet::service_queues`] and [`Pallet::execute_overweight`]. Both entry points are guarded by
//! the same mutex to error on reentrancy. The only functions that are explicitly **allowed** to be
//! called by a message processor are: [`Pallet::enqueue_message`] and
//! [`Pallet::enqueue_messages`]. All other functions are forbidden and error with
//! [`Error::RecursiveDisallowed`].
//!
//! **Pagination**
//!
//! Queues are stored in a *paged* manner by splitting their messages into [`Page`]s. This results
//! in a lot of complexity when implementing the pallet but is completely necessary to achieve the
//! second [Design Goal](#design-goals). The problem comes from the fact that a message can
//! *possibly* be quite large, let's say 64 KiB. This then results in a *MEL* of at least 64 KiB
//! which results in a PoV of at least 64 KiB. Now we have the assumption that most messages are
//! much shorter than their maximum allowed length. This would result in most messages having a
//! pre-dispatch PoV size which is much larger than their post-dispatch PoV size, possibly by a
//! factor of a thousand. Disregarding this observation would cripple the processing power of the
//! pallet since it cannot straighten out this discrepancy at runtime. Conceptually, the
//! implementation packs as many messages into a single bounded vec as actually fit within its
//! bounds. This reduces the wasted PoV.
//!
//! **Page Data Layout**
//!
//! A Page contains a heap which holds all its messages. The heap is built by concatenating
//! `(ItemHeader, Message)` pairs. The [`ItemHeader`] contains the length of the message which is
//! needed for retrieving it. This layout allows for constant access time of the next message and
//! linear access time for any message in the page. The header must remain minimal to reduce its PoV
//! impact.
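//!
//! For illustration, a self-contained sketch (not code from this pallet) of how one
//! `(ItemHeader, Message)` pair ends up on the heap, using a `u32` size type:
//!
//! ```ignore
//! use codec::Encode;
//!
//! // Mirrors `ItemHeader<u32>`: the payload length followed by the `is_processed` flag.
//! #[derive(Encode)]
//! struct DemoHeader {
//! 	payload_len: u32,
//! 	is_processed: bool,
//! }
//!
//! let payload = b"hello".to_vec();
//! let header = DemoHeader { payload_len: payload.len() as u32, is_processed: false };
//! let mut heap = header.encode();
//! heap.extend_from_slice(&payload);
//! // `heap` now contains one SCALE-encoded header directly followed by the raw message bytes;
//! // further items are appended right after, which is what allows constant-time access to the
//! // next message.
//! ```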
//!
//! **Weight Metering**
//!
//! The pallet utilizes the [`sp_weights::WeightMeter`] to manually track its consumption to always
//! stay within the required limit. This implies that the message processor hook can calculate the
//! weight of a message without executing it. This restricts the possible use-cases but is necessary
//! since the pallet runs in `on_initialize` which has a hard weight limit. The weight meter is used
//! in a way that `can_consume` and `try_consume` are always used to check the remaining weight of
//! an operation before committing to it. The process of exiting due to insufficient weight is
//! termed "bailing".
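//!
//! A minimal sketch of that metering pattern (the limit and step weights are made-up values):
//!
//! ```ignore
//! use sp_weights::{Weight, WeightMeter};
//!
//! let mut meter = WeightMeter::with_limit(Weight::from_parts(10_000, 1_000));
//! let step = Weight::from_parts(3_000, 100);
//! // Only commit to an operation if its weight can still be afforded, otherwise "bail".
//! while meter.try_consume(step).is_ok() {
//! 	// ... do one unit of work costing `step` ...
//! }
//! ```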
//!
//! # Scenario: Message enqueuing
//!
//! A message `m` is enqueued for origin `o` into queue `Q[o]` through
//! [`frame_support::traits::EnqueueMessage::enqueue_message`]`(m, o)`.
//!
//! First the queue is either loaded if it exists or otherwise created with empty default values.
//! The message is then inserted into the queue by appending it to its last `Page`, or by creating a
//! new `Page` just for `m` if it does not fit there. The number of messages in the `Book` is
//! incremented.
//!
//! `Q[o]` is now *ready* which will eventually result in `m` being processed.
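//!
//! A hedged sketch of what such an enqueue looks like from the caller's side (`MessageQueue` and
//! `MyOrigin` are assumed names for the pallet instance and its configured origin type):
//!
//! ```ignore
//! use frame_support::{traits::EnqueueMessage, BoundedSlice};
//!
//! let message: &[u8] = b"some opaque payload";
//! // `truncate_from` silently shortens over-long messages; real code should check the length.
//! let bounded = BoundedSlice::truncate_from(message);
//! <MessageQueue as EnqueueMessage<MyOrigin>>::enqueue_message(bounded, MyOrigin::Here);
//! ```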
//!
//! # Scenario: Message processing
//!
//! The pallet runs each block in `on_initialize` or when being manually called through
//! [`frame_support::traits::ServiceQueues::service_queues`].
//!
//! First it tries to "rotate" the `ReadyRing` by one through advancing the `ServiceHead` to the
//! next *ready* queue. It then starts to service this queue by servicing as many pages of it as
//! possible. Servicing a page means to execute as many messages of it as possible. Each executed
//! message is marked as *processed* if the [`Config::MessageProcessor`] returns `Ok`. An event
//! [`Event::Processed`] is emitted afterwards. It is possible that the weight limit of the pallet
//! will never allow a specific message to be executed. In this case it remains *unprocessed* and
//! is skipped. This process stops if either there are no more messages in the queue or the
//! remaining weight becomes insufficient to service this queue. If there is enough weight it tries
//! to advance to the next *ready* queue and service it. This continues until there are no more
//! queues on which it can make progress, or there is not enough weight left to even check for one.
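//!
//! Outside of `on_initialize`, servicing can also be driven explicitly; a sketch with an
//! arbitrary example weight limit (`MessageQueue` is the assumed pallet instance name):
//!
//! ```ignore
//! use frame_support::{traits::ServiceQueues, weights::Weight};
//!
//! // Process as many ready messages as fit into the limit; returns the weight actually used.
//! let used = <MessageQueue as ServiceQueues>::service_queues(Weight::from_parts(10_000_000, 65_536));
//! ```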
//!
//! # Scenario: Overweight execution
//!
//! A permanently over-weight message which was skipped by the message processing will never be
//! executed automatically through `on_initialize` nor by calling
//! [`frame_support::traits::ServiceQueues::service_queues`].
//!
//! Manual intervention in the form of
//! [`frame_support::traits::ServiceQueues::execute_overweight`] is necessary. Overweight messages
//! emit an [`Event::OverweightEnqueued`] event which can be used to extract the arguments for
//! manual execution. This only works on permanently overweight messages. There is no guarantee that
//! this will work since the message could be part of a stale page and be reaped before execution
//! commences.
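//!
//! A sketch of such a manual execution through the extrinsic; all concrete values are assumed to
//! be taken from a previously observed [`Event::OverweightEnqueued`]:
//!
//! ```ignore
//! // `message_origin`, `page_index` and `message_index` come from the `OverweightEnqueued` event.
//! MessageQueue::execute_overweight(
//! 	RuntimeOrigin::signed(who),
//! 	message_origin,
//! 	page_index,
//! 	message_index,
//! 	Weight::from_parts(1_000_000_000, 64 * 1024),
//! )?;
//! ```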
//!
//! # Terminology
//!
//! - `Message`: A blob of data into which the pallet has no introspection, defined as
//! [`BoundedSlice<u8, MaxMessageLenOf<T>>`]. The message length is limited by [`MaxMessageLenOf`]
//! which is calculated from [`Config::HeapSize`] and [`ItemHeader::max_encoded_len()`].
//! - `MessageOrigin`: A generic *origin* of a message, defined as [`MessageOriginOf`]. The
//! requirements for it are kept minimal to remain as generic as possible. The type is defined in
//! [`frame_support::traits::ProcessMessage::Origin`].
//! - `Page`: An array of `Message`s, see [`Page`]. Can never be empty.
//! - `Book`: A list of `Page`s, see [`BookState`]. Can be empty.
//! - `Queue`: A `Book` together with a `MessageOrigin` which can be part of the `ReadyRing`. Can
//!   be empty.
//! - `ReadyRing`: A double-linked list which contains all *ready* `Queue`s. It chains together the
//!   queues via their `ready_neighbours` fields. A `Queue` is *ready* if it contains at least one
//!   `Message` which can be processed. Can be empty.
//! - `ServiceHead`: A pointer into the `ReadyRing` to the next `Queue` to be serviced.
//! - (`un`)`processed`: A message is marked as *processed* after it was executed by the pallet. A
//!   message which was either not yet executed or could not be executed remains `unprocessed`,
//!   which is the default state for a message after being enqueued.
//! - `knitting`/`unknitting`: The means of adding or removing a `Queue` from the `ReadyRing`.
//! - `MEL`: The Max Encoded Length of a type, see [`codec::MaxEncodedLen`].
//! - `Reentrance`: To enter an execution context again before it has completed.
//!
//! # Properties
//!
//! **Liveness - Enqueueing**
//!
//! It is always possible to enqueue any message for any `MessageOrigin`.
//!
//! **Liveness - Processing**
//!
//! `on_initialize` always respects its finite weight-limit.
//!
//! **Progress - Enqueueing**
//!
//! An enqueued message immediately becomes *unprocessed* and thereby eligible for execution.
//!
//! **Progress - Processing**
//!
//! The pallet will execute at least one unprocessed message per block, if there is any. Ensuring
//! this property needs careful consideration of the concrete weights, since it is possible that the
//! weight limit of `on_initialize` never allows for the execution of even one message; trivially if
//! the limit is set to zero. `integrity_test` can be used to ensure that this property holds.
//!
//! **Fairness - Enqueueing**
//!
//! Enqueueing a message for a specific `MessageOrigin` does not influence the ability to enqueue a
//! message for the same or any other `MessageOrigin`; guaranteed by **Liveness - Enqueueing**.
//!
//! **Fairness - Processing**
//!
//! The average amount of weight available for message processing is the same for each queue if the
//! number of queues is constant. Creating a new queue must therefore be, possibly economically,
//! expensive. Currently this is achieved by having one queue per para-chain/thread, which keeps the
//! number of queues within `O(n)` and should be "good enough".

#![deny(missing_docs)]
#![cfg_attr(not(feature = "std"), no_std)]

mod benchmarking;
mod integration_test;
mod mock;
pub mod mock_helpers;
mod tests;
pub mod weights;

extern crate alloc;

use alloc::{vec, vec::Vec};
use codec::{Codec, Decode, Encode, MaxEncodedLen};
use core::{fmt::Debug, ops::Deref};
use frame_support::{
	defensive,
	pallet_prelude::*,
	traits::{
		Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage,
		ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint,
		QueuePausedQuery, ServiceQueues,
	},
	BoundedSlice, CloneNoBound, DefaultNoBound,
};
use frame_system::pallet_prelude::*;
pub use pallet::*;
use scale_info::TypeInfo;
use sp_arithmetic::traits::{BaseArithmetic, Unsigned};
use sp_core::{defer, H256};
use sp_runtime::{
	traits::{One, Zero},
	SaturatedConversion, Saturating,
};
use sp_weights::WeightMeter;
pub use weights::WeightInfo;

/// Type for identifying a page.
type PageIndex = u32;

/// Data encoded and prefixed to the encoded `MessageItem`.
#[derive(Encode, Decode, PartialEq, MaxEncodedLen, Debug)]
pub struct ItemHeader<Size> {
	/// The length of this item, not including the size of this header. The next item of the page
	/// follows immediately after the payload of this item.
	payload_len: Size,
	/// Whether this item has been processed.
	is_processed: bool,
}

/// A page of messages. Pages always contain at least one item.
#[derive(
	CloneNoBound, Encode, Decode, RuntimeDebugNoBound, DefaultNoBound, TypeInfo, MaxEncodedLen,
)]
#[scale_info(skip_type_params(HeapSize))]
#[codec(mel_bound(Size: MaxEncodedLen))]
pub struct Page<Size: Into<u32> + Debug + Clone + Default, HeapSize: Get<Size>> {
	/// Messages remaining to be processed; this includes overweight messages which have been
	/// skipped.
	remaining: Size,
	/// The size of all remaining messages to be processed.
	///
	/// Includes overweight messages outside of the `first` to `last` window.
	remaining_size: Size,
	/// The number of items before the `first` item in this page.
	first_index: Size,
	/// The heap-offset of the header of the first message item in this page which is ready for
	/// processing.
	first: Size,
	/// The heap-offset of the header of the last message item in this page.
	last: Size,
	/// The heap. If `self.offset == self.heap.len()` then the page is empty and should be deleted.
	heap: BoundedVec<u8, IntoU32<HeapSize, Size>>,
}

impl<
		Size: BaseArithmetic + Unsigned + Copy + Into<u32> + Codec + MaxEncodedLen + Debug + Default,
		HeapSize: Get<Size>,
	> Page<Size, HeapSize>
{
	/// Create a [`Page`] from one unprocessed message.
	fn from_message<T: Config>(message: BoundedSlice<u8, MaxMessageLenOf<T>>) -> Self {
		let payload_len = message.len();
		let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
		let payload_len = payload_len.saturated_into();
		let header = ItemHeader::<Size> { payload_len, is_processed: false };

		let mut heap = Vec::with_capacity(data_len);
		header.using_encoded(|h| heap.extend_from_slice(h));
		heap.extend_from_slice(message.deref());

		Page {
			remaining: One::one(),
			remaining_size: payload_len,
			first_index: Zero::zero(),
			first: Zero::zero(),
			last: Zero::zero(),
			heap: BoundedVec::defensive_truncate_from(heap),
		}
	}

	/// Try to append one message to a page.
	fn try_append_message<T: Config>(
		&mut self,
		message: BoundedSlice<u8, MaxMessageLenOf<T>>,
	) -> Result<(), ()> {
		let pos = self.heap.len();
		let payload_len = message.len();
		let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
		let payload_len = payload_len.saturated_into();
		let header = ItemHeader::<Size> { payload_len, is_processed: false };
		let heap_size: u32 = HeapSize::get().into();
		if (heap_size as usize).saturating_sub(self.heap.len()) < data_len {
			// Can't fit.
			return Err(())
		}

		let mut heap = core::mem::take(&mut self.heap).into_inner();
		header.using_encoded(|h| heap.extend_from_slice(h));
		heap.extend_from_slice(message.deref());
		self.heap = BoundedVec::defensive_truncate_from(heap);
		self.last = pos.saturated_into();
		self.remaining.saturating_inc();
		self.remaining_size.saturating_accrue(payload_len);
		Ok(())
	}

	/// Returns the first message in the page without removing it.
	///
	/// SAFETY: Does not panic even on corrupted storage.
	fn peek_first(&self) -> Option<BoundedSlice<u8, IntoU32<HeapSize, Size>>> {
		if self.first > self.last {
			return None
		}
		let f = (self.first.into() as usize).min(self.heap.len());
		let mut item_slice = &self.heap[f..];
		if let Ok(h) = ItemHeader::<Size>::decode(&mut item_slice) {
			let payload_len = h.payload_len.into() as usize;
			if payload_len <= item_slice.len() {
				// impossible to truncate since is sliced up from `self.heap: BoundedVec<u8,
				// HeapSize>`
				return Some(BoundedSlice::defensive_truncate_from(&item_slice[..payload_len]))
			}
		}
		defensive!("message-queue: heap corruption");
		None
	}

	/// Point `first` at the next message, marking the first as processed if `is_processed` is true.
	fn skip_first(&mut self, is_processed: bool) {
		let f = (self.first.into() as usize).min(self.heap.len());
		if let Ok(mut h) = ItemHeader::decode(&mut &self.heap[f..]) {
			if is_processed && !h.is_processed {
				h.is_processed = true;
				h.using_encoded(|d| self.heap[f..f + d.len()].copy_from_slice(d));
				self.remaining.saturating_dec();
				self.remaining_size.saturating_reduce(h.payload_len);
			}
			self.first
				.saturating_accrue(ItemHeader::<Size>::max_encoded_len().saturated_into());
			self.first.saturating_accrue(h.payload_len);
			self.first_index.saturating_inc();
		}
	}

	/// Return the message with index `index` in the form of `(position, processed, message)`.
	fn peek_index(&self, index: usize) -> Option<(usize, bool, &[u8])> {
		let mut pos = 0;
		let mut item_slice = &self.heap[..];
		let header_len: usize = ItemHeader::<Size>::max_encoded_len().saturated_into();
		for _ in 0..index {
			let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
			let item_len = h.payload_len.into() as usize;
			if item_slice.len() < item_len {
				return None
			}
			item_slice = &item_slice[item_len..];
			pos.saturating_accrue(header_len.saturating_add(item_len));
		}
		let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
		if item_slice.len() < h.payload_len.into() as usize {
			return None
		}
		item_slice = &item_slice[..h.payload_len.into() as usize];
		Some((pos, h.is_processed, item_slice))
	}

	/// Set the `is_processed` flag for the item at `pos` to be `true` if not already and decrement
	/// the `remaining` counter of the page.
	///
	/// Does nothing if no [`ItemHeader`] could be decoded at the given position.
	fn note_processed_at_pos(&mut self, pos: usize) {
		if let Ok(mut h) = ItemHeader::<Size>::decode(&mut &self.heap[pos..]) {
			if !h.is_processed {
				h.is_processed = true;
				h.using_encoded(|d| self.heap[pos..pos + d.len()].copy_from_slice(d));
				self.remaining.saturating_dec();
				self.remaining_size.saturating_reduce(h.payload_len);
			}
		}
	}

	/// Returns whether the page is *complete* which means that no messages remain.
	fn is_complete(&self) -> bool {
		self.remaining.is_zero()
	}
}

/// A single link in the double-linked Ready Ring list.
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug, PartialEq)]
pub struct Neighbours<MessageOrigin> {
	/// The previous queue.
	prev: MessageOrigin,
	/// The next queue.
	next: MessageOrigin,
}

/// The state of a queue as represented by a book of its pages.
///
/// Each queue has exactly one book which holds all of its pages. All pages of a book combined
/// contain all of the messages of its queue; hence the name *Book*.
/// Books can be chained together in a double-linked fashion through their `ready_neighbours` field.
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug)]
pub struct BookState<MessageOrigin> {
	/// The first page with some items to be processed in it. If this is `>= end`, then there are
	/// no pages with items to be processed in them.
	begin: PageIndex,
	/// One more than the last page with some items to be processed in it.
	end: PageIndex,
	/// The number of pages stored at present.
	///
	/// This might be larger than `end-begin`, because we keep pages with unprocessed overweight
	/// messages outside of the end/begin window.
	count: PageIndex,
	/// If this book has any ready pages, then this will be `Some` with the previous and next
	/// neighbours. This wraps around.
	ready_neighbours: Option<Neighbours<MessageOrigin>>,
	/// The number of unprocessed messages stored at present.
	message_count: u64,
	/// The total size of all unprocessed messages stored at present.
	size: u64,
}

impl<MessageOrigin> Default for BookState<MessageOrigin> {
	fn default() -> Self {
		Self { begin: 0, end: 0, count: 0, ready_neighbours: None, message_count: 0, size: 0 }
	}
}

impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
	fn from(book: BookState<MessageOrigin>) -> Self {
		QueueFootprint {
			pages: book.count,
			ready_pages: book.end.defensive_saturating_sub(book.begin),
			storage: Footprint { count: book.message_count, size: book.size },
		}
	}
}

/// Handler code for when the items in a queue change.
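///
/// # Example
///
/// A minimal sketch of a handler; the logging target and the `Debug` bound on `Id` are
/// assumptions made for this example, not requirements of the trait:
/// ```ignore
/// struct LogQueueChanges;
/// impl<Id: core::fmt::Debug> OnQueueChanged<Id> for LogQueueChanges {
/// 	fn on_queue_changed(id: Id, fp: QueueFootprint) {
/// 		log::trace!(
/// 			target: "runtime::message-queue",
/// 			"queue {:?} now holds {} messages ({} bytes) over {} pages",
/// 			id, fp.storage.count, fp.storage.size, fp.pages,
/// 		);
/// 	}
/// }
/// ```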
pub trait OnQueueChanged<Id> {
	/// Note that the queue `id` now contains `fp.storage.count` items taking up `fp.storage.size`
	/// bytes.
	fn on_queue_changed(id: Id, fp: QueueFootprint);
}

impl<Id> OnQueueChanged<Id> for () {
	fn on_queue_changed(_: Id, _: QueueFootprint) {}
}

#[frame_support::pallet]
pub mod pallet {
	use super::*;

	#[pallet::pallet]
	pub struct Pallet<T>(_);

	/// The module configuration trait.
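	///
	/// A hedged example of how a runtime could be configured; every concrete value below (the
	/// heap size, weights and the `MyMessageProcessor` type) is an illustrative assumption, not a
	/// recommendation:
	/// ```ignore
	/// parameter_types! {
	/// 	pub const HeapSize: u32 = 64 * 1024;
	/// 	pub const MaxStale: u32 = 8;
	/// 	pub ServiceWeight: Option<Weight> = Some(Weight::from_parts(100_000_000, 100_000));
	/// }
	///
	/// impl pallet_message_queue::Config for Runtime {
	/// 	type RuntimeEvent = RuntimeEvent;
	/// 	type WeightInfo = ();
	/// 	// Any `ProcessMessage` implementation supplied by the runtime, e.g. an XCM processor.
	/// 	type MessageProcessor = MyMessageProcessor;
	/// 	type Size = u32;
	/// 	type QueueChangeHandler = ();
	/// 	type QueuePausedQuery = ();
	/// 	type HeapSize = HeapSize;
	/// 	type MaxStale = MaxStale;
	/// 	type ServiceWeight = ServiceWeight;
	/// 	type IdleMaxServiceWeight = ServiceWeight;
	/// }
	/// ```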
	#[pallet::config]
	pub trait Config: frame_system::Config {
		/// The overarching event type.
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// Weight information for extrinsics in this pallet.
		type WeightInfo: WeightInfo;

		/// Processor for a message.
		///
		/// Storage changes are not rolled back on error.
		///
		/// # Benchmarking
		///
		/// Must be set to [`mock_helpers::NoopMessageProcessor`] for benchmarking.
		/// Other message processors that consume exactly (1, 1) weight for any given message will
		/// work as well. Otherwise the benchmarking will also measure the weight of the message
		/// processor, which is not desired.
		type MessageProcessor: ProcessMessage;

		/// Page/heap size type.
		type Size: BaseArithmetic
			+ Unsigned
			+ Copy
			+ Into<u32>
			+ Member
			+ Encode
			+ Decode
			+ MaxEncodedLen
			+ TypeInfo
			+ Default;

		/// Code to be called when a message queue changes - either with items introduced or
		/// removed.
		type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;

		/// Queried by the pallet to check whether a queue can be serviced.
		///
		/// This also applies to manual servicing via `execute_overweight` and `service_queues`. The
		/// value of this is only polled once before servicing the queue. This means that changes to
		/// it that happen *within* the servicing will not be reflected.
		type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>;

		/// The size of the page; this implies the maximum message size which can be sent.
		///
		/// A good value depends on the expected message sizes, their weights, the weight that is
		/// available for processing them and the maximal needed message size. The maximal message
		/// size is slightly lower than this as defined by [`MaxMessageLenOf`].
		#[pallet::constant]
		type HeapSize: Get<Self::Size>;

		/// The maximum number of stale pages (i.e. of overweight messages) allowed before culling
		/// can happen. Once there are more stale pages than this, then historical pages may be
		/// dropped, even if they contain unprocessed overweight messages.
		#[pallet::constant]
		type MaxStale: Get<u32>;

		/// The amount of weight (if any) which should be provided to the message queue for
		/// servicing enqueued items in `on_initialize`.
		///
		/// This may be legitimately `None` in the case that you will call
		/// `ServiceQueues::service_queues` manually or set [`Self::IdleMaxServiceWeight`] to have
		/// it run in `on_idle`.
		#[pallet::constant]
		type ServiceWeight: Get<Option<Weight>>;

		/// The maximum amount of weight (if any) to be used from remaining weight in `on_idle`
		/// which should be provided to the message queue for servicing enqueued items in
		/// `on_idle`. Useful for parachains to process messages at the same block they are
		/// received.
		///
		/// If `None`, it will not call `ServiceQueues::service_queues` in `on_idle`.
		#[pallet::constant]
		type IdleMaxServiceWeight: Get<Option<Weight>>;
	}

	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// Message discarded due to an error in the `MessageProcessor` (usually a format error).
		ProcessingFailed {
			/// The `blake2_256` hash of the message.
			id: H256,
			/// The queue of the message.
			origin: MessageOriginOf<T>,
			/// The error that occurred.
			///
			/// This error is pretty opaque. More fine-grained errors need to be emitted as events
			/// by the `MessageProcessor`.
			error: ProcessMessageError,
		},
		/// Message is processed.
		Processed {
			/// The `blake2_256` hash of the message.
			id: H256,
			/// The queue of the message.
			origin: MessageOriginOf<T>,
			/// How much weight was used to process the message.
			weight_used: Weight,
			/// Whether the message was processed.
			///
			/// Note that this does not mean that the underlying `MessageProcessor` was internally
			/// successful. It *solely* means that the MQ pallet will treat this as a success
			/// condition and discard the message. Any internal error needs to be emitted as events
			/// by the `MessageProcessor`.
			success: bool,
		},
		/// Message placed in overweight queue.
		OverweightEnqueued {
			/// The `blake2_256` hash of the message.
			id: [u8; 32],
			/// The queue of the message.
			origin: MessageOriginOf<T>,
			/// The page of the message.
			page_index: PageIndex,
			/// The index of the message within the page.
			message_index: T::Size,
		},
		/// This page was reaped.
		PageReaped {
			/// The queue of the page.
			origin: MessageOriginOf<T>,
			/// The index of the page.
			index: PageIndex,
		},
	}

	#[pallet::error]
	pub enum Error<T> {
		/// Page is not reapable because it has items remaining to be processed and is not old
		/// enough.
		NotReapable,
		/// Page to be reaped does not exist.
		NoPage,
		/// The referenced message could not be found.
		NoMessage,
		/// The message was already processed and cannot be processed again.
		AlreadyProcessed,
		/// The message is queued for future execution.
		Queued,
		/// There is temporarily not enough weight to continue servicing messages.
		InsufficientWeight,
		/// This message is temporarily unprocessable.
		///
		/// Such errors are expected, but not guaranteed, to resolve themselves eventually through
		/// retrying.
		TemporarilyUnprocessable,
		/// The queue is paused and no message can be executed from it.
		///
		/// This can change at any time and may resolve in the future by re-trying.
		QueuePaused,
		/// Another call is in progress and needs to finish before this call can happen.
		RecursiveDisallowed,
	}

	/// The book state of every queue, tracking the index of the first and last (non-empty) pages.
	#[pallet::storage]
	pub(super) type BookStateFor<T: Config> =
		StorageMap<_, Twox64Concat, MessageOriginOf<T>, BookState<MessageOriginOf<T>>, ValueQuery>;

	/// The origin at which we should begin servicing.
	#[pallet::storage]
	pub(super) type ServiceHead<T: Config> = StorageValue<_, MessageOriginOf<T>, OptionQuery>;

	/// The map of page indices to pages.
	#[pallet::storage]
	pub(super) type Pages<T: Config> = StorageDoubleMap<
		_,
		Twox64Concat,
		MessageOriginOf<T>,
		Twox64Concat,
		PageIndex,
		Page<T::Size, T::HeapSize>,
		OptionQuery,
	>;

	#[pallet::hooks]
	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
		fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
			if let Some(weight_limit) = T::ServiceWeight::get() {
				Self::service_queues(weight_limit)
			} else {
				Weight::zero()
			}
		}

		fn on_idle(_n: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
			if let Some(weight_limit) = T::IdleMaxServiceWeight::get() {
				// Make use of the remaining weight to process enqueued messages.
				Self::service_queues(weight_limit.min(remaining_weight))
			} else {
				Weight::zero()
			}
		}

		#[cfg(feature = "try-runtime")]
		fn try_state(_: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
			Self::do_try_state()
		}

		/// Check all compile-time assumptions about [`crate::Config`].
		#[cfg(test)]
		fn integrity_test() {
			Self::do_integrity_test().expect("Pallet config is valid; qed")
		}
	}

	#[pallet::call]
	impl<T: Config> Pallet<T> {
		/// Remove a page which has no more messages remaining to be processed or is stale.
		#[pallet::call_index(0)]
		#[pallet::weight(T::WeightInfo::reap_page())]
		pub fn reap_page(
			origin: OriginFor<T>,
			message_origin: MessageOriginOf<T>,
			page_index: PageIndex,
		) -> DispatchResult {
			let _ = ensure_signed(origin)?;
			Self::do_reap_page(&message_origin, page_index)
		}

		/// Execute an overweight message.
		///
		/// Temporary processing errors will be propagated whereas permanent errors are treated
		/// as a success condition.
		///
		/// - `origin`: Must be `Signed`.
		/// - `message_origin`: The origin from which the message to be executed arrived.
		/// - `page`: The page in the queue in which the message to be executed is sitting.
		/// - `index`: The index into the queue of the message to be executed.
		/// - `weight_limit`: The maximum amount of weight allowed to be consumed in the execution
		///   of the message.
		///
		/// Benchmark complexity considerations: O(index + weight_limit).
		#[pallet::call_index(1)]
		#[pallet::weight(
			T::WeightInfo::execute_overweight_page_updated().max(
			T::WeightInfo::execute_overweight_page_removed()).saturating_add(*weight_limit)
		)]
		pub fn execute_overweight(
			origin: OriginFor<T>,
			message_origin: MessageOriginOf<T>,
			page: PageIndex,
			index: T::Size,
			weight_limit: Weight,
		) -> DispatchResultWithPostInfo {
			let _ = ensure_signed(origin)?;
			let actual_weight =
				Self::do_execute_overweight(message_origin, page, index, weight_limit)?;
			Ok(Some(actual_weight).into())
		}
	}
}

/// The status of a page after trying to execute its next message.
#[derive(PartialEq, Debug)]
enum PageExecutionStatus {
	/// The execution bailed because there was not enough weight remaining.
	Bailed,
	/// The page did not make any progress on its execution.
	///
	/// This is a transient condition and can be handled by retrying - exactly like [Bailed].
	NoProgress,
	/// No more messages could be loaded. This does _not_ imply `page.is_complete()`.
	///
	/// The reasons for this status are:
	///  - The end of the page is reached but there could still be skipped messages.
	///  - The storage is corrupted.
	NoMore,
}

/// The status after trying to execute the next item of a [`Page`].
#[derive(PartialEq, Debug)]
enum ItemExecutionStatus {
	/// The execution bailed because there was not enough weight remaining.
	Bailed,
	/// The item did not make any progress on its execution.
	///
	/// This is a transient condition and can be handled by retrying - exactly like [Bailed].
	NoProgress,
	/// The item was not found.
	NoItem,
	/// Whether the execution of an item resulted in it being processed.
	///
	/// One reason for `false` would be permanently overweight.
	Executed(bool),
}

/// The status of an attempt to process a message.
#[derive(PartialEq)]
enum MessageExecutionStatus {
	/// There is not enough weight remaining at present.
	InsufficientWeight,
	/// There will never be enough weight.
	Overweight,
	/// The message was processed successfully.
	Processed,
	/// The message was processed and resulted in a, possibly permanent, error.
	Unprocessable { permanent: bool },
	/// The stack depth limit was reached.
	///
	/// We cannot just return `Unprocessable` in this case, because the processability of the
	/// message depends on how the function was called. This may be a permanent error if it was
	/// called by a top-level function, or a transient error if it was already called in a nested
	/// function.
	StackLimitReached,
}
779

            
780
impl<T: Config> Pallet<T> {
781
	/// Knit `origin` into the ready ring right at the end.
782
	///
783
	/// Return the two ready ring neighbours of `origin`.
784
	fn ready_ring_knit(origin: &MessageOriginOf<T>) -> Result<Neighbours<MessageOriginOf<T>>, ()> {
785
		if let Some(head) = ServiceHead::<T>::get() {
786
			let mut head_book_state = BookStateFor::<T>::get(&head);
787
			let mut head_neighbours = head_book_state.ready_neighbours.take().ok_or(())?;
788
			let tail = head_neighbours.prev;
789
			head_neighbours.prev = origin.clone();
790
			head_book_state.ready_neighbours = Some(head_neighbours);
791
			BookStateFor::<T>::insert(&head, head_book_state);
792

            
793
			let mut tail_book_state = BookStateFor::<T>::get(&tail);
794
			let mut tail_neighbours = tail_book_state.ready_neighbours.take().ok_or(())?;
795
			tail_neighbours.next = origin.clone();
796
			tail_book_state.ready_neighbours = Some(tail_neighbours);
797
			BookStateFor::<T>::insert(&tail, tail_book_state);
798

            
799
			Ok(Neighbours { next: head, prev: tail })
800
		} else {
801
			ServiceHead::<T>::put(origin);
802
			Ok(Neighbours { next: origin.clone(), prev: origin.clone() })
803
		}
804
	}
805

            
806
	fn ready_ring_unknit(origin: &MessageOriginOf<T>, neighbours: Neighbours<MessageOriginOf<T>>) {
807
		if origin == &neighbours.next {
808
			debug_assert!(
809
				origin == &neighbours.prev,
810
				"unknitting from single item ring; outgoing must be only item"
811
			);
812
			// Service queue empty.
813
			ServiceHead::<T>::kill();
814
		} else {
815
			BookStateFor::<T>::mutate(&neighbours.next, |book_state| {
816
				if let Some(ref mut n) = book_state.ready_neighbours {
817
					n.prev = neighbours.prev.clone()
818
				}
819
			});
820
			BookStateFor::<T>::mutate(&neighbours.prev, |book_state| {
821
				if let Some(ref mut n) = book_state.ready_neighbours {
822
					n.next = neighbours.next.clone()
823
				}
824
			});
825
			if let Some(head) = ServiceHead::<T>::get() {
826
				if &head == origin {
827
					ServiceHead::<T>::put(neighbours.next);
828
				}
829
			} else {
830
				defensive!("`ServiceHead` must be some if there was a ready queue");
831
			}
832
		}
833
	}
834

            
835
	/// Tries to bump the current `ServiceHead` to the next ready queue.
836
	///
837
	/// Returns the current head if it got be bumped and `None` otherwise.
838
254295
	fn bump_service_head(weight: &mut WeightMeter) -> Option<MessageOriginOf<T>> {
839
254295
		if weight.try_consume(T::WeightInfo::bump_service_head()).is_err() {
840
			return None
841
254295
		}
842

            
843
254295
		if let Some(head) = ServiceHead::<T>::get() {
844
			let mut head_book_state = BookStateFor::<T>::get(&head);
845
			if let Some(head_neighbours) = head_book_state.ready_neighbours.take() {
846
				ServiceHead::<T>::put(&head_neighbours.next);
847
				Some(head)
848
			} else {
849
				None
850
			}
851
		} else {
852
254295
			None
853
		}
	}

	/// The maximal weight that a single message can consume.
	///
	/// Any message using more than this will be marked as permanently overweight and not
	/// automatically re-attempted. Returns `None` if the servicing of a message cannot begin.
	/// `Some(0)` means that only messages with no weight may be served.
	fn max_message_weight(limit: Weight) -> Option<Weight> {
		limit.checked_sub(&Self::single_msg_overhead())
	}

	/// The overhead of servicing a single message.
	fn single_msg_overhead() -> Weight {
		T::WeightInfo::bump_service_head()
			.saturating_add(T::WeightInfo::service_queue_base())
			.saturating_add(
				T::WeightInfo::service_page_base_completion()
					.max(T::WeightInfo::service_page_base_no_completion()),
			)
			.saturating_add(T::WeightInfo::service_page_item())
			.saturating_add(T::WeightInfo::ready_ring_unknit())
	}

	/// Checks invariants of the pallet config.
	///
	/// The results of this can only be relied upon if the config values are set to constants.
	#[cfg(test)]
	fn do_integrity_test() -> Result<(), String> {
		ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");

		if let Some(service) = T::ServiceWeight::get() {
			if Self::max_message_weight(service).is_none() {
				return Err(format!(
					"ServiceWeight too low: {}. Must be at least {}",
					service,
					Self::single_msg_overhead(),
				))
			}
		}

		Ok(())
	}

	fn do_enqueue_message(
		origin: &MessageOriginOf<T>,
		message: BoundedSlice<u8, MaxMessageLenOf<T>>,
	) {
		let mut book_state = BookStateFor::<T>::get(origin);
		book_state.message_count.saturating_inc();
		book_state
			.size
			// This should be payload size, but here the payload *is* the message.
			.saturating_accrue(message.len() as u64);

		if book_state.end > book_state.begin {
			debug_assert!(book_state.ready_neighbours.is_some(), "Must be in ready ring if ready");
			// Already have a page in progress - attempt to append.
			let last = book_state.end - 1;
			let mut page = match Pages::<T>::get(origin, last) {
				Some(p) => p,
				None => {
					defensive!("Corruption: referenced page doesn't exist.");
					return
				},
			};
			if page.try_append_message::<T>(message).is_ok() {
				Pages::<T>::insert(origin, last, &page);
				BookStateFor::<T>::insert(origin, book_state);
				return
			}
		} else {
			debug_assert!(
				book_state.ready_neighbours.is_none(),
				"Must not be in ready ring if not ready"
			);
			// insert into ready queue.
			match Self::ready_ring_knit(origin) {
				Ok(neighbours) => book_state.ready_neighbours = Some(neighbours),
				Err(()) => {
					defensive!("Ring state invalid when knitting");
				},
			}
		}
		// No room on the page or no page - link in a new page.
		book_state.end.saturating_inc();
		book_state.count.saturating_inc();
		let page = Page::from_message::<T>(message);
		Pages::<T>::insert(origin, book_state.end - 1, page);
		// NOTE: `T::QueueChangeHandler` is called by the caller.
		BookStateFor::<T>::insert(origin, book_state);
	}

	/// Try to execute a single message that was marked as overweight.
	///
	/// The `weight_limit` is the weight that can be consumed to execute the message. The base
	/// weight of the function itself must be measured by the caller.
	pub fn do_execute_overweight(
		origin: MessageOriginOf<T>,
		page_index: PageIndex,
		index: T::Size,
		weight_limit: Weight,
	) -> Result<Weight, Error<T>> {
		match with_service_mutex(|| {
			Self::do_execute_overweight_inner(origin, page_index, index, weight_limit)
		}) {
			Err(()) => Err(Error::<T>::RecursiveDisallowed),
			Ok(x) => x,
		}
	}

	/// Same as `do_execute_overweight` but must be called while holding the `service_mutex`.
	fn do_execute_overweight_inner(
		origin: MessageOriginOf<T>,
		page_index: PageIndex,
		index: T::Size,
		weight_limit: Weight,
	) -> Result<Weight, Error<T>> {
		let mut book_state = BookStateFor::<T>::get(&origin);
		ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);

		let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?;
		let (pos, is_processed, payload) =
			page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?;
		let payload_len = payload.len() as u64;
		ensure!(
			page_index < book_state.begin ||
				(page_index == book_state.begin && pos < page.first.into() as usize),
			Error::<T>::Queued
		);
		ensure!(!is_processed, Error::<T>::AlreadyProcessed);
		use MessageExecutionStatus::*;
		let mut weight_counter = WeightMeter::with_limit(weight_limit);
		match Self::process_message_payload(
			origin.clone(),
			page_index,
			index,
			payload,
			&mut weight_counter,
			Weight::MAX,
			// ^^^ We never recognise it as permanently overweight, since that would result in an
			// additional overweight event being deposited.
		) {
			Overweight | InsufficientWeight => Err(Error::<T>::InsufficientWeight),
			StackLimitReached | Unprocessable { permanent: false } =>
				Err(Error::<T>::TemporarilyUnprocessable),
			Unprocessable { permanent: true } | Processed => {
				page.note_processed_at_pos(pos);
				book_state.message_count.saturating_dec();
				book_state.size.saturating_reduce(payload_len);
				let page_weight = if page.remaining.is_zero() {
					debug_assert!(
						page.remaining_size.is_zero(),
						"no messages remaining; no space taken; qed"
					);
					Pages::<T>::remove(&origin, page_index);
					debug_assert!(book_state.count >= 1, "page exists, so book must have pages");
					book_state.count.saturating_dec();
					T::WeightInfo::execute_overweight_page_removed()
				// no need to consider .first or ready ring since processing an overweight page
				// would not alter that state.
				} else {
					Pages::<T>::insert(&origin, page_index, page);
					T::WeightInfo::execute_overweight_page_updated()
				};
				BookStateFor::<T>::insert(&origin, &book_state);
				T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
				Ok(weight_counter.consumed().saturating_add(page_weight))
			},
		}
	}

	/// Remove a stale page or one which has no more messages remaining to be processed.
	fn do_reap_page(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
		match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) {
			Err(()) => Err(Error::<T>::RecursiveDisallowed.into()),
			Ok(x) => x,
		}
	}

	/// Same as `do_reap_page` but must be called while holding the `service_mutex`.
	fn do_reap_page_inner(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
		let mut book_state = BookStateFor::<T>::get(origin);
		// definitely not reapable if the page's index is no less than the `begin`ning of ready
		// pages.
		ensure!(page_index < book_state.begin, Error::<T>::NotReapable);
		let page = Pages::<T>::get(origin, page_index).ok_or(Error::<T>::NoPage)?;
		// definitely reapable if the page has no messages in it.
		let reapable = page.remaining.is_zero();
		// also reapable if the page index has dropped below our watermark.
		let cullable = || {
			let total_pages = book_state.count;
			let ready_pages = book_state.end.saturating_sub(book_state.begin).min(total_pages);
			// The number of stale pages - i.e. pages which contain unprocessed overweight messages.
			// We would prefer to keep these around but will restrict how far into history they can
			// extend if we notice that there's too many of them.
			//
			// We don't know *where* in history these pages are so we use a dynamic formula which
			// reduces the historical time horizon as the stale pages pile up and increases it as
			// they reduce.
			let stale_pages = total_pages - ready_pages;
			// The maximum number of stale pages (i.e. of overweight messages) allowed before
			// culling can happen at all. Once there are more stale pages than this, then historical
			// pages may be dropped, even if they contain unprocessed overweight messages.
			let max_stale = T::MaxStale::get();
			// The amount beyond the maximum which are being used. If it's not beyond the maximum
			// then we exit now since no culling is needed.
			let overflow = match stale_pages.checked_sub(max_stale + 1) {
				Some(x) => x + 1,
				None => return false,
			};
			// The special formula which tells us how deep into index-history we will keep pages.
			// As the overflow is greater (and thus the need to drop items from storage is more
			// urgent) this is reduced, allowing a greater range of pages to be culled.
			// With a minimum `overflow` (`1`), this returns `max_stale ** 2`, indicating we only
			// cull beyond that number of indices deep into history.
			// As this overflow increases, our depth reduces down to a limit of `max_stale`. We
			// never want to reduce below this since this will certainly allow enough pages to be
			// culled in order to bring `overflow` back to zero.
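			//
			// Worked example with illustrative numbers (not taken from any runtime config): for
			// `max_stale = 16`, an `overflow` of 1 gives a backlog of `16 * 16 / 1 = 256` pages,
			// an overflow of 4 gives 64, and an overflow of 64 or more clamps to the minimum of
			// 16 pages.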
			let backlog = (max_stale * max_stale / overflow).max(max_stale);
			let watermark = book_state.begin.saturating_sub(backlog);
			page_index < watermark
		};
		ensure!(reapable || cullable(), Error::<T>::NotReapable);
		Pages::<T>::remove(origin, page_index);
		debug_assert!(book_state.count > 0, "reaping a page implies there are pages");
		book_state.count.saturating_dec();
		book_state.message_count.saturating_reduce(page.remaining.into() as u64);
		book_state.size.saturating_reduce(page.remaining_size.into() as u64);
		BookStateFor::<T>::insert(origin, &book_state);
		T::QueueChangeHandler::on_queue_changed(origin.clone(), book_state.into());
		Self::deposit_event(Event::PageReaped { origin: origin.clone(), index: page_index });
		Ok(())
	}

	/// Execute any messages remaining to be processed in the queue of `origin`, using up to
	/// `weight_limit` to do so. Any messages which would take more than `overweight_limit` to
	/// execute are deemed overweight and ignored.
	fn service_queue(
		origin: MessageOriginOf<T>,
		weight: &mut WeightMeter,
		overweight_limit: Weight,
	) -> (bool, Option<MessageOriginOf<T>>) {
		use PageExecutionStatus::*;
		if weight
			.try_consume(
				T::WeightInfo::service_queue_base()
					.saturating_add(T::WeightInfo::ready_ring_unknit()),
			)
			.is_err()
		{
			return (false, None)
		}
		let mut book_state = BookStateFor::<T>::get(&origin);
		let mut total_processed = 0;
		if T::QueuePausedQuery::is_paused(&origin) {
			let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
			return (false, next_ready)
		}
		while book_state.end > book_state.begin {
			let (processed, status) =
				Self::service_page(&origin, &mut book_state, weight, overweight_limit);
			total_processed.saturating_accrue(processed);
			match status {
				// Store the page progress and do not go to the next one.
				Bailed | NoProgress => break,
				// Go to the next page if this one is at the end.
				NoMore => (),
			};
			book_state.begin.saturating_inc();
		}
		let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
		if book_state.begin >= book_state.end {
			// No longer ready - unknit.
			if let Some(neighbours) = book_state.ready_neighbours.take() {
				Self::ready_ring_unknit(&origin, neighbours);
			} else if total_processed > 0 {
				defensive!("Freshly processed queue must have been ready");
			}
		}
		BookStateFor::<T>::insert(&origin, &book_state);
		if total_processed > 0 {
			T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
		}
		(total_processed > 0, next_ready)
	}

	/// Service as many messages of a page as possible.
	///
	/// Returns how many messages were processed and the page's status.
	fn service_page(
		origin: &MessageOriginOf<T>,
		book_state: &mut BookStateOf<T>,
		weight: &mut WeightMeter,
		overweight_limit: Weight,
	) -> (u32, PageExecutionStatus) {
		use PageExecutionStatus::*;
		if weight
			.try_consume(
				T::WeightInfo::service_page_base_completion()
					.max(T::WeightInfo::service_page_base_no_completion()),
			)
			.is_err()
		{
			return (0, Bailed)
		}
		let page_index = book_state.begin;
		let mut page = match Pages::<T>::get(origin, page_index) {
			Some(p) => p,
			None => {
				defensive!("message-queue: referenced page not found");
				return (0, NoMore)
			},
		};
		let mut total_processed = 0;
		// Execute as many messages as possible.
		let status = loop {
			use ItemExecutionStatus::*;
			match Self::service_page_item(
				origin,
				page_index,
				book_state,
				&mut page,
				weight,
				overweight_limit,
			) {
				Bailed => break PageExecutionStatus::Bailed,
				NoItem => break PageExecutionStatus::NoMore,
				NoProgress => break PageExecutionStatus::NoProgress,
				// Keep going as long as we make progress...
				Executed(true) => total_processed.saturating_inc(),
				Executed(false) => (),
			}
		};
		if page.is_complete() {
			debug_assert!(status != Bailed, "we never bail if a page became complete");
			Pages::<T>::remove(origin, page_index);
			debug_assert!(book_state.count > 0, "completing a page implies there are pages");
			book_state.count.saturating_dec();
		} else {
			Pages::<T>::insert(origin, page_index, page);
		}
		(total_processed, status)
	}

	/// Execute the next message of a page.
	pub(crate) fn service_page_item(
		origin: &MessageOriginOf<T>,
		page_index: PageIndex,
		book_state: &mut BookStateOf<T>,
		page: &mut PageOf<T>,
		weight: &mut WeightMeter,
		overweight_limit: Weight,
	) -> ItemExecutionStatus {
		use MessageExecutionStatus::*;
		// This ugly pre-checking is needed for the invariant
		// "we never bail if a page became complete".
		if page.is_complete() {
			return ItemExecutionStatus::NoItem
		}
		if weight.try_consume(T::WeightInfo::service_page_item()).is_err() {
			return ItemExecutionStatus::Bailed
		}
		let payload = &match page.peek_first() {
			Some(m) => m,
			None => return ItemExecutionStatus::NoItem,
		}[..];
		let payload_len = payload.len() as u64;
		// Store these for the case that `process_message_payload` is recursive.
		Pages::<T>::insert(origin, page_index, &*page);
		BookStateFor::<T>::insert(origin, &*book_state);
		let res = Self::process_message_payload(
			origin.clone(),
			page_index,
			page.first_index,
			payload,
			weight,
			overweight_limit,
		);
		// And restore them afterwards to see the changes of a recursive call.
		*book_state = BookStateFor::<T>::get(origin);
		if let Some(new_page) = Pages::<T>::get(origin, page_index) {
			*page = new_page;
		} else {
			defensive!("page must exist since we just inserted it and recursive calls are not allowed to remove anything");
			return ItemExecutionStatus::NoItem
		};
		let is_processed = match res {
			InsufficientWeight => return ItemExecutionStatus::Bailed,
			Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
			Processed | Unprocessable { permanent: true } | StackLimitReached => true,
			Overweight => false,
		};
		if is_processed {
			book_state.message_count.saturating_dec();
			book_state.size.saturating_reduce(payload_len as u64);
		}
		page.skip_first(is_processed);
		ItemExecutionStatus::Executed(is_processed)
	}

	/// Ensure the correctness of state of this pallet.
	///
	/// # Assumptions
	///
	/// If `ServiceHead` points to a ready Queue, then the BookState of that Queue has:
	///
	/// * `message_count` > 0
	/// * `size` > 0
	/// * `end` > `begin`
	/// * Some(ready_neighbours)
	/// * If `ready_neighbours.next` == self.origin, then `ready_neighbours.prev` == self.origin
	///   (only queue in ring)
	///
	/// For Pages(begin to end-1) in BookState:
	///
	/// * `remaining` > 0
	/// * `remaining_size` > 0
	/// * `first` <= `last`
	/// * Every page can be decoded into peek_* functions
	#[cfg(any(test, feature = "try-runtime", feature = "std"))]
	pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
		// Checking memory corruption for BookStateFor
		ensure!(
			BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
			"Memory Corruption in BookStateFor"
		);
		// Checking memory corruption for Pages
		ensure!(
			Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
			"Memory Corruption in Pages"
		);
		// Basic checks for each book
		for book in BookStateFor::<T>::iter_values() {
			ensure!(book.end >= book.begin, "Invariant");
			ensure!(book.end < 1 << 30, "Likely overflow or corruption");
			ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
			ensure!(book.size < 1 << 30, "Likely overflow or corruption");
			ensure!(book.count < 1 << 30, "Likely overflow or corruption");
			let fp: QueueFootprint = book.into();
			ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
		}
		// Loop around this origin.
		let Some(starting_origin) = ServiceHead::<T>::get() else { return Ok(()) };
		while let Some(head) = Self::bump_service_head(&mut WeightMeter::new()) {
			ensure!(
				BookStateFor::<T>::contains_key(&head),
				"Service head must point to an existing book"
			);
			let head_book_state = BookStateFor::<T>::get(&head);
			ensure!(
				head_book_state.message_count > 0,
				"There must be some messages if in ReadyRing"
			);
			ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
			ensure!(
				head_book_state.end > head_book_state.begin,
				"End > Begin if unprocessed messages exists"
			);
			ensure!(
				head_book_state.ready_neighbours.is_some(),
				"There must be neighbours if in ReadyRing"
			);
			if head_book_state.ready_neighbours.as_ref().unwrap().next == head {
				ensure!(
					head_book_state.ready_neighbours.as_ref().unwrap().prev == head,
					"Can only happen if only queue in ReadyRing"
				);
			}
			for page_index in head_book_state.begin..head_book_state.end {
				let page = Pages::<T>::get(&head, page_index).unwrap();
				let remaining_messages = page.remaining;
				let mut counted_remaining_messages: u32 = 0;
				ensure!(
					remaining_messages > 0.into(),
					"There must be some messages that have not been processed yet!"
				);
				for i in 0..u32::MAX {
					if let Some((_, processed, _)) = page.peek_index(i as usize) {
						if !processed {
							counted_remaining_messages += 1;
						}
					} else {
						break
					}
				}
				ensure!(
					remaining_messages.into() == counted_remaining_messages,
					"Memory Corruption"
				);
			}
			if head_book_state.ready_neighbours.as_ref().unwrap().next == starting_origin {
				break
			}
		}
		Ok(())
	}
	/// Print the pages in each queue and the messages in each page.
	///
	/// Processed messages are prefixed with a `*` and the current `begin`ning page with a `>`.
	///
	/// # Example output
	///
	/// ```text
	/// queue Here:
	///   page 0: []
	/// > page 1: []
	///   page 2: ["\0weight=4", "\0c", ]
	///   page 3: ["\0bigbig 1", ]
	///   page 4: ["\0bigbig 2", ]
	///   page 5: ["\0bigbig 3", ]
	/// ```
	#[cfg(feature = "std")]
	pub fn debug_info() -> String {
		let mut info = String::new();
		for (origin, book_state) in BookStateFor::<T>::iter() {
			let mut queue = format!("queue {:?}:\n", &origin);
			let mut pages = Pages::<T>::iter_prefix(&origin).collect::<Vec<_>>();
			pages.sort_by(|(a, _), (b, _)| a.cmp(b));
			for (page_index, mut page) in pages.into_iter() {
				let page_info = if book_state.begin == page_index { ">" } else { " " };
				let mut page_info = format!(
					"{} page {} ({:?} first, {:?} last, {:?} remain): [ ",
					page_info, page_index, page.first, page.last, page.remaining
				);
				for i in 0..u32::MAX {
					if let Some((_, processed, message)) =
						page.peek_index(i.try_into().expect("std-only code"))
					{
						let msg = String::from_utf8_lossy(message);
						if processed {
							page_info.push('*');
						}
						page_info.push_str(&format!("{:?}, ", msg));
						page.skip_first(true);
					} else {
						break
					}
				}
				page_info.push_str("]\n");
				queue.push_str(&page_info);
			}
			info.push_str(&queue);
		}
		info
	}
	/// Process a single message.
	///
	/// The base weight of this function needs to be accounted for by the caller. `meter` tracks
	/// the weight remaining to process the message. `overweight_limit` is the maximum weight that
	/// a message may ever consume. Messages above this limit are marked as permanently overweight.
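	///
	/// As a rough sketch, the result of the [`ProcessMessage`] implementation maps to a
	/// [`MessageExecutionStatus`] as follows (the `match` below is authoritative):
	///
	/// ```text
	/// Ok(..)                                   => Processed
	/// Err(Overweight(w)), w > overweight_limit => Overweight (permanently overweight)
	/// Err(Overweight(_))                       => InsufficientWeight (retried later)
	/// Err(Yield)                               => Unprocessable { permanent: false }
	/// Err(BadFormat | Corrupt | Unsupported)   => Unprocessable { permanent: true }
	/// Err(StackLimitReached)                   => StackLimitReached
	/// ```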
	fn process_message_payload(
		origin: MessageOriginOf<T>,
		page_index: PageIndex,
		message_index: T::Size,
		message: &[u8],
		meter: &mut WeightMeter,
		overweight_limit: Weight,
	) -> MessageExecutionStatus {
		let mut id = sp_io::hashing::blake2_256(message);
		use ProcessMessageError::*;
		let prev_consumed = meter.consumed();
		match T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id) {
			Err(Overweight(w)) if w.any_gt(overweight_limit) => {
				// Permanently overweight.
				Self::deposit_event(Event::<T>::OverweightEnqueued {
					id,
					origin,
					page_index,
					message_index,
				});
				MessageExecutionStatus::Overweight
			},
			Err(Overweight(_)) => {
				// Temporarily overweight - save progress and stop processing this
				// queue.
				MessageExecutionStatus::InsufficientWeight
			},
			Err(Yield) => {
				// Processing should be reattempted later.
				MessageExecutionStatus::Unprocessable { permanent: false }
			},
			Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
				// Permanent error - drop
				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
				MessageExecutionStatus::Unprocessable { permanent: true }
			},
			Err(error @ StackLimitReached) => {
				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
				MessageExecutionStatus::StackLimitReached
			},
			Ok(success) => {
				// Success
				let weight_used = meter.consumed().saturating_sub(prev_consumed);
				Self::deposit_event(Event::<T>::Processed {
					id: id.into(),
					origin,
					weight_used,
					success,
				});
				MessageExecutionStatus::Processed
			},
		}
	}
}
/// Run a closure that errors on re-entrance. Meant to be used by anything that services queues.
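///
/// A minimal usage sketch (illustrative only, not an actual call-site of this pallet): a nested
/// call observes the already-taken token and fails, while the outer call succeeds.
///
/// ```ignore
/// let outer = with_service_mutex(|| {
/// 	// Re-entering while the token is held must error.
/// 	assert!(with_service_mutex(|| ()).is_err());
/// 	42
/// });
/// assert_eq!(outer, Ok(42));
/// ```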
pub(crate) fn with_service_mutex<F: FnOnce() -> R, R>(f: F) -> Result<R, ()> {
	// Holds the singleton token instance.
	environmental::environmental!(token: Option<()>);

	token::using_once(&mut Some(()), || {
		// The first `ok_or` should always be `Ok` since we are inside a `using_once`.
		let hold = token::with(|t| t.take()).ok_or(()).defensive()?.ok_or(())?;
		// Put the token back when we're done.
		defer! {
			token::with(|t| {
				*t = Some(hold);
			});
		}

		Ok(f())
	})
}
/// Provides a [`sp_core::Get`] to access the `MEL` of a [`codec::MaxEncodedLen`] type.
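///
/// For example (a small sketch, relying only on the fixed-width SCALE encoding of `u32`):
///
/// ```ignore
/// assert_eq!(<MaxEncodedLenOf<u32> as Get<u32>>::get(), 4);
/// ```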
pub struct MaxEncodedLenOf<T>(core::marker::PhantomData<T>);
impl<T: MaxEncodedLen> Get<u32> for MaxEncodedLenOf<T> {
	fn get() -> u32 {
		T::max_encoded_len() as u32
	}
}
/// Calculates the maximum message length and exposes it through the [`sp_core::Get`] trait.
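///
/// Conceptually (a sketch of the computation in `get` below), the maximal message length is the
/// page heap size minus the encoded size of one [`ItemHeader`], since every message is stored
/// behind such a header:
///
/// ```text
/// max_message_len = HeapSize::get() - ItemHeader::<Size>::max_encoded_len()
/// ```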
pub struct MaxMessageLen<Origin, Size, HeapSize>(
	core::marker::PhantomData<(Origin, Size, HeapSize)>,
);
impl<Origin: MaxEncodedLen, Size: MaxEncodedLen + Into<u32>, HeapSize: Get<Size>> Get<u32>
	for MaxMessageLen<Origin, Size, HeapSize>
{
	fn get() -> u32 {
		(HeapSize::get().into()).saturating_sub(ItemHeader::<Size>::max_encoded_len() as u32)
	}
}
/// The maximal message length.
pub type MaxMessageLenOf<T> =
	MaxMessageLen<MessageOriginOf<T>, <T as Config>::Size, <T as Config>::HeapSize>;
/// The maximal encoded origin length.
pub type MaxOriginLenOf<T> = MaxEncodedLenOf<MessageOriginOf<T>>;
/// The `MessageOrigin` of this pallet.
pub type MessageOriginOf<T> = <<T as Config>::MessageProcessor as ProcessMessage>::Origin;
/// The maximal heap size of a page.
pub type HeapSizeU32Of<T> = IntoU32<<T as Config>::HeapSize, <T as Config>::Size>;
/// The [`Page`] of this pallet.
pub type PageOf<T> = Page<<T as Config>::Size, <T as Config>::HeapSize>;
/// The [`BookState`] of this pallet.
pub type BookStateOf<T> = BookState<MessageOriginOf<T>>;
/// Converts a [`sp_core::Get`] that returns a type which can be converted into a `u32` into a
/// `Get` which returns a `u32`.
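///
/// A small sketch, assuming `frame_support::traits::ConstU32` (which implements `Get<u32>`):
///
/// ```ignore
/// assert_eq!(<IntoU32<ConstU32<10>, u32> as Get<u32>>::get(), 10);
/// ```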
pub struct IntoU32<T, O>(core::marker::PhantomData<(T, O)>);
impl<T: Get<O>, O: Into<u32>> Get<u32> for IntoU32<T, O> {
	fn get() -> u32 {
		T::get().into()
	}
}
impl<T: Config> ServiceQueues for Pallet<T> {
	type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);
	fn service_queues(weight_limit: Weight) -> Weight {
		let mut weight = WeightMeter::with_limit(weight_limit);

		// Get the maximum weight that processing a single message may take:
		let max_weight = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
			defensive!("Not enough weight to service a single message.");
			Weight::zero()
		});

		match with_service_mutex(|| {
			let mut next = match Self::bump_service_head(&mut weight) {
				Some(h) => h,
				None => return weight.consumed(),
			};
			// The last queue that did not make any progress.
			// The loop aborts as soon as it arrives at this queue again without making any progress
			// on other queues in between.
			let mut last_no_progress = None;
			loop {
				let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight);
				next = match n {
					Some(n) =>
						if !progressed {
							if last_no_progress == Some(n.clone()) {
								break
							}
							if last_no_progress.is_none() {
								last_no_progress = Some(next.clone())
							}
							n
						} else {
							last_no_progress = None;
							n
						},
					None => break,
				}
			}
			weight.consumed()
		}) {
			Err(()) => weight.consumed(),
			Ok(w) => w,
		}
	}
	/// Execute a single overweight message.
	///
	/// The weight limit must be enough for `execute_overweight` and the message execution itself.
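	///
	/// A usage sketch (all values hypothetical): the address is the `(origin, page_index,
	/// message_index)` triple reported by an [`Event::OverweightEnqueued`] event.
	///
	/// ```ignore
	/// let addr = (origin, page_index, message_index); // as reported by the event
	/// let consumed = <Pallet<T> as ServiceQueues>::execute_overweight(weight_limit, addr)?;
	/// ```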
	fn execute_overweight(
		weight_limit: Weight,
		(message_origin, page, index): Self::OverweightMessageAddress,
	) -> Result<Weight, ExecuteOverweightError> {
		let mut weight = WeightMeter::with_limit(weight_limit);
		if weight
			.try_consume(
				T::WeightInfo::execute_overweight_page_removed()
					.max(T::WeightInfo::execute_overweight_page_updated()),
			)
			.is_err()
		{
			return Err(ExecuteOverweightError::InsufficientWeight)
		}
		Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err(
			|e| match e {
				Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
				Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
				Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
				Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued =>
					ExecuteOverweightError::NotFound,
				Error::<T>::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed,
				_ => ExecuteOverweightError::Other,
			},
		)
	}
}
impl<T: Config> EnqueueMessage<MessageOriginOf<T>> for Pallet<T> {
	type MaxMessageLen =
		MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;
	fn enqueue_message(
		message: BoundedSlice<u8, Self::MaxMessageLen>,
		origin: <T::MessageProcessor as ProcessMessage>::Origin,
	) {
		Self::do_enqueue_message(&origin, message);
		let book_state = BookStateFor::<T>::get(&origin);
		T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
	}
	fn enqueue_messages<'a>(
		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
		origin: <T::MessageProcessor as ProcessMessage>::Origin,
	) {
		for message in messages {
			Self::do_enqueue_message(&origin, message);
		}
		let book_state = BookStateFor::<T>::get(&origin);
		T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
	}
	fn sweep_queue(origin: MessageOriginOf<T>) {
		if !BookStateFor::<T>::contains_key(&origin) {
			return
		}
		let mut book_state = BookStateFor::<T>::get(&origin);
		book_state.begin = book_state.end;
		if let Some(neighbours) = book_state.ready_neighbours.take() {
			Self::ready_ring_unknit(&origin, neighbours);
		}
		BookStateFor::<T>::insert(&origin, &book_state);
	}
	fn footprint(origin: MessageOriginOf<T>) -> QueueFootprint {
		BookStateFor::<T>::get(&origin).into()
	}
}