机器学习代码汇总
1. 目标与评价
损失函数
import numpy as np
class MSE:
def loss(self, y_true, y_pred):
return 0.5 * np.power(y_true - y_pred, 2)
def gradient(self, y_true, y_pred):
# 损失函数对 y_pred (网络最后一层输出)的梯度
return -1 * (y_true - y_pred)
class CrossEntropyLoss:
"""
pytorch中labels的格式:nn.CrossEntropyLoss()是一维的类别, bce是one hot的多维
ln(x)的导数 1/x,exp(x)的导数exp(x)
"""
def loss(self, labels, logits, epsilon=1e-12):
"""
labels = np.array([[0, 1], [1, 0]])
logits = np.array([[0.1, 0.9], [0.8, 0.2]])
"""
logits = np.clip(logits, epsilon, 1. - epsilon)
return -np.mean(np.sum(labels * np.log(logits), axis=1))
def gradient(self, labels, logits, epsilon=1e-12):
logits = np.clip(logits, epsilon, 1. - epsilon)
return -labels / logits
class CrossEntropyLoss2:
def loss(self, labels, logits, epsilon=1e-12):
""" 类似线形回归的shape
labels = np.array([1, 0])
logits = np.array([0.9, 0.2])
"""
logits = np.clip(logits, epsilon, 1. - epsilon)
return np.mean(- labels * np.log(logits) - (1 - labels) * np.log(1 - logits))
def gradient(self, labels, logits, epsilon=1e-12):
logits = np.clip(logits, epsilon, 1. - epsilon)
return - (labels / logits) + (1 - labels) / (1 - logits)
focal loss
import torch
from torch import nn
class FocalLoss(nn.Module):
def __init__(self, gamma, eps=1e-7):
super().__init__()
self.gamma = gamma
self.eps = eps
def forward(self, preds, targets):
preds = preds.clamp(self.eps, 1 - self.eps)
loss = (1 - preds) ** self.gamma * targets * torch.log(preds) + preds ** self.gamma * (1 - targets) * torch.log(1 - preds)
return -torch.mean(loss)
指标
AUC
cross validation
2. 统计学习模型
线形回归: native
线性回归: numpy
逻辑回归: native
逻辑回归: numpy
决策树分类
import numpy as np
class TreeNode:
def __init__(self):
pass
class DecisionTree:
def __init__(self):
pass
决策树回归
Xgboost回归
Xgboost分类
K-means
import numpy as np
class KMeansCluster:
"""
examples: (n_example, n_feature)
distance: (n_example, k)
clusters: (n_example,)
"""
def __init__(self, k, max_iter=100, tolerance=1e-4):
self.k = k # number of clusters
self.max_iter = max_iter # maximum number of iterations
self.tolerance = tolerance # convergence tolerance
def fit(self, examples):
n_examples, n_features = examples.shape
centroids = examples[np.random.choice(n_examples, self.k, replace=False)]
# Placeholder for storing cluster assignments
clusters = np.zeros(n_examples)
for _ in range(self.max_iter):
# Step 1: Calculate distance between each example and centroids
distances = np.zeros((n_examples, self.k))
for i in range(self.k):
distances[:, i] = self.euclidean_distance(examples, centroids[i])
# Step 2: Assign each example to the closest centroid
new_clusters = np.argmin(distances, axis=1)
# Step 3: Check for convergence (if assignments haven't changed)
if np.all(clusters == new_clusters):
break
clusters = new_clusters
# Step 4: Update centroids by calculating the mean of the assigned points
for i in range(self.k):
centroids[i] = examples[clusters == i].mean(axis=0)
self.centroids = centroids
self.clusters = clusters
def predict(self, examples):
distances = np.zeros((examples.shape[0], self.k))
for i in range(self.k):
distances[:, i] = self.euclidean_distance(examples, self.centroids[i])
return np.argmin(distances, axis=1)
def euclidean_distance(self, a, b):
return np.sqrt(np.sum((a - b) ** 2, axis=-1))
KNN
PCA
from numpy.linalg import svd
3. 深度学习模型
MLP-numpy
import numpy as np
class Dense:
def __init__(self, input_dim, output_dim):
self.input_dim = input_dim
self.output_dim = output_dim
self.weight = np.random.randn(input_dim, output_dim) * 0.01 # (input_dim, output_dim)
self.bias = np.zeros((1, output_dim))
def forward(self, input):
self.layer_input = input
return np.matmul(input, self.weight) + self.bias
def backward(self, accum_gradient, lr):
# accum_gradient是loss对 layer_output的gradient, 形状相同
grad_weight = np.matmul(self.layer_input.T, accum_gradient) # (input_dim, output_dim)
grad_bias = np.sum(accum_gradient, axis=0, keepdims=True)
grad_input = np.matmul(accum_gradient, self.weight.T) # (1, output_dim), weight更新前计算
self.weight -= lr * grad_weight
self.bias -= lr * grad_bias
return grad_input
MLP-torch
CNN-numpy
option1: native
import numpy as np
class Conv2D:
def __init__(self, kernel_size, input_channels, filters, padding, strides):
self.kernel_size = kernel_size
self.input_channels = input_channels
self.filters = filters
self.padding_size = padding
self.strides = strides
# 标准正态分布的随机数,注意其形状
self.kernels = np.random.randn(filters, input_channels, kernel_size, kernel_size)
self.bias = np.zeros(filters)
def forward(self, input):
input_channels, h_in, w_in = input.shape
assert input_channels == self.input_channels, "Input channels do not match the kernel input channels."
# 填充输入数据。第一个轴(通常是通道轴)上不进行填充,第二个轴和第三个轴(通常是高度和宽度轴)上在开始和结束位置都填充padding个值
input_pad = np.pad(input, ((0, 0), (self.padding_size, self.padding_size), (self.padding_size, self.padding_size)))
self.layer_input = input_pad
# h_out = (h_in + 2 * pad - k) // stride + 1
h_out = (h_in + 2 * self.padding_size - self.kernel_size) // self.strides + 1
w_out = (w_in + 2 * self.padding_size - self.kernel_size) // self.strides + 1
output = np.zeros((self.filters, h_out, w_out))
for f in range(self.filters):
for i in range(h_out):
for j in range(w_out):
i_start = i * self.strides
j_start = j * self.strides
im_region = input_pad[:, i_start:(i_start + self.kernel_size), j_start:(j_start + self.kernel_size)]
output[f, i, j] = np.sum(im_region * self.kernels[f]) + self.bias[f]
return output
def backward(self, accum_gradient, lr):
# accum_gradient: the loss gradient for this layer's outputs
input_gradient = np.zeros_like(self.layer_input)
kernel_gradient = np.zeros_like(self.kernels)
bias_gradient = np.zeros_like(self.bias)
_, h_out, w_out = accum_gradient.shape
for f in range(self.filters):
for i in range(h_out):
for j in range(w_out):
i_start = i * self.strides
j_start = j * self.strides
im_region = self.layer_input[:, i_start:(i_start + self.kernel_size), j_start:(j_start + self.kernel_size)]
kernel_gradient[f] += accum_gradient[f, i, j] * im_region
input_gradient[:, i_start:i_start + self.kernel_size, j_start:j_start + self.kernel_size] += (
accum_gradient[f, i, j] * self.kernels[f]
)
bias_gradient[f] += np.sum(accum_gradient[f])
self.kernels -= kernel_gradient * lr
self.bias -= lr * bias_gradient
if self.padding_size > 0:
input_gradient = input_gradient[:, self.padding_size:-self.padding_size, self.padding_size:-self.padding_size]
return input_gradient
option2: 转化为一个大矩阵运算, 加快训练速度
import numpy as np
def image2col():
return
def col2image():
return
def conv2D(image, kernel, padding=0, strides=1):
# Cross Correlation
kernel = np.flipud(np.fliplr(kernel))
# Gather Shapes of Kernel + Image + Padding
xKernShape = kernel.shape[0]
yKernShape = kernel.shape[1]
xImgShape = image.shape[0]
yImgShape = image.shape[1]
# Shape of Output Convolution
xOutput = int(((xImgShape - xKernShape + 2 * padding) / strides) + 1)
yOutput = int(((yImgShape - yKernShape + 2 * padding) / strides) + 1)
output = np.zeros((xOutput, yOutput))
# Apply Equal Padding to All Sides
if padding != 0:
imagePadded = np.zeros((image.shape[0] + padding*2, image.shape[1] + padding*2))
imagePadded[int(padding):int(-1 * padding), int(padding):int(-1 * padding)] = image
else:
imagePadded = image
# Iterate through image
for y in range(image.shape[1]):
for x in range(image.shape[0]):
output[x, y] = (kernel * imagePadded[x: x + xKernShape, y: y + yKernShape]).sum()
return output
CNN-torch
# https://github.com/openai/gpt-2/blob/master/src/model.py
import tensorflow as tf
def conv1d(x, scope, nf, *, w_init_stdev=0.02):
with tf.variable_scope(scope):
*start, nx = shape_list(x)
w = tf.get_variable('w', [1, nx, nf], initializer=tf.random_normal_initializer(stddev=w_init_stdev))
b = tf.get_variable('b', [nf], initializer=tf.constant_initializer(0))
c = tf.reshape(tf.matmul(tf.reshape(x, [-1, nx]), tf.reshape(w, [-1, nf]))+b, start+[nf])
return c
LSTM-numpy
LSTM-torch
Attention-numpy
Attention-torch
# https://nlp.seas.harvard.edu/annotated-transformer/
import torch
import torch.nn as nn
def scaled_dot_attention(q, k, v):
d_k = k.size(-1)
score = torch.matmul(q, k.transpose(-2, -1))
score /= d_k ** 0.5
score = torch.softmax(score, dim=-1)
out = torch.matmul(score, v)
return out
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
super().__init__()
self.d_k = d_model // num_heads
self.num_heads = num_heads
self.q_proj = nn.Linear(d_model, d_model)
self.k_proj = nn.Linear(d_model, d_model)
self.v_proj = nn.Linear(d_model, d_model)
self.out_proj = nn.Linear(d_model, d_model)
def forward(self, q, k, v, mask=None):
q_state = self.q_proj(q)
k_state = self.k_proj(k)
v_state = self.v_proj(v)
batch_size = q_state.size(0)
# view只适合对满足连续性条件(contiguous)的tensor
q_state = q_state.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
k_state = k_state.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
v_state = v_state.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
atten_output = scaled_dot_attention(q_state, k_state, v_state)
atten_output = atten_output.transpose(1, 2).contiguous() # view只前transpose或permute, 需要continuous
atten_output = atten_output.view(batch_size, -1, self.d_k * self.num_heads)
out = self.out_proj(atten_output)
return out
Dropout
BatchNorm
Activation
4. 领域-NLP
n-gram
# https://web.stanford.edu/~jurafsky/slp3/3.pdf
#
tfidf
# term-frequency: w represents a word, d means the document
# tf(w, d) = count(w, d) / total(d)
def compute_term_frequency(text: str, vocabularies: dict[str, int]) -> dict:
"""
calculate term frequency: 每个document中,一个词出现次数越多越重要
Args:
text (str): input text
vocabularies (dict[str, int]): vocabulary list from corpus
Returns:
dict: a dict containing the tf for each word
"""
words = text.split(' ')
word_count_norm = copy.deepcopy(vocabularies)
for word in words:
if word in word_count_norm.keys():
word_count_norm[word] += 1
else:
# considering unknown words in testing
word_count_norm["[UNK]"] += 1
for word, count in word_count_norm.items():
word_count_norm[word] = count / len(words)
return word_count_norm # 结果按vocab排序, 一个document可以根据tf转化为一个向量
# Inverse Document Frequency: N is the total number of documents, while df(w) means the document frequency
# idf(w) = log(N / df(w))
def compute_inverse_document_frequency(documents: List[str]) -> dict[str, float]:
"""
calculate the idf: 一个单词出现在越多document中,证明这个单词越不重要
Args:
documents (List[str]): a list of documents
Returns:
dict[str, float]: idf
"""
N = len(documents)
idf_dict = {}
for document in documents:
for word in set(document.split(' ')):
# Count how many documents appear this word
idf_dict[word] = idf_dict.get(word, 0) + 1
# Apply logarithmic function to the counts
idf_dict = {word: math.log(N / count) for word, count in idf_dict.items()}
# Consider unknown words in the testing
idf_dict['[UNK]'] = math.log(N + 1 / 1)
return idf_dict
def calculate_feature_vector(term_frequency: dict[str, int], inverse_document_frequency: dict[str, int]):
tfidf = dict()
for word, tf_word in term_frequency.items():
tfidf[word] = tf_word * inverse_document_frequency[word]
tfidf_vector = np.array([tfidf_word for _, tfidf_word in tfidf.items()])
return tfidf_vector
word2vec
Bayes文本分类器
kv-cache
bert-summary
tokenizer: BPE贪心
# subword词表,之后编码和解码
import re, collections
def get_stats(vocab):
pairs = collections.defaultdict(int)
for word, freq in vocab.items():
symbols = word.split() # 使用空格进行区分
for i in range(len(symbols)-1): # 连续字符组成一个字符对
pairs[symbols[i], symbols[i+1]] += freq # 频率
return pairs
def merge_vocab(pair, v_in):
v_out = {}
bigram = re.escape(' '.join(pair))
p = re.compile(r'(?<!\S)' + bigram + r'(?!\S)')
for word in v_in:
w_out = p.sub(''.join(pair), word) # 合并字符对
v_out[w_out] = v_in[word]
return v_out
# 设置待编码文本,key为字符,value为频率
# 每个单词后面增加</w>, 可以知道每个单词的结束位置, 而不会统计到下一个单词中
words = text.strip().split(" ")
word_freq_dict = collections.defaultdict(int)
for word in words:
word_freq_dict[' '.join(word) + ' </w>'] += 1
vocab = {'l o w</w>' : 5, 'l o w e s t</w>' : 2, 'n e w e r</w>':6, 'w i d e r</w>':3}
num_merges = 10 # 迭代次数
for i in range(num_merges):
pairs = get_stats(vocab) # 字符字典
best = max(pairs, key=pairs.get) # 找到频率最高的字符对
vocab = merge_vocab(best, vocab)
print(best)
positional encoding
import numpy as np
import torch
def get_positional_embedding(d_model, max_seq_len):
positional_embedding = torch.tensor([
[pos / np.power(10000, 2.0 * (i // 2) / d_model) for i in range(d_model)] # i 的取值为 [0, d_model)
for pos in range(max_seq_len)] # pos 的取值为 [0, max_seq_len)
)
positional_embedding[:, 0::2] = torch.sin(positional_embedding[:, 0::2])
positional_embedding[:, 1::2] = torch.cos(positional_embedding[:, 1::2])
return positional_embedding
beam search
# https://zhuanlan.zhihu.com/p/114669778
top_k LLM token decoding
import numpy as np
def top_k_sampling(logits, k=5):
"""Perform top-k sampling on the logits array."""
# Calculate the probabilities from logits
probabilities = np.exp(logits) / np.sum(np.exp(logits))
top_k_indices = np.argsort(probabilities)[-k:]
# Normalize probabilities of top-k tokens
top_k_probs = probabilities[top_k_indices] / np.sum(probabilities[top_k_indices])
# Sample a token from the top-k tokens based on their probabilities
sampled_token = np.random.choice(top_k_indices, p=top_k_probs)
return sampled_token
logits = np.array([1.2, 0.8, 0.5, 2.0, 1.5])
sampled_token = top_k_sampling(logits, k=3)
print("Sampled token index:", sampled_token)
5. 领域-CV
6. pipeline
XGBoost
torch
PySpark
7. 特征工程
前处理-转换
数量特征-pandas
数量特征-SQL
类别特征-pandas
类别特征-SQL
Last updated