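By calling token.waitForCompletion(2500) you are connecting synchronously and then subscribing synchronously, which blocks the main thread.
Use the asynchronous callbacks mCallback and mConnectionCallback instead (see below). Once the connection succeeds, subscribe with an asynchronous callback (mSubscribeCallback):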
private final MqttCallbackExtended mCallback = new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String brokerAddress) {
        // Subscribe as soon as the connection is established; the result arrives in mSubscribeCallback.
        mqttClient.subscribe(connectionData.getTOPIC(), Integer.parseInt(connectionData.getQOS()), null, mSubscribeCallback);
    }

    @Override
    public void connectionLost(Throwable ex) {
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken deliveryToken) {
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
    }
};

private final IMqttActionListener mConnectionCallback = new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken asyncActionToken) {
        // do nothing, this case is handled in mCallback.connectComplete()
    }

    @Override
    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    }
};

private final IMqttActionListener mSubscribeCallback = new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken subscribeToken) {
        // Mark this connection as subscribed.
        _hashMap.put(connectionData.getCONNECTION_NAME(), true);
    }

    @Override
    public void onFailure(IMqttToken subscribeToken, Throwable ex) {
    }
};

try {
    MqttConnectOptions connectOptions = new MqttConnectOptions();
    connectOptions.setCleanSession(true);
    connectOptions.setAutomaticReconnect(false);
    connectOptions.setUserName("username");
    connectOptions.setPassword("password".toCharArray());
    mqttClient = new MqttAsyncClient("tcp://" + connectionData.getSERVER_IP() + ":" + connectionData.getSERVER_PORT(), deviceId + i, new MemoryPersistence());
    mqttClient.setCallback(mCallback);
    // connect() returns immediately; the outcome is reported through the callbacks.
    mqttClient.connect(connectOptions, null, mConnectionCallback);
} catch (Exception ex) {
    Log.d(TAG, ex.toString() + connectionData.toString());
}
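To show the callback chaining in isolation, here is a minimal sketch in plain Java with no Paho dependency. FakeAsyncClient and ActionListener are hypothetical stand-ins (not Paho classes) that only mimic the shape of MqttAsyncClient and IMqttActionListener; the latch exists only so the demo can print a result, and a real Android caller would not wait on it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncChainSketch {

    /** Hypothetical stand-in for an action listener; not the Paho interface. */
    interface ActionListener {
        void onSuccess();
        void onFailure(Throwable cause);
    }

    /** Hypothetical fake client: each action runs on a single worker thread. */
    static class FakeAsyncClient {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());

        void connect(ActionListener listener) {
            worker.submit(() -> { events.add("connected"); listener.onSuccess(); });
        }

        void subscribe(String topic, ActionListener listener) {
            worker.submit(() -> { events.add("subscribed:" + topic); listener.onSuccess(); });
        }

        void shutdown() {
            worker.shutdown();
        }
    }

    /** Connects, then subscribes from inside the connect callback. */
    static List<String> run() {
        final FakeAsyncClient client = new FakeAsyncClient();
        final CountDownLatch done = new CountDownLatch(1);

        final ActionListener subscribeCallback = new ActionListener() {
            @Override public void onSuccess() { done.countDown(); }
            @Override public void onFailure(Throwable cause) { done.countDown(); }
        };

        ActionListener connectCallback = new ActionListener() {
            @Override public void onSuccess() {
                // Chain the next step here instead of blocking on a token.
                client.subscribe("demo/topic", subscribeCallback);
            }
            @Override public void onFailure(Throwable cause) { done.countDown(); }
        };

        // Returns immediately; the calling thread is never blocked by connect().
        client.connect(connectCallback);

        try {
            // Demo only: wait so the result can be inspected.
            if (!done.await(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            client.shutdown();
        }
        return new ArrayList<String>(client.events);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [connected, subscribed:demo/topic]
    }
}
```

The point of the sketch is only the ordering: the subscribe call is issued from the connect callback, so nothing on the calling thread has to wait for the connection to finish.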
LocalBroadcastManager is used to communicate between SQLite, MQTT, and the activities.
Update:
Use LiveData and Room instead of LocalBroadcastManager.