@@ -3,11 +3,16 @@ import numpy as np
 import tensorflow as tf
 from sklearn.metrics import accuracy_score
+from sklearn.model_selection import train_test_split
 from tensorflow.keras import layers, losses
 from tensorflow.keras.models import Model
+from tensorflow.keras.layers import LeakyReLU, ReLU
+
 from functools import partial
 import misc
 import defs
+from models import basic
+import os

 latent_dim = 64
@@ -16,10 +21,10 @@ print("# GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
 class AutoencoderMod(defs.Modulator):
     def __init__(self, autoencoder):
-        super().__init__(2**autoencoder.N)
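+        # one modulation symbol per N-bit word, i.e. 2 ** N constellation points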
+        super().__init__(2 ** autoencoder.N)
         self.autoencoder = autoencoder

-    def forward(self, binary: np.ndarray) -> np.ndarray:
+    def forward(self, binary: np.ndarray):
         reshaped = binary.reshape((-1, self.N))
         reshaped_ho = misc.bit_matrix2one_hot(reshaped)
         encoded = self.autoencoder.encoder(reshaped_ho)
@@ -28,38 +33,39 @@ class AutoencoderMod(defs.Modulator):
         f = np.zeros(x2.shape[0])
         x3 = misc.rect2polar(np.c_[x2[:, 0], x2[:, 1], f])
-        return x3
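+        # return the polar samples wrapped in an RFSignal instead of a bare array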
+        return basic.RFSignal(x3)


 class AutoencoderDemod(defs.Demodulator):
     def __init__(self, autoencoder):
-        super().__init__(2**autoencoder.N)
+        super().__init__(2 ** autoencoder.N)
         self.autoencoder = autoencoder

-    def forward(self, values: np.ndarray) -> np.ndarray:
-        rect = misc.polar2rect(values[:, [0, 1]])
-        decoded = self.autoencoder.decoder(rect).numpy()
+    def forward(self, values: defs.Signal) -> np.ndarray:
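+        # Signal.rect exposes the rectangular I/Q form directly, so no explicit polar2rect step is needed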
+        decoded = self.autoencoder.decoder(values.rect).numpy()
         result = misc.int2bit_array(decoded.argmax(axis=1), self.N)
         return result.reshape(-1, )


 class Autoencoder(Model):
-    def __init__(self, N, noise):
+    def __init__(self, N, channel, signal_dim=2):
         super(Autoencoder, self).__init__()
         self.N = N
         self.encoder = tf.keras.Sequential()
         self.encoder.add(tf.keras.Input(shape=(2 ** N,), dtype=bool))
         self.encoder.add(layers.Dense(units=2 ** (N + 1)))
+        self.encoder.add(LeakyReLU(alpha=0.001))
         # self.encoder.add(layers.Dropout(0.2))
         self.encoder.add(layers.Dense(units=2 ** (N + 1)))
-        self.encoder.add(layers.Dense(units=2, activation="sigmoid"))
+        self.encoder.add(LeakyReLU(alpha=0.001))
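+        # tanh bounds each output dimension to (-1, 1), replacing the old sigmoid-then-rescale scheme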
+        self.encoder.add(layers.Dense(units=signal_dim, activation="tanh"))
         # self.encoder.add(layers.ReLU(max_value=1.0))

         self.decoder = tf.keras.Sequential()
-        self.decoder.add(tf.keras.Input(shape=(2,)))
-        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
-        # self.decoder.add(layers.Dropout(0.2))
+        self.decoder.add(tf.keras.Input(shape=(signal_dim,)))
         self.decoder.add(layers.Dense(units=2 ** (N + 1)))
+        # leaky relu with alpha=1 gives by far the best results
+        self.decoder.add(LeakyReLU(alpha=1))
         self.decoder.add(layers.Dense(units=2 ** N, activation="softmax"))

         # self.randomiser = tf.random_normal_initializer(mean=0.0, stddev=0.1, seed=None)
@@ -68,8 +74,14 @@ class Autoencoder(Model):
         self.demod = None
         self.compiled = False

-        # Divide by 2 because encoder outputs values between 0 and 1 instead of -1 and 1
-        self.noise = 10 ** (noise / 10)  # / 2
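+        # a bare number is treated as an AWGN noise level in dB (cf. the old 10 ** (noise / 10) conversion);
+        # anything else must provide a forward_tensor(signal) method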
+        if isinstance(channel, (int, float)):
+            self.channel = basic.AWGNChannel(channel)
+        else:
+            if not hasattr(channel, 'forward_tensor'):
+                raise ValueError("Channel has no forward_tensor function")
+            if not callable(channel.forward_tensor):
+                raise ValueError("Channel.forward_tensor is not callable")
+            self.channel = channel

         # self.decoder.add(layers.Softmax(units=4, dtype=bool))

@@ -85,15 +97,61 @@ class Autoencoder(Model):
         # ])
     def call(self, x, **kwargs):
-        encoded = self.encoder(x)
-        encoded = encoded * 2 - 1
+        signal = self.encoder(x)
+        # signal = signal * 2 - 1  # leftover rescale from the old sigmoid encoder; tanh already outputs [-1, 1]
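+        # pass the symbols through the channel model so training sees its impairments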
+        signal = self.channel.forward_tensor(signal)
+        # encoded = encoded * 2 - 1
         # encoded = tf.clip_by_value(encoded, clip_value_min=0, clip_value_max=1, name=None)
         # noise = self.randomiser(shape=(-1, 2), dtype=tf.float32)
-        noise = np.random.normal(0, 1, (1, 2)) * self.noise
-        noisy = tf.convert_to_tensor(noise, dtype=tf.float32)
-        decoded = self.decoder(encoded + noisy)
+        # noise = np.random.normal(0, 1, (1, 2)) * self.noise
+        # noisy = tf.convert_to_tensor(noise, dtype=tf.float32)
+        decoded = self.decoder(signal)
         return decoded

+    def fit_encoder(self, modulation, sample_size, train_size=0.8, epochs=1, batch_size=1, shuffle=False):
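+        # pretrain the encoder alone to reproduce a classical constellation (e.g. 16-QAM)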
+        alphabet = basic.load_alphabet(modulation, polar=False)
+
+        if alphabet.shape[0] != 2 ** self.N:
+            raise ValueError("Cardinality of the modulation scheme differs from the cardinality of the autoencoder!")
+
+        x_train = np.random.randint(2 ** self.N, size=int(sample_size * train_size))
+        y_train = alphabet[x_train]
+        x_train_ho = np.zeros((int(sample_size * train_size), 2 ** self.N))
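+        # one-hot encode the symbol indices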
+        for idx, x in np.ndenumerate(x_train):
+            x_train_ho[idx, x] = 1
+
+        x_test = np.random.randint(2 ** self.N, size=int(sample_size * (1 - train_size)))
+        y_test = alphabet[x_test]
+        x_test_ho = np.zeros((int(sample_size * (1 - train_size)), 2 ** self.N))
+        for idx, x in np.ndenumerate(x_test):
+            x_test_ho[idx, x] = 1
+
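+        # train the encoder as a regression onto the target constellation coordinates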
+        self.encoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
+        self.encoder.fit(x_train_ho, y_train,
+                         epochs=epochs,
+                         batch_size=batch_size,
+                         shuffle=shuffle,
+                         validation_data=(x_test_ho, y_test))
+
+    def fit_decoder(self, modulation, samples):
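+        # pretrain the decoder on hard decisions from a classical alphabet demodulator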
+        samples = int(samples * 1.3)
+        demod = basic.AlphabetDemod(modulation, 0)
+        x = np.random.rand(samples, 2) * 2 - 1
+        x = x.reshape((-1, 2))
+        f = np.zeros(x.shape[0])
+        xf = np.c_[x[:, 0], x[:, 1], f]
+        y = demod.forward(basic.RFSignal(misc.rect2polar(xf)))
+        y_ho = misc.bit_matrix2one_hot(y.reshape((-1, self.N)))
+
+        X_train, X_test, y_train, y_test = train_test_split(x, y_ho)
+        self.decoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
+        self.decoder.fit(X_train, y_train, shuffle=False, validation_data=(X_test, y_test))
+        y_pred = self.decoder(X_test).numpy()
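+        # turn the soft softmax outputs into one-hot hard decisions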
+        y_pred2 = np.zeros(y_test.shape, dtype=bool)
+        y_pred2[np.arange(y_pred2.shape[0]), np.argmax(y_pred, axis=1)] = True
+
+        print("Decoder accuracy: %.4f" % accuracy_score(y_test, y_pred2))
+
     def train(self, samples=1e6):
         if samples % self.N:
             samples += self.N - (samples % self.N)
@@ -157,26 +215,44 @@ if __name__ == '__main__':

     n = 4

-    samples = 1e6
-    x_train = misc.generate_random_bit_array(samples).reshape((-1, n))
-    x_train_ho = misc.bit_matrix2one_hot(x_train)
-    x_test_array = misc.generate_random_bit_array(samples * 0.3)
-    x_test = x_test_array.reshape((-1, n))
-    x_test_ho = misc.bit_matrix2one_hot(x_test)
+    # samples = 1e6
+    # x_train = misc.generate_random_bit_array(samples).reshape((-1, n))
+    # x_train_ho = misc.bit_matrix2one_hot(x_train)
+    # x_test_array = misc.generate_random_bit_array(samples * 0.3)
+    # x_test = x_test_array.reshape((-1, n))
+    # x_test_ho = misc.bit_matrix2one_hot(x_test)

     autoencoder = Autoencoder(n, -8)
     autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())

-    autoencoder.fit(x_train_ho, x_train_ho,
-                    epochs=1,
-                    shuffle=False,
-                    validation_data=(x_test_ho, x_test_ho))
+    autoencoder.fit_encoder(modulation='16qam',
+                            sample_size=2e6,
+                            train_size=0.8,
+                            epochs=1,
+                            batch_size=256,
+                            shuffle=True)

-    encoded_data = autoencoder.encoder(x_test_ho)
-    decoded_data = autoencoder.decoder(encoded_data).numpy()
-
-    result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
-    print("Accuracy: %.4f" % accuracy_score(x_test_array, result.reshape(-1, )))
     view_encoder(autoencoder.encoder, n)
+    autoencoder.fit_decoder(modulation='16qam', samples=2e6)
+    autoencoder.train()
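+    # joint end-to-end training after the two pretraining passes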
+    view_encoder(autoencoder.encoder, n)
+
+    # view_encoder(autoencoder.encoder, n)
+    # view_encoder(autoencoder.encoder, n)
+
+
+    # autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
+    #
+    # autoencoder.fit(x_train_ho, x_train_ho,
+    #                 epochs=1,
+    #                 shuffle=False,
+    #                 validation_data=(x_test_ho, x_test_ho))
+    #
+    # encoded_data = autoencoder.encoder(x_test_ho)
+    # decoded_data = autoencoder.decoder(encoded_data).numpy()
+    #
+    # result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
+    # print("Accuracy: %.4f" % accuracy_score(x_test_array, result.reshape(-1, )))
+    # view_encoder(autoencoder.encoder, n)

     pass