fix(writer): Fix erronous event handling in write pipeline

This fixes the use and handling of events in the write pipeline,
such that the pipeline would not be prematurely stalled or terminated.
Also, a new `fail` event is introduced, to signal non-fatal errors.

Change-Type: patch
This commit is contained in:
Jonas Hermsmeier 2018-04-06 16:01:23 +02:00
parent 65a3e51ff9
commit fe43e21484
No known key found for this signature in database
GPG Key ID: 1B870F801A0CEE9F
5 changed files with 73 additions and 30 deletions

View File

@ -102,17 +102,6 @@ permissions.isElevated().then((elevated) => {
checksumAlgorithms: options.check ? [ 'sha512' ] : [] checksumAlgorithms: options.check ? [ 'sha512' ] : []
}) })
/**
* @summary Error handler
* @param {Error} error - error
* @private
* @example
* writer.on('error', onError)
*/
const onError = function (error) {
console.error(error)
}
/** /**
* @summary Finish handler * @summary Finish handler
* @private * @private
@ -124,7 +113,7 @@ permissions.isElevated().then((elevated) => {
} }
writer.on('progress', onProgress) writer.on('progress', onProgress)
writer.on('error', onError) writer.on('error', reject)
writer.on('finish', onFinish) writer.on('finish', onFinish)
// NOTE: Drive can be (String|Array) // NOTE: Drive can be (String|Array)

View File

@ -126,6 +126,10 @@ exports.performWrite = (image, drives, onProgress) => {
console.log(message) console.log(message)
}) })
ipc.server.on('fail', (error) => {
console.log('Fail:', error)
})
const flashResults = {} const flashResults = {}
ipc.server.on('done', (results) => { ipc.server.on('done', (results) => {
_.merge(flashResults, results) _.merge(flashResults, results)

View File

@ -61,6 +61,8 @@ exports.fromFlashState = (state) => {
return `${state.percentage}% Flashing` return `${state.percentage}% Flashing`
} else if (isValidating) { } else if (isValidating) {
return `${state.percentage}% Validating` return `${state.percentage}% Validating`
} else if (!isFlashing && !isValidating) {
return 'Failed'
} }
throw new Error(`Invalid state: ${JSON.stringify(state)}`) throw new Error(`Invalid state: ${JSON.stringify(state)}`)

View File

@ -154,6 +154,27 @@ ipc.connectTo(IPC_SERVER_ID, () => {
ipc.of[IPC_SERVER_ID].emit('error', error) ipc.of[IPC_SERVER_ID].emit('error', error)
} }
/**
* @summary Failure handler (non-fatal errors)
* @param {Object} event - event data (error & device)
* @example
* writer.on('fail', onFail)
*/
const onFail = (event) => {
ipc.of[IPC_SERVER_ID].emit('fail', {
device: event.device,
error: {
name: event.error.name,
message: event.error.message,
code: event.error.code,
syscall: event.error.syscall,
errno: event.error.errno,
stack: event.error.stack,
stdout: event.error.stdout
}
})
}
const writer = new ImageWriter({ const writer = new ImageWriter({
verify: options.validateWriteOnSuccess, verify: options.validateWriteOnSuccess,
unmountOnSuccess: options.unmountOnSuccess, unmountOnSuccess: options.unmountOnSuccess,
@ -161,6 +182,7 @@ ipc.connectTo(IPC_SERVER_ID, () => {
}) })
writer.on('error', onError) writer.on('error', onError)
writer.on('fail', onFail)
writer.on('progress', onProgress) writer.on('progress', onProgress)
writer.on('finish', onFinish) writer.on('finish', onFinish)

View File

@ -217,6 +217,7 @@ class ImageWriter extends EventEmitter {
mountutils.unmountDisk(destination.device.device, (error) => { mountutils.unmountDisk(destination.device.device, (error) => {
debug('state:unmount', destination.device.device, error ? 'NOT OK' : 'OK') debug('state:unmount', destination.device.device, error ? 'NOT OK' : 'OK')
destination.error = error
callback.call(this, error) callback.call(this, error)
}) })
} }
@ -241,6 +242,7 @@ class ImageWriter extends EventEmitter {
diskpart.clean(destination.device.device).asCallback((error) => { diskpart.clean(destination.device.device).asCallback((error) => {
debug('state:clean', destination.device.device, error ? 'NOT OK' : 'OK') debug('state:clean', destination.device.device, error ? 'NOT OK' : 'OK')
destination.error = error
callback.call(this, error) callback.call(this, error)
}) })
} }
@ -286,12 +288,13 @@ class ImageWriter extends EventEmitter {
fs.open(destination.device.raw, flags, (error, fd) => { fs.open(destination.device.raw, flags, (error, fd) => {
debug('state:destination-open', destination.device.raw, error ? 'NOT OK' : 'OK') debug('state:destination-open', destination.device.raw, error ? 'NOT OK' : 'OK')
destination.fd = fd destination.fd = fd
destination.error = error
callback.call(this, error) callback.call(this, error)
}) })
} }
/** /**
* @summary Check a destinstation against the drive constraints * @summary Check a destination against the drive constraints
* @param {Object} destination - destination object * @param {Object} destination - destination object
* @param {Function} callback - callback(error) * @param {Function} callback - callback(error)
* @example * @example
@ -300,16 +303,14 @@ class ImageWriter extends EventEmitter {
* }) * })
*/ */
checkDriveConstraints (destination, callback) { checkDriveConstraints (destination, callback) {
let error = null
if (!constraints.isDriveLargeEnough(destination.device, this.source)) { if (!constraints.isDriveLargeEnough(destination.device, this.source)) {
error = errors.createUserError({ destination.error = errors.createUserError({
title: 'The image you selected is too big for this drive', title: 'The image you selected is too big for this drive',
description: 'Please connect a bigger drive and try again' description: 'Please connect a bigger drive and try again'
}) })
} }
callback.call(this, error) callback.call(this, destination.error)
} }
/** /**
@ -356,9 +357,11 @@ class ImageWriter extends EventEmitter {
(done) => { this.unmountDevice(destination, done) }, (done) => { this.unmountDevice(destination, done) },
(done) => { this.removePartitionTable(destination, done) }, (done) => { this.removePartitionTable(destination, done) },
(done) => { this.openDestination(destination, done) } (done) => { this.openDestination(destination, done) }
], (preparationError) => { ], () => {
destination.error = preparationError if (destination.error) {
next(preparationError) this.emit('fail', { device: destination.device.device, error: destination.error })
}
next(destination.error, destination)
}) })
} }
}) })
@ -367,7 +370,11 @@ class ImageWriter extends EventEmitter {
runParallel(tasks, (resultErrors, results) => { runParallel(tasks, (resultErrors, results) => {
// We can start (theoretically) flashing now... // We can start (theoretically) flashing now...
debug('write:prep:done', resultErrors) debug('write:prep:done', resultErrors)
this._write() if (_.every(resultErrors, _.identity)) {
this.emit('error', resultErrors[0])
} else {
this._write()
}
}) })
}) })
}) })
@ -433,23 +440,28 @@ class ImageWriter extends EventEmitter {
this.emit('error', error) this.emit('error', error)
}) })
this.pipeline.on('finish', (destination) => { this.pipeline.on('complete', (destination) => {
this.bytesRead = this.source.bytesRead this.bytesRead = this.source.bytesRead
let finishedCount = 0 let finishedCount = 0
let errorCount = 0
this.destinations.forEach((dest) => { this.destinations.forEach((dest) => {
finishedCount += dest.finished ? 1 : 0 finishedCount += dest.finished ? 1 : 0
errorCount += dest.error ? 1 : 0
}) })
debug('write:finish', finishedCount, '/', this.destinations.size) debug('write:finish', finishedCount, '/', this.destinations.size)
if (destination) { if (_.has(destination, [ 'stream' ])) {
this.bytesWritten += destination.stream.bytesWritten this.bytesWritten += destination.stream.bytesWritten
} }
if (finishedCount === this.destinations.size) { if (finishedCount === this.destinations.size) {
if (this.verifyChecksums) { if (errorCount === this.destinations.size) {
this.emit('error', destination.error)
this._finish()
} else if (this.verifyChecksums) {
debug('write:verify') debug('write:verify')
this.verify() this.verify()
} else { } else {
@ -469,8 +481,20 @@ class ImageWriter extends EventEmitter {
* imageWriter.verify() * imageWriter.verify()
*/ */
verify () { verify () {
let bytesWritten = 0
// NOTE: We can't re-use `this.bytesWritten` here, as that will
// included bytes of streams that may have errored part way through
this.destinations.forEach((destination) => {
// Don't count errored destinations
if (destination.error || !destination.stream) {
return
}
bytesWritten += destination.stream.bytesWritten
})
const progressStream = new ProgressStream({ const progressStream = new ProgressStream({
length: this.bytesWritten, length: bytesWritten,
time: 500 time: 500
}) })
@ -500,7 +524,7 @@ class ImageWriter extends EventEmitter {
const error = new Error(`Verification failed: ${JSON.stringify(this.checksum)} != ${JSON.stringify(checksum)}`) const error = new Error(`Verification failed: ${JSON.stringify(this.checksum)} != ${JSON.stringify(checksum)}`)
error.code = 'EVALIDATION' error.code = 'EVALIDATION'
destination.error = error destination.error = error
this.emit('error', error) this.emit('fail', { device: destination.device.device, error })
} }
}) })
@ -681,6 +705,7 @@ class ImageWriter extends EventEmitter {
this.destinations.forEach((destination) => { this.destinations.forEach((destination) => {
if (destination.error) { if (destination.error) {
debug('pipeline:skip', destination.device.device) debug('pipeline:skip', destination.device.device)
destination.finished = true
return return
} }
@ -689,21 +714,22 @@ class ImageWriter extends EventEmitter {
autoClose: false autoClose: false
}) })
destination.stream.once('finish', () => { destination.stream.on('finish', () => {
debug('finish:unpipe', destination.device.device) debug('finish:unpipe', destination.device.device)
destination.finished = true destination.finished = true
pipeline.emit('finish', destination) pipeline.emit('complete', destination)
pipeline.unpipe(destination.stream) pipeline.unpipe(destination.stream)
}) })
destination.stream.once('error', (error) => { destination.stream.on('error', (error) => {
debug('error:unpipe', destination.device.device) debug('error:unpipe', destination.device.device)
destination.error = error destination.error = error
destination.finished = true destination.finished = true
pipeline.unpipe(destination.stream) pipeline.unpipe(destination.stream)
this.emit('fail', { device: destination.device.device, error })
pipeline.emit('complete', destination)
}) })
pipeline.bind(destination.stream, 'error')
pipeline.pipe(destination.stream) pipeline.pipe(destination.stream)
}) })