Anomaly detection with parametrized quantum circuits

With this example we want to study the quantum version of a classic machine learning algorithm known as anomaly detection.

Background

In classical machine learning, anomaly detection is implemented with an artificial neural network architecture called autoencoder. In quantum machine learning, the autoencoder is realised using parametrized quantum circuits.The proposed algorithm is not meant to outperform the classical counterpart on classical data. This example aims to demonstrate that it is possible to use quantum variational algorithms for anomaly detection with possible future advantages in the analysis of quantum data. In the next two sections a short introductions about anomaly detection algorithms and paametrized quantum circuits are reported.

Anomaly detection

Anomaly detection is a classification algorithm that allows to identify anomalous data. The advantage in using this machine learning technique is that only standard data are required for the training. To achieve this it's necessary to train a particular artificial neural network (ANN) architecture called autoencoder. An autoencoder is composed of two main parts: encoder and decoder. The encoder compresses initial data down to a small dimension (called latent dimension). The decoder inverts the process to reconstruct the original data from the compressed one. The parameters of the neural network are trained in order to minimise the difference between the initial and reconstructed data. The loss function (also called reconstruction loss) is therefore a measure of how accurately the reconstructed data resembles the original.

For anomaly detection, the autoencoder is trained only on data samples belonging to the standard class. When the trained model is applied to new samples we expect the loss function to have different values for standard and anomalous data. By choosing a threshold value for the loss function it is possible to classify an input based on whether its reconstruction loss lands above or below this threshold. The ROC curve (Receiver operating characteristic) indicates the true positive rate and false positive rate as a function of the threshold. This can help to set the threshold value in order to maximize true positive classifications and minimize false positives.

Parametrized quantum circuits

A parametrized quantum circuit (PQC), also known as variational quantum circuits, can be used as the quantum counterpart of classical ANNs. In this kind of circuits the input information is stored in the initial state of the qubits. It can be stored as the phase (phase encoding) or in the states amplitudes (amplitude encoding). The initial state is transformed using rotation gates and entangling gates, usually controlled-not (C-NOT) gates. These gates can be organised in layers, in this circuit architecture one layer is composed of rotation gates (R_x, R_y, R_z) acting on all qubits followed by a series of C-NOT gates coupling neighbouring qubits. The trainable weights are the angles of rotation gates and can be trained using standard backpropagation (implemented with Tensorflow).

A quantum circuit implements a unitary, thus invertible, transformation on the initial state. This represents a great advantage for the autoencoder architecture, as the decoder can be taken as the inverse of the encoder quantum circuit. In order to compress information the encoder circuit has to disentangle and set to zero state a given number of qubits. The loss function is thus taken as the expected measurement values of these qubits. In this way, for the training of the circuit, it is necessary only the encoder.

Dataset

In this example anomaly detection is performed on handwritten digits of the MNIST dataset using zeros as the standard data and ones as the anomalous data.

Loading required features

import tensorflow as tf
import numpy as np
from tensorflow.keras.datasets import mnist
import matplotlib.pyplot as plt

Load MNIST dataset and merge test and train sets (we will make this division later).

(data_train, labels_train), (data_test, labels_test) = mnist.load_data()
data = tf.concat([data_train, data_test], 0)
labels = tf.concat([labels_train, labels_test], 0)

print(data.shape)
print(labels.shape)

Divide between standandard data (zero digits) and anomalous data (one digits). After that reshape the images to 8x8 pixels and then convert them to vectors. Note that it is necessary to add one channel to reshape images with tensorflow. At last normalize images, this is necessary for amplitude encoding. In order to normalise images we have to convert them to numpy.

# Divide standard and anomalous data
standard_data = data[labels==0]
anomalous_data = data[labels==1]

# Reshape images
standard_data = tf.reshape(standard_data, [-1,28,28,1])
standard_data = tf.image.resize(standard_data, [8,8])
standard_data = tf.reshape(standard_data, [-1,64])

anomalous_data = tf.reshape(anomalous_data, [-1,28,28,1])
anomalous_data = tf.image.resize(anomalous_data, [8,8])
anomalous_data = tf.reshape(anomalous_data, [-1,64])

# Normalise images
standard_data_np=standard_data.numpy()
anomalous_data_np=anomalous_data.numpy()
    
for i in range(len(standard_data)):
    standard_data_np[i]=standard_data_np[i]/np.linalg.norm(standard_data_np[i])
    
for i in range(len(anomalous_data)):
    anomalous_data_np[i]=anomalous_data_np[i]/np.linalg.norm(anomalous_data_np[i])
    
standard_data=tf.convert_to_tensor(standard_data_np)
anomalous_data=tf.convert_to_tensor(anomalous_data_np)

Now we have 6903 standard data and 7877 anomalous data in the form of vectors of 64 elements. Let us visualize two of them as 8x8 images.

print("Standard data shape:", standard_data.shape)
print("Anomalous data shape:", anomalous_data.shape)

print("STANDARD")
plt.imshow(tf.reshape(standard_data[4], [8,8]), cmap="gray")
plt.show()

print("ANOMALOUS")
plt.imshow(tf.reshape(anomalous_data[1], [8,8]), cmap="gray")
plt.show()

Quantum circuit definition

The dataset is ready, now we can create the circuit ansatz. We will do it with Qibo.

import qibo
from qibo import gates
from qibo.models import Circuit

qibo.set_backend("tensorflow")

Hyper-parameters of the circuit

# Number of qubits in the circuit (6 qubits for 64 features)
n_qubits=6
# Number of circuit layers
n_layers=6
# Number of qubits to be compressed
q_compression=3

# Number of trainable parameters, defined by the other hyper-parameters
n_params=(n_layers*n_qubits+q_compression)*3

print("Trainable parameters:", n_params)

Create and Draw the circuit

def make_encoder(n_qubits, n_layers, params, q_compression):
    index=0
    encoder = Circuit(n_qubits)
    for i in range(n_layers):
        for j in range(n_qubits):
            encoder.add(gates.RX(j, params[index]))
            encoder.add(gates.RY(j, params[index+1]))
            encoder.add(gates.RZ(j, params[index+2]))
            index+=3
        
        for j in range(n_qubits):
            encoder.add(gates.CNOT(j,(j+1)%n_qubits))
            
    for j in range(q_compression):
        encoder.add(gates.RX(j, params[index]))
        encoder.add(gates.RY(j, params[index+1]))
        encoder.add(gates.RZ(j, params[index+2])) 
        index+=3  
    return encoder

# Parameters are initialised with random values
params = tf.Variable(tf.random.normal((n_params,)))

encoder=make_encoder(n_qubits, n_layers, params, q_compression)
print("Circuit model summary")
print(encoder.draw())

Training

Now we are ready to train the circuit. But first import the last required features.

import math
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers import schedules

Hyper-parameters for training

Learning rate decay helps avoiding barren plateaus. For more informations about this learning rate schedule see: Tensorflow PiecewiseConstantDecay.

# Number of training epochs
nepochs = 14
# Batch size
batch_size=20
# Dimension of the training set (the remaining samples will be used for testing)
train_size=4000
# File where trained parameters are saved
filename="trained_params"

steps_for_epoch=math.ceil(train_size/batch_size)
train_set=standard_data[0:train_size]

# Learning rate
boundaries = [
    steps_for_epoch*2, 
    steps_for_epoch*4, 
    steps_for_epoch*6, 
    steps_for_epoch*8, 
    steps_for_epoch*10, 
    steps_for_epoch*12
]
values = [0.4, 0.2, 0.08, 0.04, 0.01, 0.005, 0.001]
learning_rate_fn = schedules.PiecewiseConstantDecay(
    boundaries, values)
optimizer = Adam(learning_rate=learning_rate_fn)

Loss function

Let us define the loss function and a training step. The loss is the sum of the ground state measurement probabilities of the compressed qubits. Thus this loss forces the compressed qubits in the |1> state. At each training step the loss is computed on a batch and parameters are updated with backpropagation.

NB: For a training speed-up it is possible to use @tf.function decorators. However this can generate some problems for model evaluation (common error: "Operation object has no attribute _Graph")

# @tf.function
def compute_loss(encoder, params, vector):
    encoder.set_parameters(params)
    out=encoder(vector)
    # 3 compressed qubits
    loss=out.probabilities(qubits=[0])[0] + out.probabilities(qubits=[1])[0] + out.probabilities(qubits=[2])[0]
    return loss

# @tf.function
def train_step(batch_size, encoder, params, dataset):
    loss=0.
    with tf.GradientTape() as tape:
        for sample in range(batch_size):
            loss=loss+compute_loss(encoder, params, dataset[sample])
        loss=loss/batch_size
        grads = tape.gradient(loss, params)
        optimizer.apply_gradients(zip([grads], [params]))
    return loss

Now we can start the training

# This array contains the trained parameters at each epoch
trained_params = np.zeros((nepochs, n_params), dtype=float)

print("Start training")
for epoch in range(nepochs):
    tf.random.shuffle(train_set)
    for i in range(steps_for_epoch):
        loss=train_step(batch_size, encoder, params, train_set[i*batch_size: (i+1)*batch_size])
    trained_params[epoch]=params.numpy()
    print("Epoch: %d  Loss: %f" % (epoch+1,loss))

If you are running the script locally you can save the trained parameters of the best epoch

# Choose epoch parameters you want to use
best_epoch=nepochs
# np.save(filename, trained_params[best_epoch-1])

Performance test

Test the performance of the model on the test set. First create the two test sets.

standard_test_set = standard_data[train_size:]
dim_test = len(standard_test_set)
anomalous_test_set = anomalous_data[0:dim_test]

print("Test will be performed on %d standard and %d anomalous samples." % (dim_test, dim_test))

Compute the loss on test samples.

encoder.set_parameters(tf.convert_to_tensor(trained_params[best_epoch-1]))
encoder.compile()

def compute_loss_test(encoder, vector):
    out=encoder(vector)
    loss=out.probabilities(qubits=[0])[0] + out.probabilities(qubits=[1])[0] + out.probabilities(qubits=[2])[0]
    return loss

print("Computing standard losses")
loss_s = []
for i in range(dim_test):
    loss_s.append(compute_loss_test(encoder, standard_test_set[i]).numpy())

print("Computing anomalous losses")
loss_a = []
for i in range(dim_test):
    loss_a.append(compute_loss_test(encoder, anomalous_test_set[i]).numpy())

Plot loss function distribution

plt.hist(loss_a, bins=40, histtype="step", color="red", label="Anomalous data")
plt.hist(loss_s, bins=40, histtype="step", color="blue", label="Standard data")
plt.ylabel("Number of images")
plt.xlabel("Loss value")
plt.title("Loss function distribution (MNIST dataset)")
plt.legend()
plt.show()

Compute and plot ROC curve

max1=np.amax(loss_s)
max2=np.amax(loss_a)
ma=max(max1,max2)
min1=np.amin(loss_s)
min2=np.amin(loss_a)
mi=min(min1,min2)

tot_neg=len(loss_s)
tot_pos=len(loss_a)

n_step=100.
n_step_int=100
step=(ma-mi)/n_step
fpr=[]
tpr=[]
for i in range(n_step_int):
  treshold=i*step+mi
  c=0
  for j in range(tot_neg):
    if loss_s[j]>treshold:
      c+=1
  false_positive=c/float(tot_neg)
  fpr.append(false_positive)
  c=0
  for j in range(tot_pos):
    if loss_a[j]>treshold:
      c+=1
  true_positive=c/float(tot_pos)
  tpr.append(true_positive)


plt.title("Receiver Operating Characteristic")
plt.plot(fpr, tpr)
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.show()

© The Qibo Team.