feat(writer): Impl multi-writes in writer modules

Implement writing to multiple destinations simultaneously

Change-Type: minor
Changelog-Entry: Implement writing to multiple destinations simultaneously
This commit is contained in:
Jonas Hermsmeier 2018-02-24 00:31:06 +01:00
parent 835f2cf769
commit c724e4cb20
No known key found for this signature in database
GPG Key ID: 1B870F801A0CEE9F
9 changed files with 554 additions and 327 deletions

View File

@ -16,11 +16,12 @@
'use strict'
const path = require('path')
const _ = require('lodash')
const Bluebird = require('bluebird')
const visuals = require('resin-cli-visuals')
const form = require('resin-cli-form')
const writer = require('./writer')
const bytes = require('pretty-bytes')
const ImageWriter = require('../sdk/writer')
const utils = require('./utils')
const options = require('./options')
const messages = require('../shared/messages')
@ -28,6 +29,8 @@ const EXIT_CODES = require('../shared/exit-codes')
const errors = require('../shared/errors')
const permissions = require('../shared/permissions')
/* eslint-disable no-magic-numbers */
const ARGV_IMAGE_PATH_INDEX = 0
const imagePath = options._[ARGV_IMAGE_PATH_INDEX]
@ -59,7 +62,6 @@ permissions.isElevated().then((elevated) => {
// otherwise the question will not be asked because
// `false` is a defined value.
yes: options.yes || null
}
})
}).then((answers) => {
@ -75,29 +77,79 @@ permissions.isElevated().then((elevated) => {
check: new visuals.Progress('Validating')
}
return writer.writeImage(imagePath, answers.drive, {
unmountOnSuccess: options.unmount,
validateWriteOnSuccess: options.check
}, (state) => {
progressBars[state.type].update(state)
}).then((results) => {
return {
imagePath,
flash: results
return new Bluebird((resolve, reject) => {
/**
* @summary Progress update handler
* @param {Object} state - progress state
* @private
* @example
* writer.on('progress', onProgress)
*/
const onProgress = (state) => {
state.message = state.active > 1
? `${bytes(state.totalSpeed)}/s total, ${bytes(state.speed)}/s avg`
: `${bytes(state.totalSpeed)}/s`
state.message = `${state.type === 'write' ? 'Flashing' : 'Validating'}: ${state.message}`
// Update progress bar
progressBars[state.type].update(state)
}
const writer = new ImageWriter({
verify: options.check,
unmountOnSuccess: options.unmount,
checksumAlgorithms: options.check ? [ 'sha512' ] : []
})
/**
* @summary Error handler
* @param {Error} error - error
* @private
* @example
* writer.on('error', onError)
*/
const onError = function (error) {
console.error(error)
}
/**
* @summary Finish handler
* @private
* @example
* writer.on('finish', onFinish)
*/
const onFinish = function () {
resolve(Array.from(writer.destinations.values()))
}
writer.on('progress', onProgress)
writer.on('error', onError)
writer.on('finish', onFinish)
// NOTE: Drive can be (String|Array)
const destinations = [].concat(answers.drive)
writer.write(imagePath, destinations)
})
}).then((results) => {
return Bluebird.try(() => {
console.log(messages.info.flashComplete(path.basename(results.imagePath), results.flash.drive))
let exitCode = EXIT_CODES.SUCCESS
if (results.flash.checksum.md5) {
console.log(`Checksum: ${results.flash.checksum.md5}`)
}
if (options.check) {
console.log('')
console.log('Checksums:')
return Bluebird.resolve()
}).then(() => {
process.exit(EXIT_CODES.SUCCESS)
})
_.forEach(results, (result) => {
if (result.error) {
exitCode = EXIT_CODES.GENERAL_ERROR
console.log(` - ${result.device.device}: ${result.error.message}`)
} else {
console.log(` - ${result.device.device}: ${result.checksum.sha512}`)
}
})
}
process.exit(exitCode)
}).catch((error) => {
return Bluebird.try(() => {
utils.printError(error)

View File

@ -1,62 +0,0 @@
/*
* Copyright 2016 resin.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict'
const Bluebird = require('bluebird')
const ImageWriter = require('../sdk/writer')
/**
* @summary Write an image to a disk drive
* @function
* @public
*
* @param {String} imagePath - path to image
* @param {String} drive - drive
* @param {Object} options - options
* @param {Boolean} [options.unmountOnSuccess=false] - unmount on success
* @param {Boolean} [options.validateWriteOnSuccess=false] - validate write on success
* @param {Function} onProgress - on progress callback (state)
*
* @fulfil {Boolean} - whether the operation was successful
* @returns {Promise}
*
* @example
* writer.writeImage('path/to/image.img', '/dev/disk2', {
* unmountOnSuccess: true,
* validateWriteOnSuccess: true
* }, (state) => {
* console.log(state.percentage);
* }).then(() => {
* console.log('Done!');
* });
*/
exports.writeImage = (imagePath, drive, options, onProgress) => {
const writer = new ImageWriter({
path: drive,
imagePath,
verify: options.validateWriteOnSuccess,
checksumAlgorithms: [ 'md5' ],
unmountOnSuccess: options.unmountOnSuccess
})
return new Bluebird((resolve, reject) => {
writer.flash()
.on('error', reject)
.on('progress', onProgress)
.on('finish', resolve)
})
}

View File

@ -36,7 +36,7 @@ const packageJSON = require('../../../../package.json')
* @type {Number}
* @constant
*/
const THREADS_PER_CPU = 4
const THREADS_PER_CPU = 16
/**
* @summary Get application entry point
@ -69,7 +69,7 @@ const getApplicationEntryPoint = () => {
* This function is extracted for testing purposes.
*
* @param {String} image - image path
* @param {Object} drive - drive
* @param {Array<Object>} drives - drives
* @param {Function} onProgress - in progress callback (state)
*
* @fulfil {Object} - flash results
@ -82,7 +82,7 @@ const getApplicationEntryPoint = () => {
* console.log(state.percentage)
* })
*/
exports.performWrite = (image, drive, onProgress) => {
exports.performWrite = (image, drives, onProgress) => {
// There might be multiple Etcher instances running at
// the same time, therefore we must ensure each IPC
// server/client has a different name.
@ -135,6 +135,16 @@ exports.performWrite = (image, drive, onProgress) => {
ipc.server.on('state', onProgress)
ipc.server.on('ready', (data, socket) => {
ipc.server.emit(socket, 'write', {
imagePath: image,
destinations: _.map(drives, [ 'device' ]),
validateWriteOnSuccess: settings.get('validateWriteOnSuccess'),
unmountOnSuccess: settings.get('unmountOnSuccess'),
checksumAlgorithms: [ 'sha512' ]
})
})
const argv = _.attempt(() => {
const entryPoint = getApplicationEntryPoint()
@ -167,31 +177,24 @@ exports.performWrite = (image, drive, onProgress) => {
permissions.elevateCommand(argv, {
applicationName: packageJSON.displayName,
environment: {
environment: _.assign({}, process.env, {
IPC_SERVER_ID,
IPC_CLIENT_ID,
IPC_SOCKET_ROOT: ipc.config.socketRoot,
ELECTRON_RUN_AS_NODE: 1,
UV_THREADPOOL_SIZE: os.cpus().length * THREADS_PER_CPU,
// Casting to Number nicely converts booleans to 0 or 1.
OPTION_VALIDATE: Number(settings.get('validateWriteOnSuccess')),
OPTION_UNMOUNT: Number(settings.get('unmountOnSuccess')),
OPTION_IMAGE: image,
OPTION_DEVICE: drive.device,
// This environment variable prevents the AppImages
// desktop integration script from presenting the
// "installation" dialog
SKIP: 1
}
})
}).then((results) => {
flashResults.cancelled = results.cancelled
console.log('Flash results', flashResults)
// This likely means the child died halfway through
if (!flashResults.cancelled && !flashResults.bytesWritten) {
if (!flashResults.cancelled && !_.get(flashResults, [ 'results', 'bytesWritten' ])) {
throw errors.createUserError({
title: 'The writer process ended unexpectedly',
description: 'Please try again, and contact the Etcher team if the problem persists',
@ -206,7 +209,6 @@ exports.performWrite = (image, drive, onProgress) => {
if (error.code === SIGKILL_EXIT_CODE) {
error.code = 'ECHILDDIED'
}
return reject(error)
}).finally(() => {
console.log('Terminating IPC server')
@ -219,7 +221,7 @@ exports.performWrite = (image, drive, onProgress) => {
}
/**
* @summary Flash an image to a drive
* @summary Flash an image to drives
* @function
* @public
*
@ -227,17 +229,17 @@ exports.performWrite = (image, drive, onProgress) => {
* This function will update `imageWriter.state` with the current writing state.
*
* @param {String} image - image path
* @param {Object} drive - drive
* @param {Array<Object>} drives - drives
* @returns {Promise}
*
* @example
* imageWriter.flash('foo.img', {
* imageWriter.flash('foo.img', [{
* device: '/dev/disk2'
* }).then(() => {
* }]).then(() => {
* console.log('Write completed!')
* })
*/
exports.flash = (image, drive) => {
exports.flash = (image, drives) => {
if (flashState.isFlashing()) {
return Bluebird.reject(new Error('There is already a flash in progress'))
}
@ -246,7 +248,7 @@ exports.flash = (image, drive) => {
const analyticsData = {
image,
drive,
drives,
uuid: flashState.getFlashUuid(),
unmountOnSuccess: settings.get('unmountOnSuccess'),
validateWriteOnSuccess: settings.get('validateWriteOnSuccess')
@ -254,7 +256,7 @@ exports.flash = (image, drive) => {
analytics.logEvent('Flash', analyticsData)
return exports.performWrite(image, drive, (state) => {
return exports.performWrite(image, drives, (state) => {
flashState.setProgressState(state)
}).then(flashState.unsetFlashingFlag).then(() => {
if (flashState.wasLastFlashCancelled()) {

View File

@ -72,7 +72,7 @@ module.exports = function (
const iconPath = '../../../assets/icon.png'
imageWriter.flash(image.path, drive).then(() => {
imageWriter.flash(image.path, [ drive ]).then(() => {
if (!flashState.wasLastFlashCancelled()) {
notification.send('Success!', {
body: messages.info.flashComplete(path.basename(image.path), drive),

View File

@ -19,7 +19,7 @@
const ipc = require('node-ipc')
const EXIT_CODES = require('../../shared/exit-codes')
const errors = require('../../shared/errors')
const writer = require('../../cli/writer')
const ImageWriter = require('../../sdk/writer')
ipc.config.id = process.env.IPC_CLIENT_ID
ipc.config.socketRoot = process.env.IPC_SOCKET_ROOT
@ -64,7 +64,9 @@ const log = (message) => {
*/
const terminate = (code) => {
ipc.disconnect(IPC_SERVER_ID)
process.exit(code || EXIT_CODES.SUCCESS)
process.nextTick(() => {
process.exit(code || EXIT_CODES.SUCCESS)
})
}
/**
@ -93,6 +95,7 @@ ipc.connectTo(IPC_SERVER_ID, () => {
process.once('SIGINT', () => {
terminate(EXIT_CODES.SUCCESS)
})
process.once('SIGTERM', () => {
terminate(EXIT_CODES.SUCCESS)
})
@ -107,27 +110,65 @@ ipc.connectTo(IPC_SERVER_ID, () => {
terminate(EXIT_CODES.SUCCESS)
})
ipc.of[IPC_SERVER_ID].on('write', (options) => {
const destinations = [].concat(options.destinations)
log(`Image: ${options.imagePath}`)
log(`Devices: ${destinations.join(', ')}`)
log(`Umount on success: ${options.unmountOnSuccess}`)
log(`Validate on success: ${options.validateWriteOnSuccess}`)
let exitCode = EXIT_CODES.SUCCESS
/**
* @summary Progress handler
* @param {Object} state - progress state
* @example
* writer.on('progress', onProgress)
*/
const onProgress = (state) => {
ipc.of[IPC_SERVER_ID].emit('state', state)
}
/**
* @summary Finish handler
* @param {Object} results - Flash results
* @example
* writer.on('finish', onFinish)
*/
const onFinish = (results) => {
log(`Finish: ${results.bytesWritten}`)
ipc.of[IPC_SERVER_ID].emit('done', { results })
terminate(exitCode)
}
/**
* @summary Error handler
* @param {Error} error - error
* @example
* writer.on('error', onError)
*/
const onError = (error) => {
log(`Error: ${error.message}`)
exitCode = EXIT_CODES.GENERAL_ERROR
ipc.of[IPC_SERVER_ID].emit('error', error)
}
const writer = new ImageWriter({
verify: options.validateWriteOnSuccess,
unmountOnSuccess: options.unmountOnSuccess,
checksumAlgorithms: options.checksumAlgorithms || []
})
writer.on('error', onError)
writer.on('progress', onProgress)
writer.on('finish', onFinish)
writer.write(options.imagePath, destinations)
})
ipc.of[IPC_SERVER_ID].on('connect', () => {
log(`Successfully connected to IPC server: ${IPC_SERVER_ID}, socket root ${ipc.config.socketRoot}`)
log(`Image: ${process.env.OPTION_IMAGE}`)
log(`Device: ${process.env.OPTION_DEVICE}`)
// These come as strings containing 0 or 1. We need to convert
// them to numbers first, and then to booleans, otherwise something
// like `Boolean('0')` will be `true`
const unmountOnSuccess = Boolean(Number(process.env.OPTION_UNMOUNT))
const validateWriteOnSuccess = Boolean(Number(process.env.OPTION_VALIDATE))
log(`Umount on success: ${unmountOnSuccess}`)
log(`Validate on success: ${validateWriteOnSuccess}`)
writer.writeImage(process.env.OPTION_IMAGE, process.env.OPTION_DEVICE, {
unmountOnSuccess,
validateWriteOnSuccess
}, (state) => {
ipc.of[IPC_SERVER_ID].emit('state', state)
}).then((results) => {
ipc.of[IPC_SERVER_ID].emit('done', results)
terminate(EXIT_CODES.SUCCESS)
}).catch(handleError)
ipc.of[IPC_SERVER_ID].emit('ready', {})
})
})

View File

@ -40,6 +40,13 @@ const errors = require('../../shared/errors')
*/
const DEFAULT_EXT = 'img'
/**
* @summary Default read-stream highWaterMark value (4M)
* @type {Number}
* @constant
*/
const STREAM_HWM = 4194304
/**
* @summary Image handlers
* @namespace handlers
@ -65,7 +72,7 @@ module.exports = {
path: imagePath,
archiveExtension: fileExtensions.getLastFileExtension(imagePath),
extension: fileExtensions.getPenultimateFileExtension(imagePath) || DEFAULT_EXT,
stream: fs.createReadStream(imagePath),
stream: fs.createReadStream(imagePath, { highWaterMark: STREAM_HWM }),
size: {
original: options.size,
final: {
@ -105,7 +112,7 @@ module.exports = {
path: imagePath,
archiveExtension: fileExtensions.getLastFileExtension(imagePath),
extension: fileExtensions.getPenultimateFileExtension(imagePath) || DEFAULT_EXT,
stream: fs.createReadStream(imagePath),
stream: fs.createReadStream(imagePath, { highWaterMark: STREAM_HWM }),
size: {
original: options.size,
final: {
@ -146,7 +153,7 @@ module.exports = {
path: imagePath,
archiveExtension: fileExtensions.getLastFileExtension(imagePath),
extension: fileExtensions.getPenultimateFileExtension(imagePath) || DEFAULT_EXT,
stream: fs.createReadStream(imagePath),
stream: fs.createReadStream(imagePath, { highWaterMark: STREAM_HWM }),
size: {
original: options.size,
final: {
@ -177,7 +184,7 @@ module.exports = {
return {
path: imagePath,
extension: fileExtensions.getLastFileExtension(imagePath),
stream: udif.createReadStream(imagePath),
stream: udif.createReadStream(imagePath, { highWaterMark: STREAM_HWM }),
size: {
original: options.size,
final: {
@ -231,7 +238,7 @@ module.exports = {
return {
path: imagePath,
extension: fileExtensions.getLastFileExtension(imagePath),
stream: fs.createReadStream(imagePath),
stream: fs.createReadStream(imagePath, { highWaterMark: STREAM_HWM }),
size: {
original: options.size,
final: {

View File

@ -153,7 +153,7 @@ class BlockWriteStream extends stream.Writable {
this.blocksWritten += 1
this.position += bytesWritten
this.retries = 0
next(error)
next()
return
}

View File

@ -37,6 +37,7 @@ const debug = require('debug')('etcher:writer')
const _ = require('lodash')
/* eslint-disable prefer-reflect */
/* eslint-disable callback-return */
/**
* @summary Timeout, in milliseconds, to wait before unmounting on success
@ -77,6 +78,36 @@ const runSeries = (tasks, callback) => {
run()
}
/**
* @summary Helper function to run a set of async tasks in sequence
* @private
* @param {Array<Function>} tasks - set of tasks
* @param {Function} callback - callback(error)
* @example
* runParallel([
* (next) => first(next),
* (next) => second(next),
* ], (error) => {
* // ...
* })
*/
const runParallel = (tasks, callback) => {
let count = tasks.length
const resultErrors = new Array(count).fill(null)
const results = new Array(count).fill(null)
tasks.forEach((task, index) => {
task((error, result) => {
count -= 1
resultErrors[index] = error
results[index] = result
if (count === 0) {
callback(resultErrors, results)
}
})
})
}
/**
* @summary ImageWriter class
* @class
@ -85,8 +116,6 @@ class ImageWriter extends EventEmitter {
/**
* @summary ImageWriter constructor
* @param {Object} options - options
* @param {String} options.imagePath - disk image path
* @param {String} options.path - dest path
* @param {Boolean} options.verify - whether to verify the dest
* @param {Boolean} options.unmountOnSuccess - whether to unmount the dest after flashing
* @param {Array<String>} options.checksumAlgorithms - checksums to calculate
@ -94,62 +123,83 @@ class ImageWriter extends EventEmitter {
* new ImageWriter(options)
*/
constructor (options) {
options = options || {}
super()
this.options = options
debug('new', options)
this.unmountOnSuccess = Boolean(options.unmountOnSuccess)
this.verifyChecksums = Boolean(options.verify)
this.checksumAlgorithms = options.checksumAlgorithms || []
this.source = null
this.pipeline = null
this.target = null
this.destinations = new Map()
this.finished = false
this.hadError = false
this.bytesRead = 0
this.bytesWritten = 0
this.checksum = {}
this.once('error', () => {
this.hadError = true
})
}
/**
* @summary Verify that the selected destination device exists
* @summary Verify that the selected destination devices exist
* @param {Array<String>} paths - target device paths
* @param {Function} callback - callback(error)
* @private
* @example
* writer.checkSelectedDevice((error) => {
* writer.getSelectedDevices(['/dev/disk2'], (error, destinations) => {
* // ...
* })
*/
checkSelectedDevice (callback) {
debug('state:device-select', this.options.path)
this.destinationDevice = null
getSelectedDevices (paths, callback) {
debug('state:device-select', paths)
drivelist.list((error, drives) => {
debug('state:device-select', this.options.path, error ? 'NOT OK' : 'OK')
debug('state:device-select', paths, error ? 'NOT OK' : 'OK')
if (error) {
callback.call(this, error)
return
}
const selectedDrive = _.find(drives, {
device: this.options.path
const results = paths.map((path) => {
const destination = {
fd: null,
error: null,
stream: null,
finished: false,
verified: false,
device: _.find(drives, {
device: path
})
}
if (!destination.device) {
const selectionError = errors.createUserError({
title: `The selected drive "${path}" was not found`,
description: `We can't find "${path}" in your system. Did you unplug the drive?`,
code: 'EUNPLUGGED'
})
debug('state:device-select', destination, 'NOT OK')
destination.error = selectionError
}
return destination
})
if (!selectedDrive) {
const selectionError = errors.createUserError({
title: 'The selected drive was not found',
description: `We can't find ${this.options.path} in your system. Did you unplug the drive?`,
code: 'EUNPLUGGED'
})
debug('state:device-select', this.options.path, 'NOT OK')
callback.call(this, selectionError)
return
}
this.destinationDevice = selectedDrive
callback.call(this)
callback.call(this, null, results)
})
}
/**
* @summary Unmount the destination device
* @param {Object} destination - destination object
* @param {Function} callback - callback(error)
* @private
* @example
@ -157,22 +207,23 @@ class ImageWriter extends EventEmitter {
* // ...
* })
*/
unmountDevice (callback) {
unmountDevice (destination, callback) {
if (os.platform() === 'win32') {
callback.call(this)
return
}
debug('state:unmount', this.destinationDevice.device)
debug('state:unmount', destination.device.device)
mountutils.unmountDisk(this.destinationDevice.device, (error) => {
debug('state:unmount', this.destinationDevice.device, error ? 'NOT OK' : 'OK')
mountutils.unmountDisk(destination.device.device, (error) => {
debug('state:unmount', destination.device.device, error ? 'NOT OK' : 'OK')
callback.call(this, error)
})
}
/**
* @summary Clean a device's partition table
* @param {Object} destination - destination object
* @param {Function} callback - callback(error)
* @private
* @example
@ -180,56 +231,42 @@ class ImageWriter extends EventEmitter {
* // ...
* })
*/
removePartitionTable (callback) {
removePartitionTable (destination, callback) {
if (os.platform() !== 'win32') {
callback.call(this)
return
}
debug('state:clean', this.destinationDevice.device)
debug('state:clean', destination.device.device)
diskpart.clean(this.destinationDevice.device).asCallback((error) => {
debug('state:clean', this.destinationDevice.device, error ? 'NOT OK' : 'OK')
diskpart.clean(destination.device.device).asCallback((error) => {
debug('state:clean', destination.device.device, error ? 'NOT OK' : 'OK')
callback.call(this, error)
})
}
/**
* @summary Open the source for reading
* @param {String} imagePath - path to source image
* @param {Function} callback - callback(error)
* @private
* @example
* writer.openSource((error) => {
* writer.openSource('path/to/image.img', (error, source) => {
* // ...
* })
*/
openSource (callback) {
debug('state:source-open', this.options.imagePath)
imageStream.getFromFilePath(this.options.imagePath).asCallback((error, image) => {
debug('state:source-open', this.options.imagePath, error ? 'NOT OK' : 'OK')
if (error) {
callback.call(this, error)
return
}
if (!constraints.isDriveLargeEnough(this.destinationDevice, image)) {
const driveError = errors.createUserError({
title: 'The image you selected is too big for this drive',
description: 'Please connect a bigger drive and try again'
})
debug('state:source-open', this.options.imagePath, 'NOT OK')
callback.call(this, driveError)
return
}
this.options.image = image
callback.call(this)
openSource (imagePath, callback) {
debug('state:source-open', imagePath)
imageStream.getFromFilePath(imagePath).asCallback((error, image) => {
debug('state:source-open', imagePath, error ? 'NOT OK' : 'OK')
this.source = image
callback.call(this, error, this.source)
})
}
/**
* @summary Open the destination for writing
* @param {Object} destination - destination object
* @param {Function} callback - callback(error)
* @private
* @example
@ -237,8 +274,8 @@ class ImageWriter extends EventEmitter {
* // ...
* })
*/
openDestination (callback) {
debug('state:destination-open', this.destinationDevice.raw)
openDestination (destination, callback) {
debug('state:destination-open', destination.device.raw)
/* eslint-disable no-bitwise */
const flags = fs.constants.O_RDWR |
@ -246,38 +283,93 @@ class ImageWriter extends EventEmitter {
fs.constants.O_SYNC
/* eslint-enable no-bitwise */
fs.open(this.destinationDevice.raw, flags, (error, fd) => {
debug('state:destination-open', this.destinationDevice.raw, error ? 'NOT OK' : 'OK')
this.options.fd = fd
fs.open(destination.device.raw, flags, (error, fd) => {
debug('state:destination-open', destination.device.raw, error ? 'NOT OK' : 'OK')
destination.fd = fd
callback.call(this, error)
})
}
/**
* @summary Check a destinstation against the drive constraints
* @param {Object} destination - destination object
* @param {Function} callback - callback(error)
* @example
* this.checkDriveConstraints(destination, (error) => {
* // ...
* })
*/
checkDriveConstraints (destination, callback) {
let error = null
if (!constraints.isDriveLargeEnough(destination.device, this.source)) {
error = errors.createUserError({
title: 'The image you selected is too big for this drive',
description: 'Please connect a bigger drive and try again'
})
}
callback.call(this, error)
}
/**
* @summary Start the flashing process
* @param {String} imagePath - path to source image
* @param {Array<String>} destinationPaths - paths to target devices
* @returns {ImageWriter} imageWriter
* @example
* imageWriter.flash()
* imageWriter.write(source, destinations)
* .on('error', reject)
* .on('progress', onProgress)
* .on('finish', resolve)
*/
flash () {
const tasks = [
(next) => { this.checkSelectedDevice(next) },
(next) => { this.unmountDevice(next) },
(next) => { this.removePartitionTable(next) },
(next) => { this.openSource(next) },
(next) => { this.openDestination(next) }
]
runSeries(tasks, (error) => {
if (error) {
this.emit('error', error)
write (imagePath, destinationPaths) {
// Open the source image
this.openSource(imagePath, (openError, source) => {
if (openError) {
this.emit('error', openError)
return
}
this.write()
// Open & prepare target devices
this.getSelectedDevices(destinationPaths, (error, destinations) => {
if (error) {
this.emit('error', error)
return
}
const notFound = _.find(destinations, (destination) => {
return Boolean(destination.error)
})
if (notFound) {
this.emit('error', notFound.error)
return
}
// Generate preparation tasks for all destinations
const tasks = destinations.map((destination) => {
this.destinations.set(destination.device.device, destination)
return (next) => {
runSeries([
(done) => { this.checkDriveConstraints(destination, done) },
(done) => { this.unmountDevice(destination, done) },
(done) => { this.removePartitionTable(destination, done) },
(done) => { this.openDestination(destination, done) }
], (preparationError) => {
destination.error = preparationError
next(preparationError)
})
}
})
// Run the preparation tasks in parallel for each destination
runParallel(tasks, (resultErrors, results) => {
// We can start (theoretically) flashing now...
debug('write:prep:done', resultErrors)
this._write()
})
})
})
return this
@ -289,23 +381,41 @@ class ImageWriter extends EventEmitter {
* @example
* imageWriter.write()
*/
write () {
this._createWritePipeline(this.options)
.on('checksum', (checksum) => {
debug('write:checksum', checksum)
this.checksum = checksum
})
.on('error', (error) => {
this.emit('error', error)
_write () {
this.pipeline = this._createWritePipeline()
this.pipeline.on('checksum', (checksum) => {
debug('write:checksum', checksum)
this.checksum = checksum
})
this.pipeline.on('error', (error) => {
this.emit('error', error)
})
this.pipeline.on('finish', (destination) => {
this.bytesRead = this.source.bytesRead
let finishedCount = 0
this.destinations.forEach((dest) => {
finishedCount += dest.finished ? 1 : 0
})
this.target.on('finish', () => {
this.bytesRead = this.source.bytesRead
this.bytesWritten = this.target.bytesWritten
if (this.options.verify) {
this.verify()
} else {
this._finish()
debug('write:finish', finishedCount, '/', this.destinations.size)
if (destination) {
this.bytesWritten += destination.stream.bytesWritten
}
if (finishedCount === this.destinations.size) {
if (this.verifyChecksums) {
debug('write:verify')
this.verify()
} else {
debug('write:finish')
this._finish()
}
}
})
@ -319,26 +429,82 @@ class ImageWriter extends EventEmitter {
* imageWriter.verify()
*/
verify () {
this._createVerifyPipeline(this.options)
.on('error', (error) => {
const progressStream = new ProgressStream({
length: this.bytesWritten,
time: 500
})
progressStream.resume()
progressStream.on('progress', (state) => {
state.type = 'check'
state.totalSpeed = 0
state.active = 0
this.destinations.forEach((destination) => {
if (!destination.verified && !destination.error) {
state.totalSpeed += destination.progress.state.speed
state.active += 1
}
})
state.speed = state.active
? state.totalSpeed / state.active
: state.active
this.emit('progress', state)
})
this.destinations.forEach((destination) => {
// Don't verify errored destinations
if (destination.error || !destination.stream) {
return
}
const pipeline = this._createVerifyPipeline(destination)
pipeline.on('error', (error) => {
this.emit('error', error)
})
.on('checksum', (checksum) => {
pipeline.on('checksum', (checksum) => {
debug('verify:checksum', this.checksum, '==', checksum)
destination.checksum = checksum
if (!_.isEqual(this.checksum, checksum)) {
const error = new Error(`Verification failed: ${JSON.stringify(this.checksum)} != ${JSON.stringify(checksum)}`)
error.code = 'EVALIDATION'
destination.error = error
this.emit('error', error)
}
this._finish()
})
.on('finish', () => {
debug('verify:end')
// NOTE: As the 'checksum' event only happens after
// the 'finish' event, we `._finish()` there instead of here
pipeline.on('finish', () => {
debug('verify:finish')
destination.verified = true
destination.progress = null
destination.stream = null
let finishedCount = 0
this.destinations.forEach((dest) => {
finishedCount += (dest.error || dest.verified) ? 1 : 0
})
if (finishedCount === this.destinations.size) {
debug('verify:complete')
progressStream.end()
this._finish()
}
})
// NOTE: Normally we'd use `pipeline.pipe(progressStream)` here,
// but that leads to degraded performance
pipeline.on('readable', function () {
let chunk = null
while ((chunk = this.read())) {
progressStream.write(chunk)
}
})
})
return this
}
@ -365,23 +531,43 @@ class ImageWriter extends EventEmitter {
*/
_cleanup (callback) {
debug('state:cleanup')
fs.close(this.options.fd, (closeError) => {
debug('state:cleanup', closeError ? 'NOT OK' : 'OK')
if (!this.options.unmountOnSuccess) {
callback.call(this, closeError)
return
}
const tasks = []
// Closing a file descriptor on a drive containing mountable
// partitions causes macOS to mount the drive. If we try to
// unmount too quickly, then the drive might get re-mounted
// right afterwards.
setTimeout(() => {
mountutils.unmountDisk(this.destinationDevice.device, (error) => {
debug('state:cleanup', error ? 'NOT OK' : 'OK')
callback.call(this, error)
})
}, UNMOUNT_ON_SUCCESS_TIMEOUT_MS)
this.destinations.forEach((destination) => {
tasks.push((next) => {
runSeries([
(done) => {
if (destination.fd) {
fs.close(destination.fd, done)
destination.fd = null
} else {
done()
}
},
(done) => {
if (!this.unmountOnSuccess) {
done()
return
}
// Closing a file descriptor on a drive containing mountable
// partitions causes macOS to mount the drive. If we try to
// unmount too quickly, then the drive might get re-mounted
// right afterwards.
setTimeout(() => {
mountutils.unmountDisk(destination.device.device, (error) => {
debug('state:cleanup', error ? 'NOT OK' : 'OK')
done(error)
})
}, UNMOUNT_ON_SUCCESS_TIMEOUT_MS)
}
], next)
})
})
runParallel(tasks, (resultErrors, results) => {
debug('state:cleanup', resultErrors)
callback.call(this, resultErrors)
})
}
@ -393,8 +579,8 @@ class ImageWriter extends EventEmitter {
*/
_finish () {
this._cleanup(() => {
this.finished = true
this.emit('finish', {
drive: this.destinationDevice,
bytesRead: this.bytesRead,
bytesWritten: this.bytesWritten,
checksum: this.checksum
@ -405,28 +591,17 @@ class ImageWriter extends EventEmitter {
/**
* @summary Creates a write pipeline from given options
* @private
* @param {Object} options - options
* @param {Object} options.image - source image
* @param {Number} [options.fd] - destination file descriptor
* @param {String} [options.path] - destination file path
* @param {String} [options.flags] - destination file open flags
* @param {String} [options.mode] - destination file mode
* @returns {Pipage} pipeline
* @example
* this._createWritePipeline({
* image: sourceImage,
* path: '/dev/rdisk2'
* })
* this._createWritePipeline()
*/
_createWritePipeline (options) {
_createWritePipeline () {
const pipeline = new Pipage({
readableObjectMode: true
})
const image = options.image
const source = image.stream
const progressOptions = {
length: image.size.original,
length: this.source.size.original,
time: 500
}
@ -434,66 +609,94 @@ class ImageWriter extends EventEmitter {
// If the final size is an estimation,
// use the original source size for progress metering
if (image.size.final.estimation) {
if (this.source.size.final.estimation) {
progressStream = new ProgressStream(progressOptions)
pipeline.append(progressStream)
}
const isPassThrough = image.transform instanceof stream.PassThrough
const isPassThrough = this.source.transform instanceof stream.PassThrough
// If the image transform is a pass-through,
// ignore it to save on the overhead
if (image.transform && !isPassThrough) {
pipeline.append(image.transform)
if (this.source.transform && !isPassThrough) {
pipeline.append(this.source.transform)
}
// If the final size is known precisely and we're not
// using block maps, then use the final size for progress
if (!image.size.final.estimation && !image.bmap) {
progressOptions.length = image.size.final.value
if (!this.source.size.final.estimation && !this.source.bmap) {
progressOptions.length = this.source.size.final.value
progressStream = new ProgressStream(progressOptions)
pipeline.append(progressStream)
}
if (image.bmap) {
const blockMap = BlockMap.parse(image.bmap)
if (this.source.bmap) {
const blockMap = BlockMap.parse(this.source.bmap)
debug('write:bmap', blockMap)
progressStream = new ProgressStream(progressOptions)
pipeline.append(progressStream)
pipeline.append(new BlockMap.FilterStream(blockMap))
} else {
debug('write:blockstream')
const checksumStream = new ChecksumStream({
objectMode: true,
algorithms: options.checksumAlgorithms
})
pipeline.append(new BlockStream())
pipeline.append(checksumStream)
pipeline.bind(checksumStream, 'checksum')
if (this.verifyChecksums) {
const checksumStream = new ChecksumStream({
objectMode: true,
algorithms: this.checksumAlgorithms
})
pipeline.append(checksumStream)
pipeline.bind(checksumStream, 'checksum')
}
}
const target = new BlockWriteStream({
fd: options.fd,
autoClose: false
this.destinations.forEach((destination) => {
if (destination.error) {
debug('pipeline:skip', destination.device.device)
return
}
destination.stream = new BlockWriteStream({
fd: destination.fd,
autoClose: false
})
destination.stream.once('finish', () => {
debug('finish:unpipe', destination.device.device)
destination.finished = true
pipeline.emit('finish', destination)
pipeline.unpipe(destination.stream)
})
destination.stream.once('error', (error) => {
debug('error:unpipe', destination.device.device)
destination.error = error
destination.finished = true
pipeline.unpipe(destination.stream)
})
pipeline.bind(destination.stream, 'error')
pipeline.pipe(destination.stream)
})
// Pipeline.bind(progressStream, 'progress');
progressStream.on('progress', (state) => {
state.device = options.path
state.type = 'write'
state.speed = target.speed
state.totalSpeed = 0
state.active = 0
this.destinations.forEach((destination) => {
if (!destination.finished && !destination.error) {
state.totalSpeed += destination.stream.speed
state.active += 1
}
})
state.speed = state.active
? state.totalSpeed / state.active
: state.active
this.emit('progress', state)
})
pipeline.bind(source, 'error')
pipeline.bind(target, 'error')
source.pipe(pipeline)
.pipe(target)
this.source = source
this.pipeline = pipeline
this.target = target
pipeline.bind(this.source.stream, 'error')
this.source.stream.pipe(pipeline)
return pipeline
}
@ -501,25 +704,18 @@ class ImageWriter extends EventEmitter {
/**
* @summary Creates a verification pipeline from given options
* @private
* @param {Object} options - options
* @param {Object} options.image - image
* @param {Number} [options.fd] - file descriptor
* @param {String} [options.path] - file path
* @param {String} [options.flags] - file open flags
* @param {String} [options.mode] - file mode
* @param {Object} destination - the destination object
* @returns {Pipage} pipeline
* @example
* this._createVerifyPipeline({
* path: '/dev/rdisk2'
* })
* this._createVerifyPipeline()
*/
_createVerifyPipeline (options) {
_createVerifyPipeline (destination) {
const pipeline = new Pipage()
let size = this.bytesWritten
let size = destination.stream.bytesWritten
if (!options.image.size.final.estimation) {
size = Math.max(this.bytesWritten, options.image.size.final.value)
if (!this.source.size.final.estimation) {
size = Math.max(size, this.source.size.final.value)
}
const progressStream = new ProgressStream({
@ -529,9 +725,9 @@ class ImageWriter extends EventEmitter {
pipeline.append(progressStream)
if (options.image.bmap) {
if (this.source.bmap) {
debug('verify:bmap')
const blockMap = BlockMap.parse(options.image.bmap)
const blockMap = BlockMap.parse(this.source.bmap)
const blockMapStream = new BlockMap.FilterStream(blockMap)
pipeline.append(blockMapStream)
@ -543,14 +739,14 @@ class ImageWriter extends EventEmitter {
})
} else {
const checksumStream = new ChecksumStream({
algorithms: options.checksumAlgorithms
algorithms: this.checksumAlgorithms
})
pipeline.append(checksumStream)
pipeline.bind(checksumStream, 'checksum')
}
const source = new BlockReadStream({
fd: options.fd,
fd: destination.fd,
autoClose: false,
start: 0,
end: size
@ -558,17 +754,8 @@ class ImageWriter extends EventEmitter {
pipeline.bind(source, 'error')
progressStream.on('progress', (state) => {
state.device = options.path
state.type = 'check'
this.emit('progress', state)
})
this.target = null
this.source = source
this.pipeline = pipeline
source.pipe(pipeline).resume()
destination.stream = source.pipe(pipeline)
destination.progress = progressStream
return pipeline
}

View File

@ -29,7 +29,7 @@ describe('Browser: imageWriter', () => {
sourceChecksum: '1234'
})
imageWriter.flash('foo.img', '/dev/disk2').finally(() => {
imageWriter.flash('foo.img', [ '/dev/disk2' ]).finally(() => {
m.chai.expect(flashState.isFlashing()).to.be.false
})
})
@ -40,18 +40,18 @@ describe('Browser: imageWriter', () => {
sourceChecksum: '1234'
})
const writing = imageWriter.flash('foo.img', '/dev/disk2')
imageWriter.flash('foo.img', '/dev/disk2').catch(angular.noop)
const writing = imageWriter.flash('foo.img', [ '/dev/disk2' ])
imageWriter.flash('foo.img', [ '/dev/disk2' ]).catch(angular.noop)
writing.finally(() => {
m.chai.expect(this.performWriteStub).to.have.been.calledOnce
})
})
it('should reject the second flash attempt', () => {
imageWriter.flash('foo.img', '/dev/disk2')
imageWriter.flash('foo.img', [ '/dev/disk2' ])
let rejectError = null
imageWriter.flash('foo.img', '/dev/disk2').catch((error) => {
imageWriter.flash('foo.img', [ '/dev/disk2' ]).catch((error) => {
rejectError = error
}).finally(() => {
m.chai.expect(rejectError).to.be.an.instanceof(Error)
@ -73,13 +73,13 @@ describe('Browser: imageWriter', () => {
})
it('should set flashing to false when done', () => {
imageWriter.flash('foo.img', '/dev/disk2').catch(angular.noop).finally(() => {
imageWriter.flash('foo.img', [ '/dev/disk2' ]).catch(angular.noop).finally(() => {
m.chai.expect(flashState.isFlashing()).to.be.false
})
})
it('should set the error code in the flash results', () => {
imageWriter.flash('foo.img', '/dev/disk2').catch(angular.noop).finally(() => {
imageWriter.flash('foo.img', [ '/dev/disk2' ]).catch(angular.noop).finally(() => {
const flashResults = flashState.getFlashResults()
m.chai.expect(flashResults.errorCode).to.equal('FOO')
})
@ -92,7 +92,7 @@ describe('Browser: imageWriter', () => {
})
let rejection
imageWriter.flash('foo.img', '/dev/disk2').catch((error) => {
imageWriter.flash('foo.img', [ '/dev/disk2' ]).catch((error) => {
rejection = error
}).finally(() => {
m.chai.expect(rejection).to.be.an.instanceof(Error)