// Ee2002v0.9.337b
// 4f1582b7 created on 2024-11-27 (commit history)
/*
  Asynchronous WebServer library for Espressif MCUs

  Copyright (c) 2016 Hristo Gochkov. All rights reserved.

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*/
#include "Arduino.h"
#include "AsyncEventSource.h"
#if ESP_ARDUINO_VERSION >= ESP_ARDUINO_VERSION_VAL(3, 0, 0)
#include "rom/ets_sys.h"
#endif

static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect){
  String ev = "";

  if(reconnect){
    ev += "retry: ";
    ev += String(reconnect);
    ev += "\r\n";
  }

  if(id){
    ev += "id: ";
    ev += String(id);
    ev += "\r\n";
  }

  if(event != NULL){
    ev += "event: ";
    ev += String(event);
    ev += "\r\n";
  }

  if(message != NULL){
    size_t messageLen = strlen(message);
    char * lineStart = (char *)message;
    char * lineEnd;
    do {
      char * nextN = strchr(lineStart, '\n');
      char * nextR = strchr(lineStart, '\r');
      if(nextN == NULL && nextR == NULL){
        size_t llen = ((char *)message + messageLen) - lineStart;
        char * ldata = (char *)malloc(llen+1);
        if(ldata != NULL){
          memcpy(ldata, lineStart, llen);
          ldata[llen] = 0;
          ev += "data: ";
          ev += ldata;
          ev += "\r\n\r\n";
          free(ldata);
        }
        lineStart = (char *)message + messageLen;
      } else {
        char * nextLine = NULL;
        if(nextN != NULL && nextR != NULL){
          if(nextR < nextN){
            lineEnd = nextR;
            if(nextN == (nextR + 1))
              nextLine = nextN + 1;
            else
              nextLine = nextR + 1;
          } else {
            lineEnd = nextN;
            if(nextR == (nextN + 1))
              nextLine = nextR + 1;
            else
              nextLine = nextN + 1;
          }
        } else if(nextN != NULL){
          lineEnd = nextN;
          nextLine = nextN + 1;
        } else {
          lineEnd = nextR;
          nextLine = nextR + 1;
        }

        size_t llen = lineEnd - lineStart;
        char * ldata = (char *)malloc(llen+1);
        if(ldata != NULL){
          memcpy(ldata, lineStart, llen);
          ldata[llen] = 0;
          ev += "data: ";
          ev += ldata;
          ev += "\r\n";
          free(ldata);
        }
        lineStart = nextLine;
        if(lineStart == ((char *)message + messageLen))
          ev += "\r\n";
      }
    } while(lineStart < ((char *)message + messageLen));
  }

  return ev;
}

// Message

AsyncEventSourceMessage::AsyncEventSourceMessage(const char * data, size_t len)
: _data(nullptr), _len(len), _sent(0), _acked(0)
{
  _data = (uint8_t*)malloc(_len+1);
  if(_data == nullptr){
    _len = 0;
  } else {
    memcpy(_data, data, len);
    _data[_len] = 0;
  }
}

AsyncEventSourceMessage::~AsyncEventSourceMessage() {
     if(_data != NULL)
        free(_data);
}

size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) {
  (void)time;
  // If the whole message is now acked...
  if(_acked + len > _len){
     // Return the number of extra bytes acked (they will be carried on to the next message)
     const size_t extra = _acked + len - _len;
     _acked = _len;
     return extra;
  }
  // Return that no extra bytes left.
  _acked += len;
  return 0;
}

size_t AsyncEventSourceMessage::send(AsyncClient *client) {
  const size_t len = _len - _sent;
  if(client->space() < len){
    return 0;
  }
  size_t sent = client->add((const char *)_data, len);
  if(client->canSend())
    client->send();
  _sent += sent;
  return sent; 
}

// Client

AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server)
: _messageQueue(LinkedList<AsyncEventSourceMessage *>([](AsyncEventSourceMessage *m){ delete  m; }))
{
  _client = request->client();
  _server = server;
  _lastId = 0;
  if(request->hasHeader("Last-Event-ID"))
    _lastId = atoi(request->getHeader("Last-Event-ID")->value().c_str());
    
  _client->setRxTimeout(0);
  _client->onError(NULL, NULL);
  _client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ (void)c; ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this);
  _client->onPoll([](void *r, AsyncClient* c){ (void)c; ((AsyncEventSourceClient*)(r))->_onPoll(); }, this);
  _client->onData(NULL, NULL);
  _client->onTimeout([this](void *r, AsyncClient* c __attribute__((unused)), uint32_t time){ ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this);
  _client->onDisconnect([this](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onDisconnect(); delete c; }, this);

  _server->_addClient(this);
  delete request;
}

AsyncEventSourceClient::~AsyncEventSourceClient(){
   _messageQueue.free();
  close();
}

void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage){
  if(dataMessage == NULL)
    return;
  if(!connected()){
    delete dataMessage;
    return;
  }
  if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
      ets_printf("ERROR: Too many messages queued\n");
      delete dataMessage;
  } else {
      _messageQueue.add(dataMessage);
  }
  if(_client->canSend())
    _runQueue();
}

void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
  while(len && !_messageQueue.isEmpty()){
    len = _messageQueue.front()->ack(len, time);
    if(_messageQueue.front()->finished())
      _messageQueue.remove(_messageQueue.front());
  }

  _runQueue();
}

void AsyncEventSourceClient::_onPoll(){
  if(!_messageQueue.isEmpty()){
    _runQueue();
  }
}


void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
  _client->close(true);
}

void AsyncEventSourceClient::_onDisconnect(){
  _client = NULL;
  _server->_handleDisconnect(this);
}

void AsyncEventSourceClient::close(){
  if(_client != NULL)
    _client->close();
}

void AsyncEventSourceClient::write(const char * message, size_t len){
  _queueMessage(new AsyncEventSourceMessage(message, len));
}

void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
  String ev = generateEventMessage(message, event, id, reconnect);
  _queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
}

void AsyncEventSourceClient::_runQueue(){
  while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){
    _messageQueue.remove(_messageQueue.front());
  }

  for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i)
  {
    if(!(*i)->sent())
      (*i)->send(_client);
  }
}


// Handler

AsyncEventSource::AsyncEventSource(const String& url)
  : _url(url)
  , _clients(LinkedList<AsyncEventSourceClient *>([](AsyncEventSourceClient *c){ delete c; }))
  , _connectcb(NULL)
{}

AsyncEventSource::~AsyncEventSource(){
  close();
}

void AsyncEventSource::onConnect(ArEventHandlerFunction cb){
  _connectcb = cb;
}

void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
  /*char * temp = (char *)malloc(2054);
  if(temp != NULL){
    memset(temp+1,' ',2048);
    temp[0] = ':';
    temp[2049] = '\r';
    temp[2050] = '\n';
    temp[2051] = '\r';
    temp[2052] = '\n';
    temp[2053] = 0;
    client->write((const char *)temp, 2053);
    free(temp);
  }*/
  
  _clients.add(client);
  if(_connectcb)
    _connectcb(client);
}

void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
  _clients.remove(client);
}

void AsyncEventSource::close(){
  for(const auto &c: _clients){
    if(c->connected())
      c->close();
  }
}

// pmb fix
size_t AsyncEventSource::avgPacketsWaiting() const {
  if(_clients.isEmpty())
    return 0;
  
  size_t    aql=0;
  uint32_t  nConnectedClients=0;
  
  for(const auto &c: _clients){
    if(c->connected()) {
      aql+=c->packetsWaiting();
      ++nConnectedClients;
    }
  }
//  return aql / nConnectedClients;
  return ((aql) + (nConnectedClients/2))/(nConnectedClients); // round up
}

void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){


  String ev = generateEventMessage(message, event, id, reconnect);
  for(const auto &c: _clients){
    if(c->connected()) {
      c->write(ev.c_str(), ev.length());
    }
  }
}

size_t AsyncEventSource::count() const {
  return _clients.count_if([](AsyncEventSourceClient *c){
    return c->connected();
  });
}

bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){
  if(request->method() != HTTP_GET || !request->url().equals(_url)) {
    return false;
  }
  request->addInterestingHeader("Last-Event-ID");
  return true;
}

void AsyncEventSource::handleRequest(AsyncWebServerRequest *request){
  if((_username != "" && _password != "") && !request->authenticate(_username.c_str(), _password.c_str()))
    return request->requestAuthentication();
  request->send(new AsyncEventSourceResponse(this));
}

// Response

AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server){
  _server = server;
  _code = 200;
  _contentType = "text/event-stream";
  _sendContentLength = false;
  addHeader("Cache-Control", "no-cache");
  addHeader("Connection","keep-alive");
}

void AsyncEventSourceResponse::_respond(AsyncWebServerRequest *request){
  String out = _assembleHead(request->version());
  request->client()->write(out.c_str(), _headLength);
  _state = RESPONSE_WAIT_ACK;
}

size_t AsyncEventSourceResponse::_ack(AsyncWebServerRequest *request, size_t len, uint32_t time __attribute__((unused))){
  if(len){
    new AsyncEventSourceClient(request, _server);
  }
  return 0;
}