import sys
import base64
from mindie_llm.connector.request_listener.shared_mem_communication import SharedMemory, SharedMemCommunication
from mindie_llm.connector.main import main
record_data = True
mock_request = False
record_data_file = "/home/connector_mock_data.txt"
record_index = 0
def get_obj_with_record(self, class_name):
global record_index
self.sem_consumer.acquire()
length = int.from_bytes(self.shm.buf[:4], 'little')
if length == 0:
raise ValueError("proto_data length is 0")
proto_data = bytes(self.shm.buf[4:length + 4])
with open(record_data_file, 'a', encoding='utf-8') as fi:
fi.write(str(record_index) + "request:\n")
fi.write(base64.b64encode(proto_data).decode('utf-8'))
fi.write("\n")
self.sem_producer.release()
obj = class_name()
obj.ParseFromString(proto_data)
return obj
def send_obj_with_record(self, obj, offset):
global record_index
proto_data = obj.SerializeToString()
length = len(proto_data)
self.sem_producer.acquire()
self.shm.buf[offset:4 + offset] = length.to_bytes(4, 'little')
self.shm.buf[4 + offset:length + 4 + offset] = proto_data
with open(record_data_file, 'a', encoding='utf-8') as fi:
fi.write(str(record_index) + "response:\n")
fi.write(base64.b64encode(proto_data).decode('utf-8'))
fi.write("\n")
self.sem_consumer.release()
record_index += 1
def get_obj_mock(self, class_name):
global record_index
self.sem_consumer.acquire()
length = int.from_bytes(self.shm.buf[:4], 'little')
if length == 0:
raise ValueError("proto_data length is 0")
self.sem_producer.release()
target_line = None
with open(record_data_file, 'r', encoding='utf-8') as fi:
for line in fi:
if line.strip().startswith(str(record_index) + 'response:'):
target_line = next(fi).strip()
record_index += 1
proto_data_response = target_line.encode('utf-8') if target_line else None
if proto_data_response is not None:
proto_data_response = base64.b64decode(proto_data_response)
from mindie_llm.connector.common.model_execute_data_pb2 import ExecuteResponse
obj = ExecuteResponse()
obj.ParseFromString(proto_data_response)
SharedMemCommunication.send_model_execute_response_cls(obj)
return None
if mock_request:
SharedMemory.get_obj = get_obj_mock
if record_data:
with open(record_data_file, 'w') as f:
f.write("")
SharedMemory.get_obj = get_obj_with_record
SharedMemory.send_obj = send_obj_with_record
if __name__ == '__main__':
sys.exit(main())