You are reading a single comment by @Stephen- and its replies. Click here to read the full conversation.
  • Hi Gordon,

    It seems that the clinet.on('data') is sending more than one message in the data item.

    Here is a fix I have hacked for MQTT.js, couldn't find it on github for a pull request.

    MQTT.prototype.connect = function(client) {
        var mqo = this;
        var onConnect = function() {
            console.log('Client connected');
            client.write(mqo.mqttConnect(mqo.client_id));
            mqo.partData = '';
    
            // Disconnect if no CONNACK is received
            mqo.ctimo = setTimeout(function() {
                mqo.ctimo = undefined;
                mqo.disconnect();
            }, mqo.C.CONNECT_TIMEOUT);
    
            // Set up regular keep_alive ping
            mqo.pintr = setInterval(function() {
                // console.log("Pinging MQTT server");
                mqo.ping();
            }, mqo.ping_interval*1000);
    
            // Incoming data
            client.on('data', function(data) {
    
                if (mqo.partData) {
                    //console.log('** Adding PART DATA **');
                    data = mqo.partData + data;
                    mqo.partData = '';
                }
    
                var type = data.charCodeAt(0) >> 4;
                var rLen = data.charCodeAt(1);
                var pLen = rLen + 2;
    
                if (data.length < pLen) {
                    mqo.partData = data;
                    return;
                }
                var pData = data.substr(0,pLen);
    
                if(type === TYPE.PUBLISH) {
                    var parsedData = parsePublish(pData);
                    if (parsedData!==undefined) {
                        mqo.emit('publish', parsedData);
                        mqo.emit('message', parsedData.topic, parsedData.message);
                    }
                }
                else if(type === TYPE.PUBACK) {
                    // implement puback
                }
                else if(type === TYPE.SUBACK) {
                    // implement suback
                }
                else if(type === TYPE.UNSUBACK) {
                    // implement unsuback
                }
                else if(type === TYPE.PINGREQ) {
                    // silently reply to pings
                    client.write(TYPE.PINGRESP+"\x00"); // reply to PINGREQ
                }
                else if(type === TYPE.PINGRESP) {
                    mqo.emit('ping_reply');
                }
                else if(type === TYPE.CONNACK) {
                    if (mqo.ctimo) clearTimeout(mqo.ctimo);
                    mqo.ctimo = undefined;
                    var returnCode = pData.charCodeAt(3);
                    if(returnCode === RETURN_CODES.ACCEPTED) {
                        mqo.connected = true;
                        console.log("MQTT connection accepted");
                        mqo.emit('connected');
                        mqo.emit('connect');
                    }
                    else {
                        var mqttError = "Connection refused, ";
                        switch(returnCode) {
                            case RETURN_CODES.UNACCEPTABLE_PROTOCOL_VERSION:
                                mqttError += "unacceptable protocol version.";
                                break;
                            case RETURN_CODES.IDENTIFIER_REJECTED:
                                mqttError += "identifier rejected.";
                                break;
                            case RETURN_CODES.SERVER_UNAVAILABLE:
                                mqttError += "server unavailable.";
                                break;
                            case RETURN_CODES.BAD_USER_NAME_OR_PASSWORD:
                                mqttError += "bad user name or password.";
                                break;
                            case RETURN_CODES.NOT_AUTHORIZED:
                                mqttError += "not authorized.";
                                break;
                            default:
                                mqttError += "unknown return code: " + returnCode + ".";
                        }
                        console.log(mqttError);
                        mqo.emit('error', mqttError);
                    }
                }
                else {
                        console.log("MQTT unsupported packet type: " + type);
                         console.log("[MQTT]" + data.split("").map(function (c) {
                                 return c.charCodeAt(0);
                             }));
                }
                if (data.length > pLen) {
                    client.emit('data', data.substr(pLen));
                }
            });
    
About

Avatar for Stephen- @Stephen- started