Skip to content

Commit

Permalink
Merge pull request #17 from terascope/fixes-to-2.0.x
Browse files Browse the repository at this point in the history
v2.0.2 Use Conventional Metadata and performance improvements
  • Loading branch information
kstaken authored Jan 24, 2019
2 parents 12f8211 + a312165 commit 11eabb2
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 108 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ terafoundation:
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.0.1"
// "kafka:2.0.2"
"assets": [ "kafka" ],
"apis": [
{
Expand Down Expand Up @@ -205,7 +205,7 @@ terafoundation:
"slicers": 1,
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.0.1"
// "kafka:2.0.2"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down Expand Up @@ -256,7 +256,7 @@ terafoundation:
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.0.1"
// "kafka:2.0.2"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down Expand Up @@ -316,4 +316,4 @@ Please make sure to update tests as appropriate.

## License

[MIT](./LICENSE) licensed.
[MIT](./LICENSE) licensed.
2 changes: 1 addition & 1 deletion asset/asset.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "kafka",
"description": "Kafka reader and writer support.",
"version": "2.0.1"
"version": "2.0.2"
}
5 changes: 1 addition & 4 deletions asset/package.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
{
"name": "kafka-assets",
"version": "2.0.1",
"version": "2.0.2",
"private": true,
"description": "Teraslice asset for kafka operations",
"main": "index.js",
"dependencies": {
"@terascope/job-components": "^0.14.3",
"lodash.chunk": "^4.2.0",
"lodash.isequal": "^4.5.0",
"lodash.omit": "^4.5.0",
"lodash.once": "^4.1.1"
},
"devDependencies": {},
Expand Down
9 changes: 7 additions & 2 deletions asset/src/_kafka_clients/consumer-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export default class ConsumerClient extends BaseClient<kafka.KafkaConsumer> {
}
}

this._logger.debug(`consumed ${results.length} messages`);
this._logger.info(`Resolving with ${results.length} results`);
return results;
}

Expand All @@ -205,8 +205,13 @@ export default class ConsumerClient extends BaseClient<kafka.KafkaConsumer> {
/* istanbul ignore next */
if (messages == null) return [];

for (const message of messages) {
const total = messages.length;

for (let i = 0; i < total; i++) {
const message = messages[i];

this._trackOffsets(message);

const entity = map(message);
if (entity != null) {
results.push(entity);
Expand Down
42 changes: 21 additions & 21 deletions asset/src/_kafka_clients/producer-client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import chunk from 'lodash.chunk';
import { ProduceMessage, ProducerClientConfig } from './interfaces';
import { wrapError, AnyKafkaError } from '../_kafka_helpers';
import * as kafka from 'node-rdkafka';
Expand Down Expand Up @@ -47,30 +46,31 @@ export default class ProducerClient extends BaseClient<kafka.Producer> {
error = wrapError('Client error while producing', err);
});

const chunks = chunk(messages, this._bufferSize);
const sizes = chunks.map((c) => c.length);
this._logger.debug(`producing batches ${JSON.stringify(sizes)}...`);
const total = messages.length;
this._logger.debug(`producing ${total} messages in batches ${Math.floor(total / this._bufferSize)}...`);

try {
// Break the messages into chunks so the queue
// can be flushed after each "chunk"
for (const msgs of chunks) {
// for each message
for (const msg of msgs) {
const message: ProduceMessage = (map == null) ? msg : map(msg);
// Send the messages, after each buffer size is complete
// flush the messages
for (let i = 0; i < total; i++) {
const msg = messages[i];
const message: ProduceMessage = (map == null) ? msg : map(msg);

this._client.produce(
this._topic,
// This is the partition. There may be use cases where
// we'll need to control this.
null,
message.data,
message.key,
message.timestamp
);
}
this._client.produce(
this._topic,
// This is the partition. There may be use cases where
// we'll need to control this.
null,
message.data,
message.key,
message.timestamp
);

await this._try(() => this._flush(), 'produce', 0);
// flush the messages at the end of the buffer size
// or the end of the messages
if (i % this._bufferSize === 0 || i === (total - 1)) {
await this._try(() => this._flush(), 'produce', 0);
}
}
} finally {
off();
Expand Down
24 changes: 20 additions & 4 deletions asset/src/_kafka_helpers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,37 @@ function getErrorCause(err: any): string {
}

export interface KafkaMessageMetadata {
/** the message key */
_key: string;
/** The time at which the data was ingested into the source data */
_ingestTime: number;
/** The time at which the data was consumed by the reader */
_processTime: number;
    /** TODO - the event time, derived from a specific field in the data (not yet implemented) */
_eventTime: number;
/** the topic name */
topic: string;
/** the partition on the topic the message was on */
partition: number;
/** the offset of the message */
offset: number;
/** the message size, in bytes. */
size: number;
}

export interface KafkaMessage {
/** the message key */
key: string;
/** the message size, in bytes. */
/** the topic name */
topic: string;
/** the partition on the topic the message was on */
partition: number;
/** the offset of the message */
offset: number;
/** the message size, in bytes */
size: number;
/** the message timestamp */
timestamp: number;
}

export interface KafkaMessage extends KafkaMessageMetadata {
/** the message data */
value: Buffer;
}
Expand Down
15 changes: 13 additions & 2 deletions asset/src/kafka_reader/fetcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import omit from 'lodash.omit';
import { KafkaReaderConfig } from './interfaces';
import {
Fetcher,
Expand Down Expand Up @@ -41,7 +40,19 @@ export default class KafkaFetcher extends Fetcher<KafkaReaderConfig> {

async fetch() {
const map = this.tryRecord((msg: KafkaMessage): DataEntity => {
const metadata: KafkaMessageMetadata = omit(msg, 'value');
const now = Date.now();
const metadata: KafkaMessageMetadata = {
_key: msg.key,
_ingestTime: msg.timestamp,
_processTime: now,
            // TODO this should be based on an actual value
_eventTime: now,
topic: msg.topic,
partition: msg.partition,
offset: msg.offset,
size: msg.size,
};

return DataEntity.fromBuffer(
msg.value,
this.opConfig,
Expand Down
15 changes: 0 additions & 15 deletions asset/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -93,26 +93,11 @@ kind-of@^6.0.2:
resolved "https://registry.yarnpkg.com/kind-of/-/kind-of-6.0.2.tgz#01146b36a6218e64e58f3a8d66de5d7fc6f6d051"
integrity sha512-s5kLOcnH0XqDO+FvuaLX8DDjZ18CGFk7VygH40QoKPUQhW4e2rvM0rwUq0t8IQDOwYSeLK01U90OjzBTme2QqA==

lodash.chunk@^4.2.0:
version "4.2.0"
resolved "https://registry.yarnpkg.com/lodash.chunk/-/lodash.chunk-4.2.0.tgz#66e5ce1f76ed27b4303d8c6512e8d1216e8106bc"
integrity sha1-ZuXOH3btJ7QwPYxlEujRIW6BBrw=

[email protected], lodash.clonedeep@^4.5.0:
version "4.5.0"
resolved "https://registry.yarnpkg.com/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz#e23f3f9c4f8fbdde872529c1071857a086e5ccef"
integrity sha1-4j8/nE+Pvd6HJSnBBxhXoIblzO8=

lodash.isequal@^4.5.0:
version "4.5.0"
resolved "https://registry.yarnpkg.com/lodash.isequal/-/lodash.isequal-4.5.0.tgz#415c4478f2bcc30120c22ce10ed3226f7d3e18e0"
integrity sha1-QVxEePK8wwEgwizhDtMib30+GOA=

lodash.omit@^4.5.0:
version "4.5.0"
resolved "https://registry.yarnpkg.com/lodash.omit/-/lodash.omit-4.5.0.tgz#6eb19ae5a1ee1dd9df0b969e66ce0b7fa30b5e60"
integrity sha1-brGa5aHuHdnfC5aeZs4Lf6MLXmA=

lodash.once@^4.1.1:
version "4.1.1"
resolved "https://registry.yarnpkg.com/lodash.once/-/lodash.once-4.1.1.tgz#0dd3971213c7c56df880977d504c88fb471a97ac"
Expand Down
2 changes: 1 addition & 1 deletion docs.d/kafka_dead_letter/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.0.1"
// "kafka:2.0.2"
"assets": [ "kafka" ],
"apis": [
{
Expand Down
2 changes: 1 addition & 1 deletion docs.d/kafka_reader/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"slicers": 1,
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.0.1"
// "kafka:2.0.2"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down
2 changes: 1 addition & 1 deletion docs.d/kafka_sender/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
// make sure to include the asset bundle
// additionally you can specify the version
// "kafka:2.0.1"
// "kafka:2.0.2"
"assets": [ "kafka" ],
// ...
"operations": [
Expand Down
13 changes: 5 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kafka-asset-bundle",
"version": "2.0.1",
"version": "2.0.2",
"description": "A bundle of Kafka operations and processors for Teraslice",
"private": true,
"scripts": {
Expand All @@ -21,10 +21,7 @@
"author": "Terascope, LLC <[email protected]>",
"license": "MIT",
"devDependencies": {
"@types/jest": "^23.3.12",
"@types/lodash.chunk": "^4.2.4",
"@types/lodash.isequal": "^4.5.3",
"@types/lodash.omit": "^4.5.4",
"@types/jest": "^23.3.13",
"@types/lodash.once": "^4.1.4",
"@types/node": "^10.12.18",
"@types/uuid": "^3.4.4",
Expand All @@ -35,10 +32,10 @@
"rimraf": "^2.6.3",
"teraslice-test-harness": "^0.5.2",
"ts-jest": "^23.10.4",
"ts-node": "^7.0.1",
"tslint": "^5.12.0",
"ts-node": "^8.0.1",
"tslint": "^5.12.1",
"tslint-config-airbnb": "^5.11.1",
"typescript": "^3.2.2",
"typescript": "^3.2.4",
"uuid": "^3.3.2"
},
"engines": {
Expand Down
6 changes: 3 additions & 3 deletions packages/terafoundation_kafka_connector/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
"devDependencies": {
"@terascope/job-components": "^0.14.3",
"@types/convict": "^4.2.1",
"@types/jest": "^23.3.12",
"@types/jest": "^23.3.13",
"@types/node": "^10.12.18",
"convict": "^4.4.1",
"jest": "^23.6.0",
"jest-extended": "^0.11.0",
"ts-jest": "^23.10.4",
"tslint": "^5.12.0",
"tslint": "^5.12.1",
"tslint-config-airbnb": "^5.11.1",
"typescript": "^3.2.2"
"typescript": "^3.2.4"
},
"engines": {
"node": ">=8.0.0"
Expand Down
Loading

0 comments on commit 11eabb2

Please sign in to comment.