From 43a2f8659f2a4b2a98e01e160fe5ecdd70613e42 Mon Sep 17 00:00:00 2001 From: Daniyaal Khan Date: Tue, 15 Oct 2024 18:57:17 +0530 Subject: [PATCH] Concurrency fixes --- Dockerfile | 18 +---- .../proxy/comparator/engine/Differences.java | 4 ++ .../proxy/comparator/engine/Utils.java | 22 ++++++ .../impl/MongoPersistenceServiceImpl.java | 68 +++++++++++++------ .../resources/application-local.properties | 3 + 5 files changed, 76 insertions(+), 39 deletions(-) create mode 100644 src/main/java/com/degressly/proxy/comparator/engine/Utils.java create mode 100644 src/main/resources/application-local.properties diff --git a/Dockerfile b/Dockerfile index 2441704..76160b9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,22 +6,6 @@ ADD pom.xml . RUN mvn clean verify --fail-never -ARG spring_profiles_active=default -ARG diff_publisher_bootstrap_servers -ARG diff_publisher_topic_name=diff_stream -ARG mongo_url -ARG mongo_username -ARG mongo_password -ARG mongo_dbname - -ENV spring_profiles_active=${spring_profiles_active} -ENV diff_publisher_bootstrap-servers=${diff_publisher_bootstrap_servers} -ENV diff_publisher_topic-name=${diff_publisher_topic_name} -ENV MONGO_URL=${mongo_url} -ENV MONGO_USERNAME=${mongo_username} -ENV MONGO_PASSWORD=${mongo_password} -ENV MONGO_DBNAME=${mongo_dbname} - COPY . . RUN mvn clean package @@ -30,6 +14,6 @@ EXPOSE 8000 ENTRYPOINT ["java", "-jar", \ "-Dspring.profiles.active=${spring_profiles_active}", \ "-Ddiff.publisher.bootstrap-servers=${diff_publisher_bootstrap_servers}", \ - "-Ddiff.publisher.topic-name=${diff_publisher_topic-name}", \ + "-Ddiff.publisher.topic-name=${diff_publisher_topic_name}", \ "target/comparator-0.0.1-SNAPSHOT.jar" \ ] \ No newline at end of file diff --git a/src/main/java/com/degressly/proxy/comparator/engine/Differences.java b/src/main/java/com/degressly/proxy/comparator/engine/Differences.java index 96895c6..8fc38d1 100644 --- a/src/main/java/com/degressly/proxy/comparator/engine/Differences.java +++ b/src/main/java/com/degressly/proxy/comparator/engine/Differences.java @@ -1,12 +1,16 @@ package com.degressly.proxy.comparator.engine; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import java.util.Map; @Data @Builder +@NoArgsConstructor +@AllArgsConstructor public class Differences { Map rawDifferences; diff --git a/src/main/java/com/degressly/proxy/comparator/engine/Utils.java b/src/main/java/com/degressly/proxy/comparator/engine/Utils.java new file mode 100644 index 0000000..ffaccfd --- /dev/null +++ b/src/main/java/com/degressly/proxy/comparator/engine/Utils.java @@ -0,0 +1,22 @@ +package com.degressly.proxy.comparator.engine; + +import java.lang.reflect.Field; + +public class Utils { + + @SuppressWarnings("unchecked") + public static T mergeObjects(T first, T second) throws IllegalAccessException, InstantiationException { + Class clazz = first.getClass(); + Field[] fields = clazz.getDeclaredFields(); + Object returnValue = clazz.newInstance(); + for (Field field : fields) { + field.setAccessible(true); + Object value1 = field.get(first); + Object value2 = field.get(second); + Object value = (value1 != null) ? value1 : value2; + field.set(returnValue, value); + } + return (T) returnValue; + } + +} diff --git a/src/main/java/com/degressly/proxy/comparator/service/impl/MongoPersistenceServiceImpl.java b/src/main/java/com/degressly/proxy/comparator/service/impl/MongoPersistenceServiceImpl.java index ac24d4f..3aebe12 100644 --- a/src/main/java/com/degressly/proxy/comparator/service/impl/MongoPersistenceServiceImpl.java +++ b/src/main/java/com/degressly/proxy/comparator/service/impl/MongoPersistenceServiceImpl.java @@ -2,18 +2,20 @@ import com.degressly.proxy.comparator.dto.Observation; import com.degressly.proxy.comparator.engine.Differences; -import com.degressly.proxy.comparator.mongo.Diffs; +import com.degressly.proxy.comparator.engine.Utils; import com.degressly.proxy.comparator.mongo.TraceDocument; import com.degressly.proxy.comparator.mongo.TraceDocumentRepository; import com.degressly.proxy.comparator.service.PersistenceService; -import org.apache.commons.lang3.builder.Diff; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.util.*; +import java.util.stream.IntStream; +@Slf4j @Service @ConditionalOnProperty("spring.data.mongodb.uri") public class MongoPersistenceServiceImpl implements PersistenceService { @@ -25,59 +27,81 @@ public class MongoPersistenceServiceImpl implements PersistenceService { public void save(String traceId, String requestUrl, Observation observation, Differences responseDiffs, Differences downstreamDiffs) { - // Replacing . with _ in map key due to mongo limitations - requestUrl = requestUrl.replace('.', '_'); + synchronized (this) { + // Replacing . with _ in map key due to mongo limitations + requestUrl = requestUrl.replace('.', '_'); - TraceDocument document = traceDocumentRepository.findByTraceId(traceId); + TraceDocument document = traceDocumentRepository.findByTraceId(traceId); - if (document == null) { - document = new TraceDocument(); - document.setTraceId(traceId); - } + if (document == null) { + document = new TraceDocument(); + document.setTraceId(traceId); + } - updateObservationMap(requestUrl, observation, document); + updateObservationMap(requestUrl, observation, document); - if ("RESPONSE".equals(observation.getObservationType())) { - updateResponseDiffMap(requestUrl, responseDiffs, document); - } + if ("RESPONSE".equals(observation.getObservationType())) { + updateResponseDiffMap(requestUrl, responseDiffs, document); + } - if ("REQUEST".equals(observation.getObservationType())) { - updateDownstreamDiffMap(requestUrl, downstreamDiffs, document); - } + if ("REQUEST".equals(observation.getObservationType())) { + updateDownstreamDiffMap(requestUrl, downstreamDiffs, document); + } - traceDocumentRepository.save(document); + log.info("Saving: {}", document); + traceDocumentRepository.save(document); + } } private static void updateDownstreamDiffMap(String requestUrl, Differences responseDiffs, TraceDocument document) { Map existingDownstreamDifferencesMap = document.getDownstreamDiffs(); Map newDownstreamDifferencesMap = new HashMap<>(); - newDownstreamDifferencesMap.put(requestUrl, responseDiffs); newDownstreamDifferencesMap.putAll(existingDownstreamDifferencesMap); + newDownstreamDifferencesMap.put(requestUrl, responseDiffs); document.setDownstreamDiffs(newDownstreamDifferencesMap); } private static void updateResponseDiffMap(String requestUrl, Differences responseDiffs, TraceDocument document) { Map existingResponseDifferencesMap = document.getResponseDiffs(); Map newResponseDifferencesMap = new HashMap<>(); - newResponseDifferencesMap.put(requestUrl, responseDiffs); newResponseDifferencesMap.putAll(existingResponseDifferencesMap); + newResponseDifferencesMap.put(requestUrl, responseDiffs); document.setResponseDiffs(newResponseDifferencesMap); } private static void updateObservationMap(String requestUrl, Observation observation, TraceDocument document) { Map> existingObservationsMap = document.getObservationMap(); - Map> newObservationsMap = new HashMap<>(); List observationsList = existingObservationsMap.get(requestUrl); List newObservationsList = new ArrayList<>(Arrays.asList(observation)); if (!CollectionUtils.isEmpty(observationsList)) { - newObservationsList.addAll(observationsList); + newObservationsList = combineObservationLists(observationsList, newObservationsList); } + Map> newObservationsMap = new HashMap<>(existingObservationsMap); newObservationsMap.put(requestUrl, newObservationsList); - newObservationsMap.putAll(existingObservationsMap); document.setObservationMap(newObservationsMap); } + private static List combineObservationLists(List observationsList, + List newObservationsList) { + List smallerList = observationsList.size() < newObservationsList.size() ? observationsList + : newObservationsList; + List largerList = observationsList.size() >= newObservationsList.size() ? observationsList + : newObservationsList; + + IntStream.range(0, smallerList.size()).forEach(i -> { + try { + largerList.set(i, Utils.mergeObjects(smallerList.get(i), largerList.get(i))); + } + catch (Exception e) { + log.error("Error when merging Observations: ", e); + } + + }); + + return largerList; + } + } diff --git a/src/main/resources/application-local.properties b/src/main/resources/application-local.properties new file mode 100644 index 0000000..6e55873 --- /dev/null +++ b/src/main/resources/application-local.properties @@ -0,0 +1,3 @@ +spring.data.mongodb.uri=mongodb://degressly_user:secure_password@localhost:27017/degressly +spring.data.mongodb.database=degressly +diff.publisher.group-id=local_diff_reader \ No newline at end of file