XMPPFramework源码分析<三> - 流
XMPPStream是XMPPFramework的核心,所有的消息的发送和接收都是在XMPPStream中进行的。借助CocoaAsyncSocket实现socket通信,申明了GCDAsyncSocketDelegate委托。
私有成员
- xmppQueue:线程队列
- xmppQueueTag: 线程队列的标志符,用来找回标注过的线程
- 线程队列:willSendIqQueue、willSendMessageQueue、willSendPresenceQueue、willSendLoginQueue、willReceiveIqQueue、willReceiveMessageQueue、willReceivePresenceQueue、willReceiveLogin、didReceiveIqQueue
- connectTimer:监视某些类型事件的对象
- multicastDelegate:多重委托
- state:枚举XMPPStreamState,表示连接状态
- asyncSocket:GCDAsyncSocket的实例,负责socket的连接
- parser:XMPPParser实例,负责处理接收到的流。
- parserError
- flags
- kP2PInitiator:P2P模式
- kIsSecure:通过SSL/TLS保护连接
- kIsAuthenticated:认证成功
- kDidStartNegotiation:Negotiation已经至少开始了一次
- config
- kP2PMode:XMPPStream以P2P模式初始化
- kResetByteCountPerConnection:字节计数每次连接都会清空
- kEnableBackgroundingOnSocket:socket上设置为VOIP标志
- authenticationDate
- myJID_setByClient:客户端设置的用户JID
- myJID_setByServer:服务器设置的用户JID
- remoteJID:
- rootElement:接收到消息的根元素,包含服务器的配置信息。
- lastSendReceiveTime:最近的socket通信时间
- keepAliveData:心跳数据,可通过
(void)setKeepAliveWhitespaceCharacter:(char)keepAliveChar
来改变 - registeredModules
- autoDelegateDict
- srvResolver:XMPPSRVResolver
- srvResults:
- srvResultsIndex
- receipts
- userTag
属性
在对属性赋值和取值时均要判断当前进程是否处在xmppQueue队列下:
hostName(可选)
建立TCP连接的服务器主机名,如果没有设置,则进行SRV查找
hostPort
xmpp服务器的端口,默认为5222
autoStartTLS
是否开启安全传输层协议(TLS),默认为NO
myJID
用户的JID,当连接时,JID的域名被用来确定xmpp虚拟主机;
在注册和认证时可以利用用户名生成的JID进行通信。
remoteJID(只用于P2P)
keepAliveInterval:传送心跳数据的无连接时间间隔
xmpp支持传送心跳数据,传送的空白字符会被xmpp协议忽略,该数据只有在没有其他数据传送或接收时发送,当其他数据最后一次发送或接收时,开始计时,并在达到间隔时间时传送数据。
keepAliveWhitespaceCharacter
可以设定心跳数据的字符串,只能是空格、Tab、回车
myPresence
myJID最近一次发送的presence元素
numberOfBytesSent/numberOfBytesReceived
通过xmppStream接收或发送的总字节数
resetByteCountPerConnection
是否在新建连接前清空字节统计
tag
可以绑定用户的自定义信息
初始化
- (id)init;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36xmppQueueTag = &xmppQueueTag;
xmppQueue = dispatch_queue_create("xmpp", NULL);
// 向xmppQueue队列设置标识xmppQueueTag,方便进行当前队列的判断
dispatch_queue_set_specific(xmppQueue, xmppQueueTag, xmppQueueTag, NULL);
willSendIqQueue = dispatch_queue_create("xmpp.willSendIq", NULL);
willSendMessageQueue = dispatch_queue_create("xmpp.willSendMessage", NULL);
willSendPresenceQueue = dispatch_queue_create("xmpp.willSendPresence", NULL);
willSendLoginQueue = dispatch_queue_create("xmpp.willSendLogin", NULL);
willReceiveIqQueue = dispatch_queue_create("xmpp.willReceiveIq", NULL);
willReceiveMessageQueue = dispatch_queue_create("xmpp.willReceiveMessage", NULL);
willReceivePresenceQueue = dispatch_queue_create("xmpp.willReceivePresence", NULL);
willReceiveLogin = dispatch_queue_create("xmpp.willReceiveLogin", NULL);
didReceiveIqQueue = dispatch_queue_create("xmpp.didReceiveIq", NULL);
// 初始化多重委托
multicastDelegate = (GCDMulticastDelegate <XMPPStreamDelegate> *)[[GCDMulticastDelegate alloc] init];
state = STATE_XMPP_DISCONNECTED;
flags = 0;
config = 0;
numberOfBytesSent = 0;
numberOfBytesReceived = 0;
hostPort = 5222;
// 2分钟的心跳数据等待发送间隔
keepAliveInterval = DEFAULT_KEEPALIVE_INTERVAL;
keepAliveData = [@" " dataUsingEncoding:NSUTF8StringEncoding];
registeredModules = [[NSMutableArray alloc] init];
autoDelegateDict = [[NSMutableDictionary alloc] init];
receipts = [[NSMutableArray alloc] init];asyncSocket初始化
多重委托相关
1
2
3
4
5
6
7
8
9
10
11
12
13- (void)addDelegate:(id)delegate delegateQueue:(dispatch_queue_t)delegateQueue
{
// Asynchronous operation (if outside xmppQueue)
dispatch_block_t block = ^{
[multicastDelegate addDelegate:delegate delegateQueue:delegateQueue];
};
if (dispatch_get_specific(xmppQueueTag))// 当前队列是xmppQueue
block();
else// 当前队列不是xmppQueue
dispatch_async(xmppQueue, block);
}
连接与断开
- (BOOL)connectWithTimeout:(NSTimeInterval)timeout error:(NSError **)errPtr
通过设定好的hostPort连接hostName:
[self connectToHost:hostName onPort:hostPort withTimeout:XMPPStreamTimeoutNone error:&connectErr]
通知代理:
[multicastDelegate xmppStreamWillConnect:self];
无hostName,则利用用户JID的域名进行服务定位(SRV)资源查找;
有hostName,则与该主机建立TCP连接
[asyncSocket connectToHost:host onPort:port error:errPtr]
设置连接超时的操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28- (void)startConnectTimeout:(NSTimeInterval)timeout
{
XMPPLogTrace();
if (timeout >= 0.0 && !connectTimer)
{
// 创建计时器型dispatch source
connectTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, xmppQueue);
// 设置到时后的block:进行连接超时操作
dispatch_source_set_event_handler(connectTimer, ^{ @autoreleasepool {
// 连接超时操作
[self doConnectTimeout];
}});
dispatch_source_t theConnectTimer = connectTimer;
dispatch_source_set_cancel_handler(connectTimer, ^{
XMPPLogVerbose(@"%@: dispatch_release(connectTimer)", THIS_FILE);
dispatch_release(theConnectTimer);
});
// 初始化计时器
dispatch_time_t tt = dispatch_time(DISPATCH_TIME_NOW, (timeout * NSEC_PER_SEC));
dispatch_source_set_timer(connectTimer, tt, DISPATCH_TIME_FOREVER, 0);
// 启动connectTimer
dispatch_resume(connectTimer);
}
}如果连接成功,将触发
(void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(UInt16)port
代理中连接超时将会关闭。- (BOOL)oldSchoolSecureConnectWithTimeout:(NSTimeInterval)timeout error:(NSError **)errPtr
提供给那些需要提早保护连接的老旧服务器(这类服务器的端口一般为5223)。
- (BOOL)connectP2PWithSocket:(GCDAsyncSocket *)acceptedSocket error:(NSError **)errPtr
当XMPPStream用initP2P方法初始化后,可以使用给定的socket开始P2P连接。
- (void)disconnect
关闭TCP socket连接,由于是同步的,所以会立即发生
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// 通知xmppStreamDelegate委托,用来区分断开的原因是自愿还是连接错误
[multicastDelegate xmppStreamWasToldToDisconnect:self];
// 有主机名
if (state == STATE_XMPP_RESOLVING_SRV)
{// 停止XMPPSRVResolver
[srvResolver stop];
srvResolver = nil;
state = STATE_XMPP_DISCONNECTED;
// 通知xmppStreamDelegate委托,
[multicastDelegate xmppStreamDidDisconnect:self withError:nil];
}
else
{
// 断开socket连接
[asyncSocket disconnect];
// Everthing will be handled in socketDidDisconnect:withError:
}- (void)disconnectWithError:(NSError*)error
由于错误而发生的断开,与4相同,不同之处在于通知delegate错误信息。
安全
secure
- (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(UInt16)port
socket连接成功后触发[self startTLS]
开始之前通知所有的代理(如果包含(xmppStream:willSecureWithSettings:)方法)
[self continueStartTLS:settings]
[asyncSocket startTLS:settings];
socket开始TLS协商会话secure设为YES
开始协商
asyncSocket开始读取数据
- (BOOL)supportsStartTLS()
判断服务器是否可以通过SSL/TLS对连接进行保护
通过读取rootElement的子元素
来实现。 - (BOOL)secureConnection:(NSError **)errPtr
企图通过SSL/TLS来保护连接,该方法是同步的,errPtr用于存放错误信息,
[self sendStartTLSRequest]
服务器收到请求后发送响应
- (void)xmppParser:(XMPPParser *)sender didReadElement:(NSXMLElement *)element
接收到带proceed
元素[self handleStartTLSResponse:element]
[self startTLS]
注册
- (BOOL)supportsInBandRegistration
查看服务器是否支持带内(in-Band)注册
- (BOOL)registerWithElements:(NSArray *)elements error:(NSError **)errPtr;
通过给定的元素进行新用户的注册
建立
type
为set
的IQ元素建立
的NSXMLElement,将元素数组的元素全部加入进去 将
放入IQ 对xmlString通过压缩,编码,生成 outgoingData
[asyncSocket writeData:outgoingData withTimeout:TIMEOUT_XMPP_WRITE tag:TAG_XMPP_WRITE_STREAM]
- (BOOL)registerWithPassword:(NSString *)password error:(NSError **)errPtr;
通过给定的用户名和密码建立一个用户账号
将
username
和password
两个元素添加到elements中并调用2的方法进行注册。
认证
- (BOOL)authenticateWithPassword:(NSString *)inPassword error:(NSError **)errPtr
选择一个最好的认证方法,共四种
[self authenticate:someAuth error:&err]
state = STATE_XMPP_AUTH
[inAuth start:&err]
- (void)sendAuthElement:(NSXMLElement *)element
[asyncSocket writeData:outgoingData withTimeout:TIMEOUT_XMPP_WRITE tag:TAG_XMPP_WRITE_STREAM]
服务器信息
服务器相关信息来自rootElement,每次收到xml消息通过parser进行解析时,都要从头结点开始,就会触发相应的代理。
发送消息
- (void)sendElement:(NSXMLElement *)element
一般情况下iq、message、presence、login都是通过该通用方法进行发送;
如果element属于以上任何一个类型则使用对应的发送方法;如果不是则根据name通过object_setClass转换为对应类型;如果都不是则将element以压缩字符串的形式直接发送。
- (void)sendIQ:(XMPPIQ *)iq withTag:(long)tag
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49// 通知一切感兴趣的代理,为了能够以线程安全的方式改变元素(element),需要连续进行操作
// 获取multicastDelegate的枚举,方便遍历
GCDMulticastDelegateEnumerator *delegateEnumerator = [multicastDelegate delegateEnumerator];
dispatch_async(willSendIqQueue, ^{ @autoreleasepool {
// Allow delegates to modify and/or filter outgoing element
__block XMPPIQ *modifiedIQ = iq;
id del;
dispatch_queue_t dq;
while (modifiedIQ && [delegateEnumerator getNextDelegate:&del delegateQueue:&dq forSelector:selector])
{
{
// 确定代理方法(xmppStream:willSendIQ:)的返回值是否是(XMPPIQ *)
char methodReturnType[32];
Method method = class_getInstanceMethod([del class], selector);
method_getReturnType(method, methodReturnType, sizeof(methodReturnType));
if (strcmp(methodReturnType, @encode(XMPPIQ*)) != 0)
{
NSAssert(NO, @"Method xmppStream:willSendIQ: is no longer void (see XMPPStream.h). "
@"Culprit = %@", NSStringFromClass([del class]));
}
}
dispatch_sync(dq, ^{ @autoreleasepool {
// 提供通过外部delegate改变IQ的机会
modifiedIQ = [del xmppStream:self willSendIQ:modifiedIQ];
}});
}
if (modifiedIQ)
{
dispatch_async(xmppQueue, ^{ @autoreleasepool {
if (state == STATE_XMPP_CONNECTED) {
// 继续IQ的发送:压缩转码后通过[asyncSocket writeData:...]方法发送
[self continueSendIQ:modifiedIQ withTag:tag];
}
}});
}
}});- (void)sendLogin:(XMPPLogin *)login withTag:(long)tag
同2- (void)sendMessage:(XMPPMessage *)message withTag:(long)tag
同2- (void)sendPresence:(XMPPPresence *)presence withTag:(long)tag
同2- (void)injectElement:(NSXMLElement *)element
该方法允许你将一个元素插入到流中,就好像是从socket接收到的消息一样,这创造了一些有趣的可能性。
接收消息
接收消息的过程:
AsyncSocket的代理方法
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
接收到数据
[parser parseData:data]
触发parser的代理方法
- (void)xmppParser:(XMPPParser *)sender didReadRoot:(NSXMLElement *)root
拿到rootElement
- (void)xmppParser:(XMPPParser *)sender didReadElement:(NSXMLElement *)element
解析element,并根据state的状态分发下去:
STATE_XMPP_NEGOTIATING
->[self handleStreamFeatures]
该方法用来分析接收到的服务器的流特征,判断是否已经与服务器建立了连接
根据features检查是否需要TLS,需要则
[self sendStartTLSRequest]
根据features检查是否支持资源绑定,需要则将资源绑定请求加入到iq中发送到服务器
state = STATE_XMPP_CONNECTED
STATE_XMPP_STARTTLS_1
->[self handleStartTLSResponse:element]
如果返回的是”proceed”的元素,则成功,开始TLS协商;如果不成功则断开连接。
STATE_XMPP_REGISTERING
->[self handleRegistration:element]
如果返回的元素的
type
不是”error”,则注册成功。
STATE_XMPP_AUTH
->[self handleAuth:element]
如果认证成功,则再次开始协商。
STATE_XMPP_BINDING)
->[self handleBinding:element]
如果能够拿到JID,说明资源绑定成功,检查是否需要建立对话,需要则开始IM会话,结束。
没有JID则尝试改变resource
通知代理,利用代理方法得到更改后的resource
[self continueHandleBinding:alternativeResource]
有
alternativeResource
则添加到iq中发送没有则让服务器来分配resource(通过发送
type
为”set”的iq)
STATE_XMPP_START_SESSION
->[self handleStartSessionResponse:element]
type
为”result”表示认证成功,回到STATE_XMPP_CONNECTED
状态
iq
->[self receiveIQ:[XMPPIQ iqFromElement:element]]
通知所有的代理,可在解析iq前改变或过滤iq
[self continueReceiveIQ:modifiedIQ]
如果iq需要回复
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 SEL selector = @selector(xmppStream:didReceiveIQ:);
// 用信号量快速控制并发
dispatch_semaphore_t delSemaphore = dispatch_semaphore_create(0);
dispatch_group_t delGroup = dispatch_group_create();
while ([delegateEnumerator getNextDelegate:&del delegateQueue:&dq forSelector:selector])
{
dispatch_group_async(delGroup, dq, ^{ @autoreleasepool {
if ([del xmppStream:self didReceiveIQ:iq])
{
// 有代理方法对iq进行占用则发送一个信号
dispatch_semaphore_signal(delSemaphore);
}
}});
}
dispatch_async(didReceiveIqQueue, ^{ @autoreleasepool {
dispatch_group_wait(delGroup, DISPATCH_TIME_FOREVER);
// Did any of the delegates handle the IQ? (handle == will response)
BOOL handled = (dispatch_semaphore_wait(delSemaphore, DISPATCH_TIME_NOW) == 0);
// 客户端收到<type>为"get"或"set"的IQ请求时必须以<type>为"result"或"error"的iq来回复,并且回复的iq必须保留收到的iq的"id"
// 如果没有代理来处理这个iq则返回带<errro>的iq
if (!handled)
{
// 返回给服务器的错误信息:
//
// <iq to="jid" type="error" id="id">
// <query xmlns="ns"/>
// <error type="cancel" code="501">
// <feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas"/>
// </error>
// </iq>
NSXMLElement *reason = [NSXMLElement elementWithName:@"feature-not-implemented"
xmlns:@"urn:ietf:params:xml:ns:xmpp-stanzas"];
NSXMLElement *error = [NSXMLElement elementWithName:@"error"];
[error addAttributeWithName:@"type" stringValue:@"cancel"];
[error addAttributeWithName:@"code" stringValue:@"501"];
[error addChild:reason];
XMPPIQ *iqResponse = [XMPPIQ iqWithType:@"error"
to:[iq from]
elementID:[iq elementID]
child:error];
// 返回的iq中添加iq的子元素
NSXMLElement *iqChild = [iq childElement];
if (iqChild)
{
NSXMLNode *iqChildCopy = [iqChild copy];
[iqResponse insertChild:iqChildCopy atIndex:0];
}
// Purposefully go through the sendElement: method
// so that it gets dispatched onto the xmppQueue,
// and so that modules may get notified of the outgoing error message.
[self sendElement:iqResponse];
}
dispatch_release(delSemaphore);
dispatch_release(delGroup);
}});
message
->[self receiveMessage:[XMPPMessage messageFromElement:element]]
presence
->[self receivePresence:[XMPPPresence presenceFromElement:element]]
cmcc_client_login
->[self receiveLogin:[XMPPLogin loginFromElement:element]]
流协商
- (void)startNegotiation
用来开始初始的协商操作
[self sendOpeningNegotiation]
初始化XML流通过socket发送<?xml version=’1.0’?>,
初始化或重启parser,从而将self赋值给parser的代理,从而可以在XMPPStream中实现parser的代理方法。
发送
[multicastDelegate xmppStreamDidStartNegotiation:self]
[asyncSocket readDataWithTimeout:TIMEOUT_XMPP_READ_START tag:TAG_XMPP_READ_START]
- (void)startTLS
根据正确的设置,开始TLS协商,在”安全”章节中有提到。
心跳数据
- (void)setupKeepAliveTimer
设置心跳数据计时器
1 | keepAliveTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, xmppQueue); |
代理方法
AsyncSocket
- (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(UInt16)port
[self endConnectTimeout]
取消超时处理[asyncSocket enableBackgroundingOnSocket]
开启socket的后台监听[self startTLS]
[self startNegotiation]
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
[parser parseData:data]
[asyncSocket readDataWithTimeout:TIMEOUT_XMPP_READ_STREAM tag:TAG_XMPP_READ_STREAM]
继续读取xml元素
总结:
从中可以看出,XMPPStream是一个设计严谨,线程安全的消息通讯类型,其风格包括:
- 为了保证线程安全:
1
2
3
4
5
6
7dispatch_block_t block = ^{
// 要在xmpp线程中运行的代码
};
if (dispatch_get_specific(xmppQueueTag))
block();
else
dispatch_sync(xmppQueue, block);
- 如何通知带返回值的代理方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18SEL selector = @selector(对应的代理方法)
if (![multicastDelegate hasDelegateThatRespondsToSelector:selector])
{
// None of the delegates implement the method.
// Use a shortcut.
}
else{
GCDMulticastDelegateEnumerator *delegateEnumerator = [multicastDelegate delegateEnumerator];
dispatch_async(queue, ^{@autoreleasepool {
while ([delegateEnumerator getNextDelegate:&del delegateQueue:&dq forSelector:selector])
{
dispatch_async(dq, ^{@autoreleasepool
{
[del 对应的代理方法];
}});
}
}});
}
后记:
本文只是在看源代码过程中对个人简介的一点梳理,由于能力所限,无论排版还是内容理解都有所欠缺,如有错误还请见谅,该文以后还会改进与更新。
下一篇文章为多重代理的原理与实现,敬请期待。