diff --git a/pio-tools/gzip-firmware.py b/pio-tools/gzip-firmware.py index 38876e63e..8de4f1c0f 100644 --- a/pio-tools/gzip-firmware.py +++ b/pio-tools/gzip-firmware.py @@ -31,10 +31,10 @@ if not tasmotapiolib.is_env_set(tasmotapiolib.DISABLE_MAP_GZ, env): env.AddPostAction("$BUILD_DIR/${PROGNAME}.bin", [map_gzip]) if tasmotapiolib.is_env_set(tasmotapiolib.ENABLE_ESP32_GZ, env) or env["PIOPLATFORM"] != "espressif32": - try: - from zopfli.gzip import compress - except: - from gzip import compress + import time + + gzip_level = int(env['ENV'].get('GZIP_LEVEL', 10)) + def bin_gzip(source, target, env): # create string with location and file names based on variant bin_file = tasmotapiolib.get_final_bin_path(env) @@ -47,8 +47,10 @@ if tasmotapiolib.is_env_set(tasmotapiolib.ENABLE_ESP32_GZ, env) or env["PIOPLATF # write gzip firmware file with open(bin_file, "rb") as fp: with open(gzip_file, "wb") as f: - zopfli_gz = compress(fp.read()) - f.write(zopfli_gz) + time_start = time.time() + gz = tasmotapiolib.compress(fp.read(), gzip_level) + time_delta = time.time() - time_start + f.write(gz) ORG_FIRMWARE_SIZE = bin_file.stat().st_size GZ_FIRMWARE_SIZE = gzip_file.stat().st_size @@ -59,10 +61,11 @@ if tasmotapiolib.is_env_set(tasmotapiolib.ENABLE_ESP32_GZ, env) or env["PIOPLATF ) ) else: - print(Fore.GREEN + "Compression reduced firmware size to {:.0f}% (was {} bytes, now {} bytes)".format( + print(Fore.GREEN + "Compression reduced firmware size to {:.0f}% (was {} bytes, now {} bytes, took {:.3f} seconds)".format( (GZ_FIRMWARE_SIZE / ORG_FIRMWARE_SIZE) * 100, ORG_FIRMWARE_SIZE, GZ_FIRMWARE_SIZE, + time_delta, ) ) diff --git a/pio-tools/tasmotapiolib.py b/pio-tools/tasmotapiolib.py index 3105739c1..c63dc83c0 100644 --- a/pio-tools/tasmotapiolib.py +++ b/pio-tools/tasmotapiolib.py @@ -21,6 +21,7 @@ map_dir = /tmp/map_files/ Values in .ini files override environment variables """ +import zlib import pathlib import os @@ -42,7 +43,6 @@ MAP_DIR = "map_dir" # This is the default output directory OUTPUT_DIR = pathlib.Path("build_output") - def get_variant(env) -> str: """Get the current build variant.""" return env["PIOENV"] @@ -125,3 +125,75 @@ def is_env_set(name: str, env): val = val.strip() return val == "1" return False + +def _compress_with_gzip(data, level=9): + import zlib + if level < 0: level = 0 + elif level > 9: level = 9 + # gzip header without timestamp + zobj = zlib.compressobj(level=level, wbits=16 + zlib.MAX_WBITS) + return zobj.compress(data) + zobj.flush() + +try: + import zopfli + + # two python modules call themselves `zopfli`, which one is this? + if hasattr(zopfli, 'ZopfliCompressor'): + # we seem to have zopflipy + from zopfli import ZopfliCompressor, ZOPFLI_FORMAT_GZIP + def _compress_with_zopfli(data, iterations=15, maxsplit=15, **kw): + zobj = ZopfliCompressor( + ZOPFLI_FORMAT_GZIP, + iterations=iterations, + block_splitting_max=maxsplit, + **kw, + ) + return zobj.compress(data) + zobj.flush() + + else: + # we seem to have pyzopfli + import zopfli.gzip + def _compress_with_zopfli(data, iterations=15, maxsplit=15, **kw): + return zopfli.gzip.compress( + data, + numiterations=iterations, + blocksplittingmax=maxsplit, + **kw, + ) + + # values based on limited manual testing + def _level_to_params(level): + if level == 10: return (15, 15) + elif level == 11: return (15, 20) + elif level == 12: return (15, 25) + elif level == 13: return (15, 30) + elif level == 14: return (15, 35) + elif level == 15: return (33, 40) + elif level == 16: return (67, 45) + elif level == 17: return (100, 50) + elif level == 18: return (500, 100) + elif level >= 19: return (2500, 250) + else: + raise ValueError(f'Invalid level: {repr(level)}') + + def compress(data, level=None, *, iterations=None, maxsplit=None, **kw): + if level is not None and (iterations is not None or maxsplit is not None): + raise ValueError("The `level` argument can't be used with `iterations` and/or `maxsplit`!") + + # set parameters based on level or to defaults + if iterations is None and maxsplit is None: + if level is None: level = 10 + elif level < 10: return _compress_with_gzip(data, level) + iterations, maxsplit = _level_to_params(level) + + if maxsplit is not None: + kw['maxsplit'] = maxsplit + + if iterations is not None: + kw['iterations'] = iterations + + return _compress_with_zopfli(data, **kw) + +except ModuleNotFoundError: + def compress(data, level=9, **kw): + return _compress_with_gzip(data, level)