mirror of
https://github.com/balena-io/etcher.git
synced 2025-04-27 00:37:18 +00:00

We add a cancel button next to the flash progress bar that gracefully aborts the flash process. Closes: https://github.com/resin-io/etcher/issues/1791 Closes: https://github.com/resin-io/etcher/issues/2234 Closes: https://github.com/resin-io/etcher/issues/2245 Change-Type: patch Changelog-Entry: Add a button to cancel the flash process.
826 lines
22 KiB
JavaScript
826 lines
22 KiB
JavaScript
/*
|
|
* Copyright 2017 resin.io
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
'use strict'
|
|
|
|
const os = require('os')
|
|
const fs = require('fs')
|
|
const EventEmitter = require('events').EventEmitter
|
|
const mountutils = require('mountutils')
|
|
const drivelist = require('drivelist')
|
|
const stream = require('readable-stream')
|
|
const Pipage = require('pipage')
|
|
const BlockMap = require('blockmap')
|
|
const BlockStream = require('./block-stream')
|
|
const BlockWriteStream = require('./block-write-stream')
|
|
const BlockReadStream = require('./block-read-stream')
|
|
const ChecksumStream = require('./checksum-stream')
|
|
const ProgressStream = require('./progress-stream')
|
|
const imageStream = require('../image-stream')
|
|
const diskpart = require('../../cli/diskpart')
|
|
const constraints = require('../../shared/drive-constraints')
|
|
const errors = require('../../shared/errors')
|
|
const debug = require('debug')('etcher:writer')
|
|
const _ = require('lodash')
|
|
|
|
/* eslint-disable prefer-reflect */
|
|
/* eslint-disable callback-return */
|
|
|
|
/**
|
|
* @summary Timeout, in milliseconds, to wait before unmounting on success
|
|
* @constant
|
|
* @type {Number}
|
|
*/
|
|
const UNMOUNT_ON_SUCCESS_TIMEOUT_MS = 2000
|
|
|
|
/**
|
|
* @summary Helper function to run a set of async tasks in sequence
|
|
* @private
|
|
* @param {Array<Function>} tasks - set of tasks
|
|
* @param {Function} callback - callback(error)
|
|
* @example
|
|
* runSeries([
|
|
* (next) => first(next),
|
|
* (next) => second(next),
|
|
* ], (error) => {
|
|
* // ...
|
|
* })
|
|
*/
|
|
const runSeries = (tasks, callback) => {
|
|
/**
|
|
* @summary Task runner
|
|
* @param {Error} [error] - error
|
|
* @example
|
|
* run()
|
|
*/
|
|
const run = (error) => {
|
|
const task = tasks.shift()
|
|
if (error || task == null) {
|
|
callback(error)
|
|
return
|
|
}
|
|
task(run)
|
|
}
|
|
|
|
run()
|
|
}
|
|
|
|
/**
|
|
* @summary Helper function to run a set of async tasks in sequence
|
|
* @private
|
|
* @param {Array<Function>} tasks - set of tasks
|
|
* @param {Function} callback - callback(error)
|
|
* @example
|
|
* runParallel([
|
|
* (next) => first(next),
|
|
* (next) => second(next),
|
|
* ], (error) => {
|
|
* // ...
|
|
* })
|
|
*/
|
|
const runParallel = (tasks, callback) => {
|
|
let count = tasks.length
|
|
const resultErrors = new Array(count).fill(null)
|
|
const results = new Array(count).fill(null)
|
|
|
|
tasks.forEach((task, index) => {
|
|
task((error, result) => {
|
|
count -= 1
|
|
resultErrors[index] = error
|
|
results[index] = result
|
|
if (count === 0) {
|
|
callback(resultErrors, results)
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @summary ImageWriter class
|
|
* @class
|
|
*/
|
|
class ImageWriter extends EventEmitter {
|
|
/**
|
|
* @summary ImageWriter constructor
|
|
* @param {Object} options - options
|
|
* @param {Boolean} options.verify - whether to verify the dest
|
|
* @param {Boolean} options.unmountOnSuccess - whether to unmount the dest after flashing
|
|
* @param {Array<String>} options.checksumAlgorithms - checksums to calculate
|
|
* @example
|
|
* new ImageWriter(options)
|
|
*/
|
|
constructor (options) {
|
|
options = options || {}
|
|
super()
|
|
|
|
debug('new', options)
|
|
|
|
this.unmountOnSuccess = Boolean(options.unmountOnSuccess)
|
|
this.verifyChecksums = Boolean(options.verify)
|
|
this.checksumAlgorithms = options.checksumAlgorithms || []
|
|
|
|
this.source = null
|
|
this.pipeline = null
|
|
this.destinations = new Map()
|
|
|
|
this.finished = false
|
|
this.hadError = false
|
|
|
|
this.bytesRead = 0
|
|
this.bytesWritten = 0
|
|
this.checksum = {}
|
|
|
|
this.once('error', () => {
|
|
this.hadError = true
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @summary Verify that the selected destination devices exist
|
|
* @param {Array<String>} paths - target device paths
|
|
* @param {Function} callback - callback(error)
|
|
* @private
|
|
* @example
|
|
* writer.getSelectedDevices(['/dev/disk2'], (error, destinations) => {
|
|
* // ...
|
|
* })
|
|
*/
|
|
getSelectedDevices (paths, callback) {
|
|
debug('state:device-select', paths)
|
|
drivelist.list((error, drives) => {
|
|
debug('state:device-select', paths, error ? 'NOT OK' : 'OK')
|
|
|
|
if (error) {
|
|
callback.call(this, error)
|
|
return
|
|
}
|
|
|
|
const results = paths.map((path) => {
|
|
const destination = {
|
|
fd: null,
|
|
error: null,
|
|
stream: null,
|
|
finished: false,
|
|
verified: false,
|
|
device: _.find(drives, {
|
|
device: path
|
|
})
|
|
}
|
|
|
|
if (!destination.device) {
|
|
const selectionError = errors.createUserError({
|
|
title: `The selected drive "${path}" was not found`,
|
|
description: `We can't find "${path}" in your system. Did you unplug the drive?`,
|
|
code: 'EUNPLUGGED'
|
|
})
|
|
debug('state:device-select', destination, 'NOT OK')
|
|
destination.error = selectionError
|
|
}
|
|
|
|
return destination
|
|
})
|
|
|
|
callback.call(this, null, results)
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @summary Unmount the destination device
|
|
* @param {Object} destination - destination object
|
|
* @param {Function} callback - callback(error)
|
|
* @private
|
|
* @example
|
|
* writer.unmountDevice((error) => {
|
|
* // ...
|
|
* })
|
|
*/
|
|
unmountDevice (destination, callback) {
|
|
if (os.platform() === 'win32') {
|
|
callback.call(this)
|
|
return
|
|
}
|
|
|
|
debug('state:unmount', destination.device.device)
|
|
|
|
mountutils.unmountDisk(destination.device.device, (error) => {
|
|
debug('state:unmount', destination.device.device, error ? 'NOT OK' : 'OK')
|
|
destination.error = error
|
|
callback.call(this, error)
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @summary Clean a device's partition table
|
|
* @param {Object} destination - destination object
|
|
* @param {Function} callback - callback(error)
|
|
* @private
|
|
* @example
|
|
* writer.removePartitionTable((error) => {
|
|
* // ...
|
|
* })
|
|
*/
|
|
removePartitionTable (destination, callback) {
|
|
if (os.platform() !== 'win32') {
|
|
callback.call(this)
|
|
return
|
|
}
|
|
|
|
debug('state:clean', destination.device.device)
|
|
|
|
diskpart.clean(destination.device.device).asCallback((error) => {
|
|
debug('state:clean', destination.device.device, error ? 'NOT OK' : 'OK')
|
|
destination.error = error
|
|
callback.call(this, error)
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @summary Open the source for reading
|
|
* @param {String} imagePath - path to source image
|
|
* @param {Function} callback - callback(error)
|
|
* @private
|
|
* @example
|
|
* writer.openSource('path/to/image.img', (error, source) => {
|
|
* // ...
|
|
* })
|
|
*/
|
|
openSource (imagePath, callback) {
|
|
debug('state:source-open', imagePath)
|
|
imageStream.getFromFilePath(imagePath).asCallback((error, image) => {
|
|
debug('state:source-open', imagePath, error ? 'NOT OK' : 'OK')
|
|
this.source = image
|
|
callback.call(this, error, this.source)
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @summary Open the destination for writing
|
|
* @param {Object} destination - destination object
|
|
* @param {Function} callback - callback(error)
|
|
* @private
|
|
* @example
|
|
* writer.openDestination((error) => {
|
|
* // ...
|
|
* })
|
|
*/
|
|
openDestination (destination, callback) {
|
|
debug('state:destination-open', destination.device.raw)
|
|
|
|
/* eslint-disable no-bitwise */
|
|
const flags = fs.constants.O_RDWR |
|
|
fs.constants.O_NONBLOCK |
|
|
fs.constants.O_SYNC
|
|
/* eslint-enable no-bitwise */
|
|
|
|
fs.open(destination.device.raw, flags, (error, fd) => {
|
|
debug('state:destination-open', destination.device.raw, error ? 'NOT OK' : 'OK')
|
|
destination.fd = fd
|
|
destination.error = error
|
|
callback.call(this, error)
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @summary Check a destination against the drive constraints
|
|
* @param {Object} destination - destination object
|
|
* @param {Function} callback - callback(error)
|
|
* @example
|
|
* this.checkDriveConstraints(destination, (error) => {
|
|
* // ...
|
|
* })
|
|
*/
|
|
checkDriveConstraints (destination, callback) {
|
|
if (!constraints.isDriveLargeEnough(destination.device, this.source)) {
|
|
destination.error = errors.createUserError({
|
|
title: 'The image you selected is too big for this drive',
|
|
description: 'Please connect a bigger drive and try again'
|
|
})
|
|
}
|
|
|
|
callback.call(this, destination.error)
|
|
}
|
|
|
|
/**
|
|
* @summary Start the flashing process
|
|
* @param {String} imagePath - path to source image
|
|
* @param {Array<String>} destinationPaths - paths to target devices
|
|
* @returns {ImageWriter} imageWriter
|
|
* @example
|
|
* imageWriter.write(source, destinations)
|
|
* .on('error', reject)
|
|
* .on('progress', onProgress)
|
|
* .on('finish', resolve)
|
|
*/
|
|
write (imagePath, destinationPaths) {
|
|
// Open the source image
|
|
this.openSource(imagePath, (openError, source) => {
|
|
if (openError) {
|
|
this.emit('error', openError)
|
|
return
|
|
}
|
|
|
|
// Open & prepare target devices
|
|
this.getSelectedDevices(destinationPaths, (error, destinations) => {
|
|
if (error) {
|
|
this.emit('error', error)
|
|
return
|
|
}
|
|
|
|
const notFound = _.find(destinations, (destination) => {
|
|
return Boolean(destination.error)
|
|
})
|
|
|
|
if (notFound) {
|
|
this.emit('error', notFound.error)
|
|
return
|
|
}
|
|
|
|
// Generate preparation tasks for all destinations
|
|
const tasks = destinations.map((destination) => {
|
|
destination.verified = !this.verifyChecksums
|
|
this.destinations.set(destination.device.device, destination)
|
|
return (next) => {
|
|
runSeries([
|
|
(done) => { this.checkDriveConstraints(destination, done) },
|
|
(done) => { this.unmountDevice(destination, done) },
|
|
(done) => { this.removePartitionTable(destination, done) },
|
|
(done) => { this.openDestination(destination, done) }
|
|
], () => {
|
|
if (destination.error) {
|
|
this.emit('fail', { device: destination.device.device, error: destination.error })
|
|
}
|
|
next(destination.error, destination)
|
|
})
|
|
}
|
|
})
|
|
|
|
// Run the preparation tasks in parallel for each destination
|
|
runParallel(tasks, (resultErrors, results) => {
|
|
// We can start (theoretically) flashing now...
|
|
debug('write:prep:done', resultErrors)
|
|
if (_.every(resultErrors, _.identity)) {
|
|
this.emit('error', resultErrors[0])
|
|
} else {
|
|
this._write()
|
|
}
|
|
})
|
|
})
|
|
})
|
|
|
|
return this
|
|
}
|
|
|
|
/**
|
|
* @summary Internal progress state handler
|
|
* @param {Object} state - progress state
|
|
* @example
|
|
* pipeline.on('progress', (state) => {
|
|
* // ...
|
|
* this._onProgress(state)
|
|
* })
|
|
*/
|
|
_onProgress (state) {
|
|
state.totalSpeed = 0
|
|
state.active = 0
|
|
|
|
state.flashing = 0
|
|
state.verifying = 0
|
|
state.failed = 0
|
|
state.successful = 0
|
|
|
|
this.destinations.forEach((dest) => {
|
|
state.flashing += !dest.error && !dest.finished ? 1 : 0
|
|
state.verifying += !dest.error && dest.finished && !dest.verified ? 1 : 0
|
|
state.failed += dest.error ? 1 : 0
|
|
state.successful += !dest.error && dest.finished && (dest.verified || !this.verifyChecksums) ? 1 : 0
|
|
if (!(dest.finished && dest.verified) && !dest.error) {
|
|
state.totalSpeed += state.type === 'write'
|
|
? (dest.stream.speed || 0)
|
|
: (dest.progress.state.speed || 0)
|
|
state.active += 1
|
|
}
|
|
})
|
|
|
|
state.speed = state.active
|
|
? state.totalSpeed / state.active
|
|
: state.totalSpeed
|
|
|
|
state.eta = state.speed ? state.remaining / state.speed : 0
|
|
|
|
this.emit('progress', state)
|
|
}
|
|
|
|
/**
|
|
* @summary Start the writing process
|
|
* @returns {ImageWriter} imageWriter
|
|
* @example
|
|
* imageWriter.write()
|
|
*/
|
|
_write () {
|
|
this.pipeline = this._createWritePipeline()
|
|
|
|
this.pipeline.on('checksum', (checksum) => {
|
|
debug('write:checksum', checksum)
|
|
this.checksum = checksum
|
|
})
|
|
|
|
this.pipeline.on('error', (error) => {
|
|
this.emit('error', error)
|
|
})
|
|
|
|
this.pipeline.on('complete', (destination) => {
|
|
this.bytesRead = this.source.bytesRead
|
|
|
|
let finishedCount = 0
|
|
let errorCount = 0
|
|
|
|
this.destinations.forEach((dest) => {
|
|
finishedCount += dest.finished ? 1 : 0
|
|
errorCount += dest.error ? 1 : 0
|
|
})
|
|
|
|
debug('write:finish', finishedCount, '/', this.destinations.size)
|
|
|
|
if (_.has(destination, [ 'stream' ])) {
|
|
this.bytesWritten += destination.stream.bytesWritten
|
|
}
|
|
|
|
if (finishedCount === this.destinations.size) {
|
|
if (errorCount === this.destinations.size) {
|
|
this.emit('error', destination.error)
|
|
this._finish()
|
|
} else if (this.verifyChecksums) {
|
|
debug('write:verify')
|
|
this.verify()
|
|
} else {
|
|
debug('write:finish')
|
|
this._finish()
|
|
}
|
|
}
|
|
})
|
|
|
|
return this
|
|
}
|
|
|
|
/**
|
|
* @summary Start the writing process
|
|
* @returns {ImageWriter} imageWriter
|
|
* @example
|
|
* imageWriter.verify()
|
|
*/
|
|
verify () {
|
|
let bytesWritten = 0
|
|
|
|
// NOTE: We can't re-use `this.bytesWritten` here, as that will
|
|
// included bytes of streams that may have errored part way through
|
|
this.destinations.forEach((destination) => {
|
|
// Don't count errored destinations
|
|
if (destination.error || !destination.stream) {
|
|
return
|
|
}
|
|
bytesWritten += destination.stream.bytesWritten
|
|
})
|
|
|
|
const progressStream = new ProgressStream({
|
|
length: bytesWritten,
|
|
time: 500
|
|
})
|
|
|
|
progressStream.resume()
|
|
|
|
progressStream.on('progress', (state) => {
|
|
state.type = 'check'
|
|
this._onProgress(state)
|
|
})
|
|
|
|
this.destinations.forEach((destination) => {
|
|
// Don't verify errored destinations
|
|
if (destination.error || !destination.stream) {
|
|
return
|
|
}
|
|
|
|
const pipeline = this._createVerifyPipeline(destination)
|
|
|
|
pipeline.on('error', (error) => {
|
|
this.emit('error', error)
|
|
})
|
|
|
|
pipeline.on('checksum', (checksum) => {
|
|
debug('verify:checksum', this.checksum, '==', checksum)
|
|
destination.checksum = checksum
|
|
if (!_.isEqual(this.checksum, checksum)) {
|
|
const error = new Error(`Verification failed: ${JSON.stringify(this.checksum)} != ${JSON.stringify(checksum)}`)
|
|
error.code = 'EVALIDATION'
|
|
destination.error = error
|
|
this.emit('fail', { device: destination.device.device, error })
|
|
}
|
|
})
|
|
|
|
pipeline.on('finish', () => {
|
|
debug('verify:finish')
|
|
|
|
destination.verified = true
|
|
destination.progress = null
|
|
destination.stream = null
|
|
|
|
let finishedCount = 0
|
|
|
|
this.destinations.forEach((dest) => {
|
|
finishedCount += (dest.error || dest.verified) ? 1 : 0
|
|
})
|
|
|
|
if (finishedCount === this.destinations.size) {
|
|
debug('verify:complete')
|
|
progressStream.end()
|
|
this._finish()
|
|
}
|
|
})
|
|
|
|
// NOTE: Normally we'd use `pipeline.pipe(progressStream)` here,
|
|
// but that leads to degraded performance
|
|
pipeline.on('readable', function () {
|
|
let chunk = null
|
|
while ((chunk = this.read())) {
|
|
progressStream.write(chunk)
|
|
}
|
|
})
|
|
})
|
|
|
|
return this
|
|
}
|
|
|
|
/**
|
|
* @summary Abort the flashing process
|
|
* @example
|
|
* imageWriter.abort()
|
|
*/
|
|
abort () {
|
|
if (this.source && this.source.stream) {
|
|
this.source.stream.destroy()
|
|
}
|
|
this.emit('abort')
|
|
}
|
|
|
|
/**
|
|
* @summary Cleanup after writing; close file descriptors & unmount
|
|
* @param {Function} callback - callback(error)
|
|
* @private
|
|
* @example
|
|
* writer._cleanup((error) => {
|
|
* // ...
|
|
* })
|
|
*/
|
|
_cleanup (callback) {
|
|
debug('state:cleanup')
|
|
const tasks = []
|
|
|
|
this.destinations.forEach((destination) => {
|
|
tasks.push((next) => {
|
|
runSeries([
|
|
(done) => {
|
|
if (destination.fd) {
|
|
fs.close(destination.fd, done)
|
|
destination.fd = null
|
|
} else {
|
|
done()
|
|
}
|
|
},
|
|
(done) => {
|
|
if (!this.unmountOnSuccess) {
|
|
done()
|
|
return
|
|
}
|
|
|
|
// Closing a file descriptor on a drive containing mountable
|
|
// partitions causes macOS to mount the drive. If we try to
|
|
// unmount too quickly, then the drive might get re-mounted
|
|
// right afterwards.
|
|
setTimeout(() => {
|
|
mountutils.unmountDisk(destination.device.device, (error) => {
|
|
debug('state:cleanup', error ? 'NOT OK' : 'OK')
|
|
done(error)
|
|
})
|
|
}, UNMOUNT_ON_SUCCESS_TIMEOUT_MS)
|
|
}
|
|
], next)
|
|
})
|
|
})
|
|
|
|
runParallel(tasks, (resultErrors, results) => {
|
|
debug('state:cleanup', resultErrors)
|
|
callback.call(this, resultErrors)
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @summary Emits the `finish` event with state metadata
|
|
* @private
|
|
* @example
|
|
* this._finish()
|
|
*/
|
|
_finish () {
|
|
this._cleanup(() => {
|
|
const failures = []
|
|
let successful = 0
|
|
let failed = 0
|
|
|
|
this.finished = true
|
|
|
|
this.destinations.forEach((dest) => {
|
|
successful += dest.finished && dest.verified && !dest.error ? 1 : 0
|
|
failed += dest.error ? 1 : 0
|
|
if (dest.error) {
|
|
dest.error.device = dest.device.device
|
|
failures.push(dest.error)
|
|
}
|
|
})
|
|
|
|
this.emit('finish', {
|
|
devices: { successful, failed },
|
|
bytesRead: this.bytesRead,
|
|
bytesWritten: this.bytesWritten,
|
|
checksum: this.checksum,
|
|
errors: failures
|
|
})
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @summary Creates a write pipeline from given options
|
|
* @private
|
|
* @returns {Pipage} pipeline
|
|
* @example
|
|
* this._createWritePipeline()
|
|
*/
|
|
_createWritePipeline () {
|
|
const pipeline = new Pipage({
|
|
readableObjectMode: true
|
|
})
|
|
|
|
const progressOptions = {
|
|
length: this.source.size.original,
|
|
time: 500
|
|
}
|
|
|
|
let progressStream = null
|
|
|
|
// If the final size is an estimation,
|
|
// use the original source size for progress metering
|
|
if (this.source.size.final.estimation) {
|
|
progressStream = new ProgressStream(progressOptions)
|
|
pipeline.append(progressStream)
|
|
}
|
|
|
|
const isPassThrough = this.source.transform instanceof stream.PassThrough
|
|
|
|
// If the image transform is a pass-through,
|
|
// ignore it to save on the overhead
|
|
if (this.source.transform && !isPassThrough) {
|
|
pipeline.append(this.source.transform)
|
|
}
|
|
|
|
// If the final size is known precisely and we're not
|
|
// using block maps, then use the final size for progress
|
|
if (!this.source.size.final.estimation && !this.source.bmap) {
|
|
progressOptions.length = this.source.size.final.value
|
|
progressStream = new ProgressStream(progressOptions)
|
|
pipeline.append(progressStream)
|
|
}
|
|
|
|
if (this.source.bmap) {
|
|
const blockMap = BlockMap.parse(this.source.bmap)
|
|
debug('write:bmap', blockMap)
|
|
progressStream = new ProgressStream(progressOptions)
|
|
pipeline.append(progressStream)
|
|
pipeline.append(new BlockMap.FilterStream(blockMap))
|
|
} else {
|
|
debug('write:blockstream')
|
|
pipeline.append(new BlockStream())
|
|
if (this.verifyChecksums) {
|
|
const checksumStream = new ChecksumStream({
|
|
objectMode: true,
|
|
algorithms: this.checksumAlgorithms
|
|
})
|
|
pipeline.append(checksumStream)
|
|
pipeline.bind(checksumStream, 'checksum')
|
|
}
|
|
}
|
|
|
|
this.destinations.forEach((destination) => {
|
|
if (destination.error) {
|
|
debug('pipeline:skip', destination.device.device)
|
|
destination.finished = true
|
|
return
|
|
}
|
|
|
|
destination.stream = new BlockWriteStream({
|
|
fd: destination.fd,
|
|
autoClose: false
|
|
})
|
|
|
|
destination.stream.on('finish', () => {
|
|
debug('finish:unpipe', destination.device.device)
|
|
destination.finished = true
|
|
pipeline.emit('complete', destination)
|
|
pipeline.unpipe(destination.stream)
|
|
})
|
|
|
|
destination.stream.on('error', (error) => {
|
|
debug('error:unpipe', destination.device.device)
|
|
destination.error = error
|
|
destination.finished = true
|
|
pipeline.unpipe(destination.stream)
|
|
this.emit('fail', { device: destination.device.device, error })
|
|
pipeline.emit('complete', destination)
|
|
})
|
|
|
|
pipeline.pipe(destination.stream)
|
|
})
|
|
|
|
// Pipeline.bind(progressStream, 'progress');
|
|
progressStream.on('progress', (state) => {
|
|
state.type = 'write'
|
|
this._onProgress(state)
|
|
})
|
|
|
|
pipeline.bind(this.source.stream, 'error')
|
|
this.source.stream.pipe(pipeline)
|
|
|
|
return pipeline
|
|
}
|
|
|
|
/**
|
|
* @summary Creates a verification pipeline from given options
|
|
* @private
|
|
* @param {Object} destination - the destination object
|
|
* @returns {Pipage} pipeline
|
|
* @example
|
|
* this._createVerifyPipeline()
|
|
*/
|
|
_createVerifyPipeline (destination) {
|
|
const pipeline = new Pipage()
|
|
|
|
let size = destination.stream.bytesWritten
|
|
|
|
if (!this.source.size.final.estimation) {
|
|
size = Math.max(size, this.source.size.final.value)
|
|
}
|
|
|
|
const progressStream = new ProgressStream({
|
|
length: size,
|
|
time: 500
|
|
})
|
|
|
|
pipeline.append(progressStream)
|
|
|
|
if (this.source.bmap) {
|
|
debug('verify:bmap')
|
|
const blockMap = BlockMap.parse(this.source.bmap)
|
|
const blockMapStream = new BlockMap.FilterStream(blockMap)
|
|
pipeline.append(blockMapStream)
|
|
|
|
// NOTE: Because the blockMapStream checksums each range,
|
|
// and doesn't emit a final "checksum" event, we artificially
|
|
// raise one once the stream finishes
|
|
blockMapStream.once('finish', () => {
|
|
pipeline.emit('checksum', {})
|
|
})
|
|
} else {
|
|
const checksumStream = new ChecksumStream({
|
|
algorithms: this.checksumAlgorithms
|
|
})
|
|
pipeline.append(checksumStream)
|
|
pipeline.bind(checksumStream, 'checksum')
|
|
}
|
|
|
|
const source = new BlockReadStream({
|
|
fd: destination.fd,
|
|
autoClose: false,
|
|
start: 0,
|
|
end: size
|
|
})
|
|
|
|
pipeline.bind(source, 'error')
|
|
|
|
destination.stream = source.pipe(pipeline)
|
|
destination.progress = progressStream
|
|
|
|
return pipeline
|
|
}
|
|
}
|
|
|
|
module.exports = ImageWriter
|