代码之家  ›  专栏  ›  技术社区  ›  SSpoke

javanio框架在重载下停止工作,没有写操作

  •  0
  • SSpoke  · 技术社区  · 14 年前

    虽然我是个新手,但这个问题对我来说相当奇怪。

    发生的情况是,如果您强制服务器承受重负载的连接,并不断发送一个不代表策略的无效数据包。

    非常奇怪的问题,但请注意,如果我删除包匹配语句,它的行为将类似于echo服务器。然后它将无休止地运行,不会遇到任何连接接受问题,基本上会变得稳定。

    我在下面附上了整个服务器源代码。能有人谁拥有丰富的知识与NIO请检查它,让我知道如果有一个方法来解决它。

    真正吸引我注意的是send()中的选择器wakeup,它可能会在将下面的行放入read()之后修复所有问题。它似乎什么也不做,问题仍然存在。

    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
    

    下面是简单服务器的源代码。

    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.CodingErrorAction;
    import java.nio.channels.CancelledKeyException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.channels.spi.SelectorProvider;
    import java.util.*;
    
    public class PolicyServer implements Runnable {
        public static final String POLICY_REQUEST = "<policy-file-request/>";
        public static final String POLICY_XML =
            "<?xml version=\"1.0\"?>"
            + "<cross-domain-policy>"
            + "<allow-access-from domain=\"*\" to-ports=\"*\" />"
            + "</cross-domain-policy>"
            + (char)0;
    
    
        // The host:port combination to listen on
        private InetAddress hostAddress;
        private int port;
    
        // The channel on which we'll accept connections
        private ServerSocketChannel serverChannel;
    
        // The selector we'll be monitoring
        private Selector selector;
    
        // The buffer into which we'll read data when it's available
        private ByteBuffer readBuffer = ByteBuffer.allocate(255);
    
        // This decodes raw bytes into ascii data.
        private CharsetDecoder asciiDecoder;
    
        // A list of PendingChange instances
        private List<ChangeRequest> pendingChanges = new LinkedList<ChangeRequest>();
    
        // Maps a SocketChannel to a list of ByteBuffer instances
        private Map<SocketChannel, List<ByteBuffer>> pendingData = new HashMap<SocketChannel, List<ByteBuffer>>();
    
        public PolicyServer(InetAddress hostAddress, int port) throws IOException {
            this.hostAddress = hostAddress;
            this.port = port;
            this.selector = this.initSelector();
            this.asciiDecoder = Charset.forName("US-ASCII").newDecoder().onMalformedInput(
                                    CodingErrorAction.REPLACE).onUnmappableCharacter(
                                    CodingErrorAction.REPLACE);
        }
    
        public void send(SocketChannel socket, byte[] data) {
            synchronized (this.pendingChanges) {
                // Indicate we want the interest ops set changed
                this.pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
    
                // And queue the data we want written
                synchronized (this.pendingData) {
                    List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket);
                    if (queue == null) {
                        queue = new ArrayList<ByteBuffer>();
                        this.pendingData.put(socket, queue);
                    }
                    queue.add(ByteBuffer.wrap(data));
                }
            }
    
            // Finally, wake up our selecting thread so it can make the required changes
            this.selector.wakeup();
        }
    
        public void run() {
            while (true) {
                try {
                    // Process any pending changes
                    synchronized (this.pendingChanges) {
                        Iterator changes = this.pendingChanges.iterator();
                        while (changes.hasNext()) {
                            ChangeRequest change = (ChangeRequest) changes.next();
                            changes.remove();
                            if(change == null) continue;
                            switch (change.type) {
                            case ChangeRequest.CHANGEOPS:
                                SelectionKey key = change.socket.keyFor(this.selector);
                                try {
                                    if(key!=null)
                                        key.interestOps(change.ops);
                                } catch(Exception ex) {
                                    if (key!=null)
                                        key.cancel();
                                }
                            }
                        }
                        this.pendingChanges.clear();
                    }
    
                    // Wait for an event one of the registered channels
                    this.selector.select();
    
                    // Iterate over the set of keys for which events are available
                    Iterator selectedKeys = this.selector.selectedKeys().iterator();
                    while (selectedKeys.hasNext()) {
                        SelectionKey key = (SelectionKey) selectedKeys.next();
                        selectedKeys.remove();
    
                        if (!key.isValid()) {
                            key.cancel();
                            continue;
                        }
    
                        // Check what event is available and deal with it
                        try {
                            if (key.isAcceptable()) {
                                this.accept(key);
                            } else if (key.isReadable()) {
                                this.read(key);
                            } else if (key.isWritable()) {
                                this.write(key);
                            }
                        } catch(IOException io) {
                            this.pendingData.remove(key.channel());
                            try {
                                ((SocketChannel)key.channel()).socket().close();
                            } catch (IOException e) {}
                            key.channel().close();
                            key.cancel();
                            key.attach(null);
                            key = null;
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void accept(SelectionKey key) throws IOException {
            // For an accept to be pending the channel must be a server socket channel.
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
    
            // Accept the connection and make it non-blocking
            SocketChannel socketChannel = serverSocketChannel.accept();
            Socket socket = socketChannel.socket();
            socketChannel.configureBlocking(false);
    
            // Register the new SocketChannel with our Selector, indicating
            // we'd like to be notified when there's data waiting to be read
            // also contains a attachment of a new StringBuffer (for storing imcomplete/multi packets)
            socketChannel.register(this.selector, SelectionKey.OP_READ, new StringBuffer());
        }
    
        private void read(SelectionKey key) throws IOException {
            SocketChannel socketChannel = (SocketChannel) key.channel();
    
            // Clear out our read buffer so it's ready for new data
            this.readBuffer.clear();
    
            // Attempt to read off the channel
            int numRead = socketChannel.read(this.readBuffer);
    
            if (numRead == -1) {
                // Remote entity shut the socket down cleanly. Do the
                // same from our end and cancel the channel.
                throw new IOException("");
            }
    
            // Grab the StringBuffer we stored as the attachment
            StringBuffer sb = (StringBuffer)key.attachment();
    
            // Flips the readBuffer by setting the current position of
            // packet stream to beginning.
            // Append the data to the attachment StringBuffer
            this.readBuffer.flip();
            sb.append(this.asciiDecoder.decode(this.readBuffer).toString());
            this.readBuffer.clear();
    
            // Get the policy request as complete packet
            if(sb.indexOf("\0") != -1) {
                String packets = new String(sb.substring(0, sb.lastIndexOf("\0")+1));
                sb.delete(0, sb.lastIndexOf("\0")+1);
                if(packets.indexOf(POLICY_REQUEST) != -1)
                    send(socketChannel, POLICY_XML.getBytes());
            } else if(sb.length() >  8192) {
                sb.setLength(0);
                //Force disconnect.
                throw new IOException("");
            }
        }
    
        private void write(SelectionKey key) throws IOException {
            SocketChannel socketChannel = (SocketChannel) key.channel();
    
            synchronized (this.pendingData) {
                List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);
    
                if(queue == null || queue.isEmpty()) {
                    // We wrote away all data, so we're no longer interested
                    // in writing on this socket. Switch back to waiting for
                    // data.
                    try {
                        if (key!=null)
                            key.interestOps(SelectionKey.OP_READ);
                    } catch(Exception ex) {
                        if (key!=null)
                            key.cancel();
                    }
                }
    
                // Write until there's not more data ...
                while (!queue.isEmpty()) {
                    ByteBuffer buf = (ByteBuffer) queue.get(0);
                    socketChannel.write(buf);
                    if (buf.remaining() > 0) {
                        // ... or the socket's buffer fills up
                        break;
                    }
                    queue.remove(0);
                }
            }
        }
    
        private Selector initSelector() throws IOException {
            // Create a new selector
            Selector socketSelector = SelectorProvider.provider().openSelector();
    
            // Create a new non-blocking server socket channel
            this.serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
    
            // Bind the server socket to the specified address and port
            InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
            serverChannel.socket().bind(isa);
    
            // Register the server socket channel, indicating an interest in
            // accepting new connections
            serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
    
            return socketSelector;
        }
    
        public static void main(String[] args) {
            try {
                new Thread(new PolicyServer(null, 5556)).start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    import java.nio.channels.SocketChannel;
    
    public class ChangeRequest {
        public static final int CHANGEOPS = 1;
    
        public SocketChannel socket;
        public int type;
        public int ops;
    
        public ChangeRequest(SocketChannel socket, int type, int ops) {
            this.socket = socket;
            this.type = type;
            this.ops = ops;
        }
    }
    
    3 回复  |  直到 13 年前
        1
  •  0
  •   user207421    14 年前

    ((承插槽)键.通道()).socket().close()

    key.channel().close()
    

    send(),将SelectionKey更改为 写入操作

        2
  •  1
  •   refuess    11 年前

    对于重负载,请确保您使用了一个可以进行连接池的框架。

    查看sfnrpc( http://code.google.com/p/sfnrpc

        3
  •  0
  •   YoK    14 年前

    阅读评论后编辑:

    试着跑步 ulimit 注意:不建议将其设置为无限,但您可以尝试以下操作:

     ulimit -u unlimited