В предыдущей статье мы описали алгоритм машинного обучения с помощью нейронной сети. В этой статье мы построим код на языке Python для внедрения этого алгоритма. Мы будем использовать термины и обозначения из предыдущей статьи.
Для работы с векторами и матрицами мы будем использовать библиотеку NumPy, которая изначально написана на C, то есть очень быстра в исполнении.
[toc]
Структура нейронной сети
Наша задача сконструировать такую нейронную сеть, которую легко инициализировать глобальными параметрами, а именно:
- Размерностью входного вектора астрологических признаков $x_i$ —
input_dim - Размерностью каждого скрытого слоя, которую можно задать через массив чисел
hidden_dims, напримерhidden_dims=[16, 9]означало бы, что у нас есть 2 скрытых слоя в 16 и 9 нейронов соответственно. - Шагом градиентного спуска, $\lambda$ (learning rate,
lr) - Количеством эпох для обучения модели —
epoches - Числом примеров $N$ в отдельной подвыборке из тренировочных данных —
batch_size
Для этого мы создадим класс BernoulliNet, который можно инициализировать этими параметрами, например:
model = BernoulliNet(
input_dim=8,
hidden_dims=[16, 9],
lr=0.05,
epochs=100,
batch_size=32,
)
Здесь параметры input_dims и hidden_dims определяет общую структуру нейросети
dims = [input_dim, 16, 9, 1]
# ↑ ↑ ↑ ↑
# слой 0 1 2 3 (выходной)
При этом матрицы весов будут иметь следующие размерности $[\text{rows}\times \text{cols}]$ в этом примере:
Мы сделаем веса и смещения внутренними свойствами модели, то есть
class BernoulliNet:
def __init__(self,
input_dim: int,
hidden_dims: list[int],
lr: float = 0.01,
epochs: int = 100,
batch_size: int = 32,
) -> None:
# Размерность всей нейросети
dims: list[int] = [input_dim] + hidden_dims + [1]
# Инициализация весов
self.W, self.b = self._init_weights(dims)
Поскольку мы будем пробегать по значениям $W^{(1)}, W^{(2)}, W^{(3)}$ в компьютерной нотации это будут элементы массива с индексами self.W[0], self.W[1], self.W[2]. Аналогично смещения $b^{(1)}, b^{(2)}, b^{(3)}$ будут хранится в массиве self.b[0], self.b[1], self.b[2]
В общем случае
- матрица весов
self.W[layer-1]соответствует $W^{(l)}$ размерностью $[n^{(l)}\times n^{(l-1)}]$. - вектор-столбец смещений
self.b[layer-1]соответствует вектору-столбцу $\vec{\mathbf{b}}^{(l)}$ размерностью $[n^{(l)}\times 1]$.
Типы данных
Мы договорились ранее, что нижние индексы обозначают вектор-столбец $x_i.$ Формально — это матрица размерностью $n$ строк и одним столбцом $[n\times1]$.
Верхние индексы $x^i$ обозначают сопряженный вектор, или вектор-строку. Формально — это матрица размерностью $[1\times n]$.
Любые матрицы, наполненные десятичными числами в библиотеке NumPy — это объекты NDArray[np.float64], поэтому чтобы отличать их размерности мы введем явную аннотацию типов данных, которыми мы будем оперировать в коде:
import numpy as np
from numpy.typing import NDArray
# Геометрические типы
# ===================
# [n × 1] — вектор-столбец
Vector = NDArray[np.float64]
# [1 × n] — вектор-строка
CoVector = NDArray[np.float64]
# [m × n] — матрица весов
Matrix = NDArray[np.float64]
# Семантические типы
# ==================
# [n × 1] — один пример (набор входных x⃗ или промежуточных h⃗)
Sample = NDArray[np.float64]
# [n × N] — N примеров (N наборов входных x⃗ или промежуточных h⃗ для всего батча)
Batch = NDArray[np.float64]
# [n × N_total] — весь датасет признаков x⃗ из AstroDataBank
Dataset = NDArray[np.float64]
Это никак не влияет на функциональность — мы вводим эти аннотации исключительно для удобства восприятия кода.
Функции активации для нейронной сети
Прежде всего мы реализуем функции активации в нейронной сети, которые понадобятся нам далее.
Выходной слой
Напомню, что функция активации выходного слоя — сигмоида $\sigma(z^i) = \dfrac{1}{1 + e^{-z^i}}$ — применяется для каждого $i$-го примера в батче. Вместо применения сигмоиды к каждому $z^i$ мы можем делегировать это поэлементное вычисление библиотека NumPy которая выполняет эту задачу на скомпилированном C-коде, что даёт существенный выигрыш в скорости. Для этого достаточно передать в функцию сразу весь набор из $N$ примеров. то есть вектор-строку $\vec{\mathbf{z}}$ размерностью $[1\times N]$:
def sigmoid(z: Batch) -> Batch:
return 1.0 / (1.0 + np.exp(-z))
Промежуточные слои
Функция активации промежуточных слоев $\text{ReLU}(z^i) = \max(0, z^i)$ для каждого $i$-го примера в батче реализуется по тому же принципу — мы делегируем NumPy поэлементное применение функции, передавая ей весь набор из $N$ примеров:
def relu(z: Batch) -> Batch:
return np.maximum(0, z)
Производная функции ReLU равна 1 для $z^i>0$ и 0 во всех остальных случаях. Мы можем представить это в виде строки логических значений ${z^i>0}=(\text{True} \mid \text{False})^i$, в котором затем мы поэлементно преобразуем каждое значение $\text{True}_i$ в 1.0 и каждое $\text{False}_i$ — в 0.0 через метод .astype() библиотеки NumPy:
def d_relu(z: Batch) -> Batch:
result: Batch = (z > 0).astype(np.float64)
return result
Градиентный спуск для нейронной сети
Функция потерь
Напомню, что функция потерь для всех примеров из батча описывается формулой:
где $\hat{p}^i$ — предсказанная вероятность конкретного события на примере $i$-го гороскопа, а $y^i$ — метка реального события (1, если события произошло и 0 — если иначе). В библиотеке NumPy есть встроенная функция усреднения mean, которая эквивалентна $\frac{1}{N}\sum_1^N$, поэтому функцию потерь можно реализовать так:
def loss(self, p_hat: Batch, y: Batch) -> float:
"""
Бинарная кросс-энтропия (логарифмическое правдоподобие Бернулли)
"""
return float(
-np.mean(
y * np.log(p_hat + 1e-9)
+ (1 - y) * np.log(1 - p_hat + 1e-9)
)
)
Здесь p_hat и y — это векторы-строки размерностью $[1\times N]$
Инициализация весов и смещений
Мы говорили в первой части статьи, что для стабильного обучения надо, чтобы веса в каждом слое $l$ были инициированы случайными числами с нулевым средним и дисперсией $2/(n^{(l-2)})$, где $n^{(l-1)} $ — число нейронов в предыдущем слое.
Для одного слоя
В библиотеке NumPy инициализацию матрицу весов $W^{(l)}$ размерностью $[n^{(l)}\times n^{(l-1)}]$ для каждого $l$-го слоя можно реализовать так:
W_layer: Matrix = np.random.normal( # Нормальное распределение
loc=0.0, # среднее значение
scale=np.sqrt(2.0 / n_prev), # Дисперсия
size=(n_curr, n_prev), # Размер матрицы
).astype(np.float64) # десятичные числа в ячейках матрицы
Также мы должны инициализировать нулями вектор-столбец смещения $b^{(l)}$ размерностью $[n^{(l)}\times 1]$ для каждого $l$-го слоя. В нотации NumPy это можно реализовать так:
b_layer: Vector = np.zeros( # Массив нулей
(n_curr, 1), # в виде столбца
dtype=np.float64 # в десятичной форме, т.е. 0.0
)
Для всех слоев
Чтобы инициализировать параметры во всех слоях нейросети мы просто последовательно применяем эти формулы на стыке соседних слоев:
for layer in range(len(dims) - 1):
n_prev = dims[layer] # число нейронов в предыдущем слое
n_curr = dims[layer + 1] # число нейронов в текущем слое
# Далее инициируем матрицы весов и вектора смещений
После этого мы добавляем эти первичные значения в результирующий список весов и смещений для всех слоев нейросети self.W[0], self.W[1], self.W[2] и self.b[0], self.b[1], self.b[2], что соответствует $W^{(1)}, W^{(2)}, W^{(3)}$ и $b^{(1)}, b^{(2)}, b^{(3)}$ в наших формулах.
Прямой проход
Как вы помните из предыдущей статьи после инициации весов и смещений мы сначала совершаем прямой проход, применяя для каждого $i$-го примера $(\vec{\mathbf{x}}, y)^i$ в батче формулу:
и затем мы сохраняем эти значения в кэш, чтобы использовать их далее в обратном проходе.
Как обычно, вместо циклического применения этих формул к каждому примеру отдельно, с точки зрения скорости вычисления эффективней передать сразу весь массив примеров библиотеке NumPy — ее матричные вычисления существенно быстрее, чем обычный цикл перебора на языке Python.
Нулевой слой
В нулевом слое для каждого $i$-го примера в батче $\vec{\mathbf{h}}^{(0)\,i} = \vec{\mathbf{x}}^i$ — вектор-столбец размерностью $[\text{input_dim} \times 1]$. А для всех примеров в батче этот вектор превращается в матрицу $H^{(0)}=X$ размерностью $[\text{input_dim}\times N]$.
В нотации NumPy эту матрицу можно задать так:
H: Batch = X
H_cache: list[Batch] = [H]
В первой строке мы делаем поэлементное присвоение. А во второй строке мы инициализируем временное хранилище H_cache матриц $H^{(0)}, H^{(1)}, ... H^{(L-1)}$, которые понадобятся нам при обратном проходе. Для инициализации мы просто заносим значение $H^{(0)}$ в массив H_cache. Итого
H_cache[0], H_cache[1], H_cache[2]соответствует матрицам $H^{(0)}, H^{(1)}, H^{(2)}$- Размерность каждой матрицы $H^{(l)}$ — $[n^{(l)}\times N]$.
Скрытые слои
Напомню, что выходной результат $\vec{\mathbf{h}}^{(l)} = \text{ReLU}\left(W^{(l)} \vec{\mathbf{h}}^{(l-1)} + \vec{\mathbf{b}}^{(l)}\right)$ каждого скрытого слоя $l$ представляет собой вектор-столбец размерностью $[n^{(l)}\times 1]$ для отдельного взятого примера. Но для всех $N$ примеров в батче — это уже матрица $H^{(l)}$ размером $[n^{(l)} \times N]$. По-сути — это результат активации $l$-го скрытого слоя для всего набора $N$ примеров в батче.
В нотации NumPy эти матрицы можно реализовать так:
for W, b in zip(self.W[:-1], self.b[:-1]):
H = relu(W @ H + b)
H_cache.append(H)
Здесь в первой строке внутри цикла:
- Переменная
H— это выход предыдущего слоя $H^{(l-1)}$ размерностью $[n^{(l-1)}\times N]$. - Переменная
W— этоself.W[layer-1], то есть матрица $W^{(l)}$ размерностью $[n^{(l)}\times n^{(l-1)}]$ b— это векторself.b[layer-1]размерностью $[n^{(l)}\times 1]$, который NumPy автоматически расширяет до матрицы $[n^{(l)}\times N]$ простым повторением столбцов, чтобы обеспечить операцию сложения.W @ H + b— это матрица размером $[n^{(l)}\times N]$, к которой поэлементно применяется ReLU- Результат поэлементного применения функции активации заносится в переменную
H, которая соответствует $H^{(l)}$ в нашей формуле и будет использоваться в следующем цикле.
Во второй строке цикла мы просто заносим это результирующее значение $H^{(l)}$ в кэш для использования в обратном проходе.
Обычный цикл Питона обеспечивает проход по всем слоям. Мы берем набор (self.W[layer], self.b[layer]) за минусом последнего слоя, что соответствует $(W^{(l)},\vec{\mathbf{b}}^{(l)}), l=1...L-1$ в наших формулах.
Здесь функция zip склеивает массивы $W^{(l)}$ и $\vec{\mathbf{b}}^{(l)}$ в массив $(W^{(l)},\vec{\mathbf{b}}^{(l)})$, а нотация [:-1] означает, что мы пробегаемся по всем значения self.W[0], self.W[1], ... и self.b[0], self.b[1], ..., кроме последнего, то есть по $W^{(1)}, W^{(2)}, ... W^{(L-1)}$ и $b^{(1)}, b^{(2)}, ..., b^{(L-1)}$.
Выходной слой
Для выходного слоя мы применяем формулу $\hat{p}^{\,i} = \sigma\left(\left(\vec{\mathbf{w}}^{(l)\,i}\right)^{\text{T}}\;\vec{\mathbf{h}}^{(l-1)\,i} + b^{(l)\,i}\right)$ для каждого $i$-го примера. Для одного примера — это скаляр, то есть матрица $[1\times 1]$, а для всего батча примеров — это вектор-строка размером $[1\times N]$. Как обычно, мы передаем библиотеке NumPy сразу весь набор примеров из батча для быстрого последовательного применения сигмоиды:
p_hat: Batch = sigmoid(
self.W[-1] @ H + self.b[-1]
)
Здесь нотация [-1] означает, что мы берем последний элемент массива self.W, то есть матрицу $W^{(L)}$ выходного слоя размерностью $[n^{(L)}\times n^{(L-1)}] = [1\times n^{(L-1)}]$, умножаем ее на выход предыдущего слоя $H^{(L-1)}$ размером $[n^{(L-1)}\times N]$, что дает нам вектор-строку размером $[1\times N]$, и далее мы прибавляем последний элемент списка self.b размерностью $[1\times 1]$, который библиотека NumPy преобразует в вектор-строку размером $[1\times N]$ простым повторением числа b $N$ раз.
В итоге мы получаем вектор-строку $\vec{\mathbf{\hat{p}}}$ с с числами от 0 до 1 — вероятностями наступления события в каждом из примеров в батче.
Обратный проход
Для обратного прохода нам сначала надо вычислить дельту выходного слоя, а затем по рекуррентной формуле — дельты оставшихся слоев.
Дельта выходного слоя
Напомню, что дельта выходного слоя для каждого $i$-го примера — это скаляр $\delta^{(L)\;i} = \frac{1}{N}\sum_{k=1}^{N}(\hat{p}_k - y_k)^i.$ Но для всех примеров в батче — это вектор-строка размером $[1\times N]$.
В нотации библиотеки NumPy этот вектор реализуется так:
delta: Batch = (p_hat - y) / N
Здесь операция вычитания двух массивов (p_hat - y) вычисляет разность для каждого элемента вектора батча поэлементно, а при операции деления на скаляр N автоматически выполняет усреднение.
Дельты промежуточных слоев
Как вы помните, для промежуточных слоев рекуррентная формула для дельты каждого слоя для каждого $i$-го примера выглядит так: $$ \vec{\boldsymbol{\delta}}^{(l)\,i} = \left(W^{(l+1)}\right)^T \vec{\boldsymbol{\delta}}^{(l+1)\,i} \odot f'(\vec{\mathbf{z}}^{(l)\,i}) $$
Шаг 1
Сначала реализуем вычисление $\vec{\mathbf{z}}^{(l)\,i}.$
Для каждого $i$-го примера $\vec{\mathbf{z}}^{(l)\,i} = \left(W^{(l)}\right)^\text{T}\cdot \vec{\mathbf{h}}^{(l-1)\,i} + \vec{\mathbf{b}}^{(l)}$ — это вектор-столбец размерностью $[n^{(l)}\times 1]$, где $n^{(l)} $ — число нейронов в $l$-м слое. А для всего батча примеров — это матрица $Z^{(l)}$ размерностью $[n^{(l)}\times N].$
В нотации NumPy мы можем записать это в виде:
z_cache[layer] = self.W[layer] @ H_cache[layer] + self.b[layer]
Напомню, что H_cache — это временное хранилище (кэш) матриц $H^{(0)}, H^{(1)}, ... H^{(L-1)}$, которые мы создали при прямом проходе. Каждая из матриц имеет размерность $[n^{(l)}\times N]$, а self.W[layer] имеет размерность $[n^{(l+1)}\times n^{(l)}]$, поэтому матричное произведение self.W[layer] @ H_cache[layer] имеет размерность $[n^{(l+1)}\times N]$.
Отдельный элемент self.b[layer] имеет размерность $[n^{(l+1)}\times 1]$, но NumPy автоматически расширяет его до матрицы $[n^{(l+1)}\times N]$ простым повторением столбцов.
Итого:
- Элементы массива
z_cache[0], z_cache[1], ...соответствуют $Z^{(1)}, Z^{(2)}, ...$ - Каждая матрица $Z^{(l)}$ имеет размерность $[n^{(l)}\times N].$
Шаг 2
Теперь, когда нам известны $Z^{(l)}$ для всех скрытых слоев $l=1, 2, …, L-1$, мы можем перейти к реализации дельты скрытых слоев. Для каждого скрытого слоя для $i$-го примера $\vec{\boldsymbol{\delta}}^{(l)\,i} = \left(W^{(l+1)}\right)^T \vec{\boldsymbol{\delta}}^{(l+1)\,i} \odot f'(\vec{\mathbf{z}}^{(l)\,i})$ — это вектор-столбец размерностью $[n^{(l)}\times 1]$, а для всего батча из $N$ примеров это уже матрица размерностью $[n^{(l)}\times N]$: $$ \Delta^{(l)} = \left(W^{(l+1)}\right)^T \Delta^{(l+1)\;i} \odot f'(Z^{(l)}) $$
где $l$ пробегает значения $L-1, L-2, …, 1$.
В нотации NumPy эту матрицу для каждого слоя можно реализовать вот так:
for layer in range(len(self.W) - 1, 0, -1):
delta = (self.W[layer].T @ delta) * d_relu(z_cache[layer - 1])
-
Здесь мы проходим в обратном порядке по индексам
layerот предпоследнего до нулевого, что соответствует скрытым слоям $l=L, L-1, ...1$. -
Затем в каждом слое мы производим матричное умножение
self.W[layer].T @ delta, что соответствует умножению матрицы $\left(W^{(l+1)}\right)^\text{T}$ размером $[n^{(l+1)}\times n^{(l)}]^\text{T}$ или $[n^{(l)}\times n^{(l+1)}]$ на $\Delta^{(l+1)}$ размерностью $[n^{(l+1)}\times N]$ для всего батча. Результирующая матрица имеет размерность $[n^{(l)}\times N].$ - Наконец, мы поэлементно умножаем матрицу размером $[n^{(l)}\times N]$ на производную $f'(Z^{(l)})$ той же размерности.
Градиенты по весам
Теперь, когда нам известен дельта-вектор $\vec{\boldsymbol{\delta}}^{(L)}$ выходного слоя размерностью $[1\times N]$ для всего батча и каждая последующая дельта-матрица $\Delta^{(l)}$ скрытого слоя, мы можем реализовать формулы градиентов.
Напомню, что для выходного слоя слоя
- Для одного примера $(\nabla_{\vec{\mathbf{w}}^{(L)}} \mathcal{L}) = \delta^{(L)} \cdot \left(\vec{\mathbf{h}}^{(L-1)}\right)^\text{T}$ — это вектор размером $[1\times n^{(L-1)}]$
- Для всего батча примеров — это произведение вектора-строки $\vec{\boldsymbol{\delta}}^{(L)}$ размерностью $[1\times N]$ на транспонированную матрицу $\left(H^{(L-1)}\right)^\text{T}$ размерностью $[N\times n^{(L-1)}]$. То есть результат произведения — это матрица размером $[1\times n^{(L-1)}]$
$$ \nabla_{W^{(L)}} \mathcal{L} = \vec{ \boldsymbol{\delta}}^{(L)} \cdot \left(H^{(L-1)}\right)^\text{T} $$
Для скрытых слоев
- Для одного примера $\nabla_{W^{(l)}} \mathcal{L} = \vec{ \boldsymbol{\delta}}^{(l)} \cdot \left(\vec{\mathbf{h}}^{(l-1)}\right)^\text{T}$ — это матрица размером $[n^{(l)}\times n^{(l-1)}]$
- Для всего батча примеров — это произведение матрицы $\Delta^{(l)}$ размерностью $[n^{(l)}\times N]$ на транспонированную матрицу $\left(H^{(l-1)}\right)^\text{T}$ размерностью $[N\times n^{(l-1)}]$. То есть результат произведения — это матрица размером $[n^{(l)}\times n^{(l-1)}]$:
$$ \nabla_{W^{(l)}} \mathcal{L} = \Delta^{(l)} \cdot \left(H^{(l-1)}\right)^\text{T} $$
Эти формулы реализуются в NumPy одной строкой матричного произведения:
delta @ H_cache[layer - 1].T
Здесь delta — это матрица размером $[n^{(l)}\times n^{(l-1)}]$. Она принимает размерность $[1\times n^{(L-1)}]$ для последнего слоя, так как в нашей сети в выходном слое только 1 нейрон: $n^{(L)} = 1.$
cache[layer - 1] — это элемент сохраненного при прямом проходе массива $H^{(0)}, H^{(1)}, ... H^{(L)},$ который соответствует предыдущему слою.
Так как в обратном проходе мы идем по всем слоям от последнего к первому, мы сохраняем эти градиенты в массив в обратном порядке:
# Массив градиентов по каждому слою
dW: list[Matrix] = []
# Начинаем с выходного слоя
delta: Batch = (p_hat - y) / N
dW.insert(0, delta @ H_cache[-1].T)
# Затем рекуррентно движемся к первому слою
for layer in range(len(self.W) - 1, 0, -1):
delta = (self.W[layer].T @ delta) * d_relu(z_cache[layer - 1])
dW.insert(0, delta @ H_cache[layer - 1].T)
Градиенты по смещениям
Аналогичным образом мы теперь можем реализовать формулы градиентов по смещению.
Напомню, что для выходного слоя слоя
-
Для одного примера $\nabla_{b^{(L)}} \mathcal{L} = \dfrac{\partial \mathcal{L}}{\partial z^{(L)}} \dfrac{\partial z^{(L)}}{\partial b^{(L)}} = \delta^{(L)} \cdot 1$ — это скаляр размерностью $[1\times 1]$
-
Для $N$ примеров — это произведение вектора-строки $\vec{\boldsymbol{\delta}}^{(l)}$ для размерностью $[1\times N]$ всего батча на единичный вектор-строку $\vec{1}$ размерностью $[N\times 1],$ то есть градиент по смещению — это скаляр размерностью $[1\times 1]$
Для скрытых слоев
- Для одного примера $\nabla_{\vec{\mathbf{b}}^{(l)}} \mathcal{L} = \vec{\boldsymbol{\delta}}^{(l)}\cdot 1$ — это вектор размером $[n^{(l)}\times 1]$
- Для всех примеров батча — это произведение матрицы $\Delta^{(l)}$ размером $[n^{(l)}\times N]$ на единичный вектор-столбец $\vec{1}$ размерностью $[N\times 1]$, то есть результирующий градиент — это вектор-столбец размером $[n^{(l)}\times 1]$.
Мы можем реализовать этот градиент через матричное произведение в NumPy:
delta @ np.ones((N, 1))
Но еще быстрее — просто просуммировать дельты для всех примеров в батче
delta.sum(axis=1, keepdims=True)
Здесь delta — это матрица $\Delta^{(l)}$ размером $[n^{(l)}\times N]$. В частности для выходного слоя она имеет размерность $[1\times N]$.
sum(axis=1) означает, что мы суммируем по столбцам, то есть по N. Параметр keepdims=True означает, что мы учитываем размерность матрицы после суммирования.
Так как в обратном проходе мы идем по всем слоям от последнего к первому, мы сохраняем эти градиенты в массив в обратном порядке:
# Массив градиентов по каждому слою
db: list[Vector] = []
# Начинаем с выходного слоя
delta: Batch = (p_hat - y) / N
db.insert(0, delta.sum(axis=1, keepdims=True))
# Затем рекуррентно движемся к первому слою
for layer in range(len(self.W) - 1, 0, -1):
delta = (self.W[layer].T @ delta) * d_relu(z_cache[layer - 1])
db.insert(0, delta.sum(axis=1, keepdims=True))
Общий алгоритм градиентного спуска
Шаг 1. Перемешивание датасета в начале эпохи
Для каждой эпохи мы берем весь датасет и перемешивает его. Это можно реализовать так:
N_total: int = X.shape[1]
for epoch in range(1, self.epochs + 1):
idx = np.random.permutation(N_total)
X, y = X[:, idx], y[:, idx]
Здесь Х — это астрологические признаки всех примеров взятых из AstroDataBank, то есть матрица размером $[\text{input_dim}\times N_\text{total}]$.
Сначала мы берем индексы $i$ всех примеров и перемешиваем их, затем пересобираем примеры $(\vec{\mathbf{x}, y})_i$ в порядке новых (перемешанных) индексов.
Шаг 2. Разбиение датасета на батчи
Далее мы разбиваем датасет на батчи и начинаем обучение на каждом отдельном батче из N примеров. Это разбиение можно реализовать так:
for s in range(0, N_total, self.batch_size):
Xb: Batch = X[:, s: s + self.batch_size]
yb: Batch = y[:, s: s + self.batch_size]
Здесь мы проходим порядковые номера элементов датасета $0, 1, ..., N_\text{total}$ с шагом $N$ (размером батча), то есть последовательно выбираем $s$-й элемент, кратный размеру батча.
Затем мы выделяем из датасета две подматрицы — $X_\text{batch}$ размером $[\text{input_dim}\times N]$ и $y_\text{batch}$ размером $[1\times N]$ простым выбором столбцов в диапазоне от индекса $s$ до $s+N$. Эти две подматрицы вместе образуют батч из $N$ примеров $(\vec{\mathbf{x}, y})_i$.
Шаг 3. Прямой и обратный проходы
Теперь мы должны пройтись прямым проходом по нашему батчу и сохранить результаты активации слоев — вектор-строку $\vec{\mathbf{\hat{p}}}$ для выходного слоя и матрицы $H^{(l)}$ для всех промежуточных слоев.
Затем используя эти сохраненные величины мы совершаем обратный проход и сохраняем градиенты по весам и смещениям для всех слоев.
Этот шаг можно реализовать так:
p_hat, H_cache = forward(Xb)
dW, db = backward(p_hat, yb, H_cache)
Шаг 4. Обновление параметров
Градиенты указывают в сторону роста функции потерь, значит нам надо сделать небольшой шаг в противоположном направлении для каждого примера:
Это обновление параметров можно реализовать через обычный цикл Python по всем слоям:
self.W = [W - self.lr * dw for W, dw in zip(self.W, dW)]
self.b = [b - self.lr * db_ for b, db_ in zip(self.b, db)]
Полный код
Теперь мы можем собрать все вместе в единый код. Этот код эмулирует пример астрологических признаков и меток событий и проводит на их основе тренинг в небольшой нейросети.
Затем код оценивает точность предсказаний на этом же датасете.
import numpy as np
from numpy.typing import NDArray
from typing import cast
# pyright: reportConstantRedefinition=false
# Геометрические типы
Vector = NDArray[np.float64] # [n × 1] — столбец
CoVector = NDArray[np.float64] # [1 × n] — строка
Matrix = NDArray[np.float64] # [m × n] — матрица весов
# Семантические типы
Sample = NDArray[np.float64] # [n × 1] — один пример x⃗
Batch = NDArray[np.float64] # [n × N] — N примеров
Dataset = NDArray[np.float64] # [n × N_total] — весь датасет
# Функции активации и их производные
def sigmoid(z: Batch) -> Batch:
return 1.0 / (1.0 + np.exp(-z))
def relu(z: Batch) -> Batch:
return np.maximum(0, z)
def d_relu(z: Batch) -> Batch:
result: Batch = (z > 0).astype(np.float64)
return result
class BernoulliNet:
"""
Нейросеть для бинарной классификации (распределение Бернулли).
Параметры
---------
input_dim : размер входного вектора x
hidden_dims : список размеров скрытых слоёв, напр. [64, 32]
lr : шаг градиентного спуска λ
epochs : число эпох
batch_size : размер мини-батча
"""
W: list[Matrix]
b: list[Vector]
lr: float
epochs: int
batch_size: int
def __init__(self,
input_dim: int,
hidden_dims: list[int],
lr: float = 0.01,
epochs: int = 100,
batch_size: int = 32,
) -> None:
dims: list[int] = [input_dim] + hidden_dims + [1]
# Инициализация весов: W ~ N(0, 2/n_prev)
self.W, self.b = self._init_weights(dims)
self.lr, self.epochs, self.batch_size = lr, epochs, batch_size
def _init_weights(self, dims: list[int]) -> tuple[list[Matrix], list[Vector]]:
"""
Инициаализирует веса W и смещения b для всех слоев:
W ~ N(μ=0, σ²=2/n_prev).
"""
W_list: list[Matrix] = []
b_list: list[Vector] = []
for layer in range(len(dims) - 1):
n_prev = dims[layer] # число нейронов в предыдущем слое
n_curr = dims[layer + 1] # число нейронов в текущем слое
# Случайная матрица с нормальным распределением N(μ=0, σ²=1=2/n_prev),
W_layer: Matrix = np.random.normal(
loc=0.0,
scale=np.sqrt(2.0 / n_prev),
size=(n_curr, n_prev),
).astype(np.float64)
W_list.append(W_layer)
# Вектор смещений — вектор-столбец нулей
b_layer: Vector = np.zeros((n_curr, 1), dtype=np.float64)
b_list.append(b_layer)
return W_list, b_list
def forward(self, X: Batch) -> tuple[Batch, list[Batch]]:
"""
Прямой проход по сети: возвращает предсказания p̂ для всех примеров в батче
и кэш активаций Hᵢ для для обратного прохода (для всех примеров на каждом слое).
Параметры
---------
X : Batch матрица астрологических признаков размером [input_dim × N]
"""
# Прямой проход по нулевому слою:
H: Batch = X
H_cache: list[Batch] = [H]
# Прямой проход по скрытым слоям:
for W, b in zip(self.W[:-1], self.b[:-1]):
H = relu(W @ H + b) # размер [n_current × N]
H_cache.append(H)
# Прямой проход по выходному слою:
p_hat: Batch = sigmoid(
self.W[-1] @ H + self.b[-1]) # размер [1 × N]
return p_hat, H_cache
def loss(self, p_hat: Batch, y: Batch) -> float:
"""
Бинарная кросс-энтропия (логарифмическое правдоподобие Бернулли)
"""
return float(
-np.mean(
y * np.log(p_hat + 1e-9)
+ (1 - y) * np.log(1 - p_hat + 1e-9)
)
)
def backward(self,
p_hat: Batch,
y: Batch,
H_cache: list[Batch]
) -> tuple[list[Matrix], list[Vector]]:
"""
Обратный проход по сети: возвращает градиенты dW и db
для всех слоев.
Параметры
----------
p_hat : Batch размер [1 × N] — предсказания для всех примеров в батче
y : Batch размер [1 × N] — метки событий для всех примеров в батче
cache : list[Batch] — сохраненные активации H⁽ˡ⁾ для каждого слоя, размер [n⁽ˡ⁾ × N]
"""
N: int = y.shape[1]
dW: list[Matrix] = []
db: list[Vector] = []
# Выходной слой:
delta: Batch = (p_hat - y) / N
dW.insert(0, delta @ H_cache[-1].T)
db.insert(0, delta.sum(axis=1, keepdims=True))
# Скрытые слои:
z_cache: list[Batch] = [
self.W[layer] @ H_cache[layer] + self.b[layer]
for layer in range(len(self.W) - 1)
]
# Обратный проход по скрытым слоям
for layer in range(len(self.W) - 1, 0, -1):
delta = (self.W[layer].T @ delta) * d_relu(z_cache[layer - 1])
dW.insert(0, delta @ H_cache[layer - 1].T)
db.insert(0, delta.sum(axis=1, keepdims=True))
return dW, db
def train(self, X: Dataset, y: Dataset, verbose: bool = True) -> None:
"""
X : Dataset shape [input_dim × N_total] — признаки всех примеров в датасете
y : Dataset shape [1 × N_total] — метки реальных событий {0, 1}
"""
N_total: int = X.shape[1]
for epoch in range(1, self.epochs + 1):
idx = np.random.permutation(N_total)
X, y = X[:, idx], y[:, idx]
for s in range(0, N_total, self.batch_size):
Xb: Batch = X[:, s: s + self.batch_size]
yb: Batch = y[:, s: s + self.batch_size]
p_hat, H_cache = self.forward(Xb)
dW, db = self.backward(p_hat, yb, H_cache)
self.W = [W - self.lr * dw for W, dw in zip(self.W, dW)]
self.b = [b - self.lr * db_ for b, db_ in zip(self.b, db)]
if verbose and epoch % 10 == 0:
p_hat, _ = self.forward(X)
print(f"Эпоха {epoch:4d} | потери = {self.loss(p_hat, y):.4f}")
def predict_probability(self, X: Dataset) -> Dataset:
"""Возвращает p̂ ∈ (0,1) события для каждого столбца батча."""
p_hat, _ = self.forward(X)
return p_hat
def predict_label(self, X: Dataset, threshold: float = 0.5) -> Dataset:
"""Возвращает наблюдаемые бинарные метки события y = {0, 1}."""
result: Batch = (self.predict_probability(X) >=
threshold).astype(np.float64)
return result
# Пример использования
if __name__ == "__main__":
np.random.seed(42)
N = 500
# Эмулируем астрологические признаки (нормированы в [0, 1])
X: Dataset = np.random.rand(8, N).astype(np.float64)
# Эмулируем бинарные метки событий (например, "сбылось" или "не сбылось")
y: Dataset = (X[0] + X[3] > 1.0).astype(np.float64).reshape(1, N)
# Инициализация и обучение модели
model = BernoulliNet(
input_dim=8,
hidden_dims=[16, 9],
lr=0.05,
epochs=100,
batch_size=32,
)
model.train(X, y)
# Оценка точности на трейне
accuracy: float = cast(float, np.mean(model.predict_label(X) == y))
print(f"\nТочность на трейне: {accuracy:.2%}")
В реальном сценарии, оценка точности происходит сначала на данных для валидации для улучшения входных параметров модели, а затем такая же оценка точности происходит на тестовых данных. А найденные веса и смешения необходимо сохранить в файл для дальнейшего использования.
Также в реальном сценарии вместо библиотеки NumPy надо использовать ее аналог CuPy для быстрого вычисления на GPU.