Машинное обучение для прогнозирования событий. Практические шаги


БлогИсследования
Нейросеть и астрология

4 апреля 2026 г. 19:30 Продвинутый уровень Алексей Бореалис (Марк Русборн) 17 мин. на чтение


В предыдущей статье мы описали алгоритм машинного обучения с помощью нейронной сети. В этой статье мы построим код на языке Python для внедрения этого алгоритма. Мы будем использовать термины и обозначения из предыдущей статьи.

Для работы с векторами и матрицами мы будем использовать библиотеку NumPy, которая изначально написана на C, то есть очень быстра в исполнении.

[toc]

Структура нейронной сети

Наша задача сконструировать такую нейронную сеть, которую легко инициализировать глобальными параметрами, а именно:

  • Размерностью входного вектора астрологических признаков $x_i$ — input_dim
  • Размерностью каждого скрытого слоя, которую можно задать через массив чисел hidden_dims, например hidden_dims=[16, 9] означало бы, что у нас есть 2 скрытых слоя в 16 и 9 нейронов соответственно.
  • Шагом градиентного спуска, $\lambda$ (learning rate, lr)
  • Количеством эпох для обучения модели — epoches
  • Числом примеров $N$ в отдельной подвыборке из тренировочных данных — batch_size

Для этого мы создадим класс BernoulliNet, который можно инициализировать этими параметрами, например:

model = BernoulliNet(
    input_dim=8,
    hidden_dims=[16, 9],
    lr=0.05,
    epochs=100,
    batch_size=32,
)

Здесь параметры input_dims и hidden_dims определяет общую структуру нейросети

dims = [input_dim, 16, 9, 1]
#           ↑       ↑  ↑  ↑
#        слой 0     1  2  3 (выходной)

При этом матрицы весов будут иметь следующие размерности $[\text{rows}\times \text{cols}]$ в этом примере:

$$ \begin{align} &W^{(1)} \;\;[16 \times 8] & \text{соединяет слой 0 → слой 1}\\ &W^{(2)} \;\;[9 \times 16] & \text{соединяет слой 1 → слой 2}\\ &W^{(3)} \;\;[1 \times 9] & \text{соединяет слой 2 → слой 3}\\ \end{align} $$

Мы сделаем веса и смещения внутренними свойствами модели, то есть

class BernoulliNet:

    def __init__(self,
                 input_dim: int,
                 hidden_dims: list[int],
                 lr: float = 0.01,
                 epochs: int = 100,
                 batch_size: int = 32,
                 ) -> None:
        # Размерность всей нейросети
        dims: list[int] = [input_dim] + hidden_dims + [1]
        # Инициализация весов
        self.W, self.b = self._init_weights(dims)

Поскольку мы будем пробегать по значениям $W^{(1)}, W^{(2)}, W^{(3)}$ в компьютерной нотации это будут элементы массива с индексами self.W[0], self.W[1], self.W[2]. Аналогично смещения $b^{(1)}, b^{(2)}, b^{(3)}$ будут хранится в массиве self.b[0], self.b[1], self.b[2]

В общем случае

  • матрица весов self.W[layer-1] соответствует $W^{(l)}$ размерностью $[n^{(l)}\times n^{(l-1)}]$.
  • вектор-столбец смещений self.b[layer-1] соответствует вектору-столбцу $\vec{\mathbf{b}}^{(l)}$ размерностью $[n^{(l)}\times 1]$.

Типы данных

Мы договорились ранее, что нижние индексы обозначают вектор-столбец $x_i.$ Формально — это матрица размерностью $n$ строк и одним столбцом $[n\times1]$.

Верхние индексы $x^i$ обозначают сопряженный вектор, или вектор-строку. Формально — это матрица размерностью $[1\times n]$.

Любые матрицы, наполненные десятичными числами в библиотеке NumPy — это объекты NDArray[np.float64], поэтому чтобы отличать их размерности мы введем явную аннотацию типов данных, которыми мы будем оперировать в коде:

import numpy as np
from numpy.typing import NDArray

# Геометрические типы
# ===================

# [n × 1] — вектор-столбец
Vector = NDArray[np.float64]

# [1 × n] — вектор-строка
CoVector = NDArray[np.float64]

# [m × n] — матрица весов
Matrix = NDArray[np.float64]

# Семантические типы
# ==================

# [n × 1] — один пример (набор входных x⃗ или промежуточных h⃗)
Sample = NDArray[np.float64]

# [n × N] — N примеров (N наборов входных x⃗ или промежуточных h⃗ для всего батча)
Batch = NDArray[np.float64]

# [n × N_total] — весь датасет признаков x⃗ из AstroDataBank
Dataset = NDArray[np.float64]

Это никак не влияет на функциональность — мы вводим эти аннотации исключительно для удобства восприятия кода.

Функции активации для нейронной сети

Прежде всего мы реализуем функции активации в нейронной сети, которые понадобятся нам далее.

Выходной слой

Напомню, что функция активации выходного слоя — сигмоида $\sigma(z^i) = \dfrac{1}{1 + e^{-z^i}}$ — применяется для каждого $i$-го примера в батче. Вместо применения сигмоиды к каждому $z^i$ мы можем делегировать это поэлементное вычисление библиотека NumPy которая выполняет эту задачу на скомпилированном C-коде, что даёт существенный выигрыш в скорости. Для этого достаточно передать в функцию сразу весь набор из $N$ примеров. то есть вектор-строку $\vec{\mathbf{z}}$ размерностью $[1\times N]$:

def sigmoid(z: Batch) -> Batch:
    return 1.0 / (1.0 + np.exp(-z))

Промежуточные слои

Функция активации промежуточных слоев $\text{ReLU}(z^i) = \max(0, z^i)$ для каждого $i$-го примера в батче реализуется по тому же принципу — мы делегируем NumPy поэлементное применение функции, передавая ей весь набор из $N$ примеров:

def relu(z: Batch) -> Batch:
    return np.maximum(0, z)

Производная функции ReLU равна 1 для $z^i>0$ и 0 во всех остальных случаях. Мы можем представить это в виде строки логических значений ${z^i>0}=(\text{True} \mid \text{False})^i$, в котором затем мы поэлементно преобразуем каждое значение $\text{True}_i$ в 1.0 и каждое $\text{False}_i$ — в 0.0 через метод .astype() библиотеки NumPy:

def d_relu(z: Batch) -> Batch:
    result: Batch = (z > 0).astype(np.float64)
    return result

Градиентный спуск для нейронной сети

Функция потерь

Напомню, что функция потерь для всех примеров из батча описывается формулой:

$$ \mathcal{L} = -\frac{1}{N}\sum_{i=1}^{N}\left[y^i \ln \hat{p}^i + (1 - y^i)\ln(1 - \hat{p}^i)\right] $$

где $\hat{p}^i$ — предсказанная вероятность конкретного события на примере $i$-го гороскопа, а $y^i$ — метка реального события (1, если события произошло и 0 — если иначе). В библиотеке NumPy есть встроенная функция усреднения mean, которая эквивалентна $\frac{1}{N}\sum_1^N$, поэтому функцию потерь можно реализовать так:

def loss(self, p_hat: Batch, y: Batch) -> float:
    """
    Бинарная кросс-энтропия (логарифмическое правдоподобие Бернулли)
    """
    return float(
        -np.mean(
            y * np.log(p_hat + 1e-9)
            + (1 - y) * np.log(1 - p_hat + 1e-9)
        )
    )

Здесь p_hat и y — это векторы-строки размерностью $[1\times N]$

Инициализация весов и смещений

Мы говорили в первой части статьи, что для стабильного обучения надо, чтобы веса в каждом слое $l$ были инициированы случайными числами с нулевым средним и дисперсией $2/(n^{(l-2)})$, где $n^{(l-1)} $ — число нейронов в предыдущем слое.

Для одного слоя

В библиотеке NumPy инициализацию матрицу весов $W^{(l)}$ размерностью $[n^{(l)}\times n^{(l-1)}]$ для каждого $l$-го слоя можно реализовать так:

W_layer: Matrix = np.random.normal( # Нормальное распределение
    loc=0.0,                        # среднее значение
    scale=np.sqrt(2.0 / n_prev),    # Дисперсия
    size=(n_curr, n_prev),          # Размер матрицы 
).astype(np.float64)        # десятичные числа в ячейках матрицы

Также мы должны инициализировать нулями вектор-столбец смещения $b^{(l)}$ размерностью $[n^{(l)}\times 1]$ для каждого $l$-го слоя. В нотации NumPy это можно реализовать так:

b_layer: Vector = np.zeros( # Массив нулей
    (n_curr, 1),            # в виде столбца
    dtype=np.float64        # в десятичной форме, т.е. 0.0
)

Для всех слоев

Чтобы инициализировать параметры во всех слоях нейросети мы просто последовательно применяем эти формулы на стыке соседних слоев:

for layer in range(len(dims) - 1):
    n_prev = dims[layer]      # число нейронов в предыдущем слое
    n_curr = dims[layer + 1]  # число нейронов в текущем слое

    # Далее инициируем матрицы весов и вектора смещений

После этого мы добавляем эти первичные значения в результирующий список весов и смещений для всех слоев нейросети self.W[0], self.W[1], self.W[2] и self.b[0], self.b[1], self.b[2], что соответствует $W^{(1)}, W^{(2)}, W^{(3)}$ и $b^{(1)}, b^{(2)}, b^{(3)}$ в наших формулах.

Прямой проход

Как вы помните из предыдущей статьи после инициации весов и смещений мы сначала совершаем прямой проход, применяя для каждого $i$-го примера $(\vec{\mathbf{x}}, y)^i$ в батче формулу:

$$ \begin{align} &\vec{\mathbf{h}}^{(0)\,i} = \vec{\mathbf{x}}^i \\ &\vec{\mathbf{h}}^{(l)\,i} = \text{ReLU}\left(W^{(l)}\vec{\mathbf{h}}^{(l-1)\,i} + \vec{\mathbf{b}}^{(l)\,i}\right), \;\; l= 1, 2,..., L-1 \\ &\hat{p}^i = \sigma\left(\vec{\mathbf{w}}^{(L)\,i}\;\vec{\mathbf{h}}^{(L-1)\,i} + b^{(L)\,i}\right) \\ \end{align} $$

и затем мы сохраняем эти значения в кэш, чтобы использовать их далее в обратном проходе.

Как обычно, вместо циклического применения этих формул к каждому примеру отдельно, с точки зрения скорости вычисления эффективней передать сразу весь массив примеров библиотеке NumPy — ее матричные вычисления существенно быстрее, чем обычный цикл перебора на языке Python.

Нулевой слой

В нулевом слое для каждого $i$-го примера в батче $\vec{\mathbf{h}}^{(0)\,i} = \vec{\mathbf{x}}^i$ — вектор-столбец размерностью $[\text{input_dim} \times 1]$. А для всех примеров в батче этот вектор превращается в матрицу $H^{(0)}=X$ размерностью $[\text{input_dim}\times N]$.

В нотации NumPy эту матрицу можно задать так:

H: Batch = X 
H_cache: list[Batch] = [H]

В первой строке мы делаем поэлементное присвоение. А во второй строке мы инициализируем временное хранилище H_cache матриц $H^{(0)}, H^{(1)}, ... H^{(L-1)}$, которые понадобятся нам при обратном проходе. Для инициализации мы просто заносим значение $H^{(0)}$ в массив H_cache. Итого

  • H_cache[0], H_cache[1], H_cache[2]соответствует матрицам $H^{(0)}, H^{(1)}, H^{(2)}$
  • Размерность каждой матрицы $H^{(l)}$ — $[n^{(l)}\times N]$.

Скрытые слои

Напомню, что выходной результат $\vec{\mathbf{h}}^{(l)} = \text{ReLU}\left(W^{(l)} \vec{\mathbf{h}}^{(l-1)} + \vec{\mathbf{b}}^{(l)}\right)$ каждого скрытого слоя $l$ представляет собой вектор-столбец размерностью $[n^{(l)}\times 1]$ для отдельного взятого примера. Но для всех $N$ примеров в батче — это уже матрица $H^{(l)}$ размером $[n^{(l)} \times N]$. По-сути — это результат активации $l$-го скрытого слоя для всего набора $N$ примеров в батче.

В нотации NumPy эти матрицы можно реализовать так:

for W, b in zip(self.W[:-1], self.b[:-1]):
    H = relu(W @ H + b)
    H_cache.append(H)

Здесь в первой строке внутри цикла:

  • Переменная H — это выход предыдущего слоя $H^{(l-1)}$ размерностью $[n^{(l-1)}\times N]$.
  • Переменная W — это self.W[layer-1], то есть матрица $W^{(l)}$ размерностью $[n^{(l)}\times n^{(l-1)}]$
  • b — это вектор self.b[layer-1] размерностью $[n^{(l)}\times 1]$, который NumPy автоматически расширяет до матрицы $[n^{(l)}\times N]$ простым повторением столбцов, чтобы обеспечить операцию сложения.
  • W @ H + b — это матрица размером $[n^{(l)}\times N]$, к которой поэлементно применяется ReLU
  • Результат поэлементного применения функции активации заносится в переменную H, которая соответствует $H^{(l)}$ в нашей формуле и будет использоваться в следующем цикле.

Во второй строке цикла мы просто заносим это результирующее значение $H^{(l)}$ в кэш для использования в обратном проходе.

Обычный цикл Питона обеспечивает проход по всем слоям. Мы берем набор (self.W[layer], self.b[layer]) за минусом последнего слоя, что соответствует $(W^{(l)},\vec{\mathbf{b}}^{(l)}), l=1...L-1$ в наших формулах.

Здесь функция zip склеивает массивы $W^{(l)}$ и $\vec{\mathbf{b}}^{(l)}$ в массив $(W^{(l)},\vec{\mathbf{b}}^{(l)})$, а нотация [:-1] означает, что мы пробегаемся по всем значения self.W[0], self.W[1], ... и self.b[0], self.b[1], ..., кроме последнего, то есть по $W^{(1)}, W^{(2)}, ... W^{(L-1)}$ и $b^{(1)}, b^{(2)}, ..., b^{(L-1)}$.

Выходной слой

Для выходного слоя мы применяем формулу $\hat{p}^{\,i} = \sigma\left(\left(\vec{\mathbf{w}}^{(l)\,i}\right)^{\text{T}}\;\vec{\mathbf{h}}^{(l-1)\,i} + b^{(l)\,i}\right)$ для каждого $i$-го примера. Для одного примера — это скаляр, то есть матрица $[1\times 1]$, а для всего батча примеров — это вектор-строка размером $[1\times N]$. Как обычно, мы передаем библиотеке NumPy сразу весь набор примеров из батча для быстрого последовательного применения сигмоиды:

p_hat: Batch = sigmoid(
    self.W[-1] @ H + self.b[-1]
)

Здесь нотация [-1] означает, что мы берем последний элемент массива self.W, то есть матрицу $W^{(L)}$ выходного слоя размерностью $[n^{(L)}\times n^{(L-1)}] = [1\times n^{(L-1)}]$, умножаем ее на выход предыдущего слоя $H^{(L-1)}$ размером $[n^{(L-1)}\times N]$, что дает нам вектор-строку размером $[1\times N]$, и далее мы прибавляем последний элемент списка self.b размерностью $[1\times 1]$, который библиотека NumPy преобразует в вектор-строку размером $[1\times N]$ простым повторением числа b $N$ раз.

В итоге мы получаем вектор-строку $\vec{\mathbf{\hat{p}}}$ с с числами от 0 до 1 — вероятностями наступления события в каждом из примеров в батче.

Обратный проход

Для обратного прохода нам сначала надо вычислить дельту выходного слоя, а затем по рекуррентной формуле — дельты оставшихся слоев.

Дельта выходного слоя

Напомню, что дельта выходного слоя для каждого $i$-го примера — это скаляр $\delta^{(L)\;i} = \frac{1}{N}\sum_{k=1}^{N}(\hat{p}_k - y_k)^i.$ Но для всех примеров в батче — это вектор-строка размером $[1\times N]$.

В нотации библиотеки NumPy этот вектор реализуется так:

delta: Batch = (p_hat - y) / N

Здесь операция вычитания двух массивов (p_hat - y) вычисляет разность для каждого элемента вектора батча поэлементно, а при операции деления на скаляр N автоматически выполняет усреднение.

Дельты промежуточных слоев

Как вы помните, для промежуточных слоев рекуррентная формула для дельты каждого слоя для каждого $i$-го примера выглядит так: $$ \vec{\boldsymbol{\delta}}^{(l)\,i} = \left(W^{(l+1)}\right)^T \vec{\boldsymbol{\delta}}^{(l+1)\,i} \odot f'(\vec{\mathbf{z}}^{(l)\,i}) $$

Шаг 1

Сначала реализуем вычисление $\vec{\mathbf{z}}^{(l)\,i}.$

Для каждого $i$-го примера $\vec{\mathbf{z}}^{(l)\,i} = \left(W^{(l)}\right)^\text{T}\cdot \vec{\mathbf{h}}^{(l-1)\,i} + \vec{\mathbf{b}}^{(l)}$ — это вектор-столбец размерностью $[n^{(l)}\times 1]$, где $n^{(l)} $ — число нейронов в $l$-м слое. А для всего батча примеров — это матрица $Z^{(l)}$ размерностью $[n^{(l)}\times N].$

В нотации NumPy мы можем записать это в виде:

z_cache[layer] = self.W[layer] @ H_cache[layer] + self.b[layer]

Напомню, что H_cache — это временное хранилище (кэш) матриц $H^{(0)}, H^{(1)}, ... H^{(L-1)}$, которые мы создали при прямом проходе. Каждая из матриц имеет размерность $[n^{(l)}\times N]$, а self.W[layer] имеет размерность $[n^{(l+1)}\times n^{(l)}]$, поэтому матричное произведение self.W[layer] @ H_cache[layer] имеет размерность $[n^{(l+1)}\times N]$.

Отдельный элемент self.b[layer] имеет размерность $[n^{(l+1)}\times 1]$, но NumPy автоматически расширяет его до матрицы $[n^{(l+1)}\times N]$ простым повторением столбцов.

Итого:

  • Элементы массива z_cache[0], z_cache[1], ... соответствуют $Z^{(1)}, Z^{(2)}, ...$
  • Каждая матрица $Z^{(l)}$ имеет размерность $[n^{(l)}\times N].$

Шаг 2

Теперь, когда нам известны $Z^{(l)}$ для всех скрытых слоев $l=1, 2, …, L-1$, мы можем перейти к реализации дельты скрытых слоев. Для каждого скрытого слоя для $i$-го примера $\vec{\boldsymbol{\delta}}^{(l)\,i} = \left(W^{(l+1)}\right)^T \vec{\boldsymbol{\delta}}^{(l+1)\,i} \odot f'(\vec{\mathbf{z}}^{(l)\,i})$ — это вектор-столбец размерностью $[n^{(l)}\times 1]$, а для всего батча из $N$ примеров это уже матрица размерностью $[n^{(l)}\times N]$: $$ \Delta^{(l)} = \left(W^{(l+1)}\right)^T \Delta^{(l+1)\;i} \odot f'(Z^{(l)}) $$

где $l$ пробегает значения $L-1, L-2, …, 1$.

В нотации NumPy эту матрицу для каждого слоя можно реализовать вот так:

for layer in range(len(self.W) - 1, 0, -1):
    delta = (self.W[layer].T @ delta) * d_relu(z_cache[layer - 1])
  • Здесь мы проходим в обратном порядке по индексам layer от предпоследнего до нулевого, что соответствует скрытым слоям $l=L, L-1, ...1$.

  • Затем в каждом слое мы производим матричное умножение self.W[layer].T @ delta, что соответствует умножению матрицы $\left(W^{(l+1)}\right)^\text{T}$ размером $[n^{(l+1)}\times n^{(l)}]^\text{T}$ или $[n^{(l)}\times n^{(l+1)}]$ на $\Delta^{(l+1)}$ размерностью $[n^{(l+1)}\times N]$ для всего батча. Результирующая матрица имеет размерность $[n^{(l)}\times N].$

  • Наконец, мы поэлементно умножаем матрицу размером $[n^{(l)}\times N]$ на производную $f'(Z^{(l)})$ той же размерности.

Градиенты по весам

Теперь, когда нам известен дельта-вектор $\vec{\boldsymbol{\delta}}^{(L)}$ выходного слоя размерностью $[1\times N]$ для всего батча и каждая последующая дельта-матрица $\Delta^{(l)}$ скрытого слоя, мы можем реализовать формулы градиентов.

Напомню, что для выходного слоя слоя

  • Для одного примера $(\nabla_{\vec{\mathbf{w}}^{(L)}} \mathcal{L}) = \delta^{(L)} \cdot \left(\vec{\mathbf{h}}^{(L-1)}\right)^\text{T}$ — это вектор размером $[1\times n^{(L-1)}]$
  • Для всего батча примеров — это произведение вектора-строки $\vec{\boldsymbol{\delta}}^{(L)}$ размерностью $[1\times N]$ на транспонированную матрицу $\left(H^{(L-1)}\right)^\text{T}$ размерностью $[N\times n^{(L-1)}]$. То есть результат произведения — это матрица размером $[1\times n^{(L-1)}]$

$$ \nabla_{W^{(L)}} \mathcal{L} = \vec{ \boldsymbol{\delta}}^{(L)} \cdot \left(H^{(L-1)}\right)^\text{T} $$

Для скрытых слоев

  • Для одного примера $\nabla_{W^{(l)}} \mathcal{L} = \vec{ \boldsymbol{\delta}}^{(l)} \cdot \left(\vec{\mathbf{h}}^{(l-1)}\right)^\text{T}$ — это матрица размером $[n^{(l)}\times n^{(l-1)}]$
  • Для всего батча примеров — это произведение матрицы $\Delta^{(l)}$ размерностью $[n^{(l)}\times N]$ на транспонированную матрицу $\left(H^{(l-1)}\right)^\text{T}$ размерностью $[N\times n^{(l-1)}]$. То есть результат произведения — это матрица размером $[n^{(l)}\times n^{(l-1)}]$:

$$ \nabla_{W^{(l)}} \mathcal{L} = \Delta^{(l)} \cdot \left(H^{(l-1)}\right)^\text{T} $$

Эти формулы реализуются в NumPy одной строкой матричного произведения:

delta @ H_cache[layer - 1].T

Здесь delta — это матрица размером $[n^{(l)}\times n^{(l-1)}]$. Она принимает размерность $[1\times n^{(L-1)}]$ для последнего слоя, так как в нашей сети в выходном слое только 1 нейрон: $n^{(L)} = 1.$

cache[layer - 1] — это элемент сохраненного при прямом проходе массива $H^{(0)}, H^{(1)}, ... H^{(L)},$ который соответствует предыдущему слою.

Так как в обратном проходе мы идем по всем слоям от последнего к первому, мы сохраняем эти градиенты в массив в обратном порядке:

# Массив градиентов по каждому слою
dW: list[Matrix] = []

# Начинаем с выходного слоя
delta: Batch = (p_hat - y) / N
dW.insert(0, delta @ H_cache[-1].T)

# Затем рекуррентно движемся к первому слою
for layer in range(len(self.W) - 1, 0, -1):
    delta = (self.W[layer].T @ delta) * d_relu(z_cache[layer - 1])
    dW.insert(0, delta @ H_cache[layer - 1].T)

Градиенты по смещениям

Аналогичным образом мы теперь можем реализовать формулы градиентов по смещению.

Напомню, что для выходного слоя слоя

  • Для одного примера $\nabla_{b^{(L)}} \mathcal{L} = \dfrac{\partial \mathcal{L}}{\partial z^{(L)}} \dfrac{\partial z^{(L)}}{\partial b^{(L)}} = \delta^{(L)} \cdot 1$ — это скаляр размерностью $[1\times 1]$

  • Для $N$ примеров — это произведение вектора-строки $\vec{\boldsymbol{\delta}}^{(l)}$ для размерностью $[1\times N]$ всего батча на единичный вектор-строку $\vec{1}$ размерностью $[N\times 1],$ то есть градиент по смещению — это скаляр размерностью $[1\times 1]$

Для скрытых слоев

  • Для одного примера $\nabla_{\vec{\mathbf{b}}^{(l)}} \mathcal{L} = \vec{\boldsymbol{\delta}}^{(l)}\cdot 1$ — это вектор размером $[n^{(l)}\times 1]$
  • Для всех примеров батча — это произведение матрицы $\Delta^{(l)}$ размером $[n^{(l)}\times N]$ на единичный вектор-столбец $\vec{1}$ размерностью $[N\times 1]$, то есть результирующий градиент — это вектор-столбец размером $[n^{(l)}\times 1]$.

Мы можем реализовать этот градиент через матричное произведение в NumPy:

delta @ np.ones((N, 1))

Но еще быстрее — просто просуммировать дельты для всех примеров в батче

delta.sum(axis=1, keepdims=True)

Здесь delta — это матрица $\Delta^{(l)}$ размером $[n^{(l)}\times N]$. В частности для выходного слоя она имеет размерность $[1\times N]$.

sum(axis=1) означает, что мы суммируем по столбцам, то есть по N. Параметр keepdims=True означает, что мы учитываем размерность матрицы после суммирования.

Так как в обратном проходе мы идем по всем слоям от последнего к первому, мы сохраняем эти градиенты в массив в обратном порядке:

# Массив градиентов по каждому слою
db: list[Vector] = []

# Начинаем с выходного слоя
delta: Batch = (p_hat - y) / N
db.insert(0, delta.sum(axis=1, keepdims=True))

# Затем рекуррентно движемся к первому слою
for layer in range(len(self.W) - 1, 0, -1):
    delta = (self.W[layer].T @ delta) * d_relu(z_cache[layer - 1])
    db.insert(0, delta.sum(axis=1, keepdims=True))

Общий алгоритм градиентного спуска

Шаг 1. Перемешивание датасета в начале эпохи

Для каждой эпохи мы берем весь датасет и перемешивает его. Это можно реализовать так:

N_total: int = X.shape[1]
for epoch in range(1, self.epochs + 1):
    idx = np.random.permutation(N_total)
    X, y = X[:, idx], y[:, idx]

Здесь Х — это астрологические признаки всех примеров взятых из AstroDataBank, то есть матрица размером $[\text{input_dim}\times N_\text{total}]$.

Сначала мы берем индексы $i$ всех примеров и перемешиваем их, затем пересобираем примеры $(\vec{\mathbf{x}, y})_i$ в порядке новых (перемешанных) индексов.

Шаг 2. Разбиение датасета на батчи

Далее мы разбиваем датасет на батчи и начинаем обучение на каждом отдельном батче из N примеров. Это разбиение можно реализовать так:

for s in range(0, N_total, self.batch_size):
    Xb: Batch = X[:, s: s + self.batch_size]
    yb: Batch = y[:, s: s + self.batch_size]

Здесь мы проходим порядковые номера элементов датасета $0, 1, ..., N_\text{total}$ с шагом $N$ (размером батча), то есть последовательно выбираем $s$-й элемент, кратный размеру батча.

Затем мы выделяем из датасета две подматрицы — $X_\text{batch}$ размером $[\text{input_dim}\times N]$ и $y_\text{batch}$ размером $[1\times N]$ простым выбором столбцов в диапазоне от индекса $s$ до $s+N$. Эти две подматрицы вместе образуют батч из $N$ примеров $(\vec{\mathbf{x}, y})_i$.

Шаг 3. Прямой и обратный проходы

Теперь мы должны пройтись прямым проходом по нашему батчу и сохранить результаты активации слоев — вектор-строку $\vec{\mathbf{\hat{p}}}$ для выходного слоя и матрицы $H^{(l)}$ для всех промежуточных слоев.

Затем используя эти сохраненные величины мы совершаем обратный проход и сохраняем градиенты по весам и смещениям для всех слоев.

Этот шаг можно реализовать так:

p_hat, H_cache = forward(Xb)
dW, db = backward(p_hat, yb, H_cache)

Шаг 4. Обновление параметров

Градиенты указывают в сторону роста функции потерь, значит нам надо сделать небольшой шаг в противоположном направлении для каждого примера:

$$ \begin{align} & W^{(l)} = W^{(l)} - \lambda\cdot\nabla_{W^{(l)}} \mathcal{L}\\ & \vec{\mathbf{b}}^{(l)} = \vec{\mathbf{b}}^{(l)} - \lambda\cdot \nabla_{\vec{\mathbf{b}}^{(l)}} \mathcal{L} \end{align} $$

Это обновление параметров можно реализовать через обычный цикл Python по всем слоям:

self.W = [W - self.lr * dw for W, dw in zip(self.W, dW)]
self.b = [b - self.lr * db_ for b, db_ in zip(self.b, db)]

Полный код

Теперь мы можем собрать все вместе в единый код. Этот код эмулирует пример астрологических признаков и меток событий и проводит на их основе тренинг в небольшой нейросети.

Затем код оценивает точность предсказаний на этом же датасете.

import numpy as np
from numpy.typing import NDArray
from typing import cast
# pyright: reportConstantRedefinition=false

# Геометрические типы
Vector = NDArray[np.float64]    # [n × 1]   — столбец
CoVector = NDArray[np.float64]  # [1 × n]   — строка
Matrix = NDArray[np.float64]    # [m × n]   — матрица весов

# Семантические типы
Sample = NDArray[np.float64]    # [n × 1]       — один пример x⃗
Batch = NDArray[np.float64]     # [n × N]       — N примеров
Dataset = NDArray[np.float64]   # [n × N_total] — весь датасет


# Функции активации и их производные
def sigmoid(z: Batch) -> Batch:
    return 1.0 / (1.0 + np.exp(-z))


def relu(z: Batch) -> Batch:
    return np.maximum(0, z)


def d_relu(z: Batch) -> Batch:
    result: Batch = (z > 0).astype(np.float64)
    return result


class BernoulliNet:
    """
    Нейросеть для бинарной классификации (распределение Бернулли).

    Параметры
    ---------
    input_dim   : размер входного вектора x
    hidden_dims : список размеров скрытых слоёв, напр. [64, 32]
    lr          : шаг градиентного спуска λ
    epochs      : число эпох
    batch_size  : размер мини-батча
    """

    W: list[Matrix]
    b: list[Vector]
    lr: float
    epochs: int
    batch_size: int

    def __init__(self,
                 input_dim: int,
                 hidden_dims: list[int],
                 lr: float = 0.01,
                 epochs: int = 100,
                 batch_size: int = 32,
                 ) -> None:
        dims: list[int] = [input_dim] + hidden_dims + [1]

        # Инициализация весов: W ~ N(0, 2/n_prev)
        self.W, self.b = self._init_weights(dims)
        self.lr, self.epochs, self.batch_size = lr, epochs, batch_size

    def _init_weights(self, dims: list[int]) -> tuple[list[Matrix], list[Vector]]:
        """
        Инициаализирует веса W и смещения b для всех слоев:
        W ~ N(μ=0, σ²=2/n_prev).
        """
        W_list: list[Matrix] = []
        b_list: list[Vector] = []

        for layer in range(len(dims) - 1):
            n_prev = dims[layer]      # число нейронов в предыдущем слое
            n_curr = dims[layer + 1]  # число нейронов в текущем слое

            # Случайная матрица с нормальным распределением N(μ=0, σ²=1=2/n_prev),
            W_layer: Matrix = np.random.normal(
                loc=0.0,
                scale=np.sqrt(2.0 / n_prev),
                size=(n_curr, n_prev),
            ).astype(np.float64)

            W_list.append(W_layer)

            # Вектор смещений — вектор-столбец нулей
            b_layer: Vector = np.zeros((n_curr, 1), dtype=np.float64)
            b_list.append(b_layer)

        return W_list, b_list

    def forward(self, X: Batch) -> tuple[Batch, list[Batch]]:
        """
        Прямой проход по сети: возвращает предсказания p̂ для всех примеров в батче
        и кэш активаций Hᵢ для для обратного прохода (для всех примеров на каждом слое).

        Параметры
        ---------
        X : Batch  матрица астрологических признаков размером [input_dim × N]
        """

        # Прямой проход по нулевому слою:
        H: Batch = X
        H_cache: list[Batch] = [H]

        # Прямой проход по скрытым слоям:
        for W, b in zip(self.W[:-1], self.b[:-1]):
            H = relu(W @ H + b)  # размер [n_current × N]
            H_cache.append(H)

        # Прямой проход по выходному слою:
        p_hat: Batch = sigmoid(
            self.W[-1] @ H + self.b[-1])  # размер [1 × N]

        return p_hat, H_cache

    def loss(self, p_hat: Batch, y: Batch) -> float:
        """
        Бинарная кросс-энтропия (логарифмическое правдоподобие Бернулли)
        """
        return float(
            -np.mean(
                y * np.log(p_hat + 1e-9)
                + (1 - y) * np.log(1 - p_hat + 1e-9)
            )
        )

    def backward(self,
                 p_hat: Batch,
                 y: Batch,
                 H_cache: list[Batch]
                 ) -> tuple[list[Matrix], list[Vector]]:
        """
        Обратный проход по сети: возвращает градиенты dW и db 
        для всех слоев. 

        Параметры
        ----------
        p_hat : Batch  размер [1 × N]  — предсказания для всех примеров в батче
        y     : Batch  размер [1 × N]  — метки событий для всех примеров в батче
        cache : list[Batch]  — сохраненные активации H⁽ˡ⁾ для каждого слоя, размер [n⁽ˡ⁾ × N]
        """
        N: int = y.shape[1]
        dW: list[Matrix] = []
        db: list[Vector] = []

        # Выходной слой:
        delta: Batch = (p_hat - y) / N
        dW.insert(0, delta @ H_cache[-1].T)
        db.insert(0, delta.sum(axis=1, keepdims=True))

        # Скрытые слои:
        z_cache: list[Batch] = [
            self.W[layer] @ H_cache[layer] + self.b[layer]
            for layer in range(len(self.W) - 1)
        ]

        # Обратный проход по скрытым слоям
        for layer in range(len(self.W) - 1, 0, -1):
            delta = (self.W[layer].T @ delta) * d_relu(z_cache[layer - 1])
            dW.insert(0, delta @ H_cache[layer - 1].T)
            db.insert(0, delta.sum(axis=1, keepdims=True))

        return dW, db

    def train(self, X: Dataset, y: Dataset, verbose: bool = True) -> None:
        """
        X : Dataset  shape [input_dim × N_total]  — признаки всех примеров в датасете
        y : Dataset  shape [1 × N_total]  — метки реальных событий {0, 1}
        """
        N_total: int = X.shape[1]
        for epoch in range(1, self.epochs + 1):
            idx = np.random.permutation(N_total)
            X, y = X[:, idx], y[:, idx]
            for s in range(0, N_total, self.batch_size):
                Xb: Batch = X[:, s: s + self.batch_size]
                yb: Batch = y[:, s: s + self.batch_size]
                p_hat, H_cache = self.forward(Xb)
                dW, db = self.backward(p_hat, yb, H_cache)
                self.W = [W - self.lr * dw for W, dw in zip(self.W, dW)]
                self.b = [b - self.lr * db_ for b, db_ in zip(self.b, db)]
            if verbose and epoch % 10 == 0:
                p_hat, _ = self.forward(X)
                print(f"Эпоха {epoch:4d} | потери = {self.loss(p_hat, y):.4f}")

    def predict_probability(self, X: Dataset) -> Dataset:
        """Возвращает p̂ ∈ (0,1) события для каждого столбца батча."""
        p_hat, _ = self.forward(X)
        return p_hat

    def predict_label(self, X: Dataset, threshold: float = 0.5) -> Dataset:
        """Возвращает наблюдаемые бинарные метки события y = {0, 1}."""
        result: Batch = (self.predict_probability(X) >=
                         threshold).astype(np.float64)
        return result


# Пример использования
if __name__ == "__main__":
    np.random.seed(42)
    N = 500

    # Эмулируем астрологические признаки (нормированы в [0, 1])
    X: Dataset = np.random.rand(8, N).astype(np.float64)

    # Эмулируем бинарные метки событий (например, "сбылось" или "не сбылось")
    y: Dataset = (X[0] + X[3] > 1.0).astype(np.float64).reshape(1, N)

    # Инициализация и обучение модели
    model = BernoulliNet(
        input_dim=8,
        hidden_dims=[16, 9],
        lr=0.05,
        epochs=100,
        batch_size=32,
    )
    model.train(X, y)

    # Оценка точности на трейне
    accuracy: float = cast(float, np.mean(model.predict_label(X) == y))
    print(f"\nТочность на трейне: {accuracy:.2%}")

В реальном сценарии, оценка точности происходит сначала на данных для валидации для улучшения входных параметров модели, а затем такая же оценка точности происходит на тестовых данных. А найденные веса и смешения необходимо сохранить в файл для дальнейшего использования.

Также в реальном сценарии вместо библиотеки NumPy надо использовать ее аналог CuPy для быстрого вычисления на GPU.


Алексей Бореалис (Марк Русборн)

Алексей Бореалис (Марк Русборн)

Магистр наук (MSc), профессиональный астролог (MAPAI). Об авторе