Skip to content

Commit

Permalink
add lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
wanyuzha committed Oct 4, 2023
1 parent 5e3293b commit d95e161
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 138 deletions.
21 changes: 1 addition & 20 deletions tools/wrench/wrench-daemon/include/SimulationController.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,26 +131,7 @@ namespace wrench {
BlockingQueue<wrench::FileRegistryService *> file_service_to_start;
BlockingQueue<std::tuple<std::shared_ptr<StandardJob>, std::shared_ptr<ComputeService>, std::map<std::string, std::string>>> submissions_to_do;

BlockingQueue<std::pair<std::tuple<unsigned long, double, WRENCH_PROPERTY_COLLECTION_TYPE, WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE>, std::shared_ptr<ComputeService>>> vm_to_create;
BlockingQueue<std::pair<bool, std::string>> vm_created;

BlockingQueue<std::pair<std::string, std::shared_ptr<ComputeService>>> vm_to_start;
BlockingQueue<std::pair<bool, std::string>> vm_started;

BlockingQueue<std::pair<std::string, std::shared_ptr<ComputeService>>> vm_to_shutdown;
BlockingQueue<std::pair<bool, std::string>> vm_shutdown;

BlockingQueue<std::pair<std::string, std::shared_ptr<ComputeService>>> vm_to_destroy;
BlockingQueue<std::pair<bool, std::string>> vm_destroyed;

BlockingQueue<std::pair<std::shared_ptr<DataFile>, std::shared_ptr<StorageService>>> file_to_lookup;
BlockingQueue<std::tuple<bool, bool, std::string>> file_looked_up;

BlockingQueue<std::pair<std::string, std::shared_ptr<ComputeService>>> vm_to_suspend;
BlockingQueue<std::pair<bool, std::string>> vm_suspended;

BlockingQueue<std::pair<std::string, std::shared_ptr<ComputeService>>> vm_to_resume;
BlockingQueue<std::pair<bool, std::string>> vm_resumed;
BlockingQueue<std::function<void()>> things_to_do;

// The two managers
std::shared_ptr<JobManager> job_manager;
Expand Down
226 changes: 108 additions & 118 deletions tools/wrench/wrench-daemon/src/SimulationController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace wrench {
std::pair<std::tuple<unsigned long, double, WRENCH_PROPERTY_COLLECTION_TYPE, WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE>, std::shared_ptr<ComputeService>> spec_vm_to_create;
std::pair<std::string, std::shared_ptr<ComputeService>> vm_id;
std::pair<std::shared_ptr<DataFile>, std::shared_ptr<StorageService>> file_lookup_request;
std::function<void()> thing_to_do;

if (this->compute_services_to_start.tryPop(new_compute_service)) {
// starting compute service
Expand All @@ -78,103 +79,11 @@ namespace wrench {
// Add the new service to the registry of existing services, so that later we can look it up by name
this->file_service_registry.insert(new_service_shared_ptr->getName(), new_service_shared_ptr);

} else if (this->vm_to_create.tryPop(spec_vm_to_create)) {
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(spec_vm_to_create.second);
auto num_cores = std::get<0>(spec_vm_to_create.first);
auto ram_size = std::get<1>(spec_vm_to_create.first);
auto properties = std::get<2>(spec_vm_to_create.first);
auto payloads = std::get<3>(spec_vm_to_create.first);
std::string vm_name;
try {
vm_name = cloud_cs->createVM(num_cores, ram_size, properties, payloads);
this->vm_created.push(std::pair(true, vm_name));
} catch (ExecutionException &e) {
this->vm_created.push(std::pair(false, e.getCause()->toString()));
}

} else if (this->vm_to_start.tryPop(vm_id)) {
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(vm_id.second);
auto vm_name = vm_id.first;
try {
if (not cloud_cs->isVMDown(vm_name)) {
throw std::invalid_argument("Cannot start VM because it's not down");
}
auto bm_cs = cloud_cs->startVM(vm_name);
this->compute_service_registry.insert(bm_cs->getName(), bm_cs);
this->vm_started.push(std::pair(true, bm_cs->getName()));
} catch (ExecutionException &e) {
this->vm_created.push(std::pair(false, e.getCause()->toString()));
} catch (std::invalid_argument &e) {
this->vm_started.push(std::pair(false, e.what()));
}

} else if (this->vm_to_shutdown.tryPop(vm_id)) {

auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(vm_id.second);
auto vm_name = vm_id.first;
try {
if (not cloud_cs->isVMRunning(vm_name)) {
throw std::invalid_argument("Cannot shutdown VM because it's not running");
}
auto bm_cs = cloud_cs->getVMComputeService(vm_name);

this->compute_service_registry.remove(bm_cs->getName());
cloud_cs->shutdownVM(vm_name);
this->vm_shutdown.push(std::pair(true, vm_name));
} catch (ExecutionException &e) {
this->vm_shutdown.push(std::pair(false, e.what()));
} catch (std::invalid_argument &e) {
this->vm_shutdown.push(std::pair(false, e.what()));
}

} else if (this->vm_to_destroy.tryPop(vm_id)) {

auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(vm_id.second);
auto vm_name = vm_id.first;
try {
if (not cloud_cs->isVMDown(vm_name)) {
throw std::invalid_argument("Cannot destroy VM because it's not down");
}
cloud_cs->destroyVM(vm_name);
this->vm_destroyed.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
this->vm_destroyed.push(std::pair(false, e.what()));
}

} else if (this->file_to_lookup.tryPop(file_lookup_request)) {

auto file = file_lookup_request.first;
auto ss = file_lookup_request.second;
try {
bool result = ss->lookupFile(file);
this->file_looked_up.push(std::tuple(true, result, ""));
} catch (std::invalid_argument &e) {
this->file_looked_up.push(std::tuple(false, false, e.what()));
}

} else if (this->vm_to_suspend.tryPop(vm_id)) {

auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(vm_id.second);
auto vm_name = vm_id.first;
try {
cloud_cs->suspendVM(vm_name);
this->vm_suspended.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
this->vm_suspended.push(std::pair(false, e.what()));
}

} else if (this->vm_to_resume.tryPop(vm_id)) {

auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(vm_id.second);
auto vm_name = vm_id.first;
try {
cloud_cs->resumeVM(vm_name);
this->vm_resumed.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
this->vm_resumed.push(std::pair(false, e.what()));
}

} else {
} else if (this->things_to_do.tryPop(thing_to_do))
{
thing_to_do();
}
else {
break;
}
}
Expand Down Expand Up @@ -515,18 +424,23 @@ namespace wrench {
throw std::runtime_error("Unknown compute service " + cs_name);
}

BlockingQueue<std::pair<bool, std::string>> vm_created;

// Push the request into the blocking queue (will be a single one!)
this->vm_to_create.push(
std::pair(
std::tuple(num_cores,
ram_memory,
service_property_list,
service_message_payload_list),
cs));
this->things_to_do.push([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
std::string vm_name;
try {
vm_name = cloud_cs->createVM(num_cores, ram_memory, service_property_list, service_message_payload_list);
vm_created.push(std::pair(true, vm_name));
} catch (ExecutionException &e) {
vm_created.push(std::pair(false, e.getCause()->toString()));
}
});

// Pool from the shared queue (will be a single one!)
std::pair<bool, std::string> reply;
this->vm_created.waitAndPop(reply);
vm_created.waitAndPop(reply);
bool success = std::get<0>(reply);
if (not success) {
std::string error_msg = std::get<1>(reply);
Expand All @@ -553,12 +467,27 @@ namespace wrench {
throw std::runtime_error("Unknown compute service " + cs_name);
}

BlockingQueue<std::pair<bool, std::string>> vm_started;
// Push the request into the blocking queue (will be a single one!)
this->vm_to_start.push(std::pair(vm_name, cs));
this->things_to_do.push([this, vm_name, cs, &vm_started](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
if (not cloud_cs->isVMDown(vm_name)) {
throw std::invalid_argument("Cannot start VM because it's not down");
}
auto bm_cs = cloud_cs->startVM(vm_name);
this->compute_service_registry.insert(bm_cs->getName(), bm_cs);
vm_started.push(std::pair(true, bm_cs->getName()));
} catch (ExecutionException &e) {
vm_started.push(std::pair(false, e.getCause()->toString()));
} catch (std::invalid_argument &e) {
vm_started.push(std::pair(false, e.what()));
}
});

// Pool from the shared queue (will be a single one!)
std::pair<bool, std::string> reply;
this->vm_started.waitAndPop(reply);
vm_started.waitAndPop(reply);
bool success = std::get<0>(reply);
if (not success) {
std::string error_msg = std::get<1>(reply);
Expand All @@ -585,12 +514,31 @@ namespace wrench {
throw std::runtime_error("Unknown compute service " + cs_name);
}

BlockingQueue<std::pair<bool, std::string>> vm_shutdown;

// Push the request into the blocking queue (will be a single one!)
this->vm_to_shutdown.push(std::pair(vm_name, cs));
//this->vm_to_shutdown.push(std::pair(vm_name, cs));
this->things_to_do.push([this, vm_name, cs, &vm_shutdown](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
if (not cloud_cs->isVMRunning(vm_name)) {
throw std::invalid_argument("Cannot shutdown VM because it's not running");
}
auto bm_cs = cloud_cs->getVMComputeService(vm_name);

this->compute_service_registry.remove(bm_cs->getName());
cloud_cs->shutdownVM(vm_name);
vm_shutdown.push(std::pair(true, vm_name));
} catch (ExecutionException &e) {
vm_shutdown.push(std::pair(false, e.what()));
} catch (std::invalid_argument &e) {
vm_shutdown.push(std::pair(false, e.what()));
}
});

// Pool from the shared queue (will be a single one!)
std::pair<bool, std::string> reply;
this->vm_shutdown.waitAndPop(reply);
vm_shutdown.waitAndPop(reply);
bool success = std::get<0>(reply);
if (not success) {
std::string error_msg = std::get<1>(reply);
Expand All @@ -615,12 +563,25 @@ namespace wrench {
throw std::runtime_error("Unknown compute service " + cs_name);
}

BlockingQueue<std::pair<bool, std::string>> vm_destroyed;

// Push the request into the blocking queue (will be a single one!)
this->vm_to_destroy.push(std::pair(vm_name, cs));
this->things_to_do.push([this, vm_name, cs, &vm_destroyed](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
if (not cloud_cs->isVMDown(vm_name)) {
throw std::invalid_argument("Cannot destroy VM because it's not down");
}
cloud_cs->destroyVM(vm_name);
vm_destroyed.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
vm_destroyed.push(std::pair(false, e.what()));
}
});

// Pool from the shared queue (will be a single one!)
std::pair<bool, std::string> reply;
this->vm_destroyed.waitAndPop(reply);
vm_destroyed.waitAndPop(reply);
bool success = std::get<0>(reply);
if (not success) {
std::string error_msg = std::get<1>(reply);
Expand Down Expand Up @@ -700,12 +661,21 @@ namespace wrench {
throw std::runtime_error("Unknown file " + filename);
}

BlockingQueue<std::tuple<bool, bool, std::string>> file_looked_up;

// Push the request into the blocking queue (will be a single one!)
this->file_to_lookup.push(std::pair(file, ss));
this->things_to_do.push([this, file, ss, &file_looked_up](){
try {
bool result = ss->lookupFile(file);
file_looked_up.push(std::tuple(true, result, ""));
} catch (std::invalid_argument &e) {
file_looked_up.push(std::tuple(false, false, e.what()));
}
});

// Pool from the shared queue (will be a single one!)
std::tuple<bool, bool, std::string> reply;
this->file_looked_up.waitAndPop(reply);
file_looked_up.waitAndPop(reply);
bool success = std::get<0>(reply);
if (not success) {
std::string error_msg = std::get<2>(reply);
Expand Down Expand Up @@ -1101,11 +1071,21 @@ namespace wrench {
}

// Push the request into the blocking queue (will be a single one!)
this->vm_to_suspend.push(std::pair(vm_name, cs));
//this->vm_to_suspend.push(std::pair(vm_name, cs));
BlockingQueue<std::pair<bool, std::string>> vm_suspended;
this->things_to_do.push([this, vm_name, cs, &vm_suspended](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
cloud_cs->suspendVM(vm_name);
vm_suspended.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
vm_suspended.push(std::pair(false, e.what()));
}
});

// Pool from the shared queue (will be a single one!)
std::pair<bool, std::string> reply;
this->vm_suspended.waitAndPop(reply);
vm_suspended.waitAndPop(reply);
bool success = std::get<0>(reply);
if (not success) {
std::string error_msg = std::get<1>(reply);
Expand Down Expand Up @@ -1151,12 +1131,22 @@ namespace wrench {
throw std::runtime_error("Unknown compute service " + cs_name);
}

BlockingQueue<std::pair<bool, std::string>> vm_resumed;

// Push the request into the blocking queue (will be a single one!)
this->vm_to_resume.push(std::pair(vm_name, cs));
this->things_to_do.push([this, vm_name, cs, &vm_resumed](){
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
try {
cloud_cs->resumeVM(vm_name);
vm_resumed.push(std::pair(true, vm_name));
} catch (std::invalid_argument &e) {
vm_resumed.push(std::pair(false, e.what()));
}
});

// Pool from the shared queue (will be a single one!)
std::pair<bool, std::string> reply;
this->vm_resumed.waitAndPop(reply);
vm_resumed.waitAndPop(reply);
bool success = std::get<0>(reply);
if (not success) {
std::string error_msg = std::get<1>(reply);
Expand All @@ -1177,7 +1167,7 @@ namespace wrench {
throw std::runtime_error("Unknown compute service " + cs_name);
}
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
std::vector<std::string> execution_hosts_list = cloud_cs->getExecutionHosts();
std::vector<std::string> execution_hosts_list = cloud_cs->getHosts();
json answer {};
answer["execution_hosts"] = execution_hosts_list;
return answer;
Expand Down

0 comments on commit d95e161

Please sign in to comment.