f4839374创建于 2025年10月2日历史提交
/*
Copyright (c) 2025 WuJingrun(吴京润)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package f_rx

import f_concurrent.{RateLimiter, AnyMomentRateLimiter, LeakingBucketRateLimiter, SlidingWindowRateLimiter, TokenBucketRateLimiter}

/**
 * limiter 是限流算法,policy是被限流算法命中时执行的策略,discardedPolicy是被限流算法丢弃时的背压策略
 */
public class RateLimited<T> <: BackPressure<T> {
    public RateLimited(scheduler: SingleScheduler<T>, 
        private let limiter: RateLimiter<Unit>,
        private let policy!: BackPressure<T> = CurrentThread<T>(),
        private let discardedPolicy!: BackPressure<T> = Discarding<T>()){
        super.scheduler = scheduler
        policy.scheduler = scheduler
        discardedPolicy.scheduler = scheduler
    }
    public init(limiter: RateLimiter<Unit>, policy!: BackPressure<T> = CurrentThread<T>(), discardedPolicy!: BackPressure<T> = Discarding<T>()){
        this.limiter = limiter
        this.policy = policy
        this.discardedPolicy = discardedPolicy
    }
    public static func tokenBucket(scheduler: SingleScheduler<T>, tokens!: Int64, timeout!: Duration, populationPeriod!: Duration, 
        policy!: BackPressure<T> = CurrentThread<T>(), discardedPolicy!: BackPressure<T> = Discarding<T>()) {
        RateLimited<T>(scheduler, TokenBucketRateLimiter<Unit>(tokens: tokens, timeout: timeout, populationPeriod: populationPeriod), policy: policy, discardedPolicy: discardedPolicy)
    }
    public static func tokenBucket(tokens!: Int64, timeout!: Duration, populationPeriod!: Duration, 
        policy!: BackPressure<T> = CurrentThread<T>(), discardedPolicy!: BackPressure<T> = Discarding<T>()){
        RateLimited<T>(TokenBucketRateLimiter<Unit>(tokens: tokens, timeout: timeout, populationPeriod: populationPeriod), policy: policy, discardedPolicy: discardedPolicy)
    }
    public static func slidingWindow(scheduler: SingleScheduler<T>, window!: Duration, timeout!: Duration, limit!: Int64, 
        policy!: BackPressure<T> = CurrentThread<T>(), discardedPolicy!: BackPressure<T> = Discarding<T>()){
        RateLimited<T>(scheduler, SlidingWindowRateLimiter<Unit>(window: window, timeout: timeout, limit: limit), policy: policy, discardedPolicy: discardedPolicy)
    }
    public static func slidingWindow(window!: Duration, timeout!: Duration, limit!: Int64, 
        policy!: BackPressure<T> = CurrentThread<T>(), discardedPolicy!: BackPressure<T> = Discarding<T>()){
        RateLimited<T>(SlidingWindowRateLimiter<Unit>(window: window, timeout: timeout, limit: limit), policy: policy, discardedPolicy: discardedPolicy)
    }
    public static func leakingBucket(scheduler: SingleScheduler<T>, timeout!: Duration, maxWaitings!: Int64, leakingPerDuration!: Int64, leakingDuration!: Duration, 
        policy!: BackPressure<T> = CurrentThread<T>(), discardedPolicy!: BackPressure<T> = Discarding<T>()){
        RateLimited<T>(scheduler, LeakingBucketRateLimiter<Unit>(timeout: timeout, maxWaitings: maxWaitings, leakingPerDuration: leakingPerDuration, leakingDuration: leakingDuration), policy: policy, discardedPolicy: discardedPolicy)
    }
    public static func leakingBucket(timeout!: Duration, maxWaitings!: Int64, leakingPerDuration!: Int64, leakingDuration!: Duration, 
        policy!: BackPressure<T> = CurrentThread<T>(), discardedPolicy!: BackPressure<T> = Discarding<T>()){
        RateLimited<T>(LeakingBucketRateLimiter<Unit>(timeout: timeout, maxWaitings: maxWaitings, leakingPerDuration: leakingPerDuration, leakingDuration: leakingDuration), policy: policy, discardedPolicy: discardedPolicy)
    }
    public static func anyMoment(scheduler: SingleScheduler<T>, maxTokens!: Int64, timeout!: Duration, 
        policy!: BackPressure<T> = CurrentThread<T>(), discardedPolicy!: BackPressure<T> = Discarding<T>()){
        RateLimited<T>(scheduler, AnyMomentRateLimiter<Unit>(maxTokens: maxTokens, timeout: timeout), policy: policy, discardedPolicy: discardedPolicy)
    }
    public static func anyMoment(maxTokens!: Int64, timeout!: Duration, 
        policy!: BackPressure<T> = CurrentThread<T>(), discardedPolicy!: BackPressure<T> = Discarding<T>()){
        RateLimited<T>(AnyMomentRateLimiter<Unit>(maxTokens: maxTokens, timeout: timeout), policy: policy, discardedPolicy: discardedPolicy)
    }
    
    protected func schedule(state: SingleSchedulerState<T>): Unit {
        let exe = limiter.exec{
            policy.schedule(state)
        }
        if(exe.isNone()){
            discardedPolicy.schedule(state)
        }
    }
    protected mut prop scheduler: SingleScheduler<T> {
        get(){
            super.scheduler
        }
        set(value){
            super.scheduler = value
            policy.scheduler = value
            discardedPolicy.scheduler = value
        }
    }
    protected func clone(): BackPressure<T> {
        let p = policy.clone()
        let dp = discardedPolicy.clone()
        let bp = RateLimited<T>(this.limiter, policy: p, discardedPolicy: dp)
        bp.scheduler_ = this.scheduler_
        p.scheduler_ = this.scheduler_
        dp.scheduler_ = this.scheduler_
        bp
    }
}