Grid Tool Example

This section describes the commands of a sample Grid tool and shows the source code for a GridPlugin subclass that uses this tool (the plug-in's GridOptionsPanel class is not included). The sample illustrates how the Java code for your own Grid plug-in might look.

Our sample Grid tool is a standalone program called rxdispatch, which for this example has been built and installed for Linux platforms.

Note: In the sample plug-in code, some processing steps are represented by overview comments instead of detailed blocks of code.

The following is a portion of the hypothetical “man page” for rxdispatch:

Name

rxdispatch. Dispatch a command for remote execution.

Synopsis

rxdispatch submit [-d directory] [-i filename] [-o filename] command
rxdispatch manage [-k | -p | -r] jobid
rxdispatch status [-m filename] jobid

Description

rxdispatch distributes computation tasks over a local network. It works in tandem with the daemon program rxmonitor, which oversees the set of machines in the local network used to run these tasks. rxdispatch has three subcommands:

  • submit command. This command directs rxmonitor to select an available machine, allocate a working directory, create a job for command, and start a process to run command on that machine in that directory. Here, command is the sequence of command-line tokens following the last flag valid for this subcommand. If the job process cannot be started, the job is put into the “FAIL” state; otherwise, the process is initially paused and the job is put into the “PAUSE” state. The subcommand returns a number, generated by rxmonitor, that uniquely identifies the job.

  • manage jobid. This command directs rxmonitor to change the run state of job jobid according to the specified flag; if no flag is specified, nothing is done.

  • status jobid. This command directs rxmonitor to return the run state of job jobid. Run states are encoded numerically as follows:

    • 0 == FAIL
    • 1 == PAUSE
    • 2 == RUNNING
    • 3 == DONE

    rxmonitor retains a record of each job’s status after the job has stopped. This record is deleted on the next call to rxdispatch status jobid or one hour after the job has stopped, whichever comes first. Calling rxdispatch status on an unknown job returns 0 (“FAIL”).
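A plug-in that polls rxdispatch status has to turn its output back into one of these run states. The following minimal sketch assumes the subcommand writes only the numeric code to standard output, possibly followed by a newline; the enum `RunState` and its methods are hypothetical helpers, not part of rxdispatch or the Grid API.

```java
/** Hypothetical helper: run states reported by 'rxdispatch status', in the documented numeric order. */
enum RunState {
    FAIL,     // 0
    PAUSE,    // 1
    RUNNING,  // 2
    DONE;     // 3

    /**
     * Converts the text printed by 'rxdispatch status' into a RunState.
     * Assumes the output is a single integer, optionally surrounded by whitespace.
     * Non-numeric output raises NumberFormatException (an IllegalArgumentException).
     */
    static RunState fromStatusOutput(String output) {
        int code = Integer.parseInt(output.trim());
        RunState[] states = values();
        if (code < 0 || code >= states.length) {
            throw new IllegalArgumentException("Unexpected status code: " + code);
        }
        return states[code]; // declaration order matches the encoding 0..3
    }

    /** FAIL and DONE are the states a job ends in; PAUSE and RUNNING are treated as not yet stopped. */
    boolean isStopped() {
        return this == FAIL || this == DONE;
    }
}
```

Rejecting codes outside 0 to 3, rather than mapping them to FAIL, is a design choice here: it surfaces unexpected output instead of silently treating it as a failed job.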

Options

-d directory

Identifies the directory associated with command. Before command runs, this directory must contain a copy of each file named with a -i option; after command runs, it will contain a copy of each file named with a -o option unless command failed. If not specified, the current directory is assumed.

-i filename

Identifies an input file associated with command. Before command runs, this file will be copied into the job’s working directory. Zero or more instances of this option may appear.

-o filename

Identifies an output file associated with command. After command runs, this file will be copied out of the job’s working directory. Zero or more instances of this option may appear.
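Because -i and -o name files that are copied into and out of the job’s working directory, and -d names the local directory that holds them, the sample plug-in strips the local working directory off each file name before adding it to the submit command. One way to do that with `java.nio.file.Path.relativize` is sketched below. It assumes every input and output file lies under the -d directory and resolves relative names against that directory; `SubmitCommandBuilder` and `build` are hypothetical names.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that builds the argument vector for 'rxdispatch submit'. */
final class SubmitCommandBuilder {
    /**
     * Builds "rxdispatch submit -d DIR [-i NAME]... [-o NAME]... COMMAND...".
     * Each input and output path is rewritten relative to workingDir;
     * a path outside workingDir is rejected.
     */
    static List<String> build(Path workingDir, List<Path> inFiles,
                              List<Path> outFiles, List<String> command) {
        Path dir = workingDir.toAbsolutePath().normalize();
        List<String> argv = new ArrayList<>();
        argv.add("rxdispatch");
        argv.add("submit");
        argv.add("-d");
        argv.add(dir.toString());
        for (Path in : inFiles) {
            argv.add("-i");
            argv.add(relativeName(dir, in));
        }
        for (Path out : outFiles) {
            argv.add("-o");
            argv.add(relativeName(dir, out));
        }
        argv.addAll(command); // command tokens follow the last flag
        return argv;
    }

    /** Resolves file against dir (absolute paths stay as they are) and returns its name relative to dir. */
    private static String relativeName(Path dir, Path file) {
        Path abs = dir.resolve(file).normalize();
        if (!abs.startsWith(dir)) {
            throw new IllegalArgumentException(file + " is not under " + dir);
        }
        return dir.relativize(abs).toString();
    }
}
```

Using a `List` avoids precomputing the array size, which in an array-based version must count four fixed tokens, two tokens for each -i or -o option, and the tokens of command.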

-k

If the process for job jobid has not yet stopped, it is halted, and job jobid is put into the “FAIL” state.

-p

If the process for job jobid has not yet stopped and is not in the “PAUSE” state, it is stopped in a way that allows it to be restarted, and job jobid is put into the “PAUSE” state; otherwise, nothing is done.

-r

If the process for job jobid is in the “PAUSE” state, it is restarted; otherwise, nothing is done.

Note: A job’s process is put into the “PAUSE” state when it is started and, thus, must be resumed with -r before command runs.
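Taken together, the -k, -p, and -r rules define a small state machine over the run states. The sketch below encodes only the documented state changes, not the process control itself. It reads “has not yet stopped” as covering both the PAUSE and RUNNING states, which is an interpretation of the text above; the names `JobState`, `ManageFlag`, and `ManageRules` are hypothetical.

```java
/** Hypothetical: run states as documented for 'rxdispatch status'. */
enum JobState { FAIL, PAUSE, RUNNING, DONE }

/** Hypothetical: KILL = -k, PAUSE = -p, RESUME = -r, NONE = no flag given. */
enum ManageFlag { KILL, PAUSE, RESUME, NONE }

/** Hypothetical helper encoding the documented effect of 'rxdispatch manage'. */
final class ManageRules {
    /** Returns the state a job should be in after 'rxdispatch manage' with the given flag. */
    static JobState apply(JobState current, ManageFlag flag) {
        // Assumption: a job "has not yet stopped" while it is PAUSE or RUNNING.
        boolean notStopped = current == JobState.PAUSE || current == JobState.RUNNING;
        switch (flag) {
            case KILL:   // -k: halt any job that has not yet stopped
                return notStopped ? JobState.FAIL : current;
            case PAUSE:  // -p: pause a job that has not stopped and is not already paused
                return (notStopped && current != JobState.PAUSE) ? JobState.PAUSE : current;
            case RESUME: // -r: restart a paused job
                return current == JobState.PAUSE ? JobState.RUNNING : current;
            default:     // no flag: nothing is done
                return current;
        }
    }
}
```

A plug-in can use a function like this to keep a cached run state consistent after issuing rxdispatch manage, as the sample’s pauseJob and resumeJob methods do by hand.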

-m filename

If this option is used and the job status is “FAIL,” any error text generated by rxmonitor or by command is written to the named file; otherwise, nothing is done. If filename is not absolute, the file is written relative to the current directory.
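When a job ends in the FAIL state, the sample plug-in copies the text that -m wrote into its own log (that step appears there only as an overview comment). The sketch below does so with `java.nio.file.Files`. Because the Grid API’s Log class is not described here, it accepts a generic `java.util.function.Consumer<String>` as the log sink; it also assumes the file is UTF-8 text and deletes it afterward, treating it as a per-job temporary. `ErrorFileReader` and `copyErrorsToLog` are hypothetical names.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/** Hypothetical helper for the file written by 'rxdispatch status -m filename'. */
final class ErrorFileReader {
    /**
     * Sends each line of the error file to the given log sink, then deletes the file.
     * Returns the number of lines copied (0 if the file does not exist).
     */
    static int copyErrorsToLog(Path errorFile, Consumer<String> log) throws IOException {
        if (!Files.exists(errorFile)) {
            return 0; // -m writes the file only when the job status is FAIL
        }
        int count = 0;
        for (String line : Files.readAllLines(errorFile, StandardCharsets.UTF_8)) {
            log.accept(line);
            count++;
        }
        Files.delete(errorFile); // assumed to be a per-job temporary file
        return count;
    }
}
```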

Sample

A sample Grid plug-in written around rxdispatch is shown below:

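import java.io.File;
import java.io.IOException;
// GridPlugin, GridOptionsPanel, Log, ResMgr, and RtGridException
// come from the plug-in API (imports not shown in this example).

class RXDispatchGridPlugin
      extends GridPlugin {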
   // Job status: 'rxdispatch status' return values:
   private static final int RX_FAILED = 0;
   private static final int RX_PAUSED = 1;
   private static final int RX_RUNNING = 2;
   private static final int RX_DONE = 3;
   private static final String[] jobStatusName = {
      ResMgr.getMessage(CLASS,0,"Job failed"),
      ResMgr.getMessage(CLASS,0,"Job is paused"),
      ResMgr.getMessage(CLASS,0,"Job is running"),
      ResMgr.getMessage(CLASS,0,"Job is done")
   };
   private String jobId = null;
   private int jobStatus = RX_FAILED;
   // COMMAND TO POLL FOR THE JOB STATUS.
   // WILL BE COMPLETED WHEN THE JOB STARTS.
   private String[] statusCmd = new String[] {
      "rxdispatch", "status", "-m", null, null
   };
   public int submitJobAndWait(
      String[] cmd,
      String[] inFiles,
      String[] outFiles,
      File localWorkingDir,
      Log log)
   throws RtGridException {
      if (jobId != null) {
         throw new RtGridException(
            "Job " + jobId + " is already started");
      }
      try {
         // BUILD THE JOB SUBMIT COMMAND. IT HOLDS FOUR
         // FIXED TOKENS ("rxdispatch submit -d <dir>"),
         // TWO TOKENS PER -i OR -o OPTION, AND THE
         // TOKENS OF cmd ITSELF:
         int nSubmitCmd = 4 + cmd.length;
         if (inFiles != null) {
            nSubmitCmd += 2 * inFiles.length;
         }
         if (outFiles != null) {
            nSubmitCmd += 2 * outFiles.length;
         }

         String[] submitCmd = new String[nSubmitCmd];
         submitCmd[0] = "rxdispatch";
         submitCmd[1] = "submit";
         submitCmd[2] = "-d";
         submitCmd[3] = localWorkingDir.getCanonicalPath();

         // STRIP THE LOCAL WORKING DIRECTORY NAME
         // OFF OF ALL INPUT AND OUTPUT FILE NAMES
         // AND ADD ALL NECESSARY -i AND -o OPTIONS
         // TO THE JOB SUBMIT COMMAND:
         . . .

         // COPY ALL ENTRIES OF cmd TO THE REMAINING
         // ENTRIES OF THE JOB SUBMIT COMMAND:
         . . .

         // SUBMIT THE JOB AND MONITOR IT:
         // [REMEMBER: JOB STARTS PAUSED OR FAILED]
         // TRIM ANY TRAILING NEWLINE FROM THE JOB ID:
         jobId = runCmd(submitCmd).trim();

         // CHOOSE A UNIQUE FILE NAME FOR THE ERROR
         // TEXT RETURNED BY THE STATUS COMMAND:
         File errorFile = new File(localWorkingDir, jobId + ".txt");
         statusCmd[3] = errorFile.getCanonicalPath();
         statusCmd[4] = jobId;

         int waitRet = 1;
         jobStatus = getStatus();
         if (jobStatus == RX_PAUSED) {
            resumeJob();
            waitRet = waitForJob();
         }

         // JOB IS DONE - CHECK FOR ERRORS:
         if (waitRet != 0) {
            if (errorFile.exists()) {
               // OPEN THE ERROR TEXT FILE AND
               // WRITE THE ERRORS TO THE LOG:
            }
         }

         return waitRet;
      }
      catch (IOException ioe) {
         // getCanonicalPath() CAN THROW IOException:
         throw new RtGridException(
            "Cannot resolve a file path: " + ioe.getMessage());
      }
   }
   public int waitForJob()
   throws RtGridException {
      if (jobId != null) {
         // REPEATEDLY POLL FOR THE JOB STATUS:
         while ((jobStatus = getStatus()) == RX_RUNNING) {
            // THIS BLOCK OF CODE MUST BE HERE
            // SO THE GRID PLUGIN CAN RESPOND
            // WHEN THE COMPONENT TIMES OUT:
            try {
               Thread.sleep(15000);  // 15 seconds
            }
            catch (InterruptedException ie) {
               // RESTORE THE INTERRUPT FLAG FOR THE CALLER:
               Thread.currentThread().interrupt();
               throw new RtGridException(
                  "Job " + jobId + " interrupted");
            }
         }

         // ADHERE TO THE DOCUMENTED PROTOCOL:
         if (jobStatus != RX_DONE) {
            return 1;
         }
         else {
            return 0;
         }
      }
      else {
         throw new RtGridException("No job was started");
      }
   }
   private int getStatus()
   throws RtGridException {
      // THE STATUS OUTPUT MAY END WITH A NEWLINE,
      // SO TRIM IT BEFORE PARSING:
      String strStatus = runCmd(statusCmd).trim();
      return Integer.parseInt(strStatus);
   }

   public void terminateJob()
   throws RtGridException {
      // -k ALSO HALTS A PAUSED JOB, WHICH HAS NOT YET STOPPED:
      if (jobStatus == RX_RUNNING || jobStatus == RX_PAUSED) {
         manageJob("-k");
         jobStatus = RX_FAILED;
      }
   }
   public void pauseJob()
   throws RtGridException {
      if (jobStatus == RX_RUNNING)  {
         manageJob("-p");
         jobStatus = RX_PAUSED;
      }
   }
   public void resumeJob()
   throws RtGridException {
      if (jobStatus == RX_PAUSED) {
         manageJob("-r");
         jobStatus = RX_RUNNING;
      }
   }
   private void manageJob(String flag)
   throws RtGridException {
      if (jobId != null) {
         String[] manageCmd = new String[] {
            "rxdispatch", "manage", flag, jobId};
         runCmd(manageCmd);
      }
      else {
         throw new RtGridException("No job was started");
      }
   }
   private String runCmd(String[] cmd)
   throws RtGridException {
      // THIS METHOD RUNS THE GIVEN COMMAND
      // USING A STANDARD ESI UTILITY, RETURNING
      // THE TEXT WRITTEN BY THE COMMAND TO
      // STANDARD OUTPUT.  IF TEXT IS WRITTEN TO
      // STANDARD ERROR AN EXCEPTION IS THROWN.
   }
   public int getJobStatus()
   throws RtGridException {
      if (jobId != null) {
         // ADHERE TO THE DOCUMENTED PROTOCOL:
         if (jobStatus != RX_DONE) {
            return 1;
         }
         else {
            return 0;
         }
      }
      else {
         throw new RtGridException("No job was started");
      }
   }
   public String getJobStatusAsString()
   throws RtGridException {
      if (jobId != null) {
         return jobStatusName[jobStatus];
      }
      else {
         throw new RtGridException("No job was started");
      }
   }
   public String getJobIDAsString()
   throws RtGridException {
      if (jobId != null) {
         return jobId;
      }
      else {
         throw new RtGridException("No job was started");
      }
   }
   public GridOptionsPanel getOptionsPanel()
   throws RtGridException {
      return new RXDispatchGridOptionsPanel(this);
   }
}
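The body of runCmd is given only as an overview comment because it relies on an ESI utility that is not shown. For experimenting outside that environment, the following stand-in implements the same contract with the standard `java.lang.ProcessBuilder`: it returns the command’s standard output and throws if anything is written to standard error. `CommandRunner` and its nested `CommandException` (standing in for RtGridException) are hypothetical, output is assumed to be UTF-8, and `InputStream.readAllBytes` requires Java 9 or later.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the ESI utility that runCmd relies on. */
final class CommandRunner {
    /** Hypothetical stand-in for RtGridException. */
    static final class CommandException extends Exception {
        CommandException(String message) {
            super(message);
        }
    }

    /** Runs cmd and returns its standard output; throws if it writes to standard error. */
    static String runCmd(String[] cmd) throws CommandException {
        String commandLine = String.join(" ", cmd);
        try {
            Process process = new ProcessBuilder(cmd).start();
            process.getOutputStream().close(); // the command reads no standard input
            // Drain standard error on another thread so neither pipe can fill up.
            CompletableFuture<String> stderr =
                CompletableFuture.supplyAsync(() -> readAll(process.getErrorStream()));
            String stdout = readAll(process.getInputStream());
            process.waitFor();
            String errorText = stderr.join();
            if (!errorText.isEmpty()) {
                throw new CommandException(commandLine + ": " + errorText.trim());
            }
            return stdout;
        } catch (IOException | UncheckedIOException e) {
            throw new CommandException("Cannot run " + commandLine + ": " + e.getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new CommandException("Interrupted while running " + commandLine);
        }
    }

    /** Reads a stream to the end and decodes it as UTF-8. */
    private static String readAll(InputStream in) {
        try (InputStream stream = in) {
            return new String(stream.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The returned text includes any trailing newline, so callers that parse it, such as a status poll, should trim it first. Draining standard error on a separate thread keeps the child process from blocking when it writes more to one stream than the pipe buffer holds.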