import pytest
import sys
import lz4.stream
import psutil
import os
_4GB = 0xffffffff
if os.environ.get('TRAVIS') is not None or sys.maxsize < _4GB or \
psutil.virtual_memory().available < _4GB:
huge = None
else:
try:
huge = b'\0' * _4GB
except (MemoryError, OverflowError):
huge = None
@pytest.mark.skipif(
os.environ.get('TRAVIS') is not None,
reason='Skipping test on Travis due to insufficient memory'
)
@pytest.mark.skipif(
sys.maxsize < _4GB,
reason='Py_ssize_t too small for this test'
)
@pytest.mark.skipif(
psutil.virtual_memory().available < _4GB or huge is None,
reason='Insufficient system memory for this test'
)
def test_huge_1():
data = b''
kwargs = {
'strategy': "double_buffer",
'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
'store_comp_size': 4,
'dictionary': huge,
}
if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
pytest.skip('Insufficient system memory for this test')
message = r'^Dictionary too large for LZ4 API$'
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
proc.compress(data)
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
proc.decompress(data)
@pytest.mark.skipif(
os.environ.get('TRAVIS') is not None,
reason='Skipping test on Travis due to insufficient memory'
)
@pytest.mark.skipif(
sys.maxsize < 0xffffffff,
reason='Py_ssize_t too small for this test'
)
@pytest.mark.skipif(
psutil.virtual_memory().available < _4GB or huge is None,
reason='Insufficient system memory for this test'
)
def test_huge_2():
data = huge
kwargs = {
'strategy': "double_buffer",
'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
'store_comp_size': 4,
'dictionary': b'',
}
if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
pytest.skip('Insufficient system memory for this test')
message = r'^Input too large for LZ4 API$'
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
proc.compress(data)
with pytest.raises(lz4.stream.LZ4StreamError):
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
proc.decompress(data)
@pytest.mark.skipif(
os.environ.get('TRAVIS') is not None,
reason='Skipping test on Travis due to insufficient memory'
)
@pytest.mark.skipif(
sys.maxsize < 0xffffffff,
reason='Py_ssize_t too small for this test'
)
@pytest.mark.skipif(
psutil.virtual_memory().available < _4GB or huge is None,
reason='Insufficient system memory for this test'
)
def test_huge_3():
data = huge
kwargs = {
'strategy': "double_buffer",
'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
'store_comp_size': 4,
'dictionary': huge,
}
if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
pytest.skip('Insufficient system memory for this test')
message = r'^Dictionary too large for LZ4 API$'
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
proc.compress(data)
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
proc.decompress(data)
def test_dummy():
pass