"use strict"; var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { if (k2 === undefined) k2 = k; var desc = Object.getOwnPropertyDescriptor(m, k); if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { desc = { enumerable: true, get: function() { return m[k]; } }; } Object.defineProperty(o, k2, desc); }) : (function(o, m, k, k2) { if (k2 === undefined) k2 = k; o[k2] = m[k]; })); var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { Object.defineProperty(o, "default", { enumerable: true, value: v }); }) : function(o, v) { o["default"] = v; }); var __importStar = (this && this.__importStar) || function (mod) { if (mod && mod.__esModule) return mod; var result = {}; if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k); __setModuleDefault(result, mod); return result; }; var __importDefault = (this && this.__importDefault) || function (mod) { return (mod && mod.__esModule) ? mod : { "default": mod }; }; Object.defineProperty(exports, "__esModule", { value: true }); const topic_alias_recv_1 = __importDefault(require("./topic-alias-recv")); const mqtt_packet_1 = __importDefault(require("mqtt-packet")); const default_message_id_provider_1 = __importDefault(require("./default-message-id-provider")); const readable_stream_1 = require("readable-stream"); const default_1 = __importDefault(require("rfdc/default")); const validations = __importStar(require("./validations")); const debug_1 = __importDefault(require("debug")); const store_1 = __importDefault(require("./store")); const handlers_1 = __importDefault(require("./handlers")); const shared_1 = require("./shared"); const TypedEmitter_1 = require("./TypedEmitter"); const PingTimer_1 = __importDefault(require("./PingTimer")); const is_browser_1 = __importStar(require("./is-browser")); const setImmediate = globalThis.setImmediate || ((...args) => { const callback = args.shift(); (0, shared_1.nextTick)(() => { callback(...args); }); }); const defaultConnectOptions = { keepalive: 60, reschedulePings: true, protocolId: 'MQTT', protocolVersion: 4, reconnectPeriod: 1000, connectTimeout: 30 * 1000, clean: true, resubscribe: true, writeCache: true, }; class MqttClient extends TypedEmitter_1.TypedEventEmitter { static defaultId() { return `mqttjs_${Math.random().toString(16).substr(2, 8)}`; } constructor(streamBuilder, options) { super(); this.options = options || {}; for (const k in defaultConnectOptions) { if (typeof this.options[k] === 'undefined') { this.options[k] = defaultConnectOptions[k]; } else { this.options[k] = options[k]; } } this.log = this.options.log || (0, debug_1.default)('mqttjs:client'); this.noop = this._noop.bind(this); this.log('MqttClient :: version:', process.env.npm_package_version); if (is_browser_1.isWebWorker) { this.log('MqttClient :: environment', 'webworker'); } else { this.log('MqttClient :: environment', is_browser_1.default ? 'browser' : 'node'); } this.log('MqttClient :: options.protocol', options.protocol); this.log('MqttClient :: options.protocolVersion', options.protocolVersion); this.log('MqttClient :: options.username', options.username); this.log('MqttClient :: options.keepalive', options.keepalive); this.log('MqttClient :: options.reconnectPeriod', options.reconnectPeriod); this.log('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized); this.log('MqttClient :: options.properties.topicAliasMaximum', options.properties ? options.properties.topicAliasMaximum : undefined); this.options.clientId = typeof options.clientId === 'string' ? options.clientId : MqttClient.defaultId(); this.log('MqttClient :: clientId', this.options.clientId); this.options.customHandleAcks = options.protocolVersion === 5 && options.customHandleAcks ? options.customHandleAcks : (...args) => { args[3](null, 0); }; if (!this.options.writeCache) { mqtt_packet_1.default.writeToStream.cacheNumbers = false; } this.streamBuilder = streamBuilder; this.messageIdProvider = typeof this.options.messageIdProvider === 'undefined' ? new default_message_id_provider_1.default() : this.options.messageIdProvider; this.outgoingStore = options.outgoingStore || new store_1.default(); this.incomingStore = options.incomingStore || new store_1.default(); this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero; this._resubscribeTopics = {}; this.messageIdToTopic = {}; this.pingTimer = null; this.connected = false; this.disconnecting = false; this.reconnecting = false; this.queue = []; this.connackTimer = null; this.reconnectTimer = null; this._storeProcessing = false; this._packetIdsDuringStoreProcessing = {}; this._storeProcessingQueue = []; this.outgoing = {}; this._firstConnection = true; if (options.properties && options.properties.topicAliasMaximum > 0) { if (options.properties.topicAliasMaximum > 0xffff) { this.log('MqttClient :: options.properties.topicAliasMaximum is out of range'); } else { this.topicAliasRecv = new topic_alias_recv_1.default(options.properties.topicAliasMaximum); } } this.on('connect', () => { const { queue } = this; const deliver = () => { const entry = queue.shift(); this.log('deliver :: entry %o', entry); let packet = null; if (!entry) { this._resubscribe(); return; } packet = entry.packet; this.log('deliver :: call _sendPacket for %o', packet); let send = true; if (packet.messageId && packet.messageId !== 0) { if (!this.messageIdProvider.register(packet.messageId)) { send = false; } } if (send) { this._sendPacket(packet, (err) => { if (entry.cb) { entry.cb(err); } deliver(); }); } else { this.log('messageId: %d has already used. The message is skipped and removed.', packet.messageId); deliver(); } }; this.log('connect :: sending queued packets'); deliver(); }); this.on('close', () => { this.log('close :: connected set to `false`'); this.connected = false; this.log('close :: clearing connackTimer'); clearTimeout(this.connackTimer); this.log('close :: clearing ping timer'); if (this.pingTimer) { this.pingTimer.clear(); this.pingTimer = null; } if (this.topicAliasRecv) { this.topicAliasRecv.clear(); } this.log('close :: calling _setupReconnect'); this._setupReconnect(); }); if (!this.options.manualConnect) { this.log('MqttClient :: setting up stream'); this.connect(); } } handleAuth(packet, callback) { callback(); } handleMessage(packet, callback) { callback(); } _nextId() { return this.messageIdProvider.allocate(); } getLastMessageId() { return this.messageIdProvider.getLastAllocated(); } connect() { var _a; const writable = new readable_stream_1.Writable(); const parser = mqtt_packet_1.default.parser(this.options); let completeParse = null; const packets = []; this.log('connect :: calling method to clear reconnect'); this._clearReconnect(); this.log('connect :: using streamBuilder provided to client to create stream'); this.stream = this.streamBuilder(this); parser.on('packet', (packet) => { this.log('parser :: on packet push to packets array.'); packets.push(packet); }); const work = () => { this.log('work :: getting next packet in queue'); const packet = packets.shift(); if (packet) { this.log('work :: packet pulled from queue'); (0, handlers_1.default)(this, packet, nextTickWork); } else { this.log('work :: no packets in queue'); const done = completeParse; completeParse = null; this.log('work :: done flag is %s', !!done); if (done) done(); } }; const nextTickWork = () => { if (packets.length) { (0, shared_1.nextTick)(work); } else { const done = completeParse; completeParse = null; done(); } }; writable._write = (buf, enc, done) => { completeParse = done; this.log('writable stream :: parsing buffer'); parser.parse(buf); work(); }; const streamErrorHandler = (error) => { this.log('streamErrorHandler :: error', error.message); if (error.code) { this.log('streamErrorHandler :: emitting error'); this.emit('error', error); } else { this.noop(error); } }; this.log('connect :: pipe stream to writable stream'); this.stream.pipe(writable); this.stream.on('error', streamErrorHandler); this.stream.on('close', () => { this.log('(%s)stream :: on close', this.options.clientId); this._flushVolatile(); this.log('stream: emit close to MqttClient'); this.emit('close'); }); this.log('connect: sending packet `connect`'); const connectPacket = { cmd: 'connect', protocolId: this.options.protocolId, protocolVersion: this.options.protocolVersion, clean: this.options.clean, clientId: this.options.clientId, keepalive: this.options.keepalive, username: this.options.username, password: this.options.password, properties: this.options.properties, }; if (this.options.will) { connectPacket.will = Object.assign(Object.assign({}, this.options.will), { payload: (_a = this.options.will) === null || _a === void 0 ? void 0 : _a.payload }); } if (this.topicAliasRecv) { if (!connectPacket.properties) { connectPacket.properties = {}; } if (this.topicAliasRecv) { connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max; } } this._writePacket(connectPacket); parser.on('error', this.emit.bind(this, 'error')); if (this.options.properties) { if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) { this.end(() => this.emit('error', new Error('Packet has no Authentication Method'))); return this; } if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') { const authPacket = Object.assign({ cmd: 'auth', reasonCode: 0 }, this.options.authPacket); this._writePacket(authPacket); } } this.stream.setMaxListeners(1000); clearTimeout(this.connackTimer); this.connackTimer = setTimeout(() => { this.log('!!connectTimeout hit!! Calling _cleanUp with force `true`'); this.emit('error', new Error('connack timeout')); this._cleanUp(true); }, this.options.connectTimeout); return this; } publish(topic, message, opts, callback) { this.log('publish :: message `%s` to topic `%s`', message, topic); const { options } = this; if (typeof opts === 'function') { callback = opts; opts = null; } opts = opts || {}; const defaultOpts = { qos: 0, retain: false, dup: false, }; opts = Object.assign(Object.assign({}, defaultOpts), opts); const { qos, retain, dup, properties, cbStorePut } = opts; if (this._checkDisconnecting(callback)) { return this; } const publishProc = () => { let messageId = 0; if (qos === 1 || qos === 2) { messageId = this._nextId(); if (messageId === null) { this.log('No messageId left'); return false; } } const packet = { cmd: 'publish', topic, payload: message, qos, retain, messageId, dup, }; if (options.protocolVersion === 5) { packet.properties = properties; } this.log('publish :: qos', qos); switch (qos) { case 1: case 2: this.outgoing[packet.messageId] = { volatile: false, cb: callback || this.noop, }; this.log('MqttClient:publish: packet cmd: %s', packet.cmd); this._sendPacket(packet, undefined, cbStorePut); break; default: this.log('MqttClient:publish: packet cmd: %s', packet.cmd); this._sendPacket(packet, callback, cbStorePut); break; } return true; }; if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc()) { this._storeProcessingQueue.push({ invoke: publishProc, cbStorePut: opts.cbStorePut, callback, }); } return this; } publishAsync(topic, message, opts) { return new Promise((resolve, reject) => { this.publish(topic, message, opts, (err, packet) => { if (err) { reject(err); } else { resolve(packet); } }); }); } subscribe(topicObject, opts, callback) { const version = this.options.protocolVersion; if (typeof opts === 'function') { callback = opts; } callback = callback || this.noop; let resubscribe = false; let topicsList = []; if (typeof topicObject === 'string') { topicObject = [topicObject]; topicsList = topicObject; } else if (Array.isArray(topicObject)) { topicsList = topicObject; } else if (typeof topicObject === 'object') { resubscribe = topicObject.resubscribe; delete topicObject.resubscribe; topicsList = Object.keys(topicObject); } const invalidTopic = validations.validateTopics(topicsList); if (invalidTopic !== null) { setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`)); return this; } if (this._checkDisconnecting(callback)) { this.log('subscribe: discconecting true'); return this; } const defaultOpts = { qos: 0, }; if (version === 5) { defaultOpts.nl = false; defaultOpts.rap = false; defaultOpts.rh = 0; } opts = Object.assign(Object.assign({}, defaultOpts), opts); const properties = opts.properties; const subs = []; const parseSub = (topic, subOptions) => { subOptions = (subOptions || opts); if (!Object.prototype.hasOwnProperty.call(this._resubscribeTopics, topic) || this._resubscribeTopics[topic].qos < subOptions.qos || resubscribe) { const currentOpts = { topic, qos: subOptions.qos, }; if (version === 5) { currentOpts.nl = subOptions.nl; currentOpts.rap = subOptions.rap; currentOpts.rh = subOptions.rh; currentOpts.properties = properties; } this.log('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos); subs.push(currentOpts); } }; if (Array.isArray(topicObject)) { topicObject.forEach((topic) => { this.log('subscribe: array topic %s', topic); parseSub(topic); }); } else { Object.keys(topicObject).forEach((topic) => { this.log('subscribe: object topic %s, %o', topic, topicObject[topic]); parseSub(topic, topicObject[topic]); }); } if (!subs.length) { callback(null, []); return this; } const subscribeProc = () => { const messageId = this._nextId(); if (messageId === null) { this.log('No messageId left'); return false; } const packet = { cmd: 'subscribe', subscriptions: subs, messageId, }; if (properties) { packet.properties = properties; } if (this.options.resubscribe) { this.log('subscribe :: resubscribe true'); const topics = []; subs.forEach((sub) => { if (this.options.reconnectPeriod > 0) { const topic = { qos: sub.qos }; if (version === 5) { topic.nl = sub.nl || false; topic.rap = sub.rap || false; topic.rh = sub.rh || 0; topic.properties = sub.properties; } this._resubscribeTopics[sub.topic] = topic; topics.push(sub.topic); } }); this.messageIdToTopic[packet.messageId] = topics; } this.outgoing[packet.messageId] = { volatile: true, cb(err, packet2) { if (!err) { const { granted } = packet2; for (let i = 0; i < granted.length; i += 1) { subs[i].qos = granted[i]; } } callback(err, subs); }, }; this.log('subscribe :: call _sendPacket'); this._sendPacket(packet); return true; }; if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc()) { this._storeProcessingQueue.push({ invoke: subscribeProc, callback, }); } return this; } subscribeAsync(topicObject, opts) { return new Promise((resolve, reject) => { this.subscribe(topicObject, opts, (err, granted) => { if (err) { reject(err); } else { resolve(granted); } }); }); } unsubscribe(topic, opts, callback) { if (typeof topic === 'string') { topic = [topic]; } if (typeof opts === 'function') { callback = opts; } callback = callback || this.noop; const invalidTopic = validations.validateTopics(topic); if (invalidTopic !== null) { setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`)); return this; } if (this._checkDisconnecting(callback)) { return this; } const unsubscribeProc = () => { const messageId = this._nextId(); if (messageId === null) { this.log('No messageId left'); return false; } const packet = { cmd: 'unsubscribe', messageId, unsubscriptions: [], }; if (typeof topic === 'string') { packet.unsubscriptions = [topic]; } else if (Array.isArray(topic)) { packet.unsubscriptions = topic; } if (this.options.resubscribe) { packet.unsubscriptions.forEach((topic2) => { delete this._resubscribeTopics[topic2]; }); } if (typeof opts === 'object' && opts.properties) { packet.properties = opts.properties; } this.outgoing[packet.messageId] = { volatile: true, cb: callback, }; this.log('unsubscribe: call _sendPacket'); this._sendPacket(packet); return true; }; if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc()) { this._storeProcessingQueue.push({ invoke: unsubscribeProc, callback, }); } return this; } unsubscribeAsync(topic, opts) { return new Promise((resolve, reject) => { this.unsubscribe(topic, opts, (err, packet) => { if (err) { reject(err); } else { resolve(packet); } }); }); } end(force, opts, cb) { this.log('end :: (%s)', this.options.clientId); if (force == null || typeof force !== 'boolean') { cb = cb || opts; opts = force; force = false; } if (typeof opts !== 'object') { cb = cb || opts; opts = null; } this.log('end :: cb? %s', !!cb); if (!cb || typeof cb !== 'function') { cb = this.noop; } const closeStores = () => { this.log('end :: closeStores: closing incoming and outgoing stores'); this.disconnected = true; this.incomingStore.close((e1) => { this.outgoingStore.close((e2) => { this.log('end :: closeStores: emitting end'); this.emit('end'); if (cb) { const err = e1 || e2; this.log('end :: closeStores: invoking callback with args'); cb(err); } }); }); if (this._deferredReconnect) { this._deferredReconnect(); } }; const finish = () => { this.log('end :: (%s) :: finish :: calling _cleanUp with force %s', this.options.clientId, force); this._cleanUp(force, () => { this.log('end :: finish :: calling process.nextTick on closeStores'); (0, shared_1.nextTick)(closeStores); }, opts); }; if (this.disconnecting) { cb(); return this; } this._clearReconnect(); this.disconnecting = true; if (!force && Object.keys(this.outgoing).length > 0) { this.log('end :: (%s) :: calling finish in 10ms once outgoing is empty', this.options.clientId); this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)); } else { this.log('end :: (%s) :: immediately calling finish', this.options.clientId); finish(); } return this; } endAsync(force, opts) { return new Promise((resolve, reject) => { this.end(force, opts, (err) => { if (err) { reject(err); } else { resolve(); } }); }); } removeOutgoingMessage(messageId) { if (this.outgoing[messageId]) { const { cb } = this.outgoing[messageId]; this._removeOutgoingAndStoreMessage(messageId, () => { cb(new Error('Message removed')); }); } return this; } reconnect(opts) { this.log('client reconnect'); const f = () => { if (opts) { this.options.incomingStore = opts.incomingStore; this.options.outgoingStore = opts.outgoingStore; } else { this.options.incomingStore = null; this.options.outgoingStore = null; } this.incomingStore = this.options.incomingStore || new store_1.default(); this.outgoingStore = this.options.outgoingStore || new store_1.default(); this.disconnecting = false; this.disconnected = false; this._deferredReconnect = null; this._reconnect(); }; if (this.disconnecting && !this.disconnected) { this._deferredReconnect = f; } else { f(); } return this; } _flushVolatile() { if (this.outgoing) { this.log('_flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function'); Object.keys(this.outgoing).forEach((messageId) => { if (this.outgoing[messageId].volatile && typeof this.outgoing[messageId].cb === 'function') { this.outgoing[messageId].cb(new Error('Connection closed')); delete this.outgoing[messageId]; } }); } } _flush() { if (this.outgoing) { this.log('_flush: queue exists? %b', !!this.outgoing); Object.keys(this.outgoing).forEach((messageId) => { if (typeof this.outgoing[messageId].cb === 'function') { this.outgoing[messageId].cb(new Error('Connection closed')); delete this.outgoing[messageId]; } }); } } _removeTopicAliasAndRecoverTopicName(packet) { let alias; if (packet.properties) { alias = packet.properties.topicAlias; } let topic = packet.topic.toString(); this.log('_removeTopicAliasAndRecoverTopicName :: alias %d, topic %o', alias, topic); if (topic.length === 0) { if (typeof alias === 'undefined') { return new Error('Unregistered Topic Alias'); } topic = this.topicAliasSend.getTopicByAlias(alias); if (typeof topic === 'undefined') { return new Error('Unregistered Topic Alias'); } packet.topic = topic; } if (alias) { delete packet.properties.topicAlias; } } _checkDisconnecting(callback) { if (this.disconnecting) { if (callback && callback !== this.noop) { callback(new Error('client disconnecting')); } else { this.emit('error', new Error('client disconnecting')); } } return this.disconnecting; } _reconnect() { this.log('_reconnect: emitting reconnect to client'); this.emit('reconnect'); if (this.connected) { this.end(() => { this.connect(); }); this.log('client already connected. disconnecting first.'); } else { this.log('_reconnect: calling connect'); this.connect(); } } _setupReconnect() { if (!this.disconnecting && !this.reconnectTimer && this.options.reconnectPeriod > 0) { if (!this.reconnecting) { this.log('_setupReconnect :: emit `offline` state'); this.emit('offline'); this.log('_setupReconnect :: set `reconnecting` to `true`'); this.reconnecting = true; } this.log('_setupReconnect :: setting reconnectTimer for %d ms', this.options.reconnectPeriod); this.reconnectTimer = setInterval(() => { this.log('reconnectTimer :: reconnect triggered!'); this._reconnect(); }, this.options.reconnectPeriod); } else { this.log('_setupReconnect :: doing nothing...'); } } _clearReconnect() { this.log('_clearReconnect : clearing reconnect timer'); if (this.reconnectTimer) { clearInterval(this.reconnectTimer); this.reconnectTimer = null; } } _cleanUp(forced, done, opts = {}) { if (done) { this.log('_cleanUp :: done callback provided for on stream close'); this.stream.on('close', done); } this.log('_cleanUp :: forced? %s', forced); if (forced) { if (this.options.reconnectPeriod === 0 && this.options.clean) { this._flush(); } this.log('_cleanUp :: (%s) :: destroying stream', this.options.clientId); this.stream.destroy(); } else { const packet = Object.assign({ cmd: 'disconnect' }, opts); this.log('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId); this._sendPacket(packet, () => { this.log('_cleanUp :: (%s) :: destroying stream', this.options.clientId); setImmediate(() => { this.stream.end(() => { this.log('_cleanUp :: (%s) :: stream destroyed', this.options.clientId); }); }); }); } if (!this.disconnecting && !this.reconnecting) { this.log('_cleanUp :: client not disconnecting/reconnecting. Clearing and resetting reconnect.'); this._clearReconnect(); this._setupReconnect(); } if (this.pingTimer) { this.log('_cleanUp :: clearing pingTimer'); this.pingTimer.clear(); this.pingTimer = null; } if (done && !this.connected) { this.log('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId); this.stream.removeListener('close', done); done(); } } _storeAndSend(packet, cb, cbStorePut) { this.log('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd); let storePacket = packet; let err; if (storePacket.cmd === 'publish') { storePacket = (0, default_1.default)(packet); err = this._removeTopicAliasAndRecoverTopicName(storePacket); if (err) { return cb && cb(err); } } this.outgoingStore.put(storePacket, (err2) => { if (err2) { return cb && cb(err2); } cbStorePut(); this._writePacket(packet, cb); }); } _applyTopicAlias(packet) { if (this.options.protocolVersion === 5) { if (packet.cmd === 'publish') { let alias; if (packet.properties) { alias = packet.properties.topicAlias; } const topic = packet.topic.toString(); if (this.topicAliasSend) { if (alias) { if (topic.length !== 0) { this.log('applyTopicAlias :: register topic: %s - alias: %d', topic, alias); if (!this.topicAliasSend.put(topic, alias)) { this.log('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias); return new Error('Sending Topic Alias out of range'); } } } else if (topic.length !== 0) { if (this.options.autoAssignTopicAlias) { alias = this.topicAliasSend.getAliasByTopic(topic); if (alias) { packet.topic = ''; packet.properties = Object.assign(Object.assign({}, packet.properties), { topicAlias: alias }); this.log('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias); } else { alias = this.topicAliasSend.getLruAlias(); this.topicAliasSend.put(topic, alias); packet.properties = Object.assign(Object.assign({}, packet.properties), { topicAlias: alias }); this.log('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias); } } else if (this.options.autoUseTopicAlias) { alias = this.topicAliasSend.getAliasByTopic(topic); if (alias) { packet.topic = ''; packet.properties = Object.assign(Object.assign({}, packet.properties), { topicAlias: alias }); this.log('applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias); } } } } else if (alias) { this.log('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias); return new Error('Sending Topic Alias out of range'); } } } } _noop(err) { this.log('noop ::', err); } _writePacket(packet, cb) { this.log('_writePacket :: packet: %O', packet); this.log('_writePacket :: emitting `packetsend`'); this.emit('packetsend', packet); this._shiftPingInterval(); this.log('_writePacket :: writing to stream'); const result = mqtt_packet_1.default.writeToStream(packet, this.stream, this.options); this.log('_writePacket :: writeToStream result %s', result); if (!result && cb && cb !== this.noop) { this.log('_writePacket :: handle events on `drain` once through callback.'); this.stream.once('drain', cb); } else if (cb) { this.log('_writePacket :: invoking cb'); cb(); } } _sendPacket(packet, cb, cbStorePut, noStore) { this.log('_sendPacket :: (%s) :: start', this.options.clientId); cbStorePut = cbStorePut || this.noop; cb = cb || this.noop; const err = this._applyTopicAlias(packet); if (err) { cb(err); return; } if (!this.connected) { if (packet.cmd === 'auth') { this._writePacket(packet, cb); return; } this.log('_sendPacket :: client not connected. Storing packet offline.'); this._storePacket(packet, cb, cbStorePut); return; } if (noStore) { this._writePacket(packet, cb); return; } switch (packet.cmd) { case 'publish': break; case 'pubrel': this._storeAndSend(packet, cb, cbStorePut); return; default: this._writePacket(packet, cb); return; } switch (packet.qos) { case 2: case 1: this._storeAndSend(packet, cb, cbStorePut); break; case 0: default: this._writePacket(packet, cb); break; } this.log('_sendPacket :: (%s) :: end', this.options.clientId); } _storePacket(packet, cb, cbStorePut) { this.log('_storePacket :: packet: %o', packet); this.log('_storePacket :: cb? %s', !!cb); cbStorePut = cbStorePut || this.noop; let storePacket = packet; if (storePacket.cmd === 'publish') { storePacket = (0, default_1.default)(packet); const err = this._removeTopicAliasAndRecoverTopicName(storePacket); if (err) { return cb && cb(err); } } const qos = storePacket.qos || 0; if ((qos === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') { this.queue.push({ packet: storePacket, cb }); } else if (qos > 0) { cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null; this.outgoingStore.put(storePacket, (err) => { if (err) { return cb && cb(err); } cbStorePut(); }); } else if (cb) { cb(new Error('No connection to broker')); } } _setupPingTimer() { this.log('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive); if (!this.pingTimer && this.options.keepalive) { this.pingResp = true; this.pingTimer = new PingTimer_1.default(this.options.keepalive, () => { this._checkPing(); }); } } _shiftPingInterval() { if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) { this.pingTimer.reschedule(); } } _checkPing() { this.log('_checkPing :: checking ping...'); if (this.pingResp) { this.log('_checkPing :: ping response received. Clearing flag and sending `pingreq`'); this.pingResp = false; this._sendPacket({ cmd: 'pingreq' }); } else { this.emit('error', new Error('Keepalive timeout')); this.log('_checkPing :: calling _cleanUp with force true'); this._cleanUp(true); } } _resubscribe() { this.log('_resubscribe'); const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics); if (!this._firstConnection && (this.options.clean || (this.options.protocolVersion >= 4 && !this.connackPacket.sessionPresent)) && _resubscribeTopicsKeys.length > 0) { if (this.options.resubscribe) { if (this.options.protocolVersion === 5) { this.log('_resubscribe: protocolVersion 5'); for (let topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) { const resubscribeTopic = {}; resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]]; resubscribeTopic.resubscribe = true; this.subscribe(resubscribeTopic, { properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]] .properties, }); } } else { this._resubscribeTopics.resubscribe = true; this.subscribe(this._resubscribeTopics); } } else { this._resubscribeTopics = {}; } } this._firstConnection = false; } _onConnect(packet) { if (this.disconnected) { this.emit('connect', packet); return; } this.connackPacket = packet; this.messageIdProvider.clear(); this._setupPingTimer(); this.connected = true; const startStreamProcess = () => { let outStore = this.outgoingStore.createStream(); const remove = () => { outStore.destroy(); outStore = null; this._flushStoreProcessingQueue(); clearStoreProcessing(); }; const clearStoreProcessing = () => { this._storeProcessing = false; this._packetIdsDuringStoreProcessing = {}; }; this.once('close', remove); outStore.on('error', (err) => { clearStoreProcessing(); this._flushStoreProcessingQueue(); this.removeListener('close', remove); this.emit('error', err); }); const storeDeliver = () => { if (!outStore) { return; } const packet2 = outStore.read(1); let cb; if (!packet2) { outStore.once('readable', storeDeliver); return; } this._storeProcessing = true; if (this._packetIdsDuringStoreProcessing[packet2.messageId]) { storeDeliver(); return; } if (!this.disconnecting && !this.reconnectTimer) { cb = this.outgoing[packet2.messageId] ? this.outgoing[packet2.messageId].cb : null; this.outgoing[packet2.messageId] = { volatile: false, cb(err, status) { if (cb) { cb(err, status); } storeDeliver(); }, }; this._packetIdsDuringStoreProcessing[packet2.messageId] = true; if (this.messageIdProvider.register(packet2.messageId)) { this._sendPacket(packet2, undefined, undefined, true); } else { this.log('messageId: %d has already used.', packet2.messageId); } } else if (outStore.destroy) { outStore.destroy(); } }; outStore.on('end', () => { let allProcessed = true; for (const id in this._packetIdsDuringStoreProcessing) { if (!this._packetIdsDuringStoreProcessing[id]) { allProcessed = false; break; } } this.removeListener('close', remove); if (allProcessed) { clearStoreProcessing(); this._invokeAllStoreProcessingQueue(); this.emit('connect', packet); } else { startStreamProcess(); } }); storeDeliver(); }; startStreamProcess(); } _invokeStoreProcessingQueue() { if (!this._storeProcessing && this._storeProcessingQueue.length > 0) { const f = this._storeProcessingQueue[0]; if (f && f.invoke()) { this._storeProcessingQueue.shift(); return true; } } return false; } _invokeAllStoreProcessingQueue() { while (this._invokeStoreProcessingQueue()) { } } _flushStoreProcessingQueue() { for (const f of this._storeProcessingQueue) { if (f.cbStorePut) f.cbStorePut(new Error('Connection closed')); if (f.callback) f.callback(new Error('Connection closed')); } this._storeProcessingQueue.splice(0); } _removeOutgoingAndStoreMessage(messageId, cb) { delete this.outgoing[messageId]; this.outgoingStore.del({ messageId }, (err, packet) => { cb(err, packet); this.messageIdProvider.deallocate(messageId); this._invokeStoreProcessingQueue(); }); } } exports.default = MqttClient; //# sourceMappingURL=client.js.map