// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The scheduler module for parachains and parathreads.
//!
//! This module is responsible for two main tasks:
//!   - Partitioning validators into groups and assigning groups to parachains and parathreads
//!   - Scheduling parachains and parathreads
//!
//! It aims to achieve these tasks with these goals in mind:
//! - It should be possible to know at least a block ahead-of-time, ideally more, which validators
//!   are going to be assigned to which parachains.
//! - Parachains that have a candidate pending availability in this fork of the chain should not be
//!   assigned.
//! - Validator assignments should not be gameable. Malicious cartels should not be able to
//!   manipulate the scheduler to assign themselves as desired.
//! - High or close to optimal throughput of parachains and parathreads. Work among validator groups
//!   should be balanced.
//!
//! The Scheduler manages resource allocation using the concept of "Availability Cores".
//! There will be one availability core for each parachain, and a fixed number of cores
//! used for multiplexing parathreads. Validators will be partitioned into groups, with the same
//! number of groups as availability cores. Validator groups will be assigned to different
//! availability cores over time.

            
39
use core::iter::Peekable;
40

            
41
use crate::{configuration, initializer::SessionChangeNotification, paras};
42
use alloc::{
43
	collections::{
44
		btree_map::{self, BTreeMap},
45
		vec_deque::VecDeque,
46
	},
47
	vec::Vec,
48
};
49
use frame_support::{pallet_prelude::*, traits::Defensive};
50
use frame_system::pallet_prelude::BlockNumberFor;
51
pub use polkadot_core_primitives::v2::BlockNumber;
52
use polkadot_primitives::{
53
	CoreIndex, GroupIndex, GroupRotationInfo, Id as ParaId, ScheduledCore, ValidatorIndex,
54
};
55
use sp_runtime::traits::One;
56

            
57
pub mod common;
58

            
59
use common::{Assignment, AssignmentProvider};
60

            
61
pub use pallet::*;
62

            
63
#[cfg(test)]
64
mod tests;
65

            
66
/// Log target used by all log lines emitted from this pallet.
const LOG_TARGET: &str = "runtime::parachains::scheduler";
67

            
68
pub mod migration;
69

            
70
#[frame_support::pallet]
pub mod pallet {
	use super::*;

	/// The in-code storage version of this pallet (see the `migration` module for upgrades).
	const STORAGE_VERSION: StorageVersion = StorageVersion::new(2);

	#[pallet::pallet]
	#[pallet::without_storage_info]
	#[pallet::storage_version(STORAGE_VERSION)]
	pub struct Pallet<T>(_);

	#[pallet::config]
	pub trait Config: frame_system::Config + configuration::Config + paras::Config {
		/// The source of assignments (paras to schedule) that this pallet distributes over
		/// availability cores.
		type AssignmentProvider: AssignmentProvider<BlockNumberFor<Self>>;
	}

	/// All the validator groups. One for each core. Indices are into `ActiveValidators` - not the
	/// broader set of Polkadot validators, but instead just the subset used for parachains during
	/// this session.
	///
	/// Bound: The number of cores is the sum of the numbers of parachains and parathread
	/// multiplexers. Reasonably, 100-1000. The dominant factor is the number of validators: safe
	/// upper bound at 10k.
	#[pallet::storage]
	pub type ValidatorGroups<T> = StorageValue<_, Vec<Vec<ValidatorIndex>>, ValueQuery>;

	/// One entry for each availability core. The i'th parachain belongs to the i'th core, with the
	/// remaining cores all being on demand parachain multiplexers.
	///
	/// Bounded by the maximum of either of these two values:
	///   * The number of parachains and parathread multiplexers
	///   * The number of validators divided by `configuration.max_validators_per_core`.
	#[pallet::storage]
	pub type AvailabilityCores<T: Config> = StorageValue<_, Vec<CoreOccupiedType<T>>, ValueQuery>;

	/// Representation of a core in `AvailabilityCores`.
	///
	/// This is not to be confused with `CoreState` which is an enriched variant of this and exposed
	/// to the node side. It also provides information about scheduled/upcoming assignments for
	/// example and is computed on the fly in the `availability_cores` runtime call.
	#[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq)]
	pub enum CoreOccupied<N> {
		/// No candidate is waiting availability on this core right now (the core is not occupied).
		Free,
		/// A para is currently waiting for availability/inclusion on this core.
		Paras(ParasEntry<N>),
	}

	/// Convenience type alias for `CoreOccupied`.
	pub type CoreOccupiedType<T> = CoreOccupied<BlockNumberFor<T>>;

	impl<N> CoreOccupied<N> {
		/// Is core free?
		pub fn is_free(&self) -> bool {
			matches!(self, Self::Free)
		}
	}

	/// Reasons a core might be freed.
	#[derive(Clone, Copy)]
	pub enum FreedReason {
		/// The core's work concluded and the parablock assigned to it is considered available.
		Concluded,
		/// The core's work timed out.
		TimedOut,
	}

	/// The block number where the session start occurred. Used to track how many group rotations
	/// have occurred.
	///
	/// Note that in the context of parachains modules the session change is signaled during
	/// the block and enacted at the end of the block (at the finalization stage, to be exact).
	/// Thus for all intents and purposes the effect of the session change is observed at the
	/// block following the session change, block number of which we save in this storage value.
	#[pallet::storage]
	pub type SessionStartBlock<T: Config> = StorageValue<_, BlockNumberFor<T>, ValueQuery>;

	/// One entry for each availability core. The `VecDeque` represents the assignments to be
	/// scheduled on that core. The value contained here will not be valid after the end of
	/// a block. Runtime APIs should be used to determine scheduled cores for the upcoming block.
	#[pallet::storage]
	pub type ClaimQueue<T: Config> =
		StorageValue<_, BTreeMap<CoreIndex, VecDeque<ParasEntryType<T>>>, ValueQuery>;

	/// Assignments as tracked in the claim queue.
	#[derive(Encode, Decode, TypeInfo, RuntimeDebug, PartialEq, Clone)]
	pub struct ParasEntry<N> {
		/// The underlying [`Assignment`].
		pub assignment: Assignment,
		/// The number of times the entry has timed out in availability already.
		pub availability_timeouts: u32,
		/// The block height until this entry needs to be backed.
		///
		/// If missed the entry will be removed from the claim queue without ever having occupied
		/// the core.
		pub ttl: N,
	}

	/// Convenience type declaration for `ParasEntry`.
	pub type ParasEntryType<T> = ParasEntry<BlockNumberFor<T>>;

	impl<N> ParasEntry<N> {
		/// Create a new `ParasEntry`.
		///
		/// Note: the second argument is stored directly as the entry's `ttl`; callers in this
		/// file pass the expiry block (e.g. `now + ttl`), not the current block.
		pub fn new(assignment: Assignment, now: N) -> Self {
			ParasEntry { assignment, availability_timeouts: 0, ttl: now }
		}

		/// Return `Id` from the underlying `Assignment`.
		pub fn para_id(&self) -> ParaId {
			self.assignment.para_id()
		}
	}

	/// Availability timeout status of a core.
	pub(crate) struct AvailabilityTimeoutStatus<BlockNumber> {
		/// Is the core already timed out?
		///
		/// If this is true the core will be freed at this block.
		pub timed_out: bool,

		/// When does this core timeout.
		///
		/// The block number the core times out. If `timed_out` is true, this will correspond to
		/// now (current block number).
		pub live_until: BlockNumber,
	}
}
197

            
198
/// Position of a claim within a core's claim queue (index into the core's `VecDeque`).
type PositionInClaimQueue = u32;
199

            
200
/// Iterator over the claim queue that yields one `(CoreIndex, VecDeque<E>)` item per core index,
/// filling gaps (core indices missing from the map) with empty queues.
struct ClaimQueueIterator<E> {
	// Next core index to yield; starts at 0 and is incremented on every `next()`.
	next_idx: u32,
	// Underlying per-core queues, iterated in ascending core index order (`BTreeMap` order).
	queue: Peekable<btree_map::IntoIter<CoreIndex, VecDeque<E>>>,
}
204

            
205
impl<E> Iterator for ClaimQueueIterator<E> {
206
	type Item = (CoreIndex, VecDeque<E>);
207

            
208
194763
	fn next(&mut self) -> Option<Self::Item> {
209
194763
		let (idx, _) = self.queue.peek()?;
210
		let val = if idx != &CoreIndex(self.next_idx) {
211
			log::trace!(target: LOG_TARGET, "idx did not match claim queue idx: {:?} vs {:?}", idx, self.next_idx);
212
			(CoreIndex(self.next_idx), VecDeque::new())
213
		} else {
214
			let (idx, q) = self.queue.next()?;
215
			(idx, q)
216
		};
217
		self.next_idx += 1;
218
		Some(val)
219
194763
	}
220
}
221

            
222
impl<T: Config> Pallet<T> {
223
	/// Called by the initializer to initialize the scheduler pallet.
	///
	/// Currently a no-op: performs no state changes and consumes zero weight.
	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
		Weight::zero()
	}
227

            
228
	/// Called by the initializer to finalize the scheduler pallet.
	///
	/// Currently a no-op.
	pub(crate) fn initializer_finalize() {}
230

            
231
	/// Called before the initializer notifies of a new session.
	///
	/// Drains the claim queue first and then the occupied availability cores back into the
	/// `AssignmentProvider`, so the provider owns all outstanding assignments across the
	/// session boundary.
	pub(crate) fn pre_new_session() {
		Self::push_claim_queue_items_to_assignment_provider();
		Self::push_occupied_cores_to_assignment_provider();
	}
236

            
237
	/// Called by the initializer to note that a new session has started.
	///
	/// Resizes `AvailabilityCores` to the new core count, re-partitions the (already shuffled)
	/// active validators into one group per core, and records the first block number of the new
	/// session in `SessionStartBlock`.
	pub(crate) fn initializer_on_new_session(
		notification: &SessionChangeNotification<BlockNumberFor<T>>,
	) {
		let SessionChangeNotification { validators, new_config, .. } = notification;
		let config = new_config;

		// Core count is the larger of what the assignment provider requires and what the
		// validator set supports (`validators / max_validators_per_core`, when configured).
		let n_cores = core::cmp::max(
			T::AssignmentProvider::session_core_count(),
			match config.scheduler_params.max_validators_per_core {
				Some(x) if x != 0 => validators.len() as u32 / x,
				_ => 0,
			},
		);

		// Newly added cores start out free; shrinking drops the tail cores.
		AvailabilityCores::<T>::mutate(|cores| {
			cores.resize_with(n_cores as _, || CoreOccupied::Free);
		});

		// shuffle validators into groups.
		if n_cores == 0 || validators.is_empty() {
			ValidatorGroups::<T>::set(Vec::new());
		} else {
			// The first `n_larger_groups` groups receive one extra validator each, so that
			// every validator ends up in exactly one group.
			let group_base_size = validators
				.len()
				.checked_div(n_cores as usize)
				.defensive_proof("n_cores should not be 0")
				.unwrap_or(0);
			let n_larger_groups = validators
				.len()
				.checked_rem(n_cores as usize)
				.defensive_proof("n_cores should not be 0")
				.unwrap_or(0);

			// Groups contain indices into the validators from the session change notification,
			// which are already shuffled.

			let mut groups: Vec<Vec<ValidatorIndex>> = Vec::new();
			for i in 0..n_larger_groups {
				let offset = (group_base_size + 1) * i;
				groups.push(
					(0..group_base_size + 1)
						.map(|j| offset + j)
						.map(|j| ValidatorIndex(j as _))
						.collect(),
				);
			}

			for i in 0..(n_cores as usize - n_larger_groups) {
				let offset = (n_larger_groups * (group_base_size + 1)) + (i * group_base_size);
				groups.push(
					(0..group_base_size)
						.map(|j| offset + j)
						.map(|j| ValidatorIndex(j as _))
						.collect(),
				);
			}

			ValidatorGroups::<T>::set(groups);
		}

		// The session change is enacted at the end of the block, so the new session effectively
		// starts at the next block number (see the `SessionStartBlock` storage docs).
		let now = frame_system::Pallet::<T>::block_number() + One::one();
		SessionStartBlock::<T>::set(now);
	}
301

            
302
	/// Free unassigned cores. Provide a list of cores that should be considered newly-freed along
	/// with the reason for them being freed. Returns a tuple of concluded and timedout paras.
	///
	/// Out-of-range core indices are ignored and already-free cores are left untouched. Each
	/// freed core is reset to `CoreOccupied::Free`; its previous entry is returned keyed by core
	/// index, split by the given `FreedReason` (only the `Assignment` for concluded ones, the
	/// full `ParasEntry` for timed-out ones).
	fn free_cores(
		just_freed_cores: impl IntoIterator<Item = (CoreIndex, FreedReason)>,
	) -> (BTreeMap<CoreIndex, Assignment>, BTreeMap<CoreIndex, ParasEntryType<T>>) {
		let mut timedout_paras: BTreeMap<CoreIndex, ParasEntryType<T>> = BTreeMap::new();
		let mut concluded_paras = BTreeMap::new();

		AvailabilityCores::<T>::mutate(|cores| {
			let c_len = cores.len();

			just_freed_cores
				.into_iter()
				.filter(|(freed_index, _)| (freed_index.0 as usize) < c_len)
				.for_each(|(freed_index, freed_reason)| {
					// Take the entry out of the core, leaving the core free.
					match core::mem::replace(&mut cores[freed_index.0 as usize], CoreOccupied::Free)
					{
						CoreOccupied::Free => {},
						CoreOccupied::Paras(entry) => {
							match freed_reason {
								FreedReason::Concluded => {
									concluded_paras.insert(freed_index, entry.assignment);
								},
								FreedReason::TimedOut => {
									timedout_paras.insert(freed_index, entry);
								},
							};
						},
					};
				})
		});

		(concluded_paras, timedout_paras)
	}
336

            
337
	/// Get an iterator into the claim queues.
338
	///
339
	/// This iterator will have an item for each and every core index up to the maximum core index
340
	/// found in the claim queue. In other words there will be no holes/missing core indices,
341
	/// between core 0 and the maximum, even if the claim queue was missing entries for particular
342
	/// indices in between. (The iterator will return an empty `VecDeque` for those indices.
343
194763
	fn claim_queue_iterator() -> impl Iterator<Item = (CoreIndex, VecDeque<ParasEntryType<T>>)> {
344
194763
		let queues = ClaimQueue::<T>::get();
345
194763
		return ClaimQueueIterator::<ParasEntryType<T>> {
346
194763
			next_idx: 0,
347
194763
			queue: queues.into_iter().peekable(),
348
194763
		}
349
194763
	}
350

            
351
	/// Note that the given cores have become occupied. Update the claim queue accordingly.
	/// This will not push a new entry onto the claim queue, so the length after this call will be
	/// the expected length - 1. The claim_queue runtime API will take care of adding another entry
	/// here, to ensure the right lookahead.
	///
	/// Returns, for every core that was actually marked occupied, the position in that core's
	/// claim queue the para was taken from. Cores whose para cannot be found in the claim queue
	/// are skipped (logged at debug level).
	pub(crate) fn occupied(
		now_occupied: BTreeMap<CoreIndex, ParaId>,
	) -> BTreeMap<CoreIndex, PositionInClaimQueue> {
		let mut availability_cores = AvailabilityCores::<T>::get();

		log::debug!(target: LOG_TARGET, "[occupied] now_occupied {:?}", now_occupied);

		let pos_mapping: BTreeMap<CoreIndex, PositionInClaimQueue> = now_occupied
			.iter()
			.flat_map(|(core_idx, para_id)| {
				match Self::remove_from_claim_queue(*core_idx, *para_id) {
					Err(e) => {
						log::debug!(
							target: LOG_TARGET,
							"[occupied] error on remove_from_claim queue {}",
							e
						);
						None
					},
					Ok((pos_in_claim_queue, pe)) => {
						// Move the entry from the claim queue onto the core.
						// NOTE(review): indexes without a bounds check — assumes occupied core
						// indices are always within `AvailabilityCores`; confirm with callers.
						availability_cores[core_idx.0 as usize] = CoreOccupied::Paras(pe);

						Some((*core_idx, pos_in_claim_queue))
					},
				}
			})
			.collect();

		// Drop expired claims after processing now_occupied.
		Self::drop_expired_claims_from_claim_queue();

		AvailabilityCores::<T>::set(availability_cores);

		pos_mapping
	}
390

            
391
	/// Iterates through every element in all claim queues and tries to add new assignments from the
	/// `AssignmentProvider`. A claim is considered expired if its `ttl` field is lower than the
	/// current block height.
	fn drop_expired_claims_from_claim_queue() {
		let now = frame_system::Pallet::<T>::block_number();
		let availability_cores = AvailabilityCores::<T>::get();
		let ttl = configuration::ActiveConfig::<T>::get().scheduler_params.ttl;

		ClaimQueue::<T>::mutate(|cq| {
			for (idx, _) in (0u32..).zip(availability_cores) {
				let core_idx = CoreIndex(idx);
				if let Some(core_claim_queue) = cq.get_mut(&core_idx) {
					let mut i = 0;
					let mut num_dropped = 0;
					// Remove expired entries in place, reporting each one to the provider.
					while i < core_claim_queue.len() {
						let maybe_dropped = if let Some(entry) = core_claim_queue.get(i) {
							if entry.ttl < now {
								core_claim_queue.remove(i)
							} else {
								None
							}
						} else {
							None
						};

						if let Some(dropped) = maybe_dropped {
							num_dropped += 1;
							T::AssignmentProvider::report_processed(dropped.assignment);
						} else {
							// Only advance when nothing was removed: removal shifts the
							// remaining entries one position to the left.
							i += 1;
						}
					}

					for _ in 0..num_dropped {
						// For all claims dropped due to TTL, attempt to pop a new entry to
						// the back of the claim queue.
						if let Some(assignment) =
							T::AssignmentProvider::pop_assignment_for_core(core_idx)
						{
							core_claim_queue.push_back(ParasEntry::new(assignment, now + ttl));
						}
					}
				}
			}
		});
	}
437

            
438
	/// Get the validators in the given group, if the group index is valid for this session.
439
	pub(crate) fn group_validators(group_index: GroupIndex) -> Option<Vec<ValidatorIndex>> {
440
		ValidatorGroups::<T>::get().get(group_index.0 as usize).map(|g| g.clone())
441
	}
442

            
443
	/// Get the group assigned to a specific core by index at the current block number. Result
	/// undefined if the core index is unknown or the block number is less than the session start
	/// index.
	///
	/// Returns `None` when `at` precedes the session start or the core index is out of range of
	/// the validator groups. Otherwise groups rotate across cores once every
	/// `group_rotation_frequency` blocks, wrapping modulo the number of groups.
	pub(crate) fn group_assigned_to_core(
		core: CoreIndex,
		at: BlockNumberFor<T>,
	) -> Option<GroupIndex> {
		let config = configuration::ActiveConfig::<T>::get();
		let session_start_block = SessionStartBlock::<T>::get();

		if at < session_start_block {
			return None
		}

		let validator_groups = ValidatorGroups::<T>::get();

		if core.0 as usize >= validator_groups.len() {
			return None
		}

		// NOTE(review): assumes `group_rotation_frequency` is non-zero (division below would
		// panic otherwise) — presumably enforced by the configuration pallet; confirm.
		let rotations_since_session_start: BlockNumberFor<T> =
			(at - session_start_block) / config.scheduler_params.group_rotation_frequency;

		let rotations_since_session_start =
			<BlockNumberFor<T> as TryInto<u32>>::try_into(rotations_since_session_start)
				.unwrap_or(0);
		// Error case can only happen if rotations occur only once every u32::max(),
		// so functionally no difference in behavior.

		let group_idx =
			(core.0 as usize + rotations_since_session_start as usize) % validator_groups.len();
		Some(GroupIndex(group_idx as u32))
	}
476

            
477
	/// Returns a predicate that should be used for timing out occupied cores.
	///
	/// This only ever times out cores that have been occupied across a group rotation boundary.
	///
	/// The returned closure maps the block a candidate has been pending availability since to
	/// its [`AvailabilityTimeoutStatus`], evaluated against the current block number.
	pub(crate) fn availability_timeout_predicate(
	) -> impl Fn(BlockNumberFor<T>) -> AvailabilityTimeoutStatus<BlockNumberFor<T>> {
		let config = configuration::ActiveConfig::<T>::get();
		let now = frame_system::Pallet::<T>::block_number();
		let rotation_info = Self::group_rotation_info(now);

		let next_rotation = rotation_info.next_rotation_at();

		// Captured once; decides which reference point the per-core closure uses below.
		let times_out = Self::availability_timeout_check_required();

		move |pending_since| {
			let time_out_at = if times_out {
				// We are at the beginning of the rotation, here availability period is relevant.
				// Note: blocks backed in this rotation will never time out here as backed_in +
				// config.paras_availability_period will always be > now for these blocks, as
				// otherwise above condition would not be true.
				pending_since + config.scheduler_params.paras_availability_period
			} else {
				next_rotation + config.scheduler_params.paras_availability_period
			};

			AvailabilityTimeoutStatus { timed_out: time_out_at <= now, live_until: time_out_at }
		}
	}
504

            
505
	/// Is evaluation of `availability_timeout_predicate` necessary at the current block?
506
	///
507
	/// This can be used to avoid calling `availability_timeout_predicate` for each core in case
508
	/// this function returns false.
509
281178
	pub(crate) fn availability_timeout_check_required() -> bool {
510
281178
		let config = configuration::ActiveConfig::<T>::get();
511
281178
		let now = frame_system::Pallet::<T>::block_number() + One::one();
512
281178
		let rotation_info = Self::group_rotation_info(now);
513
281178

            
514
281178
		let current_window =
515
281178
			rotation_info.last_rotation_at() + config.scheduler_params.paras_availability_period;
516
281178
		now < current_window
517
281178
	}
518

            
519
	/// Returns a helper for determining group rotation.
520
367593
	pub(crate) fn group_rotation_info(
521
367593
		now: BlockNumberFor<T>,
522
367593
	) -> GroupRotationInfo<BlockNumberFor<T>> {
523
367593
		let session_start_block = SessionStartBlock::<T>::get();
524
367593
		let group_rotation_frequency = configuration::ActiveConfig::<T>::get()
525
367593
			.scheduler_params
526
367593
			.group_rotation_frequency;
527
367593

            
528
367593
		GroupRotationInfo { session_start_block, now, group_rotation_frequency }
529
367593
	}
530

            
531
	/// Return the next thing that will be scheduled on this core assuming it is currently
532
	/// occupied and the candidate occupying it became available.
533
	pub(crate) fn next_up_on_available(core: CoreIndex) -> Option<ScheduledCore> {
534
		ClaimQueue::<T>::get()
535
			.get(&core)
536
			.and_then(|a| a.front().map(|pe| Self::paras_entry_to_scheduled_core(pe)))
537
	}
538

            
539
	fn paras_entry_to_scheduled_core(pe: &ParasEntryType<T>) -> ScheduledCore {
540
		ScheduledCore { para_id: pe.para_id(), collator: None }
541
	}
542

            
543
	/// Return the next thing that will be scheduled on this core assuming it is currently
	/// occupied and the candidate occupying it times out.
	///
	/// Falls back to the entry currently occupying the core when the claim queue has nothing
	/// queued for it, but only if that entry still has availability-timeout retries left.
	pub(crate) fn next_up_on_time_out(core: CoreIndex) -> Option<ScheduledCore> {
		let max_availability_timeouts = configuration::ActiveConfig::<T>::get()
			.scheduler_params
			.max_availability_timeouts;
		Self::next_up_on_available(core).or_else(|| {
			// Or, if none, the claim currently occupying the core,
			// as it would be put back on the queue after timing out if number of retries is not at
			// the maximum.
			let cores = AvailabilityCores::<T>::get();
			cores.get(core.0 as usize).and_then(|c| match c {
				CoreOccupied::Free => None,
				CoreOccupied::Paras(pe) =>
					if pe.availability_timeouts < max_availability_timeouts {
						Some(Self::paras_entry_to_scheduled_core(pe))
					} else {
						None
					},
			})
		})
	}
565

            
566
	/// Pushes occupied cores to the assignment provider.
567
135234
	fn push_occupied_cores_to_assignment_provider() {
568
180312
		AvailabilityCores::<T>::mutate(|cores| {
569
436249
			for core in cores.iter_mut() {
570
436248
				match core::mem::replace(core, CoreOccupied::Free) {
571
436248
					CoreOccupied::Free => continue,
572
					CoreOccupied::Paras(entry) => {
573
						Self::maybe_push_assignment(entry);
574
					},
575
				}
576
			}
577
180312
		});
578
135234
	}
579

            
580
	// on new session
581
135234
	fn push_claim_queue_items_to_assignment_provider() {
582
135234
		for (_, claim_queue) in ClaimQueue::<T>::take() {
583
			// Push back in reverse order so that when we pop from the provider again,
584
			// the entries in the claim queue are in the same order as they are right now.
585
			for para_entry in claim_queue.into_iter().rev() {
586
				Self::maybe_push_assignment(para_entry);
587
			}
588
		}
589
135234
	}
590

            
591
	/// Push assignments back to the provider on session change unless the paras
592
	/// timed out on availability before.
593
	fn maybe_push_assignment(pe: ParasEntryType<T>) {
594
		if pe.availability_timeouts == 0 {
595
			T::AssignmentProvider::push_back_assignment(pe.assignment);
596
		}
597
	}
598

            
599
	/// Frees cores and fills the free claim queue spots by popping from the `AssignmentProvider`.
	///
	/// Timed-out entries that still have retries left are put back into the claim queue with a
	/// bumped `availability_timeouts` counter and a fresh TTL; entries that are out of retries
	/// are reported to the provider as processed, just like concluded ones.
	pub fn free_cores_and_fill_claim_queue(
		just_freed_cores: impl IntoIterator<Item = (CoreIndex, FreedReason)>,
		now: BlockNumberFor<T>,
	) {
		let (mut concluded_paras, mut timedout_paras) = Self::free_cores(just_freed_cores);

		// This can only happen on new sessions at which we move all assignments back to the
		// provider. Hence, there's nothing we need to do here.
		if ValidatorGroups::<T>::decode_len().map_or(true, |l| l == 0) {
			return
		}
		let n_session_cores = T::AssignmentProvider::session_core_count();
		let cq = ClaimQueue::<T>::get();
		let config = configuration::ActiveConfig::<T>::get();
		// Extra sanity, config should already never be smaller than 1:
		let n_lookahead = config.scheduler_params.lookahead.max(1);
		let max_availability_timeouts = config.scheduler_params.max_availability_timeouts;
		let ttl = config.scheduler_params.ttl;

		for core_idx in 0..n_session_cores {
			let core_idx = CoreIndex::from(core_idx);

			// How many claim queue slots this core already has filled.
			let n_lookahead_used = cq.get(&core_idx).map_or(0, |v| v.len() as u32);

			// add previously timedout paras back into the queue
			if let Some(mut entry) = timedout_paras.remove(&core_idx) {
				if entry.availability_timeouts < max_availability_timeouts {
					// Increment the timeout counter.
					entry.availability_timeouts += 1;
					if n_lookahead_used < n_lookahead {
						entry.ttl = now + ttl;
					} else {
						// Over max capacity, we need to bump ttl (we exceeded the claim queue
						// size, so otherwise the entry might get dropped before reaching the top):
						entry.ttl = now + ttl + One::one();
					}
					Self::add_to_claim_queue(core_idx, entry);
					// The claim has been added back into the claim queue.
					// Do not pop another assignment for the core.
					continue
				} else {
					// Consider timed out assignments for on demand parachains as concluded for
					// the assignment provider
					let ret = concluded_paras.insert(core_idx, entry.assignment);
					debug_assert!(ret.is_none());
				}
			}

			if let Some(concluded_para) = concluded_paras.remove(&core_idx) {
				T::AssignmentProvider::report_processed(concluded_para);
			}
			// Fill this core's claim queue up to the configured lookahead.
			for _ in n_lookahead_used..n_lookahead {
				if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) {
					Self::add_to_claim_queue(core_idx, ParasEntry::new(assignment, now + ttl));
				}
			}
		}

		// All freed cores must have been handled by the per-core loop above.
		debug_assert!(timedout_paras.is_empty());
		debug_assert!(concluded_paras.is_empty());
	}
661

            
662
	fn add_to_claim_queue(core_idx: CoreIndex, pe: ParasEntryType<T>) {
663
		ClaimQueue::<T>::mutate(|la| {
664
			la.entry(core_idx).or_default().push_back(pe);
665
		});
666
	}
667

            
668
	/// Returns `ParasEntry` with `para_id` at `core_idx` if found.
669
	fn remove_from_claim_queue(
670
		core_idx: CoreIndex,
671
		para_id: ParaId,
672
	) -> Result<(PositionInClaimQueue, ParasEntryType<T>), &'static str> {
673
		ClaimQueue::<T>::mutate(|cq| {
674
			let core_claims = cq.get_mut(&core_idx).ok_or("core_idx not found in lookahead")?;
675

            
676
			let pos = core_claims
677
				.iter()
678
				.position(|pe| pe.para_id() == para_id)
679
				.ok_or("para id not found at core_idx lookahead")?;
680

            
681
			let pe = core_claims.remove(pos).ok_or("remove returned None")?;
682

            
683
			Ok((pos as u32, pe))
684
		})
685
	}
686

            
687
	/// Paras scheduled next in the claim queue.
688
	pub(crate) fn scheduled_paras() -> impl Iterator<Item = (CoreIndex, ParaId)> {
689
		let claim_queue = ClaimQueue::<T>::get();
690
		claim_queue
691
			.into_iter()
692
			.filter_map(|(core_idx, v)| v.front().map(|e| (core_idx, e.assignment.para_id())))
693
	}
694

            
695
	/// Paras that may get backed on cores.
696
	///
697
	/// 1. The para must be scheduled on core.
698
	/// 2. Core needs to be free, otherwise backing is not possible.
699
194763
	pub(crate) fn eligible_paras() -> impl Iterator<Item = (CoreIndex, ParaId)> {
700
194763
		let availability_cores = AvailabilityCores::<T>::get();
701
194763

            
702
194763
		Self::claim_queue_iterator().zip(availability_cores.into_iter()).filter_map(
703
194763
			|((core_idx, queue), core)| {
704
				if core != CoreOccupied::Free {
705
					return None
706
				}
707
				let next_scheduled = queue.front()?;
708
				Some((core_idx, next_scheduled.assignment.para_id()))
709
194763
			},
710
194763
		)
711
194763
	}
712

            
713
	/// Total number of entries across all per-core claim queues.
	#[cfg(any(feature = "try-runtime", test))]
	fn claim_queue_len() -> usize {
		ClaimQueue::<T>::get().values().map(|queue| queue.len()).sum()
	}
717

            
718
	/// Test helper: `true` iff no claims are queued on any core.
	#[cfg(all(not(feature = "runtime-benchmarks"), test))]
	pub(crate) fn claim_queue_is_empty() -> bool {
		Self::claim_queue_len() == 0
	}
722

            
723
	/// Test helper: overwrite the validator groups directly.
	#[cfg(test)]
	pub(crate) fn set_validator_groups(validator_groups: Vec<Vec<ValidatorIndex>>) {
		ValidatorGroups::<T>::set(validator_groups);
	}
727

            
728
	/// Test helper: overwrite the claim queue directly.
	#[cfg(test)]
	pub(crate) fn set_claim_queue(claim_queue: BTreeMap<CoreIndex, VecDeque<ParasEntryType<T>>>) {
		ClaimQueue::<T>::set(claim_queue);
	}
732
}