Clean up netty pipeline and actually use it.

This commit is contained in:
Mike 2020-05-02 03:17:59 +02:00
parent 74772b04ed
commit 90061e8d7d
17 changed files with 264 additions and 129 deletions

View File

@ -80,8 +80,8 @@ public class GameClient {
public void sendResponse(MessageComposer composer) {
if (this.channel.isOpen()) {
try {
ServerMessage msg = composer.compose();
this.sendResponse(msg);
this.channel.write(composer, this.channel.voidPromise());
this.channel.flush();
} catch (Exception e) {
Emulator.getLogging().logPacketError(e);
}
@ -94,32 +94,23 @@ public class GameClient {
return;
}
if (PacketManager.DEBUG_SHOW_PACKETS)
Emulator.getLogging().logPacketLine("[" + Logging.ANSI_PURPLE + "SERVER" + Logging.ANSI_RESET + "] => [" + response.getHeader() + "] -> " + response.getBodyString());
this.channel.write(response.get(), this.channel.voidPromise());
this.channel.write(response, this.channel.voidPromise());
this.channel.flush();
}
}
public void sendResponses(ArrayList<ServerMessage> responses) {
ByteBuf buffer = Unpooled.buffer();
if (this.channel.isOpen()) {
for (ServerMessage response : responses) {
if (response == null || response.getHeader() <= 0) {
return;
}
if (PacketManager.DEBUG_SHOW_PACKETS)
Emulator.getLogging().logPacketLine("[" + Logging.ANSI_PURPLE + "SERVER" + Logging.ANSI_RESET + "] => [" + response.getHeader() + "] -> " + response.getBodyString());
buffer.writeBytes(response.get());
this.channel.write(response);
}
this.channel.write(buffer.copy(), this.channel.voidPromise());
this.channel.flush();
}
buffer.release();
}
public void dispose() {

View File

@ -78,4 +78,8 @@ public class ClientMessage {
return this.buffer.readableBytes();
}
public boolean release() {
return this.buffer.release();
}
}

View File

@ -191,4 +191,9 @@ public class ServerMessage {
return this.channelBuffer.copy();
}
public void release() {
this.channelBuffer.release();
}
}

View File

@ -4,8 +4,8 @@ import com.eu.habbo.crypto.HabboRC4;
import com.eu.habbo.messages.NoAuthMessage;
import com.eu.habbo.messages.incoming.MessageHandler;
import com.eu.habbo.messages.outgoing.handshake.CompleteDiffieHandshakeComposer;
import com.eu.habbo.networking.gameserver.GameByteDecryption;
import com.eu.habbo.networking.gameserver.GameByteEncryption;
import com.eu.habbo.networking.gameserver.decoders.GameByteDecryption;
import com.eu.habbo.networking.gameserver.encoders.GameByteEncryption;
import com.eu.habbo.networking.gameserver.GameServerAttributes;
@NoAuthMessage

View File

@ -1,54 +0,0 @@
package com.eu.habbo.networking.gameserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
public class GameByteDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
in.markReaderIndex();
//4 bytes length + 2 bytes header
if (in.readableBytes() < 6) {
in.resetReaderIndex();
return;
}
int length = in.readInt();
//if(length > 5120 && (length >> 24 != 60))
//{
//}
if (length == 1014001516) {
in.resetReaderIndex();
//in.readBytes(in.readableBytes());
ChannelFuture f = ctx.writeAndFlush(Unpooled.copiedBuffer("<?xml version=\"1.0\"?>\n" +
" <!DOCTYPE cross-domain-policy SYSTEM \"/xml/dtds/cross-domain-policy.dtd\">\n" +
" <cross-domain-policy>\n" +
" <allow-access-from domain=\"*\" to-ports=\"1-31111\" />\n" +
" </cross-domain-policy>" + (char) 0, CharsetUtil.UTF_8));
f.channel().close();
ctx.channel().close();
return;
}
if (in.readableBytes() < length || length < 0) {
in.resetReaderIndex();
return;
}
in.resetReaderIndex();
ByteBuf read = in.readBytes(length + 4);
out.add(read);
}
}

View File

@ -4,6 +4,10 @@ import com.eu.habbo.Emulator;
import com.eu.habbo.habbohotel.gameclients.GameClientManager;
import com.eu.habbo.messages.PacketManager;
import com.eu.habbo.networking.Server;
import com.eu.habbo.networking.gameserver.decoders.*;
import com.eu.habbo.networking.gameserver.encoders.MessageComposerEncoder;
import com.eu.habbo.networking.gameserver.encoders.ServerMessageEncoder;
import com.eu.habbo.networking.gameserver.encoders.GameServerMessageLogger;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LoggingHandler;
@ -26,8 +30,24 @@ public class GameServer extends Server {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("logger", new LoggingHandler());
ch.pipeline().addLast("bytesDecoder", new GameByteDecoder());
ch.pipeline().addLast(new GameMessageHandler());
// Decoders.
ch.pipeline().addLast(
new GamePolicyDecoder(),
new GameByteFrameDecoder(),
new GameByteDecoder(),
new GameMessageRateLimit(),
new GameMessageHandler()
);
// Encoders.
ch.pipeline().addLast(new ServerMessageEncoder());
if (PacketManager.DEBUG_SHOW_PACKETS) {
ch.pipeline().addLast(new GameServerMessageLogger());
}
ch.pipeline().addLast(new MessageComposerEncoder());
}
});
}

View File

@ -0,0 +1,19 @@
package com.eu.habbo.networking.gameserver.decoders;
import com.eu.habbo.messages.ClientMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class GameByteDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
short header = in.readShort();
ByteBuf body = Unpooled.copiedBuffer(in.readBytes(in.readableBytes()));
out.add(new ClientMessage(header, body));
}
}

View File

@ -1,5 +1,6 @@
package com.eu.habbo.networking.gameserver;
package com.eu.habbo.networking.gameserver.decoders;
import com.eu.habbo.networking.gameserver.GameServerAttributes;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;

View File

@ -0,0 +1,26 @@
package com.eu.habbo.networking.gameserver.decoders;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
public class GameByteFrameDecoder extends LengthFieldBasedFrameDecoder {
private static final int MAX_PACKET_LENGTH = 8192 * 4;
private static final int LENGTH_FIELD_OFFSET = 0;
private static final int LENGTH_FIELD_LENGTH = 4;
private static final int LENGTH_FIELD_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = 4;
public GameByteFrameDecoder()
{
super(MAX_PACKET_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_FIELD_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
{
return super.decode(ctx, in);
}
}

View File

@ -1,16 +1,19 @@
package com.eu.habbo.networking.gameserver;
package com.eu.habbo.networking.gameserver.decoders;
import com.eu.habbo.Emulator;
import com.eu.habbo.messages.ClientMessage;
import com.eu.habbo.messages.PacketManager;
import com.eu.habbo.threading.runnables.ChannelReadHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.TooLongFrameException;
import java.io.IOException;
@ChannelHandler.Sharable
public class GameMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
if (!Emulator.getGameServer().getGameClientManager().addClient(ctx)) {
@ -25,8 +28,10 @@ public class GameMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ClientMessage message = (ClientMessage) msg;
try {
ChannelReadHandler handler = new ChannelReadHandler(ctx, msg);
ChannelReadHandler handler = new ChannelReadHandler(ctx, message);
if (PacketManager.MULTI_THREADED_PACKET_HANDLING) {
Emulator.getThreading().run(handler);
@ -46,12 +51,18 @@ public class GameMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (cause instanceof Exception) {
if (!(cause instanceof IOException)) {
// cause.printStackTrace(Logging.getErrorsRuntimeWriter());
if (cause instanceof TooLongFrameException) {
Emulator.getLogging().logErrorLine("Disconnecting client, reason: \"" + cause.getMessage() + "\".");
} else {
Emulator.getLogging().logErrorLine("Disconnecting client, exception in GameMessageHander:");
Emulator.getLogging().logErrorLine(cause.toString());
for (StackTraceElement element : cause.getStackTrace()) {
Emulator.getLogging().logErrorLine(element.toString());
}
}
ctx.channel().close();
}
}

View File

@ -0,0 +1,49 @@
package com.eu.habbo.networking.gameserver.decoders;
import com.eu.habbo.Emulator;
import com.eu.habbo.habbohotel.gameclients.GameClient;
import com.eu.habbo.messages.ClientMessage;
import com.eu.habbo.networking.gameserver.GameServerAttributes;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
public class GameMessageRateLimit extends MessageToMessageDecoder<ClientMessage> {
private static final int RESET_TIME = 1;
private static final int MAX_COUNTER = 10;
@Override
protected void decode(ChannelHandlerContext ctx, ClientMessage message, List<Object> out) throws Exception {
GameClient client = ctx.channel().attr(GameServerAttributes.CLIENT).get();
if (client == null) {
return;
}
int count = 0;
// Check if reset time has passed.
int timestamp = Emulator.getIntUnixTimestamp();
if (timestamp - client.lastPacketCounterCleared > RESET_TIME) {
// Reset counter.
client.incomingPacketCounter.clear();
client.lastPacketCounterCleared = timestamp;
} else {
// Get stored count for message id.
count = client.incomingPacketCounter.getOrDefault(message.getMessageId(), 0);
}
// If we exceeded the counter, drop the packet.
if (count > MAX_COUNTER) {
return;
}
client.incomingPacketCounter.put(message.getMessageId(), ++count);
// Continue processing.
out.add(message);
}
}

View File

@ -0,0 +1,39 @@
package com.eu.habbo.networking.gameserver.decoders;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
public class GamePolicyDecoder extends ByteToMessageDecoder {
private static final String POLICY = "<?xml version=\"1.0\"?>\n" +
" <!DOCTYPE cross-domain-policy SYSTEM \"/xml/dtds/cross-domain-policy.dtd\">\n" +
" <cross-domain-policy>\n" +
" <allow-access-from domain=\"*\" to-ports=\"1-31111\" />\n" +
" </cross-domain-policy>" + (char) 0;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
in.markReaderIndex();
byte b = in.readByte();
if (b == '<') {
in.resetReaderIndex();
ctx.writeAndFlush(Unpooled.copiedBuffer(POLICY, CharsetUtil.UTF_8))
.addListener(ChannelFutureListener.CLOSE);
return;
}
// Remove ourselves since the first packet was not a policy request.
ctx.pipeline().remove(this);
// Continue to the other pipelines.
in.resetReaderIndex();
}
}

View File

@ -1,5 +1,6 @@
package com.eu.habbo.networking.gameserver;
package com.eu.habbo.networking.gameserver.encoders;
import com.eu.habbo.networking.gameserver.GameServerAttributes;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@ -14,7 +15,13 @@ public class GameByteEncryption extends ChannelOutboundHandlerAdapter {
ByteBuf out = (ByteBuf) msg;
// Read all available bytes.
byte[] data = out.array();
byte[] data;
if (out.hasArray()) {
data = out.array();
} else {
data = out.readBytes(out.readableBytes()).array();
}
// Encrypt.
ctx.channel().attr(GameServerAttributes.CRYPTO_SERVER).get().parse(data);

View File

@ -0,0 +1,20 @@
package com.eu.habbo.networking.gameserver.encoders;
import com.eu.habbo.Emulator;
import com.eu.habbo.core.Logging;
import com.eu.habbo.messages.ServerMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
public class GameServerMessageLogger extends MessageToMessageEncoder<ServerMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, ServerMessage message, List<Object> out) throws Exception {
Emulator.getLogging().logPacketLine("[" + Logging.ANSI_PURPLE + "SERVER" + Logging.ANSI_RESET + "] => [" + message.getHeader() + "] -> " + message.getBodyString());
out.add(message);
}
}

View File

@ -0,0 +1,16 @@
package com.eu.habbo.networking.gameserver.encoders;
import com.eu.habbo.messages.outgoing.MessageComposer;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
public class MessageComposerEncoder extends MessageToMessageEncoder<MessageComposer> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageComposer message, List<Object> out) throws Exception {
out.add(message.compose());
}
}

View File

@ -0,0 +1,22 @@
package com.eu.habbo.networking.gameserver.encoders;
import com.eu.habbo.messages.ServerMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class ServerMessageEncoder extends MessageToByteEncoder<ServerMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, ServerMessage message, ByteBuf out) {
ByteBuf buf = message.get();
try {
out.writeBytes(buf);
} finally {
buf.release();
message.release();
}
}
}

View File

@ -2,70 +2,29 @@ package com.eu.habbo.threading.runnables;
import com.eu.habbo.Emulator;
import com.eu.habbo.habbohotel.gameclients.GameClient;
import com.eu.habbo.habbohotel.gameclients.GameClientManager;
import com.eu.habbo.messages.ClientMessage;
import com.eu.habbo.networking.gameserver.GameServerAttributes;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
public class ChannelReadHandler implements Runnable {
private final ChannelHandlerContext ctx;
private final Object msg;
//private int _header;
public ChannelReadHandler(ChannelHandlerContext ctx, Object msg) {
private final ChannelHandlerContext ctx;
private final ClientMessage message;
public ChannelReadHandler(ChannelHandlerContext ctx, ClientMessage message) {
this.ctx = ctx;
this.msg = msg;
this.message = message;
}
public void run() {
try {
ByteBuf m = (ByteBuf) this.msg;
int length = m.readInt();
short header = m.readShort();
//_header = header;
GameClient client = this.ctx.channel().attr(GameServerAttributes.CLIENT).get();
if (m.readableBytes() + 2 < length) {
return;
}
if (client != null) {
int count = 0;
int timestamp = Emulator.getIntUnixTimestamp();
if (timestamp - client.lastPacketCounterCleared > 1) {
client.incomingPacketCounter.clear();
client.lastPacketCounterCleared = timestamp;
} else {
if (m.readableBytes() + 2 < length) {
m.resetReaderIndex();
client.incomingPacketCounter.put((int) header, 0);
count = 0;
return;
} else {
count = client.incomingPacketCounter.getOrDefault(header, 0);
}
}
if (count <= 10) {
count++;
if (m.readableBytes() + 2 < length) {
m.resetReaderIndex();
client.incomingPacketCounter.put((int) header, 0);
count = 0;
return;
}
client.incomingPacketCounter.put((int) header, count);
ByteBuf body = Unpooled.wrappedBuffer(m.readBytes(m.readableBytes()));
Emulator.getGameServer().getPacketManager().handlePacket(client, new ClientMessage(header, body));
body.release();
}
Emulator.getGameServer().getPacketManager().handlePacket(client, message);
}
m.release();
} catch (Exception e) {
//System.out.println("Potential packet overflow occurring, careful! header: " + _header + e.getMessage());
} finally {
this.message.release();
}
}
}