Skip to content

Commit

Permalink
NIFI-14108 Removed the instantiation of extraneous Object arrays vara…
Browse files Browse the repository at this point in the history
…rg methods. (#9594)

Signed-off-by: Lucas Ottersbach <[email protected]>
  • Loading branch information
dan-s1 authored Dec 25, 2024
1 parent 7051aca commit 8c2ceac
Show file tree
Hide file tree
Showing 19 changed files with 45 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,6 @@ private void fetchListing(final ProcessContext context, final ProcessSession ses
}

getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
new Object[]{millis, listing.size(), newItems});
millis, listing.size(), newItems);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ HdfsResources resetHDFSResources(final List<String> resourceLocations, final Pro

final Path workingDir = fs.getWorkingDirectory();
getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString());

return new HdfsResources(config, fs, ugi, kerberosUser);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public static <FCT extends RollbackOnFailure> BiFunction<FCT, ErrorTypes, ErrorT
if (adjusted != null) {
if (logger.isDebugEnabled()) {
logger.debug("Adjusted {} to {} based on context rollbackOnFailure={}, processedCount={}, transactional={}",
new Object[]{t, adjusted, c.isRollbackOnFailure(), c.getProcessedCount(), c.isTransactional()});
t, adjusted, c.isRollbackOnFailure(), c.getProcessedCount(), c.isTransactional());
}
return adjusted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,20 +377,20 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
// Remove destination file (newFile) to replace
if (hdfs.delete(newFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{newFile, flowFile});
newFile, flowFile);
}
break;
case IGNORE_RESOLUTION:
session.transfer(flowFile, REL_SUCCESS);
getLogger().info(
"transferring {} to success because file with same name already exists",
new Object[]{flowFile});
flowFile);
return null;
case FAIL_RESOLUTION:
session.transfer(session.penalize(flowFile), REL_FAILURE);
getLogger().warn(
"penalizing {} and routing to failure because file with same name already exists",
new Object[]{flowFile});
flowFile);
return null;
default:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void process(InputStream in, OutputStream out) throws IOException {
}
});
logger.debug("Wrote Sequence File {} ({}).",
new Object[]{sequenceFilename, watch.calculateDataRate(flowFile.getSize())});
sequenceFilename, watch.calculateDataRate(flowFile.getSize()));
return sfFlowFile;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public void onTrigger(final ReportingContext context) {

final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}",
new Object[] {events.size(), transferMillis, transactionId, events.get(0).getEventId()});
events.size(), transferMillis, transactionId, events.get(0).getEventId());
} catch (final Exception e) {
if (transaction != null) {
transaction.error();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void onTrigger(final ReportingContext context) {

final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent {} Status Records to destination in {} ms; Transaction ID = {}",
new Object[]{jsonArray.size(), transferMillis, transactionId});
jsonArray.size(), transferMillis, transactionId);

fromIndex = toIndex;
toIndex = Math.min(fromIndex + batchSize, jsonArray.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
// fall-through
default:
getLogger().error("Putting data into Splunk was not successful. Response with header {} was: {}",
new Object[] {responseMessage.getStatus(), IOUtils.toString(responseMessage.getContent(), "UTF-8")});
responseMessage.getStatus(), IOUtils.toString(responseMessage.getContent(), "UTF-8"));
}
} catch (final Exception e) {
getLogger().error("Error during communication with Splunk: {}", e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

if (!sentAt.isPresent() || !ackId.isPresent()) {
getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!",
new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE);
session.transfer(flowFile, RELATIONSHIP_FAILURE);
} else {
undetermined.put(ackId.get(), flowFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
sqlWriter.updateCounters(session);

logger.debug("{} contains {} records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
fileToProcess, nrOfRows.get());

session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
resultSetFlowFiles.add(fileToProcess);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void process(final InputStream rawIn, final OutputStream rawOut) throws I

session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("successfully converted characters from {} to {} for {}",
new Object[]{inputCharset, outputCharset, flowFile});
inputCharset, outputCharset, flowFile);
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
throw new ProcessException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
if (logger.isErrorEnabled()) {
logger.error("{} unexpected exception throwing DebugFlow exception: {}",
new Object[] {this, e});
this, e);
}
}
} else {
Expand Down Expand Up @@ -461,8 +461,8 @@ public void process(final OutputStream out) throws IOException {
if (flowFileCurrSuccess < flowFileMaxSuccess) {
flowFileCurrSuccess += 1;
logger.info("DebugFlow transferring to success file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key()));
session.transfer(ff, REL_SUCCESS);
break;
} else {
Expand All @@ -475,8 +475,8 @@ public void process(final OutputStream out) throws IOException {
if (flowFileCurrFailure < flowFileMaxFailure) {
flowFileCurrFailure += 1;
logger.info("DebugFlow transferring to failure file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key()));
session.transfer(ff, REL_FAILURE);
break;
} else {
Expand All @@ -489,8 +489,8 @@ public void process(final OutputStream out) throws IOException {
if (flowFileCurrRollback < flowFileMaxRollback) {
flowFileCurrRollback += 1;
logger.info("DebugFlow rolling back (no penalty) file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key()));
session.rollback();
break;
} else {
Expand All @@ -503,8 +503,8 @@ public void process(final OutputStream out) throws IOException {
if (flowFileCurrYield < flowFileMaxYield) {
flowFileCurrYield += 1;
logger.info("DebugFlow yielding file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key()));
session.rollback();
context.yield();
return;
Expand All @@ -518,8 +518,8 @@ public void process(final OutputStream out) throws IOException {
if (flowFileCurrPenalty < flowFileMaxPenalty) {
flowFileCurrPenalty += 1;
logger.info("DebugFlow rolling back (with penalty) file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key()));
session.rollback(true);
break;
} else {
Expand All @@ -533,16 +533,16 @@ public void process(final OutputStream out) throws IOException {
flowFileCurrException += 1;
String message = "forced by " + this.getClass().getName();
logger.info("DebugFlow throwing NPE file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key()));
RuntimeException rte;
try {
rte = flowFileExceptionClass.getConstructor(String.class).newInstance(message);
throw rte;
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
if (logger.isErrorEnabled()) {
logger.error("{} unexpected exception throwing DebugFlow exception: {}",
new Object[] {this, e});
this, e);
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
Path filePath = file.toPath();
if (!Files.exists(filePath) && !Files.notExists(filePath)) { // see https://docs.oracle.com/javase/tutorial/essential/io/check.html for more details
getLogger().log(levelFileNotFound, "Could not fetch file {} from file system for {} because the existence of the file cannot be verified; routing to failure",
new Object[] {file, flowFile});
file, flowFile);
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
} else if (!Files.exists(filePath)) {
Expand All @@ -265,7 +265,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final String user = System.getProperty("user.name");
if (!isReadable(file)) {
getLogger().log(levelPermDenied, "Could not fetch file {} from file system for {} due to user {} not having sufficient permissions to read the file; routing to permission.denied",
new Object[] {file, flowFile, user});
file, flowFile, user);
session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);
return;
Expand All @@ -281,7 +281,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (targetDir.exists() && (!isWritable(targetDir) || !isDirectory(targetDir))) {
getLogger().error("Could not fetch file {} from file system for {} because Completion Strategy is configured to move the original file to {}, "
+ "but that is not a directory or user {} does not have permissions to write to that directory",
new Object[] {file, flowFile, targetDir, user});
file, flowFile, targetDir, user);
session.transfer(flowFile, REL_FAILURE);
return;
}
Expand All @@ -305,7 +305,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (targetFile.exists()) {
getLogger().error("Could not fetch file {} from file system for {} because Completion Strategy is configured to move the original file to {}, "
+ "but a file with name {} already exists in that directory and the Move Conflict Strategy is configured for failure",
new Object[] {file, flowFile, targetDir, file.getName()});
file, flowFile, targetDir, file.getName());
session.transfer(flowFile, REL_FAILURE);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final HttpServletResponse response = contextMap.getResponse(contextIdentifier);
if (response == null) {
getLogger().error("Failed to respond to HTTP request for {} because FlowFile had an '{}' attribute of {} but could not find an HTTP Response Object for this identifier",
new Object[]{flowFile, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier});
flowFile, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier);
session.transfer(flowFile, REL_FAILURE);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ public void accept(final DiskOperation operation, final long duration) {
if (duration > maxDiskOperationMillis) {
final String fullPath = getFullPath();
logger.warn("This Processor completed action {} on {} in {} milliseconds, which exceeds the configured threshold of {} milliseconds",
new Object[] {operation, fullPath, duration, maxDiskOperationMillis});
operation, fullPath, duration, maxDiskOperationMillis);
}

if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -1332,7 +1332,7 @@ private void monitorActiveOperation() {
}

logger.warn("This Processor has currently spent {} milliseconds performing the {} action on {}, which exceeds the configured threshold of {} milliseconds",
new Object[] {activeTime, activeOperation.getOperation(), fullPath, maxDiskOperationMillis});
activeTime, activeOperation.getOperation(), fullPath, maxDiskOperationMillis);
}
}

Expand All @@ -1347,7 +1347,7 @@ private void monitorActiveDirectory() {
if (activeMillis > maxListingMillis) {
final String fullPath = activeDirectory.isEmpty() ? "the base directory" : activeDirectory;
logger.warn("This processor has currently spent {} milliseconds performing the listing of {}, which exceeds the configured threshold of {} milliseconds",
new Object[] {activeMillis, fullPath, maxListingMillis});
activeMillis, fullPath, maxListingMillis);
}
}
}
Expand Down
Loading

0 comments on commit 8c2ceac

Please sign in to comment.