diff --git a/Client/src/tk/jackyliao123/proxy/client/Tunnel.java b/Client/src/tk/jackyliao123/proxy/client/Tunnel.java index 6f05950..c33e2a2 100644 --- a/Client/src/tk/jackyliao123/proxy/client/Tunnel.java +++ b/Client/src/tk/jackyliao123/proxy/client/Tunnel.java @@ -33,7 +33,7 @@ public Tunnel(EventProcessor processor, byte[] secretKey, TCPListener tcpListene serverSocketChannel.configureBlocking(false); serverSocketChannel.connect(Variables.serverAddress); - rawServerConnection = processor.registerSocketChannel(serverSocketChannel, new ConnectToServerListener(this, secretKey)); + this.rawServerConnection = processor.registerSocketChannel(serverSocketChannel, new ConnectToServerListener(this, secretKey)); this.packetLengthListener = new ClientEncryptedPacketLengthListener(this); this.packetListener = new ClientEncryptedPacketListener(this); diff --git a/Client/src/tk/jackyliao123/proxy/client/event/ClientEncryptedPacketLengthListener.java b/Client/src/tk/jackyliao123/proxy/client/event/ClientEncryptedPacketLengthListener.java index 8b1564d..bb53820 100644 --- a/Client/src/tk/jackyliao123/proxy/client/event/ClientEncryptedPacketLengthListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/event/ClientEncryptedPacketLengthListener.java @@ -15,6 +15,7 @@ public ClientEncryptedPacketLengthListener(Tunnel tunnel) { this.tunnel = tunnel; } + @Override public void onRead(ChannelWrapper channel, byte[] array) throws IOException { int length = Util.b2ub(array[0]) * 16; channel.pushFillReadBuffer(ByteBuffer.allocate(length), tunnel.packetListener); diff --git a/Client/src/tk/jackyliao123/proxy/client/event/ConnectToServerListener.java b/Client/src/tk/jackyliao123/proxy/client/event/ConnectToServerListener.java index 60dc5a7..e5ef3df 100644 --- a/Client/src/tk/jackyliao123/proxy/client/event/ConnectToServerListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/event/ConnectToServerListener.java @@ -18,6 +18,7 @@ public ConnectToServerListener(Tunnel tunnel, byte[] secretKey) { this.secretKey = secretKey; } + @Override public boolean onConnect(ChannelWrapper c) throws IOException { SocketChannel channel = (SocketChannel) c.channel; boolean connected = channel.finishConnect(); diff --git a/Client/src/tk/jackyliao123/proxy/client/event/HandshakeResponseListener.java b/Client/src/tk/jackyliao123/proxy/client/event/HandshakeResponseListener.java index 19b473b..5806423 100644 --- a/Client/src/tk/jackyliao123/proxy/client/event/HandshakeResponseListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/event/HandshakeResponseListener.java @@ -23,6 +23,7 @@ public HandshakeResponseListener(Tunnel tunnel, byte[] secretKey) { this.secretKey = secretKey; } + @Override public void onRead(ChannelWrapper channel, byte[] array) throws IOException { boolean eq = Util.bseq(array, 0, Constants.MAGIC_LENGTH, Constants.MAGIC, 0, Constants.MAGIC_LENGTH); if (!eq) { diff --git a/Client/src/tk/jackyliao123/proxy/client/event/TunnelDisconnectListener.java b/Client/src/tk/jackyliao123/proxy/client/event/TunnelDisconnectListener.java index b1146f8..ab303f7 100644 --- a/Client/src/tk/jackyliao123/proxy/client/event/TunnelDisconnectListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/event/TunnelDisconnectListener.java @@ -13,6 +13,7 @@ public TunnelDisconnectListener(SocksClient client) { this.client = client; } + @Override public void onDisconnect(ChannelWrapper c) throws IOException { client.disconnect(c); System.err.println("Disconnected"); diff --git a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5AddressListener.java b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5AddressListener.java index 2fd0ffe..6ced1eb 100644 --- a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5AddressListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5AddressListener.java @@ -22,6 +22,7 @@ public Socks5AddressListener(SocksClient client, byte atyp, byte cmd) { this.cmd = cmd; } + @Override public void onRead(ChannelWrapper channel, byte[] array) throws IOException { byte type; byte[] addr; diff --git a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DataListener.java b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DataListener.java index ea377dc..811f489 100644 --- a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DataListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DataListener.java @@ -15,6 +15,7 @@ public Socks5DataListener(SocksClient client, int id) { this.id = id; } + @Override public void onRead(ChannelWrapper channel, byte[] array) throws IOException { client.getTCPTunnel().send(id, array); channel.pushDumpReadBuffer(this); diff --git a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DisconnectListener.java b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DisconnectListener.java index db74a84..942fc51 100644 --- a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DisconnectListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DisconnectListener.java @@ -16,6 +16,7 @@ public Socks5DisconnectListener(SocksClient client, int connectionId) { this.connectionId = connectionId; } + @Override public void onDisconnect(ChannelWrapper c) throws IOException { client.freeId(connectionId); client.getTCPTunnel().disconnect(connectionId, Constants.TCP_DISCONNECT_CONNECTION_RESET); diff --git a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DomainLengthListener.java b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DomainLengthListener.java index 34eaaaa..22f35cf 100644 --- a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DomainLengthListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5DomainLengthListener.java @@ -18,6 +18,7 @@ public Socks5DomainLengthListener(SocksClient client, byte cmd) { this.cmd = cmd; } + @Override public void onRead(ChannelWrapper channel, byte[] array) throws IOException { channel.pushFillReadBuffer(ByteBuffer.allocate(Util.b2ub(array[0]) + 2), new Socks5AddressListener(client, Socks5Constants.ATYP_DOMAIN, cmd)); } diff --git a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5MethodLengthListener.java b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5MethodLengthListener.java index ca256c4..85a4d45 100644 --- a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5MethodLengthListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5MethodLengthListener.java @@ -17,6 +17,7 @@ public Socks5MethodLengthListener(SocksClient client) { this.client = client; } + @Override public void onRead(ChannelWrapper channel, byte[] array) throws IOException { byte version = array[0]; if (version != Socks5Constants.VERSION) { diff --git a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5RequestListener.java b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5RequestListener.java index d0349f9..034cf3c 100644 --- a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5RequestListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5RequestListener.java @@ -16,6 +16,7 @@ public Socks5RequestListener(SocksClient client) { this.client = client; } + @Override public void onRead(ChannelWrapper channel, byte[] array) throws IOException { byte version = array[0]; if (version != Socks5Constants.VERSION) { diff --git a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5TCPListener.java b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5TCPListener.java index 037eded..ef1ace9 100644 --- a/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5TCPListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/socks/event/Socks5TCPListener.java @@ -18,6 +18,7 @@ public Socks5TCPListener(SocksClient client) { this.client = client; } + @Override public void onTcpConnect(int connectionId, byte statusCode, int ping) throws IOException { Socks5ConnectionData c = client.connections.get(connectionId); if (c == null) { @@ -80,6 +81,7 @@ public void onTcpConnect(int connectionId, byte statusCode, int ping) throws IOE c.client.pushDumpReadBuffer(new Socks5DataListener(client, connectionId)); } + @Override public void onTcpPacket(int connectionId, byte[] packet) throws IOException { Socks5ConnectionData c = client.connections.get(connectionId); if (c == null) { @@ -89,6 +91,7 @@ public void onTcpPacket(int connectionId, byte[] packet) throws IOException { c.client.pushWriteBuffer(ByteBuffer.wrap(packet)); } + @Override public void onTcpDisconnect(int connectionId, byte reason) throws IOException { Socks5ConnectionData c = client.connections.remove(connectionId); if (c == null) { diff --git a/Client/src/tk/jackyliao123/proxy/client/socks/event/TunnelDisconnectListener.java b/Client/src/tk/jackyliao123/proxy/client/socks/event/TunnelDisconnectListener.java index a0ffe3f..ee97fce 100644 --- a/Client/src/tk/jackyliao123/proxy/client/socks/event/TunnelDisconnectListener.java +++ b/Client/src/tk/jackyliao123/proxy/client/socks/event/TunnelDisconnectListener.java @@ -13,6 +13,7 @@ public TunnelDisconnectListener(SocksClient client) { this.client = client; } + @Override public void onDisconnect(ChannelWrapper c) throws IOException { client.disconnect(c); System.err.println("Disconnected"); diff --git a/Common/src/tk/jackyliao123/proxy/ChannelWrapper.java b/Common/src/tk/jackyliao123/proxy/ChannelWrapper.java index f62c6ab..81aa49e 100644 --- a/Common/src/tk/jackyliao123/proxy/ChannelWrapper.java +++ b/Common/src/tk/jackyliao123/proxy/ChannelWrapper.java @@ -92,6 +92,7 @@ public void addInterest(int op) { try { selectionKey.interestOps(selectionKey.interestOps() | op); } catch (CancelledKeyException e) { + Logger.warning("Cancelled key exception"); close(); } } @@ -100,21 +101,26 @@ public void removeInterest(int op) { try { selectionKey.interestOps(selectionKey.interestOps() & (~op)); } catch (CancelledKeyException e) { + Logger.warning("Cancelled key exception"); close(); } } public void pushDumpReadBuffer(ReadEventListener listener) { - if (!shouldClose && !stopReading) { + if (!shouldClose) { readBuffers.addLast(new BufferFiller(ByteBuffer.allocate(Constants.BUFFER_SIZE), listener, false)); - addInterest(SelectionKey.OP_READ); + if (!stopReading) { + addInterest(SelectionKey.OP_READ); + } } } public void pushFillReadBuffer(ByteBuffer bytes, ReadEventListener listener) { - if (!shouldClose && !stopReading) { + if (!shouldClose) { readBuffers.addLast(new BufferFiller(bytes, listener, true)); - addInterest(SelectionKey.OP_READ); + if (!stopReading) { + addInterest(SelectionKey.OP_READ); + } } } @@ -130,6 +136,7 @@ public BufferFiller getReadBuffer() { if (!readBuffers.isEmpty()) { return readBuffers.getFirst(); } + removeInterest(SelectionKey.OP_READ); return null; } @@ -139,9 +146,11 @@ public boolean isFullyRead() { } public void pushWriteBuffer(ByteBuffer bytes) { - if (!shouldClose && !stopWriting) { + if (!shouldClose) { writeBuffers.addLast(bytes); - addInterest(SelectionKey.OP_WRITE); + if (!stopWriting) { + addInterest(SelectionKey.OP_WRITE); + } } } diff --git a/Common/src/tk/jackyliao123/proxy/Logger.java b/Common/src/tk/jackyliao123/proxy/Logger.java index 1d0b1e4..2d7abfa 100644 --- a/Common/src/tk/jackyliao123/proxy/Logger.java +++ b/Common/src/tk/jackyliao123/proxy/Logger.java @@ -89,6 +89,7 @@ public LogThread() { setDaemon(true); } + @Override public void run() { while (true) { try { diff --git a/Common/src/tk/jackyliao123/proxy/TunnelChannelWrapper.java b/Common/src/tk/jackyliao123/proxy/TunnelChannelWrapper.java index 8f39ab5..c9ba4c6 100644 --- a/Common/src/tk/jackyliao123/proxy/TunnelChannelWrapper.java +++ b/Common/src/tk/jackyliao123/proxy/TunnelChannelWrapper.java @@ -43,6 +43,11 @@ public void pushWriteBuffer(int connectionId, ByteBuffer data) { buffers.addLast(data); + if (buffers.size() >= Constants.MAX_QUEUE) { + stopReading(connectionId); + Logger.info("Buffer filled"); + } + addInterest(SelectionKey.OP_WRITE); } @@ -57,6 +62,9 @@ public ByteBuffer getWriteBuffer() { if (clientBuffers.isEmpty()) { iterator.remove(); } + if (clientBuffers.size() < Constants.MAX_QUEUE) { + startReading(entry.getKey()); + } super.pushWriteBuffer(buffer); } } diff --git a/Common/src/tk/jackyliao123/proxy/event/EventProcessor.java b/Common/src/tk/jackyliao123/proxy/event/EventProcessor.java index 0a59e4e..6da3526 100644 --- a/Common/src/tk/jackyliao123/proxy/event/EventProcessor.java +++ b/Common/src/tk/jackyliao123/proxy/event/EventProcessor.java @@ -27,7 +27,6 @@ public Selector getSelector() { public void process(long timeout) throws IOException { selector.select(timeout); - Iterator keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); @@ -136,6 +135,10 @@ public void process(long timeout) throws IOException { Logger.error("Event processing has experienced an error on " + key.channel()); Logger.error(e); kill((ChannelWrapper) key.attachment()); + } catch (Throwable t) { + Logger.error("Critical Error, Throwable caught"); + Logger.error(t); + kill((ChannelWrapper) key.attachment()); } keys.remove(); diff --git a/Server/src/tk/jackyliao123/proxy/server/event/ConnectListener.java b/Server/src/tk/jackyliao123/proxy/server/event/ConnectListener.java index 6f7b63a..63589fe 100644 --- a/Server/src/tk/jackyliao123/proxy/server/event/ConnectListener.java +++ b/Server/src/tk/jackyliao123/proxy/server/event/ConnectListener.java @@ -19,6 +19,7 @@ public ConnectListener(TCPHandler handler, int connectionId) { this.handler = handler; } + @Override public boolean onConnect(ChannelWrapper channel) throws IOException { if (channel.channel instanceof SocketChannel) { int timeTaken = (int) (System.currentTimeMillis() - channel.currentTimestamp); diff --git a/Server/src/tk/jackyliao123/proxy/server/event/RedirectDisconnectListener.java b/Server/src/tk/jackyliao123/proxy/server/event/RedirectDisconnectListener.java index b942379..4ec66ce 100644 --- a/Server/src/tk/jackyliao123/proxy/server/event/RedirectDisconnectListener.java +++ b/Server/src/tk/jackyliao123/proxy/server/event/RedirectDisconnectListener.java @@ -16,6 +16,7 @@ public RedirectDisconnectListener(TCPHandler handler, int connectionId) { this.connectionId = connectionId; } + @Override public void onDisconnect(ChannelWrapper c) throws IOException { handler.closeConnection(connectionId); handler.sendDisconnect(connectionId, Constants.TCP_DISCONNECT_CONNECTION_RESET); diff --git a/Server/src/tk/jackyliao123/proxy/server/event/RedirectToClientListener.java b/Server/src/tk/jackyliao123/proxy/server/event/RedirectToClientListener.java index 5e57200..bfe9e2d 100644 --- a/Server/src/tk/jackyliao123/proxy/server/event/RedirectToClientListener.java +++ b/Server/src/tk/jackyliao123/proxy/server/event/RedirectToClientListener.java @@ -15,6 +15,7 @@ public RedirectToClientListener(TCPHandler tcp, int connectionId) { this.connectionId = connectionId; } + @Override public void onRead(ChannelWrapper channel, byte[] array) throws IOException { tcp.sendPacket(connectionId, array); channel.pushDumpReadBuffer(this);