"""Autoencoder-based modulator/demodulator pair built on Keras.

The module defines an :class:`Autoencoder` model whose encoder maps one-hot
symbols to a low-dimensional signal, passes the signal through a channel
object, and whose decoder maps the received signal back to symbol
probabilities.  Thin adapter classes expose the trained halves through the
project's ``defs.Modulator`` / ``defs.Demodulator`` interfaces.
"""
import os
from functools import partial

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model
from tensorflow.python.keras.layers import LeakyReLU, ReLU

import misc
import defs
from models import basic

latent_dim = 64

print("# GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))


def _one_hot(indices, width):
    """Return a float ``(len(indices), width)`` matrix with a 1 at each index."""
    count = indices.shape[0]
    matrix = np.zeros((count, width))
    matrix[np.arange(count), indices] = 1
    return matrix


class AutoencoderMod(defs.Modulator):
    """Modulator adapter that encodes bits with an autoencoder's encoder."""

    def __init__(self, autoencoder):
        """Wrap ``autoencoder``; the alphabet size passed up is ``2 ** autoencoder.N``."""
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder

    def forward(self, binary: np.ndarray) -> defs.Signal:
        """Encode a flat bit array into a ``defs.Signal``.

        The bits are grouped into rows of ``self.N`` (presumably set by
        ``defs.Modulator.__init__`` -- TODO confirm), one-hot encoded, run
        through the encoder, rescaled with ``x * 2 - 1`` and converted with
        ``misc.rect2polar`` after appending an all-zero third column.
        """
        symbols = binary.reshape((-1, self.N))
        symbols_ho = misc.bit_matrix2one_hot(symbols)
        encoded = self.autoencoder.encoder(symbols_ho).numpy()
        scaled = encoded * 2 - 1
        # Third column is filled with zeros (looks like a frequency slot --
        # NOTE(review): confirm against misc.rect2polar).
        zeros = np.zeros(scaled.shape[0])
        polar = misc.rect2polar(np.c_[scaled[:, 0], scaled[:, 1], zeros])
        return defs.Signal(polar)


class AutoencoderDemod(defs.Demodulator):
    """Demodulator adapter that decodes signals with an autoencoder's decoder."""

    def __init__(self, autoencoder):
        """Wrap ``autoencoder``; the alphabet size passed up is ``2 ** autoencoder.N``."""
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder

    def forward(self, values: defs.Signal) -> np.ndarray:
        """Decode ``values.rect`` and return the arg-max symbols as a flat bit array."""
        decoded = self.autoencoder.decoder(values.rect).numpy()
        bits = misc.int2bit_array(decoded.argmax(axis=1), self.N)
        return bits.reshape(-1, )


class Autoencoder(Model):
    """Encoder -> channel -> decoder model over ``2 ** N`` one-hot symbols.

    Encoder: two ``Dense(2 ** (N + 1))`` layers with ``LeakyReLU`` followed by
    ``Dense(signal_dim, tanh)``.  Decoder: ``Dense(2 ** (N + 1))`` with
    ``LeakyReLU(alpha=1)`` followed by ``Dense(2 ** N, softmax)``.
    """

    def __init__(self, N, channel, signal_dim=2):
        """Build the encoder/decoder stacks and attach the channel.

        Args:
            N: bits per symbol; the one-hot width is ``2 ** N``.
            channel: an ``int``/``float`` (wrapped in ``basic.AWGNChannel``)
                or an object exposing a callable ``forward_tensor``.
            signal_dim: width of the encoder output / decoder input.

        Raises:
            ValueError: if a non-numeric ``channel`` has no callable
                ``forward_tensor`` attribute.
        """
        super(Autoencoder, self).__init__()
        self.N = N

        self.encoder = tf.keras.Sequential()
        self.encoder.add(tf.keras.Input(shape=(2 ** N,), dtype=bool))
        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
        self.encoder.add(LeakyReLU(alpha=0.001))
        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
        self.encoder.add(LeakyReLU(alpha=0.001))
        self.encoder.add(layers.Dense(units=signal_dim, activation="tanh"))

        self.decoder = tf.keras.Sequential()
        self.decoder.add(tf.keras.Input(shape=(signal_dim,)))
        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
        # leaky relu with alpha=1 gives by far best results
        self.decoder.add(LeakyReLU(alpha=1))
        self.decoder.add(layers.Dense(units=2 ** N, activation="softmax"))

        # Adapter instances are created lazily by get_modulator/get_demodulator.
        self.mod = None
        self.demod = None
        self.compiled = False

        if isinstance(channel, (int, float)):
            self.channel = basic.AWGNChannel(channel)
        else:
            if not hasattr(channel, 'forward_tensor'):
                raise ValueError("Channel has no forward_tensor function")
            if not callable(channel.forward_tensor):
                raise ValueError("Channel.forward_tensor is not callable")
            self.channel = channel

    def call(self, x, **kwargs):
        """Encode ``x``, rescale with ``* 2 - 1``, apply the channel, decode."""
        signal = self.encoder(x)
        signal = signal * 2 - 1
        signal = self.channel.forward_tensor(signal)
        return self.decoder(signal)

    def fit_encoder(self, modulation, sample_size, train_size=0.8, epochs=1, batch_size=1, shuffle=False):
        """Pre-train the encoder to reproduce a known modulation alphabet.

        Random symbol indices are drawn, one-hot encoded, and the encoder is
        fitted (MSE loss, Adam) to map them to the matching rows of
        ``basic.load_alphabet(modulation, polar=False)``.

        Args:
            modulation: alphabet name passed to ``basic.load_alphabet``.
            sample_size: total number of samples; split by ``train_size``.
            train_size: fraction of ``sample_size`` used for training.
            epochs, batch_size, shuffle: forwarded to ``encoder.fit``.

        Raises:
            Exception: if the alphabet size differs from ``2 ** N``.
        """
        # The encoder input width is 2 ** N, so that is the cardinality the
        # alphabet must match (N ** 2 only coincides with it for N in {2, 4}).
        cardinality = 2 ** self.N
        alphabet = basic.load_alphabet(modulation, polar=False)
        if alphabet.shape[0] != cardinality:
            raise Exception("Cardinality of modulation scheme is different from cardinality of autoencoder!")

        train_count = int(sample_size * train_size)
        test_count = int(sample_size * (1 - train_size))

        x_train = np.random.randint(cardinality, size=train_count)
        y_train = alphabet[x_train]
        x_train_ho = _one_hot(x_train, cardinality)

        x_test = np.random.randint(cardinality, size=test_count)
        y_test = alphabet[x_test]
        x_test_ho = _one_hot(x_test, cardinality)

        self.encoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
        self.encoder.fit(x_train_ho, y_train,
                         epochs=epochs,
                         batch_size=batch_size,
                         shuffle=shuffle,
                         validation_data=(x_test_ho, y_test))

    def fit_decoder(self, modulation, samples):
        """Pre-train the decoder to imitate ``basic.AlphabetDemod`` on random points.

        Uniform random points in ``[-1, 1)^2`` are labelled by the reference
        demodulator, one-hot encoded, split with ``train_test_split`` and used
        to fit the decoder (MSE loss, Adam).  The accuracy on the held-out
        split is printed.

        Args:
            modulation: alphabet name passed to ``basic.AlphabetDemod``.
            samples: base sample count; ``int(samples * 1.3)`` points are drawn.
        """
        samples = int(samples * 1.3)
        demod = basic.AlphabetDemod(modulation, 0)
        x = np.random.rand(samples, 2) * 2 - 1
        zeros = np.zeros(x.shape[0])
        xf = np.c_[x[:, 0], x[:, 1], zeros]
        y = demod.forward(defs.Signal(misc.rect2polar(xf)))
        # Group the demodulated bits per symbol (N bits each) before one-hot encoding.
        y_ho = misc.bit_matrix2one_hot(y.reshape((-1, self.N)))

        X_train, X_test, y_train, y_test = train_test_split(x, y_ho)
        self.decoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
        self.decoder.fit(X_train, y_train, shuffle=False, validation_data=(X_test, y_test))

        y_pred = self.decoder(X_test).numpy()
        # Convert soft-max outputs into hard one-hot predictions.
        y_pred2 = np.zeros(y_test.shape, dtype=bool)
        y_pred2[np.arange(y_pred2.shape[0]), np.argmax(y_pred, axis=1)] = True

        print("Decoder accuracy: %.4f" % accuracy_score(y_pred2, y_test))

    def train(self, samples=1e6):
        """Train the full encoder-channel-decoder chain on random bits.

        ``samples`` (and the 30% validation count derived from it) is rounded
        up to a multiple of ``N`` so the bits reshape into whole symbols.  The
        model is compiled (MSE loss, Adam) the first time this is called.
        """
        if samples % self.N:
            samples += self.N - (samples % self.N)
        x_train = misc.generate_random_bit_array(samples).reshape((-1, self.N))
        x_train_ho = misc.bit_matrix2one_hot(x_train)

        test_samples = samples * 0.3
        if test_samples % self.N:
            test_samples += self.N - (test_samples % self.N)
        x_test_array = misc.generate_random_bit_array(test_samples)
        x_test = x_test_array.reshape((-1, self.N))
        x_test_ho = misc.bit_matrix2one_hot(x_test)

        if not self.compiled:
            self.compile(optimizer='adam', loss=losses.MeanSquaredError())
            self.compiled = True

        self.fit(x_train_ho, x_train_ho, shuffle=False, validation_data=(x_test_ho, x_test_ho))

    def get_modulator(self):
        """Return the cached :class:`AutoencoderMod`, creating it on first use."""
        if self.mod is None:
            self.mod = AutoencoderMod(self)
        return self.mod

    def get_demodulator(self):
        """Return the cached :class:`AutoencoderDemod`, creating it on first use."""
        if self.demod is None:
            self.demod = AutoencoderDemod(self)
        return self.demod


def view_encoder(encoder, N, samples=1000):
    """Scatter-plot the encoder output for random symbols, one series per symbol.

    Args:
        encoder: callable model mapping one-hot rows to 2-D points.
        N: bits per symbol.
        samples: number of random bits to generate (reshaped into rows of ``N``).
    """
    test_values = misc.generate_random_bit_array(samples).reshape((-1, N))
    test_values_ho = misc.bit_matrix2one_hot(test_values)
    # Weight bit i by 2 ** i to get an integer symbol index per row.
    mvector = np.array([2 ** i for i in range(N)], dtype=int)
    symbols = (test_values * mvector).sum(axis=1)
    encoded = encoder(test_values_ho).numpy()
    for i in range(2 ** N):
        xy = encoded[symbols == i]
        plt.plot(xy[:, 0], xy[:, 1], 'x', markersize=12, label=format(i, f'0{N}b'))
        plt.annotate(xy=[xy[:, 0].mean() + 0.01, xy[:, 1].mean() + 0.01], text=format(i, f'0{N}b'))
    plt.xlabel('Real')
    plt.ylabel('Imaginary')
    plt.title("Autoencoder generated alphabet")
    plt.show()


if __name__ == '__main__':
    n = 4

    autoencoder = Autoencoder(n, -8)
    autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())

    # Pre-train both halves against 16-QAM, then train the whole chain.
    autoencoder.fit_encoder(modulation='16qam',
                            sample_size=2e6,
                            train_size=0.8,
                            epochs=1,
                            batch_size=256,
                            shuffle=True)

    view_encoder(autoencoder.encoder, n)
    autoencoder.fit_decoder(modulation='16qam', samples=2e6)
    autoencoder.train()
    view_encoder(autoencoder.encoder, n)