KCL 2.0 DynamoDBLeaseRefresher cannot marshall request to JSON for local testing #554

Open
dbottini2 opened this issue May 22, 2019 · 5 comments

Comments

@dbottini2

I have been trying to write integration tests for an application that uses KCL 2.0, using kinesalite v2.0.0 and dynalite v2.3.1 to mock Kinesis and DynamoDB. The problem is that KCL 2.0 cannot create or describe the checkpoint table; it fails with the (internal) exception software.amazon.awssdk.core.exception.SdkClientException: Unable to marshall request to JSON: host must not be null.
The stack trace is always:

0 = {StackTraceElement@11714} "software.amazon.kinesis.retrieval.AWSExceptionManager.apply(AWSExceptionManager.java:65)"
1 = {StackTraceElement@11715} "software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher.tableStatus(DynamoDBLeaseRefresher.java:196)"
2 = {StackTraceElement@11716} "software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher.createLeaseTableIfNotExists(DynamoDBLeaseRefresher.java:139)"
3 = {StackTraceElement@11717} "software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator.initialize(DynamoDBLeaseCoordinator.java:213)"
4 = {StackTraceElement@11718} "software.amazon.kinesis.coordinator.Scheduler.initialize(Scheduler.java:239)"
5 = {StackTraceElement@11719} "software.amazon.kinesis.coordinator.Scheduler.run(Scheduler.java:213)"

My console ends up looking like the following, which doesn't give much insight either.

{"message":"Initialization attempt 1","logger_name":"software.amazon.kinesis.coordinator.Scheduler","thread_name":"main","level":"INFO","level_value":20000}
{"message":"Initializing LeaseCoordinator","logger_name":"software.amazon.kinesis.coordinator.Scheduler","thread_name":"main","level":"INFO","level_value":20000}
{"message":"Initialization attempt 2","logger_name":"software.amazon.kinesis.coordinator.Scheduler","thread_name":"main","level":"INFO","level_value":20000}
{"message":"Initializing LeaseCoordinator","logger_name":"software.amazon.kinesis.coordinator.Scheduler","thread_name":"main","level":"INFO","level_value":20000}
{"message":"Initialization attempt 3","logger_name":"software.amazon.kinesis.coordinator.Scheduler","thread_name":"main","level":"INFO","level_value":20000}
...
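
Since the log above shows only the retry attempts, one way to surface the underlying failure is to walk the exception's cause chain wherever it can be caught or inspected. The following is a minimal sketch and not KCL code: `CauseChain` and `describe` are hypothetical names, and the exception built in `main` is only a stand-in for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class CauseChain {
    // Returns one line per throwable in the cause chain, outermost first.
    static List<String> describe(Throwable t) {
        List<String> lines = new ArrayList<>();
        // The depth limit guards against cyclic cause chains.
        for (int depth = 0; t != null && depth < 32; depth++) {
            lines.add(t.getClass().getName() + ": " + t.getMessage());
            t = t.getCause();
        }
        return lines;
    }

    public static void main(String[] args) {
        // Stand-in exception (an assumption); in practice pass the caught throwable.
        Throwable e = new IllegalStateException("outer failure",
                new NullPointerException("inner cause"));
        describe(e).forEach(System.out::println);
    }
}
```

Printing every level rather than only the outermost message can show which component raised the innermost error.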

Everything works from the AWS CLI (creating/reading streams with kinesalite and creating/reading tables with dynalite), so I do not think the issue is there. I am also able to use the KinesisAsyncClient and DynamoDbAsyncClient directly to create/write/read local streams and tables, so I know that disabling the SSL certificate check and forcing HTTP/1.1 both work, and that the endpoints are being passed around correctly.

If you need more code/configuration/etc. to reproduce the issue I've been seeing, just let me know what you need.

Version:
amazon-kinesis-client: 2.2.0
kinesalite: 2.0.0
dynalite: 2.3.1

Environment variables:

        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
        environmentVariables.set("AWS_REGION", "local"); // class is org.junit.contrib.java.lang.system.EnvironmentVariables

Async Clients initialization code:

            this.kinesisAsyncClient = KinesisAsyncClient.builder()
                    .httpClient(NettyNioAsyncHttpClient
                            .builder()
                            .protocol(Protocol.HTTP1_1)
                            .buildWithDefaults(
                                    AttributeMap
                                            .builder()
                                            .put(
                                                    SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
                                                    Boolean.TRUE
                                            )
                                            .build()
                            ))
                    .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
                    .endpointOverride(URI.create(configuration.getKinesaliteUrl()))
                    .build();

            this.dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                    .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
                    .endpointOverride(URI.create(configuration.getDynaliteUrl()))
                    .build();
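
Because the message says `host must not be null`, one thing that may be worth ruling out (the thread does not confirm it as the cause) is whether the strings passed to `URI.create` actually parse to a URI with a host. `java.net.URI` treats a value without a scheme, such as `localhost:59261`, as an opaque URI whose scheme is `localhost`, so `getHost()` returns null. The sketch below uses only the JDK; `EndpointCheck` and `hasHost` are hypothetical names, and the URLs are placeholders rather than the values returned by `configuration.getDynaliteUrl()`.

```java
import java.net.URI;

public class EndpointCheck {
    // True when java.net.URI can extract a host from the endpoint string.
    static boolean hasHost(String endpoint) {
        return URI.create(endpoint).getHost() != null;
    }

    public static void main(String[] args) {
        // Placeholder endpoints (assumptions); substitute the configured URLs.
        String[] endpoints = {"http://localhost:59261", "localhost:59261"};
        for (String endpoint : endpoints) {
            System.out.println(endpoint + " -> host present: " + hasHost(endpoint));
        }
    }
}
```

Running the same check against the configured Kinesis, DynamoDB, and CloudWatch endpoint strings before building the clients would show whether any of them lacks a host.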

Scheduler initialization code:

                kinesisAsyncClient);
        RetrievalConfig retrievalConfig = new RetrievalConfig(kinesisAsyncClient,
                streamConfiguration.getStreamName(),
                streamConfiguration.getStreamName())
                .retrievalSpecificConfig(pollingConfig);
        this.configsBuilder = new ConfigsBuilder(streamConfiguration.getStreamName(),
                streamConfiguration.getStreamName(),
                kinesisAsyncClient,
                dynamoDbAsyncClient,
                cloudWatchClient,
                workerId,
                backupRecordProcessorFactory)
                .tableName(streamConfiguration.getCheckpointTable());
        this.scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig().metricsLevel(MetricsLevel.NONE).metricsFactory(new NullMetricsFactory()),
                configsBuilder.processorConfig(),
                retrievalConfig
        );

Kinesalite/Dynalite ps entries (ports are random):

dynalite --port=59261 --createTableMs=0 --deleteTableMs=0 --updateTableMs=0
kinesalite --port=59255 --createStreamMs=0 --updateStreamMs=0 --deleteStreamMs=0 --ssl
@x418

x418 commented May 23, 2019

With KCL v2, it is not enough to override the HttpClient of the KinesisAsyncClient. You have to do the same for the DynamoDB and CloudWatch clients as well. For example, this works for me (the code is from a Spring project):

  @Bean
  public KinesisAsyncClient kinesisAsyncClient() {
    KinesisAsyncClientBuilder clientBuilder = KinesisAsyncClient.builder().region(of(region));
    String kinesisEndpoint = System.getProperty("kinesisEndpoint");
    overrideHttpClientWhenUsingLocalstack(clientBuilder, kinesisEndpoint);
    if (kinesisEndpoint != null) { // When overridden, we can't use KinesisClientUtil to create the client
      return clientBuilder.build();
    }
    return KinesisClientUtil.createKinesisAsyncClient(clientBuilder);
  }

  @Bean
  public DynamoDbAsyncClient dynamoDbAsyncClient() {
    DynamoDbAsyncClientBuilder clientBuilder = DynamoDbAsyncClient.builder().region(of(this.region));
    overrideHttpClientWhenUsingLocalstack(clientBuilder, System.getProperty("dynamodbEndpoint"));
    return clientBuilder.build();
  }

  @Bean
  public CloudWatchAsyncClient cloudWatchAsyncClient() {
    CloudWatchAsyncClientBuilder clientBuilder = CloudWatchAsyncClient.builder().region(of(this.region));
    overrideHttpClientWhenUsingLocalstack(clientBuilder, System.getProperty("cloudwatchEndpoint"));
    return clientBuilder.build();
  }


  private void overrideHttpClientWhenUsingLocalstack(SdkAsyncClientBuilder clientBuilder, String endpoint) {
    //Workaround for Localstack See https://github.com/localstack/localstack/issues/893#issuecomment-486898065
    if (StringUtils.isNotBlank(endpoint)) {
      clientBuilder.httpClient(NettyNioAsyncHttpClient.builder().buildWithDefaults(AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, TRUE).build()));
      ((SdkClientBuilder) clientBuilder).endpointOverride(URI.create(endpoint));
    }
  }

@dbottini2
Author

I have tried this with the DynamoDbAsyncClient, with SSL both on and off for dynalite, and the error did not change.

this.dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                    .region(Region.of("local")) // error does not change if I include region or not
                    .httpClient(NettyNioAsyncHttpClient
                            .builder()
                            // No need to explicitly use HTTP/1.1 https://github.com/awslabs/amazon-kinesis-client/issues/440#issuecomment-428599241
                            //.protocol(Protocol.HTTP1_1)
                            .buildWithDefaults(
                                    AttributeMap
                                            .builder()
                                            .put(
                                                    SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
                                                    Boolean.TRUE
                                            )
                                            .build()
                            ))
                    .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
                    .endpointOverride(URI.create(configuration.getDynaliteUrl()))
                    .build();

I did get CloudWatch errors until I added

                configsBuilder.metricsConfig().metricsLevel(MetricsLevel.NONE).metricsFactory(new NullMetricsFactory()),

so I am led to believe the current CloudWatch client is fine.
In any case, I tried the following (with nothing listening on that localhost port), and it failed the same way: stuck in the initialization loop with the internal JSON marshalling failure.

this.cloudWatchClient = CloudWatchAsyncClient.builder()
                .region(Region.of("local")) // error does not change if I include region or not
                .httpClient(NettyNioAsyncHttpClient
                        .builder()
                        .buildWithDefaults(
                                AttributeMap
                                        .builder()
                                        .put(
                                                SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
                                                Boolean.TRUE
                                        )
                                        .build()
                        ))
                .endpointOverride(URI.create("https://localhost:8080"))
                .build();

@x418

x418 commented May 24, 2019

Do you happen to have differing Jackson versions?
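
One way to check which Jackson jars are actually loaded at runtime is to ask the JVM where each class came from. This is a minimal sketch under assumptions: `WhichJar` and `locate` are hypothetical names, and the class names listed in `main` are the usual Jackson entry points, which print as "not found" if the corresponding module is not on the classpath.

```java
import java.security.CodeSource;
import java.util.Optional;

public class WhichJar {
    // Returns the location (jar or directory) a class was loaded from, if known.
    static Optional<String> locate(String className) {
        try {
            // initialize=false: look the class up without running static initializers.
            Class<?> c = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return Optional.empty();
            }
            return Optional.of(src.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "com.fasterxml.jackson.core.JsonFactory",
            "com.fasterxml.jackson.databind.ObjectMapper",
            "com.fasterxml.jackson.dataformat.cbor.CBORFactory"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name).orElse("not found"));
        }
    }
}
```

If the printed jar paths carry different version numbers than the build file declares, a transitive dependency is probably overriding them.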

@dbottini2
Author

Maven: com.fasterxml.jackson.core:jackson-annotations:2.9.0
Maven: com.fasterxml.jackson.core:jackson-core:2.9.8
Maven: com.fasterxml.jackson.core:jackson-databind:2.9.8
Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.9.8
Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.9.8
Maven: com.fasterxml.jackson.datatype:jackson-datatype-guava:2.9.8
Maven: com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.9.8
Maven: com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.9.8
Maven: com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.9.8
Maven: com.fasterxml.jackson.module:jackson-module-parameter-names:2.9.8

I just tried 2.9.9 for all of them, but that neither resolved the problem nor changed the output or the internal error.

@mahesh031

I am facing the same issue. Did anyone find a solution?
