"""Alphabet loading plus simple signal, channel, modulator and demodulator models."""
import math
from os import path

import numpy as np
import tensorflow as tf
from scipy.spatial import cKDTree

import defs
import misc

ALPHABET_DIR = "./alphabets"


def load_alphabet(name, polar=True):
    """Load a constellation alphabet from ``ALPHABET_DIR/<name>.a``.

    File layout, as parsed here:

    * The first line is a header. It must contain the letter ``d`` or ``r``
      (checked case-insensitively as a plain substring). With ``d`` each row
      holds a magnitude and an angle in degrees, which are converted to
      rectangular x/y; otherwise the two values are stored as given.
      If the header contains ``i`` the optional index column is parsed as
      base 10, otherwise as base 2.
    * Every following non-blank line is ``index,a,b`` or ``a,b``. Rows
      without an index take the position of the symbol among the rows
      parsed so far.

    :param name: alphabet file name without the ``.a`` extension
    :param polar: when true, the table is passed through ``misc.rect2polar``
        before being returned
    :return: array with one row per symbol, ordered by symbol index
    :raises ValueError: if the file is missing, the header is invalid, a row
        cannot be parsed, or the indexes do not cover every symbol exactly
        once
    """
    apath = path.join(ALPHABET_DIR, name + '.a')
    if not path.exists(apath):
        raise ValueError(f"Alphabet '{name}' was not found in {path.abspath(apath)}")

    points = []
    indexes = []
    with open(apath, 'r') as f:
        header = f.readline().lower()
        if 'd' not in header and 'r' not in header:
            raise ValueError(f"Alphabet {name} header does not specify valid format")
        in_degrees = 'd' in header
        index_base = 10 if 'i' in header else 2

        # The header is line 1, so data rows start at line 2; this keeps the
        # reported line number aligned with the file.
        for line_no, row in enumerate(f, start=2):
            row = row.strip()
            if not row:
                continue
            cols = row.split(',')
            try:
                if len(cols) == 3:
                    index = int(cols[0], index_base)
                    x = float(cols[1])
                    y = float(cols[2])
                elif len(cols) == 2:
                    # Implicit index: count parsed symbols, not file lines,
                    # so blank lines cannot shift the numbering.
                    index = len(points)
                    x = float(cols[0])
                    y = float(cols[1])
                else:
                    raise ValueError()
            except ValueError as err:
                raise ValueError(
                    f"Alphabet {name} line {line_no}: '{row}' has invalid values"
                ) from err

            if in_degrees:
                p = y * math.pi / 180
                y = math.sin(p) * x
                x = math.cos(p) * x
            indexes.append(index)
            points.append((x, y))

    # Every slot of the table must be filled exactly once; otherwise the
    # reordering below would leave holes or write out of bounds.
    if sorted(indexes) != list(range(len(points))):
        raise ValueError(
            f"Alphabet {name} symbol indexes must cover 0..{len(points) - 1} exactly once"
        )

    ordered = [None] * len(points)
    for index, point in zip(indexes, points):
        ordered[index] = point

    arr = np.array(ordered, dtype=float)
    if polar:
        arr = misc.rect2polar(arr)
    return arr


class RFSignal(defs.Signal):
    """Sequence of symbols stored as amplitude, phase and frequency columns."""

    def __init__(self, array: np.ndarray):
        """
        :param array: 2-D array whose columns 0, 1 and 2 are read as
            amplitude, phase and frequency; one row per symbol
        """
        self.amplitude = array[:, 0]
        self.phase = array[:, 1]
        self.frequency = array[:, 2]
        self.symbols = array.shape[0]

    @property
    def rect(self) -> np.ndarray:
        """Amplitude/phase columns converted with ``misc.polar2rect``."""
        return misc.polar2rect(np.c_[self.amplitude, self.phase])

    def set_rect_xy(self, x_mat: np.ndarray, y_mat: np.ndarray):
        """Set amplitude and phase from separate x and y columns."""
        self.set_rect(np.c_[x_mat, y_mat])

    def set_rect(self, mat: np.ndarray):
        """Set amplitude and phase from ``mat`` via ``misc.rect2polar``.

        ``frequency`` and ``symbols`` are left untouched.
        """
        polar = misc.rect2polar(mat)
        self.amplitude = polar[:, 0]
        self.phase = polar[:, 1]

    @property
    def apf(self):
        """Amplitude, phase and frequency stacked as columns."""
        return np.c_[self.amplitude, self.phase, self.frequency]


class BypassChannel(defs.Channel):
    """Channel that returns its input unchanged."""

    def forward(self, values):
        return values


class AWGNChannel(defs.Channel):
    """Channel that adds scaled standard-normal noise to the x/y components."""

    def __init__(self, noise_level, **kwargs):
        """
        :param noise_level: in dB
        """
        super().__init__(**kwargs)
        # dB value converted with 10 ** (dB / 10); used below as the
        # multiplier applied to unit-variance normal samples.
        self.noise = 10 ** (noise_level / 10)

    def forward(self, values: RFSignal) -> RFSignal:
        """Add independent noise to each symbol of ``values`` in place.

        :param values: signal to disturb; it is modified and returned
        :return: the same ``values`` object
        """
        # NOTE(review): rect_x / rect_y are not defined on RFSignal in this
        # file; presumably they come from defs.Signal - confirm.
        values.set_rect_xy(
            values.rect_x + np.random.normal(0, 1, values.symbols) * self.noise,
            values.rect_y + np.random.normal(0, 1, values.symbols) * self.noise,
        )
        return values

    def forward_tensor(self, tensor: tf.Tensor) -> tf.Tensor:
        """Return ``tensor`` with scaled normal noise added.

        :param tensor: tensor that a length-2 noise vector can broadcast to
        :return: noisy tensor
        """
        # NOTE(review): the noise sample has fixed shape [2], so it is
        # broadcast across any leading dimensions of ``tensor`` - confirm
        # that sharing one sample over those dimensions is intended.
        noise = tf.random.normal([2], mean=0.0, stddev=1.0, dtype=tf.dtypes.float32, seed=None, name=None)
        tensor += noise * self.noise
        return tensor


class BPSKMod(defs.Modulator):
    """Binary phase-shift keying modulator."""

    def __init__(self, carrier_f, **kwargs):
        """
        :param carrier_f: Carrier frequency
        """
        super().__init__(2, **kwargs)
        self.f = carrier_f

    def forward(self, binary):
        """Map each bit to a unit-amplitude symbol with phase 0 or pi.

        :param binary: 1-D array of bits
        :return: RFSignal with one symbol per bit
        """
        count = binary.shape[0]
        a = np.ones(count)
        # Elementwise comparison (not identity): entries equal to True get
        # phase pi, everything else phase 0.
        p = np.where(binary == True, np.pi, 0.0)  # noqa: E712
        f = np.zeros(count) + self.f
        return RFSignal(np.c_[a, p, f])


class BPSKDemod(defs.Demodulator):
    """Binary phase-shift keying demodulator."""

    def __init__(self, carrier_f, bandwidth, **kwargs):
        """
        :param carrier_f: Carrier frequency
        :param bandwidth: demodulator bandwidth
        """
        super().__init__(2, **kwargs)
        self.upper_f = carrier_f + bandwidth / 2
        self.lower_f = carrier_f - bandwidth / 2

    def forward(self, values):
        """Decide one bit per symbol from the sign of the x component.

        :param values: signal exposing ``symbols`` and ``rect_x``
        :return: boolean array, False where the x component is positive
        """
        # TODO: Channel noise simulator for frequency component?
        # For now only amplitude and phase are considered.
        result = np.ones(values.symbols, dtype=bool)
        # NOTE(review): rect_x is indexed as 2-D here but combined with 1-D
        # noise in AWGNChannel.forward; its definition is not in this file -
        # confirm the expected shape.
        result[values.rect_x[:, 0] > 0] = False
        return result


class AlphabetMod(defs.Modulator):
    """Modulator that looks symbols up in an alphabet loaded from file."""

    def __init__(self, modulation, carrier_f):
        """
        :param modulation: alphabet name passed to ``load_alphabet``
        :param carrier_f: Carrier frequency
        """
        self.alphabet = load_alphabet(modulation)
        super().__init__(self.alphabet.shape[0])
        self.f = carrier_f
        # Bit weights, least significant bit first.
        # NOTE(review): self.N is presumably set by defs.Modulator and is
        # used here as the number of bits per symbol - confirm.
        self.mult_mat = 2 ** np.arange(self.N)

    def forward(self, binary):
        """Group bits into symbols and emit their alphabet entries.

        The input is zero-padded up to a multiple of ``self.N`` bits. Within
        each group the first bit is the least significant.

        :param binary: 1-D boolean array of bits
        :return: RFSignal with one symbol per group of ``self.N`` bits
        """
        remainder = binary.shape[0] % self.N
        if remainder > 0:
            padding = np.zeros(self.N - remainder, bool)
            binary = np.concatenate((binary, padding))

        reshaped = binary.reshape((binary.shape[0] // self.N, self.N))
        indices = np.matmul(reshaped, self.mult_mat)
        values = self.alphabet[indices, :]

        a = values[:, 0]
        p = values[:, 1]
        f = np.zeros(reshaped.shape[0]) + self.f
        return RFSignal(np.c_[a, p, f])


class AlphabetDemod(defs.Demodulator):
    """Nearest-neighbour demodulator over an alphabet loaded from file."""

    def __init__(self, modulation, carrier_f):
        """
        :param modulation: alphabet name passed to ``load_alphabet``
        :param carrier_f: Carrier frequency
        """
        self.alphabet = load_alphabet(modulation, polar=False)
        super().__init__(self.alphabet.shape[0])
        self.f = carrier_f
        self.ktree = cKDTree(self.alphabet)

    def forward(self, binary):
        """Map each received symbol to the bits of its nearest alphabet entry.

        :param binary: received signal exposing ``rect``
        :return: flat boolean array with ``self.N`` bits per symbol, least
            significant bit first
        """
        indices = np.asarray(self.ktree.query(binary.rect)[1])
        # Shift-and-mask instead of np.unpackbits: unpackbits needs uint8
        # input, which capped demodulation at 8 bits per symbol.
        bit_positions = np.arange(self.N)
        bits = (indices.reshape(-1, 1) >> bit_positions) & 1
        return bits.reshape((-1,)).astype(bool)