"""
Pipe used to send status to golang process.
"""
import os
class NPipe:
"""pipe used to send status to golang process"""
def __init__(self, pipe_name, pipe_mode=os.O_SYNC | os.O_CREAT | os.O_WRONLY):
self.pipe = pipe_name
self.pipe_mode = pipe_mode
self.file = None
def write(self, message):
"""write message to file"""
if not isinstance(message, str):
return None
message = message.encode()
length = os.write(self.file, message)
return length
def open(self):
"""open pipe"""
if not os.path.exists(self.pipe):
return None
self.file = os.open(self.pipe, self.pipe_mode)
return self.file
def __del__(self):
self.close()
def close(self):
"""close file"""
if not self.file:
return
os.close(self.file)
def get_npipe(pipe):
"""get npipe"""
npipe = NPipe(pipe)
file = npipe.open()
if not file:
return None
return npipe