Simulating DAG

This tutorial presents the basics needed to understand how DAGs are represented in SimGrid and how to simulate their workflow.

Definition of a DAG

Directed Acyclic Graph:

\[\mathcal{G} = (\mathcal{V},\mathcal{E})\]

Set of vertices representing Activities:

\[\mathcal{V} = \{v_i \mid i = 1, \ldots, V\}\]

Set of edges representing precedence constraints between Activities:

\[\mathcal{E} = \{e_{i,j} \mid (i,j) \in \{1, \ldots, V\} \times \{1, \ldots, V\}\}\]
_images/dag.svg
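
To make the "acyclic" part of the definition concrete, here is a minimal sketch that is independent of SimGrid. It stores the edges as pairs of vertex indices and uses Kahn's algorithm to check that no cycle exists. The function name is_acyclic is a hypothetical helper, and the sketch assumes that every index in an edge is smaller than the number of vertices.

```cpp
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical helper (not part of SimGrid): returns true when the directed
// graph with `vertex_count` vertices and the given (from, to) edges is acyclic.
// Assumption: every index stored in `edges` is smaller than `vertex_count`.
bool is_acyclic(std::size_t vertex_count, const std::vector<std::pair<std::size_t, std::size_t>>& edges)
{
  std::vector<std::vector<std::size_t>> successors(vertex_count);
  std::vector<std::size_t> pending_predecessors(vertex_count, 0);
  for (const auto& [from, to] : edges) {
    successors[from].push_back(to);
    pending_predecessors[to]++;
  }

  // Kahn's algorithm: repeatedly remove vertices that have no pending predecessor
  std::queue<std::size_t> ready;
  for (std::size_t v = 0; v < vertex_count; v++)
    if (pending_predecessors[v] == 0)
      ready.push(v);

  std::size_t removed = 0;
  while (not ready.empty()) {
    std::size_t v = ready.front();
    ready.pop();
    removed++;
    for (std::size_t succ : successors[v])
      if (--pending_predecessors[succ] == 0)
        ready.push(succ);
  }
  // Every vertex was removed if and only if the graph contains no cycle
  return removed == vertex_count;
}
```

With three vertices, the edges (0,1) and (1,2) form a valid DAG, while adding the edge (2,0) closes a cycle and makes the check fail.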

Representing Vertices/Activities

There are two types of Activities that can represent Vertices: Exec and Comm. These activities must be initiated and configured to properly describe your workflow.

An Exec represents the execution of an amount of flop on a Host of your platform.

ExecPtr exec = Exec::init();
exec->set_flops_amount(double);
exec->set_host(Host*);
exec->start();

A Comm represents a data transfer between two Hosts of your platform.

CommPtr comm = Comm::sendto_init();
comm->set_source(Host*);
comm->set_destination(Host*);
comm->start();

Representing Edges/Dependencies

An activity will not start until all of its dependencies have been completed. Activities may have any number of successors. Dependencies between Activities are created using simgrid::s4u::Activity::add_successor().

exec->add_successor(comm);

The Activity comm will not start until exec has been completed.
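
The rule above can be pictured as a counter of unsolved dependencies attached to each activity. The following sketch is a toy model written for this tutorial, not the SimGrid implementation: the type ToyActivity and its members are hypothetical names, and the model ignores time entirely.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an activity; this is not the SimGrid class.
struct ToyActivity {
  std::string name;
  int unsolved_dependencies = 0;        // predecessors that have not completed yet
  std::vector<ToyActivity*> successors; // activities waiting for this one

  explicit ToyActivity(std::string n) : name(std::move(n)) {}

  // Declare that `next` cannot start before this activity completes
  void add_successor(ToyActivity& next)
  {
    successors.push_back(&next);
    next.unsolved_dependencies++;
  }

  bool dependencies_solved() const { return unsolved_dependencies == 0; }

  // Mark this activity as completed and return the successors it releases
  std::vector<ToyActivity*> complete()
  {
    std::vector<ToyActivity*> released;
    for (ToyActivity* succ : successors)
      if (--succ->unsolved_dependencies == 0)
        released.push_back(succ);
    return released;
  }
};
```

In this model, an activity with two predecessors is only released by the completion of the second one, which matches the behavior described above.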

Lab 1: Basics

The goal of this lab is to describe the following DAG:

_images/dag1.svg

In this DAG we want c1 to compute 1e9 flops, c2 to compute 5e9 flops and c3 to compute 2e9 flops. There is also a data transfer of 5e8 bytes between c1 and c3.
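
In the notation of the definition given at the start of this tutorial, and counting the data transfer (named t1 in the code of this lab) as a vertex of its own, this DAG can be written as follows. Edges are written here as ordered pairs of vertices, which is a notational shortcut for this example only.

\[\mathcal{V} = \{c_1, c_2, c_3, t_1\}, \qquad \mathcal{E} = \{(c_1, t_1), (t_1, c_3), (c_2, c_3)\}\]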

First of all, include the SimGrid library and define the log category.

#include "simgrid/s4u.hpp"

XBT_LOG_NEW_DEFAULT_CATEGORY(dag_tuto, "Messages specific for this s4u tutorial");

Inside the main function create an instance of Engine and load the platform.

  simgrid::s4u::Engine e(&argc, argv);
  e.load_platform(argv[1]);

Retrieve pointers to some hosts.

  simgrid::s4u::Host* tremblay = e.host_by_name("Tremblay");
  simgrid::s4u::Host* jupiter  = e.host_by_name("Jupiter");

Initiate the activities.

  simgrid::s4u::ExecPtr c1 = simgrid::s4u::Exec::init();
  simgrid::s4u::ExecPtr c2 = simgrid::s4u::Exec::init();
  simgrid::s4u::ExecPtr c3 = simgrid::s4u::Exec::init();
  simgrid::s4u::CommPtr t1 = simgrid::s4u::Comm::sendto_init();

Give names to these activities.

  c1->set_name("c1");
  c2->set_name("c2");
  c3->set_name("c3");
  t1->set_name("t1");

Set the amount of work for each activity.

  c1->set_flops_amount(1e9);
  c2->set_flops_amount(5e9);
  c3->set_flops_amount(2e9);
  t1->set_payload_size(5e8);

Define the dependencies between the activities.

  c1->add_successor(t1);
  t1->add_successor(c3);
  c2->add_successor(c3);

Set the location of each Exec activity and source and destination for the Comm activity.

  c1->set_host(tremblay);
  c2->set_host(jupiter);
  c3->set_host(jupiter);
  t1->set_source(tremblay);
  t1->set_destination(jupiter);

Start the executions of Activities without dependencies.

  c1->start();
  c2->start();

Add a callback to monitor the Exec activities.

  simgrid::s4u::Exec::on_completion_cb([](simgrid::s4u::Exec const& exec) {
    XBT_INFO("Exec '%s' is complete (start time: %f, finish time: %f)", exec.get_cname(),
             exec.get_start_time(), exec.get_finish_time());
  });

Add a callback to monitor the Comm activities.

  simgrid::s4u::Comm::on_completion_cb([](simgrid::s4u::Comm const& comm) {
    XBT_INFO("Comm '%s' is complete (start time: %f, finish time: %f)", comm.get_cname(),
             comm.get_start_time(), comm.get_finish_time());
  });

Finally, run the simulation.

  e.run();

The execution of this code should give you the following output:

> [10.194200] [dag_tuto/INFO] Exec 'c1' is complete (start time: 0.000000, finish time: 10.194200)
> [65.534235] [dag_tuto/INFO] Exec 'c2' is complete (start time: 0.000000, finish time: 65.534235)
> [85.283378] [dag_tuto/INFO] Comm 't1' is complete (start time: 10.194200, finish time: 85.283378)
> [111.497072] [dag_tuto/INFO] Exec 'c3' is complete (start time: 85.283378, finish time: 111.497072)
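
The start times in this log follow from the dependency rule: an activity starts when the last of its predecessors finishes. The sketch below is a hypothetical helper, independent of SimGrid, that reproduces this reasoning. It assumes that activities without predecessors start at time 0, as c1 and c2 do in this lab.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical helper: an activity starts once the last of its predecessors has finished.
// Assumption: an activity without predecessors starts at time 0.
double earliest_start(const std::vector<double>& predecessor_finish_times)
{
  double start = 0.0;
  for (double finish : predecessor_finish_times)
    start = std::max(start, finish);
  return start;
}
```

For instance, c3 has two predecessors, t1 (finish time 85.283378) and c2 (finish time 65.534235), so earliest_start({85.283378, 65.534235}) gives 85.283378, the start time reported for c3.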

Lab 2: Import a DAG from a file

In this lab we present how to import a DAG into your SimGrid simulation, using either a DOT file, a JSON file, or a DAX file.

The files presented in this lab describe the following DAG:

_images/dag2.svg

From a DOT file

A DOT file describes a workflow in accordance with the graphviz format.

The following DOT file describes the workflow presented at the beginning of this lab:

digraph G {
  c1 [size="1e9"];
  c2 [size="5e9"];
  c3 [size="2e9"];
  root->c1 [size="2e8"];
  root->c2 [size="1e8"];
  c1->c3 [size="5e8"];
  c2->c3 [size="-1."];
  c3->end [size="2e8"];
}

It can be imported as a vector of Activities into SimGrid using simgrid::s4u::create_DAG_from_dot(). Then, you have to assign hosts to your Activities.

/* Copyright (c) 2003-2024. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "simgrid/s4u.hpp"

XBT_LOG_NEW_DEFAULT_CATEGORY(dag_from_dot_simple, "Messages specific for this s4u example");

int main(int argc, char* argv[])
{
  simgrid::s4u::Engine e(&argc, argv);
  e.load_platform(argv[1]);

  std::vector<simgrid::s4u::ActivityPtr> dag = simgrid::s4u::create_DAG_from_dot(argv[2]);

  simgrid::s4u::Host* tremblay = e.host_by_name("Tremblay");
  simgrid::s4u::Host* jupiter  = e.host_by_name("Jupiter");
  simgrid::s4u::Host* fafard   = e.host_by_name("Fafard");

  dynamic_cast<simgrid::s4u::Exec*>(dag[0].get())->set_host(fafard);
  dynamic_cast<simgrid::s4u::Exec*>(dag[1].get())->set_host(tremblay);
  dynamic_cast<simgrid::s4u::Exec*>(dag[2].get())->set_host(jupiter);
  dynamic_cast<simgrid::s4u::Exec*>(dag[3].get())->set_host(jupiter);
  dynamic_cast<simgrid::s4u::Exec*>(dag[8].get())->set_host(jupiter);

  for (const auto& a : dag) {
    if (auto* comm = dynamic_cast<simgrid::s4u::Comm*>(a.get())) {
      const auto* pred = dynamic_cast<simgrid::s4u::Exec*>((*comm->get_dependencies().begin()).get());
      const auto* succ = dynamic_cast<simgrid::s4u::Exec*>(comm->get_successors().front().get());
      comm->set_source(pred->get_host())->set_destination(succ->get_host());
    }
  }

  simgrid::s4u::Exec::on_completion_cb([](simgrid::s4u::Exec const& exec) {
    XBT_INFO("Exec '%s' is complete (start time: %f, finish time: %f)", exec.get_cname(),
             exec.get_start_time(), exec.get_finish_time());
  });

  simgrid::s4u::Comm::on_completion_cb([](simgrid::s4u::Comm const& comm) {
    XBT_INFO("Comm '%s' is complete (start time: %f, finish time: %f)", comm.get_cname(),
             comm.get_start_time(), comm.get_finish_time());
  });

  e.run();
  return 0;
}

The execution of this code should give you the following output:

> [0.000000] [dag_from_dot_simple/INFO] Exec 'root' is complete (start time: 0.000000, finish time: 0.000000)
> [33.394394] [dag_from_dot_simple/INFO] Comm 'root->c2' is complete (start time: 0.000000, finish time: 33.394394)
> [39.832311] [dag_from_dot_simple/INFO] Comm 'root->c1' is complete (start time: 0.000000, finish time: 39.832311)
> [50.026511] [dag_from_dot_simple/INFO] Exec 'c1' is complete (start time: 39.832311, finish time: 50.026511)
> [98.928629] [dag_from_dot_simple/INFO] Exec 'c2' is complete (start time: 33.394394, finish time: 98.928629)
> [125.115689] [dag_from_dot_simple/INFO] Comm 'c1->c3' is complete (start time: 50.026511, finish time: 125.115689)
> [151.329383] [dag_from_dot_simple/INFO] Exec 'c3' is complete (start time: 125.115689, finish time: 151.329383)
> [151.743605] [dag_from_dot_simple/INFO] Comm 'c3->end' is complete (start time: 151.329383, finish time: 151.743605)
> [151.743605] [dag_from_dot_simple/INFO] Exec 'end' is complete (start time: 151.743605, finish time: 151.743605)
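
The example above dereferences the result of each dynamic_cast directly, which is only safe when the activity at that index really is an Exec. A more defensive variant could fail loudly instead. The sketch below shows the idea with a hypothetical helper, checked_cast, and a toy class hierarchy standing in for the SimGrid classes; neither is part of SimGrid.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of SimGrid): like dynamic_cast on pointers,
// but throws instead of returning nullptr when the object has another type.
template <class Target, class Source> Target* checked_cast(Source* ptr, const std::string& what)
{
  auto* result = dynamic_cast<Target*>(ptr);
  if (result == nullptr)
    throw std::invalid_argument(what + " does not have the expected type");
  return result;
}

// Toy hierarchy standing in for Activity, Exec and Comm
struct ToyBase {
  virtual ~ToyBase() = default;
};
struct ToyCompute : ToyBase {};
struct ToyTransfer : ToyBase {};
```

Under these assumptions, a host assignment from the example would read checked_cast<simgrid::s4u::Exec>(dag[0].get(), "dag[0]")->set_host(fafard); and a wrong index would raise an exception rather than dereference a null pointer.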

From a JSON file

A JSON file describes a workflow in accordance with the wfformat.

The following JSON file describes the workflow presented at the beginning of this lab:

{
  "name": "simple_json",
  "schemaVersion": "1.0",
  "workflow": {
    "makespanInSeconds": 0,
    "executedAt": "2023-03-09T00:00:00-00:00",
    "tasks": [
      {
        "name": "c1",
        "type": "compute",
        "parents": [],
        "runtimeInSeconds": 1e9,
        "machine": "Tremblay"
      },
      {
        "name": "t1",
        "type": "transfer",
        "parents": ["c1"],
        "writtenBytes": 5e8,
        "machine": "Jupiter"
      },
      {
        "name": "c2",
        "type": "compute",
        "parents": [],
        "runtimeInSeconds": 5e9,
        "machine": "Jupiter"
      },
      {
        "name": "c3",
        "type": "compute",
        "parents": ["t1","c2"],
        "runtimeInSeconds": 2e9,
        "machine": "Jupiter"
      }
    ],
    "machines": [
      {"nodeName": "Tremblay"},
      {"nodeName": "Jupiter"}
    ]
  }
}

It can be imported as a vector of Activities into SimGrid using simgrid::s4u::create_DAG_from_json().

/* Copyright (c) 2003-2024. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "simgrid/s4u.hpp"

XBT_LOG_NEW_DEFAULT_CATEGORY(dag_from_json_simple, "Messages specific for this s4u example");

int main(int argc, char* argv[])
{
  simgrid::s4u::Engine e(&argc, argv);
  e.load_platform(argv[1]);

  std::vector<simgrid::s4u::ActivityPtr> dag = simgrid::s4u::create_DAG_from_json(argv[2]);

  simgrid::s4u::Exec::on_completion_cb([](simgrid::s4u::Exec const& exec) {
    XBT_INFO("Exec '%s' is complete (start time: %f, finish time: %f)", exec.get_cname(),
             exec.get_start_time(), exec.get_finish_time());
  });

  simgrid::s4u::Comm::on_completion_cb([](simgrid::s4u::Comm const& comm) {
    XBT_INFO("Comm '%s' is complete (start time: %f, finish time: %f)", comm.get_cname(),
             comm.get_start_time(), comm.get_finish_time());
  });

  e.run();
  return 0;
}

The execution of this code should give you the following output:

> [10.194200] [dag_from_json_simple/INFO] Exec 'c1' is complete (start time: 0.000000, finish time: 10.194200)
> [65.534235] [dag_from_json_simple/INFO] Exec 'c2' is complete (start time: 0.000000, finish time: 65.534235)
> [85.283378] [dag_from_json_simple/INFO] Comm 't1' is complete (start time: 10.194200, finish time: 85.283378)
> [111.497072] [dag_from_json_simple/INFO] Exec 'c3' is complete (start time: 85.283378, finish time: 111.497072)

From a DAX file [deprecated]

A DAX file describes a workflow in accordance with the Pegasus format.

The following DAX file describes the workflow presented at the beginning of this lab:

<?xml version="1.0" encoding="UTF-8"?>
<adag xmlns="http://pegasus.isi.edu/schema/DAX" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://pegasus.isi.edu/schema/DAX http://pegasus.isi.edu/schema/dax-2.1.xsd"
      version="2.1">
  <job id="1" name="c1" runtime="10">
    <uses file="i1" link="input" register="true" transfer="true" optional="false" type="data" size="2e8"/>
    <uses file="o1" link="output" register="true" transfer="true" optional="false" type="data" size="5e8"/>
  </job>
  <job id="2" name="c2" runtime="50">
    <uses file="i2" link="input" register="true" transfer="true" optional="false" type="data" size="1e8"/>
  </job>
  <job id="3" name="c3" runtime="20">
    <uses file="o1" link="input" register="true" transfer="true" optional="false" type="data" size="5e8"/>
    <uses file="o3" link="output" register="true" transfer="true" optional="false" type="data" size="2e8"/>
  </job>
  <child ref="3">
    <parent ref="1"/>
    <parent ref="2"/>
  </child>
</adag>

It can be imported as a vector of Activities into SimGrid using simgrid::s4u::create_DAG_from_DAX().

/* Copyright (c) 2003-2024. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "simgrid/s4u.hpp"

XBT_LOG_NEW_DEFAULT_CATEGORY(dag_from_dax_simple, "Messages specific for this s4u example");

int main(int argc, char* argv[])
{
  simgrid::s4u::Engine e(&argc, argv);
  e.load_platform(argv[1]);

  std::vector<simgrid::s4u::ActivityPtr> dag = simgrid::s4u::create_DAG_from_DAX(argv[2]);

  simgrid::s4u::Host* tremblay = e.host_by_name("Tremblay");
  simgrid::s4u::Host* jupiter  = e.host_by_name("Jupiter");
  simgrid::s4u::Host* fafard   = e.host_by_name("Fafard");

  dynamic_cast<simgrid::s4u::Exec*>(dag[0].get())->set_host(fafard);
  dynamic_cast<simgrid::s4u::Exec*>(dag[1].get())->set_host(tremblay);
  dynamic_cast<simgrid::s4u::Exec*>(dag[2].get())->set_host(jupiter);
  dynamic_cast<simgrid::s4u::Exec*>(dag[3].get())->set_host(jupiter);
  dynamic_cast<simgrid::s4u::Exec*>(dag[8].get())->set_host(jupiter);

  for (const auto& a : dag) {
    if (auto* comm = dynamic_cast<simgrid::s4u::Comm*>(a.get())) {
      const auto* pred = dynamic_cast<simgrid::s4u::Exec*>((*comm->get_dependencies().begin()).get());
      const auto* succ = dynamic_cast<simgrid::s4u::Exec*>(comm->get_successors().front().get());
      comm->set_source(pred->get_host())->set_destination(succ->get_host());
    }
  }

  simgrid::s4u::Exec::on_completion_cb([](simgrid::s4u::Exec const& exec) {
    XBT_INFO("Exec '%s' is complete (start time: %f, finish time: %f)", exec.get_cname(),
             exec.get_start_time(), exec.get_finish_time());
  });

  simgrid::s4u::Comm::on_completion_cb([](simgrid::s4u::Comm const& comm) {
    XBT_INFO("Comm '%s' is complete (start time: %f, finish time: %f)", comm.get_cname(),
             comm.get_start_time(), comm.get_finish_time());
  });

  e.run();
  return 0;
}

Lab 3: Scheduling with the Min-Min algorithm

In this lab we present how to schedule activities imported from a DAX file using the Min-Min algorithm.

The source code for this lab can be found here.

For code readability we first create the sg4 namespace.

namespace sg4 = simgrid::s4u;

The core mechanism of the algorithm lies in three functions. They respectively serve the purpose of finding tasks to schedule, finding the best host to execute them and properly scheduling them.

Find Tasks to Schedule

The role of this function is to retrieve the tasks that are ready to be scheduled, i.e., whose dependencies are all solved.

static std::vector<sg4::Exec*> get_ready_tasks(const std::vector<sg4::ActivityPtr>& dax)
{
  std::vector<sg4::Exec*> ready_tasks;
  std::map<sg4::Exec*, unsigned int> candidate_execs;

  for (const auto& a : dax) {
    // Only look at activities that have their dependencies solved but are not assigned yet
    if (a->dependencies_solved() && not a->is_assigned()) {
      // if it is an exec, it's ready
      if (auto* exec = dynamic_cast<sg4::Exec*>(a.get()))
        ready_tasks.push_back(exec);
      // if it is a comm, we consider its successor as a candidate. If a candidate solves all its dependencies,
      // i.e., gets all its input data, it's ready
      if (const auto* comm = dynamic_cast<sg4::Comm*>(a.get())) {
        auto* next_exec = static_cast<sg4::Exec*>(comm->get_successors().front().get());
        candidate_execs[next_exec]++;
        if (next_exec->get_dependencies().size() == candidate_execs[next_exec])
          ready_tasks.push_back(next_exec);
      }
    }
  }
  XBT_DEBUG("There are %zu ready tasks", ready_tasks.size());
  return ready_tasks;
}

Find the Best Placement

Once we have a task ready to be scheduled, we need to find the best placement for it. This is done by evaluating the earliest finish time among all hosts. It depends on the duration of the data transfers of the parents of this task to this host.

static sg4::Host* get_best_host(const sg4::ExecPtr exec, double* min_finish_time)
{
  sg4::Host* best_host = nullptr;
  *min_finish_time = std::numeric_limits<double>::max();

  for (const auto& host : sg4::Engine::get_instance()->get_all_hosts()) {
    double data_available      = 0.;
    double last_data_available = -1.0;
    /* compute last_data_available */
    for (const auto& parent : exec->get_dependencies()) {
      /* normal case */
      if (const auto* comm = dynamic_cast<sg4::Comm*>(parent.get())) {
        const auto* source = comm->get_source();
        XBT_DEBUG("transfer from %s to %s", source->get_cname(), host->get_cname());
        /* Estimate the redistribution time from this parent */
        double redist_time;
        if (comm->get_remaining() <= 1e-6) {
          redist_time = 0;
        } else {
          double bandwidth      = std::numeric_limits<double>::max();
          auto [links, latency] = source->route_to(host);
          for (auto const& link : links)
            bandwidth = std::min(bandwidth, link->get_bandwidth());

          redist_time = latency + comm->get_remaining() / bandwidth;
        }
        // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential
        // start time
        data_available = *comm->get_data<double>() + redist_time;
      }

      /* no transfer, control dependency */
      if (const auto* parent_exec = dynamic_cast<sg4::Exec*>(parent.get()))
        data_available = parent_exec->get_finish_time();

      if (last_data_available < data_available)
        last_data_available = data_available;
    }

    double finish_time = std::max(*host->get_data<double>(), last_data_available) +
                         exec->get_remaining() / host->get_speed();

    XBT_DEBUG("%s finishes on %s at %f", exec->get_cname(), host->get_cname(), finish_time);

    if (finish_time < *min_finish_time) {
      *min_finish_time = finish_time;
      best_host        = host;
    }
  }

  return best_host;
}
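
The two estimates computed inside get_best_host can be isolated as plain arithmetic. The sketch below is independent of SimGrid and uses hypothetical function names. It assumes that a transfer is limited by the slowest link of its route, and that a task starts once both the host and its input data are available.

```cpp
#include <algorithm>
#include <limits>
#include <vector>

// Hypothetical helper: time to move `bytes` over a route, limited by its slowest link
double estimate_transfer_time(double bytes, double latency, const std::vector<double>& link_bandwidths)
{
  if (bytes <= 1e-6) // nothing left to transfer
    return 0.0;
  double bandwidth = std::numeric_limits<double>::max();
  for (double link_bandwidth : link_bandwidths)
    bandwidth = std::min(bandwidth, link_bandwidth);
  return latency + bytes / bandwidth;
}

// Hypothetical helper: a task starts when both the host and its input data are
// available, then computes `flops` at `host_speed` flop/s
double estimate_finish_time(double host_busy_until, double data_available, double flops, double host_speed)
{
  return std::max(host_busy_until, data_available) + flops / host_speed;
}
```

As an example, sending 1e6 bytes over links of 2e6 and 1e6 bytes per second with a latency of 0.5 s takes about 1.5 s. If the host is busy until time 2 and the task needs 1e9 flops on a host running at 1e9 flop/s, the estimated finish time is about 3.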

Schedule a Task

When the best host has been found, the task is scheduled on it:

  • it sets the host of the task to schedule

  • it stores the finish time of this task on the host

  • it sets the destination of the parent communications

  • it sets the source of any child communication.

static void schedule_on(sg4::ExecPtr exec, sg4::Host* host, double busy_until = 0.0)
{
  exec->set_host(host);
  // We use the user data field to store up to when the host is busy
  delete host->get_data<double>(); // In case we're erasing a previous value
  host->set_data(new double(busy_until));
  // we can also set the destination of all the input comms of this exec
  for (const auto& pred : exec->get_dependencies()) {
    auto* comm = dynamic_cast<sg4::Comm*>(pred.get());
    if (comm != nullptr) {
      comm->set_destination(host);
      delete comm->get_data<double>();
    }
  }
  // we can also set the source of all the output comms of this exec
  for (const auto& succ : exec->get_successors()) {
    auto* comm = dynamic_cast<sg4::Comm*>(succ.get());
    if (comm != nullptr)
      comm->set_source(host);
  }
}

Mixing it all Together

Now that we have the key components of the algorithm let’s merge them inside the main function.

int main(int argc, char** argv)
{
...

First, we initialize the SimGrid Engine.

sg4::Engine e(&argc, argv);

The Min-Min algorithm schedules unscheduled tasks. To keep track of them we make use of the method simgrid::s4u::Engine::track_vetoed_activities().

std::set<sg4::Activity*> vetoed;
e.track_vetoed_activities(&vetoed);

We add the following callback that will be triggered at the end of execution activities. This callback stores the finish time of the execution, to use it as a start time for any subsequent communications.

sg4::Activity::on_completion_cb([](sg4::Activity const& activity) {
  // when an Exec completes, we need to set the potential start time of all its output comms
  const auto* exec = dynamic_cast<sg4::Exec const*>(&activity);
  if (exec == nullptr) // Only Execs are concerned here
    return;
  for (const auto& succ : exec->get_successors()) {
    auto* comm = dynamic_cast<sg4::Comm*>(succ.get());
    if (comm != nullptr) {
      auto* finish_time = new double(exec->get_finish_time());
      // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential start
      // time
      comm->set_data(finish_time);
    }
  }
});

We load the platform and force sequential execution on hosts.

e.load_platform(argv[1]);

/* Mark all hosts as sequential, as it ought to be in such a scheduling example.
 *
 * It means that the hosts can only compute one thing at a given time. If an execution already takes place on a given
 * host, any subsequently started execution will be queued until after the first execution terminates */
for (auto const& host : e.get_all_hosts()) {
  host->set_concurrency_limit(1);
  host->set_data(new double(0.0));
}

The tasks are imported from a DAX file.

/* load the DAX file */
auto dax = sg4::create_DAG_from_DAX(argv[2]);

We look for the best host for the root task and schedule it. We then advance the simulation to unlock the next schedulable tasks.

/* Schedule the root first */
double finish_time;
auto* root = static_cast<sg4::Exec*>(dax.front().get());
auto host  = get_best_host(root, &finish_time);
schedule_on(root, host);
e.run();

Then, we get to the main loop of the algorithm. This loop goes on until all tasks have been scheduled and executed. Each round starts by finding the ready tasks using get_ready_tasks. It then looks, among the ready tasks, for the one that will finish first using get_best_host, and places it using schedule_on. When no more tasks can be placed, we advance the simulation.

while (not vetoed.empty()) {
  XBT_DEBUG("Start new scheduling round");
  /* Get the set of ready tasks */
  auto ready_tasks = get_ready_tasks(dax);
  vetoed.clear();

  if (ready_tasks.empty()) {
    /* there is no ready exec, let the simulation advance */
    e.run();
    continue;
  }
  /* For each ready exec:
   * get the host that minimizes the completion time.
   * select the exec that has the minimum completion time on its best host.
   */
  double min_finish_time   = std::numeric_limits<double>::max();
  sg4::Exec* selected_task = nullptr;
  sg4::Host* selected_host = nullptr;

  for (auto exec : ready_tasks) {
    XBT_DEBUG("%s is ready", exec->get_cname());
    double finish_time;
    host = get_best_host(exec, &finish_time);
    if (finish_time < min_finish_time) {
      min_finish_time = finish_time;
      selected_task   = exec;
      selected_host   = host;
    }
  }

  XBT_INFO("Schedule %s on %s", selected_task->get_cname(), selected_host->get_cname());
  schedule_on(selected_task, selected_host, min_finish_time);

  ready_tasks.clear();
  e.run();
}
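
Stripped of dependencies and data transfers, the selection rule of this loop reduces to the classical Min-Min heuristic for independent tasks. The sketch below shows that reduced form without SimGrid. The names ToySchedule and min_min are hypothetical, each host is assumed to run one task at a time, and ties are resolved in favor of the first task and host encountered.

```cpp
#include <cstddef>
#include <limits>
#include <vector>

// Hypothetical result type: which host each task was placed on, and the overall makespan
struct ToySchedule {
  std::vector<std::size_t> host_of_task;
  double makespan = 0.0;
};

// Min-Min sketch for independent tasks (no dependencies, no data transfers).
// task_flops[t] is the cost of task t, host_speeds[h] the speed of host h in flop/s.
ToySchedule min_min(const std::vector<double>& task_flops, const std::vector<double>& host_speeds)
{
  ToySchedule schedule;
  if (host_speeds.empty()) // nothing can be placed without hosts
    return schedule;
  schedule.host_of_task.assign(task_flops.size(), 0);
  std::vector<double> busy_until(host_speeds.size(), 0.0);
  std::vector<bool> scheduled(task_flops.size(), false);

  for (std::size_t round = 0; round < task_flops.size(); round++) {
    double min_finish_time    = std::numeric_limits<double>::max();
    std::size_t selected_task = 0;
    std::size_t selected_host = 0;
    // Among unscheduled tasks, select the one with the earliest finish time on its best host
    for (std::size_t t = 0; t < task_flops.size(); t++) {
      if (scheduled[t])
        continue;
      for (std::size_t h = 0; h < host_speeds.size(); h++) {
        double finish_time = busy_until[h] + task_flops[t] / host_speeds[h];
        if (finish_time < min_finish_time) {
          min_finish_time = finish_time;
          selected_task   = t;
          selected_host   = h;
        }
      }
    }
    scheduled[selected_task]             = true;
    schedule.host_of_task[selected_task] = selected_host;
    busy_until[selected_host]            = min_finish_time;
    if (min_finish_time > schedule.makespan)
      schedule.makespan = min_finish_time;
  }
  return schedule;
}
```

For three tasks of 4, 2 and 6 flops on two hosts with speeds 1 and 2 flop/s, this sketch places the first two tasks on the faster host and the last one on the slower host, for a makespan of 6.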

Finally, we clean up the memory.

/* Cleanup memory */
for (auto const& h : e.get_all_hosts())
  delete h->get_data<double>();