diff --git a/etcher-pro-monitoring/benchmark/Dockerfile.template b/etcher-pro-monitoring/benchmark/Dockerfile.template index b8f75b84..8ba9bc63 100644 --- a/etcher-pro-monitoring/benchmark/Dockerfile.template +++ b/etcher-pro-monitoring/benchmark/Dockerfile.template @@ -1,6 +1,6 @@ -FROM balenalib/%%BALENA_MACHINE_NAME%%-debian-node:12.6-buster-build as rust-builder +FROM balenalib/%%BALENA_MACHINE_NAME%%-debian-node:12.6-buster-build as builder RUN apt-get update -RUN apt-get install -yq --no-install-recommends git curl +RUN apt-get install -yq --no-install-recommends git curl python # https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=923479 # https://github.com/balena-io-library/base-images/issues/562 RUN c_rehash @@ -10,6 +10,9 @@ WORKDIR /usr/src/app RUN git clone https://github.com/balena-io-playground/parallel-disk-duplicator.git . RUN git checkout sda-to-sd-b-to-p RUN cargo build --release +# Also build @ronomon/direct-io +COPY package.json package-lock.json ./ +RUN npm i FROM balenalib/%%BALENA_MACHINE_NAME%%-debian-node:12.6-buster RUN \ @@ -17,9 +20,9 @@ RUN \ apt-get install -y stress htop dcfldd && \ rm -rf /var/lib/apt/lists/* WORKDIR /usr/src/app -COPY dd.sh flash.ts package.json package-lock.json ./ -COPY --from=rust-builder /usr/src/app/target/release/pdd . -RUN npm i +COPY dd.sh flash.ts tsconfig.json package.json package-lock.json ./ +COPY --from=builder /usr/src/app/target/release/pdd . +COPY --from=builder /usr/src/app/node_modules ./node_modules ENV UDEV=1 ENV UV_THREADPOOL_SIZE=128 RUN echo "echo \"stress, htop, dd and dcfldd are installed, try running ./flash.ts --help\"" >> /etc/bash.bashrc diff --git a/etcher-pro-monitoring/benchmark/flash.ts b/etcher-pro-monitoring/benchmark/flash.ts index 2f662406..07e31279 100755 --- a/etcher-pro-monitoring/benchmark/flash.ts +++ b/etcher-pro-monitoring/benchmark/flash.ts @@ -1,95 +1,229 @@ #!/usr/src/app/node_modules/.bin/ts-node -import { createReadStream, createWriteStream, promises as fs } from 'fs'; +// @ts-ignore +import { getAlignedBuffer } from '@ronomon/direct-io'; +import { constants, createWriteStream, promises as fs } from 'fs'; import { resolve as resolvePath } from 'path'; +// @ts-ignore +import * as RWMutex from 'rwmutex'; import { Readable } from 'stream'; import { Argv } from 'yargs'; -function createReader(size: number, sourceHighWaterMark: number) { - return createReadStream('/dev/zero', { - end: size, - highWaterMark: sourceHighWaterMark, - }); +const CHUNK_SIZE = 1024 ** 2; +const ALIGNMENT = 4096; + +interface LockableBuffer extends Buffer { + lock: () => Promise<() => void>; + rlock: () => Promise<() => void>; + slice: (start?: number, end?: number) => LockableBuffer; +} + +function attachMutex(buf: Buffer, mutex: RWMutex): LockableBuffer { + const buffer = buf as LockableBuffer; + buffer.lock = mutex.lock.bind(mutex); + buffer.rlock = mutex.rlock.bind(mutex); + const bufferSlice = buffer.slice.bind(buffer); + buffer.slice = (...args) => { + const slice = bufferSlice(...args); + return attachMutex(slice, mutex); + }; + return buffer; +} + +function createBuffer(size: number, alignment: number): LockableBuffer { + return attachMutex(getAlignedBuffer(size, alignment), new RWMutex()); +} + +export class ReadStream extends Readable { + public bytesRead = 0; + private handle: fs.FileHandle; + private ready: Promise; + private buffers: LockableBuffer[]; + private currentBufferIndex = 0; + + constructor( + private debug: boolean, + private path: string, + private direct: boolean, + private end?: number, + private numBuffers = 2, + ) { + super({ + objectMode: true, + highWaterMark: numBuffers - 1, + }); + if (numBuffers < 2) { + throw new Error("numBuffers can't be less than 2"); + } + this.buffers = new Array(numBuffers); + this.ready = this.init(); + } + + private getCurrentBuffer(): LockableBuffer { + let buffer = this.buffers[this.currentBufferIndex]; + if (buffer === undefined) { + buffer = createBuffer(CHUNK_SIZE, ALIGNMENT); + // @ts-ignore + buffer.index = this.currentBufferIndex; + this.buffers[this.currentBufferIndex] = buffer; + } + this.currentBufferIndex = (this.currentBufferIndex + 1) % this.numBuffers; + return buffer; + } + + private async init(): Promise { + let flags = constants.O_RDONLY; + if (this.direct) { + flags |= constants.O_DIRECT | constants.O_EXCL | constants.O_SYNC; + } + this.handle = await fs.open(this.path, flags); + } + + public async _read() { + await this.ready; + const buffer = this.getCurrentBuffer(); + const unlock = await buffer.lock(); + if (this.debug) { + // @ts-ignore + console.log('r start', buffer.index); + } + const { bytesRead } = await this.handle.read( + buffer, + 0, + CHUNK_SIZE, + this.bytesRead, + ); + unlock(); + this.bytesRead += bytesRead; + const slice = buffer.slice(0, bytesRead); + // @ts-ignore + slice.index = buffer.index; + if (this.debug) { + // @ts-ignore + console.log('r end', buffer.index); + } + this.push(slice); + if ( + bytesRead < CHUNK_SIZE || + (this.end !== undefined && this.bytesRead > this.end) + ) { + this.push(null); + await this.handle.close(); + this.emit('close'); + } + } +} + +function nTab(n: number): string { + let result = ''; + for (let i = 0; i < n; i++) { + result += '\t'; + } + return result; } async function flash( - size: number, - sourceHighWaterMark: number, - destinationHighWaterMark: number, - oneSource: boolean, - devices: string[] = [], + numBuffers: number, + size: number | undefined, + inputDirect: boolean, + outputDirect: boolean, + debug: boolean, + input: string, + outputs: string[] = [], ) { const promises: Array> = []; - const outputs = devices.map((f: string) => resolvePath(f)); - let globalSource: Readable; - if (oneSource) { - globalSource = createReader(size, sourceHighWaterMark); - promises.push( - new Promise((resolve, reject) => { - globalSource.on('close', resolve); - globalSource.on('error', reject); - }), - ); - globalSource.setMaxListeners(outputs.length + 1); - } + const source = new ReadStream(debug, input, inputDirect, size, numBuffers); + source.setMaxListeners(outputs.length + 1); + promises.push( + new Promise((resolve, reject) => { + source.on('close', resolve); + source.on('error', reject); + }), + ); const start = new Date().getTime(); - for (const output of outputs) { + for (let idx = 0; idx < outputs.length; idx++) { + const output = outputs[idx]; + let flags = constants.O_WRONLY; + if (outputDirect) { + flags |= constants.O_DIRECT | constants.O_EXCL | constants.O_SYNC; + } const destination = createWriteStream(output, { - highWaterMark: destinationHighWaterMark, + objectMode: true, + highWaterMark: Math.round(numBuffers / 2) - 1, + // @ts-ignore (flags can be a number) + flags, }); + destination._writev = undefined; + const origWrite = destination._write.bind(destination); + destination._write = async (...args) => { + const origOnWrite = args[2]; + const unlock = await args[0].rlock(); + if (debug) { + // @ts-ignore + console.log(`${nTab(idx + 1)}w start`, args[0].index); + } + args[2] = (...aargs) => { + unlock(); + if (debug) { + console.log(`${nTab(idx + 1)}w end`, args[0].index); + } + // @ts-ignore + origOnWrite(...aargs); + }; + // @ts-ignore + return origWrite(...args); + }; promises.push( new Promise((resolve, reject) => { destination.on('close', resolve); destination.on('error', reject); }), ); - let source: Readable; - if (globalSource !== undefined) { - source = globalSource; - } else { - source = createReader(size, sourceHighWaterMark); - promises.push( - new Promise((resolve, reject) => { - source.on('close', resolve); - source.on('error', reject); - }), - ); - } source.pipe(destination); } await Promise.all(promises); const end = new Date().getTime(); const duration = (end - start) / 1000; + if (size === undefined) { + size = source.bytesRead; + } console.log('total time', duration, 's'); console.log('speed', size / 1024 ** 2 / duration, 'MiB/s'); } const argv = require('yargs').command( - '$0 [devices..]', + '$0 input [devices..]', 'Write zeros to devices', (yargs: Argv) => { + yargs.positional('input', { describe: 'Input device' }); yargs.positional('devices', { describe: 'Devices to write to' }); + yargs.option('numBuffers', { + default: 2, + describe: 'Number of 1MiB buffers used by the reader', + }); yargs.option('size', { - default: 1500 * 1024 ** 2, + type: 'number', describe: 'Size in bytes', }); - yargs.option('sourceHighWaterMark', { - default: 1024 ** 2, - describe: 'Source high water mark in bytes', - }); - yargs.option('destinationHighWaterMark', { - default: 64 * 1024 ** 2, - describe: 'Destinations high water mark in bytes', - }); yargs.option('loop', { type: 'boolean', default: false, describe: 'Indefinitely restart flashing when done', }); - yargs.option('oneSource', { + yargs.option('debug', { type: 'boolean', default: false, - describe: 'Use only one reader for /dev/zero', + describe: 'Show debug information', + }); + yargs.option('inputDirect', { + type: 'boolean', + default: false, + describe: 'Use direct io for input', + }); + yargs.option('outputDirect', { + type: 'boolean', + default: false, + describe: 'Use direct io for output', }); }, ).argv; @@ -101,11 +235,13 @@ async function main() { } while (true) { await flash( + argv.numBuffers, argv.size, - argv.sourceHighWaterMark, - argv.destinationHighWaterMark, - argv.oneSource, - argv.devices, + argv.inputDirect, + argv.outputDirect, + argv.debug, + resolvePath(argv.input), + argv.devices.map((f: string) => resolvePath(f)), ); if (!argv.loop) { break; diff --git a/etcher-pro-monitoring/benchmark/package-lock.json b/etcher-pro-monitoring/benchmark/package-lock.json index 55df98d9..25278904 100644 --- a/etcher-pro-monitoring/benchmark/package-lock.json +++ b/etcher-pro-monitoring/benchmark/package-lock.json @@ -4,15 +4,28 @@ "lockfileVersion": 1, "requires": true, "dependencies": { + "@ronomon/direct-io": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/@ronomon/direct-io/-/direct-io-3.0.1.tgz", + "integrity": "sha512-NkKB32bjq7RfMdAMiWayphMlVWzsfPiKelK+btXLqggv1vDVgv2xELqeo0z4uYLLt86fVReLPxQj7qpg0zWvow==", + "requires": { + "@ronomon/queue": "^3.0.1" + } + }, + "@ronomon/queue": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/@ronomon/queue/-/queue-3.0.1.tgz", + "integrity": "sha512-STcqSvk+c7ArMrZgYxhM92p6O6F7t0SUbGr+zm8s9fJple5EdJAMwP3dXqgdXeF95xWhBpha5kjEqNAIdI0r4w==" + }, "@types/color-name": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/@types/color-name/-/color-name-1.1.1.tgz", "integrity": "sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==" }, "@types/node": { - "version": "12.12.28", - "resolved": "https://registry.npmjs.org/@types/node/-/node-12.12.28.tgz", - "integrity": "sha512-g73GJYJDXgf0jqg+P9S8h2acWbDXNkoCX8DLtJVu7Fkn788pzQ/oJsrdJz/2JejRf/SjfZaAhsw+3nd1D5EWGg==", + "version": "12.12.29", + "resolved": "https://registry.npmjs.org/@types/node/-/node-12.12.29.tgz", + "integrity": "sha512-yo8Qz0ygADGFptISDj3pOC9wXfln/5pQaN/ysDIzOaAWXt73cNHmtEC8zSO2Y+kse/txmwIAJzkYZ5fooaS5DQ==", "dev": true }, "@types/yargs": { @@ -84,6 +97,14 @@ "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==" }, + "debug": { + "version": "3.2.6", + "resolved": "https://registry.npmjs.org/debug/-/debug-3.2.6.tgz", + "integrity": "sha512-mel+jf7nrtEl5Pn1Qx46zARXKDpBbvzezse7p7LqINmdoIk8PYP5SySaxEmYv6TZ0JyEKA1hsCId6DIhgITtWQ==", + "requires": { + "ms": "^2.1.1" + } + }, "decamelize": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/decamelize/-/decamelize-1.2.0.tgz", @@ -133,6 +154,11 @@ "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", "dev": true }, + "ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + }, "p-limit": { "version": "2.2.2", "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-2.2.2.tgz", @@ -169,6 +195,14 @@ "resolved": "https://registry.npmjs.org/require-main-filename/-/require-main-filename-2.0.0.tgz", "integrity": "sha512-NKN5kMDylKuldxYLSUfrbo5Tuzh4hd+2E8NPPX02mZtn1VuREQToYe/ZdlJy+J3uCpfaiGF05e7B8W0iXbQHmg==" }, + "rwmutex": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/rwmutex/-/rwmutex-1.0.0.tgz", + "integrity": "sha1-/dHqaoe3f0SecteF+eonTL4UDe0=", + "requires": { + "debug": "^3.0.1" + } + }, "set-blocking": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/set-blocking/-/set-blocking-2.0.0.tgz", diff --git a/etcher-pro-monitoring/benchmark/package.json b/etcher-pro-monitoring/benchmark/package.json index 0223cb40..c7ebe977 100644 --- a/etcher-pro-monitoring/benchmark/package.json +++ b/etcher-pro-monitoring/benchmark/package.json @@ -9,12 +9,14 @@ "author": "", "license": "ISC", "devDependencies": { - "@types/node": "^12.12.28", + "@types/node": "^12.12.29", "@types/yargs": "^15.0.4", "ts-node": "^8.6.2", "typescript": "^3.8.2" }, "dependencies": { + "@ronomon/direct-io": "^3.0.1", + "rwmutex": "^1.0.0", "yargs": "^15.1.0" } } diff --git a/etcher-pro-monitoring/benchmark/tsconfig.json b/etcher-pro-monitoring/benchmark/tsconfig.json new file mode 100644 index 00000000..1c62b4a8 --- /dev/null +++ b/etcher-pro-monitoring/benchmark/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "noImplicitAny": true, + "noUnusedLocals": true, + "noUnusedParameters": true, + "strictNullChecks": true, + "resolveJsonModule": true, + "moduleResolution": "node", + "module": "commonjs", + "target": "es2018", + "typeRoots": ["./node_modules/@types", "./typings"], + "allowSyntheticDefaultImports": true + } +}