c34f7862创建于 1月15日历史提交
import queue
from typing import Any, Dict, List, Optional, Sequence
from uuid import UUID

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.documents import Document


class EventDispatchCallbackHandler(BaseCallbackHandler):
    '''Puts dispatched events onto an event queue to be processed as a streaming chunk'''
    def __init__(self, queue: queue.Queue):
        self.queue = queue

    def on_custom_event(
        self,
        name: str,
        data: Any,
        *,
        run_id: UUID,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs
    ):
        self.queue.put({
            'type': 'event',
            'name': name,
            'data': data
        })

    def on_retriever_end(
        self,
        documents: Sequence[Document],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        document_objects = []
        for d in documents:
            document_objects.append({
                'content': d.page_content,
                'metadata': d.metadata
            })
        self.queue.put({
            'type': 'event',
            'name': 'retriever_end',
            'data': {
                'documents': document_objects
            }
        })