-
Notifications
You must be signed in to change notification settings - Fork 1
/
process.py
478 lines (437 loc) · 17.4 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright David Manthey
#
# Licensed under the Apache License, Version 2.0 ( the "License" ); you may
# not use this file except in compliance with the License. You may obtain a
# copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Process yml and md files into results using the ballistics code.
See help for details.
"""
import copy
import functools
import json
import multiprocessing
import os
import pprint
import psutil
import signal
import sys
import time
import yaml
import ballistics
Pool = None  # NOTE(review): module-level placeholder; not referenced anywhere in this file — confirm it is unused before removing.
class FloatList:
    """
    A special list for rendering floats in JSON with predictable precision
    and in a controlled way.

    Non-float entries are rendered with ``repr``; floats are rendered with
    the instance's format string.  ``repr(FloatList(...))`` yields a
    JSON-array-style string (used by FloatEncoder).
    """
    def __init__(self, newList=None, formatString='%0.10g'):
        """
        Create a FloatList.

        Enter: newList: array for the list, or None for an empty list.
               formatString: default format string for floats.
        """
        # Guard the None default: list(None) raises TypeError, so an
        # argument-less FloatList() previously crashed.
        self.list = [] if newList is None else list(newList)
        self.formatString = formatString

    def __repr__(self):
        """
        Return the list as a bracketed, comma-joined string with floats
        formatted via formatString.
        """
        return '[' + ','.join([
            self.formatString % val if isinstance(val, float) else repr(val)
            for val in self.list]) + ']'
class FloatEncoder(json.JSONEncoder):
    """JSON encoder that serializes FloatList instances via their repr."""

    def default(self, o):
        """
        Render FloatList objects as their repr string; defer everything
        else to the base encoder, reporting any value it cannot handle.
        """
        if isinstance(o, FloatList):
            return repr(o)
        try:
            return super().default(o)
        except TypeError:
            # Surface the offending value before re-raising so the failure
            # is easy to locate in a large document.
            print('Can\'t encode: %r' % o)
            raise
class SafeLineLoader(yaml.loader.SafeLoader):
    """
    A SafeLoader that tags every parsed mapping with its source line.  See
    https://stackoverflow.com/questions/13319067
    """
    def construct_mapping(self, node, deep=False):
        # Build the mapping normally, then record where it started
        # (start_mark.line is 0-based; store it 1-based).
        result = super().construct_mapping(node, deep=deep)
        result['__line__'] = 1 + node.start_mark.line
        return result
def add_path_to_files(path, files):
    """
    Add a file or directory of files to the input file list.

    Enter: path: a relative or absolute path to a yml file or a directory
                 of yml files.
           files: a list of files to process; updated in place.
    """
    path = os.path.abspath(os.path.expanduser(path))
    if not os.path.exists(path):
        raise Exception('Input path %s does not exist' % path)
    if not os.path.isdir(path):
        # A single explicit file is taken as-is, whatever its extension.
        files.append(path)
        return
    # A directory contributes every .yml file it contains, sorted by name.
    files.extend(sorted(
        os.path.abspath(os.path.join(path, entry))
        for entry in os.listdir(path)
        if os.path.splitext(entry)[1] == '.yml'))
def calculate_case(hashval, args, info, verbose):
    """
    Process an individual case.

    Enter: hashval: hash value used to memoize results.
           args: arguments formulated for the ballistics routines.
           info: info that was used to construct the arguments.
           verbose: verbosity for the ballistics program.
    Exit:  hashval: the input hash value.
           state: final state from the ballistics routines.
           points: time series of trajectory (columnar dict of FloatList),
                   or None if no points were produced.
    """
    if verbose >= 3:
        pprint.pprint(info)
    if verbose >= 3:
        print(hashval)
    # Propagate verbosity to the ballistics module via its module-level
    # global (set both before and after parsing).
    ballistics.Verbose = max(0, verbose - 2)
    # NOTE: 'help' here shadows the builtin; the returned help value is
    # intentionally discarded.
    params, state, help = ballistics.parse_arguments(
        args, allowUnknownParams=True)
    ballistics.Verbose = max(0, verbose - 2)
    if verbose >= 4:
        pprint.pprint(state)
    starttime = ballistics.get_cpu_time()
    newstate, points = ballistics.find_unknown(state, params['unknown'], params.get('unknown_scan'))
    # Record how long the solve took, in CPU seconds.
    newstate['computation_time'] = ballistics.get_cpu_time()-starttime
    # Label the result with the measurement technique that determined it:
    # the first (key, technique) pair whose key appears in the input state
    # wins, so the order of this list matters.
    for key, technique in [
            ('power_factor', 'given'),
            ('initial_velocity', 'given_velocity'),
            ('pendulum_length', 'pendulum'),
            ('chamber_volume', 'pressure'),
            ('final_velocity', 'chronograph'),
            ('rising_height', 'trajectory'),
            ('range', 'range'),
            ('final_time', 'time'),
            ('max_height', 'height'),
            ('final_angle', 'final_angle'),
            ]:
        if newstate.get('technique') is None and key in state:
            newstate['technique'] = technique
    if verbose >= 3:
        pprint.pprint(newstate)
    if len(points) > 0:
        # Thin dense trajectories: keep every point for short runs, every
        # 2nd/5th/10th point as the series grows, always keeping the final
        # point.
        subset = 1 if len(points) < 50 else (
            2 if len(points) < 100 else (5 if len(points) < 250 else 10))
        points = points[:-1:subset] + points[-1:]
        # Convert list-of-dicts to a columnar dict of FloatLists for
        # compact JSON output.  Assumes the first point carries every key
        # of interest -- TODO confirm against the ballistics module.
        points = {key: FloatList([
            point.get(key) for point in points], '%.6g')
            for key in points[0]}
    else:
        points = None
    if verbose >= 2:
        # A missing power_factor marks a case the solver could not resolve.
        if newstate.get('power_factor') is None:
            print('%s --> FAILED' % (hashval, ))
        else:
            print('%s --> %3.1f' % (hashval, newstate.get('power_factor')))
    return hashval, newstate, points
def calculate_cases(results, cases, verbose, pool=None):
    """
    Given a set of cases, generate results for each, possibly using
    multiprocessing.

    Enter: results: array to store results.
           cases: cases to process.
           verbose: verbosity for the ballistics program.
           pool: if not None, use this multiprocessing pool.
    Exit:  success: False for cancelled.  NOTE(review): as written this
           function always returns True; no cancellation path exists here
           -- confirm the documented False case against callers.
    """
    # Order the cases by their recorded position (assigned sequentially by
    # process_cases; read_and_process_file may negate positions to reverse
    # the computation order).
    hashes = [item[-1] for item in sorted([(cases[hashval]['position'], hashval)
                                           for hashval in cases])]
    if pool is None:
        # Serial path: compute each case in turn in this process.
        left = len(hashes)
        for hashval in hashes:
            hashval, state, points = calculate_case(
                hashval, cases[hashval]['args'], cases[hashval]['info'], verbose)
            calculate_cases_results(hashval, state, points, results)
            left -= 1
            if verbose >= 1:
                # '\r' keeps the progress counter on one line at low
                # verbosity; '\n' preserves each update at higher levels.
                sys.stdout.write(' %d/%d left %s' % (
                    left, len(hashes), '\n' if verbose >= 2 else '\r'))
                sys.stdout.flush()
    else:
        # Parallel path: submit every case up front, then poll for
        # completions, harvesting results as they become ready.
        tasks = []
        for hashval in hashes:
            tasks.append(pool.apply_async(calculate_case, (
                hashval, cases[hashval]['args'], cases[hashval]['info'], verbose)))
        while len(tasks):
            lentasks = len(tasks)
            # Iterate backwards so completed tasks can be deleted in place
            # without disturbing the positions still to be visited.
            for pos in range(len(tasks) - 1, -1, -1):
                task = tasks[pos]
                if task.ready():
                    hashval, state, points = task.get()
                    calculate_cases_results(hashval, state, points, results)
                    del tasks[pos]
            if verbose >= 1 and len(tasks) < lentasks:
                sys.stdout.write(' %d/%d left %s' % (
                    len(tasks), len(hashes), '\n' if verbose >= 2 else '\r'))
                sys.stdout.flush()
            if len(tasks):
                # Brief wait on one outstanding task to avoid busy-polling.
                tasks[0].wait(0.1)
    return True
def calculate_cases_results(hashval, state, points, results):
    """
    Store results from a processed case.

    Enter: hashval: hash of the case.
           state: final state of the case.
           points: trajectory points of the case.
           results: dictionary whose 'results' list is updated in place.
    """
    for record in results['results']:
        if record.get('hash') != hashval:
            continue
        # Attach the computed output and drop the hash marker so the entry
        # is no longer treated as pending.
        record['results'] = state
        record['points'] = points
        del record['hash']
def get_multiprocess_pool(multi, verbose=0):
    """
    Get a multiprocessing pool.

    Enter: multi: the number of processors to use, or True to use all
                  logical processors.  Since the process tends to be
                  computation-bound, using hyperthreading is not
                  particularly advantageous.
           verbose: verbosity for the ballistics program.
    Exit:  pool: a multiprocessing pool.
    """
    poolsize = psutil.cpu_count(True) if multi is True else multi
    # worker_init makes each worker ignore SIGINT so ctrl-c is handled by
    # the parent process only.
    pool = multiprocessing.Pool(processes=poolsize, initializer=worker_init)
    # Lower scheduling priority: a Windows priority class on win32,
    # otherwise a unix nice value of 10.
    priorityLevel = (psutil.BELOW_NORMAL_PRIORITY_CLASS
                     if sys.platform == 'win32' else 10)
    parent = psutil.Process()
    # This lowers the priority of the current (parent) process as well as
    # the already-spawned pool workers.
    parent.nice(priorityLevel)
    for child in parent.children():
        child.nice(priorityLevel)
    if verbose >= 2:
        print('Running with a process pool of %d' % poolsize)
    return pool
def process_cases(info, results, cases, verbose=0, nextcaseindex=0, extraArgs=None):
    """
    Check if there are any data entries in the current level of the info.
    If so, process each entry in turn.  If not, formulate the ballistics
    arguments and record the pending case.

    Enter: info: dictionary of information.  If a 'data' key is present,
                 that is a list of sub-cases, each of which is merged over
                 a copy of this level and processed in turn.
           results: a list to append results to.
           cases: a dictionary to collect cases in, keyed by the argument
                  hash (deduplicates identical argument sets).
           verbose: verbosity for the ballistics program.
           nextcaseindex: the index of the next case that will be
                  generated.
           extraArgs: extra arguments to use in all calculations.
    Exit:  nextcaseindex: the index of the next case that will be
                  generated.
    """
    if info.get('skip'):
        return nextcaseindex
    info = copy.deepcopy(info)
    if isinstance(info.get('data'), list):
        # Recurse: each data entry inherits this level's values and may
        # override any of them.
        data = info.pop('data')
        for entry in data:
            subinfo = copy.deepcopy(info)
            subinfo.update(entry)
            nextcaseindex = process_cases(subinfo, results, cases, verbose,
                                          nextcaseindex, extraArgs)
        return nextcaseindex
    infokey = info['key']
    line = info.pop('__line__', None)
    # Strip presentation-only keys that are not calculation conditions.
    for key in ('key', 'details', 'link', 'summary', 'cms'):
        info.pop(key, None)
    args = []
    # If no condition is marked as the unknown ('?'), solve for power by
    # default.  any() replaces the previous max() over a list of booleans,
    # which raised ValueError when the condition dict was empty.
    if not any(info[key] == '?' for key in info):
        args.append('--power=?')
    args.extend(sorted([
        '--%s=%s' % (key, info[key]) for key in info if key not in (
            'date', 'ref', 'ref2', 'ref3', 'desc', 'desc2', 'desc3',
            'technique', 'group') and
        not key.endswith('_note')]))
    if extraArgs:
        args.extend(extraArgs)
    # The hash is the canonical command line; arguments containing spaces
    # are quoted so the hash is unambiguous.
    hashval = ' '.join([('"%s"' if ' ' in arg else '%s') % arg for arg in args])
    if hashval not in cases:
        cases[hashval] = {'info': info, 'args': args, 'position': len(cases),
                          'hash': hashval}
    results.append({'conditions': info, 'hash': hashval, 'key': infokey,
                    'idx': nextcaseindex})
    if line is not None:
        results[-1]['sourceline'] = line
    nextcaseindex += 1
    return nextcaseindex
def read_and_process_file(srcfile, outputPath, all=False, verbose=0,
                          pool=None, reverse=False, extraArgs=None):
    """
    Load a yaml file and any companion files.  For each non-skipped data
    set, calculate the ballistics result.  Output the results as a json
    file with the same base name and .json extension.  Don't calculate the
    file if there is already a results file that is newer than the source
    file(s) unless the all flag is set.

    Enter: srcfile: path of the yml file to load.
           outputPath: directory where the results will be stored.
           all: True to process regardless of results time.
           verbose: verbosity for the ballistics program.
           pool: if not None, use this multiprocessing pool.
           reverse: if True, calculate the cases in the file in reverse
                    order.
           extraArgs: extra arguments to use in all calculations.
    """
    srcdate = os.path.getmtime(srcfile)
    # Use context managers throughout so file handles are closed promptly
    # rather than leaking until garbage collection.
    with open(srcfile) as srcfptr:
        info = yaml.load(srcfptr, Loader=SafeLineLoader)
    if info.get('skip'):
        return
    basename = os.path.splitext(os.path.basename(srcfile))[0]
    basepath = os.path.dirname(srcfile)
    # Companion files share the source file's base name (e.g., a .md file
    # of details alongside the .yml file).
    companionFiles = [os.path.join(basepath, file)
                      for file in os.listdir(basepath)
                      if os.path.splitext(file)[0] == basename]
    srcdate = max(srcdate, max([os.path.getmtime(file)
                                for file in companionFiles]))
    destpath = os.path.join(outputPath, basename + '.json')
    # Skip the calculation if up-to-date results already exist.
    if (os.path.exists(destpath) and not all and
            os.path.getmtime(destpath) > srcdate):
        return
    for file in companionFiles:
        ext = os.path.splitext(file)[1]
        if ext == '.md':
            with open(file) as mdfptr:
                info['details'] = mdfptr.read()
        elif ext == '.yml':
            pass  # our source file.
        else:
            raise Exception('Unknown companion file %s\n' % file)
    if verbose >= 1:
        print(srcfile)
    results = copy.deepcopy(info)
    results['results'] = []
    results['version'] = ballistics.__version__
    cases = {}
    process_cases(info, results['results'], cases, verbose, extraArgs=extraArgs)
    if reverse:
        # Negating the positions reverses the computation order without
        # changing the stored output.
        for hashval in cases:
            cases[hashval]['position'] *= -1
    if calculate_cases(results, cases, verbose, pool):
        with open(destpath, 'wt') as destfptr:
            json.dump(results, destfptr, sort_keys=True, indent=1,
                      separators=(',', ': '), cls=FloatEncoder)
def worker_init():
    """Suppress the ctrl-c signal in the worker processes."""
    # Workers ignore SIGINT; the parent handles keyboard interrupts and
    # terminates the pool itself.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
if __name__ == '__main__':  # noqa - mccabe
    files = []
    allFiles = False           # --all: process even up-to-date files
    extraArgs = []             # --arg=key=value entries, as --key=value
    multi = False              # False, True (all cpus), or a process count
    multiFile = False          # True: one task per file; False: per case
    outputPath = 'results'
    reverse = False
    timeLimit = None           # seconds allowed before skipping more files
    timeLimitFile = None       # file touched if the time limit is reached
    verbose = 0
    help = False
    for arg in sys.argv[1:]:
        if arg == '--all':
            allFiles = True
        elif arg.startswith('--arg='):
            extraArgs.append('--' + arg.split('=', 1)[1])
        elif arg.startswith('--multi'):
            multi = True
            if '=' in arg:
                multi = int(arg.split('=', 1)[1])
                if multi <= 1:
                    multi = False
            if 'multicase' in arg:
                multiFile = False
            if 'multifile' in arg:
                multiFile = True
        elif arg.startswith('--limit='):
            timeLimit = float(arg.split('=', 1)[1].split(',')[0])
            if ',' in arg:
                timeLimitFile = arg.split(',', 1)[1]
        elif arg.startswith('--out='):
            outputPath = os.path.abspath(os.path.expanduser(
                arg.split('=', 1)[1]))
        elif arg == '--reverse':
            reverse = True
        elif arg == '-v':
            verbose += 1
        elif arg.startswith('-'):
            help = True
        else:
            add_path_to_files(arg, files)
    if not len(files):
        add_path_to_files('data', files)
    if (help or not len(files) or not outputPath or
            not os.path.isdir(outputPath)):
        print("""Process yml and md files using the ballistics code.
Syntax: process.py --out=(path) --all --reverse -v --limit=(seconds)[,(path)]
--multi|--multifile|--multicase[=(number of processes)]
--arg=(key)=(value) (input files ...)
If the input files are a directory, all yml files in that path are processed.
Only files newer than the matching results are processed unless the --all flag
is used.  The default for input files is 'data'.
--all processes files even if they appear up to date.
--arg specifies extra arguments to pass to all calculations (e.g.,
--arg=time_delta=0.005).
--limit doesn't start processing a file if the time limit has been exceeded.
Files that are started are still finished.  If multiprocessing per file, this
will exit more promptly.  If a path is specified and the process runs out of
time, a file is created at that path.
--multi runs parallel processes.  This uses the number of processors available
unless a number is specified.  --multifile runs a process per input file,
--multicase runs a process per ballistics case.
--out specifies an output directory, which must exist.  Default is 'results'.
--reverse calculates the last conditions in a file first.  The output is
identical to the forward calculation.
-v increase verbosity.
""")
        sys.exit(0)
    starttime = time.time()
    if multi:
        pool = get_multiprocess_pool(multi, verbose)
    else:
        pool = None
    reachedTimeLimit = False
    try:
        if not multi or not multiFile:
            # Serial (or per-case parallel) path: one file at a time.
            for file in files:
                if timeLimit and time.time() - starttime > timeLimit:
                    reachedTimeLimit = True
                    print('Cancelled due to time limit')
                    break
                read_and_process_file(file, outputPath, allFiles, verbose,
                                      pool, reverse=reverse, extraArgs=extraArgs)
        else:
            # Per-file parallel path: one pool task per input file.  The
            # extraArgs key must be the keyword-name string; it was
            # previously written as the bare variable, which made a list
            # the dict key and raised TypeError (unhashable type).
            mapfunc = functools.partial(read_and_process_file, **{
                'outputPath': outputPath,
                'all': allFiles,
                'verbose': verbose,
                'reverse': reverse,
                'extraArgs': extraArgs,
            })
            task = pool.map_async(mapfunc, files, 1)
            while not task.ready():
                if timeLimit and time.time() - starttime > timeLimit:
                    reachedTimeLimit = True
                    print('Cancelled due to time limit')
                    pool.terminate()
                    break
                task.wait(1)
            pool.close()
            pool.join()
    except KeyboardInterrupt:
        if pool:
            try:
                pool.terminate()
                pool.join()
            except Exception:
                pass
        print('Cancelled via keyboard interrupt')
    if timeLimitFile and reachedTimeLimit:
        # Touch the marker file so an outer scheduler can detect timeout.
        with open(timeLimitFile, 'a'):
            os.utime(timeLimitFile, None)
    if verbose >= 1:
        print('Total computation time: %4.2f s' % (time.time() - starttime))