From 657142716c63f83b6059b0e63ec4126b395fe175 Mon Sep 17 00:00:00 2001 From: Jonas Hermsmeier Date: Wed, 6 Dec 2017 13:58:03 +0100 Subject: [PATCH] fix(writer): Add missing read/write-retry handling (#1885) This adds read- & write-retry handling of potentially temporary errors, as well as errors due to device disconnection. Change-Type: patch Changelog-Entry: Fix handling of temporary read/write errors --- lib/writer/block-read-stream.js | 19 +++++++++++++- lib/writer/block-write-stream.js | 36 ++++++++++++++++++++----- lib/writer/error-types.js | 45 ++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 8 deletions(-) create mode 100644 lib/writer/error-types.js diff --git a/lib/writer/block-read-stream.js b/lib/writer/block-read-stream.js index 2183084c..1af849b5 100644 --- a/lib/writer/block-read-stream.js +++ b/lib/writer/block-read-stream.js @@ -19,6 +19,7 @@ const stream = require('readable-stream') const fs = require('fs') const debug = require('debug')('block-read-stream') +const errors = require('./error-types') const CHUNK_SIZE = 64 * 1024 const MIN_CHUNK_SIZE = 512 @@ -38,6 +39,7 @@ class BlockReadStream extends stream.Readable { * @param {Number} [options.start] - start offset in bytes * @param {Number} [options.end] - end offset in bytes * @param {Boolean} [options.autoClose] - automatically close the stream on end + * @param {Number} [options.maxRetries] - maximum number of retries per read * @example * new BlockReadStream() */ @@ -56,7 +58,9 @@ class BlockReadStream extends stream.Readable { this.mode = options.mode this.end = options.end || Infinity this.autoClose = options.autoClose + this.maxRetries = options.maxRetries || 5 + this.retries = 0 this.position = options.start || 0 this.bytesRead = 0 @@ -83,14 +87,28 @@ class BlockReadStream extends stream.Readable { } if (error) { + const isTransient = errors.isTransientError(error) + + if (isTransient && (this.retries < this.maxRetries)) { + this.retries += 1 + this._read() + return + } else if (isTransient) { + error.code = 'EUNPLUGGED' + } + if (this.autoClose) { this.destroy() } + this.emit('error', error) + return } + this.retries = 0 this.bytesRead += bytesRead + this.position += buffer.length this.push(buffer) } @@ -123,7 +141,6 @@ class BlockReadStream extends stream.Readable { const buffer = Buffer.alloc(length) this.fs.read(this.fd, buffer, 0, length, this.position, this._onRead) - this.position += length } /** diff --git a/lib/writer/block-write-stream.js b/lib/writer/block-write-stream.js index 06051d5c..263c5e96 100644 --- a/lib/writer/block-write-stream.js +++ b/lib/writer/block-write-stream.js @@ -20,6 +20,7 @@ const stream = require('readable-stream') const fs = require('fs') const speedometer = require('speedometer') const debug = require('debug')('block-write-stream') +const errors = require('./error-types') const CHUNK_SIZE = 64 * 1024 const UPDATE_INTERVAL_MS = 500 @@ -30,13 +31,14 @@ const UPDATE_INTERVAL_MS = 500 */ class BlockWriteStream extends stream.Writable { /** - * @summary BlockReadStream constructor + * @summary BlockWriteStream constructor * @param {Object} [options] - options * @param {Number} [options.fd] - file descriptor * @param {String} [options.path] - file path * @param {String} [options.flags] - file open flags * @param {Number} [options.mode] - file mode * @param {Boolean} [options.autoClose] - automatically close the stream on end + * @param {Number} [options.maxRetries] - maximum number of retries per write * @example * new BlockWriteStream(options) */ @@ -56,12 +58,14 @@ class BlockWriteStream extends stream.Writable { this.flags = options.flags this.mode = options.mode this.autoClose = options.autoClose + this.maxRetries = options.maxRetries || 5 this.position = 0 this.bytesRead = 0 this.blocksRead = 0 this.bytesWritten = 0 this.blocksWritten = 0 + this.retries = 0 this.meter = speedometer() this.delta = 0 this.speed = 0 @@ -115,8 +119,10 @@ class BlockWriteStream extends stream.Writable { return } - this.bytesRead += chunk.length - this.blocksRead += 1 + if (this.retries === 0) { + this.bytesRead += chunk.length + this.blocksRead += 1 + } if (chunk.position == null) { chunk.position = this.position @@ -134,10 +140,26 @@ class BlockWriteStream extends stream.Writable { } fs.write(this.fd, chunk, 0, chunk.length, chunk.position, (error, bytesWritten) => { - this.bytesWritten += bytesWritten - this.delta += bytesWritten - this.blocksWritten += 1 - this.position += bytesWritten + if (!error) { + this.bytesWritten += bytesWritten + this.delta += bytesWritten + this.blocksWritten += 1 + this.position += bytesWritten + this.retries = 0 + next(error) + return + } + + const isTransient = errors.isTransientError(error) + + if (isTransient && (this.retries < this.maxRetries)) { + this.retries += 1 + this._write(chunk, encoding, next) + return + } else if (isTransient) { + error.code = 'EUNPLUGGED' + } + next(error) }) } diff --git a/lib/writer/error-types.js b/lib/writer/error-types.js new file mode 100644 index 00000000..cd2bc253 --- /dev/null +++ b/lib/writer/error-types.js @@ -0,0 +1,45 @@ +/* + * Copyright 2017 resin.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict' + +module.exports = { + + /** + * @summary Determine whether an error is considered a + * transient occurrence, and the operation should be retried + * Errors considered potentially temporary are: + * - Mac OS: ENXIO, EBUSY + * - Windows: ENOENT, UNKNOWN + * - Linux: EIO + * @private + * @param {Error} error - Error + * @returns {Boolean} + * @example + * errors.isTransientError(error) + */ + isTransientError (error) { + if (process.platform === 'darwin') { + return error.code === 'ENXIO' || error.code === 'EBUSY' + } else if (process.platform === 'linux') { + return error.code === 'EIO' + } else if (process.platform === 'win32') { + return error.code === 'ENOENT' || error.code === 'UNKNOWN' + } + return false + } + +}