mirror of
https://github.com/JasonYANG170/IOTConnect-Web.git
synced 2024-11-23 20:26:28 +00:00
1128 lines
30 KiB
JavaScript
1128 lines
30 KiB
JavaScript
|
const protocol = require('./constants')
const { Buffer } = require('buffer')

// Shared buffers reused by the packet writers below.
const empty = Buffer.allocUnsafe(0)
const zeroBuf = Buffer.from([0])

const numbers = require('./numbers')
const nextTick = require('process-nextick-args').nextTick
const debug = require('debug')('mqtt-packet:writeToStream')

// Local aliases for the helpers exported by ./numbers.
const {
  cache: numCache,
  generateNumber,
  generateCache,
  genBufVariableByteInt,
  generate4ByteBuffer
} = numbers

// Two-byte number writer currently in use; the `cacheNumbers` accessor on
// `generate` switches it between the cached and the generated variant.
let writeNumber = writeNumberCached

// While true, the next call to `generate` runs `generateCache()` first.
let toGenerate = true
|
||
|
|
||
|
/**
 * Write an MQTT packet to a stream, choosing the writer from `packet.cmd`.
 *
 * When the stream exposes `cork`, it is corked and an `uncork` is scheduled
 * through `nextTick`. If cache generation is still pending, the number cache
 * is built with `generateCache()` before the packet is written.
 *
 * @param {Object} packet - packet to serialize; `cmd` selects the writer
 * @param {Object} stream - destination; `write` and `destroy` are used
 * @param {Object} [opts] - passed through unchanged to the selected writer
 * @returns {*} the selected writer's return value, or `false` after the
 *   stream was destroyed because `packet.cmd` is not a known command
 */
function generate (packet, stream, opts) {
  debug('generate called')

  if (stream.cork) {
    stream.cork()
    nextTick(uncork, stream)
  }

  if (toGenerate) {
    toGenerate = false
    generateCache()
  }

  debug('generate: packet.cmd: %s', packet.cmd)

  let writer = null
  switch (packet.cmd) {
    case 'connect':
      writer = connect
      break
    case 'connack':
      writer = connack
      break
    case 'publish':
      writer = publish
      break
    case 'puback':
    case 'pubrec':
    case 'pubrel':
    case 'pubcomp':
      writer = confirmation
      break
    case 'subscribe':
      writer = subscribe
      break
    case 'suback':
      writer = suback
      break
    case 'unsubscribe':
      writer = unsubscribe
      break
    case 'unsuback':
      writer = unsuback
      break
    case 'pingreq':
    case 'pingresp':
      writer = emptyPacket
      break
    case 'disconnect':
      writer = disconnect
      break
    case 'auth':
      writer = auth
      break
  }

  if (writer === null) {
    stream.destroy(new Error('Unknown command'))
    return false
  }

  return writer(packet, stream, opts)
}
|
||
|
/**
 * Controls the numbers cache.
 * Set to a falsy value to allocate number buffers on the fly instead of
 * reading them from the pre-generated cache.
 */
Object.defineProperty(generate, 'cacheNumbers', {
  // True while the cached writer is the active two-byte number writer.
  get: () => writeNumber === writeNumberCached,
  set: (enabled) => {
    if (!enabled) {
      toGenerate = false
      writeNumber = writeNumberGenerated
      return
    }

    // Request (re)generation on the next `generate` call when the cache is empty.
    const cacheIsEmpty = !numCache || Object.keys(numCache).length === 0
    if (cacheIsEmpty) toGenerate = true
    writeNumber = writeNumberCached
  }
})
|
||
|
|
||
|
/**
 * Release a previously corked stream; scheduled by `generate` via `nextTick`.
 *
 * @param {Object} stream - stream whose `uncork` method is invoked
 */
function uncork (stream) {
  stream.uncork()
}
|
||
|
|
||
|
/**
 * Write a CONNECT packet to the stream.
 *
 * Every field is validated and the remaining length is computed before the
 * first byte is written, so a validation failure destroys the stream without
 * emitting a partial packet.
 *
 * @param {Object} packet - CONNECT settings: `protocolId`, `protocolVersion`,
 *   `clean`, `keepalive`, `clientId`, `username`, `password`, `will`,
 *   `properties` (MQTT 5) and `bridgeMode`
 * @param {Object} stream - destination; `write` and `destroy` are used
 * @param {Object} [opts] - not used by this writer
 * @returns {boolean} `true` once the packet has been written; `false` when a
 *   field was invalid (the stream is destroyed) or the properties could not
 *   be serialized
 */
function connect (packet, stream, opts) {
  const settings = packet || {}
  const protocolId = settings.protocolId || 'MQTT'
  let protocolVersion = settings.protocolVersion || 4
  const will = settings.will
  let clean = settings.clean
  const keepalive = settings.keepalive || 0
  const clientId = settings.clientId || ''
  const username = settings.username
  const password = settings.password
  /* mqtt5 new options */
  const properties = settings.properties

  if (clean === undefined) clean = true

  let length = 0

  // Must be a non-empty string or Buffer
  if (!protocolId ||
    (typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId))) {
    stream.destroy(new Error('Invalid protocolId'))
    return false
  }
  // Count encoded bytes rather than UTF-16 code units so the remaining length
  // matches what writeStringOrBuffer emits for a non-ASCII protocolId.
  length += Buffer.byteLength(protocolId) + 2

  // Must be 3 or 4 or 5
  if (protocolVersion !== 3 && protocolVersion !== 4 && protocolVersion !== 5) {
    stream.destroy(new Error('Invalid protocol version'))
    return false
  }
  length += 1

  // ClientId might be omitted in 3.1.1 and 5, but only if cleanSession is set to 1
  if ((typeof clientId === 'string' || Buffer.isBuffer(clientId)) &&
    (clientId || protocolVersion >= 4) && (clientId || clean)) {
    length += Buffer.byteLength(clientId) + 2
  } else {
    if (protocolVersion < 4) {
      stream.destroy(new Error('clientId must be supplied before 3.1.1'))
      return false
    }
    if ((clean * 1) === 0) {
      stream.destroy(new Error('clientId must be given if cleanSession set to 0'))
      return false
    }
  }

  // Must be a two byte number
  if (typeof keepalive !== 'number' ||
    keepalive < 0 ||
    keepalive > 65535 ||
    keepalive % 1 !== 0) {
    stream.destroy(new Error('Invalid keepalive'))
    return false
  }
  length += 2

  // Connect flags
  length += 1

  let propertiesData
  let willProperties

  // Properties
  if (protocolVersion === 5) {
    propertiesData = getProperties(stream, properties)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // If will exists...
  if (will) {
    // It must be an object
    if (typeof will !== 'object') {
      stream.destroy(new Error('Invalid will'))
      return false
    }
    // It must have topic typeof string
    if (!will.topic || typeof will.topic !== 'string') {
      stream.destroy(new Error('Invalid will topic'))
      return false
    }
    length += Buffer.byteLength(will.topic) + 2

    // Payload
    length += 2 // payload length
    if (will.payload) {
      if (will.payload.length >= 0) {
        if (typeof will.payload === 'string') {
          length += Buffer.byteLength(will.payload)
        } else {
          length += will.payload.length
        }
      } else {
        stream.destroy(new Error('Invalid will payload'))
        return false
      }
    }

    // will properties
    if (protocolVersion === 5) {
      willProperties = getProperties(stream, will.properties)
      if (!willProperties) { return false }
      length += willProperties.length
    }
  }

  // Username
  let providedUsername = false
  if (username != null) {
    if (isStringOrBuffer(username)) {
      providedUsername = true
      length += Buffer.byteLength(username) + 2
    } else {
      stream.destroy(new Error('Invalid username'))
      return false
    }
  }

  // Password
  if (password != null) {
    if (!providedUsername) {
      stream.destroy(new Error('Username is required to use password'))
      return false
    }

    if (isStringOrBuffer(password)) {
      length += byteLength(password) + 2
    } else {
      stream.destroy(new Error('Invalid password'))
      return false
    }
  }

  // Generate header
  stream.write(protocol.CONNECT_HEADER)

  // Generate length
  writeVarByteInt(stream, length)

  // Generate protocol ID
  writeStringOrBuffer(stream, protocolId)

  // Bridge mode is signalled by setting the high bit of the version byte.
  // NOTE(review): with bridgeMode and version 5 the value becomes 133, which
  // matches none of the branches below and skips the `=== 5` property writes
  // even though their length was counted above - confirm whether that
  // combination is meant to be supported.
  if (settings.bridgeMode) {
    protocolVersion += 128
  }

  stream.write(
    protocolVersion === 131
      ? protocol.VERSION131
      : protocolVersion === 132
        ? protocol.VERSION132
        : protocolVersion === 4
          ? protocol.VERSION4
          : protocolVersion === 5
            ? protocol.VERSION5
            : protocol.VERSION3
  )

  // Connect flags
  let flags = 0
  flags |= (username != null) ? protocol.USERNAME_MASK : 0
  flags |= (password != null) ? protocol.PASSWORD_MASK : 0
  flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0
  flags |= (will && will.qos) ? will.qos << protocol.WILL_QOS_SHIFT : 0
  flags |= will ? protocol.WILL_FLAG_MASK : 0
  flags |= clean ? protocol.CLEAN_SESSION_MASK : 0

  stream.write(Buffer.from([flags]))

  // Keepalive
  writeNumber(stream, keepalive)

  // Properties
  if (protocolVersion === 5) {
    propertiesData.write()
  }

  // Client ID
  writeStringOrBuffer(stream, clientId)

  // Will
  if (will) {
    if (protocolVersion === 5) {
      willProperties.write()
    }
    writeString(stream, will.topic)
    writeStringOrBuffer(stream, will.payload)
  }

  // Username and password
  if (username != null) {
    writeStringOrBuffer(stream, username)
  }
  if (password != null) {
    writeStringOrBuffer(stream, password)
  }
  // This is a small packet that happens only once on a stream
  // We assume the stream is always free to receive more data after this
  return true
}
|
||
|
|
||
|
/**
 * Write a CONNACK packet.
 *
 * For protocol version 5 the code is read from `reasonCode` and properties
 * are appended; otherwise the code is read from `returnCode`.
 *
 * @param {Object} packet - `sessionPresent`, `returnCode` / `reasonCode`, `properties`
 * @param {Object} stream - destination; `write` and `destroy` are used
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when `opts` is missing)
 * @returns {boolean} `true` when written; `false` when the code is not a
 *   number (stream destroyed) or the properties could not be serialized
 */
function connack (packet, stream, opts) {
  const settings = packet || {}
  const version = opts ? opts.protocolVersion : 4
  const isV5 = version === 5
  const rc = isV5 ? settings.reasonCode : settings.returnCode

  // Check return code
  if (typeof rc !== 'number') {
    stream.destroy(new Error('Invalid return code'))
    return false
  }

  // One byte for the session header plus one for the return code.
  let remainingLength = 2

  // mqtt5 properties
  let props = null
  if (isV5) {
    props = getProperties(stream, settings.properties)
    if (!props) return false
    remainingLength += props.length
  }

  stream.write(protocol.CONNACK_HEADER)
  writeVarByteInt(stream, remainingLength)

  const sessionHeader = settings.sessionPresent ? protocol.SESSIONPRESENT_HEADER : zeroBuf
  stream.write(sessionHeader)
  stream.write(Buffer.from([rc]))

  if (props != null) props.write()
  return true
}
|
||
|
|
||
|
/**
 * Write a PUBLISH packet.
 *
 * @param {Object} packet - `topic` (string or Buffer), `payload`, `qos`,
 *   `retain`, `dup`, `messageId` and `properties` (MQTT 5)
 * @param {Object} stream - destination; `write` and `destroy` are used
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when `opts` is missing)
 * @returns {boolean} result of the final payload `stream.write`, or `false`
 *   when the topic or message id is invalid (stream destroyed) or the
 *   properties could not be serialized
 */
function publish (packet, stream, opts) {
  debug('publish: packet: %o', packet)
  const settings = packet || {}
  const version = opts ? opts.protocolVersion : 4
  const qos = settings.qos || 0
  const retain = settings.retain ? protocol.RETAIN_MASK : 0
  const topic = settings.topic
  const payload = settings.payload || empty
  const id = settings.messageId

  // Topic must be a string or Buffer
  let topicBytes
  if (typeof topic === 'string') {
    topicBytes = Buffer.byteLength(topic)
  } else if (Buffer.isBuffer(topic)) {
    topicBytes = topic.length
  } else {
    stream.destroy(new Error('Invalid topic'))
    return false
  }
  let length = topicBytes + 2

  // Get the payload length
  length += Buffer.isBuffer(payload) ? payload.length : Buffer.byteLength(payload)

  // Message ID must be a number if qos > 0
  if (qos) {
    if (typeof id !== 'number') {
      stream.destroy(new Error('Invalid messageId'))
      return false
    }
    length += 2
  }

  // mqtt5 properties
  let props = null
  if (version === 5) {
    props = getProperties(stream, settings.properties)
    if (!props) return false
    length += props.length
  }

  // Header
  const dupIndex = settings.dup ? 1 : 0
  const retainIndex = retain ? 1 : 0
  stream.write(protocol.PUBLISH_HEADER[qos][dupIndex][retainIndex])

  // Remaining length
  writeVarByteInt(stream, length)

  // Topic
  writeNumber(stream, topicBytes)
  stream.write(topic)

  // Message ID
  if (qos > 0) writeNumber(stream, id)

  // Properties
  if (props != null) props.write()

  // Payload
  debug('publish: payload: %o', payload)
  return stream.write(payload)
}
|
||
|
|
||
|
/**
 * Write a PUBACK, PUBREC, PUBREL or PUBCOMP packet.
 *
 * @param {Object} packet - `cmd` (defaults to 'puback'), `messageId`, `dup`,
 *   and for MQTT 5 `reasonCode` and `properties`
 * @param {Object} stream - destination; `write` and `destroy` are used
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when `opts` is
 *   missing); also forwarded to the maximum-packet-size property trimming
 * @returns {boolean} `true` when written; `false` when `messageId` is not a
 *   number (stream destroyed) or the properties could not be serialized
 */
function confirmation (packet, stream, opts) {
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const type = settings.cmd || 'puback'
  const id = settings.messageId
  // The dup flag is only honoured for pubrel. Use a 0/1 index, as the other
  // header lookups in this file do, instead of indexing with the mask value.
  // NOTE(review): assumes protocol.ACKS shares that 0/1 layout - confirm in ./constants.
  const dupIndex = (settings.dup && type === 'pubrel') ? 1 : 0
  const qos = type === 'pubrel' ? 1 : 0
  const reasonCode = settings.reasonCode
  const properties = settings.properties
  let length = version === 5 ? 3 : 2

  // Check message ID
  if (typeof id !== 'number') {
    stream.destroy(new Error('Invalid messageId'))
    return false
  }

  // properties mqtt 5
  let propertiesData = null
  if (version === 5) {
    // Confirm should not add empty property length with no properties (rfc 3.4.2.2.1)
    // `null` is treated like "no properties" (typeof null is 'object').
    if (typeof properties === 'object' && properties !== null) {
      propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
      if (!propertiesData) { return false }
      length += propertiesData.length
    }
  }

  // Header
  stream.write(protocol.ACKS[type][qos][dupIndex][0])

  // Length === 3 is only true of version === 5 and no properties; therefore if reasonCode === 0 we are allowed to skip both bytes - but if we write the reason code we also have to write property length [MQTT-3.4.2-1].
  if (length === 3) length += reasonCode !== 0 ? 1 : -1
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // reason code in header - but only if it couldn't be omitted - indicated by length !== 2.
  if (version === 5 && length !== 2) {
    stream.write(Buffer.from([reasonCode]))
  }

  // properties mqtt 5
  if (propertiesData !== null) {
    propertiesData.write()
  } else if (length === 4) {
    // we have no properties but have written a reason code - so we need to indicate empty properties by filling in a zero.
    stream.write(Buffer.from([0]))
  }
  return true
}
|
||
|
|
||
|
/**
 * Write a SUBSCRIBE packet.
 *
 * All subscriptions are validated before the header is written. For MQTT 5
 * the `nl`, `rap` and `rh` options of each subscription are validated and
 * encoded into its options byte.
 *
 * @param {Object} packet - `messageId`, `dup`, `subscriptions` (non-empty
 *   list of `{ topic, qos, nl, rap, rh }`) and `properties` (MQTT 5)
 * @param {Object} stream - destination; `write` and `destroy` are used
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when `opts` is missing)
 * @returns {boolean} result of the last options-byte `stream.write`, or
 *   `false` when validation failed (stream destroyed) or the properties could
 *   not be serialized
 */
function subscribe (packet, stream, opts) {
  debug('subscribe: packet: ')
  const settings = packet || {}
  const version = opts ? opts.protocolVersion : 4
  const isV5 = version === 5
  const dup = settings.dup ? protocol.DUP_MASK : 0
  const id = settings.messageId
  const subs = settings.subscriptions

  // Destroys the stream with the given message and reports failure.
  const reject = (message) => {
    stream.destroy(new Error(message))
    return false
  }

  // Check message ID
  if (typeof id !== 'number') return reject('Invalid messageId')
  let length = 2

  // properties mqtt 5
  let props = null
  if (isV5) {
    props = getProperties(stream, settings.properties)
    if (!props) return false
    length += props.length
  }

  // Check subscriptions
  if (!(typeof subs === 'object' && subs.length)) return reject('Invalid subscriptions')

  for (let i = 0; i < subs.length; i += 1) {
    const itopic = subs[i].topic
    const iqos = subs[i].qos

    if (typeof itopic !== 'string') return reject('Invalid subscriptions - invalid topic')
    if (typeof iqos !== 'number') return reject('Invalid subscriptions - invalid qos')

    if (isV5) {
      const nl = subs[i].nl || false
      if (typeof nl !== 'boolean') return reject('Invalid subscriptions - invalid No Local')

      const rap = subs[i].rap || false
      if (typeof rap !== 'boolean') return reject('Invalid subscriptions - invalid Retain as Published')

      const rh = subs[i].rh || 0
      if (typeof rh !== 'number' || rh > 2) return reject('Invalid subscriptions - invalid Retain Handling')
    }

    // Length-prefixed topic plus one options byte.
    length += Buffer.byteLength(itopic) + 2 + 1
  }

  // Generate header
  debug('subscribe: writing to stream: %o', protocol.SUBSCRIBE_HEADER)
  stream.write(protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0])

  // Generate length
  writeVarByteInt(stream, length)

  // Generate message ID
  writeNumber(stream, id)

  // properties mqtt 5
  if (props !== null) props.write()

  // Generate subs
  let result = true
  for (const sub of subs) {
    const jtopic = sub.topic
    const jqos = sub.qos
    const jnl = +sub.nl
    const jrap = +sub.rap
    const jrh = sub.rh

    // Write topic string
    writeString(stream, jtopic)

    // options process
    let joptions = protocol.SUBSCRIBE_OPTIONS_QOS[jqos]
    if (isV5) {
      if (jnl) joptions |= protocol.SUBSCRIBE_OPTIONS_NL
      if (jrap) joptions |= protocol.SUBSCRIBE_OPTIONS_RAP
      if (jrh) joptions |= protocol.SUBSCRIBE_OPTIONS_RH[jrh]
    }

    // Write options
    result = stream.write(Buffer.from([joptions]))
  }

  return result
}
|
||
|
|
||
|
/**
 * Write a SUBACK packet.
 *
 * @param {Object} packet - `messageId`, `granted` (non-empty list of
 *   numbers) and `properties` (MQTT 5)
 * @param {Object} stream - destination; `write` and `destroy` are used
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when `opts` is
 *   missing); also forwarded to the maximum-packet-size property trimming
 * @returns {boolean} result of the final `stream.write` of the granted
 *   vector, or `false` when validation failed (stream destroyed) or the
 *   properties could not be serialized
 */
function suback (packet, stream, opts) {
  const settings = packet || {}
  const version = opts ? opts.protocolVersion : 4
  const id = settings.messageId
  const granted = settings.granted

  // Destroys the stream with the given message and reports failure.
  const reject = (message) => {
    stream.destroy(new Error(message))
    return false
  }

  // Check message ID
  if (typeof id !== 'number') return reject('Invalid messageId')
  let length = 2

  // Check granted qos vector
  if (!(typeof granted === 'object' && granted.length)) return reject('Invalid qos vector')
  for (let i = 0; i < granted.length; i += 1) {
    if (typeof granted[i] !== 'number') return reject('Invalid qos vector')
    length += 1
  }

  // properties mqtt 5
  let props = null
  if (version === 5) {
    props = getPropertiesByMaximumPacketSize(stream, settings.properties, opts, length)
    if (!props) return false
    length += props.length
  }

  // header
  stream.write(protocol.SUBACK_HEADER)

  // Length
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // properties mqtt 5
  if (props !== null) props.write()

  return stream.write(Buffer.from(granted))
}
|
||
|
|
||
|
/**
 * Write an UNSUBSCRIBE packet.
 *
 * @param {Object} packet - `messageId`, `dup`, `unsubscriptions` (non-empty
 *   list of topic strings) and `properties` (MQTT 5)
 * @param {Object} stream - destination; `write` and `destroy` are used
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when `opts` is missing)
 * @returns {boolean} result of writing the last topic, or `false` when
 *   validation failed (stream destroyed) or the properties could not be
 *   serialized
 */
function unsubscribe (packet, stream, opts) {
  const settings = packet || {}
  const version = opts ? opts.protocolVersion : 4
  const id = settings.messageId
  const dup = settings.dup ? protocol.DUP_MASK : 0
  const unsubs = settings.unsubscriptions

  // Destroys the stream with the given message and reports failure.
  const reject = (message) => {
    stream.destroy(new Error(message))
    return false
  }

  // Check message ID
  if (typeof id !== 'number') return reject('Invalid messageId')
  let length = 2

  // Check unsubs
  if (!(typeof unsubs === 'object' && unsubs.length)) return reject('Invalid unsubscriptions')
  for (let i = 0; i < unsubs.length; i += 1) {
    if (typeof unsubs[i] !== 'string') return reject('Invalid unsubscriptions')
    length += Buffer.byteLength(unsubs[i]) + 2
  }

  // properties mqtt 5
  let props = null
  if (version === 5) {
    props = getProperties(stream, settings.properties)
    if (!props) return false
    length += props.length
  }

  // Header
  stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0])

  // Length
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // properties mqtt 5
  if (props !== null) props.write()

  // Unsubs
  let result = true
  for (let j = 0; j < unsubs.length; j++) {
    result = writeString(stream, unsubs[j])
  }

  return result
}
|
||
|
|
||
|
/**
 * Write an UNSUBACK packet.
 *
 * For MQTT 5 the packet also carries properties and the `granted` reason
 * code vector, which must then be a non-empty list of numbers.
 *
 * @param {Object} packet - `cmd`, `messageId`, `dup`, and for MQTT 5
 *   `granted` and `properties`
 * @param {Object} stream - destination; `write` and `destroy` are used
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when `opts` is
 *   missing); also forwarded to the maximum-packet-size property trimming
 * @returns {boolean} `true` when written; `false` when validation failed
 *   (stream destroyed) or the properties could not be serialized
 */
function unsuback (packet, stream, opts) {
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const id = settings.messageId
  // Use a 0/1 index, as the other header lookups in this file do, instead of
  // indexing with the mask value.
  // NOTE(review): assumes protocol.ACKS shares that 0/1 layout - confirm in ./constants.
  const dupIndex = settings.dup ? 1 : 0
  const granted = settings.granted
  const properties = settings.properties
  const type = settings.cmd
  const qos = 0

  let length = 2

  // Check message ID
  if (typeof id !== 'number') {
    stream.destroy(new Error('Invalid messageId'))
    return false
  }

  // Check granted
  if (version === 5) {
    if (typeof granted === 'object' && granted.length) {
      for (let i = 0; i < granted.length; i += 1) {
        if (typeof granted[i] !== 'number') {
          stream.destroy(new Error('Invalid qos vector'))
          return false
        }
        length += 1
      }
    } else {
      stream.destroy(new Error('Invalid qos vector'))
      return false
    }
  }

  // properties mqtt 5
  let propertiesData = null
  if (version === 5) {
    propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // Header
  stream.write(protocol.ACKS[type][qos][dupIndex][0])

  // Length
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // properties mqtt 5
  if (propertiesData !== null) {
    propertiesData.write()
  }

  // payload
  if (version === 5) {
    stream.write(Buffer.from(granted))
  }
  return true
}
|
||
|
|
||
|
/**
 * Write a packet that has no variable header or payload (pingreq, pingresp)
 * by emitting the pre-built buffer stored under `protocol.EMPTY[packet.cmd]`.
 *
 * @param {Object} packet - only `cmd` is read
 * @param {Object} stream - destination; `write` is used
 * @param {Object} [opts] - not used by this writer
 * @returns {boolean} result of `stream.write`
 */
function emptyPacket (packet, stream, opts) {
  const prebuilt = protocol.EMPTY[packet.cmd]
  return stream.write(prebuilt)
}
|
||
|
|
||
|
/**
 * Write a DISCONNECT packet. For MQTT 5 a reason code byte and properties
 * follow the fixed header; other versions write an empty body.
 *
 * @param {Object} packet - `reasonCode` and `properties` (both MQTT 5 only)
 * @param {Object} stream - destination; `write` is used
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when `opts` is
 *   missing); also forwarded to the maximum-packet-size property trimming
 * @returns {boolean} `true` when written; `false` when the properties could
 *   not be serialized
 */
function disconnect (packet, stream, opts) {
  const settings = packet || {}
  const version = opts ? opts.protocolVersion : 4
  const isV5 = version === 5
  const reasonCode = settings.reasonCode

  // One byte for the reason code on MQTT 5, nothing otherwise.
  let length = isV5 ? 1 : 0

  // properties mqtt 5
  let props = null
  if (isV5) {
    props = getPropertiesByMaximumPacketSize(stream, settings.properties, opts, length)
    if (!props) return false
    length += props.length
  }

  // Header
  stream.write(Buffer.from([protocol.codes.disconnect << 4]))

  // Length
  writeVarByteInt(stream, length)

  // reason code in header
  if (isV5) stream.write(Buffer.from([reasonCode]))

  // properties mqtt 5
  if (props !== null) props.write()

  return true
}
|
||
|
|
||
|
/**
 * Write an AUTH packet. AUTH is only accepted for protocol version 5.
 *
 * @param {Object} packet - `reasonCode` and `properties`
 * @param {Object} stream - destination; `write` and `destroy` are used
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when `opts` is
 *   missing); also forwarded to the maximum-packet-size property trimming
 * @returns {boolean} `true` when written; `false` when the protocol version
 *   is not 5 (stream destroyed) or the properties could not be serialized
 */
function auth (packet, stream, opts) {
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const reasonCode = settings.reasonCode
  const properties = settings.properties

  if (version !== 5) {
    stream.destroy(new Error('Invalid mqtt version for auth packet'))
    // Stop here: nothing may be written to a stream that was just destroyed.
    return false
  }

  // One byte for the reason code.
  let length = 1

  // properties mqtt 5
  const propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
  if (!propertiesData) { return false }
  length += propertiesData.length

  // Header
  stream.write(Buffer.from([protocol.codes.auth << 4]))

  // Length
  writeVarByteInt(stream, length)

  // reason code in header
  stream.write(Buffer.from([reasonCode]))

  // properties mqtt 5
  propertiesData.write()
  return true
}
|
||
|
|
||
|
/**
 * writeVarByteInt - write an MQTT style variable byte integer to the stream
 *
 * Encoded buffers for values below 16384 are memoized in `varByteIntCache`.
 *
 * @param <Stream> stream - destination
 * @param <Number> num - value to encode
 * @returns <Boolean> result of `stream.write`, or `false` after destroying
 *   the stream because `num` exceeds `protocol.VARBYTEINT_MAX`
 *
 * @api private
 */

const varByteIntCache = {}
function writeVarByteInt (stream, num) {
  if (num > protocol.VARBYTEINT_MAX) {
    stream.destroy(new Error(`Invalid variable byte integer: ${num}`))
    return false
  }

  const cached = varByteIntCache[num]
  const encoded = cached || genBufVariableByteInt(num)
  if (!cached && num < 16384) {
    varByteIntCache[num] = encoded
  }

  debug('writeVarByteInt: writing to stream: %o', encoded)
  return stream.write(encoded)
}
|
||
|
|
||
|
/**
 * writeString - write a utf8 string to the stream, preceded by its byte
 * length as a two byte number
 *
 * @param <Stream> stream - destination
 * @param <String> string - string to write
 * @return <Boolean> result of the `stream.write` of the string itself
 *
 * @api private
 */

function writeString (stream, string) {
  const byteCount = Buffer.byteLength(string)
  writeNumber(stream, byteCount)

  debug('writeString: %s', string)
  return stream.write(string, 'utf8')
}
|
||
|
|
||
|
/**
 * writeStringPair - write a utf8 string pair to the stream: the name first,
 * then the value, each through writeString
 *
 * @param <Stream> stream - destination
 * @param <String> name - string name to write
 * @param <String> value - string value to write
 *
 * @api private
 */
function writeStringPair (stream, name, value) {
  for (const part of [name, value]) {
    writeString(stream, part)
  }
}
|
||
|
|
||
|
/**
 * writeNumber (cached variant) - write a two byte number to the stream using
 * the buffer stored for it in `numCache`
 *
 * @param <Stream> stream - destination
 * @param <Number> number - number to write; used as the cache key
 * @return <Boolean> result of `stream.write`
 *
 * @api private
 */
function writeNumberCached (stream, number) {
  const cachedBuffer = numCache[number]
  debug('writeNumberCached: number: %d', number)
  debug('writeNumberCached: %o', cachedBuffer)
  return stream.write(cachedBuffer)
}
|
||
|
/**
 * writeNumber (uncached variant) - write a number to the stream using a
 * buffer freshly produced by `generateNumber`
 *
 * @param <Stream> stream - destination
 * @param <Number> number - number to write
 * @return <Boolean> result of `stream.write`
 *
 * @api private
 */
function writeNumberGenerated (stream, number) {
  const freshBuffer = generateNumber(number)
  debug('writeNumberGenerated: %o', freshBuffer)
  return stream.write(freshBuffer)
}
|
||
|
/**
 * write4ByteNumber - write a number to the stream using the buffer produced
 * by `generate4ByteBuffer`
 *
 * @param <Stream> stream - destination
 * @param <Number> number - number to write
 * @return <Boolean> result of `stream.write`
 *
 * @api private
 */
function write4ByteNumber (stream, number) {
  const fourBytes = generate4ByteBuffer(number)
  debug('write4ByteNumber: %o', fourBytes)
  return stream.write(fourBytes)
}
|
||
|
/**
 * writeStringOrBuffer - write a String or Buffer preceded by its two byte
 * length prefix; any falsy non-string value is written as a zero length
 *
 * @param <Stream> stream - destination
 * @param <String|Buffer> toWrite - String or Buffer
 */
function writeStringOrBuffer (stream, toWrite) {
  if (typeof toWrite === 'string') {
    writeString(stream, toWrite)
    return
  }

  if (!toWrite) {
    writeNumber(stream, 0)
    return
  }

  writeNumber(stream, toWrite.length)
  stream.write(toWrite)
}
|
||
|
|
||
|
/**
 * Validate MQTT 5 properties and compute their serialized size.
 *
 * Nothing is written here; the returned object carries a `write()` method
 * that emits the properties to `stream` later, once the caller has written
 * the packet header.
 *
 * @param {Object} stream - destination; destroyed when a property is invalid
 * @param {Object} [properties] - property name to value map. A missing,
 *   `null`, non-object or array-like value is treated as "no properties"
 * @returns {{length: number, write: Function}|false} total byte length
 *   (including the length prefix) plus a deferred writer, or `false` when a
 *   property is invalid or contributes no bytes
 */
function getProperties (stream, properties) {
  /* connect properties */
  // `null` is checked explicitly: typeof null is 'object' and reading
  // `.length` from it would throw.
  if (properties === null || typeof properties !== 'object' || properties.length != null) {
    return {
      length: 1,
      write () {
        writeProperties(stream, {}, 0)
      }
    }
  }

  // Destroys the stream with `message`; returns false so callers can
  // `return invalid(...)`.
  function invalid (message) {
    stream.destroy(new Error(message))
    return false
  }

  // Returns the number of bytes one value of property `name` occupies
  // (identifier byte included), or false after destroying the stream.
  function getLengthProperty (name, value) {
    const type = protocol.propertiesTypes[name]
    switch (type) {
      case 'byte': {
        if (typeof value !== 'boolean') return invalid(`Invalid ${name}: ${value}`)
        return 1 + 1
      }
      case 'int8': {
        if (typeof value !== 'number' || value < 0 || value > 0xff) return invalid(`Invalid ${name}: ${value}`)
        return 1 + 1
      }
      case 'binary': {
        // Reject missing data up front; Buffer.byteLength would throw on it.
        if (value == null) return invalid(`Invalid ${name}: ${value}`)
        return 1 + Buffer.byteLength(value) + 2
      }
      case 'int16': {
        if (typeof value !== 'number' || value < 0 || value > 0xffff) return invalid(`Invalid ${name}: ${value}`)
        return 1 + 2
      }
      case 'int32': {
        if (typeof value !== 'number' || value < 0 || value > 0xffffffff) return invalid(`Invalid ${name}: ${value}`)
        return 1 + 4
      }
      case 'var': {
        // a variable byte integer carries at most 28 bits of value
        if (typeof value !== 'number' || value < 0 || value > 0x0fffffff) return invalid(`Invalid ${name}: ${value}`)
        return 1 + Buffer.byteLength(genBufVariableByteInt(value))
      }
      case 'string': {
        if (typeof value !== 'string') return invalid(`Invalid ${name}: ${value}`)
        return 1 + 2 + Buffer.byteLength(value)
      }
      case 'pair': {
        // typeof null is 'object', so null has to be rejected explicitly.
        if (typeof value !== 'object' || value === null) return invalid(`Invalid ${name}: ${value}`)
        let pairLength = 0
        for (const key of Object.getOwnPropertyNames(value)) {
          const entry = value[key]
          const keyBytes = Buffer.byteLength(key.toString())
          // A key may map to one value or to a list; each value is emitted
          // as its own identifier + key + value triple.
          const items = Array.isArray(entry) ? entry : [entry]
          items.forEach((item) => {
            pairLength += 1 + 2 + keyBytes + 2 + Buffer.byteLength(item.toString())
          })
        }
        return pairLength
      }
      default: {
        return invalid(`Invalid property ${name}: ${value}`)
      }
    }
  }

  let propertiesLength = 0
  for (const propName in properties) {
    // writeProperties only serializes own properties, so only those may be
    // counted or the announced length would not match the bytes written.
    if (!Object.prototype.hasOwnProperty.call(properties, propName)) continue

    const propValue = properties[propName]
    const values = Array.isArray(propValue) ? propValue : [propValue]
    let propLength = 0
    for (let valueIndex = 0; valueIndex < values.length; valueIndex++) {
      const propValueLength = getLengthProperty(propName, values[valueIndex])
      if (!propValueLength) { return false }
      propLength += propValueLength
    }
    if (!propLength) return false
    propertiesLength += propLength
  }

  const propertiesLengthLength = Buffer.byteLength(genBufVariableByteInt(propertiesLength))

  return {
    length: propertiesLengthLength + propertiesLength,
    write () {
      writeProperties(stream, properties, propertiesLength)
    }
  }
}
|
||
|
|
||
|
/**
 * Like getProperties, but honours `opts.properties.maximumPacketSize`: while
 * `length` plus the serialized properties exceeds that limit, the droppable
 * properties (`reasonString`, then `userProperties`) are left out one at a
 * time.
 *
 * The caller's `properties` object is not modified; trimming happens on a
 * shallow copy.
 *
 * @param {Object} stream - destination handed to getProperties
 * @param {Object} [properties] - property name to value map
 * @param {Object} [opts] - `properties.maximumPacketSize` is read; a missing
 *   or zero value disables the size check
 * @param {number} length - bytes of the packet already accounted for
 * @returns {{length: number, write: Function}|false} the getProperties
 *   result, or `false` when serialization failed or the packet is still too
 *   large after dropping every droppable property
 */
function getPropertiesByMaximumPacketSize (stream, properties, opts, length) {
  const mayEmptyProps = ['reasonString', 'userProperties']
  const maximumPacketSize = opts && opts.properties && opts.properties.maximumPacketSize ? opts.properties.maximumPacketSize : 0

  let propertiesData = getProperties(stream, properties)
  if (!propertiesData || !maximumPacketSize) return propertiesData

  let trimmed = properties
  for (const propName of mayEmptyProps) {
    if (length + propertiesData.length <= maximumPacketSize) return propertiesData

    // An absent candidate is skipped instead of aborting, so the next
    // droppable property still gets its chance.
    if (!trimmed || typeof trimmed !== 'object' || !trimmed[propName]) continue

    trimmed = Object.assign({}, trimmed)
    delete trimmed[propName]
    propertiesData = getProperties(stream, trimmed)
    if (!propertiesData) return false
  }

  return length + propertiesData.length > maximumPacketSize ? false : propertiesData
}
|
||
|
|
||
|
/**
 * Write a single property value: the identifier byte from
 * `protocol.properties[propName]` followed by the value, encoded according
 * to `protocol.propertiesTypes[propName]`.
 *
 * @param {Object} stream - destination; `write` and `destroy` are used
 * @param {string} propName - property name
 * @param {*} value - property value; for 'pair' properties an object whose
 *   entries are strings or lists of strings
 * @returns {boolean|undefined} `false` after destroying the stream for an
 *   unknown property type, otherwise `undefined`
 */
function writeProperty (stream, propName, value) {
  const type = protocol.propertiesTypes[propName]
  // Emits the one-byte property identifier.
  const writeIdentifier = () => stream.write(Buffer.from([protocol.properties[propName]]))

  switch (type) {
    case 'byte':
      writeIdentifier()
      stream.write(Buffer.from([+value]))
      break
    case 'int8':
      writeIdentifier()
      stream.write(Buffer.from([value]))
      break
    case 'binary':
      writeIdentifier()
      writeStringOrBuffer(stream, value)
      break
    case 'int16':
      writeIdentifier()
      writeNumber(stream, value)
      break
    case 'int32':
      writeIdentifier()
      write4ByteNumber(stream, value)
      break
    case 'var':
      writeIdentifier()
      writeVarByteInt(stream, value)
      break
    case 'string':
      writeIdentifier()
      writeString(stream, value)
      break
    case 'pair':
      // Every key/value combination is emitted as its own property entry.
      for (const key of Object.getOwnPropertyNames(value)) {
        const entry = value[key]
        if (Array.isArray(entry)) {
          entry.forEach((item) => {
            writeIdentifier()
            writeStringPair(stream, key.toString(), item.toString())
          })
        } else {
          writeIdentifier()
          writeStringPair(stream, key.toString(), entry.toString())
        }
      }
      break
    default:
      stream.destroy(new Error(`Invalid property ${propName} value: ${value}`))
      return false
  }
}
|
||
|
|
||
|
/**
 * Write the properties section: the total length as a variable byte integer,
 * then every own, non-null property. A list value is written as one property
 * entry per element.
 *
 * @param {Object} stream - destination
 * @param {Object} properties - property name to value map
 * @param {number} propertiesLength - byte length of the serialized
 *   properties, excluding the length prefix itself
 */
function writeProperties (stream, properties, propertiesLength) {
  /* write properties to stream */
  writeVarByteInt(stream, propertiesLength)

  for (const propName in properties) {
    const isOwn = Object.prototype.hasOwnProperty.call(properties, propName)
    if (!isOwn || properties[propName] === null) continue

    const value = properties[propName]
    const values = Array.isArray(value) ? value : [value]
    for (let valueIndex = 0; valueIndex < values.length; valueIndex++) {
      writeProperty(stream, propName, values[valueIndex])
    }
  }
}
|
||
|
|
||
|
/**
 * Number of bytes a string or Buffer occupies on the wire.
 *
 * Uses `Buffer.isBuffer`, like the other Buffer checks in this file, rather
 * than `instanceof Buffer`.
 *
 * @param {string|Buffer} [bufOrString] - value to measure
 * @returns {number} 0 for any falsy value, the Buffer's length, or the
 *   encoded byte length of the string
 */
function byteLength (bufOrString) {
  if (!bufOrString) return 0
  if (Buffer.isBuffer(bufOrString)) return bufOrString.length
  return Buffer.byteLength(bufOrString)
}
|
||
|
|
||
|
/**
 * Tell whether a field can be written as an MQTT string or binary value.
 *
 * Uses `Buffer.isBuffer`, like the other Buffer checks in this file, rather
 * than `instanceof Buffer`.
 *
 * @param {*} field - value to test
 * @returns {boolean} `true` for strings and Buffers, `false` otherwise
 */
function isStringOrBuffer (field) {
  return typeof field === 'string' || Buffer.isBuffer(field)
}
|
||
|
|
||
|
// Public entry point: `generate(packet, stream, opts)`, which also carries the `cacheNumbers` accessor.
module.exports = generate
|