Adding MLP support to AutoGrad
Table of contents
Post
In the previous post I’ve added support for tensors to the toy automatic differentiation system. Tensors are basically a linear array with a rule to access it in a multi-dimensional fasion.
In this update I’ll extend it to handle MLPs. And for that we need to add a linear layer aka matrix multiplication. First thing we add is a Matrix class, right next to the learnable parameter class. Its job is to own a array of learnable parameters [out_features, in_channels]. The way it works in this setup is that we don’t care about the actual shape of the tensor, all we need to know is that it is an array of N * [in_features], then what we do is we multiply each such array by the matrix and the result is a new array of N * [out_features].
Next thing we need to add the matrix multiplication node, its job is to take a node that produces a tensor and a node that produces the matrix, and multiply the last channels of the tensor by the matrix to produces a new tensor. Next we use that to backpropagate the gradient. If you do the math you’ll see that gradients to the tensor are just gradients of this node multiplied by the transpose matrix, and gradients for the weights is an outer product of the node’s gradients and the input channels. That’s coming up from the basic rules of back propagation for addition and multiplication.
Next I add LeakyReLU activation function for chaining matrix multiplies together to comprise the feedforward MLP pass. Another thing we need is an optimizer. For that I implemented a basic AdamW without variance bias corrections. AdamW is a variant of the Adam optimizer that decouples weight decay from the optimization steps. Meaning that the weight decay is applied directly to the weights after the update step, rather than being included in the loss function and gradients. The function is pretty similar we just keep an exponential running average for grad and grad*grad, basically new_val=lerp(old_val, update, 0.1) - that’s just TAA applied to the grads. Then we divide the mean grad by the standard deviation of the variance. We compute variance as avg(t^2) - avg(t)^2, the rule for convex or concave functions is such that this expression is always positive, so to get the standard deviation we need to take the square root of it. This acts as a dynamic scale for the gradient, the bigger the variance, the smaller the learning rate which helps a lot to stabilize the training. It actually works unreasonably well for a simple technique like this. One thing I’ve omitted is the statistical bias correction but you can look it up.
Source Code:
import math
import random
def make_array(dim):
arr = []
for _ in range(dim):
arr.append(0.0)
return arr
def dims_get_total(dims):
total = 1
for d in dims:
total *= d
return total
"""
New basic building block class for compute.
Basically a flat array with a rule for accessing elements.
We're using a basic rule of linear strides.
"""
class Tensor:
def __init__(self, shape):
self.shape = shape
self.data = make_array(dims_get_total(shape))
self.strides = []
self._compute_strides()
def _compute_strides(self):
total = 1
for d in reversed(self.shape):
self.strides.insert(0, total)
total *= d
def _get_flat_index(self, indices):
if len(indices) != len(self.shape):
raise ValueError("Incorrect number of indices")
index = 0
for i, idx in enumerate(indices):
if not (0 <= idx < self.shape[i]):
raise IndexError("Index out of bounds")
index += idx * self.strides[i]
return index
def __getitem__(self, indices):
index = self._get_flat_index(indices)
return self.data[index]
def __setitem__(self, indices, value):
index = self._get_flat_index(indices)
self.data[index] = value
def __repr__(self):
return f"Tensor(shape={self.shape}, data={self.data})"
def __add__(self, other):
if isinstance(other, (int, float)):
result = Tensor(self.shape)
for i in range(len(self.data)):
result.data[i] = self.data[i] + other
return result
if not isinstance(other, Tensor):
raise TypeError(f"Unsupported operand type: {type(other)}")
if self.shape != other.shape:
raise ValueError(f"Shapes must be the same: {self.shape} vs {other.shape}")
result = Tensor(self.shape)
for i in range(len(self.data)):
result.data[i] = self.data[i] + other.data[i]
return result
def __mul__(self, other):
if isinstance(other, (int, float)):
result = Tensor(self.shape)
for i in range(len(self.data)):
result.data[i] = self.data[i] * other
return result
if not isinstance(other, Tensor):
raise TypeError(f"Unsupported operand type: {type(other)}")
if self.shape != other.shape:
raise ValueError(f"Shapes must be the same: {self.shape} vs {other.shape}")
result = Tensor(self.shape)
for i in range(len(self.data)):
result.data[i] = self.data[i] * other.data[i]
return result
def __sub__(self, other):
if isinstance(other, (int, float)):
result = Tensor(self.shape)
for i in range(len(self.data)):
result.data[i] = self.data[i] - other
return result
if not isinstance(other, Tensor):
raise TypeError(f"Unsupported operand type: {type(other)}")
if self.shape != other.shape:
raise ValueError(f"Shapes must be the same: {self.shape} vs {other.shape}")
result = Tensor(self.shape)
for i in range(len(self.data)):
result.data[i] = self.data[i] - other.data[i]
return result
def __truediv__(self, other):
if isinstance(other, (int, float)):
result = Tensor(self.shape)
for i in range(len(self.data)):
result.data[i] = self.data[i] / other
return result
if not isinstance(other, Tensor):
raise TypeError(f"Unsupported operand type: {type(other)}")
if self.shape != other.shape:
raise ValueError(f"Shapes must be the same: {self.shape} vs {other.shape}")
result = Tensor(self.shape)
for i in range(len(self.data)):
result.data[i] = self.data[i] / other.data[i]
return result
def abs(self):
result = Tensor(self.shape)
for i in range(len(self.data)):
result.data[i] = abs(self.data[i])
return result
def get_list_shape(lst):
shape = []
while isinstance(lst, list):
shape.append(len(lst))
lst = lst[0] if lst else []
return shape
def linearize_list(lst):
result = []
def recurse(sublist):
if isinstance(sublist, list):
for item in sublist:
recurse(item)
else:
result.append(sublist)
recurse(lst)
return result
def tensor_from_list(lst):
shape = get_list_shape(lst)
tensor = Tensor(shape)
tensor.data = linearize_list(lst)
return tensor
# Compute graph basic building block
class AutoGradNode:
def __init__(self, shape):
# scalar valued gradient accumulator for the final dL/dp
self.shape = shape
self.grad = Tensor(shape)
# dependencies for causation sort
self.dependencies = []
def zero_grad(self):
self.grad = Tensor(shape=self.shape)
# Overload operators to build the computation graph
def __add__(self, other): return Add(self, other)
def __mul__(self, other): return Mul(self, other)
def __sub__(self, other): return Sub(self, other)
# Get a topologically sorted list of dependencies
# starts from the leaf nodes and terminates at the root
def get_topo_sorted_list_of_deps(self):
visited = set()
topo_order = []
def dfs(node): # depth-first search
if node in visited:
return
visited.add(node)
for dep in node.dependencies:
dfs(dep)
topo_order.append(node)
dfs(self)
return topo_order
def get_pretty_name(self): return self.__class__.__name__
# Pretty print the computation graph in DOT format
def pretty_print_dot_graph(self):
topo_order = self.get_topo_sorted_list_of_deps()
_str = ""
_str += "digraph G {\n"
for node in topo_order:
_str += f" {id(node)} [label=\"{node.get_pretty_name()}\"];\n"
for dep in node.dependencies:
_str += f" {id(node)} -> {id(dep)};\n"
_str += "}"
return _str
def backward(self):
topo_order = self.get_topo_sorted_list_of_deps()
for node in topo_order:
node.zero_grad() # we don't want to accumulate gradients
for i in range(len(self.grad.data)):
self.grad.data[i] = 1.0 # seed the gradient at the output node dL/dL=1
# Reverse the topological order for backpropagation to start from the output
for node in reversed(topo_order):
# from the tip of the graph down to leaf learnable parameters
# Distribute gradients
node._backward()
# The job of this method is to propagate gradients backward through the network
def _backward(self):
assert False, "Not implemented in base class"
# Materialize the numerical value at the node
# i.e. Evaluate the computation graph
def materialize(self):
assert False, "Not implemented in base class"
# Any value that is not learnable
class Variable(AutoGradNode):
def __init__(self, values, name=None):
assert isinstance(values, Tensor), "Values must be a Tensor"
super().__init__(shape=values.shape)
self.values = values
self.name = name
def get_pretty_name(self):
if self.name:
return f"Variable({self.name})"
else:
return f"Constant({[f'{round(v, 3)}' for v in self.values.data]})"
def materialize(self): return self.values
def _backward(self):
pass
Constant = Variable
# Learnable parameter with initial random value 0..1
class LearnableParameter(AutoGradNode):
def __init__(self, shape):
super().__init__(shape=shape)
self.values = Tensor(shape)
for i in range(len(self.values.data)):
self.values.data[i] = random.random()
def get_pretty_name(self):
if len(self.values.data) <= 3:
return f"LearnableParameter({[round(v, 3) for v in self.values.data]})"
else:
return f"LearnableParameter(shape={self.values.shape})"
def materialize(self): return self.values
def _backward(self):
pass
class Matrix(AutoGradNode):
def __init__(self, in_channels, out_features):
super().__init__(shape=[out_features, in_channels])
self.values = Tensor([out_features, in_channels])
for i in range(len(self.values.data)):
self.values.data[i] = random.random()
def get_pretty_name(self):
return f"Matrix({self.values.shape})"
def materialize(self):
return self.values
def _backward(self):
pass
def transposed(self):
in_features = self.values.shape[1]
out_features = self.values.shape[0]
transposed = Matrix(in_channels=out_features, out_features=in_features)
for i in range(out_features):
for j in range(in_features):
transposed.values.data[i * in_features + j] = self.values.data[j * out_features + i]
return transposed
def tensor_matrix_multiply(tensor, matrix):
in_features = tensor.shape[-1]
out_features = matrix.shape[0]
assert in_features == matrix.shape[1], f"Incompatible matrix dimensions {tensor.shape} and {matrix.shape}"
result = Tensor(tensor.shape[:-1] + [out_features])
total_number_of_elems = len(result.data) # we don't really care about the actual shape, for this we know that the tensor is an array of input features
for s in range(total_number_of_elems // out_features):
for i in range(out_features):
for j in range(in_features):
result.data[s * out_features + i] += tensor.data[s * in_features + j] * matrix.data[i * in_features + j]
return result
def tensor_outer_product(tensor_a, tensor_b):
in_features = tensor_a.shape[-1]
out_features = tensor_b.shape[-1]
result_shape = [out_features, in_features] # tensor_a.shape[:-1] + [out_features, in_features]
result = Tensor(result_shape)
total_number_of_elems_a = len(tensor_a.data) # we don't really care about the actual shape, for this we know that the tensor is an array of input features
total_number_of_elems_b = len(tensor_b.data) # we don't really care about the actual shape, for this we know that the tensor is an array of output features
assert total_number_of_elems_a // in_features == total_number_of_elems_b // out_features, "Incompatible tensor dimensions for outer product"
for s in range(total_number_of_elems_a // in_features):
for i in range(out_features):
for j in range(in_features):
# s * in_features * out_features +
result.data[i * in_features + j] += tensor_a.data[s * in_features + j] * tensor_b.data[s * out_features + i]
return result
class VectorMatrixMultiply(AutoGradNode):
def __init__(self, tensor, matrix):
assert tensor.shape[-1] == matrix.shape[1], "Incompatible matrix dimensions"
super().__init__(shape=tensor.shape[:-1] + [matrix.shape[0]])
self.tensor = tensor
self.matrix = matrix
self.dependencies = [tensor, matrix]
def materialize(self):
tensor = self.tensor.materialize()
matrix = self.matrix.materialize()
return tensor_matrix_multiply(tensor, matrix)
def _backward(self):
# print(f"VectorMatrixMultiply backward")
# print(f"grad : {self.grad}")
# print(f"tensor : {self.tensor.materialize()}")
# print(f"matrix : {self.matrix.materialize()}")
# print(f"mT : {mT.materialize()}")
mT = self.matrix.transposed()
tmp = tensor_matrix_multiply(self.grad, mT.materialize())
self.tensor.grad = self.tensor.grad + tmp
self.matrix.grad = self.matrix.grad + tensor_outer_product(self.tensor.materialize(), self.grad)
class LeakyRelu(AutoGradNode):
def __init__(self, a, negative_slope=0.01):
super().__init__(shape=a.shape)
self.a = a
self.negative_slope = negative_slope
self.dependencies = [a]
def materialize(self):
x = self.a.materialize()
result = Tensor(x.shape)
for i in range(len(x.data)):
result.data[i] = x.data[i] if x.data[i] > 0 else self.negative_slope * x.data[i]
return result
def _backward(self):
am = self.a.materialize()
slope = Tensor(self.a.shape)
for i in range(len(am.data)):
slope.data[i] = 1.0 if am.data[i] > 0.0 else self.negative_slope
self.a.grad = self.a.grad + self.grad * slope
class Reduce(AutoGradNode):
def __init__(self, a, op='+'):
super().__init__(shape=[1,])
self.a = a
self.dependencies = [a]
self.op = op
assert op in ['+'], "Only sum reduction is supported"
def materialize(self): return tensor_from_list([sum(self.a.materialize().data)])
def _backward(self):
self.a.grad = self.a.grad + self.grad.data[0] # broadcast the gradient
class Square(AutoGradNode):
def __init__(self, a):
super().__init__(shape=a.shape)
self.a = a
self.dependencies = [a]
def materialize(self):
x = self.a.materialize()
return x * x
def _backward(self):
self.a.grad = self.a.grad + self.grad * 2.0 * self.a.materialize()
class Sub(AutoGradNode):
def __init__(self, a, b):
assert a.shape == b.shape, "Incompatible tensor dimensions"
super().__init__(shape=a.shape)
self.a = a
self.b = b
self.dependencies = [a, b]
def materialize(self):
return self.a.materialize() - self.b.materialize()
def _backward(self):
self.a.grad = self.a.grad + self.grad
self.b.grad = self.b.grad - self.grad
class Add(AutoGradNode):
def __init__(self, a, b):
assert a.shape == b.shape, "Incompatible tensor dimensions"
super().__init__(shape=a.shape)
self.a = a
self.b = b
self.dependencies = [a, b]
def materialize(self):
return self.a.materialize() + self.b.materialize()
def _backward(self):
self.a.grad = self.a.grad + self.grad
self.b.grad = self.b.grad + self.grad
class Mul(AutoGradNode):
def __init__(self, a, b):
assert a.shape == b.shape, "Incompatible tensor dimensions"
super().__init__(shape=a.shape)
self.a = a
self.b = b
self.dependencies = [a, b]
def materialize(self):
return self.a.materialize() * self.b.materialize()
def _backward(self):
self.a.grad = self.a.grad + self.grad * self.b.materialize()
self.b.grad = self.b.grad + self.grad * self.a.materialize()
class Sin(AutoGradNode):
def __init__(self, a):
super().__init__(shape=a.shape)
self.a = a
self.dependencies = [a]
def materialize(self):
ma = self.a.materialize()
result = Tensor(ma.shape)
for i in range(len(ma.data)):
result.data[i] = math.sin(ma.data[i])
return result
def _backward(self):
ma = self.a.materialize()
for i in range(len(ma.data)):
self.a.grad.data[i] = self.a.grad.data[i] + self.grad.data[i] * math.cos(ma.data[i])
num_nodes = 64
m0 = Matrix(in_channels=1, out_features=num_nodes)
b0 = LearnableParameter(shape=[num_nodes,]) # bias
m1 = Matrix(in_channels=num_nodes, out_features=num_nodes)
b1 = LearnableParameter(shape=[num_nodes,]) # bias
m2 = Matrix(in_channels=num_nodes, out_features=1)
b2 = LearnableParameter(shape=[1,]) # bias
def eval_mlp(x):
z = VectorMatrixMultiply(tensor=x, matrix=m0)
z = z + b0
z = LeakyRelu(z, negative_slope=0.1)
z = VectorMatrixMultiply(tensor=z, matrix=m1)
z = z + b1
z = LeakyRelu(z, negative_slope=0.1)
z = VectorMatrixMultiply(tensor=z, matrix=m2)
z = z + b2
return z
def eval_target(x):
return Square(x) * Constant(tensor_from_list([2.777, ])) + Constant(tensor_from_list([0.123,])) - x * x * x * Constant(tensor_from_list([1.5,])) + Sin(x * Constant(tensor_from_list([4.0,])))
class AdamW:
"""
Simple AdamW without variance bias correction
"""
def __init__(self, parameters, lr=0.001, betas=(0.9, 0.999), weight_decay=0.01):
self.parameters = parameters
self.lr = lr
self.betas = betas
self.weight_decay = weight_decay
self.moments_1 = []
self.moments_2 = []
for p in parameters:
self.moments_1.append(Tensor(p.grad.shape))
self.moments_2.append(Tensor(p.grad.shape))
def step(self):
for i, p in enumerate(self.parameters):
self.moments_1[i] = self.moments_1[i] * self.betas[0] + p.grad * (1 - self.betas[0])
self.moments_2[i] = self.moments_2[i] * self.betas[1] + (p.grad * p.grad) * (1 - self.betas[1])
variance = self.moments_2[i] - self.moments_1[i] * self.moments_1[i]
for didx in range(len(p.values.data)):
p.values.data[didx] -= self.lr * self.moments_1[i].data[didx] / (variance.data[didx] ** 0.5 + 1e-8)
p.values.data[didx] -= self.weight_decay * self.lr * p.values.data[didx]
adamw = AdamW(parameters=[m0, b0, m1, b1, m2, b2], lr=0.001, weight_decay=0.001, betas=(0.9, 0.9))
for epoch in range(3000):
x = Variable(tensor_from_list([random.random() * 2.0 - 1.0,]), name="x")
mlp = eval_mlp(x)
loss = Reduce(Square(mlp - eval_target(x))) # L2 loss to Ax^2+B
print(f"Epoch {epoch}: loss = {loss.materialize()};")
# Backward pass
# Gradient reset happens internally in the backward pass
loss.backward()
adamw.step()
with open(".tmp/graph.dot", "w") as f:
f.write(loss.pretty_print_dot_graph())
# Plot our mlp
import matplotlib.pyplot as plt
max_range = 100
x_test_vals = [(i / max_range - 1.0) for i in range(2 * max_range)]
y_test_vals = []
for x in x_test_vals:
y_test_vals.append(eval_mlp(Variable(tensor_from_list([x,]), name="x")).materialize().data[0])
x_ref_vals = [(i / max_range - 1.0) for i in range(2 * max_range)]
y_ref_vals = []
for x in x_ref_vals:
y_ref_vals.append(eval_target(Variable(tensor_from_list([x,]), name="x")).materialize().data[0])
plt.plot(x_test_vals, y_test_vals, label="MLP")
plt.plot(x_ref_vals, y_ref_vals, label="Target", linestyle="--")
plt.xlabel("X")
plt.ylabel("Y")
plt.show()
After a few minutes you should get this plot:
The dotted line represents the target function we are trying to learn, while the solid line represents the output of our MLP. As you can see, the MLP is able to learn the target function quite well.
This is our compute graph:
It looks like we have 2 branches in the computation graph: 1 for the MLP and the other for the target function. In reality we can replace the target function by just raw data and it will yield the same result. So you could think of MLP as being an automatic approximation discovery mechanism to a black box.