Skip to content

Commit

Permalink
Merge pull request #7 from turnerlabs/allow_custom_kdshashkey
Browse files Browse the repository at this point in the history
Added code to crack open the JSON to see if it has the kdshashkey
  • Loading branch information
smithatlanta authored Mar 21, 2022
2 parents 38bba80 + 120284c commit 91f8686
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 1 deletion.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ building a data ingestion app using Kinesis in a language other than Java.
- Point the server to your kinesis stream by setting the `AWS_DEFAULT_REGION` and `KINESIS_STREAM` environment variables. Once the server is
up and running, you can send data to Kinesis by opening a socket connection and sending utf-8 data.
- Each record you send should be delimited by a new line.
- If you don't pass in a hash key a random hash key is generated for you.
- To pass in a hash key, add the hash key as kdshashkey on the root of your JSON object.

When service starts, it exposes two ports:
1. Inlet Port: This port is used to receive the message from your app to be sent to kinesis. The server defaults to port `3000` but can be overridden by setting the `PORT` environment variable.
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -46,14 +48,33 @@ public void runOnce(String line) throws Exception {

ByteBuffer data = ByteBuffer.wrap(finalLine.getBytes(java.nio.charset.StandardCharsets.UTF_8));

// Need to serialize this to an object to get the key.
String hashKey;

Gson gson = new Gson();
try {
MinimalKey minimal = gson.fromJson(line, MinimalKey.class);
if (minimal.kdsHashKey != null) {
hashKey = minimal.kdsHashKey;
log.debug("Using passed in hash key");
} else {
hashKey = randomExplicitHashKey();
log.debug("Using random hash key");
}
}
catch (JsonSyntaxException e) {
hashKey = randomExplicitHashKey();
log.debug("Using random hash key");
}

//This is a measure of the backpressure in the system, which should be checked before putting more records,
//to avoid exhausting system resources.
while (kinesis.getOutstandingRecordsCount() > 1e4) {
log.info("Too many outstanding records pending in the queue. Waiting for a second.");
Thread.sleep(500);
}

UserRecord userRecord = new UserRecord(stream, " ", randomExplicitHashKey(), data);
UserRecord userRecord = new UserRecord(stream, " ", hashKey, data);
ListenableFuture<UserRecordResult> f = kinesis.addUserRecord(userRecord);

Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/warnermedia/kplserver/MinimalKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.warnermedia.kplserver;

public class MinimalKey {
String kdsHashKey = null;
}
39 changes: 39 additions & 0 deletions src/main/java/com/warnermedia/kplserver/TestClientJSONKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.warnermedia.kplserver;

import com.google.gson.Gson;

import java.io.*;
import java.net.Socket;


public class TestClientJSONKey {
public static class TestWithKey {
String kdsHashKey;
String testa;
}

public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException {

System.out.println("starting client");

// establish socket connection to server
Socket socket = new Socket("127.0.0.1", 3000);
OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");

System.out.println("Sending request to Socket Server");

TestWithKey tst = new TestWithKey();
tst.kdsHashKey = "mykey";
tst.testa ="hello";

Gson gson = new Gson();
String jsonResult = gson.toJson(tst);
String jsonFinal = jsonResult + "\n";

out.write(jsonFinal);
out.flush();
Thread.sleep(100);

socket.close();
}
}
37 changes: 37 additions & 0 deletions src/main/java/com/warnermedia/kplserver/TestClientJSONNoKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.warnermedia.kplserver;

import com.google.gson.Gson;

import java.io.*;
import java.net.Socket;


public class TestClientJSONNoKey {
public static class TestWithNoKey {
String testa;
}

public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException {

System.out.println("starting client");

// establish socket connection to server
Socket socket = new Socket("127.0.0.1", 3000);
OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");

System.out.println("Sending request to Socket Server");

TestWithNoKey tst = new TestWithNoKey();
tst.testa ="hello";

Gson gson = new Gson();
String jsonResult = gson.toJson(tst);
String jsonFinal = jsonResult + "\n";

out.write(jsonFinal);
out.flush();
Thread.sleep(100);

socket.close();
}
}

0 comments on commit 91f8686

Please sign in to comment.