import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn.metrics import accuracy_score
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model

import misc
import defs
import pyswarms as ps

latent_dim = 64

print("# GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))


class AutoencoderMod(defs.Modulator):
    def __init__(self, autoencoder):
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder

    def forward(self, binary: np.ndarray) -> np.ndarray:
        reshaped = binary.reshape((-1, self.N))
        reshaped_ho = misc.bit_matrix2one_hot(reshaped)
        encoded = self.autoencoder.encoder(reshaped_ho)
        x = encoded.numpy()
        x2 = x * 2 - 1
        f = np.zeros(x2.shape[0])
        x3 = misc.rect2polar(np.c_[x2[:, 0], x2[:, 1], f])
        return x3


class AutoencoderDemod(defs.Demodulator):
    def __init__(self, autoencoder):
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder

    def forward(self, values: np.ndarray) -> np.ndarray:
        rect = misc.polar2rect(values[:, [0, 1]])
        decoded = self.autoencoder.decoder(rect).numpy()
        result = misc.int2bit_array(decoded.argmax(axis=1), self.N)
        return result.reshape(-1, )


class Autoencoder(Model):
    def __init__(self, N, noise):
        super(Autoencoder, self).__init__()
        self.N = N

        self.encoder = tf.keras.Sequential()
        self.encoder.add(tf.keras.Input(shape=(2 ** N,), dtype=bool))
        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
        # self.encoder.add(layers.Dropout(0.2))
        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
        self.encoder.add(layers.Dense(units=2, activation="sigmoid"))
        # self.encoder.add(layers.ReLU(max_value=1.0))

        self.decoder = tf.keras.Sequential()
        self.decoder.add(tf.keras.Input(shape=(2,)))
        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
        # self.decoder.add(layers.Dropout(0.2))
        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
        self.decoder.add(layers.Dense(units=2 ** N, activation="softmax"))

        # self.randomiser = tf.random_normal_initializer(mean=0.0, stddev=0.1, seed=None)
        self.mod = None
        self.demod = None
        self.compiled = False
        # Divide by 2 because encoder outputs values between 0 and 1 instead of -1 and 1
        self.noise = 10 ** (noise / 10)  # / 2
        # self.decoder.add(layers.Softmax(units=4, dtype=bool))
        # [
        #     layers.Input(shape=(28, 28, 1)),
        #     layers.Conv2D(16, (3, 3), activation='relu', padding='same', strides=2),
        #     layers.Conv2D(8, (3, 3), activation='relu', padding='same', strides=2)
        # ])
        # self.decoder = tf.keras.Sequential([
        #     layers.Conv2DTranspose(8, kernel_size=3, strides=2, activation='relu', padding='same'),
        #     layers.Conv2DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'),
        #     layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')
        # ])

    def call(self, x, **kwargs):
        encoded = self.encoder(x)
        encoded = encoded * 2 - 1
        # encoded = tf.clip_by_value(encoded, clip_value_min=0, clip_value_max=1, name=None)
        # noise = self.randomiser(shape=(-1, 2), dtype=tf.float32)
        noise = np.random.normal(0, 1, (1, 2)) * self.noise
        noisy = tf.convert_to_tensor(noise, dtype=tf.float32)
        decoded = self.decoder(encoded + noisy)
        return decoded

    def train(self, samples=1e6):
        if samples % self.N:
            samples += self.N - (samples % self.N)
        x_train = misc.generate_random_bit_array(samples).reshape((-1, self.N))
        x_train_ho = misc.bit_matrix2one_hot(x_train)
        x_test_array = misc.generate_random_bit_array(samples * 0.3)
        x_test = x_test_array.reshape((-1, self.N))
        x_test_ho = misc.bit_matrix2one_hot(x_test)
        if not self.compiled:
            self.compile(optimizer='adam', loss=losses.MeanSquaredError())
            self.compiled = True
        self.fit(x_train_ho, x_train_ho,
                 shuffle=False,
                 validation_data=(x_test_ho, x_test_ho))
        # encoded_data = self.encoder(x_test_ho)
        # decoded_data = self.decoder(encoded_data).numpy()

    def get_modulator(self):
        if self.mod is None:
            self.mod = AutoencoderMod(self)
        return self.mod

    def get_demodulator(self):
        if self.demod is None:
            self.demod = AutoencoderDemod(self)
        return self.demod


def view_encoder(encoder, N, samples=1000):
    test_values = misc.generate_random_bit_array(samples).reshape((-1, N))
    test_values_ho = misc.bit_matrix2one_hot(test_values)
    mvector = np.array([2 ** i for i in range(N)], dtype=int)
    symbols = (test_values * mvector).sum(axis=1)
    encoded = encoder(test_values_ho).numpy()
    # encoded = misc.polar2rect(encoded)
    for i in range(2 ** N):
        xy = encoded[symbols == i]
        plt.plot(xy[:, 0], xy[:, 1], 'x', markersize=12, label=format(i, f'0{N}b'))
        plt.annotate(xy=[xy[:, 0].mean() + 0.01, xy[:, 1].mean() + 0.01], text=format(i, f'0{N}b'))
    plt.xlabel('Real')
    plt.ylabel('Imaginary')
    plt.title("Autoencoder generated alphabet")
    # plt.legend()
    plt.show()


# Note: test_batch_sizes and test_epoch_sizes rely on x_train_ho, x_test_ho,
# x_test_array and n being defined at module level (they are created in the
# __main__ block below before these functions are called).
def test_batch_sizes(autoencoder, sizes):
    accuracy = []
    for batch_size in sizes:
        autoencoder.fit(x_train_ho, x_train_ho, epochs=1, batch_size=batch_size, shuffle=False,
                        validation_data=(x_test_ho, x_test_ho))
        encoded_data = autoencoder.encoder(x_test_ho)
        decoded_data = autoencoder.decoder(encoded_data).numpy()
        result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
        print("Accuracy: %.4f" % accuracy_score(x_test_array, result.reshape(-1, )))
        accuracy.append(accuracy_score(x_test_array, result.reshape(-1, )))
    plt.plot(sizes, accuracy)
    plt.xscale('log')
    plt.grid()
    plt.xlabel('batch size')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()


def test_epoch_sizes(autoencoder, sizes):
    accuracy = []
    for epoch_size in sizes:
        autoencoder.fit(x_train_ho, x_train_ho, epochs=epoch_size, shuffle=False,
                        validation_data=(x_test_ho, x_test_ho))
        encoded_data = autoencoder.encoder(x_test_ho)
        decoded_data = autoencoder.decoder(encoded_data).numpy()
        result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
        print("Accuracy: %.4f" % accuracy_score(x_test_array, result.reshape(-1, )))
        accuracy.append(accuracy_score(x_test_array, result.reshape(-1, )))
    plt.plot(sizes, accuracy)
    plt.xscale('log')
    plt.grid()
    plt.xlabel('epoch size')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()


def sigmoid(z):
    return 1. / (1. + np.exp(-z))


# Forward propagation
def logits_function(params, n, train):
    """Forward propagation as objective function.

    Performs the forward propagation of the neural network. It receives a set
    of parameters that must be rolled back into the corresponding weights and
    biases.

    Inputs
    ------
    params: np.ndarray
        The dimensions should include an unrolled version of the
        weights and biases.
    Returns
    -------
    np.ndarray
        The logits of the output layer for each training sample
    """
    # Neural network architecture
    n_inputs = 2 ** n
    n_hidden = 2 ** (n + 1)
    n_classes = 2

    # Roll back the weights and biases
    W1 = params[0:n_inputs * n_hidden].reshape((n_inputs, n_hidden))
    b1 = params[n_inputs * n_hidden:n_inputs * n_hidden + n_hidden].reshape((n_hidden,))
    W2 = params[n_inputs * n_hidden + n_hidden:
                n_inputs * n_hidden + n_hidden + n_hidden * n_classes].reshape((n_hidden, n_classes))
    b2 = params[n_inputs * n_hidden + n_hidden + n_hidden * n_classes:
                n_inputs * n_hidden + n_hidden + n_hidden * n_classes + n_classes].reshape((n_classes,))

    # Perform forward propagation
    z1 = train.dot(W1) + b1  # Pre-activation in Layer 1
    a1 = np.tanh(z1)  # Activation in Layer 1
    z2 = a1.dot(W2) + b2  # Pre-activation in Layer 2
    logits = z2  # Logits for Layer 2

    return logits


# Forward propagation
def forward_prop(params, n, train, samples):
    """Forward propagation as objective function.

    Performs the forward propagation of the neural network and computes the loss.

    Inputs
    ------
    params: np.ndarray
        The dimensions should include an unrolled version of the
        weights and biases.

    Returns
    -------
    float
        The computed negative log-likelihood loss given the parameters
    """
    logits = logits_function(params, n, train)

    # Compute the softmax of the logits
    exp_scores = np.exp(logits)
    probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

    # Compute the negative log likelihood.
    # Assumes `train` holds an integer class label per sample, as in the
    # pyswarms training example this objective is based on.
    correct_logprobs = -np.log(probs[range(samples), train])
    loss = np.sum(correct_logprobs) / samples

    return loss


def f(x, n, train, samples):
    """Higher-level method to do forward_prop for the whole swarm.

    Inputs
    ------
    x: numpy.ndarray of shape (n_particles, dimensions)
        The swarm that will perform the search

    Returns
    -------
    numpy.ndarray of shape (n_particles, )
        The computed loss for each particle
    """
    n_particles = x.shape[0]
    j = [forward_prop(x[i], n, train, samples) for i in range(n_particles)]
    return np.array(j)


def predict(pos, n, train):
    """
    Use the trained weights to perform class predictions.

    Inputs
    ------
    pos: numpy.ndarray
        Position vector found by the swarm. Will be rolled into weights and biases.
    """
    logits = logits_function(pos, n, train)
    y_pred = np.argmax(logits, axis=1)
    return y_pred


if __name__ == '__main__':
    # (x_train, _), (x_test, _) = fashion_mnist.load_data()
    #
    # x_train = x_train.astype('float32') / 255.
    # x_test = x_test.astype('float32') / 255.
    #
    # print(f"Train data: {x_train.shape}")
    # print(f"Test data: {x_test.shape}")

    n = 4
    n_inputs = 2 ** n
    n_hidden = 2 ** (n + 1)
    n_classes = 2
    samples = 1e5
    x_train = misc.generate_random_bit_array(samples).reshape((-1, n))
    x_train_ho = misc.bit_matrix2one_hot(x_train)
    x_test_array = misc.generate_random_bit_array(samples * 0.3)
    x_test = x_test_array.reshape((-1, n))
    x_test_ho = misc.bit_matrix2one_hot(x_test)

    """
    # Initialize swarm
    options = {'c1': 0.5, 'c2': 0.3, 'w': 0.9}

    # Call instance of PSO
    dimensions = (n_inputs * n_hidden) + (n_hidden * n_classes) + n_hidden + n_classes
    optimizer = ps.single.GlobalBestPSO(n_particles=100, dimensions=dimensions, options=options)

    # Perform optimization (pass the objective function itself; extra keyword
    # arguments are forwarded to it by pyswarms)
    cost, pos = optimizer.optimize(f, iters=1000, n=n, train=x_train_ho, samples=samples)
    print((predict(pos, n, x_train_ho) == x_train_ho).mean())
    """

    autoencoder = Autoencoder(n, -8)
    autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
    batch_sizes = [10, 20, 30, 40, 100, 200, 300, 400, 500, 1000, 2000, 3000, 4000, 5000]
    epoch_sizes = [1, 10, 20, 50, 100, 200, 500]
    test_epoch_sizes(autoencoder, epoch_sizes)

    """
    autoencoder.fit(x_train_ho, x_train_ho,
                    epochs=1,
                    shuffle=False,
                    validation_data=(x_test_ho, x_test_ho))

    encoded_data = autoencoder.encoder(x_test_ho)
    decoded_data = autoencoder.decoder(encoded_data).numpy()
    result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
    print("Accuracy: %.4f" % accuracy_score(x_test_array, result.reshape(-1, )))
    view_encoder(autoencoder.encoder, n)
    """
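

# --- Illustrative usage sketch (not executed by default) --------------------
# roundtrip_demo is a new helper added purely as an example of how the pieces
# above are meant to fit together: train the autoencoder, plot the learned
# constellation, then round-trip random bits through the modulator/demodulator
# pair with no channel noise. It assumes the misc/defs helpers
# (generate_random_bit_array, bit_matrix2one_hot, rect2polar, polar2rect,
# int2bit_array, Modulator, Demodulator) behave as they are used in the
# classes above; treat it as a sketch rather than a tested entry point.
def roundtrip_demo(n_bits=4, noise_db=-8, samples=10000):
    ae = Autoencoder(n_bits, noise_db)
    ae.train(samples=samples)             # compiles on first call, then fits
    view_encoder(ae.encoder, n_bits)      # show the learned I/Q alphabet

    mod = ae.get_modulator()
    demod = ae.get_demodulator()

    bits = misc.generate_random_bit_array(samples)
    symbols = mod.forward(bits)           # bits -> polar-form symbols
    recovered = demod.forward(symbols)    # symbols -> bits (no channel noise)
    print("Round-trip bit accuracy: %.4f" % accuracy_score(bits, recovered))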