1
2
3 """
4 Interface for communicating with and managing modeling through VIC3.
5
6 Author: R. Lombaert
7
8 """
9
10 import os
11 from scipy import log10
12 import subprocess
13 from time import gmtime, sleep
14
15 import cc.path
16 from cc.tools.io import DataIO
17 from cc.modeling.codes import Gastronoom
18
19
20
22
23 """
24 Creating a vic manager which communicates with the Vic3 supercomputer and
25 updates modeling results on the home disk.
26
27 """
28
def __init__(self,account,path='code23-01-2010',credits_acc=None,\
             time_per_sphinx=30,recover_sphinxfiles=0):

    """
    Initializing a Vic instance.

    Sets up the bookkeeping dictionaries for modeling sessions and creates
    the output, dust_files, CustomAbundances and StarFiles folders under
    /data/leuven/<disk>/<account>/COCode on VIC3 through ssh.

    @param account: the name of the user account at VIC, e.g. 'vsc30226'.
                    Characters 3 to 5 of the account name are used as the
                    disk number (e.g. '302' for 'vsc30226').
    @type account: string

    @keyword path: The output folder in the GASTRoNOoM home folder

                   (default: 'code23-01-2010')
    @type path: string
    @keyword credits_acc: the name of the credits account to be charged. If
                          None (or any other false value, such as ''), the
                          standard user account is charged.

                          (default: None)
    @type credits_acc: string
    @keyword time_per_sphinx: the expected calculation time for one sphinx
                              model in minutes

                              (default: 30)
    @type time_per_sphinx: int
    @keyword recover_sphinxfiles: Try to recover sphinx files from the Vic
                                  disk in case they were correctly
                                  calculated, but not saved to the database
                                  for one reason or another. When set, no
                                  new jobs are created or started when a
                                  model is queued.

                                  (default: 0)
    @type recover_sphinxfiles: bool

    """

    self.finished = dict()
    self.failed = dict()
    self.models = dict()
    self.command_lists = dict()
    self.transitions = dict()
    self.sphinx_model_ids = dict()
    self.inputfiles = dict()
    self.trans_in_progress = []
    self.path = path
    self.account = account
    self.disk = account[3:6]
    # Strip trailing slashes from the home folder *before* taking its last
    # component. The previous expression applied rstrip() to the literal
    # '/' only (operator precedence), so a home folder ending in '/' gave
    # an empty user name.
    self.uname = os.path.basename(os.path.expanduser('~').rstrip('/'))
    self.time_per_sphinx = float(time_per_sphinx)
    self.recover_sphinxfiles = recover_sphinxfiles
    self.current_model = 0
    # Any false value is normalised to None, meaning "charge the standard
    # user account".
    self.credits_acc = credits_acc or None

    main_vic_folder = os.path.join('/data','leuven',self.disk,\
                                   self.account,'COCode')
    # Plain mkdir (no -p): the return code is ignored, so an already
    # existing folder is not treated as an error here.
    for subfolder in ['output','dust_files','CustomAbundances','StarFiles']:
        subprocess.call('ssh %s@login.vic3.cc.kuleuven.be mkdir %s/%s/'\
                        %(self.account,main_vic_folder,subfolder),\
                        shell=True)
97
98
def setSphinxDb(self,sph_db):

    '''
    Attach a sphinx database to this Vic instance.

    The database is kept as the sph_db attribute, from where it is used
    when the sphinx output retrieved from VIC3 is checked and saved.

    @param sph_db: The sphinx database
    @type sph_db: Database()

    '''

    database = sph_db
    self.sph_db = database
110
111
112
def updateTelescopeSpec(self):

    '''
    Update telescope.spec files on VIC3.

    Every file matching *spec in the local cc.path.gdata folder is copied
    with scp to the COCode/data folder of this account on VIC3. The glob
    pattern is expanded by the local shell (shell=True).

    '''

    local_specs = os.path.join(cc.path.gdata,'*spec')
    remote_data = os.path.join('/user','leuven',self.disk,self.account,\
                               'COCode','data','.')
    scp_command = 'scp %s %s@login.vic3.cc.kuleuven.be:%s'\
                  %(local_specs,self.account,remote_data)
    subprocess.call([scp_command],shell=True)
125
126
127
def addModel(self,model_id,command_list):

    '''
    Add model to the list of to be processed models.

    Here, dictionaries are initiated to contain information about the
    modeling session.

    Every entry in the dictionaries has an index number (self.current_model)
    associated with it to uniquely identify a modeling session across the
    vic manager.

    Dictionaries are kept for model ids, parameter sets, transitions,
    failed calculations and finished calculations, and vic inputfiles for
    every transition.

    The current_model index stays the same between a call to addModel()
    and the following call to queueModel() or reset(); anything in between
    uses the same index. queueModel() moves on to the next index value,
    while reset() removes the model and transition entries of the current
    index so that it can be used again.

    @param model_id: The cooling model_id
    @type model_id: string
    @param command_list: The parameters for this GASTRoNOoM model
    @type command_list: dict()

    @raise RuntimeError: a model id was already assigned to the current
                         index, i.e. the previous model was neither queued
                         nor reset.

    '''

    if self.current_model in self.models:
        # `Error` (used previously) is not a Python builtin and is not
        # imported at the top of this module, so the intended message was
        # lost in a NameError.
        raise RuntimeError('Vic().addModel() is trying to add a model_id '+\
                           'to a session that was already assigned an id. '+\
                           'Reset or queue the previous model first.')
    self.models[self.current_model] = model_id
    self.command_lists[self.current_model] = command_list
    self.transitions[self.current_model] = []
    self.failed[self.current_model] = []
    self.finished[self.current_model] = []
    self.inputfiles[self.current_model] = []
165
166
167
def addTransInProgress(self,trans):

    '''
    Remember a transition that is already being calculated on Vic3.

    Such transitions were requested, but are already present in the sphinx
    database with an "IN_PROGRESS" keyword in their transition dictionary.
    They are not calculated again. Instead, they are collected in
    self.trans_in_progress and checked at the end of the full modeling run
    to see whether they have been calculated correctly.

    @param trans: The transition calculating on Vic3
    @type trans: Transition()

    '''

    self.trans_in_progress.extend([trans])
186
187
188
def addTransition(self,trans):

    '''
    Register a transition to be calculated on Vic3 for the current model.

    The transition is appended to the transition list stored under the
    current session index (self.current_model). Input files are prepared
    for these transitions. The whole entry is removed from
    self.transitions once the model has been completely finished.

    @param trans: The transition to be calculated on Vic3
    @type trans: Transition()

    '''

    current_list = self.transitions[self.current_model]
    current_list.append(trans)
206
207
208
def queueModel(self):

    '''
    Queue the current model on VIC3.

    The unique sphinx model ids of the transitions added for the current
    model are stored first. Unless sphinx files are being recovered
    (self.recover_sphinxfiles):
        - the job files are made, which copies the mline and cooling
          output to VIC3;
        - the necessary input files are created;
        - the run-jobs shell script is started on VIC3 through ssh without
          waiting for it to finish;
        - a summary of the started jobs is printed.

    Once everything has been started up, the current model index number is
    increased by one to allow for a new model to be added. You cannot add a
    model and then add another model unless you reset or queue the previous
    model.

    '''

    current = self.current_model
    self.sphinx_model_ids[current] \
        = list(set([trans.getModelId()
                    for trans in self.transitions[current]]))
    printing = []
    if not self.recover_sphinxfiles:
        printing = self.makeJobFile()
        self.makeInputFiles()
        model_id = self.models[current]
        jobfile = os.path.join('/user','leuven',self.disk,self.account,\
                               'COCode','vic_run_jobs_%s_%i.sh'\
                               %(model_id,current))
        # Start the run script without waiting for it. Its output was
        # never read; sending it to os.devnull instead of an unread PIPE
        # avoids a stall once the pipe buffer fills up.
        with open(os.devnull,'w') as devnull:
            subprocess.Popen('ssh %s@login.vic3.cc.kuleuven.be %s'\
                             %(self.account,jobfile),shell=True,\
                             stdout=devnull,stderr=subprocess.STDOUT)
        if printing:
            print('\n'.join(printing))
    self.current_model += 1
238
239
240
def reset(self):

    '''
    Drop the model registered under the current session index.

    Use this when a model has been added but none of its transitions need
    to be calculated. Only the self.models and self.transitions entries of
    the current index are removed; a KeyError is raised if either one is
    missing. The other dictionaries are re-initiated anyway when a model is
    added again, and the index itself is left unchanged.

    '''

    index = self.current_model
    for registry in (self.models,self.transitions):
        del registry[index]
254
255
256
def makeJobFile(self):

    '''
    Make the job file that will run the loop on VIC3 and copy the cooling
    and mline output to VIC3.

    For every sphinx model id of the current model:
        - the number of models per job, the number of job submissions and
          the walltime are derived from the number of its transitions;
        - vic_job_example.sh is adapted into vic_job_<sphinx id>.sh, which
          is written to the local vic_input folder and copied to VIC3;
        - the output folder for that sphinx id is created on VIC3, and the
          cooling/mline output needed by its molecules is copied there.
    Finally, vic_run_jobs_example.sh is adapted into a single
    vic_run_jobs_<model_id>_<index>.sh script that submits all job files.
    That script is written locally and copied to VIC3.

    @return: to be printed strings once all the copying is done, which
             shows how many transitions are being calculated for which
             sphinx model id
    @rtype: list[string]

    '''

    model_id = self.models[self.current_model]
    vic_server = '%s@login.vic3.cc.kuleuven.be'%self.account

    def job_layout(n_trans):
        # Models per job: floor(log10(n))**2, at least 1. floor(log10(n))
        # is computed exactly as (number of digits - 1) for n >= 1.
        models_in_job = (len(str(n_trans))-1)**2 or 1
        # Number of batches is ceil(n/models_in_job). Floor division (//)
        # gives the same integers under Python 2 and Python 3.
        n_batches = (n_trans+models_in_job-1)//models_in_job
        job_number = n_batches//8+1
        time_per_job = self.time_per_sphinx*models_in_job
        walltimestring = '%.2i:00:00'%(int(time_per_job/60)+1)
        return models_in_job,job_number,walltimestring

    def adapt_job_line(line,model_id_sphinx,models_in_job,walltimestring):
        # Rewrite one line of vic_job_example.sh for this sphinx model id.
        key = line.split('=')[0]
        if key.find('#PBS -l walltime') != -1:
            return '='.join([key,walltimestring])
        if key.find('export COCODEHOME') != -1:
            return line.replace('vsc30226',self.account)\
                       .replace('/302/','/%s/'%self.disk)
        if key.find('export COCODEDATA') != -1:
            data_folder = os.path.join(line.split('=')[1],model_id+'_'+\
                                       str(self.current_model)+'/')
            return '='.join([key,data_folder])\
                      .replace('vsc30226',self.account)\
                      .replace('/302/','/%s/'%self.disk)
        if line.find('for i in $(seq 1 1)') != -1:
            return 'for i in $(seq 1 %i)'%models_in_job
        if key.find('export MODELNUMBER') != -1:
            return '='.join([key,line.split('=')[1]\
                             .replace('model_id',model_id_sphinx)])
        return line

    def adapt_run_line(line,first,job):
        # Rewrite one line of vic_run_jobs_example.sh for one job file.
        # The header lines are kept only for the first job. Any other
        # unmatched line becomes an empty line.
        if first and line.find('#!/bin/bash -l') != -1:
            return line
        if first and line.find('cd COCode') != -1:
            return 'cd /user/leuven/'+self.disk+'/'+ self.account+'/COCode/'
        if first and line.find('module load worker') != -1:
            return line
        if line.find('for i in $(seq 1 1) ;') != -1:
            return 'for i in $(seq 1 %i) ;' %(job[1])
        if line.find('do wsub -A dummy -t 1-8 -batch') != -1:
            new_line = line.replace('vic_job_example.sh',\
                                    os.path.split(job[0])[1])
            if self.credits_acc is not None:
                return new_line.replace('-A dummy','-A %s'%self.credits_acc)
            return new_line.replace('-A dummy ','')
        if line.find('done') != -1:
            return line
        return ''

    jobfiles = []
    printing = []
    for model_id_sphinx in self.sphinx_model_ids[self.current_model]:
        these_trans = [trans
                       for trans in self.transitions[self.current_model]
                       if trans.getModelId() == model_id_sphinx]
        models_in_job,job_number,walltimestring \
            = job_layout(len(these_trans))

        #-- Adapt the template job script and copy it to VIC3
        jobfile = DataIO.readFile(os.path.join(cc.path.gastronoom,\
                                               'vic_job_example.sh'))
        new_jobfile = [adapt_job_line(line,model_id_sphinx,models_in_job,\
                                      walltimestring)
                       for line in jobfile]
        local_folder = os.path.join(cc.path.gastronoom,\
                                    self.path,'models',model_id_sphinx)
        jobfilename_vic = '/user/leuven/'+self.disk+'/' + self.account + \
                          '/COCode/vic_job_' + model_id_sphinx + '.sh'
        jobfilename_local = os.path.join(local_folder,'vic_input',\
                                         'vic_job_%s.sh'%model_id_sphinx)
        DataIO.writeFile(jobfilename_local,new_jobfile)
        subprocess.call(['chmod +x %s'%jobfilename_local],shell=True)
        subprocess.call(['scp %s %s:%s'%(jobfilename_local,vic_server,\
                                         jobfilename_vic)],shell=True)
        jobfiles.append((jobfilename_vic,job_number))

        #-- Create the VIC3 output folder and copy the cooling and mline
        #   output needed by the molecules of these transitions
        vic_folder = '/data/leuven/%s/%s/COCode/output/%s/'\
                     %(self.disk,self.account,model_id_sphinx)
        subprocess.call('ssh %s mkdir %s'%(vic_server,vic_folder),\
                        shell=True)
        these_molecules = set(['sampling'] + \
                              [trans.molecule.molecule
                               for trans in these_trans])
        to_be_copied = ['coolfgr*','input%s.dat'%model_id_sphinx]
        to_be_copied.extend(['cool*_%s.dat'%molec
                             for molec in these_molecules])
        to_be_copied.extend(['ml*_%s.dat'%molec
                             for molec in these_molecules
                             if molec != 'sampling'])
        for filecopy in to_be_copied:
            subprocess.call(['scp %s %s:%s.'\
                             %(os.path.join(local_folder,filecopy),\
                               vic_server,vic_folder)], shell=True)

        # NOTE(review): each wsub submission runs array tasks 1-8 (see the
        # run-jobs template), yet the reported total uses a factor 7 --
        # confirm which count is intended.
        printing.append('Running %i jobs with %i models each for ID %s.' \
                        %(job_number*7, models_in_job,model_id_sphinx))

    #-- One run-jobs script submitting all job files of this model
    runjobsfile = DataIO.readFile(os.path.join(cc.path.gastronoom,\
                                               'vic_run_jobs_example.sh'))
    new_runjobsfile = []
    for i,job in enumerate(jobfiles):
        for line in runjobsfile:
            new_runjobsfile.append(adapt_run_line(line,i == 0,job))

    run_name = 'vic_run_jobs_%s_%s.sh'%(model_id,str(self.current_model))
    runjobsfilename_local = os.path.join(cc.path.gastronoom,self.path,\
                                         'models',model_id,'vic_input',\
                                         run_name)
    runjobsfilename_vic = '/user/leuven/%s/%s/COCode/%s'\
                          %(self.disk,self.account,run_name)
    DataIO.writeFile(runjobsfilename_local,new_runjobsfile)
    subprocess.call(['chmod +x %s'%runjobsfilename_local],shell=True)
    subprocess.call(['scp %s %s:%s'\
                     %(runjobsfilename_local,vic_server,\
                       runjobsfilename_vic)],shell=True)
    return printing
397
398
399
534
535
536
def finalizeVic(self):

    '''
    Finalize a modeling procedure on VIC: successful and failed results
    are printed to a file, including the transitions.

    First, every transition in self.trans_in_progress whose sphinx file is
    not present locally gets its model id reset to ''. Then, if any models
    were added, the following files are written to the vic_results folder
    of self.path:
        - log_<time stamp>: the cooling model ids, split into those whose
          transitions all succeeded and those with at least one failed
          transition;
        - log_results<time stamp>_<index>: for each model session, the
          successfully and unsuccessfully calculated transitions.
    Finally, the local sph* files of the sphinx model ids involved are
    made readable for everyone (chmod a+r).

    This log file can be used as input for ComboCode again by putting
    LINE_LISTS=2.

    '''

    for trans in self.trans_in_progress:
        filename = os.path.join(cc.path.gastronoom,\
                                self.path,'models',trans.getModelId(),\
                                trans.makeSphinxFilename(2))
        if not os.path.isfile(filename):
            trans.setModelId('')
    if not self.models:
        return

    # Sample the clock once. Separate gmtime() calls can straddle a second
    # (or day) boundary and yield an inconsistent stamp.
    now = gmtime()
    time_stamp = '%.4i-%.2i-%.2ih%.2i:%.2i:%.2i'%tuple(now[:6])
    # A model failed when at least one of its transitions failed.
    # self.failed holds an entry (possibly empty) for every added model,
    # so testing key membership listed every model as failed.
    failed_models = [current_model
                     for current_model in self.models
                     if self.failed.get(current_model)]
    results = ['# Successfully calculated models:'] \
              + [self.models[current_model]
                 for current_model in self.models
                 if current_model not in failed_models] \
              + ['# Unsuccessfully calculated models (see 3 logfiles '+ \
                 'for these models):'] \
              + [self.models[current_model]
                 for current_model in failed_models]
    DataIO.writeFile(os.path.join(cc.path.gastronoom,self.path,\
                                  'vic_results','log_' + time_stamp),\
                     results)
    for current_model in self.models:
        model_results = ['# Successfully calculated transitions:'] + \
            ['Sphinx %s: %s' %(trans.getModelId(),str(trans))
             for trans in self.finished[current_model]] + \
            ['# Unsuccessfully calculated transitions (see 2 other ' + \
             'logfiles for these transitions):'] + \
            ['Sphinx %s: %s' %(trans.getModelId(),str(trans))
             for trans in self.failed[current_model]]
        DataIO.writeFile(os.path.join(cc.path.gastronoom,self.path,\
                                      'vic_results','log_results%s_%i'\
                                      %(time_stamp,current_model)),\
                         model_results)
        # A model that was added but never queued has no sphinx ids.
        for this_id in self.sphinx_model_ids.get(current_model,[]):
            sphinx_files = os.path.join(cc.path.gastronoom,self.path,\
                                        'models',this_id,'sph*')
            subprocess.call(['chmod a+r %s'%sphinx_files],shell=True)
586
587
588
def checkProgress(self,wait_qstat=0):

    '''
    Checks progress on all queued model_ids by checking if the output has
    been copied to the local disk from VIC3.

    A queued model is considered done when either of these holds:
        - no unfinished input files are left in its COCode/<model_id>_<index>
          folder on VIC3;
        - qstat lists no jobs for this account.

    For a done model:
        - the sphinx files and jobs.log are copied back with scp;
        - the sphinx output is checked and saved through a Gastronoom
          session;
        - the self.finished and self.failed entries are filled in;
        - the job files, input files and run-jobs script are removed from
          VIC3;
        - the self.transitions entry is deleted and the sphinx database is
          synchronized.

    @keyword wait_qstat: wait 10 seconds before checking the qstat query on
                         vic, in order to make sure that the queue command
                         on vic is finished queueing up the new models, as
                         this may be slower than the python script. Use
                         this for progress check after just queueing a new
                         model

                         (default: 0)
    @type wait_qstat: bool

    @return: True if at least one model is still in progress on VIC3,
             False if all queued models have been finished
    @rtype: bool

    '''

    vic_server = '%s@login.vic3.cc.kuleuven.be'%self.account
    # Iterate over a snapshot of the keys: entries of finished models are
    # deleted from self.transitions inside this loop.
    for current_model in list(self.transitions.keys()):
        model_id = self.models[current_model]
        print('Currently checking %s...' %model_id)
        # universal_newlines: communicate() returns text, so the split on
        # '\n' also works under Python 3.
        lsprocess = subprocess.Popen('ssh %s '%vic_server + \
                                     'ls /data/leuven/%s/%s/COCode/%s_%i/'\
                                     %(self.disk,self.account,model_id,\
                                       current_model),\
                                     shell=True,stdout=subprocess.PIPE,\
                                     universal_newlines=True)
        lsfile = [f
                  for f in lsprocess.communicate()[0].split('\n')
                  if f and f[-9:] != '.inp.done' and f != 'jobs.log']
        if wait_qstat:
            sleep(10)
        qstatprocess = subprocess.Popen('ssh %s qstat | grep %s'\
                                        %(vic_server,self.account),\
                                        shell=True,stdout=subprocess.PIPE,\
                                        universal_newlines=True)
        qstatfile = qstatprocess.communicate()[0].split('\n')
        if lsfile and qstatfile != ['']:
            # Input files left and jobs still queued: not finished yet.
            continue

        # Unwanted scp/rm output goes to os.devnull. The subprocess docs
        # advise against stdout=PIPE with call(), because the pipe is never
        # read and a full buffer blocks the child.
        with open(os.devnull,'w') as devnull:
            for model_id_sphinx in self.sphinx_model_ids[current_model]:
                these_trans = [trans
                               for trans in self.transitions[current_model]
                               if trans.getModelId() == model_id_sphinx]
                trans_strings = [trans.makeSphinxFilename()
                                 for trans in these_trans]
                heresph = os.path.join(cc.path.gastronoom,self.path,\
                                       'models',model_id_sphinx)
                for trans_string in trans_strings:
                    vicsph = '/data/leuven/%s/%s/COCode/output/%s/%s'\
                             %(self.disk,self.account,model_id_sphinx,\
                               trans_string)
                    subprocess.call('scp %s:%s '%(vic_server,vicsph)\
                                    + '%s/.'%heresph,shell=True,\
                                    stdout=devnull,stderr=devnull)
                vicjob = '/data/leuven/%s/%s/COCode/%s_%i/jobs.log'\
                         %(self.disk,self.account,model_id,current_model)
                subprocess.call('scp %s:%s '%(vic_server,vicjob) + \
                                '%s/log_vic_jobs'%(heresph),\
                                shell=True,stdout=devnull)
                gas_session = Gastronoom.Gastronoom(\
                                        path_gastronoom=self.path,\
                                        sph_db=self.sph_db)
                gas_session.model_id = model_id
                gas_session.trans_list = these_trans
                for trans in gas_session.trans_list:
                    gas_session.checkSphinxOutput(trans)
                gas_session.finalizeSphinx()
                # Accumulate over all sphinx ids of this model. Plain
                # assignment kept only the results of the last sphinx id.
                self.finished.setdefault(current_model,[]).extend(\
                        [trans
                         for trans in gas_session.trans_list
                         if trans.getModelId()])
                self.failed.setdefault(current_model,[]).extend(\
                        [trans
                         for trans in gas_session.trans_list
                         if not trans.getModelId()])
                vicjob = '/user/leuven/%s/%s/COCode/vic_job_%s.sh*'\
                         %(self.disk,self.account,model_id_sphinx)
                subprocess.call(['ssh %s rm %s'%(vic_server,vicjob)],\
                                shell=True)
            del self.transitions[current_model]
            #-- Remove the input files on VIC3 in batches of at most 100
            inputfiles = self.inputfiles[current_model]
            while inputfiles:
                batch = [inputfiles.pop()
                         for _ in range(min(len(inputfiles),100))]
                subprocess.call([' '.join(['ssh',vic_server,'rm'] + batch)],\
                                shell=True,stdout=devnull)
            vicjobfile = '/user/leuven/%s/%s/COCode/vic_run_jobs_%s_%i.sh'\
                         %(self.disk,self.account,model_id,current_model)
            subprocess.call(['ssh %s rm %s'%(vic_server,vicjobfile)],\
                            shell=True)
        self.sph_db.sync()
    return bool(self.transitions)
701
702
703
def getQueue(self):

    '''
    Get a list of unique queue number + cooling model_id for those models
    that are still in progress on VIC.

    A model counts as in progress as long as its session index still has
    an entry in self.transitions. That entry is deleted once the model has
    been finished.

    @return: The queue numbers and model_ids still in progress.
    @rtype: list[(int,string)]

    '''

    # `in` instead of dict.has_key(), which no longer exists in Python 3.
    return [(k,v)
            for k,v in self.models.items()
            if k in self.transitions]
718