// Source: commit d5e8a39b (created January 17, from the commit history view), author "runningW".
/*
Copyright (c) 2025 WuJingrun(吴京润)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
 */
package f_log

import std.collection.ArrayList
import std.collection.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import std.env
import std.io.OutputStream
import std.sync.AtomicInt64
import std.time.DateTime
import stdx.log.{LogLevel, LogRecord}
import f_base.Console
import f_collection.*
import f_io.DummyOutputStream
import f_log.exception.LogException

/**
 * Bundle kept per queue tag: a bounded queue of pending write tasks, the
 * stream that loggers format into, and a reference counter.
 *
 * Constructing an instance starts a background thread that executes queued
 * tasks one after another; the loop has no exit condition.
 */
private struct SyncQueueStream {
    /** Pending write tasks, executed in FIFO order by the consumer thread. */
    let queue: ArrayBlockingQueue<() -> Unit>
    /** Stream handed to the logger: the dummy stream itself, or an in-memory staging stream. */
    let output: OutputStream
    /** Reference counter; this struct only creates it, starting at zero. */
    let refCount = AtomicInt64(0)

    /**
     * @param output the destination stream. A DummyOutputStream is kept as-is;
     *        any other stream is replaced by a SyncQueueOutputStream whose
     *        capacity is ten times the configured async buffer size.
     */
    init(output: OutputStream) {
        let capacity = LoggerConfig.getAsyncBufSize()
        let tasks = ArrayBlockingQueue<() -> Unit>(capacity)
        this.queue = tasks
        let sink: OutputStream = if (output is DummyOutputStream) {
            output
        } else {
            SyncQueueOutputStream(capacity * 10)
        }
        this.output = sink
        startConsumer(tasks)
    }

    /**
     * Spawns the consumer thread: it blocks until a task is available, runs
     * it, and prints the stack trace of any Exception so the loop keeps going.
     */
    private static func startConsumer(tasks: ArrayBlockingQueue<() -> Unit>): Unit {
        spawn {
            while (true) {
                try {
                    let task = tasks.remove()
                    task()
                } catch (e: Exception) {
                    e.printStackTrace()
                }
            }
        }
    }
}
/**
 * Logger base class that performs the actual stream writes on a background
 * thread.
 *
 * Formatting happens on the calling thread via the parent class, which writes
 * into the stream chosen by `confirmDummy`. After each record a task is
 * queued that copies the staged bytes to the real output (`actualOutput`).
 * Loggers created with the same `queueTag` share one task queue and one
 * staging stream, tracked in `queueMap` together with a reference count.
 */
public abstract class AsyncLogger <: AbstractLogger {
    /** Shared queue/stream bundles, keyed by queue tag. */
    private static let queueMap = ConcurrentHashMap<String, SyncQueueStream>()
    /** Maximum time a single attempt to enqueue a task may block. */
    private let timeout: Duration
    /** What to do when an enqueue attempt times out. */
    private let policy: AsyncTimeoutPolicy
    /** Task queue shared with every other logger using the same tag. */
    private let queue: ArrayBlockingQueue<() -> Unit>
    /** The real destination stream that queued tasks write to. */
    protected var actualOutput: OutputStream
    private let queueTag: String

    /**
     * Creates the logger, registers (or joins) the shared queue for
     * `queueTag`, and installs an exit hook that closes `output`.
     *
     * @param output   the real destination stream
     * @param queueTag key selecting the shared queue; loggers with equal tags share it
     */
    public init(
        name: String,
        level: LogLevel,
        pattern: LogPattern,
        output: OutputStream,
        queueTag: String
    ) {
        super(name, level, pattern, confirmDummy(output, queueTag), queueTag)
        this.queueTag = queueTag
        this.actualOutput = output
        timeout = LoggerConfig.getAsyncTimeout()
        policy = LoggerConfig.getAsyncTimeoutPolicy()
        // This is the only place that takes a reference on the shared bundle.
        let qstream = queueStream(output, queueTag)
        this.queue = qstream.queue
        env.atExit{doClose(output, true, queueTag)}
    }

    /**
     * @return the closed state of `actualOutput` when it is a Resource,
     *         otherwise false.
     */
    public open func isClosed(): Bool {
        if(let r: Resource <- actualOutput){
            r.isClosed()
        }else{
            false
        }
    }

    /**
     * Releases one reference on the bundle registered under `tag` and then
     * closes `output` if requested.
     *
     * When the released reference was the last one, this first waits until
     * the task queue is empty and unregisters the bundle, so pending writes
     * are not sent to a closed stream. The bundle is left registered while
     * other references remain.
     *
     * NOTE(review): both close() and the exit hook installed by the
     * constructor end up here, so a logger that is closed explicitly releases
     * its reference twice. Confirm whether close() should be made idempotent.
     *
     * @param closable whether `output` may be closed
     */
    private static func doClose(output: OutputStream, closable: Bool, tag: String): Unit {
        if(let Some(q) <- queueMap.get(tag) && q.refCount.fetchSub(1) == 1){
            while(q.queue.size > 0){
                // The loop may spin many times; yield so this thread does not hog the CPU.
                sleep(Duration.Zero)
            }
            // Give the writer thread time to write out the last byte array.
            sleep(Duration.microsecond * 100)
            queueMap.remove(tag)
        }
        if(closable && let r: Resource <- output &&!r.isClosed()){
            r.close()
        }
    }

    /** Releases this logger's share of the queue and closes `actualOutput` if it is closable. */
    public open func close(): Unit {
        doClose(actualOutput, closable, queueTag)
    }

    /**
     * Returns the bundle registered under `tag`, creating it from `output`
     * when absent. Does not touch the reference count.
     */
    private static func lookup(output: OutputStream, tag: String): SyncQueueStream {
        let q = queueMap.computeIfAbsent(tag){
            SyncQueueStream(output)
        }
        q
    }

    /**
     * Picks the stream handed to the parent class: the bundle's stream for
     * `tag`. This runs before the constructor takes its reference, so it must
     * not count one itself.
     */
    private static func confirmDummy(output: OutputStream, tag: String): OutputStream {
        lookup(output, tag).output
    }

    /** Returns the bundle for `tag` and takes one reference on it. */
    private static func queueStream(output: OutputStream, tag: String): SyncQueueStream {
        let q = lookup(output, tag)
        q.refCount.fetchAdd(1)
        q
    }

    /**
     * Queues a task that copies staged records to `actualOutput`, if any are
     * waiting. Does nothing when the parent's stream is not a
     * SyncQueueOutputStream (the dummy-stream case), since nothing is staged.
     */
    private func append() {
        if (let Some(buffer) <- (super.output as SyncQueueOutputStream)) {
            if (buffer.size > 0) {
                append {
                    buffer.writeTo(actualOutput)
                }
            }
        }
    }

    /** Logs the record through the parent class, then schedules the write. */
    public open func log(record: LogRecord): Unit {
        super.log(record)
        append()
    }

    /**
     * Enqueues `fn`, retrying according to the configured timeout policy.
     *
     * @throws LogException when an attempt times out and the policy is Abort
     */
    protected func append(fn: () -> Unit): Unit {
        while (!queue.add(fn, timeout)) {
            match (policy) {
                case AlwaysWaiting => continue
                case Discard => return // drop the task
                case Abort => throw LogException("to add for current logger is timeout ${timeout}")
            }
        }
    }

    /** Formats the message through the parent class, then schedules the write. */
    protected open func append(level: LogLevel, message: () -> String, now: DateTime, tid: Int64, ex: Option<Exception>): Unit {
        super.append(level, message, now, tid, ex)
        append()
    }
}
/**
 * In-memory staging stream used between the formatting threads and the
 * writer thread.
 *
 * Each writing thread collects the chunks of one record in a thread-local
 * list taken from a shared pool. A chunk whose first byte is `\0` marks the
 * end of the record: the list is then queued for `writeTo`, which writes it
 * to the real stream and returns the list to the pool.
 */
public struct SyncQueueOutputStream <: OutputStream {
    /** Reusable chunk lists shared by all instances; filled once in the static initializer. */
    private static let pool: ArrayBlockingQueue<ArrayList<Array<Byte>>>
    static init() {
        let size = LoggerConfig.getAsyncBufSize()
        pool = ArrayBlockingQueue<ArrayList<Array<Byte>>>(size)
        for(i in 0 .. size){
            pool.add(ArrayList<Array<Byte>>())
        }
    }
    /** Completed records waiting to be written. */
    private let queue: ArrayBlockingQueue<ArrayList<Array<Byte>>>
    /** The record currently being assembled by the calling thread. */
    private let buffer = ThreadLocal<ArrayList<Array<Byte>>>()

    /** @param bufSize capacity of the queue of completed records */
    init(bufSize: Int64){
        queue = ArrayBlockingQueue<ArrayList<Array<Byte>>>(bufSize)
    }

    /**
     * Appends `bytes` to the calling thread's current record, or completes
     * the record when `bytes` starts with `\0` (that chunk is not stored).
     *
     * An empty array is ignored. If no list can be taken from the pool
     * within 5 ms, a notice is printed and the chunk is dropped.
     */
    public func write(bytes: Array<Byte>): Unit {
        // Nothing to store, and reading bytes[0] below would be out of range.
        if (bytes.size == 0) {
            return
        }
        let chunks = if(let Some(current) <- buffer.get()){
            current
        }else if (let Some(fresh) <- pool.remove(Duration.millisecond * 5)) {
            buffer.set(fresh)
            fresh
        } else {
            Console.writeln('AsyncLogger.SyncQueueOutputStream.EmptyPool')
            return
        }
        // TODO: a leading-terminator check is somewhat slow, but the faster
        // variant (terminator as the last byte of a chunk) caused a bug.
        if(bytes[0] == b'\0'){
            queue.add(chunks)
            buffer.set(None)
        }else{
            chunks.add(bytes)
        }
    }

    /** Number of completed records waiting to be written. */
    prop size: Int64 {
        get(){
            queue.size
        }
    }

    /**
     * Writes every queued record to `output`, flushing after each one.
     *
     * A record is removed from the queue and its list returned to the pool
     * even when writing or flushing throws, so a failing stream cannot leave
     * the same record stuck at the head of the queue.
     */
    func writeTo(output: OutputStream): Unit {
        while (let Some(chunks) <- queue.peek()){
            try{
                for(bytes in chunks where bytes.size > 0) {
                    output.write(bytes)
                }
            } finally {
                try {
                    output.flush()
                } finally {
                    returnBuffer(queue.remove())
                }
            }
        }
    }

    /** Empties `buf` and puts it back into the shared pool. */
    private func returnBuffer(buf: ArrayList<Array<Byte>>){
        buf.clear()
        pool.add(buf)
    }
}