Skip to content

Commit 7466396

Browse files
MegaRedHand and h3lio5 authored
feat: snappy framing format (decompression) (#710)
Co-authored-by: Akash SM <akashsm@ce.iitr.ac.in>
1 parent af5d0d9 commit 7466396

File tree

6 files changed

+252
-3
lines changed

6 files changed

+252
-3
lines changed

lib/snappy_ex.ex

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
defmodule SnappyEx do
  @moduledoc """
  Encoder/decoder implementation for the [Snappy framing format](https://github.com/google/snappy/blob/main/framing_format.txt)

  Only decompression is implemented so far; `compress/1` is a placeholder that
  emits nothing but the stream identifier chunk.
  """
  import Bitwise

  @bit_mask_32 2 ** 32 - 1
  # Upper bound on the body of a payload chunk (checksum included).
  # NOTE(review): presumably 65_536 data bytes plus the 4-byte checksum — confirm against the spec.
  @chunk_size_limit 65_540

  @stream_identifier "sNaPpY"

  @id_compressed_data 0x00
  @id_uncompressed_data 0x01
  @id_padding 0xFE
  @id_stream_identifier 0xFF

  @ids_payload_chunks [@id_compressed_data, @id_uncompressed_data]
  @ids_reserved_unskippable_chunks 0x02..0x7F
  @ids_reserved_skippable_chunks 0x80..0xFD

  ##########################
  ### Public API
  ##########################

  @doc """
  Compresses the given data.
  Returns the compressed data.

  Not implemented yet: the input is ignored and only the stream identifier
  chunk is returned.

  ## Examples

      iex> SnappyEx.compress("")
      <<0xFF, 6::little-size(24)>> <> "sNaPpY"
  """
  @spec compress(binary()) :: binary()
  def compress(data) when is_binary(data) do
    # TODO: implement
    <<@id_stream_identifier, 6::little-size(24)>> <> @stream_identifier
  end

  @doc """
  Uncompresses a given stream.
  Returns a result tuple with the uncompressed data or an error message.

  The stream must start with a stream identifier chunk (first byte `0xFF`);
  any other binary, including the empty one, yields an error tuple.

  ## Examples

      iex> SnappyEx.decompress(<<0xFF, 6::little-size(24)>> <> "sNaPpY")
      {:ok, ""}

      iex> SnappyEx.decompress("")
      {:error, "no stream identifier at beginning"}
  """
  @spec decompress(binary()) :: {:ok, binary()} | {:error, String.t()}
  def decompress(<<@id_stream_identifier, _::binary>> = chunks),
    do: decompress_frames(chunks, <<>>)

  def decompress(chunks) when is_binary(chunks),
    do: {:error, "no stream identifier at beginning"}

  @doc """
  Computes the masked CRC-32C checksum stored in front of every payload chunk.

  The checksum is always taken over the *uncompressed* data.
  """
  @spec compute_checksum(binary()) :: Types.uint32()
  def compute_checksum(data) when is_binary(data) do
    checksum = Crc32c.calc!(data)

    # the crc32c checksum of the uncompressed data is masked before inserted into the
    # frame using masked_checksum = ((checksum >> 15) | (checksum << 17)) + 0xa282ead8
    # Integers are unbounded here, so the result is truncated to 32 bits at the end.
    (checksum >>> 15 ||| checksum <<< 17) + 0xA282EAD8 &&& @bit_mask_32
  end

  ##########################
  ### Private Functions
  ##########################

  # Walks the stream chunk by chunk, appending decoded payloads to `acc`.
  # The first error returned by any step aborts the walk and is returned as-is.
  defp decompress_frames("", acc), do: {:ok, acc}

  defp decompress_frames(chunks, acc) do
    with {:ok, {id, data, remaining_chunks}} <- process_chunk_metadata(chunks),
         {:ok, new_acc} <- parse_chunk(acc, id, data) do
      decompress_frames(remaining_chunks, new_acc)
    end
  end

  # chunk layout: 1-byte chunk_id, 3-bytes chunk_size, remaining chunks if any data present.
  defp process_chunk_metadata(chunks) when byte_size(chunks) < 4,
    do: {:error, "header too small"}

  # The header announces `size` bytes but fewer are left in the stream.
  defp process_chunk_metadata(<<_id::size(8), size::little-size(24), rest::binary>>)
       when byte_size(rest) < size,
       do: {:error, "missing data in chunk. expected: #{size}. got: #{byte_size(rest)}"}

  defp process_chunk_metadata(<<id::size(8), size::little-size(24), rest::binary>>) do
    <<chunk::binary-size(size), remaining_chunks::binary>> = rest
    {:ok, {id, chunk, remaining_chunks}}
  end

  # Stream identifier
  # NOTE: it can appear more than once, and must be validated each time
  defp parse_chunk(acc, @id_stream_identifier, @stream_identifier), do: {:ok, acc}
  defp parse_chunk(_, @id_stream_identifier, _), do: {:error, "invalid stream identifier"}

  # Data-carrying chunks (compressed or uncompressed)
  defp parse_chunk(_acc, id, data)
       when id in @ids_payload_chunks and byte_size(data) > @chunk_size_limit,
       do: {:error, "chunk is bigger than limit"}

  defp parse_chunk(acc, id, <<checksum::little-size(32), payload::binary>>)
       when id in @ids_payload_chunks do
    with {:ok, uncompressed_data} <- decompress_payload(id, payload),
         :ok <- verify_checksum(uncompressed_data, checksum) do
      {:ok, <<acc::binary, uncompressed_data::binary>>}
    end
  end

  # A payload chunk that cannot even hold the 4-byte checksum is malformed input,
  # not a programming error: report it instead of crashing on a failed match.
  defp parse_chunk(_acc, id, _data) when id in @ids_payload_chunks,
    do: {:error, "chunk too small to contain a checksum"}

  # Skippable chunks (padding or reserved)
  defp parse_chunk(acc, id, _data)
       when id == @id_padding or id in @ids_reserved_skippable_chunks,
       do: {:ok, acc}

  # Reserved unskippable chunks
  defp parse_chunk(_acc, id, _data) when id in @ids_reserved_unskippable_chunks,
    do: {:error, "unskippable chunk of type: #{id}"}

  # Turns a chunk payload into raw data according to the chunk type.
  defp decompress_payload(@id_compressed_data, data) do
    case :snappyer.decompress(data) do
      {:ok, uncompressed_data} ->
        {:ok, uncompressed_data}

      {:error, reason} when is_binary(reason) ->
        {:error, reason}

      # NOTE(review): the NIF may report non-string reasons; normalise them so
      # callers always receive the `{:error, String.t()}` promised by the spec.
      {:error, reason} ->
        {:error, "snappy decompression failed: #{inspect(reason)}"}
    end
  end

  defp decompress_payload(@id_uncompressed_data, data), do: {:ok, data}

  # Compares the checksum read from the frame with the one computed over `data`.
  defp verify_checksum(data, checksum) do
    if checksum == compute_checksum(data),
      do: :ok,
      else: {:error, "invalid checksum"}
  end
end

lib/types/mod.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ defmodule Types do
77
## Integer types
88
@type uint8 :: 0..unquote(2 ** 8 - 1)
99
@type uint16 :: 0..unquote(2 ** 16 - 1)
10+
@type uint32 :: 0..unquote(2 ** 32 - 1)
1011
@type uint64 :: 0..unquote(2 ** 64 - 1)
1112
@type uint256 :: 0..unquote(2 ** 256 - 1)
1213

mix.exs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,12 @@ defmodule LambdaEthereumConsensus.MixProject do
5454
{:ex2ms, "~> 1.6", runtime: false},
5555
{:eflambe, "~> 0.3.1"},
5656
{:patch, "~> 0.13.0", only: [:test]},
57-
{:stream_data, "~> 0.5", only: [:test]},
57+
{:stream_data, "~> 0.6", only: [:test]},
5858
{:benchee, "~> 1.2", only: [:dev]},
5959
{:dialyxir, "~> 1.1", only: [:dev, :test], runtime: false},
6060
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
61-
{:open_api_spex, "~> 3.18"}
61+
{:open_api_spex, "~> 3.18"},
62+
{:crc32c, git: "https://github.com/lambdaclass/crc32c", branch: "bump-rustler-to-29"}
6263
]
6364
end
6465

mix.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
"cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"},
1010
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
1111
"cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"},
12+
"crc32c": {:git, "https://github.com/lambdaclass/crc32c", "e4207478aa5b01c7e0326d89a399a76dc8215daf", [branch: "bump-rustler-to-29"]},
1213
"credo": {:hex, :credo, "1.7.3", "05bb11eaf2f2b8db370ecaa6a6bda2ec49b2acd5e0418bc106b73b07128c0436", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "35ea675a094c934c22fb1dca3696f3c31f2728ae6ef5a53b5d648c11180a4535"},
1314
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
1415
"dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"},

test/unit/snappy_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ defmodule Unit.SnappyTest do
33
use ExUnitProperties
44
doctest Snappy
55

6-
def assert_snappy_decompress(compressed, uncompressed) do
6+
defp assert_snappy_decompress(compressed, uncompressed) do
77
{:ok, ^uncompressed} =
88
compressed
99
|> Base.decode16!()

test/unit/snappyex_test.exs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
defmodule Unit.SnappyExTest do
  use ExUnit.Case
  use ExUnitProperties
  doctest SnappyEx

  # A stream made of nothing but the stream identifier chunk.
  @empty_stream <<0xFF, 6::little-size(24)>> <> "sNaPpY"

  # Fixed vectors as {test name, hex-encoded framed stream, hex-encoded expected output}.
  @hex_vectors [
    {"decompress GetMetadata response uncompressed 0",
     "FF060000734E6150705901150000F1D17CFF0008000000000000FFFFFFFFFFFFFFFF0F",
     "0008000000000000FFFFFFFFFFFFFFFF0F"},
    {"decompress GetMetadata response uncompressed 1",
     "FF060000734E6150705901150000CD11E7D53A03000000000000FFFFFFFFFFFFFFFF0F",
     "3A03000000000000FFFFFFFFFFFFFFFF0F"},
    {"decompress GetMetadata response compressed",
     "FF060000734E61507059000A0000B3A056EA1100003E0100",
     "0000000000000000000000000000000000"},
    {"decompress Ping response 0", "FF060000734E61507059010C0000B18525A04300000000000000",
     "4300000000000000"},
    {"decompress Ping response 1", "FF060000734E61507059010C00000175DE410100000000000000",
     "0100000000000000"},
    {"decompress Ping response 2", "FF060000734E61507059010C0000EAB2043E0500000000000000",
     "0500000000000000"},
    {"decompress Ping response 3", "FF060000734E61507059010C0000290398070000000000000000",
     "0000000000000000"},
    {"decompress error response",
     "FF060000734E6150705900220000EF99F84B1C6C4661696C656420746F20756E636F6D7072657373206D657373616765",
     Base.encode16("Failed to uncompress message")}
  ]

  # Decodes both hex strings and checks that the framed stream decompresses to the expected bytes.
  defp assert_snappy_decompress(compressed, uncompressed) do
    framed = Base.decode16!(compressed)
    expected = Base.decode16!(uncompressed)

    assert SnappyEx.decompress(framed) == {:ok, expected}
  end

  # Builds a stream with the identifier chunk followed by a single payload chunk.
  # The checksum always covers the uncompressed data, whatever the payload encoding.
  defp single_chunk_stream(chunk_id, payload, uncompressed) do
    checksum = <<SnappyEx.compute_checksum(uncompressed)::little-size(32)>>
    chunk_size = byte_size(payload) + byte_size(checksum)

    @empty_stream <> <<chunk_id, chunk_size::little-size(24)>> <> checksum <> payload
  end

  test "empty stream w/o stream identifier" do
    assert {:error, _} = SnappyEx.decompress("")
  end

  test "empty stream w/ stream identifier" do
    assert {:ok, ""} = SnappyEx.decompress(@empty_stream)
  end

  test "uncompressed stream" do
    data = "some uncompressed data"
    stream = single_chunk_stream(0x01, data, data)

    assert SnappyEx.decompress(stream) == {:ok, data}
  end

  test "compressed stream" do
    data = "some compressed data"
    {:ok, compressed_data} = :snappyer.compress(data)
    stream = single_chunk_stream(0x00, compressed_data, data)

    assert SnappyEx.decompress(stream) == {:ok, data}
  end

  # One test per fixed vector; names match the entries in @hex_vectors.
  for {name, compressed_hex, uncompressed_hex} <- @hex_vectors do
    test name do
      assert_snappy_decompress(unquote(compressed_hex), unquote(uncompressed_hex))
    end
  end

  property "SnappyEx == Snappy: random stream" do
    check all(stream <- binary(min_length: 1)) do
      reference = Snappy.decompress(stream)

      # Both implementations must agree: same output on success, an error on failure.
      case SnappyEx.decompress(stream) do
        {:error, reason} -> assert {:error, _} = reference, reason
        {:ok, _} = success -> assert reference == success
      end
    end
  end

  property "SnappyEx == Snappy: random valid stream" do
    check all(payload <- binary(min_length: 1)) do
      {:ok, framed} = Snappy.compress(payload)

      assert SnappyEx.decompress(framed) == {:ok, payload}
    end
  end
end

0 commit comments

Comments
 (0)