/*
* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* This source file is part of the Cangjie project, licensed under Apache-2.0
* with Runtime Library Exception.
*
* See https://cangjie-lang.cn/pages/LICENSE for license information.
*/
package stdx.net.http
import std.sync.{Timer, Mutex, AtomicUInt32, AtomicBool, AtomicUInt8, AtomicUInt64}
import std.net.{SocketAddress, StreamingSocket, SocketException}
import std.collection.{ArrayList, HashMap}
import stdx.log.LogLevel
import stdx.net.tls.common.TlsException
import std.time.MonoTime
class HttpServer2 <: ProtocolService {
// a wrapped tls connection
let conn: BufferedConn
// frames to be send
// level of DATA/HEADERS/CONTINUATION/PUSH_PROMISE frames is 1, level of other frames is 0
let responseQueue = SPMCLevelQueue<Frame>(2)
// requests to be handled, queued by 7 - urgency
let requestQueue = SPMCLevelQueue<Stream>(8)
// local settings can be set by serverBuilder
// remote settings are set to default, they will be reset when receiving SETTINGS frame from client
var localSettings = HashMap<UInt16, UInt32>()
var remoteSettings = initializeSettings()
// settings ack timer, it will be set and started when sending SETTINGS frame, and canceled when receiving SETTINGS frame with ack flag
// in this implementation, SETTINGS frame will be sent only once
var settingsTimer: ?Timer = None
// stream management
private let streams = HashMap<UInt32, Stream>()
private let streamsMutex = Mutex()
private var lastClientStreamId = UInt32(0)
private var lastPushStreamId = UInt32(0)
private let activeClientStreamNum = AtomicUInt32(0)
private let activeClientRstStreamNum = AtomicUInt32(0)
private let activePushStreamNum = AtomicUInt32(0)
private let activePushRstStreamNum = AtomicUInt32(0)
private let pushMutex = Mutex()
// closing means service is in graceful closing, and it will refuse new requests
private var closing = false
// quit means underlying tls connection is closed
private var quit = AtomicBool(false)
// flow control on connection level
var localWindow = AtomicUInt32(DEFAULT_WINDOW_SIZE)
var remoteWindow = AtomicUInt32(DEFAULT_WINDOW_SIZE)
//threshold to send window update
var flowThreshold = DEFAULT_WINDOW_SIZE / 2
// hpack
let encoder = Encoder(name: "HttpServer2")
let decoder = Decoder(encoder, name: "HttpServer2")
let fieldsWriter: FieldsWriter
// default value is 16384, will be reset when receiving ack to initialSettings
var localMaxFrameSize = MIN_FRAME_SIZE
// remote address
var remoteAddress: ?SocketAddress = None
// frame header buffer
let frameHeaderBuffer = Array<Byte>(FRAME_HEAD_LEN, repeat: 0)
// zeroValue is more suitable...
var streamPool: ?PutSafeRingPool<Any> = None
var arrayPool: ?ArrayPool = None
// quitDone will be 0 when all threads are returned
let quitDone = AtomicUInt8(3)
/***************************************************** serve *****************************************************/
init(socket: StreamingSocket) {
this.conn = BufferedConn(socket)
this.fieldsWriter = FieldsWriter(conn.bufferedWriter)
}
protected func serve() {
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#serve] Http/2.0 serve...")
}
conn.logger = logger
decoder.logger = logger
encoder.logger = logger
remoteAddress = conn.socket.remoteAddress
storeLocalSettings()
// stream pool
if (server.streamPools.isNone() && server.maxConcurrentStreams > 0) {
server.streamPools = ConcurrentRingPool<PutSafeRingPool<Any>>(MAX_STREAM_POOLS_CAPACITY,
STREAM_POOLS_THRESHOLD, newFn: {=> PutSafeRingPool<Any>(MAX_STREAM_POOL_CAPACITY, newFn: {=> Stream()})}
)
}
streamPool = server.streamPools?.get()
// array pool
match (server.arrayPool) {
case Some(v) => this.arrayPool = v
case None =>
let v = ArrayPool(MAX_ARRAY_POOL_CAPACITY, ARRAY_POOL_THRESHOLD)
server.arrayPool = v
this.arrayPool = v
}
// start threads
let connId = ThreadContext.connId
writeThread(connId)
handleThread(connId)
mainThread() // read thread
// main thread returned
handbackStream()
}
private func storeLocalSettings() {
// SETTINGS_HEADER_TABLE_SIZE
localSettings.add(SettingsHeaderTableSize.code, server.headerTableSize)
decoder.setHeaderTableSizeLimit(Int64(server.headerTableSize))
encoder.setHeaderTableSizeLimit(Int64(server.headerTableSize))
// SETTINGS_MAX_CONCURRENT_STREAMS
localSettings.add(SettingsMaxConcurrentStreams.code, server.maxConcurrentStreams)
// SETTINGS_INITIAL_WINDOW_SIZE
localSettings.add(SettingsInitialWindowSize.code, server.initialWindowSize)
// we can set initialWindowSize only when constructing a server instance, so the flowThreshold will not be changed.
flowThreshold = server.initialWindowSize / 2
// SETTINGS_MAX_FRAME_SIZE
localSettings.add(SettingsMaxFrameSize.code, server.maxFrameSize)
// SETTINGS_MAX_HEADER_LIST_SIZE
localSettings.add(SettingsMaxHeaderListSize.code, server.maxHeaderListSize)
decoder.maxHeaderListSize = Int64(server.maxHeaderListSize)
// SETTINGS_ENABLE_CONNECT_PROTOCOL
if (server.enableConnectProtocol) {
localSettings.add(SettingsEnableConnectProtocol.code, 1)
} else {
localSettings.add(SettingsEnableConnectProtocol.code, 0)
}
// this implementation does not support rfc 7540, use rfc 9218 instead
localSettings.add(SettingsNoRfc7540Priorities.code, 1)
}
private func mainThread() {
try {
exchangePreface()
} catch (e: Exception) {
httpLogWarn(logger, "[HttpServer2#mainThread] exchange preface failed, ${e}")
shutdown(shouldSleep: false)
return
}
try {
while (!quit.load()) {
readFrames()
}
} catch (e: SocketException | ConnectionException | TlsException) {
httpLogWarn(logger, "[HttpServer2#mainThread] read frame failed, ${e}")
shutdown(shouldSleep: false)
} catch (ce: HttpConnectionException) {
close(ce.h2Error, ce.message)
} catch (he: HpackException) {
close(CompressionError, he.message)
} catch (e: Exception) {
close(H2Error.InternalError, e.toString())
}
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#mainThread] connection closed, read thread returned")
}
}
/***************************************************** h2 preface *****************************************************/
// cjlint-ignore -start !G.OTH.03
/*
* Initialize h2 connection:
* step 1: receive and check client preface, "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".toArray()
* step 2: send server initialSettings, which carry localSettings
* step 3: receive and apply client initialSettings
* for details: https://www.rfc-editor.org/rfc/rfc9113.html.html#preface
*
* @throws SocketException, ConnectionException or TlsException, if something wrong with socket
* @throws HttpConnectionException, if some h2 connection error occurs
*/
// cjlint-ignore -end
private func exchangePreface() {
var prefaceReceived = Array<UInt8>(PREFACE.size, repeat: 0)
let _ = conn.readFull(prefaceReceived)
if (prefaceReceived != PREFACE) {
throw HttpConnectionException(ProtocolError, "Initialize h2 connection failed, client preface is required.")
}
sendSettings()
let firstFrameReceived = match (Frame.read(conn, headerBuf: frameHeaderBuffer)) {
case Some(v) => v as SettingsFrame
case None => throw HttpConnectionException(ProtocolError,
"Initialize h2 connection failed, settings frame is required.")
}
match (firstFrameReceived) {
case Some(settingsFrame) =>
if (settingsFrame.payloadLen > localMaxFrameSize) {
throw HttpConnectionException(ProtocolError,
"Size of initial settings exceeds default max frame size ${MIN_FRAME_SIZE}.")
}
onSettingsRead(settingsFrame)
case None => throw HttpConnectionException(ProtocolError,
"Initialize h2 connection failed, settings frame is required.")
}
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#exchangePreface] initialize h2 connection succeed")
}
}
/***************************************************** read thread *****************************************************/
/*
* Read and process frames from tls conn.
*
* @throws SocketException, ConnectionException or TlsException, if something wrong with socket
* @throws HttpConnectionException, if some h2 connection error occurs
* @throws HpackException, if decode fields failed
*/
private func readFrames() {
// may throw HttpConnectionException, SocketException, TlsException
var frame = match (Frame.read(conn, maxFrameSize: localSettings[SettingsMaxFrameSize.code],
headerBuf: frameHeaderBuffer, arrayPool: arrayPool)) {
case Some(v) => v
case None => return
}
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#readFrames] received frame: ${frame}, stream id: ${frame.streamId}.")
}
if (frame.payloadLen > localMaxFrameSize) {
throw HttpConnectionException(ProtocolError,
"Frame size exceeds max frame size ${localMaxFrameSize}, which is set by server.")
}
// read and process HEADERS + CONTINUATION, which may be headers/trailers of request
if (let Some(hf) <- (frame as HeadersFrame)) {
if (logger.enabled(LogLevel.TRACE)) {
httpLogTrace(logger, "[HttpServer2#readFrames] header frame received.")
}
readAndProcessFields(hf)
return
}
// process global frames
if (frame.streamId == 0) {
preProcessGlobalFrame(frame)
return
}
// process other frames on stream
// process flow control when receiving DATA frame
let (streamOp, isPurged) = getStream(frame.streamId)
match (streamOp) {
case None =>
if (isPurged) {
if (frame is DataFrame) {
if (logger.enabled(LogLevel.TRACE)) {
httpLogTrace(logger, "[HttpServer2#readFrames] Stream ${frame.streamId} is purged. Data frame received.")
}
processDataFlow(frame.payloadLen)
windowUpdateOnDataConsumed(frame.payloadLen)
}
} else {
throw HttpConnectionException(ProtocolError, "Unexpected frame on idle stream ${frame.streamId}.")
}
case Some(stream) =>
if (frame is DataFrame) {
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#readFrames] Data frame received.")
}
processDataFlow(frame.payloadLen)
if (stream.dataBBQ.isClosed()) {
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#readFrames] Data queue has been closed.")
}
windowUpdateOnDataConsumed(frame.payloadLen)
}
}
preProcessStreamFrame(frame, stream)
}
}
/**
* @throws HttpConnectionException, if flow control check fail
*/
private func processDataFlow(len: UInt32): Unit {
// Local window should decrease when receiving data frame, increase when sending windowUpdate frame,
if (localWindow.load() < len) {
throw HttpConnectionException(FlowControlError,
"PayloadLen of DATA frame exceeds local window on connection level.")
}
localWindow.fetchSub(len)
}
func windowUpdateOnDataConsumed(len: UInt32): Unit {
if (len <= 0) {
return
}
// Release local window when Data-Frame has been consumed.
// Send Window-Update to peer.
sendWindowUpdate()
}
/**
* The first HEADERS frame on a stream will be treated as part of headers of request.
* An Idle stream will be create or get (if it has already been created when receiving PRIORITY_UPDATE frame),
* and read timer on new stream will be started, when read the first HEADERS frame, and the connection is not on closing.
* The subsequent HEADERS frame on a stream will be treated as part of trailers of request.
*
* If the connection is on closing, or the related stream has already been purged, the fields will not be processed on stream level.
* But the fields will always be decoded, in order to update the status of hpack.
*
* HEADERS frame and CONTINUATION frame of a field entity (that is, headers or trailers of request) must be received continuously on connection level.
*
* @throws SocketException, ConnectionException or TlsException, if something wrong with socket
* @throws HttpConnectionException, if some h2 connection error occurs
* @throws HpackException, if decode fields failed
*/
private func readAndProcessFields(hf: HeadersFrame): Unit {
let streamId = hf.streamId
let streamOp: ?Stream = match (getStream(streamId)) { // (streamOp, isPurged)
// an existed stream (trailers may received on existed streams)
case (Some(v), _) => v
// a new stream
case (None, false) => createClientStream(streamId, startReadTimer: true)
// a purged stream, we should decode and then discard the fields
case _ => None
}
let fieldsList = if (hf.headerEnd) {
// fast path, only HeadersFrame
if (logger.enabled(LogLevel.TRACE)) {
httpLogTrace(logger, "[HttpServer2#readAndProcessFields] Got END_HEADERS.")
}
decodeFields(decoder, hf.fieldBlock)
} else {
// slow path, HeadersFrame + ContinuationFrames
let cfs = ArrayList<Frame>(1)
let fieldsBlocks = ArrayList<UInt8>()
fieldsBlocks.add(all: hf.fieldBlock)
var headerEnd = false
while (!headerEnd) {
let cf = Frame
.read(conn, arrayPool: arrayPool)
.getOrThrow(
{
=> HttpConnectionException(ProtocolError,
"Headers and continuation frames should be continuous on connection level.")
}) as ContinuationFrame ?? throw HttpConnectionException(ProtocolError, // cjlint-ignore !G.EXP.03
"Headers and continuation frames should be continuous on connection level.")
if (cf.streamId != streamId) {
throw HttpConnectionException(ProtocolError, "Unexpected Continuation on stream ${streamId}.")
}
cfs.add(cf)
fieldsBlocks.add(all: cf.fieldBlock)
headerEnd = cf.headerEnd
}
for (cf in cfs) {
cf.recyclePayload(arrayPool)
}
decodeFields(decoder, fieldsBlocks)
}
// make sure payload has no slice
hf.recyclePayload(arrayPool)
if (let Some(stream) <- streamOp) {
let fieldsFrame = FieldsFrame(streamId, fieldsList, last: hf.streamEnd)
preProcessStreamFrame(fieldsFrame, stream)
}
}
var lastExceptionTime = MonoTime.now()
/*
* Process frame on stream level.
*
* @throws HttpConnectionException, if some h2 connection error occurs
* @throws HpackException, if decode fields failed
*/
private func preProcessStreamFrame(frame: Frame, stream: Stream): Unit {
try {
stream.preProcess(frame)
} catch (se: HttpStreamException) {
// stream exception must be caught and processed here, because the outer function can not get the stream
let now = MonoTime.now()
let needSlow = now - lastExceptionTime < Duration.second * 5
lastExceptionTime = now
stream.close(se.h2Error, se.message, slow: needSlow)
}
}
/***************************************************** handle thread *****************************************************/
private func handleThread(connId: ?UInt64) { //never close the connection
spawn {
ThreadContext.connId = connId
try {
handleRequests()
} finally {
handbackStream()
}
}
}
private func handleRequests() {
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#handleRequests] handle thread init")
}
let reqCount = AtomicUInt64(0)
while (!quit.load()) {
let stream = requestQueue.receive() ?? continue
let connId = ThreadContext.connId
spawn {
ThreadContext.connId = connId
reqCount.fetchAdd(1)
try {
stream.handle()
} catch (e: Exception) {
stream.close(H2Error.InternalError, e.message)
} finally {
reqCount.fetchSub(1)
}
}
}
while (reqCount.load() > 0) {
sleep(Duration.millisecond * 200)
}
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#handleRequests] connection closed, handle thread returned")
}
}
/***************************************************** write thread *****************************************************/
private func writeThread(connId: ?UInt64) {
spawn {
ThreadContext.connId = connId
try {
writeFrames()
} finally {
handbackStream()
}
}
}
private func handbackStream() {
if (quitDone.fetchSub(1) == 1) {
server.streamPools?.put(streamPool ?? return ())
streamPool = None
arrayPool = None
clearSettingsTimer()
}
}
private func writeFrames() {
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#writeFrames] write thread init")
}
var stream = Box<Stream>(Stream())
var toWriteFrames = ArrayList<Frame>()
while (!quit.load()) {
try {
if (!writeFrame(stream, toWriteFrames)) {
break
}
} catch (e: HttpStreamException) {
// stream must have been assigned, if HttpStreamException is thrown
stream.value.close(e.h2Error, e.message)
} catch (e: SocketException | ConnectionException | TlsException) {
httpLogWarn(logger, "[HttpServer2#writeFrames] ${e}")
shutdown(shouldSleep: false)
break
} catch (e: HpackException) {
close(H2Error.CompressionError, e.message)
break
} catch (e: Exception) {
close(H2Error.InternalError, e.toString())
break
}
}
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#writeFrames] connection closed, write thread returned")
}
}
private func writeFrame(stream: Box<Stream>, toWriteFrames: ArrayList<Frame>): Bool {
let frame = responseQueue.receive() ?? return false
if (frame.streamId == 0 && !(frame is UnblockFrame)) {
postProcessGlobalFrame(frame)
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#writeFrames] write frame: ${frame}")
}
frame.writeTo(conn.bufferedWriter)
return true
}
if (frame is WindowUpdateFrame || frame is RstStreamFrame) {
stream.value = streams.get(frame.streamId) ?? return true
stream.value.postProcess(frame)
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#writeFrames] write frame: ${frame}")
}
frame.writeTo(conn.bufferedWriter)
return true
}
// DataFrame || FieldsFrame || UnblockFrame || ConnectRstStreamFrame
// unblockFrame is added to responseQueue when receiving global WINDOW_UPDATE
if (!(frame is UnblockFrame)) {
toWriteFrames.add(frame)
}
while (!toWriteFrames.isEmpty()) {
let waitingFrame = toWriteFrames.remove(at: 0)
stream.value = streams.get(waitingFrame.streamId) ?? continue
// when stream is closed, but hasn't been purged yet, for example, write timeout,
// queued data and fields frames should be discarded, in order to avoid blocking due to connection flow control
if (stream.value.isClosed()) {
continue
}
if (!writeFrame(waitingFrame, stream.value)) {
break
}
}
return true
}
private func writeFrame(frame: Frame, stream: Stream): Bool {
match (frame) {
case f: DataFrame =>
if (frame.payloadLen > remoteWindow.load()) {
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#writeFrames] no enough window on connection level")
}
return false
}
stream.postProcess(frame) //may throw stream exception and should not write
remoteWindow.fetchSub(frame.payloadLen)
f.writeTo(conn.bufferedWriter)
f.recyclePayload(arrayPool)
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#writeFrames] write frame: ${f}")
}
if (f.streamEnd) {
stream.writeTimer.cancel()
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger,
"[HttpServer2#writeFrames] cancel write timer after write body on stream ${stream.streamId}"
)
}
}
case f: FieldsFrame =>
stream.postProcess(frame)
writeFieldsFrame(f)
if (f.streamEnd) {
stream.writeTimer.cancel()
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger,
"[HttpServer2#writeFrames] cancel write timer after write fields (headers or trailers) on stream ${stream.streamId}"
)
}
}
case f: ConnectRstStreamFrame =>
stream.postProcess(f.rstFrame)
f.rstFrame.writeTo(conn.bufferedWriter)
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#writeFrames] write frame: ${f}")
}
case _ => throw HttpException("Unexpected frame: ${frame}.") //should never come here
}
true
}
private func writeFieldsFrame(frame: FieldsFrame) {
fieldsWriter.streamId = frame.streamId
fieldsWriter.streamEnd = frame.streamEnd
fieldsWriter.pushId = frame.pushId
frame.writeTo(fieldsWriter, encoder)
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#writeFieldsFrame] write frame: ${frame}")
}
}
/***************************************************** get stream *****************************************************/
/*
* look for stream in streams
* return tuple: (stream in streams, isPurged)
*/
private func getStream(streamId: UInt32): (?Stream, Bool) {
synchronized(streamsMutex) {
let streamOp = streams.get(streamId)
if (!streamOp.isNone()) {
return (streamOp, false)
}
if (isPushStream(streamId)) {
if (streamId <= lastPushStreamId) {
return (streamOp, true)
} else {
return (streamOp, false)
}
} else {
if (streamId <= lastClientStreamId) {
return (streamOp, true)
} else {
return (streamOp, false)
}
}
}
}
/*
* return Some(v), if stream has been created successfully, and is ready to process request
* return None, if connection is in graceful closing
* return None, if stream has been created, but is already closed in this function
*
* @throws HttpConnectionException to exit read thread, if streamId is invalid
*/
private func createClientStream(streamId: UInt32, startReadTimer!: Bool): ?Stream {
if (closing) {
synchronized(streamsMutex) {
verifyClientStreamId(streamId)
}
return None
}
var stream: Stream
synchronized(streamsMutex) {
verifyClientStreamId(streamId)
stream = match (streamPool) {
case None => Stream(this, streamId)
case Some(pool) => (pool.get() as Stream) ?? throw Exception("The pool get not Stream.")
}
stream.reset(this, streamId)
streams.add(streamId, stream)
}
// update stream id records
lastClientStreamId = streamId
activeClientStreamNum.fetchAdd(1)
// close the stream immediately, if num of client streams exceeds SettingsMaxConcurrentStreams
// when closing, we inform client that the request hasn't been handled, and client can send it again
if (activeClientStreamOverLimit()) {
stream.close(RefusedStream,
"received frame on stream ${streamId}, num of active client streams exceeds SettingsMaxConcurrentStreams, which is set by server"
)
return None
}
if (startReadTimer) {
stream.startReadTimer()
}
return stream
}
private func activeClientStreamOverLimit(): Bool {
return activeClientStreamNum.load() > localSettings[SettingsMaxConcurrentStreams.code] ||
// no need to verify strictly 2 times
(activeClientRstStreamNum.load() >> 1) > localSettings[SettingsMaxConcurrentStreams.code]
}
/*
* create a push stream, id is lastPushStreamId + 2
* it will be called only in user handler
*
* throw HttpException, if num of push streams exceeds MAX_STREAM_ID or num of active push streams exceeds SettingsMaxConcurrentStreams
*/
func createPushStream(): Stream {
synchronized(pushMutex) {
lastPushStreamId += 2
verifyPushStreamId(lastPushStreamId)
let stream = Stream(this, lastPushStreamId)
synchronized(streamsMutex) {
streams.add(lastPushStreamId, stream)
}
activePushStreamNum.fetchAdd(1)
return stream
}
}
/*
* @throws HttpConnectionException to exit read thread, if streamId is invalid
*/
private func verifyClientStreamId(id: UInt32) {
if (isPushStream(id)) {
throw HttpConnectionException(ProtocolError, "Stream initiated by client should be identified with odd num.")
}
if (id <= lastClientStreamId) {
throw HttpConnectionException(ProtocolError, "Id of new stream should be the largest.")
}
if (id > MAX_STREAM_ID) {
throw HttpConnectionException(ProtocolError, "Stream id reaches the max, 2^31 - 1.")
}
}
/*
* @throws HttpException to user handler, if push streamId is invalid
*/
private func verifyPushStreamId(id: UInt32) {
if (activePushStreamOverLimit()) {
throw HttpException(
"Write push related to stream ${id}, num of active push streams exceeds SettingsMaxConcurrentStreams, which is set by client."
)
}
if (id > MAX_STREAM_ID) {
throw HttpException("Push stream id exceeds 2^31-1.") //to user handler
}
}
private func activePushStreamOverLimit(): Bool {
return activePushStreamNum.load() >= remoteSettings[SettingsMaxConcurrentStreams.code] ||
// no need to verify strictly 2 times
(activePushRstStreamNum.load() >> 1) >= remoteSettings[SettingsMaxConcurrentStreams.code]
}
/***************************************************** global frames *****************************************************/
private func preProcessGlobalFrame(frame: Frame) {
match (frame) {
case f: SettingsFrame => onSettingsRead(f)
case f: WindowUpdateFrame => onWindowUpdateRead(f)
case f: GoawayFrame => onGoawayRead(f)
case f: PriorityUpdateFrame => onPriorityUpdateRead(f)
case f: PingFrame => onPingRead(f)
case _ => throw HttpConnectionException(ProtocolError, "Received invalid frame on stream 0.")
}
}
private func sendSettings(ack!: Bool = false): Unit {
let frame = if (ack) {
SettingsFrame() //settings frame with ack flag set
} else {
SettingsFrame(localSettings)
}
if (!responseQueue.send(frame, CONTROL_PRIORITY) && logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#sendSettings] connection closed, send Settings frame failed.")
}
}
func sendWindowUpdate(): Unit {
let remain = localWindow.load()
if (remain > DEFAULT_WINDOW_SIZE / 2) {
return
}
let initial = localSettings[SettingsInitialWindowSize.code]
if (!localWindow.compareAndSwap(remain, initial)) {
return
}
let frame = WindowUpdateFrame(0, initial - remain)
if (!responseQueue.send(frame, CONTROL_PRIORITY) && logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#sendWindowUpdate] connection closed, send WINDOW_UPDATE frame failed")
}
}
private func sendGoaway(h2Error: H2Error, lastProcessedId: UInt32, debugMsg: String) {
let frame = GoawayFrame(lastProcessedId, h2Error.code, data: debugMsg.toArray())
if (!responseQueue.send(frame, CONTROL_PRIORITY) && logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#sendGoaway] connection closed, send Goaway frame failed")
}
}
private func onSettingsRead(frame: SettingsFrame): Unit {
// ack settings, will be read only once
if (frame.ack) {
if (frame.payloadLen != 0) {
throw HttpConnectionException(FrameSizeError, "Settings with ack should not have payload.")
}
if (!clearSettingsTimer()) {
throw HttpConnectionException(ProtocolError, "Unexpected ack settings.")
}
localMaxFrameSize = server.maxFrameSize
return
}
// non ack settings
for ((k, v) in frame.settings) {
match (getSettingByCode(k)) {
case SettingsHeaderTableSize =>
// receive SETTINGS_HEADER_TABLE_SIZE by peer Decoder
encoder.receiveSettingsHeaderTableSize(Int64(v))
case SettingsEnablePush =>
if (v != 0 && v != 1) {
throw HttpConnectionException(ProtocolError,
"Invalid client settings, SettingsEnablePush should be 0 or 1.")
}
case SettingsMaxConcurrentStreams => ()
case SettingsInitialWindowSize =>
if (v > MAX_WINDOW) {
throw HttpConnectionException(FlowControlError,
"Invalid client settings, SettingsInitialWindowSize should not exceeds 2^31.")
}
setActiveStreamWindow(v)
case SettingsMaxFrameSize =>
if (v < UInt32(2 ** 14) || v > UInt32(2 ** 24 - 1)) {
throw HttpConnectionException(ProtocolError,
"Invalid client settings, SettingsMaxFrameSize should be in range [2^14..2^24-1].")
}
case SettingsMaxHeaderListSize => encoder.maxHeaderListSize = Int64(v)
case _ => ()
}
this.remoteSettings[k] = v
}
let fieldsBlockSize = Int64(remoteSettings[SettingsMaxFrameSize.code])
if (fieldsWriter.blockSize != fieldsBlockSize) {
fieldsWriter.buffer = Array<Byte>(fieldsBlockSize, repeat: 0)
fieldsWriter.blockSize = fieldsBlockSize
}
sendSettings(ack: true)
}
// cjlint-ignore -start !G.OTH.03
/*
* see https://www.rfc-editor.org/rfc/rfc9113.html#InitialWindowSize
*/
// cjlint-ignore -end
private func setActiveStreamWindow(initialWindow: UInt32) {
let increment = Int64(initialWindow) - Int64(this.remoteSettings[SettingsInitialWindowSize.code])
synchronized(streamsMutex) {
for ((_, stream) in streams) {
if (stream.remoteWindow.load() + increment > Int64(MAX_WINDOW)) {
throw HttpConnectionException(FlowControlError,
"SettingsInitialWindowSize cause remote window of stream ${stream.streamId} exceeds max value")
} else {
stream.remoteWindow.fetchAdd(increment)
}
}
}
}
/*
* in this implementation will not send ping
*/
private func onPingRead(frame: PingFrame) {
if (frame.ack) {
return
}
let ping = PingFrame(isAck: true, payload: frame.payload)
if (!responseQueue.send(ping, CONTROL_PRIORITY) && logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#onPingRead] connection closed, send Ping frame failed")
}
}
private func onWindowUpdateRead(frame: WindowUpdateFrame) {
remoteWindow.fetchAdd(frame.increment) //never overflow
if (remoteWindow.load() > MAX_WINDOW) {
throw HttpConnectionException(FlowControlError,
"The WINDOW_UPDATE frame cause remote window exceeds 2^31-1 on connection level.")
}
// unblock write thread
if (!responseQueue.send(UnblockFrame(), CONTROL_PRIORITY) && logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#onWindowUpdateRead] connection closed, send internal frame failed.")
}
}
private func onGoawayRead(frame: GoawayFrame) {
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger,
"[HttpServer2#onGoawayRead] connection closing, received goaway frame, errCode = ${frame.errorCode}, debugMsg = ${sanitizeForLog(frame.debugData)}"
)
}
shutdown(shouldSleep: false)
}
// cjlint-ignore -start !G.OTH.03
/*
* the value of parameter urgency will be set on related stream
* if the related stream has not been created yet, and the streamId is valid (odd, > lastClientStreamId, < MAX_STREAM_ID), a new Idle stream with the specified urgency will be created
* if the related stream is already in handle or purged, the frame will be ignored
* see https://www.rfc-editor.org/rfc/rfc9218.html
*
* @throws HttpConnectionException if streamId invalid
*/
// cjlint-ignore -end
private func onPriorityUpdateRead(frame: PriorityUpdateFrame): Unit {
let (streamOp, isPurged) = getStream(frame.prioritizedId)
if (streamOp.isNone() && !isPurged) {
if (let Some(stream) <- createClientStream(frame.prioritizedId, startReadTimer: false)) {
stream.queuePriority = MAX_URGENCY - parsePriority(frame.fieldValue)
}
}
}
private func postProcessGlobalFrame(frame: Frame) {
match (frame) {
case f: SettingsFrame =>
if (!f.ack) {
onSettingsWrite()
}
case _: WindowUpdateFrame => () //nothing to do
case _: GoawayFrame => ()
case _: PingFrame => ()
case _ => ()
}
}
/* will be called only once, when write initial settings */
private func onSettingsWrite() {
settingsTimer = Timer.once(Duration.second * 10) {
=>
httpLogWarn(logger, "[HttpServer2#onSettingsWrite] connection closing, wait ack settings timeout")
shutdown(shouldSleep: false)
}
}
/***************************************************** closing *****************************************************/
func purgeStream(stream: Stream, purge: Bool, rstCount: Bool): Unit {
let id = stream.streamId
if (purge) {
synchronized(streamsMutex) {
let removed = streams.remove(id)
match (removed) {
case None => return
case Some(v) => streamPool?.put(v)
}
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger,
"[HttpServer2#purgeStream] purgeStream, stream id: ${id}, size of streams: ${streams.size}")
}
}
}
if (rstCount) {
match ((isPushStream(id), stream.isRst.load())) {
case (true, true) =>
if (stream.isRst.compareAndSwap(true, false)) {
activePushRstStreamNum.fetchSub(1)
}
case (true, false) => activePushStreamNum.fetchSub(1)
case (false, true) =>
if (stream.isRst.compareAndSwap(true, false)) {
activeClientRstStreamNum.fetchSub(1)
}
case (false, false) => activeClientStreamNum.fetchSub(1)
}
}
if (logger.enabled(LogLevel.TRACE)) {
httpLogTrace(logger, "[HttpServer2#purgeStream] activePushRstStreamNum = ${activePushRstStreamNum.load()}, activeClientRstStreamNum = ${activeClientRstStreamNum.load()}.")
}
}
protected func closeGracefully(): Unit {
if (quit.load()) {
return
}
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#closeGracefully] closing connection gracefully")
}
sendGoaway(NoError, MAX_STREAM_ID, "server closing gracefully")
waitStreamInflight()
}
private func waitStreamInflight() {
sleep(Duration.second) // wait for 1s to allow client receiving goaway frame
sendGoaway(NoError, lastClientStreamId, "server closing gracefully")
closing = true
waitStreamComplete()
}
private func waitStreamComplete() {
for (_ in 0..10) {
if (quit.load()) {
return
} // some error occurred and connection is already closed
if (activeStreamNumIsZero()) {
if (logger.enabled(LogLevel.DEBUG)) {
httpLogDebug(logger, "[HttpServer2#waitStreamComplete] connection closed gracefully")
}
shutdown()
}
sleep(Duration.second)
}
httpLogWarn(logger, "[HttpServer2#waitStreamComplete] graceful closing timeout")
shutdown()
}
private func activeStreamNumIsZero(): Bool {
return activeClientStreamNum.load() == 0u32 && activePushStreamNum.load() == 0u32 &&
activeClientRstStreamNum.load() == 0u32 && activePushRstStreamNum.load() == 0u32
}
func addRstStreamCnt(rstFrame: RstStreamFrame): Unit {
match (isPushStream(rstFrame.streamId)) {
case true =>
activePushRstStreamNum.fetchAdd(1)
activePushStreamNum.fetchSub(1)
case false =>
activeClientRstStreamNum.fetchAdd(1)
activeClientStreamNum.fetchSub(1)
}
}
protected func close(): Unit {
close(NoError, "server closing forcibly")
}
private func close(h2Error: H2Error, debugMsg: String) {
httpLogWarn(logger, "[HttpServer2#close] connection closing, ${debugMsg}")
if (quit.load()) {
if (logger.enabled(LogLevel.TRACE)) {
httpLogTrace(logger, "[HttpServer2#close] Already quit.")
}
return
}
sendGoaway(h2Error, this.lastClientStreamId, debugMsg)
shutdown()
}
private func shutdown(shouldSleep!: Bool = true) {
if (logger.enabled(LogLevel.TRACE)) {
httpLogTrace(logger, "[HttpServer2#shutdown] Shutdown.")
}
if (quit.load()) {
return
}
if (shouldSleep) { // allow client to receive goaway
Timer.once(Duration.second) {
=>
quit.store(true)
conn.close()
}
} else {
quit.store(true)
conn.close()
}
synchronized(streamsMutex) {
for ((_, stream) in streams) {
// unblock data read in handle thread
stream.dataBBQ.close() //will cause receiving data frame return None
// unblock handle threads, which is writing body
stream.status = Closed
stream.cancelAllTimers()
stream.windowMonitor.lock()
stream.windowMonitor.notifyAll()
stream.windowMonitor.unlock()
}
}
clearSettingsTimer()
// unblock write thread and readBody in handler (a handle thread)
// notify user who is writing body data handle thread, close responseQueue will cause send frame fail
responseQueue.close()
// unblock handle thread
requestQueue.close()
}
private func clearSettingsTimer(): Bool {
if (let Some(v) <- settingsTimer) {
v.cancel()
settingsTimer = None
return true
}
false
}
}