mqtt 连接

This commit is contained in:
2025-12-02 11:41:32 +08:00
parent 6e269d89ec
commit 456a063bbf
2 changed files with 56 additions and 63 deletions

View File

@@ -161,7 +161,7 @@ public class CommonAppContext extends MultiDexApplication implements Applicatio
//设置mqtt环境 false 测试环境 true 正式环境 //设置mqtt环境 false 测试环境 true 正式环境
// ExternalResConstants.INSTANCE.setIS_MQTT_RELEASE(false); // ExternalResConstants.INSTANCE.setIS_MQTT_RELEASE(false);
//设置http环境 false 测试环境 true 正式环境 //设置http环境 false 测试环境 true 正式环境
ExternalResConstants.INSTANCE.setIS_HTTP_RELEASE(true); ExternalResConstants.INSTANCE.setIS_HTTP_RELEASE(false);
currentEnvironment = ExternalResConstants.INSTANCE.HTTP_PATH(); currentEnvironment = ExternalResConstants.INSTANCE.HTTP_PATH();
@@ -450,9 +450,9 @@ public class CommonAppContext extends MultiDexApplication implements Applicatio
// startService(mqttServiceIntent); // startService(mqttServiceIntent);
// } // }
mqttConnect=MqttConnect.getInstance(this, currentEnvironment.getMqttUrl(),"android-"+ MqttClient.generateClientId()); // mqttConnect=MqttConnect.getInstance(this, currentEnvironment.getMqttUrl(),"android-"+ MqttClient.generateClientId());
// mqttConnect=MqttConnect.getInstance(this,"tcp://1.13.101.98","android-"+ MqttClient.generateClientId()); // mqttConnect=MqttConnect.getInstance(this,"tcp://1.13.101.98","android-"+ MqttClient.generateClientId());
mqttConnect.mqttClient(); // mqttConnect.mqttClient();
// 每次启动应用时重置状态 // 每次启动应用时重置状态
SpUtil.getInstance().setBooleanValue("youth_model_shown", false); SpUtil.getInstance().setBooleanValue("youth_model_shown", false);

View File

@@ -6,6 +6,7 @@ import android.os.Looper;
import android.util.Log; import android.util.Log;
import android.widget.Toast; import android.widget.Toast;
import com.blankj.utilcode.util.ActivityUtils;
import com.blankj.utilcode.util.LogUtils; import com.blankj.utilcode.util.LogUtils;
import com.google.android.gms.common.api.Api; import com.google.android.gms.common.api.Api;
import com.hjq.toast.ToastUtils; import com.hjq.toast.ToastUtils;
@@ -33,10 +34,10 @@ public class MqttConnect {
public static String update_app = ""; public static String update_app = "";
public static String qx_hour_ranking = ""; public static String qx_hour_ranking = "";
public static String qx_redpacket_arrive="";//红包飘屏的主题 public static String qx_redpacket_arrive = "";//红包飘屏的主题
Handler handler = new Handler(Looper.getMainLooper()); Handler handler = new Handler(Looper.getMainLooper());
String[] topic; String[] topic;
int[] qos = {1,2,3,0,0,0,0,0,0,0,0,0,0}; // 消息质量 int[] qos = {1, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; // 消息质量
private static MqttConnect instance; private static MqttConnect instance;
public MqttConnect(Context context, String host, String clientId) { public MqttConnect(Context context, String host, String clientId) {
@@ -49,7 +50,7 @@ public class MqttConnect {
update_app = "qx_xunlehui"; // 发送更新APP update_app = "qx_xunlehui"; // 发送更新APP
// qx_hour_ranking = "qx_hour_ranking"; // qx_hour_ranking = "qx_hour_ranking";
qx_hour_ranking = "qx_hour_ranking"; qx_hour_ranking = "qx_hour_ranking";
qx_redpacket_arrive="qx_redpacket_arrive"; qx_redpacket_arrive = "qx_redpacket_arrive";
ArrayList<String> topicList = new ArrayList<>(); ArrayList<String> topicList = new ArrayList<>();
topicList.add(shutdown); topicList.add(shutdown);
@@ -61,12 +62,13 @@ public class MqttConnect {
/** /**
* 单列模式,只能实例化一次 * 单列模式,只能实例化一次
*
* @param context * @param context
* @param host * @param host
* @param clientId * @param clientId
* @return * @return
*/ */
public static synchronized MqttConnect getInstance(Context context, String host, String clientId) { public static synchronized MqttConnect getInstance(Context context, String host, String clientId) {
if (instance == null) { if (instance == null) {
instance = new MqttConnect(context, host, clientId); instance = new MqttConnect(context, host, clientId);
} }
@@ -76,51 +78,44 @@ public class MqttConnect {
/** /**
* 客户端connect连接mqtt服务器 * 客户端connect连接mqtt服务器
**/ **/
public void mqttClient() public void mqttClient() {
{ close();
// close();
// handler.postDelayed(new Runnable() { // 使用线程或线程池
// @Override new Thread(() -> {
// public void run() { try {
try { MqttConnectOptions options = mqttConnectOptions();
// uiTip("MQTT开始连接"); mqttClient.setCallback(new MqttInitCallback(context, HOST, clientId));
MqttConnectOptions options = mqttConnectOptions(); // 在子线程连接不会阻塞UI
mqttClient.setCallback(new MqttInitCallback(context, HOST, clientId)); mqttClient.connect(options);
mqttClient.connect(options); // 订阅也在子线程
// sub(topic,qos); // sub(topic, qos);
sub(shutdown); sub(shutdown);
sub(update_app); sub(update_app);
sub(qx_hour_ranking); sub(qx_hour_ranking);
sub(qx_redpacket_arrive); sub(qx_redpacket_arrive);
// uiTip("MQTT连接成功");
}catch (MqttException e){ // UI操作回到主线程
// uiTip("MQTT连接失败,准备重连。。。:"+e.getMessage()); ActivityUtils.getTopActivity().runOnUiThread(() -> uiTip("MQTT连接成功"));
handler.postDelayed(new Runnable() {
@Override } catch (Exception e) {
public void run() { ActivityUtils.getTopActivity().runOnUiThread(() -> {
LogUtils.e(Tag,"开始重连。。。"); uiTip("MQTT连接失败,准备重连: " + e.getMessage());
mqttClient(); });
} // 延迟重连
},3000); handler.postDelayed(() -> mqttClient(), 3000);
} }
// } }).start();
// },200);
} }
/** /**
* 在主线程弹出消息 * 在主线程弹出消息
*
* @param msg * @param msg
*/ */
private void uiTip(String msg){ private void uiTip(String msg) {
LogUtils.d(Tag,msg); LogUtils.d(Tag, msg);
handler.post(new Runnable() {
@Override
public void run() {
// Toast.makeText(context.getApplicationContext(), msg, Toast.LENGTH_SHORT).show();
LogUtils.e("mqtt","连接成功");
ToastUtils.show(msg);
}
});
} }
/** /**
@@ -131,7 +126,6 @@ public class MqttConnect {
mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence()); mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("public"); options.setUserName("public");
options.setConnectionTimeout(10);
options.setCleanSession(true); options.setCleanSession(true);
options.setConnectionTimeout(10); options.setConnectionTimeout(10);
options.setKeepAliveInterval(10); options.setKeepAliveInterval(10);
@@ -143,18 +137,18 @@ public class MqttConnect {
/** /**
* 关闭MQTT连接 * 关闭MQTT连接
*/ */
public void close(){ public void close() {
if(mqttClient != null && mqttClient.isConnected()){ if (mqttClient != null && mqttClient.isConnected()) {
try { try {
mqttClient.close(); mqttClient.close();
mqttClient.disconnect(); mqttClient.disconnect();
mqttClient = null; mqttClient = null;
} catch (MqttException e) { } catch (MqttException e) {
LogUtils.e(Tag,"关闭MQTT连接报错"+e.getMessage()); LogUtils.e(Tag, "关闭MQTT连接报错" + e.getMessage());
// ToastUtils.show("关闭MQTT连接报错"); // ToastUtils.show("关闭MQTT连接报错");
} }
}else { } else {
LogUtils.d(Tag,"Mqtt已关闭"); LogUtils.d(Tag, "Mqtt已关闭");
} }
} }
@@ -173,8 +167,8 @@ public class MqttConnect {
* 向某个主题发布消息 * 向某个主题发布消息
* *
* @param topic: 发布的主题 * @param topic: 发布的主题
* @param msg: 发布的消息 * @param msg: 发布的消息
* @param qos: 消息质量 Qos0、1、2 * @param qos: 消息质量 Qos0、1、2
*/ */
public void pub(String topic, String msg, int qos) throws MqttException { public void pub(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage(); MqttMessage mqttMessage = new MqttMessage();
@@ -190,11 +184,11 @@ public class MqttConnect {
* *
* @param topic 主题 * @param topic 主题
*/ */
public void sub(String topic){ public void sub(String topic) {
try { try {
mqttClient.subscribe(topic); mqttClient.subscribe(topic);
} catch (MqttException e) { } catch (MqttException e) {
LogUtils.e(Tag,"MQTT主题订阅失败" + e.getMessage()); LogUtils.e(Tag, "MQTT主题订阅失败" + e.getMessage());
// uiTip("MQTT主题订阅失败"); // uiTip("MQTT主题订阅失败");
} }
} }
@@ -203,16 +197,15 @@ public class MqttConnect {
* 订阅某一个主题可携带Qos * 订阅某一个主题可携带Qos
* *
* @param topic 所要订阅的主题 * @param topic 所要订阅的主题
* @param qos * @param qos 消息质量0最多发送一次不保证消息能够到达接收端也不负责重发
* 消息质量0最多发送一次,不保证消息能够到达接收端,也不负责重发 * 1至少发送一次,确保消息能够到达接收端,但可能会导致消息重复
* 1至少发送一次确保消息能够到达接收端但可能会导致消息重复 * 2确保消息恰好被接收一次
* 2确保消息恰好被接收一次
*/ */
public void sub(String[] topic, int[] qos){ public void sub(String[] topic, int[] qos) {
try { try {
mqttClient.subscribe(topic, qos); mqttClient.subscribe(topic, qos);
}catch (MqttException e){ } catch (MqttException e) {
LogUtils.e(Tag,"订阅主题失败:"+e.getMessage()); LogUtils.e(Tag, "订阅主题失败:" + e.getMessage());
} }
} }