Source code for autopandas.generators.vae

# Variational Autoencoder
# Inspired by: http://louistiao.me/posts/implementing-variational-autoencoders-in-keras-beyond-the-quickstart-tutorial/

# Imports
from warnings import warn
from .autoencoder import AE
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import norm
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Input, Dense, Lambda, Layer, Add, Multiply, Conv2D, MaxPooling2D, UpSampling2D, Flatten, Reshape, Dropout
from tensorflow.keras.models import Model, Sequential
import autopandas

class KLDivergenceLayer(Layer):
    """ Identity transform layer that adds KL divergence to the final model loss.
    """

    def __init__(self, *args, **kwargs):
        self.is_placeholder = True
        super(KLDivergenceLayer, self).__init__(*args, **kwargs)

    def call(self, inputs):
        mu, log_var = inputs
        kl_batch = - .5 * K.sum(1 + log_var
                                - K.square(mu)
                                - K.exp(log_var), axis=-1)
        self.add_loss(K.mean(kl_batch), inputs=inputs)
        return inputs
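
# --- Illustrative sketch (not part of the original module) ---
# KLDivergenceLayer adds the closed-form KL divergence between the approximate
# posterior N(mu, exp(log_var)) and the standard normal prior N(0, I):
#     KL = -0.5 * sum(1 + log_var - mu^2 - exp(log_var))
# The hypothetical helper below reproduces that expression in plain NumPy so the
# formula can be sanity-checked outside of a Keras graph.
def _kl_divergence_sketch(mu, log_var):
    """ Closed-form KL(N(mu, exp(log_var)) || N(0, 1)), summed over the last axis. """
    return -0.5 * np.sum(1 + log_var - np.square(mu) - np.exp(log_var), axis=-1)

# Example: a posterior that already matches the prior has zero KL cost:
# _kl_divergence_sketch(np.zeros((1, 2)), np.zeros((1, 2))) -> array([0.])
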
class VAE(AE):

    def __init__(self, input_dim, layers=[], latent_dim=2, architecture='fully',
                 epsilon_std=1.0, loss='nll', optimizer='rmsprop', decoder_layers=None):
        """ Variational Autoencoder.

            :param input_dim: Input/output size.
            :param layers: Dimension of intermediate layers (encoder and decoder).
                           It can be:
                           - an integer (one intermediate layer)
                           - a list of integers (several intermediate layers)
            :param latent_dim: Dimension of latent space layer.
            :param architecture: 'fully' or 'cnn'.
            :param epsilon_std: Standard deviation of the Gaussian prior distribution.
            :param loss: Reconstruction loss (default 'nll').
            :param optimizer: Optimizer passed to the Keras compile step.
            :param decoder_layers: Dimension of intermediate decoder layers for asymmetrical architectures.
        """
        if isinstance(layers, int): # 1 intermediate layer
            layers = [layers]
        # attributes
        self.input_dim = input_dim
        self.layers = layers
        self.decoder_layers = decoder_layers or layers[::-1]
        self.latent_dim = latent_dim
        self.epsilon_std = epsilon_std
        # for data frame indexes
        self.columns = None
        self.indexes = None
        # loss function
        loss_function = self.init_loss(loss=loss)
        # init architecture
        autoencoder, encoder, decoder = self.init_model(architecture=architecture)
        autoencoder.compile(optimizer=optimizer, loss=loss_function)
        self.autoencoder = autoencoder
        self.encoder = encoder
        self.decoder = decoder

    def _init_model_fc(self):
        """ Initialize fully connected architecture.
        """
        # encoder architecture
        x = Input(shape=(self.input_dim,))
        h = x
        for layer_dim in self.layers:
            h = Dense(layer_dim, activation='relu')(h)
        # decoder architecture
        latent_input = Input(shape=(self.latent_dim,))
        decoder = latent_input
        for layer_dim in self.decoder_layers:
            decoder = Dense(layer_dim, activation='relu')(decoder)
        decoder = Dense(self.input_dim, activation='sigmoid')(decoder)
        decoder = Model(latent_input, decoder)
        # variational layer
        z_mu = Dense(self.latent_dim)(h)
        z_log_var = Dense(self.latent_dim)(h)
        z_mu, z_log_var = KLDivergenceLayer()([z_mu, z_log_var])
        # sampling with the reparameterization trick: z = mu + sigma * eps
        z_sigma = Lambda(lambda t: K.exp(.5*t))(z_log_var)
        eps = Input(tensor=K.random_normal(stddev=self.epsilon_std,
                                           shape=(K.shape(x)[0], self.latent_dim)))
        z_eps = Multiply()([z_sigma, eps])
        z = Add()([z_mu, z_eps])
        x_pred = decoder(z)
        autoencoder = Model([x, eps], x_pred)
        encoder = Model(x, z_mu)
        return autoencoder, encoder, decoder

    def _init_model_cnn(self, kernel=(3, 3), pool=(2, 2), strides=(2, 2), dense_layer=None):
        """ Initialize CNN architecture.

            :param dense_layer: supplementary dense layer (in encoder and decoder) between convolution and latent space.
        """
        warn('strides argument is currently not implemented.')
        if self.layers != self.decoder_layers[::-1]:
            warn('self.layers is {} and self.decoder_layers is {}. Use asymmetric architecture with CNN wisely.'.format(self.layers, self.decoder_layers))
        # encoder architecture
        x = Input(shape=self.input_dim)
        h = x
        for layer_dim in self.layers:
            h = Conv2D(layer_dim, kernel, activation='relu', padding='same')(h)
            h = MaxPooling2D(pool, padding='same')(h)
        new_shape = h.shape[1:]
        h = Flatten()(h)
        flatten_dim = h.shape[1]
        if dense_layer is not None: # add dense layer
            h = Dense(dense_layer)(h)
        # variational layer
        z_mu = Dense(self.latent_dim)(h)
        z_log_var = Dense(self.latent_dim)(h)
        z_mu, z_log_var = KLDivergenceLayer()([z_mu, z_log_var])
        # decoder architecture
        latent_input = Input(shape=(self.latent_dim,))
        if dense_layer is not None: # add dense layer
            decoder = Dense(dense_layer)(latent_input)
            decoder = Dense(flatten_dim)(decoder)
        else:
            decoder = Dense(flatten_dim)(latent_input)
        decoder = Reshape(new_shape)(decoder)
        for layer_dim in self.decoder_layers:
            decoder = Conv2D(layer_dim, kernel, activation='relu', padding='same')(decoder)
            decoder = UpSampling2D(pool)(decoder)
        decoder = Conv2D(1, kernel, activation='sigmoid', padding='same')(decoder)
        decoder = Model(latent_input, decoder) # define decoder
        # sampling with the reparameterization trick: z = mu + sigma * eps
        z_sigma = Lambda(lambda t: K.exp(.5*t))(z_log_var)
        eps = Input(tensor=K.random_normal(stddev=self.epsilon_std,
                                           shape=(K.shape(x)[0], self.latent_dim)))
        z_eps = Multiply()([z_sigma, eps])
        z = Add()([z_mu, z_eps])
        x_pred = decoder(z)
        autoencoder = Model([x, eps], x_pred) # full model: encoder + sampling + decoder
        encoder = Model(x, z_mu) # define encoder
        return autoencoder, encoder, decoder
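
# --- Usage sketch (not part of the original module; illustrative only) ---
# Assuming a TensorFlow/Keras version compatible with the graph-mode code above,
# this builds a small fully connected VAE and decodes draws from the standard
# normal prior. Only attributes defined in this file (autoencoder, encoder,
# decoder) are used; fitting and DataFrame-aware sampling helpers, if any, come
# from the AE base class and are not assumed here.
if __name__ == '__main__':
    # fully connected VAE: 10 -> 8 -> 4 -> 2 (latent) -> 4 -> 8 -> 10
    vae = VAE(input_dim=10, layers=[8, 4], latent_dim=2, architecture='fully')
    vae.autoencoder.summary()

    # decoding prior samples from N(0, I) exercises the generative path
    # (with untrained weights the outputs are of course meaningless)
    z = np.random.normal(size=(5, 2))
    synthetic = vae.decoder.predict(z)
    print(synthetic.shape)  # (5, 10)

    # convolutional variant, e.g. for 28x28 grayscale images (illustrative shape)
    vae_cnn = VAE(input_dim=(28, 28, 1), layers=[16, 8], latent_dim=2, architecture='cnn')
    vae_cnn.autoencoder.summary()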