Skip to content

Commit

Permalink
Test fixes
Browse files Browse the repository at this point in the history
More mailbox removal
  • Loading branch information
henricasanova committed Nov 18, 2023
1 parent bb25963 commit 84c478d
Show file tree
Hide file tree
Showing 34 changed files with 186 additions and 205 deletions.
4 changes: 2 additions & 2 deletions include/wrench/failure_causes/NetworkError.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ namespace wrench {
bool whileReceiving();
bool whileSending();
bool isTimeout();
std::string getMailbox();
std::string getCommPortName();

private:
NetworkError::OperationType operation_type;
NetworkError::ErrorType error_type;
std::string commport_name = "";
std::string commport_name;
};


Expand Down
2 changes: 1 addition & 1 deletion include/wrench/managers/job_manager/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ namespace wrench {

void processPilotJobFailure(const std::shared_ptr<PilotJob> &job, std::shared_ptr<ComputeService> compute_service, std::shared_ptr<FailureCause> cause);

// Mailbox of the creator of this job manager
// CommPort of the creator of this job manager
S4U_CommPort *creator_commport;

std::vector<std::shared_ptr<CompoundJob>> jobs_to_dispatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace wrench {

public:
BatschedNetworkListener(std::string hostname, std::shared_ptr<BatchComputeService> batch_service,
S4U_Mailbox *batch_service_commport, std::string sched_port,
S4U_CommPort *batch_service_commport, std::string sched_port,
std::string data_to_send, WRENCH_PROPERTY_COLLECTION_TYPE property_list = {});

private:
Expand All @@ -50,7 +50,7 @@ namespace wrench {
std::shared_ptr<BatchComputeService> batch_service;
S4U_Commport *batch_service_commport;

void sendExecuteMessageToBatchComputeService(simgrid::s4u::Mailbox *answer_commport, std::string execute_job_reply_data);
void sendExecuteMessageToBatchComputeService(S4U_CommPort *answer_commport, std::string execute_job_reply_data);
void sendQueryAnswerMessageToBatchComputeService(double estimated_waiting_time);
void send_receive();
};
Expand Down
10 changes: 5 additions & 5 deletions include/wrench/services/storage/StorageServiceMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace wrench {
public:
StorageServiceFreeSpaceRequestMessage(S4U_CommPort *answer_commport, const std::string &path, double payload);

/** @brief Mailbox to which the answer message should be sent */
/** @brief CommPort to which the answer message should be sent */
S4U_CommPort *answer_commport;
/** @brief The path */
std::string path;
Expand All @@ -69,7 +69,7 @@ namespace wrench {
const std::shared_ptr<FileLocation> &location,
double payload);

/** @brief Mailbox to which the answer message should be sent */
/** @brief CommPort to which the answer message should be sent */
S4U_CommPort *answer_commport;
/** @brief The location to lookup */
std::shared_ptr<FileLocation> location;
Expand Down Expand Up @@ -98,7 +98,7 @@ namespace wrench {
const std::shared_ptr<FileLocation> &location,
double payload);

/** @brief Mailbox to which the answer message should be sent */
/** @brief CommPort to which the answer message should be sent */
S4U_CommPort *answer_commport;
/** @brief The location to delete */
std::shared_ptr<FileLocation> location;
Expand Down Expand Up @@ -135,7 +135,7 @@ namespace wrench {
std::shared_ptr<FileLocation> dst,
double payload);

/** @brief Mailbox to which the answer message should be sent */
/** @brief CommPort to which the answer message should be sent */
S4U_CommPort *answer_commport;
/** @brief The source location */
std::shared_ptr<FileLocation> src;
Expand Down Expand Up @@ -175,7 +175,7 @@ namespace wrench {
double num_bytes_to_write,
double payload);

/** @brief Mailbox to which the answer message should be sent */
/** @brief CommPort to which the answer message should be sent */
S4U_CommPort *answer_commport;
/** @brief The requesting host */
simgrid::s4u::Host *requesting_host;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace wrench {
public:
CompoundStorageAllocationRequestMessage(S4U_CommPort *answer_commport, std::shared_ptr<DataFile> file, double payload);

/** @brief Mailbox to which the answer message should be sent */
/** @brief CommPort to which the answer message should be sent */
S4U_CommPort *answer_commport;
/** @brief The path */
std::shared_ptr<DataFile> file;
Expand All @@ -61,7 +61,7 @@ namespace wrench {
public:
CompoundStorageLookupRequestMessage(S4U_CommPort *answer_commport, std::shared_ptr<DataFile> file, double payload);

/** @brief Mailbox to which the answer message should be sent */
/** @brief CommPort to which the answer message should be sent */
S4U_CommPort *answer_commport;
/** @brief The path */
std::shared_ptr<DataFile> file;
Expand Down
6 changes: 3 additions & 3 deletions include/wrench/services/storage/xrootd/XRootDMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace wrench {
std::shared_ptr<bool> answered,
int timeToLive);
ContinueSearchMessage(ContinueSearchMessage *toCopy);
/** @brief Mailbox to which the FINAL answer message should be sent */
/** @brief CommPort to which the FINAL answer message should be sent */
S4U_CommPort *answer_commport;

/** @brief The original file read request that kicked off the search (if null this was a lookup request)*/
Expand All @@ -78,7 +78,7 @@ namespace wrench {
double payload, std::shared_ptr<bool> answered);
UpdateCacheMessage(UpdateCacheMessage &other);
UpdateCacheMessage(UpdateCacheMessage *other);
/** @brief Mailbox to which the FINAL answer message should be sent */
/** @brief CommPort to which the FINAL answer message should be sent */
S4U_CommPort *answer_commport;
/** @brief The original file read request that kicked off the search (if null this was a lookup request)*/
std::shared_ptr<StorageServiceFileReadRequestMessage> original;
Expand All @@ -102,7 +102,7 @@ namespace wrench {
bool fileReadRequest,
std::shared_ptr<bool> answered);

/** @brief Mailbox to which the FINAL answer message should be sent */
/** @brief CommPort to which the FINAL answer message should be sent */
S4U_CommPort *answer_commport;
/** @brief The file being searched for */
std::shared_ptr<DataFile> file;
Expand Down
8 changes: 4 additions & 4 deletions include/wrench/simgrid_S4U_util/S4U_PendingCommunication.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ namespace wrench {
/**
* @brief Constructor
*
* @param mailbox: the S4U mailbox
* @param commport: the CommPort
* @param operation_type: the operation type
*/
S4U_PendingCommunication(simgrid::s4u::Mailbox *mailbox, OperationType operation_type) : s4u_mb(mailbox), operation_type(operation_type) {}
S4U_PendingCommunication(S4U_CommPort *commport, OperationType operation_type) : commport(commport), operation_type(operation_type) {}

std::unique_ptr<SimulationMessage> wait();
std::unique_ptr<SimulationMessage> wait(double timeout);
Expand All @@ -62,8 +62,8 @@ namespace wrench {

/** @brief The message */
std::unique_ptr<SimulationMessage> simulation_message;
/** @brief The S4U mailbox */
simgrid::s4u::Mailbox *s4u_mb;
/** @brief The CommPort */
S4U_CommPort *commport;
/** @brief The operation type */
OperationType operation_type;
};
Expand Down
10 changes: 5 additions & 5 deletions include/wrench/util/MessageManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ namespace wrench {
* a message was sent but never received). That is, if messages are in flight when the receiver daemon fails,
* than, because the receiver is the one freeing memory, we have memory leaks.
* This takes extra time however, and many simulations never simulate failures anyway, so it's use is only optional
* at compile time. Perhaps woudl be a good idea to make its usage optional at runtime?
* at compile time. Perhaps would be a good idea to make its usage optional at runtime?
*/

class MessageManager {

static std::unordered_map<std::string, std::unordered_set<SimulationMessage *>> mailbox_messages;
static std::unordered_map<const S4U_CommPort *, std::unordered_set<SimulationMessage *>> messages;

public:
static void manageMessage(const std::string &commport_name, SimulationMessage *msg);
static void cleanUpMessages(const std::string &commport_name);
static void removeReceivedMessage(const std::string &commport_name, SimulationMessage *msg);
static void manageMessage(const S4U_CommPort *commport, SimulationMessage *msg);
static void cleanUpMessages(const S4U_CommPort *commport);
static void removeReceivedMessage(const S4U_CommPort *commport, SimulationMessage *msg);
static void cleanUpAllMessages();
static void print();
};
Expand Down
4 changes: 2 additions & 2 deletions src/wrench/failure_causes/NetworkError.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ namespace wrench {
}

/**
* @brief Returns the commport_name name on which the error occurred
* @brief Returns the name of the CommPort on which the error occurred
* @return the commport_name name
*/
std::string NetworkError::getMailbox() {
std::string NetworkError::getCommPortName() {
return this->commport_name;
}

Expand Down
8 changes: 4 additions & 4 deletions src/wrench/services/compute/batch/BatschedNetworkListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace wrench {
* @param property_list: property list ({} means "use all defaults")
*/
BatschedNetworkListener::BatschedNetworkListener(std::string hostname, std::shared_ptr<BatchComputeService> batch_service,
simgrid::s4u::Mailbox *batch_service_commport,
S4U_CommPort *batch_service_commport,
std::string sched_port,
std::string data_to_send,
WRENCH_PROPERTY_COLLECTION_TYPE property_list) : BatschedNetworkListener(hostname, batch_service, batch_service_commport,
Expand All @@ -65,7 +65,7 @@ namespace wrench {
* @param suffix the suffix to append
*/
BatschedNetworkListener::BatschedNetworkListener(
std::string hostname, std::shared_ptr<BatchComputeService> batch_service, simgrid::s4u::Mailbox *batch_service_commport,
std::string hostname, std::shared_ptr<BatchComputeService> batch_service, S4U_CommPort *batch_service_commport,
std::string sched_port,
std::string data_to_send, WRENCH_PROPERTY_COLLECTION_TYPE property_list,
std::string suffix = "") : Service(hostname, "batch_network_listener" + suffix) {
Expand Down Expand Up @@ -102,7 +102,7 @@ namespace wrench {
* @param answer_commport: commport_name on which ack will be received
* @param execute_job_reply_data: message to send
*/
void BatschedNetworkListener::sendExecuteMessageToBatchComputeService(simgrid::s4u::Mailbox *answer_commport,
void BatschedNetworkListener::sendExecuteMessageToBatchComputeService(S4U_CommPort *answer_commport,
std::string execute_job_reply_data) {
S4U_CommPort::putMessage(this->batch_service_commport,
new BatchExecuteJobFromBatSchedMessage(answer_commport, execute_job_reply_data, 0));
Expand Down Expand Up @@ -157,7 +157,7 @@ namespace wrench {
reply_decisions = nlohmann::json::parse(reply_data);
decision_events = reply_decisions["events"];

auto answer_commport = S4U_Daemon::getRunningActorRecvMailbox();
auto answer_commport = S4U_Daemon::getRunningActorRecvCommPort();
for (auto decisions: decision_events) {
std::string decision_type = decisions["type"];
double decision_timestamp = decisions["timestamp"];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ namespace wrench {
* @return A vector of shared_ptr on a FileLocation if the DataFile is known to the CompoundStorageService or empty vector if it's not.
*/
std::vector<std::shared_ptr<FileLocation>> CompoundStorageService::lookupFileLocation(const std::shared_ptr<DataFile> &file, S4U_CommPort *answer_commport) {
WRENCH_INFO("CSS::lookupFileLocation() - DataFile + Mailbox");
WRENCH_INFO("CSS::lookupFileLocation() - DataFile + CommPort");

this->commport->putMessage(
new CompoundStorageLookupRequestMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ namespace wrench {

// Generate a commport_name name on which to receive the file
auto file_reception_commport = S4U_CommPort::getTemporaryCommPort();
// auto file_reception_commport = S4U_CommPort::generateUniqueMailbox("faa_does_not_work");

// Reply with a "go ahead, send me the file" message
answer_commport->dputMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,14 +519,6 @@ namespace wrench {

// Send a message to the source
auto request_answer_commport = S4U_Daemon::getRunningActorRecvCommPort();
// auto commport_that_should_receive_file_content = S4U_CommPort::generateUniqueMailbox("works_by_itself");

// S4U_CommPort *commport_that_should_receive_file_content;
// if (src_loc->getStorageService()->buffer_size > DBL_EPSILON) {
// commport_that_should_receive_file_content = S4U_CommPort::getTemporaryCommPort();
// } else {
// commport_that_should_receive_file_content = nullptr;
// }

src_loc->getStorageService()->commport->putMessage(
new StorageServiceFileReadRequestMessage(
Expand Down
2 changes: 1 addition & 1 deletion src/wrench/simgrid_S4U_util/S4U_Daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ namespace wrench {
if ((S4U_Daemon::num_non_daemonized_actors_running == 0) or (not this->isSetToAutoRestart())) {
// Service::increaseNumCompletedServicesCount();
#ifdef MESSAGE_MANAGER
MessageManager::cleanUpMessages(this->commport_name);
MessageManager::cleanUpMessages(this->commport);
#endif
this->deleteLifeSaver();
}
Expand Down
21 changes: 12 additions & 9 deletions src/wrench/simgrid_S4U_util/S4U_PendingCommunication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ namespace wrench {
} catch (simgrid::NetworkFailureException &e) {
if (this->operation_type == S4U_PendingCommunication::OperationType::SENDING) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::SENDING, NetworkError::FAILURE, s4u_mb->get_name()));
NetworkError::OperationType::SENDING, NetworkError::FAILURE, this->commport->s4u_mb->get_name()));
} else {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::RECEIVING, NetworkError::FAILURE, s4u_mb->get_name()));
NetworkError::OperationType::RECEIVING, NetworkError::FAILURE, this->commport->s4u_mb->get_name()));
}
}
#ifdef MESSAGE_MANAGER
MessageManager::removeReceivedMessage(this->commport, this->simulation_message.get());
#endif
return std::move(this->simulation_message);
}

Expand All @@ -68,20 +71,23 @@ namespace wrench {
} catch (simgrid::NetworkFailureException &e) {
if (this->operation_type == S4U_PendingCommunication::OperationType::SENDING) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::SENDING, NetworkError::FAILURE, s4u_mb->get_name()));
NetworkError::OperationType::SENDING, NetworkError::FAILURE, this->commport->s4u_mb->get_name()));
} else {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::RECEIVING, NetworkError::FAILURE, s4u_mb->get_name()));
NetworkError::OperationType::RECEIVING, NetworkError::FAILURE, this->commport->s4u_mb->get_name()));
}
} catch (simgrid::TimeoutException &e) {
if (this->operation_type == S4U_PendingCommunication::OperationType::SENDING) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::SENDING, NetworkError::TIMEOUT, s4u_mb->get_name()));
NetworkError::OperationType::SENDING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name()));
} else {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::RECEIVING, NetworkError::TIMEOUT, s4u_mb->get_name()));
NetworkError::OperationType::RECEIVING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name()));
}
}
#ifdef MESSAGE_MANAGER
MessageManager::removeReceivedMessage(this->commport, this->simulation_message.get());
#endif
return std::move(this->simulation_message);
}

Expand Down Expand Up @@ -139,9 +145,6 @@ namespace wrench {
simgrid::s4u::ActivityPtr finished_activity = nullptr;
try {
finished_activity = pending_activities.wait_any_for(timeout);
#ifdef MESSAGE_MANAGER
MessageManager::removeReceivedMessage(pending_comms[index]->mailbox_name, pending_comms[index]->simulation_message.get());
#endif
} catch (simgrid::Exception &e) {
auto failed_activity = pending_activities.get_failed_activity();
for (unsigned long idx = 0; idx < pending_comms.size(); idx++ ) {
Expand Down
Loading

0 comments on commit 84c478d

Please sign in to comment.