From 456a063bbfbd564ea47a2c5715d8c60adb0a039f Mon Sep 17 00:00:00 2001 From: lzl <1239365383@qq.com> Date: Tue, 2 Dec 2025 11:41:32 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=20=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../moduleutil/base/CommonAppContext.java | 6 +- .../xscm/moduleutil/service/MqttConnect.java | 113 ++++++++---------- 2 files changed, 56 insertions(+), 63 deletions(-) diff --git a/BaseModule/src/main/java/com/xscm/moduleutil/base/CommonAppContext.java b/BaseModule/src/main/java/com/xscm/moduleutil/base/CommonAppContext.java index 170922cb..f44f3908 100644 --- a/BaseModule/src/main/java/com/xscm/moduleutil/base/CommonAppContext.java +++ b/BaseModule/src/main/java/com/xscm/moduleutil/base/CommonAppContext.java @@ -161,7 +161,7 @@ public class CommonAppContext extends MultiDexApplication implements Applicatio //设置mqtt环境 false 测试环境 true 正式环境 // ExternalResConstants.INSTANCE.setIS_MQTT_RELEASE(false); //设置http环境 false 测试环境 true 正式环境 - ExternalResConstants.INSTANCE.setIS_HTTP_RELEASE(true); + ExternalResConstants.INSTANCE.setIS_HTTP_RELEASE(false); currentEnvironment = ExternalResConstants.INSTANCE.HTTP_PATH(); @@ -450,9 +450,9 @@ public class CommonAppContext extends MultiDexApplication implements Applicatio // startService(mqttServiceIntent); // } - mqttConnect=MqttConnect.getInstance(this, currentEnvironment.getMqttUrl(),"android-"+ MqttClient.generateClientId()); +// mqttConnect=MqttConnect.getInstance(this, currentEnvironment.getMqttUrl(),"android-"+ MqttClient.generateClientId()); // mqttConnect=MqttConnect.getInstance(this,"tcp://1.13.101.98","android-"+ MqttClient.generateClientId()); - mqttConnect.mqttClient(); +// mqttConnect.mqttClient(); // 每次启动应用时重置状态 SpUtil.getInstance().setBooleanValue("youth_model_shown", false); diff --git a/BaseModule/src/main/java/com/xscm/moduleutil/service/MqttConnect.java b/BaseModule/src/main/java/com/xscm/moduleutil/service/MqttConnect.java index d51feea3..6aa9e201 100644 --- a/BaseModule/src/main/java/com/xscm/moduleutil/service/MqttConnect.java +++ b/BaseModule/src/main/java/com/xscm/moduleutil/service/MqttConnect.java @@ -6,6 +6,7 @@ import android.os.Looper; import android.util.Log; import android.widget.Toast; +import com.blankj.utilcode.util.ActivityUtils; import com.blankj.utilcode.util.LogUtils; import com.google.android.gms.common.api.Api; import com.hjq.toast.ToastUtils; @@ -33,10 +34,10 @@ public class MqttConnect { public static String update_app = ""; public static String qx_hour_ranking = ""; - public static String qx_redpacket_arrive="";//红包飘屏的主题 + public static String qx_redpacket_arrive = "";//红包飘屏的主题 Handler handler = new Handler(Looper.getMainLooper()); String[] topic; - int[] qos = {1,2,3,0,0,0,0,0,0,0,0,0,0}; // 消息质量 + int[] qos = {1, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; // 消息质量 private static MqttConnect instance; public MqttConnect(Context context, String host, String clientId) { @@ -49,7 +50,7 @@ public class MqttConnect { update_app = "qx_xunlehui"; // 发送更新APP // qx_hour_ranking = "qx_hour_ranking"; qx_hour_ranking = "qx_hour_ranking"; - qx_redpacket_arrive="qx_redpacket_arrive"; + qx_redpacket_arrive = "qx_redpacket_arrive"; ArrayList topicList = new ArrayList<>(); topicList.add(shutdown); @@ -61,12 +62,13 @@ public class MqttConnect { /** * 单列模式,只能实例化一次 + * * @param context * @param host * @param clientId * @return */ - public static synchronized MqttConnect getInstance(Context context, String host, String clientId) { + public static synchronized MqttConnect getInstance(Context context, String host, String clientId) { if (instance == null) { instance = new MqttConnect(context, host, clientId); } @@ -76,51 +78,44 @@ public class MqttConnect { /** * 客户端connect连接mqtt服务器 **/ - public void mqttClient() - { -// close(); -// handler.postDelayed(new Runnable() { -// @Override -// public void run() { - try { -// uiTip("MQTT开始连接"); - MqttConnectOptions options = mqttConnectOptions(); - mqttClient.setCallback(new MqttInitCallback(context, HOST, clientId)); - mqttClient.connect(options); -// sub(topic,qos); - sub(shutdown); - sub(update_app); - sub(qx_hour_ranking); - sub(qx_redpacket_arrive); -// uiTip("MQTT连接成功"); - }catch (MqttException e){ -// uiTip("MQTT连接失败,准备重连。。。:"+e.getMessage()); - handler.postDelayed(new Runnable() { - @Override - public void run() { - LogUtils.e(Tag,"开始重连。。。"); - mqttClient(); - } - },3000); - } -// } -// },200); + public void mqttClient() { + close(); + + // 使用线程或线程池 + new Thread(() -> { + try { + MqttConnectOptions options = mqttConnectOptions(); + mqttClient.setCallback(new MqttInitCallback(context, HOST, clientId)); + // 在子线程连接(不会阻塞UI) + mqttClient.connect(options); + // 订阅也在子线程 +// sub(topic, qos); + sub(shutdown); + sub(update_app); + sub(qx_hour_ranking); + sub(qx_redpacket_arrive); + + // UI操作回到主线程 + ActivityUtils.getTopActivity().runOnUiThread(() -> uiTip("MQTT连接成功")); + + } catch (Exception e) { + ActivityUtils.getTopActivity().runOnUiThread(() -> { + uiTip("MQTT连接失败,准备重连: " + e.getMessage()); + }); + // 延迟重连 + handler.postDelayed(() -> mqttClient(), 3000); + } + }).start(); + } /** * 在主线程弹出消息 + * * @param msg */ - private void uiTip(String msg){ - LogUtils.d(Tag,msg); - handler.post(new Runnable() { - @Override - public void run() { -// Toast.makeText(context.getApplicationContext(), msg, Toast.LENGTH_SHORT).show(); - LogUtils.e("mqtt","连接成功"); - ToastUtils.show(msg); - } - }); + private void uiTip(String msg) { + LogUtils.d(Tag, msg); } /** @@ -131,7 +126,6 @@ public class MqttConnect { mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName("public"); - options.setConnectionTimeout(10); options.setCleanSession(true); options.setConnectionTimeout(10); options.setKeepAliveInterval(10); @@ -143,18 +137,18 @@ public class MqttConnect { /** * 关闭MQTT连接 */ - public void close(){ - if(mqttClient != null && mqttClient.isConnected()){ + public void close() { + if (mqttClient != null && mqttClient.isConnected()) { try { mqttClient.close(); mqttClient.disconnect(); mqttClient = null; } catch (MqttException e) { - LogUtils.e(Tag,"关闭MQTT连接报错:"+e.getMessage()); + LogUtils.e(Tag, "关闭MQTT连接报错:" + e.getMessage()); // ToastUtils.show("关闭MQTT连接报错"); } - }else { - LogUtils.d(Tag,"Mqtt已关闭"); + } else { + LogUtils.d(Tag, "Mqtt已关闭"); } } @@ -173,8 +167,8 @@ public class MqttConnect { * 向某个主题发布消息 * * @param topic: 发布的主题 - * @param msg: 发布的消息 - * @param qos: 消息质量 Qos:0、1、2 + * @param msg: 发布的消息 + * @param qos: 消息质量 Qos:0、1、2 */ public void pub(String topic, String msg, int qos) throws MqttException { MqttMessage mqttMessage = new MqttMessage(); @@ -190,11 +184,11 @@ public class MqttConnect { * * @param topic 主题 */ - public void sub(String topic){ + public void sub(String topic) { try { mqttClient.subscribe(topic); } catch (MqttException e) { - LogUtils.e(Tag,"MQTT主题订阅失败:" + e.getMessage()); + LogUtils.e(Tag, "MQTT主题订阅失败:" + e.getMessage()); // uiTip("MQTT主题订阅失败"); } } @@ -203,16 +197,15 @@ public class MqttConnect { * 订阅某一个主题,可携带Qos * * @param topic 所要订阅的主题 - * @param qos - * 消息质量:0最多发送一次,不保证消息能够到达接收端,也不负责重发 - * 1至少发送一次,确保消息能够到达接收端,但可能会导致消息重复 - * 2确保消息恰好被接收一次 + * @param qos 消息质量:0最多发送一次,不保证消息能够到达接收端,也不负责重发 + * 1至少发送一次,确保消息能够到达接收端,但可能会导致消息重复 + * 2确保消息恰好被接收一次 */ - public void sub(String[] topic, int[] qos){ + public void sub(String[] topic, int[] qos) { try { mqttClient.subscribe(topic, qos); - }catch (MqttException e){ - LogUtils.e(Tag,"订阅主题失败:"+e.getMessage()); + } catch (MqttException e) { + LogUtils.e(Tag, "订阅主题失败:" + e.getMessage()); } }