/*
* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* This source file is part of the Cangjie project, licensed under Apache-2.0
* with Runtime Library Exception.
*
* See https://cangjie-lang.cn/pages/LICENSE for license information.
*/
package stdx.net.http
import std.net.*
import std.io.*
import std.collection.{ArrayList, HashMap}
import std.sync.{Monitor, AtomicUInt32, AtomicInt64, AtomicBool, SyncCounter}
import std.convert.Parsable
import std.time.MonoTime
import stdx.net.tls.common.TlsConnection
import stdx.crypto.common.Certificate
import stdx.log.{Logger, LogLevel}
import stdx.encoding.url.URL
class Stream {
var _server: ?HttpServer2
// will be set when received the whole headers of request
var engineConn: ?HttpEngineConn2 = None
var streamId: UInt32
var status: Status = Idle
// if the stream is reset by peer
let isRst = AtomicBool(false)
// store DATA frames, represent request body
let dataBBQ = ClosableBlockingQueue<DataFrame>(Int64.Max)
// set true when there is only CONTINUATION frames to be read/write on this stream
var waitContinuationToPurge = false
// flow control
// remoteWindow on stream level may be negative, when being adjusted according SettingsInitialWindowSize in SETTINGS frame
// we didn't provide method to set settings frame when server serving, so localWindow will never be negative, and never exceeds UInt32.Max
var remoteWindow: AtomicInt64
var localWindow: AtomicUInt32
let windowMonitor = Monitor()
// queuePriority is 7 - urgency, cause in SPMCLevelQueue, bigger number means higher level
// urgency falls in 0 ~ 7, 0 means highest level, the default value is 3
var queuePriority = MAX_URGENCY - DEFAULT_URGENCY
// timeout
// will be started when read the first HEADERS frame on stream, and be canceled once finishing receiving the headers of request
var readHeaderTimer = HttpTimer.empty
// will be started when read the first HEADERS frame on stream, and be canceled once finishing receiving the whole request, including headers, body and trailer
var readTimer = HttpTimer.empty
// will be started when started to write headers of response, and be canceled once the whole response has been written to tls connection
var writeTimer = HttpTimer.empty
// the value content-length field should be the same as the body size
var reqContentLength: Int64 = -1
var receivedBodySize = 0
var requestFields = ArrayList<(String, String)>(0)
let ctx = HttpContext(HttpRequest(), HttpResponseBuilder())
let isRstCounted = AtomicBool(false)
/********************************************* new stream *********************************************/
init(server: HttpServer2, id: UInt32) {
this.streamId = id
this._server = server
this.remoteWindow = AtomicInt64(Int64(server.remoteSettings[SettingsInitialWindowSize.code]))
this.localWindow = AtomicUInt32(server.localSettings[SettingsInitialWindowSize.code])
}
init() {
this.streamId = 0
this._server = None
this.remoteWindow = AtomicInt64(0)
this.localWindow = AtomicUInt32(0)
}
mut prop server: HttpServer2 {
get() {
_server ?? throw HttpException("server in stream is None")
}
set(v) {
_server = v
}
}
func reset(server: HttpServer2, id: UInt32): Unit {
this._server = server
this.streamId = id
this.status = Idle
this.waitContinuationToPurge = false
this.dataBBQ.reset()
this.localWindow.store(server.localSettings[SettingsInitialWindowSize.code])
this.remoteWindow.store(Int64(server.remoteSettings[SettingsInitialWindowSize.code]))
this.engineConn?.reset(this)
this.readHeaderTimer = HttpTimer.empty
this.readTimer = HttpTimer.empty
this.writeTimer = HttpTimer.empty
this.reqContentLength = -1
this.receivedBodySize = 0
this.ctx.reset()
this.isRst.store(false)
this.isRstCounted.store(false)
}
func startReadTimer(): Unit {
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#startReadTimer] set read header timer, readHeaderTimeout = ${server.readHeaderTimeout}")
}
let connId = ThreadContext.connId
readHeaderTimer = HttpTimer(
start: server.readHeaderTimeout,
task: {
=>
ThreadContext.connId = connId // set connection id for logger
close(ProtocolError, "read request headers timeout on stream ${streamId}")
ThreadContext.connId = None // clear connection id
}
)
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#startReadTimer] set read header timer, readTimeout = ${server.readTimeout}")
}
readTimer = HttpTimer(
start: server.readTimeout,
task: {
=>
ThreadContext.connId = connId // set connection id for logger
close(ProtocolError, "read request timeout on stream ${streamId}")
ThreadContext.connId = None // clear connection id
}
)
}
/********************************************* pre process / status machine *********************************************/
func preProcess(frame: Frame): Unit {
match (frame) {
case f: DataFrame => onDataRead(f)
case f: FieldsFrame => onFieldsRead(f)
case f: WindowUpdateFrame => onWindowUpdateRead(f)
case f: RstStreamFrame => onRSTRead(f)
case _: ContinuationFrame => throw HttpConnectionException(ProtocolError,
"unexpected CONTINUATION frame on stream ${streamId}")
case _ => () // ignore or throw HttpStreamException?
}
}
private func onDataRead(frame: DataFrame): Unit {
if (localWindow.load() < frame.payloadLen) {
throw HttpStreamException(FlowControlError,
"Payload length of data frame exceeds window on stream ${streamId}.")
}
receivedBodySize += frame.data.size
if (reqContentLength >= 0) {
if (receivedBodySize > reqContentLength) {
throw HttpStreamException(ProtocolError, "Body size exceeds content-length ${reqContentLength}.")
}
}
match (status) {
case Open =>
if (frame.streamEnd) {
readTimer.cancel()
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#onDataRead] cancel read timer after read body on stream ${streamId}")
}
status = HalfClosedRemote
}
case HalfClosedLocal =>
if (frame.streamEnd) {
readTimer.cancel()
status = Closed
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#onDataRead] cancel read timer after read body on stream ${streamId}")
httpLogDebug(server.logger,
"[Stream#onDataRead] purge stream when read data with streamEnd on stream ${streamId}")
}
purgeStream() //status should be set to Closed when purge?
}
case _ => throw HttpStreamException(StreamClosed,
"Received data frame on stream ${streamId}, status is ${status}.")
}
if (!dataBBQ.enqueue(frame)) {
server.windowUpdateOnDataConsumed(frame.payloadLen)
}
if (frame.streamEnd) {
dataEnd()
}
if (frame.payloadLen > 0) {
localWindow.fetchSub(frame.payloadLen)
sendWindowUpdateOnStream()
}
}
private func sendWindowUpdateOnStream(): Unit {
let remain = localWindow.load()
if (server.flowThreshold < remain) {
return
}
let initial = server.localSettings[SettingsInitialWindowSize.code]
if (!localWindow.compareAndSwap(remain, initial)) {
return
}
let windowFrame = WindowUpdateFrame(streamId, initial - remain)
if (!server.responseQueue.send(windowFrame, CONTROL_PRIORITY) && server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger, "[Stream#onDataRead] connection closed, write WINDOW_UPDATE frame failed")
}
}
private func onFieldsRead(frame: FieldsFrame): Unit {
match (status) {
case Idle => //headers
status = if (frame.streamEnd) {
dataEnd()
HalfClosedRemote
} else {
Open
}
processHeaders(frame)
case Open => //trailers
if (!frame.streamEnd) {
throw HttpStreamException(ProtocolError, "Received trailers without streamEnd flag.")
}
processTrailers(frame)
dataEnd()
status = HalfClosedRemote
case HalfClosedLocal => //trailers
if (!frame.streamEnd) {
throw HttpStreamException(ProtocolError, "Received trailers without streamEnd flag.")
}
processTrailers(frame)
dataEnd()
status = Closed
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#onFieldsRead] purge stream when read headers with headerEnd on stream ${streamId}")
}
purgeStream()
case _ => throw HttpStreamException(StreamClosed,
"Received headers and continuation frame on stream ${streamId}, status is ${status}.")
}
}
private func onWindowUpdateRead(frame: WindowUpdateFrame): Unit {
match (status) {
case Idle => throw HttpConnectionException(ProtocolError, "Received window update frame on idle stream.")
case _ => ()
}
remoteWindow.fetchAdd(Int64(frame.increment))
if (remoteWindow.load() > Int64(MAX_WINDOW)) {
throw HttpStreamException(FlowControlError,
"WINDOW_UPDATE frame cause remote window on stream ${streamId} exceeds 2^31-1.")
}
windowMonitor.lock()
windowMonitor.notifyAll()
windowMonitor.unlock()
}
private func rstErrorDesciption(errorCode: UInt32): (String, String) {
match (errorCode) {
case 0x1 => ("PROTOCOL_ERROR", "Protocol error detected.")
case 0x2 => ("INTERNAL_ERROR", "Implementation fault.")
case 0x3 => ("FLOW_CONTROL_ERROR", "Flow-control limits exceeded.")
case 0x4 => ("SETTINGS_TIMEOUT", "Settings not acknowledged.")
case 0x5 => ("STREAM_CLOSED", "Frame received for closed stream.")
case 0x6 => ("FRAME_SIZE_ERROR", "Frame size incorrect.")
case 0x7 => ("REFUSED_STREAM", "Stream not processed.")
case 0x8 => ("CANCEL", "Stream cancelled.")
case 0x9 => ("COMPRESSION_ERROR", "Compression state not updated.")
case 0xa => ("CONNECT_ERROR", "TCP connection error for CONNECT method.")
case 0xb => ("ENHANCE_YOUR_CALM", "Processing capacity exceeded.")
case 0xc => ("INADEQUATE_SECURITY", "Negotiated TLS parameters not acceptable.")
case 0xd => ("HTTP_1_1_REQUIRED", "Use HTTP/1.1 for the request.")
case _ => ("UNKNOWN_ERROR", "Unknown error.")
}
}
private func onRSTRead(frame: RstStreamFrame): Unit {
let (errorName, errorDescription) = rstErrorDesciption(frame.errorCode)
httpLogWarn(server.logger,
"[Stream#onRSTRead] stream received rst frame, stream id: ${streamId}, errorCode: ${errorName}(${frame.errorCode}): ${errorDescription}")
if (status == Idle) {
throw HttpConnectionException(ProtocolError, "Received rst frame on idle stream.")
}
purgeStream(rstCount: false)
status = Closed
cancelAllTimers()
server.addRstStreamCnt(frame)
this.isRst.store(true)
dataBBQ.close()
}
/*
* Construct engineConn to process request and response.
*
* @throws HpackException, if decode failed
*/
private func processHeaders(frame: FieldsFrame): Unit {
readHeaderTimer.cancel()
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#processHeaders] cancel read header timer when received headers on stream ${streamId}")
}
if (frame.streamEnd) {
readTimer.cancel()
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#processHeaders] cancel read timer when received headers on stream ${streamId}")
}
}
if (engineConn.isNone()) {
let engine = HttpEngineConn2(this)
engine.readTimeout = server.readTimeout
engine.writeTimeout = server.writeTimeout
engine.readHeaderTimeout = server.readHeaderTimeout
engine.maxRequestHeaderSize = server.maxRequestHeaderSize
engine.maxRequestBodySize = server.maxRequestBodySize
engine.request = ctx.request
engineConn = engine
ctx.httpConn = engine
}
requestFields = frame.fields
if (!server.requestQueue.send(this, queuePriority)) {
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#processHeaders] connection closed, discard request on stream ${streamId}")
}
}
}
private func processTrailers(frame: FieldsFrame): Unit {
readHeaderTimer.cancel()
readTimer.cancel()
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#processHeaders] cancel read header timer and read timer when received trailers on stream ${streamId}"
)
}
engineConn?.checkAndSetReqTrailer(frame.fields) // throw HttpStreamException, if check fails
}
private func dataEnd(): Unit {
dataBBQ.close()
if (reqContentLength >= 0) {
if (receivedBodySize < reqContentLength) {
throw HttpStreamException(ProtocolError, "Body size is less than content-length ${reqContentLength}.")
}
}
}
/********************************************* handle *********************************************/
func handle(): Unit {
engineConn?.constructRequest(requestFields)
let engine = engineConn ?? throw HttpException("Internal error, engineConn in stream is None.")
if (let Some(expectBody) <- (ctx.request.body as HttpExpectBodyProviderH2)) {
expectBody.context = ctx
}
let handler = if (ctx.request.url.path == ASTERISK && ctx.request.method == "OPTIONS") {
OptionsHandler()
} else {
server.distributor.distribute(ctx.request.url.path)
}
try {
let handlerStartTime = MonoTime.now()
handler.handle(ctx)
let handlerEndTime = MonoTime.now()
if (server.logger.enabled(LogLevel.DEBUG)) {
let duration = handlerEndTime - handlerStartTime
httpLogDebug(server.logger,
"[Stream#handle] Handler for request ${ctx.request.method} ${ctx.request.url} on stream ${streamId} took ${duration.toMilliseconds()} ms."
)
}
} catch (e: Exception) { // exceptions from user code or check/write response failed and user hasn't catch, return 500 response.
httpLogWarn(server.logger, "[Stream#handle] handle failed, ${e}")
cleanDataQueue()
// headers haven't been sent, send response with status code STATUS_INTERNAL_SERVER_ERROR
if (!ctx.responseFlushedByUser && !ctx.upgraded) {
ctx.responseBuilder.status(HttpStatusCode.STATUS_INTERNAL_SERVER_ERROR).body(e.message)
engine.writeResponse(ctx) // throw HttpException, if responseQueue is closed, log and return
return
}
throw e
}
cleanDataQueue()
if (!engine.connect || !ctx.upgraded) {
engine.writeResponse(ctx) // throw HttpException/IllegalArgumentException, if response invalid
} else {
// if upgraded, user should call close in handler, who will send an empty DATA with streamEnd set.
// An extra close is called to ensure the stream is close
engine.close()
}
}
func cleanDataQueue() {
if (!dataBBQ.isClosed()) {
dataBBQ.close()
}
while (let Some(frame) <- dataBBQ.dequeue()) {
// release window
server.windowUpdateOnDataConsumed(frame.payloadLen)
}
}
/********************************************* post process / status machine *********************************************/
func postProcess(frame: Frame): Unit {
match (frame) {
case frame: DataFrame => onDataWrite(frame)
case frame: FieldsFrame => onFieldsWrite(frame)
case _: WindowUpdateFrame => () //nothing to do
case _: RstStreamFrame => onRstWrite()
case _ => throw HttpStreamException(ProtocolError, "Unexpected frame to write.") //never come here
}
}
private func onDataWrite(frame: DataFrame): Unit {
match (status) {
case Open =>
if (frame.streamEnd) {
status = HalfClosedLocal
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#onDataWrite] write data with streamEnd on open stream ${streamId}")
}
close(NoError, "write response end, close stream ${streamId}")
}
case HalfClosedRemote =>
if (frame.streamEnd) {
status = Closed
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#onDataWrite] purge stream when write data with streamEnd on stream ${streamId}")
}
purgeStream()
}
case _ => throw HttpStreamException(ProtocolError, "Write data frame on ${status} stream.")
}
}
private func onFieldsWrite(frame: FieldsFrame): Unit {
if (frame.pushId == 0) {
onHeadersFieldsWrite(frame)
} else {
onPushFieldsWrite()
}
}
private func onHeadersFieldsWrite(frame: FieldsFrame): Unit {
match (status) {
case Open => // headers / trailers / 100-continue
if (frame.streamEnd) {
status = HalfClosedLocal
close(NoError, "write response end, close stream ${streamId}")
}
case HalfClosedRemote => // headers / trailers / 100-continue
if (frame.streamEnd) {
status = Closed
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger,
"[Stream#onHeadersFieldsWrite] purge stream when write headers with streamEnd on stream ${streamId}"
)
}
purgeStream()
}
case ReservedLocal => // push response headers
status = HalfClosedRemote
case _ => throw HttpStreamException(ProtocolError, "Write headers frame on ${status} stream.")
}
}
private func onPushFieldsWrite(): Unit {
match (status) {
case Open | HalfClosedRemote => ()
case _ => throw HttpStreamException(ProtocolError, "Write push promise on ${status} stream.")
}
}
private func onRstWrite(): Unit {
purgeStream()
dataBBQ.close()
}
func isClosed(): Bool {
return status == Closed
}
/********************************************* closing *********************************************/
// cjlint-ignore -start !G.OTH.03
/*
* According to https://www.rfc-editor.org/rfc/rfc9113.html#rfc.section.5.4.2,
* if frames on this stream are received in a round-trip, they should be processed minimally.
* We will send global WINDOW_UPDATE, if received DATA. Other frames will be ignored.
*/
// cjlint-ignore -end
func close(h2Error: H2Error, message: String, slow!: Bool = false): Unit {
if (h2Error.code != NoError.code) {
httpLogWarn(server.logger, "[Stream#close] stream ${streamId}: ${message}")
}
// received rst and status has already been set to Closed
if (isClosed()) {
purgeStream(purge: false, rstCount: !slow)
return
}
status = Closed
cancelAllTimers()
dataBBQ.close()
let id = streamId
spawn {
if (slow) {
sleep(Duration.second * 5)
}
let rstFrame = RstStreamFrame(id, h2Error.code)
if (!server.responseQueue.send(rstFrame, CONTROL_PRIORITY)) {
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger, "[Stream#close] stream ${streamId}: connection closed, write rst failed.")
}
}
}
}
private func purgeStream(purge!: Bool = true, rstCount!: Bool = true): Unit {
windowMonitor.lock()
windowMonitor.notifyAll()
windowMonitor.unlock()
var needCount = rstCount
if (rstCount) {
if (!isRstCounted.compareAndSwap(false, true)) {
needCount = false
}
}
server.purgeStream(this, purge, needCount)
return
}
func cancelAllTimers(): Unit {
readHeaderTimer.cancel()
readTimer.cancel()
writeTimer.cancel()
readHeaderTimer = HttpTimer.empty
readTimer = HttpTimer.empty
writeTimer = HttpTimer.empty
if (server.logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(server.logger, "[Stream#cancelAllTimers] cancel all timer on stream ${streamId}")
}
}
}
class HttpEngineConn2 <: HttpEngineConn & InputStream {
let stream: Stream
var responseQueue: SPMCLevelQueue<Frame>
var maxFrameSize: UInt32
let logger: Logger
let dataBBQ: ClosableBlockingQueue<DataFrame>
var streamId: UInt32
let quit = AtomicBool(false)
var request = HttpRequest.empty //request received on this stream, will be set when finished receiving headers
// data frame, which has not been read to end
var dataFrameRemained: ?DataFrame = None
var dataFrameIndex = 0
var requestAuthority = "" // for check host, value of host in headers should be the same with value of pseudoHeader :authority
// flow control
let windowMonitor: Monitor
// websocket
var upgrade: ?String = None
var connect = false
// whether response with streamEnd has been sent on stream
var responded = false
// in case trailer processed before headers, trailer will be removed because there is no trailer header
// use SyncCounter to ensure header processed first
var headerSyncCounter = SyncCounter(1)
let arrayPool: ?ArrayPool
init(stream: Stream) {
this.stream = stream
this.responseQueue = stream.server.responseQueue
this.maxFrameSize = stream.server.remoteSettings[SettingsMaxFrameSize.code]
this.dataBBQ = stream.dataBBQ
this.logger = stream.server.logger
this.streamId = stream.streamId
this.windowMonitor = stream.windowMonitor
this.arrayPool = stream.server.arrayPool
}
// stream has not been replaced, but server might not be the same one
func reset(resetedStream: Stream) {
this.responseQueue = resetedStream.server.responseQueue
this.maxFrameSize = resetedStream.server.remoteSettings[SettingsMaxFrameSize.code]
this.streamId = resetedStream.streamId
this.quit.store(false)
this.dataFrameRemained = None
this.dataFrameIndex = 0
this.requestAuthority = ""
this.upgrade = None
this.connect = false
this.responded = false
this.headerSyncCounter = SyncCounter(1)
}
public prop clientCertificate: ?Array<Certificate> {
get() {
let bufferedConn = stream.server.conn
return match (bufferedConn.socket) {
case tlsConn: TlsConnection => tlsConn.handshakeResult?.peerCertificate ?? None
case _ => None
}
}
}
prop enablePush: Bool {
get() {
return stream.server.remoteSettings[SettingsEnablePush.code] == 1
}
}
public func isClosed(): Bool {
return stream.isClosed()
}
public func getSocket(): StreamingSocket {
stream.writeTimer.cancel()
return StreamingSocket2(this)
}
/********************************************* construct request *********************************************/
func constructRequest(fields: FieldsList): Unit { //read headers
var pseudoHeadersNum = 0
for ((name, _) in fields) {
if (name.startsWith(":")) {
pseudoHeadersNum++
} else {
break
}
}
if (pseudoHeadersNum == 0) {
throw HttpStreamException(ProtocolError, "Malformed request, pseudo headers are not found.")
}
checkAndSetPseudoHeaders(fields[..pseudoHeadersNum])
if (pseudoHeadersNum < fields.size) {
checkAndSetRequestHeaders(fields[pseudoHeadersNum..])
} else {
headerSyncCounter.dec()
}
if (request._headers.isSome() && expected100Continue(request.headers)) { // cjlint-ignore !G.EXP.03
request._body = HttpExpectBodyProviderH2(this)
} else {
request._body = this
}
request._bodySize = sizeOf(request._body)
request._remoteAddr = stream.server.remoteAddress
}
// cjlint-ignore -start !G.OTH.03
/* see https://www.rfc-editor.org/rfc/rfc9113.html#HttpRequest */
// cjlint-ignore -end
private func checkAndSetPseudoHeaders(pseudoHeaders: ArrayList<(String, String)>): Unit {
var method: String = String.empty
var scheme: String = String.empty
var authority: String = String.empty
var path: String = String.empty
for ((name, value) in pseudoHeaders) {
match (name) {
case ":method" =>
if (!method.isEmpty()) {
throw HttpStreamException(ProtocolError, "PseudoHeader ${name} should not be set more than once."
)
} else {
method = value
}
request._method = value //may throw, method should be tokens
case ":scheme" =>
if (!scheme.isEmpty()) {
throw HttpStreamException(ProtocolError, "PseudoHeader ${name} should not be set more than once."
)
} else {
scheme = value
}
if (value != "https") {
throw HttpStreamException(ProtocolError, "Malformed request, scheme of h2 request must be https."
)
}
case ":authority" =>
if (!authority.isEmpty()) {
throw HttpStreamException(ProtocolError, "PseudoHeader ${name} should not be set more than once."
)
} else {
authority = value
}
requestAuthority = value
case ":path" =>
if (value.isEmpty()) {
throw HttpStreamException(ProtocolError,
"Malformed request, value of pseudo header \":path\" should not be empty.")
}
if (!path.isEmpty()) {
throw HttpStreamException(ProtocolError, "PseudoHeader ${name} should not be set more than once."
)
} else {
path = value
}
request._url = URL.parse(value)
case ":protocol" =>
if (stream.server.localSettings[SettingsEnableConnectProtocol.code] == 0) {
throw HttpStreamException(
ProtocolError,
"Malformed request, extended CONNECT protocol is disabled."
)
}
if (!h2UpgradeProtocols.contains(value)) {
throw HttpStreamException(
ProtocolError,
"Pseudo header field \":protocol\" should be single valued and the value should be \"HTTP\", \"TLS\", \"WebSocket\", \"Websocket\", \"h2c\", \"connect-udp\" or \"connect-ip\"."
)
}
upgrade = value
case _ => throw HttpStreamException(ProtocolError,
"Malformed request, invalid pseudo header name ${name}.")
}
}
// must contain ":method"
if (method.isEmpty()) {
throw HttpStreamException(ProtocolError, "Malformed request, pseudoHeaders must contain :method.")
}
// must contain ":scheme" and ":path", unless method is connect
if (method == "CONNECT") {
connect = true
if (upgrade.isNone()) {
return
}
}
if (scheme.isEmpty()) {
throw HttpStreamException(ProtocolError, "Malformed request, pseudoHeaders must contain :scheme.")
}
if (path.isEmpty()) {
throw HttpStreamException(ProtocolError, "Malformed request, pseudoHeaders must contain :path.")
}
}
private func checkAndSetRequestHeaders(toCheck: ArrayList<(String, String)>): Unit {
for ((name, value) in toCheck) {
if (name.startsWith(":")) {
headerSyncCounter.dec()
throw HttpStreamException(ProtocolError,
"Malformed request, h2 pseudoHeaders ${name} should procedure headers.")
}
if (name.hasUpper()) {
headerSyncCounter.dec()
throw HttpStreamException(ProtocolError, "Malformed request, h2 headers should be lower cases.")
}
// should not contain connection, proxy-connection, keep-alive, transfer-encoding, upgrade
if (H2_EXCLUDE_HEADERS.contains(name)) {
headerSyncCounter.dec()
throw HttpStreamException(ProtocolError, "Malformed request, h2 headers should not contain ${name}.")
}
// TE header field can exist, value of TE MUST NOT contain any value other than "trailers"
// te: trailers means client allow server to send trailers
if (name == "te" && value != "trailers") {
headerSyncCounter.dec()
throw HttpStreamException(ProtocolError,
"Malformed request, te header field must not contain any value other than \"trailers\".")
}
// if :authority is set, value of Host must be the same with value of :authority
if (name == "host") {
if (!requestAuthority.isEmpty() && requestAuthority != value) {
headerSyncCounter.dec()
throw HttpStreamException(ProtocolError,
"Malformed request, value of :authority and host should be the same.")
}
}
if (name == "priority") {
let urgency = parsePriority(value)
stream.queuePriority = MAX_URGENCY - urgency
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger,
"[Stream#checkAndSetRequestHeaders] stream ${stream.streamId}, set queuePriority = ${stream.queuePriority}"
)
}
}
if (name == "trailer") {
if (!Str(value).splitAllMatch(SYMBOL_COMMA, {v => !TrailerExcludeList.contains(v)})) {
headerSyncCounter.dec()
throw HttpStreamException(ProtocolError, "Malformed request, invalid trailer value in header.")
}
}
// set header to request
// will check, name should be tokens, value should be vchar/SP/HTAB
try {
request.headers.add(name, value) // may throw HttpException
} catch (e: HttpException) {
headerSyncCounter.dec()
throw HttpStreamException(ProtocolError, "Malformed request, ${e.message}.")
}
}
headerSyncCounter.dec()
try {
if (let Some(headers) <- request._headers) {
stream.reqContentLength = checkCl(headers) ?? -1
checkContentLength()
}
} catch (e: HttpException) {
throw HttpStreamException(ProtocolError, "Malformed request, ${e.message}.")
}
}
private func checkContentLength() {
if (stream.reqContentLength >= 0 && stream.receivedBodySize > stream.reqContentLength) {
throw HttpStreamException(ProtocolError, "Body size exceeds content-length ${stream.reqContentLength}.")
}
}
/**
* throw HttpStreamException, if trailer invalid
*/
func checkAndSetReqTrailer(fields: FieldsList): Unit {
let trailers = request.trailers
try {
for ((k, v) in fields) {
trailers.add(k, v) // throw HttpException, if k or v contain invalid elements
}
// remove trailers, name of which is absent in header "trailer"
headerSyncCounter.waitUntilZero()
if (let Some(headers) <- request._headers) {
checkTrailer(request)
}
} catch (e: HttpException) {
throw HttpStreamException(ProtocolError, "Invalid trailer field, ${e.message}.")
}
}
public func read(buf: Array<UInt8>): Int64 {
var frameIndex = 0
var readLen = 0
var frameOp: ?DataFrame = None
if (dataFrameRemained.isSome()) {
frameOp = dataFrameRemained
frameIndex = dataFrameIndex
dataFrameRemained = None
} else {
frameOp = dataBBQ.dequeue()
// update window
stream.server.windowUpdateOnDataConsumed(frameOp?.payloadLen ?? 0)
}
// dataBBQ has been closed && dataBBQ is empty
// will return 0, 1. finished receiving body, 2. connection closed with some error, 3. stream closed with some error
if (let Some(frame) <- frameOp) {
var singleReadLen = min(buf.size, frame.size - frameIndex)
frame.data.copyTo(buf, frameIndex, 0, singleReadLen)
frameIndex += singleReadLen
readLen += singleReadLen
if (frameIndex < frame.size) {
dataFrameRemained = frame
dataFrameIndex = frameIndex
} else {
frame.recyclePayload(arrayPool)
}
}
return readLen
}
/********************************************* write response *********************************************/
/*
* Write a response, including headers, [body], [trailer].
*/
func writeResponse(responseBuilder: HttpResponseBuilder): Unit {
if (stream.isClosed()) {
throw HttpException("Stream ${streamId} closed, write response failed.")
}
let status = responseBuilder._status ?? HttpStatusCode.STATUS_OK
if (status == HttpStatusCode.STATUS_CONTINUE) {
httpLogWarn(logger, "[Stream#writeResponse] status of final response should not be 100")
writeResponse(
HttpResponseBuilder()
.status(HttpStatusCode.STATUS_INTERNAL_SERVER_ERROR)
.body("Internal Server Error: invalid status code"))
return
}
checkAndSetCl(responseBuilder, request.method == "HEAD")
if (request.method == "HEAD" || status / 100 == 1 || status == HttpStatusCode.STATUS_NO_CONTENT || status == HttpStatusCode
.STATUS_NOT_MODIFIED) {
responseBuilder._body = HttpEmptyBody.INSTANCE
}
let bodySize = sizeOf(responseBuilder._body) ?? -1
let hasTrailer: Bool = !responseBuilder.trailers.isEmpty()
match ((bodySize != 0, hasTrailer)) {
case (true, true) =>
checkTrailer(responseBuilder)
writeHeader(status, responseBuilder.headers, streamEnd: false)
writeBody(responseBuilder._body, streamEnd: false)
writeTrailer(responseBuilder.trailers)
case (true, false) =>
writeHeader(status, responseBuilder.headers, streamEnd: false)
writeBody(responseBuilder._body, streamEnd: true)
case (false, true) =>
checkTrailer(responseBuilder)
writeHeader(status, responseBuilder.headers, streamEnd: false)
writeTrailer(responseBuilder.trailers)
case (false, false) => writeHeader(status, responseBuilder.headers, streamEnd: true)
}
}
public func writeResponse(ctx: HttpContext): Unit {
if (stream.isClosed()) {
throw HttpException("Stream ${streamId} closed, write response failed.")
}
let responseBuilder = ctx.responseBuilder
synchronized(ctx.writerMtx) {
if (ctx.upgraded) {
return
}
ctx.responded = true
if (ctx.responseFlushedByUser) { //headers and part of body has been sent
checkTrailer(responseBuilder) //throw HttpException if a field name is not allowed in trailers
if (responseBuilder.trailers.isEmpty()) {
writeEmptyDataFrame()
return
}
writeTrailer(responseBuilder.trailers)
return
}
writeResponse(responseBuilder)
}
}
public func writeResponseByWriter(ctx: HttpContext, bodyData: Array<UInt8>): Unit {
if (stream.isClosed()) {
throw HttpException("Stream ${streamId} closed, write response failed.")
}
if (!ctx.responseFlushedByUser) {
ctx.responseFlushedByUser = true
let status = ctx.responseBuilder._status ?? HttpStatusCode.STATUS_OK
if (request.method == "HEAD" || status / 100 == 1 || status == 204 || status == 304) {
throw HttpException("The body should be empty.")
}
writeHeader(status, ctx.responseBuilder.headers, streamEnd: false)
}
writeBody(bodyData, streamEnd: false)
}
public func writeUpgradeResponse(ctx: HttpContext): Unit {
if (stream.isClosed()) {
throw HttpException("Stream ${streamId} closed, write response failed.")
}
if (ctx.request.method != "CONNECT") {
throw HttpException("In http/2 build a tunnel is supported only when request method is CONNECT.")
}
// in http/2, response to CONNECT request must have status 2XX, and should not contain header field "content-length"
// cjlint-ignore -start !G.OTH.03
// ref: https://www.rfc-editor.org/rfc/rfc9113.html#CONNECT
// cjlint-ignore -end
let status = ctx.responseBuilder._status ?? HttpStatusCode.STATUS_OK
if (status / 100 != 2) {
throw HttpException("Status of response to CONNECT request must be 2XX, if tunnel is built successfully.")
}
ctx.responseBuilder.headers.del("content-length")
writeHeader(status, ctx.responseBuilder.headers, streamEnd: false)
}
private func writeBody(body: InputStream, streamEnd!: Bool): Unit {
if (let Some(rb) <- (body as HttpRawBody)) {
writeBody(rb.rawBody, streamEnd: streamEnd)
return
}
let readBufWrapperOp = arrayPool?.get(Int64(maxFrameSize))
let readBuf = readBufWrapperOp?.data ?? Array<UInt8>(Int64(maxFrameSize), repeat: 0) // cjlint-ignore !G.EXP.03
var readLen = body.read(readBuf) //may throw exceptions from user code
while (readLen > 0) { // if readLen < 0?
writeBody(readBuf[..readLen], streamEnd: false) // split according stream remoteWindow
readLen = body.read(readBuf)
}
if (let Some(readBufWrapper) <- readBufWrapperOp) {
if (readBufWrapper.size == 0) {
throw Exception("The readBufWrapper.size == 0, rawSize == ${readBufWrapper.rawSize}")
}
arrayPool?.put(readBufWrapper)
}
if (streamEnd) {
writeEmptyDataFrame()
}
}
func writeBody(bodyData: Array<UInt8>, streamEnd!: Bool): Unit {
var bodyIndex = 0
while (bodyIndex < bodyData.size) {
if (stream.isClosed()) {
throw HttpException("Connection closed, write body failed.")
}
waitIfWindowNegative()
let remainLen = bodyData.size - bodyIndex
let permitLen = min(Int64(maxFrameSize), stream.remoteWindow.load())
let (frame, writeLen) = if (remainLen <= permitLen) {
let cache = arrayPool?.get(remainLen) ?? ArrayWrapper(remainLen) // cjlint-ignore !G.EXP.03
bodyData.copyTo(cache.data, bodyIndex, 0, remainLen)
let f = DataFrame(stream.streamId, cache, remainLen, last: streamEnd)
bodyIndex = bodyData.size
(f, remainLen)
} else {
let cache = arrayPool?.get(permitLen) ?? ArrayWrapper(permitLen) // cjlint-ignore !G.EXP.03
bodyData.copyTo(cache.data, bodyIndex, 0, permitLen)
let f = DataFrame(stream.streamId, cache, permitLen)
bodyIndex += permitLen
(f, permitLen)
}
if (!responseQueue.send(frame, MESSAGE_PRIORITY)) {
// to handler or stream.handle, which will log the exception and go on
throw HttpException("Connection closed, write body failed.")
}
stream.remoteWindow.fetchSub(writeLen)
}
}
private func waitIfWindowNegative(): Unit {
while (stream.remoteWindow.load() <= 0 && !quit.load()) {
if (stream.isClosed()) {
throw HttpException("Stream is closed when waiting window update on stream ${stream.streamId}.") // to handler or stream.handle, which will log the exception and go on
}
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(stream.server.logger,
"[Stream#waitIfWindowNegative] server stream ${stream.streamId} wait for window_update, current window is negative: ${stream.remoteWindow.load()}"
)
}
windowMonitor.lock()
windowMonitor.wait(timeout: Duration.millisecond * 10)
windowMonitor.unlock()
}
}
/*
* Write an empty data frame with streamEnd
*/
func writeEmptyDataFrame(): Unit {
let frame = DataFrame(stream.streamId, Array<UInt8>(), 0, last: true)
if (!responseQueue.send(frame, MESSAGE_PRIORITY)) {
throw HttpException("Connection closed, write body failed.")
}
}
/*
* Func writeResponse and class HttpExpectBodyProviderH2 will call it
*
* @throws HttpException, if headers invalid
*/
func writeHeader(status: UInt16, header: HttpHeaders, streamEnd!: Bool = false): Unit {
// start writeTimer
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[Stream#writeHeader] set write timer, writeTimeout = ${writeTimeout}")
}
stream.writeTimer = HttpTimer(start: writeTimeout,
task: {
=> stream.close(ProtocolError, "write response timeout on stream ${stream.streamId}")
})
let fields = FieldsList()
let statusString = STATUS_STRING.get(status) ?? status.toString()
fields.add((":status", statusString))
checkAndSetResponseHeaders(fields, header)
let frame = FieldsFrame(stream.streamId, fields, last: streamEnd)
if (!responseQueue.send(frame, MESSAGE_PRIORITY)) {
throw HttpException("Connection closed, write header failed.")
}
}
func write100ContinueHeader(): Unit {
let fields = FieldsList([(":status", "100")])
let frame = FieldsFrame(stream.streamId, fields, last: false)
if (!responseQueue.send(frame, MESSAGE_PRIORITY)) {
throw HttpException("Connection closed, write header failed.")
}
}
func writePush(path: String, method: String, header: HttpHeaders): Unit {
if (isPushStream(stream.streamId)) {
httpLogWarn(logger, "[Stream#writePush] should not call server push on push stream")
return
}
let pushStream = stream.server.createPushStream()
let fields = FieldsList()
if (method.isEmpty()) {
fields.add((":method", "GET"))
} else {
fields.add((":method", method))
}
fields.add((":scheme", "https"))
if (!requestAuthority.isEmpty()) {
fields.add((":authority", requestAuthority))
}
fields.add((":path", path))
checkAndSetResponseHeaders(fields, header)
let engine = HttpEngineConn2(pushStream)
engine.readTimeout = pushStream.server.readTimeout
engine.writeTimeout = pushStream.server.writeTimeout
engine.readHeaderTimeout = pushStream.server.readHeaderTimeout
engine.maxRequestHeaderSize = pushStream.server.maxRequestHeaderSize
engine.maxRequestBodySize = pushStream.server.maxRequestBodySize
engine.request = pushStream.ctx.request
pushStream.engineConn = engine
pushStream.ctx.httpConn = engine
pushStream.requestFields = fields
let frame = FieldsFrame(stream.streamId, fields, pushId: pushStream.streamId)
if (!responseQueue.send(frame, MESSAGE_PRIORITY)) {
throw HttpException("Connection closed, send push request failed.")
}
pushStream.status = ReservedLocal
if (let Some(vs) <- header.getInternal("priority")) {
pushStream.queuePriority = MAX_URGENCY - parsePriority(vs.toString())
}
if (!stream.server.requestQueue.send(pushStream, pushStream.queuePriority)) {
throw HttpException("Connection closed, send push request failed.")
}
}
private func checkAndSetResponseHeaders(fields: FieldsList, headers: HttpHeaders): Unit {
// add date field
let dateArr: Array<Byte> = "xxx, xx xxx xxxxxxxxxxxxxxxxxxxxxx xx:xx:xx GMT".toArray() //47 byte
let dateStr = getRFC1123String(dateArr)
fields.add(("date", dateStr.toString()))
for ((k, vs) in headers.map) {
if (H2_EXCLUDE_HEADERS.contains(k.toString())) {
continue
}
if (k == Str("date")) {
continue
}
if (k == Str("trailer")) {
checkTrailerInHeader(headers, false) //throw Exception if vs contains valid element, which is excluded by trailer
}
if (k == Str("set-cookie")) {
for (v in vs) {
fields.add((k.toString(), v))
}
continue
}
fields.add((k.toString(), vs.toString()))
}
}
private func writeTrailer(trailer: HttpHeaders): Unit {
let fields = FieldsList()
for ((k, v) in trailer.map) {
fields.add((k.toString(), v.toString()))
}
let frame = FieldsFrame(stream.streamId, fields, last: true)
if (!responseQueue.send(frame, MESSAGE_PRIORITY)) {
throw HttpException("Connection closed, write trailer failed.")
}
}
/********************************************* close *********************************************/
/*
* For websocket to call.
* Send an empty DATA with streamEnd set instead of RST_STREAM, because RST_STREAM has privilege, it may discard previous DATA.
*/
func close(): Unit {
if (quit.load()) {
return
}
responseQueue.send(ConnectRstStreamFrame(RstStreamFrame(stream.streamId, NoError.code)), MESSAGE_PRIORITY)
quit.store(true)
}
}
class WebSocketConn2 <: WebSocketConn {
var streamingSocket: StreamingSocket2
var closed = false
WebSocketConn2(let engine: HttpEngineConn2) {
streamingSocket = StreamingSocket2(engine)
}
public func readRaw(byteArray: Array<UInt8>): Int64 {
var len: Int64 = 0
while (len < byteArray.size) {
let singleReadLen = streamingSocket.read(byteArray[len..])
if (singleReadLen == 0) {
break
}
len += singleReadLen
}
return len
}
public func writeRaw(byteArray: Array<UInt8>): Unit {
streamingSocket.write(byteArray)
}
public func close(): Unit {
engine.close()
}
}
class StreamingSocket2 <: StreamingSocket {
var closed = false
StreamingSocket2(let engine: HttpEngineConn2) {}
public prop localAddress: SocketAddress {
get() {
return engine.stream.server.conn.socket.localAddress
}
}
public prop remoteAddress: SocketAddress {
get() {
return engine.stream.server.conn.remoteAddress
}
}
public mut prop readTimeout: ?Duration {
get() {
return None
}
set(_) {
throw HttpException("Should not set timeout on h2 stream.")
}
}
public mut prop writeTimeout: ?Duration {
get() {
return None
}
set(_) {
throw HttpException("Should not set timeout on h2 stream.")
}
}
public func read(byteArray: Array<Byte>): Int64 {
engine.read(byteArray)
}
public func write(byteArray: Array<UInt8>): Unit {
engine.writeBody(byteArray, streamEnd: false)
}
public func isClosed(): Bool {
return closed
}
public func close(): Unit {
engine.close()
}
public func toString(): String {
throw HttpException("StreamingSocket instance upgraded from h2 stream not support toString.")
}
}
func putOrThrow(map: HashMap<String, String>, name: String, value: String): Unit {
if (map.contains(name)) {
throw HttpStreamException(ProtocolError, "PseudoHeader ${name} should not be set more than once.")
} else {
map.add(name, value)
}
}
func containsOrThrow(headers: HashMap<String, String>, name: String): Unit {
if (!headers.contains(name)) {
throw HttpStreamException(ProtocolError, "Malformed request, pseudoHeaders must contain ${name}.")
}
}
func parsePriority(value: String): Int64 {
var urgency = 3
let pMap = HashMap<String, String>()
let pArr = value.split(",")
for (p in pArr) {
let kv = p.split("=")
if (kv.size == 2) {
pMap.add(kv[0].trimAscii(), kv[1].trimAscii())
} else if (kv.size == 1) {
pMap.add(kv[0].trimAscii(), "")
}
}
if (let Some(u) <- pMap.get("u")) {
try {
urgency = Int64.parse(u) //may throw IllegalArgumentException
} catch (e: IllegalArgumentException) {}
}
return urgency
}
func checkAndSetCl(responseBuilder: HttpResponseBuilder, setCl: Bool): Unit {
match ((checkCl(responseBuilder.headers), sizeOf(responseBuilder._body), setCl)) {
case (Some(cl), Some(bs), false) =>
if (cl != bs) {
throw HttpException("The content-length and body length not match.")
}
case (None, Some(bs), true) => responseBuilder.headers.add("content-length", "${bs}")
case _ => ()
}
}
class HttpExpectBodyProviderH2 <: InputStream {
var respondedContinue: Bool = false
var context: ?HttpContext = None
HttpExpectBodyProviderH2(let conn: HttpEngineConn2) {}
public func read(buf: Array<Byte>): Int64 {
let ctx = context ?? throw HttpException("Internal error, ctx in body provider is None.")
synchronized(ctx.writerMtx) {
if (!respondedContinue && !ctx.responseFlushedByUser && !ctx.upgraded && !ctx.responded) {
conn.write100ContinueHeader()
respondedContinue = true
}
}
return conn.read(buf)
}
}