Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Add the Horn-Schunck algorithm (#5333)
* Added implementation of the Horn-Schunck algorithm * Cleaner variable names * added doctests * Fix doctest * Update horn_schunck.py * Update horn_schunck.py * Update horn_schunck.py * Update horn_schunck.py Co-authored-by: John Law <[email protected]>
- Loading branch information
Showing
1 changed file
with
130 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @@ -0,0 +1,130 @@ | ||
| """ | ||
| The Horn-Schunck method estimates the optical flow for every single pixel of | ||
| a sequence of images. | ||
| It works by assuming brightness constancy between two consecutive frames | ||
| and smoothness in the optical flow. | ||
| Useful resources: | ||
| Wikipedia: https://en.wikipedia.org/wiki/Horn%E2%80%93Schunck_method | ||
| Paper: http://image.diku.dk/imagecanon/material/HornSchunckOptical_Flow.pdf | ||
| """ | ||
|
|
||
| import numpy as np | ||
| from scipy.ndimage.filters import convolve | ||
| from typing_extensions import SupportsIndex | ||
|
|
||
|
|
||
| def warp( | ||
| image: np.ndarray, horizontal_flow: np.ndarray, vertical_flow: np.ndarray | ||
| ) -> np.ndarray: | ||
| """ | ||
| Warps the pixels of an image into a new image using the horizontal and vertical | ||
| flows. | ||
| Pixels that are warped from an invalid location are set to 0. | ||
| Parameters: | ||
| image: Grayscale image | ||
| horizontal_flow: Horizontal flow | ||
| vertical_flow: Vertical flow | ||
| Returns: Warped image | ||
| >>> warp(np.array([[0, 1, 2], [0, 3, 0], [2, 2, 2]]), \ | ||
| np.array([[0, 1, -1], [-1, 0, 0], [1, 1, 1]]), \ | ||
| np.array([[0, 0, 0], [0, 1, 0], [0, 0, 1]])) | ||
| array([[0, 0, 0], | ||
| [3, 1, 0], | ||
| [0, 2, 3]]) | ||
| """ | ||
| flow = np.stack((horizontal_flow, vertical_flow), 2) | ||
|
|
||
| # Create a grid of all pixel coordinates and subtract the flow to get the | ||
| # target pixels coordinates | ||
| grid = np.stack( | ||
| np.meshgrid(np.arange(0, image.shape[1]), np.arange(0, image.shape[0])), 2 | ||
| ) | ||
| grid = np.round(grid - flow).astype(np.int32) | ||
|
|
||
| # Find the locations outside of the original image | ||
| invalid = (grid < 0) | (grid >= np.array([image.shape[1], image.shape[0]])) | ||
| grid[invalid] = 0 | ||
|
|
||
| warped = image[grid[:, :, 1], grid[:, :, 0]] | ||
|
|
||
| # Set pixels at invalid locations to 0 | ||
| warped[invalid[:, :, 0] | invalid[:, :, 1]] = 0 | ||
|
|
||
| return warped | ||
|
|
||
|
|
||
| def horn_schunck( | ||
| image0: np.ndarray, | ||
| image1: np.ndarray, | ||
| num_iter: SupportsIndex, | ||
| alpha: float | None = None, | ||
| ) -> tuple[np.ndarray, np.ndarray]: | ||
| """ | ||
| This function performs the Horn-Schunck algorithm and returns the estimated | ||
| optical flow. It is assumed that the input images are grayscale and | ||
| normalized to be in [0, 1]. | ||
| Parameters: | ||
| image0: First image of the sequence | ||
| image1: Second image of the sequence | ||
| alpha: Regularization constant | ||
| num_iter: Number of iterations performed | ||
| Returns: estimated horizontal & vertical flow | ||
| >>> np.round(horn_schunck(np.array([[0, 0, 2], [0, 0, 2]]), \ | ||
| np.array([[0, 2, 0], [0, 2, 0]]), alpha=0.1, num_iter=110)).\ | ||
| astype(np.int32) | ||
| array([[[ 0, -1, -1], | ||
| [ 0, -1, -1]], | ||
| <BLANKLINE> | ||
| [[ 0, 0, 0], | ||
| [ 0, 0, 0]]], dtype=int32) | ||
| """ | ||
| if alpha is None: | ||
| alpha = 0.1 | ||
|
|
||
| # Initialize flow | ||
| horizontal_flow = np.zeros_like(image0) | ||
| vertical_flow = np.zeros_like(image0) | ||
|
|
||
| # Prepare kernels for the calculation of the derivatives and the average velocity | ||
| kernel_x = np.array([[-1, 1], [-1, 1]]) * 0.25 | ||
| kernel_y = np.array([[-1, -1], [1, 1]]) * 0.25 | ||
| kernel_t = np.array([[1, 1], [1, 1]]) * 0.25 | ||
| kernel_laplacian = np.array( | ||
| [[1 / 12, 1 / 6, 1 / 12], [1 / 6, 0, 1 / 6], [1 / 12, 1 / 6, 1 / 12]] | ||
| ) | ||
|
|
||
| # Iteratively refine the flow | ||
| for _ in range(num_iter): | ||
| warped_image = warp(image0, horizontal_flow, vertical_flow) | ||
| derivative_x = convolve(warped_image, kernel_x) + convolve(image1, kernel_x) | ||
| derivative_y = convolve(warped_image, kernel_y) + convolve(image1, kernel_y) | ||
| derivative_t = convolve(warped_image, kernel_t) + convolve(image1, -kernel_t) | ||
|
|
||
| avg_horizontal_velocity = convolve(horizontal_flow, kernel_laplacian) | ||
| avg_vertical_velocity = convolve(vertical_flow, kernel_laplacian) | ||
|
|
||
| # This updates the flow as proposed in the paper (Step 12) | ||
| update = ( | ||
| derivative_x * avg_horizontal_velocity | ||
| + derivative_y * avg_vertical_velocity | ||
| + derivative_t | ||
| ) | ||
| update = update / (alpha**2 + derivative_x**2 + derivative_y**2) | ||
|
|
||
| horizontal_flow = avg_horizontal_velocity - derivative_x * update | ||
| vertical_flow = avg_vertical_velocity - derivative_y * update | ||
|
|
||
| return horizontal_flow, vertical_flow | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| import doctest | ||
|
|
||
| doctest.testmod() |

Formed in 2009, the Archive Team (not to be confused with the archive.org Archive-It Team) is a rogue archivist collective dedicated to saving copies of rapidly dying or deleted websites for the sake of history and digital heritage. The group is 100% composed of volunteers and interested parties, and has expanded into a large amount of related projects for saving online and digital history.
