Files
my_yuyin/QXLive/Config/QXManagerMqtt.m
2025-09-22 18:48:29 +08:00

250 lines
9.6 KiB
Objective-C
Executable File

//
// ManagerMqtt.m
// SoundRiver
//
// Created by apple on 2020/3/19.
//
#import "QXManagerMqtt.h"
@interface QXManagerMqtt()<MQTTSessionManagerDelegate>
@property (nonatomic, strong)NSString *username;
@property (nonatomic, strong)NSString *password;
@property (nonatomic, strong)NSString *cliendId;
//订阅的topic
@property (nonatomic,strong) NSMutableDictionary *subedDict;
@property (nonatomic,assign) BOOL isSSL;
@end
@implementation QXManagerMqtt
+ (instancetype)sharedInstance {
static dispatch_once_t onceToken;
static QXManagerMqtt *sharedManager;
dispatch_once(&onceToken, ^{
sharedManager = [[QXManagerMqtt alloc] init];
[MQTTLog setLogLevel:(DDLogLevelError)];
});
return sharedManager;
}
#pragma mark - 绑定
- (void)bindWithUserName:(NSString *)username password:(NSString *)password cliendId:(NSString *)cliendId isSSL:(BOOL)isSSL{
self.username = username;
self.password = password;
self.cliendId = cliendId;
self.isSSL = isSSL;
[self.mySessionManager connectTo:AddressOfMQTTServer
port:AddressOfMQTTPort
tls:self.isSSL
keepalive:60
clean:YES
auth:YES
user:self.username
pass:self.password
will:NO
willTopic:nil
willMsg:nil
willQos:MQTTQosLevelAtMostOnce
willRetainFlag:NO
withClientId:self.cliendId
securityPolicy:[self customSecurityPolicy]
certificates:nil
protocolLevel:MQTTProtocolVersion311
connectHandler:^(NSError *error) {
NSLog(@"mqtt connect - %@", error);
// if (error == nil) {
// [self subscribeTopic:@"qx_room_topic"];
// }
}];
self.mySessionManager.subscriptions = self.subedDict;
}
- (MQTTSSLSecurityPolicy *)customSecurityPolicy
{
MQTTSSLSecurityPolicy *securityPolicy = [MQTTSSLSecurityPolicy policyWithPinningMode:MQTTSSLPinningModeNone];
securityPolicy.allowInvalidCertificates = YES;
securityPolicy.validatesCertificateChain = YES;
securityPolicy.validatesDomainName = NO;
return securityPolicy;
}
#pragma mark ---- 状态
- (void)sessionManager:(MQTTSessionManager *)sessionManager didChangeState:(MQTTSessionManagerState)newState {
switch (newState) {
case MQTTSessionManagerStateConnected:
QXLOG(@"eventCode -- 连接成功");
[self subscribeTopic:qx_room_topic];
[self subscribeTopic:qx_ac_topic];
[self reConnectForStateError];
break;
case MQTTSessionManagerStateConnecting:
QXLOG(@"eventCode -- 连接中");
break;
case MQTTSessionManagerStateClosed:
QXLOG(@"eventCode -- 连接被关闭");
break;
case MQTTSessionManagerStateError:
QXLOG(@"eventCode -- 连接错误");
break;
case MQTTSessionManagerStateClosing:
QXLOG(@"eventCode -- 关闭中");
break;
case MQTTSessionManagerStateStarting:
QXLOG(@"eventCode -- 连接开始");
break;
default:
break;
}
}
- (void)sessionManager:(MQTTSessionManager *)sessionManager didReceiveMessage:(NSData *)data onTopic:(NSString *)topic retained:(BOOL)retained{
NSString *localPassword = [[NSUserDefaults standardUserDefaults] objectForKey:kChirldLocalPassword];
if ([localPassword isExist]) {
return;
}
NSError *error;
NSString *dataStr = [[NSString alloc]initWithData:data encoding:NSUTF8StringEncoding];
dataStr = [dataStr stringByReplacingOccurrencesOfString:@"\\u" withString:@"u_u"];
dataStr = [dataStr stringByReplacingOccurrencesOfString:@"\\U" withString:@"U_U"];
dataStr=[dataStr stringByReplacingOccurrencesOfString:@"\\" withString:@""];
dataStr=[dataStr stringByReplacingOccurrencesOfString:@"\\" withString:@""];
dataStr=[dataStr stringByReplacingOccurrencesOfString:@"\\" withString:@""];
dataStr=[dataStr stringByReplacingOccurrencesOfString:@"\\" withString:@""];
dataStr=[dataStr stringByReplacingOccurrencesOfString:@"\"\"{" withString:@"\"{"];
dataStr=[dataStr stringByReplacingOccurrencesOfString:@"}\"\"" withString:@"}\""];
dataStr=[dataStr stringByReplacingOccurrencesOfString:@"\"{" withString:@"{"];
dataStr=[dataStr stringByReplacingOccurrencesOfString:@"}\"" withString:@"}"];
dataStr = [dataStr stringByReplacingOccurrencesOfString:@"U_U" withString:@"\\U"];
dataStr = [dataStr stringByReplacingOccurrencesOfString:@"u_u" withString:@"\\u"];
//dataStr = [self replaceUnicode:dataStr];
if(dataStr != nil){
QXLOG(@"收到消息%@",dataStr);
NSDictionary *dict = [NSJSONSerialization JSONObjectWithData:[dataStr dataUsingEncoding:NSUTF8StringEncoding] options:NSJSONReadingMutableContainers error:&error];
QXLOG(@"收到消息%@",dict);
if (dict != nil && self.delegate && [self.delegate respondsToSelector:@selector(socketManager:receivedMessage:topic:)]) {
dispatch_async(dispatch_get_main_queue(), ^{
[self.delegate socketManager:@"收到消息" receivedMessage:dict topic:topic];
});
}
}
}
-(NSString *)replaceUnicode:(NSString *)unicodeStr {
NSString *tempStr1 = [unicodeStr stringByReplacingOccurrencesOfString:@"\\u" withString:@"\\U"];
// NSString *tempStr2 = [tempStr1 stringByReplacingOccurrencesOfString:@"\"" withString:@"\\\""];
// NSString *tempStr3 = [[@"\"" stringByAppendingString:tempStr2] stringByAppendingString:@"\""];
NSData *tempData = [tempStr1 dataUsingEncoding:NSUTF8StringEncoding];
NSString* returnStr = [NSPropertyListSerialization propertyListFromData:tempData
mutabilityOption:NSPropertyListImmutable
format:NULL
errorDescription:NULL];
// NSString* returnStr = [[NSString alloc]initWithData:tempData encoding:NSUTF8StringEncoding];
//NSLog(@"Output = %@", returnStr);
return [returnStr stringByReplacingOccurrencesOfString:@"\\r\\n" withString:@"\n"];
}
#pragma mark - 订阅
//- (void)subscribeTopic:(NSString *)topic handler:(SubscribeTopicHandler)handler{
- (void)subscribeTopic:(NSString *)topic{
QXLOG(@"当前需要订阅-------- topic = %@",topic);
if (![self.subedDict.allKeys containsObject:topic]) {
[self.subedDict setObject:[NSNumber numberWithLong:MQTTQosLevelAtLeastOnce] forKey:topic];
QXLOG(@"订阅字典 ----------- = %@",self.subedDict);
self.mySessionManager.subscriptions = self.subedDict;
}else {
QXLOG(@"已经存在,不用订阅");
}
}
#pragma mark - 取消订阅
- (void)unsubscribeTopic:(NSString *)topic {
QXLOG(@"当前需要取消订阅-------- topic = %@",topic);
if ([self.subedDict.allKeys containsObject:topic]) {
[self.subedDict removeObjectForKey:topic];
QXLOG(@"更新之后的订阅字典 ----------- = %@",self.subedDict);
self.mySessionManager.subscriptions = self.subedDict;
}
else {
QXLOG(@"不存在,无需取消");
}
}
#pragma mark - 发布消息
- (void)sendDataToTopic:(NSString *)topic dict:(NSDictionary *)dict {
QXLOG(@"发送命令 topic = %@ dict = %@",topic,dict);
// [dict setValue:[QPCommonTool dateWithNowSp] forKey:@"time"];
NSData *data = [NSJSONSerialization dataWithJSONObject:dict options:0 error:nil];
[self.mySessionManager sendData:data topic:topic qos:MQTTQosLevelAtLeastOnce retain:NO];
}
#pragma mark MQTTSessionManagerDelegate
- (void)handleMessage:(NSData *)data onTopic:(NSString *)topic retained:(BOOL)retained {
if (self.delegate && [self.delegate respondsToSelector:@selector(MQTTClientModel_handleMessage:onTopic:retained:)]) {
[self.delegate MQTTClientModel_handleMessage:data onTopic:topic retained:retained];
}
}
- (void)disconnect {
[self.mySessionManager disconnectWithDisconnectHandler:^(NSError *error) {
QXLOG(@"断开连接 error = %@",[error description]);
}];
[self.mySessionManager setDelegate:nil];
[self.subedDict removeAllObjects];
_mySessionManager = nil;
}
- (void)reConnect {
if (self.mySessionManager && self.mySessionManager.port) {
if (self.mySessionManager.state == MQTTSessionManagerStateConnected) {
return;
}
self.mySessionManager.delegate = self;
[self.mySessionManager connectToLast:^(NSError *error) {
QXLOG(@"重新连接 error = %@",[error description]);
}];
self.mySessionManager.subscriptions = self.subedDict;
}else if ([self.username isExist] && [self.password isExist] && [self.cliendId isExist]) {
[self bindWithUserName:self.username password:self.password cliendId:self.cliendId isSSL:self.isSSL];
}
}
//连接错误之后又连接成功之后
- (void)reConnectForStateError{
}
#pragma mark - 懒加载
- (MQTTSessionManager *)mySessionManager {
if (!_mySessionManager) {
_mySessionManager = [[MQTTSessionManager alloc]init];
_mySessionManager.delegate = self;
}
return _mySessionManager;
}
- (NSMutableDictionary *)subedDict {
if (!_subedDict) {
_subedDict = [NSMutableDictionary dictionary];
}
return _subedDict;
}
@end