/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
 */
package magic.utils.http

import magic.log.LogUtils
import magic.utils.StringExt

import std.collection.concurrent.BlockingQueue
import std.collection.{HashMap, ArrayList}
import encoding.json.JsonObject
import std.io.{InputStream, StringReader}
import magic.dsl.prompt

// One entry of the queue that carries data from the producer side of an
// HttpStream to its consumer side.
enum Item {
    // A chunk of payload bytes (enqueued by HttpStream.put).
    | Bytes(Array<Byte>)
    // End-of-stream marker (enqueued by HttpStream.markEOF).
    | EOF
    // Failure marker (enqueued by HttpStream.markError); the consumer
    // turns it into an HttpException.
    | Error
}

/**
 * A byte stream fed by an HTTP producer and drained by a consumer.
 *
 * The producer side hands over data with `put` and terminates the stream
 * with `markEOF`, `markError` or `close`; every item travels through a
 * blocking queue. The consumer side either pulls raw bytes with `read`
 * (InputStream) or text lines with `next` (Iterator<String>).
 */
protected class HttpStream <: Iterator<String> & InputStream {
    // Items handed over by the producer, in arrival order.
    let queue = BlockingQueue<Item>()
    // Bytes already dequeued but not yet returned by `read`.
    let byteBuffer = ArrayList<Byte>()

    // Set once the stream has terminated (EOF or Error was dequeued);
    // after that the queue is never consulted again.
    var finished = false

    // Optional callback invoked by `close`.
    private let closeFn: Option<() -> Unit>

    /**
     * @param closeFn optional callback run every time `close` is called.
     */
    init(closeFn!: Option<() -> Unit> = None) {
        this.closeFn = closeFn
    }

    /**
     * Dequeues one item (blocking until one is available) and, if it is a
     * chunk of bytes, appends it to `byteBuffer`.
     *
     * @return true when a `Bytes` item was appended (the chunk may be empty),
     *         false when the stream has terminated.
     * @throws HttpException when the producer reported an error.
     */
    private func readBytes(): Bool {
        if (finished) {
            return false
        }
        match (queue.dequeue()) {
            case Error =>
                // An error is terminal: mark the stream finished so that a
                // later read does not block on a queue that may never
                // receive another item.
                finished = true
                throw HttpException("Http byte stream error")
            case EOF =>
                finished = true
                LogUtils.debug("Http byte stream finished")
                return false
            case Bytes(bytes) =>
                this.byteBuffer.appendAll(bytes)
                return true
        }
    }

    /**
     * Copies up to `buffer.size` pending bytes into `buffer`, blocking until
     * at least one byte is available or the stream has terminated.
     *
     * @param buffer destination array, filled from index 0.
     * @return the number of bytes copied; 0 once the stream has ended
     *         (or when `buffer` itself is empty).
     * @throws HttpException when the producer reported an error.
     */
    override public func read(buffer: Array<Byte>): Int64 {
        // Readers treat a return value of 0 as the end of the stream, so an
        // empty chunk must not surface as 0: keep dequeuing until there is
        // at least one byte to hand out or the stream really ended.
        while (this.byteBuffer.isEmpty()) {
            if (!this.readBytes()) {
                return 0
            }
        }
        let len = if (this.byteBuffer.size <= buffer.size) {
            this.byteBuffer.size
        } else {
            buffer.size
        }

        unsafe {
            // Copy straight out of the list's backing array; only the first
            // `len` (<= byteBuffer.size) elements are touched.
            let bytes = this.byteBuffer.getRawArray()
            bytes.copyTo(buffer, 0, 0, len)
        }

        // Drop the bytes that were just handed out.
        if (this.byteBuffer.size == len) {
            this.byteBuffer.clear()
        } else {
            this.byteBuffer.remove(0..len)
        }
        return len
    }

    // Lazily created line reader over this stream; see `reader`.
    var _reader: Option<StringReader<HttpStream>> = None
    // Returns the cached StringReader, creating it on first use so that all
    // calls to `next` share the same reader instance.
    private prop reader: StringReader<HttpStream> {
        get() {
            if (let Some(r) <- this._reader) {
                return r
            } else {
                let r = StringReader(this)
                this._reader = r
                return r
            }
        }
    }

    /**
     * Reads the next line of text from the stream.
     *
     * @return the result of `StringReader.readln` on this stream.
     */
    override public func next(): Option<String> {
        let line = this.reader.readln()
        LogUtils.debug("Http byte stream read line: `${line}`")
        return line
    }

    /**
     * Producer side: enqueues a chunk of payload bytes.
     */
    public func put(data: Array<Byte>): Unit {
        // LogUtils.debug("put data: `${data}`")
        queue.enqueue(Item.Bytes(data))
    }

    /**
     * Producer side: enqueues the end-of-stream marker.
     */
    public func markEOF(): Unit {
        queue.enqueue(Item.EOF)
    }

    /**
     * Producer side: enqueues the failure marker; the consumer receives an
     * HttpException when it dequeues it.
     */
    public func markError(): Unit {
        queue.enqueue(Item.Error)
    }

    /**
     * Enqueues the end-of-stream marker and then runs the close callback,
     * if one was supplied at construction.
     */
    public func close(): Unit {
        this.markEOF()
        if (let Some(fn) <- this.closeFn) {
            fn()
        }
    }
}