Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16339: [Docs] Add migrating from transform to process #18314

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from

Conversation

fonsdant
Copy link
Contributor

No description provided.

@github-actions github-actions bot added triage PRs from the community docs labels Dec 24, 2024
@mjsax mjsax added streams and removed triage PRs from the community labels Dec 24, 2024
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Made a first pass. The comments apply to all examples.

We should also delete the removed methods from the table in the "PAPI integration" section: https://kafka.apache.org/39/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration


// After
KStream<String, String> processedStream = stream.process(
() -> new CustomProcessor(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems switching from KStream#transform() to KStream#process() is the easy part. I guess the more interesting question is, how to rewrite CustomTransformer to CustomProcessor() ?


// After
KStream<String, String> processedStream = stream.process(
() -> new Processor<String, String>() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New new api.Processor has 4 generic, key/value in/out.

KStream<String, String> processedStream = stream.process(
() -> new Processor<String, String>() {
@Override
public void init(ProcessorContext context) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new api.ProcessorContext has <KOut, VOut> generic types.

() -> new Processor&lt;String, String>() {
@Override
public void init(ProcessorContext context) {
// Initialization logic here
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to just fill in the code? We need to store a reference to context

}

@Override
public void process(String key, String value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void process(String key, String value) {
public void process(Record<String, String> record) {


@Override
public void process(String key, String value) {
Iterable&lt;KeyValue&lt;String, String>> results = customFlatTransformation(key, value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be difficult for user to reason what customFlatTransformation is?

public void process(String key, String value) {
Iterable&lt;KeyValue&lt;String, String>> results = customFlatTransformation(key, value);
for (KeyValue&lt;String, String> result : results) {
context.forward(result.key, result.value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs an update

}

@Override
public void close() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be omitted as it's empty.

<ul>
<li><strong>Unified API:</strong> Consolidates multiple methods into a single, versatile API.</li>
<li><strong>Flexibility:</strong> Simplifies the implementation of custom processing logic.</li>
<li><strong>Future-Proof:</strong> Ensures compatibility with the latest Kafka Streams releases.</li>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add "Improved type-safty" that the "new PAPI" offers.

@fonsdant
Copy link
Contributor Author

fonsdant commented Dec 28, 2024

@mjsax, may I format the code? Some blocks are not indented properly. I understand it might pollute the diff in code changes, so if I may, I would do it as a final commit.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants