import numpy as np
def _pair_name(filename):
parts = filename.split('.')
parts[-2] += '_dx'
dx_filename = '.'.join(parts)
parts[-2] = parts[-2][:-1] + 'y'
dy_filename = '.'.join(parts)
return dx_filename, dy_filename
[docs]def read_flow(flow_or_path, quantize=False, *args, **kwargs):
"""Read an optical flow map
Args:
flow_or_path(ndarray or str): either a flow map or path of a flow
quantize(bool): whether to read quantized pair, if set to True,
remaining args will be passed to :func:`dequantize_flow`
Returns:
ndarray: optical flow
"""
if isinstance(flow_or_path, np.ndarray):
if (flow_or_path.ndim != 3) or (flow_or_path.shape[-1] != 2):
raise ValueError(
'Invalid flow with shape {}'.format(flow_or_path.shape))
return flow_or_path
elif not isinstance(flow_or_path, str):
raise TypeError(
'"flow_or_path" must be a filename or numpy array, not {}'.format(
type(flow_or_path)))
if not quantize:
with open(flow_or_path, 'rb') as f:
try:
header = f.read(4).decode('utf-8')
except:
raise IOError('Invalid flow file: {}'.format(flow_or_path))
else:
if header != 'PIEH':
raise IOError(
'Invalid flow file: {}, header does not contain PIEH'.
format(flow_or_path))
w = np.fromfile(f, np.int32, 1).squeeze()
h = np.fromfile(f, np.int32, 1).squeeze()
flow = np.fromfile(f, np.float32, w * h * 2).reshape((h, w, 2))
else:
from cvbase.image import read_img
from cvbase.opencv import IMREAD_UNCHANGED
dx_filename, dy_filename = _pair_name(flow_or_path)
dx = read_img(dx_filename, flag=IMREAD_UNCHANGED)
dy = read_img(dy_filename, flag=IMREAD_UNCHANGED)
flow = dequantize_flow(dx, dy, *args, **kwargs)
return flow.astype(np.float32)
[docs]def write_flow(flow, filename, quantize=False, *args, **kwargs):
"""Write optical flow to file
Args:
flow(ndarray): optical flow
filename(str): file path
quantize(bool): whether to quantize the flow and save as 2 images,
if set to True, remaining args will be passed
to :func:`quantize_flow`
"""
if not quantize:
with open(filename, 'wb') as f:
f.write('PIEH'.encode('utf-8'))
np.array([flow.shape[1], flow.shape[0]], dtype=np.int32).tofile(f)
flow = flow.astype(np.float32)
flow.tofile(f)
f.flush()
else:
from cvbase.image import write_img
dx, dy = quantize_flow(flow, *args, **kwargs)
dx_filename, dy_filename = _pair_name(filename)
write_img(dx, dx_filename)
write_img(dy, dy_filename)
[docs]def quantize_flow(flow, max_val=0.02, norm=True):
"""Quantize flow to [0, 255] (much smaller size when dumping as images)
Args:
flow(ndarray): optical flow
max_val(float): maximum value of flow, values beyond
[-max_val, max_val] will be truncated.
norm(bool): whether to divide flow values by width/height
Returns:
tuple: quantized dx and dy
"""
h, w, _ = flow.shape
dx = flow[..., 0]
dy = flow[..., 1]
if norm:
dx = dx / w # avoid inplace operations
dy = dy / h
dx = np.maximum(0, np.minimum(dx + max_val, 2 * max_val))
dy = np.maximum(0, np.minimum(dy + max_val, 2 * max_val))
dx = np.round(dx * 255 / (max_val * 2)).astype(np.uint8)
dy = np.round(dy * 255 / (max_val * 2)).astype(np.uint8)
return dx, dy
[docs]def dequantize_flow(dx, dy, max_val=0.02, denorm=True):
"""Recover flow from quantized flow
Args:
dx(ndarray): quantized dx
dy(ndarray): quantized dy
max_val(float): maximum value used when quantizing.
denorm(bool): whether to multiply flow values with width/height
Returns:
tuple: dequantized dx and dy
"""
assert dx.shape == dy.shape
assert dx.ndim == 2 or (dx.ndim == 3 and dx.shape[-1] == 1)
dx = dx.astype(np.float32) * max_val * 2 / 255 - max_val
dy = dy.astype(np.float32) * max_val * 2 / 255 - max_val
if denorm:
dx *= dx.shape[1]
dy *= dx.shape[0]
flow = np.dstack((dx, dy))
return flow