mirror of
https://github.com/JasonYANG170/IOTConnect-Web.git
synced 2024-11-23 20:26:28 +00:00
179 lines
4.1 KiB
JavaScript
179 lines
4.1 KiB
JavaScript
|
'use strict'
|
||
|
|
||
|
var Transform = require('readable-stream').Transform
|
||
|
var duplexify = require('duplexify')
|
||
|
var WS = require('ws')
|
||
|
var Buffer = require('safe-buffer').Buffer
|
||
|
|
||
|
// Public entry point: the module exports the WebSocketStream function itself.
module.exports = WebSocketStream
|
||
|
|
||
|
/**
 * Create the Transform stream that sits between stream consumers and the
 * WebSocket.
 *
 * The Transform's `_write` and `_flush` hooks are replaced with the supplied
 * callbacks, so written data is handed to `socketWrite` instead of a
 * `_transform` implementation, and `socketEnd` runs when the writable side
 * finishes.
 *
 * @param {Object} options  Only `options.objectMode` is read here; it is
 *   forwarded to the Transform constructor.
 * @param {Function} socketWrite  `(chunk, enc, next)` installed as `_write`.
 * @param {Function} socketEnd  `(done)` installed as `_flush`.
 * @returns {Transform} the configured proxy stream.
 */
function buildProxy (options, socketWrite, socketEnd) {
  var proxy = new Transform({
    objectMode: options.objectMode
  })

  proxy._write = socketWrite
  proxy._flush = socketEnd

  return proxy
}
|
||
|
|
||
|
/**
 * Wrap a WebSocket in a stream.
 *
 * Data written to the returned stream is sent over the socket; messages
 * received from the socket are pushed to the readable side as Buffers.
 *
 * @param {string|Object} target  Anything with `typeof target === 'object'`
 *   is used as an existing socket as-is; any other value is passed to the
 *   `WS` constructor to open a new socket.
 * @param {string|string[]|Object} [protocols]  Sub-protocol(s) passed to the
 *   `WS` constructor. A non-array object in this position is treated as
 *   `options`, in which case `options.protocol` (string or array) supplies
 *   the protocols.
 * @param {Object} [options]  NOTE: this object is mutated — `objectMode` is
 *   written onto it when it was left undefined.
 * @param {boolean} [options.objectMode]  When undefined it is derived from
 *   `options.binary`: false if `binary` is `true` or undefined, true otherwise.
 * @param {boolean} [options.binary]  Only consulted to derive `objectMode`.
 * @param {number} [options.browserBufferSize=524288]  Browser only: writes are
 *   delayed while `socket.bufferedAmount` exceeds this value.
 * @param {number} [options.browserBufferTimeout=1000]  Browser only: delay
 *   passed to `setTimeout` before a throttled write is retried.
 * @returns {Object} the proxy Transform itself, or (outside the browser, when
 *   the socket is not open yet) a duplexify stream that is wired to the proxy
 *   once the socket opens. The socket is exposed as `stream.socket`, and a
 *   'connect' event is emitted from the socket's `onopen` handler.
 */
function WebSocketStream(target, protocols, options) {
  var stream, socket

  var isBrowser = process.title === 'browser'
  var isNative = !!global.WebSocket
  var socketWrite = isBrowser ? socketWriteBrowser : socketWriteNode

  if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) {
    // accept the "options" Object as the 2nd argument
    options = protocols
    protocols = null

    if (typeof options.protocol === 'string' || Array.isArray(options.protocol)) {
      protocols = options.protocol
    }
  }

  if (!options) options = {}

  if (options.objectMode === undefined) {
    options.objectMode = !(options.binary === true || options.binary === undefined)
  }

  var proxy = buildProxy(options, socketWrite, socketEnd)

  // batching via writev concatenates Buffers, so it is only installed when
  // the stream is not in object mode
  if (!options.objectMode) {
    proxy._writev = writev
  }

  // browser only: sets the maximum socket buffer size before throttling
  var bufferSize = options.browserBufferSize || 1024 * 512

  // browser only: how long to wait when throttling
  var bufferTimeout = options.browserBufferTimeout || 1000

  // use existing WebSocket object that was passed in
  if (typeof target === 'object') {
    socket = target
  // otherwise make a new one
  } else {
    // special constructor treatment for native websockets in browsers, see
    // https://github.com/maxogden/websocket-stream/issues/82
    if (isNative && isBrowser) {
      socket = new WS(target, protocols)
    } else {
      socket = new WS(target, protocols, options)
    }

    socket.binaryType = 'arraybuffer'
  }

  // was already open when passed in
  if (socket.readyState === socket.OPEN) {
    stream = proxy
  } else if (isBrowser) {
    // hold writes back (corked) until the socket's onopen handler uncorks
    stream = proxy
    stream.cork()
    socket.onopen = onopenBrowser
  } else {
    // readable/writable sides are attached to the proxy in onopen
    stream = duplexify.obj()
    socket.onopen = onopen
  }

  stream.socket = socket

  socket.onclose = onclose
  socket.onerror = onerror
  socket.onmessage = onmessage

  proxy.on('close', destroy)

  // strings are converted to Buffers before sending unless in object mode
  var coerceToBuffer = !options.objectMode

  // _write implementation used outside the browser: `next` is handed to
  // socket.send as its callback.
  function socketWriteNode(chunk, enc, next) {
    // avoid errors, this never happens unless
    // destroy() is called
    if (socket.readyState !== socket.OPEN) {
      next()
      return
    }

    if (coerceToBuffer && typeof chunk === 'string') {
      chunk = Buffer.from(chunk, 'utf8')
    }
    socket.send(chunk, next)
  }

  // _write implementation used in the browser: socket.send is called without
  // a callback, so the write is retried later while bufferedAmount is above
  // the configured limit, and a synchronous throw is forwarded to `next`.
  function socketWriteBrowser(chunk, enc, next) {
    if (socket.bufferedAmount > bufferSize) {
      setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
      return
    }

    if (coerceToBuffer && typeof chunk === 'string') {
      chunk = Buffer.from(chunk, 'utf8')
    }

    try {
      socket.send(chunk)
    } catch(err) {
      return next(err)
    }

    next()
  }

  // _flush implementation: close the socket once the writable side finishes.
  function socketEnd(done) {
    socket.close()
    done()
  }

  // non-browser open handler: attach the proxy to the duplexify wrapper.
  function onopen() {
    stream.setReadable(proxy)
    stream.setWritable(proxy)
    stream.emit('connect')
  }

  // browser open handler: release the writes corked before the socket opened.
  function onopenBrowser () {
    stream.uncork()
    stream.emit('connect')
  }

  function onclose() {
    stream.end()
    stream.destroy()
  }

  function onerror(err) {
    stream.destroy(err)
  }

  // incoming messages are always pushed to the readable side as Buffers
  function onmessage(event) {
    var data = event.data
    if (data instanceof ArrayBuffer) data = Buffer.from(data)
    else data = Buffer.from(data, 'utf8')
    proxy.push(data)
  }

  function destroy() {
    socket.close()
  }

  // this is to be enabled only if objectMode is false
  // Sends all pending chunks as a single concatenated Buffer.
  function writev (chunks, cb) {
    var buffers = new Array(chunks.length)
    for (var i = 0; i < chunks.length; i++) {
      if (typeof chunks[i].chunk === 'string') {
        // convert the string payload itself, not the wrapping entry object
        // (passing the object to Buffer.from throws)
        buffers[i] = Buffer.from(chunks[i].chunk, 'utf8')
      } else {
        buffers[i] = chunks[i].chunk
      }
    }

    this._write(Buffer.concat(buffers), 'binary', cb)
  }

  return stream
}
|