improved zopfli compression (#22737)

This commit is contained in:
Ryan Castellucci 2024-12-29 19:47:12 +00:00 committed by GitHub
parent 02225ea7c4
commit a1aef6b39f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 83 additions and 8 deletions

View File

@ -31,10 +31,10 @@ if not tasmotapiolib.is_env_set(tasmotapiolib.DISABLE_MAP_GZ, env):
env.AddPostAction("$BUILD_DIR/${PROGNAME}.bin", [map_gzip])
if tasmotapiolib.is_env_set(tasmotapiolib.ENABLE_ESP32_GZ, env) or env["PIOPLATFORM"] != "espressif32":
try:
from zopfli.gzip import compress
except:
from gzip import compress
import time
gzip_level = int(env['ENV'].get('GZIP_LEVEL', 10))
def bin_gzip(source, target, env):
# create string with location and file names based on variant
bin_file = tasmotapiolib.get_final_bin_path(env)
@ -47,8 +47,10 @@ if tasmotapiolib.is_env_set(tasmotapiolib.ENABLE_ESP32_GZ, env) or env["PIOPLATF
# write gzip firmware file
with open(bin_file, "rb") as fp:
with open(gzip_file, "wb") as f:
zopfli_gz = compress(fp.read())
f.write(zopfli_gz)
time_start = time.time()
gz = tasmotapiolib.compress(fp.read(), gzip_level)
time_delta = time.time() - time_start
f.write(gz)
ORG_FIRMWARE_SIZE = bin_file.stat().st_size
GZ_FIRMWARE_SIZE = gzip_file.stat().st_size
@ -59,10 +61,11 @@ if tasmotapiolib.is_env_set(tasmotapiolib.ENABLE_ESP32_GZ, env) or env["PIOPLATF
)
)
else:
print(Fore.GREEN + "Compression reduced firmware size to {:.0f}% (was {} bytes, now {} bytes)".format(
print(Fore.GREEN + "Compression reduced firmware size to {:.0f}% (was {} bytes, now {} bytes, took {:.3f} seconds)".format(
(GZ_FIRMWARE_SIZE / ORG_FIRMWARE_SIZE) * 100,
ORG_FIRMWARE_SIZE,
GZ_FIRMWARE_SIZE,
time_delta,
)
)

View File

@ -21,6 +21,7 @@ map_dir = /tmp/map_files/
Values in .ini files override environment variables
"""
import zlib
import pathlib
import os
@ -42,7 +43,6 @@ MAP_DIR = "map_dir"
# This is the default output directory
OUTPUT_DIR = pathlib.Path("build_output")
def get_variant(env) -> str:
"""Get the current build variant."""
return env["PIOENV"]
@ -125,3 +125,75 @@ def is_env_set(name: str, env):
val = val.strip()
return val == "1"
return False
def _compress_with_gzip(data, level=9):
import zlib
if level < 0: level = 0
elif level > 9: level = 9
# gzip header without timestamp
zobj = zlib.compressobj(level=level, wbits=16 + zlib.MAX_WBITS)
return zobj.compress(data) + zobj.flush()
try:
import zopfli
# two python modules call themselves `zopfli`, which one is this?
if hasattr(zopfli, 'ZopfliCompressor'):
# we seem to have zopflipy
from zopfli import ZopfliCompressor, ZOPFLI_FORMAT_GZIP
def _compress_with_zopfli(data, iterations=15, maxsplit=15, **kw):
zobj = ZopfliCompressor(
ZOPFLI_FORMAT_GZIP,
iterations=iterations,
block_splitting_max=maxsplit,
**kw,
)
return zobj.compress(data) + zobj.flush()
else:
# we seem to have pyzopfli
import zopfli.gzip
def _compress_with_zopfli(data, iterations=15, maxsplit=15, **kw):
return zopfli.gzip.compress(
data,
numiterations=iterations,
blocksplittingmax=maxsplit,
**kw,
)
# values based on limited manual testing
def _level_to_params(level):
if level == 10: return (15, 15)
elif level == 11: return (15, 20)
elif level == 12: return (15, 25)
elif level == 13: return (15, 30)
elif level == 14: return (15, 35)
elif level == 15: return (33, 40)
elif level == 16: return (67, 45)
elif level == 17: return (100, 50)
elif level == 18: return (500, 100)
elif level >= 19: return (2500, 250)
else:
raise ValueError(f'Invalid level: {repr(level)}')
def compress(data, level=None, *, iterations=None, maxsplit=None, **kw):
if level is not None and (iterations is not None or maxsplit is not None):
raise ValueError("The `level` argument can't be used with `iterations` and/or `maxsplit`!")
# set parameters based on level or to defaults
if iterations is None and maxsplit is None:
if level is None: level = 10
elif level < 10: return _compress_with_gzip(data, level)
iterations, maxsplit = _level_to_params(level)
if maxsplit is not None:
kw['maxsplit'] = maxsplit
if iterations is not None:
kw['iterations'] = iterations
return _compress_with_zopfli(data, **kw)
except ModuleNotFoundError:
def compress(data, level=9, **kw):
return _compress_with_gzip(data, level)