/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * This source file is part of the Cangjie project, licensed under Apache-2.0
 * with Runtime Library Exception.
 *
 * See https://cangjie-lang.cn/pages/LICENSE for license information.
 */

package stdx.net.http

import std.sync.{Timer, Mutex, AtomicUInt32, AtomicBool, AtomicUInt8, AtomicUInt64}
import std.net.{SocketAddress, StreamingSocket, SocketException}
import std.collection.{ArrayList, HashMap}
import stdx.log.LogLevel
import stdx.net.tls.common.TlsException
import std.time.MonoTime

class HttpServer2 <: ProtocolService {
    // a wrapped tls connection
    let conn: BufferedConn

    // frames to be send
    // level of DATA/HEADERS/CONTINUATION/PUSH_PROMISE frames is 1, level of other frames is 0
    let responseQueue = SPMCLevelQueue<Frame>(2)

    // requests to be handled, queued by 7 - urgency
    let requestQueue = SPMCLevelQueue<Stream>(8)

    // local settings can be set by serverBuilder
    // remote settings are set to default, they will be reset when receiving SETTINGS frame from client
    var localSettings = HashMap<UInt16, UInt32>()
    var remoteSettings = initializeSettings()

    // settings ack timer, it will be set and started when sending SETTINGS frame, and canceled when receiving SETTINGS frame with ack flag
    // in this implementation, SETTINGS frame will be sent only once
    var settingsTimer: ?Timer = None

    // stream management
    private let streams = HashMap<UInt32, Stream>()
    private let streamsMutex = Mutex()

    private var lastClientStreamId = UInt32(0)
    private var lastPushStreamId = UInt32(0)
    private let activeClientStreamNum = AtomicUInt32(0)
    private let activeClientRstStreamNum = AtomicUInt32(0)
    private let activePushStreamNum = AtomicUInt32(0)
    private let activePushRstStreamNum = AtomicUInt32(0)
    private let pushMutex = Mutex()

    // closing means service is in graceful closing, and it will refuse new requests
    private var closing = false

    // quit means underlying tls connection is closed
    private var quit = AtomicBool(false)

    // flow control on connection level
    var localWindow = AtomicUInt32(DEFAULT_WINDOW_SIZE)
    var remoteWindow = AtomicUInt32(DEFAULT_WINDOW_SIZE)
    //threshold to send window update
    var flowThreshold = DEFAULT_WINDOW_SIZE / 2

    // hpack
    let encoder = Encoder(name: "HttpServer2")
    let decoder = Decoder(encoder, name: "HttpServer2")
    let fieldsWriter: FieldsWriter

    // default value is 16384, will be reset when receiving ack to initialSettings
    var localMaxFrameSize = MIN_FRAME_SIZE

    // remote address
    var remoteAddress: ?SocketAddress = None

    // frame header buffer
    let frameHeaderBuffer = Array<Byte>(FRAME_HEAD_LEN, repeat: 0)

    // zeroValue is more suitable...
    var streamPool: ?PutSafeRingPool<Any> = None
    var arrayPool: ?ArrayPool = None

    // quitDone will be 0 when all threads are returned
    let quitDone = AtomicUInt8(3)

    /***************************************************** serve *****************************************************/
    init(socket: StreamingSocket) {
        this.conn = BufferedConn(socket)
        this.fieldsWriter = FieldsWriter(conn.bufferedWriter)
    }

    protected func serve() {
        if (logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#serve] Http/2.0 serve...")
        }

        conn.logger = logger
        decoder.logger = logger
        encoder.logger = logger
        remoteAddress = conn.socket.remoteAddress
        storeLocalSettings()

        // stream pool
        if (server.streamPools.isNone() && server.maxConcurrentStreams > 0) {
            server.streamPools = ConcurrentRingPool<PutSafeRingPool<Any>>(MAX_STREAM_POOLS_CAPACITY,
                STREAM_POOLS_THRESHOLD, newFn: {=> PutSafeRingPool<Any>(MAX_STREAM_POOL_CAPACITY, newFn: {=> Stream()})}
            )
        }
        streamPool = server.streamPools?.get()

        // array pool
        match (server.arrayPool) {
            case Some(v) => this.arrayPool = v
            case None =>
                let v = ArrayPool(MAX_ARRAY_POOL_CAPACITY, ARRAY_POOL_THRESHOLD)
                server.arrayPool = v
                this.arrayPool = v
        }

        // start threads
        let connId = ThreadContext.connId
        writeThread(connId)
        handleThread(connId)
        mainThread() // read thread

        // main thread returned
        handbackStream()
    }

    private func storeLocalSettings() {
        // SETTINGS_HEADER_TABLE_SIZE
        localSettings.add(SettingsHeaderTableSize.code, server.headerTableSize)
        decoder.setHeaderTableSizeLimit(Int64(server.headerTableSize))
        encoder.setHeaderTableSizeLimit(Int64(server.headerTableSize))
        // SETTINGS_MAX_CONCURRENT_STREAMS
        localSettings.add(SettingsMaxConcurrentStreams.code, server.maxConcurrentStreams)
        // SETTINGS_INITIAL_WINDOW_SIZE
        localSettings.add(SettingsInitialWindowSize.code, server.initialWindowSize)
        // we can set initialWindowSize only when constructing a server instance, so the flowThreshold will not be changed.
        flowThreshold = server.initialWindowSize / 2
        // SETTINGS_MAX_FRAME_SIZE
        localSettings.add(SettingsMaxFrameSize.code, server.maxFrameSize)
        // SETTINGS_MAX_HEADER_LIST_SIZE
        localSettings.add(SettingsMaxHeaderListSize.code, server.maxHeaderListSize)
        decoder.maxHeaderListSize = Int64(server.maxHeaderListSize)
        // SETTINGS_ENABLE_CONNECT_PROTOCOL
        if (server.enableConnectProtocol) {
            localSettings.add(SettingsEnableConnectProtocol.code, 1)
        } else {
            localSettings.add(SettingsEnableConnectProtocol.code, 0)
        }
        // this implementation does not support rfc 7540, use rfc 9218 instead
        localSettings.add(SettingsNoRfc7540Priorities.code, 1)
    }

    private func mainThread() {
        try {
            exchangePreface()
        } catch (e: Exception) {
            httpLogWarn(logger, "[HttpServer2#mainThread] exchange preface failed, ${e}")
            shutdown(shouldSleep: false)
            return
        }

        try {
            while (!quit.load()) {
                readFrames()
            }
        } catch (e: SocketException | ConnectionException | TlsException) {
            httpLogWarn(logger, "[HttpServer2#mainThread] read frame failed, ${e}")
            shutdown(shouldSleep: false)
        } catch (ce: HttpConnectionException) {
            close(ce.h2Error, ce.message)
        } catch (he: HpackException) {
            close(CompressionError, he.message)
        } catch (e: Exception) {
            close(H2Error.InternalError, e.toString())
        }

        if (logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#mainThread] connection closed, read thread returned")
        }
    }

    /***************************************************** h2 preface *****************************************************/
    // cjlint-ignore -start !G.OTH.03
    /*
     * Initialize h2 connection:
     *      step 1: receive and check client preface, "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".toArray()
     *      step 2: send server initialSettings, which carry localSettings
     *      step 3: receive and apply client initialSettings
     * for details: https://www.rfc-editor.org/rfc/rfc9113.html.html#preface
     *
     * @throws SocketException, ConnectionException or TlsException, if something wrong with socket
     * @throws HttpConnectionException, if some h2 connection error occurs
     */
    // cjlint-ignore -end
    private func exchangePreface() {
        var prefaceReceived = Array<UInt8>(PREFACE.size, repeat: 0)
        let _ = conn.readFull(prefaceReceived)
        if (prefaceReceived != PREFACE) {
            throw HttpConnectionException(ProtocolError, "Initialize h2 connection failed, client preface is required.")
        }

        sendSettings()

        let firstFrameReceived = match (Frame.read(conn, headerBuf: frameHeaderBuffer)) {
            case Some(v) => v as SettingsFrame
            case None => throw HttpConnectionException(ProtocolError,
                "Initialize h2 connection failed, settings frame is required.")
        }
        match (firstFrameReceived) {
            case Some(settingsFrame) =>
                if (settingsFrame.payloadLen > localMaxFrameSize) {
                    throw HttpConnectionException(ProtocolError,
                        "Size of initial settings exceeds default max frame size ${MIN_FRAME_SIZE}.")
                }
                onSettingsRead(settingsFrame)
            case None => throw HttpConnectionException(ProtocolError,
                "Initialize h2 connection failed, settings frame is required.")
        }

        if (logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#exchangePreface] initialize h2 connection succeed")
        }
    }

    /***************************************************** read thread *****************************************************/
    /*
     * Read and process frames from tls conn.
     *
     * @throws SocketException, ConnectionException or TlsException, if something wrong with socket
     * @throws HttpConnectionException, if some h2 connection error occurs
     * @throws HpackException, if decode fields failed
     */
    private func readFrames() {
        // may throw HttpConnectionException, SocketException, TlsException
        var frame = match (Frame.read(conn, maxFrameSize: localSettings[SettingsMaxFrameSize.code],
            headerBuf: frameHeaderBuffer, arrayPool: arrayPool)) {
            case Some(v) => v
            case None => return
        }
        if (logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#readFrames] received frame: ${frame}, stream id: ${frame.streamId}.")
        }

        if (frame.payloadLen > localMaxFrameSize) {
            throw HttpConnectionException(ProtocolError,
                "Frame size exceeds max frame size ${localMaxFrameSize}, which is set by server.")
        }

        // read and process HEADERS + CONTINUATION, which may be headers/trailers of request
        if (let Some(hf) <- (frame as HeadersFrame)) {
            if (logger.enabled(LogLevel.TRACE)) {
                httpLogTrace(logger, "[HttpServer2#readFrames] header frame received.")
            }
            readAndProcessFields(hf)
            return
        }

        // process global frames
        if (frame.streamId == 0) {
            preProcessGlobalFrame(frame)
            return
        }

        // process other frames on stream
        // process flow control when receiving DATA frame
        let (streamOp, isPurged) = getStream(frame.streamId)
        match (streamOp) {
            case None =>
                if (isPurged) {
                    if (frame is DataFrame) {
                        if (logger.enabled(LogLevel.TRACE)) {
                            httpLogTrace(logger, "[HttpServer2#readFrames] Stream ${frame.streamId} is purged. Data frame received.")
                        }
                        processDataFlow(frame.payloadLen)
                        windowUpdateOnDataConsumed(frame.payloadLen)
                    }
                } else {
                    throw HttpConnectionException(ProtocolError, "Unexpected frame on idle stream ${frame.streamId}.")
                }
            case Some(stream) =>
                if (frame is DataFrame) {
                    if (logger.enabled(LogLevel.DEBUG)) {
                        httpLogDebug(logger, "[HttpServer2#readFrames] Data frame received.")
                    }
                    processDataFlow(frame.payloadLen)

                    if (stream.dataBBQ.isClosed()) {
                        if (logger.enabled(LogLevel.DEBUG)) {
                            httpLogDebug(logger, "[HttpServer2#readFrames] Data queue has been closed.")
                        }
                        windowUpdateOnDataConsumed(frame.payloadLen)
                    }
                }
                preProcessStreamFrame(frame, stream)
        }
    }

    /**
     * @throws HttpConnectionException, if flow control check fail
     */
    private func processDataFlow(len: UInt32): Unit {
        // Local window should decrease when receiving data frame, increase when sending windowUpdate frame,
        if (localWindow.load() < len) {
            throw HttpConnectionException(FlowControlError,
                "PayloadLen of DATA frame exceeds local window on connection level.")
        }
        localWindow.fetchSub(len)
    }

    func windowUpdateOnDataConsumed(len: UInt32): Unit {
        if (len <= 0) {
            return
        }

        // Release local window when Data-Frame has been consumed.
        // Send Window-Update to peer.
        sendWindowUpdate()
    }

    /**
     * The first HEADERS frame on a stream will be treated as part of headers of request.
     * An Idle stream will be create or get (if it has already been created when receiving PRIORITY_UPDATE frame),
     * and read timer on new stream will be started, when read the first HEADERS frame, and the connection is not on closing.
     * The subsequent HEADERS frame on a stream will be treated as part of trailers of request.
     *
     * If the connection is on closing, or the related stream has already been purged, the fields will not be processed on stream level.
     * But the fields will always be decoded, in order to update the status of hpack.
     *
     * HEADERS frame and CONTINUATION frame of a field entity (that is, headers or trailers of request) must be received continuously on connection level.
     *
     * @throws SocketException, ConnectionException or TlsException, if something wrong with socket
     * @throws HttpConnectionException, if some h2 connection error occurs
     * @throws HpackException, if decode fields failed
     */
    private func readAndProcessFields(hf: HeadersFrame): Unit {
        let streamId = hf.streamId

        let streamOp: ?Stream = match (getStream(streamId)) { // (streamOp, isPurged)
            // an existed stream (trailers may received on existed streams)
            case (Some(v), _) => v
            // a new stream
            case (None, false) => createClientStream(streamId, startReadTimer: true)
            // a purged stream, we should decode and then discard the fields
            case _ => None
        }

        let fieldsList = if (hf.headerEnd) {
            // fast path, only HeadersFrame
            if (logger.enabled(LogLevel.TRACE)) {
                httpLogTrace(logger, "[HttpServer2#readAndProcessFields] Got END_HEADERS.")
            }
            decodeFields(decoder, hf.fieldBlock)
        } else {
            // slow path, HeadersFrame + ContinuationFrames
            let cfs = ArrayList<Frame>(1)
            let fieldsBlocks = ArrayList<UInt8>()
            fieldsBlocks.add(all: hf.fieldBlock)
            var headerEnd = false
            while (!headerEnd) {
                let cf = Frame
                    .read(conn, arrayPool: arrayPool)
                    .getOrThrow(
                        {
                            => HttpConnectionException(ProtocolError,
                                "Headers and continuation frames should be continuous on connection level.")
                        }) as ContinuationFrame ?? throw HttpConnectionException(ProtocolError, // cjlint-ignore !G.EXP.03
                    "Headers and continuation frames should be continuous on connection level.")
                if (cf.streamId != streamId) {
                    throw HttpConnectionException(ProtocolError, "Unexpected Continuation on stream ${streamId}.")
                }
                cfs.add(cf)
                fieldsBlocks.add(all: cf.fieldBlock)
                headerEnd = cf.headerEnd
            }
            for (cf in cfs) {
                cf.recyclePayload(arrayPool)
            }
            decodeFields(decoder, fieldsBlocks)
        }
        // make sure payload has no slice
        hf.recyclePayload(arrayPool)

        if (let Some(stream) <- streamOp) {
            let fieldsFrame = FieldsFrame(streamId, fieldsList, last: hf.streamEnd)
            preProcessStreamFrame(fieldsFrame, stream)
        }
    }

    var lastExceptionTime = MonoTime.now()

    /*
     * Process frame on stream level.
     *
     * @throws HttpConnectionException, if some h2 connection error occurs
     * @throws HpackException, if decode fields failed
     */
    private func preProcessStreamFrame(frame: Frame, stream: Stream): Unit {
        try {
            stream.preProcess(frame)
        } catch (se: HttpStreamException) {
            // stream exception must be caught and processed here, because the outer function can not get the stream
            let now = MonoTime.now()
            let needSlow = now - lastExceptionTime < Duration.second * 5
            lastExceptionTime = now
            stream.close(se.h2Error, se.message, slow: needSlow)
        }
    }

    /***************************************************** handle thread *****************************************************/
    private func handleThread(connId: ?UInt64) { //never close the connection
        spawn {
            ThreadContext.connId = connId
            try {
                handleRequests()
            } finally {
                handbackStream()
            }
        }
    }

    private func handleRequests() {
        if (logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#handleRequests] handle thread init")
        }
        let reqCount = AtomicUInt64(0)
        while (!quit.load()) {
            let stream = requestQueue.receive() ?? continue
            let connId = ThreadContext.connId
            spawn {
                ThreadContext.connId = connId
                reqCount.fetchAdd(1)
                try {
                    stream.handle()
                } catch (e: Exception) {
                    stream.close(H2Error.InternalError, e.message)
                } finally {
                    reqCount.fetchSub(1)
                }
            }
        }
        while (reqCount.load() > 0) {
            sleep(Duration.millisecond * 200)
        }
        if (logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#handleRequests] connection closed, handle thread returned")
        }
    }

    /***************************************************** write thread *****************************************************/
    private func writeThread(connId: ?UInt64) {
        spawn {
            ThreadContext.connId = connId
            try {
                writeFrames()
            } finally {
                handbackStream()
            }
        }
    }

    private func handbackStream() {
        if (quitDone.fetchSub(1) == 1) {
            server.streamPools?.put(streamPool ?? return ())
            streamPool = None
            arrayPool = None
            clearSettingsTimer()
        }
    }

    private func writeFrames() {
        if (logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#writeFrames] write thread init")
        }
        var stream = Box<Stream>(Stream())
        var toWriteFrames = ArrayList<Frame>()
        while (!quit.load()) {
            try {
                if (!writeFrame(stream, toWriteFrames)) {
                    break
                }
            } catch (e: HttpStreamException) {
                // stream must have been assigned, if HttpStreamException is thrown
                stream.value.close(e.h2Error, e.message)
            } catch (e: SocketException | ConnectionException | TlsException) {
                httpLogWarn(logger, "[HttpServer2#writeFrames] ${e}")
                shutdown(shouldSleep: false)
                break
            } catch (e: HpackException) {
                close(H2Error.CompressionError, e.message)
                break
            } catch (e: Exception) {
                close(H2Error.InternalError, e.toString())
                break
            }
        }
        if (logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#writeFrames] connection closed, write thread returned")
        }
    }

    private func writeFrame(stream: Box<Stream>, toWriteFrames: ArrayList<Frame>): Bool {
        let frame = responseQueue.receive() ?? return false

        if (frame.streamId == 0 && !(frame is UnblockFrame)) {
            postProcessGlobalFrame(frame)
            if (logger.enabled(LogLevel.DEBUG)) {
                httpLogDebug(logger, "[HttpServer2#writeFrames] write frame: ${frame}")
            }
            frame.writeTo(conn.bufferedWriter)
            return true
        }
        if (frame is WindowUpdateFrame || frame is RstStreamFrame) {
            stream.value = streams.get(frame.streamId) ?? return true
            stream.value.postProcess(frame)
            if (logger.enabled(LogLevel.DEBUG)) {
                httpLogDebug(logger, "[HttpServer2#writeFrames] write frame: ${frame}")
            }
            frame.writeTo(conn.bufferedWriter)
            return true
        }

        // DataFrame || FieldsFrame || UnblockFrame || ConnectRstStreamFrame
        // unblockFrame is added to responseQueue when receiving global WINDOW_UPDATE
        if (!(frame is UnblockFrame)) {
            toWriteFrames.add(frame)
        }
        while (!toWriteFrames.isEmpty()) {
            let waitingFrame = toWriteFrames.remove(at: 0)
            stream.value = streams.get(waitingFrame.streamId) ?? continue
            // when stream is closed, but hasn't been purged yet, for example, write timeout,
            // queued data and fields frames should be discarded, in order to avoid blocking due to connection flow control
            if (stream.value.isClosed()) {
                continue
            }
            if (!writeFrame(waitingFrame, stream.value)) {
                break
            }
        }
        return true
    }

    private func writeFrame(frame: Frame, stream: Stream): Bool {
        match (frame) {
            case f: DataFrame =>
                if (frame.payloadLen > remoteWindow.load()) {
                    if (logger.enabled(LogLevel.DEBUG)) {
                        httpLogDebug(logger, "[HttpServer2#writeFrames] no enough window on connection level")
                    }
                    return false
                }
                stream.postProcess(frame) //may throw stream exception and should not write
                remoteWindow.fetchSub(frame.payloadLen)
                f.writeTo(conn.bufferedWriter)
                f.recyclePayload(arrayPool)
                if (logger.enabled(LogLevel.DEBUG)) {
                    httpLogDebug(logger, "[HttpServer2#writeFrames] write frame: ${f}")
                }
                if (f.streamEnd) {
                    stream.writeTimer.cancel()
                    if (logger.enabled(LogLevel.DEBUG)) {
                        httpLogDebug(logger,
                            "[HttpServer2#writeFrames] cancel write timer after write body on stream ${stream.streamId}"
                        )
                    }
                }
            case f: FieldsFrame =>
                stream.postProcess(frame)
                writeFieldsFrame(f)
                if (f.streamEnd) {
                    stream.writeTimer.cancel()
                    if (logger.enabled(LogLevel.DEBUG)) {
                        httpLogDebug(logger,
                            "[HttpServer2#writeFrames] cancel write timer after write fields (headers or trailers) on stream ${stream.streamId}"
                        )
                    }
                }
            case f: ConnectRstStreamFrame =>
                stream.postProcess(f.rstFrame)
                f.rstFrame.writeTo(conn.bufferedWriter)
                if (logger.enabled(LogLevel.DEBUG)) {
                    httpLogDebug(logger, "[HttpServer2#writeFrames] write frame: ${f}")
                }
            case _ => throw HttpException("Unexpected frame: ${frame}.") //should never come here
        }
        true
    }

    private func writeFieldsFrame(frame: FieldsFrame) {
        fieldsWriter.streamId = frame.streamId
        fieldsWriter.streamEnd = frame.streamEnd
        fieldsWriter.pushId = frame.pushId
        frame.writeTo(fieldsWriter, encoder)
        if (logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#writeFieldsFrame] write frame: ${frame}")
        }
    }

    /***************************************************** get stream *****************************************************/
    /*
     * look for stream in streams
     * return tuple: (stream in streams, isPurged)
     */
    private func getStream(streamId: UInt32): (?Stream, Bool) {
        synchronized(streamsMutex) {
            let streamOp = streams.get(streamId)
            if (!streamOp.isNone()) {
                return (streamOp, false)
            }
            if (isPushStream(streamId)) {
                if (streamId <= lastPushStreamId) {
                    return (streamOp, true)
                } else {
                    return (streamOp, false)
                }
            } else {
                if (streamId <= lastClientStreamId) {
                    return (streamOp, true)
                } else {
                    return (streamOp, false)
                }
            }
        }
    }

    /*
     * return Some(v), if stream has been created successfully, and is ready to process request
     * return None, if connection is in graceful closing
     * return None, if stream has been created, but is already closed in this function
     *
     * @throws HttpConnectionException to exit read thread, if streamId is invalid
     */
    private func createClientStream(streamId: UInt32, startReadTimer!: Bool): ?Stream {
        if (closing) {
            synchronized(streamsMutex) {
                verifyClientStreamId(streamId)
            }
            return None
        }

        var stream: Stream
        synchronized(streamsMutex) {
            verifyClientStreamId(streamId)
            stream = match (streamPool) {
                case None => Stream(this, streamId)
                case Some(pool) => (pool.get() as Stream) ?? throw Exception("The pool get not Stream.")
            }
            stream.reset(this, streamId)
            streams.add(streamId, stream)
        }

        // update stream id records
        lastClientStreamId = streamId
        activeClientStreamNum.fetchAdd(1)

        // close the stream immediately, if num of client streams exceeds SettingsMaxConcurrentStreams
        // when closing, we inform client that the request hasn't been handled, and client can send it again
        if (activeClientStreamOverLimit()) {
            stream.close(RefusedStream,
                "received frame on stream ${streamId}, num of active client streams exceeds SettingsMaxConcurrentStreams, which is set by server"
            )
            return None
        }

        if (startReadTimer) {
            stream.startReadTimer()
        }
        return stream
    }

    private func activeClientStreamOverLimit(): Bool {
        return activeClientStreamNum.load() > localSettings[SettingsMaxConcurrentStreams.code] ||
            // no need to verify strictly 2 times
            (activeClientRstStreamNum.load() >> 1) > localSettings[SettingsMaxConcurrentStreams.code]
    }

    /*
     * create a push stream, id is lastPushStreamId + 2
     * it will be called only in user handler
     *
     * throw HttpException, if num of push streams exceeds MAX_STREAM_ID or num of active push streams exceeds SettingsMaxConcurrentStreams
     */
    func createPushStream(): Stream {
        synchronized(pushMutex) {
            lastPushStreamId += 2
            verifyPushStreamId(lastPushStreamId)

            let stream = Stream(this, lastPushStreamId)
            synchronized(streamsMutex) {
                streams.add(lastPushStreamId, stream)
            }

            activePushStreamNum.fetchAdd(1)
            return stream
        }
    }

    /*
     * @throws HttpConnectionException to exit read thread, if streamId is invalid
     */
    private func verifyClientStreamId(id: UInt32) {
        if (isPushStream(id)) {
            throw HttpConnectionException(ProtocolError, "Stream initiated by client should be identified with odd num.")
        }
        if (id <= lastClientStreamId) {
            throw HttpConnectionException(ProtocolError, "Id of new stream should be the largest.")
        }
        if (id > MAX_STREAM_ID) {
            throw HttpConnectionException(ProtocolError, "Stream id reaches the max, 2^31 - 1.")
        }
    }

    /*
     * @throws HttpException to user handler, if push streamId is invalid
     */
    private func verifyPushStreamId(id: UInt32) {
        if (activePushStreamOverLimit()) {
            throw HttpException(
                "Write push related to stream ${id}, num of active push streams exceeds SettingsMaxConcurrentStreams, which is set by client."
            )
        }
        if (id > MAX_STREAM_ID) {
            throw HttpException("Push stream id exceeds 2^31-1.") //to user handler
        }
    }

    private func activePushStreamOverLimit(): Bool {
        return activePushStreamNum.load() >= remoteSettings[SettingsMaxConcurrentStreams.code] ||
            // no need to verify strictly 2 times
            (activePushRstStreamNum.load() >> 1) >= remoteSettings[SettingsMaxConcurrentStreams.code]
    }

    /***************************************************** global frames *****************************************************/
    private func preProcessGlobalFrame(frame: Frame) {
        match (frame) {
            case f: SettingsFrame => onSettingsRead(f)
            case f: WindowUpdateFrame => onWindowUpdateRead(f)
            case f: GoawayFrame => onGoawayRead(f)
            case f: PriorityUpdateFrame => onPriorityUpdateRead(f)
            case f: PingFrame => onPingRead(f)
            case _ => throw HttpConnectionException(ProtocolError, "Received invalid frame on stream 0.")
        }
    }

    private func sendSettings(ack!: Bool = false): Unit {
        let frame = if (ack) {
            SettingsFrame() //settings frame with ack flag set
        } else {
            SettingsFrame(localSettings)
        }
        if (!responseQueue.send(frame, CONTROL_PRIORITY) && logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#sendSettings] connection closed, send Settings frame failed.")
        }
    }

    func sendWindowUpdate(): Unit {
        let remain = localWindow.load()
        if (remain > DEFAULT_WINDOW_SIZE / 2) {
            return
        }
        let initial = localSettings[SettingsInitialWindowSize.code]
        if (!localWindow.compareAndSwap(remain, initial)) {
            return
        }
        let frame = WindowUpdateFrame(0, initial - remain)
        if (!responseQueue.send(frame, CONTROL_PRIORITY) && logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#sendWindowUpdate] connection closed, send WINDOW_UPDATE frame failed")
        }
    }

    private func sendGoaway(h2Error: H2Error, lastProcessedId: UInt32, debugMsg: String) {
        let frame = GoawayFrame(lastProcessedId, h2Error.code, data: debugMsg.toArray())
        if (!responseQueue.send(frame, CONTROL_PRIORITY) && logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#sendGoaway] connection closed, send Goaway frame failed")
        }
    }

    private func onSettingsRead(frame: SettingsFrame): Unit {
        // ack settings, will be read only once
        if (frame.ack) {
            if (frame.payloadLen != 0) {
                throw HttpConnectionException(FrameSizeError, "Settings with ack should not have payload.")
            }
            if (!clearSettingsTimer()) {
                throw HttpConnectionException(ProtocolError, "Unexpected ack settings.")
            }

            localMaxFrameSize = server.maxFrameSize

            return
        }

        // non ack settings
        for ((k, v) in frame.settings) {
            match (getSettingByCode(k)) {
                case SettingsHeaderTableSize =>
                    // receive SETTINGS_HEADER_TABLE_SIZE by peer Decoder
                    encoder.receiveSettingsHeaderTableSize(Int64(v))
                case SettingsEnablePush =>
                    if (v != 0 && v != 1) {
                        throw HttpConnectionException(ProtocolError,
                            "Invalid client settings, SettingsEnablePush should be 0 or 1.")
                    }
                case SettingsMaxConcurrentStreams => ()
                case SettingsInitialWindowSize =>
                    if (v > MAX_WINDOW) {
                        throw HttpConnectionException(FlowControlError,
                            "Invalid client settings, SettingsInitialWindowSize should not exceeds 2^31.")
                    }
                    setActiveStreamWindow(v)
                case SettingsMaxFrameSize =>
                    if (v < UInt32(2 ** 14) || v > UInt32(2 ** 24 - 1)) {
                        throw HttpConnectionException(ProtocolError,
                            "Invalid client settings, SettingsMaxFrameSize should be in range [2^14..2^24-1].")
                    }
                case SettingsMaxHeaderListSize => encoder.maxHeaderListSize = Int64(v)
                case _ => ()
            }
            this.remoteSettings[k] = v
        }

        let fieldsBlockSize = Int64(remoteSettings[SettingsMaxFrameSize.code])
        if (fieldsWriter.blockSize != fieldsBlockSize) {
            fieldsWriter.buffer = Array<Byte>(fieldsBlockSize, repeat: 0)
            fieldsWriter.blockSize = fieldsBlockSize
        }

        sendSettings(ack: true)
    }

    // cjlint-ignore -start !G.OTH.03
    /*
     * see https://www.rfc-editor.org/rfc/rfc9113.html#InitialWindowSize
     */
    // cjlint-ignore -end
    private func setActiveStreamWindow(initialWindow: UInt32) {
        let increment = Int64(initialWindow) - Int64(this.remoteSettings[SettingsInitialWindowSize.code])

        synchronized(streamsMutex) {
            for ((_, stream) in streams) {
                if (stream.remoteWindow.load() + increment > Int64(MAX_WINDOW)) {
                    throw HttpConnectionException(FlowControlError,
                        "SettingsInitialWindowSize cause remote window of stream ${stream.streamId} exceeds max value")
                } else {
                    stream.remoteWindow.fetchAdd(increment)
                }
            }
        }
    }

    /*
     * in this implementation will not send ping
     */
    private func onPingRead(frame: PingFrame) {
        if (frame.ack) {
            return
        }
        let ping = PingFrame(isAck: true, payload: frame.payload)
        if (!responseQueue.send(ping, CONTROL_PRIORITY) && logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#onPingRead] connection closed, send Ping frame failed")
        }
    }

    private func onWindowUpdateRead(frame: WindowUpdateFrame) {
        remoteWindow.fetchAdd(frame.increment) //never overflow
        if (remoteWindow.load() > MAX_WINDOW) {
            throw HttpConnectionException(FlowControlError,
                "The WINDOW_UPDATE frame cause remote window exceeds 2^31-1 on connection level.")
        }
        // unblock write thread

        if (!responseQueue.send(UnblockFrame(), CONTROL_PRIORITY) && logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#onWindowUpdateRead] connection closed, send internal frame failed.")
        }
    }

    private func onGoawayRead(frame: GoawayFrame) {
        if (logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger,
                "[HttpServer2#onGoawayRead] connection closing, received goaway frame, errCode = ${frame.errorCode}, debugMsg = ${sanitizeForLog(frame.debugData)}"
            )
        }
        shutdown(shouldSleep: false)
    }

    // cjlint-ignore -start !G.OTH.03
    /*
     * the value of parameter urgency will be set on related stream
     * if the related stream has not been created yet, and the streamId is valid (odd, > lastClientStreamId, < MAX_STREAM_ID), a new Idle stream with the specified urgency will be created
     * if the related stream is already in handle or purged, the frame will be ignored
     * see https://www.rfc-editor.org/rfc/rfc9218.html
     *
     * @throws HttpConnectionException if streamId invalid
     */
    // cjlint-ignore -end
    private func onPriorityUpdateRead(frame: PriorityUpdateFrame): Unit {
        let (streamOp, isPurged) = getStream(frame.prioritizedId)
        if (streamOp.isNone() && !isPurged) {
            if (let Some(stream) <- createClientStream(frame.prioritizedId, startReadTimer: false)) {
                stream.queuePriority = MAX_URGENCY - parsePriority(frame.fieldValue)
            }
        }
    }

    private func postProcessGlobalFrame(frame: Frame) {
        match (frame) {
            case f: SettingsFrame =>
                if (!f.ack) {
                    onSettingsWrite()
                }
            case _: WindowUpdateFrame => () //nothing to do
            case _: GoawayFrame => ()
            case _: PingFrame => ()
            case _ => ()
        }
    }

    /* will be called only once, when write initial settings */
    private func onSettingsWrite() {
        settingsTimer = Timer.once(Duration.second * 10) {
            =>
            httpLogWarn(logger, "[HttpServer2#onSettingsWrite] connection closing, wait ack settings timeout")
            shutdown(shouldSleep: false)
        }
    }

    /***************************************************** closing *****************************************************/
    func purgeStream(stream: Stream, purge: Bool, rstCount: Bool): Unit {
        let id = stream.streamId
        if (purge) {
            synchronized(streamsMutex) {
                let removed = streams.remove(id)
                match (removed) {
                    case None => return
                    case Some(v) => streamPool?.put(v)
                }
                if (logger.enabled(LogLevel.DEBUG)) {
                    httpLogDebug(logger,
                        "[HttpServer2#purgeStream] purgeStream, stream id: ${id}, size of streams: ${streams.size}")
                }
            }
        }

        if (rstCount) {
            match ((isPushStream(id), stream.isRst.load())) {
                case (true, true) =>
                    if (stream.isRst.compareAndSwap(true, false)) {
                        activePushRstStreamNum.fetchSub(1)
                    }
                case (true, false) => activePushStreamNum.fetchSub(1)
                case (false, true) =>
                    if (stream.isRst.compareAndSwap(true, false)) {
                        activeClientRstStreamNum.fetchSub(1)
                    }
                case (false, false) => activeClientStreamNum.fetchSub(1)
            }
        }

        if (logger.enabled(LogLevel.TRACE)) {
            httpLogTrace(logger, "[HttpServer2#purgeStream] activePushRstStreamNum = ${activePushRstStreamNum.load()}, activeClientRstStreamNum = ${activeClientRstStreamNum.load()}.")
        }
    }

    protected func closeGracefully(): Unit {
        if (quit.load()) {
            return
        }
        if (logger.enabled(LogLevel.DEBUG)) {
            httpLogDebug(logger, "[HttpServer2#closeGracefully] closing connection gracefully")
        }
        sendGoaway(NoError, MAX_STREAM_ID, "server closing gracefully")
        waitStreamInflight()
    }

    private func waitStreamInflight() {
        sleep(Duration.second) // wait for 1s to allow client receiving goaway frame
        sendGoaway(NoError, lastClientStreamId, "server closing gracefully")
        closing = true
        waitStreamComplete()
    }

    private func waitStreamComplete() {
        for (_ in 0..10) {
            if (quit.load()) {
                return
            } // some error occurred and connection is already closed
            if (activeStreamNumIsZero()) {
                if (logger.enabled(LogLevel.DEBUG)) {
                    httpLogDebug(logger, "[HttpServer2#waitStreamComplete] connection closed gracefully")
                }
                shutdown()
            }
            sleep(Duration.second)
        }
        httpLogWarn(logger, "[HttpServer2#waitStreamComplete] graceful closing timeout")
        shutdown()
    }

    private func activeStreamNumIsZero(): Bool {
        return activeClientStreamNum.load() == 0u32 && activePushStreamNum.load() == 0u32 &&
            activeClientRstStreamNum.load() == 0u32 && activePushRstStreamNum.load() == 0u32
    }

    func addRstStreamCnt(rstFrame: RstStreamFrame): Unit {
        match (isPushStream(rstFrame.streamId)) {
            case true =>
                activePushRstStreamNum.fetchAdd(1)
                activePushStreamNum.fetchSub(1)
            case false =>
                activeClientRstStreamNum.fetchAdd(1)
                activeClientStreamNum.fetchSub(1)
        }
    }

    protected func close(): Unit {
        close(NoError, "server closing forcibly")
    }

    private func close(h2Error: H2Error, debugMsg: String) {
        httpLogWarn(logger, "[HttpServer2#close] connection closing, ${debugMsg}")
        if (quit.load()) {
            if (logger.enabled(LogLevel.TRACE)) {
                httpLogTrace(logger, "[HttpServer2#close] Already quit.")
            }
            return
        }
        sendGoaway(h2Error, this.lastClientStreamId, debugMsg)
        shutdown()
    }

    private func shutdown(shouldSleep!: Bool = true) {
        if (logger.enabled(LogLevel.TRACE)) {
            httpLogTrace(logger, "[HttpServer2#shutdown] Shutdown.")
        }
        if (quit.load()) {
            return
        }

        if (shouldSleep) { // allow client to receive goaway
            Timer.once(Duration.second) {
                =>
                quit.store(true)
                conn.close()
            }
        } else {
            quit.store(true)
            conn.close()
        }

        synchronized(streamsMutex) {
            for ((_, stream) in streams) {
                // unblock data read in handle thread
                stream.dataBBQ.close() //will cause receiving data frame return None

                // unblock handle threads, which is writing body
                stream.status = Closed
                stream.cancelAllTimers()
                stream.windowMonitor.lock()
                stream.windowMonitor.notifyAll()
                stream.windowMonitor.unlock()
            }
        }

        clearSettingsTimer()
        // unblock write thread and readBody in handler (a handle thread)
        // notify user who is writing body data handle thread, close responseQueue will cause send frame fail
        responseQueue.close()
        // unblock handle thread
        requestQueue.close()
    }

    private func clearSettingsTimer(): Bool {
        if (let Some(v) <- settingsTimer) {
            v.cancel()
            settingsTimer = None
            return true
        }
        false
    }
}