/*
* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* This source file is part of the Cangjie project, licensed under Apache-2.0
* with Runtime Library Exception.
*
* See https://cangjie-lang.cn/pages/LICENSE for license information.
*/
package stdx.net.http
import std.io.*
import std.sync.*
import std.collection.*
interface SeekableInputStream <: InputStream & Seekable {}
interface BodyProviderConn {
prop isReadTimeout: AtomicBool
func getBufferConn(): BufferedConn
func returnConn(): Unit
func closeConn(): Unit
}
class HttpEmptyBody <: SeekableInputStream {
static let INSTANCE = HttpEmptyBody()
private init() {}
public func read(_: Array<Byte>): Int64 {
return 0
}
public func seek(_: SeekPosition): Int64 {
0
}
public prop length: Int64 {
get() {
0
}
}
}
class HttpRawBody <: SeekableInputStream {
var readLen: Int64 = 0
let rawBody: Array<Byte>
init(raw: Array<Byte>) {
this.rawBody = raw
}
init(raw: String) {
this.rawBody = unsafe { raw.rawData() }
}
public prop length: Int64 {
get() {
this.rawBody.size
}
}
public func read(buf: Array<Byte>): Int64 {
if (readLen >= rawBody.size) {
return 0
}
let copyLen = min(buf.size, rawBody.size - readLen)
rawBody.copyTo(buf, readLen, 0, copyLen)
readLen += copyLen
return copyLen
}
public func seek(_: SeekPosition): Int64 {
0
}
}
/*
* wrap a unknown size input stream, cache all data read from stream
*/
class HttpBufferedBody <: SeekableInputStream {
private let data: ArrayList<UInt8> = ArrayList<UInt8>()
HttpBufferedBody(let body: InputStream) {}
public prop length: Int64 {
get() {
this.data.size
}
}
prop bytes: Array<UInt8> {
get() {
unsafe { data.getRawArray()[..length] }
}
}
public func read(buf: Array<Byte>): Int64 {
let len = body.read(buf)
if (len > 0) {
data.add(all: buf[..len])
}
return len
}
public func seek(_: SeekPosition): Int64 {
0
}
}
class HttpNormalBody <: SeekableInputStream {
private let data: Array<Byte>
private let contentLength: Int64
private var remainingLength: Int64
HttpNormalBody(let body: HttpNormalBodyProvider) {
contentLength = body.length
remainingLength = contentLength
let buf = Array<Byte>(READ_CHUNK_SIZE, repeat: 0)
let chunks = ArrayList<Byte>()
var len = body.read(buf)
while (len != 0) {
chunks.add(all: buf[..len])
len = body.read(buf)
}
data = chunks.toArray()
}
public prop length: Int64 {
get() {
contentLength
}
}
public func read(buf: Array<Byte>): Int64 {
if (remainingLength == 0) {
return 0
}
let copyLen = min(buf.size, remainingLength)
data.copyTo(buf, contentLength - remainingLength, 0, copyLen)
remainingLength -= copyLen
return copyLen
}
public func seek(_: SeekPosition): Int64 {
0
}
}
class HttpNormalBodyProvider <: SeekableInputStream & Resource {
var readLen: Int64 = 0
let conn: BufferedConn
var eof = false
HttpNormalBodyProvider(let providerConn: BodyProviderConn, let contentLength: Int64, let timer: HttpTimer) {
if (contentLength == 0) {
timer.cancel()
eof = true
}
conn = providerConn.getBufferConn()
}
public prop length: Int64 {
get() {
contentLength
}
}
public func read(buf: Array<Byte>): Int64 {
if (eof) {
return 0
}
// remain length of data
let remainLen = contentLength - readLen
// read from connection
let len: Int64
try {
len = conn.read(buf, remainLen)
} catch (e: Exception) {
timer.cancel()
providerConn.closeConn()
if (providerConn.isReadTimeout.load()) {
throw HttpTimeoutException("Read body timeout and the connection is closed.")
}
throw e
}
readLen += len
if (readLen >= contentLength) {
timer.cancel()
providerConn.returnConn()
eof = true
}
return len
}
public func close(): Unit {
timer.cancel()
providerConn.closeConn()
}
public func isClosed(): Bool {
providerConn.getBufferConn().isClosed()
}
public func seek(_: SeekPosition): Int64 {
0
}
}
/*
* This body provider is used to read chunk body from peer.
* chunked-body = *chunk
* last-chunk
* trailer-section
* CRLF
*
* chunk = chunk-size [ chunk-ext ] CRLF
* chunk-data CRLF
* chunk-size = 1*HEXDIG
* last-chunk = 1*("0") [ chunk-ext ] CRLF
*
* chunk-data = 1*OCTET ; a sequence of chunk-size octets
* RFC 9112 7.1.
*/
class HttpChunkedBodyProvider <: InputStream & Resource {
var eof: Bool = false
var contentLength = 0
var remainChunkSize = 0
var isLastChunk = false
let header: HttpHeaders
let trailer: HttpHeaders
let isReq: Bool
let conn: BufferedConn
HttpChunkedBodyProvider(let providerConn: BodyProviderConn, let msg: Object, let timer: HttpTimer) {
conn = providerConn.getBufferConn()
match (msg) {
case req: HttpRequest =>
header = req.headers
trailer = req.trailers
isReq = true
case rsp: HttpResponse =>
header = rsp.headers
trailer = rsp.trailers
isReq = false
case _ => throw HttpException("Unsupported msg type!")
}
}
public func read(dst: Array<Byte>): Int64 {
if (eof) {
return 0
}
var dstReadLen = 0
try {
// parse the existing content in the buffer.
// chunk-data is from the buffer,
// but CRLF
// and chunk-size [ chunk-ext ] CRLF
// and 1*("0") [ chunk-ext ] CRLF may not.
// break when eof or buffer is empty or dst is full.
do {
remainChunkSize = getRemainSize(remainChunkSize)
if (remainChunkSize == 0) {
return dstReadLen
}
let readLen = conn.read(dst[dstReadLen..], remainChunkSize)
dstReadLen += readLen
remainChunkSize -= readLen
if (remainChunkSize == 0) {
readNextChunkLine(isReq)
}
} while (conn.bufferedReader.remainingData != 0 && dstReadLen != dst.size)
} catch (e: Exception) {
timer.cancel()
providerConn.closeConn()
if (providerConn.isReadTimeout.load()) {
throw HttpTimeoutException("Read body timeout and the connection is closed.")
}
throw e
}
return dstReadLen
}
public func close(): Unit {
timer.cancel()
providerConn.closeConn()
}
public func isClosed(): Bool {
providerConn.getBufferConn().isClosed()
}
private func getRemainSize(remainChunkSize: Int64): Int64 {
if (remainChunkSize == 0) {
let chunkSize = readChunkSize()
this.contentLength += chunkSize
// last-chunk = 1*("0") [ chunk-ext ] CRLF
if (chunkSize == 0) {
eof = true
readTrailer()
clearHeader()
timer.cancel()
providerConn.returnConn()
}
return chunkSize
}
return remainChunkSize
}
// chunk-data CRLF
// read CRLF and next chunk-size [ chunk-ext ] CRLF
private func readNextChunkLine(isReq: Bool) {
let lf = readLine()
if (!lf.isEmpty()) {
if (isReq) {
throw HttpStatusException(HttpStatusCode.STATUS_BAD_REQUEST, "Bad request.")
} else {
throw HttpException("Invalid chunked data.")
}
}
}
/*
* Trailer = #field-name
* trailer-section = *( field-line CRLF )
* The "Trailer" header field provides a list of field names that the sender anticipates sending as
* trailer fields within that message. This allows a recipient to prepare for receipt of the indicated
* metadata before it starts processing the content
* RFC 9110 6.6.2.
*/
private func readTrailer(): Unit {
var trailerLineCount = 0
while (true) {
let trailerLine = readLine()
if (trailerLine.isEmpty()) {
break
}
trailerLineCount++
if (trailerLineCount > MAX_TRAILER_LINE_COUNT) {
if (isReq) {
throw HttpStatusException(HttpStatusCode.STATUS_BAD_REQUEST, "Too many trailer fields.")
} else {
throw HttpException("Too many trailer fields.")
}
}
let (name, value) = parseAndCheckHeaderLine(trailerLine, isReq)
if (validTrailerName(name)) {
trailer.add(name, value)
}
}
}
private func clearHeader(): Unit {
// set `Content-Length`
header.add("content-length", "${contentLength}")
// remove `chunked` from last
var hv = header.map.get(Str("transfer-encoding")) ?? return ()
hv.removeLastValue()
match (hv.isEmpty()) {
case true => header.del("transfer-encoding")
case false => header.map.add(Str("transfer-encoding"), hv)
}
}
private func validTrailerName(name: String): Bool {
let trailersArray = header.getInternal("Trailer") ?? return false
return trailersArray |> splitValuesByComma |> name.toAsciiLower().caseInsensitiveMatchOne
}
private func readLine(): Str {
let line: Str
try {
line = conn.readLine()
} catch (e: HttpException) {
if (isReq) {
throw HttpStatusException(HttpStatusCode.STATUS_BAD_REQUEST, "Invalid line.")
} else {
throw e
}
}
if (line.contains(CR)) {
if (isReq) {
throw HttpStatusException(HttpStatusCode.STATUS_BAD_REQUEST, "Invalid line.")
} else {
throw HttpException("Trailer section can not contain raw CR.")
}
}
return line
}
/*
* chunk = chunk-size [ chunk-ext ] CRLF
* chunk-data CRLF
* chunk-size = 1 *HEXDIG
*/
private func readChunkSize(): Int64 {
let line = readLine()
let chunkSize = line.splitFirst(SEMICOLON) ?? throw HttpException("Failed to extract chunk-size.") // extract chunk-size
try {
return Int64.fromHexStr(chunkSize)
} catch (e: Exception) {
if (isReq) {
throw HttpStatusException(HttpStatusCode.STATUS_BAD_REQUEST, "Bad request.")
} else {
throw HttpException("Invalid chunked data.")
}
}
}
}
class HttpExpectBodyProvider <: InputStream {
let realBody: InputStream
var respondedContinue: Bool = false
var context: ?HttpContext = None
HttpExpectBodyProvider(let conn: HttpEngineConn1, let request: HttpRequest, let chunked: Bool,
let contentLength: ?Int64, let timer: HttpTimer) {
realBody = match {
case chunked =>
request._bodySize = None
HttpChunkedBodyProvider(conn, request, timer)
case _ => match (contentLength) {
case Some(length) =>
request._bodySize = length
HttpNormalBodyProvider(conn, length, timer)
case None =>
timer.cancel()
throw HttpException(" The content-length should not be none!")
}
}
}
func setContext(ctx: HttpContext): Unit {
this.context = ctx
}
public func read(buf: Array<Byte>): Int64 {
let ctx = context.getOrThrow()
synchronized(ctx.writerMtx) {
if (!respondedContinue && !ctx.responseFlushedByUser && !ctx.upgraded && !ctx.responded) {
// response 100-continue
let response = HttpResponseBuilder().version(request.version).status(100).build()
conn.writeWithoutBody(response)
respondedContinue = true
}
}
// read body
return realBody.read(buf)
}
}