Saturday, March 30, 2024

Wasserstein Generative Adversarial Networks (WGANs) - An Easy Tutorial.

Introduction.

The main strengths of Wasserstein Generative Adversarial Networks (WGANs) come from their significant improvements over traditional GAN architectures. They address critical challenges such as mode collapse and improve the generation of high-quality, diverse samples. The following key technical advancements in the WGAN architecture motivated me to create these tutorials:

  1. Wasserstein Distance: Shifts from traditional metrics to the Wasserstein distance for more meaningful training gradients, reducing mode collapse and stabilizing network convergence.
  2. Weight Clipping and Lipschitz Constraint: Initially, WGANs used weight clipping to meet the Lipschitz constraint for the Wasserstein distance, but this approach had drawbacks like capacity underuse and gradient problems. The WGAN-GP variant introduced a gradient penalty to overcome these issues, leading to better training stability and sample quality.
  3. Gradient Penalty (WGAN-GP): Incorporates a gradient penalty in the loss function, promoting stable training and high-quality output by preventing excessive critic gradients.
  4. Critic Role: Unlike traditional GANs' discriminators, WGAN critics assess generated sample quality on a continuous scale, enabling finer quality evaluation and aiding in model training dynamics.
  5. Training Protocol: WGANs employ a distinct training method, often involving more frequent training of the critic than the generator to provide effective gradients, ensuring balanced learning and model stability.

     These advancements make WGANs superior for generating realistic samples and ensuring smoother model training, maintaining their unique position in AI research and development.

Video Tutorials.

Part-1

Part-2

Part-3


Code - Training WGAN

# example of training a WGAN on Fashion-MNIST
from numpy import expand_dims
import keras
import keras.backend as K
import tensorflow as tf
import numpy as np
from keras import Model
from keras.optimizers import Adam
from keras.layers import Input, Reshape, Flatten
from keras.layers import Dense, BatchNormalization, Conv2D, Conv2DTranspose, LeakyReLU, Dropout
# Module-level defaults. Both shape constants describe the same
# 28x28 single-channel image layout.
latent_dim, batch_size = 100, 32
img_shape = (28, 28, 1)
input_shape = (28, 28, 1)
class WGAN_1:
    """Weight-clipping Wasserstein GAN built from Keras functional models.

    The class bundles the loss, the data loader, the generator / critic /
    combined-model factories and a training loop that saves the three
    models to ``.h5`` files.
    """

    def __init__(self):
        print("welcome to WGAN coding")

    def wasserstein_loss(self, y_true, y_pred):
        """Return the mean of ``y_true * y_pred``.

        ``train_save_models`` labels real samples -1 and fake samples +1,
        so minimising this loss raises the critic score of real samples
        and lowers the score of generated ones.
        """
        return K.mean(y_true * y_pred)

    @staticmethod
    def _clip_weights(weights, clip_value):
        """Clamp every array in *weights* to ``[-clip_value, clip_value]``."""
        return [np.clip(w, -clip_value, clip_value) for w in weights]

    def preprocess_real_part_training_dataset(self, n_samples=1000):
        """Load the first *n_samples* Fashion-MNIST training images.

        Returns a float32 array with a trailing channel axis and pixel
        values scaled from [0, 255] to [0, 1].
        """
        # Labels and the test split are not needed for GAN training.
        (train_images, _), _ = keras.datasets.fashion_mnist.load_data()
        train_images = train_images[:n_samples]
        # Add the grayscale channel dimension.
        train_images = expand_dims(train_images, axis=-1)
        # NOTE(review): the generator ends in tanh (range [-1, 1]) while the
        # real images are scaled to [0, 1]; consider aligning the two ranges.
        return train_images.astype(np.float32) / 255.0

    def define_generator(self, latent_dim, img_shape):
        """Build the (uncompiled) generator.

        Args:
            latent_dim: length of the input noise vector.
            img_shape: output image shape; only ``img_shape[2]`` (the
                channel count) is read, the spatial size is fixed at 28x28
                by the 7 -> 14 -> 28 upsampling path.

        Returns:
            A Keras ``Model`` mapping noise vectors to images.
        """
        # Input expects a shape tuple, not a bare int.
        inputs = Input(shape=(latent_dim,))
        # Project and reshape the input
        proj = Dense(128 * 7 * 7)(inputs)
        proj = Reshape((7, 7, 128))(proj)
        # Upsample to 14x14
        upsample_1 = Conv2DTranspose(filters=128, kernel_size=4, strides=2, padding='same', activation=LeakyReLU(alpha=0.2),)(proj)
        upsample_1 = BatchNormalization()(upsample_1)
        # Upsample to 28x28
        upsample_2 = Conv2DTranspose(filters=128, kernel_size=4, strides=2, padding='same', activation=LeakyReLU(alpha=0.2),)(upsample_1)
        upsample_2 = BatchNormalization()(upsample_2)
        # Generate output image (28x28xC)
        gen_output = Conv2D(filters=img_shape[2], kernel_size=7, activation='tanh', padding='same')(upsample_2)
        g_model = Model(inputs, gen_output)
        g_model.summary()
        return g_model

    def define_critic(self, input_shape):
        """Build and compile the critic.

        Args:
            input_shape: shape of the images the critic scores.

        Returns:
            A Keras ``Model`` with a single linear output unit, compiled
            with ``wasserstein_loss`` and RMSprop (learning rate 5e-5).
        """
        inputs = Input(shape=input_shape)
        # Three strided convolutions, each followed by dropout.
        features = inputs
        for n_filters in (64, 128, 256):
            features = Conv2D(filters=n_filters, kernel_size=3, strides=2, activation=LeakyReLU(alpha=0.2), padding='same')(features)
            features = Dropout(0.4)(features)
        flatten_layer = Flatten()(features)
        # Linear output: the critic emits an unbounded score, not a probability.
        critic_decision_layer = Dense(1)(flatten_layer)
        critic_model = Model(inputs, critic_decision_layer)
        optimizer = keras.optimizers.RMSprop(learning_rate=0.00005)
        # No accuracy metric: it is meaningless for real-valued critic scores.
        critic_model.compile(loss=self.wasserstein_loss, optimizer=optimizer)
        critic_model.summary()
        return critic_model

    def define_wgan(self, latent_dim0, img_shape0, g_model=None, critic_model=None):
        """Build and compile the combined generator -> critic model.

        Args:
            latent_dim0: length of the generator's noise input.
            img_shape0: image shape used when a generator or critic has to
                be created here.
            g_model: generator to embed; a new one is built when ``None``.
            critic_model: critic to embed; a new one is built when ``None``.

        Returns:
            A compiled Keras ``Model`` mapping noise to critic scores. The
            embedded critic has ``trainable`` set to ``False`` so that
            training this model updates the generator only.
        """
        # Reuse the caller's models when given so that training the combined
        # model updates the very generator that is later sampled and saved.
        if g_model is None:
            g_model = self.define_generator(latent_dim=latent_dim0, img_shape=img_shape0)
        if critic_model is None:
            critic_model = self.define_critic(input_shape=img_shape0)
        latent_input = Input(shape=(latent_dim0,))
        generator_output = g_model(latent_input)
        # Freeze the critic before compiling the combined model.
        critic_model.trainable = False
        gan_output = critic_model(generator_output)
        wgan_model = Model(latent_input, gan_output)
        # Same optimiser settings as the critic.
        wgan_model.compile(loss=self.wasserstein_loss, optimizer=keras.optimizers.RMSprop(learning_rate=0.00005))
        wgan_model.summary()
        return wgan_model

    def train_save_models(self, clip_value, n_critic, batch_size, input_shape, latent_dim, img_shape, n_epochs=2):
        """Train the WGAN and save generator, critic and combined model.

        Args:
            clip_value: critic weights are clamped to
                ``[-clip_value, clip_value]`` after every critic update.
            n_critic: critic updates per generator update.
            batch_size: samples per batch.
            input_shape: image shape given to the critic; it must be
                compatible with the generator output because the same
                critic is used inside the combined model.
            latent_dim: length of the noise vector.
            img_shape: image shape given to the generator.
            n_epochs: passes over the training images.

        Writes ``g_model.h5``, ``critic_model.h5`` and ``wgan_model.h5`` to
        the current working directory.
        """
        trainX = self.preprocess_real_part_training_dataset()
        g_model = self.define_generator(latent_dim=latent_dim, img_shape=img_shape)
        critic_model = self.define_critic(input_shape)
        # Share the generator and critic with the combined model; otherwise
        # the saved generator would never receive a gradient update.
        wgan_main = self.define_wgan(latent_dim0=latent_dim, img_shape0=img_shape, g_model=g_model, critic_model=critic_model)
        # Label convention for wasserstein_loss: real = -1, fake = +1.
        realY = -tf.ones(shape=(batch_size, 1))
        fakeY = tf.ones(shape=(batch_size, 1))
        n_batches = len(trainX) // batch_size
        for i in range(n_epochs):
            for j in range(n_batches):
                X_real = trainX[j * batch_size : (j + 1) * batch_size]
                # Critic phase.
                critic_model.trainable = True
                for _ in range(n_critic):
                    # Fresh noise for every critic update.
                    noise = tf.random.normal(shape=[batch_size, latent_dim], mean=0, stddev=1)
                    critic_model.train_on_batch(x=X_real, y=realY)
                    X_fake = g_model.predict_on_batch(noise)
                    critic_model.train_on_batch(x=X_fake, y=fakeY)
                    # Clip critic weights after each update.
                    for layer in critic_model.layers:
                        layer.set_weights(self._clip_weights(layer.get_weights(), clip_value))
                # Generator phase: the critic stays frozen, and the generator
                # is pushed towards samples the critic labels as real.
                critic_model.trainable = False
                noise = tf.random.normal(shape=[batch_size, latent_dim], mean=0, stddev=1)
                g_loss_batch = wgan_main.train_on_batch(x=noise, y=realY)
                print("epoch = ",i,"//",n_epochs," batch = ", j," G_loss_batch ", g_loss_batch)

        g_model.save("g_model.h5")
        critic_model.save("critic_model.h5")
        wgan_main.save("wgan_model.h5")

if __name__ == "__main__":
    print ("Executed when invoked directly")
    input_shape1 = (28, 28, 1)
    img_shape1 = (28, 28, 1)
    latent_dim1 = 100
    n_critic = 5
    clip_value = 0.01
    # train_save_models builds the generator, critic and combined model
    # itself, so no models are constructed here beforehand.
    wgan1 = WGAN_1()
    wgan1.train_save_models(
        n_critic=n_critic,
        clip_value=clip_value,
        batch_size=32,
        input_shape=input_shape1,
        latent_dim=latent_dim1,
        img_shape=img_shape1,
        n_epochs=2,
    )

Code - Testing Generator Model.

# example of loading the generator model and generating images
import numpy as np
from keras.models import load_model
from numpy.random import randn
from keras.models import load_model
from matplotlib import pyplot
import matplotlib.pyplot as plt
# Restore the generator from disk.
model = load_model('g_model.h5')
# Draw latent vectors and turn them into synthetic images.
num_images, latent_dim = 10, 100
noise = np.random.normal(loc=0, scale=1, size=(num_images, latent_dim))
generated_images = model.predict(noise)

# Show the generated images side by side in a single row.
plt.figure(figsize=(10, 10))
for position, image in enumerate(generated_images, start=1):
    plt.subplot(1, num_images, position)
    plt.imshow(image[:, :, 0], cmap='gray')
    plt.axis('off')
plt.show()

Reference:

1. Wasserstein GAN; Martin Arjovsky (Courant Institute of Mathematical Sciences), Soumith Chintala, and Léon Bottou (Facebook AI Research).

2. Ti, Yu. "Gradient Penalty Approach for Wasserstein Generative Adversarial Networks."

3. Kwon, Dohyun, Yeoneung Kim, Guido Montúfar, and Insoon Yang. "Training Wasserstein GANs without gradient penalties." arXiv preprint arXiv:2110.14150 (2021).

4. Guo, Xin, Johnny Hong, Tianyi Lin, and Nan Yang. "Relaxed Wasserstein with applications to GANs." In ICASSP 2021-2021 IEEE International Conference on Acoustics, Speech and Signal Processing (ICASSP), pp. 3325-3329. IEEE, 2021.

No comments: