`
yunhaifeiwu
  • 浏览: 161144 次
  • 性别: Icon_minigender_1
  • 来自: 宁波
社区版块
存档分类
最新评论

java 之异步套接字编程实例(AIO)

    博客分类:
  • java
阅读更多
本文重点以“淘宝邓悟”中学习理解整理而来。更改了客户端为swing应用程序,并增加了服务端与客户端之间相互向对方发信息的功能。为了便 于阅读,用自已观察总结性的理解,进行了啰嗦的注解。
http://blog.sina.com.cn/s/blog_71ad0d3f01019y1c.html


   异步socket编程,一样分成客户端与服务端。
   AsynchronousServerSocketChannel  -------服务端socket;
   AsynchronousSocketChannel------客户端socket.
   AsynchronousChannelGroup-----socket管理器。服务端socket与客户端socket都由它生成。它的管理需要线程池。它的工作方式之一是把必要的资源交给客户端与服务端的处理器,并调用该处理器进行工作。
   ExecutorService-----线程池。是socket管理器需要的东西。
   CompletionHandler-------处理器。它有两个泛型参数A、V。这是个回调函数所用的标准接口。Socket管理器 会把相关实参放到这个A,V的参数中,让用户处理后,然后调用这个处理器的方法进行执行。如果用户有一个方法中的参数的类型是该处理器,那么在其他地方再次调用这个方法,尽管方法不同,但是传给该方法的CompletionHandler的处理器的A、V参数 却是不相同的,不仅是值不同,类型也有可能完全不同。这是学习中的难点。

   练习中总结:除了服务端与客户端初始化时差别很大,但是在各自与对方通信中,所使用的类都是 客户端socket类。

  下面的例子,展示了异步方式的服务端与客户端相互通信的例子。客户端是swing程序。
   调试使用方法:先运行服务器,再运行客服端。在客户端上的文本框中输入字符点“点我”按钮,立即通过0号套接字向服务器发送信息。在调试台中,可看到服务器与客户端的通信情况。
下面是我测试时服务端的信息
引用
debug:
AioAcceptHandler.completed called
有客户端连接:/127.0.0.1:1606
AioAcceptHandler.completed called
有客户端连接:/127.0.0.1:1607
收到/127.0.0.1:1606的消息:0
收到/127.0.0.1:1607的消息:1
收到/127.0.0.1:1606的消息:sd
收到/127.0.0.1:1606的消息:1111111111

  
下面是我测试时客户端的信息
引用

debug:
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:1
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:0
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:sd
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:1111111111


例子说明:
  服务端与客户端各有四个类。
服务端:
  AioTcpServer---服务器主类。它与客户端通信由回调连接处理器AioAcceptHandler完成。
  AioAcceptHandler-----连接处理器。处理服务器与客户端的通信。具体的读操作,回调AioReadHandler处理器
  AioReadHandler-----读处理器。处理连接处理器交给的 读任务。即具体的读客户端的信息由它完成。如果服务器要回应该客户端,需要回调AioWriteHandler写处理器。
  AioWriteHandler----写处理器。完成读处理器交给的任务,即向客户端回应消息。

客户端:
   AioTcpClient-------客户端主类。它与服务器的通讯由回调AioConnectHandler连接处理器完成。具体写信息由客户端socket回调AioSendHandler完成。具体读信息由客户端socket回调AioReadHandler完成。
   AioConnectHandler-------连接处理器。完成与服务器的通讯。具体的读信息由客户端socket回调读处理器AioReadHandler完成,即完成读取服务器的信息。
   AioReadHandler-------读处理器,完成读取服务端信息。
   AioSendHandler----写处理器,向服务端发送信息。

1.9.3.1 服务端
AioTcpServer.java
import java.net.InetSocketAddress; 
import java.nio.channels.AsynchronousChannelGroup; 
import java.nio.channels.AsynchronousServerSocketChannel; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
/**
 *     AIO异步socket通讯,分成 用于服务端的socekt与用于客户端的socket,当然这两者都是<br>
 * 异步的。两者使用时,都用到了同样的异步通道管理器,异步通道管理器通过线程池管理。<br>
 *    异步通道管理器,可以生成服务端socket与客户端socket。 * 
 *    使用服务端socket或客户端socket都需要一个操作处理器(CompletionHandler),<br>
 *当有信息时异步通道管理器会把 相关信息传递给操作作处理器。 * 
 *    操作处理器的方法是同一方法,但方法的参数是泛型,随着调用它的方法不同而改变。<br> * 
 *    在AIO中,CompletionHandler这个操作处理器方法,是个泛型接口,当回调函数用。<br>
 * 使用CompletionHandler的方法,约定是把该方法前一个参数实例传递给A型参数<br>
 * (attachment),CompletionHandler的另一个参数将是存有该方法的使用情况的实例。
 * 
 */
public class AioTcpServer implements Runnable { 
    private AsynchronousChannelGroup asyncChannelGroup;  
    private AsynchronousServerSocketChannel listener;  
  
    public AioTcpServer(int port) throws Exception { 
        //创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(20); 
        //异步通道管理器
        asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor); 
        //创建 用在服务端的异步Socket.以下简称服务器socket。
        //异步通道管理器,会把服务端所用到的相关参数
        listener = AsynchronousServerSocketChannel.open(asyncChannelGroup).
                bind(new InetSocketAddress(port)); 
    } 
 
    public void run() { 
        try { 

            AioAcceptHandler acceptHandler = new AioAcceptHandler();
            //为服务端socket指定接收操作对象.accept原型是:
            //accept(A attachment, CompletionHandler<AsynchronousSocketChannel,
            // ? super A> handler)
            //也就是这里的CompletionHandler的A型参数是实际调用accept方法的第一个参数
            //即是listener。另一个参数V,就是原型中的客户端socket
            listener.accept(listener, new AioAcceptHandler());  
            Thread.sleep(400000); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } finally { 
            System.out.println("finished server");
        } 
    } 
 
    public static void main(String... args) throws Exception { 
        AioTcpServer server = new AioTcpServer(9008); 
        new Thread(server).start(); 
    } 
}

  
AioAcceptHandler.java
import client.AioSendHandler;
import java.io.IOException; 
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousServerSocketChannel; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.channels.CompletionHandler; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.Future; 
 
//这里的参数受实际调用它的函数决定。本例是服务端socket.accetp调用决定
public class AioAcceptHandler implements CompletionHandler
        <AsynchronousSocketChannel, AsynchronousServerSocketChannel > 
{ 
    private  AsynchronousSocketChannel socket;
    @Override
    public void completed(AsynchronousSocketChannel socket, 
        AsynchronousServerSocketChannel attachment) 
    { //注意第一个是客户端socket,第二个是服户端socket
        try { 
            System.out.println("AioAcceptHandler.completed called");
            attachment.accept(attachment, this); 
            System.out.println("有客户端连接:" +
                socket.getRemoteAddress().toString()
            ); 
            startRead(socket);    
        } catch (IOException e) { 
            e.printStackTrace(); 
        } 
    } 
 
    @Override
    public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) 
    { 
        exc.printStackTrace(); 
    } 
    
    //不是CompletionHandler的方法
    public void startRead(AsynchronousSocketChannel socket) { 
        ByteBuffer clientBuffer = ByteBuffer.allocate(1024); 
        //read的原型是
        //read(ByteBuffer dst, A attachment,
        //    CompletionHandler<Integer,? super A> handler) 
        //即它的操作处理器,的A型,是实际调用read的第二个参数,即clientBuffer。
        // V型是存有read的连接情况的参数
        AioReadHandler rd=new AioReadHandler(socket);
        socket.read(clientBuffer, clientBuffer, rd); 
        try {             
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
    } 
 
}


AioReadHandler.java
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.channels.CompletionHandler; 
import java.nio.charset.CharacterCodingException; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
import java.util.logging.Level;
import java.util.logging.Logger;

//这里的参数型号,受调用它的函数决定。这里是受客户端socket.read调用
public class AioReadHandler implements CompletionHandler
        <Integer,ByteBuffer>
{ 
    private AsynchronousSocketChannel socket; 
    public  String msg;
 
    public AioReadHandler(AsynchronousSocketChannel socket) { 
        this.socket = socket; 
    }     
    private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();  
    
    @Override
    public void completed(Integer i, ByteBuffer buf) { 
        if (i > 0) { 
            buf.flip(); 
            try { 
                msg=decoder.decode(buf).toString();
                System.out.println("收到" + 
                        socket.getRemoteAddress().toString() + "的消息:" + msg
                ); 
                buf.compact(); 
            } catch (CharacterCodingException e) { 
                e.printStackTrace(); 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
            socket.read(buf, buf, this); 
            try {
                write(socket);
            } catch (UnsupportedEncodingException ex) {
                Logger.getLogger(AioReadHandler.class.getName()).log(Level.SEVERE, null, ex);
            }
        } else if (i == -1) { 
            try { 
                System.out.println("客户端断线:" + socket.getRemoteAddress().toString()); 
                buf = null; 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
        } 
    } 

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
         System.out.println("cancelled"); 
    }
 
     //不是CompletionHandler的方法
    public void write(AsynchronousSocketChannel socket) throws UnsupportedEncodingException{
        String sendString="服务器回应,你输出的是:"+msg;
        ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8"));        
        socket.write(clientBuffer, clientBuffer, new AioWriteHandler(socket));
    }
}


AioWriteHandler.java

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AioWriteHandler implements CompletionHandler
    <Integer,ByteBuffer>
{ 
    private AsynchronousSocketChannel socket; 
 
    public AioWriteHandler(AsynchronousSocketChannel socket) { 
        this.socket = socket; 
    } 

    @Override
    public void completed(Integer i, ByteBuffer buf) {
        if (i > 0) { 
            socket.write(buf, buf, this); 
        } else if (i == -1) { 
            try { 
                System.out.println("对端断线:" + socket.getRemoteAddress().toString()); 
                buf = null; 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
        } 
        
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("cancelled"); 
    }
    
}


1.9.3.2 客户端
AioTcpClient.java

import java.awt.BorderLayout;
import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JTextField;

public class AioTcpClient {
    public static JTextField jt=new JTextField();
    public static ConcurrentHashMap<String,AsynchronousSocketChannel>
            sockets =new ConcurrentHashMap<>();
    static AioTcpClient me;
    
    private AsynchronousChannelGroup asyncChannelGroup;
    public AioTcpClient() throws Exception {
        //创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(20);
        //创建异眇通道管理器   
        asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);
    }
    
    private final CharsetDecoder decoder = Charset.forName("GBK").newDecoder();
    
    public void start(final String ip, final int port) throws Exception {
        // 启动20000个并发连接,使用20个线程的池子
        for (int i = 0; i < 2; i++) {
            try {
                //客户端socket.当然它是异步方式的。
                AsynchronousSocketChannel connector = null;
                if (connector == null || !connector.isOpen()) {
                    //从异步通道管理器处得到客户端socket
                    connector = AsynchronousSocketChannel.open(asyncChannelGroup);
                    sockets.putIfAbsent(String.valueOf(i), connector);
                    
                    connector.setOption(StandardSocketOptions.TCP_NODELAY,
                        true);
                    connector.setOption(StandardSocketOptions.SO_REUSEADDR,
                        true);
                    connector.setOption(StandardSocketOptions.SO_KEEPALIVE,
                        true);
                    //开始连接服务器。这里的的connect原型是
                    // connect(SocketAddress remote, A attachment, 
                    //  CompletionHandler<Void,? super A> handler)
                    // 也就是它的CompletionHandler 的A型参数是由这里的调用方法
                    //的第二个参数决定。即是connector。客户端连接器。
                    // V型为null
                    connector.connect(new InetSocketAddress(ip, port),
                            connector, new AioConnectHandler(i));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public void work() throws Exception{
        AioTcpClient client = new AioTcpClient();
        client.start("localhost", 9008);
    }

    public void send() throws UnsupportedEncodingException{
        AsynchronousSocketChannel socket=sockets.get("0");
        String sendString=jt.getText();
        ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8"));        
        socket.write(clientBuffer, clientBuffer, new AioSendHandler(socket));
    }
    public   void createPanel() {
        me=this;
        JFrame f = new JFrame("Wallpaper");
        f.getContentPane().setLayout(new BorderLayout());       
        
        JPanel p=new JPanel(new FlowLayout(FlowLayout.LEFT));        
        JButton bt=new JButton("点我");
        p.add(bt);
        me=this;
        bt.addActionListener(new ActionListener(){

            @Override
            public void actionPerformed(ActionEvent e) {
               try {
                    me.send();
                 
                } catch (Exception ex) {
                    Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        
        });
        
        bt=new JButton("结束");
        p.add(bt);
        me=this;
        bt.addActionListener(new ActionListener(){
            @Override
            public void actionPerformed(ActionEvent e) {                 
            }
        
        });
 
        f.getContentPane().add(jt,BorderLayout.CENTER);
        f.getContentPane().add(p, BorderLayout.EAST);
        
        f.setSize(450, 300);
        f.setDefaultCloseOperation (JFrame.EXIT_ON_CLOSE);
        f.setLocationRelativeTo (null);
        f.setVisible (true);
    }
      
    public static void main(String[] args) {
        javax.swing.SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {     
                AioTcpClient d = null;
                try {
                    d = new AioTcpClient();
                } catch (Exception ex) {
                    Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex);
                }
                
                d.createPanel();
                try {
                    d.work();
                } catch (Exception ex) {
                    Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex);
                }
               
                 
            }
        });
    } 
}


AioConnectHandler.java
import java.util.concurrent.*;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AioConnectHandler implements CompletionHandler
    <Void,AsynchronousSocketChannel>
{
    private Integer content = 0;
    
    public AioConnectHandler(Integer value){
        this.content = value;
    }
 
    @Override
    public void completed(Void attachment,AsynchronousSocketChannel connector) { 
        try {  
         connector.write(ByteBuffer.wrap(String.valueOf(content).getBytes())).get();
         startRead(connector); 
        } catch (ExecutionException e) { 
            e.printStackTrace(); 
        } catch (InterruptedException ep) { 
            ep.printStackTrace(); 
        } 
    } 
 
    @Override
    public void failed(Throwable exc, AsynchronousSocketChannel attachment) { 
        exc.printStackTrace(); 
    } 
    
    //这不是 CompletionHandler接口的方法。
    public void startRead(AsynchronousSocketChannel socket) { 
        ByteBuffer clientBuffer = ByteBuffer.allocate(1024); 
        //read的原型是
        //read(ByteBuffer dst, A attachment,
        //    CompletionHandler<Integer,? super A> handler) 
        //即它的操作处理器,的A型,是实际调用read的第二个参数,即clientBuffer。
        // V型是存有read的连接情况的参数
        socket.read(clientBuffer, clientBuffer, new AioReadHandler(socket)); 
        try { 
            
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
    }
 
}


AioReadHandler.java
import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.channels.CompletionHandler; 
import java.nio.charset.CharacterCodingException; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
 
public class AioReadHandler implements CompletionHandler
    <Integer,ByteBuffer>
{ 
    private AsynchronousSocketChannel socket; 
 
    public AioReadHandler(AsynchronousSocketChannel socket) { 
        this.socket = socket; 
    } 
 
    public void cancelled(ByteBuffer attachment) { 
        System.out.println("cancelled"); 
    } 
 
    private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); 
 
    @Override
    public void completed(Integer i, ByteBuffer buf) { 
        if (i > 0) { 
            buf.flip(); 
            try { 
                System.out.println("收到" + socket.getRemoteAddress().toString() + "的消息:" + decoder.decode(buf)); 
                buf.compact(); 
            } catch (CharacterCodingException e) { 
                e.printStackTrace(); 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
            socket.read(buf, buf, this); 
        } else if (i == -1) { 
            try { 
                System.out.println("对端断线:" + socket.getRemoteAddress().toString()); 
                buf = null; 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
        } 
    } 
 
    @Override
    public void failed(Throwable exc, ByteBuffer buf) { 
        System.out.println(exc); 
    } 

     
}


AioSendHandler.java(与服务端的写相同)
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;


public class AioSendHandler implements CompletionHandler
    <Integer,ByteBuffer>
{ 
    private AsynchronousSocketChannel socket; 
 
    public AioSendHandler(AsynchronousSocketChannel socket) { 
        this.socket = socket; 
    } 

    @Override
    public void completed(Integer i, ByteBuffer buf) {
        if (i > 0) { 
            socket.write(buf, buf, this); 
        } else if (i == -1) { 
            try { 
                System.out.println("对端断线:" + socket.getRemoteAddress().toString()); 
                buf = null; 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
        } 
        
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("cancelled"); 
    }
    
}

  • src.rar (8.3 KB)
  • 下载次数: 212
分享到:
评论
2 楼 Yuanyuanasdf 2017-05-12  
同样的,在AioAcceptHandler中调用了socket.read(clientBuffer, clientBuffer, rd);但在
AioReadHandler中又调用了一次socket.read(buf, buf, this);
socket.write(clientBuffer, clientBuffer, new AioWriteHandler(socket));也是一样,分别在AioReadHandler和AioWriteHandler各调用一次,看的不太懂~
1 楼 Yuanyuanasdf 2017-05-12  
为什么在AioAcceptHandler compelted中再次调用
System.out.println("AioAcceptHandler.completed called"); 
            attachment.accept(attachment, this);
在tcpServer中不是已经调用了
listener.accept(listener, new AioAcceptHandler());
大神,求解答   

相关推荐

Global site tag (gtag.js) - Google Analytics