import matplotlib.pyplot as plt
import numpy as np
import pyswarms as ps
import tensorflow as tf
from sklearn.metrics import accuracy_score
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model

import defs
import misc

latent_dim = 64  # currently unused

print("# GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
class AutoencoderMod(defs.Modulator):
    """Modulator that maps blocks of N bits to constellation points via the trained encoder."""

    def __init__(self, autoencoder):
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder

    def forward(self, binary: np.ndarray) -> np.ndarray:
        reshaped = binary.reshape((-1, self.N))
        reshaped_ho = misc.bit_matrix2one_hot(reshaped)
        encoded = self.autoencoder.encoder(reshaped_ho)
        x = encoded.numpy()
        # Rescale the encoder's sigmoid output from [0, 1] to [-1, 1]
        x2 = x * 2 - 1
        # Zero third column, presumably expected by misc.rect2polar
        f = np.zeros(x2.shape[0])
        x3 = misc.rect2polar(np.c_[x2[:, 0], x2[:, 1], f])
        return x3
class AutoencoderDemod(defs.Demodulator):
    """Demodulator that maps received constellation points back to bits via the trained decoder."""

    def __init__(self, autoencoder):
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder

    def forward(self, values: np.ndarray) -> np.ndarray:
        rect = misc.polar2rect(values[:, [0, 1]])
        decoded = self.autoencoder.decoder(rect).numpy()
        # Pick the most likely symbol and convert it back to its N-bit pattern
        result = misc.int2bit_array(decoded.argmax(axis=1), self.N)
        return result.reshape(-1)
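
# The two classes above form a matched pair: AutoencoderMod turns a bit stream
# whose length is a multiple of N into polar-form constellation points, and
# AutoencoderDemod maps such points back to the bit stream. Hypothetical round
# trip (assuming the misc helpers behave as they are used above):
#
#   bits = misc.generate_random_bit_array(1024)
#   symbols = autoencoder.get_modulator().forward(bits)
#   recovered = autoencoder.get_demodulator().forward(symbols)  # == bits, noise permitting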
class Autoencoder(Model):
    """End-to-end autoencoder: one-hot symbol -> 2-D constellation point -> one-hot symbol."""

    def __init__(self, N, noise):
        super(Autoencoder, self).__init__()
        self.N = N
        self.encoder = tf.keras.Sequential()
        self.encoder.add(tf.keras.Input(shape=(2 ** N,), dtype=bool))
        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
        # self.encoder.add(layers.Dropout(0.2))
        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
        self.encoder.add(layers.Dense(units=2, activation="sigmoid"))
        # self.encoder.add(layers.ReLU(max_value=1.0))
        self.decoder = tf.keras.Sequential()
        self.decoder.add(tf.keras.Input(shape=(2,)))
        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
        # self.decoder.add(layers.Dropout(0.2))
        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
        self.decoder.add(layers.Dense(units=2 ** N, activation="softmax"))
        self.mod = None
        self.demod = None
        self.compiled = False
        # Convert the noise level from dB to linear scale. The commented "/ 2"
        # would compensate for the encoder emitting values in [0, 1] rather
        # than [-1, 1].
        self.noise = 10 ** (noise / 10)  # / 2
    def call(self, x, **kwargs):
        encoded = self.encoder(x)
        # Rescale from [0, 1] to [-1, 1] before the channel
        encoded = encoded * 2 - 1
        # AWGN channel: add Gaussian noise scaled by the configured level.
        # Drawn with tf.random.normal (per element) so a fresh sample is taken
        # on every call; the original np.random.normal draw of one shared
        # 2-vector would be frozen into a constant once Keras traces this
        # method into a graph.
        noisy = encoded + tf.random.normal(tf.shape(encoded), mean=0.0, stddev=self.noise)
        decoded = self.decoder(noisy)
        return decoded
    def train(self, samples=1e6):
        samples = int(samples)
        # Pad so the stream splits into whole N-bit blocks
        if samples % self.N:
            samples += self.N - (samples % self.N)
        x_train = misc.generate_random_bit_array(samples).reshape((-1, self.N))
        x_train_ho = misc.bit_matrix2one_hot(x_train)
        test_samples = int(samples * 0.3)
        test_samples -= test_samples % self.N
        x_test_array = misc.generate_random_bit_array(test_samples)
        x_test = x_test_array.reshape((-1, self.N))
        x_test_ho = misc.bit_matrix2one_hot(x_test)
        if not self.compiled:
            self.compile(optimizer='adam', loss=losses.MeanSquaredError())
            self.compiled = True
        self.fit(x_train_ho, x_train_ho, shuffle=False, validation_data=(x_test_ho, x_test_ho))
    def get_modulator(self):
        if self.mod is None:
            self.mod = AutoencoderMod(self)
        return self.mod

    def get_demodulator(self):
        if self.demod is None:
            self.demod = AutoencoderDemod(self)
        return self.demod
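
# Small helper added for illustration (not part of the original flow): estimate
# bit accuracy through the modulator/demodulator pair above. It assumes the
# misc helpers behave exactly as they are used elsewhere in this file.
def estimate_bit_accuracy(autoencoder, num_bits=10_000):
    num_bits -= num_bits % autoencoder.N  # keep whole N-bit blocks
    bits = misc.generate_random_bit_array(num_bits)
    symbols = autoencoder.get_modulator().forward(bits)
    recovered = autoencoder.get_demodulator().forward(symbols)
    return (np.asarray(bits).reshape(-1) == recovered).mean()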
def view_encoder(encoder, N, samples=1000):
    """Scatter-plot the 2-D constellation the encoder assigns to each N-bit symbol."""
    samples -= samples % N  # whole symbols only
    test_values = misc.generate_random_bit_array(samples).reshape((-1, N))
    test_values_ho = misc.bit_matrix2one_hot(test_values)
    mvector = np.array([2 ** i for i in range(N)], dtype=int)
    symbols = (test_values * mvector).sum(axis=1)
    encoded = encoder(test_values_ho).numpy()
    # encoded = misc.polar2rect(encoded)
    for i in range(2 ** N):
        xy = encoded[symbols == i]
        plt.plot(xy[:, 0], xy[:, 1], 'x', markersize=12, label=format(i, f'0{N}b'))
        plt.annotate(xy=[xy[:, 0].mean() + 0.01, xy[:, 1].mean() + 0.01], text=format(i, f'0{N}b'))
    plt.xlabel('Real')
    plt.ylabel('Imaginary')
    plt.title("Autoencoder generated alphabet")
    # plt.legend()
    plt.show()
def test_batch_sizes(autoencoder, sizes, x_train_ho, x_test_ho, x_test_array, n):
    """Plot decoding accuracy against batch size (one extra epoch per size).

    Note: training is cumulative -- each loop iteration continues from the
    previous model state rather than starting fresh.
    """
    accuracy = []
    for batch_size in sizes:
        autoencoder.fit(x_train_ho, x_train_ho,
                        epochs=1,
                        batch_size=batch_size,
                        shuffle=False,
                        validation_data=(x_test_ho, x_test_ho))
        encoded_data = autoencoder.encoder(x_test_ho)
        decoded_data = autoencoder.decoder(encoded_data).numpy()
        result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
        acc = accuracy_score(x_test_array, result.reshape(-1))
        print("Accuracy: %.4f" % acc)
        accuracy.append(acc)
    plt.plot(sizes, accuracy)
    plt.xscale('log')
    plt.grid()
    plt.xlabel('batch size')
    plt.ylabel('Accuracy')
    plt.show()
def test_epoch_sizes(autoencoder, sizes, x_train_ho, x_test_ho, x_test_array, n):
    """Plot decoding accuracy against epoch count.

    Note: training is cumulative across loop iterations, so later points have
    seen all the epochs of the earlier ones as well.
    """
    accuracy = []
    for epochs in sizes:
        autoencoder.fit(x_train_ho, x_train_ho,
                        epochs=epochs,
                        shuffle=False,
                        validation_data=(x_test_ho, x_test_ho))
        encoded_data = autoencoder.encoder(x_test_ho)
        decoded_data = autoencoder.decoder(encoded_data).numpy()
        result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
        acc = accuracy_score(x_test_array, result.reshape(-1))
        print("Accuracy: %.4f" % acc)
        accuracy.append(acc)
    plt.plot(sizes, accuracy)
    plt.xscale('log')
    plt.grid()
    plt.xlabel('epochs')
    plt.ylabel('Accuracy')
    plt.show()
def sigmoid(z):
    """Logistic function (kept for reference; unused below)."""
    return 1. / (1. + np.exp(-z))
# Forward propagation
def logits_function(params, n, train):
    """Forward pass of the two-layer network used for PSO training.

    Inputs
    ------
    params: np.ndarray
        Flat vector holding the unrolled weights and biases.
    n: int
        Bits per symbol; the network takes 2**n one-hot inputs.
    train: np.ndarray
        Feature matrix of shape (samples, 2**n).

    Returns
    -------
    np.ndarray
        The layer-2 logits, shape (samples, n_classes).
    """
    # Neural network architecture
    n_inputs = 2 ** n
    n_hidden = 2 ** (n + 1)
    n_classes = 2
    # Roll the flat parameter vector back into weights and biases
    idx1 = n_inputs * n_hidden
    idx2 = idx1 + n_hidden
    idx3 = idx2 + n_hidden * n_classes
    idx4 = idx3 + n_classes
    W1 = params[0:idx1].reshape((n_inputs, n_hidden))
    b1 = params[idx1:idx2].reshape((n_hidden,))
    W2 = params[idx2:idx3].reshape((n_hidden, n_classes))
    b2 = params[idx3:idx4].reshape((n_classes,))
    # Perform forward propagation
    z1 = train.dot(W1) + b1   # Pre-activation in layer 1
    a1 = np.tanh(z1)          # Activation in layer 1
    logits = a1.dot(W2) + b2  # Layer-2 logits
    return logits
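
# Worked size check for n = 2: n_inputs = 4, n_hidden = 8, n_classes = 2, so the
# flat parameter vector holds 4*8 (W1) + 8 (b1) + 8*2 (W2) + 2 (b2) = 58 values,
# matching the `dimensions` computed for the swarm in __main__ below.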
# Forward propagation with loss
def forward_prop(params, n, train, samples):
    """Forward propagation as objective function.

    Runs the forward pass and returns the loss for one candidate parameter
    vector.

    Inputs
    ------
    params: np.ndarray
        Flat vector holding the unrolled weights and biases.

    Returns
    -------
    float
        The computed negative log-likelihood loss given the parameters.
    """
    logits = logits_function(params, n, train)
    # Compute the softmax of the logits
    exp_scores = np.exp(logits)
    probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
    # Compute the negative log likelihood of the correct class.
    # NOTE: this indexing assumes `train` also supplies an integer class label
    # (0 or 1) per sample; with the one-hot feature matrix built in __main__ it
    # does not, which is presumably why the PSO path below is left disabled.
    correct_logprobs = -np.log(probs[range(samples), train])
    loss = np.sum(correct_logprobs) / samples
    return loss
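
# In equation form, with p_i the softmax probabilities of sample i and y_i its
# integer class label, the value returned above is the mean cross-entropy:
#   loss = -(1 / samples) * sum_i log(p_i[y_i])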
def f(x, n, train, samples):
    """Higher-level method to run forward_prop over the whole swarm.

    Inputs
    ------
    x: numpy.ndarray of shape (n_particles, dimensions)
        The swarm that will perform the search.

    Returns
    -------
    numpy.ndarray of shape (n_particles,)
        The computed loss for each particle.
    """
    n_particles = x.shape[0]
    j = [forward_prop(x[i], n, train, samples) for i in range(n_particles)]
    return np.array(j)
def predict(pos, n, train):
    """Use the trained weights to perform class predictions.

    Inputs
    ------
    pos: numpy.ndarray
        Position vector found by the swarm; rolled back into weights and
        biases by logits_function.
    """
    logits = logits_function(pos, n, train)
    y_pred = np.argmax(logits, axis=1)
    return y_pred
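
# Sketch of wiring the pieces above into pyswarms (mirrors the disabled block
# in __main__ below, and inherits the label caveat noted in forward_prop):
#
#   options = {'c1': 0.5, 'c2': 0.3, 'w': 0.9}
#   dims = (n_inputs * n_hidden) + n_hidden + (n_hidden * n_classes) + n_classes
#   optimizer = ps.single.GlobalBestPSO(n_particles=100, dimensions=dims, options=options)
#   cost, pos = optimizer.optimize(f, iters=1000, n=n, train=x_train_ho, samples=samples)
#   y_pred = predict(pos, n, x_train_ho)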
if __name__ == '__main__':
    n = 4
    n_inputs = 2 ** n
    n_hidden = 2 ** (n + 1)
    n_classes = 2
    samples = int(1e5)
    x_train = misc.generate_random_bit_array(samples).reshape((-1, n))
    x_train_ho = misc.bit_matrix2one_hot(x_train)
    x_test_array = misc.generate_random_bit_array(int(samples * 0.3))
    x_test = x_test_array.reshape((-1, n))
    x_test_ho = misc.bit_matrix2one_hot(x_test)
- """
- # Initialize swarm
- options = {'c1': 0.5, 'c2': 0.3, 'w': 0.9}
- # Call instance of PSO
- dimensions = (n_inputs * n_hidden) + (n_hidden * n_classes) + n_hidden + n_classes
- optimizer = ps.single.GlobalBestPSO(n_particles=100, dimensions=dimensions, options=options)
- # Perform optimization
- cost, pos = optimizer.optimize(f(x, n, x_train_ho, samples), iters=1000)
- print((predict(pos) == x_train_ho).mean())
- pass
- """
    autoencoder = Autoencoder(n, -8)
    autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
    autoencoder.compiled = True  # keep train() from re-compiling
    batch_sizes = [10, 20, 30, 40, 100, 200, 300, 400, 500, 1000, 2000, 3000, 4000, 5000]
    epoch_sizes = [1, 10, 20, 50, 100, 200, 500]
    # test_batch_sizes(autoencoder, batch_sizes, x_train_ho, x_test_ho, x_test_array, n)
    test_epoch_sizes(autoencoder, epoch_sizes, x_train_ho, x_test_ho, x_test_array, n)
- """
- autoencoder.fit(x_train_ho, x_train_ho,
- epochs=1,
- shuffle=False,
- validation_data=(x_test_ho, x_test_ho))
- encoded_data = autoencoder.encoder(x_test_ho)
- decoded_data = autoencoder.decoder(encoded_data).numpy()
- result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
- print("Accuracy: %.4f" % accuracy_score(x_test_array, result.reshape(-1, )))
- view_encoder(autoencoder.encoder, n)
- """