fix(writer): Add missing read/write-retry handling (#1885)

This adds read- & write-retry handling of potentially temporary errors,
as well as errors due to device disconnection.

Change-Type: patch
Changelog-Entry: Fix handling of temporary read/write errors
This commit is contained in:
Jonas Hermsmeier 2017-12-06 13:58:03 +01:00 committed by GitHub
parent 618440e38f
commit 657142716c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 8 deletions

View File

@ -19,6 +19,7 @@
const stream = require('readable-stream')
const fs = require('fs')
const debug = require('debug')('block-read-stream')
const errors = require('./error-types')
const CHUNK_SIZE = 64 * 1024
const MIN_CHUNK_SIZE = 512
@ -38,6 +39,7 @@ class BlockReadStream extends stream.Readable {
* @param {Number} [options.start] - start offset in bytes
* @param {Number} [options.end] - end offset in bytes
* @param {Boolean} [options.autoClose] - automatically close the stream on end
* @param {Number} [options.maxRetries] - maximum number of retries per read
* @example
* new BlockReadStream()
*/
@ -56,7 +58,9 @@ class BlockReadStream extends stream.Readable {
this.mode = options.mode
this.end = options.end || Infinity
this.autoClose = options.autoClose
this.maxRetries = options.maxRetries || 5
this.retries = 0
this.position = options.start || 0
this.bytesRead = 0
@ -83,14 +87,28 @@ class BlockReadStream extends stream.Readable {
}
if (error) {
const isTransient = errors.isTransientError(error)
if (isTransient && (this.retries < this.maxRetries)) {
this.retries += 1
this._read()
return
} else if (isTransient) {
error.code = 'EUNPLUGGED'
}
if (this.autoClose) {
this.destroy()
}
this.emit('error', error)
return
}
this.retries = 0
this.bytesRead += bytesRead
this.position += buffer.length
this.push(buffer)
}
@ -123,7 +141,6 @@ class BlockReadStream extends stream.Readable {
const buffer = Buffer.alloc(length)
this.fs.read(this.fd, buffer, 0, length, this.position, this._onRead)
this.position += length
}
/**

View File

@ -20,6 +20,7 @@ const stream = require('readable-stream')
const fs = require('fs')
const speedometer = require('speedometer')
const debug = require('debug')('block-write-stream')
const errors = require('./error-types')
const CHUNK_SIZE = 64 * 1024
const UPDATE_INTERVAL_MS = 500
@ -30,13 +31,14 @@ const UPDATE_INTERVAL_MS = 500
*/
class BlockWriteStream extends stream.Writable {
/**
* @summary BlockReadStream constructor
* @summary BlockWriteStream constructor
* @param {Object} [options] - options
* @param {Number} [options.fd] - file descriptor
* @param {String} [options.path] - file path
* @param {String} [options.flags] - file open flags
* @param {Number} [options.mode] - file mode
* @param {Boolean} [options.autoClose] - automatically close the stream on end
* @param {Number} [options.maxRetries] - maximum number of retries per write
* @example
* new BlockWriteStream(options)
*/
@ -56,12 +58,14 @@ class BlockWriteStream extends stream.Writable {
this.flags = options.flags
this.mode = options.mode
this.autoClose = options.autoClose
this.maxRetries = options.maxRetries || 5
this.position = 0
this.bytesRead = 0
this.blocksRead = 0
this.bytesWritten = 0
this.blocksWritten = 0
this.retries = 0
this.meter = speedometer()
this.delta = 0
this.speed = 0
@ -115,8 +119,10 @@ class BlockWriteStream extends stream.Writable {
return
}
if (this.retries === 0) {
this.bytesRead += chunk.length
this.blocksRead += 1
}
if (chunk.position == null) {
chunk.position = this.position
@ -134,10 +140,26 @@ class BlockWriteStream extends stream.Writable {
}
fs.write(this.fd, chunk, 0, chunk.length, chunk.position, (error, bytesWritten) => {
if (!error) {
this.bytesWritten += bytesWritten
this.delta += bytesWritten
this.blocksWritten += 1
this.position += bytesWritten
this.retries = 0
next(error)
return
}
const isTransient = errors.isTransientError(error)
if (isTransient && (this.retries < this.maxRetries)) {
this.retries += 1
this._write(chunk, encoding, next)
return
} else if (isTransient) {
error.code = 'EUNPLUGGED'
}
next(error)
})
}

45
lib/writer/error-types.js Normal file
View File

@ -0,0 +1,45 @@
/*
* Copyright 2017 resin.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict'
module.exports = {
/**
* @summary Determine whether an error is considered a
* transient occurrence, and the operation should be retried
* Errors considered potentially temporary are:
* - Mac OS: ENXIO, EBUSY
* - Windows: ENOENT, UNKNOWN
* - Linux: EIO
* @private
* @param {Error} error - Error
* @returns {Boolean}
* @example
* errors.isTransientError(error)
*/
isTransientError (error) {
if (process.platform === 'darwin') {
return error.code === 'ENXIO' || error.code === 'EBUSY'
} else if (process.platform === 'linux') {
return error.code === 'EIO'
} else if (process.platform === 'win32') {
return error.code === 'ENOENT' || error.code === 'UNKNOWN'
}
return false
}
}