Fix asset to work with Teraslice versions lower than v1.4.0 (#791)
This PR makes the following changes:

- Introduces backwards compatibility for Teraslice versions before `v1.4.0`
  - This fixes an issue where using a Teraslice version older than `v1.4.0` would throw `TypeError: this.promMetrics.addGauge is not a function` (see the sketch below)
- Bumps **kafka-assets** to `v4.4.1`
- Updates the following dependencies:
  - **@terascope/job-components** from `v0.74.2` to `v0.75.0`

ref: terascope/teraslice#3625
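
A minimal sketch of the guard pattern this PR applies (illustrative only; `registerExampleGauge` and `example_gauge` are made-up names, not part of the asset). On Teraslice versions before `v1.4.0` the `promMetrics` API is not present on the context, so the `isPromAvailable` check from `@terascope/job-components` skips metric registration instead of calling `addGauge` on an undefined API:

```ts
import { Context, isPromAvailable } from '@terascope/job-components';

// Illustrative helper: register a gauge only when the promMetrics API exists
// (Teraslice >= v1.4.0 with prom metrics enabled); otherwise do nothing, which
// avoids "TypeError: this.promMetrics.addGauge is not a function" on older versions.
export async function registerExampleGauge(context: Context, opName: string): Promise<void> {
    if (!isPromAvailable(context)) return;

    await context.apis.foundation.promMetrics.addGauge(
        'example_gauge',
        'Example gauge guarded by isPromAvailable',
        ['op_name'],
        async function collect() {
            // inside collect(), `this` is the underlying gauge, mirroring the calls in this diff
            const labels = {
                op_name: opName,
                ...context.apis.foundation.promMetrics.getDefaultLabels()
            };
            this.set(labels, 1);
        }
    );
}
```

The same pattern is applied in `asset/src/kafka_reader/fetcher.ts` and `asset/src/kafka_sender_api/sender.ts` below, wrapping the existing `addGauge` calls.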
sotojn authored May 24, 2024
1 parent 73dbaf2 commit 9bd1da1
Showing 8 changed files with 61 additions and 56 deletions.
2 changes: 1 addition & 1 deletion asset/asset.json
@@ -1,5 +1,5 @@
 {
     "name": "kafka",
     "description": "Kafka reader and writer support.",
-    "version": "4.4.0"
+    "version": "4.4.1"
 }
4 changes: 2 additions & 2 deletions asset/package.json
@@ -1,7 +1,7 @@
 {
     "name": "kafka-assets",
     "displayName": "Asset",
-    "version": "4.4.0",
+    "version": "4.4.1",
     "private": true,
     "description": "Teraslice asset for kafka operations",
     "license": "MIT",
@@ -21,7 +21,7 @@
         "test": "yarn --cwd ../ test"
     },
     "dependencies": {
-        "@terascope/job-components": "^0.74.2",
+        "@terascope/job-components": "^0.75.0",
         "@terascope/types": "^0.17.2"
     },
     "devDependencies": {},
52 changes: 27 additions & 25 deletions asset/src/kafka_reader/fetcher.ts
@@ -1,4 +1,4 @@
-import { Fetcher, DataEntity } from '@terascope/job-components';
+import { Fetcher, DataEntity, isPromAvailable } from '@terascope/job-components';
 import { KafkaReaderAPIConfig, KafkaReaderAPI, DEFAULT_API_NAME } from '../kafka_reader_api/interfaces';
 import { KafkaReaderConfig } from './interfaces';
 import { APIConsumer } from '../_kafka_clients';
@@ -30,30 +30,32 @@ export default class KafkaFetcher extends Fetcher<KafkaReaderConfig> {
         this.consumer = consumer;

         const { context, opConfig } = this;
-        await this.context.apis.foundation.promMetrics.addGauge(
-            'kafka_partitions',
-            'Number of partitions the kafka consumer is consuming from',
-            ['op_name'],
-            async function collect() {
-                const partitionCount = await consumer.getPartitionCount(topic);
-                const labels = {
-                    op_name: opConfig._op,
-                    ...context.apis.foundation.promMetrics.getDefaultLabels()
-                };
-                this.set(labels, partitionCount);
-            });
-        await this.context.apis.foundation.promMetrics.addGauge(
-            'kafka_bytes_consumed',
-            'Number of bytes the kafka consumer has consumed',
-            ['op_name'],
-            async function collect() {
-                const bytesConsumed = await consumer.getBytesConsumed();
-                const labels = {
-                    op_name: opConfig._op,
-                    ...context.apis.foundation.promMetrics.getDefaultLabels()
-                };
-                this.set(labels, bytesConsumed);
-            });
+        if (isPromAvailable(context)) {
+            await this.context.apis.foundation.promMetrics.addGauge(
+                'kafka_partitions',
+                'Number of partitions the kafka consumer is consuming from',
+                ['op_name'],
+                async function collect() {
+                    const partitionCount = await consumer.getPartitionCount(topic);
+                    const labels = {
+                        op_name: opConfig._op,
+                        ...context.apis.foundation.promMetrics.getDefaultLabels()
+                    };
+                    this.set(labels, partitionCount);
+                });
+            await this.context.apis.foundation.promMetrics.addGauge(
+                'kafka_bytes_consumed',
+                'Number of bytes the kafka consumer has consumed',
+                ['op_name'],
+                async function collect() {
+                    const bytesConsumed = await consumer.getBytesConsumed();
+                    const labels = {
+                        op_name: opConfig._op,
+                        ...context.apis.foundation.promMetrics.getDefaultLabels()
+                    };
+                    this.set(labels, bytesConsumed);
+                });
+        }
     }

     async fetch(): Promise<DataEntity[]> {
2 changes: 1 addition & 1 deletion asset/src/kafka_sender_api/api.ts
@@ -90,7 +90,7 @@ export default class KafkaSenderApi extends APIFactory<KafkaRouteSender, KafkaSe
         const client = new KafkaRouteSender(
             kafkaClient,
             validConfig,
-            this.context.apis.foundation.promMetrics
+            this.context
         );

         await client.initialize();
39 changes: 21 additions & 18 deletions asset/src/kafka_sender_api/sender.ts
@@ -6,8 +6,9 @@
     Logger,
     toString,
     TSError,
+    isPromAvailable,
+    Context
 } from '@terascope/job-components';
-import { Terafoundation as tf } from '@terascope/types';
 import * as kafka from 'node-rdkafka';
 import { KafkaSenderAPIConfig } from './interfaces';
 import { ProducerClient, ProduceMessage } from '../_kafka_clients';
@@ -17,15 +18,15 @@ type FN = (input: any) => any;
 export default class KafkaSender implements RouteSenderAPI {
     logger: Logger;
     producer: ProducerClient;
-    promMetrics: tf.PromMetrics;
+    context: Context;
     readonly hasConnected = false;
     readonly config: KafkaSenderAPIConfig = {};
     readonly isWildcard: boolean;
     private tryFn: (msg: any, err: any) => DataEntity|null;
     readonly pathList = new Map<string, boolean>();
     readonly mapper: (msg: DataEntity) => ProduceMessage;

-    constructor(client: kafka.Producer, config: KafkaSenderAPIConfig, promMetrics: tf.PromMetrics) {
+    constructor(client: kafka.Producer, config: KafkaSenderAPIConfig, context: Context) {
         const producer = new ProducerClient(client, {
             logger: config.logger,
             topic: config.topicOverride || config.topic,
@@ -38,7 +39,7 @@ export default class KafkaSender implements RouteSenderAPI {
         this.tryFn = config.tryFn || this.tryCatch;
         this.mapper = this.mapFn.bind(this);
         this.logger = config.logger;
-        this.promMetrics = promMetrics;
+        this.context = context;
     }

     private tryCatch(fn: FN) {
@@ -54,20 +55,22 @@
     }

     async initialize(): Promise<void> {
-        const { promMetrics, producer, config } = this;
-        this.promMetrics.addGauge(
-            'kafka_bytes_produced',
-            'Number of bytes the kafka producer has produced',
-            ['op_name'],
-            async function collect() {
-                const bytesProduced = await producer.getBytesProduced();
-                const labels = {
-                    op_name: config._op,
-                    ...promMetrics.getDefaultLabels()
-                };
-                this.set(labels, bytesProduced);
-            }
-        );
+        const { context, producer, config } = this;
+        if (isPromAvailable(context)) {
+            context.apis.foundation.promMetrics.addGauge(
+                'kafka_bytes_produced',
+                'Number of bytes the kafka producer has produced',
+                ['op_name'],
+                async function collect() {
+                    const bytesProduced = await producer.getBytesProduced();
+                    const labels = {
+                        op_name: config._op,
+                        ...context.apis.foundation.promMetrics.getDefaultLabels()
+                    };
+                    this.set(labels, bytesProduced);
+                }
+            );
+        }
         await this.producer.connect();
     }

4 changes: 2 additions & 2 deletions package.json
@@ -1,7 +1,7 @@
 {
     "name": "kafka-asset-bundle",
     "displayName": "Kafka Asset Bundle",
-    "version": "4.4.0",
+    "version": "4.4.1",
     "private": true,
     "description": "A bundle of Kafka operations and processors for Teraslice",
     "repository": "[email protected]:terascope/kafka-assets.git",
@@ -33,7 +33,7 @@
     },
     "devDependencies": {
         "@terascope/eslint-config": "^0.8.0",
-        "@terascope/job-components": "^0.74.2",
+        "@terascope/job-components": "^0.75.0",
         "@terascope/scripts": "^0.77.2",
         "@types/jest": "^29.5.12",
         "@types/node": "^18.14.2",
2 changes: 1 addition & 1 deletion packages/terafoundation_kafka_connector/package.json
@@ -24,7 +24,7 @@
         "node-rdkafka": "^3.0.1"
     },
     "devDependencies": {
-        "@terascope/job-components": "^0.74.2",
+        "@terascope/job-components": "^0.75.0",
         "@types/convict": "^6.1.3",
         "convict": "^6.2.4"
     },
12 changes: 6 additions & 6 deletions yarn.lock
@@ -750,12 +750,12 @@
     progress "^2.0.3"
     yargs "^17.2.1"

-"@terascope/job-components@^0.74.2":
-  version "0.74.2"
-  resolved "https://registry.yarnpkg.com/@terascope/job-components/-/job-components-0.74.2.tgz#de875f792644abf3324f74d98e6b7daa046d7fff"
-  integrity sha512-FEliIWkcK43Q+kiONkwutXgV/f9qQMGUjesmJq9IyqYswOd7ZHtlXCjBUziKqdUWZ01OZ3YDzFxeuufJoLflUg==
+"@terascope/job-components@^0.75.0":
+  version "0.75.0"
+  resolved "https://registry.yarnpkg.com/@terascope/job-components/-/job-components-0.75.0.tgz#03739729d7d7aee722b6291418987084cf65aaa9"
+  integrity sha512-QbIAKfbjX5Wj8A/hZvjE0E8y9hvW9QecQIw6eMwTRneudx2OPQcissCCYNlxBpfP13dK/U85BIqOVFZRTh8KcA==
   dependencies:
-    "@terascope/utils" "^0.59.1"
+    "@terascope/utils" "^0.59.2"
     convict "^6.2.4"
     convict-format-with-moment "^6.2.0"
     convict-format-with-validator "^6.2.0"
@@ -806,7 +806,7 @@
   dependencies:
     prom-client "^15.1.2"

-"@terascope/utils@^0.59.1", "@terascope/utils@^0.59.2":
+"@terascope/utils@^0.59.2":
   version "0.59.2"
   resolved "https://registry.yarnpkg.com/@terascope/utils/-/utils-0.59.2.tgz#0c987c2d307a19abe7e41036c162bc382df741a6"
   integrity sha512-CYEfVt7PkkKFdpnnMf0sXfd6dYQdK3SsrKkCuupYvk3nPyP5q1eRzyzX4IS1/KrG3yVsqTKNHLrx8gR/UHUPOw==
