Skip to content

Commit

Permalink
Updating the CSS so that a partial read / write on a file now reads f…
Browse files Browse the repository at this point in the history
…rom / writes to entire stripes/parts of files (not necessarily all of them in the same read/write action), rather than reading/writing the same amount of bytes from/to ALL stripes/parts of a file. Still under test, but seemingly very useful to heavily reduce the number of reads/writes, while somewhat preserving a parallel IO patterns in our simulated jobs using the CSS and a striping allocator
  • Loading branch information
MONNIOT Julien committed Nov 9, 2023
1 parent c0a09c5 commit 9573498
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ namespace wrench {

std::map<std::shared_ptr<DataFile>, std::vector<std::shared_ptr<FileLocation>>> file_location_mapping = {};

std::map<std::shared_ptr<DataFile>, unsigned int> partial_io_stripe_index;

StorageSelectionStrategyCallback &allocate;

/**
Expand Down
129 changes: 98 additions & 31 deletions src/wrench/services/storage/compound/CompoundStorageService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ namespace wrench {

if (!designated_locations.empty()) {
this->file_location_mapping[file] = designated_locations;
WRENCH_INFO("CSS::processStorageSelectionMessage(): Local mapping updated");
this->partial_io_stripe_index[file] = 0;
WRENCH_INFO("CSS::processStorageSelectionMessage(): Local mapping updated for file %s", file->getID().c_str());
}
S4U_Mailbox::dputMessage(
msg->answer_mailbox,
Expand Down Expand Up @@ -283,10 +284,11 @@ namespace wrench {
CompoundStorageServiceMessagePayload::STORAGE_SELECTION_PAYLOAD)));
} else {
auto mapped_locations = this->file_location_mapping[file];
WRENCH_INFO("CSS::processStorageLookupMessage(): File %s is known by this CompoundStorageService", file->getID().c_str());
for (const auto &loc : mapped_locations) {
WRENCH_INFO("CSS::processStorageLookupMessage(): File stripe %s is known by this CompoundStorageService and associated to storage service %s",
loc->getFile()->getID().c_str(),
loc->getStorageService()->getName().c_str());
WRENCH_DEBUG("CSS::processStorageLookupMessage(): File stripe %s is known by this CompoundStorageService and associated to storage service %s",
loc->getFile()->getID().c_str(),
loc->getStorageService()->getName().c_str());
}

S4U_Mailbox::dputMessage(
Expand Down Expand Up @@ -440,6 +442,7 @@ namespace wrench {

// Clean up local map
this->file_location_mapping.erase(location->getFile());
this->partial_io_stripe_index.erase(location->getFile());

// Collect traces
wrench::AllocationTrace trace;
Expand Down Expand Up @@ -809,47 +812,79 @@ namespace wrench {
double num_bytes_to_write,
bool wait_for_answer) {

WRENCH_INFO("CSS::writeFile(): Writing %f b to file %s", num_bytes_to_write, location->getFile()->getID().c_str());
WRENCH_INFO("CSS::writeFile(): Writing %fb to file %s", num_bytes_to_write, location->getFile()->getID().c_str());
this->assertServiceIsUp();

if (location == nullptr) {
throw std::invalid_argument("CSS::writeFile(): Invalid arguments (location is a nullptr)");
}

this->assertServiceIsUp();

// Find the file, or allocate file/parts of file onto known SSS
auto designated_locations = this->lookupOrDesignateStorageService(location);

if (designated_locations.empty()) {
WRENCH_WARN("'designated_locations' vector empry (error or lack of space from the allocator point of view)");
WRENCH_WARN("CSS:writeFile(): 'designated_locations' vector empry (error or lack of space from the allocator point of view)");
throw ExecutionException(std::make_shared<StorageServiceNotEnoughSpace>(location->getFile(), this->getSharedPtr<CompoundStorageService>()));
}

this->traceInternalStorageUse(IOAction::WriteStart, designated_locations);
WRENCH_INFO("CSS::writeFile(): Destination file %s has %zu stripes",
location->getFile()->getID().c_str(), designated_locations.size());
if (num_bytes_to_write > location->getFile()->getSize()) {
WRENCH_WARN("CSS:writeFile(): 'num_bytes_to_write' is larger than actual file size");
throw std::invalid_argument("'num_bytes_to_write' is larger than actual file size");
}

// Contact every SimpleStorageService that we want to use, and request a FileWrite
WRENCH_INFO("CSS:writeFile(): STARTING TO WORK ON STRIPES FOR FILE %s", location->getFile()->getID().c_str());
auto locations_start = this->partial_io_stripe_index[location->getFile()];
if (locations_start == designated_locations.size()) {
locations_start = 0;
this->partial_io_stripe_index[location->getFile()] = 0;
}

/* Find where to stop writing : starting from locations_start, we keep iterating up the vector until we reach the end or the amount of
bytes to write is depleted
*/
auto stripe_count = designated_locations.size();
while (num_bytes_to_write > 0 and locations_start != stripe_count) {
WRENCH_INFO(" - total_bytes_to_write = %f and current file (%s) is %f bytes",
num_bytes_to_write,
designated_locations[locations_start]->getFile()->getID().c_str(),
designated_locations[locations_start]->getFile()->getSize());
num_bytes_to_write -= designated_locations[locations_start]->getFile()->getSize();
locations_start++;
}
locations_start--;

// Writes are now going to take place on these locations only
WRENCH_INFO("CSS:writeFile(): Writing file %s from part %s to part %s",
location->getFile()->getID().c_str(),
designated_locations[this->partial_io_stripe_index[location->getFile()]]->getFile()->getID().c_str(),
designated_locations[locations_start]->getFile()->getID().c_str());

std::vector<std::shared_ptr<wrench::FileLocation>> designated_locations_subset(
designated_locations.begin() + this->partial_io_stripe_index[location->getFile()],
designated_locations.begin() + (++locations_start));
// UPDATE the progress on writing the file stripes
this->partial_io_stripe_index[location->getFile()] = locations_start;

this->traceInternalStorageUse(IOAction::WriteStart, designated_locations_subset);
WRENCH_INFO("CSS::writeFile(): Destination file %s has %zu stripes, and for now we'll be writing on %zu of these stripes",
location->getFile()->getID().c_str(),
designated_locations.size(),
designated_locations_subset.size());

// Contact every SimpleStorageService that we want to use, and request a FileWrite
unsigned int request_count = 0;
auto recv_mailbox = S4U_Mailbox::getTemporaryMailbox();
for (auto &dloc : designated_locations) {

if (num_bytes_to_write != dloc->getFile()->getSize()) {
WRENCH_DEBUG("CSS:writeFile(): Sending partial write request %d for <%f> b on file >> %s << (%f b) to %s",
request_count, num_bytes_to_write, dloc->getFile()->getID().c_str(), dloc->getFile()->getSize(), dloc->getStorageService()->getName().c_str());
} else {
WRENCH_DEBUG("CSS:writeFile(): Sending full write request %d on file %s (<%f> b) to %s",
request_count, dloc->getFile()->getID().c_str(), dloc->getFile()->getSize(), dloc->getStorageService()->getName().c_str());
}
for (auto &dloc : designated_locations_subset) {
WRENCH_DEBUG("CSS:writeFile(): Sending full write request %d on file %s (<%f> b) to %s",
request_count, dloc->getFile()->getID().c_str(), dloc->getFile()->getSize(), dloc->getStorageService()->getName().c_str());

S4U_Mailbox::dputMessage(
dloc->getStorageService()->mailbox,
new StorageServiceFileWriteRequestMessage(
recv_mailbox,
simgrid::s4u::this_actor::get_host(),
dloc,
num_bytes_to_write,
dloc->getFile()->getSize(),
this->getMessagePayloadValue(
CompoundStorageServiceMessagePayload::FILE_WRITE_REQUEST_MESSAGE_PAYLOAD)));
request_count++;
Expand All @@ -869,7 +904,6 @@ namespace wrench {
WRENCH_DEBUG("CSS::writeFile(): %u FileWriteRequests sent and validated", request_count);

for (const auto &msg : messages) {

// Update buffer size according to which storage service actually answered.
auto buffer_size = msg->buffer_size;

Expand Down Expand Up @@ -904,9 +938,9 @@ namespace wrench {
trace.internal_locations = designated_locations;
this->write_traces[location->getFile()->getID()] = trace;

this->traceInternalStorageUse(IOAction::WriteEnd, designated_locations);
this->traceInternalStorageUse(IOAction::WriteEnd, designated_locations_subset);

for (const auto &loc : designated_locations) {
for (const auto &loc : designated_locations_subset) {
WRENCH_DEBUG("CSS::writeFile(): For location %s, free space = %f", loc->getStorageService()->getName().c_str(), loc->getStorageService()->getTotalFreeSpace());
}

Expand All @@ -927,23 +961,56 @@ namespace wrench {
bool wait_for_answer) {
WRENCH_INFO("CSS::readFile(): Reading file %s", location->getFile()->getID().c_str());

assertServiceIsUp(this->shared_from_this());

if (!answer_mailbox or !location or (num_bytes < 0.0)) {
throw std::invalid_argument("StorageService::readFile(): Invalid nullptr/0 arguments");
}

assertServiceIsUp(this->shared_from_this());

auto designated_locations = this->lookupFileLocation(location);
if (designated_locations.empty()) {
throw ExecutionException(std::make_shared<FileNotFound>(location));
}

WRENCH_INFO("CSS:readFile(): STARTING TO WORK ON STRIPES FOR FILE %s", location->getFile()->getID().c_str());
auto locations_start = this->partial_io_stripe_index[location->getFile()];
if (locations_start == designated_locations.size()) {
locations_start = 0;
this->partial_io_stripe_index[location->getFile()] = 0;
}

/* Find where to stop reading : starting from locations_start, we keep iterating up the vector until we reach the end or the amount of
bytes to read is depleted
*/
auto stripe_count = designated_locations.size();
while (num_bytes > 0 and locations_start != stripe_count) {
WRENCH_INFO("CSS:readFile(): - num_bytes = %f and current file (%s) is %f bytes",
num_bytes,
designated_locations[locations_start]->getFile()->getID().c_str(),
designated_locations[locations_start]->getFile()->getSize());
num_bytes -= designated_locations[locations_start]->getFile()->getSize();
locations_start++;
}
locations_start--;

// Writes are now going to take place on these locations only
WRENCH_INFO("CSS:readFile(): Reading file %s from part %s to part %s",
location->getFile()->getID().c_str(),
designated_locations[this->partial_io_stripe_index[location->getFile()]]->getFile()->getID().c_str(),
designated_locations[locations_start]->getFile()->getID().c_str());

std::vector<std::shared_ptr<wrench::FileLocation>> designated_locations_subset(
designated_locations.begin() + this->partial_io_stripe_index[location->getFile()],
designated_locations.begin() + (++locations_start));
// UPDATE the progress on writing the file stripes
this->partial_io_stripe_index[location->getFile()] = locations_start;

// Contact every SSS
auto recv_mailbox = S4U_Mailbox::getTemporaryMailbox();
auto request_count = 0;
for (const auto &dloc : designated_locations) {
for (const auto &dloc : designated_locations_subset) {
WRENCH_DEBUG("CSS::readFile(): Sending read file request for %f bytes on file %s at path %s, to storage service %s",
num_bytes,
dloc->getFile()->getSize(),
dloc->getFile()
->getID()
.c_str(),
Expand All @@ -956,7 +1023,7 @@ namespace wrench {
recv_mailbox,
simgrid::s4u::this_actor::get_host(),
dloc,
num_bytes,
dloc->getFile()->getSize(),
dloc->getStorageService()->getMessagePayloadValue(
CompoundStorageServiceMessagePayload::FILE_READ_REQUEST_MESSAGE_PAYLOAD)));
request_count++;
Expand Down Expand Up @@ -1020,7 +1087,7 @@ namespace wrench {
wrench::AllocationTrace trace;
trace.ts = S4U_Simulation::getClock();
trace.act = IOAction::ReadEnd;
trace.internal_locations = designated_locations;
trace.internal_locations = designated_locations_subset;
// this->read_traces[location->getFile()->getID()] = trace;
}

Expand Down

0 comments on commit 9573498

Please sign in to comment.