Train decoder to match some modulation

Min committed 5 years ago
parent
commit 294eb2b46c
1 changed file with 41 additions and 16 deletions

models/autoencoder.py  +41 -16

@@ -3,9 +3,10 @@ import numpy as np
 import tensorflow as tf
 
 from sklearn.metrics import accuracy_score
+from sklearn.model_selection import train_test_split
 from tensorflow.keras import layers, losses
 from tensorflow.keras.models import Model
-from tensorflow.python.keras.layers import LeakyReLU
+from tensorflow.keras.layers import LeakyReLU, ReLU
 
 import misc
 import defs
@@ -19,7 +20,7 @@ print("# GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
 
 class AutoencoderMod(defs.Modulator):
     def __init__(self, autoencoder):
-        super().__init__(2**autoencoder.N)
+        super().__init__(2 ** autoencoder.N)
         self.autoencoder = autoencoder
 
     def forward(self, binary: np.ndarray) -> np.ndarray:
@@ -36,7 +37,7 @@ class AutoencoderMod(defs.Modulator):
 
 class AutoencoderDemod(defs.Demodulator):
     def __init__(self, autoencoder):
-        super().__init__(2**autoencoder.N)
+        super().__init__(2 ** autoencoder.N)
         self.autoencoder = autoencoder
 
     def forward(self, values: np.ndarray) -> np.ndarray:
@@ -45,6 +46,7 @@ class AutoencoderDemod(defs.Demodulator):
         result = misc.int2bit_array(decoded.argmax(axis=1), self.N)
         return result.reshape(-1, )
 
+
 class Autoencoder(Model):
     def __init__(self, N, noise):
         super(Autoencoder, self).__init__()
@@ -62,10 +64,8 @@ class Autoencoder(Model):
         self.decoder = tf.keras.Sequential()
         self.decoder.add(tf.keras.Input(shape=(2,)))
         self.decoder.add(layers.Dense(units=2 ** (N + 1)))
-        self.decoder.add(LeakyReLU(alpha=0.001))
-        # self.decoder.add(layers.Dropout(0.2))
-        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
-        self.decoder.add(LeakyReLU(alpha=0.001))
+        # leaky relu with alpha=1 (i.e. an identity/linear activation) gives by far best results
+        self.decoder.add(LeakyReLU(alpha=1))
         self.decoder.add(layers.Dense(units=2 ** N, activation="softmax"))
 
         # self.randomiser = tf.random_normal_initializer(mean=0.0, stddev=0.1, seed=None)
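
Note on the activation change above: LeakyReLU with alpha=1 has slope 1 on both sides of zero, i.e. it is the identity, so the remaining hidden Dense layer stays purely linear and the decoder collapses to a single affine map followed by softmax (multinomial logistic regression on the I/Q pair). A minimal check of that identity behaviour, assuming the TF2-era `alpha` keyword used in this file:

    import numpy as np
    from tensorflow.keras.layers import LeakyReLU

    x = np.linspace(-3.0, 3.0, 7, dtype=np.float32)
    # With alpha=1 the negative slope equals the positive one: f(x) = x everywhere.
    assert np.allclose(LeakyReLU(alpha=1)(x).numpy(), x)
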
@@ -101,21 +101,20 @@ class Autoencoder(Model):
         return decoded
 
     def fit_encoder(self, modulation, sample_size, train_size=0.8, epochs=1, batch_size=1, shuffle=False):
-        os.chdir('../')
         alphabet = basic.load_alphabet(modulation, polar=False)
 
-        if not alphabet.shape[0] == self.N**2:
+        if not alphabet.shape[0] == self.N ** 2:
             raise Exception("Cardinality of modulation scheme is different from cardinality of autoencoder!")
 
-        x_train = np.random.randint(self.N**2, size=int(sample_size*train_size))
+        x_train = np.random.randint(self.N ** 2, size=int(sample_size * train_size))
         y_train = alphabet[x_train]
-        x_train_ho = np.zeros((int(sample_size*train_size), self.N**2))
+        x_train_ho = np.zeros((int(sample_size * train_size), self.N ** 2))
         for idx, x in np.ndenumerate(x_train):
             x_train_ho[idx, x] = 1
 
-        x_test = np.random.randint(self.N**2, size=int(sample_size*(1-train_size)))
+        x_test = np.random.randint(self.N ** 2, size=int(sample_size * (1 - train_size)))
         y_test = alphabet[x_test]
-        x_test_ho = np.zeros((int(sample_size*(1-train_size)), self.N ** 2))
+        x_test_ho = np.zeros((int(sample_size * (1 - train_size)), self.N ** 2))
         for idx, x in np.ndenumerate(x_test):
             x_test_ho[idx, x] = 1
 
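
Two small observations on the fit_encoder hunk above. The cardinality check compares against self.N ** 2, which agrees with the 2 ** self.N used everywhere else in the file only for N in {2, 4} (for the 16qam case here, N = 4 gives 16 either way). And the ndenumerate loops build one-hot matrices that can be written as a single identity-matrix lookup; a sketch with the same shapes:

    import numpy as np

    M = 2 ** 4  # constellation size for 16qam (equals N ** 2 only because N == 4)
    x_train = np.random.randint(M, size=10)

    # Row i is the one-hot encoding of x_train[i], replacing the ndenumerate loop.
    x_train_ho = np.eye(M)[x_train]
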
@@ -125,7 +124,25 @@ class Autoencoder(Model):
                          batch_size=batch_size,
                          shuffle=shuffle,
                          validation_data=(x_test_ho, y_test))
-        pass
+
+    def fit_decoder(self, modulation, samples):
+        samples = int(samples * 1.3)  # oversample to offset the 25% hold-out split below
+        demod = basic.AlphabetDemod(modulation, 0)
+        x = np.random.rand(samples, 2) * 2 - 1
+        x = x.reshape((-1, 2))
+        f = np.zeros(x.shape[0])
+        xf = np.c_[x[:, 0], x[:, 1], f]
+        y = demod.forward(misc.rect2polar(xf))
+        y_ho = misc.bit_matrix2one_hot(y.reshape((-1, self.N)))  # self.N bits per symbol
+
+        X_train, X_test, y_train, y_test = train_test_split(x, y_ho)
+        self.decoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
+        self.decoder.fit(X_train, y_train, shuffle=False, validation_data=(X_test, y_test))
+        y_pred = self.decoder(X_test).numpy()
+        y_pred2 = np.zeros(y_test.shape, dtype=bool)
+        y_pred2[np.arange(y_pred2.shape[0]), np.argmax(y_pred, axis=1)] = True
+
+        print("Accuracy: %.4f" % accuracy_score(y_pred2, y_test))
 
     def train(self, samples=1e6):
         if samples % self.N:
@@ -197,13 +214,21 @@ if __name__ == '__main__':
     autoencoder = Autoencoder(n, -8)
 
     autoencoder.fit_encoder(modulation='16qam',
-                            sample_size=1e6,
+                            sample_size=2e6,
                             train_size=0.8,
-                            epochs=50,
+                            epochs=1,
                             batch_size=256,
                             shuffle=True)
+
+    view_encoder(autoencoder.encoder, n)
+    autoencoder.fit_decoder(modulation='16qam', samples=2e6)
+    autoencoder.train()
     view_encoder(autoencoder.encoder, n)
 
+    # view_encoder(autoencoder.encoder, n)
+    # view_encoder(autoencoder.encoder, n)
+
+
     # autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
     #
     # autoencoder.fit(x_train_ho, x_train_ho,
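
For context on the new fit_decoder method: it samples random points in the I/Q square, labels each with the reference demodulator's hard decision, and then fits the decoder network to the one-hot labels as an ordinary supervised problem. A self-contained sketch of the same idea, with a hypothetical 4-point constellation and nearest-point decisions standing in for the project's basic.AlphabetDemod and misc helpers:

    import numpy as np
    import tensorflow as tf
    from tensorflow.keras import layers

    # Hypothetical QPSK-style alphabet in place of the 16qam one loaded in the diff.
    alphabet = np.array([[1, 1], [1, -1], [-1, 1], [-1, -1]], np.float32) / np.sqrt(2)

    # Random I/Q points in [-1, 1]^2, labelled by the nearest constellation point.
    x = (np.random.rand(100_000, 2) * 2 - 1).astype(np.float32)
    labels = ((x[:, None, :] - alphabet[None, :, :]) ** 2).sum(-1).argmin(axis=1)
    y = tf.one_hot(labels, alphabet.shape[0])

    decoder = tf.keras.Sequential([
        tf.keras.Input(shape=(2,)),
        layers.Dense(32, activation="relu"),
        layers.Dense(alphabet.shape[0], activation="softmax"),
    ])
    # MSE on one-hot targets, matching the loss choice in fit_decoder.
    decoder.compile(optimizer="adam", loss="mse")
    decoder.fit(x, y, epochs=1, batch_size=256, validation_split=0.25)
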