mirror of
https://github.com/JasonYANG170/IOTConnect-Web.git
synced 2024-11-24 04:36:31 +00:00
192 lines
5.0 KiB
JavaScript
192 lines
5.0 KiB
JavaScript
|
'use strict';
|
||
|
|
||
|
import stream from 'stream';
|
||
|
import utils from '../utils.js';
|
||
|
import throttle from './throttle.js';
|
||
|
import speedometer from './speedometer.js';
|
||
|
|
||
|
const kInternals = Symbol('internals');
|
||
|
|
||
|
class AxiosTransformStream extends stream.Transform{
|
||
|
constructor(options) {
|
||
|
options = utils.toFlatObject(options, {
|
||
|
maxRate: 0,
|
||
|
chunkSize: 64 * 1024,
|
||
|
minChunkSize: 100,
|
||
|
timeWindow: 500,
|
||
|
ticksRate: 2,
|
||
|
samplesCount: 15
|
||
|
}, null, (prop, source) => {
|
||
|
return !utils.isUndefined(source[prop]);
|
||
|
});
|
||
|
|
||
|
super({
|
||
|
readableHighWaterMark: options.chunkSize
|
||
|
});
|
||
|
|
||
|
const self = this;
|
||
|
|
||
|
const internals = this[kInternals] = {
|
||
|
length: options.length,
|
||
|
timeWindow: options.timeWindow,
|
||
|
ticksRate: options.ticksRate,
|
||
|
chunkSize: options.chunkSize,
|
||
|
maxRate: options.maxRate,
|
||
|
minChunkSize: options.minChunkSize,
|
||
|
bytesSeen: 0,
|
||
|
isCaptured: false,
|
||
|
notifiedBytesLoaded: 0,
|
||
|
ts: Date.now(),
|
||
|
bytes: 0,
|
||
|
onReadCallback: null
|
||
|
};
|
||
|
|
||
|
const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow);
|
||
|
|
||
|
this.on('newListener', event => {
|
||
|
if (event === 'progress') {
|
||
|
if (!internals.isCaptured) {
|
||
|
internals.isCaptured = true;
|
||
|
}
|
||
|
}
|
||
|
});
|
||
|
|
||
|
let bytesNotified = 0;
|
||
|
|
||
|
internals.updateProgress = throttle(function throttledHandler() {
|
||
|
const totalBytes = internals.length;
|
||
|
const bytesTransferred = internals.bytesSeen;
|
||
|
const progressBytes = bytesTransferred - bytesNotified;
|
||
|
if (!progressBytes || self.destroyed) return;
|
||
|
|
||
|
const rate = _speedometer(progressBytes);
|
||
|
|
||
|
bytesNotified = bytesTransferred;
|
||
|
|
||
|
process.nextTick(() => {
|
||
|
self.emit('progress', {
|
||
|
'loaded': bytesTransferred,
|
||
|
'total': totalBytes,
|
||
|
'progress': totalBytes ? (bytesTransferred / totalBytes) : undefined,
|
||
|
'bytes': progressBytes,
|
||
|
'rate': rate ? rate : undefined,
|
||
|
'estimated': rate && totalBytes && bytesTransferred <= totalBytes ?
|
||
|
(totalBytes - bytesTransferred) / rate : undefined
|
||
|
});
|
||
|
});
|
||
|
}, internals.ticksRate);
|
||
|
|
||
|
const onFinish = () => {
|
||
|
internals.updateProgress(true);
|
||
|
};
|
||
|
|
||
|
this.once('end', onFinish);
|
||
|
this.once('error', onFinish);
|
||
|
}
|
||
|
|
||
|
_read(size) {
|
||
|
const internals = this[kInternals];
|
||
|
|
||
|
if (internals.onReadCallback) {
|
||
|
internals.onReadCallback();
|
||
|
}
|
||
|
|
||
|
return super._read(size);
|
||
|
}
|
||
|
|
||
|
_transform(chunk, encoding, callback) {
|
||
|
const self = this;
|
||
|
const internals = this[kInternals];
|
||
|
const maxRate = internals.maxRate;
|
||
|
|
||
|
const readableHighWaterMark = this.readableHighWaterMark;
|
||
|
|
||
|
const timeWindow = internals.timeWindow;
|
||
|
|
||
|
const divider = 1000 / timeWindow;
|
||
|
const bytesThreshold = (maxRate / divider);
|
||
|
const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;
|
||
|
|
||
|
function pushChunk(_chunk, _callback) {
|
||
|
const bytes = Buffer.byteLength(_chunk);
|
||
|
internals.bytesSeen += bytes;
|
||
|
internals.bytes += bytes;
|
||
|
|
||
|
if (internals.isCaptured) {
|
||
|
internals.updateProgress();
|
||
|
}
|
||
|
|
||
|
if (self.push(_chunk)) {
|
||
|
process.nextTick(_callback);
|
||
|
} else {
|
||
|
internals.onReadCallback = () => {
|
||
|
internals.onReadCallback = null;
|
||
|
process.nextTick(_callback);
|
||
|
};
|
||
|
}
|
||
|
}
|
||
|
|
||
|
const transformChunk = (_chunk, _callback) => {
|
||
|
const chunkSize = Buffer.byteLength(_chunk);
|
||
|
let chunkRemainder = null;
|
||
|
let maxChunkSize = readableHighWaterMark;
|
||
|
let bytesLeft;
|
||
|
let passed = 0;
|
||
|
|
||
|
if (maxRate) {
|
||
|
const now = Date.now();
|
||
|
|
||
|
if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
|
||
|
internals.ts = now;
|
||
|
bytesLeft = bytesThreshold - internals.bytes;
|
||
|
internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
|
||
|
passed = 0;
|
||
|
}
|
||
|
|
||
|
bytesLeft = bytesThreshold - internals.bytes;
|
||
|
}
|
||
|
|
||
|
if (maxRate) {
|
||
|
if (bytesLeft <= 0) {
|
||
|
// next time window
|
||
|
return setTimeout(() => {
|
||
|
_callback(null, _chunk);
|
||
|
}, timeWindow - passed);
|
||
|
}
|
||
|
|
||
|
if (bytesLeft < maxChunkSize) {
|
||
|
maxChunkSize = bytesLeft;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
|
||
|
chunkRemainder = _chunk.subarray(maxChunkSize);
|
||
|
_chunk = _chunk.subarray(0, maxChunkSize);
|
||
|
}
|
||
|
|
||
|
pushChunk(_chunk, chunkRemainder ? () => {
|
||
|
process.nextTick(_callback, null, chunkRemainder);
|
||
|
} : _callback);
|
||
|
};
|
||
|
|
||
|
transformChunk(chunk, function transformNextChunk(err, _chunk) {
|
||
|
if (err) {
|
||
|
return callback(err);
|
||
|
}
|
||
|
|
||
|
if (_chunk) {
|
||
|
transformChunk(_chunk, transformNextChunk);
|
||
|
} else {
|
||
|
callback(null);
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
|
||
|
setLength(length) {
|
||
|
this[kInternals].length = +length;
|
||
|
return this;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
export default AxiosTransformStream;
|