/*
Copyright (c) 2025 WuJingrun(吴京润)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
 */
package f_rx

import std.collection.*
import std.sync.*
import std.unittest.*
import std.unittest.testmacro.*

/**
 * Unit tests for the f_rx Observable API, driven through FuncObserver callbacks.
 *
 * Most callback bodies run inside synchronized(mutex); the single mutex is shared
 * by every test case of this class and guards the assertions, the console output
 * and the per-test bookkeeping done from observer callbacks.
 *
 * NOTE(review): most cases return right after immediately()/defer() without
 * waiting. Whether those calls block until the stream completes is not visible
 * from this file, so assertion failures raised inside callbacks may not be
 * attributed to the test case that registered them - confirm.
 */
@Test
public class RxcjTest{
    // Shared lock used by the observer callbacks of all test cases.
    private let mutex = Mutex()

    /**
     * Source: a lambda that spawns a thread returning [1,2,3]; started with immediately().
     * onNext asserts that each value equals the running counter (1, 2, 3).
     */
    @TestCase
    public func testFutureIterableAndImmediately(): Unit {
        let i = AtomicInt64(1)
        Observable<Int64>.iterable({=> spawn{[1,2,3]}})
        .subscribe(FuncObserver<Int64>('testFutureIterableAndImmediately').setNext{v: Int64 =>
            synchronized(mutex){
                @Assert(i.load(), v)
                i.fetchAdd(1)
                println(v)
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testFutureIterableAndImmediately")
            }
        }).immediately()
    }

    /**
     * Source: a lambda that spawns a thread returning [2,4,6]; started with defer().
     * onNext asserts that each value equals the running counter (2, 4, 6).
     */
    @TestCase
    public func testFutureIterableAndDefer(): Unit {
        let i = AtomicInt64(2)
        Observable<Int64>.iterable({=> spawn{[2,4,6]}})
        .subscribe(FuncObserver<Int64>('testFutureIterableAndDefer').setNext{v: Int64 =>
            synchronized(mutex){
                @Assert(i.load(), v)
                i.fetchAdd(2)
                println(v)
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testFutureIterableAndDefer")
            }
        }).defer()
    }

    /**
     * Source: a lambda that spawns a thread returning [1,3,5]; started with defer().
     * onNext asserts that each value equals the running counter (1, 3, 5).
     *
     * NOTE(review): the source is built with spawn exactly as in
     * testFutureIterableAndDefer. If this case is meant to cover the plain
     * (non-future) iterable overload used by testCurrentAndImmediately, the
     * source expression should be changed - confirm the intent.
     */
    @TestCase
    public func testIterableAndDefer(): Unit {
        let i = AtomicInt64(1)
        Observable<Int64>.iterable({=> spawn{[1,3,5]}})
        .subscribe(FuncObserver<Int64>('testIterableAndDefer').setNext{v: Int64 =>
            synchronized(mutex){
                @Assert(i.load(), v)
                i.fetchAdd(2)
                println(v)
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testIterableAndDefer")
            }
        }).defer()
    }

    /**
     * Source: a lambda that spawns a thread returning [11,13,15]; started with immediately().
     * onNext asserts that each value equals the running counter (11, 13, 15).
     *
     * NOTE(review): like testIterableAndDefer, this uses a spawned future as the
     * source and therefore duplicates the "FutureIterable" variant - confirm the intent.
     */
    @TestCase
    public func testIterableAndImmediately(): Unit {
        let i = AtomicInt64(11)
        Observable<Int64>.iterable({=> spawn{[11,13,15]}})
        .subscribe(FuncObserver<Int64>('testIterableAndImmediately').setNext{v: Int64 =>
            synchronized(mutex){
                @Assert(i.load(), v)
                i.fetchAdd(2)
                println(v)
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testIterableAndImmediately")
            }
        }).immediately()
    }

    /**
     * Source: Observable.empty(); started with immediately().
     * onNext throws, because no value is expected before completion.
     */
    @TestCase
    public func testEmptyAndImmediately(): Unit {
        Observable<Int64>.empty()
        .subscribe(FuncObserver<Int64>('testEmptyAndImmediately').setNext{v: Int64 =>
            throw Exception('unreachable')
        }.setComplete{
            synchronized(mutex){
                println("complete testEmptyAndImmediately")
            }
        }).immediately()
    }

    /**
     * Source: Observable.empty(); started with defer().
     * onNext throws, because no value is expected before completion.
     */
    @TestCase
    public func testEmptyAndDefer(): Unit {
        Observable<Int64>.empty()
        .subscribe(FuncObserver<Int64>('testEmptyAndDefer').setNext{v: Int64 =>
            throw Exception('unreachable')
        }.setComplete{
            synchronized(mutex){
                println("complete testEmptyAndDefer")
            }
        }).defer()
    }

    /**
     * Source: Observable.single(100); started with defer().
     * onNext asserts that the delivered value is 100.
     */
    @TestCase
    public func testSingleAndDefer(): Unit {
        Observable<Int64>.single(100)
        .subscribe(FuncObserver<Int64>('testSingleAndDefer').setNext{v: Int64 =>
            synchronized(mutex){
                @Assert(100, v)
                println(100)
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testSingleAndDefer")
            }
        }).defer()
    }

    /**
     * Source: Observable.single(101); started with immediately().
     * onNext asserts that the delivered value is 101.
     */
    @TestCase
    public func testSingleAndImmediately(): Unit {
        Observable<Int64>.single(101)
        .subscribe(FuncObserver<Int64>('testSingleAndImmediately').setNext{v: Int64 =>
            synchronized(mutex){
                @Assert(101, v)
                println(101)
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testSingleAndImmediately")
            }
        }).immediately()
    }

    /**
     * Source: Observable.maybe(200); started with immediately().
     * onNext asserts that the delivered value is 200.
     */
    @TestCase
    public func testMaybeAndImmediately(): Unit {
        Observable<Int64>.maybe(200)
        .subscribe(FuncObserver<Int64>('testMaybeAndImmediately').setNext{v: Int64 =>
            synchronized(mutex){
                @Assert(200, v)
                println(200)
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testMaybeAndImmediately")
            }
        }).immediately()
    }

    /**
     * Source: Observable.concat over three arrays ([10,20,30], [40,50,60], [70,80,90]);
     * started with immediately().
     * onNext asserts that values arrive as 10, 20, ... in steps of 10.
     */
    @TestCase
    public func testConcatAndImmediately(): Unit {
        let n = AtomicInt64(10)
        Observable<Int64>.concat([[10, 20, 30], [40, 50, 60], [70, 80, 90]])
        .subscribe(FuncObserver<Int64>('testConcatAndImmediately').setNext{v: Int64 =>
            synchronized(mutex){
                @Assert(n.load(), v)
                n.fetchAdd(10)
                println(v)
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testConcatAndImmediately")
            }
        }).immediately()
    }

    /**
     * Source: Observable.emitter whose body pushes 1000, 2000, 3000 and then completes;
     * started with immediately().
     * onNext asserts that each value equals the running counter n (1000, 2000, 3000).
     */
    @TestCase
    public func testEmitterAndImmediately(): Unit {
        let m = AtomicInt64(1000)
        let n = AtomicInt64(1000)
        Observable<Int64>.emitter({emitter =>
            // fetchAdd yields the value before the addition, so v is 1000, 2000, 3000
            // and the loop stops once 4000 is read.
            while(let v <- m.fetchAdd(1000) && v <= 3000){
                emitter.onNext(v)
            }
            emitter.onComplete()
        })
        .subscribe(FuncObserver<Int64>('testEmitterAndImmediately').setNext{v: Int64 =>
            synchronized(mutex){
                @Assert(n.load(), v)
                n.fetchAdd(1000)
                println(v)
            }
        }.setComplete{
            synchronized(mutex){
                // Message names this test case (previously copy-pasted from testConcatAndImmediately).
                println("complete testEmitterAndImmediately")
            }
        }).immediately()
    }

    /**
     * Source: Observable.emitter whose body signals onError(Exception('too large'));
     * started with immediately().
     * onComplete throws because completion is not expected; onError asserts the
     * exception message is "too large".
     */
    @TestCase
    public func testEmitterErrorAndImmediately(): Unit {
        let m = AtomicInt64(4000)
        let n = AtomicInt64(4000)
        Observable<Int64>.emitter({emitter =>
            // m starts at 4000, so the first value read already fails v <= 3000:
            // the loop body never runs and the error is signalled right away.
            while(let v <- m.fetchAdd(1000) && v <= 3000){
                emitter.onNext(v)
            }
            emitter.onError(Exception('too large'))
        })
        .subscribe(FuncObserver<Int64>('testEmitterErrorAndImmediately').setNext{v: Int64 =>
            synchronized(mutex){
                @Assert(n.load(), v)
                n.fetchAdd(1000)
                println(v)
            }
        }.setComplete{
            synchronized(mutex){
                throw Exception('unreachable')
            }
        }.setError{e =>
            synchronized(mutex){
                @Assert("too large", e.message)
                println('emitter error too large ${e.message}')
            }
        }).immediately()
    }

    /**
     * Source: the array [10000, 20000, 30000]; configured with withCurrent() and
     * started with immediately().
     * onNext records every value under the id of the thread that ran the callback;
     * the case expects exactly one distinct thread id to have been recorded.
     */
    @TestCase
    public func testCurrentAndImmediately(): Unit {
        let map = HashMap<Int64, ArrayList<Int64>>()
        Observable<Int64>.iterable([10000, 20000, 30000])
        .subscribe(FuncObserver<Int64>('testCurrentAndImmediately').setNext{v: Int64 =>
            synchronized(mutex){
                println(v)
                map.addIfAbsent(Thread.currentThread.id, ArrayList<Int64>())?.add(v)
            }
        }.setComplete{
            synchronized(mutex){
                // Message names this test case (previously copy-pasted from testAlwaysNewAndImmediately).
                println("complete testCurrentAndImmediately")
            }
        }).withCurrent().immediately()
        // Wait before inspecting the map; presumably gives callbacks time to finish - confirm.
        // NOTE(review): the map is read here without holding mutex.
        sleep(Duration.second * 5)
        println('currently threads: ${map}')
        @Assert(map.size, 1)
    }

    /**
     * Source: the array [10000, 20000, 30000]; configured with withAlwaysNew() and
     * started with immediately().
     * onNext records every value under the id of the thread that ran the callback;
     * the case expects three distinct thread ids (one per value).
     */
    @TestCase
    public func testAlwaysNewAndImmediately(): Unit {
        let map = HashMap<Int64, ArrayList<Int64>>()
        Observable<Int64>.iterable([10000, 20000, 30000])
        .subscribe(FuncObserver<Int64>('testAlwaysNewAndImmediately').setNext{v: Int64 =>
            synchronized(mutex){
                println(v)
                map.addIfAbsent(Thread.currentThread.id, ArrayList<Int64>())?.add(v)
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testAlwaysNewAndImmediately")
            }
        }).withAlwaysNew().immediately()
        // Wait before inspecting the map; presumably gives callbacks time to finish - confirm.
        sleep(Duration.second * 5)
        println('always new threads: ${map}')
        @Assert(map.size, 3)
    }

    /**
     * Source: the array [40000, 50000, 60000]; configured with withFixed(2) and
     * started with immediately().
     * onNext records every value under the id of the thread that ran the callback;
     * the case expects exactly two distinct thread ids.
     */
    @TestCase
    public func testFixedAndImmediately(): Unit {
        let map = HashMap<Int64, ArrayList<Int64>>()
        Observable<Int64>.iterable([40000, 50000, 60000])
        .subscribe(FuncObserver<Int64>('testFixedAndImmediately').setNext{v: Int64 =>
            synchronized(mutex){
                println(v)
                map.addIfAbsent(Thread.currentThread.id, ArrayList<Int64>())?.add(v)
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testFixedAndImmediately")
            }
        }).withFixed(2).immediately()
        // Wait before inspecting the map; presumably gives callbacks time to finish - confirm.
        sleep(Duration.second * 5)
        println('fixed threads: ${map}')
        @Assert(map.size, 2)
    }

    /**
     * Two observers on one source created with asyncCombined: false.
     * Each observer stores the Thread that ran its onNext; the case expects both
     * observers to have been called on the same Thread object.
     * The first observer additionally asserts the values 4000, 5000, 6000 in order.
     */
    @TestCase
    public func testMultiSyncObserver(): Unit {
        let i = AtomicInt64(4000)
        let onet = AtomicOptionReference<Thread>(None<Thread>)
        let twot = AtomicOptionReference<Thread>(None<Thread>)
        Observable<Int64>.iterable([4000,5000,6000], asyncCombined: false)
        .subscribe(FuncObserver<Int64>('testMultiSyncObserver-0').setNext{v: Int64 =>
            synchronized(mutex){
                onet.store(Thread.currentThread)
                @Assert(i.load(), v)
                i.fetchAdd(1000)
                println('${v} one')
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testMultiObserver one")
            }
        }).subscribe(FuncObserver<Int64>('testMultiSyncObserver-1').setNext{v: Int64 =>
            twot.store(Thread.currentThread)
            println('${v} two')
        }.setComplete{
            synchronized(mutex){
                println("complete testMultiObserver two")
            }
        }).immediately()
        // Wait before comparing the recorded threads; getOrThrow fails the case
        // if either observer never received a value.
        sleep(Duration.second * 5)
        let sameThread = refEq(onet.load().getOrThrow(), twot.load().getOrThrow())
        @Assert(sameThread, true)
    }

    /**
     * Two observers on one source created with asyncCombined: true.
     * Each observer stores the Thread that ran its onNext; the case expects the
     * two observers to have been called on different Thread objects.
     * The first observer additionally asserts the values 4000, 5000, 6000 in order.
     */
    @TestCase
    public func testMultiAsyncObserver(): Unit {
        let i = AtomicInt64(4000)
        let onet = AtomicOptionReference<Thread>(None<Thread>)
        let twot = AtomicOptionReference<Thread>(None<Thread>)
        Observable<Int64>.iterable([4000,5000,6000], asyncCombined: true)
        .subscribe(FuncObserver<Int64>('testMultiAsyncObserver-0').setNext{v: Int64 =>
            synchronized(mutex){
                println('${v} one')
                onet.store(Thread.currentThread)
                @Assert(i.load(), v)
                i.fetchAdd(1000)
            }
        }.setComplete{
            synchronized(mutex){
                println("complete testMultiObserver one")
            }
        }).subscribe(FuncObserver<Int64>('testMultiAsyncObserver-1').setNext{v: Int64 =>
            twot.store(Thread.currentThread)
            println('${v} two')
        }.setComplete{
            synchronized(mutex){
                println("complete testMultiObserver two")
            }
        }).immediately()
        // Wait before comparing the recorded threads; getOrThrow fails the case
        // if either observer never received a value.
        sleep(Duration.second * 5)
        let sameThread = refEq(onet.load().getOrThrow(), twot.load().getOrThrow())
        @Assert(sameThread, false)
    }

    /**
     * Sleeps for five seconds and asserts nothing.
     * NOTE(review): presumably keeps the test run alive long enough for callbacks
     * of the cases that do not wait themselves - confirm.
     */
    @TestCase
    public func testSleep(): Unit {
        sleep(Duration.second * 5)
    }
}