# qsub -- utilities for batch submission systems
# Copyright (c) 2010 Oliver Beckstein <orbeckst@gmail.com>
# Made available under GNU Public License v3.

  5  """ 
  6  :mod:`gromacs.qsub` -- utilities for batch submission systems 
  7  ============================================================= 
  8   
  9  The module helps writing submission scripts for various batch submission 
 10  queuing systems. The known ones are listed stored as 
 11  :class:`~gromacs.qsub.QueuingSystem` instances in 
 12  :data:`~gromacs.qsub.queuing_systems`; append new ones to this list. 
 13   
 14  The working paradigm is that template scripts are provided (see 
 15  :data:`gromacs.config.templates`) and only a few place holders are substituted 
 16  (using :func:`gromacs.cbook.edit_txt`). 
 17   
 18  *User-supplied template scripts* can be stored in 
 19  :data:`gromacs.config.qscriptdir` (by default ``~/.gromacswrapper/qscripts``) 
 20  and they will be picked up before the package-supplied ones. 
 21   
 22  The :class:`~gromacs.qsub.Manager` handles setup and control of jobs 
 23  in a queuing system on a remote system via :program:`ssh`. 
 24   
 25  At the moment, some of the functions in :mod:`gromacs.setup` use this module 
 26  but it is fairly independent and could conceivably be used for a wider range of 
 27  projects. 
 28   

Queuing system templates
------------------------

The queuing system scripts are highly specific and you will need to add
your own. Templates should be shell scripts. Some parts of the
templates are modified by the
:func:`~gromacs.qsub.generate_submit_scripts` function. The "place
holders" that can be replaced are shown in the table below. Typically,
the place holders are either shell variable assignments or batch
submission system commands. The table shows SGE_ commands, but PBS_ and
LoadLeveler_ have similar constructs (e.g., PBS commands start with
``#PBS`` and LoadLeveler uses ``#@`` with its own command keywords).

.. Table:: Substitutions in queuing system templates.

   ===============  ===========  ================ ================= =====================================
   place holder     default      replacement      description       regex
   ===============  ===========  ================ ================= =====================================
   #$ -N            GMX_MD       *sgename*        job name          /^#.*(-N|job_name)/
   #$ -l walltime=  00:20:00     *walltime*       max run time      /^#.*(-l walltime|wall_clock_limit)/
   #$ -A            BUDGET       *budget*         account           /^#.*(-A|account_no)/
   DEFFNM=          md           *deffnm*         default gmx name  /^DEFFNM=/
   WALL_HOURS=      0.33         *walltime* h     mdrun's -maxh     /^WALL_HOURS=/
   MDRUN_OPTS=      ""           *mdrun_opts*     more options      /^MDRUN_OPTS=/
   ===============  ===========  ================ ================= =====================================

Lines with place holders should not have any white space at the beginning. The
regular expression pattern ("regex") is used to find the lines for the
replacement and the literal default values ("default") are replaced. Not all
place holders have to occur in a template; for instance, if a queue has no run
time limitation then one would probably not include *walltime* and *WALL_HOURS*
place holders.

The line ``# JOB_ARRAY_PLACEHOLDER`` can be replaced by
:func:`~gromacs.qsub.generate_submit_array` to produce a "job array"
(also known as a "task array") script that runs a large number of
related simulations under the control of a single queuing system
job. The individual array tasks are run from different subdirectories.
Only queuing system scripts that use the :program:`bash` shell are
supported for job arrays at the moment.
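
For example, a job array over three simulation directories could be set
up as follows (a sketch; the template and directory names are
hypothetical)::

   import gromacs.qsub
   gromacs.qsub.generate_submit_array(['my_array_script.sge'],
                                      directories=['sim1', 'sim2', 'sim3'])

This writes a copy of ``my_array_script.sge`` with the job-array logic
spliced in at the ``# JOB_ARRAY_PLACEHOLDER`` line.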

A queuing system script *must* have the appropriate suffix to be properly
recognized, as shown in the table below.

.. Table:: Suffixes for queuing system templates. Pure shell scripts are only used to run locally.

   ==============================  ===========  ===========================
   Queuing system                  suffix       notes
   ==============================  ===========  ===========================
   Sun Gridengine                  .sge         Sun's `Sun Gridengine`_
   Portable Batch queuing system   .pbs         OpenPBS_ and `PBS Pro`_
   LoadLeveler                     .ll          IBM's `LoadLeveler`_
   bash script                     .bash, .sh   `Advanced bash scripting`_
   csh script                      .csh         avoid_ csh_
   ==============================  ===========  ===========================

.. _OpenPBS: http://www.mcs.anl.gov/research/projects/openpbs/
.. _PBS: OpenPBS_
.. _PBS Pro: http://www.pbsworks.com/Product.aspx?id=1
.. _Sun Gridengine: http://gridengine.sunsource.net/
.. _SGE: `Sun Gridengine`_
.. _LoadLeveler: http://www-03.ibm.com/systems/software/loadleveler/index.html
.. _Advanced bash scripting: http://tldp.org/LDP/abs/html/
.. _avoid: http://www.grymoire.com/Unix/CshTop10.txt
.. _csh: http://www.faqs.org/faqs/unix-faq/shell/csh-whynot/


Example queuing system script template for PBS
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The following script is a usable PBS_ script for a supercomputer. It
contains almost all of the replacement tokens listed in the table
(indicated by ``++++++``; these values should be kept in the template as
they are, or they will not be subject to replacement). ::

   #!/bin/bash
   # File name: ~/.gromacswrapper/qscripts/supercomputer.somewhere.fr_64core.pbs
   #PBS -N GMX_MD
   #       ++++++
   #PBS -j oe
   #PBS -l select=8:ncpus=8:mpiprocs=8
   #PBS -l walltime=00:20:00
   #                ++++++++

   # host: supercomputer.somewhere.fr
   # queuing system: PBS

   # set this to the same value as walltime; mdrun will stop cleanly
   # at 0.99 * WALL_HOURS
   WALL_HOURS=0.33
   #          ++++

   # deffnm line is possibly modified by gromacs.setup
   # (leave it as it is in the template)
   DEFFNM=md
   #      ++

   TPR=${DEFFNM}.tpr
   OUTPUT=${DEFFNM}.out
   PDB=${DEFFNM}.pdb

   MDRUN_OPTS=""
   #          ++

   # If you always want to add additional MDRUN options in this script then
   # you can either do this directly in the mdrun commandline below or by
   # constructs such as the following:
   ## MDRUN_OPTS="-npme 24 $MDRUN_OPTS"

   # JOB_ARRAY_PLACEHOLDER
   #++++++++++++++++++++++   leave the full commented line intact!

   # avoids some failures
   export MPI_GROUP_MAX=1024
   # use hard coded path for time being
   GMXBIN="/opt/software/SGI/gromacs/4.0.3/bin"
   MPIRUN=/usr/pbs/bin/mpiexec
   APPLICATION=$GMXBIN/mdrun_mpi

   $MPIRUN $APPLICATION -stepout 1000 -deffnm ${DEFFNM} -s ${TPR} -c ${PDB} -cpi \
                        $MDRUN_OPTS \
                        -maxh ${WALL_HOURS} > $OUTPUT
   rc=$?

   # dependent jobs will only start if rc == 0
   exit $rc

Save the above script in ``~/.gromacswrapper/qscripts`` under the name
``supercomputer.somewhere.fr_64core.pbs``. This will make the script
immediately usable. For example, in order to set up a production MD run with
:func:`gromacs.setup.MD` for this supercomputer one would use ::

   gromacs.setup.MD(..., qscripts=['supercomputer.somewhere.fr_64core.pbs', 'local.sh'])

This will generate submission scripts based on
``supercomputer.somewhere.fr_64core.pbs`` and also the default ``local.sh``
that is provided with *GromacsWrapper*.

In order to modify ``MDRUN_OPTS`` one would use the additional *mdrun_opts*
argument, for instance::

   gromacs.setup.MD(..., qscripts=['supercomputer.somewhere.fr_64core.pbs', 'local.sh'],
                    mdrun_opts="-v -npme 20 -dlb yes -nosum")


Currently there is no good way to specify the number of processors when
creating run scripts. You will need to provide scripts with different numbers
of cores hard-coded, or set the number when submitting the scripts with
command-line options to :program:`qsub`.
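
For example, with PBS one might override the processor request at
submission time; the ``select`` statement below is site-specific and only
an illustration::

   qsub -l select=16:ncpus=8:mpiprocs=8 supercomputer.somewhere.fr_64core.pbs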


Classes and functions
---------------------

.. autoclass:: QueuingSystem
   :members:
.. autofunction:: generate_submit_scripts
.. autofunction:: generate_submit_array
.. autofunction:: detect_queuing_system

.. autodata:: queuing_systems


Queuing system Manager
----------------------

The :class:`Manager` class must be customized for each system, such as
a cluster or a supercomputer. It then allows submission and control of
jobs remotely (using ssh_).
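
A minimal subclass might look like the following sketch (the hostname,
scratch directory, script name, and walltime are hypothetical
placeholders that must be adapted to your site)::

   import gromacs.qsub

   class MySuperComputer(gromacs.qsub.Manager):
       _hostname = "supercomputer.somewhere.fr"
       _scratchdir = "/scratch/myuser"
       _qscript = "supercomputer.somewhere.fr_64core.pbs"
       _walltime = 24      # hours

   m = MySuperComputer()
   m.put('MD_001')         # scp the run directory to the host
   m.qsub('MD_001')        # submit the queuing system script remotely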

.. autoclass:: Manager
   :members:
   :exclude-members: job_done, qstat

   .. autoattribute:: _hostname
   .. autoattribute:: _scratchdir
   .. autoattribute:: _qscript
   .. autoattribute:: _walltime
   .. method:: job_done

               alias for :meth:`get_status`

   .. method:: qstat

               alias for :meth:`get_status`


.. _ssh: http://www.openssh.com/
.. _~/.ssh/config: http://linux.die.net/man/5/ssh_config
"""

import os, errno
import warnings
from subprocess import call, Popen, PIPE
import shutil
import re
import glob

import gromacs.config
import gromacs.cbook
from gromacs.utilities import asiterable, Timedelta
from gromacs import AutoCorrectionWarning

import logging
logger = logging.getLogger('gromacs.qsub')

try:
    from os.path import relpath
except ImportError:
    # relpath appeared in Python 2.6
    def relpath(path, start=os.path.curdir):
        """Return a relative version of a path (from posixpath 2.6)"""

        if not path:
            raise ValueError("no path specified")

        start_list = os.path.abspath(start).split(os.path.sep)
        path_list = os.path.abspath(path).split(os.path.sep)

        # Work out how much of the filepath is shared by start and path.
        i = len(os.path.commonprefix([start_list, path_list]))

        rel_list = [os.path.pardir] * (len(start_list) - i) + path_list[i:]
        if not rel_list:
            return os.path.curdir
        return os.path.join(*rel_list)

class QueuingSystem(object):
    """Class that represents minimum information about a batch submission system."""

    def __init__(self, name, suffix, qsub_prefix, array_variable=None, array_option=None):
        """Define a queuing system's functionality

        :Arguments:
          *name*
             name of the queuing system, e.g. 'Sun Gridengine'
          *suffix*
             suffix of input files, e.g. 'sge'
          *qsub_prefix*
             prefix string that starts a qsub flag in a script, e.g. '#$'

        :Keywords:
          *array_variable*
             environment variable exported for array jobs, e.g.
             'SGE_TASK_ID'
          *array_option*
             qsub option format string to launch an array (e.g. '-t %d-%d')

        """
        self.name = name
        self.suffix = suffix
        self.qsub_prefix = qsub_prefix
        self.array_variable = array_variable
        self.array_option = array_option

    def flag(self, *args):
        """Return string for the qsub flag *args*, prefixed with the appropriate in-script prefix."""
        return " ".join((self.qsub_prefix,) + args)

    def has_arrays(self):
        """True if known how to do job arrays."""
        return self.array_variable is not None

    def array_flag(self, directories):
        """Return string to embed the array launching option in the script."""
        return self.flag(self.array_option % (1, len(directories)))

    def array(self, directories):
        """Return multiline string for simple array jobs over *directories*.

        .. Warning:: The string is in ``bash`` and hence the template must also
                     be ``bash`` (and *not* ``csh`` or ``sh``).
        """
        if not self.has_arrays():
            raise NotImplementedError('Not known how to make array jobs for '
                                      'queuing system %(name)s' % vars(self))
        hrule = '#' + 60*'-'
        lines = [
            '',
            hrule,
            '# job array:',
            self.array_flag(directories),
            hrule,
            '# directories for job tasks',
            'declare -a jobdirs']
        for i, dirname in enumerate(asiterable(directories)):
            idx = i + 1    # job array indices are 1-based
            lines.append('jobdirs[%(idx)d]=%(dirname)r' % vars())
        lines.extend([
            "# Switch to the current task's directory:",
            'wdir="${jobdirs[${%(array_variable)s}]}"' % vars(self),
            'cd "$wdir" || { echo "ERROR: failed to enter $wdir."; exit 1; }',
            hrule,
            ''
        ])
        return "\n".join(lines)

    def isMine(self, scriptname):
        """Primitive queuing system detection; only looks at the suffix at the moment."""
        suffix = os.path.splitext(scriptname)[1].lower()
        if suffix.startswith('.'):
            suffix = suffix[1:]
        return self.suffix == suffix

    def __repr__(self):
        return "<" + self.name + " QueuingSystem instance>"

#: Pre-defined queuing systems (SGE, PBS). Add your own here.
queuing_systems = [
    QueuingSystem('Sun Gridengine', 'sge', '#$', array_variable='SGE_TASK_ID', array_option='-t %d-%d'),
    QueuingSystem('PBS', 'pbs', '#PBS', array_variable='PBS_ARRAY_INDEX', array_option='-J %d-%d'),
    QueuingSystem('LoadLeveler', 'll', '#@'),    # no idea how to do arrays in LL
]

def detect_queuing_system(scriptfile):
    """Return the queuing system for which *scriptfile* was written."""
    for qs in queuing_systems:
        if qs.isMine(scriptfile):
            return qs
    return None

def generate_submit_scripts(templates, prefix=None, deffnm='md', jobname='MD', budget=None,
                            mdrun_opts=None, walltime=1.0, jobarray_string=None,
                            **kwargs):
    """Write scripts for queuing systems.

    This sets up queuing system run scripts with a simple search and replace in
    templates. See :func:`gromacs.cbook.edit_txt` for details. Shell scripts
    are made executable.

    :Arguments:
      *templates*
          Template file or list of template files. The "files" can also be names
          or symbolic names for templates in the templates directory. See
          :mod:`gromacs.config` for details and rules for writing templates.
      *prefix*
          Prefix for the final run script filename; by default the filename will be
          the same as the template. [None]
      *dirname*
          Directory in which to place the submit scripts. [.]
      *deffnm*
          Default filename prefix for :program:`mdrun` ``-deffnm`` [md]
      *jobname*
          Name of the job in the queuing system. [MD]
      *budget*
          Which budget to book the runtime on. [None]
      *mdrun_opts*
          String of additional options for :program:`mdrun`.
      *walltime*
          Maximum runtime of the job in hours. [1]
      *jobarray_string*
          Multi-line string that is spliced in for job array functionality
          (see :func:`gromacs.qsub.generate_submit_array`; do not use manually)
      *kwargs*
          all other kwargs are ignored

    :Returns: list of generated run scripts
    """
    if not jobname[0].isalpha():
        jobname = 'MD_' + jobname
        wmsg = "To make the jobname legal it must start with a letter: changed to %r" % jobname
        logger.warn(wmsg)
        warnings.warn(wmsg, category=AutoCorrectionWarning)
    if prefix is None:
        prefix = ""
    if mdrun_opts is not None:
        mdrun_opts = '"' + str(mdrun_opts) + '"'   # TODO: could test if quotes already present

    dirname = kwargs.pop('dirname', os.path.curdir)

    wt = Timedelta(hours=walltime)
    walltime = wt.strftime("%h:%M:%S")
    wall_hours = wt.ashours

    def write_script(template):
        submitscript = os.path.join(dirname, prefix + os.path.basename(template))
        logger.info("Setting up queuing system script %(submitscript)r..." % vars())
        # These substitution rules are documented for the user in gromacs.config:
        gromacs.cbook.edit_txt(template,
                               [('^ *DEFFNM=', 'md', deffnm),
                                ('^#.*(-N|job_name)', 'GMX_MD', jobname),
                                ('^#.*(-A|account_no)', 'BUDGET', budget),
                                ('^#.*(-l walltime|wall_clock_limit)', '00:20:00', walltime),
                                ('^ *WALL_HOURS=', '0\.33', wall_hours),
                                ('^ *MDRUN_OPTS=', '""', mdrun_opts),
                                ('^# JOB_ARRAY_PLACEHOLDER', '^.*$', jobarray_string),
                                ],
                               newname=submitscript)
        ext = os.path.splitext(submitscript)[1]
        if ext in ('.sh', '.csh', '.bash'):
            os.chmod(submitscript, 0755)
        return submitscript

    return [write_script(template) for template in gromacs.config.get_templates(templates)]
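
# Example usage (a sketch; 'my_cluster.sge' is a hypothetical template that
# would live in ~/.gromacswrapper/qscripts):
#
#   scripts = generate_submit_scripts(['local.sh', 'my_cluster.sge'],
#                                     deffnm='md', jobname='TestMD',
#                                     walltime=12.0, mdrun_opts='-v')
#
# which returns the paths of the two generated run scripts.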

def generate_submit_array(templates, directories, **kwargs):
    """Generate an array job.

    For each ``work_dir`` in *directories*, the array job will

    1. cd into ``work_dir``
    2. run the job as detailed in the template

    It will use all the queuing system directives found in the
    template. If more complicated set-ups are required, then this
    function cannot be used.

    :Arguments:
       *templates*
          Basic template for a single job; the job array logic is spliced into
          the position of the line ::

             # JOB_ARRAY_PLACEHOLDER

          The appropriate commands for common queuing systems (Sun Gridengine, PBS)
          are hard coded here. The queuing system is detected from the suffix of
          the template.
       *directories*
          List of directories under *dirname*. One task is set up for each
          directory.
       *dirname*
          The array script will be placed in this directory. The *directories*
          **must** be located under *dirname*.
       *kwargs*
          See :func:`gromacs.qsub.generate_submit_scripts` for details.
    """
    dirname = kwargs.setdefault('dirname', os.path.curdir)
    reldirs = [relpath(p, start=dirname) for p in asiterable(directories)]
    missing = [p for p in (os.path.join(dirname, subdir) for subdir in reldirs)
               if not os.path.exists(p)]
    if len(missing) > 0:
        logger.error("Some directories are not accessible from the array script: "
                     "%(missing)r" % vars())

    def write_script(template):
        qsystem = detect_queuing_system(template)
        if qsystem is None or not qsystem.has_arrays():
            logger.warning("Not known how to make a job array for %(template)r; skipping..." % vars())
            return None
        kwargs['jobarray_string'] = qsystem.array(reldirs)
        return generate_submit_scripts(template, **kwargs)[0]   # returns list of length 1

    # must use config.get_templates() because we need to access the file for detecting
    return [write_script(template) for template in gromacs.config.get_templates(templates)]

class Manager(object):
    """Base class to launch simulations remotely on computers with queuing systems.

    Basically, ssh into the machine and run the job.

    Derive a class from :class:`Manager` and override the attributes

     - :attr:`Manager._hostname` (hostname of the machine)
     - :attr:`Manager._scratchdir` (all files and directories will be created under
       this scratch directory; it must be a path on the remote host)
     - :attr:`Manager._qscript` (the default queuing system script template)
     - :attr:`Manager._walltime` (if there is a limit to the run time
       of a job; in hours)

    and implement a specialized :meth:`Manager.qsub` method if needed.

    ssh_ must be set up (via `~/.ssh/config`_) to allow access via a
    commandline such as ::

       ssh <hostname> <command> ...

    Typically you want something such as ::

       host <hostname>
            hostname <hostname>.fqdn.org
            user <remote_user>

    in ``~/.ssh/config`` and also set up public-key authentication in
    order to avoid typing your password all the time.
    """

    # override in derived classes
    #: hostname of the supercomputer (**required**)
    _hostname = None
    #: scratch dir on hostname (**required**)
    _scratchdir = None
    #: name of the template submission script appropriate for the
    #: queuing system on :attr:`Manager._hostname`; can be a path to a
    #: local file or a template stored in
    #: :data:`gromacs.config.qscriptdir` or a key for :data:`gromacs.config.templates` (**required**)
    _qscript = None
    #: maximum run time of script in hours; the queuing system script
    #: :attr:`Manager._qscript` is supposed to stop :program:`mdrun`
    #: after 99% of this time via the ``-maxh`` option. A value of
    #: ``None`` or ``inf`` indicates no limit.
    _walltime = None

    #: Regular expression used by :meth:`Manager.get_status` to parse
    #: the logfile from :program:`mdrun`.
    log_RE = re.compile(r"""
            Run\stime\sexceeded\s+(?P<exceeded>.*)\s+hours,\swill\sterminate\sthe\srun
                                                         # another part to come
            | Performance:\s*(?P<performance>[\s\d.]+)\n # performance line (split later)
            | (?P<completed>Finished\smdrun\son\snode)   # this (part of a) run completed
            """, re.VERBOSE)

    def __init__(self, dirname=os.path.curdir, **kwargs):
        """Set up the manager.

        :Arguments:
           *statedir*
               directory component under the remote scratch dir (should
               be different for different jobs) [basename(CWD)]
           *prefix*
               identifier for job names [MD]
        """
        self.logger = logging.getLogger('gromacs.qsub.Manager')

        # get variables into instance (so that vars(self) works...)
        self.hostname = self._assertnotempty(self._hostname, '_hostname')
        self.scratchdir = self._assertnotempty(self._scratchdir, '_scratchdir')
        self.qscript = self._assertnotempty(self._qscript, '_qscript')
        self.walltime = self._walltime
        self.performance = None    # ns/d, updated with get_status()

        statedir = kwargs.pop('statedir', os.path.basename(os.path.realpath(os.path.curdir)))
        self.wdir = os.path.join(self.scratchdir, statedir)
        self.prefix = kwargs.pop('prefix', 'MD')   # for future use/examples
        self.uri = self.hostname.strip() + ":" + self.wdir

        self.logger.info("Setting up a manager from %r.", statedir)

        # test connection and make directory where we run things on the remote host
        rc = call(['ssh', self.hostname, 'mkdir', '-p', self.wdir])
        if rc == 0:
            self.logger.info("All good: can access %(uri)s" % vars(self))
        else:
            self.logger.error("Problem with ssh and path %(uri)s" % vars(self))

        super(Manager, self).__init__(**kwargs)

    def _assertnotempty(self, value, name):
        """Simple sanity check."""
        if value is None or value == '':
            raise AssertionError("Class %s must have class variable %s set"
                                 % (self.__class__.__name__, name))
        return value

    def remotepath(self, *args):
        """Directory on the remote machine."""
        return os.path.join(self.wdir, *args)

    get_dir = remotepath

    def remoteuri(self, *args):
        """URI of the directory on the remote machine."""
        return os.path.join(self.uri, *args)

    def put(self, dirname):
        """scp *dirname* to host.

        :Arguments: dirname to be transferred
        :Returns: return code from scp
        """
        self.logger.info("Copying %r to %r" % (dirname, self.uri))
        return call(["scp", "-r", dirname, self.uri])

    def putfile(self, filename, dirname):
        """scp *filename* to host in *dirname*.

        :Arguments: filename and dirname to be transferred to
        :Returns: return code from scp
        """
        destdir = self.remoteuri(dirname)
        self.logger.info("Copying %(filename)r to %(destdir)r" % vars())
        return call(["scp", filename, destdir])

    def get(self, dirname, checkfile=None, targetdir=os.path.curdir):
        """``scp -r`` *dirname* from host into *targetdir*

        :Arguments:
           - *dirname*: dir to download
           - *checkfile*: raise OSError/ENOENT if *targetdir/dirname/checkfile* was not found
           - *targetdir*: put *dirname* into this directory

        :Returns: return code from scp
        """
        self.logger.info("Copying %r from %r" % (dirname, self.uri))
        rc = call(["scp", "-r", self.remoteuri(dirname), targetdir])
        #rc = call(["rsync", "-e", "ssh", "-avP", os.path.join(self.uri, dirname), targetdir])
        if checkfile is not None:
            if not os.path.exists(os.path.join(targetdir, dirname, checkfile)):
                self.logger.error("Failed to get %r from %s", checkfile, self.hostname)
                raise OSError(errno.ENOENT, checkfile,
                              "Failed to download file from %(hostname)r" % vars(self))
        return rc

    def local_get(self, dirname, checkfile, cattrajectories=True, cleanup=False):
        """Find *checkfile* locally if possible.

        If *checkfile* is not found in *dirname* then it is transferred from the
        remote host.

        If needed, the trajectories are concatenated using :meth:`Manager.cat`.

        :Returns: local path of *checkfile*
        """
        checkpath = os.path.join(dirname, checkfile)
        if not os.path.exists(checkpath):
            self.get(dirname)                # try downloading
            if cattrajectories and not os.path.exists(checkpath):
                # try concatenating everything first (guess the prefix...)
                prefix = os.path.splitext(os.path.basename(checkfile))[0]
                self.cat(dirname, prefix=prefix, cleanup=cleanup)
            if not os.path.exists(checkpath):
                self.logger.error("Failed to get %r from %s", checkfile, self.hostname)
                raise OSError(errno.ENOENT, checkfile,
                              "Failed to download file from %(hostname)r" % vars(self))
        return checkpath

    def cat(self, dirname, prefix='md', cleanup=True):
        """Concatenate parts of a run in *dirname*.

        Always uses :func:`gromacs.cbook.cat` with *resolve_multi* = 'guess'.

        .. Note:: The default is to immediately delete the original files
                  (*cleanup* = ``True``).

        :Keywords:
           *dirname*
              directory to work in
           *prefix*
              prefix (deffnm) of the files [md]
           *cleanup* : boolean
              if ``True``, remove all used files [``True``]
        """
        gromacs.cbook.cat(prefix, dirname=dirname, resolve_multi='guess')
        # cleanup/get stuff back
        full_dir = os.path.join(dirname, 'full')    # default of cat
        complete_files = os.path.join(full_dir, '*.*')
        self.logger.info("Manager.cat(): recovering concatenated files from %r", full_dir)
        for f in glob.glob(complete_files):
            self.logger.debug("Manager.cat(): mv %s %s", f, dirname)
            shutil.move(f, dirname)
        shutil.rmtree(full_dir)
        if cleanup:
            partsdir = os.path.join(dirname, 'parts')   # default of cat
            self.logger.info("Manager.cat(): Removing cat dir %r", partsdir)
            shutil.rmtree(partsdir)

    def qsub(self, dirname, **kwargs):
        """Submit job remotely on host.

        This is the most primitive implementation: it just runs the commands ::

           cd remotedir && qsub qscript

        on :attr:`Manager._hostname`. *remotedir* is *dirname* under
        :attr:`Manager._scratchdir` and *qscript* defaults to the queuing system
        script that was produced from the template :attr:`Manager._qscript`.
        """

        remotedir = kwargs.pop('remotedir', self.remotepath(dirname))
        qscript = kwargs.pop('qscript', os.path.basename(self.qscript))
        rc = call(['ssh', self.hostname, 'cd %s && qsub %s' % (remotedir, qscript)])
        if rc == 0:
            self.logger.info("Submitted job %r on %s.", qscript, self.hostname)
        else:
            self.logger.error("Failed running job %s on %s in %r.",
                              qscript, self.hostname, remotedir)
        return rc == 0

    def get_status(self, dirname, logfilename='md*.log', silent=False):
        """Check the status of the remote job by looking into the logfile.

        Reports on the status of the job and extracts the performance in ns/d if
        available (which is saved in :attr:`Manager.performance`).

        :Arguments:
          - *dirname*
          - *logfilename* can be a shell glob pattern [md*.log]
          - *silent* = True/False; True suppresses log.info messages

        :Returns: ``True`` if the job is done, ``False`` if it is still running,
                  ``None`` if no log file was found to look at

        .. Note:: Also returns ``False`` if the connection failed.

        .. Warning:: This is an important but somewhat **fragile** method. It
                     needs to be improved to be more robust.
        """

        remotefile = os.path.join(self.wdir, dirname, logfilename)

        def loginfo(*args, **kwargs):
            if not silent:
                self.logger.info(*args, **kwargs)

        if not silent:
            self.logger.debug("Checking status of %s:%s", self.hostname, remotefile)

        # need to check if file exists to avoid infinite hangs
        sshcmd = """files=$(ls %(remotefile)s); """ \
                 """test -n "$files" && tail -n 500 $(echo $files | tr ' ' '\n' | sort | tail -n 1) """ \
                 """|| exit 255""" % vars()
        p = Popen(['ssh', self.hostname, sshcmd],
                  stdout=PIPE, stderr=PIPE, universal_newlines=True)
        out, err = p.communicate()
        rc = p.returncode

        status = {'exceeded': False, 'completed': False, 'started': False}
        performance = None
        if rc == 0:
            status['started'] = True
            for m in re.finditer(self.log_RE, out):
                if m.group('completed'):
                    status['completed'] = True
                elif m.group('exceeded'):
                    status['exceeded'] = True
                elif m.group('performance'):
                    performance = dict(zip(['Mnbf/s', 'GFlops', 'ns/day', 'hour/ns'],
                                           map(float, m.group('performance').split())))
        elif rc == 255:
            loginfo("No output file (yet) for job on %(hostname)s." % vars(self))
            if err:
                self.logger.error("remote: %r", err)
        else:
            self.logger.debug("get_status(): got return code %r, not sure what it means", rc)

        isDone = False
        if status['exceeded']:
            loginfo("Job on %(hostname)s is RUNNING but waiting for next part to run." % vars(self))
        elif status['completed']:   # and not exceeded
            isDone = True
            loginfo("Job on %(hostname)s is DONE." % vars(self))
        elif not status['started']:
            loginfo("Job on %(hostname)s is WAITING in the queue." % vars(self))
        else:
            loginfo("Job on %(hostname)s is still RUNNING." % vars(self))
            if err:
                self.logger.error("remote: %r", err)
            lines = out.split('\n').__iter__()
            values = ['NAN', 'NAN', 'NAN']   # set a stupid default in case we don't find any time step
            for line in lines:
                if re.match('\s*Step\s+Time\s+Lambda', line):
                    try:
                        values = lines.next().split()   # typically three values
                    except StopIteration:
                        pass    # if we're unlucky and Step... is the last line
            loginfo("Last time step %f ns at step %d.", float(values[1])/1000, float(values[0]))

        if performance:
            self.performance = performance['ns/day']   # used for calculating ndependent()
            loginfo("Performance: %(ns/day)g ns/day", performance)

        return isDone

    job_done = get_status
    qstat = get_status

    def ndependent(self, runtime, performance=None, walltime=None):
        """Calculate how many dependent (chained) jobs are required.

        Uses *performance* in ns/d (gathered from :meth:`get_status`) and job max
        *walltime* (in hours) from the class unless provided as keywords.

           n = ceil(runtime/(performance*0.99*walltime))

        :Keywords:
           *runtime*
               length of run in ns
           *performance*
               ns/d with the given setup
           *walltime*
               maximum run length of the script (using 99% of it), in h

        :Returns: *n* or 1 if walltime is unlimited
        """
        import math
        perf = performance or self.performance   # in ns/d
        wt = walltime or self.walltime            # max runtime of job in h (None = inf)

        if wt is None or wt == float('inf'):
            return 1

        if perf is None:
            raise ValueError("No performance data available. Run get_status()?")

        return int(math.ceil(runtime/(perf*0.99*wt/24.)))

    def waitfor(self, dirname, **kwargs):
        """Wait until the job associated with *dirname* is done.

        Super-primitive; uses a simple while ... sleep loop with a *seconds* delay.

        :Arguments:
           *dirname*
               look for log files under the remote dir corresponding to *dirname*
           *seconds*
               delay in *seconds* during re-polling
        """
        import sys
        import time
        delta_seconds = kwargs.pop('seconds', 120)
        kwargs.setdefault('silent', True)
        totseconds = 0
        while not self.job_done(dirname, **kwargs):
            sys.stderr.write("%4d min ... %s still running\r" % (totseconds/60, dirname))
            time.sleep(delta_seconds)
            totseconds += delta_seconds
        sys.stderr.write('\n')

    #------------------------------------------------------------
    # example implementations for various stages
    #------------------------------------------------------------

    def setup_posres(self, **kwargs):
        """Set up position restraints run and transfer to host.

        *kwargs* are passed to :func:`gromacs.setup.MD_restrained`
        """
        import gromacs.setup   # local import: gromacs.setup also uses this module

        dirname = 'MD_POSRES'
        struct = self.local_get('em', 'em.pdb')
        gromacs.setup.MD_restrained(dirname=dirname, struct=struct,
                                    qscript=self.qscript, qname=self.prefix+'pr',
                                    **kwargs)
        self.put(dirname)
        self.logger.info("Run %s on %s in %s/%s" % (dirname, self.hostname, self.uri, dirname))
        self.logger.info(">> qsub('%s')", dirname)
        return dirname

    def setup_MD(self, jobnumber, struct=os.path.join('MD_POSRES', 'md.pdb'), **kwargs):
        """Set up a production MD run and transfer to host.

        :Arguments:
          - *jobnumber*: 1, 2, ...
          - *struct* is the starting structure (default from the POSRES
            run but that is just a guess)
          - *kwargs* are passed to :func:`gromacs.setup.MD`
        """
        import gromacs.setup   # local import: gromacs.setup also uses this module

        kwargs.setdefault('runtime', 1e4)

        jobid_s = '%(jobnumber)03d' % vars()
        dirname = 'MD_' + jobid_s
        structure = self.local_get(os.path.dirname(struct), os.path.basename(struct))

        gromacs.setup.MD(dirname=dirname, struct=structure, qscript=self.qscript,
                         qname=self.prefix+jobid_s,
                         **kwargs)
        self.put(dirname)
        self.logger.info("Run %s on %s in %s/%s" % (dirname, self.hostname, self.uri, dirname))
        self.logger.info("Or use %s.qsub(%r)" % (self.__class__.__name__, dirname))

        return dirname