Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Realtime WebSockets Event Server #261

Open
wants to merge 19 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions GlobalQuakeCore/src/main/java/globalquake/core/Settings.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ public final class Settings {
public static String discordBotChannelID;
@SuppressWarnings("unused")
public static Boolean discordBotSendRevisions;

public static Boolean enableRTWSEventServer;
public static String RTWSEventIP;
public static Integer RTWSEventPort;
public static Integer RTWSEventMaxConnections;
public static Integer RTWSMaxConnectionsPerUniqueIP;

public static Boolean hideClustersWithQuake;
public static Boolean antialiasingQuakes;
public static Boolean antialiasingOldQuakes;
Expand Down Expand Up @@ -242,6 +249,13 @@ private static void load() {
loadProperty("FDSNWSEventIP", "localhost"); //As a default, localhost is used for security.
loadProperty("FDSNWSEventPort", "8080");
loadProperty("autoStartFDSNWSEventServer", "false");


loadProperty("enableRTWSEventServer", "false");
loadProperty("RTWSEventIP", "localhost"); //As a default, localhost is used for security.
loadProperty("RTWSEventPort", "8081");
loadProperty("RTWSEventMaxConnections", "10000");
loadProperty("RTWSMaxConnectionsPerUniqueIP", "10");
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK


loadProperty("shakingLevelScale", "0",
o -> validateInt(0, IntensityScales.INTENSITY_SCALES.length - 1, (Integer) o));
Expand Down
21 changes: 20 additions & 1 deletion GlobalQuakeServer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,26 @@
<artifactId>JTransforms</artifactId>
<version>3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.json/json -->


<!-- https://mvnrepository.com/artifact/org.eclipse.jetty/jetty-server -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>11.0.20</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.eclipse.jetty.websocket/websocket-jetty-server -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-jetty-server</artifactId>
<version>11.0.20</version>
</dependency>






<dependency>
<groupId>org.tinylog</groupId>
Expand Down
10 changes: 9 additions & 1 deletion GlobalQuakeServer/src/main/java/gqserver/main/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import gqserver.bot.DiscordBot;
import gqserver.fdsnws_event.FdsnwsEventsHTTPServer;

import gqserver.websocketserver.WebSocketEventServer;
import globalquake.utils.Scale;
import gqserver.server.GlobalQuakeServer;
import gqserver.ui.server.DatabaseMonitorFrame;
Expand Down Expand Up @@ -138,6 +138,7 @@ public static void updateProgressBar(String status, int value) {
}

private static final double PHASES = 10.0;
DecryptingElectrons marked this conversation as resolved.
Show resolved Hide resolved

private static int phase = 0;

public static void initAll() throws Exception{
Expand Down Expand Up @@ -170,6 +171,13 @@ public static void initAll() throws Exception{
getErrorHandler().handleWarning(new RuntimeException("Unable to start FDSNWS EVENT server! Check logs for more info.", e));
}
}

//Start the WebSocket Server, if enabled
updateProgressBar("Starting WebSocket Server...", (int) ((phase++ / PHASES) * 100.0));
if(Settings.enableRTWSEventServer){
WebSocketEventServer.getInstance().init();
WebSocketEventServer.getInstance().start();
}

updateProgressBar("Starting Discord Bot...", (int) ((phase++ / PHASES) * 100.0));
if(Settings.discordBotEnabled){
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package gqserver.websocketserver;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.Future;

import org.eclipse.jetty.websocket.api.Session;

import org.tinylog.Logger;

public class Client {
private Session session;
private String ip;
private String uniqueID;
private Future<?> pingFuture;
private Long lastMessageTime = 0L;

private static Duration pingInterval = Duration.ofSeconds(25);

/**
* Create a new client object from a Jetty WebSocket session
* @param session
*/
public Client(Session session) {
this.session = session;


SocketAddress remoteAddress = session.getRemoteAddress();
//If the remote address is null, close the connection. Might happen.. idk
if(remoteAddress == null) {
Logger.error("A critical error occurred while trying to get the remote address for a new client");
session.close(0, "No remote address");
return;
}

InetSocketAddress inetAddress = (InetSocketAddress) remoteAddress;

ip = inetAddress.getAddress().getHostAddress();
uniqueID = ip + ":" + inetAddress.getPort();

pingFuture = Clients.getInstance().getPingExecutor().scheduleAtFixedRate(this::pingThread, pingInterval.toMillis(), pingInterval.toMillis(), java.util.concurrent.TimeUnit.MILLISECONDS);
}


private void pingThread(){
if(!isConnected()) {
pingFuture.cancel(true);
return;
}

Long timeSinceLastMessage = System.currentTimeMillis() - lastMessageTime;

//If the time since the last message is less than a third of the ping interval, don't send a ping
if(timeSinceLastMessage<pingInterval.toMillis()/3){
DecryptingElectrons marked this conversation as resolved.
Show resolved Hide resolved
return;
}

try {
session.getRemote().sendPing(null);
} catch (Exception e) {
session.close();
}

lastMessageTime = System.currentTimeMillis();
}

public void sendString(String message) throws IOException {
session.getRemote().sendString(message);
lastMessageTime = System.currentTimeMillis();
}

public boolean isConnected() {
return session.isOpen();
}

public void disconnectEvent() {
Clients.getInstance().clientDisconnected(this.getUniqueID());
}

public void updateLastMessageTime() {
lastMessageTime = System.currentTimeMillis();
}

public String getIP() {
return ip;
}

public Session getSession() {
return session;
}

public String getUniqueID() {
return uniqueID;
}
}
149 changes: 149 additions & 0 deletions GlobalQuakeServer/src/main/java/gqserver/websocketserver/Clients.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package gqserver.websocketserver;

import java.io.FileWriter;
import java.io.IOException;

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;


import org.tinylog.Logger;

import globalquake.core.Settings;



public class Clients {
private ScheduledExecutorService pingExecutor;

// IP:PORT -> Client
private Hashtable<String, Client> clients;

//IP -> Integer
private Hashtable<String, Integer> uniqueIPConnectionCounts;

private static Clients instance = new Clients();
private Clients() {
pingExecutor = Executors.newScheduledThreadPool(4);
clients = new Hashtable<String, Client>();
uniqueIPConnectionCounts = new Hashtable<String, Integer>();
}

public int getCountForIP(String ip) {
int count = 0;
try{
count = uniqueIPConnectionCounts.get(ip);
}
catch(Exception e) {}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

umm what? maybe you wanted to use HashMap instead of Hashtable.. there is a function called getOrDefault(...)


return count;
}

/**
* Increment the connection count for the given unique IP address
* @param address
* @return The new connection count for the given IP address
*/
private int incrementConnectionCount(String address) {
int count = 0;

synchronized (uniqueIPConnectionCounts) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the synchronizations here seem a bit sketchy to me 💀 I would be quite worried about deadlocks or race conditions from my experience. Perhaps you can use ConcurrentHashMap to do the job?

if (uniqueIPConnectionCounts.containsKey(address)) {
int currentCount = uniqueIPConnectionCounts.get(address);
count = currentCount + 1; //Used to return the count from function
uniqueIPConnectionCounts.put(address, count);
} else {
uniqueIPConnectionCounts.put(address, 1);
count = 1;
}
}

return count;
}

/**
* Decrement the connection count for the given unique IP address
* @param address
* @return The new connection count for the given IP address
*/
private int decrementConnectionCount(String address) {
int count = 0;

synchronized (uniqueIPConnectionCounts) {
if(!uniqueIPConnectionCounts.containsKey(address)) {
return 0;
}

int currentCount = uniqueIPConnectionCounts.get(address);
count = currentCount - 1; //Used to return the count from function
if (count <= 0) {
uniqueIPConnectionCounts.remove(address);
} else {
uniqueIPConnectionCounts.put(address, count);
}
}

return count;
}

public void clientDisconnected(String uniqueID) {
Logger.info("Client disconnected: " + uniqueID);

Client client = clients.get(uniqueID);
if(client == null) {
return;
}

decrementConnectionCount(client.getIP());
clients.remove(client.getUniqueID());
}

public synchronized void addClient(Client client) {
Logger.info("Client connected: " + client.getUniqueID());

clients.put(client.getUniqueID(), client);
incrementConnectionCount(client.getIP());

//Close the connection if the number of connections from this IP exceeds the limit
if(uniqueIPConnectionCounts.get(client.getIP()) > Settings.RTWSMaxConnectionsPerUniqueIP) {
client.getSession().close(4420, "Too many connections from this IP");
}
}

public List<Client> getClients() {
return new ArrayList<Client>(clients.values());
}

public ScheduledExecutorService getPingExecutor() {
return pingExecutor;
}

public void DEBUG_SAVE_CONNECTION_COUNTS() {
String filename = "connection_counts.txt";

try {
FileWriter writer = new FileWriter(filename);

writer.write(uniqueIPConnectionCounts.toString());

writer.write("\n\n");

int totalConnections = 0;
totalConnections = clients.size();
writer.write("Total connections: " + totalConnections);

writer.close();
} catch (IOException e) {
e.printStackTrace();
}

}

public static Clients getInstance() {
return instance;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package gqserver.websocketserver;

import org.eclipse.jetty.websocket.server.JettyServerUpgradeRequest;
import org.eclipse.jetty.websocket.server.JettyServerUpgradeResponse;
import org.eclipse.jetty.websocket.server.JettyWebSocketCreator;
import org.tinylog.Logger;

import globalquake.core.Settings;



/**
* This class implements the JettyWebSocketCreator interface and is responsible for creating WebSocket instances for IP connections with limited connections.
* It checks the number of connections from a specific IP address and returns a WebSocket instance if the number of connections is below the maximum limit.
* If the number of connections exceeds the maximum limit, it logs a message and does not create a WebSocket instance.
* A WebSocket not being returned will cause the connection to be closed.
*/
public class EventEndpointCreatorIPConnectionLimited implements JettyWebSocketCreator
{

public EventEndpointCreatorIPConnectionLimited() {
super();
}

@Override
public synchronized Object createWebSocket(JettyServerUpgradeRequest jettyServerUpgradeRequest, JettyServerUpgradeResponse jettyServerUpgradeResponse)
{
//If the server overall has too many connections, don't create a new connection
if(Clients.getInstance().getClients().size() >= Settings.RTWSEventMaxConnections) {
Logger.error("Maximum number of connections reached, not creating new connection");
return null;
}

String ip = jettyServerUpgradeRequest.getHttpServletRequest().getRemoteAddr();
int count = Clients.getInstance().getCountForIP(ip);

//If the IP does not have too many connections, create a new connection
if(!(count >= Settings.RTWSMaxConnectionsPerUniqueIP)) {
return new ServerEndpoint();
}

/*
Attempt to kick the connection early if the IP has too many connections
Clients.addClient will also close connections if the IP has too many connections
*/
try {
jettyServerUpgradeResponse.sendForbidden("Too many connections from this IP");
Logger.info("Connection from " + ip + " was denied due to too many connections");
} catch (Exception e) {
Logger.error(e, "Error occurred while trying to send forbidden response");
}

return null;
}
}
Loading
Loading