How to Work with Autoencoders [Case Study Guide]

Autoencoders are a class of neural networks used in unsupervised learning tasks. They have two neural network components: an encoder and a decoder. Both components have essentially mirrored configurations, which means that the shape of the output matches the shape of the input, and the network is trained so that the output reproduces the input.

What’s the use of an architecture that copies the input to the output? There’s no use at all. Let me explain:

To make these networks useful, they have something called an information bottleneck in between. The number of neurons in this bottleneck region is much smaller than in the encoder and decoder layers. This forces the network to compress the information, which discards noise, so it can only approximate the original data rather than copy it end-to-end.

These algorithms are trained in an attempt to:

  1. Learn representations or patterns from the given set of data in the bottleneck.
  2. Generate a new set of data from the input. 

Traditionally, autoencoders were used for dimensionality reduction, where high-dimensional data is represented in a low-dimensional space, much like PCA. But PCA is limited by its linearity and can't map data that lies on a high-dimensional non-linear manifold into a low-dimensional space.

Autoencoders can do that thanks to neural networks. This is why the autoencoder and its variants are used in a lot of applications including high-energy and quantum physics, molecular biology, medical image segmentation, data compression, and more. 

Mathematical intuition behind autoencoders

A generic way to define an autoencoder in mathematical notation is f(x) = h, where x is the input data and h is the latent variables in the information bottleneck. This formula denotes the encoder part of the network.

The decoder takes the latent variables from the information bottleneck and maps them to an output, which can be denoted as g(h) = x`. The decoder is usually the mirror image of the encoder.

Let’s explore the terms “information bottleneck” and “latent variables” a bit more, because they’re quite important. 

Information Bottleneck (IB) was introduced in 1999 with the hypothesis that a network can extract vital information, or representations, by compressing the amount of information that can pass through it. This compressed information is known as latent variables or latent representations.

In a nutshell, latent variables are random variables that can’t be observed directly but are extracted from the distribution. These variables are very fundamental and they give us abstract knowledge of the topology and the distribution of the data. The latent variables, here denoted as h, can differ based on the variant of the autoencoder you’re using. 

The whole autoencoder can be described as:

$$g(f(x)) = x'$$

Where both f and g are nonlinear functions.
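
As a rough sketch of this notation in PyTorch, f and g can be two small networks that are composed as g(f(x)). The layer sizes and the 2-dimensional bottleneck below are arbitrary choices for illustration, not values taken from this article.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# f: encoder, maps an 8-dimensional input x to a 2-dimensional latent h (sizes are illustrative)
f = nn.Sequential(nn.Linear(8, 4), nn.ReLU(), nn.Linear(4, 2))
# g: decoder, maps the latent h back to the input dimension
g = nn.Sequential(nn.Linear(2, 4), nn.ReLU(), nn.Linear(4, 8))

x = torch.randn(5, 8)   # a batch of 5 inputs
h = f(x)                # latent variables in the bottleneck
x_prime = g(h)          # reconstruction of x

print(h.shape, x_prime.shape)
```

The bottleneck output h is much smaller than x, while the reconstruction has the same shape as the input.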

Types of autoencoders

Autoencoders have evolved a lot since their creation in 1987, and their applications have made them more task-specific. Here, we’ll discuss different autoencoders and their workings.

Undercomplete autoencoder

Undercomplete autoencoders aim to map input x to output x` by limiting the capacity of the model as much as possible, minimizing the amount of information that flows through the network. 

Undercomplete autoencoders learn features by minimizing the following loss function:

$$L(x, g(f(x)))$$

Where L is the loss function penalizing g(f(x)) for diverging from the original input x. L can be a mean squared error or even a mean absolute error.
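
To make the two loss choices concrete, here is a small sketch that compares mean squared error and mean absolute error on a made-up input and a hypothetical reconstruction. The numbers are invented purely for illustration.

```python
import torch
import torch.nn as nn

x = torch.tensor([[0.0, 1.0, 2.0]])       # original input
x_rec = torch.tensor([[0.5, 1.0, 1.0]])   # hypothetical reconstruction g(f(x))

mse = nn.MSELoss()(x_rec, x)   # mean of squared differences
mae = nn.L1Loss()(x_rec, x)    # mean of absolute differences

print(mse.item(), mae.item())
```

Mean squared error punishes the single large deviation more heavily, while mean absolute error weighs all deviations linearly.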

Undercomplete autoencoders work because their capacity is reduced: the number of nodes in the hidden layers is reduced along with the number of nodes in the information bottleneck. This matters because, when the capacity of the model is high, the autoencoder can still copy the input to the output without extracting any useful information, even if the bottleneck has only one dimension.

Our aim is always to extract representations and then reconstruct the input from those representations. In order to make an autoencoder that can learn and extract representations and also reconstruct the input, we need to:

  1. Increase model capacity,
  2. Increase information bottleneck capacity,
  3. Regularize the model so that it supports the above two points. 

Pros:

Since the Undercomplete autoencoders maximize the probability distribution, they do not need a regularization function. 

Cons:

Undercomplete autoencoders are not versatile and they tend to overfit. One reason is that they are simple models with limited capacity, which leaves them little flexibility.

Regularized autoencoders

Regularized autoencoders are designed based on data complexity, and they address the problems of undercomplete autoencoders. The encoder and decoder, along with the information bottleneck, can have a higher capacity. This makes them more flexible and powerful.

Regularized autoencoders use a loss function that encourages properties like:

  1. The ability to reconstruct the output from the input through approximation.
  2. The sparsity of representation.
  3. The smallness of the derivative of the representation.
  4. Robustness to noise, outliers, or missing inputs.

Sparse autoencoder

Sparse autoencoders are regularized autoencoders with a penalty on the hidden layer along with the reconstruction loss:

$$L(x, g(f(x))) + \Omega(h)$$

Where h represents the hidden layer activations and \Omega(h) is the sparsity penalty applied to them.

This approach of penalizing the hidden layers means that the autoencoder can have a larger capacity while still constraining the network to learn representations. The network is constrained by activating only a certain number of neurons in the hidden layer. 

It’s important to note that neuron activation depends on the input data, so they’re data-dependent, which means that the distribution of input data results in the activation of neurons in the hidden layers. 

From the image above you can see how sparse autoencoders activate different neurons in the hidden layers to learn the representation. 

There are two ways in which the sparsity can be implemented on a given network:

  1. L1 regularization
  2. KL-Divergence

In L1 regularization, we add to our loss function a term, scaled by lambda, that penalizes the absolute value of the activations a in layer h:

$$L(x, g(f(x))) + \lambda \sum_i |a_i^{(h)}|$$

The lambda term controls how strongly the whole model is regularized, since the strength of the penalty depends only on lambda.
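
The L1 penalty described above can be sketched in a few lines of PyTorch. This is only an illustration under assumed settings: the layer sizes and the value of lam (the lambda weight) are hypothetical and not taken from this article.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

encoder = nn.Sequential(nn.Linear(784, 64), nn.ReLU())
decoder = nn.Linear(64, 784)
lam = 1e-3   # hypothetical sparsity weight (lambda)

x = torch.rand(16, 784)   # stand-in batch of flattened images
h = encoder(x)            # activations a in hidden layer h
x_rec = decoder(h)

recon_loss = nn.functional.mse_loss(x_rec, x)
l1_penalty = lam * h.abs().sum(dim=1).mean()   # lambda * sum_i |a_i|, averaged over the batch
loss = recon_loss + l1_penalty
loss.backward()

print(recon_loss.item(), l1_penalty.item())
```

A larger lam pushes more activations toward zero at the cost of a higher reconstruction error.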

Kullback-Leibler Divergence or KL-Divergence, on the other hand, calculates the difference between two probability distributions. KL-divergence is excellent at measuring how much data is lost while performing an approximation. 

KL-divergence emerges from information theory where we use entropy to calculate the amount of randomness in a piece of information. The higher the randomness or entropy, the more difficult it is to interpret data. 

Another way to define entropy is the minimum amount of information required to encode a message. Essentially, if the randomness is high, more information is required, and if the randomness is low, less information is required.

Information entropy is denoted as:

$$H = -\sum_{x} p(x) \log p(x)$$

Where x is a possible value of the information being encoded and p(x) is its probability.

The problem with information entropy is that we don’t get the optimal information required to achieve that encoding. 

KL-divergence on the other hand modifies information entropy by considering the approximating distribution. 

We can describe KL-divergence as:

$$D_{KL}(p \,\|\, q) = \sum_{x} p(x) \log \frac{p(x)}{q(x)}$$

From the above formula, you can see an added term for the approximating distribution q. With KL-divergence measuring the difference between the distributions, we can now add it as a regularization term to the loss function. KL-divergence is also known as relative entropy.
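
For intuition, entropy and KL-divergence can be computed directly with NumPy. The sketch below uses the standard definitions for discrete distributions and assumes every probability is strictly positive. The distributions p and q are made up for illustration.

```python
import numpy as np

def entropy(p):
    # H(p) = -sum_x p(x) * log p(x), measured in nats
    p = np.asarray(p, dtype=float)
    return -np.sum(p * np.log(p))

def kl_divergence(p, q):
    # D_KL(p || q) = sum_x p(x) * log(p(x) / q(x))
    p, q = np.asarray(p, dtype=float), np.asarray(q, dtype=float)
    return np.sum(p * np.log(p / q))

p = [0.5, 0.5]   # "true" distribution (made up)
q = [0.9, 0.1]   # approximating distribution (made up)

print(entropy(p), kl_divergence(p, q), kl_divergence(p, p))
```

The divergence is zero only when the approximating distribution matches the true one, which is what makes it usable as a penalty term.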

Pros:

In sparse autoencoders, overfitting is prevented by applying a sparsity penalty. The penalty is applied to the hidden-layer activations and added to the reconstruction error. This allows the model to be more versatile by increasing the capacity and learning complex topologies.

Cons:

It is important that the nodes are data-dependent since the input vectors are responsible for the activation of different nodes that yields results during training. Hence, any slight statistical change in the test data can yield different results. 

Contractive Autoencoder

A contractive autoencoder learns representations that are robust to slight variations in the input data. The idea is to map a neighborhood of input points to a smaller neighborhood of output points. This trains the autoencoder to learn a stable representation even if neighboring points change slightly.

Like the previous types we discussed, this one also adds a penalty term to the loss criterion:

$$L(x, g(f(x))) + \lambda \left\| J \right\|_F^2, \quad J = \frac{\partial h}{\partial x}$$

Let’s explore the second part of the equation. It’s the squared Frobenius norm of the Jacobian matrix J. The Frobenius norm can be considered the L2 norm of a matrix, and the Jacobian matrix holds the first-order partial derivatives of a vector-valued function, here the latent representation.

The term above describes the gradient field of the latent representation h with respect to input x. It penalizes large derivatives in the Jacobian matrix, so any small change in the input that leads to a large change or variation in the representational space is penalized.
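
One way to compute such a Jacobian penalty for a single input is with torch.autograd.functional.jacobian. The sketch below is only an illustration: the tiny sigmoid encoder, the layer sizes, and the weight lam are assumptions, and computing the full Jacobian this way is practical only for small models.

```python
import torch
import torch.nn as nn
from torch.autograd.functional import jacobian

torch.manual_seed(0)

encoder = nn.Sequential(nn.Linear(8, 3), nn.Sigmoid())   # h = f(x), sizes are illustrative
decoder = nn.Linear(3, 8)
lam = 1e-2   # hypothetical weight of the contractive term

x = torch.randn(8)                            # a single input vector
J = jacobian(encoder, x, create_graph=True)   # shape (3, 8): entry (i, j) is dh_i / dx_j
contractive_penalty = (J ** 2).sum()          # squared Frobenius norm of J

x_rec = decoder(encoder(x))
loss = nn.functional.mse_loss(x_rec, x) + lam * contractive_penalty
loss.backward()

print(J.shape, contractive_penalty.item())
```

Passing create_graph=True keeps the Jacobian differentiable, so the penalty contributes gradients to the encoder weights during the backward pass.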

Pros:

Contractive autoencoders are a good choice over sparse autoencoders to learn good representation since they are robust to slight variations and the nodes are not data-dependent. 

Cons:

Contractive autoencoders suffer from a major drawback in reconstruction error during the encoding and decoding of the input vectors. This leads them to neglect finer details that are worth considering for reconstruction.

Denoising Autoencoders

So far we’ve seen how we can improve an autoencoder by adding a penalty to the loss that compares the output with the original input x. The approach that we’ll see now is different: instead of changing the loss, we corrupt the input, so that the data the model sees is no longer identical to the data it has to reproduce.

In denoising autoencoders, we pass input that has noise added. The goal here is to train an autoencoder to remove that noise and yield an output that’s noise-free. It’s assumed that the higher-level representations are relatively stable and can be easily extracted.

To achieve this, we need autoencoders that minimize the following criterion, where \tilde{x} is a copy of x corrupted by noise:

$$L(x, g(f(\tilde{x})))$$

Instead of minimizing the traditional criterion:

$$L(x, g(f(x)))$$

The denoising autoencoder is trained to learn the representations and not simply memorize and copy the input to the output, because the input and output aren’t the same anymore. 
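
A single denoising training step might look like the following sketch. The toy model, the noise level of 0.1, and the learning rate are assumptions chosen only to show that the model reads the corrupted input while the loss compares against the clean one.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

model = nn.Sequential(nn.Linear(20, 5), nn.ReLU(), nn.Linear(5, 20))   # toy autoencoder
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

x = torch.rand(32, 20)                     # clean input
x_noisy = x + 0.1 * torch.randn_like(x)    # corrupted copy (Gaussian noise, arbitrary level)

output = model(x_noisy)                    # the model only sees the corrupted input
loss = nn.functional.mse_loss(output, x)   # but is scored against the clean input

optimizer.zero_grad()
loss.backward()
optimizer.step()

print(loss.item())
```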

Pros:

Denoising autoencoders are good for learning the latent representation in corrupted data while creating a robust representation of the same, allowing the model to recover true features.

Cons:

To train a denoising autoencoder, you first need a stochastic mapping that corrupts the data before it is used as input. Because the input and the target are then different, what the model learns depends on the corruption process you choose.

Variational autoencoders

Variational autoencoders, popularly known as VAE, are a more advanced variant of autoencoders. Although similar in basic architecture, they have a completely different mathematical formulation.

One of the biggest changes is in the way the latent variables are calculated. VAE uses a probabilistic approach to find latent variables or representations. This property makes it very powerful compared to the autoencoders that we saw previously.

The information bottleneck of VAE consists of two components. One component represents the mean of input distribution while the other represents the standard deviation of the distribution. 

Intuitively, the mean controls where the encoding of an input is centered, while the standard deviation controls the “area”: how much the encoding can vary from the mean. We also inject Gaussian noise into the latent space, which allows the VAE to sample random noise and then shift and scale it using the mean and standard deviation.

This allows VAE to have a probabilistic approach to represent each latent attribute for a given input. 

The encoder of the VAE (also known as the approximate inference network) tries to infer the properties of latent variables z. This can be described as:

$$q_\phi(z \mid x)$$

The decoder (known as the generator) takes samples from the latent space and generates output. The decoder can be described as:

$$p_\theta(x \mid z)$$

The VAE objective can be described as:

$$\mathcal{L}(\theta, \phi; x) = \mathbb{E}_{q_\phi(z \mid x)}[\log p_\theta(x \mid z)] - D_{KL}(q_\phi(z \mid x) \,\|\, p(z))$$

The first part of the equation is the reconstruction likelihood, which we try to maximize, and the second is the regularization term.
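
The sampling step and the two-part loss can be sketched as below. This is a minimal illustration, not the model used later in this article: the class name TinyVAE, the layer sizes, and the choice of binary cross-entropy are assumptions. The sketch also predicts the log-variance rather than the standard deviation directly, which is a common implementation choice.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class TinyVAE(nn.Module):
    def __init__(self, in_dim=784, hidden=128, latent=2):
        super().__init__()
        self.enc = nn.Linear(in_dim, hidden)
        self.mu = nn.Linear(hidden, latent)       # mean of q(z|x)
        self.logvar = nn.Linear(hidden, latent)   # log-variance of q(z|x)
        self.dec = nn.Sequential(nn.Linear(latent, hidden), nn.ReLU(),
                                 nn.Linear(hidden, in_dim), nn.Sigmoid())

    def forward(self, x):
        h = F.relu(self.enc(x))
        mu, logvar = self.mu(h), self.logvar(h)
        std = torch.exp(0.5 * logvar)
        z = mu + std * torch.randn_like(std)   # sample noise, then shift and scale it
        return self.dec(z), mu, logvar

def vae_loss(x_rec, x, mu, logvar):
    # negative reconstruction log-likelihood (Bernoulli assumption)
    recon = F.binary_cross_entropy(x_rec, x, reduction="sum")
    # closed-form KL divergence between q(z|x) and a standard normal prior
    kl = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon + kl

torch.manual_seed(0)
x = torch.rand(4, 784)   # stand-in batch with values in [0, 1]
x_rec, mu, logvar = TinyVAE()(x)
loss = vae_loss(x_rec, x, mu, logvar)
print(loss.item())
```

Minimizing this loss corresponds to maximizing the objective above, since the loss is its negative.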

The fact that latent variables in the VAE are continuous makes them a powerful generative model. Also, the simultaneous training of a parametric encoder with a generator encourages the model to learn predictable coordinate systems, making them an excellent choice for manifold learning.

Pros:

Unlike the other models, VAE gives us control over how we model the distribution of latent variables, and that distribution can later be used to generate new data.

Cons:

The generated image is blurry because of the injected Gaussian distribution in the latent space. 

Applications of autoencoders

Autoencoders have been widely used for dimensionality reduction and representation learning. When compared with PCA, autoencoders yielded less reconstruction error. It’s also shown that lower dimension manifolds extracted from autoencoders enhanced the performance on many tasks, for example:

  • Classification
  • Anomaly detection 
  • Data denoising 
  • Image inpainting
  • Information retrieval

Understanding autoencoders in Pytorch with MNIST [tutorial]

Now, let’s understand how we code an autoencoder with PyTorch and do a bit of visualization to explore latent space and interpret the model. 

For a vanilla autoencoder, the following libraries will be enough. We’ll also define a variable for the device, which automatically detects whether the Colab notebook has a CPU or a GPU.

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
import os

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

The images downloaded through torchvision are in PIL format and should be converted into tensors. transforms.ToTensor() converts a PIL image into the tensor format required by PyTorch.

transform = transforms.Compose([transforms.ToTensor(),
                               transforms.Normalize((0.5, ), (0.5, ))
                              ])

Download and load the training data.

trainset = datasets.MNIST('MNIST_data/', download = True, train = True, transform = transform)
testset = datasets.MNIST('MNIST_data/', download = True, train = False, transform = transform)

batch_size  = 128
trainloader = torch.utils.data.DataLoader(trainset, batch_size = batch_size, shuffle = True)
testloader  = torch.utils.data.DataLoader(testset,  batch_size = batch_size, shuffle = True)

Let’s examine a sample.

dataiter = iter(trainloader)
images, labels = next(dataiter)  # DataLoader iterators no longer provide a .next() method
plt.figure()
plt.imshow(images[0].numpy().squeeze(), cmap = 'gray')

plt.colorbar(fraction=0.046, pad=0.04)

We’ll define a learner function that will:

  1. Train the model,
  2. Compute loss for each mini-batch and update the parameters,
  3. Show visual progress after each epoch.

Also, pay attention to the loss function. We’ll use mean squared error to calculate the loss and the Adam optimizer for the optimization process. Adam is one of the most popular optimizers and is used in many deep learning tasks.

class Learner():
 def __init__(self, train_dl, valid_dl, model, loss_func = nn.MSELoss()):
   self.train_dl, self.valid_dl = train_dl, valid_dl
   self.model        = model
   self.loss_func    = loss_func
   self.train_losses = []
   self.valid_losses = []

 # method to compute loss for each mini-batch and update the parameters
 def update(self, x, optm, vae):
   # forward pass: the model returns the reconstruction and the latent code
   y_hat, z = self.model(x)

   # compute the reconstruction loss
   loss = self.loss_func(y_hat, x)

   # backward pass
   optm.zero_grad() # clear gradients left over from the previous step
   loss.backward()
   optm.step() # update parameters

   return loss.item()


 # method to show visual progress after each epoch
 def show_visual_progress(self, rows, title, epoch, flatten, vae):

   image_title = f'{title}{epoch}'
   plt.title(image_title)
      
   image_rows = []
  
   for idx, (image, label) in enumerate(self.valid_dl):
    
     #show only 5 rows of images
     if rows == idx:
         break
    
     image = image.to(device)
     if flatten:
       image = image.view(image.size(0), 28*28)

     # evaluate the images
     images, z = self.model(image)
     images = images.detach().cpu().numpy().reshape(image.size(0),28,28)
     self.z = z.detach().cpu().numpy()

     # for each digit 0-9, find the first image in the batch with that label
     image_idxs = [list(label.numpy()).index(x) for x in range(10)]
     combined_images = np.concatenate([images[x].reshape(28,28) for x in image_idxs],1)
     image_rows.append(combined_images)
  
   plt.imshow(np.concatenate(image_rows))
   plt.show()

 def fit(self, epochs = 1, lr = 1e-4, flatten = False, vae = False):
   opt   = torch.optim.Adam(self.model.parameters(), lr = lr) # define optimizer
   for epoch in range(epochs):
     for image, label in self.train_dl:
       image = image.to(device)
       if flatten:
         image = image.view(image.size(0), 28*28)
       self.train_losses.append(self.update(image, opt, vae=vae))
    
     for image, label in self.valid_dl:
       image = image.to(device)
       if flatten:
         image = image.view(image.size(0), 28*28)
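       with torch.no_grad(): # evaluate only, so the model is not trained on validation data
         y_hat, _ = self.model(image)
         self.valid_losses.append(self.loss_func(y_hat, image).item())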
     print("Epoch number {} finished.".format(epoch))
     self.show_visual_progress(rows=5, title='AutoEncoder Learning State at Epoch: ', epoch = epoch, flatten = flatten, vae= vae)

We define a very minimalist model where latent size can be changed. This will help us to see and understand how the model performs with changing latent size. 

class AutoEncoder(nn.Module):

 def __init__(self, latent_size = 3):
   super(AutoEncoder, self).__init__()
   self.encoder = nn.Sequential(
       nn.Linear(784, 512),
       nn.ReLU(),
       nn.Linear(512, 128),
       nn.ReLU(),
       nn.Linear(128, latent_size)
   )

   self.decoder = nn.Sequential(
       nn.Linear(latent_size, 128),
       nn.ReLU(),
       nn.Linear(128, 512),
       nn.ReLU(),
       nn.Linear(512, 784)
   )

 def forward(self, x):
   z = self.encoder(x)
   o = self.decoder(z)
   return o, z

Initializing the training. 

model = AutoEncoder().to(device)
learn = Learner(trainloader, testloader, model)
learn.fit(epochs = 10, lr = 1e-4, flatten = True)

# plot losses
plt.subplot(121)
plt.plot(learn.train_losses)
plt.subplot(122)
plt.plot(learn.valid_losses)

The image below is from the ninth epoch, with the latent size set to 5. You can see how well the model reconstructs the images.

The image above shows the output yielded by the autoencoder after 9 epochs.

Case study 1: Image denoising with Denoising Autoencoders 

In the first case study, we’ll apply autoencoders to remove noise from images. This is very useful in computed tomography (CT) scans, where the image can be blurry and hard to interpret or to use for training a segmentation model.

Autoencoder in Pytorch with MNIST

We’ll use MNIST again, this time downloading it through Keras. The PyTorch MNIST data arrives in PIL format. You could work with that, but for simplicity we’ll load the dataset from Keras, which returns NumPy arrays that are easy to add noise to.

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
import os

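from keras.datasets import mnist
from torch.utils.data import DataLoader,Dataset

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# load MNIST as NumPy arrays (uint8 images with values 0-255)
(xtrain,ytrain),(xtest,ytest)=mnist.load_data()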

Adding noise to the dataset, we’ll add two types of noise:

  1. Gaussian
  2. Speckle

def add_noise(img,noise_type="gaussian"):
  row,col=28,28
  img=img.astype(np.float32)

  if noise_type=="gaussian":
    # additive noise; mean -5.9 and standard deviation 5.9 are the values the original call used
    mean=-5.9
    sigma=5.9
    noise=np.random.normal(mean,sigma,img.shape)
    noise=noise.reshape(row,col)
    img=img+noise
    return img

  if noise_type=="speckle":
    # multiplicative noise: each pixel is scaled by (1 + a standard normal sample)
    noise=np.random.randn(row,col)
    noise=noise.reshape(row,col)
    img=img+img*noise
    return img

noises=["gaussian","speckle"]

# first half of the training images gets gaussian noise, second half speckle noise
traindata=np.zeros((60000,28,28))
half=len(xtrain)//2

for idx in range(len(xtrain)):
  noise_id=0 if idx<half else 1
  traindata[idx]=add_noise(xtrain[idx],noise_type=noises[noise_id])
  if idx in (half-1,len(xtrain)-1):
    print("\n{} noise addition completed to images".format(noises[noise_id]))

# same split for the test images
testdata=np.zeros((10000,28,28))
half=len(xtest)//2

for idx in range(len(xtest)):
  noise_id=0 if idx<half else 1
  testdata[idx]=add_noise(xtest[idx],noise_type=noises[noise_id])
  if idx in (half-1,len(xtest)-1):
    print("\n{} noise addition completed to images".format(noises[noise_id]))

Visualizing the data. 

f, axes=plt.subplots(2,2)

#showing images with gaussian noise
axes[0,0].imshow(xtrain[0])
axes[0,0].set_title("Original Image")
axes[1,0].imshow(traindata[0])
axes[1,0].set_title("Noised Image")

#showing images with speckle noise (taken from the second half of the training set)
axes[0,1].imshow(xtrain[35000])
axes[0,1].set_title("Original Image")
axes[1,1].imshow(traindata[35000])
axes[1,1].set_title("Noised Image")

Once the data is preprocessed, we can:

  1. Transform the data to tensors,
  2. Create a dataset that contains both the original data and the noisy data,
  3. Use the DataLoader class from PyTorch to create iterable loaders that can be used to train the model.

class noisedDataset(Dataset):
  def __init__(self,datasetnoised,datasetclean,labels,transform):
   self.noise=datasetnoised
   self.clean=datasetclean
   self.labels=labels
   self.transform=transform
  def __len__(self):
   return len(self.noise)
  def __getitem__(self,idx):
   xNoise=self.noise[idx]
   xClean=self.clean[idx]
   y=self.labels[idx]
  
   if self.transform is not None:
     xNoise=self.transform(xNoise)
     xClean=self.transform(xClean)
    
  
   return (xNoise,xClean,y)



transform =transforms.Compose([
   transforms.ToTensor()
])

trainset=noisedDataset(traindata,xtrain,ytrain,transform)
testset=noisedDataset(testdata,xtest,ytest,transform)

batch_size=32
trainloader=DataLoader(trainset,batch_size=batch_size,shuffle=True)
testloader=DataLoader(testset,batch_size=1,shuffle=True)

Defining a denoising model is similar to defining a vanilla autoencoder. The only thing that changes is the sigmoid function used to get the final output from the decoder.

The sigmoid function will ensure that the final output is in the range of 0 to 1 because the clean data is from the range of 0 to 1. It makes sense that we transform the final signal to the same range. 
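
As a quick check of that range claim, the sketch below passes a few arbitrary pre-activation values through torch.sigmoid. The input values are made up for illustration.

```python
import torch

logits = torch.tensor([-10.0, -1.0, 0.0, 1.0, 10.0])   # arbitrary pre-activation values
out = torch.sigmoid(logits)

print(out)
```

Large negative values approach 0, large positive values approach 1, and an input of 0 maps to exactly 0.5.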

class denoising_model(nn.Module):
 def __init__(self):
   super(denoising_model,self).__init__()
   self.encoder=nn.Sequential(
                 nn.Linear(28*28,256),
                 nn.ReLU(True),
                 nn.Linear(256,128),
                 nn.ReLU(True),
                 nn.Linear(128,64),
                 nn.ReLU(True)
      
                 )
  
   self.decoder=nn.Sequential(
                 nn.Linear(64,128),
                 nn.ReLU(True),
                 nn.Linear(128,256),
                 nn.ReLU(True),
                 nn.Linear(256,28*28),
                 nn.Sigmoid(),
                 )
  
 def forward(self,x):
   z=self.encoder(x)
   sigmoid=self.decoder(z)

   return sigmoid

We’ll initialize the same loss and optimizer functions for this task as well. 

model=denoising_model().to(device)
criterion=nn.MSELoss()
optimizer=torch.optim.Adam(model.parameters(),lr=0.01)

Initialize the training for 120 epochs. 

epochs=120
l=len(trainloader)
losslist=list()
epochloss=0
running_loss=0
for epoch in range(epochs):
  print("Entering Epoch: ",epoch)
  for idx,(dirty,clean,label) in enumerate(trainloader):
    if idx==50:
      print('+',end='')

    # flatten each image to a 784-dimensional float vector
    dirty=dirty.view(dirty.size(0),-1).type(torch.FloatTensor)
    clean=clean.view(clean.size(0),-1).type(torch.FloatTensor)
    dirty,clean=dirty.to(device),clean.to(device)

    #-----------------Forward Pass----------------------
    output=model(dirty)
    loss=criterion(output,clean)
    #-----------------Backward Pass---------------------
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    running_loss+=loss.item()
    epochloss+=loss.item()
  #-----------------Log-------------------------------
  losslist.append(running_loss/l)
  running_loss=0
  print("======> epoch: {}/{}, Loss:{}".format(epoch,epochs,loss.item()))

Visualizing the results. 

f,axes= plt.subplots(6,3,figsize=(20,20))
axes[0,0].set_title("Original Image")
axes[0,1].set_title("Dirty Image")
axes[0,2].set_title("Cleaned Image")

test_imgs=np.random.randint(0,10000,size=6)
for idx in range(6):
  dirty=testset[test_imgs[idx]][0]
  clean=testset[test_imgs[idx]][1]
  label=testset[test_imgs[idx]][2]
  dirty=dirty.view(dirty.size(0),-1).type(torch.FloatTensor)
  dirty=dirty.to(device)
  output=model(dirty)

  # reshape the flat vectors back to 28x28 images for plotting
  output=output.view(1,28,28)
  output=output.permute(1,2,0).squeeze(2)
  output=output.detach().cpu().numpy()

  dirty=dirty.view(1,28,28)
  dirty=dirty.permute(1,2,0).squeeze(2)
  dirty=dirty.detach().cpu().numpy()

  clean=clean.permute(1,2,0).squeeze(2)
  clean=clean.detach().cpu().numpy()

  axes[idx,0].imshow(clean,cmap="gray")
  axes[idx,1].imshow(dirty,cmap="gray")
  axes[idx,2].imshow(output,cmap="gray")

The image above shows the output yielded by the denoising autoencoder, which was able to successfully remove noise from the input image and produce images similar to the original image.

Case study 2: Anomaly detection in ECG with LSTM Autoencoders

An electrocardiogram (ECG or EKG) is a test that checks heart function by measuring its electrical activity. These signals can be measured using smartwatches. 

The ECG signal can tell a lot about a person’s health and well-being. In this case study, we’ll predict ECG signals and detect anomalies in the signal. 

We’ll be working with the Attribute-Relation File Format (ARFF), so we’ll install two packages to support it. The first is arff2pandas, which loads ARFF files into pandas DataFrames. The second is pandas-profiling, which generates profile reports from a pandas DataFrame.

To install these packages in the colab notebook, we use the following command:

!pip install -qq arff2pandas
!pip install -U pandas-profiling

Once the required packages are installed, we can start importing them along with other packages required for this case study. 

import torch

import copy
import numpy as np
import pandas as pd
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc
from sklearn.model_selection import train_test_split

from torch import nn, optim

import torch.nn.functional as F
from arff2pandas import a2p

For visually appealing plots, we set the following variables: 

%matplotlib inline
%config InlineBackend.figure_format='retina'

sns.set(style='whitegrid', palette='muted', font_scale=1.2)

HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]

sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))

rcParams['figure.figsize'] = 12, 8

RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

The following command can be used to download the data directly to your colab notebook:

!gdown --id 16MIleqoIr1vYxlGk4GKnGmrsCPuWkkpT

Once the data is downloaded, we unzip it in the desired folder.

The mkdir -p command creates a child directory along with the parent directory. In our case, ‘data’ is the parent directory and ‘timeseries’ is the child directory. 

!mkdir -p data/timeseries

Likewise, we can use !unzip ECG5000.zip -d data/timeseries for unzipping the file. The argument -d allows the file to get unzipped in the desired path. In our case, the directory that we created. 

!unzip ECG5000.zip -d data/timeseries
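
If you are not working in a notebook with shell access, the same two steps can be approximated with the Python standard library. The sketch below runs in a temporary directory and builds a small stand-in archive named example.zip, which is hypothetical and not the real ECG5000.zip.

```python
import os
import tempfile
import zipfile

root = tempfile.mkdtemp()   # scratch directory so the sketch does not touch real data
target = os.path.join(root, "data", "timeseries")
os.makedirs(target, exist_ok=True)   # like `mkdir -p`: creates parent and child directories

# build a small stand-in archive (hypothetical content)
archive = os.path.join(root, "example.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("sample.txt", "hello")

# like `unzip example.zip -d data/timeseries`
with zipfile.ZipFile(archive) as zf:
    zf.extractall(target)

print(os.listdir(target))
```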

We’ll define the device for CPU or GPU use. 

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Once the file is unzipped, we can open it using a2p.load, which reads an ARFF file into a pandas DataFrame.

with open('data/timeseries/ECG5000_TRAIN.arff') as f:
 train = a2p.load(f)

with open('data/timeseries/ECG5000_TEST.arff') as f:
 test = a2p.load(f)

Visualizing the data. 

df = pd.concat([train, test])  # DataFrame.append was removed in pandas 2.0
df = df.sample(frac=1.0)
df.shape
CLASS_NORMAL = 1

class_names = ['Normal','R on T','PVC','SP','UB']

# sort the classes so the bars line up with class_names
ax = sns.countplot(x=df.target, order=sorted(df.target.unique()))
ax.set_xticklabels(class_names);

As you can see from the bar chart above, the normal class has by far the most examples. Now let’s plot a time series graph to see what these features look like for each target class.

def plot_time_series_class(data, class_name, ax, n_steps=10):
 time_series_df = pd.DataFrame(data)

 smooth_path = time_series_df.rolling(n_steps).mean()
 path_deviation = 2 * time_series_df.rolling(n_steps).std()

 under_line = (smooth_path - path_deviation)[0]
 over_line = (smooth_path + path_deviation)[0]

 ax.plot(smooth_path, linewidth=2)
 ax.fill_between(
   path_deviation.index,
   under_line,
   over_line,
   alpha=.125
 )
 ax.set_title(class_name)



classes = df.target.unique()

fig, axs = plt.subplots(
 nrows=len(classes) // 3 + 1,
 ncols=3,
 sharey=True,
 figsize=(14, 8)
)

for i, cls in enumerate(classes):
 ax = axs.flat[i]
 data = df[df.target == cls] \
   .drop(labels='target', axis=1) \
   .mean(axis=0) \
   .to_numpy()
 # look up the name by class label, since unique() returns the classes in shuffled order
 plot_time_series_class(data, class_names[int(cls) - 1], ax)

fig.delaxes(axs.flat[-1])
fig.tight_layout()

If you look carefully, the normal class stands out compared to the other classes. This property of the data can help the autoencoder find anomalies more easily.

Time to create the dataset, with two variables: normal and anomaly, using a condition statement. Once the two variables are created, we can use train_test_split from sklearn to separate training, testing, and validation data.

normal_df = df[df.target == str(CLASS_NORMAL)].drop(labels='target', axis=1)

anomaly_df = df[df.target != str(CLASS_NORMAL)].drop(labels='target', axis=1)


train_df, val_df = train_test_split(
 normal_df,
 test_size=0.15,
 random_state=RANDOM_SEED
)

val_df, test_df = train_test_split(
 val_df,
 test_size=0.33,
 random_state=RANDOM_SEED
)
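
To see what these two splits do to the proportions, the sketch below applies the same calls to a stand-in array. The size of 1000 rows is made up for illustration and is not the number of normal examples in the dataset.

```python
import numpy as np
from sklearn.model_selection import train_test_split

rows = np.arange(1000).reshape(-1, 1)   # stand-in for normal_df with a made-up size

# first split: 85% training, 15% held out
train, val = train_test_split(rows, test_size=0.15, random_state=42)
# second split: the held-out part is divided into validation and test
val, test = train_test_split(val, test_size=0.33, random_state=42)

print(len(train), len(val), len(test))
```

With these settings, roughly 85% of the normal examples are used for training, about 10% for validation, and about 5% for testing.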

Once the data is separated, we convert each sequence into a 2-dimensional tensor (sequence length × number of features), which the autoencoder can use for training and modeling the data.

def create_dataset(df):

 sequences = df.astype(np.float32).to_numpy().tolist()

 dataset = [torch.tensor(s).unsqueeze(1).float() for s in sequences]

 n_seq, seq_len, n_features = torch.stack(dataset).shape

 return dataset, seq_len, n_features

train_dataset, seq_len, n_features = create_dataset(train_df)
val_dataset, _, _ = create_dataset(val_df)
test_normal_dataset, _, _ = create_dataset(test_df)
test_anomaly_dataset, _, _ = create_dataset(anomaly_df)
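
To make the shapes concrete, here is the same conversion applied to three toy sequences of length 4. The values are invented, and the real ECG sequences are longer.

```python
import torch

# three toy sequences of length 4 (made-up values)
sequences = [[0.1, 0.2, 0.3, 0.4], [1.0, 1.1, 1.2, 1.3], [2.0, 2.1, 2.2, 2.3]]

# unsqueeze(1) turns each sequence of shape (seq_len,) into (seq_len, 1)
dataset = [torch.tensor(s).unsqueeze(1).float() for s in sequences]
n_seq, seq_len, n_features = torch.stack(dataset).shape

print(dataset[0].shape, n_seq, seq_len, n_features)
```

Each item is one sequence with a single feature per time step, which is the layout the LSTM layers expect after a batch dimension is added.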

Now, let’s code the autoencoder. Unlike the method that we used earlier to model an image, we use a different method to model a time series or sequence data. For this case study, we’ll use Long Short Term Memory or LSTM.

LSTMs are good at modeling sequential data. They can remember long-term sequences, which makes it easier to predict variables based on sequence length. 

class Encoder(nn.Module):

 def __init__(self, seq_len, n_features, embedding_dim=64):
   super(Encoder, self).__init__()

   self.seq_len, self.n_features = seq_len, n_features
   self.embedding_dim, self.hidden_dim = embedding_dim, 2 * embedding_dim

   self.rnn1 = nn.LSTM(
     input_size=n_features,
     hidden_size=self.hidden_dim,
     num_layers=1,
     batch_first=True
   )
  
   self.rnn2 = nn.LSTM(
     input_size=self.hidden_dim,
     hidden_size=embedding_dim,
     num_layers=1,
     batch_first=True
   )

 def forward(self, x):
   x = x.reshape((1, self.seq_len, self.n_features))

   x, (_, _) = self.rnn1(x)
   x, (hidden_n, _) = self.rnn2(x)

   return hidden_n.reshape((self.n_features, self.embedding_dim))

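Once the encoder is ready, we define a decoder as well. The decoder mirrors the encoder: it also uses two LSTM layers, followed by a linear layer that maps each time step back to the original number of features.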

class Decoder(nn.Module):

 def __init__(self, seq_len, input_dim=64, n_features=1):
   super(Decoder, self).__init__()

   self.seq_len, self.input_dim = seq_len, input_dim
   self.hidden_dim, self.n_features = 2 * input_dim, n_features

   self.rnn1 = nn.LSTM(
     input_size=input_dim,
     hidden_size=input_dim,
     num_layers=1,
     batch_first=True
   )

   self.rnn2 = nn.LSTM(
     input_size=input_dim,
     hidden_size=self.hidden_dim,
     num_layers=1,
     batch_first=True
   )

   self.output_layer = nn.Linear(self.hidden_dim, n_features)

 def forward(self, x):
   x = x.repeat(self.seq_len, self.n_features)
   x = x.reshape((self.n_features, self.seq_len, self.input_dim))

   x, (hidden_n, cell_n) = self.rnn1(x)
   x, (hidden_n, cell_n) = self.rnn2(x)
   x = x.reshape((self.seq_len, self.hidden_dim))

   return self.output_layer(x)
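
The first two lines of the decoder's forward method are worth a closer look. The sketch below replays them on a small made-up embedding (5 time steps, embedding size 4; both sizes are assumptions for illustration).

```python
import torch

seq_len, input_dim, n_features = 5, 4, 1  # illustrative sizes

# Stand-in for the encoder output: one embedding of shape (1, input_dim).
z = torch.arange(input_dim, dtype=torch.float32).reshape(n_features, input_dim)

# Same two steps as Decoder.forward: tile the embedding, then add a batch axis.
x = z.repeat(seq_len, n_features)                # (seq_len, input_dim)
x = x.reshape((n_features, seq_len, input_dim))  # (1, seq_len, input_dim)

print(x.shape)
```

Every time step of the decoder input is a copy of the same embedding, so the LSTM layers have to unfold the whole sequence from that single vector. Note that these shapes only line up when n_features is 1, as it is in this case study.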

Let’s wrap the encoder and decoder together in a single module.

class RecurrentAutoencoder(nn.Module):

 def __init__(self, seq_len, n_features, embedding_dim=64):
   super(RecurrentAutoencoder, self).__init__()

   self.encoder = Encoder(seq_len, n_features, embedding_dim).to(device)
   self.decoder = Decoder(seq_len, embedding_dim, n_features).to(device)

 def forward(self, x):
   z = self.encoder(x)
   o = self.decoder(z)
   return o

Now we instantiate the model with an embedding dimension of 128.

model = RecurrentAutoencoder(seq_len, n_features, 128)
model = model.to(device)

Let’s write a function that trains the model.

def train_model(model, train_dataset, val_dataset, n_epochs):
 optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
 criterion = nn.L1Loss(reduction='sum').to(device)
 history = dict(train=[], val=[])

 best_model_wts = copy.deepcopy(model.state_dict())
 best_loss = float('inf')

 for epoch in range(1, n_epochs + 1):
   model = model.train()

   train_losses = []
   for seq_true in train_dataset:
     optimizer.zero_grad()

     seq_true = seq_true.to(device)
     seq_pred = model(seq_true)

     loss = criterion(seq_pred, seq_true)

     loss.backward()
     optimizer.step()

     train_losses.append(loss.item())

   val_losses = []
   model = model.eval()
   with torch.no_grad():
     for seq_true in val_dataset:

       seq_true = seq_true.to(device)
       seq_pred = model(seq_true)

       loss = criterion(seq_pred, seq_true)
       val_losses.append(loss.item())

   train_loss = np.mean(train_losses)
   val_loss = np.mean(val_losses)

   history['train'].append(train_loss)
   history['val'].append(val_loss)

   if val_loss < best_loss:
     best_loss = val_loss
     best_model_wts = copy.deepcopy(model.state_dict())

   print(f'Epoch {epoch}: train loss {train_loss} val loss {val_loss}')

 model.load_state_dict(best_model_wts)
 return model.eval(), history

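Note: We’re using L1Loss, which measures absolute error. With reduction='sum', it returns the sum of the absolute errors over the sequence rather than their mean. Absolute error is less sensitive to outliers than squared error, and it is expressed on the same scale as the data being measured, which makes it a good fit for sequence reconstruction.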
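
To make the effect of the reduction argument concrete, here is a tiny sketch on made-up values (the two tensors are assumptions for illustration, not data from the case study).

```python
import torch
from torch import nn

seq_true = torch.tensor([[1.0], [2.0], [3.0], [4.0]])
seq_pred = torch.tensor([[1.5], [2.0], [2.0], [6.0]])

# Absolute errors per time step: 0.5, 0.0, 1.0, 2.0
l1_sum = nn.L1Loss(reduction='sum')(seq_pred, seq_true)
l1_mean = nn.L1Loss(reduction='mean')(seq_pred, seq_true)

print(l1_sum.item(), l1_mean.item())  # 3.5 0.875
```

The summed version grows with the sequence length, so losses are only comparable between sequences of the same length, which is the case here.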

Let’s begin the training.

model, history = train_model(
 model,
 train_dataset,
 val_dataset,
 n_epochs=150
)

Once the model is trained, we can start predicting the values and compare them with the original data to see how our model performs.

def predict(model, dataset):
 predictions, losses = [], []
 criterion = nn.L1Loss(reduction='sum').to(device)
 with torch.no_grad():
   model = model.eval()
   for seq_true in dataset:
     seq_true = seq_true.to(device)
     seq_pred = model(seq_true)

     loss = criterion(seq_pred, seq_true)

     predictions.append(seq_pred.cpu().numpy().flatten())
     losses.append(loss.item())
 return predictions, losses

_, losses = predict(model, train_dataset)


# histplot replaces the deprecated distplot (seaborn 0.11 or later)
sns.histplot(losses, bins=50, kde=True)

As you can see from the graph, the model managed to minimize the loss on the training dataset. Now let’s do the same with the normal sequences in the test dataset and see how our model performs.

predictions, pred_losses = predict(model, test_normal_dataset)
sns.histplot(pred_losses, bins=50, kde=True)

You can see similar performance on the test set as well: the losses stay low.

Lastly, let’s see the model performance in a time-series graph.

def plot_prediction(data, model, title, ax):
 predictions, pred_losses = predict(model, [data])

 ax.plot(data, label='true')
 ax.plot(predictions[0], label='reconstructed')
 ax.set_title(f'{title} (loss: {np.around(pred_losses[0], 2)})')
 ax.legend()


fig, axs = plt.subplots(
 nrows=2,
 ncols=6,
 sharey=True,
 sharex=True,
 figsize=(22, 8)
)

for i, data in enumerate(test_normal_dataset[:6]):
 plot_prediction(data, model, title='Normal', ax=axs[0, i])

for i, data in enumerate(test_anomaly_dataset[:6]):
 plot_prediction(data, model, title='Anomaly', ax=axs[1, i])

fig.tight_layout()

From the graph above, it’s quite evident that the model reconstructs normal ECG patterns well. Its reconstructions of anomalous patterns are much less accurate, which typically shows up as a higher reconstruction loss. For anomaly detection, that gap is the useful signal: a sequence the model cannot reconstruct well is a candidate anomaly. Training for more epochs or on more data may make the separation between the two groups clearer.

Note: The model reconstructs normal patterns well because it was trained only on normal sequences (train_df is drawn from normal_df).
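
One common way to turn reconstruction losses into an actual decision is to pick a threshold and flag every sequence whose loss exceeds it. The sketch below shows the idea on hypothetical loss values. The numbers, the THRESHOLD value, and the classify_by_threshold helper are all assumptions made for illustration, not results from the model above.

```python
import numpy as np


def classify_by_threshold(losses, threshold):
    """Return a boolean array: True where the loss exceeds the threshold."""
    return np.asarray(losses) > threshold


# Hypothetical reconstruction losses, for illustration only.
normal_losses = [12.0, 15.5, 18.0, 22.0, 30.0]
anomaly_losses = [24.0, 45.0, 60.5, 80.0, 95.0]

THRESHOLD = 26.0  # assumed value, e.g. read off the training-loss histogram

false_alarms = classify_by_threshold(normal_losses, THRESHOLD).sum()
detected = classify_by_threshold(anomaly_losses, THRESHOLD).sum()

print(f'false alarms: {false_alarms}/{len(normal_losses)}')
print(f'anomalies detected: {detected}/{len(anomaly_losses)}')
```

In practice, the two lists would come from predict(model, test_normal_dataset) and predict(model, test_anomaly_dataset), and moving the threshold trades missed anomalies against false alarms.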

Conclusion

We’ve discussed what autoencoders are and how they can discover structure within data. The information bottleneck, which compresses the data, is important because it allows autoencoders to learn a latent representation that can be used in various deep learning tasks. Autoencoders can be seen as a non-linear generalization of PCA: they can model data that lies on a non-linear manifold and still be robust to outliers.

We saw the different variants of autoencoders and how each improves on the others for a specific task. We also coded an autoencoder to get a better understanding of how it works, and saw how the dimension of the latent space affects the results.

Lastly, we explored how we can use autoencoders for two tasks: image denoising and anomaly detection. 

I hope this tutorial was interesting and informative. Thank you for reading!

Acknowledgment

The code used in the second case study was inspired by Venelin. I only modified it to make it more readable.
