from abc import ABC, abstractmethod
import logging
import subprocess
import threading
import time
import uuid
from devil.utils import reraiser_thread
class ExpensiveLineTransformer(ABC):
    """Pipes batches of lines through a long-lived helper subprocess.

    Subclasses provide ``name`` (used as a prefix in log messages) and
    ``command`` (the argv handed to ``subprocess.Popen``). For every batch the
    lines are written to the child's stdin followed by a unique marker line;
    everything the child prints on stdout before printing that marker back is
    returned as the transformed output.
    """

    def __init__(self, process_start_timeout, minimum_timeout, per_line_timeout):
        """Stores the timeout configuration; does not launch the process.

        Args:
          process_start_timeout: Seconds after start() during which the
            not-yet-elapsed remainder is added to a batch's timeout.
          minimum_timeout: Lower bound, in seconds, of the per-batch timeout.
          per_line_timeout: Seconds allowed per input line of a batch.
        """
        self._process_start_timeout = process_start_timeout
        self._minimum_timeout = minimum_timeout
        self._per_line_timeout = per_line_timeout
        self._started = False
        # Held for the whole duration of a TransformLines() call.
        self._lock = threading.Lock()
        # Guards the "has Close() been called" decision only.
        self._close_lock = threading.Lock()
        self._closed_called = False
        self._proc = None
        self._proc_start_time = None

    def start(self):
        """Launches the subprocess.

        Logs an error and returns without starting anything when the
        transformer was already started or when ``command`` is empty.
        """
        if self._started:
            logging.error('%s: Trying to start an already started command',
                          self.name)
            return

        self._proc_start_time = time.time()

        command = self.command
        if not command:
            logging.error('%s: No command available', self.name)
            return

        self._proc = subprocess.Popen(command,
                                      bufsize=1,
                                      stdin=subprocess.PIPE,
                                      stdout=subprocess.PIPE,
                                      universal_newlines=True,
                                      close_fds=True)
        self._started = True

    def IsClosed(self):
        """Returns True if never started, already closed, or the child exited."""
        if not self._started or self._closed_called:
            return True
        # Popen.returncode is only refreshed by poll()/wait(); reading the
        # attribute alone would never notice a child that died on its own.
        return self._proc.poll() is not None

    def IsBusy(self):
        """Returns True while some thread is inside TransformLines()."""
        return self._lock.locked()

    def IsReady(self):
        """Returns True if the process is running and no batch is in flight."""
        return self._started and not self.IsClosed() and not self.IsBusy()

    def TransformLines(self, lines):
        """Sends the given lines through the subprocess.

        If anything goes wrong (process crashes, timeout, etc), returns |lines|.

        Args:
          lines: A list of strings without trailing newlines.

        Returns:
          A list of strings without trailing newlines.
        """
        if not lines:
            return []

        eof_line = self.getEofLine()
        out_lines = []
        # Set only when the marker line is read back, so that a pipe closed by
        # a dying child is not mistaken for a complete answer.
        saw_eof_line = threading.Event()

        def _reader():
            while True:
                line = self._proc.stdout.readline()
                if not line:
                    # stdout reached end-of-file before the marker appeared.
                    break
                if line.endswith('\n'):
                    line = line[:-1]
                if line == eof_line:
                    saw_eof_line.set()
                    break
                out_lines.append(line)

        if self.IsBusy():
            logging.warning('%s: Having to wait for transformation.', self.name)

        with self._lock:
            if self.IsClosed():
                if self._started and not self._closed_called:
                    logging.warning('%s: Process exited with code=%d.',
                                    self.name, self._proc.returncode)
                self.Close()
                return lines

            reader_thread = reraiser_thread.ReraiserThread(_reader)
            reader_thread.start()

            try:
                self._proc.stdin.write('\n'.join(lines))
                self._proc.stdin.write('\n{}\n'.format(eof_line))
                self._proc.stdin.flush()

                # Whatever is left of the start-up allowance is granted on top
                # of the size-dependent budget for this batch.
                time_since_proc_start = time.time() - self._proc_start_time
                timeout = (
                    max(0, self._process_start_timeout - time_since_proc_start)
                    + max(self._minimum_timeout,
                          len(lines) * self._per_line_timeout))
                reader_thread.join(timeout)

                if self._closed_called:
                    logging.warning(
                        '%s: Close() called by another thread during join().',
                        self.name)
                    return lines

                if reader_thread.is_alive():
                    logging.error('%s: Timed out after %f seconds with input:',
                                  self.name, timeout)
                    for in_line in lines:
                        logging.error(in_line)
                    logging.error(eof_line)
                    logging.error('%s: End of timed out input.', self.name)
                    logging.error('%s: Timed out output was:', self.name)
                    for out_line in out_lines:
                        logging.error(out_line)
                    logging.error('%s: End of timed out output.', self.name)
                    self.Close()
                    return lines

                if not saw_eof_line.is_set():
                    # The reader stopped because stdout was closed, so the
                    # collected output is incomplete.
                    logging.warning(
                        '%s: Output ended before the end-of-output marker.',
                        self.name)
                    self.Close()
                    return lines

                return out_lines
            except IOError:
                logging.exception('%s: Exception during transformation',
                                  self.name)
                self.Close()
                return lines

    def Close(self):
        """Kills and reaps the subprocess. Safe to call more than once."""
        with self._close_lock:
            needs_closing = self._started and not self._closed_called
            self._closed_called = True

        if not needs_closing:
            return

        try:
            self._proc.stdin.close()
        except OSError:
            # close() flushes buffered input, which fails if the child is
            # already gone; the child must still be killed and reaped below.
            pass
        self._proc.kill()
        self._proc.wait()

    def __del__(self):
        # getattr() keeps this safe on an instance whose __init__ never ran to
        # completion.
        if getattr(self, '_closed_called', True):
            return
        if not getattr(self, '_proc', None):
            return
        logging.error('%s: Forgot to Close()', self.name)
        self.Close()

    @property
    @abstractmethod
    def name(self):
        """Label used as the prefix of this transformer's log messages."""
        ...

    @property
    @abstractmethod
    def command(self):
        """Command passed to subprocess.Popen; a falsy value prevents start()."""
        ...

    @staticmethod
    def getEofLine():
        """Returns a fresh, random marker line that delimits one batch."""
        return "Generic useful log header: '{}'".format(uuid.uuid4().hex)
class ExpensiveLineTransformerPool(ABC):
    """A fixed-size pool of transformers with a bounded restart budget.

    Subclasses provide ``CreateTransformer()`` and ``name``. Each call to
    TransformLines() replaces transformers that report themselves closed,
    picks one that is ready (or the first one if none is) and delegates the
    batch to it. Once ``max_restarts`` replacements have happened the pool is
    considered broken.
    """

    def __init__(self, max_restarts, pool_size, passthrough_on_failure):
        """Creates ``pool_size`` transformers via CreateTransformer().

        Args:
          max_restarts: Number of replacements after which the pool is broken.
          pool_size: Number of transformers to create.
          passthrough_on_failure: When the pool is broken, return the input
            lines unchanged if true, raise an Exception otherwise.
        """
        self._max_restarts = max_restarts
        self._passthrough_on_failure = passthrough_on_failure
        self._lock = threading.Lock()
        self._num_restarts = 0
        self._pool = [self.CreateTransformer() for _ in range(pool_size)]

    def __enter__(self):
        # Return the pool so that `with Pool(...) as pool:` binds the pool
        # rather than None.
        return self

    def __exit__(self, *args):
        self.Close()

    def _PassThroughOrRaise(self, lines):
        """Returns |lines| untouched if passthrough is enabled, else raises."""
        if self._passthrough_on_failure:
            return lines
        raise Exception('%s is broken.' % self.name)

    def TransformLines(self, lines):
        """Transforms |lines| using one of the pooled transformers.

        Args:
          lines: A list of strings to hand to a transformer.

        Returns:
          Whatever the selected transformer returns for |lines|, or |lines|
          itself when the restart budget is exhausted and passthrough is
          enabled.

        Raises:
          AssertionError: The pool is empty or has been closed.
          Exception: The restart budget is exhausted and passthrough is
            disabled.
        """
        with self._lock:
            # Raised explicitly (instead of `assert`) so the check survives
            # `python -O`.
            if not self._pool:
                raise AssertionError(
                    'TransformLines() called on a closed Pool.')

            if self._num_restarts == self._max_restarts:
                return self._PassThroughOrRaise(lines)

            # Replace every transformer that is no longer usable.
            for index, transformer in enumerate(self._pool):
                if not transformer.IsClosed():
                    continue
                logging.warning('%s: Restarting closed instance.', self.name)
                self._pool[index] = self.CreateTransformer()
                self._num_restarts += 1
                if self._num_restarts == self._max_restarts:
                    logging.warning('%s: MAX_RESTARTS reached.', self.name)
                    return self._PassThroughOrRaise(lines)

            selected = next((t for t in self._pool if t.IsReady()),
                            self._pool[0])
            # Move the chosen transformer to the back so the next caller
            # prefers a different one.
            self._pool.remove(selected)
            self._pool.append(selected)

        # Run the batch without holding the pool lock so other threads can
        # pick other transformers meanwhile.
        return selected.TransformLines(lines)

    def Close(self):
        """Closes every pooled transformer. Safe to call more than once."""
        with self._lock:
            if self._pool is None:
                return
            for transformer in self._pool:
                transformer.Close()
            self._pool = None

    @abstractmethod
    def CreateTransformer(self):
        """Returns a new transformer instance for the pool."""
        ...

    @property
    @abstractmethod
    def name(self):
        """Label used as the prefix of this pool's log messages."""
        ...