"""
:mod:`gromacs.qsub` -- utilities for batch submission systems
==============================================================

This module helps with writing submission scripts for various batch submission
(queuing) systems. The known systems are stored as
:class:`~gromacs.qsub.QueuingSystem` instances in
:data:`~gromacs.qsub.queuing_systems`; append new ones to this list.
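
A new system can be registered by appending a suitably initialized
:class:`~gromacs.qsub.QueuingSystem` to that list. A minimal sketch (the
SLURM-style values shown here are illustrative assumptions and need to be
adapted to the actual site) ::

   import gromacs.qsub
   gromacs.qsub.queuing_systems.append(
       gromacs.qsub.QueuingSystem('Slurm', 'slurm', '#SBATCH',
                                  array_variable='SLURM_ARRAY_TASK_ID',
                                  array_option='-a %d-%d'))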

The working paradigm is that template scripts are provided (see
:data:`gromacs.config.templates`) and only a few place holders are substituted
(using :func:`gromacs.cbook.edit_txt`).

*User-supplied template scripts* can be stored in
:data:`gromacs.config.qscriptdir` (by default ``~/.gromacswrapper/qscripts``)
and they will be picked up before the package-supplied ones.

The :class:`~gromacs.qsub.Manager` handles setup and control of jobs
in a queuing system on a remote system via :program:`ssh`.

At the moment, some of the functions in :mod:`gromacs.setup` use this module
but it is fairly independent and could conceivably be used for a wider range of
projects.


Queuing system templates
------------------------

The queuing system scripts are highly specific and you will need to add your
own. Templates should be shell scripts. Some parts of the templates are
modified by the :func:`~gromacs.qsub.generate_submit_scripts` function. The
"place holders" that can be replaced are shown in the table below. Typically,
the place holders are either shell variable assignments or batch submission
system commands. The table shows SGE_ commands, but PBS_ and LoadLeveler_ have
similar constructs; e.g. PBS commands start with ``#PBS`` and LoadLeveler uses
``#@`` with its own command keywords.

.. Table:: Substitutions in queuing system templates.

   ===============  ========  ============  ================  ====================================
   place holder     default   replacement   description       regex
   ===============  ========  ============  ================  ====================================
   #$ -N            GMX_MD    *jobname*     job name          /^#.*(-N|job_name)/
   #$ -l walltime=  00:20:00  *walltime*    max run time      /^#.*(-l walltime|wall_clock_limit)/
   #$ -A            BUDGET    *budget*      account           /^#.*(-A|account_no)/
   DEFFNM=          md        *deffnm*      default gmx name  /^DEFFNM=/
   WALL_HOURS=      0.33      *walltime* h  mdrun's -maxh     /^WALL_HOURS=/
   MDRUN_OPTS=      ""        *mdrun_opts*  more options      /^MDRUN_OPTS=/
   ===============  ========  ============  ================  ====================================

Lines with place holders should not have any white space at the beginning. The
regular expression pattern ("regex") is used to find the lines for the
replacement, and the literal default values ("default") are replaced. Not all
place holders have to occur in a template; for instance, if a queue has no run
time limitation then one would probably not include the *walltime* and
*WALL_HOURS* place holders.
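
For instance, the place holders can be filled by calling
:func:`~gromacs.qsub.generate_submit_scripts` directly. A minimal sketch (the
template name ``my_cluster.sge`` is a hypothetical user template in
``~/.gromacswrapper/qscripts``) ::

   import gromacs.qsub
   scripts = gromacs.qsub.generate_submit_scripts(
                 ['my_cluster.sge'], deffnm='md', jobname='protein_MD',
                 budget='MYBUDGET', walltime=12.0, mdrun_opts="-v")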

The line ``# JOB_ARRAY_PLACEHOLDER`` can be replaced by
:func:`~gromacs.qsub.generate_submit_array` to produce a "job array" (also
known as a "task array") script that runs a large number of related
simulations under the control of a single queuing system job. The individual
array tasks are run from different sub directories. Only queuing system
scripts that use the :program:`bash` shell are supported for job arrays at the
moment.
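
As an illustration, an array script covering three related simulation
directories might be generated like this (a sketch only; the template and
directory names are hypothetical) ::

   import gromacs.qsub
   gromacs.qsub.generate_submit_array(
       ['my_cluster.sge'],
       ['arrayjobs/sim1', 'arrayjobs/sim2', 'arrayjobs/sim3'],
       dirname='arrayjobs')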

A queuing system script *must* have the appropriate suffix to be properly
recognized, as shown in the table below.

.. Table:: Suffixes for queuing system templates. Pure shell scripts are only used to run locally.

   ==============================  ==========  ===========================
   Queuing system                  suffix      notes
   ==============================  ==========  ===========================
   Sun Gridengine                  .sge        Sun's `Sun Gridengine`_
   Portable Batch queuing system   .pbs        OpenPBS_ and `PBS Pro`_
   LoadLeveler                     .ll         IBM's `LoadLeveler`_
   bash script                     .bash, .sh  `Advanced bash scripting`_
   csh script                      .csh        avoid_ csh_
   ==============================  ==========  ===========================

.. _OpenPBS: http://www.mcs.anl.gov/research/projects/openpbs/
.. _PBS: OpenPBS_
.. _PBS Pro: http://www.pbsworks.com/Product.aspx?id=1
.. _Sun Gridengine: http://gridengine.sunsource.net/
.. _SGE: `Sun Gridengine`_
.. _LoadLeveler: http://www-03.ibm.com/systems/software/loadleveler/index.html
.. _Advanced bash scripting: http://tldp.org/LDP/abs/html/
.. _avoid: http://www.grymoire.com/Unix/CshTop10.txt
.. _csh: http://www.faqs.org/faqs/unix-faq/shell/csh-whynot/


Example queuing system script template for PBS
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The following script is a usable PBS_ script for a supercomputer. It contains
almost all of the replacement tokens listed in the table above (their
positions are indicated by ``++++++``; keep these values in the template
exactly as they are, otherwise they will not be found and replaced). ::

   #!/bin/bash
   # File name: ~/.gromacswrapper/qscripts/supercomputer.somewhere.fr_64core.pbs
   #PBS -N GMX_MD
   #       ++++++
   #PBS -j oe
   #PBS -l select=8:ncpus=8:mpiprocs=8
   #PBS -l walltime=00:20:00
   #                ++++++++

   # host: supercomputer.somewhere.fr
   # queuing system: PBS

   # set this to the same value as walltime; mdrun will stop cleanly
   # at 0.99 * WALL_HOURS
   WALL_HOURS=0.33
   #          ++++

   # deffnm line is possibly modified by gromacs.setup
   # (leave it as it is in the template)
   DEFFNM=md
   #      ++

   TPR=${DEFFNM}.tpr
   OUTPUT=${DEFFNM}.out
   PDB=${DEFFNM}.pdb

   MDRUN_OPTS=""
   #          ++

   # If you always want to add additional MDRUN options in this script then
   # you can either do this directly in the mdrun commandline below or by
   # constructs such as the following:
   ## MDRUN_OPTS="-npme 24 $MDRUN_OPTS"

   # JOB_ARRAY_PLACEHOLDER
   #++++++++++++++++++++++ leave the full commented line intact!

   # avoids some failures
   export MPI_GROUP_MAX=1024
   # use hard coded path for the time being
   GMXBIN="/opt/software/SGI/gromacs/4.0.3/bin"
   MPIRUN=/usr/pbs/bin/mpiexec
   APPLICATION=$GMXBIN/mdrun_mpi

   $MPIRUN $APPLICATION -stepout 1000 -deffnm ${DEFFNM} -s ${TPR} -c ${PDB} -cpi \
           $MDRUN_OPTS \
           -maxh ${WALL_HOURS} > $OUTPUT
   rc=$?

   # dependent jobs will only start if rc == 0
   exit $rc

Save the above script in ``~/.gromacswrapper/qscripts`` under the name
``supercomputer.somewhere.fr_64core.pbs``. This will make the script
immediately usable. For example, in order to set up a production MD run with
:func:`gromacs.setup.MD` for this supercomputer one would use ::

   gromacs.setup.MD(..., qscripts=['supercomputer.somewhere.fr_64core.pbs', 'local.sh'])

This will generate submission scripts based on
``supercomputer.somewhere.fr_64core.pbs`` and also the default ``local.sh``
that is provided with *GromacsWrapper*.

In order to modify ``MDRUN_OPTS`` one would use the additional *mdrun_opts*
argument, for instance ::

   gromacs.setup.MD(..., qscripts=['supercomputer.somewhere.fr_64core.pbs', 'local.sh'],
                    mdrun_opts="-v -npme 20 -dlb yes -nosum")

Currently there is no good way to specify the number of processors when
creating run scripts. You will need to provide scripts with different core
counts hard coded, or set the processor count when submitting the script with
command line options to :program:`qsub`.


Classes and functions
---------------------

.. autoclass:: QueuingSystem
   :members:
.. autofunction:: generate_submit_scripts
.. autofunction:: generate_submit_array
.. autofunction:: detect_queuing_system

.. autodata:: queuing_systems



Queuing system Manager
----------------------

The :class:`Manager` class must be customized for each system such as a
cluster or a supercomputer. It then allows submission and control of jobs
remotely (using ssh_).
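
A minimal customization might look like the following sketch (the host name,
scratch path, and script name are placeholders that have to be adapted to the
actual resource; the queuing script template is assumed to live in
``~/.gromacswrapper/qscripts``) ::

   import gromacs.qsub

   class SuperComputer(gromacs.qsub.Manager):
       _hostname = "supercomputer.somewhere.fr"
       _scratchdir = "/scratch/remote_user"
       _qscript = "supercomputer.somewhere.fr_64core.pbs"
       _walltime = 0.33        # run time limit of the queue, in hours

   remote = SuperComputer(statedir="my_project", prefix="MD")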

.. autoclass:: Manager
   :members:
   :exclude-members: job_done, qstat

   .. autoattribute:: _hostname
   .. autoattribute:: _scratchdir
   .. autoattribute:: _qscript
   .. autoattribute:: _walltime

   .. method:: job_done

      alias for :meth:`get_status`

   .. method:: qstat

      alias for :meth:`get_status`


.. _ssh: http://www.openssh.com/
.. _~/.ssh/config: http://linux.die.net/man/5/ssh_config
"""

import os, errno
import warnings
from subprocess import call, Popen, PIPE
import shutil
import re
import glob

import gromacs.config
import gromacs.cbook
from gromacs.utilities import asiterable, Timedelta
from gromacs import AutoCorrectionWarning

import logging
logger = logging.getLogger('gromacs.qsub')

try:
    from os.path import relpath
except ImportError:
    # os.path.relpath only appeared in Python 2.6; fall back to the posixpath implementation
    def relpath(path, start=os.path.curdir):
        """Return a relative version of a path (from posixpath 2.6)"""

        if not path:
            raise ValueError("no path specified")

        start_list = os.path.abspath(start).split(os.path.sep)
        path_list = os.path.abspath(path).split(os.path.sep)

        # work out how much of the filepath is shared by start and path
        i = len(os.path.commonprefix([start_list, path_list]))

        rel_list = [os.path.pardir] * (len(start_list) - i) + path_list[i:]
        if not rel_list:
            return os.path.curdir
        return os.path.join(*rel_list)


class QueuingSystem(object):
    """Class that represents minimum information about a batch submission system."""

    def __init__(self, name, suffix, qsub_prefix, array_variable=None, array_option=None):
        """Define a queuing system's functionality

        :Arguments:
          *name*
             name of the queuing system, e.g. 'Sun Gridengine'
          *suffix*
             suffix of input files, e.g. 'sge'
          *qsub_prefix*
             prefix string that starts a qsub flag in a script, e.g. '#$'

        :Keywords:
          *array_variable*
             environment variable exported for array jobs, e.g. 'SGE_TASK_ID'
          *array_option*
             qsub option format string to launch an array (e.g. '-t %d-%d')

        """
        self.name = name
        self.suffix = suffix
        self.qsub_prefix = qsub_prefix
        self.array_variable = array_variable
        self.array_option = array_option

    def flag(self, *args):
        """Return string for qsub flag *args* prefixed with appropriate inscript prefix."""
        return " ".join((self.qsub_prefix,) + args)

    def has_arrays(self):
        """True if known how to do job arrays."""
        return self.array_variable is not None

    def array_flag(self, directories):
        """Return string to embed the array launching option in the script."""
        return self.flag(self.array_option % (1, len(directories)))

    def array(self, directories):
        """Return multiline string for simple array jobs over *directories*.

        .. Warning:: The string is in ``bash`` and hence the template must also
                     be ``bash`` (and *not* ``csh`` or ``sh``).
        """
        if not self.has_arrays():
            raise NotImplementedError('Not known how to make array jobs for '
                                      'queuing system %(name)s' % vars(self))
        hrule = '#' + 60 * '-'
        lines = [
            '',
            hrule,
            '# job array:',
            self.array_flag(directories),
            hrule,
            '# directories for job tasks',
            'declare -a jobdirs']
        for i, dirname in enumerate(asiterable(directories)):
            idx = i + 1          # job array indices are 1-based
            lines.append('jobdirs[%(idx)d]=%(dirname)r' % vars())
        lines.extend([
            '# Switch to the current tasks directory:',
            'wdir="${jobdirs[${%(array_variable)s}]}"' % vars(self),
            'cd "$wdir" || { echo "ERROR: failed to enter $wdir."; exit 1; }',
            hrule,
            ''
        ])
        return "\n".join(lines)

    def isMine(self, scriptname):
        """Primitive queuing system detection; only looks at suffix at the moment."""
        suffix = os.path.splitext(scriptname)[1].lower()
        if suffix.startswith('.'):
            suffix = suffix[1:]
        return self.suffix == suffix

    def __repr__(self):
        return "<" + self.name + " QueuingSystem instance>"


#: List of known :class:`QueuingSystem` instances; append new ones here.
queuing_systems = [
    QueuingSystem('Sun Gridengine', 'sge', '#$', array_variable='SGE_TASK_ID', array_option='-t %d-%d'),
    QueuingSystem('PBS', 'pbs', '#PBS', array_variable='PBS_ARRAY_INDEX', array_option='-J %d-%d'),
    QueuingSystem('LoadLeveler', 'll', '#@'),
]


def detect_queuing_system(scriptfile):
    """Return the queuing system for which *scriptfile* was written."""
    for qs in queuing_systems:
        if qs.isMine(scriptfile):
            return qs
    return None


def generate_submit_scripts(templates, prefix=None, deffnm='md', jobname='MD', budget=None,
                            mdrun_opts=None, walltime=1.0, jobarray_string=None,
                            **kwargs):
    """Write scripts for queuing systems.

    This sets up queuing system run scripts with a simple search and replace in
    templates. See :func:`gromacs.cbook.edit_txt` for details. Shell scripts
    are made executable.

    :Arguments:
      *templates*
          Template file or list of template files. The "files" can also be names
          or symbolic names for templates in the templates directory. See
          :mod:`gromacs.config` for details and rules for writing templates.
      *prefix*
          Prefix for the final run script filename; by default the filename will be
          the same as the template. [None]
      *dirname*
          Directory in which to place the submit scripts. [.]
      *deffnm*
          Default filename prefix for :program:`mdrun` ``-deffnm`` [md]
      *jobname*
          Name of the job in the queuing system. [MD]
      *budget*
          Which budget to book the runtime on [None]
      *mdrun_opts*
          String of additional options for :program:`mdrun`.
      *walltime*
          Maximum runtime of the job in hours. [1]
      *jobarray_string*
          Multi-line string that is spliced in for job array functionality
          (see :func:`gromacs.qsub.generate_submit_array`; do not use manually)
      *kwargs*
          all other kwargs are ignored

    :Returns: list of generated run scripts
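
    Example (a minimal sketch; ``my_cluster.sge`` is a hypothetical user
    template in the qscripts directory) ::

       scripts = gromacs.qsub.generate_submit_scripts(
                     ['my_cluster.sge', 'local.sh'],
                     deffnm='md', jobname='protein_MD', walltime=24.0,
                     mdrun_opts="-v")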
    """
    if not jobname[0].isalpha():
        jobname = 'MD_' + jobname
        wmsg = "To make the jobname legal it must start with a letter: changed to %r" % jobname
        logger.warn(wmsg)
        warnings.warn(wmsg, category=AutoCorrectionWarning)
    if prefix is None:
        prefix = ""
    if mdrun_opts is not None:
        mdrun_opts = '"' + str(mdrun_opts) + '"'    # quote so that the options survive as one shell word

    dirname = kwargs.pop('dirname', os.path.curdir)

    wt = Timedelta(hours=walltime)
    walltime = wt.strftime("%h:%M:%S")
    wall_hours = wt.ashours

    def write_script(template):
        submitscript = os.path.join(dirname, prefix + os.path.basename(template))
        logger.info("Setting up queuing system script %(submitscript)r..." % vars())
        # these substitution rules correspond to the place holder table in the module docstring
        gromacs.cbook.edit_txt(template,
                               [('^ *DEFFNM=', 'md', deffnm),
                                ('^#.*(-N|job_name)', 'GMX_MD', jobname),
                                ('^#.*(-A|account_no)', 'BUDGET', budget),
                                ('^#.*(-l walltime|wall_clock_limit)', '00:20:00', walltime),
                                ('^ *WALL_HOURS=', '0\.33', wall_hours),
                                ('^ *MDRUN_OPTS=', '""', mdrun_opts),
                                ('^# JOB_ARRAY_PLACEHOLDER', '^.*$', jobarray_string),
                                ],
                               newname=submitscript)
        ext = os.path.splitext(submitscript)[1]
        if ext in ('.sh', '.csh', '.bash'):
            os.chmod(submitscript, 0755)
        return submitscript

    return [write_script(template) for template in gromacs.config.get_templates(templates)]


def generate_submit_array(templates, directories, **kwargs):
    """Generate an array job.

    For each ``work_dir`` in *directories*, the array job will

    1. cd into ``work_dir``
    2. run the job as detailed in the template

    It will use all the queuing system directives found in the
    template. If more complicated setups are required, then this
    function cannot be used.

    :Arguments:
       *templates*
          Basic template for a single job; the job array logic is spliced into
          the position of the line ::

             # JOB_ARRAY_PLACEHOLDER

          The appropriate commands for common queuing systems (Sun Gridengine, PBS)
          are hard coded here. The queuing system is detected from the suffix of
          the template.
       *directories*
          List of directories under *dirname*. One task is set up for each
          directory.
       *dirname*
          The array script will be placed in this directory. The *directories*
          **must** be located under *dirname*.
       *kwargs*
          See :func:`gromacs.qsub.generate_submit_scripts` for details.
    """
    dirname = kwargs.setdefault('dirname', os.path.curdir)
    reldirs = [relpath(p, start=dirname) for p in asiterable(directories)]
    missing = [p for p in (os.path.join(dirname, subdir) for subdir in reldirs)
               if not os.path.exists(p)]
    if len(missing) > 0:
        logger.error("Some directories are not accessible from the array script: "
                     "%(missing)r" % vars())

    def write_script(template):
        qsystem = detect_queuing_system(template)
        if qsystem is None or not qsystem.has_arrays():
            logger.warning("Not known how to make a job array for %(template)r; skipping..." % vars())
            return None
        kwargs['jobarray_string'] = qsystem.array(reldirs)
        return generate_submit_scripts(template, **kwargs)[0]

    return [write_script(template) for template in gromacs.config.get_templates(templates)]


class Manager(object):
    """Base class to launch simulations remotely on computers with queuing systems.

    Basically, ssh into the machine and run the job.

    Derive a class from :class:`Manager` and override the attributes

     - :attr:`Manager._hostname` (hostname of the machine)
     - :attr:`Manager._scratchdir` (all files and directories will be created under
       this scratch directory; it must be a path on the remote host)
     - :attr:`Manager._qscript` (the default queuing system script template)
     - :attr:`Manager._walltime` (if there is a limit to the run time
       of a job; in hours)

    and implement a specialized :meth:`Manager.qsub` method if needed.

    ssh_ must be set up (via `~/.ssh/config`_) to allow access via a
    commandline such as ::

        ssh <hostname> <command> ...

    Typically you want something such as ::

        host <hostname>
             hostname <hostname>.fqdn.org
             user <remote_user>

    in ``~/.ssh/config`` and also set up public-key authentication in
    order to avoid typing your password all the time.
    """

    #: hostname of the remote machine (must be set in a derived class)
    _hostname = None

    #: scratch directory on the remote machine under which all files and
    #: directories are created (must be set in a derived class)
    _scratchdir = None

    #: default queuing system script template for this host
    #: (must be set in a derived class)
    _qscript = None

    #: maximum run time of a job in hours (or ``None`` if unlimited)
    _walltime = None

    #: regular expression that :meth:`get_status` uses to parse the mdrun log file
    log_RE = re.compile(r"""
                Run\stime\sexceeded\s+(?P<exceeded>.*)\s+hours,\swill\sterminate\sthe\srun
                                                       # another part to come
                | Performance:\s*(?P<performance>[\s\d.]+)\n            # performance line (split later)
                | (?P<completed>Finished\smdrun\son\snode)              # this (part of a) run completed
                """, re.VERBOSE)


    def __init__(self, **kwargs):
        """Set up the manager.

        :Arguments:
          *statedir*
              directory component under the remote scratch dir (should
              be different for different jobs) [basename(CWD)]
          *prefix*
              identifier for job names [MD]
        """
        self.logger = logging.getLogger('gromacs.qsub.Manager')

        # check that the derived class was customized properly
        self.hostname = self._assertnotempty(self._hostname, '_hostname')
        self.scratchdir = self._assertnotempty(self._scratchdir, '_scratchdir')
        self.qscript = self._assertnotempty(self._qscript, '_qscript')
        self.walltime = self._walltime
        self.performance = None          # ns/day, set by get_status()

        statedir = kwargs.pop('statedir', os.path.basename(os.path.realpath(os.path.curdir)))
        self.wdir = os.path.join(self.scratchdir, statedir)
        self.prefix = kwargs.pop('prefix', 'MD')
        self.uri = self.hostname.strip() + ":" + self.wdir

        self.logger.info("Setting up a manager from %r.", statedir)

        # test the connection and create the remote working directory
        rc = call(['ssh', self.hostname, 'mkdir', '-p', self.wdir])
        if rc == 0:
            self.logger.info("All good: can access %(uri)s" % vars(self))
        else:
            self.logger.error("Problem with ssh and path %(uri)s" % vars(self))

        super(Manager, self).__init__(**kwargs)

    def _assertnotempty(self, value, name):
        """Simple sanity check."""
        if value is None or value == '':
            raise AssertionError("Class %s must have class variable %s set"
                                 % (self.__class__.__name__, name))
        return value

    def remotepath(self, *args):
        """Directory on the remote machine."""
        return os.path.join(self.wdir, *args)

    get_dir = remotepath

    def remoteuri(self, *args):
        """URI of the directory on the remote machine."""
        return os.path.join(self.uri, *args)

    def put(self, dirname):
        """scp dirname to host.

        :Arguments: dirname to be transferred
        :Returns: return code from scp
        """
        self.logger.info("Copying %r to %r" % (dirname, self.uri))
        return call(["scp", "-r", dirname, self.uri])

    def putfile(self, filename, dirname):
        """scp *filename* to host in *dirname*.

        :Arguments: filename and dirname to be transferred to
        :Returns: return code from scp
        """
        destdir = self.remoteuri(dirname)
        self.logger.info("Copying %(filename)r to %(destdir)r" % vars())
        return call(["scp", filename, destdir])

    def get(self, dirname, checkfile=None, targetdir=os.path.curdir):
        """``scp -r`` *dirname* from host into *targetdir*

        :Arguments:
           - *dirname*: dir to download
           - *checkfile*: raise OSError/ENOENT if *targetdir/dirname/checkfile* was not found
           - *targetdir*: put *dirname* into this directory

        :Returns: return code from scp
        """
        self.logger.info("Copying %r from %r" % (dirname, self.uri))
        rc = call(["scp", "-r", self.remoteuri(dirname), targetdir])

        if checkfile is not None:
            if not os.path.exists(os.path.join(targetdir, dirname, checkfile)):
                self.logger.error("Failed to get %r from %s", checkfile, self.hostname)
                raise OSError(errno.ENOENT, checkfile,
                              "Failed to download file from %(hostname)r" % vars(self))
        return rc

    def local_get(self, dirname, checkfile, cattrajectories=True, cleanup=False):
        """Find *checkfile* locally if possible.

        If *checkfile* is not found in *dirname* then it is transferred from the
        remote host.

        If needed, the trajectories are concatenated using :meth:`Manager.cat`.

        :Returns: local path of *checkfile*
        """
        checkpath = os.path.join(dirname, checkfile)
        if not os.path.exists(checkpath):
            self.get(dirname)
            if cattrajectories and not os.path.exists(checkpath):
                # try concatenating the parts first (guess the prefix from checkfile)
                prefix = os.path.splitext(os.path.basename(checkfile))[0]
                self.cat(dirname, prefix=prefix, cleanup=cleanup)
            if not os.path.exists(checkpath):
                self.logger.error("Failed to get %r from %s", checkfile, self.hostname)
                raise OSError(errno.ENOENT, checkfile,
                              "Failed to download file from %(hostname)r" % vars(self))
        return checkpath

    def cat(self, dirname, prefix='md', cleanup=True):
        """Concatenate parts of a run in *dirname*.

        Always uses :func:`gromacs.cbook.cat` with *resolve_multi* = 'guess'.

        .. Note:: The default is to immediately delete the original files
                  (*cleanup* = ``True``).

        :Keywords:
           *dirname*
              directory to work in
           *prefix*
              prefix (deffnm) of the files [md]
           *cleanup* : boolean
              if ``True``, remove all used files [``True``]
        """
        gromacs.cbook.cat(prefix, dirname=dirname, resolve_multi='guess')
        # move the concatenated files from the 'full' subdirectory back into dirname
        full_dir = os.path.join(dirname, 'full')
        complete_files = os.path.join(full_dir, '*.*')
        self.logger.info("Manager.cat(): recovering concatenated files from %r", full_dir)
        for f in glob.glob(complete_files):
            self.logger.debug("Manager.cat(): mv %s %s", f, dirname)
            shutil.move(f, dirname)
        shutil.rmtree(full_dir)
        if cleanup:
            partsdir = os.path.join(dirname, 'parts')
            self.logger.info("Manager.cat(): Removing cat dir %r", partsdir)
            shutil.rmtree(partsdir)

    def qsub(self, dirname, **kwargs):
        """Submit job remotely on host.

        This is the most primitive implementation: it just runs the commands ::

           cd remotedir && qsub qscript

        on :attr:`Manager._hostname`. *remotedir* is *dirname* under
        :attr:`Manager._scratchdir` and *qscript* defaults to the queuing system
        script that was produced from the template :attr:`Manager._qscript`.
        """

        remotedir = kwargs.pop('remotedir', self.remotepath(dirname))
        qscript = kwargs.pop('qscript', os.path.basename(self.qscript))

        rc = call(['ssh', self.hostname, 'cd %s && qsub %s' % (remotedir, qscript)])
        if rc == 0:
            self.logger.info("Submitted job %r on %s.", qscript, self.hostname)
        else:
            self.logger.error("Failed running job %s on %s in %r.",
                              qscript, self.hostname, remotedir)
        return rc == 0

    def get_status(self, dirname, logfilename='md*.log', silent=False):
        """Check status of remote job by looking into the logfile.

        Report on the status of the job and extract the performance in ns/d if
        available (which is saved in :attr:`Manager.performance`).

        :Arguments:
          - *dirname*
          - *logfilename* can be a shell glob pattern [md*.log]
          - *silent* = True/False; True suppresses log.info messages

        :Returns: ``True`` if the job is done, ``False`` if it is still running;
                  ``None`` if no log file was found to look at

        .. Note:: Also returns ``False`` if the connection failed.

        .. Warning:: This is an important but somewhat **fragile** method. It
                     needs to be improved to be more robust.
        """

        remotefile = os.path.join(self.wdir, dirname, logfilename)

        def loginfo(*args, **kwargs):
            if not silent:
                self.logger.info(*args, **kwargs)
        if not silent:
            self.logger.debug("Checking status of %s:%s", self.hostname, remotefile)

        # grab the tail of the newest log file matching the glob on the remote host
        sshcmd = """files=$(ls %(remotefile)s); """ \
                 """test -n "$files" && tail -n 500 $(echo $files | tr ' ' '\n' | sort | tail -n 1) """ \
                 """|| exit 255""" % vars()
        p = Popen(['ssh', self.hostname, sshcmd],
                  stdout=PIPE, stderr=PIPE, universal_newlines=True)
        out, err = p.communicate()
        rc = p.returncode

        status = {'exceeded': False, 'completed': False, 'started': False}
        performance = None
        if rc == 0:
            status['started'] = True
            for m in re.finditer(self.log_RE, out):
                if m.group('completed'):
                    status['completed'] = True
                elif m.group('exceeded'):
                    status['exceeded'] = True
                elif m.group('performance'):
                    performance = dict(zip(['Mnbf/s', 'GFlops', 'ns/day', 'hour/ns'],
                                           map(float, m.group('performance').split())))
        elif rc == 255:
            loginfo("No output file (yet) for job on %(hostname)s." % vars(self))
            if err:
                self.logger.error("remote: %r", err)
        else:
            self.logger.debug("get_status(): got return code %r, not sure what it means", rc)

        isDone = False
        if status['exceeded']:
            loginfo("Job on %(hostname)s is RUNNING but waiting for next part to run." % vars(self))
        elif status['completed']:
            isDone = True
            loginfo("Job on %(hostname)s is DONE." % vars(self))
        elif not status['started']:
            loginfo("Job on %(hostname)s is WAITING in the queue." % vars(self))
        else:
            loginfo("Job on %(hostname)s is still RUNNING." % vars(self))
            if err:
                self.logger.error("remote: %r", err)
            lines = out.split('\n').__iter__()
            values = ['NAN', 'NAN', 'NAN']   # fallback if no Step/Time/Lambda block is found
            for line in lines:
                if re.match('\s*Step\s+Time\s+Lambda', line):
                    try:
                        values = lines.next().split()
                    except StopIteration:
                        pass
            loginfo("Last time step %f ns at step %d.", float(values[1]) / 1000, float(values[0]))

        if performance:
            self.performance = performance['ns/day']   # used by ndependent()
            loginfo("Performance: %(ns/day)g ns/day", performance)

        return isDone

    job_done = get_status
    qstat = get_status

    def ndependent(self, runtime, performance=None, walltime=None):
        """Calculate how many dependent (chained) jobs are required.

        Uses *performance* in ns/d (gathered from :meth:`get_status`) and job max
        *walltime* (in hours) from the class unless provided as keywords.

           n = ceil(runtime / (performance * 0.99 * walltime/24))

        :Keywords:
           *runtime*
               length of run in ns
           *performance*
               ns/d with the given setup
           *walltime*
               maximum run length of the script (using 99% of it), in h

        :Returns: *n* or 1 if walltime is unlimited
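
        For example, a *runtime* of 100 ns at a *performance* of 10 ns/d with a
        24 h *walltime* needs n = ceil(100/(10 * 0.99 * 24/24)) = ceil(10.1) = 11
        chained jobs.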
        """
        import math
        perf = performance or self.performance     # in ns/d
        wt = walltime or self.walltime              # in hours

        if wt is None or wt == float('inf'):
            return 1

        if perf is None:
            raise ValueError("No performance data available. Run get_status()?")

        return int(math.ceil(runtime / (perf * 0.99 * wt / 24.)))

    def waitfor(self, dirname, **kwargs):
        """Wait until the job associated with *dirname* is done.

        Super-primitive: polls in a simple ``while ... sleep`` loop with a
        delay of *seconds* between checks.

        :Arguments:
          *dirname*
              look for log files under the remote dir corresponding to *dirname*
          *seconds*
              delay in *seconds* during re-polling
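
        Example (a sketch; ``remote`` is assumed to be an instance of a
        customized :class:`Manager` subclass) ::

           remote.waitfor('MD_001', seconds=300)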
        """
        import sys
        import time
        delta_seconds = kwargs.pop('seconds', 120)
        kwargs.setdefault('silent', True)
        totseconds = 0
        while not self.job_done(dirname, **kwargs):
            sys.stderr.write("%4d min ... %s still running\r" % (totseconds / 60, dirname))
            time.sleep(delta_seconds)
            totseconds += delta_seconds
        sys.stderr.write('\n')

    def setup_posres(self, **kwargs):
        """Set up position restraints run and transfer to host.

        *kwargs* are passed to :func:`gromacs.setup.MD_restrained`

        """
        # imported here because gromacs.setup itself makes use of this module
        import gromacs.setup

        dirname = 'MD_POSRES'
        struct = self.local_get('em', 'em.pdb')
        gromacs.setup.MD_restrained(dirname=dirname, struct=struct,
                                    qscript=self.qscript, qname=self.prefix + 'pr',
                                    **kwargs)
        self.put(dirname)
        self.logger.info("Run %s on %s in %s/%s" % (dirname, self.hostname, self.uri, dirname))
        self.logger.info(">> qsub('%s')", dirname)
        return dirname

    def setup_MD(self, jobnumber, struct=os.path.join('MD_POSRES', 'md.pdb'), **kwargs):
        """Set up production MD run and transfer to host.

        :Arguments:
          - *jobnumber*: 1,2 ...
          - *struct* is the starting structure (default from POSRES
            run but that is just a guess);
          - kwargs are passed to :func:`gromacs.setup.MD`
        """
        # imported here because gromacs.setup itself makes use of this module
        import gromacs.setup

        kwargs.setdefault('runtime', 1e4)

        jobid_s = '%(jobnumber)03d' % vars()
        dirname = 'MD_' + jobid_s
        structure = self.local_get(os.path.dirname(struct), os.path.basename(struct))

        gromacs.setup.MD(dirname=dirname, struct=structure, qscript=self.qscript,
                         qname=self.prefix + jobid_s,
                         **kwargs)
        self.put(dirname)
        self.logger.info("Run %s on %s in %s/%s" % (dirname, self.hostname, self.uri, dirname))
        self.logger.info("Or use %s.qsub(%r)" % (self.__class__.__name__, dirname))

        return dirname