Min 5 jaren geleden
commit
e71f9723b5
7 gewijzigde bestanden met toevoegingen van 396 en 0 verwijderingen
  1. 4 0
      .gitignore
  2. 48 0
      defs.py
  3. 58 0
      main.py
  4. 70 0
      misc.py
  5. 21 0
      models/ann.py
  6. 173 0
      models/basic.py
  7. 22 0
      models/knn.py

+ 4 - 0
.gitignore

@@ -0,0 +1,4 @@
+__pycache__
+.idea
+*.pyc
+*.pyo

+ 48 - 0
defs.py

@@ -0,0 +1,48 @@
+import math
+import numpy as np
+
+
+class COMComponent:
+    def __init__(self, epoch_size=100):
+        self._epoch = epoch_size
+
+
+class Channel(COMComponent):
+    """
+    Communication base module containing all model structure.
+    This model is just empty therefore just bypasses any input to output
+    """
+
+    def forward(self, values: np.ndarray) -> np.ndarray:
+        """
+        :param values: value generator, each iteration returns tuple of (amplitude, phase, frequency)
+        :return: affected tuple of (amplitude, phase, frequency)
+        """
+        raise NotImplemented("Need to define forward function")
+
+
+class ModComponent(COMComponent):
+    def __init__(self, alphabet_size, **kwargs):
+        super().__init__(**kwargs)
+        self.N = math.ceil(math.log2(alphabet_size))
+        self.alphabet_size = alphabet_size
+
+
+class Modulator(ModComponent):
+
+    def forward(self, binary: np.ndarray) -> np.ndarray:
+        """
+        :param binary: raw bytes as input (most be dtype=bool)
+        :return: amplitude, phase, frequency
+        """
+        raise NotImplemented("Need to define forward function")
+
+
+class Demodulator(ModComponent):
+
+    def forward(self, values: np.ndarray) -> np.ndarray:
+        """
+        :param values: value generator, each iteration returns tuple of (amplitude, phase, frequency)
+        :return: binary resulting values (dtype=bool)
+        """
+        raise NotImplemented("Need to define forward function")

+ 58 - 0
main.py

@@ -0,0 +1,58 @@
+import matplotlib.pyplot as plt
+
+import numpy as np
+from sklearn.metrics import accuracy_score
+from models import basic
+from models.basic import AWGNChannel, BPSKDemod, BPSKMod, BypassChannel, MaryMod, MaryDemod
+import misc
+
+def show_constellation(mod, chan, demod, samples=1000):
+    x = misc.generate_random_bit_array(samples)
+    x_mod = mod.forward(x)
+    x_chan = chan.forward(x_mod)
+    x_demod = demod.forward(x_chan)
+
+    x_mod_rect = misc.polar2rect(x_mod)
+    x_chan_rect = misc.polar2rect(x_chan)
+    plt.plot(x_chan_rect[:, 0][x], x_chan_rect[:, 1][x], '+')
+    plt.plot(x_chan_rect[:, 0][~x], x_chan_rect[:, 1][~x], '+')
+    plt.plot(x_mod_rect[:, 0], x_mod_rect[:, 1], 'ro')
+    axes = plt.gca()
+    axes.set_xlim([-2, +2])
+    axes.set_ylim([-2, +2])
+    plt.grid()
+    plt.show()
+    print('Accuracy : ' + str())
+
+
+def get_ber(mod, chan, demod, samples=1000):
+    x = misc.generate_random_bit_array(samples)
+    x_mod = mod.forward(x)
+    x_chan = chan.forward(x_mod)
+    x_demod = demod.forward(x_chan)
+    return 1 - accuracy_score(x, x_demod)
+
+
+def get_AWGN_ber(mod, demod, samples=1000, start=-8, stop=5, steps=30):
+    ber_x = np.linspace(start, stop, steps)
+    ber_y = []
+    for noise in ber_x:
+        ber_y.append(get_ber(mod, AWGNChannel(noise), demod, samples=samples))
+    return ber_x, ber_y
+
+
+if __name__ == '__main__':
+
+    plt.plot(*get_AWGN_ber(MaryMod(6, 10e6, gray=True), MaryDemod(6, 10e6), samples=12000, start=-15), '-', label='64-QAM')
+    plt.plot(*get_AWGN_ber(MaryMod(5, 10e6, gray=True), MaryDemod(5, 10e6), samples=12000, start=-15), '-', label='32-QAM')
+    plt.plot(*get_AWGN_ber(MaryMod(4, 10e6, gray=True), MaryDemod(4, 10e6), samples=12000, start=-15), '-', label='16-QAM')
+    plt.plot(*get_AWGN_ber(BPSKMod(10e6), BPSKDemod(10e6, 10e3), samples=12000), '-', label='BPSK')
+    plt.yscale('log')
+    plt.gca().invert_xaxis()
+    plt.grid()
+    plt.xlabel('Noise dB')
+    plt.ylabel('BER')
+    plt.legend()
+    plt.show()
+
+    pass

+ 70 - 0
misc.py

@@ -0,0 +1,70 @@
+import numpy as np
+import math
+import matplotlib.pyplot as plt
+
+
+def display_alphabet(alphabet, values=None, a_vals=False, title="Alphabet constellation diagram"):
+    rect = polar2rect(alphabet)
+    if values is not None:
+        rect2 = polar2rect(values)
+        plt.plot(rect2[:, 0], rect2[:, 1], 'r.')
+    plt.plot(rect[:, 0], rect[:, 1], 'b.', markersize=12)
+    plt.title(title)
+    N = math.ceil(math.log2(len(alphabet)))
+    if a_vals:
+        for i, value in enumerate(rect):
+            plt.annotate(xy=value+[0.01, 0.01], s=format(i, f'0{N}b'))
+    plt.xlabel('Real')
+    plt.ylabel('Imaginary')
+    plt.grid()
+    plt.show()
+
+
+def polar2rect(array, amp_column=0, phase_column=1) -> np.ndarray:
+    """
+    Return copy of array with amp_column and phase_column as polar coordinates replaced by rectangular coordinates
+    """
+    if len(array) == 2 or len(array.shape) == 1:
+        array = np.array([array])
+
+    if array.shape[1] < 2:
+        raise ValueError('Matrix has less than two columns')
+
+    result = array.copy()
+    result[:, amp_column] = np.cos(array[:, phase_column]) * array[:, amp_column]
+    result[:, phase_column] = np.sin(array[:, phase_column]) * array[:, amp_column]
+    return result
+
+
+def rect2polar(array, x_column=0, y_column=1) -> np.ndarray:
+    """
+    Return copy of array with x_column and y_column as rectangular coordinates replaced by polar coordinates
+    """
+    if len(array) == 2 or len(array.shape) == 1:
+        array = np.array([array])
+
+    if array.shape[1] < 2:
+        raise ValueError('Matrix has less than two columns')
+
+    x_arr = array[:, x_column]
+    y_arr = array[:, y_column]
+    result = array.copy()
+    result[:, x_column] = np.sqrt(x_arr**2 + y_arr**2)
+    result[x_arr != 0, y_column] = np.arctan(y_arr[x_arr != 0, ] / x_arr[x_arr != 0, ])
+    result[np.bitwise_and(x_arr == 0, y_arr < 0), y_column] = -np.pi / 2
+    result[np.bitwise_and(x_arr == 0, y_arr == 0), y_column] = 0
+    result[np.bitwise_and(x_arr == 0, y_arr > 0), y_column] = np.pi / 2
+    result[np.bitwise_and(x_arr < 0, y_arr < 0), y_column] -= np.pi
+    result[np.bitwise_and(x_arr < 0, y_arr >= 0), y_column] += np.pi
+    return result
+
+
+def generate_random_bit_array(size):
+    z, o = np.zeros(int(size // 2), dtype=bool), np.ones(int(size // 2), dtype=bool)
+    if size % 2 == 1:
+        p = (z, o, [np.random.choice([True, False])])
+    else:
+        p = (z, o)
+    arr = np.concatenate(p)
+    np.random.shuffle(arr)
+    return arr

+ 21 - 0
models/ann.py

@@ -0,0 +1,21 @@
+import tensorflow as tf
+
+print("# GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
+
+
+def create_modulation_model(nary):
+    model = tf.keras.models.Sequential()
+    model.add(tf.keras.layers.Dense(units=32, activation='relu', input_shape=(nary,), dtype='bool'))
+    model.add(tf.keras.layers.Dropout(0.2))
+    model.add(tf.keras.layers.Dense(units=2, activation='softmax'))
+    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['sparse_categorical_accuracy'])
+    return model
+
+
+def create_demodulation_model(nary):
+    model = tf.keras.models.Sequential()
+    model.add(tf.keras.layers.Dense(units=32, activation='relu', input_shape=(2,)))
+    model.add(tf.keras.layers.Dropout(0.2))
+    model.add(tf.keras.layers.Dense(units=nary, activation='softmax'))
+    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['sparse_categorical_accuracy'])
+    return model

+ 173 - 0
models/basic.py

@@ -0,0 +1,173 @@
+import defs
+import numpy as np
+import math
+import misc
+from scipy.spatial import cKDTree
+
+def _make_gray(n):
+    if n <= 0:
+        return []
+    arr = ['0', '1']
+    i = 2
+    while True:
+        if i >= 1 << n:
+            break
+        for j in range(i - 1, -1, -1):
+            arr.append(arr[j])
+        for j in range(i):
+            arr[j] = "0" + arr[j]
+        for j in range(i, 2 * i):
+            arr[j] = "1" + arr[j]
+        i = i << 1
+    return list(map(lambda x: int(x, 2), arr))
+
+
+def _gen_mary_alphabet(size, gray=True, polar=True):
+    alphabet = np.zeros((size, 2))
+    N = math.ceil(math.sqrt(size))
+
+    # if sqrt(size) != size^2 (not a perfect square),
+    # skip defines how many corners to cut off.
+    skip = 0
+    if N ** 2 > size:
+        skip = int(math.sqrt((N ** 2 - size) // 4))
+
+    step = 2 / (N - 1)
+    skipped = 0
+    for x in range(N):
+        for y in range(N):
+            i = x * N + y - skipped
+            if i >= size:
+                break
+            # Reverse y every odd column
+            if x % 2 == 0 and N < 4:
+                y = N - y - 1
+            if skip > 0:
+                if (x < skip or x + 1 > N - skip) and \
+                        (y < skip or y + 1 > N - skip):
+                    skipped += 1
+                    continue
+            # Exception for 3-ary alphabet, skip centre point
+            if size == 8 and x == 1 and y == 1:
+                skipped += 1
+                continue
+            alphabet[i, :] = [step * x - 1, step * y - 1]
+    if gray:
+        shape = alphabet.shape
+        d1 = 4 if N > 4 else 2 ** N // 4
+        g1 = np.array([0, 1, 3, 2])
+        g2 = g1[:d1]
+        hypershape = (d1, 4, 2)
+        if N > 4:
+            hypercube = alphabet.reshape(hypershape + (N-4, ))
+            hypercube = hypercube[:, g1, :, :][g2, :, :, :]
+        else:
+            hypercube = alphabet.reshape(hypershape)
+            hypercube = hypercube[:, g1, :][g2, :, :]
+        alphabet = hypercube.reshape(shape)
+    if polar:
+        alphabet = misc.rect2polar(alphabet)
+    return alphabet
+
+
+class BypassChannel(defs.Channel):
+    def forward(self, values):
+        return values
+
+
+class AWGNChannel(defs.Channel):
+    def __init__(self, noise_level, **kwargs):
+        """
+        :param noise_level: in dB
+        """
+        super().__init__(**kwargs)
+        self.noise = 10 ** (noise_level / 10)
+
+    def forward(self, values):
+        a = np.random.normal(0, 1, values.shape[0]) * self.noise
+        p = np.random.normal(0, 1, values.shape[0]) * self.noise
+        f = np.zeros(values.shape[0])
+        noise_mat = np.c_[a, p, f]
+        return values + noise_mat
+
+
+class BPSKMod(defs.Modulator):
+
+    def __init__(self, carrier_f, **kwargs):
+        super().__init__(2, **kwargs)
+        self.f = carrier_f
+
+    def forward(self, binary: np.ndarray):
+        a = np.ones(binary.shape[0])
+        p = np.zeros(binary.shape[0])
+        p[binary == True] = np.pi
+        f = np.zeros(binary.shape[0]) + self.f
+        return np.c_[a, p, f]
+
+
+class BPSKDemod(defs.Demodulator):
+
+    def __init__(self, carrier_f, bandwidth, **kwargs):
+        """
+        :param carrier_f: Carrier frequency
+        :param bandwidth: demodulator bandwidth
+        """
+        super().__init__(2, **kwargs)
+        self.upper_f = carrier_f + bandwidth / 2
+        self.lower_f = carrier_f - bandwidth / 2
+
+    def forward(self, values):
+        # TODO: Channel noise simulator for frequency component?
+        # for now we only care about amplitude and phase
+        ap = np.delete(values, 2, 1)
+        ap = misc.polar2rect(ap)
+
+        result = np.ones(values.shape[0], dtype=bool)
+        result[ap[:, 0] > 0] = False
+        return result
+
+
+class MaryMod(defs.Modulator):
+
+    def __init__(self, N, carrier_f, gray=True):
+        if N < 2:
+            raise ValueError("M-ary modulator N value has to be larger than 1")
+        super().__init__(2 ** N)
+        self.f = carrier_f
+        self.alphabet = _gen_mary_alphabet(self.alphabet_size, gray)
+        self.mult_mat = np.array([2 ** i for i in range(self.N)])
+
+    def forward(self, binary):
+        if binary.shape[0] % self.N > 0:
+            to_add = self.N - binary.shape[0] % self.N
+            binary = np.concatenate((binary, np.zeros(to_add, bool)))
+        reshaped = binary.reshape((binary.shape[0] // self.N, self.N))
+        indices = np.matmul(reshaped, self.mult_mat)
+        values = self.alphabet[indices, :]
+
+        a = values[:, 0]
+        p = values[:, 1]
+        f = np.zeros(reshaped.shape[0]) + self.f
+        return np.c_[a, p, f]  #, indices
+
+
+class MaryDemod(defs.Demodulator):
+
+    def __init__(self, N, carrier_f, gray=True):
+        if N < 2:
+            raise ValueError("M-ary modulator N value has to be larger than 1")
+        super().__init__(2 ** N)
+        self.f = carrier_f
+        self.N = N
+        self.alphabet = _gen_mary_alphabet(self.alphabet_size, gray=gray, polar=False)
+        self.ktree = cKDTree(self.alphabet)
+
+    def forward(self, binary):
+        binary = binary[:, :2]  # ignore frequency
+        rbin = misc.polar2rect(binary)
+        indices = self.ktree.query(rbin)[1]
+
+        # Converting indices to bite array
+        # FIXME: unpackbits requires 8bit inputs, thus largest demodulation is 256-QAM
+        values = np.unpackbits(np.array([indices], dtype=np.uint8).T, bitorder='little', axis=1)
+        return values[:, :self.N].reshape((-1,)).astype(bool)  #, indices

+ 22 - 0
models/knn.py

@@ -0,0 +1,22 @@
+import pandas as pd
+import seaborn as sns
+import matplotlib.pyplot as plt
+import numpy as np
+from sklearn.preprocessing import StandardScaler
+from sklearn.neighbors import KNeighborsClassifier
+
+
+class Model:
+
+    def __init__(self):
+        scaler = StandardScaler()
+
+    def train(self, X_train, Y_train, X_test, Y_test, n_range=40):
+        error_rate = []
+        # Might take some time
+        for i in range(1, n_range):
+            knn = KNeighborsClassifier(n_neighbors=i)
+            knn.fit(X_train, Y_train)
+            pred_i = knn.predict(X_test)
+            error_rate.append(np.mean(pred_i != Y_test))
+