First pass
Add PEP 484 type hints.
In your loops, you can pre-index some vectors for a minor speedup.
Prefer member .dot() over np.dot().
Prefer the in-place add x += ... rather than the form x = x + ... for ndarray arguments.
Avoid native pow() in this context. np.square() is more appropriate.
Always set a known seed for predictable tests. Also: write tests! The tests I show give confidence that the results of matrix_factorization are exactly the same after refactoring.
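To make the pre-indexing, .dot(), += and np.square() points concrete, here is a tiny self-contained sketch (the shapes, indices and rating value are made up; the pi, qj, eij names match the refactor below):
import numpy as np

# Toy illustration only: hypothetical shapes, indices and rating.
rng = np.random.default_rng(seed=0)
P = rng.random((3, 2))
Q = rng.random((2, 4))
i, j, k = 0, 1, 0
rij, alpha, beta = 1.0, 2e-4, 0.02

pi = P[i, :]                             # pre-index the row once per i ...
qj = Q[:, j]                             # ... and the column once per j
eij = 2*(rij - pi.dot(qj))               # member .dot() rather than np.dot(pi, qj)
pi[k] += alpha*(eij*qj[k] - beta*pi[k])  # in-place +=; writes through to P itself
err = np.square(rij - pi.dot(qj))        # np.square() rather than pow(..., 2)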
A light refactor of your original code looks like
import numpy as np
def matrix_factorization(
    R: np.ndarray,  # rating matrix of a user with an item; nonzero means prior
    P: np.ndarray,  # score on latent features to predict R (will be modified)
    Q: np.ndarray,  # score on latent features to predict R (will be modified)
    K: int,         # number of latent features
    max_steps: int = 5_000,
    alpha: float = 2e-4,
    beta: float = 0.02,
    epsilon: float = 1e-3,
) -> tuple[
    np.ndarray,  # P
    np.ndarray,  # Q
]:
    Q = Q.T
    for _ in range(max_steps):
        for i in range(len(R)):
            pi = P[i, :]
            ri = R[i, :]
            for j, rij in enumerate(ri):
                qj = Q[:, j]
                eij = 2*(rij - pi.dot(qj))
                if rij > 0:
                    for k in range(K):
                        pi[k] += alpha*(eij*qj[k] - beta*pi[k])
                        qj[k] += alpha*(eij*pi[k] - beta*qj[k])
        e = 0
        for i in range(len(R)):
            pi = P[i, :]
            ri = R[i, :]
            for j, rij in enumerate(ri):
                qj = Q[:, j]
                if rij > 0:
                    e += np.square(rij - pi.dot(qj)) + sum(
                        0.5*beta*(np.square(pi[k]) + np.square(qj[k]))
                        for k in range(K)
                    )
        if e < epsilon:
            break
    return P, Q.T
def test() -> None:
    R = np.array((
        (5, 3, 0, 1),
        (4, 0, 0, 1),
        (1, 1, 0, 5),
        (1, 0, 0, 4),
        (0, 1, 5, 4),
    ))
    N, M = R.shape
    K = 2
    rand = np.random.default_rng(seed=0)
    P = rand.random((N, K))
    Q = rand.random((M, K))
    nP, nQ = matrix_factorization(R, P, Q, K)
    assert np.allclose(
        (
            ( 2.09499876, -0.47952543),
            ( 1.70134553, -0.29322251),
            ( 1.15021851,  2.01373816),
            ( 0.97941849,  1.58532621),
            ( 1.17893093,  1.51589598),
        ), nP, rtol=0, atol=1e-8,
    )
    assert np.allclose(
        (
            (2.20776869, -0.74542874),
            (1.34473580, -0.31621481),
            (2.03835722,  1.67576850),
            (0.92082345,  1.93850648),
        ), nQ, rtol=0, atol=1e-8,
    )
    nR = nP @ nQ.T
    assert np.allclose(
        (
            (4.98272469, 2.96885287, 3.46678224, 0.99956084),
            (3.97475387, 2.38058154, 2.97657691, 0.99822514),
            (1.0383181 , 0.90996617, 5.71911518, 4.96279265),
            (0.98058175, 0.81575547, 4.65304446, 3.97503663),
            (1.47281436, 1.10600186, 4.9433731 , 4.02416142),
        ), nR, rtol=0, atol=1e-8,
    )
    priors = R != 0
    assert np.allclose(R[priors], nR[priors], atol=0, rtol=0.1)

if __name__ == '__main__':
    test()
Algorithm
The quuxlabs guide you reference is... not good. It's typical of "academic" code: it writes out the loops element by element without using the libraries properly. It also has a very bad termination condition: for your example data it always runs to the iteration limit, and the e-tolerance termination never fires (the regularization terms alone keep e well above the 0.001 threshold).
That implementation could probably be salvaged if it were rewritten in C and the termination condition improved, but I don't go down that path. Instead, I show a generic least-squares approach. The quality of this approach hinges on a good Jacobian definition, and thankfully that's straightforward since the system is linear (though it still requires some slightly tricky Kronecker products).
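Concretely (and assuming the row-major vec ordering that .ravel() uses), the R-error block of the Jacobian is built from two Kronecker products:
$$
\frac{\partial\,\operatorname{vec}(\hat P \hat Q)}{\partial\,\operatorname{vec}(\hat P)} = I_n \otimes \hat Q^\mathsf{T},
\qquad
\frac{\partial\,\operatorname{vec}(\hat P \hat Q)}{\partial\,\operatorname{vec}(\hat Q)} = \hat P \otimes I_m,
$$
of which only the rows belonging to nonzero ("prior") entries of R are kept; the P and Q regularization residuals just contribute a scaled identity block.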
This approach is fast: it converges in five iterations for input of the same size as your original example.
It works especially well if you use the support for sparse Jacobians offered by the LSMR (least-squares MINRES, scipy.sparse.linalg.lsmr) trust-region backend, tr_solver='lsmr'.
Suggested implementation
This includes benchmarks and tests:
import functools
import time
import typing
import numpy as np
import pandas as pd
import scipy.optimize
import scipy.sparse as ssp
from numpy.linalg import LinAlgError
def unpack(
    decision: np.ndarray, mask_shape: tuple[int, int],
) -> tuple[
    np.ndarray, np.ndarray, np.ndarray,  # phat, qhat, rhat
]:
    """
    Unpack the flat decision variable vector to P, Q, R matrices
    """
    n, m = mask_shape
    k = decision.size//(m + n)
    phat, qhat = np.split(decision, (n*k,))
    phat.resize((n, k))
    qhat.resize((k, m))
    return phat, qhat, phat @ qhat
def residual(
    decision: np.ndarray, prior_mask: np.ndarray, alpha: float, beta: float,
    P: np.ndarray, Q: np.ndarray, Rprior: np.ndarray,
) -> np.ndarray:
    """
    Given a decision vector, return the residual vector for those decisions.
    The residuals cover (only for nonzero R, i.e. the mask of prior elements)
    R error from original, then P error from original and Q error from original.
    """
    phat, qhat, rhat = unpack(decision=decision, mask_shape=prior_mask.shape)
    eij = rhat[prior_mask] - Rprior
    return np.concat((
        alpha*eij,
        beta*(phat - P).ravel(),
        beta*(qhat - Q).ravel(),
    ))
def jacobian(
    decision: np.ndarray, prior_mask: np.ndarray,
    dpfactor: ssp.dia_array, dqfactor: ssp.dia_array, dpq_dpq: ssp.dia_array,
    dense: bool,
) -> np.ndarray | ssp.csr_array:
    """
    Produce the Jacobian matrix for a decision vector. The first dimension covers flattened
    sizes for the R residual, P residual and Q residual; the second dimension covers flattened
    sizes for the P decision and Q decision. The R residual is only defined for prior elements.
    """
    phat, qhat, rhat = unpack(decision=decision, mask_shape=prior_mask.shape)
    # Rerr by P+Q
    dr_dpq = ssp.hstack(
        blocks=(ssp.kron(dpfactor, qhat.T), ssp.kron(phat, dqfactor)),
        format='csr',
    )[prior_mask.ravel()]
    grad = ssp.vstack(blocks=(dr_dpq, dpq_dpq), format='csr')
    if dense:
        return grad.toarray()
    return grad
def make_sparsity(
    prior_mask: np.ndarray, k: int,
) -> ssp.csr_array:
    n, m = prior_mask.shape
    dr_dpq = ssp.hstack(
        blocks=(
            ssp.kron(ssp.eye_array(n), np.ones((m, k))),
            ssp.kron(np.ones((n, k)), ssp.eye_array(m)),
        ),
        format='csr',
    )[prior_mask.ravel()]
    dpq_dpq = ssp.eye_array(n*k + k*m)
    return ssp.vstack(blocks=(dr_dpq, dpq_dpq), format='csr')
def matrix_factorization(
    # scores on latent features to predict R
    P: np.ndarray,  # strength of the associations between a user and the features
    Q: np.ndarray,  # strength of the associations between an item and the features
    R: np.ndarray,  # rating matrix of a user with an item; nonzero means prior
    # algorithm to use in least_squares
    method: typing.Literal['trf', 'dogbox', 'lm'] = 'trf',
    # trust region solver to use in least_squares. lsmr will pass a sparsity structure.
    tr_solver: None | typing.Literal['exact', 'lsmr'] = 'lsmr',
    loss: typing.Literal[
        'linear', 'soft_l1', 'huber', 'cauchy', 'arctan',
    ] = 'linear',  # loss function for least_squares
    max_nfev: int | None = None,  # maximum number of function evaluations in least_squares
    ftol: float = 1e-3,  # termination tolerance in least_squares
    alpha: float = 0.9,  # weight of R error
    beta: float = 0.1,   # weight of P, Q error
    jac_tol: float | None = 1e-7,  # if used, the allowed tolerance in a Jacobian sanity check
) -> tuple[
    np.ndarray, np.ndarray, np.ndarray,  # Phat, Qhat, Rhat
    scipy.optimize.OptimizeResult,       # from least_squares
]:
    """
    Find Phat, Qhat such that:
        Phat ~ P (n,k)
        Qhat ~ Q (k,m)
        Phat @ Qhat ~ R (n,m) for nonzero "prior" R
    """
    prior_mask: np.ndarray = R != 0
    n, k = P.shape
    k, m = Q.shape
    x0 = np.concatenate((P.ravel(), Q.ravel()))
    dpfactor = ssp.diags_array(np.full(shape=n, fill_value=alpha))
    dqfactor = ssp.diags_array(np.full(shape=m, fill_value=alpha))
    dpq_dpq = ssp.diags_array(np.full(shape=x0.size, fill_value=beta))
    dense = method == 'lm' or tr_solver == 'exact'
    if dense:
        jac_sparsity: ssp.csr_array | None = None  # not supported
    else:
        jac_sparsity = make_sparsity(prior_mask=prior_mask, k=k)
    fun = functools.partial(
        residual, prior_mask=prior_mask, P=P, Q=Q, Rprior=R[prior_mask], alpha=alpha, beta=beta,
    )
    jac = functools.partial(
        jacobian, prior_mask=prior_mask, dense=dense,
        dpfactor=dpfactor, dqfactor=dqfactor, dpq_dpq=dpq_dpq,
    )
    if jac_tol is not None:
        err = scipy.optimize.check_grad(func=fun, grad=jac, x0=x0)
        if err > jac_tol:
            raise ValueError('Gradient incorrect')
    result = scipy.optimize.least_squares(
        fun=fun, x0=x0, jac=jac, method=method, ftol=ftol, x_scale='jac', loss=loss,
        max_nfev=max_nfev, tr_solver=tr_solver, jac_sparsity=jac_sparsity,
    )
    if not result.success:
        raise ValueError(result.message)
    return unpack(decision=result.x, mask_shape=R.shape) + (result,)
def test() -> None:
    m = 4
    n = 5
    k = 2
    rand = np.random.default_rng(seed=0)
    Phidden = rand.random(size=(n, k))
    Qhidden = rand.random(size=(k, m))
    Rhidden = Phidden @ Qhidden
    Ptrain = Phidden.round(1)
    Qtrain = Qhidden.round(1)
    Rtrain = Rhidden.round(2)*((
        (1, 1, 0, 1),
        (1, 0, 0, 1),
        (1, 1, 0, 1),
        (1, 0, 0, 1),
        (0, 1, 1, 1),
    ))
    for method in ('trf', 'dogbox', 'lm'):
        for tr_solver in (None, 'exact', 'lsmr'):
            Phat, Qhat, Rhat, result = matrix_factorization(
                P=Ptrain, Q=Qtrain, R=Rtrain, method=method, tr_solver=tr_solver,
            )
            assert np.allclose(Ptrain, Phat, rtol=0, atol=0.1)
            assert np.allclose(Qtrain, Qhat, rtol=0, atol=0.1)
            assert np.allclose(Rtrain[Rtrain != 0], Rhat[Rtrain != 0], rtol=0, atol=0.01)
def benchmark() -> None:
    results = []
    rand = np.random.default_rng(seed=0)
    for _ in range(3):  # best of
        for n in (5, 10, 20, 40):
            m = int(0.8*n)
            for k_factor in (0.4, 0.8):
                k = int(k_factor*n)
                Phidden = rand.random(size=(n, k))
                Qhidden = rand.random(size=(k, m))
                Rhidden = Phidden @ Qhidden
                Ptrain = Phidden.round(1)
                Qtrain = Qhidden.round(1)
                for sparsity in (0.25, 0.75):
                    Rtrain = Rhidden.round(2) * (
                        rand.random(size=Rhidden.shape) > sparsity
                    )
                    for method in ('trf', 'dogbox', 'lm'):
                        if method == 'lm':
                            solvers = (None,)
                        else:
                            solvers = ('exact', 'lsmr')
                        for tr_solver in solvers:
                            if method == 'lm':
                                losses = ('linear',)
                            else:
                                losses = ('linear', 'soft_l1', 'huber', 'cauchy', 'arctan')
                            for loss in losses:
                                row = (
                                    sparsity, k_factor, n, method or 'None', tr_solver or 'None',
                                    loss,
                                )
                                print(row)
                                start = time.perf_counter()
                                try:
                                    matrix_factorization(
                                        P=Ptrain, Q=Qtrain, R=Rtrain, method=method,
                                        tr_solver=tr_solver, loss=loss, jac_tol=None,
                                    )
                                    dur = time.perf_counter() - start
                                    results.append(row + (dur,))
                                except LinAlgError:
                                    pass
    independents = ['sparsity', 'k_factor', 'n', 'method', 'tr_solver', 'loss']
    df = pd.DataFrame(
        data=results,
        columns=independents + ['dur'],
    )
    df = df.groupby(independents).min().sort_values([
        'sparsity', 'k_factor', 'n', 'dur',
    ])
    df.reset_index().to_csv('benchmark.csv')
    pd.options.display.max_columns = 99
    pd.options.display.max_rows = None
    pd.options.display.width = 200
    print(df)

if __name__ == '__main__':
    test()
    benchmark()
The output matrices of the test data look like
nfev = 5
njev = 5
P =
[[0.6 0.3]
[0. 0. ]
[0.8 0.9]
[0.6 0.7]
[0.5 0.9]]
~
[[0.6166414 0.2883536 ]
[0.04428404 0.01744651]
[0.81438868 0.90631738]
[0.60317056 0.73364438]
[0.49604817 0.91790365]]
Q =
[[0.8 0. 0.9 0. ]
[0.7 0.2 0.9 0.5]]
~
[[ 0.82958178 -0.00997023 0.89884808 0.01698354]
[ 0.72185913 0.18847501 0.89786845 0.55756378]]
R =
[[0.72 0.05 0. 0.17]
[0.05 0. 0. 0.01]
[1.33 0.16 0. 0.52]
[1.03 0. 0. 0.42]
[0. 0.17 1.27 0.52]]
~
[[0.71970515 0.04819939 0.81317055 0.17124827]
[0.04933115 0.00284671 0.05546929 0.01047964]
[1.32983547 0.16269853 1.54576548 0.51916094]
[1.0299672 0.13225988 1.20087485 0.4192975 ]
[1.07410964 0.16805619 1.27002867 0.52021447]]
Benchmarks
When graphing the data via
import pandas as pd
import matplotlib.pyplot as plt
import seaborn
df = pd.read_csv('benchmark.csv')
df['tr_solver'] = df['tr_solver'].fillna('none')
df['solver'] = df['method'] + '-' + df['tr_solver']
ul: plt.Axes
ur: plt.Axes
bl: plt.Axes
br: plt.Axes
fig, ((ul, ur), (bl, br)) = plt.subplots(nrows=2, ncols=2)
for ax, ((k_factor, sparsity), group) in zip(
    (ul, ur, bl, br),
    df.groupby(['k_factor', 'sparsity']),
):
    seaborn.lineplot(
        data=group,
        ax=ax, x='n', y='dur',  # style='loss',
        hue='solver',
    )
    ax.set_xscale('log')
    ax.set_yscale('log')
    ax.set_title(f'k_factor={k_factor} sparsity={sparsity}')
plt.show()
we see that LSMR is strongly preferred across multiple R sparsities and values of k:
(figure: four log-log panels of solve duration vs. n, one per k_factor/sparsity combination, one line per solver)