完整示例代码
package eventsource4jexample
import eventsource4cj.client.*
import eventsource4cj.model.*
import eventsource4cj.server.*
import std.collection.*
import std.collection.ArrayList
import std.collection.concurrent.*
import std.io.*
import std.random.*
import std.sync.*
import std.time.*
import std.time.*
import stdx.log.*
import stdx.net.http.*
// Brokers registered by checkAndInit; the periodic task in doBusiness
// iterates this list and sends one message to each entry.
let clients: ArrayList<SseEmitterBroker> = ArrayList<SseEmitterBroker>()
// Counter whose previous value is embedded in the id of each test-event message.
let messageCount: AtomicInt64 = AtomicInt64(0)
// Path of the SSE endpoint that serve() publishes together with checkAndInit.
let endpoint: String = "/bus"
/**
 * Example entry point.
 *
 * Starts the SSE server in a background thread, creates two EventSource
 * clients against the `/bus` endpoint (the second one carries the request
 * parameter `sse-tag=GA`), registers printing handlers on both, subscribes
 * them, and waits for both subscriptions before returning 0.
 */
main(): Int64 {
    println("hello world")

    // The server runs concurrently so that this thread can act as the clients.
    // NOTE(review): nothing here waits for the server to be listening before
    // the clients subscribe; presumably EventSource copes with that - confirm.
    spawn {
        serve()
    }

    let eventsource1 = EventSource("http://127.0.0.1:8080/bus?token=c1-89uy93r")
    eventsource1.setExceptionHandler(
        {
            ex, buffer =>
            println("[客户端1]-接收消息错误:${ex.message}")
            println("[客户端1]-异常数据:${buffer}")
        }
    )

    let eventsource2 = EventSource("http://127.0.0.1:8080/bus?token=c2-394j590&sse-tag=GA")
    // Client 2 reports receive errors the same way client 1 does.
    eventsource2.setExceptionHandler(
        {
            ex, buffer =>
            println("[客户端2]-接收消息错误:${ex.message}")
            println("[客户端2]-异常数据:${buffer}")
        }
    )

    // Client 1: handler for messages delivered via onMessage, plus a
    // dedicated handler for the "test-event" event.
    eventsource1.onMessage(
        {
            msg =>
            println("[客户端1]-默认处理消息----------------->")
            println("[客户端1]-" + msg.toMessageString())
        }
    )
    eventsource1.onEvent(
        "test-event",
        {
            msg =>
            println("[客户端1]-处理消息----------------->")
            println("[客户端1]-" + msg.toMessageString())
        }
    )

    // Client 2: same pair of handlers with its own log prefix.
    eventsource2.onMessage(
        {
            msg =>
            println("[客户端2]-默认处理消息=========>")
            println("[客户端2]-" + msg.toMessageString())
        }
    )
    eventsource2.onEvent(
        "test-event",
        {
            msg =>
            println("[客户端2]-处理消息=========>")
            println("[客户端2]-" + msg.toMessageString())
        }
    )

    // Keep both subscription results and wait on each, so that main does not
    // return while the second client is still subscribed.
    let future1 = eventsource1.subscribe()
    let future2 = eventsource2.subscribe()
    future1.get()
    future2.get()
    return 0
}
/**
 * Builds and runs the example SSE server on 127.0.0.1:8080.
 *
 * Sets the underlying HTTP server's logger to DEBUG, registers the demo
 * page, publishes the `endpoint` route (with checkAndInit) and `/bus2`,
 * starts the periodic sender, and finally calls serveForever().
 *
 * Returns 0 after serveForever() returns.
 */
func serve(): Int64 {
    println("hello eventsource4cj!!!")
    let server = SseServer("127.0.0.1", 8080)

    // Configure the wrapped HTTP server and add the demo page to it.
    server.getHttpServer().logger.level = LogLevel.DEBUG
    addExamplePage(server.getHttpServer())

    // `endpoint` is published with checkAndInit; "/bus2" is published without one.
    server.publish(endpoint, checkAndInit)
    server.publish("/bus2")

    // Schedule the periodic messages, then hand control to the server.
    doBusiness()
    server.serveForever()
    return 0
}
/**
 * Callback passed to `SseServer.publish` for the `endpoint` route.
 *
 * Adds the broker to the global `clients` list, which doBusiness iterates
 * when sending, and always returns true.
 *
 * No credential check is performed here.
 * NOTE(review): the example clients send `token` as a URL query parameter,
 * not as a request header; if validation is added, read it from where the
 * clients actually put it (the request is reachable via broker.httpContext).
 * NOTE(review): `clients` is a plain ArrayList that is also iterated by the
 * timer task in doBusiness, presumably on another thread - confirm whether
 * synchronization is required.
 *
 * @param broker the broker handed to this callback by the SSE server.
 * @return always true.
 */
func checkAndInit(broker: SseEmitterBroker): Bool {
    clients.add(broker)
    return true
}
/**
 * Schedules the example's periodic traffic with a repeating Timer
 * (zero initial delay, 5-second interval).
 *
 * Each run of the task:
 *  1. builds and sends one "test-event" message per broker in `clients`,
 *     taking a fresh value from `messageCount` for every broker's message id;
 *  2. looks up the emitter registered for `endpoint` and sends one further
 *     message through it, filtered on the request parameter `sse-tag` = `GA`.
 *
 * The Timer returned by Timer.repeat is not kept.
 */
func doBusiness(): Unit {
    Timer.repeat(
        0 * Duration.second,
        5 * Duration.second,
        {
            =>
            // Per-broker messages: a new id and new timestamps for each broker.
            for (broker in clients) {
                let index = messageCount.fetchAdd(1)
                let eventMessage = MessageBuilder()
                    .event("test-event")
                    .id("id-${index}-1")
                    .data("数据行1-${DateTime.nowUTC().toString()}")
                    .data("数据行2-${DateTime.nowUTC().toString()}")
                    .data("数据行3-${DateTime.nowUTC().toString()}")
                    .build()
                println("向端点上所有连接发送消息->${eventMessage.toMessageString()}\n")
                broker.send(eventMessage)
            }

            // Use the shared `endpoint` constant so this lookup always matches
            // the route published in serve(). getOrThrow() raises if no
            // emitter is found for that path.
            let emitter = SseEmitter.getEmitter(endpoint).getOrThrow()
            let taggedMessage = MessageBuilder()
                .data("给所有请求参数sse-tag=GA客户端的消息-${DateTime.nowUTC().toString()}")
                .build()
            emitter.send(taggedMessage, SseEmitter.defaultBrokerFilterByRequestParameterValue("sse-tag", "GA"))
        }
    )
}
/**
 * Registers a handler for `/hello` on the given HTTP server.
 *
 * The handler responds with a small HTML page (Content-Type text/html)
 * whose inline script opens a browser EventSource on
 * http://127.0.0.1:8080/bus, logs incoming messages to the console, and
 * appends a paragraph to the `msgbox` div for every "test-event".
 *
 * @param httpServer the server whose distributor receives the route.
 */
func addExamplePage(httpServer: Server) {
    httpServer.distributor.register(
        "/hello",
        {
            context =>
            // Page source as ordered fragments; they are concatenated below
            // without any separator.
            let fragments = [
                "<html><head></head><body>test server send event!!!!<div id='msgbox'></div></body><script>",
                "console.log('load sse ...');",
                "const evtsource = new EventSource('http://127.0.0.1:8080/bus?clientId=clientId-1&sse-tag=user8080');",
                "evtsource.onmessage = function(ctx) {console.log(ctx)};",
                "evtsource.addEventListener('test-event', function(ctx) {",
                " console.log(ctx);",
                " const boxer = document.getElementById('msgbox');",
                " const p = document.createElement('p');",
                " p.innerText = ctx.lastEventId + ':' + ctx.data;",
                " boxer.appendChild(p);",
                "});",
                "</script></html>"
            ]
            let page = StringBuilder()
            for (fragment in fragments) {
                page.append(fragment)
            }
            context.responseBuilder.header("Content-Type", "text/html").body(page.toString().toArray())
        }
    )
}