/*
* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* This source file is part of the Cangjie project, licensed under Apache-2.0
* with Runtime Library Exception.
*
* See https://cangjie-lang.cn/pages/LICENSE for license information.
*/
package stdx.net.http
import std.net.*
import std.io.*
import std.collection.*
import std.sync.{Monitor, AtomicInt64, AtomicUInt32}
import std.convert.Parsable
import stdx.log.Logger
/*
 * Client-side state of a single HTTP/2 stream: the request/response pair, the
 * stream status, the per-stream flow-control windows and the read/write timers.
 */
class ClientStream {
// id of this stream and the engine it belongs to
let streamId: UInt32
let engine: HttpClientEngine2
// initialized lazily, when the stream is first used (see send / constructPushResponse)
var engineConn: ?Http2ClientEngineConn = None
// streamEnd flag of the last DATA / HEADERS frame received on this stream
var streamEnd: Bool = false
var status: Status = Idle
// queues
// outputQueue for request, inputQueue for response
let inputQueue: ClosableBlockingQueue<Frame> = ClosableBlockingQueue<Frame>(Int64.Max)
// shared with the engine (assigned from engine.outputQueue in init)
let outputQueue: SPMCLevelQueue<Frame>
// for 100-continue
var continueResponse: ?HttpResponse = None
// for server push
let pushStreams: ArrayList<Object> = ArrayList<Object>()
var request: ?HttpRequest = None
var response: ?HttpResponse = None
// set true if request contains "expect: 100-continue" & request have body
var expectContinuation: Bool = false
// sync for 100-continue
let continueMonitor: Monitor = Monitor()
// set when the response carries a "trailer" header
var respHasTrailer = false
let logger: Logger
let decoder: Decoder
// flow control
// localWindow: receive window of this stream; remoteWindow: send window granted by the peer
var localWindow: AtomicUInt32
var remoteWindow: AtomicInt64
// notified on WINDOW_UPDATE and when the stream connection is closed
let remoteWindowMonitor: Monitor = Monitor()
// store header blocks, clear when read header end
let headerfields = HeaderBlock()
// timers guarding response reading / request writing; timeout is set when one of them fires
var readTimer = HttpTimer.empty
var writeTimer = HttpTimer.empty
var timeout: ?HttpTimeoutException = None
/*
 * Binds a new stream to `engine`: shares the engine's output queue, decoder and
 * logger, and seeds both flow-control windows from the initial-window-size settings.
 */
init(streamId: UInt32, engine: HttpClientEngine2) {
    let windowSetting = SettingsInitialWindowSize.code
    this.streamId = streamId
    this.engine = engine
    this.logger = engine.logger
    this.decoder = engine.decoder
    this.outputQueue = engine.outputQueue
    this.remoteWindow = AtomicInt64(Int64(engine.remoteSettings[windowSetting]))
    this.localWindow = AtomicUInt32(engine.localSettings[windowSetting])
    // a push stream starts its life in ReservedRemote instead of Idle
    if (isPushStream(streamId)) {
        this.status = ReservedRemote
    }
}
/**************************************** encode request & decode response ****************************************/
/*
 * Sends `req` on this stream. This func should be called only once in a stream.
 *
 * Records whether the request expects "100-continue" (only when it has a body),
 * creates the stream connection and writes the request under a write timer.
 * Throws the recorded HttpTimeoutException if the timer fired, otherwise rethrows
 * the original exception from the write.
 */
func send(req: HttpRequest): Unit {
    request = req
    match (req.headers.getFirst("expect")) {
        case Some(v) =>
            let isContinue = v == "100-continue"
            expectContinuation = isContinue && !(req.body is HttpEmptyBody)
        case None => ()
    }
    engineConn = Http2ClientEngineConn(this)
    // start timer before write request
    writeTimer = HttpTimer(
        start: req.writeTimeout ?? engine.writeTimeout,
        task: {
            =>
            // Record the timeout BEFORE closing: close() wakes the writer, which
            // checks `timeout` to decide which exception to report.
            timeout = HttpTimeoutException("Client2_0 write request timeout.")
            close(Cancel)
        }
    )
    try {
        writeRequest()
    } catch (e: Exception) {
        if (let Some(t) <- timeout) {
            throw t
        }
        throw e
    } finally {
        // stop timer after write request
        writeTimer.cancel()
    }
}
/*
 * Writes the stored request: headers, then (optionally after waiting for a
 * 100-continue answer) the body, then the trailers if the request has any.
 * Throws HttpException when no request has been stored on the stream.
 */
private func writeRequest(): Unit {
    httpLogDebug(logger, "[ClientStream#writeRequest] start write request of stream: ${streamId}")
    let req = request ?? throw HttpException("InternalError, request of stream ${streamId} is none.")
    let bodySize = req.bodySize ?? -1
    let hasTrailer = !req.trailers.isEmpty()
    // the HEADERS frame ends the stream only when nothing follows it
    let headersEndStream = bodySize == 0 && !hasTrailer && req.method != "CONNECT"
    writeHeaders(headersEndStream)
    if (expectContinuation) {
        waitContinue()
        // a final (non-100) answer means the body must not be sent;
        // no answer at all (wait timed out) lets the body go out
        if (let Some(interim) <- continueResponse) {
            if (interim.status != 100) {
                return
            }
        }
    }
    httpLogDebug(logger, "[ClientStream#writeRequest] start write body of stream: ${streamId}")
    if (bodySize != 0) {
        match (req.body) {
            case raw: HttpRawBody => writeRawBody(raw, hasTrailer)
            case stream: InputStream => writeNormalBody(stream, hasTrailer)
        }
    }
    httpLogDebug(logger, "[ClientStream#writeRequest] finish write body of stream: ${streamId}")
    if (!hasTrailer) {
        return
    }
    let trailerFields = FieldsList()
    for ((name, value) in req.trailers.map) {
        trailerFields.add((name.toString(), value.toString()))
    }
    engineConn?.writeHeaders(trailerFields, streamEnd: true)
    httpLogDebug(logger, "[ClientStream#writeRequest] finish write trailer of stream: ${streamId}")
}
/*
 * Writes an in-memory body as DATA frames.
 * Each frame is at most min(remote max frame size, current remote window) bytes;
 * the final frame carries the streamEnd flag unless a trailer follows.
 * Returns early, without sending the rest, once the stream is closed or the
 * stream connection has quit.
 */
private func writeRawBody(rb: HttpRawBody, hasTrailer: Bool): Unit {
var remaining = rb.length
httpLogDebug(logger, "[ClientStream#writeRequest] writing body size = ${remaining}")
var index = 0
var maxFrameSize = min(Int64(engine.remoteSettings[SettingsMaxFrameSize.code]), remoteWindow.load())
// loop while what is left does not fit into a single frame
while (remaining > maxFrameSize) {
if (status == Closed || (engineConn?.quit ?? false)) {
return
}
// window exhausted: wait for a window update, then re-evaluate the loop condition
if (maxFrameSize <= 0) {
maxFrameSize = blockForWindowUpdate()
continue
}
httpLogDebug(logger,
"[ClientStream#writeRequest] maxFrameSize=${maxFrameSize} index=${index} remoteWindow=${remoteWindow.load()}"
)
engineConn?.writeBody(rb.rawBody[index..maxFrameSize + index], false)
remaining -= maxFrameSize
index += maxFrameSize
// writeBody reduced the remote window, so the frame size must be recomputed
maxFrameSize = min(Int64(engine.remoteSettings[SettingsMaxFrameSize.code]), remoteWindow.load())
}
httpLogDebug(logger,
"[ClientStream#writeRequest] maxFrameSize=${maxFrameSize} index=${index} remoteWindow=${remoteWindow.load()}"
)
// last chunk: everything that is left, ending the stream when no trailer follows
engineConn?.writeBody(rb.rawBody[index..], !hasTrailer)
}
/*
 * Streams `body` to the peer as DATA frames.
 * Each read uses a buffer of min(remote max frame size, current remote window)
 * bytes. When the window is exhausted (zero or negative) the function blocks for
 * a window update BEFORE allocating the buffer, so the body is never read into an
 * empty buffer and a negative window never reaches the array constructor.
 * When no trailer follows, an empty DATA frame with streamEnd terminates the stream.
 */
private func writeNormalBody(body: InputStream, hasTrailer: Bool): Unit {
    var maxFrameSize = min(Int64(engine.remoteSettings[SettingsMaxFrameSize.code]), remoteWindow.load())
    while (true) {
        if (maxFrameSize <= 0) {
            maxFrameSize = blockForWindowUpdate()
            if (maxFrameSize <= 0) {
                // only possible when the stream was closed or the connection quit while waiting
                return
            }
        }
        let buf = Array<UInt8>(maxFrameSize, repeat: 0)
        let len = body.read(buf)
        if (len <= 0) {
            break
        }
        if (status == Closed || (engineConn?.quit ?? false)) {
            return
        }
        engineConn?.writeBody(buf[0..len], false)
        // writeBody reduced the remote window, recompute the next frame size
        maxFrameSize = min(Int64(engine.remoteSettings[SettingsMaxFrameSize.code]), remoteWindow.load())
    }
    if (!hasTrailer) {
        engineConn?.writeBody("".toArray(), true)
    }
}
/*
 * Blocks while the remote window is zero or negative and the stream is still alive,
 * polling in 10 ms waits on remoteWindowMonitor. Returns the new max frame size.
 * Throws HttpStreamException(FlowControlError) if that size is still zero although
 * the stream is neither closed nor quit.
 */
private func blockForWindowUpdate(): Int64 {
    while (remoteWindow.load() <= 0) {
        if (status == Closed || (engineConn?.quit ?? false)) {
            break
        }
        remoteWindowMonitor.lock()
        httpLogDebug(logger,
            "[ClientStream#blockForWindowUpdate] stream wait for window update, current window:${remoteWindow.load()}, stream.status: ${status}"
        )
        remoteWindowMonitor.wait(timeout: Duration.millisecond * 10)
        remoteWindowMonitor.unlock()
    }
    let frameLimit = Int64(engine.remoteSettings[SettingsMaxFrameSize.code])
    let maxFrameSize = min(frameLimit, remoteWindow.load())
    let alive = status != Closed && !(engineConn?.quit ?? false)
    if (maxFrameSize == 0 && alive) {
        throw HttpStreamException(FlowControlError, "Client2_0 stream remote window is zero.")
    }
    return maxFrameSize
}
/*
 * Builds the request HEADERS field list (pseudo-headers first, then the regular
 * headers) and hands it to the stream connection.
 * Throws HttpException when the request has no "host" header.
 */
private func writeHeaders(streamEnd: Bool): Unit {
    let req = request.getOrThrow()
    let fieldList = FieldsList()
    let host = req.headers.getFirst("host") ?? throw HttpException("Host not set.")
    // ":protocol" (extended CONNECT) is forwarded as-is and decides the websocket case
    var isWebsocket = false
    if (let Some(protocol) <- req.headers.getFirst(":protocol")) {
        fieldList.add((":protocol", protocol))
        isWebsocket = protocol == "websocket"
    }
    // append the default port when the url carries none
    let authority = if (req.url.port.isEmpty()) {
        req.url.host + ":443"
    } else {
        req.url.host
    }
    let path: String = if (host != authority) {
        // use proxy
        req.url.toString()
    } else if (req.url.path.isEmpty() && req.method == "OPTIONS") {
        ASTERISK
    } else {
        // NOTE(review): the fragment is appended to :path here — confirm this is intended
        var target = canonicalPath(req.url.path)
        if (let Some(query) <- req.url.query) {
            target += "?${query}"
        }
        if (let Some(fragment) <- req.url.fragment) {
            target += "#${fragment}"
        }
        target
    }
    fieldList.add((":authority", authority))
    fieldList.add((":method", req.method))
    let needsSchemeAndPath = req.method != "CONNECT" || isWebsocket
    if (needsSchemeAndPath) {
        fieldList.add((":scheme", "https"))
        fieldList.add((":path", path))
    }
    for ((name, value) in req.headers.map) {
        // skip headers excluded for h2, content-length (we don't send it) and pseudo-headers
        let skipped = H2_EXCLUDE_HEADERS.contains(name.toString()) || name == Str("content-length") ||
            name.startWith(b':')
        if (!skipped) {
            fieldList.add((name.toString(), value.toString()))
        }
    }
    engineConn?.writeHeaders(fieldList, streamEnd: streamEnd)
    httpLogDebug(logger, "[ClientStream#writeHeaders] finish write header of stream: ${streamId}")
}
/*
 * Waits on continueMonitor for up to 1s; returns earlier when notifyContinue()
 * is called while this thread is waiting.
 */
private func waitContinue() {
continueMonitor.lock()
continueMonitor.wait(timeout: Duration.second)
continueMonitor.unlock()
}
/* Wakes every thread currently blocked in waitContinue(). */
private func notifyContinue() {
continueMonitor.lock()
continueMonitor.notifyAll()
continueMonitor.unlock()
}
/*
 * Use only when this stream is a push promised stream.
 * Returns the cached response if one was already built; otherwise decodes the
 * promised request, builds the response and attaches the request to it.
 */
func constructPushResponse(): HttpResponse {
    match (response) {
        case Some(cached) => return cached
        case None => ()
    }
    if (engineConn.isNone()) {
        engineConn = Http2ClientEngineConn(this)
    }
    // the request must be stored first: constructResponseInternal reads it
    let promised = decodeRequest()
    request = promised
    let built = constructResponseInternal()
    response = built
    built._request = promised
    return built
}
/*
 * Reads the response of this stream under a read timer (the timer is stopped
 * when the stream ends).
 * When the request expected "100-continue", the first header block is read as the
 * interim answer: a 100 is stored in continueResponse and the final response is
 * read next; any other status is returned as the final response.
 */
func constructResponse(): HttpResponse {
    // start readTimer
    // stop timer when stream end
    readTimer = HttpTimer(
        start: (request?.readTimeout ?? None) ?? engine.readTimeout,
        task: {
            =>
            // Record the timeout BEFORE closing: close() closes the input queue, and
            // blocked readers look at `timeout` to decide which exception to report.
            timeout = HttpTimeoutException("Client2_0 receive response for stream ${streamId} timeout.")
            close(StreamClosed)
        }
    )
    if (expectContinuation) {
        httpLogDebug(logger, "[ClientStream#constructResponse] read continuation response")
        let resp = constructResponseInternal(continuation: true)
        expectContinuation = false
        // server skip 100-continue or meet error
        match (resp.status) {
            case 100 => continueResponse = resp
            case _ =>
                resp._body = Http2ClientBodyProvider(engineConn.getOrThrow(), false)
                response = resp
                return resp
        }
    }
    response = constructResponseInternal()
    return response.getOrThrow()
}
/*
 * Blocks until the response header block is decoded and builds the HttpResponse.
 * A failure while decoding is reported as the recorded timeout, if any; otherwise
 * the read timer is cancelled and the original exception is rethrown.
 * `continuation` marks an interim (100-continue) read, which never gets a body.
 */
private func constructResponseInternal(continuation!: Bool = false): HttpResponse {
    // block on decodeResponseHeader
    let (headers, code, hasBody) = try {
        decodeResponseHeader()
    } catch (e: Exception) {
        if (let Some(t) <- timeout) {
            throw t
        }
        readTimer.cancel()
        throw e
    }
    let req = request.getOrThrow()
    // successful CONNECT: the stream becomes a tunnel, so the read timer no longer applies
    let tunnel = req.method == "CONNECT" && code > 199 && code < 300
    if (tunnel) {
        readTimer.cancel()
    }
    let body: InputStream = if (tunnel && !isWebsocket()) {
        engineConn.getOrThrow()
    } else if (req.method != "HEAD" && hasBody && !continuation) {
        Http2ClientBodyProvider(engineConn.getOrThrow(), tunnel)
    } else {
        HttpEmptyBody.INSTANCE
    }
    let builder = HttpResponseBuilder()
        .status(code)
        .version(HTTP2_0)
        .setHeaders(headers)
        .body(body)
        .request(req)
    let built = builder.build()
    // expose pushed streams only when push is enabled locally
    if (engine.localSettings[SettingsEnablePush.code] == 1) {
        built.pushResponses = pushStreams
    }
    return built
}
/* True when the stored request carries ":protocol: websocket". Throws if no request is stored. */
private func isWebsocket(): Bool {
    let protocol = request.getOrThrow().headers.getFirst(":protocol") ?? ""
    return protocol == "websocket"
}
/*
 * For push request, must no body.
 * Decodes the promised request from the first frame of the input queue.
 * Pseudo-headers are mapped onto the request builder (":authority" becomes the
 * "host" header); every other field that is not excluded for h2 is added as a header.
 * Header values are added verbatim: some header values legitimately contain a
 * comma, so they must not be split (same rule as for response headers).
 * Throws HttpException when the queue is closed, the frame is not a fields frame,
 * or the scheme is not https.
 */
private func decodeRequest(): HttpRequest {
    // this request is from push frame, which doesn't have streamEnd flag
    let first = inputQueue.dequeue().getOrThrow({=> HttpException("Response queue closed.")})
    let frame = match (first as FieldsFrame) {
        case Some(v) => v
        case None => throw HttpException("Push request lost.")
    }
    let headers = HttpHeaders()
    let requestBuilder = HttpRequestBuilder()
    for ((key, values) in frame.fields) {
        match (key) {
            case ":authority" => headers.add("host", values)
            case ":path" => requestBuilder.url(values)
            case ":scheme" =>
                if (values != "https") {
                    throw HttpException("Scheme must be https!")
                }
            case ":method" => requestBuilder.method(values)
            case _ =>
                if (!H2_EXCLUDE_HEADERS.contains(key)) {
                    headers.add(key, values)
                }
        }
    }
    requestBuilder.setHeaders(headers).version(HTTP2_0)
    return requestBuilder.build()
}
/*
 * Decodes the response header block from the first frame of the input queue.
 * Returns (headers, response status, hasBody); hasBody is true when the frame
 * does not carry the streamEnd flag.
 * Throws HttpException("Malformed response.") when a pseudo-header follows any
 * other field, when ":status" is missing or is not a valid number, and
 * HttpException when the queue is closed or the frame is not a fields frame.
 */
private func decodeResponseHeader(): (HttpHeaders, UInt16, Bool) {
    httpLogDebug(logger, "[ClientStream#decodeResponseHeader] start decode headers")
    let frame = match (inputQueue.dequeue().getOrThrow({=> HttpException("Stream closed.")}) as FieldsFrame) {
        case Some(v) => v
        case None => throw HttpException("Response header lost.")
    }
    let headers = HttpHeaders()
    var status: UInt16 = 0
    httpLogDebug(logger, "[ClientStream#decodeResponseHeader] decode header ok")
    // becomes true once any field other than ":status" has been seen
    var seenOtherField = false
    for ((key, values) in frame.fields) {
        if (seenOtherField && key.startsWith(":")) {
            throw HttpException("Malformed response.")
        }
        if (key == ":status") {
            // a non-numeric status is a malformed response, not an internal error
            status = UInt16.tryParse(values) ?? throw HttpException("Malformed response.")
            continue
        }
        // excluded fields also end the pseudo-header section
        seenOtherField = true
        if (H2_EXCLUDE_HEADERS.contains(key)) {
            continue
        }
        if (key == "trailer") {
            respHasTrailer = true
        }
        // some header value contains comma, cannot split
        headers.add(key, values)
    }
    if (status == 0) {
        throw HttpException("Malformed response.")
    }
    return (headers, status, !frame.streamEnd)
}
/*
 * Converts a trailer field list into HttpHeaders and stores it on the response
 * after checkTrailer has been applied. Throws if no response is stored yet.
 */
func decodeTrailer(headerList: FieldsList): Unit {
    let decoded = HttpHeaders()
    for ((name, value) in headerList) {
        decoded.add(name, value)
    }
    let target = response.getOrThrow()
    checkTrailer(target)
    target._trailers = decoded
}
/**************************************** status machine ****************************************/
/*
 * Process stream status before write frames.
 * DATA, HEADERS and RST_STREAM update the state machine, WINDOW_UPDATE is a no-op;
 * any other frame closes the stream with ProtocolError and throws.
 */
func preProcess(frame: Frame): Unit {
    match (frame) {
        case _: WindowUpdateFrame => ()
        case _: RstStreamFrame => onRstWrite()
        case fields: FieldsFrame => onFieldsWrite(fields)
        case data: DataFrame => onDataWrite(data)
        case _ =>
            close(ProtocolError)
            throw HttpStreamException(ProtocolError, "Unexpected frame:${frame}.")
    }
}
/*
 * State transition for an outgoing DATA frame.
 * Writing is only legal in Open and HalfClosedRemote; a frame with streamEnd moves
 * Open -> HalfClosedLocal, or HalfClosedRemote -> Closed (and purges the stream).
 */
private func onDataWrite(frame: DataFrame): Unit {
    let current = status
    if (current != Open && current != HalfClosedRemote) {
        throw HttpConnectionException(ProtocolError, "Write data in ${status} status.")
    }
    if (!frame.streamEnd) {
        return
    }
    if (current == Open) {
        status = HalfClosedLocal
    } else {
        status = Closed
        engine.purgeStream(streamId)
    }
}
/*
 * State transition for an outgoing HEADERS frame.
 * - Idle: request headers; moves to HalfClosedLocal (streamEnd) or Open.
 * - Open / HalfClosedRemote: only a trailer is allowed, and it must carry
 *   streamEnd; moves to HalfClosedLocal, or to Closed (purging the stream) when the
 *   remote side is already closed.
 * Any other state, or a violation of the trailer rules, throws
 * HttpConnectionException(ProtocolError).
 */
private func onFieldsWrite(frame: FieldsFrame): Unit {
    match (status) {
        case Idle =>
            if (frame.streamEnd) {
                status = HalfClosedLocal
            } else {
                status = Open
            }
        case Open | HalfClosedRemote =>
            let remoteClosed = status == HalfClosedRemote
            if (request?.trailers.isEmpty() ?? false) {
                // report the state the stream is actually in
                let state = if (remoteClosed) {
                    "HalfClosedRemote"
                } else {
                    "Open"
                }
                throw HttpConnectionException(ProtocolError,
                    "Send header in ${state} state, but request don't have trailer.")
            }
            if (!frame.streamEnd) {
                throw HttpConnectionException(ProtocolError, "Trailer must have streamEnd flag.")
            }
            if (remoteClosed) {
                status = Closed
                engine.purgeStream(streamId)
            } else {
                status = HalfClosedLocal
            }
        case _ => throw HttpConnectionException(ProtocolError,
            "Headers frame can only send in Idle or Open or HalfClosedRemote.")
    }
}
/*
 * Write reset come from stream.close, only happen in user thread.
 * Marks the stream Closed, stops both timers, removes the stream from the engine
 * and closes the stream connection.
 */
private func onRstWrite(): Unit {
status = Closed
writeTimer.cancel()
readTimer.cancel()
engine.purgeStream(streamId)
closeConn()
}
/*
 * Process stream status after receive frames.
 * Dispatches on the frame type, then closes the input queue once the remote side
 * is finished (status Closed or HalfClosedRemote).
 * Throws HttpStreamException(ProtocolError) for an unexpected frame type.
 */
func postProcess(frame: Frame): Unit {
    match (frame) {
        case fields: FieldsFrame => onFieldsRead(fields)
        case data: DataFrame => onDataRead(data)
        case rst: RstStreamFrame => onRstRead(rst)
        case update: WindowUpdateFrame => onWindowUpdateRead(update)
        case _ => throw HttpStreamException(ProtocolError, "Unexpected frame: ${frame}.")
    }
    let remoteFinished = status == Closed || status == HalfClosedRemote
    if (remoteFinished && !inputQueue.isClosed()) {
        inputQueue.close()
    }
}
/*
 * State transition and flow-control accounting for an incoming DATA frame.
 * Cancels the read timer on streamEnd, advances the state machine, tops the local
 * window up by 5 MiB when it is below 5 MiB, charges the frame payload to the local
 * window and queues the frame for the reader.
 */
private func onDataRead(frame: DataFrame): Unit {
    let ended = frame.streamEnd
    streamEnd = ended
    if (ended) {
        readTimer.cancel()
    }
    match (status) {
        case HalfClosedRemote | Closed => throw HttpStreamException(StreamClosed, "Receive data when remote closed.")
        case Open =>
            if (ended) {
                status = HalfClosedRemote
            }
        case HalfClosedLocal =>
            if (ended) {
                status = Closed
                engine.purgeStream(streamId)
            }
        case _ => throw HttpConnectionException(ProtocolError, "Receive data in state ${status}.")
    }
    let topUp: UInt32 = 5 * 1024 * 1024
    if (localWindow.load() < topUp) {
        sendWindowUpdate(topUp)
    }
    localWindow.fetchSub(frame.payloadLen)
    inputQueue.enqueue(frame)
}
/*
 * Grows the local window by `increment` and queues the matching WINDOW_UPDATE
 * frame with control priority. A zero increment does nothing.
 */
private func sendWindowUpdate(increment: UInt32): Unit {
    if (increment != 0) {
        localWindow.fetchAdd(increment)
        let update = WindowUpdateFrame(streamId, increment)
        outputQueue.send(update, CONTROL_PRIORITY)
    }
}
/*
 * Handles an incoming fields frame.
 * A frame with pushId == 0 is a header block of this stream and is queued here;
 * otherwise it is a push promise and its fields are forwarded to the promised
 * stream's input queue. Throws HttpException when the promised stream is unknown.
 */
private func onFieldsRead(frame: FieldsFrame): Unit {
    streamEnd = frame.streamEnd
    if (streamEnd) {
        readTimer.cancel()
    }
    // wake a writer that is waiting for the 100-continue answer
    if (expectContinuation) {
        notifyContinue()
    }
    if (frame.pushId != 0) {
        onPushRead()
        let promised = engine.getStream(frame.pushId).getOrThrow({
            => HttpException("Stream not found, id:${frame.pushId}.")
        })
        promised.inputQueue.enqueue(FieldsFrame(frame.pushId, frame.fields))
        return
    }
    onHeadersRead()
    inputQueue.enqueue(frame)
}
/*
 * State transition for an incoming HEADERS frame of this stream.
 * Legal in Open, HalfClosedLocal and ReservedRemote (the latter occurs when the
 * client received a PushPromise); any other state throws HttpConnectionException.
 */
private func onHeadersRead(): Unit {
    let current = status
    if (current == Closed) {
        throw HttpConnectionException(StreamClosed, "Receive headers frame on state Closed.")
    }
    if (current != Open && current != HalfClosedLocal && current != ReservedRemote) {
        throw HttpConnectionException(ProtocolError, "Receive headers frame on state ${status}.")
    }
    // this case will occur when client receives PushPromise
    if (current == ReservedRemote) {
        status = HalfClosedLocal
    }
    if (!streamEnd) {
        return
    }
    if (current == Open) {
        status = HalfClosedRemote
    } else {
        status = Closed
        engine.purgeStream(streamId)
    }
}
/*
 * Handles an incoming RST_STREAM: closes the stream, stops both timers, purges
 * the stream from the engine and closes the stream connection.
 * A reset on an Idle stream throws HttpConnectionException(ProtocolError).
 */
private func onRstRead(frame: RstStreamFrame): Unit {
    httpLogDebug(logger, "[ClientStream#onRstRead] stream received reset: ${frame}")
    if (status == Idle) {
        throw HttpConnectionException(ProtocolError, "Read rstStream on idle stream.")
    }
    status = Closed
    writeTimer.cancel()
    readTimer.cancel()
    engine.purgeStream(frame.streamId)
    closeConn()
}
/*
 * http_client will throw push if setting not allowed.
 * Validates that a push promise may be received in the current state: it is
 * accepted in Open and HalfClosedLocal and rejected with an exception otherwise.
 */
private func onPushRead(): Unit {
    let current = status
    if (current == HalfClosedLocal || current == Open) {
        return
    }
    if (current == Idle) {
        throw HttpConnectionException(ProtocolError, "Idle stream received push.")
    }
    if (current == HalfClosedRemote) {
        throw HttpStreamException(StreamClosed, "Push frame read in ${status} status.")
    }
    if (current == Closed) {
        throw HttpConnectionException(StreamClosed, "Push frame read in ${status} status.")
    }
    throw HttpConnectionException(ProtocolError, "Push frame read in ${status} status.")
}
/*
 * Handles an incoming WINDOW_UPDATE: grows the remote window and wakes writers
 * blocked on it. Throws HttpConnectionException when the stream is Idle
 * (ProtocolError) or when the window would exceed MAX_WINDOW (FlowControlError).
 */
private func onWindowUpdateRead(frame: WindowUpdateFrame): Unit {
    if (status == Idle) {
        throw HttpConnectionException(ProtocolError, "Read window update frame on idle stream.")
    }
    let delta = Int64(frame.increment)
    if (remoteWindow.load() + delta > Int64(MAX_WINDOW)) {
        throw HttpConnectionException(FlowControlError, "Window over flow.")
    }
    remoteWindow.fetchAdd(delta)
    // locked at func writeRequest write body
    remoteWindowMonitor.lock()
    remoteWindowMonitor.notifyAll()
    remoteWindowMonitor.unlock()
}
/**************************************** closing ****************************************/
/*
 * Resets the stream: queues an RST_STREAM frame carrying `code` with control
 * priority, then closes the stream connection.
 */
func close(code: H2Error) {
// shutdown all thread, change status
outputQueue.send(RstStreamFrame(streamId, code.code), CONTROL_PRIORITY)
closeConn()
}
/*
 * Closes the stream connection (if one was created) and wakes every thread
 * waiting on remoteWindowMonitor so a blocked writer can observe the closure.
 */
func closeConn() {
engineConn?.close()
remoteWindowMonitor.lock()
remoteWindowMonitor.notifyAll()
remoteWindowMonitor.unlock()
}
}
/*
 * Socket-like view of one HTTP/2 stream: reads frames from the stream's input
 * queue and writes frames to the shared output queue.
 */
class Http2ClientEngineConn <: StreamingSocket {
let stream: ClientStream
let streamId: UInt32
// both queues are the ones owned by / shared with the stream (see init)
let inputQueue: ClosableBlockingQueue<Frame>
let outputQueue: SPMCLevelQueue<Frame>
// set once the connection is closed or a read failed; writes become no-ops afterwards
var quit = false
// set when the end of the response body (streamEnd or trailer) has been read
var bodyReadEnd = false
// bytes of a DATA frame that did not fit into the caller's buffer
var remaining: BytesIOStream = BytesIOStream()
let logger: Logger
/* Copies the stream's id, queues and logger so they can be used without going through the stream. */
init(stream: ClientStream) {
this.stream = stream
this.streamId = stream.streamId
this.inputQueue = stream.inputQueue
this.outputQueue = stream.outputQueue
this.logger = stream.logger
}
/* Local address of the underlying connection socket. */
public prop localAddress: SocketAddress {
    get() {
        let socket = stream.engine.conn.socket
        return socket.localAddress
    }
}
/* Remote address of the underlying connection socket. */
public prop remoteAddress: SocketAddress {
    get() {
        let socket = stream.engine.conn.socket
        return socket.remoteAddress
    }
}
/* Always None; a timeout must not be set on an h2 stream, the setter throws. */
public mut prop readTimeout: ?Duration {
    get() {
        None
    }
    set(_) {
        throw HttpException("Should not set timeout on h2 stream.")
    }
}
/* Always None; a timeout must not be set on an h2 stream, the setter throws. */
public mut prop writeTimeout: ?Duration {
    get() {
        None
    }
    set(_) {
        throw HttpException("Should not set timeout on h2 stream.")
    }
}
/*
 * Read raw bytes, normally used in upgraded protocol, e.g. websocket.
 * Serves leftover bytes of a previous frame first; otherwise blocks for the next
 * frame, copies as much of its data as fits into `buf` and keeps the rest.
 * Throws on an empty buffer, on a non-DATA frame, and when the queue is closed;
 * the latter two also mark the connection as quit.
 */
public func read(buf: Array<UInt8>): Int64 {
    if (buf.size == 0) {
        throw HttpException("Read use empty buf.")
    }
    if (remaining.remainLength > 0) {
        return remaining.read(buf)
    }
    let next = inputQueue.dequeue()
    let received = match (next) {
        case Some(v) => v
        case None =>
            quit = true
            throw HttpException("Failed to fetch data, the connection is closed.")
    }
    let dataFrame = match (received) {
        case f: DataFrame => f
        case _ =>
            quit = true
            throw HttpStreamException(ProtocolError, "The ws meet unexpected frame ${next}.")
    }
    let payload = dataFrame.data
    if (payload.size <= buf.size) {
        payload.copyTo(buf, 0, 0, payload.size)
        return payload.size
    }
    // not enough room: fill the buffer and keep the tail for the next read
    payload.copyTo(buf, 0, 0, buf.size)
    remaining.write(payload[buf.size..])
    return buf.size
}
/*
 * Write raw bytes, normally used in upgraded protocol, e.g. websocket.
 * Sends `bytes` as a DATA frame that does not end the stream.
 */
public func write(bytes: Array<UInt8>): Unit {
writeBody(bytes, false)
}
/* For websocket close: queues an RST_STREAM frame with the Cancel code at message priority. */
func writeWebSocketReset(): Unit {
outputQueue.send(RstStreamFrame(streamId, Cancel.code), MESSAGE_PRIORITY)
}
/*
 * Receive field blocks (from encoder), send frames.
 * Does nothing once the connection has quit; throws HttpException when the
 * output queue refuses the frame.
 */
func writeHeaders(fieldBlock: FieldsList, streamEnd!: Bool = false): Unit {
    if (quit) {
        return
    }
    let accepted = outputQueue.send(FieldsFrame(streamId, fieldBlock, last: streamEnd), MESSAGE_PRIORITY)
    if (!accepted) {
        throw HttpException("OutputQueue closed while writing headers.")
    }
}
/*
 * Queues `bytes` as a DATA frame and charges its size to the stream's remote window.
 * Does nothing once the connection has quit; throws HttpException when the
 * output queue refuses the frame.
 */
func writeBody(bytes: Array<UInt8>, streamEnd: Bool): Unit {
    if (quit) {
        return
    }
    let length = bytes.size
    let accepted = outputQueue.send(DataFrame(streamId, bytes, length, last: streamEnd), MESSAGE_PRIORITY)
    if (!accepted) {
        throw HttpException("OutputQueue closed while writing body.")
    }
    if (length > 0) {
        stream.remoteWindow.fetchSub(length)
    }
}
/*
 * Reads response body bytes into `buf` and returns the number of bytes copied;
 * 0 means the body is finished.
 * Leftover bytes of a previous frame are served first. A fields frame is treated
 * as the trailer: it is decoded into the response and ends the body.
 * Throws on an empty buffer, on an unexpected frame type, and when the queue is
 * closed (the recorded timeout exception is preferred in that case).
 */
func readBody(buf: Array<UInt8>): Int64 {
    if (buf.size == 0) {
        throw HttpException("Read buffer size can not be zero!")
    }
    if (remaining.remainLength > 0) {
        return remaining.read(buf)
    }
    if (bodyReadEnd) {
        return 0
    }
    let next = inputQueue.dequeue()
    if (next.isNone()) {
        if (let Some(t) <- stream.timeout) {
            throw t
        }
        stream.readTimer.cancel()
        throw HttpException("Failed to fetch data, the connection or stream is closed.")
    }
    let frame = next.getOrThrow()
    match (frame) {
        case trailer: FieldsFrame =>
            // read and decode trailer after read body end
            // if no body but have trailer, must call body.read to get trailer
            bodyReadEnd = true
            stream.decodeTrailer(trailer.fields)
            return 0
        case data: DataFrame =>
            bodyReadEnd = data.streamEnd
            let copyLen = min(buf.size, data.size)
            if (copyLen == 0) {
                if (data.streamEnd) {
                    return 0
                }
                // receive empty data frame but stream not end, there must be trailer
                return readBody(buf)
            }
            data.data.copyTo(buf, 0, 0, copyLen)
            if (data.size > copyLen) {
                remaining.write(data.data[copyLen..])
            }
            return copyLen
        case _ => throw HttpStreamException(ProtocolError, "Meet unexpected frame: ${frame}.")
    }
}
/*
 * Closes this stream connection: closes the input queue (no more data can be
 * received) and marks the connection as quit. The output queue is left open
 * because it belongs to the connection.
 */
public func close(): Unit {
    let alreadyClosed = inputQueue.isClosed()
    if (!alreadyClosed) {
        inputQueue.close()
    }
    quit = true
}
/* True once the connection has quit. */
public func isClosed(): Bool {
    return quit
}
/* Fixed textual name of this socket. */
public func toString(): String {
    return "Client Stream"
}
}
/*
 * Body of an HTTP/2 response. For a normal response it is read with read();
 * for a CONNECT (tunnel) response it is used through readRaw / writeRaw.
 */
class Http2ClientBodyProvider <: InputStream & WebSocketConn & Resource {
    // becomes true once read() has returned 0
    var readEnd = false
    let isConnect: Bool
    let conn: Http2ClientEngineConn

    init(conn: Http2ClientEngineConn, isConnect: Bool) {
        this.isConnect = isConnect
        this.conn = conn
    }

    /* Reads body bytes; returns 0 at (and after) the end of the body. Not usable on a tunnel. */
    public func read(buf: Array<Byte>): Int64 {
        if (isConnect) {
            throw HttpException("Call readRaw & writeRaw for tunnel.")
        }
        if (readEnd) {
            return 0
        }
        let count = conn.readBody(buf)
        if (count == 0) {
            readEnd = true
        } else {
            readEnd = false
        }
        return count
    }

    /*
     * Tunnel only: keeps reading until `byteArray` is full or a read yields 0 bytes.
     * Any failure while reading is reported as ConnectionException.
     */
    public func readRaw(byteArray: Array<UInt8>): Int64 {
        if (!isConnect) {
            throw HttpException("Not Connect response, call read.")
        }
        try {
            var filled: Int64 = 0
            var drained = false
            while (!drained && filled < byteArray.size) {
                let got = conn.read(byteArray[filled..])
                if (got == 0) {
                    drained = true
                } else {
                    filled += got
                }
            }
            return filled
        } catch (e: Exception) {
            throw ConnectionException("Connection closed.")
        }
    }

    /* Tunnel only: writes raw bytes; throws ConnectionException once the connection has quit. */
    public func writeRaw(byteArray: Array<UInt8>): Unit {
        if (!isConnect) {
            throw HttpException("Not Connect response, call read.")
        }
        if (conn.quit) {
            throw ConnectionException("Connection closed.")
        }
        conn.write(byteArray)
    }

    /* Resets and closes the underlying stream connection; a no-op when it has already quit. */
    public func close(): Unit {
        if (!conn.quit) {
            conn.writeWebSocketReset()
            conn.close()
        }
    }

    /* True once the underlying stream connection has quit. */
    public func isClosed(): Bool {
        return conn.quit
    }
}