import collections
import sys
from multiprocessing import Pool

from cvbase.timer import Timer

try:
    from collections.abc import Iterable
except ImportError:  # Python 2: the ABCs live directly in ``collections``
    from collections import Iterable
class ProgressBar(object):
    """A progress bar which can print the progress.

    The bar is written to ``sys.stdout``; every :meth:`update` redraws the
    current line (it starts with a carriage return). When ``task_num`` is
    positive a bar with task rate and ETA is shown, otherwise only a running
    counter of completed tasks is printed.

    Args:
        task_num (int): total number of tasks; ``<= 0`` means unknown.
        bar_width (int): requested bar width in characters, capped by
            :meth:`_get_max_bar_width`.
        start (bool): whether to print the initial line and start the timer
            immediately (otherwise call :meth:`start` later).
    """

    def __init__(self, task_num=0, bar_width=50, start=True):
        self.task_num = task_num
        self.bar_width = min(bar_width, self._get_max_bar_width())
        self.completed = 0
        if start:
            self.start()

    def _get_max_bar_width(self):
        """Return the widest bar that fits the current terminal.

        The result is ``min(60% of the terminal width, width - 50)``, i.e. at
        least 50 columns are left over (presumably for the counter/timing
        text). It never goes below 10; a warning is printed in that case.
        """
        if sys.version_info > (3, 3):
            from shutil import get_terminal_size
        else:
            from backports.shutil_get_terminal_size import get_terminal_size
        terminal_width, _ = get_terminal_size()
        max_bar_width = min(int(terminal_width * 0.6), terminal_width - 50)
        if max_bar_width < 10:
            print('terminal width is too small ({}), please consider '
                  'widen the terminal for better progressbar '
                  'visualization'.format(terminal_width))
            max_bar_width = 10
        return max_bar_width

    def start(self):
        """Print the initial (empty) progress line and start the timer."""
        if self.task_num > 0:
            sys.stdout.write('[{}] 0/{}, elapsed: 0s, ETA:'.format(
                ' ' * self.bar_width, self.task_num))
        else:
            sys.stdout.write('completed: 0, elapsed: 0s')
        sys.stdout.flush()
        self.timer = Timer()

    def update(self):
        """Count one more completed task and redraw the progress line."""
        self.completed += 1
        elapsed = self.timer.since_start()
        # since_start() can return 0 right after start (e.g. coarse clock
        # resolution); report a rate of 0 instead of dividing by zero.
        fps = self.completed / elapsed if elapsed > 0 else 0.0
        if self.task_num > 0:
            # Clamp so an over-count (completed > task_num) cannot produce a
            # negative ETA; the bar is then simply drawn full.
            percentage = min(self.completed / float(self.task_num), 1.0)
            eta = int(elapsed * (1 - percentage) / percentage + 0.5)
            mark_width = int(self.bar_width * percentage)
            bar_chars = '>' * mark_width + ' ' * (self.bar_width - mark_width)
            sys.stdout.write(
                '\r[{}] {}/{}, {:.1f} task/s, elapsed: {}s, ETA: {:5}s'.format(
                    bar_chars, self.completed, self.task_num, fps,
                    int(elapsed + 0.5), eta))
        else:
            # Leading '\r' returns to the start of the line so the counter is
            # redrawn in place rather than appended after the previous one.
            sys.stdout.write(
                '\rcompleted: {}, elapsed: {}s, {:.1f} tasks/s'.format(
                    self.completed, int(elapsed + 0.5), fps))
        sys.stdout.flush()
def track_progress(func, tasks, bar_width=50, **kwargs):
    """Track the progress of tasks execution with a progress bar.

    Tasks are done with a simple for-loop in the current process.

    Args:
        func (callable): the function to be applied to each task; it is
            called as ``func(task, **kwargs)``.
        tasks (tuple of 2 or list): either an iterable supporting ``len()``,
            or an ``(iterable, task_num)`` tuple for iterables without a
            length (e.g. generators).
        bar_width (int): width of progress bar
        **kwargs: extra keyword arguments forwarded to every ``func`` call.

    Returns:
        list: the results, in task order

    Raises:
        TypeError: if ``tasks`` is neither an iterable with a length nor a
            well-formed ``(iterable, int)`` tuple.
    """
    # Explicit checks instead of ``assert`` so validation survives ``python -O``.
    if isinstance(tasks, tuple):
        if (len(tasks) != 2 or not isinstance(tasks[0], Iterable)
                or not isinstance(tasks[1], int)):
            raise TypeError(
                '"tasks" must be an iterable object or a (iterator, int) tuple')
        tasks, task_num = tasks
    elif isinstance(tasks, Iterable):
        try:
            task_num = len(tasks)
        except TypeError:
            raise TypeError(
                '"tasks" has no len(), please pass a (iterator, int) tuple '
                'instead')
    else:
        raise TypeError(
            '"tasks" must be an iterable object or a (iterator, int) tuple')
    prog_bar = ProgressBar(task_num, bar_width)
    results = []
    for task in tasks:
        results.append(func(task, **kwargs))
        prog_bar.update()
    # Finish the progress line, which is redrawn in place with '\r'.
    sys.stdout.write('\n')
    return results
def init_pool(process_num, initializer=None, initargs=None):
    """Create a :class:`multiprocessing.Pool` with ``process_num`` workers.

    ``initializer`` and ``initargs`` are forwarded to the pool only when they
    are given; ``initargs`` is ignored when ``initializer`` is ``None``.

    Raises:
        TypeError: if both are given and ``initargs`` is not a tuple.
    """
    if initializer is not None and initargs is not None:
        if not isinstance(initargs, tuple):
            raise TypeError('"initargs" must be a tuple')
        return Pool(process_num, initializer, initargs)
    if initializer is not None:
        return Pool(process_num, initializer)
    return Pool(process_num)
def track_parallel_progress(func,
                            tasks,
                            process_num,
                            initializer=None,
                            initargs=None,
                            bar_width=50,
                            chunksize=1,
                            skip_first=False,
                            keep_order=True):
    """Track the progress of parallel task execution with a progress bar.

    The built-in :mod:`multiprocessing` module is used for process pools and
    tasks are done with :func:`Pool.imap` or :func:`Pool.imap_unordered`.

    Args:
        func(callable): the function to be applied to each task
        tasks(tuple of 2 or list): either an iterable supporting ``len()``,
            or an ``(iterable, task_num)`` tuple for iterables without a
            length (e.g. generators)
        process_num(int): the process(worker) number
        initializer(None or callable): see :class:`multiprocessing.Pool` for details
        initargs(None or tuple): see :class:`multiprocessing.Pool` for details
        bar_width(int): width of progress bar
        chunksize(int): see :class:`multiprocessing.Pool` for details
        skip_first(bool): whether to skip the first sample when calculating
            fps; the first ``process_num * chunksize`` results are not shown
            on the bar and the timer only starts once they have arrived
        keep_order(bool): if True, :func:`Pool.imap` is used, otherwise
            :func:`Pool.imap_unordered` is used

    Returns:
        list: the results

    Raises:
        TypeError: if ``tasks`` is neither an iterable with a length nor a
            well-formed ``(iterable, int)`` tuple.
    """
    # Explicit checks instead of ``assert`` so validation survives ``python -O``.
    if isinstance(tasks, tuple):
        if (len(tasks) != 2 or not isinstance(tasks[0], Iterable)
                or not isinstance(tasks[1], int)):
            raise TypeError(
                '"tasks" must be an iterable object or a (iterator, int) tuple')
        tasks, task_num = tasks
    elif isinstance(tasks, Iterable):
        try:
            task_num = len(tasks)
        except TypeError:
            raise TypeError(
                '"tasks" has no len(), please pass a (iterator, int) tuple '
                'instead')
    else:
        raise TypeError(
            '"tasks" must be an iterable object or a (iterator, int) tuple')

    pool = init_pool(process_num, initializer, initargs)
    try:
        num_skipped = process_num * chunksize if skip_first else 0
        # Skipped results are excluded from the total shown on the bar, and
        # the bar (and its timer) only starts once they have been received.
        prog_bar = ProgressBar(task_num - num_skipped, bar_width,
                               not skip_first)
        results = []
        if keep_order:
            gen = pool.imap(func, tasks, chunksize)
        else:
            gen = pool.imap_unordered(func, tasks, chunksize)
        for result in gen:
            results.append(result)
            if skip_first:
                if len(results) < num_skipped:
                    continue
                elif len(results) == num_skipped:
                    prog_bar.start()
                    continue
            prog_bar.update()
    except BaseException:
        # func raised (re-raised here by imap) or the user interrupted: stop
        # the workers instead of leaking them with tasks still queued.
        pool.terminate()
        pool.join()
        raise
    # Finish the progress line, which is redrawn in place with '\r'.
    sys.stdout.write('\n')
    pool.close()
    pool.join()
    return results