package com.xscm.moduleutil.service; import android.content.Context; import android.os.Handler; import android.os.Looper; import android.util.Log; import android.widget.Toast; import com.blankj.utilcode.util.ActivityUtils; import com.blankj.utilcode.util.LogUtils; import com.google.android.gms.common.api.Api; import com.hjq.toast.ToastUtils; import com.xscm.moduleutil.utils.logger.DataLogger; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.ArrayList; public class MqttConnect { private String HOST; private String Tag = "MQTT"; private String clientId = ""; private static MqttClient mqttClient = null; private Context context; // 订阅主题 public static String shutdown = ""; public static String update_app = ""; public static String qx_hour_ranking = ""; public static String qx_redpacket_arrive = "";//红包飘屏的主题 Handler handler = new Handler(Looper.getMainLooper()); String[] topic; int[] qos = {1, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; // 消息质量 private static MqttConnect instance; public MqttConnect(Context context, String host, String clientId) { this.HOST = host; this.context = context; this.clientId = clientId; // 这里是你自己需要订阅的主题 shutdown = "qx_room_topic"; // 房间飘屏 update_app = "qx_xunlehui"; // 巡乐会飘屏 qx_hour_ranking = "qx_hour_ranking";//小时榜飘屏 qx_redpacket_arrive = "qx_redpacket_arrive"; //红包飘屏的主题 ArrayList topicList = new ArrayList<>(); topicList.add(shutdown); topicList.add(update_app); topicList.add(qx_hour_ranking); topicList.add(qx_redpacket_arrive); topic = topicList.toArray(new String[0]); } /** * 单列模式,只能实例化一次 * * @param context * @param host * @param clientId * @return */ public static synchronized MqttConnect getInstance(Context context, String host, String clientId) { if (instance == null) { instance = new MqttConnect(context, host, clientId); } return instance; } /** * 客户端connect连接mqtt服务器 **/ public void mqttClient() { close(); // 使用线程或线程池 new Thread(() -> { try { MqttConnectOptions options = mqttConnectOptions(); mqttClient.setCallback(new MqttInitCallback(context, HOST, clientId)); // 在子线程连接(不会阻塞UI) mqttClient.connect(options); // 订阅也在子线程 // sub(topic, qos); sub(shutdown); sub(update_app); sub(qx_hour_ranking); sub(qx_redpacket_arrive); // UI操作回到主线程 ActivityUtils.getTopActivity().runOnUiThread(() -> uiTip("MQTT连接成功")); } catch (Exception e) { ActivityUtils.getTopActivity().runOnUiThread(() -> { uiTip("MQTT连接失败,准备重连: " + e.getMessage()); }); // 延迟重连 handler.postDelayed(() -> mqttClient(), 3000); } }).start(); } /** * 在主线程弹出消息 * * @param msg */ private void uiTip(String msg) { LogUtils.d(Tag, msg); } /** * MQTT连接参数设置 */ private MqttConnectOptions mqttConnectOptions() throws MqttException { mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName("public"); options.setCleanSession(true); options.setConnectionTimeout(10); options.setKeepAliveInterval(10); return options; } /** * 关闭MQTT连接 */ public void close() { if (mqttClient != null && mqttClient.isConnected()) { try { mqttClient.close(); mqttClient.disconnect(); mqttClient = null; } catch (MqttException e) { LogUtils.e(Tag, "关闭MQTT连接报错:" + e.getMessage()); // ToastUtils.show("关闭MQTT连接报错"); } } else { LogUtils.d(Tag, "Mqtt已关闭"); } } /** * 向某个主题发布消息 默认qos:1 */ public static void pub(String topic, String msg) throws MqttException { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(msg.getBytes()); MqttTopic mqttTopic = mqttClient.getTopic(topic); MqttDeliveryToken token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } /** * 向某个主题发布消息 * * @param topic: 发布的主题 * @param msg: 发布的消息 * @param qos: 消息质量 Qos:0、1、2 */ public void pub(String topic, String msg, int qos) throws MqttException { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(msg.getBytes()); MqttTopic mqttTopic = mqttClient.getTopic(topic); MqttDeliveryToken token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } /** * 订阅某一个主题 ,此方法默认的的Qos等级为:1 * * @param topic 主题 */ public void sub(String topic) { try { mqttClient.subscribe(topic); } catch (MqttException e) { LogUtils.e(Tag, "MQTT主题订阅失败:" + e.getMessage()); // uiTip("MQTT主题订阅失败"); } } /** * 订阅某一个主题,可携带Qos * * @param topic 所要订阅的主题 * @param qos 消息质量:0最多发送一次,不保证消息能够到达接收端,也不负责重发 * 1至少发送一次,确保消息能够到达接收端,但可能会导致消息重复 * 2确保消息恰好被接收一次 */ public void sub(String[] topic, int[] qos) { try { mqttClient.subscribe(topic, qos); } catch (MqttException e) { LogUtils.e(Tag, "订阅主题失败:" + e.getMessage()); } } // 原文链接:https://blog.csdn.net/Fyx1987496919/article/details/140516525 }