Skip to content

Commit

Permalink
Merge pull request #24 from terascope/fix-2.1.2
Browse files Browse the repository at this point in the history
v2.1.3 Fix case when using no key and elasticsearch bulk
  • Loading branch information
peterdemartini authored May 9, 2019
2 parents eff234a + 816f056 commit a2c236e
Show file tree
Hide file tree
Showing 9 changed files with 12 additions and 11 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ terafoundation:
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.2"
// "kafka:2.1.3"
"assets": [ "kafka" ],
"apis": [
{
Expand Down Expand Up @@ -219,7 +219,7 @@ terafoundation:
"slicers": 1,
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.2"
// "kafka:2.1.3"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down Expand Up @@ -271,7 +271,7 @@ terafoundation:
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.2"
// "kafka:2.1.3"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down
2 changes: 1 addition & 1 deletion asset/asset.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "kafka",
"description": "Kafka reader and writer support.",
"version": "2.1.2"
"version": "2.1.3"
}
2 changes: 1 addition & 1 deletion asset/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kafka-assets",
"version": "2.1.2",
"version": "2.1.3",
"private": true,
"description": "Teraslice asset for kafka operations",
"main": "index.js",
Expand Down
2 changes: 1 addition & 1 deletion asset/src/_kafka_helpers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ function getErrorCause(err: any): string {

export interface KafkaMessageMetadata {
/** the message key */
_key: string;
_key: string|null;
/** The time at which the data was ingested into the source data */
_ingestTime: number;
/** The time at which the data was consumed by the reader */
Expand Down
3 changes: 2 additions & 1 deletion asset/src/kafka_reader/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export default class KafkaFetcher extends Fetcher<KafkaReaderConfig> {
async fetch() {
const map = this.tryRecord((msg: KafkaMessage): DataEntity => {
const now = Date.now();

const metadata: KafkaMessageMetadata = {
_key: keyToString(msg.key),
_ingestTime: msg.timestamp,
Expand Down Expand Up @@ -117,6 +118,6 @@ export default class KafkaFetcher extends Fetcher<KafkaReaderConfig> {

/** Safely convert a buffer or string to a string */
function keyToString(str?: string|Buffer) {
if (!str) return '';
if (!str) return null;
return str.toString('utf8');
}
2 changes: 1 addition & 1 deletion docs/kafka_dead_letter/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.2"
// "kafka:2.1.3"
"assets": [ "kafka" ],
"apis": [
{
Expand Down
2 changes: 1 addition & 1 deletion docs/kafka_reader/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"slicers": 1,
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.2"
// "kafka:2.1.3"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down
2 changes: 1 addition & 1 deletion docs/kafka_sender/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.1.2"
// "kafka:2.1.3"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kafka-asset-bundle",
"version": "2.1.2",
"version": "2.1.3",
"description": "A bundle of Kafka operations and processors for Teraslice",
"private": true,
"scripts": {
Expand Down

0 comments on commit a2c236e

Please sign in to comment.