// etcher/lib/writer/block-read-stream.js
// Jonas Hermsmeier 74c0c13fbd
// feat(writer): Add read/write retry delays (#1919)
// Change-Type: minor
// 2017-12-15 16:09:11 +01:00
//
// 239 lines
// 5.3 KiB
// JavaScript

/*
* Copyright 2017 resin.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict'
const stream = require('readable-stream')
const fs = require('fs')
const debug = require('debug')('block-read-stream')
const errors = require('./error-types')
/**
 * @summary Upper bound on the number of bytes requested by a single read
 * @constant
 * @type {Number}
 */
const CHUNK_SIZE = 64 * 1024
/**
 * @summary Lower bound on the number of bytes requested by a single read
 * @description
 * The read length is clamped to at least this value, even when fewer bytes
 * remain before the configured end offset.
 * @constant
 * @type {Number}
 */
const MIN_CHUNK_SIZE = 512
/**
 * @summary I/O retry base timeout, in milliseconds
 * @description
 * Multiplied by the number of the current retry attempt to obtain the
 * delay before a failed read is attempted again.
 * @constant
 * @type {Number}
 */
const RETRY_BASE_TIMEOUT = 100
/**
 * @summary BlockReadStream
 * @class
 * @description
 * Object-mode readable stream that reads a file (or device) through
 * positional reads of between MIN_CHUNK_SIZE and CHUNK_SIZE bytes, pushing
 * one Buffer per read. Reads failing with a transient error are retried
 * with a linearly increasing delay.
 */
class BlockReadStream extends stream.Readable {
  /**
   * @summary BlockReadStream constructor
   * @param {Object} [options] - options
   * @param {Object} [options.fs] - fs implementation providing open/read/close
   * @param {Number} [options.fd] - file descriptor
   * @param {String} [options.path] - file path
   * @param {String} [options.flags] - file open flags
   * @param {Number} [options.mode] - file mode
   * @param {Number} [options.start] - start offset in bytes
   * @param {Number} [options.end] - end offset in bytes (exclusive)
   * @param {Boolean} [options.autoClose] - automatically close the stream on end
   * @param {Number} [options.maxRetries=5] - maximum number of retries per read;
   * pass 0 to disable retries
   * @example
   * new BlockReadStream()
   */
  constructor (options) {
    options = Object.assign({}, BlockReadStream.defaults, options)
    options.objectMode = true

    debug('block-read-stream %j', options)

    super(options)

    this.fs = options.fs
    this.fd = options.fd
    this.path = options.path
    this.flags = options.flags
    this.mode = options.mode
    // NOTE: any falsy `end` (including 0) is treated as "no end offset"
    this.end = options.end || Infinity
    this.autoClose = options.autoClose
    // Only fall back to the default when the option is absent, so that
    // an explicit `maxRetries: 0` disables retrying instead of becoming 5
    this.maxRetries = options.maxRetries == null ? 5 : options.maxRetries
    this.retries = 0
    this.position = options.start || 0
    this.bytesRead = 0

    this.closed = false
    this.destroyed = false

    this.once('end', function () {
      if (this.autoClose) {
        this.close()
      }
    })

    /**
     * @summary onRead handler
     * @description
     * A read that returns fewer bytes than requested is treated as an
     * error. Transient errors are retried up to `maxRetries` times, waiting
     * RETRY_BASE_TIMEOUT multiplied by the attempt number between tries;
     * once retries are exhausted the error code is set to 'EUNPLUGGED'.
     * @param {Error} error - error
     * @param {Number} bytesRead - bytes read
     * @param {Buffer} buffer - resulting buffer
     * @example
     * fs.read(fd, buffer, 0, length, position, onRead)
     */
    this._onRead = (error, bytesRead, buffer) => {
      if (!error && bytesRead !== buffer.length) {
        error = new Error(`Bytes read mismatch: ${bytesRead} != ${buffer.length}`)
      }

      if (error) {
        const isTransient = errors.isTransientError(error)

        if (isTransient && (this.retries < this.maxRetries)) {
          this.retries += 1
          // `position` has not advanced, so this re-reads the same range
          setTimeout(() => {
            this._read()
          }, RETRY_BASE_TIMEOUT * this.retries)
          return
        } else if (isTransient) {
          error.code = 'EUNPLUGGED'
        }

        if (this.autoClose) {
          this.destroy()
        }

        this.emit('error', error)
        return
      }

      // A successful read resets the retry budget for the next chunk
      this.retries = 0
      this.bytesRead += bytesRead
      this.position += buffer.length
      this.push(buffer)
    }

    this.open()
  }

  /**
   * @summary Read a chunk from the source
   * @description
   * Requests between MIN_CHUNK_SIZE and CHUNK_SIZE bytes at the current
   * position; the last read may therefore extend past `end` when fewer than
   * MIN_CHUNK_SIZE bytes remain. Ends the stream once `position` reaches `end`.
   * @private
   * @example
   * // not to be called directly
   */
  _read () {
    // Nothing left to read from once the descriptor has been released;
    // without this guard a late call (e.g. from a pending retry timer)
    // would wait for an 'open' event that never comes
    if (this.closed) {
      return
    }

    // Wait for file handle to be open
    if (this.fd == null) {
      this.once('open', () => {
        this._read()
      })
      return
    }

    const toRead = this.end - this.position

    if (toRead <= 0) {
      this.push(null)
      return
    }

    const length = Math.min(CHUNK_SIZE, Math.max(MIN_CHUNK_SIZE, toRead))
    const buffer = Buffer.alloc(length)

    this.fs.read(this.fd, buffer, 0, length, this.position, this._onRead)
  }

  /**
   * @summary Open a handle to the file
   * @description
   * Emits 'open' with the file descriptor, either immediately when one was
   * supplied through the options, or once `fs.open` succeeds. On failure
   * emits 'error' (destroying the stream first when `autoClose` is set).
   * @private
   * @example
   * this.open()
   */
  open () {
    debug('open')

    if (this.fd != null) {
      this.emit('open', this.fd)
      return
    }

    this.fs.open(this.path, this.flags, this.mode, (error, fd) => {
      if (error) {
        if (this.autoClose) {
          this.destroy()
        }
        this.emit('error', error)
      } else {
        this.fd = fd
        this.emit('open', fd)
      }
    })
  }

  /**
   * @summary Close the underlying resource
   * @description
   * If the file is still being opened, closing is deferred until 'open'.
   * Calling this again after the stream was closed re-emits 'close' on the
   * next tick, so the given callback is still invoked. A failing `fs.close`
   * emits 'error' instead of 'close'.
   * @param {Function} callback - callback(error)
   * @example
   * blockStream.close((error) => {
   *   // ...
   * })
   */
  close (callback) {
    debug('close')

    if (callback) {
      this.once('close', callback)
    }

    // Must be checked before `fd`: closing resets `fd` to null, so testing
    // `fd` first would mistake a closed stream for one that is still
    // opening and wait forever for an 'open' event
    if (this.closed) {
      process.nextTick(() => {
        this.emit('close')
      })
      return
    }

    // Not opened yet; close as soon as the descriptor becomes available
    if (this.fd == null) {
      this.once('open', () => {
        this.close()
      })
      return
    }

    this.closed = true

    this.fs.close(this.fd, (error) => {
      if (error) {
        this.emit('error', error)
      } else {
        this.emit('close')
      }
    })

    this.fd = null
  }
}
/**
 * @summary Default options
 * @description
 * Merged beneath the options passed to the constructor, so any key given
 * by the caller overrides the value listed here.
 * @type {Object}
 * @constant
 */
BlockReadStream.defaults = {
// Node's built-in fs module; the stream calls open/read/close on it
fs,
// No pre-opened descriptor: the file at `path` gets opened instead
fd: null,
path: null,
// Flags and mode are handed to `fs.open` when `path` is opened
flags: 'r',
mode: 0o666,
// Close the stream when it ends, and destroy it on open/read errors
autoClose: true
}
module.exports = BlockReadStream