import lz4.block
import pytest
import sys
import os
def test_decompress_ui32_overflow():
    """Expect OverflowError for an uncompressed_size of (1 << 32) + 64."""
    payload = lz4.block.compress(b'A' * 64)
    oversized = (1 << 32) + 64
    with pytest.raises(OverflowError):
        # The first four bytes of the compressed block are sliced off and
        # the size is passed explicitly instead.
        lz4.block.decompress(payload[4:], uncompressed_size=oversized)
def test_decompress_without_leak():
    """Expect LZ4BlockError when the first byte of the block is replaced by 0x4f.

    The expected message reports 64 bytes written against 79 (0x4f) bytes
    expected from the header.
    """
    expected = r'^Decompressor wrote 64 bytes, but 79 bytes expected from header$'
    block = lz4.block.compress(b'A' * 64)
    tampered = b'\x4f' + block[1:]
    with pytest.raises(lz4.block.LZ4BlockError, match=expected):
        lz4.block.decompress(tampered)
def test_decompress_with_small_buffer():
    """Expect LZ4BlockError for the two (source, uncompressed_size) pairs below."""
    expected = r'^Decompression failed: corrupt input or insufficient space in destination buffer. Error code: \d+$'
    block = lz4.block.compress(b'A' * 64, store_size=False)
    # First case drops the leading four bytes of the block; second case
    # passes the whole block with a size smaller than the 64-byte input.
    cases = (
        (block[4:], 64),
        (block, 60),
    )
    for source, size in cases:
        with pytest.raises(lz4.block.LZ4BlockError, match=expected):
            lz4.block.decompress(source, uncompressed_size=size)
def test_decompress_truncated():
    """Expect decompression of truncated blocks to raise.

    Prefixes of length 0 and 1 are expected to raise ValueError with the
    message 'Input source data size too small'; the longer truncations
    listed below are expected to raise LZ4BlockError.
    """
    input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
    compressed = lz4.block.compress(input_data)

    # The n == 0 case is covered by this loop, so no separate check of
    # compressed[:0] is needed.
    for n in [0, 1]:
        with pytest.raises(ValueError, match='Input source data size too small'):
            lz4.block.decompress(compressed[:n])

    # A negative n slices relative to the end: compressed[:-2] drops the
    # final two bytes.
    for n in [24, 25, -2, 27, 67, 85]:
        with pytest.raises(lz4.block.LZ4BlockError):
            lz4.block.decompress(compressed[:n])
def test_decompress_with_trailer():
    """Expect LZ4BlockError when extra bytes follow a valid compressed block."""
    expected = r'^Decompression failed: corrupt input or insufficient space in destination buffer. Error code: \d+$'
    block = lz4.block.compress(b'A' * 64)
    # Trailers tried in turn: a single byte, a second full block, and a
    # second block with its first four bytes removed.
    trailers = (b'A', block, block[4:])
    for trailer in trailers:
        with pytest.raises(lz4.block.LZ4BlockError, match=expected):
            lz4.block.decompress(block + trailer)
def test_unicode():
    """Expect TypeError when str (not bytes) is passed to compress/decompress.

    Each call is wrapped in its own ``pytest.raises`` context. A
    ``pytest.raises`` block ends at the first exception raised inside it, so
    two raising calls in one block would leave the second one unexecuted.
    """
    if sys.version_info < (3,):
        return  # nothing is checked on Python 2
    DATA = b'x'

    with pytest.raises(TypeError):
        lz4.block.compress(DATA.decode('latin1'))

    # Build the str argument outside the context so that only the
    # decompress call itself is expected to raise.
    compressed_text = lz4.block.compress(DATA).decode('latin1')
    with pytest.raises(TypeError):
        lz4.block.decompress(compressed_text)
def test_return_bytearray():
    """Check return_bytearray=True yields a bytearray equal to the bytes result."""
    if sys.version_info < (3,):
        return
    raw = os.urandom(128 * 1024)
    packed = lz4.block.compress(raw)

    packed_array = lz4.block.compress(raw, return_bytearray=True)
    assert isinstance(packed_array, bytearray)
    assert bytes(packed_array) == packed

    unpacked_array = lz4.block.decompress(packed, return_bytearray=True)
    assert isinstance(unpacked_array, bytearray)
    assert bytes(unpacked_array) == raw
def test_memoryview():
    """Check that memoryview inputs give the same results as bytes inputs."""
    if sys.version_info < (2, 7):
        return
    raw = os.urandom(128 * 1024)
    packed = lz4.block.compress(raw)

    packed_from_view = lz4.block.compress(memoryview(raw))
    assert packed_from_view == packed

    unpacked_from_view = lz4.block.decompress(memoryview(packed))
    assert unpacked_from_view == raw
def test_with_dict_none():
    """Check round-trips when dict is None, b'' or '' on either side."""
    input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
    empty_dicts = (None, b'', '')
    for mode in ['default', 'high_compression']:
        for empty in empty_dicts:
            # Empty dictionary supplied at compression time only.
            packed = lz4.block.compress(input_data, mode=mode, dict=empty)
            assert lz4.block.decompress(packed) == input_data

            # Empty dictionary supplied at decompression time only.
            packed = lz4.block.compress(input_data, mode=mode)
            assert lz4.block.decompress(packed, dict=empty) == input_data
def test_with_dict():
    """Check compression with a dictionary against matching and mismatched dicts."""
    input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
    good_dict = input_data[10:30]
    other_dict = input_data[20:40]
    expected = r'^Decompression failed: corrupt input or insufficient space in destination buffer. Error code: \d+$'

    for mode in ['default', 'high_compression']:
        packed = lz4.block.compress(input_data, mode=mode, dict=good_dict)

        # No dictionary, then a two-byte prefix of the dictionary: both are
        # expected to fail.
        with pytest.raises(lz4.block.LZ4BlockError, match=expected):
            lz4.block.decompress(packed)
        with pytest.raises(lz4.block.LZ4BlockError, match=expected):
            lz4.block.decompress(packed, dict=good_dict[:2])

        # A different dictionary is expected to decode to different data;
        # the original dictionary restores the input.
        assert lz4.block.decompress(packed, dict=other_dict) != input_data
        assert lz4.block.decompress(packed, dict=good_dict) == input_data

    # Data compressed without a dictionary, decompressed with one supplied.
    plain_packed = lz4.block.compress(input_data)
    assert lz4.block.decompress(plain_packed, dict=good_dict) == input_data
def test_known_decompress_1():
    """Check a fixed block of five zero bytes decompresses to empty bytes."""
    compressed = b'\x00\x00\x00\x00\x00'
    expected = b''
    result = lz4.block.decompress(compressed)
    assert result == expected
def test_known_decompress_2():
    """Check a fixed block decompresses to a single space byte."""
    compressed = b'\x01\x00\x00\x00\x10 '
    expected = b' '
    result = lz4.block.decompress(compressed)
    assert result == expected
def test_known_decompress_3():
    """Check a fixed block decompresses to four repeats of a known phrase."""
    compressed = b'h\x00\x00\x00\xff\x0bLorem ipsum dolor sit amet\x1a\x006P amet'
    expected = b'Lorem ipsum dolor sit amet' * 4
    result = lz4.block.decompress(compressed)
    assert result == expected
def test_known_decompress_4():
    """Check a fixed block decompresses to 1000 repeats of a known phrase."""
    header_and_literals = b'\xb0\xb3\x00\x00\xff\x1fExcepteur sint occaecat cupidatat non proident.\x00'
    length_bytes = b'\xff' * 180
    tail = b'\x1ePident'
    compressed = header_and_literals + length_bytes + tail
    expected = b'Excepteur sint occaecat cupidatat non proident' * 1000
    result = lz4.block.decompress(compressed)
    assert result == expected