package com.qxcm.moduleutil.service;

import android.annotation.SuppressLint;
import android.app.Service;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.os.IBinder;
import android.util.Log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.blankj.utilcode.util.NetworkUtils;
import com.blankj.utilcode.util.ThreadUtils;
import com.blankj.utilcode.util.ToastUtils;
import com.orhanobut.logger.Logger;
import com.qxcm.moduleutil.base.CommonAppContext;
import com.qxcm.moduleutil.bean.UserBean;
import com.qxcm.moduleutil.event.BossMsgEvent;
import com.qxcm.moduleutil.event.QDZMqttEvent;
import com.qxcm.moduleutil.utils.GsonUtils;
import com.qxcm.moduleutil.utils.SpUtil;
import com.tencent.qcloud.tuicore.TUIConstants;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.greenrobot.eventbus.EventBus;

import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Android service that owns a single MQTT connection, subscribes to the
 * application topics once connected, and republishes incoming messages on
 * the EventBus.
 *
 * <p>A single-threaded scheduler checks the connection every
 * {@link #RECONNECT_CHECK_INTERVAL_SECONDS} seconds and reconnects only when
 * the client is disconnected and the network is available.
 */
public class EMqttService extends Service implements NetworkUtils.OnNetworkStatusChangedListener {

    private static final String TAG = "EMQTT消息";
    private static final int QOS = 2;
    private static final String HOST = "tcp://81.70.45.221";

    private static final String TOPIC_BOSS = "boss";
    private static final String TOPIC_ROOM = "qx_room_topic";
    private static final String TOPIC_QIANG_TANG_GUO = "red_envelope_single_room_real_time_data"; // candy grab

    /** Delay between two periodic connection checks. */
    private static final long RECONNECT_CHECK_INTERVAL_SECONDS = 10;
    /** Delay before retrying a failed connect or subscribe. */
    private static final long RETRY_DELAY_MILLIS = 1000;

    // Read from static helpers and from several callback threads, hence volatile.
    @SuppressLint("StaticFieldLeak")
    private static volatile MqttAndroidClient mqttAndroidClient;

    public static String sRoomId = "qx_room_topic";
    public static String sUserId;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private MqttConnectOptions mMqttConnectOptions;
    private BroadcastReceiver unreadCountReceiver;
    private String clientId = "";

    /**
     * Periodic/retry connection task. Exceptions are caught here because a
     * task that throws under {@code scheduleWithFixedDelay} is never run
     * again, which would silently disable reconnection.
     */
    private final Runnable connectTask = () -> {
        try {
            doClientConnection();
        } catch (Exception e) {
            Log.e(TAG, "MQTT connection attempt failed", e);
        }
    };

    @Override
    public void onCreate() {
        super.onCreate();
        Logger.t(TAG).e("服务创建成功");
        NetworkUtils.registerNetworkStatusChangedListener(this);
        scheduledExecutorService.scheduleWithFixedDelay(
                connectTask, 0, RECONNECT_CHECK_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        return super.onStartCommand(intent, flags, startId);
    }

    /** This service does not support binding. */
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    /**
     * Subscribes to the room topic.
     *
     * @param roomId currently ignored: every room shares the single topic
     *               {@code qx_room_topic}
     */
    public static void subscribeRoom(String roomId) {
        subscribe(TOPIC_ROOM);
    }

    /**
     * Unsubscribes from the room topic.
     *
     * @param roomId currently ignored, see {@link #subscribeRoom(String)}
     */
    public static void cleanSubscribeRoom(String roomId) {
        cleanSubscribe(TOPIC_ROOM);
    }

    /**
     * Remembers {@code userId} and subscribes to its topic {@code user_<userId>}.
     *
     * @param userId id of the user whose topic should be subscribed
     */
    public static void subscribeUser(String userId) {
        sUserId = userId;
        subscribe("user_" + userId);
    }

    /** Unsubscribes from the remembered user topic, if any, and forgets the user id. */
    public static void cleanSubscribeUser() {
        String userId = sUserId;
        // Without this guard the literal topic "user_null" would be unsubscribed.
        if (userId != null) {
            cleanSubscribe("user_" + userId);
        }
        sUserId = null;
    }

    /**
     * Subscribes to {@code topic}. Does nothing when the client is not
     * connected. A failed subscription is retried after
     * {@link #RETRY_DELAY_MILLIS} ms.
     *
     * @param topic topic to subscribe to
     */
    public static void subscribe(String topic) {
        // Snapshot the static field so a concurrent release cannot null it mid-call.
        MqttAndroidClient client = mqttAndroidClient;
        try {
            if (client == null || !client.isConnected()) {
                return;
            }
            // Pass the listener with the request so the result cannot be
            // delivered before a listener is attached.
            client.subscribe(topic, QOS, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Logger.t(TAG).e("主题订阅成功:" + topic);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Logger.t(TAG).e("主题订阅失败");
                    ThreadUtils.runOnUiThreadDelayed(() -> subscribe(topic), RETRY_DELAY_MILLIS);
                }
            });
        } catch (Exception e) {
            Log.e(TAG, "subscribe failed for topic " + topic, e);
        }
    }

    /**
     * Unsubscribes from {@code topic}. Does nothing when the client is not
     * connected.
     *
     * @param topic topic to unsubscribe from
     */
    public static void cleanSubscribe(String topic) {
        MqttAndroidClient client = mqttAndroidClient;
        try {
            if (client == null || !client.isConnected()) {
                return;
            }
            client.unsubscribe(topic, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Logger.t(TAG).e("取消主题成功");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Logger.t(TAG).e("取消主题失败");
                }
            });
        } catch (Exception e) {
            Log.e(TAG, "unsubscribe failed for topic " + topic, e);
        }
    }

    /**
     * Creates the client if needed and starts an asynchronous connect.
     * Skipped when no user is available, because the client id is derived
     * from the user id.
     */
    private void init() {
        UserBean user = CommonAppContext.getInstance().getUser();
        if (user == null) {
            Log.i(TAG, "no user available, skipping MQTT connect");
            return;
        }

        MqttAndroidClient client = mqttAndroidClient;
        if (client == null) {
            clientId = "android-" + user.getUser_id() + "-" + MqttClient.generateClientId();
            client = new MqttAndroidClient(this, HOST, clientId);
            mqttAndroidClient = client;
        }

        mMqttConnectOptions = new MqttConnectOptions();
        mMqttConnectOptions.setCleanSession(true);
        mMqttConnectOptions.setConnectionTimeout(10);
        mMqttConnectOptions.setKeepAliveInterval(60);
        mMqttConnectOptions.setUserName("public");
        mMqttConnectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

        // Install the callback before connecting so no early event is missed.
        client.setCallback(mqttCallback);
        try {
            client.connect(mMqttConnectOptions, null, iMqttActionListener);
        } catch (MqttException e) {
            Log.e(TAG, "MQTT connect request failed", e);
        }
    }

    /**
     * Reconnects when, and only when, the client is disconnected, the network
     * is available and the service has not been destroyed.
     */
    private synchronized void doClientConnection() {
        if (scheduledExecutorService.isShutdown()) {
            // onDestroy() already ran; a late retry must not resurrect the client.
            return;
        }
        Logger.t(TAG).e("doClientConnection");
        if (isAlreadyConnected() || !isConnectIsNomarl()) {
            return;
        }
        releaseMQTT();
        init();
    }

    /**
     * @return {@code true} when a client exists and reports being connected
     */
    public static boolean isAlreadyConnected() {
        MqttAndroidClient client = mqttAndroidClient;
        return client != null && client.isConnected();
    }

    /**
     * @return {@code true} when the device currently has a network connection
     */
    private boolean isConnectIsNomarl() {
        if (NetworkUtils.isConnected()) {
            return true;
        }
        Log.i(TAG, "没有可用网络");
        return false;
    }

    /** Listener for the result of the connect request. */
    private final IMqttActionListener iMqttActionListener = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken arg0) {
            Logger.t(TAG).e("链接状态成功");
            EventBus.getDefault().post(EmqttState.CONNECTED);
            if (sRoomId != null) {
                subscribeRoom(sRoomId);
            }
            subscribe(TOPIC_BOSS);
            subscribe(TOPIC_ROOM);
            subscribe(TOPIC_QIANG_TANG_GUO);
            UserBean user = CommonAppContext.getInstance().getUser();
            if (user != null) {
                subscribeUser(user.getUser_id() + "");
            }
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            // Log.e tolerates a null throwable.
            Log.e(TAG, "MQTT connect failed", arg1);
            if (arg1 instanceof MqttException) {
                Logger.t(TAG).e("链接状态失败:" + arg1.getMessage());
            }
            ThreadUtils.runOnUiThreadDelayed(connectTask, RETRY_DELAY_MILLIS);
        }
    };

    /** Callback for connection loss and incoming messages. */
    private final MqttCallback mqttCallback = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            Logger.t(TAG).e("链接断开,请检查网络");
            doClientConnection();
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) {
            String payload = message.toString();
            Logger.t(TAG).e("收到的消息 主题:" + topic + " 内容:" + payload);
            if (TOPIC_BOSS.equals(topic)) {
                if (EventBus.getDefault().hasSubscriberForEvent(BossMsgEvent.class)) {
                    EventBus.getDefault().post(new BossMsgEvent(payload));
                }
                return;
            }
            if (TOPIC_QIANG_TANG_GUO.equals(topic)) {
                if (EventBus.getDefault().hasSubscriberForEvent(QDZMqttEvent.class)) {
                    EventBus.getDefault().post(new QDZMqttEvent(payload));
                }
                return;
            }
            receiveMessage(topic, payload);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // Publish-complete callback; nothing to do.
        }
    };

    @Override
    public void onDestroy() {
        // Stop the periodic task first so it cannot reconnect during teardown.
        scheduledExecutorService.shutdownNow();
        try {
            NetworkUtils.unregisterNetworkStatusChangedListener(this);
            cleanSubscribeRoom(sRoomId);
            cleanSubscribeUser();
            cleanSubscribe(TOPIC_BOSS);
            cleanSubscribe(TOPIC_ROOM);
            cleanSubscribe(TOPIC_QIANG_TANG_GUO);
        } catch (Exception e) {
            Log.e(TAG, "error while tearing down MQTT subscriptions", e);
        } finally {
            // Always release the client, even if an earlier step failed.
            releaseMQTT();
        }
        super.onDestroy();
    }

    /**
     * Disconnects the current client, if any, and releases its resources.
     * The static client reference is cleared in every case.
     */
    public synchronized void releaseMQTT() {
        MqttAndroidClient client = mqttAndroidClient;
        mqttAndroidClient = null;
        if (client == null) {
            return;
        }
        // Detach the callback so this deliberate disconnect is not reported
        // as a lost connection.
        client.setCallback(null);
        try {
            client.disconnect();
        } catch (Exception e) {
            Log.e(TAG, "MQTT disconnect failed", e);
        }
        // Guarded separately so a failed disconnect cannot skip the cleanup.
        try {
            client.unregisterResources();
        } catch (Exception e) {
            Log.e(TAG, "MQTT unregisterResources failed", e);
        }
    }

    /**
     * Resolves the ping sender of the given client through reflection.
     *
     * <p>NOTE(review): the ping sender is only looked up, never stopped, and
     * this method is not called from anywhere in this class.
     *
     * @param client client whose internal ping sender is looked up
     */
    private void stopAlarmPingSender(MqttAndroidClient client) {
        try {
            Field field = MqttAndroidClient.class.getDeclaredField("client");
            field.setAccessible(true);
            Object internalClient = field.get(client);
            Field pingSenderField = internalClient.getClass().getDeclaredField("pingSender");
            pingSenderField.setAccessible(true);
            Object pingSender = pingSenderField.get(internalClient);
            // TODO: stop pingSender once the concrete type is accessible.
        } catch (Exception e) {
            Log.e(TAG, "unable to resolve MQTT ping sender", e);
        }
    }

    /**
     * Handles a message from any topic that has no dedicated event. Malformed
     * or empty payloads are logged and dropped: an exception escaping
     * {@code messageArrived} would make the MQTT client drop the connection.
     *
     * @param topic topic the message arrived on
     * @param data  message payload, expected to be a JSON object
     */
    private void receiveMessage(String topic, String data) {
        int type;
        try {
            JSONObject jsonObject = JSON.parseObject(data);
            if (jsonObject == null) {
                // fastjson returns null for empty or "null" payloads.
                return;
            }
            type = jsonObject.getIntValue("type");
        } catch (Exception e) {
            Log.e(TAG, "unable to parse MQTT payload", e);
            return;
        }

        if (TOPIC_ROOM.equals(topic)) {
            if (type == 10000) {
                // Server-wide red envelope.
            }
        }
        // TODO: dispatch on type according to the business logic.
    }

    /** Parses {@code message} as {@code tClass} with fastjson and posts it on the EventBus. */
    private <T> void sendEvent(String message, Class<T> tClass) {
        T event = JSON.parseObject(message, tClass);
        if (event != null) {
            EventBus.getDefault().post(event);
        }
    }

    /** Parses {@code message} as a list of {@code tClass} with fastjson and posts the list. */
    private <T> void sendEventList(String message, Class<T> tClass) {
        List<T> list = JSON.parseArray(message, tClass);
        if (list != null) {
            EventBus.getDefault().post(list);
        }
    }

    /** Parses {@code message} as {@code tClass} with GsonUtils and posts it on the EventBus. */
    private <T> void sendKtEvent(String message, Class<T> tClass) {
        Object event = GsonUtils.GsonToBean(message, tClass);
        if (event != null) {
            EventBus.getDefault().post(event);
        }
    }

    @Override
    public void onConnected(NetworkUtils.NetworkType networkType) {
        doClientConnection();
    }

    @Override
    public void onDisconnected() {
        // Optional: clean up after the network goes away.
    }

    /** Connection state posted on the EventBus. */
    public enum EmqttState {
        CONNECTED,
        DISCONNECTED
    }
}