/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * This source file is part of the Cangjie project, licensed under Apache-2.0
 * with Runtime Library Exception.
 *
 * See https://cangjie-lang.cn/pages/LICENSE for license information.
 */

package stdx.net.http

import std.net.*
import std.io.*
import std.collection.*
import std.sync.{Monitor, AtomicInt64, AtomicUInt32}
import std.convert.Parsable
import stdx.log.Logger

/*
 * Client-side state of one HTTP/2 stream: the request/response pair, the frame
 * queues, the stream state machine status, both flow-control windows and the
 * read/write timers.
 */
class ClientStream {
    // identifier of this stream
    let streamId: UInt32
    // owning engine; supplies settings, the shared output queue, decoder and logger
    let engine: HttpClientEngine2
    // initialized lazily, on first use (send / constructPushResponse)
    var engineConn: ?Http2ClientEngineConn = None
    // streamEnd flag of the most recently received HEADERS/DATA frame
    var streamEnd: Bool = false
    // current state of the stream state machine
    var status: Status = Idle
    // queues
    // outputQueue carries request frames, inputQueue carries response frames
    let inputQueue: ClosableBlockingQueue<Frame> = ClosableBlockingQueue<Frame>(Int64.Max)
    let outputQueue: SPMCLevelQueue<Frame>

    // interim response received for "expect: 100-continue"
    var continueResponse: ?HttpResponse = None
    // for server push
    let pushStreams: ArrayList<Object> = ArrayList<Object>()
    var request: ?HttpRequest = None
    var response: ?HttpResponse = None
    // true if the request contains "expect: 100-continue" and has a body
    var expectContinuation: Bool = false
    // synchronization for 100-continue (see waitContinue / notifyContinue)
    let continueMonitor: Monitor = Monitor()
    // set when the response headers contain a "trailer" field
    var respHasTrailer = false
    let logger: Logger
    let decoder: Decoder

    // flow control
    // localWindow: what we allow the peer to send; remoteWindow: what the peer allows us to send
    var localWindow: AtomicUInt32
    var remoteWindow: AtomicInt64
    // notified when remoteWindow grows or the stream connection is closed
    let remoteWindowMonitor: Monitor = Monitor()

    // stores header blocks, cleared once the end of the headers has been read
    let headerfields = HeaderBlock()

    var readTimer = HttpTimer.empty
    var writeTimer = HttpTimer.empty
    // set by a timer task when the read or write deadline expires
    var timeout: ?HttpTimeoutException = None

    /*
     * Binds the stream to its engine, borrows the engine's output queue, decoder and
     * logger, and seeds both flow-control windows from the SettingsInitialWindowSize
     * entries of the engine's local and remote settings.
     */
    init(streamId: UInt32, engine: HttpClientEngine2) {
        let localInitialWindow = engine.localSettings[SettingsInitialWindowSize.code]
        let remoteInitialWindow = Int64(engine.remoteSettings[SettingsInitialWindowSize.code])

        this.engine = engine
        this.streamId = streamId
        this.logger = engine.logger
        this.decoder = engine.decoder
        this.outputQueue = engine.outputQueue
        this.localWindow = AtomicUInt32(localInitialWindow)
        this.remoteWindow = AtomicInt64(remoteInitialWindow)

        // a push stream starts its life in ReservedRemote instead of Idle
        if (isPushStream(streamId)) {
            status = ReservedRemote
        }
    }

    /**************************************** encode request & decode response ****************************************/
    /*
     * Sends `req` on this stream. Must be called only once per stream.
     *
     * Records whether the request expects "100-continue" (only relevant when it has a
     * body), creates the stream connection and writes the request under a write timer.
     * Throws the recorded HttpTimeoutException if the write failed because the timer
     * fired, otherwise rethrows the original exception.
     */
    func send(req: HttpRequest): Unit {
        request = req
        if (let Some(v) <- req.headers.getFirst("expect")) {
            expectContinuation = v == "100-continue" && !(req.body is HttpEmptyBody)
        }
        engineConn = Http2ClientEngineConn(this)
        // start timer before write request
        writeTimer = HttpTimer(
            start: req.writeTimeout ?? engine.writeTimeout,
            task: {
                =>
                // Record the timeout BEFORE closing the stream: closing wakes the writer,
                // and its catch block must already see `timeout` to report the real cause.
                timeout = HttpTimeoutException("Client2_0 write request timeout.")
                close(Cancel)
            }
        )
        try {
            writeRequest()
        } catch (e: Exception) {
            // a failure caused by the timer is reported as a timeout
            if (let Some(t) <- timeout) {
                throw t
            }
            throw e
        } finally {
            // stop timer after write request
            writeTimer.cancel()
        }
    }

    /*
     * Writes the stored request: headers first, then (optionally after waiting for a
     * 100-continue answer) the body, and finally the trailers if there are any.
     * Throws HttpException when no request has been stored on the stream.
     */
    private func writeRequest(): Unit {
        httpLogDebug(logger, "[ClientStream#writeRequest] start write request of stream: ${streamId}")
        let outgoing = request ?? throw HttpException("InternalError, request of stream ${streamId} is none.")
        let withTrailer = !outgoing.trailers.isEmpty()
        let declaredSize = outgoing.bodySize ?? -1

        // the header frame ends the stream only when nothing else will follow it
        let headersEndStream = declaredSize == 0 && !withTrailer && outgoing.method != "CONNECT"
        writeHeaders(headersEndStream)

        if (expectContinuation) {
            waitContinue()
            // a final (non-100) answer means the body must not be sent;
            // no answer at all (wait timed out) means we go ahead and send it
            if (let Some(interim) <- continueResponse) {
                if (interim.status != 100) {
                    return
                }
            }
        }

        httpLogDebug(logger, "[ClientStream#writeRequest] start write body of stream: ${streamId}")
        if (declaredSize != 0) {
            match (outgoing.body) {
                case raw: HttpRawBody => writeRawBody(raw, withTrailer)
                case streamed: InputStream => writeNormalBody(streamed, withTrailer)
            }
        }
        httpLogDebug(logger, "[ClientStream#writeRequest] finish write body of stream: ${streamId}")

        if (withTrailer) {
            let trailerFields = FieldsList()
            for ((name, value) in outgoing.trailers.map) {
                trailerFields.add((name.toString(), value.toString()))
            }
            engineConn?.writeHeaders(trailerFields, streamEnd: true)
            httpLogDebug(logger, "[ClientStream#writeRequest] finish write trailer of stream: ${streamId}")
        }
    }

    /*
     * Writes an in-memory body in slices no larger than min(peer max frame size,
     * remote window). Blocks while the remote window is exhausted and stops silently
     * once the stream is closed or its connection has quit. The last slice carries
     * the streamEnd flag unless trailers follow.
     */
    private func writeRawBody(rb: HttpRawBody, hasTrailer: Bool): Unit {
        var left = rb.length
        httpLogDebug(logger, "[ClientStream#writeRequest] writing body size = ${left}")
        var offset = 0
        var chunk = min(Int64(engine.remoteSettings[SettingsMaxFrameSize.code]), remoteWindow.load())
        while (left > chunk) {
            let aborted = status == Closed || (engineConn?.quit ?? false)
            if (aborted) {
                return
            }
            if (chunk > 0) {
                httpLogDebug(logger,
                    "[ClientStream#writeRequest] maxFrameSize=${chunk} index=${offset} remoteWindow=${remoteWindow.load()}"
                )
                engineConn?.writeBody(rb.rawBody[offset..chunk + offset], false)
                left -= chunk
                offset += chunk
                chunk = min(Int64(engine.remoteSettings[SettingsMaxFrameSize.code]), remoteWindow.load())
            } else {
                // nothing may be sent right now: wait until the peer enlarges the window
                chunk = blockForWindowUpdate()
            }
        }
        httpLogDebug(logger,
            "[ClientStream#writeRequest] maxFrameSize=${chunk} index=${offset} remoteWindow=${remoteWindow.load()}"
        )
        // whatever is left fits into a single frame
        engineConn?.writeBody(rb.rawBody[offset..], !hasTrailer)
    }

    /*
     * Streams `body` to the peer, one DATA frame per read.
     *
     * Before every read the frame size is recomputed as min(peer max frame size,
     * remote window). If nothing may be sent, the call blocks for a window update
     * first, so a zero or negative window can never produce an empty read buffer
     * (which would look like end-of-body and truncate the request).
     * Returns silently once the stream is closed or its connection has quit.
     * When no trailers follow, an empty DATA frame with streamEnd terminates the body.
     */
    private func writeNormalBody(body: InputStream, hasTrailer: Bool): Unit {
        while (true) {
            var maxFrameSize = min(Int64(engine.remoteSettings[SettingsMaxFrameSize.code]), remoteWindow.load())
            // the window is signed and may be non-positive: wait before allocating the buffer
            if (maxFrameSize <= 0) {
                maxFrameSize = blockForWindowUpdate()
            }
            // blockForWindowUpdate also returns when the stream was torn down
            if (status == Closed || (engineConn?.quit ?? false)) {
                return
            }
            if (maxFrameSize <= 0) {
                // still nothing to send with; wait again instead of reading into an empty buffer
                continue
            }
            // a fresh buffer per frame: the slice handed to writeBody is queued, not copied here
            let buf = Array<UInt8>(maxFrameSize, repeat: 0)
            let len = body.read(buf)
            if (len <= 0) {
                break
            }
            // the read may have blocked; do not write to a stream closed in the meantime
            if (status == Closed || (engineConn?.quit ?? false)) {
                return
            }
            engineConn?.writeBody(buf[0..len], false)
        }
        if (!hasTrailer) {
            engineConn?.writeBody("".toArray(), true)
        }
    }

    /*
     * Blocks while the remote window is not positive and the stream is still usable,
     * then returns the new frame size limit: min(peer max frame size, remote window).
     *
     * The condition is evaluated while holding remoteWindowMonitor, so a notification
     * sent between the check and the wait cannot be missed; the 10 ms timeout is kept
     * as a safety net. The monitor is released on every exit path.
     * Throws HttpStreamException(FlowControlError) if the limit is still zero although
     * the stream is neither closed nor quit.
     */
    private func blockForWindowUpdate(): Int64 {
        remoteWindowMonitor.lock()
        try {
            while (remoteWindow.load() <= 0 && status != Closed && !(engineConn?.quit ?? false)) {
                httpLogDebug(logger,
                    "[ClientStream#blockForWindowUpdate] stream wait for window update, current window:${remoteWindow.load()}, stream.status: ${status}"
                )
                remoteWindowMonitor.wait(timeout: Duration.millisecond * 10)
            }
        } finally {
            remoteWindowMonitor.unlock()
        }

        let maxFrameSize = min(Int64(engine.remoteSettings[SettingsMaxFrameSize.code]), remoteWindow.load())
        if (maxFrameSize == 0 && status != Closed && !(engineConn?.quit ?? false)) {
            throw HttpStreamException(FlowControlError, "Client2_0 stream remote window is zero.")
        }
        return maxFrameSize
    }

    /*
     * Builds the HTTP/2 field list of the stored request and hands it to the stream
     * connection. Pseudo-header fields come first (":protocol" if present, then
     * ":authority", ":method" and - except for plain CONNECT - ":scheme" and ":path"),
     * followed by the regular headers. Excluded headers, "content-length" and any
     * header starting with ':' are not copied.
     * Throws HttpException("Host not set.") when the request has no host header.
     */
    private func writeHeaders(streamEnd: Bool): Unit {
        let fields = FieldsList()
        let req = request.getOrThrow()
        let host = req.headers.getFirst("host") ?? throw HttpException("Host not set.")

        var isWebsocket = false
        if (let Some(protocol) <- req.headers.getFirst(":protocol")) {
            fields.add((":protocol", protocol))
            isWebsocket = protocol == "websocket"
        }

        // without an explicit port the default https port is appended
        let authority = if (req.url.port.isEmpty()) {
            req.url.host + ":443"
        } else {
            req.url.host
        }

        let path: String = if (host != authority) {
            // use proxy
            req.url.toString()
        } else if (req.url.path.isEmpty() && req.method == "OPTIONS") {
            ASTERISK
        } else {
            var target = canonicalPath(req.url.path)
            if (let Some(query) <- req.url.query) {
                target += "?${query}"
            }
            if (let Some(fragment) <- req.url.fragment) {
                target += "#${fragment}"
            }
            target
        }

        fields.add((":authority", authority))
        fields.add((":method", req.method))
        let needsSchemeAndPath = req.method != "CONNECT" || isWebsocket
        if (needsSchemeAndPath) {
            fields.add((":scheme", "https"))
            fields.add((":path", path))
        }

        for ((name, value) in req.headers.map) {
            // we don't send content-length header
            let skipped = H2_EXCLUDE_HEADERS.contains(name.toString()) || name == Str("content-length") ||
                name.startWith(b':')
            if (!skipped) {
                fields.add((name.toString(), value.toString()))
            }
        }
        engineConn?.writeHeaders(fields, streamEnd: streamEnd)
        httpLogDebug(logger, "[ClientStream#writeHeaders] finish write header of stream: ${streamId}")
    }

    /*
     * Waits on continueMonitor for at most one second; notifyContinue() ends the
     * wait early.
     * NOTE(review): there is no condition flag, so a notification issued before this
     * wait starts is not seen and the full second elapses - confirm this is acceptable.
     */
    private func waitContinue() {
        continueMonitor.lock()
        continueMonitor.wait(timeout: Duration.second)
        continueMonitor.unlock()
    }

    /* Wakes every thread currently blocked in waitContinue(). */
    private func notifyContinue() {
        // the monitor must be held while notifying
        continueMonitor.lock()
        continueMonitor.notifyAll()
        continueMonitor.unlock()
    }

    /*
     * Use only when this stream is a push promised stream.
     * Returns the cached response if one was already built; otherwise decodes the
     * promised request, builds the response and links the two.
     */
    func constructPushResponse(): HttpResponse {
        match (response) {
            case Some(cached) => return cached
            case None => ()
        }
        if (engineConn.isNone()) {
            engineConn = Http2ClientEngineConn(this)
        }

        let promised = decodeRequest()
        request = promised
        let built = constructResponseInternal()
        response = built
        built._request = promised
        return built
    }

    /*
     * Builds the response of this stream, blocking until its headers arrive.
     *
     * A read timer is started first; it is cancelled elsewhere once the stream ends.
     * If the request expected "100-continue", the first response read is treated as
     * the interim answer: a 100 is stored in continueResponse and the final response
     * is read afterwards, any other status is returned directly as the final response.
     */
    func constructResponse(): HttpResponse {
        // start readTimer
        // stop timer when stream end
        readTimer = HttpTimer(
            start: (request?.readTimeout ?? None) ?? engine.readTimeout,
            task: {
                =>
                // Record the timeout BEFORE closing the stream: closing unblocks the reader,
                // which must already see `timeout` to report a timeout instead of a closed queue.
                timeout = HttpTimeoutException("Client2_0 receive response for stream ${streamId} timeout.")
                close(StreamClosed)
            }
        )

        if (expectContinuation) {
            httpLogDebug(logger, "[ClientStream#constructResponse] read continuation response")
            let resp = constructResponseInternal(continuation: true)
            expectContinuation = false
            // server skip 100-continue or meet error
            match (resp.status) {
                case 100 => continueResponse = resp
                case _ =>
                    // NOTE(review): a body provider is attached even if the header frame
                    // already ended the stream - confirm reading such a body behaves as intended.
                    resp._body = Http2ClientBodyProvider(engineConn.getOrThrow(), false)
                    response = resp
                    return resp
            }
        }
        response = constructResponseInternal()
        return response.getOrThrow()
    }

    /*
     * Decodes one response header block (blocking) and assembles an HttpResponse.
     * With `continuation` set, the response never gets a body provider.
     * A decode failure is reported as the recorded timeout when there is one;
     * otherwise the read timer is cancelled and the original exception is rethrown.
     */
    private func constructResponseInternal(continuation!: Bool = false): HttpResponse {
        // block on decodeResponseHeader
        let (fields, code, bodyFollows) = try {
            decodeResponseHeader()
        } catch (e: Exception) {
            if (let Some(t) <- timeout) {
                throw t
            }
            readTimer.cancel()
            throw e
        }
        let req = request.getOrThrow()

        // a successful CONNECT turns the stream into a tunnel, which is not read-timed
        let tunnelOpened = req.method == "CONNECT" && code > 199 && code < 300
        if (tunnelOpened) {
            readTimer.cancel()
        }

        let payload: InputStream = if (tunnelOpened && !isWebsocket()) {
            engineConn.getOrThrow()
        } else if (req.method != "HEAD" && bodyFollows && !continuation) {
            Http2ClientBodyProvider(engineConn.getOrThrow(), tunnelOpened)
        } else {
            HttpEmptyBody.INSTANCE
        }

        let built = HttpResponseBuilder()
            .status(code)
            .version(HTTP2_0)
            .setHeaders(fields)
            .body(payload)
            .request(req)
            .build()
        let pushEnabled = engine.localSettings[SettingsEnablePush.code] == 1
        if (pushEnabled) {
            built.pushResponses = pushStreams
        }
        return built
    }

    /* True when the stored request carries a ":protocol" header equal to "websocket". */
    private func isWebsocket(): Bool {
        let protocol = request.getOrThrow().headers.getFirst(":protocol") ?? ""
        return protocol == "websocket"
    }

    /*
     * Rebuilds the promised request of a push stream from the next frame in the
     * input queue. A push request must not have a body.
     * ":authority" becomes the host header, ":path" the url, ":method" the method;
     * ":scheme" must be "https". Other non-excluded fields are split on ',' and added
     * value by value.
     * Throws HttpException if the queue is closed, the frame is not a FieldsFrame or
     * the scheme is not https.
     */
    private func decodeRequest(): HttpRequest {
        // this request is from push frame, which doesn't have streamEnd flag
        let frame = match (inputQueue.dequeue().getOrThrow({=> HttpException("Response queue closed.")}) as FieldsFrame) {
            case Some(v) => v
            case None => throw HttpException("Push request lost.")
        }
        let headers = HttpHeaders()
        let requestBuilder = HttpRequestBuilder()

        for ((key, values) in frame.fields) {
            match (key) {
                case ":authority" => headers.add("host", values)
                case ":path" => requestBuilder.url(values)
                case ":scheme" =>
                    if (values != "https") {
                        throw HttpException("Scheme must be https!")
                    }
                case ":method" => requestBuilder.method(values)
                case _ =>
                    if (H2_EXCLUDE_HEADERS.contains(key)) {
                        continue
                    }
                    let value = values.split(",")
                    for (v in value) {
                        headers.add(key, v)
                    }
            }
        }
        requestBuilder.setHeaders(headers).version(HTTP2_0)
        return requestBuilder.build()
    }

    /*
     * Takes the next frame from the input queue and decodes it as a response header
     * block. Returns (headers, response status, hasBody), where hasBody is true when
     * the frame did not end the stream.
     * Throws HttpException when the queue is closed, the frame is not a FieldsFrame,
     * a pseudo-header follows a regular header, or no status was found.
     */
    private func decodeResponseHeader(): (HttpHeaders, UInt16, Bool) {
        httpLogDebug(logger, "[ClientStream#decodeResponseHeader] start decode headers")
        let next = inputQueue.dequeue().getOrThrow({=> HttpException("Stream closed.")})
        let frame = (next as FieldsFrame) ?? throw HttpException("Response header lost.")
        let headers = HttpHeaders()
        var code: UInt16 = 0
        httpLogDebug(logger, "[ClientStream#decodeResponseHeader] decode header ok")

        // becomes true once the first regular header has been stored
        var regularSeen = false
        for ((name, value) in frame.fields) {
            if (regularSeen && name.startsWith(":")) {
                throw HttpException("Malformed response.")
            }
            if (name == ":status") {
                code = UInt16.parse(value)
                continue
            }
            if (H2_EXCLUDE_HEADERS.contains(name)) {
                continue
            }
            if (name == "trailer") {
                respHasTrailer = true
            }
            // some header value contains comma, cannot split
            headers.add(name, value)
            regularSeen = true
        }
        if (code == 0) {
            throw HttpException("Malformed response.")
        }
        return (headers, code, !frame.streamEnd)
    }

    /*
     * Converts a decoded trailer field list into HttpHeaders and attaches it to the
     * current response after checkTrailer has been run on that response.
     * Throws if no response has been built yet.
     */
    func decodeTrailer(headerList: FieldsList): Unit {
        let collected = HttpHeaders()
        for ((name, value) in headerList) {
            collected.add(name, value)
        }

        let target = response.getOrThrow()
        checkTrailer(target)
        target._trailers = collected
    }

    /**************************************** status machine ****************************************/
    /*
     * Advances the stream state machine before `frame` is written.
     * DATA, HEADERS and RST_STREAM frames update the status; WINDOW_UPDATE is always
     * allowed. Any other frame closes the stream with ProtocolError and throws
     * HttpStreamException.
     */
    func preProcess(frame: Frame): Unit {
        match (frame) {
            case data: DataFrame => onDataWrite(data)
            case fields: FieldsFrame => onFieldsWrite(fields)
            case _: RstStreamFrame => onRstWrite()
            case _: WindowUpdateFrame => ()
            case _ =>
                close(ProtocolError)
                throw HttpStreamException(ProtocolError, "Unexpected frame:${frame}.")
        }
    }

    /*
     * State transition for an outgoing DATA frame. Data may only be written in Open or
     * HalfClosedRemote; a frame with streamEnd half-closes (Open) or fully closes and
     * purges (HalfClosedRemote) the stream.
     */
    private func onDataWrite(frame: DataFrame): Unit {
        let before = status
        match (before) {
            case Open | HalfClosedRemote => ()
            case _ => throw HttpConnectionException(ProtocolError, "Write data in ${status} status.")
        }
        if (!frame.streamEnd) {
            return
        }
        if (before == Open) {
            status = HalfClosedLocal
        } else {
            status = Closed
            engine.purgeStream(streamId)
        }
    }

    /*
     * State transition for an outgoing HEADERS frame.
     *  - Idle: request headers; the stream becomes HalfClosedLocal (streamEnd) or Open.
     *  - Open / HalfClosedRemote: only trailers are allowed, so the request must have
     *    trailers and the frame must carry streamEnd. The stream becomes
     *    HalfClosedLocal, respectively Closed (and is purged from the engine).
     *  - any other state: HttpConnectionException(ProtocolError).
     */
    private func onFieldsWrite(frame: FieldsFrame): Unit {
        match (status) {
            case Idle =>
                if (frame.streamEnd) {
                    status = HalfClosedLocal
                } else {
                    status = Open
                }
            case Open =>
                if (request?.trailers.isEmpty() ?? false) {
                    throw HttpConnectionException(ProtocolError,
                        "Send header in Open state, but request don't have trailer.")
                }
                if (!frame.streamEnd) {
                    throw HttpConnectionException(ProtocolError, "Trailer must have streamEnd flag.")
                }
                status = HalfClosedLocal
            case HalfClosedRemote =>
                if (request?.trailers.isEmpty() ?? false) {
                    // report the state we are actually in (was wrongly reported as "Open")
                    throw HttpConnectionException(ProtocolError,
                        "Send header in HalfClosedRemote state, but request don't have trailer.")
                }
                if (!frame.streamEnd) {
                    throw HttpConnectionException(ProtocolError, "Trailer must have streamEnd flag.")
                }
                status = Closed
                engine.purgeStream(streamId)
            case _ => throw HttpConnectionException(ProtocolError,
                "Headers frame can only send in Idle or Open or HalfClosedRemote.")
        }
    }

    /*
     * State transition for an outgoing RST_STREAM frame. Per the original author's
     * note, such a frame originates from stream.close and only happens in the user
     * thread.
     */
    private func onRstWrite(): Unit {
        status = Closed
        // no deadline applies to a closed stream
        writeTimer.cancel()
        readTimer.cancel()
        engine.purgeStream(streamId)
        // closes the stream connection and wakes writers blocked on the remote window
        closeConn()
    }

    /*
     * Advances the stream state machine after `frame` has been received and, once the
     * remote side is done (status Closed or HalfClosedRemote), closes the input queue.
     * Throws HttpStreamException(ProtocolError) for frame types a stream cannot receive.
     */
    func postProcess(frame: Frame): Unit {
        match (frame) {
            case fields: FieldsFrame => onFieldsRead(fields)
            case data: DataFrame => onDataRead(data)
            case reset: RstStreamFrame => onRstRead(reset)
            case update: WindowUpdateFrame => onWindowUpdateRead(update)
            case _ => throw HttpStreamException(ProtocolError, "Unexpected frame: ${frame}.")
        }
        if (status == Closed || status == HalfClosedRemote) {
            // no further frames can arrive from the peer
            if (!inputQueue.isClosed()) {
                inputQueue.close()
            }
        }
    }

    /*
     * Handles a received DATA frame: updates streamEnd and the stream status, tops up
     * the local window when it has dropped below 5 MiB, charges the frame's payload
     * against the local window and queues the frame for the body reader.
     */
    private func onDataRead(frame: DataFrame): Unit {
        let refillSize: UInt32 = 5 * 1024 * 1024

        streamEnd = frame.streamEnd
        if (streamEnd) {
            // the response is complete, the read deadline no longer applies
            readTimer.cancel()
        }
        match (status) {
            case HalfClosedRemote | Closed => throw HttpStreamException(StreamClosed, "Receive data when remote closed.")
            case Open =>
                if (streamEnd) {
                    status = HalfClosedRemote
                }
            case HalfClosedLocal =>
                if (streamEnd) {
                    status = Closed
                    engine.purgeStream(streamId)
                }
            case _ => throw HttpConnectionException(ProtocolError, "Receive data in state ${status}.")
        }
        if (localWindow.load() < refillSize) {
            sendWindowUpdate(refillSize)
        }
        localWindow.fetchSub(frame.payloadLen)
        inputQueue.enqueue(frame)
    }

    /*
     * Enlarges the local window by `increment` and queues the matching WINDOW_UPDATE
     * frame with control priority. A zero increment is ignored.
     */
    private func sendWindowUpdate(increment: UInt32): Unit {
        if (increment != 0) {
            localWindow.fetchAdd(increment)
            let update = WindowUpdateFrame(streamId, increment)
            outputQueue.send(update, CONTROL_PRIORITY)
        }
    }

    /*
     * Handles a received field block. A block with pushId 0 is a header/trailer block
     * of this stream and is queued on its own input queue. Any other pushId marks a
     * push promise: its fields are re-wrapped and queued on the promised stream.
     * Throws HttpException when the promised stream is unknown to the engine.
     */
    private func onFieldsRead(frame: FieldsFrame): Unit {
        streamEnd = frame.streamEnd
        if (streamEnd) {
            readTimer.cancel()
        }
        // a writer may be waiting for the answer to "expect: 100-continue"
        if (expectContinuation) {
            notifyContinue()
        }

        if (frame.pushId != 0) {
            onPushRead()
            let promised = engine.getStream(frame.pushId) ?? throw HttpException(
                "Stream not found, id:${frame.pushId}.")
            promised.inputQueue.enqueue(FieldsFrame(frame.pushId, frame.fields))
            return
        }
        onHeadersRead()
        inputQueue.enqueue(frame)
    }

    /*
     * State transition for a received HEADERS frame of this stream (uses the streamEnd
     * field set by the caller). Allowed in Open, HalfClosedLocal and ReservedRemote;
     * Closed raises StreamClosed, every other state ProtocolError.
     */
    private func onHeadersRead(): Unit {
        let before = status
        match (before) {
            case Open | HalfClosedLocal | ReservedRemote => ()
            case Closed => throw HttpConnectionException(StreamClosed, "Receive headers frame on state Closed.")
            case _ => throw HttpConnectionException(ProtocolError, "Receive headers frame on state ${status}.")
        }
        // this case will occur when client receives PushPromise
        if (before == ReservedRemote) {
            status = HalfClosedLocal
        }
        if (!streamEnd) {
            return
        }
        if (before == Open) {
            status = HalfClosedRemote
        } else {
            // local side was already closed (or never open): the stream is finished
            status = Closed
            engine.purgeStream(streamId)
        }
    }

    /*
     * Handles a received RST_STREAM: closes the stream, cancels both timers, removes
     * the stream from the engine and closes the stream connection.
     * A reset on an Idle stream is a connection-level protocol error.
     */
    private func onRstRead(frame: RstStreamFrame): Unit {
        httpLogDebug(logger, "[ClientStream#onRstRead] stream received reset: ${frame}")
        if (status == Idle) {
            throw HttpConnectionException(ProtocolError, "Read rstStream on idle stream.")
        }
        status = Closed
        writeTimer.cancel()
        readTimer.cancel()
        engine.purgeStream(frame.streamId)
        closeConn()
    }

    /*
     * Validates that a push promise may be received in the current stream state.
     * Only Open and HalfClosedLocal accept it; every other state throws.
     * (Original note: http_client rejects the push if the setting does not allow it.)
     */
    private func onPushRead(): Unit {
        match (status) {
            case Idle => throw HttpConnectionException(ProtocolError, "Idle stream received push.")
            case HalfClosedLocal | Open => ()
            // stream-level error here, connection-level error for the other states
            case HalfClosedRemote => throw HttpStreamException(StreamClosed, "Push frame read in ${status} status.")
            case Closed => throw HttpConnectionException(StreamClosed, "Push frame read in ${status} status.")
            case _ => throw HttpConnectionException(ProtocolError, "Push frame read in ${status} status.")
        }
    }

    /*
     * Handles a received WINDOW_UPDATE: enlarges the remote window and wakes writers
     * blocked in blockForWindowUpdate.
     * Throws HttpConnectionException for an Idle stream (ProtocolError) or when the
     * window would exceed MAX_WINDOW (FlowControlError).
     */
    private func onWindowUpdateRead(frame: WindowUpdateFrame): Unit {
        if (status == Idle) {
            throw HttpConnectionException(ProtocolError, "Read window update frame on idle stream.")
        }
        let delta = Int64(frame.increment)
        if (remoteWindow.load() + delta > Int64(MAX_WINDOW)) {
            throw HttpConnectionException(FlowControlError, "Window over flow.")
        }
        remoteWindow.fetchAdd(delta)
        // locked at func writeRequest write body
        remoteWindowMonitor.lock()
        remoteWindowMonitor.notifyAll()
        remoteWindowMonitor.unlock()
    }

    /**************************************** closing  ****************************************/
    /*
     * Closes the stream with the given HTTP/2 error code: queues an RST_STREAM frame
     * with control priority and closes the stream connection.
     */
    func close(code: H2Error) {
        // shutdown all thread, change status
        outputQueue.send(RstStreamFrame(streamId, code.code), CONTROL_PRIORITY)
        closeConn()
    }

    /*
     * Closes the stream connection (if one was created) and wakes every writer that
     * is blocked waiting for the remote window.
     */
    func closeConn() {
        if (let Some(conn) <- engineConn) {
            conn.close()
        }
        remoteWindowMonitor.lock()
        remoteWindowMonitor.notifyAll()
        remoteWindowMonitor.unlock()
    }
}

/*
 * Socket-like view of one ClientStream: reads frames from the stream's input queue
 * and writes frames to the shared output queue.
 */
class Http2ClientEngineConn <: StreamingSocket {
    let stream: ClientStream
    let streamId: UInt32
    // both queues are taken from the stream in init
    let inputQueue: ClosableBlockingQueue<Frame>
    let outputQueue: SPMCLevelQueue<Frame>

    // set once the connection is closed or a read failed; writes become no-ops
    var quit = false
    // set when a DATA frame with streamEnd or a trailer block has been read
    var bodyReadEnd = false
    // bytes of a DATA frame that did not fit into the caller's buffer
    var remaining: BytesIOStream = BytesIOStream()
    let logger: Logger

    /* Wraps `stream`, sharing its id, both of its queues and its logger. */
    init(stream: ClientStream) {
        this.logger = stream.logger
        this.outputQueue = stream.outputQueue
        this.inputQueue = stream.inputQueue
        this.streamId = stream.streamId
        this.stream = stream
    }

    /* Local address of the socket underlying the engine's connection. */
    public prop localAddress: SocketAddress {
        get() {
            stream.engine.conn.socket.localAddress
        }
    }

    /* Remote address of the socket underlying the engine's connection. */
    public prop remoteAddress: SocketAddress {
        get() {
            stream.engine.conn.socket.remoteAddress
        }
    }

    /* Always None; a read timeout cannot be set on an h2 stream (setter throws). */
    public mut prop readTimeout: ?Duration {
        get() {
            None
        }
        set(_) {
            throw HttpException("Should not set timeout on h2 stream.")
        }
    }

    /* Always None; a write timeout cannot be set on an h2 stream (setter throws). */
    public mut prop writeTimeout: ?Duration {
        get() {
            None
        }
        set(_) {
            throw HttpException("Should not set timeout on h2 stream.")
        }
    }

    /*
     * Reads raw bytes, normally used in an upgraded protocol, e.g. websocket.
     * Bytes left over from an earlier frame are served first; otherwise the call
     * blocks for the next frame, copies as much of its data as fits into `buf` and
     * keeps the rest for the next call. Returns the number of bytes copied.
     * Throws HttpException for an empty buffer or a closed queue, and
     * HttpStreamException(ProtocolError) for a non-DATA frame; the latter two also
     * mark the connection as quit.
     */
    public func read(buf: Array<UInt8>): Int64 {
        if (buf.size == 0) {
            throw HttpException("Read use empty buf.")
        }
        if (remaining.remainLength > 0) {
            return remaining.read(buf)
        }
        let next = inputQueue.dequeue()
        let payload = match (next) {
            case None =>
                quit = true
                throw HttpException("Failed to fetch data, the connection is closed.")
            case Some(v) => match (v) {
                case f: DataFrame => f.data
                case _ =>
                    quit = true
                    throw HttpStreamException(ProtocolError, "The ws meet unexpected frame ${next}.")
            }
        }
        if (payload.size <= buf.size) {
            payload.copyTo(buf, 0, 0, payload.size)
            return payload.size
        }
        // not enough room: fill the buffer and keep the tail for the next read
        payload.copyTo(buf, 0, 0, buf.size)
        remaining.write(payload[buf.size..])
        return buf.size
    }

    /*
     * Writes raw bytes, normally used in an upgraded protocol, e.g. websocket.
     * The bytes go out as one DATA frame that does not end the stream.
     */
    public func write(bytes: Array<UInt8>): Unit {
        writeBody(bytes, false)
    }

    /* For websocket close: queues an RST_STREAM frame with the Cancel code. */
    func writeWebSocketReset(): Unit {
        outputQueue.send(RstStreamFrame(streamId, Cancel.code), MESSAGE_PRIORITY)
    }

    /*
     * Wraps a field block in a FieldsFrame and queues it for sending. Does nothing
     * once the connection has quit.
     * Throws HttpException when the output queue no longer accepts frames.
     */
    func writeHeaders(fieldBlock: FieldsList, streamEnd!: Bool = false): Unit {
        if (!quit) {
            let headerFrame = FieldsFrame(streamId, fieldBlock, last: streamEnd)
            let accepted = outputQueue.send(headerFrame, MESSAGE_PRIORITY)
            if (!accepted) {
                throw HttpException("OutputQueue closed while writing headers.")
            }
        }
    }

    /*
     * Queues `bytes` as one DATA frame and charges its size against the stream's
     * remote window. Does nothing once the connection has quit.
     * Throws HttpException when the output queue no longer accepts frames.
     */
    func writeBody(bytes: Array<UInt8>, streamEnd: Bool): Unit {
        if (!quit) {
            let dataFrame = DataFrame(streamId, bytes, bytes.size, last: streamEnd)
            let accepted = outputQueue.send(dataFrame, MESSAGE_PRIORITY)
            if (!accepted) {
                throw HttpException("OutputQueue closed while writing body.")
            }
            // an empty frame consumes no flow-control credit
            if (bytes.size > 0) {
                stream.remoteWindow.fetchSub(bytes.size)
            }
        }
    }

    /*
     * Reads response body bytes into `buf` and returns the number of bytes copied;
     * 0 means the body is finished.
     *
     * Left-over bytes of an earlier frame are served first. Otherwise the next frame
     * is taken from the input queue: DATA is copied (surplus is kept for the next
     * call), a field block is decoded as the trailer and ends the body.
     * Empty DATA frames that do not end the stream are skipped in a loop, so a long
     * run of them cannot grow the call stack.
     * Throws HttpException for an empty buffer or a closed queue, the recorded
     * timeout if the stream timed out, and HttpStreamException(ProtocolError) for any
     * other frame type.
     */
    func readBody(buf: Array<UInt8>): Int64 {
        if (buf.size == 0) {
            throw HttpException("Read buffer size can not be zero!")
        }
        if (bodyReadEnd && remaining.remainLength == 0) {
            return 0
        }
        if (remaining.remainLength > 0) {
            return remaining.read(buf)
        }
        // receive empty data frame but stream not end, there must be trailer
        var frame = nextBodyFrame()
        while (isEmptyNonFinalData(frame)) {
            frame = nextBodyFrame()
        }
        match (frame) {
            case f: DataFrame =>
                bodyReadEnd = f.streamEnd
                let copyLen = min(buf.size, f.size)
                if (copyLen == 0) {
                    // empty frame that ends the stream
                    return 0
                }
                f.data.copyTo(buf, 0, 0, copyLen)
                if (f.size > copyLen) {
                    remaining.write(f.data[copyLen..])
                }
                return copyLen
            case f: FieldsFrame =>
                // read and decode trailer after read body end
                // if no body but have trailer, must call body.read to get trailer
                bodyReadEnd = true
                stream.decodeTrailer(f.fields)
                return 0
            case _ => throw HttpStreamException(ProtocolError, "Meet unexpected frame: ${frame}.")
        }
    }

    /* Blocks for the next frame; a closed queue is reported as timeout or closure. */
    private func nextBodyFrame(): Frame {
        match (inputQueue.dequeue()) {
            case Some(v) => return v
            case None =>
                if (let Some(t) <- stream.timeout) {
                    throw t
                }
                stream.readTimer.cancel()
                throw HttpException("Failed to fetch data, the connection or stream is closed.")
        }
    }

    /* True for a DATA frame that carries no bytes and does not end the stream. */
    private func isEmptyNonFinalData(frame: Frame): Bool {
        match (frame) {
            case f: DataFrame => return f.size == 0 && !f.streamEnd
            case _ => return false
        }
    }

    /*
     * Closes the receiving side of this stream connection and marks it as quit.
     * The output queue is left open because it belongs to the connection.
     */
    public func close(): Unit {
        // can not receive data if inputQueue closed
        let alreadyClosed = inputQueue.isClosed()
        if (!alreadyClosed) {
            inputQueue.close()
        }
        quit = true
    }

    /* True once the stream connection has quit. */
    public func isClosed(): Bool {
        return quit
    }

    /* Fixed display name of this connection type. */
    public func toString(): String {
        return "Client Stream"
    }
}

/*
 * Body of an HTTP/2 response. For a normal response it is read through read();
 * for a CONNECT tunnel (isConnect) it is used through readRaw()/writeRaw() instead.
 */
class Http2ClientBodyProvider <: InputStream & WebSocketConn & Resource {
    // set once read() has returned 0
    var readEnd = false
    let isConnect: Bool
    let conn: Http2ClientEngineConn

    init(conn: Http2ClientEngineConn, isConnect: Bool) {
        this.isConnect = isConnect
        this.conn = conn
    }

    /*
     * Reads body bytes of a normal response; returns 0 once the body is finished.
     * Throws HttpException when called on a tunnel.
     */
    public func read(buf: Array<Byte>): Int64 {
        if (isConnect) {
            throw HttpException("Call readRaw & writeRaw for tunnel.")
        }
        if (readEnd) {
            return 0
        }
        let count = conn.readBody(buf)
        if (count == 0) {
            readEnd = true
        }
        return count
    }

    /*
     * Fills `byteArray` from the tunnel, stopping early only when a read yields no
     * bytes. Returns the number of bytes read. Any failure of the underlying read is
     * reported as ConnectionException.
     */
    public func readRaw(byteArray: Array<UInt8>): Int64 {
        if (!isConnect) {
            throw HttpException("Not Connect response, call read.")
        }
        try {
            var filled: Int64 = 0
            while (filled < byteArray.size) {
                let got = conn.read(byteArray[filled..])
                if (got == 0) {
                    break
                }
                filled += got
            }
            return filled
        } catch (e: Exception) {
            throw ConnectionException("Connection closed.")
        }
    }

    /*
     * Writes raw bytes into the tunnel.
     * Throws HttpException for a non-tunnel response and ConnectionException when
     * the stream connection has already quit.
     */
    public func writeRaw(byteArray: Array<UInt8>): Unit {
        if (!isConnect) {
            throw HttpException("Not Connect response, call read.")
        }
        if (conn.quit) {
            throw ConnectionException("Connection closed.")
        }
        conn.write(byteArray)
    }

    /* Resets and closes the stream connection unless it has already quit. */
    public func close(): Unit {
        if (!conn.quit) {
            conn.writeWebSocketReset()
            conn.close()
        }
    }

    /* True once the stream connection has quit. */
    public func isClosed(): Bool {
        return conn.quit
    }
}