Our sample Grid tool is a standalone program called rxdispatch, which for this example has been built and installed for Linux platforms. Note: Some processing steps will be represented by overview comments instead of detailed blocks of code. The following is a portion of its hypothetical “man page”:
Name: rxdispatch - Dispatch a command for remote execution.
Synopsis:
rxdispatch submit [-d directory] [-i filename] [-o filename] command
rxdispatch manage [-k | -p | -r] jobid
rxdispatch status [-m filename] jobid
Description: rxdispatch distributes computation tasks over a local network. It works in tandem with the daemon program rxmonitor, which oversees the set of machines in the local network used to run these tasks. It has three subcommands:
Options
SampleA sample Grid plug-in written around rxdispatch is shown below: class RXDispatchGridPlugin extends GridPlugin { // Job status: 'rxdispatch status' return values: private static final int RX_FAILED = 0; private static final int RX_PAUSED = 1; private static final int RX_RUNNING = 2; private static final int RX_DONE = 3; private static final String[] jobStatusName = { ResMgr.getMessage(CLASS,0,"Job failed"), ResMgr.getMessage(CLASS,0,"Job is paused"), ResMgr.getMessage(CLASS,0,"Job is running"), ResMgr.getMessage(CLASS,0,"Job is done") }; private String jobId = null; private int jobStatus = RX_FAIL; // COMMAND TO POLL FOR THE JOB STATUS. // WILL BE COMPLETED WHEN THE JOB STARTS. private String[] statusCmd = new String[5] { "rxdispatch", "status", "-m", null, null }; public int submitJobAndWait( String[] cmd, String[] inFiles, String[] outFiles, File localWorkingDir, Log log) throws RtGridException { if (jobId != null) { throw new RtGridException( "Job "+jobId+" is already started"); } // BUILD THE JOB SUBMIT COMMAND: int nSubmitCmd = 3 + cmd.length; if (inFiles != null) { nSubmitCmd += inFiles.length; } if (outFiles != null) { nSubmitCmd += outFiles.length; } String[] submitCmd = new String[nSubmitCmd]; submitCmd[0] = "rtdispatch"; submitCmd[1] = "submit"; submitCmd[2] = "-d"; submitCmd[3] = localWorkingDir.getCanonicalPath(); // STRIP THE LOCAL WORKING DIRECTORY NAME // OFF OF ALL INPUT AND OUTPUT FILE NAMES // AND ADD ALL NECESSARY -i AND -o OPTIONS // TO THE JOB SUBMIT COMMAND: . . . // COPY ALL ENTRIES OF cmd TO THE REMAINING // ENTRIES OF THE JOB SUBMIT COMMAND: . . . // SUBMIT THE JOB AND MONITOR IT: // [REMEMBER: JOB STARTS PAUSED OR FAILED] jobId = runCmd(submitCmd); // CREATE A UNIQUE TEMP FILE FOR THE ERROR // TEXT RETURNED BY THE STATUS COMMAND: File errorFile = new File(localWorkingdir,jobId+".txt"); statusCmd[3] = errorFile. 
getCanonicalPath(); statusCmd[4] = jobId; int waitRet = 1; jobStatus = getStatus(); if (jobStatus == RX_PAUSED) { resumeJob(); waitRet = waitForJob(); } // JOB IS DONE - CHECK FOR ERRORS: if (waitRet != 0) { if (errorFile.exists()) { // OPEN THE ERROR TEXT FILE AND // WRITE THE ERRORS TO THE LOG: } } return waitRet; } public int waitForJob() throws RtGridException { if (jobId != null) { // REPEATEDLY POLL FOR THE JOB STATUS: while ((jobStatus=getStatus()) == RX_RUNNING) { // THIS BLOCK OF CODE MUST BE HERE // SO THE GRID PLUGIN CAN RESPOND // WHEN THE COMPONENT TIMES OUT: try { Thread.sleep(15000); // 15 seconds } catch (InterruptedException ie) { throw new RtGridException( "Job "+jobID+" interrupted"); } } // ADHERE TO THE DOCUMENTED PROTOCOL: if (jobStatus != RX_DONE) { return 1; } else { return 0; } } else { throw new RtGridException("No job was started"); } } private int getStatus() throws RtGridException { String strStatus = runRXDispatch(statusCmd); return Integer.parseInt(strStatus); } public void terminateJob() throws RtGridException { if (jobStatus == RX_RUNNING) { manageJob("-k"); jobStatus = RX_FAILED; } } public void pauseJob() throws RtGridException { if (jobStatus == RX_RUNNING) { manageJob("-p"); jobStatus = RX_PAUSED; } } public void resumeJob() throws RtGridException { if (jobStatus == RX_PAUSED) { manageJob("-r"); jobStatus = RX_RUNNING; } } private void manageJob(String flag) throws RtGridException { if (jobId != null) { String[] manageCmd = new String[4] { "rxdispatch", "manage", flag, jobId}; runRXDispatch(manageCmd); } else { throw new RtGridException("No job was started"); } } private String runCmd(String[] cmd) throws RtGridException { // THIS METHOD RUNS THE GIVEN COMMAND // USING A STANDARD ESI UTILITY, RETURNING // THE TEXT WRITTEN BY THE COMMAND TO // STANDARD OUTPUT. IF TEXT IS WRITTEN TO // STANDARD ERROR AN EXCEPTION IS THROWN. 
} public int getJobStatus() throws RtGridException { if (jobId != null) { // ADHERE TO THE DOCUMENTED PROTOCOL: if (jobStatus != RX_DONE) { return 1; } else { return 0; } } else { throw new RtGridException("No job was started"); } } public String getJobStatusAsString() throws RtGridException { if (jobId != null) { return jobStatusName[jobStatus]; } else { throw new RtGridException("No job was started"); } } public String getJobIDAsString() throws RtGridException { if (jobId != null) { return jobId; } else { throw new RtGridException("No job was started"); } } public GridOptionsPanel getOptionsPanel () throws RtGridException { return new RXDispatchGridOptionsPanel(this); } } |