staging.SunGridEngine — staging class for SunGridEngine

Primitive framework for staging jobs in Sun Grid Engine via a customized Job class.

Example python submission script

Write the SGE script like this:

#!/usr/bin/env python
#$ -N bulk
#$ -S /usr/bin/python
#$ -v PYTHONPATH=/home/users/oliver/Library/python-lib
#$ -v LD_LIBRARY_PATH=/opt/intel/cmkl/8.0/lib/32:/opt/intel/itc60/slib:/opt/intel/ipp41/ia32_itanium/sharedlib:/opt/intel/ipp41/ia32_itanium/sharedlib/linux32:/opt/intel/fc/9.0/lib:/opt/intel/cc/9.0/lib
#$ -r n
#$ -j y
# The next line is IMPORTANT when you are using the default for Job(startdir=None)
#$ -cwd

from staging.SunGridEngine import Job

job = Job(inputfiles=dict(psf = 'inp/crbp_apo.psf',
                          dcd = 'trj/rmsfit_1opa_salt_ewald_shake_10ang_prod.dcd'),
          outputfiles=dict(dx = '*.dx', pickle = '*.pickle'),
          variables=dict(normalize = True, ...))

job.stage()
F = job.filenames  # use F[key] to reference filenames from inputfiles or outputfiles
V = job.variables  # and V[key] for the variables

# your python script here...
print "psf: %(psf)s  dcd: %(dcd)s" % F
print "normalize = %(normalize)s" % V


job.unstage()
job.cleanup()   # removes stage dir, careful!

Description of the Job class

class staging.SunGridEngine.Job(*args, **kwargs)

The Job class encapsulates the SGE job and allows for clean staging and unstaging.

Set up the Job:

job = Job(inputfiles=dict(...),outputfiles=dict(...),variables=dict(...),**kwargs)

inputfiles and outputfiles are dictionaries with arbitrary keys; each item is a path to a file relative to the startdir (which by default is the directory from which the SGE job starts; use the #$ -cwd flag!). If the files are not relative to the startdir, new directories are constructed under the stagedir. In this case it is important that the user script only uses the filenames in Job.filenames: these hold the proper paths of the local (staged) files for the script to operate on.

With

job.stage()

inputfiles are copied to the stagedir on the node's scratch dir, and subdirectories are created as necessary; directories mentioned as part of the outputfiles are created, too.
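As a rough illustration of what staging involves, the following is a minimal sketch and not the library's actual implementation. The helper name stage_inputs and its arguments are hypothetical, and only relative paths are handled. It copies each input file to the same relative path under a stage directory and recreates subdirectories on the way:

```python
import os
import shutil


def stage_inputs(inputfiles, startdir, stagedir):
    """Copy each input file from startdir to the same relative path under stagedir.

    Hypothetical helper for illustration only; handles relative paths only.
    Returns a dict mapping each key to the path of the staged copy.
    """
    staged = {}
    for key, relpath in inputfiles.items():
        src = os.path.join(startdir, relpath)
        dst = os.path.join(stagedir, relpath)
        dstdir = os.path.dirname(dst)
        if not os.path.isdir(dstdir):
            os.makedirs(dstdir)  # create subdirectories as necessary
        shutil.copy(src, dst)
        staged[key] = dst
    return staged
```

The returned dict plays the role that Job.filenames plays in the text above: the script should read from the staged paths, not from the original ones.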

job.unstage()

copies back all files mentioned in outputfiles (again, use directories as part of the path as necessary) and creates the directories in the startdir if needed. For the outputfiles one can also use shell-style glob patterns, e.g. outputfiles = {'all_dcd': '*.dcd', 'last_data': '*[5-9].dat'}
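To make the glob behaviour concrete, here is a minimal sketch of copying matching files back, assuming plain files and relative patterns. The helper name unstage_outputs is hypothetical and this is not the library's own code:

```python
import glob
import os
import shutil


def unstage_outputs(outputfiles, stagedir, startdir):
    """Copy files matching each output pattern from stagedir back to startdir.

    Hypothetical helper for illustration only. Returns the sorted list of
    relative paths that were copied.
    """
    copied = []
    for pattern in outputfiles.values():
        for src in glob.glob(os.path.join(stagedir, pattern)):
            relpath = os.path.relpath(src, stagedir)
            dst = os.path.join(startdir, relpath)
            dstdir = os.path.dirname(dst)
            if not os.path.isdir(dstdir):
                os.makedirs(dstdir)  # create directories in startdir if needed
            shutil.copy(src, dst)
            copied.append(relpath)
    return sorted(copied)
```

With the two patterns from the example above, a stage directory holding run1.dcd, x3.dat and x5.dat would yield run1.dcd and x5.dat; x3.dat does not match '*[5-9].dat'.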

Sensible defaults are automatically selected for startdir (cwd) and stagedir (/scratch/USER/JOB_NAME.JOB_ID).
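The default stagedir pattern can be written out as a small sketch. The helper name default_stagedir is hypothetical, and the sketch assumes that USER, JOB_NAME and JOB_ID are all present in the environment; how the library itself handles missing values is not covered here:

```python
import os


def default_stagedir(environ=None):
    """Build /scratch/USER/JOB_NAME.JOB_ID from environment variables.

    Hypothetical helper; assumes USER, JOB_NAME and JOB_ID are all set.
    """
    if environ is None:
        environ = os.environ
    return "/scratch/%s/%s.%s" % (
        environ["USER"], environ["JOB_NAME"], environ["JOB_ID"])
```

Passing a plain dict instead of os.environ makes it easy to check the resulting path without submitting a job.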

If the script is not run through SGE (i.e. the environment variable JOB_NAME is not set) then the script is run without staging; this is pretty much equivalent to using

from staging.Local import Job

(i.e. using the staging.Local.Job class).
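The check described above amounts to looking for JOB_NAME in the environment. A minimal sketch, with the hypothetical helper name running_under_sge:

```python
import os


def running_under_sge(environ=None):
    """Return True if JOB_NAME is set, i.e. the script appears to run under SGE.

    Hypothetical helper for illustration only.
    """
    if environ is None:
        environ = os.environ
    return "JOB_NAME" in environ
```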

Attributes :
input

inputfiles dict (relative to startdir or absolute)

output

outputfiles dict (relative to startdir or absolute, can contain globs)

filenames

merged dict of input and output, pointing to staged files

variables

variables dict

Methods :
stage()

setup job on the nodes in stagedir

unstage()

retrieve results to startdir

cleanup()

remove all files on the node (rm -rf stagedir)

Set up SGE job.

Arguments :
inputfiles

dict of input files (with relative path to startdir); globs are not supported.

outputfiles

dict of result files or glob patterns (relative to stagedir == relative to startdir)

variables

key/value pairs that can be used in the script as Job.variables[key]

startdir

path to the directory where the input can be found (must be NFS-mounted on node)

stagedir

local scratch directory on node; all input files are copied there. The default should be ok.

JOB_NAME

unique identifier (only set this if the job is NOT submitted through the Grid Engine queuing system AND the files should still be copied to a scratch disk, i.e. staging proceeds as it would for an SGE-submitted job)

SGE_TASK_ID

fake a task id (use with JOB_NAME)

cleanup()

Remove stage dir
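Removing the stage dir is equivalent to rm -rf on that directory. A minimal sketch of the same operation in Python, using the hypothetical helper name remove_stagedir:

```python
import os
import shutil


def remove_stagedir(stagedir):
    """Delete stagedir and everything below it (like rm -rf).

    Hypothetical helper; does nothing if the directory does not exist.
    """
    if os.path.isdir(stagedir):
        shutil.rmtree(stagedir)
```

Because the removal is recursive and unconditional, call it only after unstage() has copied the results back.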

save(filename)

Save the Job() as a pickled file.

Restore with

import staging.SunGridEngine
import cPickle
job = cPickle.load(open(<filename>,'r'))

stage()

Copy all input files to the scratch directory.

unstage()

Copy results back. Shell-style glob patterns are allowed.

Helper functions for building job arrays

staging.SunGridEngine.getline_from_arraylist(filename=None, ENVNAME='ARRAYLIST', default='arraylist.txt')

Read a list of values from filename and return the line that corresponds to the current SGE_TASK_ID.

line = getline_from_arraylist(filename=None, ENVNAME='ARRAYLIST', default="arraylist.txt")

line will be different depending on the value of SGE_TASK_ID (set by Sun Grid Engine). The lines are simply numbered consecutively.

Arguments :
filename

name of the arraylist file

ENVNAME

try to get filename from environment variable if filename is not set

default

if all fails, try this as a default filename
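The lookup order implied by these arguments (explicit filename, then environment variable, then default) can be sketched as follows. The helper name resolve_arraylist_filename and the extra environ parameter are hypothetical and exist only to make the sketch easy to try out:

```python
import os


def resolve_arraylist_filename(filename=None, ENVNAME="ARRAYLIST",
                               default="arraylist.txt", environ=None):
    """Pick the arraylist filename: explicit argument, then environment, then default.

    Hypothetical helper; `environ` is an illustration-only parameter.
    """
    if environ is None:
        environ = os.environ
    if filename is not None:
        return filename
    return environ.get(ENVNAME, default)
```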

File format:

# comment lines are ignored as are whitespace lines
# only the first column is read; the internal numbering starts at 1
line1 ...   <---- task id 1
line2 ...   <---- task id 2
# more comments, they are NOT counted for the task id
line3 ...   <---- task id 3      
...

Ignores white space lines and lines starting with #. Lines are stripped of left and right white space.
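The numbering rule can be illustrated with a minimal sketch, which is not the library's code. The helper name select_arraylist_line is hypothetical, and the sketch returns the whole stripped line for a given 1-based task id:

```python
def select_arraylist_line(lines, task_id):
    """Return the entry for a 1-based task_id, skipping comments and blank lines.

    Hypothetical helper for illustration only.
    """
    entries = []
    for line in lines:
        line = line.strip()  # strip left and right white space
        if not line or line.startswith("#"):
            continue  # comments and blank lines do not count for the task id
        entries.append(line)
    return entries[task_id - 1]
```

In a job array the task id would typically come from int(os.environ['SGE_TASK_ID']), and lines from the opened arraylist file.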

staging.SunGridEngine.get_fields_from_arraylist(**kwargs)

Read a list of values from filename and return the fields of the line that corresponds to the current SGE_TASK_ID.

get_fields_from_arraylist(filename=None, ENVNAME='ARRAYLIST', default="arraylist.txt") -> fields

fields will be different depending on the value of SGE_TASK_ID (set by Sun Grid Engine). The lines are simply numbered consecutively.

See getline_from_arraylist() for more details.

staging.SunGridEngine.get_value_from_arraylist(index=0, **kwargs)

Get field[index] of the entry in the array list corresponding to SGE_TASK_ID.

See get_fields_from_arraylist() for details.
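How a selected line relates to its fields and to a single value can be sketched as below. This assumes fields are separated by white space, which the text above does not state explicitly; the helper names fields_of and value_of are hypothetical:

```python
def fields_of(line):
    """Split an arraylist line into fields (assumption: whitespace-separated)."""
    return line.split()


def value_of(line, index=0):
    """Return field[index] of an arraylist line; index=0 is the first column."""
    return fields_of(line)[index]
```

For a line such as "run01 300 0.5", the default index picks "run01" and index=2 picks "0.5"; all values are strings and need explicit conversion.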