Callbacks and synchronization in MQTT client applications

The MQTT client programming model uses threads extensively. The threads decouple an MQTT client application, as much as they can, from delays in transmitting messages to and from the server. Publications, delivery tokens, and connection lost events are delivered to the methods in a callback class that implements MqttCallback.


Callbacks

See the Eclipse Paho Web site for the latest changes to MqttCallback.

The MqttCallback interface has three callback methods:

    connectionLost(java.lang.Throwable cause)

    • connectionLost is called when a communications error leads to the connection dropping. It is also called if the server drops the connection as a result of an error on the server after the connection has been established. Server errors are logged to the queue manager error log. The server drops the connection to the client, and the client calls MqttCallback.connectionLost.
    • The only remote errors thrown as exceptions on the same thread as the client application are exceptions from MqttClient.connect. Errors detected by the server after the connection is established are reported back to the MqttCallback.connectionLost callback method as throwables.
    • Typical server errors that result in connectionLost are authorization errors. For example, the telemetry server tries to publish on a topic on behalf of a client that is not authorized to publish on the topic. Anything that results in a MQCC_FAIL condition code being returned to the telemetry server can result in the connection being dropped.

    deliveryComplete(MqttDeliveryToken token)

    • deliveryComplete is called by the MQTT client to pass a delivery token back to the client application; see Delivery tokens. Using the delivery token, the callback can access the published message with the method token.getMessage.
    • When the application callback returns control to the MQTT client after being called by the deliveryComplete method, delivery is completed. Until delivery is completed, messages with QoS 1 or 2 are retained by the persistence class.
    • The call to deliveryComplete is a point of synchronization between the application and the persistence class. The deliveryComplete method is never called twice for the same message.
    • When the application callback returns from deliveryComplete to the MQTT client, the client calls MqttClientPersistence.remove for messages with QoS 1 or 2. MqttClientPersistence.remove deletes the locally stored copy of the published message.
    • From a transaction processing perspective, the call to deliveryComplete is a single phase transaction that commits the delivery. If processing fails during the callback, on restart of the client MqttClientPersistence.remove is called again to delete the local copy of the published message. The callback is not called again. If you are using the callback to store a log of delivered messages, we cannot synchronize the log with the MQTT client. If you want to store a log reliably, then update the log in the MqttClientPersistence class.
    • The delivery token and message are referenced by the main application thread and the MQTT client. The MQTT client dereferences the MqttMessage object when delivery is completed, and the delivery token object when the client disconnects. The MqttMessage object can be garbage collected after delivery is completed if the client application dereferences it. The delivery token can be garbage collected after the session is disconnected.
    • We can get MqttDeliveryToken and MqttMessage attributes after a message has been published. If you attempt to set any MqttMessage attributes after the message has been published the result is undefined.
    • The MQTT client continues to process delivery acknowledgments if the client reconnects to the previous session with the same ClientIdentifier ; see Clean sessions. The MQTT client application must set MqttClient.CleanSession to false for the previous session, and set it to false in the new session. The MQTT client creates new delivery tokens and message objects in the new session for pending deliveries. It recovers the objects using the MqttClientPersistence class. If the application client still has references to the old delivery tokens and messages, dereference them. The application callback is called in the new session for any deliveries initiated in the previous session and completed in this session.
    • The application callback is called after the application client connects, when a pending delivery is completed. Before the application client connects, it can retrieve pending deliveries using the MqttClient.getPendingDeliveryTokens method.
    • Notice that the client application originally created the message object that is published, and its payload byte array. The MQTT client references these objects. The message object returned by the delivery token in the method token.getMessage is not necessarily the same message object created by the client. If a new MQTT client instance re-creates the delivery token, the MqttClientPersistence class re-creates the MqttMessage object. For consistency token.getMessage returns null if token.isCompleted is true, regardless of whether the message object was created by the application client or the MqttClientPersistence class.

    messageArrived(MqttTopic topic, MqttMessage message)

    • messageArrived is called when a publication arrives for the client that matched a subscription topic. topic is the publication topic, not the subscription filter. The two can be different if the filter contains wildcards.
    • If the topic matches multiple subscriptions created by the client, the client receives multiple copies of the publication. If a client publishes to a topic that it also subscribes to, it receives a copy of its own publication.
    • If a message is sent with a QoS of 1 or 2, the message is stored by the MqttClientPersistence class before the MQTT client calls messageArrived. messageArrived behaves like deliveryComplete: it is only called once for a publication, and the local copy of the publication is removed by MqttClientPersistence.remove when messageArrived returns to the MQTT client. The MQTT client drops its references to the topic and message when messageArrived returns to the MQTT client. The topic and message objects are garbage collected, if the application client has not held onto a reference to the objects.


Callbacks, threading, and client application synchronization

The MQTT client calls a callback method on a separate thread to the main application thread. The client application does not create a thread for the callback, it is created by the MQTT client.

The MQTT client synchronizes callback methods. Only one instance of the callback method runs at a time. The synchronization makes it easy to update an object that tallies which publications have been delivered. One instance of the MqttCallback.deliveryComplete runs at a time, and so it is safe to update the tally without further synchronization. It is also the case that only one publication arrives at a time. Your code in the messageArrived method can update an object without synchronizing it. If you are referring to the tally, or the object that is being updated, in another thread, synchronize the tally or object.

The delivery token provides a synchronization mechanism between the main application thread and delivery of a publication. The method token.waitForCompletion waits until delivery of a specific publication is completed, or until an optional timeout expires. You might use token.waitForCompletion in the following way to process one publication at a time.

To synchronize with the MqttCallback.deliveryComplete method. Only when MqttCallback.deliveryComplete returns to the MQTT Client does token.waitForCompletion resume. Using this mechanism we can synchronize running code in MqttCallback.deliveryComplete before code runs in the main application thread.

What if you wanted to publish without waiting for each publication to be delivered, but want confirmation when all the publications have been delivered? If you publish on a single thread, the last publication to be sent is also the last to be delivered.


Synchronization of requests sent to the server

Table 1 describes the methods in the MQTT Java client that send a request to the server. Unless the application client sets an indefinite timeout, the client never waits indefinitely for the server. If the client hangs, it is either an application programming problem, or a defect in the MQTT client.
Table 1. Synchronization behavior of methods that result in requests to the server
Method Synchronization Timeout interval
MqttClient.Connect

Waits for a connection to be established with the server.

Defaults to 30 seconds, or as set by a parameter, then throws an exception.

MqttClient.Disconnect

Waits for the MQTT client to finish any work it must do, and for the TCP/IP session to disconnect.

MqttClient.Subscribe

Waits for completion of the Subscribe or UnSubscribe method.

MqttClient.UnSubscribe
MqttClient.Publish

Returns immediately to the application thread after passing the request to the MQTT client.

None.

MqttDeliveryToken.waitForCompletion

Waits for the delivery token to be returned.

Indefinite, or as set as a parameter.