@@ -8,11 +8,13 @@ from tensorflow.keras import layers, losses
 from tensorflow.keras.models import Model
 from tensorflow.python.keras.layers import LeakyReLU, ReLU
-from functools import partial
+# from functools import partial
 import misc
 import defs
 from models import basic
 import os
+# from tensorflow_model_optimization.python.core.quantization.keras import quantize, quantize_aware_activation
+from models.data import BinaryOneHotGenerator
 
 
 latent_dim = 64
 
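The net effect of this hunk is that training data now comes from models.data.BinaryOneHotGenerator instead of pre-built arrays (see the reworked train() below). The generator itself is not part of this diff; a minimal sketch of what a Keras Sequence-style implementation could look like, inferred only from the size=/shape= call sites in train(), is:

import numpy as np
import tensorflow as tf

class BinaryOneHotGenerator(tf.keras.utils.Sequence):
    # Hypothetical reconstruction: yields (x, x) batches of one-hot rows,
    # which is what an autoencoder trained with MSE needs.
    def __init__(self, size, shape, batch_size=256):
        self.batches = max(1, int(np.ceil(size / batch_size)))
        self.shape = int(shape)      # bits per input word
        self.batch_size = batch_size

    def __len__(self):
        return self.batches

    def __getitem__(self, idx):
        words = np.random.randint(0, 2 ** self.shape, self.batch_size)
        x = np.eye(2 ** self.shape, dtype=np.float32)[words]
        return x, x                  # input doubles as target
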
@@ -25,14 +27,22 @@ class AutoencoderMod(defs.Modulator):
         self.autoencoder = autoencoder
 
     def forward(self, binary: np.ndarray):
-        reshaped = binary.reshape((-1, self.N))
+        reshaped = binary.reshape((-1, self.N * self.autoencoder.parallel))
         reshaped_ho = misc.bit_matrix2one_hot(reshaped)
         encoded = self.autoencoder.encoder(reshaped_ho)
         x = encoded.numpy()
-        x2 = x * 2 - 1
+        if self.autoencoder.bipolar:
+            x = x * 2 - 1
 
-        f = np.zeros(x2.shape[0])
-        x3 = misc.rect2polar(np.c_[x2[:, 0], x2[:, 1], f])
+        if self.autoencoder.parallel > 1:
+            x = x.reshape((-1, self.autoencoder.signal_dim))
+
+        f = np.zeros(x.shape[0])
+        if self.autoencoder.signal_dim <= 1:
+            p = np.zeros(x.shape[0])
+        else:
+            p = x[:, 1]
+        x3 = misc.rect2polar(np.c_[x[:, 0], p, f])
         return basic.RFSignal(x3)
 
 
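Note on the new forward() path: for parallel words the encoder output is flattened back to signal_dim columns, and 1-D constellations get a zero quadrature column before the polar conversion. A toy NumPy walk-through of the shape bookkeeping (the misc helpers are stubbed here as assumptions; their implementations are not in this diff):

import numpy as np

N, parallel, signal_dim = 2, 1, 2
bits = np.random.randint(0, 2, 12)                # incoming bit stream
words = bits.reshape((-1, N * parallel))          # six words of N * parallel bits

# stand-in for misc.bit_matrix2one_hot: LSB-first word index -> one-hot row
idx = (words * (2 ** np.arange(N * parallel))).sum(axis=1)
one_hot = np.eye(2 ** (N * parallel))[idx]

x = np.random.rand(one_hot.shape[0], signal_dim)  # stand-in for encoder output in [0, 1]
x = x * 2 - 1                                     # bipolar=True maps sigmoid range to [-1, 1]

f = np.zeros(x.shape[0])                          # zero frequency column for rect2polar
p = x[:, 1] if signal_dim > 1 else np.zeros(x.shape[0])
print(np.c_[x[:, 0], p, f].shape)                 # (6, 3): (I, Q, f) rows
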
@@ -42,31 +52,44 @@ class AutoencoderDemod(defs.Demodulator):
         self.autoencoder = autoencoder
 
     def forward(self, values: defs.Signal) -> np.ndarray:
-        decoded = self.autoencoder.decoder(values.rect).numpy()
-        result = misc.int2bit_array(decoded.argmax(axis=1), self.N)
+        if self.autoencoder.signal_dim <= 1:
+            val = values.rect_x
+        else:
+            val = values.rect
+        if self.autoencoder.parallel > 1:
+            val = val.reshape((-1, self.autoencoder.parallel))
+        decoded = self.autoencoder.decoder(val).numpy()
+        result = misc.int2bit_array(decoded.argmax(axis=1), self.N * self.autoencoder.parallel)
         return result.reshape(-1, )
 
 
 class Autoencoder(Model):
-    def __init__(self, N, channel, signal_dim=2):
+    def __init__(self, N, channel, signal_dim=2, parallel=1, all_onehot=True, bipolar=True):
         super(Autoencoder, self).__init__()
         self.N = N
+        self.parallel = parallel
+        self.signal_dim = signal_dim
+        self.bipolar = bipolar
+        self._input_shape = 2 ** (N * parallel) if all_onehot else (2 ** N) * parallel
         self.encoder = tf.keras.Sequential()
-        self.encoder.add(tf.keras.Input(shape=(2 ** N,), dtype=bool))
+        self.encoder.add(layers.Input(shape=(self._input_shape,)))
         self.encoder.add(layers.Dense(units=2 ** (N + 1)))
         self.encoder.add(LeakyReLU(alpha=0.001))
         # self.encoder.add(layers.Dropout(0.2))
         self.encoder.add(layers.Dense(units=2 ** (N + 1)))
         self.encoder.add(LeakyReLU(alpha=0.001))
-        self.encoder.add(layers.Dense(units=signal_dim, activation="tanh"))
+        self.encoder.add(layers.Dense(units=signal_dim * parallel, activation="sigmoid"))
         # self.encoder.add(layers.ReLU(max_value=1.0))
+        # self.encoder = quantize.quantize_model(self.encoder)
 
         self.decoder = tf.keras.Sequential()
-        self.decoder.add(tf.keras.Input(shape=(signal_dim,)))
+        self.decoder.add(tf.keras.Input(shape=(signal_dim * parallel,)))
         self.decoder.add(layers.Dense(units=2 ** (N + 1)))
+        # self.decoder.add(LeakyReLU(alpha=0.001))
+        # self.decoder.add(layers.Dense(units=2 ** (N + 1)))
         # leaky relu with alpha=1 gives by far best results
         self.decoder.add(LeakyReLU(alpha=1))
-        self.decoder.add(layers.Dense(units=2 ** N, activation="softmax"))
+        self.decoder.add(layers.Dense(units=self._input_shape, activation="softmax"))
 
         # self.randomiser = tf.random_normal_initializer(mean=0.0, stddev=0.1, seed=None)
 
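The all_onehot flag is the interesting switch in the new constructor: it chooses between one joint one-hot vector covering all parallel words and parallel stacked per-word alphabets. The widths diverge quickly, which a quick computation makes obvious:

# Joint vs. stacked one-hot width for N = 4 (matching the _input_shape expression):
N = 4
for parallel in (1, 2, 3):
    joint = 2 ** (N * parallel)      # all_onehot=True: exponential in parallel
    stacked = (2 ** N) * parallel    # all_onehot=False: linear in parallel
    print(parallel, joint, stacked)  # -> 1: 16 16, 2: 256 32, 3: 4096 48
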
@@ -95,10 +118,16 @@ class Autoencoder(Model):
         # layers.Conv2DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'),
         # layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')
         # ])
+    @property
+    def all_layers(self):
+        return self.layers[0].layers + self.layers[1].layers
 
     def call(self, x, **kwargs):
         signal = self.encoder(x)
-        signal = signal * 2 - 1
+        if self.bipolar:
+            signal = signal * 2 - 1
+        else:
+            signal = tf.clip_by_value(signal, 0, 1)
         signal = self.channel.forward_tensor(signal)
         # encoded = encoded * 2 - 1
         # encoded = tf.clip_by_value(encoded, clip_value_min=0, clip_value_max=1, name=None)
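call() now supports both signalling conventions: bipolar=True maps the sigmoid output affinely onto [-1, 1], while the unipolar branch merely clamps to [0, 1] before the channel. A standalone sanity check of the two mappings:

import tensorflow as tf

s = tf.constant([0.0, 0.25, 0.9, 1.0])       # typical sigmoid outputs
print((s * 2 - 1).numpy())                   # [-1.  -0.5  0.8  1. ]
print(tf.clip_by_value(s, 0, 1).numpy())     # unchanged here; the clip is only a guard
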
@@ -152,24 +181,25 @@ class Autoencoder(Model):
         print("Decoder accuracy: %.4f" % accuracy_score(y_pred2, y_test))
 
-    def train(self, samples=1e6):
-        if samples % self.N:
-            samples += self.N - (samples % self.N)
-        x_train = misc.generate_random_bit_array(samples).reshape((-1, self.N))
-        x_train_ho = misc.bit_matrix2one_hot(x_train)
+    def train(self, epoch_size=3e3, epochs=5):
+        m = self.N * self.parallel
+        x_train = BinaryOneHotGenerator(size=epoch_size, shape=m)
+        x_test = BinaryOneHotGenerator(size=epoch_size * 0.3, shape=m)
 
-        test_samples = samples * 0.3
-        if test_samples % self.N:
-            test_samples += self.N - (test_samples % self.N)
-        x_test_array = misc.generate_random_bit_array(test_samples)
-        x_test = x_test_array.reshape((-1, self.N))
-        x_test_ho = misc.bit_matrix2one_hot(x_test)
+        # test_samples = epoch_size
+        # if test_samples % m:
+        #     test_samples += m - (test_samples % m)
+        # x_test_array = misc.generate_random_bit_array(test_samples)
+        # x_test = x_test_array.reshape((-1, m))
+        # x_test_ho = misc.bit_matrix2one_hot(x_test)
 
         if not self.compiled:
             self.compile(optimizer='adam', loss=losses.MeanSquaredError())
             self.compiled = True
+        # self.build((self._input_shape, -1))
+        # self.summary()
 
-        self.fit(x_train_ho, x_train_ho, shuffle=False, validation_data=(x_test_ho, x_test_ho))
+        self.fit(x_train, shuffle=False, validation_data=x_test, epochs=epochs)
         # encoded_data = self.encoder(x_test_ho)
         # decoded_data = self.decoder(encoded_data).numpy()
 
 
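Since fit() now receives the generators directly, targets come from the (x, y) tuples each batch yields, and validation_data can be a second generator. Assuming the hypothetical BinaryOneHotGenerator sketch from the import section, the call pattern reduces to something like:

# Hypothetical usage mirroring the new train() body; `ae` stands in for an
# Autoencoder instance that has already been compiled.
train_gen = BinaryOneHotGenerator(size=3000, shape=4)
val_gen = BinaryOneHotGenerator(size=900, shape=4)
ae.fit(train_gen, shuffle=False, validation_data=val_gen, epochs=5)
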
@@ -184,12 +214,14 @@ class Autoencoder(Model):
         return self.demod
 
 
-def view_encoder(encoder, N, samples=1000):
+def view_encoder(encoder, N, samples=1000, title="Autoencoder generated alphabet"):
     test_values = misc.generate_random_bit_array(samples).reshape((-1, N))
     test_values_ho = misc.bit_matrix2one_hot(test_values)
     mvector = np.array([2 ** i for i in range(N)], dtype=int)
     symbols = (test_values * mvector).sum(axis=1)
     encoded = encoder(test_values_ho).numpy()
+    if encoded.shape[1] == 1:
+        encoded = np.c_[encoded, np.zeros(encoded.shape[0])]
     # encoded = misc.polar2rect(encoded)
     for i in range(2 ** N):
         xy = encoded[symbols == i]
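The new two-line guard exists because a signal_dim=1 encoder emits a single column while the scatter code below indexes both [:, 0] and [:, 1]; padding a zero imaginary part keeps the plotting path unchanged:

import numpy as np

encoded = np.array([[0.1], [0.9], [0.4]])           # 1-D constellation points
padded = np.c_[encoded, np.zeros(encoded.shape[0])]
print(padded)                                       # [[0.1 0. ] [0.9 0. ] [0.4 0. ]]
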
@@ -197,7 +229,7 @@ def view_encoder(encoder, N, samples=1000):
         plt.annotate(xy=[xy[:, 0].mean() + 0.01, xy[:, 1].mean() + 0.01], text=format(i, f'0{N}b'))
     plt.xlabel('Real')
     plt.ylabel('Imaginary')
-    plt.title("Autoencoder generated alphabet")
+    plt.title(title)
     # plt.legend()
     plt.show()
 
@@ -222,18 +254,18 @@ if __name__ == '__main__':
     # x_test = x_test_array.reshape((-1, n))
     # x_test_ho = misc.bit_matrix2one_hot(x_test)
 
-    autoencoder = Autoencoder(n, -8)
+    autoencoder = Autoencoder(n, -15)
     autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
 
-    autoencoder.fit_encoder(modulation='16qam',
-                            sample_size=2e6,
-                            train_size=0.8,
-                            epochs=1,
-                            batch_size=256,
-                            shuffle=True)
+    # autoencoder.fit_encoder(modulation='16qam',
+    #                         sample_size=2e6,
+    #                         train_size=0.8,
+    #                         epochs=1,
+    #                         batch_size=256,
+    #                         shuffle=True)
 
-    view_encoder(autoencoder.encoder, n)
-    autoencoder.fit_decoder(modulation='16qam', samples=2e6)
+    # view_encoder(autoencoder.encoder, n)
+    # autoencoder.fit_decoder(modulation='16qam', samples=2e6)
     autoencoder.train()
     view_encoder(autoencoder.encoder, n)
 