Skip to content

Commit

Permalink
Implemented backfilling depth
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Dec 20, 2024
1 parent aa70603 commit 2d86933
Show file tree
Hide file tree
Showing 16 changed files with 80 additions and 29 deletions.
2 changes: 1 addition & 1 deletion include/wrench/job/StandardJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace wrench {

unsigned long getMinimumRequiredNumCores() const;

double getMinimumRequiredMemory() const;
sg_size_t getMinimumRequiredMemory() const;

unsigned long getNumCompletedTasks() const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ namespace wrench {
{BatchComputeServiceProperty::BATCH_QUEUE_ORDERING_ALGORITHM, "fcfs"},
#else
{BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM, "fcfs"},
{BatchComputeServiceProperty::BACKFILLING_DEPTH, "infinity"},
#endif
{BatchComputeServiceProperty::BATCH_RJMS_PADDING_DELAY, "5"},
{BatchComputeServiceProperty::SIMULATED_WORKLOAD_TRACE_FILE, ""},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ namespace wrench {
**/
DECLARE_PROPERTY_NAME(BATCH_SCHEDULING_ALGORITHM);

/**
* @brief The backfilling depth used by backfilling algorithms, i.e., how far they go down
* the batch queue to consider a waiting job for backfilling (e.g., "0" (no backfilling done), "10", "infinity").
* Default: "infinity".
*
* - If ENABLE_BATSCHED is set to off / not set:
* - if the BATCH_SCHEDULING_ALGORITHM is "fcfs": ignored
*
* - If ENABLE_BATSCHED is set to on: ignored
**/
DECLARE_PROPERTY_NAME(BACKFILLING_DEPTH);

/**
* @brief The batch queue ordering algorithm. Can be:
* - If ENABLE_BATSCHED is set to off / not set: ignored
Expand Down
2 changes: 1 addition & 1 deletion include/wrench/services/compute/batch/BatchJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace wrench {
void setRequestedTime(unsigned long time);
unsigned long getRequestedCoresPerNode() const;
std::string getUsername();
double getMemoryRequirement();
sg_size_t getMemoryRequirement();
double getBeginTimestamp() const;
void setBeginTimestamp(double time_stamp);
double getEndingTimestamp() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace wrench {
class ConservativeBackfillingBatchScheduler : public HomegrownBatchScheduler {

public:
explicit ConservativeBackfillingBatchScheduler(BatchComputeService *cs);
explicit ConservativeBackfillingBatchScheduler(BatchComputeService *cs, unsigned long backfilling_depth);

void processQueuedJobs() override;

Expand All @@ -43,6 +43,7 @@ namespace wrench {
getStartTimeEstimates(std::set<std::tuple<std::string, unsigned long, unsigned long, sg_size_t>> set_of_jobs) override;

private:
unsigned long _backfilling_depth;
std::unique_ptr<NodeAvailabilityTimeLine> schedule;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace wrench {
class ConservativeBackfillingBatchSchedulerCoreLevel : public HomegrownBatchScheduler {

public:
explicit ConservativeBackfillingBatchSchedulerCoreLevel(BatchComputeService *cs);
explicit ConservativeBackfillingBatchSchedulerCoreLevel(BatchComputeService *cs, unsigned long backfilling_depth);

void processQueuedJobs() override;

Expand All @@ -44,6 +44,7 @@ namespace wrench {

private:
std::unique_ptr<CoreAvailabilityTimeLine> schedule;
unsigned long _backfilling_depth;
};


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace wrench {
class EasyBackfillingBatchScheduler : public HomegrownBatchScheduler {

public:
explicit EasyBackfillingBatchScheduler(BatchComputeService *cs, int depth);
explicit EasyBackfillingBatchScheduler(BatchComputeService *cs, int depth, unsigned long backfilling_depth);

void processQueuedJobs() override;

Expand All @@ -42,6 +42,7 @@ namespace wrench {

private:
std::unique_ptr<NodeAvailabilityTimeLine> schedule;
unsigned long _backfilling_depth;
int _depth;
};

Expand Down
6 changes: 3 additions & 3 deletions src/wrench/job/StandardJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ namespace wrench {
* one task in the job cannot run if less ram than this minimum is available)
* @return the largest memory requirement among the job's tasks (0 if the job has no tasks)
*/
double StandardJob::getMinimumRequiredMemory() const {
double max_ram = 0;
sg_size_t StandardJob::getMinimumRequiredMemory() const {
sg_size_t max_ram = 0;
for (auto const &t: tasks) {
max_ram = std::max<double>(max_ram, (double) (t->getMemoryRequirement()));
max_ram = std::max<sg_size_t>(max_ram, t->getMemoryRequirement());
}
return max_ram;
}
Expand Down
32 changes: 23 additions & 9 deletions src/wrench/services/compute/batch/BatchComputeService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ namespace wrench {
std::string scratch_space_mount_point,
WRENCH_PROPERTY_COLLECTION_TYPE property_list,
WRENCH_MESSAGE_PAYLOAD_COLLECTION_TYPE messagepayload_list) : BatchComputeService(hostname, std::move(compute_hosts), ComputeService::ALL_CORES,
ComputeService::ALL_RAM, scratch_space_mount_point, std::move(property_list),
std::move(messagepayload_list), "") {}
ComputeService::ALL_RAM, scratch_space_mount_point, std::move(property_list),
std::move(messagepayload_list), "") {}

/**
* @brief Constructor
Expand Down Expand Up @@ -201,13 +201,27 @@ namespace wrench {
if (batch_scheduling_alg == "fcfs") {
this->scheduler = std::unique_ptr<BatchScheduler>(new FCFSBatchScheduler(this));
} else if (batch_scheduling_alg == "conservative_bf") {
this->scheduler = std::unique_ptr<BatchScheduler>(new ConservativeBackfillingBatchScheduler(this));
this->scheduler = std::unique_ptr<BatchScheduler>(
new ConservativeBackfillingBatchScheduler(
this,
this->getPropertyValueAsUnsignedLong(BatchComputeServiceProperty::BACKFILLING_DEPTH)));
} else if (batch_scheduling_alg == "easy_bf_depth0") {
this->scheduler = std::unique_ptr<BatchScheduler>(new EasyBackfillingBatchScheduler(this, 0));
this->scheduler = std::unique_ptr<BatchScheduler>(
new EasyBackfillingBatchScheduler(
this,
0,
this->getPropertyValueAsUnsignedLong(BatchComputeServiceProperty::BACKFILLING_DEPTH)));
} else if (batch_scheduling_alg == "easy_bf_depth1") {
this->scheduler = std::unique_ptr<BatchScheduler>(new EasyBackfillingBatchScheduler(this, 1));
this->scheduler = std::unique_ptr<BatchScheduler>(
new EasyBackfillingBatchScheduler(
this,
1,
this->getPropertyValueAsUnsignedLong(BatchComputeServiceProperty::BACKFILLING_DEPTH)));
} else if (batch_scheduling_alg == "conservative_bf_core_level") {
this->scheduler = std::unique_ptr<BatchScheduler>(new ConservativeBackfillingBatchSchedulerCoreLevel(this));
this->scheduler = std::unique_ptr<BatchScheduler>(
new ConservativeBackfillingBatchSchedulerCoreLevel(
this,
this->getPropertyValueAsUnsignedLong(BatchComputeServiceProperty::BACKFILLING_DEPTH)));
}
#endif

Expand Down Expand Up @@ -772,7 +786,7 @@ namespace wrench {
return true;

#ifdef ENABLE_BATSCHED
} else if (auto msg = std::dynamic_pointer_cast<BatchExecuteJobFromBatSchedMessage>(message)) {
} else if (auto msg = std::dynamic_pointer_cast<BatchExecuteJobFromBatSchedMessage>(message)) {
processExecuteJobFromBatSched(msg->batsched_decision_reply);
return true;
#endif
Expand Down Expand Up @@ -804,7 +818,7 @@ namespace wrench {
unsigned long requested_hosts = job->getRequestedNumNodes();
unsigned long requested_num_cores_per_host = job->getRequestedCoresPerNode();

double required_ram_per_host = job->getMemoryRequirement();
sg_size_t required_ram_per_host = job->getMemoryRequirement();

if ((requested_hosts > this->available_nodes_to_cores.size()) or
(requested_num_cores_per_host >
Expand Down Expand Up @@ -1066,7 +1080,7 @@ namespace wrench {
} else if (key == "ram_availabilities") {
// RAM availability per host (0 if something is running, full otherwise)
for (const auto &h: this->available_nodes_to_cores) {
if ((double) h.second < S4U_Simulation::getHostMemoryCapacity(h.first)) {
if (h.second < S4U_Simulation::getHostMemoryCapacity(h.first)) {
dict.insert(std::make_pair(h.first->get_name(), 0.0));
} else {
dict.insert(std::make_pair(h.first->get_name(), S4U_Simulation::getHostMemoryCapacity(h.first)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace wrench {
SET_PROPERTY_NAME(BatchComputeServiceProperty, TASK_SELECTION_ALGORITHM);

SET_PROPERTY_NAME(BatchComputeServiceProperty, BATCH_SCHEDULING_ALGORITHM);
SET_PROPERTY_NAME(BatchComputeServiceProperty, BACKFILLING_DEPTH);
SET_PROPERTY_NAME(BatchComputeServiceProperty, BATCH_QUEUE_ORDERING_ALGORITHM);
SET_PROPERTY_NAME(BatchComputeServiceProperty, BATCH_RJMS_PADDING_DELAY);

Expand Down
2 changes: 1 addition & 1 deletion src/wrench/services/compute/batch/BatchJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace wrench {
* @brief Get the memory requirement
* @return a size in bytes
*/
double BatchJob::getMemoryRequirement() {
sg_size_t BatchJob::getMemoryRequirement() {
return this->compound_job->getMinimumRequiredMemory();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ namespace wrench {
/**
* @brief Constructor
* @param cs: The BatchComputeService for which this scheduler is working
* @param backfilling_depth: the backfilling depth
*/
ConservativeBackfillingBatchScheduler::ConservativeBackfillingBatchScheduler(BatchComputeService *cs) : HomegrownBatchScheduler(cs) {
ConservativeBackfillingBatchScheduler::ConservativeBackfillingBatchScheduler(
BatchComputeService *cs,
unsigned long backfilling_depth) : HomegrownBatchScheduler(cs) {
this->schedule = std::make_unique<NodeAvailabilityTimeLine>(cs->total_num_of_nodes);
this->_backfilling_depth = backfilling_depth;
}

/**
Expand Down Expand Up @@ -125,11 +129,13 @@ namespace wrench {

// Reset the time origin
auto now = (u_int32_t) Simulation::getCurrentSimulatedDate();
this->schedule->setTimeOrigin(now);
this->schedule->setTimeOrigin(now);;

// Go through the BatchComputeService queue
for (auto const &batch_job: this->cs->batch_queue) {
// for (auto const &batch_job: this->cs->batch_queue) {
for (unsigned long i=0; i < std::min(this->_backfilling_depth, this->cs->batch_queue.size()); i++) {
// WRENCH_INFO("DEALING WITH JOB %lu", batch_job->getJobID());
auto batch_job = this->cs->batch_queue.at(i);

// Remove the job from the schedule
// WRENCH_INFO("REMOVING IT FROM SCHEDULE");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ namespace wrench {
/**
* @brief Constructor
* @param cs: The BatchComputeService for which this scheduler is working
* @param backfilling_depth: the backfilling depth
*/
ConservativeBackfillingBatchSchedulerCoreLevel::ConservativeBackfillingBatchSchedulerCoreLevel(BatchComputeService *cs) : HomegrownBatchScheduler(cs) {
ConservativeBackfillingBatchSchedulerCoreLevel::ConservativeBackfillingBatchSchedulerCoreLevel(BatchComputeService *cs,
unsigned long backfilling_depth) : HomegrownBatchScheduler(cs) {
this->schedule = std::make_unique<CoreAvailabilityTimeLine>(cs->total_num_of_nodes, cs->num_cores_per_node);
this->_backfilling_depth = backfilling_depth;
}

/**
Expand Down Expand Up @@ -137,7 +140,9 @@ namespace wrench {
this->schedule->setTimeOrigin(now);

// Go through the BatchComputeService queue
for (auto const &batch_job: this->cs->batch_queue) {
// for (auto const &batch_job: this->cs->batch_queue) {
for (unsigned long i=0; i < std::min(this->_backfilling_depth, this->cs->batch_queue.size()); i++) {
auto batch_job = this->cs->batch_queue.at(i);
// WRENCH_INFO("DEALING WITH JOB %lu", batch_job->getJobID());

// Remove the job from the schedule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ namespace wrench {
/**
* @brief Constructor
* @param cs: The BatchComputeService for which this scheduler is working
* @param depth: the EASY depth (0 or 1 for the "easy_bf_depth0" / "easy_bf_depth1" algorithms)
* @param backfilling_depth: the backfilling depth
*/
EasyBackfillingBatchScheduler::EasyBackfillingBatchScheduler(BatchComputeService *cs, int depth) : HomegrownBatchScheduler(cs) {
EasyBackfillingBatchScheduler::EasyBackfillingBatchScheduler(
BatchComputeService *cs,
int depth,
unsigned long backfilling_depth) : HomegrownBatchScheduler(cs) {
this->schedule = std::make_unique<NodeAvailabilityTimeLine>(cs->total_num_of_nodes);
this->_depth = depth;
this->_backfilling_depth = backfilling_depth;
}

/**
Expand Down Expand Up @@ -106,7 +111,13 @@ namespace wrench {
// BACKFILLING: Go through the other jobs (at most _backfilling_depth of them), and start each one that can start right now
// (without hurting the first job in the queue if the depth is 1)
unsigned long num_nodes_available_now = this->schedule->getNumAvailableNodesInFirstSlot();
for (unsigned int i = first_job_not_started + 1; i < this->cs->batch_queue.size(); i++) {
unsigned long loop_upper_bound;
if (this->_backfilling_depth > this->cs->batch_queue.size() - first_job_not_started + 1) {
loop_upper_bound = this->cs->batch_queue.size();
} else {
loop_upper_bound = std::min(first_job_not_started + 1 + this->_backfilling_depth, this->cs->batch_queue.size());
}
for (unsigned int i = first_job_not_started + 1; i < loop_upper_bound; i++) {
auto candidate_job = this->cs->batch_queue.at(i);

// If the job's already started, forget it
Expand Down
3 changes: 0 additions & 3 deletions src/wrench/simgrid_S4U_util/S4U_Simulation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <iostream>
#include <set>
#include <climits>
#include <cfloat>
#include <wrench/util/UnitParser.h>
#include <simgrid/plugins/energy.h>
#include <simgrid/plugins/file_system.h>
Expand All @@ -23,10 +22,8 @@

#include <simgrid/version.h>

#include <fstream>
#include <wrench/simgrid_S4U_util/S4U_Simulation.h>
#include "smpi/smpi.h"
#include <cstdio>


WRENCH_LOG_CATEGORY(wrench_core_s4u_simulation, "Log category for S4U_Simulation");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ void BatchServiceEASY_BFTest::do_EASY_BF_test(int num_compute_nodes,
ASSERT_NO_THROW(compute_service = simulation->add(
new wrench::BatchComputeService(hostname, compute_hosts, "",
{{wrench::BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM, scheduling_algorithm},
{wrench::BatchComputeServiceProperty::BATCH_RJMS_PADDING_DELAY, "0"}})));
{wrench::BatchComputeServiceProperty::BATCH_RJMS_PADDING_DELAY, "0"},
{wrench::BatchComputeServiceProperty::BACKFILLING_DEPTH, "infinity"}})));

simulation->add(new wrench::FileRegistryService(hostname));

Expand Down

0 comments on commit 2d86933

Please sign in to comment.