Skip to content

Commit

Permalink
Merge pull request #23 from terascope/fixes-key
Browse files Browse the repository at this point in the history
v2.1.2 return a key not a buffer in metadata, fixes #22
  • Loading branch information
peterdemartini authored May 6, 2019
2 parents 7198e77 + ea31700 commit eff234a
Show file tree
Hide file tree
Showing 12 changed files with 23 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ before_install:
- export KAFKA_TMP="$HOME/.kafka-downloads"
- mkdir -p "$KAFKA_HOME"
- mkdir -p "$KAFKA_TMP"
- '[[ ! -f "$KAFKA_TMP/kafka.tgz" ]] && wget https://www.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz -O "$KAFKA_TMP/kafka.tgz" || echo "* using cache"'
- '[[ ! -f "$KAFKA_TMP/kafka.tgz" ]] && wget https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz -O "$KAFKA_TMP/kafka.tgz" || echo "* using cache"'
- tar xzf "$KAFKA_TMP/kafka.tgz" -C $KAFKA_HOME --strip-components 1
# add bin to path for ease of use
- export PATH="$KAFKA_HOME/bin:$PATH"
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ terafoundation:
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.1"
// "kafka:2.1.2"
"assets": [ "kafka" ],
"apis": [
{
Expand Down Expand Up @@ -219,7 +219,7 @@ terafoundation:
"slicers": 1,
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.1"
// "kafka:2.1.2"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down Expand Up @@ -271,7 +271,7 @@ terafoundation:
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.1"
// "kafka:2.1.2"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down
2 changes: 1 addition & 1 deletion asset/asset.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "kafka",
"description": "Kafka reader and writer support.",
"version": "2.1.1"
"version": "2.1.2"
}
2 changes: 1 addition & 1 deletion asset/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kafka-assets",
"version": "2.1.1",
"version": "2.1.2",
"private": true,
"description": "Teraslice asset for kafka operations",
"main": "index.js",
Expand Down
2 changes: 1 addition & 1 deletion asset/src/_kafka_clients/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export interface ConsumerClientConfig {

export interface ProduceMessage {
data: Buffer;
key: string|null;
key: Buffer|string|null;
timestamp: number|null;
}

Expand Down
8 changes: 4 additions & 4 deletions asset/src/_kafka_helpers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { codeToMessage, okErrors } from './error-codes';
export type AnyKafkaError = Error|KafkaError|number|string|null;

export interface KafkaError extends Error {
code: number;
code: number|string;
}

export function wrapError(message: string, err: AnyKafkaError): KafkaError {
Expand Down Expand Up @@ -34,7 +34,7 @@ function getErrorCause(err: any): string {
let message = causedBy;
message += typeof err === 'object' ? err.message : toString(err);

let code: number|null = null;
let code: number|string|null = null;

if (isKafkaError(err)) {
code = err.code;
Expand Down Expand Up @@ -73,7 +73,7 @@ export interface KafkaMessageMetadata {

export interface KafkaMessage {
/** the message key */
key: string;
key: Buffer|string;
/** the topic name */
topic: string;
/** the partition on the topic the message was on */
Expand All @@ -93,7 +93,7 @@ export function isKafkaError(err: any): err is KafkaError {
return isError(err) && err.code != null;
}

export function isOkayError(err: AnyKafkaError, action: string = 'any'): boolean {
export function isOkayError(err?: AnyKafkaError, action: string = 'any'): boolean {
if (err == null) return false;
const code = isKafkaError(err) ? err.code : err as number;
if (action === 'retryable') {
Expand Down
8 changes: 7 additions & 1 deletion asset/src/kafka_reader/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export default class KafkaFetcher extends Fetcher<KafkaReaderConfig> {
const map = this.tryRecord((msg: KafkaMessage): DataEntity => {
const now = Date.now();
const metadata: KafkaMessageMetadata = {
_key: msg.key,
_key: keyToString(msg.key),
_ingestTime: msg.timestamp,
_processTime: now,
// TODO this should be based of an actual value
Expand Down Expand Up @@ -114,3 +114,9 @@ export default class KafkaFetcher extends Fetcher<KafkaReaderConfig> {
return connection.client;
}
}

/** Safely convert a buffer or string to a string */
function keyToString(str?: string|Buffer) {
if (!str) return '';
return str.toString('utf8');
}
2 changes: 1 addition & 1 deletion docs/kafka_dead_letter/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.1"
// "kafka:2.1.2"
"assets": [ "kafka" ],
"apis": [
{
Expand Down
2 changes: 1 addition & 1 deletion docs/kafka_reader/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"slicers": 1,
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.1"
// "kafka:2.1.2"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down
2 changes: 1 addition & 1 deletion docs/kafka_sender/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.1"
// "kafka:2.1.2"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kafka-asset-bundle",
"version": "2.1.1",
"version": "2.1.2",
"description": "A bundle of Kafka operations and processors for Teraslice",
"private": true,
"scripts": {
Expand Down
2 changes: 1 addition & 1 deletion test/helpers-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe('isOkayError helper', () => {
describe.each(alwaysBad)('when consuming and checking error code %s', (code) => {
it('should return false', () => {
const err = new Error('Uh oh') as KafkaError;
err.code = code;
err.code = code as string|number;

expect(isOkayError(err, 'consume')).toBeFalse();
expect(isOkayError(code, 'consume')).toBeFalse();
Expand Down

0 comments on commit eff234a

Please sign in to comment.