|
|
@@ -0,0 +1,116 @@
|
|
|
+# End to end Autoencoder completely implemented in Keras/TF
|
|
|
+
|
|
|
+import keras
|
|
|
+import math
|
|
|
+import tensorflow as tf
|
|
|
+import numpy as np
|
|
|
+import matplotlib.pyplot as plt
|
|
|
+
|
|
|
+from keras import layers
|
|
|
+
|
|
|
+
|
|
|
+class ExtractCentralMessage(keras.layers.Layer):
|
|
|
+ def __init__(self, neighbouring_blocks, samples_per_symbol):
|
|
|
+ super(ExtractCentralMessage, self).__init__()
|
|
|
+
|
|
|
+ temp_w = np.zeros((neighbouring_blocks * samples_per_symbol, samples_per_symbol))
|
|
|
+ i = np.identity(samples_per_symbol)
|
|
|
+ begin = int(samples_per_symbol * ((neighbouring_blocks - 1) / 2))
|
|
|
+ end = int(samples_per_symbol * ((neighbouring_blocks + 1) / 2))
|
|
|
+ temp_w[begin:end, :] = i
|
|
|
+
|
|
|
+ self.w = tf.convert_to_tensor(temp_w, dtype=tf.float32)
|
|
|
+
|
|
|
+ def call(self, inputs):
|
|
|
+ return tf.matmul(inputs, self.w)
|
|
|
+
|
|
|
+
|
|
|
+class AwgnChannel(keras.layers.Layer):
|
|
|
+ def __init__(self, stddev=0.1):
|
|
|
+ super(AwgnChannel, self).__init__()
|
|
|
+ self.stddev = stddev
|
|
|
+ self.noise_layer = layers.GaussianNoise(stddev)
|
|
|
+
|
|
|
+ def call(self, inputs):
|
|
|
+ serialized = layers.Flatten(inputs)
|
|
|
+ return self.noise_layer.call(serialized, training=True)
|
|
|
+
|
|
|
+
|
|
|
+class OpticalChannel(keras.layers.Layer):
|
|
|
+ def __init__(self, fs, stddev=0.1):
|
|
|
+ super(OpticalChannel, self).__init__()
|
|
|
+ self.noise_layer = layers.GaussianNoise(stddev)
|
|
|
+ self.fs = fs
|
|
|
+
|
|
|
+ def call(self, inputs):
|
|
|
+ # TODO:
|
|
|
+ # Low-pass filter & digitization noise for DAC and ADC (probably as an external layer called twice)
|
|
|
+
|
|
|
+ # Serializing outputs of all blocks
|
|
|
+ serialized = layers.Flatten(inputs)
|
|
|
+
|
|
|
+ # Squared-Law Detection
|
|
|
+ squared = tf.square(tf.abs(serialized))
|
|
|
+
|
|
|
+ # Adding gaussian noise
|
|
|
+ noisy = self.noise_layer.call(squared, training=True)
|
|
|
+
|
|
|
+ return noisy
|
|
|
+
|
|
|
+
|
|
|
+class EndToEndAutoencoder(tf.keras.Model):
|
|
|
+ def __init__(self,
|
|
|
+ cardinality,
|
|
|
+ samples_per_symbol,
|
|
|
+ neighbouring_blocks,
|
|
|
+ oversampling,
|
|
|
+ channel_name='awgn'):
|
|
|
+
|
|
|
+ # Number of leading/following messages
|
|
|
+ if neighbouring_blocks % 2 == 0:
|
|
|
+ neighbouring_blocks += 1
|
|
|
+ # Oversampling rate
|
|
|
+ oversampling = int(oversampling)
|
|
|
+ # Transmission Channel : ['awgn', 'optical']
|
|
|
+ channel_name = channel_name.strip().lower()
|
|
|
+
|
|
|
+ self.encoder = tf.keras.Sequential([
|
|
|
+ layers.Input(shape=(neighbouring_blocks, cardinality)),
|
|
|
+ layers.Dense(2 * cardinality, activation='relu'),
|
|
|
+ layers.Dense(2 * cardinality, activation='relu'),
|
|
|
+ layers.Dense(samples_per_symbol),
|
|
|
+ layers.ReLU(max_value=1.0)
|
|
|
+ ])
|
|
|
+
|
|
|
+ # Channel Model Layer
|
|
|
+ if channel_name == 'optical':
|
|
|
+ self.channel = OpticalChannel()
|
|
|
+ elif channel_name == 'awgn':
|
|
|
+ self.channel = AwgnChannel()
|
|
|
+ else:
|
|
|
+ raise TypeError("{} is not an accepted Channel Model.".format(channel_name))
|
|
|
+
|
|
|
+ self.encoder = tf.keras.Sequential([
|
|
|
+ ExtractCentralMessage(neighbouring_blocks, samples_per_symbol),
|
|
|
+ layers.Dense(samples_per_symbol, activation='relu'),
|
|
|
+ layers.Dense(2 * cardinality, activation='relu'),
|
|
|
+ layers.Dense(2 * cardinality, activation='relu'),
|
|
|
+ layers.Dense(cardinality, activation='softmax')
|
|
|
+ ])
|
|
|
+
|
|
|
+ def view_encoder(self):
|
|
|
+ # TODO
|
|
|
+ # Visualize encoder
|
|
|
+ pass
|
|
|
+
|
|
|
+ def call(self, x):
|
|
|
+ tx = self.encoder(x)
|
|
|
+ rx = self.channel(tx)
|
|
|
+ y = self.decoder(rx)
|
|
|
+ return y
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ # TODO
|
|
|
+ # training/testing of autoencoder
|
|
|
+ pass
|