瀏覽代碼

Separated custom layers to new file

Min 4 年之前
父節點
當前提交
f29ac999c1
共有 3 個文件被更改,包括 108 次插入和 57 次刪除
  1. 45 45
      models/autoencoder.py
  2. 0 12
      models/end_to_end.py
  3. 63 0
      models/layers.py

+ 45 - 45
models/autoencoder.py

@@ -15,6 +15,7 @@ from models import basic
 import os
 # from tensorflow_model_optimization.python.core.quantization.keras import quantize, quantize_aware_activation
 from models.data import BinaryOneHotGenerator
+from models import layers as custom_layers
 
 latent_dim = 64
 
@@ -22,14 +23,15 @@ print("# GPUs Available: ", len(tf.config.experimental.list_physical_devices('GP
 
 
 class AutoencoderMod(defs.Modulator):
-    def __init__(self, autoencoder):
+    def __init__(self, autoencoder, encoder=None):
         super().__init__(2 ** autoencoder.N)
         self.autoencoder = autoencoder
+        self.encoder = encoder or autoencoder.encoder
 
     def forward(self, binary: np.ndarray):
         reshaped = binary.reshape((-1, (self.N * self.autoencoder.parallel)))
         reshaped_ho = misc.bit_matrix2one_hot(reshaped)
-        encoded = self.autoencoder.encoder(reshaped_ho)
+        encoded = self.encoder(reshaped_ho)
         x = encoded.numpy()
         if self.autoencoder.bipolar:
             x = x * 2 - 1
@@ -47,9 +49,10 @@ class AutoencoderMod(defs.Modulator):
 
 
 class AutoencoderDemod(defs.Demodulator):
-    def __init__(self, autoencoder):
+    def __init__(self, autoencoder, decoder=None):
         super().__init__(2 ** autoencoder.N)
         self.autoencoder = autoencoder
+        self.decoder = decoder or autoencoder.decoder
 
     def forward(self, values: defs.Signal) -> np.ndarray:
         if self.autoencoder.signal_dim <= 1:
@@ -58,53 +61,60 @@ class AutoencoderDemod(defs.Demodulator):
             val = values.rect
         if self.autoencoder.parallel > 1:
             val = val.reshape((-1, self.autoencoder.parallel))
-        decoded = self.autoencoder.decoder(val).numpy()
+        decoded = self.decoder(val).numpy()
         result = misc.int2bit_array(decoded.argmax(axis=1), self.N * self.autoencoder.parallel)
         return result.reshape(-1, )
 
 
 class Autoencoder(Model):
-    def __init__(self, N, channel, signal_dim=2, parallel=1, all_onehot=True, bipolar=True):
+    def __init__(self, N, channel, signal_dim=2, parallel=1, all_onehot=True, bipolar=True, encoder=None, decoder=None):
         super(Autoencoder, self).__init__()
         self.N = N
         self.parallel = parallel
         self.signal_dim = signal_dim
         self.bipolar = bipolar
         self._input_shape = 2 ** (N * parallel) if all_onehot else (2 ** N) * parallel
-        self.encoder = tf.keras.Sequential()
-        self.encoder.add(layers.Input(shape=(self._input_shape,)))
-        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
-        self.encoder.add(LeakyReLU(alpha=0.001))
-        # self.encoder.add(layers.Dropout(0.2))
-        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
-        self.encoder.add(LeakyReLU(alpha=0.001))
-        self.encoder.add(layers.Dense(units=signal_dim * parallel, activation="sigmoid"))
-        # self.encoder.add(layers.ReLU(max_value=1.0))
-        # self.encoder = quantize.quantize_model(self.encoder)
-
-        self.decoder = tf.keras.Sequential()
-        self.decoder.add(tf.keras.Input(shape=(signal_dim * parallel,)))
-        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
-        # self.encoder.add(LeakyReLU(alpha=0.001))
-        # self.decoder.add(layers.Dense(units=2 ** (N + 1)))
-        # leaky relu with alpha=1 gives by far best results
-        self.decoder.add(LeakyReLU(alpha=1))
-        self.decoder.add(layers.Dense(units=self._input_shape, activation="softmax"))
-
+        if encoder is None:
+            self.encoder = tf.keras.Sequential()
+            self.encoder.add(layers.Input(shape=(self._input_shape,)))
+            self.encoder.add(layers.Dense(units=2 ** (N + 1)))
+            self.encoder.add(LeakyReLU(alpha=0.001))
+            # self.encoder.add(layers.Dropout(0.2))
+            self.encoder.add(layers.Dense(units=2 ** (N + 1)))
+            self.encoder.add(LeakyReLU(alpha=0.001))
+            self.encoder.add(layers.Dense(units=signal_dim * parallel, activation="sigmoid"))
+            # self.encoder.add(layers.ReLU(max_value=1.0))
+            # self.encoder = quantize.quantize_model(self.encoder)
+        else:
+            self.encoder = encoder
+
+        if decoder is None:
+            self.decoder = tf.keras.Sequential()
+            self.decoder.add(tf.keras.Input(shape=(signal_dim * parallel,)))
+            self.decoder.add(layers.Dense(units=2 ** (N + 1)))
+            # self.encoder.add(LeakyReLU(alpha=0.001))
+            # self.decoder.add(layers.Dense(units=2 ** (N + 1)))
+            # leaky relu with alpha=1 gives by far best results
+            self.decoder.add(LeakyReLU(alpha=1))
+            self.decoder.add(layers.Dense(units=self._input_shape, activation="softmax"))
+        else:
+            self.decoder = decoder
         # self.randomiser = tf.random_normal_initializer(mean=0.0, stddev=0.1, seed=None)
 
         self.mod = None
         self.demod = None
         self.compiled = False
 
+        self.channel = tf.keras.Sequential()
+        if self.bipolar:
+            self.channel.add(custom_layers.ScaleAndOffset(2, -1, input_shape=(signal_dim * parallel,)))
+
         if isinstance(channel, int) or isinstance(channel, float):
-            self.channel = basic.AWGNChannel(channel)
+            self.channel.add(custom_layers.AwgnChannel(noise_dB=channel, input_shape=(signal_dim * parallel,)))
         else:
-            if not hasattr(channel, 'forward_tensor'):
-                raise ValueError("Channel has no forward_tensor function")
-            if not callable(channel.forward_tensor):
-                raise ValueError("Channel.forward_tensor is not callable")
-            self.channel = channel
+            if not isinstance(channel, tf.keras.layers.Layer):
+                raise ValueError("Channel is not a keras layer")
+            self.channel.add(channel)
 
         # self.decoder.add(layers.Softmax(units=4, dtype=bool))
 
@@ -120,22 +130,12 @@ class Autoencoder(Model):
         # ])
     @property
     def all_layers(self):
-        return self.layers[0].layers + self.layers[1].layers
+        return self.encoder.layers + self.decoder.layers #self.channel.layers +
 
     def call(self, x, **kwargs):
-        signal = self.encoder(x)
-        if self.bipolar:
-            signal = signal * 2 - 1
-        else:
-            signal = tf.clip_by_value(signal, 0, 1)
-        signal = self.channel.forward_tensor(signal)
-        # encoded = encoded * 2 - 1
-        # encoded = tf.clip_by_value(encoded, clip_value_min=0, clip_value_max=1, name=None)
-        # noise = self.randomiser(shape=(-1, 2), dtype=tf.float32)
-        # noise = np.random.normal(0, 1, (1, 2)) * self.noise
-        # noisy = tf.convert_to_tensor(noise, dtype=tf.float32)
-        decoded = self.decoder(signal)
-        return decoded
+        y = self.encoder(x)
+        z = self.channel(y)
+        return self.decoder(z)
 
     def fit_encoder(self, modulation, sample_size, train_size=0.8, epochs=1, batch_size=1, shuffle=False):
         alphabet = basic.load_alphabet(modulation, polar=False)

+ 0 - 12
models/end_to_end.py

@@ -29,18 +29,6 @@ class ExtractCentralMessage(layers.Layer):
         return tf.matmul(inputs, self.w)
 
 
-class AwgnChannel(layers.Layer):
-    def __init__(self, rx_stddev=0.1):
-        """
-        :param rx_stddev: Standard deviation of receiver noise (due to e.g. TIA circuit)
-        """
-        super(AwgnChannel, self).__init__()
-        self.noise_layer = layers.GaussianNoise(rx_stddev)
-
-    def call(self, inputs, **kwargs):
-        return self.noise_layer.call(inputs, training=True)
-
-
 class DigitizationLayer(layers.Layer):
     def __init__(self,
                  fs,

+ 63 - 0
models/layers.py

@@ -0,0 +1,63 @@
+"""
+Custom Keras Layers for general use
+"""
+import itertools
+
+from tensorflow.keras import layers
+import tensorflow as tf
+import numpy as np
+
+
class AwgnChannel(layers.Layer):
    """Additive white Gaussian noise (AWGN) channel.

    Injects Gaussian noise into the signal on every forward pass — at
    inference as well as training — to model receiver noise.
    """

    def __init__(self, rx_stddev=0.1, noise_dB=None, **kwargs):
        """
        :param rx_stddev: Standard deviation of receiver noise (due to e.g. TIA circuit)
        :param noise_dB: If given, overrides ``rx_stddev``. NOTE(review): this
            is converted as ``10 ** (noise_dB / 10)`` — a linear *power* ratio
            used directly as a standard deviation. For an amplitude stddev one
            would expect ``10 ** (noise_dB / 20)``; confirm the intended dB
            convention before relying on this value.
        """
        super(AwgnChannel, self).__init__(**kwargs)
        if noise_dB is not None:
            rx_stddev = 10 ** (noise_dB / 10.0)
        self.noise_layer = layers.GaussianNoise(rx_stddev)

    def call(self, inputs, **kwargs):
        # Invoke the sublayer via __call__ (not .call) so Keras runs its
        # build/bookkeeping machinery; training=True forces noise injection
        # even at inference time, which is the whole point of this layer.
        return self.noise_layer(inputs, training=True)
+
+
class ScaleAndOffset(layers.Layer):
    """Element-wise affine transform: ``output = input * scale + offset``."""

    def __init__(self, scale=1, offset=0, **kwargs):
        """
        :param scale: Multiplicative factor, applied first.
        :param offset: Additive constant, applied after scaling.
        """
        super(ScaleAndOffset, self).__init__(**kwargs)
        self.scale = scale
        self.offset = offset

    def call(self, inputs, **kwargs):
        scaled = inputs * self.scale
        return scaled + self.offset
+
+
class BitsToSymbol(layers.Layer):
    """Maps a vector of ``n = log2(cardinality)`` bits to a one-hot symbol.

    Bits are interpreted MSB-first: the first input element is weighted by
    the highest power of two.
    """

    def __init__(self, cardinality, **kwargs):
        """
        :param cardinality: Size of the symbol alphabet; must be a power of two.
        """
        super().__init__(**kwargs)
        self.cardinality = cardinality
        # Bug fix: np.log takes no base argument — its second positional
        # parameter is `out`, so np.log(x, 2) raises TypeError. Use log2.
        n = int(np.log2(self.cardinality))
        # Column vector [2^(n-1), ..., 2, 1] for MSB-first bit weighting.
        self.powers = tf.convert_to_tensor(
            np.power(2, np.linspace(n - 1, 0, n)).reshape(-1, 1),
            dtype=tf.float32
        )

    def call(self, inputs, **kwargs):
        # Dot the bit vector with the powers of two to get the symbol index,
        # then expand that index to a one-hot vector of size `cardinality`.
        idx = tf.cast(tf.tensordot(inputs, self.powers, axes=1), dtype=tf.int32)
        return tf.one_hot(idx, self.cardinality)
+
+
class SymbolToBits(layers.Layer):
    """Maps a one-hot symbol ``(batch, cardinality)`` back to its bit pattern.

    Inverse of ``BitsToSymbol``: itertools.product enumerates bit patterns
    MSB-first, so row ``i`` of the lookup table is the binary expansion of ``i``.
    """

    def __init__(self, cardinality, **kwargs):
        """
        :param cardinality: Size of the symbol alphabet; must be a power of two.
        """
        super().__init__(**kwargs)
        # Bug fix: np.log has no base argument (2nd parameter is `out`), so
        # np.log(cardinality, 2) raises TypeError. Use log2 directly.
        n = int(np.log2(cardinality))
        bit_rows = [list(bits) for bits in itertools.product([0, 1], repeat=n)]
        # Stored transposed: shape (n, cardinality).
        self.all_syms = tf.transpose(tf.convert_to_tensor(np.asarray(bit_rows), dtype=tf.float32))

    def call(self, inputs, **kwargs):
        # Bug fix: with one-hot inputs of shape (batch, cardinality) and
        # all_syms of shape (n, cardinality), matmul(all_syms, inputs) has
        # mismatched inner dimensions. Selecting each symbol's bit row is
        # inputs @ all_syms^T -> (batch, n).
        return tf.matmul(inputs, self.all_syms, transpose_b=True)