Saturday, March 30, 2024

Wasserstein Generative Adversarial Networks (WGANs) - An Easy Tutorial.

Introduction.

The unbeatable aspects of Wasserstein Generative Adversarial Networks (WGANs) come from their significant improvements over traditional GAN architectures. They address critical challenges like mode collapse and enhance the generation of high-quality, diverse samples. The following are the key technical advancements in WGAN architecture that motivate me to create tutorials on WGAN:

  1. Wasserstein Distance: Shifts from traditional metrics to the Wasserstein distance for more meaningful training gradients, reducing mode collapse and stabilizing network convergence.
  2. Weight Clipping and Lipschitz Constraint: Initially, WGANs used weight clipping to meet the Lipschitz constraint for the Wasserstein distance, but this approach had drawbacks like capacity underuse and gradient problems. The WGAN-GP variant introduced a gradient penalty to overcome these issues, leading to better training stability and sample quality.
  3. Gradient Penalty (WGAN-GP): Incorporates a gradient penalty in the loss function, promoting stable training and high-quality output by preventing excessive critic gradients.
  4. Critic Role: Unlike traditional GANs' discriminators, WGAN critics assess generated sample quality on a continuous scale, enabling finer quality evaluation and aiding in model training dynamics.
  5. Training Protocol: WGANs employ a distinct training method, often involving more frequent training of the critic than the generator to provide effective gradients, ensuring balanced learning and model stability.

     These advancements make WGANs superior for generating realistic samples and ensuring smoother model training, maintaining their unique position in AI research and development.

Video Tutorials.

Part-1

Part-2

Part-3


Code - Training WGAN

# example of training a wgan on fashion-mnist
from numpy import expand_dims
import keras
import keras.backend as K
import tensorflow as tf
import numpy as np
from keras import Model
from keras.optimizers import Adam
from keras.layers import Input, Reshape, Flatten
from keras.layers import Dense, BatchNormalization, Conv2D, Conv2DTranspose, LeakyReLU, Dropout
batch_size = 32
input_shape = (28, 28, 1)
latent_dim = 100
img_shape = (28, 28, 1)
class WGAN_1:
    """Weight-clipping Wasserstein GAN trained on a Fashion-MNIST subset.

    The class bundles the data preparation, the generator, the critic, the
    stacked generator->critic model and the training loop that saves the
    three models to ``g_model.h5``, ``critic_model.h5`` and ``wgan_model.h5``.
    """

    def __init__(self):
        print("welcome to WGAN coding")

    def wasserstein_loss(self, y_true, y_pred):
        """Return the mean of the element-wise product of labels and scores.

        ``train_save_models`` labels real samples -1 and fake samples +1, so
        minimising this loss raises critic scores on real samples and lowers
        them on generated ones.
        """
        return K.mean(y_true * y_pred)

    def preprocess_real_part_training_dataset(self):
        """Load the first 1000 Fashion-MNIST training images as float32.

        Returns:
            Array of shape (1000, 28, 28, 1) with pixel values scaled from
            [0, 255] to [-1, 1], the same range as the generator's ``tanh``
            output (the original scaled to [0, 1], which the generator's
            output range does not match).
        """
        (dataX, _), _ = keras.datasets.fashion_mnist.load_data()
        # Keep the tutorial fast: only the first 1000 training images.
        dataX = dataX[:1000]
        # Add the trailing grayscale channel dimension.
        dataX = expand_dims(dataX, axis=-1)
        # uint8 [0, 255] -> float32 [-1, 1]
        dataX = (dataX.astype(np.float32) - 127.5) / 127.5
        return dataX

    def define_generator(self, latent_dim, img_shape):
        """Build the generator mapping a latent vector to a 28x28 image.

        Args:
            latent_dim: Size of the latent vector (an int such as 100); a
                shape tuple/list is accepted as well.
            img_shape: Output image shape; only ``img_shape[2]`` (the channel
                count) is used.

        Returns:
            The (uncompiled) generator model with ``tanh`` output.
        """
        # Input expects a shape tuple; wrap a bare int.
        if isinstance(latent_dim, (tuple, list)):
            latent_shape = tuple(latent_dim)
        else:
            latent_shape = (latent_dim,)
        inputs = Input(shape=latent_shape)
        # Project and reshape the input to a 7x7 feature map
        proj = Dense(128 * 7 * 7)(inputs)
        proj = Reshape((7, 7, 128))(proj)
        # Upsample to 14x14
        upsample_1 = Conv2DTranspose(
            filters=128, kernel_size=4, strides=2, padding='same',
            activation=LeakyReLU(alpha=0.2),
        )(proj)
        upsample_1 = BatchNormalization()(upsample_1)
        # Upsample to 28x28
        upsample_2 = Conv2DTranspose(
            filters=128, kernel_size=4, strides=2, padding='same',
            activation=LeakyReLU(alpha=0.2),
        )(upsample_1)
        upsample_2 = BatchNormalization()(upsample_2)
        # Generate output image (28x28xchannels)
        gen_output = Conv2D(
            filters=img_shape[2], kernel_size=7, activation='tanh',
            padding='same',
        )(upsample_2)
        g_model = Model(inputs, gen_output)
        g_model.summary()
        return g_model

    def define_critic(self, input_shape):
        """Build and compile the critic.

        Args:
            input_shape: Shape of one input image, e.g. ``(28, 28, 1)``.

        Returns:
            Critic model with a single linear output, compiled with
            ``wasserstein_loss`` and RMSprop (learning rate 0.00005). No
            accuracy metric is attached: the output is an unbounded score,
            not a class probability.
        """
        inputs = Input(shape=input_shape)
        # Three strided convolutions, each followed by dropout
        features = inputs
        for n_filters in (64, 128, 256):
            features = Conv2D(
                filters=n_filters, kernel_size=3, strides=2,
                activation=LeakyReLU(alpha=0.2), padding='same',
            )(features)
            features = Dropout(0.4)(features)
        flatten_layer = Flatten()(features)
        # Linear score, no sigmoid
        critic_decision_layer = Dense(1)(flatten_layer)
        critic_model = Model(inputs, critic_decision_layer)
        optimizer = keras.optimizers.RMSprop(learning_rate=0.00005)
        critic_model.compile(loss=self.wasserstein_loss, optimizer=optimizer)
        critic_model.summary()
        return critic_model

    def define_wgan(self, latent_dim0, img_shape0, g_model=None, critic_model=None):
        """Stack a generator and a frozen critic into one trainable model.

        Args:
            latent_dim0: Size of the latent vector.
            img_shape0: Image shape, used only when a model has to be built.
            g_model: Generator to train through the stacked model. When
                omitted a new one is built with ``define_generator``.
            critic_model: Critic scoring the generated images. When omitted a
                new one is built with ``define_critic``.

        Returns:
            Compiled model mapping latent vectors to critic scores.

        Note:
            ``critic_model.trainable`` is set to ``False`` as a side effect.
            Pass the same generator and critic that the training loop uses;
            otherwise the stacked model updates a generator nobody samples
            from or saves (this was the case in the original code).
        """
        if g_model is None:
            g_model = self.define_generator(latent_dim=latent_dim0, img_shape=img_shape0)
        if critic_model is None:
            critic_model = self.define_critic(input_shape=img_shape0)
        # Freeze the critic before compiling the stacked model so the
        # generator update does not modify the critic.
        critic_model.trainable = False
        latent_input = Input(shape=(latent_dim0,))
        gan_output = critic_model(g_model(latent_input))
        wgan_model = Model(latent_input, gan_output)
        # Same optimizer settings as the critic.
        wgan_model.compile(
            loss=self.wasserstein_loss,
            optimizer=keras.optimizers.RMSprop(learning_rate=0.00005),
        )
        wgan_model.summary()
        return wgan_model

    @staticmethod
    def _clip_weights(weights, clip_value):
        """Return ``weights`` with every entry limited to [-clip_value, clip_value]."""
        return [np.clip(w, -clip_value, clip_value) for w in weights]

    def train_save_models(self, clip_value, n_critic, batch_size, input_shape, latent_dim, img_shape, n_epochs=2):
        """Train the WGAN and save generator, critic and stacked model.

        Args:
            clip_value: Critic weights are clipped to
                ``[-clip_value, clip_value]`` after every critic update (the
                original used ``1 - clip_value`` as the lower bound).
            n_critic: Number of critic updates per generator update.
            batch_size: Number of real (and of fake) samples per update.
            input_shape: Critic input shape.
            latent_dim: Size of the latent vector.
            img_shape: Generator output shape.
            n_epochs: Number of passes over the training subset.
        """
        trainX = self.preprocess_real_part_training_dataset()
        g_model = self.define_generator(latent_dim=latent_dim, img_shape=img_shape)
        critic_model = self.define_critic(input_shape)
        # Share the very same generator and critic with the stacked model.
        wgan_main = self.define_wgan(
            latent_dim0=latent_dim, img_shape0=img_shape,
            g_model=g_model, critic_model=critic_model,
        )
        realY = -tf.ones(shape=(batch_size, 1))
        fakeY = tf.ones(shape=(batch_size, 1))
        n_batches = len(trainX) // batch_size
        for i in range(n_epochs):
            for j in range(n_batches):
                X_real = trainX[j * batch_size: (j + 1) * batch_size]
                # Toggle explicitly so the critic is trainable for its own
                # updates and frozen for the generator update.
                critic_model.trainable = True
                for _ in range(n_critic):
                    # Fresh noise for every critic update
                    noise = tf.random.normal(shape=[batch_size, latent_dim], mean=0, stddev=1)
                    critic_model.train_on_batch(x=X_real, y=realY)
                    X_fake = g_model.predict_on_batch(noise)
                    critic_model.train_on_batch(x=X_fake, y=fakeY)
                    # Clip critic weights after each update
                    for layer in critic_model.layers:
                        layer.set_weights(self._clip_weights(layer.get_weights(), clip_value))
                # Train generator weights through the frozen critic; the
                # "real" label makes the generator push critic scores up.
                critic_model.trainable = False
                noise = tf.random.normal(shape=[batch_size, latent_dim], mean=0, stddev=1)
                g_loss_batch = wgan_main.train_on_batch(x=noise, y=realY)
                print("epoch = ",i,"//",n_epochs," batch = ", j," G_loss_batch ", g_loss_batch)

        g_model.save("g_model.h5")
        critic_model.save("critic_model.h5")
        wgan_main.save("wgan_model.h5")

if __name__ == "__main__":
    # Script entry point: train the WGAN and save the resulting models.
    print ("Executed when invoked directly")
    input_shape1 = (28, 28, 1)
    img_shape1 = (28, 28, 1)
    latent_dim1 = 100
    n_critic = 5        # critic updates per generator update
    clip_value = 0.01   # bound used when clipping critic weights
    wgan1 = WGAN_1()
    # train_save_models builds the generator, critic and stacked model
    # itself, so no models are constructed here.
    wgan1.train_save_models(
        n_critic=n_critic, clip_value=clip_value, batch_size=32,
        input_shape=input_shape1, latent_dim=latent_dim1,
        img_shape=img_shape1, n_epochs=2,
    )

Code - Testing Generator Model.

# example of loading the generator model and generating images
import numpy as np
from keras.models import load_model
from numpy.random import randn
from keras.models import load_model
from matplotlib import pyplot
import matplotlib.pyplot as plt
# Load the trained generator. compile=False: only inference is needed, so
# no loss/optimizer has to be restored.
model = load_model('g_model.h5', compile=False)
# Generate synthetic images from standard-normal latent vectors
num_images = 10
latent_dim = 100
noise = np.random.normal(0, 1, (num_images, latent_dim))
generated_images = model.predict(noise)

# Plot the generated images side by side in a single row
plt.figure(figsize=(10, 10))
for i in range(num_images):
    plt.subplot(1, num_images, i+1)
    plt.imshow(generated_images[i, :, :, 0], cmap='gray')
    plt.axis('off')
# Show once, after all subplots have been drawn
plt.show()

Reference:

1. Wasserstein GAN; Martin Arjovsky (Courant Institute of Mathematical Sciences), Soumith Chintala, and Léon Bottou (Facebook AI Research)

2. Ti, Yu. "Gradient Penalty Approach for Wasserstein Generative Adversarial Networks."

3. Kwon, Dohyun, Yeoneung Kim, Guido Montúfar, and Insoon Yang. "Training Wasserstein GANs without gradient penalties." arXiv preprint arXiv:2110.14150 (2021).

4. Guo, Xin, Johnny Hong, Tianyi Lin, and Nan Yang. "Relaxed Wasserstein with applications to GANs." In ICASSP 2021-2021 IEEE International Conference on Acoustics, Speech and Signal Processing (ICASSP), pp. 3325-3329. IEEE, 2021.

Sunday, March 17, 2024

Use of Long Text Sequences with LLMs Trained on Shorter Text Sequences - ALiBi & RoFormer

 

Introduction.

Training large language models (LLMs) on longer sequences poses challenges in computational resources, model complexity, gradient propagation, and overfitting. These include increased memory requirements due to self-attention mechanisms, longer training times, difficulty in scaling Transformers for very long sequences, challenges in capturing long-term dependencies, risk of vanishing or exploding gradients, and potential overfitting to training data. Solutions like linear biases, RoFormer, and RoPE improve handling of long-range dependencies, enhance model generalization, and incorporate positional information for better performance in NLP tasks. For Example:

Attention with linear Biases

Improved Handling of Long-Range Dependencies. Traditional attention mechanisms struggle with capturing long-range dependencies in text due to the quadratic increase in computational complexity with sequence length. Linear biases help to mitigate this by effectively incorporating positional information, thus enhancing the model’s ability to maintain context over long distances within the text. 

RoFormer

Improved Model Generalization: By more effectively encoding positional information, RoFormer helps LLMs to generalize better across different tasks and datasets. This results in enhanced performance on a wide range of NLP tasks, including text classification, machine translation, and semantic analysis. 
Enhanced Positional Encoding: RoPE uniquely integrates positional information with the token embeddings, preserving the relative distances between tokens. This method enables the model to better understand and utilize the order of words or tokens, which is crucial for many language understanding and generation tasks.

Video Tutorial -1

Video Tutorial -2

Video Tutorial -3



References.
  1. Su, Jianlin, Murtadha Ahmed, Yu Lu, Shengfeng Pan, Wen Bo, and Yunfeng Liu. "Roformer: Enhanced transformer with rotary position embedding." Neurocomputing 568 (2024): 127063.
  2. Press, Ofir, Noah A. Smith, and Mike Lewis. "Train short, test long: Attention with linear biases enables input length extrapolation." arXiv preprint arXiv:2108.12409 (2021).
  3. Vaswani, Ashish, Noam Shazeer, Niki Parmar, Jakob Uszkoreit, Llion Jones, Aidan N. Gomez, Łukasz Kaiser, and Illia Polosukhin. "Attention is all you need." Advances in neural information processing systems 30 (2017).
 

Sunday, March 3, 2024

Generative Adversarial Network (GAN) , DCGAN, Tutorial and Keras Implementation

 Introduction.

GANs, called Generative Adversarial Networks, are special types of deep learning models that have two main parts: a generator and a discriminator. The generator creates fake data, and the discriminator checks if this data looks real or not compared to real data. By training against each other, GANs get better at making data that looks real, changing how we make new images, expand datasets, and learn without supervision. The following points show the significance of GAN in the area of AI.
  • Creative Applications: GANs create realistic images, music, text, and videos, enabling creativity in art, content, and virtual environments.
  • Data Augmentation: GANs generate synthetic data to enhance small datasets, improving the performance of machine learning models.
  • Defense Against Deepfakes: GANs are used to develop defenses and detect manipulated media content amidst the growth of deepfake technology.
  • Drug Discovery and Molecular Design: GANs play an increasing role in drug discovery, producing novel molecular structures with desired properties, potentially transforming the pharmaceutical industry.

Scope.

This article comprises interactive video tutorials and code demonstrations to elucidate the GAN architecture. The discussion covers various topics, culminating in the presentation of straightforwardly designed code.

GAN Part-1


GAN Part-2

GAN Part-3.

Keras implementation of GAN 

The following contains the Keras implementation of a Deep Convolutional GAN (DCGAN). Please go through the video tutorials above to properly understand and use the code.
The system is built using Python 3.10 and relies on several essential library dependencies:
  • Tensorflow (version 2.15)
  • tqdm (version 4.66.2)
  • h5py (version 3.10)
  • Keras (version 2.15)

Train DCGAN.

# example of training a gan on fashion-mnist
from numpy import expand_dims
from tqdm import tqdm
import keras
import tensorflow as tf
import numpy as np
from keras import Model
from keras.optimizers import Adam
from keras.layers import Input, Reshape, Flatten
from keras.layers import Dense, BatchNormalization, Conv2D, Conv2DTranspose, LeakyReLU, Dropout
batch_size = 32
input_shape = (28, 28, 1)
latent_dim = 100
img_shape = (28, 28, 1)
class GAN_1:
    """DCGAN trained on Fashion-MNIST.

    The class bundles the data pipeline, the generator, the discriminator,
    the stacked generator->discriminator model and the training loop that
    saves the three models to ``g_model.h5``, ``d_model.h5`` and
    ``gan_model.h5``.
    """

    def __init__(self):
        print("welcome to GAN coding")

    def preprocess_real_part_training_dataset(self, batch_size):
        """Return Fashion-MNIST training images as a batched tf.data dataset.

        Images get a trailing channel dimension and are scaled from
        [0, 255] to [0, 1]. The dataset is shuffled with a buffer of 1000,
        batched into ``batch_size`` (incomplete last batch dropped) and
        prefetched one batch ahead.
        """
        (dataX, _), _ = keras.datasets.fashion_mnist.load_data()
        # Add the grayscale channel dimension
        dataX = expand_dims(dataX, axis=-1)
        # uint8 [0, 255] -> float32 [0, 1]
        dataX = dataX.astype(np.float32) / 255.0
        trainX = tf.data.Dataset.from_tensor_slices(dataX).shuffle(1000)
        trainX = trainX.batch(batch_size, drop_remainder=True).prefetch(1)
        return trainX

    def define_generator(self, latent_dim, img_shape):
        """Build the generator mapping a latent vector to a 28x28 image.

        Args:
            latent_dim: Size of the latent vector (an int such as 100); a
                shape tuple/list is accepted as well.
            img_shape: Output image shape; only ``img_shape[2]`` (the channel
                count) is used.

        Returns:
            Generator model with ``sigmoid`` output, compiled with binary
            cross-entropy and Adam (learning rate 0.0002, beta_1 0.5).
        """
        # Input expects a shape tuple; wrap a bare int.
        if isinstance(latent_dim, (tuple, list)):
            latent_shape = tuple(latent_dim)
        else:
            latent_shape = (latent_dim,)
        inputs = Input(shape=latent_shape)
        # Project and reshape the input to a 7x7 feature map
        proj = Dense(128 * 7 * 7)(inputs)
        proj = Reshape((7, 7, 128))(proj)
        # Upsample to 14x14
        upsample_1 = Conv2DTranspose(
            filters=128, kernel_size=4, strides=2, padding='same',
            activation=LeakyReLU(alpha=0.2),
        )(proj)
        upsample_1 = BatchNormalization()(upsample_1)
        # Upsample to 28x28
        upsample_2 = Conv2DTranspose(
            filters=128, kernel_size=4, strides=2, padding='same',
            activation=LeakyReLU(alpha=0.2),
        )(upsample_1)
        upsample_2 = BatchNormalization()(upsample_2)
        # Generate output image (28x28xchannels)
        gen_output = Conv2D(
            filters=img_shape[2], kernel_size=7, activation='sigmoid',
            padding='same',
        )(upsample_2)
        g_model = Model(inputs, gen_output)
        # `learning_rate` replaces the deprecated `lr` argument
        g_model.compile(
            loss='binary_crossentropy',
            optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
            metrics=['accuracy'],
        )
        g_model.summary()
        return g_model

    def define_descriminator(self, input_shape):
        """Build and compile the discriminator.

        Args:
            input_shape: Shape of one input image, e.g. ``(28, 28, 1)``.

        Returns:
            Model with a single sigmoid output, compiled with binary
            cross-entropy and Adam (learning rate 0.0002, beta_1 0.5).
        """
        inputs = Input(shape=input_shape)
        # Three strided convolutions, each followed by dropout
        features = inputs
        for n_filters in (64, 128, 256):
            features = Conv2D(
                filters=n_filters, kernel_size=3, strides=2,
                activation=LeakyReLU(alpha=0.2), padding='same',
            )(features)
            features = Dropout(0.4)(features)
        flatten_layer = Flatten()(features)
        discriminator_decision_layer = Dense(1, activation='sigmoid')(flatten_layer)
        d_model = Model(inputs, discriminator_decision_layer)
        d_model.compile(
            loss='binary_crossentropy',
            optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
            metrics=['accuracy'],
        )
        d_model.summary()
        return d_model

    def define_gan(self, latent_dim0, img_shape0, g_model=None, d_model=None):
        """Stack a generator and a frozen discriminator into one model.

        Args:
            latent_dim0: Size of the latent vector.
            img_shape0: Image shape, used only when a model has to be built.
            g_model: Generator to train through the stacked model. When
                omitted a new one is built with ``define_generator``.
            d_model: Discriminator judging the generated images. When omitted
                a new one is built with ``define_descriminator``.

        Returns:
            Compiled model mapping latent vectors to discriminator outputs.

        Note:
            ``d_model.trainable`` is set to ``False`` as a side effect. Pass
            the same generator and discriminator that the training loop
            uses; otherwise the stacked model updates a generator nobody
            samples from or saves (this was the case in the original code).
        """
        if g_model is None:
            g_model = self.define_generator(latent_dim=latent_dim0, img_shape=img_shape0)
        if d_model is None:
            d_model = self.define_descriminator(input_shape=img_shape0)
        # Freeze the discriminator before compiling the stacked model so the
        # generator update does not modify it.
        d_model.trainable = False
        latent_input = Input(shape=(latent_dim0,))
        gan_output = d_model(g_model(latent_input))
        gan_model = Model(latent_input, gan_output)
        gan_model.compile(loss="binary_crossentropy", optimizer="adam")
        gan_model.summary()
        return gan_model

    def train_save_models(self, input_shape, latent_dim, img_shape, n_epochs=2, n_batch=256, dataset=None):
        """Train the DCGAN and save generator, discriminator and stacked model.

        Args:
            input_shape: Discriminator input shape.
            latent_dim: Size of the latent vector.
            img_shape: Generator output shape.
            n_epochs: Number of passes over the dataset.
            n_batch: Batch size used when the dataset is built here.
            dataset: Optional iterable of real-image batches. When omitted,
                ``preprocess_real_part_training_dataset(n_batch)`` is used
                (the original read module-level ``trainX``/``batch_size``
                globals and ignored ``n_batch``).
        """
        if dataset is None:
            dataset = self.preprocess_real_part_training_dataset(batch_size=n_batch)
        g_model = self.define_generator(latent_dim=latent_dim, img_shape=img_shape)
        d_model = self.define_descriminator(input_shape)
        # Share the very same generator and discriminator with the stacked model.
        gan_main = self.define_gan(
            latent_dim0=latent_dim, img_shape0=img_shape,
            g_model=g_model, d_model=d_model,
        )
        for i in tqdm(range(n_epochs)):
            print()
            print("Epoch {}/{}".format(i + 1, n_epochs))
            d_loss = gan_loss = None
            # enumerate batches over the training set
            for X_batch in dataset:
                # Size fake batch and labels from the real batch itself
                n_real = int(X_batch.shape[0])
                noise = tf.random.normal(shape=[n_real, latent_dim])
                generated_images = g_model(noise)
                # Discriminator step: fakes labelled 0, reals labelled 1
                X_fake_and_real = tf.concat([generated_images, X_batch], axis=0)
                y1 = tf.constant([[0.]] * n_real + [[1.]] * n_real)
                # Toggle explicitly so the discriminator is trainable for
                # its own update and frozen for the generator update.
                d_model.trainable = True
                d_loss = d_model.train_on_batch(x=X_fake_and_real, y=y1)
                # Generator step: fresh noise labelled as real
                noise1 = tf.random.normal(shape=[n_real, latent_dim])
                y2 = tf.constant([[1.]] * n_real)
                d_model.trainable = False
                gan_loss = gan_main.train_on_batch(noise1, y2)
            # Losses of the last batch of this epoch
            print("discriminator loss =>",d_loss, " Gan-Loss => ",gan_loss)
        g_model.save("g_model.h5")
        d_model.save("d_model.h5")
        gan_main.save("gan_model.h5")

if __name__ == "__main__":
    # Script entry point: train the DCGAN and save the resulting models.
    print ("Executed when invoked directly")
    input_shape1 = (28, 28, 1)
    img_shape1 = (28, 28, 1)
    latent_dim1 = 100
    gan1 = GAN_1()
    # Batched training dataset (batches of 32, the same size as n_batch below)
    trainX = gan1.preprocess_real_part_training_dataset(batch_size=32)
    # train_save_models builds the generator, discriminator and stacked
    # model itself, so no models are constructed here.
    gan1.train_save_models(
        input_shape=input_shape1, latent_dim=latent_dim1,
        img_shape=img_shape1, n_epochs=50, n_batch=32,
    )

Test the trained GAN model.

# example of loading the generator model and generating images
import numpy as np
from keras.models import load_model
from numpy.random import randn
from keras.models import load_model
from matplotlib import pyplot
import matplotlib.pyplot as plt
# Load the trained generator. compile=False: only inference is needed, so
# the saved loss/optimizer state does not have to be restored.
model = load_model('g_model.h5', compile=False)
# Generate synthetic images from standard-normal latent vectors
num_images = 10
latent_dim = 100
noise = np.random.normal(0, 1, (num_images, latent_dim))
generated_images = model.predict(noise)

# Plot the generated images side by side in a single row
plt.figure(figsize=(10, 10))
for i in range(num_images):
    plt.subplot(1, num_images, i+1)
    plt.imshow(generated_images[i, :, :, 0], cmap='gray')
    plt.axis('off')
# Show once, after all subplots have been drawn
plt.show()

NOTE: This code is not intended for any commercial use. It is created solely for simple educational purposes. 

Niraj Kumar