Tigase源码分析2:ConnectionOpenThread 处理服务端socket的线程

一、 ConnectionOpenThread 使用单例模式,他是负责建立服务端SOCKET和 接收连接客户端socket 线程。

在初始化ConnectionManager的时候ConnectionManager.connectThread 属性所引用的服务端SOCKET连接线程ConnectionOpenThread 就被初始化了

ConnectionManager::  
private static ConnectionOpenThread connectThread = ConnectionOpenThread.getInstance();  

ConnectionOpenThread .getInstance()的实现

private Selector  selector= null;  
  
public static ConnectionOpenThread getInstance() {  
  
        if (acceptThread == null) {  
            acceptThread = new ConnectionOpenThread();  
  
            Thread thrd = new Thread(acceptThread);  
  
            thrd.setName("ConnectionOpenThread");  
            thrd.start(); //启动ConnectionOpenThread线程,则this.run()方法将被被执行  
            if (log.isLoggable(Level.FINER)) {  
                log.finer("ConnectionOpenThread started.");  
            }  
        }    // end of if (acceptThread == null)  
  
        return acceptThread;  
    }  
  
  
private ConnectionOpenThread() {  
   .......  
        try {  
            selector = Selector.open();//得到一个选择器,可以去了解下nio api  
        } catch (Exception e) {  
            log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);  
            stopping = true;  
        }    // end of try-catch  
    }  

ConnectionOpenThread .run()的实现

在该方法中,selector管理的都是服务端SOCKET

public void run() {  
        while (!stopping) {  
            try {  
                selector.select();  
                  //此方法为阻塞方法,当选择器管理channel(也就是向selector注册的channel)                    中发生读、写或异常事件时,select()将会被触发会往下执行  
  
                // Set<SelectionKey> selected_keys = selector.selectedKeys();  
                // for (SelectionKey sk : selected_keys) {  
                               //返回已此通道已准备就绪的键集,已选择始终是键集的一个子集。  
                //begin iterator  
        for (Iterator i = selector.selectedKeys().iterator(); i.hasNext(); ) {  
                    SelectionKey sk = (SelectionKey) i.next();  
  
                    i.remove();  
  
                    SocketChannel sc        = null;  
                    boolean       throttled = false;  
                    int           port_no   = 0;  
  
                if ((sk.readyOps() & SelectionKey.OP_ACCEPT) != 0) {                     //在此是否为被动SOCKET也就是服务端SOCKET,是则接受客户端socket  
            ServerSocketChannel nextReady = (ServerSocketChannel) sk.channel();  
                    port_no = nextReady.socket().getLocalPort();  
                    sc = nextReady.accept();//得到一个客户端SOCKET  
                    ...  
                }    // end of if (sk.readyOps() & SelectionKey.OP_ACCEPT)  
             if ((sk.readyOps() & SelectionKey.OP_CONNECT) != 0) {  
                sk.cancel();  // 从Selector中删除指定的SelectionKey    
                                //所以这个普通的conect socket只会处理一次侦听到的发生事件  
                sc = (SocketChannel) sk.channel();//得到connect SOCKET  
  
                }    // end of if (sk.readyOps() & SelectionKey.OP_ACCEPT)  
                    if (sc != null) { //设置接收到的SOCKET的一些信息  
                        try {  
                sc.configureBlocking(false);//将客户端通道设置为非阻塞  
                            sc.socket().setSoLinger(false, 0);  
                            sc.socket().setReuseAddress(true);  
          
                                              //每个ServerSocketChannel在创建注册到selector                                              时就被绑定了一个ConnectionOpenListener对象,                                               用这个对象来处理该接受到的socket,该注册过程                                               在addAllWaiting()中进行  
            ConnectionOpenListener al = (ConnectionOpenListener) sk.attachment();  
  
            sc.socket().setTrafficClass(al.getTrafficClass());  
                sc.socket().setReceiveBufferSize(al.getReceiveBufferSize());  
               al.accept(sc);//此方法 为建立连接socket的进行后续处理的设定  
            } catch (java.net.SocketException e) {  
  
        ConnectionOpenListener al = (ConnectionOpenListener) sk.attachment();  
                    al.accept(sc);  
                        }  
                    } else {  
                        log.log(Level.INFO,  
                                "Can not obtain socket channel from selection key, throttling activated = {0}, for port: {1}",  
                                new Object[] { throttled, port_no });  
                    }    // end of if (sc != null) else  
                    ++accept_counter;  
                }  
              //end of iterator  
     addAllWaiting();//加载要注册到selector中的ServerSocketChannel或connect socket  
            } catch (IOException e) {  
                log.log(Level.SEVERE, "Server I/O error.", e);  
  
                // stopping = true;  
            }        // end of catch  
                    catch (Exception e) {  
                log.log(Level.SEVERE, "Other service exception.", e);  
  
                // stopping = true;  
            }        // end of catch  
        }  
    }  

在说解到addAllWaiting();加载要注册到selector中的ServerSocketChannel时,先看下源码:

在waiting队列中如果有等待处理的ConnectionOpenListener对象,则创建一个对应的ServerSocketChannel

private void addAllWaiting() throws IOException {  
        ConnectionOpenListener al = null;  
  
        while ((al = waiting.poll()) != null) {  
            try {  
                addPort(al);//绑定相关的端口进行监听  
            } catch (Exception e) {  
                log.log(Level.WARNING, "Error: creating connection for: " + al, e);  
                al.accept(null);  
            }    // end of try-catch  
        }      // end of for ()  
    }  
  
  
  
  
private void addPort(ConnectionOpenListener al) throws IOException {  
    if ((al.getConnectionType() == ConnectionType.connect) && (al.getRemoteAddress() !=  
                null)) {  
     addISA(al.getRemoteAddress(), al);  
    } else if ((al.getIfcs() == null) || (al.getIfcs().length == 0) || al.getIfcs()[0]  
                .equals("ifc") || al.getIfcs()[0].equals("*")) {  
   addISA(new InetSocketAddress(al.getPort()), al);//绑定到InetSocketAddress进行监听服务  
    } else {  
            for (String ifc : al.getIfcs()) {  
                addISA(new InetSocketAddress(ifc, al.getPort()), al);  
            }    // end of for ()  
    }      // end of if (ip == null || ip.equals("")) else  
    }  
  
  
  //addISA(..)这才是真正创建ServerSocketChannel方法,绑定到服务器某一个端口上进行监听服务,  
  //开启了服务端socket  
  private void addISA(InetSocketAddress isa, ConnectionOpenListener al)throws IOException {  
        switch (al.getConnectionType()) {  
        case accept :  
            ...  
    ServerSocketChannel ssc = ServerSocketChannel.open();  
    ssc.socket().setReceiveBufferSize(al.getReceiveBufferSize());  
        ssc.configureBlocking(false);//服务端socket也是非阻塞方法  
    ssc.socket().bind(isa, (int) (port_throttling)); //绑定到相关地址的某一个端口上  
    ssc.register(selector, SelectionKey.OP_ACCEPT, al);//注册服务端socket到selector中,        并且附带绑定一个ConnectionOpenListener对象,该对象为服务端socket接收到新来的socket         进行后续处理。所以selector能监听这些已注册socket的事件发生  
  
            break;  
  
        case connect :  
        ...  
                      //服务器socket之间要进行通讯,则先要连接  
            SocketChannel sc = SocketChannel.open();  
            sc.socket().setReceiveBufferSize(al.getReceiveBufferSize());  
            sc.socket().setTrafficClass(al.getTrafficClass());  
            sc.configureBlocking(false);  
            sc.connect(isa);  
      sc.register(selector, SelectionKey.OP_CONNECT, al);  
          //在此也注册到ConnectionOpenThread.selector中  
  
            break;  
  
        default :  
            ..  
            break;  
        }      
    } 

二、从以上addAllWaiting();分析中看到处理的都是waiting队列里的ConnectionOpenListener对象,那这个ConnectionOpenListener对象是什么时候就会被放到waiting队列的呢,这得从ConnectionManager.initializationCompleted()中说起,在启动章节中分析到MessageRouter.setProperties(map)负责加载了其它的组件最后对每一个组件都执行了初始化完成动作。从而ConnectionManager.initializationCompleted()将会被执行

MessageRouter::
for (ServerComponent comp : components.values()) {  
        comp.initializationCompleted();  
        }

ConnectionManager.initializationCompleted()源码如下

public void initializationCompleted() {  
        if (isInitializationComplete()) {  
            // Do we really need to do this again?  
            return;  
        }  
        super.initializationCompleted();  
        initializationCompleted = true;  
            //加载组件中的服务配置  
        for (Map<String, Object> params : waitingTasks) {  
       //启动一个定时任务,设置准备加入ConnectionOpenThread.waiting的ConnectionListener对象  
         reconnectService(params, connectionDelay);  
        }  
        waitingTasks.clear();  
        if ( null != watchdog ){  
            watchdog.start();  
        }  
    }  

2.1 也许看到上面的waitingTask你在想他是什么样的配置信息呢,其实他就是启动服务器监听端口的配置,系统 默认的有如下几种,bosh,c2s,s2s,ws2s组件的服务器配置,waitingTask装的是每一个组件的端口配置信息

c2s/connections/5222/type[S]=accept  
c2s/connections/5222/socket[S]=plain  
c2s/connections/5222/ifc[s]=*  
c2s/connections/5222/remote-host[S]=localhost  
c2s/connections/5222/connections/tls/required[B]=false  
c2s/connections/5223/type[S]=accept  
c2s/connections/5223/socket[S]=ssl  
c2s/connections/5223/ifc[s]=*  
c2s/connections/5223/remote-host[S]=localhost  
c2s/connections/5223/connections/tls/required[B]=false  
c2s/connections/ports[i]=5222, 5223  
  
bosh/connections/5280/type[S]=accept  
bosh/connections/5280/socket[S]=plain  
bosh/connections/5280/ifc[s]=*  
bosh/connections/5280/remote-host[S]=localhost  
bosh/connections/5280/connections/tls/required[B]=false  
bosh/connections/ports[i]=5280  
  
s2s/connections/5269/type[S]=accept  
s2s/connections/5269/socket[S]=plain  
s2s/connections/5269/ifc[s]=*  
s2s/connections/5269/remote-host[S]=localhost  
s2s/connections/5269/connections/tls/required[B]=false  
s2s/connections/ports[i]=5269  
  
ws2s/connections/5290/type[S]=accept  
ws2s/connections/5290/socket[S]=plain  
ws2s/connections/5290/ifc[s]=*  
ws2s/connections/5290/remote-host[S]=localhost  
ws2s/connections/5290/connections/tls/required[B]=false  
ws2s/connections/ports[i]=5290  

2.1.1 ConnectionManager.setProperties(.)中对上面的配置信息作了解析,放到map里保存着

ConnectionManager.setProperties(Map<String, Object> props){  
...  
for (int i = 0; i < ports.length; i++) {  
addWaitingTask(port_props);  
}  
}  
  
//conn信息加入waitingTasks队列  
protected void addWaitingTask(Map<String, Object> conn) {  
        if (initializationCompleted) {  
            reconnectService(conn, connectionDelay);  
        } else {  
            waitingTasks.add(conn);  
        }  
    } 

2.2 然后回来ConnectionManager对象继续分析ConnectionOpenThread.waiting队列是怎么样增加了数据的。

ConnectionManager::  
  
 //启动一个定时任务,设置准备加入ConnectionOpenThread.waiting的ConnectionListener对象  
private void reconnectService(final Map<String, Object> port_props, long delay) {  
        ...  
        addTimerTask(new tigase.util.TimerTask() {  
            @Override  
            public void run() {  
          startService(port_props);  
            }  
        }, delay);  
    }  
  
  
   //ConnectionOpenThread是单例模式,在此该对象只被初始化一次  
    private static ConnectionOpenThread connectThread = ConnectionOpenThread.getInstance();  
  
   private void startService(Map<String, Object> port_props) {  
        if (port_props == null) {  
            throw new NullPointerException("port_props cannot be null.");  
        }  
               //根据组件的配置信息生成一个相关的ConnectionListener对象  
        ConnectionListenerImpl cli = new ConnectionListenerImpl(port_props);  
  
        if (cli.getConnectionType() == ConnectionType.accept) {  
            pending_open.add(cli);  
        }  
                //将ConnectionListener对象加入ConnectionOpenThread.waiting队列中  
        connectThread.addConnectionOpenListener(cli);  
    }

最后在ConnectionOpenThread中可以看到ConnectionListener是怎么样加入waiting队列的了

ConnectionOpenThread::  
  public void addConnectionOpenListener(ConnectionOpenListener al) {  
    waiting.offer(al);  
    selector.wakeup();  
}

作者:被禁言文章转简书了
链接:https://juejin.cn/post/6844904193375862798
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
0 条回复 A 作者 M 管理员
    所有的伟大,都源于一个勇敢的开始!
欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论