package com.xscm.moduleutil.service;

import android.content.Context;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import android.widget.Toast;

import com.blankj.utilcode.util.LogUtils;
import com.google.android.gms.common.api.Api;
import com.hjq.toast.ToastUtils;
import com.xscm.moduleutil.utils.logger.DataLogger;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

/**
 * Wrapper around a single, process-wide Eclipse Paho {@link MqttClient}.
 *
 * <p>The underlying client is held in a static field, so every instance of this class (and the
 * static {@link #pub(String, String)}) operates on the same connection. The class performs no
 * synchronization on that field apart from {@link #getInstance}.
 *
 * <p>NOTE(review): the {@link Context} passed in is retained for the lifetime of the singleton and
 * handed to {@code MqttInitCallback}; callers should presumably pass an application context to
 * avoid leaking an Activity - confirm against what the callback needs.
 */
public class MqttConnect {
    private String HOST;
    private String Tag = "MQTT";
    private String clientId = "";
    private static MqttClient mqttClient = null;
    private Context context;

    // Topics to subscribe to. They are assigned their real names in the constructor.
    public static String shutdown = "";
    public static String update_app = "";
    public static String qx_hour_ranking = "";
    public static String qx_redpacket_arrive = ""; // topic for the floating red-packet banner

    Handler handler = new Handler(Looper.getMainLooper());
    String[] topic;
    // One QoS value per entry of `topic`; filled in by the constructor.
    int[] qos;

    private static MqttConnect instance;

    /**
     * Stores the connection parameters and assigns the topic names. Does not connect; call
     * {@link #mqttClient()} for that.
     *
     * @param context  context forwarded to the connection callback
     * @param host     broker URI handed to {@link MqttClient}
     * @param clientId MQTT client identifier
     */
    public MqttConnect(Context context, String host, String clientId) {
        this.HOST = host;
        this.context = context;
        this.clientId = clientId;

        // The topics this client subscribes to.
        shutdown = "qx_room_topic";   // shutdown
        update_app = "qx_xunlehui";   // app-update notifications
        qx_hour_ranking = "qx_hour_ranking";
        qx_redpacket_arrive = "qx_redpacket_arrive";

        topic = new String[] {shutdown, update_app, qx_hour_ranking, qx_redpacket_arrive};

        // Keep qos the same length as topic (Paho rejects mismatched arrays) and use QoS 1,
        // which is also what the single-topic subscribe(String) defaults to.
        qos = new int[topic.length];
        for (int i = 0; i < qos.length; i++) {
            qos[i] = 1;
        }
    }

    /**
     * Returns the singleton, creating it on the first call. The arguments are only used by that
     * first call; later calls return the existing instance and ignore them.
     *
     * @param context  context forwarded to the connection callback
     * @param host     broker URI
     * @param clientId MQTT client identifier
     * @return the shared instance
     */
    public static synchronized MqttConnect getInstance(Context context, String host, String clientId) {
        if (instance == null) {
            instance = new MqttConnect(context, host, clientId);
        }
        return instance;
    }

    /**
     * Creates a new client, connects it to the broker and subscribes to the four topics.
     *
     * <p>If creating or connecting the client fails, the failed client is released and another
     * attempt is scheduled three seconds later.
     *
     * <p>NOTE(review): retries are posted to a main-looper {@link Handler}, so a retry runs this
     * method - including the blocking {@code connect} call - on the main thread.
     */
    public void mqttClient() {
        MqttClient attempt = null;
        try {
            MqttConnectOptions options = mqttConnectOptions();
            attempt = mqttClient;
            attempt.setCallback(new MqttInitCallback(context, HOST, clientId));
            attempt.connect(options);
            sub(shutdown);
            sub(update_app);
            sub(qx_hour_ranking);
            sub(qx_redpacket_arrive);
        } catch (MqttException e) {
            Log.e(Tag, "MQTT连接失败,准备重连。。。:" + e.getMessage(), e);
            // Release the client created by this attempt; the retry builds a fresh one, and
            // without this every failed attempt would leave an unclosed client behind.
            if (attempt != null) {
                if (mqttClient == attempt) {
                    mqttClient = null;
                }
                releaseClient(attempt);
            }
            handler.postDelayed(new Runnable() {
                @Override
                public void run() {
                    Log.e(Tag, "开始重连。。。");
                    mqttClient();
                }
            }, 3000);
        }
    }

    /**
     * Logs a message and shows it as a toast from the main thread.
     *
     * @param msg text to log and display
     */
    private void uiTip(final String msg) {
        Log.d(Tag, msg);
        handler.post(new Runnable() {
            @Override
            public void run() {
                // Log the actual message rather than a fixed "connected" text, since this
                // helper is meant for failure messages as well.
                LogUtils.e("mqtt", msg);
                ToastUtils.show(msg);
            }
        });
    }

    /**
     * Creates the shared {@link MqttClient} (in-memory persistence) and returns the options to
     * connect it with.
     *
     * @return connect options: user name {@code "public"}, clean session, 10 s connection
     *         timeout, 10 s keep-alive
     * @throws MqttException if the client cannot be created
     */
    private MqttConnectOptions mqttConnectOptions() throws MqttException {
        mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("public");
        options.setConnectionTimeout(10);
        options.setCleanSession(true);
        options.setKeepAliveInterval(10);
        return options;
    }

    /**
     * Disconnects (if connected) and closes the shared client, then clears the static reference.
     * Errors are logged, not thrown.
     */
    public void close() {
        MqttClient client = mqttClient;
        if (client == null || !client.isConnected()) {
            Log.d(Tag, "Mqtt已关闭");
        }
        if (client != null) {
            mqttClient = null;
            releaseClient(client);
        }
    }

    /**
     * Disconnects the given client if it is connected and then closes it, logging any failure.
     * The order matters: Paho refuses to close a client that is still connected.
     */
    private void releaseClient(MqttClient client) {
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException e) {
            Log.e(Tag, "关闭MQTT连接报错:" + e.getMessage(), e);
        }
        try {
            client.close();
        } catch (MqttException e) {
            Log.e(Tag, "关闭MQTT连接报错:" + e.getMessage(), e);
        }
    }

    /**
     * Publishes a message to a topic with QoS 1 and waits for delivery to complete.
     *
     * @param topic topic to publish to
     * @param msg   message text, sent as UTF-8 bytes
     * @throws MqttException if there is no client, or publishing fails
     */
    public static void pub(String topic, String msg) throws MqttException {
        publish(topic, msg, 1);
    }

    /**
     * Publishes a message to a topic and waits for delivery to complete.
     *
     * @param topic topic to publish to
     * @param msg   message text, sent as UTF-8 bytes
     * @param qos   quality of service: 0, 1 or 2
     * @throws MqttException if there is no client, or publishing fails
     */
    public void pub(String topic, String msg, int qos) throws MqttException {
        publish(topic, msg, qos);
    }

    /** Shared implementation of both {@code pub} overloads. */
    private static void publish(String topic, String msg, int qos) throws MqttException {
        MqttClient client = mqttClient;
        if (client == null) {
            // No client has been created yet (or it was closed): report it as an MQTT error
            // that callers already handle instead of a NullPointerException.
            throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
        }
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(msg.getBytes(StandardCharsets.UTF_8));
        MqttTopic mqttTopic = client.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * Subscribes to a single topic using the client's default QoS. Failures are logged, not
     * thrown.
     *
     * @param topic topic to subscribe to
     */
    public void sub(String topic) {
        MqttClient client = mqttClient;
        if (client == null) {
            Log.e(Tag, "MQTT主题订阅失败:客户端未连接");
            return;
        }
        try {
            client.subscribe(topic);
        } catch (MqttException e) {
            Log.e(Tag, "MQTT主题订阅失败:" + e.getMessage(), e);
        }
    }

    /**
     * Subscribes to several topics, each with its own QoS. Failures are logged, not thrown.
     *
     * @param topic topics to subscribe to
     * @param qos   QoS per topic, same length as {@code topic}:
     *              0 = at most once (delivery not guaranteed, no retransmission),
     *              1 = at least once (guaranteed delivery, duplicates possible),
     *              2 = exactly once
     */
    public void sub(String[] topic, int[] qos) {
        MqttClient client = mqttClient;
        if (client == null) {
            Log.e(Tag, "订阅主题失败:客户端未连接");
            return;
        }
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            Log.e(Tag, "订阅主题失败:" + e.getMessage(), e);
        }
    }
    // Original source: https://blog.csdn.net/Fyx1987496919/article/details/140516525
}