From 11ef5aa6fe5f6930fd116f0b31594d9c87b62f0f Mon Sep 17 00:00:00 2001 From: Pascal Christoph Date: Thu, 30 Dec 2021 15:36:02 +0100 Subject: [PATCH] Call LineRecorder receiver's closeStream() (#433) Under some circumstances (e.g. if the JVM is not stopped after processing) the streams of receivers connected to LineRecorder are not closed, resulting in empty or missing output data. By switch from "implements ObjectPipe" to "extends DefaultObjectPipe" the "DefaultSender" is used which implements the Sender interface. This ensures the proper closing of streams. - complement test --- .../org/metafacture/strings/LineRecorder.java | 34 +++++-------------- .../metafacture/strings/LineRecorderTest.java | 1 + 2 files changed, 9 insertions(+), 26 deletions(-) diff --git a/metafacture-strings/src/main/java/org/metafacture/strings/LineRecorder.java b/metafacture-strings/src/main/java/org/metafacture/strings/LineRecorder.java index 97101dfa4..936a831b9 100644 --- a/metafacture-strings/src/main/java/org/metafacture/strings/LineRecorder.java +++ b/metafacture-strings/src/main/java/org/metafacture/strings/LineRecorder.java @@ -16,11 +16,11 @@ package org.metafacture.strings; import org.metafacture.framework.FluxCommand; -import org.metafacture.framework.ObjectPipe; import org.metafacture.framework.ObjectReceiver; import org.metafacture.framework.annotations.Description; import org.metafacture.framework.annotations.In; import org.metafacture.framework.annotations.Out; +import org.metafacture.framework.helpers.DefaultObjectPipe; /** * Collects strings and emits them as records when a line matches the pattern. @@ -34,7 +34,7 @@ @In(String.class) @Out(String.class) @FluxCommand("lines-to-records") -public final class LineRecorder implements ObjectPipe> { +public final class LineRecorder extends DefaultObjectPipe> { private static final int SB_CAPACITY = 4096 * 7; // empty line is the default @@ -70,34 +70,16 @@ record = new StringBuilder(SB_CAPACITY); } } - private boolean isClosed() { - return isClosed; - } - @Override - public void resetStream() { - record = new StringBuilder(SB_CAPACITY); - } - - @Override - public void closeStream() { - getReceiver().process(record.toString()); - isClosed = true; + protected void onCloseStream() { + if (record.length() > 0) { + getReceiver().process(record.toString()); + } } @Override - public > R setReceiver(final R newReceiver) { - receiver = newReceiver; - return newReceiver; - } - - /** - * Returns a reference to the downstream module. - * - * @return reference to the downstream module - */ - protected ObjectReceiver getReceiver() { - return receiver; + public void onResetStream() { + record = new StringBuilder(SB_CAPACITY); } } diff --git a/metafacture-strings/src/test/java/org/metafacture/strings/LineRecorderTest.java b/metafacture-strings/src/test/java/org/metafacture/strings/LineRecorderTest.java index 805854091..f77276a9f 100644 --- a/metafacture-strings/src/test/java/org/metafacture/strings/LineRecorderTest.java +++ b/metafacture-strings/src/test/java/org/metafacture/strings/LineRecorderTest.java @@ -111,6 +111,7 @@ public void shouldEmitLastRecordWithoutRecordMarkerWhenClosingStream() { LINE_SEPARATOR + RECORD3_PART2 + LINE_SEPARATOR); + ordered.verify(receiver).closeStream(); ordered.verifyNoMoreInteractions(); }