Task scheduling in MPI

I wrote this simple parallel task-scheduling system in C++/MPI, following discussions with Juha Jäyykä. It seems to work most of the time, but fails if jobs finish so quickly, or so close to each other, that a node does not have the chance to update nexttask_ind.

I can’t work out the exact failure criterion, and would welcome any other advice on the code.

/* A simple task scheduler in C++ and MPI. 
 * Usage: 
 *   $ mpic++ taskscheduler.cpp -o taskscheduler 
 *   $ mpirun -np 4 ./taskscheduler Ntasks
 */
// see http://stackoverflow.com/questions/11180624/mpi-task-scheduling
// and
// http://stackoverflow.com/questions/12810391/mpi-asynchronous-broadcast-gather#12810617
#include<iostream>
#include<mpi.h>
#include<unistd.h>
#include<assert.h>
#include<errno.h>
#include<limits.h>
#include<stdio.h>
#include<time.h>
#include<stdlib.h>

int main (int argc, char** argv) {
    MPI_Init (&argc, &argv);      /* starts MPI */
    MPI_Request mpireq;
    MPI_Status* mpistatus;

    assert(argc > 1);
    int Ntasks = atoi(argv[1]);
    
    /* get number of nodes */
    int nnodes; 
    MPI_Comm_size (MPI_COMM_WORLD, &nnodes);        
    /* get my node rank */
    int rank; 
    MPI_Comm_rank (MPI_COMM_WORLD, &rank); 
    /* get my node hostname */
    char hostname[MPI_MAX_PROCESSOR_NAME];
    int name_len;
    MPI_Get_processor_name(hostname, &name_len);

    /* This is the task that we are currently working on. */
    int mytask_ind;
    /* This buffer stores the next task that needs to be done. */
    int nexttask_ind = nnodes;

    /* We start off by giving task i to node i. */
    mytask_ind = rank;
    
    srandom(rank);
    
    printf("Node %d signing on.\n", rank);
    while (true)
    {
        /* Do our task, if there is any more work to do. */
        if (mytask_ind >= Ntasks) 
            break;

        /* Do our task. */

        printf("Node rank %d on %s is now doing task %d.\n", 
                rank, hostname, mytask_ind);
        /* Whatever we're doing takes a while... */
        int usec = 0;
        while(true)
        {
            if (random() % 100000 == 0) 
                break;

            usec++;
            usleep(1);

            /* Every now and then, check for updates to nexttask_ind from other
             * nodes. */
            if (usec % 100 == 0)
                for (int node = 0; node < nnodes; node++)
                    if (node != rank)
                    {
                        int flag = 0;
                        MPI_Iprobe(node, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, mpistatus);
                        if (flag)
                        {
                            MPI_Irecv( &nexttask_ind, 1, MPI_INT, node, MPI_ANY_TAG, MPI_COMM_WORLD, &mpireq );
                        }
                    }
        }


        /* Advertise the fact that we have started work on this task, by
         * updating nexttask_ind. */
        mytask_ind = nexttask_ind;
        nexttask_ind++;
        for (int node = 0; node < nnodes; node++)
            if (node != rank)
                MPI_Isend( &nexttask_ind, 1, MPI_INT, node, 0, MPI_COMM_WORLD, &mpireq);

    }

    MPI_Finalize();
    return 0;
}

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.