时间序列数据建模流程范例

前言

最开始在学习神经网络,PyTorch 的时候,懂的都还不多,虽然也知道 RNN, CNN 这些网络的原理,但真正自己实现起来又是另一回事,代码往往也都是从网上 copy 过来然后再自己魔改的,这也就导致了一系列的问题,代码格式不统一,没弄懂具体实现细节等等。当然,凭这些 copy 过来的代码让模型运行起来还是不难的,你只需要知晓一定的原理。显而易见,这些时间往往最后都是要“还”的。

写这篇文章主要还是记录一下整体的思路,并对网络训练的整个过程进行标准化。当然,这只是我自己在写网络时的总结而已,未必适合每一个人的风格,希望能对你有所启发。

还是从一个例子开始,问题的背景很简单,一维时序数据的预测问题

假如你对 RNN、LSTM 的原理并不了解同样不影响阅读,说白了,这里探讨的并不是怎么建立网络,重要的是整体的流程。

你也可以 点击这里 了解 RNN、LSTM 的工作原理

准备数据

首先就是准备数据,这部分往往是最花费时间,最会发生问题的地方。这里说的准备数据并不只是丢出来一个数据库或是 csv 文件,它涉及到数据获取,数据清洗,数据标准化,创建数据集等过程,让我们一个一个来讨论。

数据获取

数据获取部分没什么好讲的,根据你的数据来源,可能是格式化的,也可能的非格式化的。

你可以 点击这里 获取本文所使用的数据。

这里我使用的数据是从 2020/08/01 到 2020/08/31 的小时数据,如下图所示。

数据清洗

视你的需求以及原始数据来说,数据清洗可以很简单,也可以很复杂。简单来说,去除空值,去除重复值,去除连续常值,正态分布的 3σ 去除异常值等等,根据你想要的目标,选择不同的数据清洗方式。

下面是一个简单的标准化函数,使用 MinMaxScaler 将数据归一化为 0 - 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def data_normalized(data):
'''标准化数据

Args:
data(pd.DataFrame): 待标准化数据

Returns:
norm_data(tensor): 标准化后的数据
scaler(MinMaxScaler): 标准化器
'''
__data = np.array(data)
# 将小于 0 的值置为 0
__data[__data < 0] = 0
# 标准化数据
scaler = MinMaxScaler()
norm_data = scaler.fit_transform(__data.reshape(-1, 1))
norm_data = torch.tensor(norm_data, dtype=torch.float32)
return norm_data, scaler

为了简便起见,这里我给出的数据是已经经过了差分,重采样等步骤之后的数据。

1
2
3
4
5
6
7
8
9
data = pd.read_csv('TIME_SEQ_DATA.csv')
data['CreateDate'] = pd.to_datetime(data['CreateDate'])

data.dropna(inplace=True)
data.drop_duplicates(inplace=True)
data.sort_values(by=['CreateDate'], inplace=True)
data.reset_index(drop=True, inplace=True)

norm_data, scaler = data_normalized(data['Value'])

上面的处理都是常规操作,还是那句话,根据你的实际需求。

至此,我们完成了简单的数据清洗,获得了标准化的数据。

创建数据集

创建数据集同样也有很多方法,手动对数据划分,或是利用 PyTorch 定义好的 Dataset 进行重写。

网上有许多手动划分的例子,大多数都是类似下面这样的。

1
2
3
4
5
6
7
8
9
10
11
def create_dataset(data, look_back):
dataset_x, dataset_y = [], []
for i in range(len(data) - look_back):
dataset_x.append(data[i:(i + look_back)])
dataset_y.append(data[i + look_back])
return np.array(dataset_x), np.array(dataset_y)

...
# 划分训练集和测试集
train_size = int(len(dataset_x) * 0.7)
...

这里我使用 DatasetDataLoader 这两个工具类来构建数据

  • Dataset 定义了数据集的内容,它相当于一个类似列表的数据结构,具有确定的长度,能够用索引获取数据集中的元素。
  • DataLoader 定义了按 batch 加载数据集的方法,能够控制 batch 的大小,batch 中元素的采样方法,以及将 batch 结果整理成模型所需输入形式的方法,并且能够使用多进程读取数据。

根据 Tensor 创建数据集

现在让我们暂时抛开背景问题,下面这个例子很好的说明了创建鸢尾花数据集的过程:

  1. 使用 TensorDataset,将 data 和 target,也就是 x 和 y 分别传入,得到了 TensorDataset 类型的数据,你可以使用 for 循环查看里面的具体形式。
  2. 使用 random_split,将整个数据集划分为训练集和预测集,得到 Subset,你可以加上 torch.manual_seed(0) 来指定随机种子。
  3. 使用 DataLoader 加载数据集。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from sklearn import datasets

# 根据Tensor创建数据集
iris = datasets.load_iris()
ds_iris = TensorDataset(torch.tensor(iris.data), torch.tensor(iris.target))

# 分割成训练集和预测集
n_train = int(len(ds_iris) * 0.8)
n_valid = len(ds_iris) - n_train
ds_train, ds_valid = random_split(ds_iris, [n_train, n_valid])

print(type(ds_iris))
# <class 'torch.utils.data.dataset.TensorDataset'>
print(type(ds_train))
# <class 'torch.utils.data.dataset.Subset'>

dl_train = DataLoader(ds_train, batch_size = 8)
dl_valid = DataLoader(ds_valid, batch_size = 8)

for features, labels in dl_train:
print(features, labels)
break

创建自定义数据集

在上面的例子中,我们使用 TensorDataset 直接创建数据集。当你完成了对 x 和 y 的划分之后,对于划分简单的数据可以直接使用这样的方法。对于一些要求复杂的数据集,更优秀的方法是自定义。

我们只需实现 Dataset__len__ 方法和 __getitem__ 方法,就可以轻松构建自己的数据集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 定义数据集
class myDataset(Dataset):
def __init__(self, data, look_back) -> None:
super().__init__()
self.data = data
self.look_back = look_back

def __len__(self) -> int:
return len(self.data) - self.look_back

def __getitem__(self, index):
feature = self.data[index:index + self.look_back]
label = self.data[index + self.look_back]
return feature, label

这里,我们通过 look_back 个数据点,预测下一个数据点。

具体来说,我们对 __len__ 方法和 __getitem__ 方法进行了重写,具体的代码并不复杂。接下来,我们就可以使用 myDataset 达到和上面提到的 create_dataset 同样的效果。

1
2
3
4
5
6
7
8
9
10
# 生成数据集
ds_data = myDataset(norm_data.view(-1).to(DEVICE), look_back=LOOK_BACK)

# 将数据集分为训练集和测试集
n_train = int(len(ds_data) * 0.8)
n_test = len(ds_data) - n_train
ds_train, ds_test = random_split(ds_data, [n_train, n_test])

dl_tarin = DataLoader(ds_train, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
dl_test = DataLoader(ds_test, batch_size=BATCH_SIZE, shuffle=False, drop_last=False)

类似的,参考创建鸢尾花数据集的方法,同样将数据集分为训练集和测试集,并使用 DataLoader 加载。

使用 DataLoader 加载数据集

现在让我们回过头来看看 DataLoader 的具体使用。

DataLoader 能够控制 batch 的大小,batch 中元素的采样方法,以及将 batch 结果整理成模型所需输入形式的方法,并且能够使用多进程读取数据。

DataLoader 的函数签名如下:

1
2
3
4
5
6
7
8
DataLoader(
dataset, # 数据集
batch_size=1, # 批次大小
shuffle=False, # 是否乱序
num_workers=0, # 使用多进程读取数据,设置的进程数。
drop_last=False, # 是否丢弃最后一个样本数量不足batch_size批次数据。
...
)

一般情况下,我们仅仅会配置 dataset, batch_size, shuffle, num_workers, drop_last 这五个参数,其他参数使用默认值即可。

关于 shape 的一些问题

准备数据的过程往往是复杂的,后面模型出了问题,或许就是数据处理上出了问题。上面我们着重将了如何创建数据集,但还有隐含在其中的另一个重要的点没有提及,也就是 size,或者说 shape。

最开始学习的时候,相信许多人都有疑问,为什么这里要 reshape(),为什么那里要 view(-1),为什么这里要 flatten(),为什么那里要 unsqueeze(0)

问题的根本原因就是,没有弄清楚经过某个处理之后你的数据的 shape 的变化,再或许就是没搞清上面这些函数的用法。

1
2
3
# LSTM input shape: (seq_len, batch_size, input_size)
output, hidden = self.lstm(input)
# output shape: (seq_len, batch_size, output_size)

另外就是 layer 往往需要特定的输入维度,以 LSTM 为例,它需要传入的是三维参数:(seq_len, batch_size, input_size),out 的输出维度 (seq_len, batch_size, output_size),在我看来,时刻注意 shape 是一个好的习惯,特别是当数据经过那些你不熟悉的函数后。

定义模型

好了,终于到了定义网络的时候了,或许这部分是最简单的。

一般来说,我们使用 nn.Sequential 按层顺序构建模型,或是继承 nn.Module 基类构建自定义模型。

感觉就像这样,你只需要把它当做一个复合的层:

1
2
3
4
5
6
self.my_seq = nn.Sequential(nn.Linear(input_size, 24),
nn.Dropout(0.5),
nn.ReLU(True),
nn.Linear(24, 10),
nn.Dropout(0.5),
nn.ReLU(True))

这里,我只是简单搭建了一个 LSTM 网络,就像所有其他网络一样,结构并不复杂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 定义模型
class LSTM(nn.Module):
'''
Args:
input_size: feature size
hidden_size: number of hidden units
output_size: number of output
num_layers: layers of LSTM to stack
'''
def __init__(self, input_size, hidden_size, output_size=1, num_layers=3):
super(LSTM, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers)
self.linear = nn.Linear(hidden_size, output_size)

def forward(self, input):
# LSTM input shape: (seq_len, batch, input_size)
output, hidden = self.lstm(input)
output = self.linear(output[-1])
return output

训练模型

如何训练网络因人而异,但大致都是类似的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def train(dl_train):
model = LSTM(LOOK_BACK, 64).to(DEVICE)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
train_loss = []
# 开始训练
for e in range(EPOCH):
__loss = 0
for feature, label in dl_train:
# feature: torch.Size([10, 24]) (BATCH_SIZE, LOOK_BACK)
# label: torch.Size([10]) (BATCH_SIZE, )
# out: torch.Size([10, 1]) (BATCH_SIZE, 1)
out = model(feature.unsqueeze(0))
loss = criterion(out, label.unsqueeze(1))
# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()
__loss += loss.item()
train_loss.append(__loss)
if (e + 1) % 10 == 0: # 每 10 次输出结果
print('Epoch: {}, Loss: {}'.format(e + 1, __loss / len(dl_train)))

# 保存模型参数
# torch.save(model.state_dict(), MODEL_DIR)
return model, train_loss
1
model, train_loss = train(dl_tarin)

可视化损失函数在训练集上的迭代情况。

评估模型

在这里,我们直接使用之前创建的测试集进行训练,并计算根均方误差。

1
2
3
4
5
6
7
8
9
10
11
12
13
model = model.eval()
pred, actual = [], []
for feature, label in dl_test:
# feature: torch.Size([10, 24]) (BATCH_SIZE, LOOK_BACK)
# label: torch.Size([10]) (BATCH_SIZE, )
# out: torch.Size([10, 1]) (BATCH_SIZE, 1)
out = model(feature.unsqueeze(0))
pred += out.view(-1).data.cpu().tolist()
actual += label.view(-1).data.cpu().tolist()
pred, actual = np.array(pred), np.array(actual)

rmse = np.sqrt(mean_squared_error(actual.reshape(-1), pred.reshape(-1)))
print("根均方误差(RMSE):" + str(rmse))

根均方误差(RMSE):0.12173503830068468

小结

感谢你阅读至此,本文只是简单介绍了一些自己的经验,梳理了一下建模的简单流程。总的来说,我希望我的代码是模块化,标准化的,相信你也如此,希望本文能对你有所帮助。

你可以 点击这里 得到完整代码。

参考资料