diff --git a/tools/stress_test.sh b/tools/stress_test.sh new file mode 100644 index 000000000..d7c344c58 --- /dev/null +++ b/tools/stress_test.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# Some web server stress tests +# +# Perform a large number of parallel requests, stress testing the web server +# TODO: some kind of performance metrics + +# Accepts three command line arguments: +# - first argument - mandatory - IP or hostname of target server +# - second argument - target type (optional) +# - third argument - xfer count (for replicated targets) (optional) +HOST=$1 +declare -n TARGET_STR="${2:-JSON_LARGER}_TARGETS" +REPLICATE_COUNT=$(("${3:-10}")) + +PARALLEL_MAX=${PARALLEL_MAX:-50} + +CURL_ARGS="--compressed --parallel --parallel-immediate --parallel-max ${PARALLEL_MAX}" +CURL_PRINT_RESPONSE_ARGS="-w %{http_code}\n" + +JSON_TARGETS=('json/state' 'json/info' 'json/si', 'json/palettes' 'json/fxdata' 'settings/s.js?p=2') +FILE_TARGETS=('' 'iro.js' 'rangetouch.js' 'settings' 'settings/wifi') +# Replicate one target many times +function replicate() { + printf "${1}?%d " $(seq 1 ${REPLICATE_COUNT}) +} +read -a JSON_TINY_TARGETS <<< $(replicate "json/nodes") +read -a JSON_SMALL_TARGETS <<< $(replicate "json/info") +read -a JSON_LARGE_TARGETS <<< $(replicate "json/si") +read -a JSON_LARGER_TARGETS <<< $(replicate "json/fxdata") + +# Expand target URLS to full arguments for curl +TARGETS=(${TARGET_STR[@]}) +#echo "${TARGETS[@]}" +FULL_TGT_OPTIONS=$(printf "http://${HOST}/%s -o /dev/null " "${TARGETS[@]}") +#echo ${FULL_TGT_OPTIONS} + +time curl ${CURL_ARGS} ${FULL_TGT_OPTIONS} diff --git a/tools/udp_test.py b/tools/udp_test.py new file mode 100644 index 000000000..c4c9129cf --- /dev/null +++ b/tools/udp_test.py @@ -0,0 +1,46 @@ +import numpy as np +import socket + +class WledRealtimeClient: + def __init__(self, wled_controller_ip, num_pixels, udp_port=21324, max_pixels_per_packet=126): + self.wled_controller_ip = wled_controller_ip + self.num_pixels = num_pixels + self.udp_port = udp_port + self.max_pixels_per_packet = max_pixels_per_packet + self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._prev_pixels = np.full((3, self.num_pixels), 253, dtype=np.uint8) + self.pixels = np.full((3, self.num_pixels), 1, dtype=np.uint8) + + def update(self): + # Truncate values and cast to integer + self.pixels = np.clip(self.pixels, 0, 255).astype(np.uint8) + p = np.copy(self.pixels) + + idx = np.where(~np.all(p == self._prev_pixels, axis=0))[0] + num_pixels = len(idx) + n_packets = (num_pixels + self.max_pixels_per_packet - 1) // self.max_pixels_per_packet + idx_split = np.array_split(idx, n_packets) + + header = bytes([1, 2]) # WARLS protocol header + for packet_indices in idx_split: + data = bytearray(header) + for i in packet_indices: + data.extend([i, *p[:, i]]) # Index and RGB values + self._sock.sendto(bytes(data), (self.wled_controller_ip, self.udp_port)) + + self._prev_pixels = np.copy(p) + + + +################################## LED blink test ################################## +if __name__ == "__main__": + WLED_CONTROLLER_IP = "192.168.1.153" + NUM_PIXELS = 255 # Amount of LEDs on your strip + import time + wled = WledRealtimeClient(WLED_CONTROLLER_IP, NUM_PIXELS) + print('Starting LED blink test') + while True: + for i in range(NUM_PIXELS): + wled.pixels[1, i] = 255 if wled.pixels[1, i] == 0 else 0 + wled.update() + time.sleep(.01)