Skip to content

Commit

Permalink
Concurrency fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
daniyaalk committed Oct 15, 2024
1 parent 85a31b9 commit 43a2f86
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 39 deletions.
18 changes: 1 addition & 17 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,6 @@ ADD pom.xml .

RUN mvn clean verify --fail-never

ARG spring_profiles_active=default
ARG diff_publisher_bootstrap_servers
ARG diff_publisher_topic_name=diff_stream
ARG mongo_url
ARG mongo_username
ARG mongo_password
ARG mongo_dbname

ENV spring_profiles_active=${spring_profiles_active}
ENV diff_publisher_bootstrap-servers=${diff_publisher_bootstrap_servers}
ENV diff_publisher_topic-name=${diff_publisher_topic_name}
ENV MONGO_URL=${mongo_url}
ENV MONGO_USERNAME=${mongo_username}
ENV MONGO_PASSWORD=${mongo_password}
ENV MONGO_DBNAME=${mongo_dbname}

COPY . .
RUN mvn clean package

Expand All @@ -30,6 +14,6 @@ EXPOSE 8000
ENTRYPOINT ["java", "-jar", \
"-Dspring.profiles.active=${spring_profiles_active}", \
"-Ddiff.publisher.bootstrap-servers=${diff_publisher_bootstrap_servers}", \
"-Ddiff.publisher.topic-name=${diff_publisher_topic-name}", \
"-Ddiff.publisher.topic-name=${diff_publisher_topic_name}", \
"target/comparator-0.0.1-SNAPSHOT.jar" \
]
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.degressly.proxy.comparator.engine;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Differences {

Map<String, String> rawDifferences;
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/com/degressly/proxy/comparator/engine/Utils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.degressly.proxy.comparator.engine;

import java.lang.reflect.Field;

public class Utils {

@SuppressWarnings("unchecked")
public static <T> T mergeObjects(T first, T second) throws IllegalAccessException, InstantiationException {
Class<?> clazz = first.getClass();
Field[] fields = clazz.getDeclaredFields();
Object returnValue = clazz.newInstance();
for (Field field : fields) {
field.setAccessible(true);
Object value1 = field.get(first);
Object value2 = field.get(second);
Object value = (value1 != null) ? value1 : value2;
field.set(returnValue, value);
}
return (T) returnValue;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

import com.degressly.proxy.comparator.dto.Observation;
import com.degressly.proxy.comparator.engine.Differences;
import com.degressly.proxy.comparator.mongo.Diffs;
import com.degressly.proxy.comparator.engine.Utils;
import com.degressly.proxy.comparator.mongo.TraceDocument;
import com.degressly.proxy.comparator.mongo.TraceDocumentRepository;
import com.degressly.proxy.comparator.service.PersistenceService;
import org.apache.commons.lang3.builder.Diff;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.*;
import java.util.stream.IntStream;

@Slf4j
@Service
@ConditionalOnProperty("spring.data.mongodb.uri")
public class MongoPersistenceServiceImpl implements PersistenceService {
Expand All @@ -25,59 +27,81 @@ public class MongoPersistenceServiceImpl implements PersistenceService {
public void save(String traceId, String requestUrl, Observation observation, Differences responseDiffs,
Differences downstreamDiffs) {

// Replacing . with _ in map key due to mongo limitations
requestUrl = requestUrl.replace('.', '_');
synchronized (this) {
// Replacing . with _ in map key due to mongo limitations
requestUrl = requestUrl.replace('.', '_');

TraceDocument document = traceDocumentRepository.findByTraceId(traceId);
TraceDocument document = traceDocumentRepository.findByTraceId(traceId);

if (document == null) {
document = new TraceDocument();
document.setTraceId(traceId);
}
if (document == null) {
document = new TraceDocument();
document.setTraceId(traceId);
}

updateObservationMap(requestUrl, observation, document);
updateObservationMap(requestUrl, observation, document);

if ("RESPONSE".equals(observation.getObservationType())) {
updateResponseDiffMap(requestUrl, responseDiffs, document);
}
if ("RESPONSE".equals(observation.getObservationType())) {
updateResponseDiffMap(requestUrl, responseDiffs, document);
}

if ("REQUEST".equals(observation.getObservationType())) {
updateDownstreamDiffMap(requestUrl, downstreamDiffs, document);
}
if ("REQUEST".equals(observation.getObservationType())) {
updateDownstreamDiffMap(requestUrl, downstreamDiffs, document);
}

traceDocumentRepository.save(document);
log.info("Saving: {}", document);
traceDocumentRepository.save(document);
}

}

private static void updateDownstreamDiffMap(String requestUrl, Differences responseDiffs, TraceDocument document) {
Map<String, Differences> existingDownstreamDifferencesMap = document.getDownstreamDiffs();
Map<String, Differences> newDownstreamDifferencesMap = new HashMap<>();
newDownstreamDifferencesMap.put(requestUrl, responseDiffs);
newDownstreamDifferencesMap.putAll(existingDownstreamDifferencesMap);
newDownstreamDifferencesMap.put(requestUrl, responseDiffs);
document.setDownstreamDiffs(newDownstreamDifferencesMap);
}

private static void updateResponseDiffMap(String requestUrl, Differences responseDiffs, TraceDocument document) {
Map<String, Differences> existingResponseDifferencesMap = document.getResponseDiffs();
Map<String, Differences> newResponseDifferencesMap = new HashMap<>();
newResponseDifferencesMap.put(requestUrl, responseDiffs);
newResponseDifferencesMap.putAll(existingResponseDifferencesMap);
newResponseDifferencesMap.put(requestUrl, responseDiffs);
document.setResponseDiffs(newResponseDifferencesMap);
}

private static void updateObservationMap(String requestUrl, Observation observation, TraceDocument document) {
Map<String, List<Observation>> existingObservationsMap = document.getObservationMap();
Map<String, List<Observation>> newObservationsMap = new HashMap<>();

List<Observation> observationsList = existingObservationsMap.get(requestUrl);
List<Observation> newObservationsList = new ArrayList<>(Arrays.asList(observation));
if (!CollectionUtils.isEmpty(observationsList)) {
newObservationsList.addAll(observationsList);
newObservationsList = combineObservationLists(observationsList, newObservationsList);
}

Map<String, List<Observation>> newObservationsMap = new HashMap<>(existingObservationsMap);
newObservationsMap.put(requestUrl, newObservationsList);
newObservationsMap.putAll(existingObservationsMap);
document.setObservationMap(newObservationsMap);
}

private static List<Observation> combineObservationLists(List<Observation> observationsList,
List<Observation> newObservationsList) {
List<Observation> smallerList = observationsList.size() < newObservationsList.size() ? observationsList
: newObservationsList;
List<Observation> largerList = observationsList.size() >= newObservationsList.size() ? observationsList
: newObservationsList;

IntStream.range(0, smallerList.size()).forEach(i -> {
try {
largerList.set(i, Utils.mergeObjects(smallerList.get(i), largerList.get(i)));
}
catch (Exception e) {
log.error("Error when merging Observations: ", e);
}

});

return largerList;
}

}
3 changes: 3 additions & 0 deletions src/main/resources/application-local.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
spring.data.mongodb.uri=mongodb://degressly_user:secure_password@localhost:27017/degressly
spring.data.mongodb.database=degressly
diff.publisher.group-id=local_diff_reader

0 comments on commit 43a2f86

Please sign in to comment.