1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
#![doc = include_str!("../README.md")]
18
#![cfg_attr(not(feature = "std"), no_std)]
19

            
20
#[cfg(test)]
21
mod mock;
22

            
23
#[cfg(test)]
24
mod tests;
25

            
26
#[cfg(feature = "runtime-benchmarks")]
27
mod benchmarking;
28

            
29
pub mod weights;
30
pub use weights::WeightInfo;
31

            
32
use {
33
    core::cmp::min,
34
    frame_support::{
35
        dispatch::DispatchErrorWithPostInfo,
36
        pallet,
37
        pallet_prelude::*,
38
        storage::types::{StorageDoubleMap, StorageMap},
39
        traits::{
40
            fungible::{Inspect, MutateHold},
41
            tokens::{Balance, Precision},
42
        },
43
        Blake2_128Concat,
44
    },
45
    frame_system::pallet_prelude::*,
46
    parity_scale_codec::{FullCodec, MaxEncodedLen},
47
    scale_info::TypeInfo,
48
    serde::{Deserialize, Serialize},
49
    sp_runtime::{
50
        traits::{AtLeast32BitUnsigned, CheckedAdd, CheckedSub, One, Saturating, Zero},
51
        ArithmeticError,
52
    },
53
    sp_std::{fmt::Debug, marker::PhantomData},
54
};
55

            
56
pub use pallet::*;
57

            
58
/// Type able to provide the current time for a given unit.
///
/// For each unit the returned number should monotonically increase and not
/// overflow.
pub trait TimeProvider<Unit, Number> {
    /// Returns the current time expressed in `unit`, or `None` if no time can
    /// be provided for it.
    fn now(unit: &Unit) -> Option<Number>;

    /// Benchmarks: should return the time unit which has the worst performance calling
    /// `TimeProvider::now(unit)` with.
    #[cfg(feature = "runtime-benchmarks")]
    fn bench_worst_case_time_unit() -> Unit;

    /// Benchmarks: sets the "now" time for time unit returned by `bench_worst_case_time_unit`.
    #[cfg(feature = "runtime-benchmarks")]
    fn bench_set_now(instant: Number);
}
73

            
74
/// Interactions the pallet needs with assets.
75
pub trait Assets<AccountId, AssetId, Balance> {
76
    /// Transfer assets deposited by an account to another account.
77
    /// Those assets should not be considered deposited in the target account.
78
    fn transfer_deposit(
79
        asset_id: &AssetId,
80
        from: &AccountId,
81
        to: &AccountId,
82
        amount: Balance,
83
    ) -> DispatchResult;
84

            
85
    /// Increase the deposit for an account and asset id. Should fail if account doesn't have
86
    /// enough of that asset. Funds should be safe and not slashable.
87
    fn increase_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
88
        -> DispatchResult;
89

            
90
    /// Decrease the deposit for an account and asset id. Should fail on underflow.
91
    fn decrease_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
92
        -> DispatchResult;
93

            
94
    /// Return the deposit for given asset and account.
95
    fn get_deposit(asset_id: &AssetId, account: &AccountId) -> Balance;
96

            
97
    /// Benchmarks: should return the asset id which has the worst performance when interacting
98
    /// with it.
99
    #[cfg(feature = "runtime-benchmarks")]
100
    fn bench_worst_case_asset_id() -> AssetId;
101

            
102
    /// Benchmarks: should return the another asset id which has the worst performance when interacting
103
    /// with it afther `bench_worst_case_asset_id`. This is to benchmark the worst case when changing config
104
    /// from one asset to another. If there is only one asset id it is fine to return it in both
105
    /// `bench_worst_case_asset_id` and `bench_worst_case_asset_id2`.
106
    #[cfg(feature = "runtime-benchmarks")]
107
    fn bench_worst_case_asset_id2() -> AssetId;
108

            
109
    /// Benchmarks: should set the balance.
110
    #[cfg(feature = "runtime-benchmarks")]
111
    fn bench_set_balance(asset_id: &AssetId, account: &AccountId, amount: Balance);
112
}
113

            
114
#[pallet]
115
pub mod pallet {
116
    use super::*;
117

            
118
    /// Pooled Staking pallet.
119
    #[pallet::pallet]
120
    pub struct Pallet<T>(PhantomData<T>);
121

            
122
    #[pallet::config]
123
    pub trait Config: frame_system::Config {
124
        /// Overarching event type
125
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
126

            
127
        /// Type used to represent stream ids. Should be large enough to not overflow.
128
        type StreamId: AtLeast32BitUnsigned
129
            + Default
130
            + Debug
131
            + Copy
132
            + Clone
133
            + FullCodec
134
            + TypeInfo
135
            + MaxEncodedLen;
136

            
137
        /// The balance type, which is also the type representing time (as this
138
        /// pallet will do math with both time and balances to compute how
139
        /// much should be paid).
140
        type Balance: Balance;
141

            
142
        /// Type representing an asset id, a identifier allowing distinguishing assets.
143
        type AssetId: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + PartialEq + Eq;
144

            
145
        /// Provide interaction with assets.
146
        type Assets: Assets<Self::AccountId, Self::AssetId, Self::Balance>;
147

            
148
        /// Currency for the opening balance hold for the storage used by the Stream.
149
        /// NOT to be confused with Assets.
150
        type Currency: Inspect<Self::AccountId, Balance = Self::Balance>
151
            + MutateHold<Self::AccountId, Reason = Self::RuntimeHoldReason>;
152

            
153
        type RuntimeHoldReason: From<HoldReason>;
154

            
155
        #[pallet::constant]
156
        type OpenStreamHoldAmount: Get<Self::Balance>;
157

            
158
        /// Represents which units of time can be used. Designed to be an enum
159
        /// with a variant for each kind of time source/scale supported.
160
        type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
161

            
162
        /// Provide the current time in given unit.
163
        type TimeProvider: TimeProvider<Self::TimeUnit, Self::Balance>;
164

            
165
        type WeightInfo: weights::WeightInfo;
166
    }
167

            
168
    type AccountIdOf<T> = <T as frame_system::Config>::AccountId;
169
    type AssetIdOf<T> = <T as Config>::AssetId;
170

            
171
    pub type RequestNonce = u32;
172

            
173
    /// A stream payment from source to target.
174
    /// Stores the last time the stream was updated, which allows to compute
175
    /// elapsed time and perform payment.
176
    #[derive(
177
        RuntimeDebug,
178
        PartialEq,
179
        Eq,
180
        Encode,
181
        Decode,
182
        Clone,
183
        TypeInfo,
184
        Serialize,
185
        Deserialize,
186
        MaxEncodedLen,
187
    )]
188
    pub struct Stream<AccountId, Unit, AssetId, Balance> {
189
        /// Payer, source of the stream.
190
        pub source: AccountId,
191
        /// Payee, target of the stream.
192
        pub target: AccountId,
193
        /// Steam config (time unit, asset id, rate)
194
        pub config: StreamConfig<Unit, AssetId, Balance>,
195
        /// How much is deposited to fund this stream.
196
        pub deposit: Balance,
197
        /// Last time the stream was updated in `config.time_unit`.
198
        pub last_time_updated: Balance,
199
        /// Nonce for requests. This prevents a request to make a first request
200
        /// then change it to another request to frontrun the other party
201
        /// accepting.
202
        pub request_nonce: RequestNonce,
203
        /// A pending change request if any.
204
        pub pending_request: Option<ChangeRequest<Unit, AssetId, Balance>>,
205
        /// One-time opening deposit. Will be released on close.
206
        pub opening_deposit: Balance,
207
    }
208

            
209
    impl<AccountId: PartialEq, Unit, AssetId, Balance> Stream<AccountId, Unit, AssetId, Balance> {
210
        pub fn account_to_party(&self, account: AccountId) -> Option<Party> {
211
            match account {
212
                a if a == self.source => Some(Party::Source),
213
                a if a == self.target => Some(Party::Target),
214
                _ => None,
215
            }
216
        }
217
    }
218

            
219
    /// Stream configuration.
220
    #[derive(
221
        RuntimeDebug,
222
        PartialEq,
223
        Eq,
224
        Encode,
225
        Decode,
226
        Copy,
227
        Clone,
228
        TypeInfo,
229
        Serialize,
230
        Deserialize,
231
        MaxEncodedLen,
232
    )]
233
    pub struct StreamConfig<Unit, AssetId, Balance> {
234
        /// Unit in which time is measured using a `TimeProvider`.
235
        pub time_unit: Unit,
236
        /// Asset used for payment.
237
        pub asset_id: AssetId,
238
        /// Amount of asset / unit.
239
        pub rate: Balance,
240
    }
241

            
242
    /// Origin of a change request.
243
    #[derive(
244
        RuntimeDebug,
245
        PartialEq,
246
        Eq,
247
        Encode,
248
        Decode,
249
        Copy,
250
        Clone,
251
        TypeInfo,
252
        Serialize,
253
        Deserialize,
254
        MaxEncodedLen,
255
    )]
256
    pub enum Party {
257
        Source,
258
        Target,
259
    }
260

            
261
    impl Party {
262
        pub fn inverse(self) -> Self {
263
            match self {
264
                Party::Source => Party::Target,
265
                Party::Target => Party::Source,
266
            }
267
        }
268
    }
269

            
270
    /// Kind of change requested.
271
    #[derive(
272
        RuntimeDebug,
273
        PartialEq,
274
        Eq,
275
        Encode,
276
        Decode,
277
        Copy,
278
        Clone,
279
        TypeInfo,
280
        Serialize,
281
        Deserialize,
282
        MaxEncodedLen,
283
    )]
284
    pub enum ChangeKind<Time> {
285
        /// The requested change is a suggestion, and the other party doesn't
286
        /// need to accept it.
287
        Suggestion,
288
        /// The requested change is mandatory, and the other party must either
289
        /// accept the change or close the stream. Reaching the deadline will
290
        /// close the stream too.
291
        Mandatory { deadline: Time },
292
    }
293

            
294
    /// Describe how the deposit should change.
295
    #[derive(
296
        RuntimeDebug,
297
        PartialEq,
298
        Eq,
299
        Encode,
300
        Decode,
301
        Copy,
302
        Clone,
303
        TypeInfo,
304
        Serialize,
305
        Deserialize,
306
        MaxEncodedLen,
307
    )]
308
    pub enum DepositChange<Balance> {
309
        /// Increase deposit by given amount.
310
        Increase(Balance),
311
        /// Decrease deposit by given amount.
312
        Decrease(Balance),
313
        /// Set deposit to given amount.
314
        Absolute(Balance),
315
    }
316

            
317
    /// A request to change a stream config.
318
    #[derive(
319
        RuntimeDebug,
320
        PartialEq,
321
        Eq,
322
        Encode,
323
        Decode,
324
        Clone,
325
        TypeInfo,
326
        Serialize,
327
        Deserialize,
328
        MaxEncodedLen,
329
    )]
330
    pub struct ChangeRequest<Unit, AssetId, Balance> {
331
        pub requester: Party,
332
        pub kind: ChangeKind<Balance>,
333
        pub new_config: StreamConfig<Unit, AssetId, Balance>,
334
        pub deposit_change: Option<DepositChange<Balance>>,
335
    }
336

            
337
    pub type StreamOf<T> =
338
        Stream<AccountIdOf<T>, <T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
339

            
340
    pub type StreamConfigOf<T> =
341
        StreamConfig<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
342

            
343
    pub type ChangeRequestOf<T> =
344
        ChangeRequest<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
345

            
346
    /// Payment status of a stream.
    /// NOTE(review): the code computing it is not visible here; field meanings
    /// below are taken from their names — confirm against the producer.
    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
    pub struct StreamPaymentStatus<Balance> {
        /// Amount of the payment.
        pub payment: Balance,
        /// Deposit left in the stream.
        pub deposit_left: Balance,
        /// Whether the stream is stalled, which can occur either when no funds are left or
        /// if the time is past a mandatory request deadline.
        pub stalled: bool,
    }
354

            
355
    /// Store the next available stream id.
356
    #[pallet::storage]
357
    pub type NextStreamId<T: Config> = StorageValue<Value = T::StreamId, QueryKind = ValueQuery>;
358

            
359
    /// Store each stream indexed by an Id.
360
    #[pallet::storage]
361
    pub type Streams<T: Config> = StorageMap<
362
        Hasher = Blake2_128Concat,
363
        Key = T::StreamId,
364
        Value = StreamOf<T>,
365
        QueryKind = OptionQuery,
366
    >;
367

            
368
    /// Lookup for all streams with given source.
369
    /// To avoid maintaining a growing list of stream ids, they are stored in
370
    /// the form of an entry (AccountId, StreamId). If such entry exists then
371
    /// this AccountId is a source in StreamId. One can iterate over all storage
372
    /// keys starting with the AccountId to find all StreamIds.
373
    #[pallet::storage]
374
    pub type LookupStreamsWithSource<T: Config> = StorageDoubleMap<
375
        Key1 = AccountIdOf<T>,
376
        Hasher1 = Blake2_128Concat,
377
        Key2 = T::StreamId,
378
        Hasher2 = Blake2_128Concat,
379
        Value = (),
380
        QueryKind = OptionQuery,
381
    >;
382

            
383
    /// Lookup for all streams with given target.
384
    /// To avoid maintaining a growing list of stream ids, they are stored in
385
    /// the form of an entry (AccountId, StreamId). If such entry exists then
386
    /// this AccountId is a target in StreamId. One can iterate over all storage
387
    /// keys starting with the AccountId to find all StreamIds.
388
    #[pallet::storage]
389
    pub type LookupStreamsWithTarget<T: Config> = StorageDoubleMap<
390
        Key1 = AccountIdOf<T>,
391
        Hasher1 = Blake2_128Concat,
392
        Key2 = T::StreamId,
393
        Hasher2 = Blake2_128Concat,
394
        Value = (),
395
        QueryKind = OptionQuery,
396
    >;
397

            
398
    #[pallet::error]
399
    #[derive(Clone, PartialEq, Eq)]
400
    pub enum Error<T> {
401
        UnknownStreamId,
402
        StreamIdOverflow,
403
        UnauthorizedOrigin,
404
        CantBeBothSourceAndTarget,
405
        CantFetchCurrentTime,
406
        SourceCantDecreaseRate,
407
        TargetCantIncreaseRate,
408
        CantOverrideMandatoryChange,
409
        NoPendingRequest,
410
        CantAcceptOwnRequest,
411
        CanOnlyCancelOwnRequest,
412
        WrongRequestNonce,
413
        ChangingAssetRequiresAbsoluteDepositChange,
414
        TargetCantChangeDeposit,
415
        ImmediateDepositChangeRequiresSameAssetId,
416
        DeadlineCantBeInPast,
417
        CantFetchStatusBeforeLastTimeUpdated,
418
    }
419

            
420
    #[pallet::event]
421
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
422
    pub enum Event<T: Config> {
423
        StreamOpened {
424
            stream_id: T::StreamId,
425
        },
426
        StreamClosed {
427
            stream_id: T::StreamId,
428
            refunded: T::Balance,
429
        },
430
        StreamPayment {
431
            stream_id: T::StreamId,
432
            source: AccountIdOf<T>,
433
            target: AccountIdOf<T>,
434
            amount: T::Balance,
435
            stalled: bool,
436
        },
437
        StreamConfigChangeRequested {
438
            stream_id: T::StreamId,
439
            request_nonce: RequestNonce,
440
            requester: Party,
441
            old_config: StreamConfigOf<T>,
442
            new_config: StreamConfigOf<T>,
443
        },
444
        StreamConfigChanged {
445
            stream_id: T::StreamId,
446
            old_config: StreamConfigOf<T>,
447
            new_config: StreamConfigOf<T>,
448
            deposit_change: Option<DepositChange<T::Balance>>,
449
        },
450
    }
451

            
452
    /// Freeze reason to use if needed.
453
    #[pallet::composite_enum]
454
    pub enum FreezeReason {
455
        StreamPayment,
456
    }
457

            
458
    /// Hold reason to use if needed.
459
    #[pallet::composite_enum]
460
    pub enum HoldReason {
461
        StreamPayment,
462
        StreamOpened,
463
    }
464

            
465
    #[pallet::call]
466
    impl<T: Config> Pallet<T> {
467
        /// Create a payment stream from the origin to the target with provided config
468
        /// and initial deposit (in the asset defined in the config).
469
        #[pallet::call_index(0)]
470
        #[pallet::weight(T::WeightInfo::open_stream())]
471
        pub fn open_stream(
472
            origin: OriginFor<T>,
473
            target: AccountIdOf<T>,
474
            config: StreamConfigOf<T>,
475
            initial_deposit: T::Balance,
476
        ) -> DispatchResultWithPostInfo {
477
            let origin = ensure_signed(origin)?;
478

            
479
            let _stream_id = Self::open_stream_returns_id(origin, target, config, initial_deposit)?;
480

            
481
            Ok(().into())
482
        }
483

            
484
        /// Close a given stream in which the origin is involved. It performs the pending payment
485
        /// before closing the stream.
486
        #[pallet::call_index(1)]
487
        #[pallet::weight(T::WeightInfo::close_stream())]
488
        pub fn close_stream(
489
            origin: OriginFor<T>,
490
            stream_id: T::StreamId,
491
        ) -> DispatchResultWithPostInfo {
492
            let origin = ensure_signed(origin)?;
493
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
494

            
495
            // Only source or target can close a stream.
496
            ensure!(
497
                origin == stream.source || origin == stream.target,
498
                Error::<T>::UnauthorizedOrigin
499
            );
500

            
501
            // Update stream before closing it to ensure fair payment.
502
            Self::perform_stream_payment(stream_id, &mut stream)?;
503

            
504
            // Unfreeze funds left in the stream.
505
            T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, stream.deposit)?;
506

            
507
            // Release opening deposit
508
            if stream.opening_deposit > 0u32.into() {
509
                T::Currency::release(
510
                    &HoldReason::StreamOpened.into(),
511
                    &stream.source,
512
                    stream.opening_deposit,
513
                    Precision::Exact,
514
                )?;
515
            }
516

            
517
            // Remove stream from storage.
518
            Streams::<T>::remove(stream_id);
519
            LookupStreamsWithSource::<T>::remove(stream.source, stream_id);
520
            LookupStreamsWithTarget::<T>::remove(stream.target, stream_id);
521

            
522
            // Emit event.
523
            Pallet::<T>::deposit_event(Event::<T>::StreamClosed {
524
                stream_id,
525
                refunded: stream.deposit.saturating_add(stream.opening_deposit),
526
            });
527

            
528
            Ok(().into())
529
        }
530

            
531
        /// Perform the pending payment of a stream. Anyone can call this.
532
        #[pallet::call_index(2)]
533
        #[pallet::weight(T::WeightInfo::perform_payment())]
534
        pub fn perform_payment(
535
            origin: OriginFor<T>,
536
            stream_id: T::StreamId,
537
        ) -> DispatchResultWithPostInfo {
538
            // No problem with anyone updating any stream.
539
            let _ = ensure_signed(origin)?;
540

            
541
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
542
            Self::perform_stream_payment(stream_id, &mut stream)?;
543
            Streams::<T>::insert(stream_id, stream);
544

            
545
            Ok(().into())
546
        }
547

            
548
        /// Requests a change to a stream config or deposit.
549
        ///
550
        /// If the new config don't change the time unit and asset id, the change will be applied
551
        /// immediately if it is at the desadvantage of the caller. Otherwise, the request is stored
552
        /// in the stream and will have to be approved by the other party.
553
        ///
554
        /// This call accepts a deposit change, which can only be provided by the source of the
555
        /// stream. An absolute change is required when changing asset id, as the current deposit
556
        /// will be released and a new deposit is required in the new asset.
557
        #[pallet::call_index(3)]
558
        #[pallet::weight(
559
            T::WeightInfo::request_change_immediate()
560
            .max(T::WeightInfo::request_change_delayed())
561
        )]
562
        pub fn request_change(
563
            origin: OriginFor<T>,
564
            stream_id: T::StreamId,
565
            kind: ChangeKind<T::Balance>,
566
            new_config: StreamConfigOf<T>,
567
            deposit_change: Option<DepositChange<T::Balance>>,
568
        ) -> DispatchResultWithPostInfo {
569
            let origin = ensure_signed(origin)?;
570
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
571

            
572
            let requester = stream
573
                .account_to_party(origin)
574
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
575

            
576
            ensure!(
577
                requester == Party::Source || deposit_change.is_none(),
578
                Error::<T>::TargetCantChangeDeposit
579
            );
580

            
581
            if stream.config == new_config && deposit_change.is_none() {
582
                return Ok(().into());
583
            }
584

            
585
            if let ChangeKind::Mandatory { deadline } = kind {
586
                let now = T::TimeProvider::now(&stream.config.time_unit)
587
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
588

            
589
                ensure!(deadline >= now, Error::<T>::DeadlineCantBeInPast);
590
            }
591

            
592
            // If asset id and time unit are the same, we allow to make the change
593
            // immediatly if the origin is at a disadvantage.
594
            // We allow this even if there is already a pending request.
595
            if Self::maybe_immediate_change(
596
                stream_id,
597
                &mut stream,
598
                &new_config,
599
                deposit_change,
600
                requester,
601
            )? {
602
                return Ok(().into());
603
            }
604

            
605
            // If the source is requesting a change of asset, they must provide an absolute change.
606
            if requester == Party::Source
607
                && new_config.asset_id != stream.config.asset_id
608
                && !matches!(deposit_change, Some(DepositChange::Absolute(_)))
609
            {
610
                Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?;
611
            }
612

            
613
            // If there is already a mandatory change request, only the origin
614
            // of this request can change it.
615
            if let Some(ChangeRequest {
616
                kind: ChangeKind::Mandatory { .. },
617
                requester: pending_requester,
618
                ..
619
            }) = &stream.pending_request
620
            {
621
                ensure!(
622
                    &requester == pending_requester,
623
                    Error::<T>::CantOverrideMandatoryChange
624
                );
625
            }
626

            
627
            stream.request_nonce = stream.request_nonce.wrapping_add(1);
628
            stream.pending_request = Some(ChangeRequest {
629
                requester,
630
                kind,
631
                new_config: new_config.clone(),
632
                deposit_change,
633
            });
634

            
635
            // Emit event.
636
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChangeRequested {
637
                stream_id,
638
                request_nonce: stream.request_nonce,
639
                requester,
640
                old_config: stream.config.clone(),
641
                new_config,
642
            });
643

            
644
            // Update storage.
645
            Streams::<T>::insert(stream_id, stream);
646

            
647
            Ok(().into())
648
        }
649

            
650
        /// Accepts a change requested before by the other party. Takes a nonce to prevent
651
        /// frontrunning attacks. If the target made a request, the source is able to change their
652
        /// deposit.
653
        #[pallet::call_index(4)]
654
        #[pallet::weight(T::WeightInfo::accept_requested_change())]
655
        pub fn accept_requested_change(
656
            origin: OriginFor<T>,
657
            stream_id: T::StreamId,
658
            request_nonce: RequestNonce,
659
            deposit_change: Option<DepositChange<T::Balance>>,
660
        ) -> DispatchResultWithPostInfo {
661
            let origin = ensure_signed(origin)?;
662
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
663

            
664
            let accepter = stream
665
                .account_to_party(origin)
666
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
667

            
668
            let Some(request) = stream.pending_request.clone() else {
669
                return Err(Error::<T>::NoPendingRequest.into());
670
            };
671

            
672
            ensure!(
673
                request_nonce == stream.request_nonce,
674
                Error::<T>::WrongRequestNonce
675
            );
676
            ensure!(
677
                accepter != request.requester,
678
                Error::<T>::CantAcceptOwnRequest
679
            );
680

            
681
            ensure!(
682
                accepter == Party::Source || deposit_change.is_none(),
683
                Error::<T>::TargetCantChangeDeposit
684
            );
685

            
686
            // Perform pending payment before changing config.
687
            Self::perform_stream_payment(stream_id, &mut stream)?;
688

            
689
            // Apply change.
690
            let deposit_change = deposit_change.or(request.deposit_change);
691
            match (
692
                stream.config.asset_id == request.new_config.asset_id,
693
                deposit_change,
694
            ) {
695
                // Same asset and a change, we apply it like in `change_deposit` call.
696
                (true, Some(change)) => {
697
                    Self::apply_deposit_change(&mut stream, change)?;
698
                }
699
                // Same asset and no change, no problem.
700
                (true, None) => (),
701
                // Change in asset with absolute new amount
702
                (false, Some(DepositChange::Absolute(amount))) => {
703
                    // Release deposit in old asset.
704
                    T::Assets::decrease_deposit(
705
                        &stream.config.asset_id,
706
                        &stream.source,
707
                        stream.deposit,
708
                    )?;
709

            
710
                    // Make deposit in new asset.
711
                    T::Assets::increase_deposit(
712
                        &request.new_config.asset_id,
713
                        &stream.source,
714
                        amount,
715
                    )?;
716
                    stream.deposit = amount;
717
                }
718
                // It doesn't make sense to change asset while not providing an absolute new
719
                // amount.
720
                (false, _) => Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?,
721
            }
722

            
723
            // If time unit changes we need to update `last_time_updated` to be in the
724
            // new unit.
725
            if stream.config.time_unit != request.new_config.time_unit {
726
                stream.last_time_updated = T::TimeProvider::now(&request.new_config.time_unit)
727
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
728
            }
729

            
730
            // Event
731
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
732
                stream_id,
733
                old_config: stream.config,
734
                new_config: request.new_config.clone(),
735
                deposit_change,
736
            });
737

            
738
            // Update config in storage.
739
            stream.config = request.new_config;
740
            stream.pending_request = None;
741
            Streams::<T>::insert(stream_id, stream);
742

            
743
            Ok(().into())
744
        }
745

            
746
        #[pallet::call_index(5)]
747
        #[pallet::weight(T::WeightInfo::cancel_change_request())]
748
        pub fn cancel_change_request(
749
            origin: OriginFor<T>,
750
            stream_id: T::StreamId,
751
        ) -> DispatchResultWithPostInfo {
752
            let origin = ensure_signed(origin)?;
753
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
754

            
755
            let accepter = stream
756
                .account_to_party(origin)
757
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
758

            
759
            let Some(request) = stream.pending_request.take() else {
760
                return Err(Error::<T>::NoPendingRequest.into());
761
            };
762

            
763
            ensure!(
764
                accepter == request.requester,
765
                Error::<T>::CanOnlyCancelOwnRequest
766
            );
767

            
768
            // Update storage.
769
            // Pending request is removed by calling `.take()`.
770
            Streams::<T>::insert(stream_id, stream);
771

            
772
            Ok(().into())
773
        }
774

            
775
        /// Allows immediately changing the deposit for a stream, which is simpler than
776
        /// calling `request_change` with the proper parameters.
777
        /// The call takes an asset id to ensure it has not changed (by an accepted request) before
778
        /// the call is included in a block, in which case the unit is no longer the same and quantities
779
        /// will not have the same scale/value.
780
        #[pallet::call_index(6)]
781
        #[pallet::weight(T::WeightInfo::immediately_change_deposit())]
782
        pub fn immediately_change_deposit(
783
            origin: OriginFor<T>,
784
            stream_id: T::StreamId,
785
            asset_id: T::AssetId,
786
            change: DepositChange<T::Balance>,
787
        ) -> DispatchResultWithPostInfo {
788
            let origin = ensure_signed(origin)?;
789
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
790

            
791
            ensure!(stream.source == origin, Error::<T>::UnauthorizedOrigin);
792
            ensure!(
793
                stream.config.asset_id == asset_id,
794
                Error::<T>::ImmediateDepositChangeRequiresSameAssetId
795
            );
796

            
797
            // Perform pending payment before changing deposit.
798
            Self::perform_stream_payment(stream_id, &mut stream)?;
799

            
800
            // Apply change.
801
            Self::apply_deposit_change(&mut stream, change)?;
802

            
803
            // Event
804
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
805
                stream_id,
806
                old_config: stream.config.clone(),
807
                new_config: stream.config.clone(),
808
                deposit_change: Some(change),
809
            });
810

            
811
            // Update stream in storage.
812
            Streams::<T>::insert(stream_id, stream);
813

            
814
            Ok(().into())
815
        }
816
    }
817

            
818
    impl<T: Config> Pallet<T> {
819
        /// Try to open a stream and returns its id.
820
        /// Prefers calling this function from other pallets instead of `open_stream` as the
821
        /// latter can't return the id.
822
        pub fn open_stream_returns_id(
823
            origin: AccountIdOf<T>,
824
            target: AccountIdOf<T>,
825
            config: StreamConfigOf<T>,
826
            initial_deposit: T::Balance,
827
        ) -> Result<T::StreamId, DispatchErrorWithPostInfo> {
828
            ensure!(origin != target, Error::<T>::CantBeBothSourceAndTarget);
829

            
830
            // Generate a new stream id.
831
            let stream_id = NextStreamId::<T>::get();
832
            let next_stream_id = stream_id
833
                .checked_add(&One::one())
834
                .ok_or(Error::<T>::StreamIdOverflow)?;
835
            NextStreamId::<T>::set(next_stream_id);
836

            
837
            // Hold opening deposit for the storage used by Stream
838
            let opening_deposit = T::OpenStreamHoldAmount::get();
839
            if opening_deposit > 0u32.into() {
840
                T::Currency::hold(&HoldReason::StreamOpened.into(), &origin, opening_deposit)?;
841
            }
842

            
843
            // Freeze initial deposit.
844
            T::Assets::increase_deposit(&config.asset_id, &origin, initial_deposit)?;
845

            
846
            // Create stream data.
847
            let now =
848
                T::TimeProvider::now(&config.time_unit).ok_or(Error::<T>::CantFetchCurrentTime)?;
849
            let stream = Stream {
850
                source: origin.clone(),
851
                target: target.clone(),
852
                config,
853
                deposit: initial_deposit,
854
                last_time_updated: now,
855
                request_nonce: 0,
856
                pending_request: None,
857
                opening_deposit,
858
            };
859

            
860
            // Insert stream in storage.
861
            Streams::<T>::insert(stream_id, stream);
862
            LookupStreamsWithSource::<T>::insert(origin, stream_id, ());
863
            LookupStreamsWithTarget::<T>::insert(target, stream_id, ());
864

            
865
            // Emit event.
866
            Pallet::<T>::deposit_event(Event::<T>::StreamOpened { stream_id });
867

            
868
            Ok(stream_id)
869
        }
870

            
871
        /// Get the stream payment current status, telling how much payment is
872
        /// pending, how much deposit will be left and whenever the stream is stalled.
873
        /// The stream is considered stalled if no funds are left or if the provided
874
        /// time is past a mandatory request deadline. If the provided `now` is `None`
875
        /// then the current time will be fetched. Being able to provide a custom `now`
876
        /// allows to check the status in the future. It is invalid to provide a `now` that is
877
        /// before `last_time_updated`.
878
        pub fn stream_payment_status(
879
            stream_id: T::StreamId,
880
            now: Option<T::Balance>,
881
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
882
            let stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
883
            let now = match now {
884
                Some(v) => v,
885
                None => T::TimeProvider::now(&stream.config.time_unit)
886
                    .ok_or(Error::<T>::CantFetchCurrentTime)?,
887
            };
888

            
889
            let last_time_updated = stream.last_time_updated;
890

            
891
            ensure!(
892
                now >= last_time_updated,
893
                Error::<T>::CantFetchStatusBeforeLastTimeUpdated
894
            );
895

            
896
            Self::stream_payment_status_by_ref(&stream, last_time_updated, now)
897
        }
898

            
899
        fn stream_payment_status_by_ref(
900
            stream: &StreamOf<T>,
901
            last_time_updated: T::Balance,
902
            mut now: T::Balance,
903
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
904
            let mut stalled_by_deadline = false;
905

            
906
            // Take into account mandatory change request deadline. Note that
907
            // while it'll perform payment up to deadline,
908
            // `stream.last_time_updated` is still the "real now" to avoid
909
            // retroactive payment in case the deadline changes.
910
            if let Some(ChangeRequest {
911
                kind: ChangeKind::Mandatory { deadline },
912
                ..
913
            }) = &stream.pending_request
914
            {
915
                now = min(now, *deadline);
916

            
917
                if now == *deadline {
918
                    stalled_by_deadline = true;
919
                }
920
            }
921

            
922
            // If deposit is zero the stream is fully drained and there is nothing to transfer.
923
            if stream.deposit.is_zero() {
924
                return Ok(StreamPaymentStatus {
925
                    payment: 0u32.into(),
926
                    deposit_left: stream.deposit,
927
                    stalled: true,
928
                });
929
            }
930

            
931
            // Dont perform payment if now is before or equal to `last_time_updated`.
932
            // It can be before due to the deadline adjustment.
933
            let Some(delta) = now.checked_sub(&last_time_updated) else {
934
                return Ok(StreamPaymentStatus {
935
                    payment: 0u32.into(),
936
                    deposit_left: stream.deposit,
937
                    stalled: true,
938
                });
939
            };
940

            
941
            // We compute the amount due to the target according to the rate, which may be
942
            // lowered if the stream deposit is lower.
943
            // Saturating is fine as it'll be clamped to the source deposit. It is also safer as
944
            // considering it an error can make a stream un-updatable if too much time has passed
945
            // without updates.
946
            let mut payment = delta.saturating_mul(stream.config.rate);
947

            
948
            // We compute the new amount of locked funds. If it underflows it
949
            // means that there is more to pay that what is left, in which case
950
            // we pay all that is left.
951
            let (deposit_left, stalled) = match stream.deposit.checked_sub(&payment) {
952
                Some(v) if v.is_zero() => (v, true),
953
                Some(v) => (v, stalled_by_deadline),
954
                None => {
955
                    payment = stream.deposit;
956
                    (Zero::zero(), true)
957
                }
958
            };
959

            
960
            Ok(StreamPaymentStatus {
961
                payment,
962
                deposit_left,
963
                stalled,
964
            })
965
        }
966

            
967
        /// Behavior:
968
        /// A stream payment consist of a locked deposit, a rate per unit of time and the
969
        /// last time the stream was updated. When updating the stream, **at most**
970
        /// `elapsed_time * rate` is unlocked from the source account and transfered to the target
971
        /// account. If this amount is greater than the left deposit, the stream is considered
972
        /// drained **but not closed**. The source can come back later and refill the stream,
973
        /// however there will be no retroactive payment for the time spent as drained.
974
        /// If the stream payment is used to rent a service, the target should pause the service
975
        /// while the stream is drained, and resume it once it is refilled.
976
        fn perform_stream_payment(
977
            stream_id: T::StreamId,
978
            stream: &mut StreamOf<T>,
979
        ) -> Result<T::Balance, DispatchErrorWithPostInfo> {
980
            let now = T::TimeProvider::now(&stream.config.time_unit)
981
                .ok_or(Error::<T>::CantFetchCurrentTime)?;
982

            
983
            // We want to update `stream.last_time_updated` to `now` as soon
984
            // as possible to avoid forgetting to do it. We copy the old value
985
            // for payment computation.
986
            let last_time_updated = stream.last_time_updated;
987
            stream.last_time_updated = now;
988

            
989
            let StreamPaymentStatus {
990
                payment,
991
                deposit_left,
992
                stalled,
993
            } = Self::stream_payment_status_by_ref(stream, last_time_updated, now)?;
994

            
995
            if payment.is_zero() {
996
                return Ok(0u32.into());
997
            }
998

            
999
            // Transfer from the source to target.
            T::Assets::transfer_deposit(
                &stream.config.asset_id,
                &stream.source,
                &stream.target,
                payment,
            )?;
            // Update stream info.
            stream.deposit = deposit_left;
            // Emit event.
            Pallet::<T>::deposit_event(Event::<T>::StreamPayment {
                stream_id,
                source: stream.source.clone(),
                target: stream.target.clone(),
                amount: payment,
                stalled,
            });
            Ok(payment)
        }
        /// Applies `change` to the deposit of `stream`.
        ///
        /// The difference is passed to `T::Assets::increase_deposit` or
        /// `T::Assets::decrease_deposit` for the source account and `stream.deposit` is
        /// updated accordingly. The stream is not written to storage here.
        ///
        /// Fails with `ArithmeticError::Overflow` / `ArithmeticError::Underflow` when a
        /// relative change does not fit, or with the error returned by `T::Assets`.
        fn apply_deposit_change(
            stream: &mut StreamOf<T>,
            change: DepositChange<T::Balance>,
        ) -> DispatchResultWithPostInfo {
            match change {
                DepositChange::Absolute(target_deposit) => {
                    // Work out in which direction the deposit has to move. An unchanged
                    // amount is handled by the first arm as an increase of zero.
                    match (
                        target_deposit.checked_sub(&stream.deposit),
                        stream.deposit.checked_sub(&target_deposit),
                    ) {
                        (Some(missing), _) => {
                            T::Assets::increase_deposit(
                                &stream.config.asset_id,
                                &stream.source,
                                missing,
                            )?;
                        }
                        (None, Some(excess)) => {
                            T::Assets::decrease_deposit(
                                &stream.config.asset_id,
                                &stream.source,
                                excess,
                            )?;
                        }
                        (None, None) => {}
                    }

                    stream.deposit = target_deposit;
                }
                DepositChange::Increase(added) => {
                    // The stream is updated before the assets are touched.
                    let raised_deposit = stream
                        .deposit
                        .checked_add(&added)
                        .ok_or(ArithmeticError::Overflow)?;
                    stream.deposit = raised_deposit;
                    T::Assets::increase_deposit(&stream.config.asset_id, &stream.source, added)?;
                }
                DepositChange::Decrease(removed) => {
                    // The stream is updated before the assets are touched.
                    let lowered_deposit = stream
                        .deposit
                        .checked_sub(&removed)
                        .ok_or(ArithmeticError::Underflow)?;
                    stream.deposit = lowered_deposit;
                    T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, removed)?;
                }
            }

            Ok(().into())
        }
        /// Tries to apply a change immediately. Returns `Ok(true)` if the change was
        /// applied and `Ok(false)` if it cannot be applied immediately.
        ///
        /// A change can be immediate only if the asset id and time unit stay the same and
        /// the requester is not favored by the new rate: a source may not lower the rate
        /// and a target may not raise it.
        /// This is allowed even if there is already a pending request.
        ///
        /// When the change is applied, the pending payment is performed first, then the
        /// optional deposit change, and the updated stream is written to storage.
        fn maybe_immediate_change(
            stream_id: T::StreamId,
            stream: &mut StreamOf<T>,
            new_config: &StreamConfigOf<T>,
            deposit_change: Option<DepositChange<T::Balance>>,
            requester: Party,
        ) -> Result<bool, DispatchErrorWithPostInfo> {
            // Changing the time unit or the asset is never immediate.
            let units_differ = new_config.time_unit != stream.config.time_unit
                || new_config.asset_id != stream.config.asset_id;
            if units_differ {
                return Ok(false);
            }

            // A rate change in the requester's favor is not immediate either.
            let source_lowers_rate =
                requester == Party::Source && new_config.rate < stream.config.rate;
            if source_lowers_rate {
                return Ok(false);
            }

            let target_raises_rate =
                requester == Party::Target && new_config.rate > stream.config.rate;
            if target_raises_rate {
                return Ok(false);
            }

            // Settle the pending payment under the old config before switching.
            Self::perform_stream_payment(stream_id, stream)?;

            // Then apply the requested deposit change, if any.
            if let Some(change) = deposit_change {
                Self::apply_deposit_change(stream, change)?;
            }

            // Install the new config, keeping the previous one for the event.
            let previous_config = core::mem::replace(&mut stream.config, new_config.clone());

            Self::deposit_event(Event::<T>::StreamConfigChanged {
                stream_id,
                old_config: previous_config,
                new_config: new_config.clone(),
                deposit_change,
            });

            // Persist the updated stream.
            Streams::<T>::insert(stream_id, stream);

            Ok(true)
        }
    }
}