/**
Copyright (c) 2025 hashli
eventsource4cj is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package eventsource4cj.client
import eventsource4cj.model.*
import std.collection.*
import stdx.net.http.*
public class EventSource {
let client: Client
let url: String
let bufferSize: Int64
let handlers = HashMap<String, (Message) -> Unit>()
var retryInMills: UInt16 = 30_000
var lastMessageId: Option<String> = Option.None
var defaultMessageHandler = {_: Message =>}
var exceptionHandler = {e: Exception, _: Array<UInt8> => e.printStackTrace()}
public init(url: String) {
let client = ClientBuilder().readTimeout(365 * 100 * Duration.day).build()
this.client = client
this.url = url
this.bufferSize = 1024 * 8
}
public init(url: String, bufferSize: Int64) {
let client = ClientBuilder().readTimeout(365 * 100 * Duration.day).build()
this.client = client
this.url = url
this.bufferSize = bufferSize
}
public func setExceptionHandler(handler: (Exception, Array<UInt8>) -> Unit): Unit {
exceptionHandler = handler
}
public func onMessage(handler: (Message) -> Unit): Unit {
this.defaultMessageHandler = handler
}
public func onEvent(event: String, handler: (Message) -> Unit): Unit {
let listener = {
message: Message => if (message.eventType.isSome() && event == message.eventType.getOrThrow()) {
handler(message)
}
}
handlers.add(event, listener)
}
public func subscribe(): Future<Unit> {
return spawn {
while (true) {
try {
let stat = connect(this.lastMessageId)
if (stat == 0) {
return
}
} catch (ex: Exception) {
ex.printStackTrace()
sleep(Int64(retryInMills) * Duration.millisecond)
}
}
}
}
private func connect(lastMessageId: Option<String>): Int {
let headers = HttpHeaders()
headers.add("Accept", "text/event-stream")
if (lastMessageId.isSome()) {
headers.add("Last-Event-ID", lastMessageId.getOrThrow())
}
let endpoint = url
let req = HttpRequestBuilder().url(endpoint).addHeaders(headers).get().build()
let rsp = client.send(req)
let isEventStream = rsp.headers.iterator().any({
head => head[0].toAsciiLower() == "content-type" && head[1].iterator().at(0).getOrThrow().toAsciiLower() ==
"text/event-stream"
})
if (!isEventStream) {
throw Exception("服务端响应类型错误:Content-Type不正确!")
}
// 处理响应流
let nextLine = UInt8(b'\n')
let stream = rsp.body
let messageaHolder = ArrayList<UInt8>()
let buffer = Array<UInt8>(bufferSize, {_ => 0})
while (true) {
var len = stream.read(buffer)
if (len == 0) {
return 0 // chuned流结束 连接关闭
}
for (byte in buffer[0..len]) {
if (byte != nextLine) {
messageaHolder.add(byte)
continue
}
let lastByte = messageaHolder.get(messageaHolder.size - 1)
if (lastByte.isNone() || lastByte != nextLine) {
messageaHolder.add(byte)
continue
}
try {
// 已缓存了完整消息
messageaHolder.add(byte)
let msg = messageaHolder.toArray()
let message = MessageBuilder.resolve(msg)
// 消息指定了重试时间则按指定值设置
if (message.retryInMills.isSome()) {
this.retryInMills = message.retryInMills.getOrThrow()
}
// 调用消息处理函数
if (message.eventType.isNone()) {
this.defaultMessageHandler(message)
} else {
let event = message.eventType.getOrThrow()
let handler = this.handlers.get(event)
if (handler.isNone()) {
this.defaultMessageHandler(message)
} else {
handler.getOrThrow()(message)
}
}
if (message.msgId.isSome()) {
this.lastMessageId = message.msgId
}
} catch (e: Exception) {
exceptionHandler(e, buffer)
} finally {
messageaHolder.clear()
}
}
}
return -1
}
}