# Autoencoder
# Imports
from warnings import warn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import norm
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Input, Dense, Lambda, Layer, Add, Multiply, Conv2D, MaxPooling2D, UpSampling2D, Flatten, Reshape, Dropout
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.losses import mse, binary_crossentropy
import autopandas
def _nll(y_true, y_pred):
    """ Negative log likelihood (Bernoulli).

    tf.keras.losses.binary_crossentropy returns the mean over the last
    axis; the NLL requires the sum, hence the explicit K.sum.
    """
    per_feature_bce = binary_crossentropy(y_true, y_pred)
    return K.sum(per_feature_bce, axis=-1)
def _mse(y_true, y_pred):
    """ Mean squared error, wrapped so it can be passed to compile(). """
    loss = mse(y_true, y_pred)
    return loss
def _binary_crossentropy(y_true, y_pred):
    """ Binary cross-entropy, wrapped so it can be passed to compile(). """
    loss = binary_crossentropy(y_true, y_pred)
    return loss
class AE():
    """ Autoencoder built on tensorflow.keras (fully connected or CNN).

    Default behaviour: symmetric encoder/decoder layers but no weight sharing.
    For the CNN architecture, if latent_dim is None there are no dense layers
    and the latent space dimension depends on the convolutional layers.
    """

    def __init__(self, input_dim, layers=None, latent_dim=2, architecture='fully',
                 loss='nll', optimizer='rmsprop', decoder_layers=None):
        """ Build and compile the autoencoder, encoder and decoder models.

        :param input_dim: Input/output size.
        :param layers: Dimension of intermediate layers (encoder and decoder).
            Either an integer (one intermediate layer) or a list of integers
            (several intermediate layers). Defaults to no intermediate layer.
        :param latent_dim: Dimension of latent space layer.
        :param architecture: 'fully' (alias 'fc') or 'cnn' (alias 'CNN').
        :param loss: Loss name: 'nll', 'mse' or 'binary_crossentropy'.
        :param optimizer: Optimizer passed to Model.compile.
        :param decoder_layers: Dimension of intermediate decoder layers for
            asymmetrical architectures. Defaults to the reversed encoder layers.
        """
        # `layers=None` sentinel avoids the shared-mutable-default pitfall;
        # behaviour is identical to the previous `layers=[]` default.
        if layers is None:
            layers = []
        if isinstance(layers, int):  # a single intermediate layer
            layers = [layers]
        # attributes
        self.input_dim = input_dim
        self.layers = layers
        # Only for asymmetrical architectures: by default the decoder arranges
        # layers in the opposite order compared to the encoder. `is not None`
        # (rather than `or`) honours an explicitly passed empty list.
        self.decoder_layers = decoder_layers if decoder_layers is not None else layers[::-1]
        self.latent_dim = latent_dim
        # data frame indexes, captured during fit() for sample() output
        self.columns = None
        self.indexes = None
        # loss function
        loss_function = self.init_loss(loss=loss)
        # init architecture
        autoencoder, encoder, decoder = self.init_model(architecture=architecture)
        autoencoder.compile(optimizer=optimizer, loss=loss_function)
        self.autoencoder = autoencoder
        self.encoder = encoder
        self.decoder = decoder

    def init_loss(self, loss='nll'):
        """ Map a loss name to the corresponding loss function.

        :param loss: 'nll', 'mse' or 'binary_crossentropy'.
        :raises Exception: If the loss name is unknown.
        """
        if loss == 'nll':
            loss_function = _nll
        elif loss == 'mse':
            loss_function = _mse
        elif loss == 'binary_crossentropy':
            loss_function = _binary_crossentropy
        else:
            raise Exception('Unknown loss name: {}'.format(loss))
        return loss_function

    def init_model(self, architecture='fully'):
        """ Build (autoencoder, encoder, decoder) for the requested architecture.

        :param architecture: 'fully' (alias 'fc') or 'cnn' (alias 'CNN').
        :raises Exception: If the architecture name is unknown.
        """
        if architecture in ['fully', 'fc']:
            return self._init_model_fc()
        elif architecture in ['cnn', 'CNN']:
            return self._init_model_cnn()
        else:
            raise Exception('Unknown architecture: {}'.format(architecture))

    def _init_model_fc(self):
        """ Initialize the fully connected architecture. """
        # encoder architecture ('inputs' avoids shadowing the builtin 'input')
        inputs = Input(shape=(self.input_dim,))
        x = inputs
        for layer_dim in self.layers:
            x = Dense(layer_dim, activation='relu')(x)
        x = Dense(self.latent_dim, activation='relu')(x)
        z = x  # latent representation
        # decoder architecture
        for layer_dim in self.decoder_layers:
            x = Dense(layer_dim, activation='relu')(x)
        x = Dense(self.input_dim, activation='sigmoid')(x)
        autoencoder = Model(inputs, x)  # define autoencoder
        encoder = Model(inputs, z)  # define encoder
        # define the standalone decoder by re-applying the trained decoder
        # layers (the last len(decoder_layers)+1 layers) to a latent input
        latent_input = Input(shape=(self.latent_dim,))
        decoded = latent_input
        for i in range((len(self.decoder_layers) + 1) * -1, 0):
            decoded = autoencoder.layers[i](decoded)
        decoder = Model(latent_input, decoded)
        return autoencoder, encoder, decoder

    def _init_model_cnn(self, kernel=(3, 3), pool=(2, 2), strides=(2, 2)):
        """ Initialize the CNN architecture.

        :param kernel: Convolution kernel size.
        :param pool: Pooling (and upsampling) window size.
        :param strides: Currently not implemented (a warning is emitted).
        """
        # If latent_dim is None, the latent space dimension simply depends on
        # the convolutional layers and no dense layers are inserted.
        # ('has_dense' avoids shadowing the builtin 'bool'.)
        has_dense = self.latent_dim is not None
        warn('strides argument is currently not implemented.')
        if self.layers != self.decoder_layers[::-1]:
            warn('self.layers is {} and self.decoder_layers is {}. Use asymmetric architecture with CNN wisely.'.format(self.layers, self.decoder_layers))
        # encoder architecture
        inputs = Input(shape=self.input_dim)
        x = inputs
        for layer_dim in self.layers:
            x = Conv2D(layer_dim, kernel, activation='relu', padding='same')(x)
            x = MaxPooling2D(pool, padding='same')(x)
        # flatten encoding
        new_shape = x.shape[1:]  # spatial shape needed to un-flatten in the decoder
        x = Flatten()(x)
        flatten_dim = x.shape[1]
        if has_dense:
            x = Dense(self.latent_dim)(x)  # dense layer to latent space
        else:
            self.latent_dim = flatten_dim  # no dense layers before and after latent space
        z = x  # latent space
        # decoder architecture
        if has_dense:
            x = Dense(flatten_dim)(x)  # inverse dense layer
        x = Reshape(new_shape)(x)
        for layer_dim in self.decoder_layers:
            x = Conv2D(layer_dim, kernel, activation='relu', padding='same')(x)
            x = UpSampling2D(pool)(x)
        x = Conv2D(1, kernel, activation='sigmoid', padding='same')(x)
        # define models
        autoencoder = Model(inputs, x)
        encoder = Model(inputs, z)
        latent_input = Input(shape=(self.latent_dim,))
        decoded = latent_input
        # decoder layer count: Reshape + (Conv2D, UpSampling2D) per decoder
        # layer + final Conv2D; one more (the inverse Dense) when has_dense
        index = (len(self.decoder_layers) * 2 + 2) * -1
        if has_dense:
            index -= 1  # one more layer
        for i in range(index, 0):
            decoded = autoencoder.layers[i](decoded)
        decoder = Model(latent_input, decoded)
        return autoencoder, encoder, decoder

    def get_autoencoder(self):
        """ Return the full autoencoder model. """
        return self.autoencoder

    def get_encoder(self):
        """ Return the encoder model. """
        return self.encoder

    def get_decoder(self):
        """ Return the decoder model. """
        return self.decoder

    def fit(self, X, X2=None, **kwargs):
        """ Train the autoencoder on X (reconstruction target X, or X2 if given).

        :param X: Training input; a DataFrame's columns/indexes are recorded
            so that sample() can restore them.
        :param X2: Optional alternative reconstruction target, e.g. to train
            a mapping between two different distributions.
        """
        if isinstance(X, pd.DataFrame):
            self.columns = X.columns
        if isinstance(X, autopandas.AutoData):
            self.indexes = X.indexes
            # NOTE(review): DataFrame.as_matrix was removed from pandas;
            # assumes autopandas.AutoData still provides it -- confirm,
            # otherwise switch to to_numpy().
            X = X.as_matrix()
        if X2 is None:
            return self.autoencoder.fit(X, X, **kwargs)
        else:  # for robustness and being able to put two different distributions
            return self.autoencoder.fit(X, X2, **kwargs)

    def sample(self, n=100, loc=0, scale=1):
        """ Decode n points drawn from a Gaussian prior in latent space.

        :param n: Number of samples to generate.
        :param loc: Mean of the Gaussian distribution prior.
        :param scale: Standard deviation of the Gaussian distribution prior.
        """
        # single vectorized draw (also yields the right 2-D shape when n == 0)
        randoms = np.random.normal(loc, scale, size=(n, self.latent_dim))
        decoded = self.decoder.predict(randoms)
        try:
            decoded = autopandas.AutoData(decoded)
            if self.columns is not None:
                decoded.columns = self.columns
            if self.indexes is not None:
                decoded.indexes = self.indexes
        except Exception:  # narrowed from bare except: keep SystemExit/KeyboardInterrupt alive
            warn('Impossible to cast sampled data to autopandas.AutoData')
        return decoded

    def siamese_distance(self, x, y, **kwargs):
        """ Distance between the latent projections of two single points. """
        x_enc = self.encoder.predict(np.array([x]))
        y_enc = self.encoder.predict(np.array([y]))
        return autopandas.metric.distance(x_enc, y_enc, **kwargs)

    def distance(self, X, Y, **kwargs):
        """ Step 1: project X and Y in the learned latent space,
        Step 2: compute distance between the projections (NNAA score by default).
        """
        X_enc = self.encoder.predict(X)
        Y_enc = self.encoder.predict(Y)
        if not isinstance(X_enc, autopandas.AutoData):
            X_enc = autopandas.AutoData(X_enc)
        if not isinstance(Y_enc, autopandas.AutoData):
            Y_enc = autopandas.AutoData(Y_enc)
        return X_enc.distance(Y_enc, **kwargs)