Skip to content

Commit

Permalink
Merge 40861ae into 7bb33eb
Browse files Browse the repository at this point in the history
  • Loading branch information
smithatlanta authored Apr 14, 2022
2 parents 7bb33eb + 40861ae commit f8e2ef6
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
9 changes: 8 additions & 1 deletion src/main/java/com/warnermedia/kplserver/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static void main(String[] args) throws Exception {
ServerSocket errSocket = new ServerSocket(port);
errSocket.setSoTimeout(100);

KinesisEventPublisher kinesisEventPublisher = new KinesisEventPublisher(stream, getRegion(), getMetricsLevel(), errSocket);
KinesisEventPublisher kinesisEventPublisher = new KinesisEventPublisher(stream, getRegion(), getMetricsLevel(), getCrossAccountRole(), errSocket);

// graceful shutdowns
Runtime.getRuntime().addShutdownHook(new Thread() {
Expand Down Expand Up @@ -88,4 +88,11 @@ static String getMetricsLevel() {
return p;
}

static String getCrossAccountRole() {
String p = System.getenv("CROSS_ACCOUNT_ROLE");
if (p == null || p.equals("")) {
return "";
}
return p;
}
}
48 changes: 46 additions & 2 deletions src/main/java/com/warnermedia/kplserver/KinesisEventPublisher.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
package com.warnermedia.kplserver;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecord;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -36,14 +46,48 @@ public class KinesisEventPublisher {
ServerSocket errSocket;
Socket errClient;

public KinesisEventPublisher(String stream, String region, String metricsLevel, ServerSocket errSocket) {
public KinesisEventPublisher(String stream, String region, String metricsLevel, String crossAccountRole, ServerSocket errSocket) {
this.stream = stream;
kinesis = new KinesisProducer(new KinesisProducerConfiguration()
.setRegion(region)
.setMetricsLevel(metricsLevel));
.setMetricsLevel(metricsLevel)
.setCredentialsProvider(loadCredentials(crossAccountRole)));
this.errSocket = errSocket;
}

private static AWSCredentialsProvider loadCredentials(String crossAccountRole) {
final AWSCredentialsProvider credentialsProvider;

Boolean isCrossAccount = false;
if (!crossAccountRole.equals("")) {
isCrossAccount = true;
}

if (isCrossAccount) {
AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()
.withCredentials(new ProfileCredentialsProvider("nonprodjump"))
.withRegion("us-east-1")
.build();

AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600)
.withRoleArn(crossAccountRole)
.withRoleSessionName("Kinesis_Session");

AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);
Credentials creds = assumeRoleResult.getCredentials();

credentialsProvider = new AWSStaticCredentialsProvider(
new BasicSessionCredentials(creds.getAccessKeyId(),
creds.getSecretAccessKey(),
creds.getSessionToken())
);
} else {
credentialsProvider = new DefaultAWSCredentialsProviderChain();
}

return credentialsProvider;
}

public void runOnce(String line) throws Exception {
// add new line so that downstream systems have an easier time parsing
String finalLine = line + "\n";
Expand Down

0 comments on commit f8e2ef6

Please sign in to comment.