1
// Copyright (C) Parity Technologies (UK) Ltd.
2
// This file is part of Polkadot.
3

            
4
// Polkadot is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Polkadot is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
//! To prevent Out of Memory errors on the `DownwardMessageQueue`, an
18
//! exponential fee factor (`DeliveryFeeFactor`) is set. The fee factor
19
//! increments exponentially after the number of messages in the
20
//! `DownwardMessageQueue` passes a threshold. This threshold is set as:
21
//!
22
//! ```ignore
23
//! // Maximum max sized messages that can be send to
24
//! // the DownwardMessageQueue before it runs out of memory
25
//! max_messages = MAX_POSSIBLE_ALLOCATION / max_downward_message_size
26
//! threshold = max_messages / THRESHOLD_FACTOR
27
//! ```
28
//! Based on the THRESHOLD_FACTOR, the threshold is set as a fraction of the
29
//! total messages. The `DeliveryFeeFactor` increases for a message over the
30
//! threshold by:
31
//!
32
//! `DeliveryFeeFactor = DeliveryFeeFactor *
33
//! (EXPONENTIAL_FEE_BASE + MESSAGE_SIZE_FEE_BASE * encoded_message_size_in_KB)`
34
//!
35
//! And decreases when the number of messages in the `DownwardMessageQueue` fall
36
//! below the threshold by:
37
//!
38
//! `DeliveryFeeFactor = DeliveryFeeFactor / EXPONENTIAL_FEE_BASE`
39
//!
40
//! As an extra defensive measure, a `max_messages` hard
41
//! limit is set to the number of messages in the DownwardMessageQueue. Messages
42
//! that would increase the number of messages in the queue above this hard
43
//! limit are dropped.
44

            
45
use crate::{
46
	configuration::{self, HostConfiguration},
47
	initializer, FeeTracker,
48
};
49
use alloc::vec::Vec;
50
use core::fmt;
51
use frame_support::pallet_prelude::*;
52
use frame_system::pallet_prelude::BlockNumberFor;
53
use polkadot_primitives::{DownwardMessage, Hash, Id as ParaId, InboundDownwardMessage};
54
use sp_core::MAX_POSSIBLE_ALLOCATION;
55
use sp_runtime::{
56
	traits::{BlakeTwo256, Hash as HashT, SaturatedConversion},
57
	FixedU128, Saturating,
58
};
59
use xcm::latest::SendError;
60

            
61
pub use pallet::*;
62

            
63
#[cfg(test)]
64
mod tests;
65

            
66
const THRESHOLD_FACTOR: u32 = 2;
67
const EXPONENTIAL_FEE_BASE: FixedU128 = FixedU128::from_rational(105, 100); // 1.05
68
const MESSAGE_SIZE_FEE_BASE: FixedU128 = FixedU128::from_rational(1, 1000); // 0.001
69

            
70
/// An error sending a downward message.
71
#[cfg_attr(test, derive(Debug))]
72
pub enum QueueDownwardMessageError {
73
	/// The message being sent exceeds the configured max message size.
74
	ExceedsMaxMessageSize,
75
}
76

            
77
impl From<QueueDownwardMessageError> for SendError {
78
	fn from(err: QueueDownwardMessageError) -> Self {
79
		match err {
80
			QueueDownwardMessageError::ExceedsMaxMessageSize => SendError::ExceedsMaxMessageSize,
81
		}
82
	}
83
}
84

            
85
/// An error returned by [`Pallet::check_processed_downward_messages`] that indicates an acceptance
86
/// check didn't pass.
87
pub(crate) enum ProcessedDownwardMessagesAcceptanceErr {
88
	/// If there are pending messages then `processed_downward_messages` should be at least 1,
89
	AdvancementRule,
90
	/// `processed_downward_messages` should not be greater than the number of pending messages.
91
	Underflow { processed_downward_messages: u32, dmq_length: u32 },
92
}
93

            
94
impl fmt::Debug for ProcessedDownwardMessagesAcceptanceErr {
95
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
96
		use ProcessedDownwardMessagesAcceptanceErr::*;
97
		match *self {
98
			AdvancementRule => {
99
				write!(fmt, "DMQ is not empty, but processed_downward_messages is 0",)
100
			},
101
			Underflow { processed_downward_messages, dmq_length } => write!(
102
				fmt,
103
				"processed_downward_messages = {}, but dmq_length is only {}",
104
				processed_downward_messages, dmq_length,
105
			),
106
		}
107
	}
108
}
109

            
110
#[frame_support::pallet]
111
pub mod pallet {
112
	use super::*;
113

            
114
554790
	#[pallet::pallet]
115
	#[pallet::without_storage_info]
116
	pub struct Pallet<T>(_);
117

            
118
	#[pallet::config]
119
	pub trait Config: frame_system::Config + configuration::Config {}
120

            
121
	/// The downward messages addressed for a certain para.
122
4248
	#[pallet::storage]
123
	pub type DownwardMessageQueues<T: Config> = StorageMap<
124
		_,
125
		Twox64Concat,
126
		ParaId,
127
		Vec<InboundDownwardMessage<BlockNumberFor<T>>>,
128
		ValueQuery,
129
	>;
130

            
131
	/// A mapping that stores the downward message queue MQC head for each para.
132
	///
133
	/// Each link in this chain has a form:
134
	/// `(prev_head, B, H(M))`, where
135
	/// - `prev_head`: is the previous head hash or zero if none.
136
	/// - `B`: is the relay-chain block number in which a message was appended.
137
	/// - `H(M)`: is the hash of the message being appended.
138
1203
	#[pallet::storage]
139
	pub(crate) type DownwardMessageQueueHeads<T: Config> =
140
		StorageMap<_, Twox64Concat, ParaId, Hash, ValueQuery>;
141

            
142
	/// Initialization value for the DeliveryFee factor.
143
2660
	#[pallet::type_value]
144
2660
	pub fn InitialFactor() -> FixedU128 {
145
2660
		FixedU128::from_u32(1)
146
2660
	}
147

            
148
	/// The factor to multiply the base delivery fee by.
149
1995
	#[pallet::storage]
150
	pub(crate) type DeliveryFeeFactor<T: Config> =
151
		StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>;
152
}
153
/// Routines and getters related to downward message passing.
154
impl<T: Config> Pallet<T> {
155
	/// Block initialization logic, called by initializer.
156
194763
	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
157
194763
		Weight::zero()
158
194763
	}
159

            
160
	/// Block finalization logic, called by initializer.
161
194763
	pub(crate) fn initializer_finalize() {}
162

            
163
	/// Called by the initializer to note that a new session has started.
164
135234
	pub(crate) fn initializer_on_new_session(
165
135234
		_notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
166
135234
		outgoing_paras: &[ParaId],
167
135234
	) {
168
135234
		Self::perform_outgoing_para_cleanup(outgoing_paras);
169
135234
	}
170

            
171
	/// Iterate over all paras that were noted for offboarding and remove all the data
172
	/// associated with them.
173
135234
	fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
174
135234
		for outgoing_para in outgoing {
175
			Self::clean_dmp_after_outgoing(outgoing_para);
176
		}
177
135234
	}
178

            
179
	/// Remove all relevant storage items for an outgoing parachain.
180
	fn clean_dmp_after_outgoing(outgoing_para: &ParaId) {
181
		DownwardMessageQueues::<T>::remove(outgoing_para);
182
		DownwardMessageQueueHeads::<T>::remove(outgoing_para);
183
	}
184

            
185
	/// Determine whether enqueuing a downward message to a specific recipient para would result
186
	/// in an error. If this returns `Ok(())` the caller can be certain that a call to
187
	/// `queue_downward_message` with the same parameters will be successful.
188
1842
	pub fn can_queue_downward_message(
189
1842
		config: &HostConfiguration<BlockNumberFor<T>>,
190
1842
		para: &ParaId,
191
1842
		msg: &DownwardMessage,
192
1842
	) -> Result<(), QueueDownwardMessageError> {
193
1842
		let serialized_len = msg.len() as u32;
194
1842
		if serialized_len > config.max_downward_message_size {
195
			return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
196
1842
		}
197
1842

            
198
1842
		// Hard limit on Queue size
199
1842
		if Self::dmq_length(*para) > Self::dmq_max_length(config.max_downward_message_size) {
200
			return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
201
1842
		}
202
1842

            
203
1842
		Ok(())
204
1842
	}
205

            
206
	/// Enqueue a downward message to a specific recipient para.
207
	///
208
	/// When encoded, the message should not exceed the `config.max_downward_message_size`.
209
	/// Otherwise, the message won't be sent and `Err` will be returned.
210
	///
211
	/// It is possible to send a downward message to a non-existent para. That, however, would lead
212
	/// to a dangling storage. If the caller cannot statically prove that the recipient exists
213
	/// then the caller should perform a runtime check.
214
1203
	pub fn queue_downward_message(
215
1203
		config: &HostConfiguration<BlockNumberFor<T>>,
216
1203
		para: ParaId,
217
1203
		msg: DownwardMessage,
218
1203
	) -> Result<(), QueueDownwardMessageError> {
219
1203
		let serialized_len = msg.len() as u32;
220
1203
		if serialized_len > config.max_downward_message_size {
221
			return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
222
1203
		}
223
1203

            
224
1203
		// Hard limit on Queue size
225
1203
		if Self::dmq_length(para) > Self::dmq_max_length(config.max_downward_message_size) {
226
			return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
227
1203
		}
228
1203

            
229
1203
		let inbound =
230
1203
			InboundDownwardMessage { msg, sent_at: frame_system::Pallet::<T>::block_number() };
231
1203

            
232
1203
		// obtain the new link in the MQC and update the head.
233
1604
		DownwardMessageQueueHeads::<T>::mutate(para, |head| {
234
1203
			let new_head =
235
1203
				BlakeTwo256::hash_of(&(*head, inbound.sent_at, T::Hashing::hash_of(&inbound.msg)));
236
1203
			*head = new_head;
237
1604
		});
238
1203

            
239
1604
		let q_len = DownwardMessageQueues::<T>::mutate(para, |v| {
240
1203
			v.push(inbound);
241
1203
			v.len()
242
1604
		});
243
1203

            
244
1203
		let threshold =
245
1203
			Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
246
1203
		if q_len > (threshold as usize) {
247
			let message_size_factor = FixedU128::from((serialized_len / 1024) as u128)
248
				.saturating_mul(MESSAGE_SIZE_FEE_BASE);
249
			Self::increase_fee_factor(para, message_size_factor);
250
1203
		}
251

            
252
1203
		Ok(())
253
1203
	}
254

            
255
	/// Checks if the number of processed downward messages is valid.
256
	pub(crate) fn check_processed_downward_messages(
257
		para: ParaId,
258
		relay_parent_number: BlockNumberFor<T>,
259
		processed_downward_messages: u32,
260
	) -> Result<(), ProcessedDownwardMessagesAcceptanceErr> {
261
		let dmq_length = Self::dmq_length(para);
262

            
263
		if dmq_length > 0 && processed_downward_messages == 0 {
264
			// The advancement rule is for at least one downwards message to be processed
265
			// if the queue is non-empty at the relay-parent. Downwards messages are annotated
266
			// with the block number, so we compare the earliest (first) against the relay parent.
267
			let contents = Self::dmq_contents(para);
268

            
269
			// sanity: if dmq_length is >0 this should always be 'Some'.
270
			if contents.get(0).map_or(false, |msg| msg.sent_at <= relay_parent_number) {
271
				return Err(ProcessedDownwardMessagesAcceptanceErr::AdvancementRule)
272
			}
273
		}
274

            
275
		// Note that we might be allowing a parachain to signal that it's processed
276
		// messages that hadn't been placed in the queue at the relay_parent.
277
		// only 'stupid' parachains would do it and we don't (and can't) force anyone
278
		// to act on messages, so the lenient approach is fine here.
279
		if dmq_length < processed_downward_messages {
280
			return Err(ProcessedDownwardMessagesAcceptanceErr::Underflow {
281
				processed_downward_messages,
282
				dmq_length,
283
			})
284
		}
285

            
286
		Ok(())
287
	}
288

            
289
	/// Prunes the specified number of messages from the downward message queue of the given para.
290
	pub(crate) fn prune_dmq(para: ParaId, processed_downward_messages: u32) -> Weight {
291
		let q_len = DownwardMessageQueues::<T>::mutate(para, |q| {
292
			let processed_downward_messages = processed_downward_messages as usize;
293
			if processed_downward_messages > q.len() {
294
				// reaching this branch is unexpected due to the constraint established by
295
				// `check_processed_downward_messages`. But better be safe than sorry.
296
				q.clear();
297
			} else {
298
				*q = q.split_off(processed_downward_messages);
299
			}
300
			q.len()
301
		});
302

            
303
		let config = configuration::ActiveConfig::<T>::get();
304
		let threshold =
305
			Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
306
		if q_len <= (threshold as usize) {
307
			Self::decrease_fee_factor(para);
308
		}
309
		T::DbWeight::get().reads_writes(1, 1)
310
	}
311

            
312
	/// Returns the Head of Message Queue Chain for the given para or `None` if there is none
313
	/// associated with it.
314
	#[cfg(test)]
315
	fn dmq_mqc_head(para: ParaId) -> Hash {
316
		DownwardMessageQueueHeads::<T>::get(&para)
317
	}
318

            
319
	/// Returns the number of pending downward messages addressed to the given para.
320
	///
321
	/// Returns 0 if the para doesn't have an associated downward message queue.
322
3045
	pub(crate) fn dmq_length(para: ParaId) -> u32 {
323
3045
		DownwardMessageQueues::<T>::decode_len(&para)
324
3045
			.unwrap_or(0)
325
3045
			.saturated_into::<u32>()
326
3045
	}
327

            
328
4248
	fn dmq_max_length(max_downward_message_size: u32) -> u32 {
329
4248
		MAX_POSSIBLE_ALLOCATION.checked_div(max_downward_message_size).unwrap_or(0)
330
4248
	}
331

            
332
	/// Returns the downward message queue contents for the given para.
333
	///
334
	/// The most recent messages are the latest in the vector.
335
	pub(crate) fn dmq_contents(
336
		recipient: ParaId,
337
	) -> Vec<InboundDownwardMessage<BlockNumberFor<T>>> {
338
		DownwardMessageQueues::<T>::get(&recipient)
339
	}
340
}
341

            
342
impl<T: Config> FeeTracker for Pallet<T> {
343
	type Id = ParaId;
344

            
345
1995
	fn get_fee_factor(id: Self::Id) -> FixedU128 {
346
1995
		DeliveryFeeFactor::<T>::get(id)
347
1995
	}
348

            
349
	fn increase_fee_factor(id: Self::Id, message_size_factor: FixedU128) -> FixedU128 {
350
		DeliveryFeeFactor::<T>::mutate(id, |f| {
351
			*f = f.saturating_mul(EXPONENTIAL_FEE_BASE.saturating_add(message_size_factor));
352
			*f
353
		})
354
	}
355

            
356
	fn decrease_fee_factor(id: Self::Id) -> FixedU128 {
357
		DeliveryFeeFactor::<T>::mutate(id, |f| {
358
			*f = InitialFactor::get().max(*f / EXPONENTIAL_FEE_BASE);
359
			*f
360
		})
361
	}
362
}