From c724e4cb20298b99d5c6faed4c7c8f810afb5cf5 Mon Sep 17 00:00:00 2001 From: Jonas Hermsmeier Date: Sat, 24 Feb 2018 00:31:06 +0100 Subject: [PATCH 1/3] feat(writer): Impl multi-writes in writer modules Implement writing to multiple destinations simultaneously Change-Type: minor Changelog-Entry: Implement writing to multiple destinations simultaneously --- lib/cli/etcher.js | 94 +++- lib/cli/writer.js | 62 --- lib/gui/app/modules/image-writer.js | 44 +- lib/gui/app/pages/main/controllers/flash.js | 2 +- lib/gui/modules/child-writer.js | 85 ++- lib/sdk/image-stream/handlers.js | 17 +- lib/sdk/writer/block-write-stream.js | 2 +- lib/sdk/writer/index.js | 559 +++++++++++++------- tests/gui/modules/image-writer.spec.js | 16 +- 9 files changed, 554 insertions(+), 327 deletions(-) delete mode 100644 lib/cli/writer.js diff --git a/lib/cli/etcher.js b/lib/cli/etcher.js index e027e0f5..511592b3 100644 --- a/lib/cli/etcher.js +++ b/lib/cli/etcher.js @@ -16,11 +16,12 @@ 'use strict' -const path = require('path') +const _ = require('lodash') const Bluebird = require('bluebird') const visuals = require('resin-cli-visuals') const form = require('resin-cli-form') -const writer = require('./writer') +const bytes = require('pretty-bytes') +const ImageWriter = require('../sdk/writer') const utils = require('./utils') const options = require('./options') const messages = require('../shared/messages') @@ -28,6 +29,8 @@ const EXIT_CODES = require('../shared/exit-codes') const errors = require('../shared/errors') const permissions = require('../shared/permissions') +/* eslint-disable no-magic-numbers */ + const ARGV_IMAGE_PATH_INDEX = 0 const imagePath = options._[ARGV_IMAGE_PATH_INDEX] @@ -59,7 +62,6 @@ permissions.isElevated().then((elevated) => { // otherwise the question will not be asked because // `false` is a defined value. yes: options.yes || null - } }) }).then((answers) => { @@ -75,29 +77,79 @@ permissions.isElevated().then((elevated) => { check: new visuals.Progress('Validating') } - return writer.writeImage(imagePath, answers.drive, { - unmountOnSuccess: options.unmount, - validateWriteOnSuccess: options.check - }, (state) => { - progressBars[state.type].update(state) - }).then((results) => { - return { - imagePath, - flash: results + return new Bluebird((resolve, reject) => { + /** + * @summary Progress update handler + * @param {Object} state - progress state + * @private + * @example + * writer.on('progress', onProgress) + */ + const onProgress = (state) => { + state.message = state.active > 1 + ? `${bytes(state.totalSpeed)}/s total, ${bytes(state.speed)}/s avg` + : `${bytes(state.totalSpeed)}/s` + + state.message = `${state.type === 'write' ? 'Flashing' : 'Validating'}: ${state.message}` + + // Update progress bar + progressBars[state.type].update(state) } + + const writer = new ImageWriter({ + verify: options.check, + unmountOnSuccess: options.unmount, + checksumAlgorithms: options.check ? [ 'sha512' ] : [] + }) + + /** + * @summary Error handler + * @param {Error} error - error + * @private + * @example + * writer.on('error', onError) + */ + const onError = function (error) { + console.error(error) + } + + /** + * @summary Finish handler + * @private + * @example + * writer.on('finish', onFinish) + */ + const onFinish = function () { + resolve(Array.from(writer.destinations.values())) + } + + writer.on('progress', onProgress) + writer.on('error', onError) + writer.on('finish', onFinish) + + // NOTE: Drive can be (String|Array) + const destinations = [].concat(answers.drive) + + writer.write(imagePath, destinations) }) }).then((results) => { - return Bluebird.try(() => { - console.log(messages.info.flashComplete(path.basename(results.imagePath), results.flash.drive)) + let exitCode = EXIT_CODES.SUCCESS - if (results.flash.checksum.md5) { - console.log(`Checksum: ${results.flash.checksum.md5}`) - } + if (options.check) { + console.log('') + console.log('Checksums:') - return Bluebird.resolve() - }).then(() => { - process.exit(EXIT_CODES.SUCCESS) - }) + _.forEach(results, (result) => { + if (result.error) { + exitCode = EXIT_CODES.GENERAL_ERROR + console.log(` - ${result.device.device}: ${result.error.message}`) + } else { + console.log(` - ${result.device.device}: ${result.checksum.sha512}`) + } + }) + } + + process.exit(exitCode) }).catch((error) => { return Bluebird.try(() => { utils.printError(error) diff --git a/lib/cli/writer.js b/lib/cli/writer.js deleted file mode 100644 index d8424516..00000000 --- a/lib/cli/writer.js +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2016 resin.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -'use strict' - -const Bluebird = require('bluebird') -const ImageWriter = require('../sdk/writer') - -/** - * @summary Write an image to a disk drive - * @function - * @public - * - * @param {String} imagePath - path to image - * @param {String} drive - drive - * @param {Object} options - options - * @param {Boolean} [options.unmountOnSuccess=false] - unmount on success - * @param {Boolean} [options.validateWriteOnSuccess=false] - validate write on success - * @param {Function} onProgress - on progress callback (state) - * - * @fulfil {Boolean} - whether the operation was successful - * @returns {Promise} - * - * @example - * writer.writeImage('path/to/image.img', '/dev/disk2', { - * unmountOnSuccess: true, - * validateWriteOnSuccess: true - * }, (state) => { - * console.log(state.percentage); - * }).then(() => { - * console.log('Done!'); - * }); - */ -exports.writeImage = (imagePath, drive, options, onProgress) => { - const writer = new ImageWriter({ - path: drive, - imagePath, - verify: options.validateWriteOnSuccess, - checksumAlgorithms: [ 'md5' ], - unmountOnSuccess: options.unmountOnSuccess - }) - - return new Bluebird((resolve, reject) => { - writer.flash() - .on('error', reject) - .on('progress', onProgress) - .on('finish', resolve) - }) -} diff --git a/lib/gui/app/modules/image-writer.js b/lib/gui/app/modules/image-writer.js index b7cfb1e5..b17781e0 100644 --- a/lib/gui/app/modules/image-writer.js +++ b/lib/gui/app/modules/image-writer.js @@ -36,7 +36,7 @@ const packageJSON = require('../../../../package.json') * @type {Number} * @constant */ -const THREADS_PER_CPU = 4 +const THREADS_PER_CPU = 16 /** * @summary Get application entry point @@ -69,7 +69,7 @@ const getApplicationEntryPoint = () => { * This function is extracted for testing purposes. * * @param {String} image - image path - * @param {Object} drive - drive + * @param {Array} drives - drives * @param {Function} onProgress - in progress callback (state) * * @fulfil {Object} - flash results @@ -82,7 +82,7 @@ const getApplicationEntryPoint = () => { * console.log(state.percentage) * }) */ -exports.performWrite = (image, drive, onProgress) => { +exports.performWrite = (image, drives, onProgress) => { // There might be multiple Etcher instances running at // the same time, therefore we must ensure each IPC // server/client has a different name. @@ -135,6 +135,16 @@ exports.performWrite = (image, drive, onProgress) => { ipc.server.on('state', onProgress) + ipc.server.on('ready', (data, socket) => { + ipc.server.emit(socket, 'write', { + imagePath: image, + destinations: _.map(drives, [ 'device' ]), + validateWriteOnSuccess: settings.get('validateWriteOnSuccess'), + unmountOnSuccess: settings.get('unmountOnSuccess'), + checksumAlgorithms: [ 'sha512' ] + }) + }) + const argv = _.attempt(() => { const entryPoint = getApplicationEntryPoint() @@ -167,31 +177,24 @@ exports.performWrite = (image, drive, onProgress) => { permissions.elevateCommand(argv, { applicationName: packageJSON.displayName, - environment: { + environment: _.assign({}, process.env, { IPC_SERVER_ID, IPC_CLIENT_ID, IPC_SOCKET_ROOT: ipc.config.socketRoot, ELECTRON_RUN_AS_NODE: 1, UV_THREADPOOL_SIZE: os.cpus().length * THREADS_PER_CPU, - // Casting to Number nicely converts booleans to 0 or 1. - OPTION_VALIDATE: Number(settings.get('validateWriteOnSuccess')), - OPTION_UNMOUNT: Number(settings.get('unmountOnSuccess')), - - OPTION_IMAGE: image, - OPTION_DEVICE: drive.device, - // This environment variable prevents the AppImages // desktop integration script from presenting the // "installation" dialog SKIP: 1 - } + }) }).then((results) => { flashResults.cancelled = results.cancelled console.log('Flash results', flashResults) // This likely means the child died halfway through - if (!flashResults.cancelled && !flashResults.bytesWritten) { + if (!flashResults.cancelled && !_.get(flashResults, [ 'results', 'bytesWritten' ])) { throw errors.createUserError({ title: 'The writer process ended unexpectedly', description: 'Please try again, and contact the Etcher team if the problem persists', @@ -206,7 +209,6 @@ exports.performWrite = (image, drive, onProgress) => { if (error.code === SIGKILL_EXIT_CODE) { error.code = 'ECHILDDIED' } - return reject(error) }).finally(() => { console.log('Terminating IPC server') @@ -219,7 +221,7 @@ exports.performWrite = (image, drive, onProgress) => { } /** - * @summary Flash an image to a drive + * @summary Flash an image to drives * @function * @public * @@ -227,17 +229,17 @@ exports.performWrite = (image, drive, onProgress) => { * This function will update `imageWriter.state` with the current writing state. * * @param {String} image - image path - * @param {Object} drive - drive + * @param {Array} drives - drives * @returns {Promise} * * @example - * imageWriter.flash('foo.img', { + * imageWriter.flash('foo.img', [{ * device: '/dev/disk2' - * }).then(() => { + * }]).then(() => { * console.log('Write completed!') * }) */ -exports.flash = (image, drive) => { +exports.flash = (image, drives) => { if (flashState.isFlashing()) { return Bluebird.reject(new Error('There is already a flash in progress')) } @@ -246,7 +248,7 @@ exports.flash = (image, drive) => { const analyticsData = { image, - drive, + drives, uuid: flashState.getFlashUuid(), unmountOnSuccess: settings.get('unmountOnSuccess'), validateWriteOnSuccess: settings.get('validateWriteOnSuccess') @@ -254,7 +256,7 @@ exports.flash = (image, drive) => { analytics.logEvent('Flash', analyticsData) - return exports.performWrite(image, drive, (state) => { + return exports.performWrite(image, drives, (state) => { flashState.setProgressState(state) }).then(flashState.unsetFlashingFlag).then(() => { if (flashState.wasLastFlashCancelled()) { diff --git a/lib/gui/app/pages/main/controllers/flash.js b/lib/gui/app/pages/main/controllers/flash.js index d74c266d..d991266b 100644 --- a/lib/gui/app/pages/main/controllers/flash.js +++ b/lib/gui/app/pages/main/controllers/flash.js @@ -72,7 +72,7 @@ module.exports = function ( const iconPath = '../../../assets/icon.png' - imageWriter.flash(image.path, drive).then(() => { + imageWriter.flash(image.path, [ drive ]).then(() => { if (!flashState.wasLastFlashCancelled()) { notification.send('Success!', { body: messages.info.flashComplete(path.basename(image.path), drive), diff --git a/lib/gui/modules/child-writer.js b/lib/gui/modules/child-writer.js index 4045bc69..30be4fd6 100644 --- a/lib/gui/modules/child-writer.js +++ b/lib/gui/modules/child-writer.js @@ -19,7 +19,7 @@ const ipc = require('node-ipc') const EXIT_CODES = require('../../shared/exit-codes') const errors = require('../../shared/errors') -const writer = require('../../cli/writer') +const ImageWriter = require('../../sdk/writer') ipc.config.id = process.env.IPC_CLIENT_ID ipc.config.socketRoot = process.env.IPC_SOCKET_ROOT @@ -64,7 +64,9 @@ const log = (message) => { */ const terminate = (code) => { ipc.disconnect(IPC_SERVER_ID) - process.exit(code || EXIT_CODES.SUCCESS) + process.nextTick(() => { + process.exit(code || EXIT_CODES.SUCCESS) + }) } /** @@ -93,6 +95,7 @@ ipc.connectTo(IPC_SERVER_ID, () => { process.once('SIGINT', () => { terminate(EXIT_CODES.SUCCESS) }) + process.once('SIGTERM', () => { terminate(EXIT_CODES.SUCCESS) }) @@ -107,27 +110,65 @@ ipc.connectTo(IPC_SERVER_ID, () => { terminate(EXIT_CODES.SUCCESS) }) + ipc.of[IPC_SERVER_ID].on('write', (options) => { + const destinations = [].concat(options.destinations) + + log(`Image: ${options.imagePath}`) + log(`Devices: ${destinations.join(', ')}`) + log(`Umount on success: ${options.unmountOnSuccess}`) + log(`Validate on success: ${options.validateWriteOnSuccess}`) + + let exitCode = EXIT_CODES.SUCCESS + + /** + * @summary Progress handler + * @param {Object} state - progress state + * @example + * writer.on('progress', onProgress) + */ + const onProgress = (state) => { + ipc.of[IPC_SERVER_ID].emit('state', state) + } + + /** + * @summary Finish handler + * @param {Object} results - Flash results + * @example + * writer.on('finish', onFinish) + */ + const onFinish = (results) => { + log(`Finish: ${results.bytesWritten}`) + ipc.of[IPC_SERVER_ID].emit('done', { results }) + terminate(exitCode) + } + + /** + * @summary Error handler + * @param {Error} error - error + * @example + * writer.on('error', onError) + */ + const onError = (error) => { + log(`Error: ${error.message}`) + exitCode = EXIT_CODES.GENERAL_ERROR + ipc.of[IPC_SERVER_ID].emit('error', error) + } + + const writer = new ImageWriter({ + verify: options.validateWriteOnSuccess, + unmountOnSuccess: options.unmountOnSuccess, + checksumAlgorithms: options.checksumAlgorithms || [] + }) + + writer.on('error', onError) + writer.on('progress', onProgress) + writer.on('finish', onFinish) + + writer.write(options.imagePath, destinations) + }) + ipc.of[IPC_SERVER_ID].on('connect', () => { log(`Successfully connected to IPC server: ${IPC_SERVER_ID}, socket root ${ipc.config.socketRoot}`) - log(`Image: ${process.env.OPTION_IMAGE}`) - log(`Device: ${process.env.OPTION_DEVICE}`) - - // These come as strings containing 0 or 1. We need to convert - // them to numbers first, and then to booleans, otherwise something - // like `Boolean('0')` will be `true` - const unmountOnSuccess = Boolean(Number(process.env.OPTION_UNMOUNT)) - const validateWriteOnSuccess = Boolean(Number(process.env.OPTION_VALIDATE)) - log(`Umount on success: ${unmountOnSuccess}`) - log(`Validate on success: ${validateWriteOnSuccess}`) - - writer.writeImage(process.env.OPTION_IMAGE, process.env.OPTION_DEVICE, { - unmountOnSuccess, - validateWriteOnSuccess - }, (state) => { - ipc.of[IPC_SERVER_ID].emit('state', state) - }).then((results) => { - ipc.of[IPC_SERVER_ID].emit('done', results) - terminate(EXIT_CODES.SUCCESS) - }).catch(handleError) + ipc.of[IPC_SERVER_ID].emit('ready', {}) }) }) diff --git a/lib/sdk/image-stream/handlers.js b/lib/sdk/image-stream/handlers.js index 492e3d0b..34266afd 100644 --- a/lib/sdk/image-stream/handlers.js +++ b/lib/sdk/image-stream/handlers.js @@ -40,6 +40,13 @@ const errors = require('../../shared/errors') */ const DEFAULT_EXT = 'img' +/** + * @summary Default read-stream highWaterMark value (4M) + * @type {Number} + * @constant + */ +const STREAM_HWM = 4194304 + /** * @summary Image handlers * @namespace handlers @@ -65,7 +72,7 @@ module.exports = { path: imagePath, archiveExtension: fileExtensions.getLastFileExtension(imagePath), extension: fileExtensions.getPenultimateFileExtension(imagePath) || DEFAULT_EXT, - stream: fs.createReadStream(imagePath), + stream: fs.createReadStream(imagePath, { highWaterMark: STREAM_HWM }), size: { original: options.size, final: { @@ -105,7 +112,7 @@ module.exports = { path: imagePath, archiveExtension: fileExtensions.getLastFileExtension(imagePath), extension: fileExtensions.getPenultimateFileExtension(imagePath) || DEFAULT_EXT, - stream: fs.createReadStream(imagePath), + stream: fs.createReadStream(imagePath, { highWaterMark: STREAM_HWM }), size: { original: options.size, final: { @@ -146,7 +153,7 @@ module.exports = { path: imagePath, archiveExtension: fileExtensions.getLastFileExtension(imagePath), extension: fileExtensions.getPenultimateFileExtension(imagePath) || DEFAULT_EXT, - stream: fs.createReadStream(imagePath), + stream: fs.createReadStream(imagePath, { highWaterMark: STREAM_HWM }), size: { original: options.size, final: { @@ -177,7 +184,7 @@ module.exports = { return { path: imagePath, extension: fileExtensions.getLastFileExtension(imagePath), - stream: udif.createReadStream(imagePath), + stream: udif.createReadStream(imagePath, { highWaterMark: STREAM_HWM }), size: { original: options.size, final: { @@ -231,7 +238,7 @@ module.exports = { return { path: imagePath, extension: fileExtensions.getLastFileExtension(imagePath), - stream: fs.createReadStream(imagePath), + stream: fs.createReadStream(imagePath, { highWaterMark: STREAM_HWM }), size: { original: options.size, final: { diff --git a/lib/sdk/writer/block-write-stream.js b/lib/sdk/writer/block-write-stream.js index 01d28af0..597be99a 100644 --- a/lib/sdk/writer/block-write-stream.js +++ b/lib/sdk/writer/block-write-stream.js @@ -153,7 +153,7 @@ class BlockWriteStream extends stream.Writable { this.blocksWritten += 1 this.position += bytesWritten this.retries = 0 - next(error) + next() return } diff --git a/lib/sdk/writer/index.js b/lib/sdk/writer/index.js index c5565278..096445fb 100644 --- a/lib/sdk/writer/index.js +++ b/lib/sdk/writer/index.js @@ -37,6 +37,7 @@ const debug = require('debug')('etcher:writer') const _ = require('lodash') /* eslint-disable prefer-reflect */ +/* eslint-disable callback-return */ /** * @summary Timeout, in milliseconds, to wait before unmounting on success @@ -77,6 +78,36 @@ const runSeries = (tasks, callback) => { run() } +/** + * @summary Helper function to run a set of async tasks in sequence + * @private + * @param {Array} tasks - set of tasks + * @param {Function} callback - callback(error) + * @example + * runParallel([ + * (next) => first(next), + * (next) => second(next), + * ], (error) => { + * // ... + * }) + */ +const runParallel = (tasks, callback) => { + let count = tasks.length + const resultErrors = new Array(count).fill(null) + const results = new Array(count).fill(null) + + tasks.forEach((task, index) => { + task((error, result) => { + count -= 1 + resultErrors[index] = error + results[index] = result + if (count === 0) { + callback(resultErrors, results) + } + }) + }) +} + /** * @summary ImageWriter class * @class @@ -85,8 +116,6 @@ class ImageWriter extends EventEmitter { /** * @summary ImageWriter constructor * @param {Object} options - options - * @param {String} options.imagePath - disk image path - * @param {String} options.path - dest path * @param {Boolean} options.verify - whether to verify the dest * @param {Boolean} options.unmountOnSuccess - whether to unmount the dest after flashing * @param {Array} options.checksumAlgorithms - checksums to calculate @@ -94,62 +123,83 @@ class ImageWriter extends EventEmitter { * new ImageWriter(options) */ constructor (options) { + options = options || {} super() - this.options = options + debug('new', options) + + this.unmountOnSuccess = Boolean(options.unmountOnSuccess) + this.verifyChecksums = Boolean(options.verify) + this.checksumAlgorithms = options.checksumAlgorithms || [] this.source = null this.pipeline = null - this.target = null + this.destinations = new Map() + + this.finished = false + this.hadError = false this.bytesRead = 0 this.bytesWritten = 0 this.checksum = {} + + this.once('error', () => { + this.hadError = true + }) } /** - * @summary Verify that the selected destination device exists + * @summary Verify that the selected destination devices exist + * @param {Array} paths - target device paths * @param {Function} callback - callback(error) * @private * @example - * writer.checkSelectedDevice((error) => { + * writer.getSelectedDevices(['/dev/disk2'], (error, destinations) => { * // ... * }) */ - checkSelectedDevice (callback) { - debug('state:device-select', this.options.path) - this.destinationDevice = null + getSelectedDevices (paths, callback) { + debug('state:device-select', paths) drivelist.list((error, drives) => { - debug('state:device-select', this.options.path, error ? 'NOT OK' : 'OK') + debug('state:device-select', paths, error ? 'NOT OK' : 'OK') if (error) { callback.call(this, error) return } - const selectedDrive = _.find(drives, { - device: this.options.path + const results = paths.map((path) => { + const destination = { + fd: null, + error: null, + stream: null, + finished: false, + verified: false, + device: _.find(drives, { + device: path + }) + } + + if (!destination.device) { + const selectionError = errors.createUserError({ + title: `The selected drive "${path}" was not found`, + description: `We can't find "${path}" in your system. Did you unplug the drive?`, + code: 'EUNPLUGGED' + }) + debug('state:device-select', destination, 'NOT OK') + destination.error = selectionError + } + + return destination }) - if (!selectedDrive) { - const selectionError = errors.createUserError({ - title: 'The selected drive was not found', - description: `We can't find ${this.options.path} in your system. Did you unplug the drive?`, - code: 'EUNPLUGGED' - }) - debug('state:device-select', this.options.path, 'NOT OK') - callback.call(this, selectionError) - return - } - - this.destinationDevice = selectedDrive - - callback.call(this) + callback.call(this, null, results) }) } /** * @summary Unmount the destination device + * @param {Object} destination - destination object * @param {Function} callback - callback(error) * @private * @example @@ -157,22 +207,23 @@ class ImageWriter extends EventEmitter { * // ... * }) */ - unmountDevice (callback) { + unmountDevice (destination, callback) { if (os.platform() === 'win32') { callback.call(this) return } - debug('state:unmount', this.destinationDevice.device) + debug('state:unmount', destination.device.device) - mountutils.unmountDisk(this.destinationDevice.device, (error) => { - debug('state:unmount', this.destinationDevice.device, error ? 'NOT OK' : 'OK') + mountutils.unmountDisk(destination.device.device, (error) => { + debug('state:unmount', destination.device.device, error ? 'NOT OK' : 'OK') callback.call(this, error) }) } /** * @summary Clean a device's partition table + * @param {Object} destination - destination object * @param {Function} callback - callback(error) * @private * @example @@ -180,56 +231,42 @@ class ImageWriter extends EventEmitter { * // ... * }) */ - removePartitionTable (callback) { + removePartitionTable (destination, callback) { if (os.platform() !== 'win32') { callback.call(this) return } - debug('state:clean', this.destinationDevice.device) + debug('state:clean', destination.device.device) - diskpart.clean(this.destinationDevice.device).asCallback((error) => { - debug('state:clean', this.destinationDevice.device, error ? 'NOT OK' : 'OK') + diskpart.clean(destination.device.device).asCallback((error) => { + debug('state:clean', destination.device.device, error ? 'NOT OK' : 'OK') callback.call(this, error) }) } /** * @summary Open the source for reading + * @param {String} imagePath - path to source image * @param {Function} callback - callback(error) * @private * @example - * writer.openSource((error) => { + * writer.openSource('path/to/image.img', (error, source) => { * // ... * }) */ - openSource (callback) { - debug('state:source-open', this.options.imagePath) - imageStream.getFromFilePath(this.options.imagePath).asCallback((error, image) => { - debug('state:source-open', this.options.imagePath, error ? 'NOT OK' : 'OK') - if (error) { - callback.call(this, error) - return - } - - if (!constraints.isDriveLargeEnough(this.destinationDevice, image)) { - const driveError = errors.createUserError({ - title: 'The image you selected is too big for this drive', - description: 'Please connect a bigger drive and try again' - }) - debug('state:source-open', this.options.imagePath, 'NOT OK') - callback.call(this, driveError) - return - } - - this.options.image = image - - callback.call(this) + openSource (imagePath, callback) { + debug('state:source-open', imagePath) + imageStream.getFromFilePath(imagePath).asCallback((error, image) => { + debug('state:source-open', imagePath, error ? 'NOT OK' : 'OK') + this.source = image + callback.call(this, error, this.source) }) } /** * @summary Open the destination for writing + * @param {Object} destination - destination object * @param {Function} callback - callback(error) * @private * @example @@ -237,8 +274,8 @@ class ImageWriter extends EventEmitter { * // ... * }) */ - openDestination (callback) { - debug('state:destination-open', this.destinationDevice.raw) + openDestination (destination, callback) { + debug('state:destination-open', destination.device.raw) /* eslint-disable no-bitwise */ const flags = fs.constants.O_RDWR | @@ -246,38 +283,93 @@ class ImageWriter extends EventEmitter { fs.constants.O_SYNC /* eslint-enable no-bitwise */ - fs.open(this.destinationDevice.raw, flags, (error, fd) => { - debug('state:destination-open', this.destinationDevice.raw, error ? 'NOT OK' : 'OK') - this.options.fd = fd + fs.open(destination.device.raw, flags, (error, fd) => { + debug('state:destination-open', destination.device.raw, error ? 'NOT OK' : 'OK') + destination.fd = fd callback.call(this, error) }) } + /** + * @summary Check a destinstation against the drive constraints + * @param {Object} destination - destination object + * @param {Function} callback - callback(error) + * @example + * this.checkDriveConstraints(destination, (error) => { + * // ... + * }) + */ + checkDriveConstraints (destination, callback) { + let error = null + + if (!constraints.isDriveLargeEnough(destination.device, this.source)) { + error = errors.createUserError({ + title: 'The image you selected is too big for this drive', + description: 'Please connect a bigger drive and try again' + }) + } + + callback.call(this, error) + } + /** * @summary Start the flashing process + * @param {String} imagePath - path to source image + * @param {Array} destinationPaths - paths to target devices * @returns {ImageWriter} imageWriter * @example - * imageWriter.flash() + * imageWriter.write(source, destinations) * .on('error', reject) * .on('progress', onProgress) * .on('finish', resolve) */ - flash () { - const tasks = [ - (next) => { this.checkSelectedDevice(next) }, - (next) => { this.unmountDevice(next) }, - (next) => { this.removePartitionTable(next) }, - (next) => { this.openSource(next) }, - (next) => { this.openDestination(next) } - ] - - runSeries(tasks, (error) => { - if (error) { - this.emit('error', error) + write (imagePath, destinationPaths) { + // Open the source image + this.openSource(imagePath, (openError, source) => { + if (openError) { + this.emit('error', openError) return } - this.write() + // Open & prepare target devices + this.getSelectedDevices(destinationPaths, (error, destinations) => { + if (error) { + this.emit('error', error) + return + } + + const notFound = _.find(destinations, (destination) => { + return Boolean(destination.error) + }) + + if (notFound) { + this.emit('error', notFound.error) + return + } + + // Generate preparation tasks for all destinations + const tasks = destinations.map((destination) => { + this.destinations.set(destination.device.device, destination) + return (next) => { + runSeries([ + (done) => { this.checkDriveConstraints(destination, done) }, + (done) => { this.unmountDevice(destination, done) }, + (done) => { this.removePartitionTable(destination, done) }, + (done) => { this.openDestination(destination, done) } + ], (preparationError) => { + destination.error = preparationError + next(preparationError) + }) + } + }) + + // Run the preparation tasks in parallel for each destination + runParallel(tasks, (resultErrors, results) => { + // We can start (theoretically) flashing now... + debug('write:prep:done', resultErrors) + this._write() + }) + }) }) return this @@ -289,23 +381,41 @@ class ImageWriter extends EventEmitter { * @example * imageWriter.write() */ - write () { - this._createWritePipeline(this.options) - .on('checksum', (checksum) => { - debug('write:checksum', checksum) - this.checksum = checksum - }) - .on('error', (error) => { - this.emit('error', error) + _write () { + this.pipeline = this._createWritePipeline() + + this.pipeline.on('checksum', (checksum) => { + debug('write:checksum', checksum) + this.checksum = checksum + }) + + this.pipeline.on('error', (error) => { + this.emit('error', error) + }) + + this.pipeline.on('finish', (destination) => { + this.bytesRead = this.source.bytesRead + + let finishedCount = 0 + + this.destinations.forEach((dest) => { + finishedCount += dest.finished ? 1 : 0 }) - this.target.on('finish', () => { - this.bytesRead = this.source.bytesRead - this.bytesWritten = this.target.bytesWritten - if (this.options.verify) { - this.verify() - } else { - this._finish() + debug('write:finish', finishedCount, '/', this.destinations.size) + + if (destination) { + this.bytesWritten += destination.stream.bytesWritten + } + + if (finishedCount === this.destinations.size) { + if (this.verifyChecksums) { + debug('write:verify') + this.verify() + } else { + debug('write:finish') + this._finish() + } } }) @@ -319,26 +429,82 @@ class ImageWriter extends EventEmitter { * imageWriter.verify() */ verify () { - this._createVerifyPipeline(this.options) - .on('error', (error) => { + const progressStream = new ProgressStream({ + length: this.bytesWritten, + time: 500 + }) + + progressStream.resume() + + progressStream.on('progress', (state) => { + state.type = 'check' + state.totalSpeed = 0 + state.active = 0 + this.destinations.forEach((destination) => { + if (!destination.verified && !destination.error) { + state.totalSpeed += destination.progress.state.speed + state.active += 1 + } + }) + state.speed = state.active + ? state.totalSpeed / state.active + : state.active + this.emit('progress', state) + }) + + this.destinations.forEach((destination) => { + // Don't verify errored destinations + if (destination.error || !destination.stream) { + return + } + + const pipeline = this._createVerifyPipeline(destination) + + pipeline.on('error', (error) => { this.emit('error', error) }) - .on('checksum', (checksum) => { + + pipeline.on('checksum', (checksum) => { debug('verify:checksum', this.checksum, '==', checksum) + destination.checksum = checksum if (!_.isEqual(this.checksum, checksum)) { const error = new Error(`Verification failed: ${JSON.stringify(this.checksum)} != ${JSON.stringify(checksum)}`) error.code = 'EVALIDATION' + destination.error = error this.emit('error', error) } - this._finish() }) - .on('finish', () => { - debug('verify:end') - // NOTE: As the 'checksum' event only happens after - // the 'finish' event, we `._finish()` there instead of here + pipeline.on('finish', () => { + debug('verify:finish') + + destination.verified = true + destination.progress = null + destination.stream = null + + let finishedCount = 0 + + this.destinations.forEach((dest) => { + finishedCount += (dest.error || dest.verified) ? 1 : 0 + }) + + if (finishedCount === this.destinations.size) { + debug('verify:complete') + progressStream.end() + this._finish() + } }) + // NOTE: Normally we'd use `pipeline.pipe(progressStream)` here, + // but that leads to degraded performance + pipeline.on('readable', function () { + let chunk = null + while ((chunk = this.read())) { + progressStream.write(chunk) + } + }) + }) + return this } @@ -365,23 +531,43 @@ class ImageWriter extends EventEmitter { */ _cleanup (callback) { debug('state:cleanup') - fs.close(this.options.fd, (closeError) => { - debug('state:cleanup', closeError ? 'NOT OK' : 'OK') - if (!this.options.unmountOnSuccess) { - callback.call(this, closeError) - return - } + const tasks = [] - // Closing a file descriptor on a drive containing mountable - // partitions causes macOS to mount the drive. If we try to - // unmount too quickly, then the drive might get re-mounted - // right afterwards. - setTimeout(() => { - mountutils.unmountDisk(this.destinationDevice.device, (error) => { - debug('state:cleanup', error ? 'NOT OK' : 'OK') - callback.call(this, error) - }) - }, UNMOUNT_ON_SUCCESS_TIMEOUT_MS) + this.destinations.forEach((destination) => { + tasks.push((next) => { + runSeries([ + (done) => { + if (destination.fd) { + fs.close(destination.fd, done) + destination.fd = null + } else { + done() + } + }, + (done) => { + if (!this.unmountOnSuccess) { + done() + return + } + + // Closing a file descriptor on a drive containing mountable + // partitions causes macOS to mount the drive. If we try to + // unmount too quickly, then the drive might get re-mounted + // right afterwards. + setTimeout(() => { + mountutils.unmountDisk(destination.device.device, (error) => { + debug('state:cleanup', error ? 'NOT OK' : 'OK') + done(error) + }) + }, UNMOUNT_ON_SUCCESS_TIMEOUT_MS) + } + ], next) + }) + }) + + runParallel(tasks, (resultErrors, results) => { + debug('state:cleanup', resultErrors) + callback.call(this, resultErrors) }) } @@ -393,8 +579,8 @@ class ImageWriter extends EventEmitter { */ _finish () { this._cleanup(() => { + this.finished = true this.emit('finish', { - drive: this.destinationDevice, bytesRead: this.bytesRead, bytesWritten: this.bytesWritten, checksum: this.checksum @@ -405,28 +591,17 @@ class ImageWriter extends EventEmitter { /** * @summary Creates a write pipeline from given options * @private - * @param {Object} options - options - * @param {Object} options.image - source image - * @param {Number} [options.fd] - destination file descriptor - * @param {String} [options.path] - destination file path - * @param {String} [options.flags] - destination file open flags - * @param {String} [options.mode] - destination file mode * @returns {Pipage} pipeline * @example - * this._createWritePipeline({ - * image: sourceImage, - * path: '/dev/rdisk2' - * }) + * this._createWritePipeline() */ - _createWritePipeline (options) { + _createWritePipeline () { const pipeline = new Pipage({ readableObjectMode: true }) - const image = options.image - const source = image.stream const progressOptions = { - length: image.size.original, + length: this.source.size.original, time: 500 } @@ -434,66 +609,94 @@ class ImageWriter extends EventEmitter { // If the final size is an estimation, // use the original source size for progress metering - if (image.size.final.estimation) { + if (this.source.size.final.estimation) { progressStream = new ProgressStream(progressOptions) pipeline.append(progressStream) } - const isPassThrough = image.transform instanceof stream.PassThrough + const isPassThrough = this.source.transform instanceof stream.PassThrough // If the image transform is a pass-through, // ignore it to save on the overhead - if (image.transform && !isPassThrough) { - pipeline.append(image.transform) + if (this.source.transform && !isPassThrough) { + pipeline.append(this.source.transform) } // If the final size is known precisely and we're not // using block maps, then use the final size for progress - if (!image.size.final.estimation && !image.bmap) { - progressOptions.length = image.size.final.value + if (!this.source.size.final.estimation && !this.source.bmap) { + progressOptions.length = this.source.size.final.value progressStream = new ProgressStream(progressOptions) pipeline.append(progressStream) } - if (image.bmap) { - const blockMap = BlockMap.parse(image.bmap) + if (this.source.bmap) { + const blockMap = BlockMap.parse(this.source.bmap) debug('write:bmap', blockMap) progressStream = new ProgressStream(progressOptions) pipeline.append(progressStream) pipeline.append(new BlockMap.FilterStream(blockMap)) } else { debug('write:blockstream') - const checksumStream = new ChecksumStream({ - objectMode: true, - algorithms: options.checksumAlgorithms - }) pipeline.append(new BlockStream()) - pipeline.append(checksumStream) - pipeline.bind(checksumStream, 'checksum') + if (this.verifyChecksums) { + const checksumStream = new ChecksumStream({ + objectMode: true, + algorithms: this.checksumAlgorithms + }) + pipeline.append(checksumStream) + pipeline.bind(checksumStream, 'checksum') + } } - const target = new BlockWriteStream({ - fd: options.fd, - autoClose: false + this.destinations.forEach((destination) => { + if (destination.error) { + debug('pipeline:skip', destination.device.device) + return + } + + destination.stream = new BlockWriteStream({ + fd: destination.fd, + autoClose: false + }) + + destination.stream.once('finish', () => { + debug('finish:unpipe', destination.device.device) + destination.finished = true + pipeline.emit('finish', destination) + pipeline.unpipe(destination.stream) + }) + + destination.stream.once('error', (error) => { + debug('error:unpipe', destination.device.device) + destination.error = error + destination.finished = true + pipeline.unpipe(destination.stream) + }) + + pipeline.bind(destination.stream, 'error') + pipeline.pipe(destination.stream) }) // Pipeline.bind(progressStream, 'progress'); progressStream.on('progress', (state) => { - state.device = options.path state.type = 'write' - state.speed = target.speed + state.totalSpeed = 0 + state.active = 0 + this.destinations.forEach((destination) => { + if (!destination.finished && !destination.error) { + state.totalSpeed += destination.stream.speed + state.active += 1 + } + }) + state.speed = state.active + ? state.totalSpeed / state.active + : state.active this.emit('progress', state) }) - pipeline.bind(source, 'error') - pipeline.bind(target, 'error') - - source.pipe(pipeline) - .pipe(target) - - this.source = source - this.pipeline = pipeline - this.target = target + pipeline.bind(this.source.stream, 'error') + this.source.stream.pipe(pipeline) return pipeline } @@ -501,25 +704,18 @@ class ImageWriter extends EventEmitter { /** * @summary Creates a verification pipeline from given options * @private - * @param {Object} options - options - * @param {Object} options.image - image - * @param {Number} [options.fd] - file descriptor - * @param {String} [options.path] - file path - * @param {String} [options.flags] - file open flags - * @param {String} [options.mode] - file mode + * @param {Object} destination - the destination object * @returns {Pipage} pipeline * @example - * this._createVerifyPipeline({ - * path: '/dev/rdisk2' - * }) + * this._createVerifyPipeline() */ - _createVerifyPipeline (options) { + _createVerifyPipeline (destination) { const pipeline = new Pipage() - let size = this.bytesWritten + let size = destination.stream.bytesWritten - if (!options.image.size.final.estimation) { - size = Math.max(this.bytesWritten, options.image.size.final.value) + if (!this.source.size.final.estimation) { + size = Math.max(size, this.source.size.final.value) } const progressStream = new ProgressStream({ @@ -529,9 +725,9 @@ class ImageWriter extends EventEmitter { pipeline.append(progressStream) - if (options.image.bmap) { + if (this.source.bmap) { debug('verify:bmap') - const blockMap = BlockMap.parse(options.image.bmap) + const blockMap = BlockMap.parse(this.source.bmap) const blockMapStream = new BlockMap.FilterStream(blockMap) pipeline.append(blockMapStream) @@ -543,14 +739,14 @@ class ImageWriter extends EventEmitter { }) } else { const checksumStream = new ChecksumStream({ - algorithms: options.checksumAlgorithms + algorithms: this.checksumAlgorithms }) pipeline.append(checksumStream) pipeline.bind(checksumStream, 'checksum') } const source = new BlockReadStream({ - fd: options.fd, + fd: destination.fd, autoClose: false, start: 0, end: size @@ -558,17 +754,8 @@ class ImageWriter extends EventEmitter { pipeline.bind(source, 'error') - progressStream.on('progress', (state) => { - state.device = options.path - state.type = 'check' - this.emit('progress', state) - }) - - this.target = null - this.source = source - this.pipeline = pipeline - - source.pipe(pipeline).resume() + destination.stream = source.pipe(pipeline) + destination.progress = progressStream return pipeline } diff --git a/tests/gui/modules/image-writer.spec.js b/tests/gui/modules/image-writer.spec.js index a01d67a9..b566babf 100644 --- a/tests/gui/modules/image-writer.spec.js +++ b/tests/gui/modules/image-writer.spec.js @@ -29,7 +29,7 @@ describe('Browser: imageWriter', () => { sourceChecksum: '1234' }) - imageWriter.flash('foo.img', '/dev/disk2').finally(() => { + imageWriter.flash('foo.img', [ '/dev/disk2' ]).finally(() => { m.chai.expect(flashState.isFlashing()).to.be.false }) }) @@ -40,18 +40,18 @@ describe('Browser: imageWriter', () => { sourceChecksum: '1234' }) - const writing = imageWriter.flash('foo.img', '/dev/disk2') - imageWriter.flash('foo.img', '/dev/disk2').catch(angular.noop) + const writing = imageWriter.flash('foo.img', [ '/dev/disk2' ]) + imageWriter.flash('foo.img', [ '/dev/disk2' ]).catch(angular.noop) writing.finally(() => { m.chai.expect(this.performWriteStub).to.have.been.calledOnce }) }) it('should reject the second flash attempt', () => { - imageWriter.flash('foo.img', '/dev/disk2') + imageWriter.flash('foo.img', [ '/dev/disk2' ]) let rejectError = null - imageWriter.flash('foo.img', '/dev/disk2').catch((error) => { + imageWriter.flash('foo.img', [ '/dev/disk2' ]).catch((error) => { rejectError = error }).finally(() => { m.chai.expect(rejectError).to.be.an.instanceof(Error) @@ -73,13 +73,13 @@ describe('Browser: imageWriter', () => { }) it('should set flashing to false when done', () => { - imageWriter.flash('foo.img', '/dev/disk2').catch(angular.noop).finally(() => { + imageWriter.flash('foo.img', [ '/dev/disk2' ]).catch(angular.noop).finally(() => { m.chai.expect(flashState.isFlashing()).to.be.false }) }) it('should set the error code in the flash results', () => { - imageWriter.flash('foo.img', '/dev/disk2').catch(angular.noop).finally(() => { + imageWriter.flash('foo.img', [ '/dev/disk2' ]).catch(angular.noop).finally(() => { const flashResults = flashState.getFlashResults() m.chai.expect(flashResults.errorCode).to.equal('FOO') }) @@ -92,7 +92,7 @@ describe('Browser: imageWriter', () => { }) let rejection - imageWriter.flash('foo.img', '/dev/disk2').catch((error) => { + imageWriter.flash('foo.img', [ '/dev/disk2' ]).catch((error) => { rejection = error }).finally(() => { m.chai.expect(rejection).to.be.an.instanceof(Error) From ef634227aac2833241817b1885024b116512bf4e Mon Sep 17 00:00:00 2001 From: Jonas Hermsmeier Date: Tue, 20 Mar 2018 23:07:39 +0100 Subject: [PATCH 2/3] feat(cli): Display number of active cards Change-Type: patch --- lib/cli/etcher.js | 2 +- lib/sdk/image-stream/handlers.js | 4 +- lib/sdk/writer/index.js | 66 ++++++++++++++++++++------------ 3 files changed, 45 insertions(+), 27 deletions(-) diff --git a/lib/cli/etcher.js b/lib/cli/etcher.js index 511592b3..2acf45a6 100644 --- a/lib/cli/etcher.js +++ b/lib/cli/etcher.js @@ -87,7 +87,7 @@ permissions.isElevated().then((elevated) => { */ const onProgress = (state) => { state.message = state.active > 1 - ? `${bytes(state.totalSpeed)}/s total, ${bytes(state.speed)}/s avg` + ? `${bytes(state.totalSpeed)}/s total, ${bytes(state.speed)}/s x ${state.active}` : `${bytes(state.totalSpeed)}/s` state.message = `${state.type === 'write' ? 'Flashing' : 'Validating'}: ${state.message}` diff --git a/lib/sdk/image-stream/handlers.js b/lib/sdk/image-stream/handlers.js index 34266afd..75f2d690 100644 --- a/lib/sdk/image-stream/handlers.js +++ b/lib/sdk/image-stream/handlers.js @@ -41,11 +41,11 @@ const errors = require('../../shared/errors') const DEFAULT_EXT = 'img' /** - * @summary Default read-stream highWaterMark value (4M) + * @summary Default read-stream highWaterMark value (1M) * @type {Number} * @constant */ -const STREAM_HWM = 4194304 +const STREAM_HWM = 1048576 /** * @summary Image handlers diff --git a/lib/sdk/writer/index.js b/lib/sdk/writer/index.js index 096445fb..74d573de 100644 --- a/lib/sdk/writer/index.js +++ b/lib/sdk/writer/index.js @@ -375,6 +375,46 @@ class ImageWriter extends EventEmitter { return this } + /** + * @summary Internal progress state handler + * @param {Object} state - progress state + * @example + * pipeline.on('progress', (state) => { + * // ... + * this._onProgress(state) + * }) + */ + _onProgress (state) { + state.totalSpeed = 0 + state.active = 0 + + state.flashing = 0 + state.verifying = 0 + state.failed = 0 + state.succeeded = 0 + + this.destinations.forEach((dest) => { + state.flashing += !dest.error && !dest.finished ? 1 : 0 + state.verifying += !dest.error && !dest.verified ? 1 : 0 + state.failed += dest.error ? 1 : 0 + if (!(dest.finished && dest.verified) && !dest.error) { + state.totalSpeed += state.type === 'write' + ? dest.stream.speed + : dest.progress.state.speed + state.active += 1 + } + }) + + state.speed = state.active + ? state.totalSpeed / state.active + : state.active + + state.succeeded = state.active - state.failed - state.flashing - state.verifying + state.eta = state.speed ? state.remaining / state.speed : 0 + + this.emit('progress', state) + } + /** * @summary Start the writing process * @returns {ImageWriter} imageWriter @@ -438,18 +478,7 @@ class ImageWriter extends EventEmitter { progressStream.on('progress', (state) => { state.type = 'check' - state.totalSpeed = 0 - state.active = 0 - this.destinations.forEach((destination) => { - if (!destination.verified && !destination.error) { - state.totalSpeed += destination.progress.state.speed - state.active += 1 - } - }) - state.speed = state.active - ? state.totalSpeed / state.active - : state.active - this.emit('progress', state) + this._onProgress(state) }) this.destinations.forEach((destination) => { @@ -681,18 +710,7 @@ class ImageWriter extends EventEmitter { // Pipeline.bind(progressStream, 'progress'); progressStream.on('progress', (state) => { state.type = 'write' - state.totalSpeed = 0 - state.active = 0 - this.destinations.forEach((destination) => { - if (!destination.finished && !destination.error) { - state.totalSpeed += destination.stream.speed - state.active += 1 - } - }) - state.speed = state.active - ? state.totalSpeed / state.active - : state.active - this.emit('progress', state) + this._onProgress(state) }) pipeline.bind(this.source.stream, 'error') From 3424b996c83b07a502bb8e84c2d3715c4b266450 Mon Sep 17 00:00:00 2001 From: Jonas Hermsmeier Date: Wed, 21 Mar 2018 19:52:37 +0100 Subject: [PATCH 3/3] fix(writer): Fix state verification count Change-Type: patch --- lib/sdk/writer/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sdk/writer/index.js b/lib/sdk/writer/index.js index 74d573de..cb4bc4b1 100644 --- a/lib/sdk/writer/index.js +++ b/lib/sdk/writer/index.js @@ -395,7 +395,7 @@ class ImageWriter extends EventEmitter { this.destinations.forEach((dest) => { state.flashing += !dest.error && !dest.finished ? 1 : 0 - state.verifying += !dest.error && !dest.verified ? 1 : 0 + state.verifying += !dest.error && dest.finished && !dest.verified ? 1 : 0 state.failed += dest.error ? 1 : 0 if (!(dest.finished && dest.verified) && !dest.error) { state.totalSpeed += state.type === 'write'