import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.models import Model

import misc
import defs
from models import basic
from models.data import BinaryOneHotGenerator

latent_dim = 64

print("# GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))


class AutoencoderMod(defs.Modulator):
    """Modulator that maps groups of bits to constellation points via the trained encoder."""

    def __init__(self, autoencoder):
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder

    def forward(self, binary: np.ndarray):
        # Group the bit stream into symbols of N * parallel bits and one-hot encode them.
        reshaped = binary.reshape((-1, self.N * self.autoencoder.parallel))
        reshaped_ho = misc.bit_matrix2one_hot(reshaped)
        encoded = self.autoencoder.encoder(reshaped_ho)
        x = encoded.numpy()
        if self.autoencoder.bipolar:
            # Rescale the sigmoid output from [0, 1] to [-1, 1].
            x = x * 2 - 1
        if self.autoencoder.parallel > 1:
            x = x.reshape((-1, self.autoencoder.signal_dim))
        # Assemble rectangular (x, y, f) columns; a 1-D constellation gets a
        # zero quadrature component.
        f = np.zeros(x.shape[0])
        if self.autoencoder.signal_dim <= 1:
            p = np.zeros(x.shape[0])
        else:
            p = x[:, 1]
        x3 = misc.rect2polar(np.c_[x[:, 0], p, f])
        return basic.RFSignal(x3)


class AutoencoderDemod(defs.Demodulator):
    """Demodulator that recovers bit groups from received samples via the trained decoder."""

    def __init__(self, autoencoder):
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder

    def forward(self, values: defs.Signal) -> np.ndarray:
        if self.autoencoder.signal_dim <= 1:
            val = values.rect_x
        else:
            val = values.rect
        if self.autoencoder.parallel > 1:
            val = val.reshape((-1, self.autoencoder.parallel))
        decoded = self.autoencoder.decoder(val).numpy()
        # The decoder outputs one probability per symbol; take the argmax and
        # convert the winning symbol index back into bits.
        result = misc.int2bit_array(decoded.argmax(axis=1), self.N * self.autoencoder.parallel)
        return result.reshape(-1)
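

# Note on the symbol convention assumed by the wrappers above: following the
# LSB-first weighting used in view_encoder below, misc.bit_matrix2one_hot is
# expected to map a bit row such as [1, 0, 0, 0] (N = 4) to symbol index 1,
# i.e. a one-hot row whose single 1 sits at position 1, and misc.int2bit_array
# to invert that mapping. If the helpers use a different bit order, the
# examples in this file shift accordingly.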


class Autoencoder(Model):
    """End-to-end autoencoder: one-hot symbol -> encoder -> channel -> decoder -> symbol probabilities."""

    def __init__(self, N, channel, signal_dim=2, parallel=1, all_onehot=True, bipolar=True):
        super().__init__()
        self.N = N
        self.parallel = parallel
        self.signal_dim = signal_dim
        self.bipolar = bipolar
        self._input_shape = 2 ** (N * parallel) if all_onehot else (2 ** N) * parallel

        self.encoder = tf.keras.Sequential()
        self.encoder.add(layers.Input(shape=(self._input_shape,)))
        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
        self.encoder.add(LeakyReLU(alpha=0.001))
        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
        self.encoder.add(LeakyReLU(alpha=0.001))
        # The sigmoid keeps every signal component in [0, 1]; call() rescales
        # to [-1, 1] when bipolar is set.
        self.encoder.add(layers.Dense(units=signal_dim * parallel, activation="sigmoid"))

        self.decoder = tf.keras.Sequential()
        self.decoder.add(tf.keras.Input(shape=(signal_dim * parallel,)))
        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
        # LeakyReLU with alpha=1, i.e. an identity activation, gave by far the
        # best results here.
        self.decoder.add(LeakyReLU(alpha=1))
        self.decoder.add(layers.Dense(units=self._input_shape, activation="softmax"))

        self.mod = None
        self.demod = None
        self.compiled = False
        # A numeric channel argument is treated as the noise parameter of a
        # basic.AWGNChannel; anything else must expose a callable forward_tensor().
        if isinstance(channel, (int, float)):
            self.channel = basic.AWGNChannel(channel)
        else:
            if not hasattr(channel, 'forward_tensor'):
                raise ValueError("Channel has no forward_tensor function")
            if not callable(channel.forward_tensor):
                raise ValueError("Channel.forward_tensor is not callable")
            self.channel = channel
    @property
    def all_layers(self):
        """Flat list of all encoder and decoder layers."""
        return self.encoder.layers + self.decoder.layers

    def call(self, x, **kwargs):
        signal = self.encoder(x)
        if self.bipolar:
            # Rescale the sigmoid output from [0, 1] to [-1, 1].
            signal = signal * 2 - 1
        else:
            signal = tf.clip_by_value(signal, 0, 1)
        # The channel sits inside the graph, so gradients flow through it and
        # the encoder is trained end to end against the channel impairments.
        signal = self.channel.forward_tensor(signal)
        decoded = self.decoder(signal)
        return decoded

    def fit_encoder(self, modulation, sample_size, train_size=0.8, epochs=1, batch_size=1, shuffle=False):
        """Pre-train the encoder to imitate a classical modulation alphabet (e.g. 16-QAM)."""
        alphabet = basic.load_alphabet(modulation, polar=False)
        cardinality = 2 ** self.N  # number of symbols; was N ** 2, which only matches for N = 4
        if alphabet.shape[0] != cardinality:
            raise ValueError("Cardinality of modulation scheme differs from cardinality of autoencoder!")
        x_train = np.random.randint(cardinality, size=int(sample_size * train_size))
        y_train = alphabet[x_train]
        x_train_ho = np.zeros((int(sample_size * train_size), cardinality))
        x_train_ho[np.arange(x_train.size), x_train] = 1
        x_test = np.random.randint(cardinality, size=int(sample_size * (1 - train_size)))
        y_test = alphabet[x_test]
        x_test_ho = np.zeros((int(sample_size * (1 - train_size)), cardinality))
        x_test_ho[np.arange(x_test.size), x_test] = 1
        self.encoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
        self.encoder.fit(x_train_ho, y_train,
                         epochs=epochs,
                         batch_size=batch_size,
                         shuffle=shuffle,
                         validation_data=(x_test_ho, y_test))

    def fit_decoder(self, modulation, samples):
        """Pre-train the decoder against a classical demodulator on random channel samples."""
        samples = int(samples * 1.3)
        demod = basic.AlphabetDemod(modulation, 0)
        # Uniform random points in [-1, 1]^2 act as synthetic received samples.
        x = np.random.rand(samples, 2) * 2 - 1
        f = np.zeros(x.shape[0])
        xf = np.c_[x[:, 0], x[:, 1], f]
        y = demod.forward(basic.RFSignal(misc.rect2polar(xf)))
        y_ho = misc.bit_matrix2one_hot(y.reshape((-1, self.N)))  # was hard-coded to 4 bits
        X_train, X_test, y_train, y_test = train_test_split(x, y_ho)
        self.decoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
        self.decoder.fit(X_train, y_train, shuffle=False, validation_data=(X_test, y_test))
        y_pred = self.decoder(X_test).numpy()
        y_pred2 = np.zeros(y_test.shape, dtype=bool)
        y_pred2[np.arange(y_pred2.shape[0]), np.argmax(y_pred, axis=1)] = True
        print("Decoder accuracy: %.4f" % accuracy_score(y_test, y_pred2))

    def train(self, epoch_size=3e3, epochs=5):
        """Train encoder and decoder jointly on randomly generated one-hot symbols."""
        m = self.N * self.parallel
        # Cast sizes to int so float defaults such as 3e3 are accepted.
        x_train = BinaryOneHotGenerator(size=int(epoch_size), shape=m)
        x_test = BinaryOneHotGenerator(size=int(epoch_size * 0.3), shape=m)
        if not self.compiled:
            self.compile(optimizer='adam', loss=losses.MeanSquaredError())
            self.compiled = True
        self.fit(x_train, shuffle=False, validation_data=x_test, epochs=epochs)

    def get_modulator(self):
        if self.mod is None:
            self.mod = AutoencoderMod(self)
        return self.mod

    def get_demodulator(self):
        if self.demod is None:
            self.demod = AutoencoderDemod(self)
        return self.demod
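

# Minimal sketch of the channel interface Autoencoder accepts, inferred from
# the checks in __init__ and the use in call(); this is an illustration, not
# the project's actual basic.AWGNChannel. Any object whose forward_tensor()
# maps a batch of signal tensors to an equally shaped batch using
# differentiable TensorFlow ops can be passed as `channel`.
class GaussianChannelSketch:
    def __init__(self, stddev=0.1):
        self.stddev = stddev

    def forward_tensor(self, signal):
        # Additive Gaussian noise built from TensorFlow ops, so gradients flow
        # through the channel during end-to-end training.
        return signal + tf.random.normal(tf.shape(signal), stddev=self.stddev)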


def view_encoder(encoder, N, samples=1000, title="Autoencoder generated alphabet"):
    """Scatter-plot the constellation the encoder has learned, labelled by bit pattern."""
    test_values = misc.generate_random_bit_array(samples).reshape((-1, N))
    test_values_ho = misc.bit_matrix2one_hot(test_values)
    # Interpret each row of bits as a symbol index (LSB first).
    mvector = np.array([2 ** i for i in range(N)], dtype=int)
    symbols = (test_values * mvector).sum(axis=1)
    encoded = encoder(test_values_ho).numpy()
    if encoded.shape[1] == 1:
        # Pad 1-D constellations with a zero imaginary part for plotting.
        encoded = np.c_[encoded, np.zeros(encoded.shape[0])]
    for i in range(2 ** N):
        xy = encoded[symbols == i]
        plt.plot(xy[:, 0], xy[:, 1], 'x', markersize=12, label=format(i, f'0{N}b'))
        plt.annotate(xy=[xy[:, 0].mean() + 0.01, xy[:, 1].mean() + 0.01], text=format(i, f'0{N}b'))
    plt.xlabel('Real')
    plt.ylabel('Imaginary')
    plt.title(title)
    plt.show()
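

# Hedged sketch of an end-to-end evaluation of the trained model. The helper
# misc.generate_random_bit_array and a channel object with a forward() method
# are assumptions based on their use elsewhere in the project; the real
# evaluation pipeline may differ.
def estimate_ber_sketch(autoencoder, channel, n_bits=100_000):
    """Estimate the bit error rate of a modulate/channel/demodulate round trip."""
    mod = autoencoder.get_modulator()
    demod = autoencoder.get_demodulator()
    # n_bits should be a multiple of N * parallel so the bit stream reshapes
    # cleanly into symbols.
    bits = misc.generate_random_bit_array(n_bits)
    received = channel.forward(mod.forward(bits))  # assumed channel API
    recovered = demod.forward(received)
    return float(np.mean(bits != recovered))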


if __name__ == '__main__':
    n = 4
    autoencoder = Autoencoder(n, -15)  # numeric channel argument -> basic.AWGNChannel
    # Alternatively, pre-train the halves against a classical 16-QAM scheme:
    # autoencoder.fit_encoder(modulation='16qam', sample_size=2e6, train_size=0.8,
    #                         epochs=1, batch_size=256, shuffle=True)
    # autoencoder.fit_decoder(modulation='16qam', samples=2e6)
    autoencoder.train()  # compiles on first call, then trains end to end
    view_encoder(autoencoder.encoder, n)