XMPPStream是XMPPFramework的核心,所有的消息的发送和接收都是在XMPPStream中进行的。借助CocoaAsyncSocket实现socket通信,申明了GCDAsyncSocketDelegate委托。

私有成员

  1. xmppQueue:线程队列
  2. xmppQueueTag: 线程队列的标志符,用来找回标注过的线程
  3. 线程队列:willSendIqQueue、willSendMessageQueue、willSendPresenceQueue、willSendLoginQueue、willReceiveIqQueue、willReceiveMessageQueue、willReceivePresenceQueue、willReceiveLogin、didReceiveIqQueue
  4. connectTimer:监视某些类型事件的对象
  5. multicastDelegate:多重委托
  6. state:枚举XMPPStreamState,表示连接状态
  7. asyncSocket:GCDAsyncSocket的实例,负责socket的连接
  8. parser:XMPPParser实例,负责处理接收到的流。
  9. parserError
  10. flags
    • kP2PInitiator:P2P模式
    • kIsSecure:通过SSL/TLS保护连接
    • kIsAuthenticated:认证成功
    • kDidStartNegotiation:Negotiation已经至少开始了一次
  11. config
    • kP2PMode:XMPPStream以P2P模式初始化
    • kResetByteCountPerConnection:字节计数每次连接都会清空
    • kEnableBackgroundingOnSocket:socket上设置为VOIP标志
  12. authenticationDate
  13. myJID_setByClient:客户端设置的用户JID
  14. myJID_setByServer:服务器设置的用户JID
  15. remoteJID:
  16. rootElement:接收到消息的根元素,包含服务器的配置信息。
  17. lastSendReceiveTime:最近的socket通信时间
  18. keepAliveData:心跳数据,可通过(void)setKeepAliveWhitespaceCharacter:(char)keepAliveChar来改变
  19. registeredModules
  20. autoDelegateDict
  21. srvResolver:XMPPSRVResolver
  22. srvResults:
  23. srvResultsIndex
  24. receipts
  25. userTag

属性

在对属性赋值和取值时均要判断当前进程是否处在xmppQueue队列下:

  1. hostName(可选)

    建立TCP连接的服务器主机名,如果没有设置,则进行SRV查找

  2. hostPort

    xmpp服务器的端口,默认为5222

  3. autoStartTLS

    是否开启安全传输层协议(TLS),默认为NO

  4. myJID

    用户的JID,当连接时,JID的域名被用来确定xmpp虚拟主机;

    在注册和认证时可以利用用户名生成的JID进行通信。

  5. remoteJID(只用于P2P)

  6. keepAliveInterval:传送心跳数据的无连接时间间隔

    xmpp支持传送心跳数据,传送的空白字符会被xmpp协议忽略,该数据只有在没有其他数据传送或接收时发送,当其他数据最后一次发送或接收时,开始计时,并在达到间隔时间时传送数据。

  7. keepAliveWhitespaceCharacter

    可以设定心跳数据的字符串,只能是空格、Tab、回车

  8. myPresence

    myJID最近一次发送的presence元素

  9. numberOfBytesSent/numberOfBytesReceived

    通过xmppStream接收或发送的总字节数

  10. resetByteCountPerConnection

    是否在新建连接前清空字节统计

  11. tag

    可以绑定用户的自定义信息

初始化

  1. - (id)init;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    xmppQueueTag = &xmppQueueTag;
    xmppQueue = dispatch_queue_create("xmpp", NULL);
    // 向xmppQueue队列设置标识xmppQueueTag,方便进行当前队列的判断
    dispatch_queue_set_specific(xmppQueue, xmppQueueTag, xmppQueueTag, NULL);

    willSendIqQueue = dispatch_queue_create("xmpp.willSendIq", NULL);
    willSendMessageQueue = dispatch_queue_create("xmpp.willSendMessage", NULL);
    willSendPresenceQueue = dispatch_queue_create("xmpp.willSendPresence", NULL);
    willSendLoginQueue = dispatch_queue_create("xmpp.willSendLogin", NULL);

    willReceiveIqQueue = dispatch_queue_create("xmpp.willReceiveIq", NULL);
    willReceiveMessageQueue = dispatch_queue_create("xmpp.willReceiveMessage", NULL);
    willReceivePresenceQueue = dispatch_queue_create("xmpp.willReceivePresence", NULL);
    willReceiveLogin = dispatch_queue_create("xmpp.willReceiveLogin", NULL);

    didReceiveIqQueue = dispatch_queue_create("xmpp.didReceiveIq", NULL);
    // 初始化多重委托
    multicastDelegate = (GCDMulticastDelegate <XMPPStreamDelegate> *)[[GCDMulticastDelegate alloc] init];

    state = STATE_XMPP_DISCONNECTED;

    flags = 0;
    config = 0;

    numberOfBytesSent = 0;
    numberOfBytesReceived = 0;

    hostPort = 5222;
    // 2分钟的心跳数据等待发送间隔
    keepAliveInterval = DEFAULT_KEEPALIVE_INTERVAL;
    keepAliveData = [@" " dataUsingEncoding:NSUTF8StringEncoding];

    registeredModules = [[NSMutableArray alloc] init];
    autoDelegateDict = [[NSMutableDictionary alloc] init];

    receipts = [[NSMutableArray alloc] init];

    asyncSocket初始化

  2. 多重委托相关

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    - (void)addDelegate:(id)delegate delegateQueue:(dispatch_queue_t)delegateQueue
    {
    // Asynchronous operation (if outside xmppQueue)

    dispatch_block_t block = ^{
    [multicastDelegate addDelegate:delegate delegateQueue:delegateQueue];
    };

    if (dispatch_get_specific(xmppQueueTag))// 当前队列是xmppQueue
    block();
    else// 当前队列不是xmppQueue
    dispatch_async(xmppQueue, block);
    }

连接与断开

  1. - (BOOL)connectWithTimeout:(NSTimeInterval)timeout error:(NSError **)errPtr

    通过设定好的hostPort连接hostName:
    [self connectToHost:hostName onPort:hostPort withTimeout:XMPPStreamTimeoutNone error:&connectErr]

    通知代理:[multicastDelegate xmppStreamWillConnect:self];

    无hostName,则利用用户JID的域名进行服务定位(SRV)资源查找;

    有hostName,则与该主机建立TCP连接

    [asyncSocket connectToHost:host onPort:port error:errPtr]

    设置连接超时的操作:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    - (void)startConnectTimeout:(NSTimeInterval)timeout
    {
    XMPPLogTrace();

    if (timeout >= 0.0 && !connectTimer)
    {
    // 创建计时器型dispatch source
    connectTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, xmppQueue);
    // 设置到时后的block:进行连接超时操作
    dispatch_source_set_event_handler(connectTimer, ^{ @autoreleasepool {
    // 连接超时操作
    [self doConnectTimeout];
    }});

    #if !OS_OBJECT_USE_OBJC
    dispatch_source_t theConnectTimer = connectTimer;
    dispatch_source_set_cancel_handler(connectTimer, ^{
    XMPPLogVerbose(@"%@: dispatch_release(connectTimer)", THIS_FILE);
    dispatch_release(theConnectTimer);
    });
    #endif
    // 初始化计时器
    dispatch_time_t tt = dispatch_time(DISPATCH_TIME_NOW, (timeout * NSEC_PER_SEC));
    dispatch_source_set_timer(connectTimer, tt, DISPATCH_TIME_FOREVER, 0);
    // 启动connectTimer
    dispatch_resume(connectTimer);
    }
    }

    如果连接成功,将触发(void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(UInt16)port代理中连接超时将会关闭。

  2. - (BOOL)oldSchoolSecureConnectWithTimeout:(NSTimeInterval)timeout error:(NSError **)errPtr

    提供给那些需要提早保护连接的老旧服务器(这类服务器的端口一般为5223)。

  3. - (BOOL)connectP2PWithSocket:(GCDAsyncSocket *)acceptedSocket error:(NSError **)errPtr

    当XMPPStream用initP2P方法初始化后,可以使用给定的socket开始P2P连接。

  4. - (void)disconnect

    关闭TCP socket连接,由于是同步的,所以会立即发生

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 通知xmppStreamDelegate委托,用来区分断开的原因是自愿还是连接错误
    [multicastDelegate xmppStreamWasToldToDisconnect:self];
    // 有主机名
    if (state == STATE_XMPP_RESOLVING_SRV)
    {// 停止XMPPSRVResolver
    [srvResolver stop];
    srvResolver = nil;
    state = STATE_XMPP_DISCONNECTED;
    // 通知xmppStreamDelegate委托,
    [multicastDelegate xmppStreamDidDisconnect:self withError:nil];
    }
    else
    {
    // 断开socket连接
    [asyncSocket disconnect];
    // Everthing will be handled in socketDidDisconnect:withError:
    }
  5. - (void)disconnectWithError:(NSError*)error

    由于错误而发生的断开,与4相同,不同之处在于通知delegate错误信息。

安全

  1. secure

    - (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(UInt16)portsocket连接成功后触发

    [self startTLS]

    开始之前通知所有的代理(如果包含(xmppStream:willSecureWithSettings:)方法)

    [self continueStartTLS:settings]

    [asyncSocket startTLS:settings];socket开始TLS协商会话

    secure设为YES

    开始协商

    asyncSocket开始读取数据

  2. - (BOOL)supportsStartTLS()

    判断服务器是否可以通过SSL/TLS对连接进行保护

    通过读取rootElement的子元素来实现。

  3. - (BOOL)secureConnection:(NSError **)errPtr

    企图通过SSL/TLS来保护连接,该方法是同步的,errPtr用于存放错误信息,

    [self sendStartTLSRequest]

    服务器收到请求后发送响应

    - (void)xmppParser:(XMPPParser *)sender didReadElement:(NSXMLElement *)element接收到带proceed元素

    [self handleStartTLSResponse:element]

    [self startTLS]

注册

  1. - (BOOL)supportsInBandRegistration

    查看服务器是否支持带内(in-Band)注册

  2. - (BOOL)registerWithElements:(NSArray *)elements error:(NSError **)errPtr;

    通过给定的元素进行新用户的注册

    建立typeset的IQ元素

    建立的NSXMLElement,将元素数组的元素全部加入进去

    放入IQ

    对xmlString通过压缩,编码,生成 outgoingData

    [asyncSocket writeData:outgoingData withTimeout:TIMEOUT_XMPP_WRITE tag:TAG_XMPP_WRITE_STREAM]

  3. - (BOOL)registerWithPassword:(NSString *)password error:(NSError **)errPtr;

    通过给定的用户名和密码建立一个用户账号

    usernamepassword两个元素添加到elements中并调用2的方法进行注册。

认证

- (BOOL)authenticateWithPassword:(NSString *)inPassword error:(NSError **)errPtr

选择一个最好的认证方法,共四种

[self authenticate:someAuth error:&err]

state = STATE_XMPP_AUTH

[inAuth start:&err]

- (void)sendAuthElement:(NSXMLElement *)element

[asyncSocket writeData:outgoingData withTimeout:TIMEOUT_XMPP_WRITE tag:TAG_XMPP_WRITE_STREAM]

服务器信息

服务器相关信息来自rootElement,每次收到xml消息通过parser进行解析时,都要从头结点开始,就会触发相应的代理。

发送消息

  1. - (void)sendElement:(NSXMLElement *)element

    一般情况下iq、message、presence、login都是通过该通用方法进行发送;

    如果element属于以上任何一个类型则使用对应的发送方法;如果不是则根据name通过object_setClass转换为对应类型;如果都不是则将element以压缩字符串的形式直接发送。

  2. - (void)sendIQ:(XMPPIQ *)iq withTag:(long)tag

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    // 通知一切感兴趣的代理,为了能够以线程安全的方式改变元素(element),需要连续进行操作

    // 获取multicastDelegate的枚举,方便遍历
    GCDMulticastDelegateEnumerator *delegateEnumerator = [multicastDelegate delegateEnumerator];

    dispatch_async(willSendIqQueue, ^{ @autoreleasepool {

    // Allow delegates to modify and/or filter outgoing element

    __block XMPPIQ *modifiedIQ = iq;

    id del;
    dispatch_queue_t dq;

    while (modifiedIQ && [delegateEnumerator getNextDelegate:&del delegateQueue:&dq forSelector:selector])
    {
    #if DEBUG
    {
    // 确定代理方法(xmppStream:willSendIQ:)的返回值是否是(XMPPIQ *)
    char methodReturnType[32];
    Method method = class_getInstanceMethod([del class], selector);
    method_getReturnType(method, methodReturnType, sizeof(methodReturnType));

    if (strcmp(methodReturnType, @encode(XMPPIQ*)) != 0)
    {
    NSAssert(NO, @"Method xmppStream:willSendIQ: is no longer void (see XMPPStream.h). "
    @"Culprit = %@", NSStringFromClass([del class]));
    }
    }

    #endif
    dispatch_sync(dq, ^{ @autoreleasepool {
    // 提供通过外部delegate改变IQ的机会
    modifiedIQ = [del xmppStream:self willSendIQ:modifiedIQ];

    }});
    }

    if (modifiedIQ)
    {
    dispatch_async(xmppQueue, ^{ @autoreleasepool {

    if (state == STATE_XMPP_CONNECTED) {
    // 继续IQ的发送:压缩转码后通过[asyncSocket writeData:...]方法发送
    [self continueSendIQ:modifiedIQ withTag:tag];
    }
    }});
    }
    }});
  3. - (void)sendLogin:(XMPPLogin *)login withTag:(long)tag 同2

  4. - (void)sendMessage:(XMPPMessage *)message withTag:(long)tag 同2

  5. - (void)sendPresence:(XMPPPresence *)presence withTag:(long)tag 同2

  6. - (void)injectElement:(NSXMLElement *)element

    该方法允许你将一个元素插入到流中,就好像是从socket接收到的消息一样,这创造了一些有趣的可能性。

接收消息

接收消息的过程:

AsyncSocket的代理方法- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag接收到数据

[parser parseData:data]

触发parser的代理方法

- (void)xmppParser:(XMPPParser *)sender didReadRoot:(NSXMLElement *)root拿到rootElement

- (void)xmppParser:(XMPPParser *)sender didReadElement:(NSXMLElement *)element解析element,并根据state的状态分发下去:

STATE_XMPP_NEGOTIATING->[self handleStreamFeatures]

该方法用来分析接收到的服务器的流特征,判断是否已经与服务器建立了连接

根据features检查是否需要TLS,需要则[self sendStartTLSRequest]

根据features检查是否支持资源绑定,需要则将资源绑定请求加入到iq中发送到服务器

state = STATE_XMPP_CONNECTED

STATE_XMPP_STARTTLS_1->[self handleStartTLSResponse:element]

如果返回的是”proceed”的元素,则成功,开始TLS协商;如果不成功则断开连接。

STATE_XMPP_REGISTERING->[self handleRegistration:element]

如果返回的元素的type不是”error”,则注册成功。

STATE_XMPP_AUTH->[self handleAuth:element]

如果认证成功,则再次开始协商。

STATE_XMPP_BINDING)->[self handleBinding:element]

如果能够拿到JID,说明资源绑定成功,检查是否需要建立对话,需要则开始IM会话,结束。

没有JID则尝试改变resource

通知代理,利用代理方法得到更改后的resource

[self continueHandleBinding:alternativeResource]

alternativeResource则添加到iq中发送

没有则让服务器来分配resource(通过发送type为”set”的iq)

STATE_XMPP_START_SESSION->[self handleStartSessionResponse:element]

type为”result”表示认证成功,回到STATE_XMPP_CONNECTED状态

iq->[self receiveIQ:[XMPPIQ iqFromElement:element]]

通知所有的代理,可在解析iq前改变或过滤iq

[self continueReceiveIQ:modifiedIQ]

如果iq需要回复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
SEL selector = @selector(xmppStream:didReceiveIQ:);
// 用信号量快速控制并发
dispatch_semaphore_t delSemaphore = dispatch_semaphore_create(0);
dispatch_group_t delGroup = dispatch_group_create();

while ([delegateEnumerator getNextDelegate:&del delegateQueue:&dq forSelector:selector])
{
dispatch_group_async(delGroup, dq, ^{ @autoreleasepool {

if ([del xmppStream:self didReceiveIQ:iq])
{
// 有代理方法对iq进行占用则发送一个信号
dispatch_semaphore_signal(delSemaphore);
}
}});
}

dispatch_async(didReceiveIqQueue, ^{ @autoreleasepool {

dispatch_group_wait(delGroup, DISPATCH_TIME_FOREVER);

// Did any of the delegates handle the IQ? (handle == will response)

BOOL handled = (dispatch_semaphore_wait(delSemaphore, DISPATCH_TIME_NOW) == 0);

// 客户端收到<type>为"get"或"set"的IQ请求时必须以<type>为"result"或"error"的iq来回复,并且回复的iq必须保留收到的iq的"id"

// 如果没有代理来处理这个iq则返回带<errro>的iq
if (!handled)
{
// 返回给服务器的错误信息:
//
// <iq to="jid" type="error" id="id">
// <query xmlns="ns"/>
// <error type="cancel" code="501">
// <feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas"/>
// </error>
// </iq>

NSXMLElement *reason = [NSXMLElement elementWithName:@"feature-not-implemented"
xmlns:@"urn:ietf:params:xml:ns:xmpp-stanzas"];

NSXMLElement *error = [NSXMLElement elementWithName:@"error"];
[error addAttributeWithName:@"type" stringValue:@"cancel"];
[error addAttributeWithName:@"code" stringValue:@"501"];
[error addChild:reason];

XMPPIQ *iqResponse = [XMPPIQ iqWithType:@"error"
to:[iq from]
elementID:[iq elementID]
child:error];
// 返回的iq中添加iq的子元素
NSXMLElement *iqChild = [iq childElement];
if (iqChild)
{
NSXMLNode *iqChildCopy = [iqChild copy];
[iqResponse insertChild:iqChildCopy atIndex:0];
}

// Purposefully go through the sendElement: method
// so that it gets dispatched onto the xmppQueue,
// and so that modules may get notified of the outgoing error message.

[self sendElement:iqResponse];
}

#if !OS_OBJECT_USE_OBJC
dispatch_release(delSemaphore);
dispatch_release(delGroup);
#endif

}});

message->[self receiveMessage:[XMPPMessage messageFromElement:element]]

presence->[self receivePresence:[XMPPPresence presenceFromElement:element]]

cmcc_client_login->[self receiveLogin:[XMPPLogin loginFromElement:element]]

流协商

  1. - (void)startNegotiation

    用来开始初始的协商操作

    [self sendOpeningNegotiation]初始化XML流

    通过socket发送<?xml version=’1.0’?>,

    初始化或重启parser,从而将self赋值给parser的代理,从而可以在XMPPStream中实现parser的代理方法。

    发送

    [multicastDelegate xmppStreamDidStartNegotiation:self]

    [asyncSocket readDataWithTimeout:TIMEOUT_XMPP_READ_START tag:TAG_XMPP_READ_START]

  2. - (void)startTLS

    根据正确的设置,开始TLS协商,在”安全”章节中有提到。

心跳数据

- (void)setupKeepAliveTimer设置心跳数据计时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
keepAliveTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, xmppQueue);

dispatch_source_set_event_handler(keepAliveTimer, ^{ @autoreleasepool {
// 发送心跳数据
NSTimeInterval now = [NSDate timeIntervalSinceReferenceDate];
NSTimeInterval elapsed = (now - lastSendReceiveTime);
// 如果长达2分钟没有通信发生则发送心跳数据
if (elapsed < 0 || elapsed >= keepAliveInterval)
{
numberOfBytesSent += [keepAliveData length];

[asyncSocket writeData:keepAliveData
withTimeout:TIMEOUT_XMPP_WRITE
tag:TAG_XMPP_WRITE_STREAM];

// 强制更新 lastSendReceiveTime,以防TCP socket通信由于传送巨大而缓慢
lastSendReceiveTime = [NSDate timeIntervalSinceReferenceDate];
}

}});

uint64_t interval = ((keepAliveInterval / 4.0) * NSEC_PER_SEC);
// 设置计时器的开始时间-延时30s
dispatch_time_t tt = dispatch_time(DISPATCH_TIME_NOW, interval);
// 以30s为间隔发送心跳数据
dispatch_source_set_timer(keepAliveTimer, tt, interval, 1.0);
dispatch_resume(keepAliveTimer);

代理方法

  1. AsyncSocket

    • - (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(UInt16)port

      [self endConnectTimeout]取消超时处理

      [asyncSocket enableBackgroundingOnSocket]开启socket的后台监听

      [self startTLS]

      [self startNegotiation]

    • - (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag

      [parser parseData:data]

      [asyncSocket readDataWithTimeout:TIMEOUT_XMPP_READ_STREAM tag:TAG_XMPP_READ_STREAM]继续读取xml元素

总结:

从中可以看出,XMPPStream是一个设计严谨,线程安全的消息通讯类型,其风格包括:

  1. 为了保证线程安全:
    1
    2
    3
    4
    5
    6
    7
    dispatch_block_t block = ^{
    // 要在xmpp线程中运行的代码
    };
    if (dispatch_get_specific(xmppQueueTag))
    block();
    else
    dispatch_sync(xmppQueue, block);

  1. 如何通知带返回值的代理方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    SEL selector = @selector(对应的代理方法)
    if (![multicastDelegate hasDelegateThatRespondsToSelector:selector])
    {
    // None of the delegates implement the method.
    // Use a shortcut.
    }
    else{
    GCDMulticastDelegateEnumerator *delegateEnumerator = [multicastDelegate delegateEnumerator];
    dispatch_async(queue, ^{@autoreleasepool {
    while ([delegateEnumerator getNextDelegate:&del delegateQueue:&dq forSelector:selector])
    {
    dispatch_async(dq, ^{@autoreleasepool
    {
    [del 对应的代理方法];
    }});
    }
    }});
    }

后记:

本文只是在看源代码过程中对个人简介的一点梳理,由于能力所限,无论排版还是内容理解都有所欠缺,如有错误还请见谅,该文以后还会改进与更新。
下一篇文章为多重代理的原理与实现,敬请期待。