Why does my MQTT service block the Activity?

  •  1
  • Little Fox  · 7 years ago

    If I set token.waitForCompletion(500); it is much faster, but I have no way to guess the right value...

    @Override
    public void onCreate() {
    
        Datapool();
        IntentFilter intentf = new IntentFilter();
        intentf.addAction(ConnectivityManager.CONNECTIVITY_ACTION);
        registerReceiver(mqttBroadcastReceiver, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
        mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
        deviceId = String.format(DEVICE_ID_FORMAT, Settings.Secure.getString(getContentResolver(), Settings.Secure.ANDROID_ID));
    
    }
    

    MQTTBroadcastReceiver mqttBroadcastReceiver = new MQTTBroadcastReceiver();
    
    class MQTTBroadcastReceiver extends BroadcastReceiver {
        @Override
        public void onReceive(Context context, Intent intent) {
            Connect();
        }
    
    };
    

    IMqttToken token;
    int i = 0;
    private HashMap<String, Boolean> _hashMap = new HashMap<>();
    private void Connect(){
        for (ServiceDataModel connectionData : dataModels) {
    
            Log.d(TAG, "doConnect() " + connectionData.getCONNECTION_NAME());
            _hashMap.put(connectionData.getCONNECTION_NAME(), false);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            i++;
            try {
                mqttClient = new MqttAsyncClient("tcp://" + connectionData.getSERVER_IP() + ":" + connectionData.getSERVER_PORT(), deviceId + i , new MemoryPersistence());
                token = mqttClient.connect();
                token.waitForCompletion(2500);
                if (mqttClient.isConnected()) {
                        mqttClient.setCallback(new MqttEventCallback());
                        token = mqttClient.subscribe(connectionData.getTOPIC(), Integer.parseInt(connectionData.getQOS()));
                        token.waitForCompletion(2500);
                       _hashMap.put(connectionData.getCONNECTION_NAME(), true);
                }
            }catch (Exception ex){
                Log.d(TAG, ex.toString() + connectionData.toString());
            }
        }
        sendMessageToActivity(_hashMap);
    }
    
    1 answer
  •  1
  • Alexander Farber  · 5 years ago

    By calling token.waitForCompletion(2500) you first connect synchronously and then subscribe synchronously, and both waits block the main thread.
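
    The difference between waiting on a token and registering a callback can be sketched without the MQTT library. In the minimal example below, CompletableFuture is only a stand-in for a pending connect; BlockingVsCallbackSketch, simulatedConnect, blockingWait and connectWithCallback are hypothetical names, not part of Paho.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingVsCallbackSketch {

    // Stand-in for a network operation such as connect(): completes with
    // true on a worker thread after roughly delayMs milliseconds.
    static CompletableFuture<Boolean> simulatedConnect(long delayMs) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMs);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        });
    }

    // Blocking style, comparable to waiting on a token with a timeout: the
    // calling thread is parked until the operation finishes or the timeout
    // expires. Returns how long the caller was blocked, in milliseconds.
    static long blockingWait(long delayMs, long timeoutMs) throws Exception {
        long start = System.nanoTime();
        try {
            simulatedConnect(delayMs).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Timed out: the caller was blocked for the full timeout.
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Callback style: register the continuation and return at once;
    // onSuccess runs later, once the simulated operation completes.
    static void connectWithCallback(long delayMs, Runnable onSuccess) {
        simulatedConnect(delayMs).thenAccept(connected -> {
            if (connected) {
                onSuccess.run();
            }
        });
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocked for ~" + blockingWait(200, 2500) + " ms");
        connectWithCallback(200, () -> System.out.println("connected (callback)"));
        System.out.println("caller continues immediately");
        Thread.sleep(500); // keep the JVM alive long enough for the callback in this demo
    }
}
```

    With the blocking style, a shorter timeout only caps how long the caller is stalled, which is why no single value feels right; with the callback style there is no timeout to guess.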

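    Instead, connect using the asynchronous callbacks mCallback and mConnectionCallback (see below). Once the connection succeeds, subscribe using an asynchronous callback as well (mSubscribeCallback):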

    private final MqttCallbackExtended mCallback = new MqttCallbackExtended() {
        @Override
        public void connectComplete(boolean reconnect, String brokerAddress) {
                mqttClient.subscribe(connectionData.getTOPIC(), Integer.parseInt(connectionData.getQOS()), null, mSubscribeCallback);
        }
    
        @Override
        public void connectionLost(Throwable ex) {
        }
    
        @Override
        public void deliveryComplete(IMqttDeliveryToken deliveryToken) {
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        }
    };
    
    private final IMqttActionListener mConnectionCallback = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
            // do nothing, this case is handled in mCallback.connectComplete()
        }
    
        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
        }
    };
    
    private final IMqttActionListener mSubscribeCallback = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken subscribeToken) {
               _hashMap.put(connectionData.getCONNECTION_NAME(), true);
        }
    
        @Override
        public void onFailure(IMqttToken subscribeToken, Throwable ex) {
        }
    
    };
    
    try {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setCleanSession(true);
        connectOptions.setAutomaticReconnect(false);
        connectOptions.setUserName("username");
        connectOptions.setPassword("password".toCharArray());
    
        mqttClient = new MqttAsyncClient("tcp://" + connectionData.getSERVER_IP() + ":" + connectionData.getSERVER_PORT(), deviceId + i , new MemoryPersistence());
        mqttClient.setCallback(mCallback);
        mqttClient.connect(connectOptions, null, mConnectionCallback);
    
    } catch (Exception ex) {
        Log.d(TAG, ex.toString() + connectionData.toString());
    }
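
    In the question, connectionData is a loop variable and the result map is sent to the Activity right after the loop. With asynchronous callbacks, each listener needs to know which connection it belongs to, and the map can only be sent once every connection has reported back. One possible way to arrange that is sketched below in plain Java. ConnectionStatusTracker, ResultListener and listenerFor are hypothetical names; ResultListener merely stands in for Paho's IMqttActionListener, and the sketch assumes unique connection names and that each listener fires exactly once.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class ConnectionStatusTracker {

    // Hypothetical stand-in for an action listener, reduced to the two outcomes.
    interface ResultListener {
        void onSuccess();
        void onFailure(Throwable cause);
    }

    private final Map<String, Boolean> status = new ConcurrentHashMap<>();
    private final AtomicInteger pending;
    private final Consumer<Map<String, Boolean>> onAllDone;

    // Assumption: names are unique, so one result is expected per name.
    ConnectionStatusTracker(List<String> connectionNames,
                            Consumer<Map<String, Boolean>> onAllDone) {
        for (String name : connectionNames) {
            status.put(name, false);
        }
        this.pending = new AtomicInteger(status.size());
        this.onAllDone = onAllDone;
    }

    // Creates a listener bound to one connection name, so the callback
    // does not depend on a loop variable that is out of scope.
    ResultListener listenerFor(final String connectionName) {
        return new ResultListener() {
            @Override
            public void onSuccess() {
                finish(connectionName, true);
            }

            @Override
            public void onFailure(Throwable cause) {
                finish(connectionName, false);
            }
        };
    }

    // Records the outcome and reports a snapshot once the last result arrives.
    private void finish(String name, boolean ok) {
        status.put(name, ok);
        if (pending.decrementAndGet() == 0) {
            onAllDone.accept(Collections.unmodifiableMap(new HashMap<>(status)));
        }
    }

    public static void main(String[] args) {
        ConnectionStatusTracker tracker = new ConnectionStatusTracker(
                Arrays.asList("home", "office"),
                result -> System.out.println("all done: " + result));
        tracker.listenerFor("home").onSuccess();
        tracker.listenerFor("office").onFailure(new RuntimeException("refused"));
    }
}
```

    In the service, the onAllDone consumer would be the place to call sendMessageToActivity(...), and each subscribe callback would forward its result to the listener created for that connection.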
    

    LocalBroadcastManager can be used to communicate between SQLite, MQTT and the Activity.

    Update: use LiveData and Room instead of LocalBroadcastManager.