
Merge remote-tracking branch 'origin/matched_filter' into photonics

# Conflicts:
#	main.py
#	models/autoencoder.py
Min committed 5 years ago
Commit 2677c5e41f
4 files changed, 155 insertions(+), 36 deletions(-)
  1. alphabets/4pam.a (+5 -0)
  2. main.py (+1 -0)
  3. models/autoencoder.py (+88 -20)
  4. models/optical_channel.py (+61 -16)

+ 5 - 0
alphabets/4pam.a

@@ -0,0 +1,5 @@
+R
+00,0,0
+01,0.25,0
+10,0.5,0
+11,1,0
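
For reference, each row of the new alphabet file is a comma-separated bit pattern followed by two coordinate values, preceded by a single-letter coordinate-type header ('R' here). The repository reads these files through basic.load_alphabet, whose implementation is not part of this diff, so the hypothetical load_alphabet_file below is only a minimal sketch under that assumed layout:

    import numpy as np

    def load_alphabet_file(path):
        """Hypothetical reader for an .a alphabet file: a header line giving the
        coordinate type, then one 'bits,x,y' row per symbol."""
        with open(path) as f:
            coord_type = f.readline().strip()                  # e.g. 'R'
            rows = [line.strip().split(",") for line in f if line.strip()]
        bits = [r[0] for r in rows]
        points = np.array([[float(r[1]), float(r[2])] for r in rows])
        return coord_type, bits, points

    # load_alphabet_file("alphabets/4pam.a")
    # -> ('R', ['00', '01', '10', '11'],
    #     array([[0., 0.], [0.25, 0.], [0.5, 0.], [1., 0.]]))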

+ 1 - 0
main.py

@@ -14,6 +14,7 @@ from multiprocessing import Pool
 CPU_COUNT = os.environ.get("CPU_COUNT", os.cpu_count())
 
 
+
 def show_constellation(mod, chan, demod, samples=1000):
     x = misc.generate_random_bit_array(samples)
     x_mod = mod.forward(x)

+ 88 - 20
models/autoencoder.py

@@ -3,12 +3,16 @@ import numpy as np
 import tensorflow as tf
 
 from sklearn.metrics import accuracy_score
+from sklearn.model_selection import train_test_split
 from tensorflow.keras import layers, losses
 from tensorflow.keras.models import Model
+from tensorflow.keras.layers import LeakyReLU, ReLU
+
 from functools import partial
 import misc
 import defs
 from models import basic
+import os
 
 latent_dim = 64
 
@@ -17,7 +21,7 @@ print("# GPUs Available: ", len(tf.config.experimental.list_physical_devices('GP
 
 class AutoencoderMod(defs.Modulator):
     def __init__(self, autoencoder):
-        super().__init__(2**autoencoder.N)
+        super().__init__(2 ** autoencoder.N)
         self.autoencoder = autoencoder
 
     def forward(self, binary: np.ndarray) -> defs.Signal:
@@ -34,7 +38,7 @@ class AutoencoderMod(defs.Modulator):
 
 class AutoencoderDemod(defs.Demodulator):
     def __init__(self, autoencoder):
-        super().__init__(2**autoencoder.N)
+        super().__init__(2 ** autoencoder.N)
         self.autoencoder = autoencoder
 
     def forward(self, values: defs.Signal) -> np.ndarray:
@@ -50,16 +54,18 @@ class Autoencoder(Model):
         self.encoder = tf.keras.Sequential()
         self.encoder.add(tf.keras.Input(shape=(2 ** N,), dtype=bool))
         self.encoder.add(layers.Dense(units=2 ** (N + 1)))
+        self.encoder.add(LeakyReLU(alpha=0.001))
         # self.encoder.add(layers.Dropout(0.2))
         self.encoder.add(layers.Dense(units=2 ** (N + 1)))
-        self.encoder.add(layers.Dense(units=2, activation="sigmoid"))
+        self.encoder.add(LeakyReLU(alpha=0.001))
+        self.encoder.add(layers.Dense(units=2, activation="tanh"))
         # self.encoder.add(layers.ReLU(max_value=1.0))
 
         self.decoder = tf.keras.Sequential()
         self.decoder.add(tf.keras.Input(shape=(2,)))
         self.decoder.add(layers.Dense(units=2 ** (N + 1)))
-        # self.decoder.add(layers.Dropout(0.2))
-        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
+        # leaky relu with alpha=1 (i.e. a linear activation) gives by far the best results
+        self.decoder.add(LeakyReLU(alpha=1))
         self.decoder.add(layers.Dense(units=2 ** N, activation="softmax"))
 
         # self.randomiser = tf.random_normal_initializer(mean=0.0, stddev=0.1, seed=None)
@@ -97,6 +103,50 @@ class Autoencoder(Model):
         decoded = self.decoder(signal)
         return decoded
 
+    def fit_encoder(self, modulation, sample_size, train_size=0.8, epochs=1, batch_size=1, shuffle=False):
+        alphabet = basic.load_alphabet(modulation, polar=False)
+
+        if alphabet.shape[0] != 2 ** self.N:
+            raise Exception("Cardinality of modulation scheme is different from cardinality of autoencoder!")
+
+        x_train = np.random.randint(2 ** self.N, size=int(sample_size * train_size))
+        y_train = alphabet[x_train]
+        x_train_ho = np.zeros((int(sample_size * train_size), 2 ** self.N))
+        for idx, x in np.ndenumerate(x_train):
+            x_train_ho[idx, x] = 1
+
+        x_test = np.random.randint(2 ** self.N, size=int(sample_size * (1 - train_size)))
+        y_test = alphabet[x_test]
+        x_test_ho = np.zeros((int(sample_size * (1 - train_size)), 2 ** self.N))
+        for idx, x in np.ndenumerate(x_test):
+            x_test_ho[idx, x] = 1
+
+        self.encoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
+        self.encoder.fit(x_train_ho, y_train,
+                         epochs=epochs,
+                         batch_size=batch_size,
+                         shuffle=shuffle,
+                         validation_data=(x_test_ho, y_test))
+
+    def fit_decoder(self, modulation, samples):
+        samples = int(samples * 1.3)
+        demod = basic.AlphabetDemod(modulation, 0)
+        x = np.random.rand(samples, 2) * 2 - 1
+        x = x.reshape((-1, 2))
+        f = np.zeros(x.shape[0])
+        xf = np.c_[x[:, 0], x[:, 1], f]
+        y = demod.forward(misc.rect2polar(xf))
+        y_ho = misc.bit_matrix2one_hot(y.reshape((-1, self.N)))
+
+        X_train, X_test, y_train, y_test = train_test_split(x, y_ho)
+        self.decoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
+        self.decoder.fit(X_train, y_train, shuffle=False, validation_data=(X_test, y_test))
+        y_pred = self.decoder(X_test).numpy()
+        y_pred2 = np.zeros(y_test.shape, dtype=bool)
+        y_pred2[np.arange(y_pred2.shape[0]), np.argmax(y_pred, axis=1)] = True
+
+        print("Accuracy: %.4f" % accuracy_score(y_pred2, y_test))
+
     def train(self, samples=1e6):
         if samples % self.N:
             samples += self.N - (samples % self.N)
@@ -160,26 +210,44 @@ if __name__ == '__main__':
 
     n = 4
 
-    samples = 1e6
-    x_train = misc.generate_random_bit_array(samples).reshape((-1, n))
-    x_train_ho = misc.bit_matrix2one_hot(x_train)
-    x_test_array = misc.generate_random_bit_array(samples * 0.3)
-    x_test = x_test_array.reshape((-1, n))
-    x_test_ho = misc.bit_matrix2one_hot(x_test)
+    # samples = 1e6
+    # x_train = misc.generate_random_bit_array(samples).reshape((-1, n))
+    # x_train_ho = misc.bit_matrix2one_hot(x_train)
+    # x_test_array = misc.generate_random_bit_array(samples * 0.3)
+    # x_test = x_test_array.reshape((-1, n))
+    # x_test_ho = misc.bit_matrix2one_hot(x_test)
 
     autoencoder = Autoencoder(n, -8)
     autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
 
-    autoencoder.fit(x_train_ho, x_train_ho,
-                    epochs=1,
-                    shuffle=False,
-                    validation_data=(x_test_ho, x_test_ho))
+    autoencoder.fit_encoder(modulation='16qam',
+                            sample_size=2e6,
+                            train_size=0.8,
+                            epochs=1,
+                            batch_size=256,
+                            shuffle=True)
 
-    encoded_data = autoencoder.encoder(x_test_ho)
-    decoded_data = autoencoder.decoder(encoded_data).numpy()
-
-    result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
-    print("Accuracy: %.4f" % accuracy_score(x_test_array, result.reshape(-1, )))
     view_encoder(autoencoder.encoder, n)
+    autoencoder.fit_decoder(modulation='16qam', samples=2e6)
+    autoencoder.train()
+    view_encoder(autoencoder.encoder, n)
+
+    # view_encoder(autoencoder.encoder, n)
+    # view_encoder(autoencoder.encoder, n)
+
+
+    # autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
+    #
+    # autoencoder.fit(x_train_ho, x_train_ho,
+    #                 epochs=1,
+    #                 shuffle=False,
+    #                 validation_data=(x_test_ho, x_test_ho))
+    #
+    # encoded_data = autoencoder.encoder(x_test_ho)
+    # decoded_data = autoencoder.decoder(encoded_data).numpy()
+    #
+    # result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
+    # print("Accuracy: %.4f" % accuracy_score(x_test_array, result.reshape(-1, )))
+    # view_encoder(autoencoder.encoder, n)
 
     pass
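
The new fit_encoder/fit_decoder methods pre-train the two halves of the network separately against a known constellation (16-QAM above) before end-to-end training: random symbol indices are expanded to one-hot vectors of width 2**N and regressed onto the alphabet points with an MSE loss, while the decoder is likewise fit against labels from a reference demodulator. As an aside, a vectorised equivalent of the one-hot construction used in fit_encoder would look like this (illustration only, not the repository's code):

    import numpy as np

    N = 4                                     # bits per symbol
    sample_size, train_size = 2_000_000, 0.8

    # Random symbol indices expanded to one-hot rows of width 2**N,
    # equivalent to the np.ndenumerate loop in fit_encoder.
    x_train = np.random.randint(2 ** N, size=int(sample_size * train_size))
    x_train_ho = np.zeros((x_train.size, 2 ** N))
    x_train_ho[np.arange(x_train.size), x_train] = 1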

+ 61 - 16
models/optical_channel.py

@@ -3,17 +3,20 @@ import matplotlib.pyplot as plt
 import defs
 import numpy as np
 import math
-from scipy.fft import fft, ifft
-
+from numpy.fft import fft, fftfreq, ifft
+from commpy.filters import rrcosfilter, rcosfilter, rectfilter
 
 class OpticalChannel(defs.Channel):
-    def __init__(self, noise_level, dispersion, symbol_rate, sample_rate, length, show_graphs=False, **kwargs):
+    def __init__(self, noise_level, dispersion, symbol_rate, sample_rate, length, pulse_shape='rect',
+                 sqrt_out=False, show_graphs=False, **kwargs):
         """
         :param noise_level: Noise level in dB
         :param dispersion: dispersion coefficient in ps^2/km
         :param symbol_rate: Symbol rate of modulated signal in Hz
         :param sample_rate: Sample rate of time-domain model (time steps in simulation) in Hz
         :param length: fibre length in km
+        :param pulse_shape: pulse shape, one of 'rect', 'rcos' or 'rrcos'
+        :param sqrt_out: take the square root of the output to compensate for photodiode detection
         :param show_graphs: whether graphs should be displayed
 
         Optical Channel class constructor
@@ -21,29 +24,45 @@ class OpticalChannel(defs.Channel):
         super().__init__(**kwargs)
         self.noise = 10 ** (noise_level / 10)
 
-        self.dispersion = dispersion # * 1e-24  # Converting from ps^2/km to s^2/km
+        self.dispersion = dispersion * 1e-24  # Converting from ps^2/km to s^2/km
         self.symbol_rate = symbol_rate
         self.symbol_period = 1 / self.symbol_rate
         self.sample_rate = sample_rate
         self.sample_period = 1 / self.sample_rate
         self.length = length
+        self.pulse_shape = pulse_shape.strip().lower()
+        self.sqrt_out = sqrt_out
         self.show_graphs = show_graphs
 
     def __get_time_domain(self, symbol_vals):
         samples_per_symbol = int(self.sample_rate / self.symbol_rate)
         samples = int(symbol_vals.shape[0] * samples_per_symbol)
 
-        symbol_vals_a = np.repeat(symbol_vals, repeats=samples_per_symbol, axis=0)
-        t = np.linspace(start=0, stop=samples * self.sample_period, num=samples)
-        val_t = symbol_vals_a[:, 0] * np.cos(2 * math.pi * symbol_vals_a[:, 2] * t + symbol_vals_a[:, 1])
+        symbol_impulse = np.zeros(samples)
+
+        # TODO: Implement Frequency/Phase Modulation
+
+        for i in range(symbol_vals.shape[0]):
+            symbol_impulse[i * samples_per_symbol] = symbol_vals[i, 0]
+
+        if self.pulse_shape == 'rrcos':
+            self.filter_samples = 5 * samples_per_symbol
+            self.t_filter, self.h_filter = rrcosfilter(self.filter_samples, 0.8, self.symbol_period, self.sample_rate)
+        elif self.pulse_shape == 'rcos':
+            self.filter_samples = 5 * samples_per_symbol
+            self.t_filter, self.h_filter = rcosfilter(self.filter_samples, 0.8, self.symbol_period, self.sample_rate)
+        else:
+            self.filter_samples = samples_per_symbol
+            self.t_filter, self.h_filter = rectfilter(self.filter_samples, self.symbol_period, self.sample_rate)
+
+        val_t = np.convolve(symbol_impulse, self.h_filter)
+        t = np.linspace(start=0, stop=val_t.shape[0] * self.sample_period, num=val_t.shape[0])
 
         return t, val_t
 
     def __time_to_frequency(self, values):
         val_f = fft(values)
-        f = np.linspace(0.0, 1 / (2 * self.sample_period), (values.size // 2))
-        f_neg = -1 * np.flip(f)
-        f = np.concatenate((f, f_neg), axis=0)
+        f = fftfreq(values.shape[-1], d=self.sample_period)
         return f, val_f
 
     def __frequency_to_time(self, values):
@@ -106,22 +125,48 @@ class OpticalChannel(defs.Channel):
         # Photodiode Detection
         t, val_t = self.__photodiode_detection(val_t)
 
+        # Symbol Decisions
+        idx = np.arange(self.filter_samples/2, t.shape[0] - (self.filter_samples/2),
+                        self.symbol_period/self.sample_period, dtype='int64')
+        t_decision = self.sample_period * idx
+
         if self.show_graphs:
             plt.plot(t, val_t)
             plt.title('time domain (post-detection)')
             plt.show()
 
-        return t, val_t
+            plt.plot(t, val_t)
+            for xc in t_decision:
+                plt.axvline(x=xc, color='r')
+            plt.title('time domain (post-detection with decision times)')
+            plt.show()
+
+        # TODO: Implement Frequency/Phase Modulation
+
+        out = np.zeros(values.shape)
+
+        out[:, 0] = val_t[idx]
+        out[:, 1] = values[:, 1]
+        out[:, 2] = values[:, 2]
+
+        if self.sqrt_out:
+            out[:, 0] = np.sqrt(out[:, 0])
+
+        return out
 
 
 if __name__ == '__main__':
     # Simple OOK modulation
-    num_of_symbols = 10
+    num_of_symbols = 100
     symbol_vals = np.zeros((num_of_symbols, 3))
 
     symbol_vals[:, 0] = np.random.randint(2, size=symbol_vals.shape[0])
-    symbol_vals[:, 2] = 10e6
+    symbol_vals[:, 2] = 40e9
+
+    channel = OpticalChannel(noise_level=-10, dispersion=-21.7, symbol_rate=10e9,
+                             sample_rate=400e9, length=100, pulse_shape='rcos', show_graphs=True)
+    v = channel.forward(symbol_vals)
 
-    channel = OpticalChannel(noise_level=-20, dispersion=-21.7, symbol_rate=100e3,
-                             sample_rate=500e6, length=100, show_graphs=True)
-    time, v = channel.forward(symbol_vals)
+    rx = (v[:, 0] > 0.5).astype(int)
+    tru = np.sum(rx == symbol_vals[:, 0].astype(int))
+    print("Accuracy: {}".format(tru/num_of_symbols))