"""Channels, modulators and demodulators operating on symbol matrices.

Symbols produced by the modulators in this module are rows of
``[amplitude, phase, frequency]`` (see ``np.c_[a, p, f]`` in the modulators).
"""
import math
from os import path

import numpy as np
from scipy.spatial import cKDTree

import defs
import misc

ALPHABET_DIR = "./alphabets"

# ---------------------------------------------------------------------------
# Legacy alphabet generator, kept disabled for reference. Alphabets are now
# loaded from files in ALPHABET_DIR by load_alphabet().
# ---------------------------------------------------------------------------
# def _make_gray(n):
#     if n <= 0:
#         return []
#     arr = ['0', '1']
#     i = 2
#     while True:
#         if i >= 1 << n:
#             break
#         for j in range(i - 1, -1, -1):
#             arr.append(arr[j])
#         for j in range(i):
#             arr[j] = "0" + arr[j]
#         for j in range(i, 2 * i):
#             arr[j] = "1" + arr[j]
#         i = i << 1
#     return list(map(lambda x: int(x, 2), arr))
#
#
# def _gen_mary_alphabet(size, gray=True, polar=True):
#     alphabet = np.zeros((size, 2))
#     N = math.ceil(math.sqrt(size))
#
#     # if sqrt(size) != size^2 (not a perfect square),
#     # skip defines how many corners to cut off.
#     skip = 0
#     if N ** 2 > size:
#         skip = int(math.sqrt((N ** 2 - size) // 4))
#
#     step = 2 / (N - 1)
#     skipped = 0
#     for x in range(N):
#         for y in range(N):
#             i = x * N + y - skipped
#             if i >= size:
#                 break
#             # Reverse y every odd column
#             if x % 2 == 0 and N < 4:
#                 y = N - y - 1
#             if skip > 0:
#                 if (x < skip or x + 1 > N - skip) and \
#                         (y < skip or y + 1 > N - skip):
#                     skipped += 1
#                     continue
#             # Exception for 3-ary alphabet, skip centre point
#             if size == 8 and x == 1 and y == 1:
#                 skipped += 1
#                 continue
#             alphabet[i, :] = [step * x - 1, step * y - 1]
#     if gray:
#         shape = alphabet.shape
#         d1 = 4 if N > 4 else 2 ** N // 4
#         g1 = np.array([0, 1, 3, 2])
#         g2 = g1[:d1]
#         hypershape = (d1, 4, 2)
#         if N > 4:
#             hypercube = alphabet.reshape(hypershape + (N-4, ))
#             hypercube = hypercube[:, g1, :, :][g2, :, :, :]
#         else:
#             hypercube = alphabet.reshape(hypershape)
#             hypercube = hypercube[:, g1, :][g2, :, :]
#         alphabet = hypercube.reshape(shape)
#     if polar:
#         alphabet = misc.rect2polar(alphabet)
#     return alphabet


def load_alphabet(name, polar=True):
    """
    Load a symbol alphabet from ``ALPHABET_DIR/<name>.a``.

    The first line of the file is a header made of format flags (matched
    case-insensitively, by substring):

    * ``d`` - the two value columns are a magnitude and an angle in degrees;
      they are converted to rectangular coordinates while loading.
    * ``r`` - the two value columns are used as given (presumably rectangular
      x/y - confirm against the alphabet files).
    * ``i`` - the optional leading index column is decimal; without this flag
      it is parsed as binary.

    Every following non-blank line is either ``index,v1,v2`` or ``v1,v2``.
    Rows without an index column are numbered by their position among the
    data rows, so blank lines do not shift the numbering.

    :param name: alphabet name, i.e. the file name without the ``.a`` suffix
    :param polar: when True the result is passed through ``misc.rect2polar``
    :return: float array with one row per symbol, ordered by symbol index
    :raises ValueError: if the file is missing, the header has neither ``d``
        nor ``r``, a row cannot be parsed, or the symbol indexes are not
        exactly ``0 .. len - 1`` without repetition
    """
    apath = path.join(ALPHABET_DIR, name + '.a')
    if not path.exists(apath):
        raise ValueError(f"Alphabet '{name}' does not exist")

    data = []
    indexes = []
    with open(apath, 'r') as f:
        header = f.readline().lower()
        if 'd' not in header and 'r' not in header:
            raise ValueError(f"Alphabet {name} header does not specify valid format")
        in_degrees = 'd' in header
        index_base = 10 if 'i' in header else 2

        # The header is line 1 of the file, so data rows start at line 2.
        for line_no, row in enumerate(f, start=2):
            row = row.strip()
            if not row:
                continue
            cols = row.split(',')
            try:
                if len(cols) == 3:
                    index = int(cols[0], index_base)
                    x = float(cols[1])
                    y = float(cols[2])
                elif len(cols) == 2:
                    # Implicit index: position among the data rows read so
                    # far, not the raw line counter (blank lines are skipped).
                    index = len(data)
                    x = float(cols[0])
                    y = float(cols[1])
                else:
                    raise ValueError("unexpected number of columns")
            except ValueError as err:
                raise ValueError(
                    f"Alphabet {name} line {line_no}: '{row}' has invalid values"
                ) from err

            if in_degrees:
                # (magnitude, angle in degrees) -> rectangular (x, y)
                angle = y * math.pi / 180
                y = math.sin(angle) * x
                x = math.cos(angle) * x
            indexes.append(index)
            data.append((x, y))

    # Place every point at its symbol index, rejecting gaps and collisions
    # explicitly instead of failing later with an obscure error.
    size = len(data)
    ordered = [None] * size
    for index, point in zip(indexes, data):
        if not 0 <= index < size:
            raise ValueError(
                f"Alphabet {name} index {index} is out of range for {size} symbols"
            )
        if ordered[index] is not None:
            raise ValueError(
                f"Alphabet {name} index {index} is defined more than once"
            )
        ordered[index] = point

    arr = np.array(ordered, dtype=float)
    if polar:
        arr = misc.rect2polar(arr)
    return arr


class BypassChannel(defs.Channel):
    """Channel that returns its input unchanged."""

    def forward(self, values):
        """
        :param values: symbols entering the channel
        :return: the very same ``values`` object
        """
        return values


class AWGNChannel(defs.Channel):
    """Channel adding Gaussian noise to the first two symbol columns."""

    def __init__(self, noise_level, **kwargs):
        """
        :param noise_level: in dB
        """
        super().__init__(**kwargs)
        # dB -> linear scale factor applied to unit-variance noise
        self.noise = 10 ** (noise_level / 10)

    def forward(self, values):
        """
        Add independent noise to columns 0 and 1; column 2 is left untouched.

        :param values: array with one symbol per row and three columns
            (amplitude, phase, frequency as produced by the modulators here)
        :return: ``values`` plus the noise matrix
        """
        count = values.shape[0]
        # Draw order (first column, then second) is kept so that seeded runs
        # reproduce the same noise.
        amp_noise = np.random.normal(0, 1, count) * self.noise
        phase_noise = np.random.normal(0, 1, count) * self.noise
        freq_noise = np.zeros(count)
        return values + np.c_[amp_noise, phase_noise, freq_noise]


class BPSKMod(defs.Modulator):
    """Binary phase-shift keying modulator."""

    def __init__(self, carrier_f, **kwargs):
        """
        :param carrier_f: Carrier frequency
        """
        super().__init__(2, **kwargs)
        self.f = carrier_f

    def forward(self, binary: np.ndarray):
        """
        Map each bit to one symbol: unit amplitude, phase 0 for a cleared bit
        and pi for a set bit, frequency equal to the carrier.

        :param binary: 1-D array of bits
        :return: array of ``[amplitude, phase, frequency]`` rows
        """
        count = binary.shape[0]
        a = np.ones(count)
        p = np.zeros(count)
        # Elementwise comparison on purpose: builds a boolean mask even when
        # `binary` is not a bool array.
        p[binary == True] = np.pi  # noqa: E712
        f = np.zeros(count) + self.f
        return np.c_[a, p, f]


class BPSKDemod(defs.Demodulator):
    """Binary phase-shift keying demodulator."""

    def __init__(self, carrier_f, bandwidth, **kwargs):
        """
        :param carrier_f: Carrier frequency
        :param bandwidth: demodulator bandwidth
        """
        super().__init__(2, **kwargs)
        # Band edges are stored but not used by forward() yet (see TODO there).
        half_band = bandwidth / 2
        self.upper_f = carrier_f + half_band
        self.lower_f = carrier_f - half_band

    def forward(self, values):
        """
        Decide one bit per symbol from the sign of its in-phase component.

        :param values: array of ``[amplitude, phase, frequency]`` rows
        :return: 1-D bool array, False where the rectangular x component is
            strictly positive and True otherwise
        """
        # TODO: Channel noise simulator for frequency component?
        # for now we only care about amplitude and phase
        ap = np.delete(values, 2, 1)
        ap = misc.polar2rect(ap)
        return np.logical_not(ap[:, 0] > 0)


class AlphabetMod(defs.Modulator):
    """Modulator mapping groups of bits onto symbols of a loaded alphabet."""

    def __init__(self, modulation, carrier_f, **kwargs):
        """
        :param modulation: alphabet name, passed to ``load_alphabet``
        :param carrier_f: Carrier frequency
        :param kwargs: forwarded to ``defs.Modulator``
        """
        # if N < 2:
        #     raise ValueError("M-ary modulator N value has to be larger than 1")
        self.alphabet = load_alphabet(modulation)
        super().__init__(self.alphabet.shape[0], **kwargs)
        self.f = carrier_f
        # NOTE(review): self.N is expected to be set by defs.Modulator
        # (presumably bits per symbol) - confirm against defs.
        # Weights 1, 2, 4, ...: the first bit of a group is least significant.
        self.mult_mat = np.array([2 ** i for i in range(self.N)])

    def forward(self, binary):
        """
        Convert a bit stream into symbols.

        The stream is zero-padded to a multiple of ``self.N`` bits; each group
        of ``self.N`` bits (least significant first) selects one alphabet row.

        :param binary: 1-D array of bits
        :return: array of ``[amplitude, phase, frequency]`` rows
        """
        remainder = binary.shape[0] % self.N
        if remainder > 0:
            padding = np.zeros(self.N - remainder, bool)
            binary = np.concatenate((binary, padding))
        groups = binary.reshape((binary.shape[0] // self.N, self.N))
        indices = np.matmul(groups, self.mult_mat)
        symbols = self.alphabet[indices, :]
        a = symbols[:, 0]
        p = symbols[:, 1]
        f = np.zeros(groups.shape[0]) + self.f
        return np.c_[a, p, f]


class AlphabetDemod(defs.Demodulator):
    """Nearest-neighbour demodulator over a loaded alphabet."""

    def __init__(self, modulation, carrier_f, **kwargs):
        """
        :param modulation: alphabet name, passed to ``load_alphabet``
        :param carrier_f: Carrier frequency
        :param kwargs: forwarded to ``defs.Demodulator``
        """
        # if N < 2:
        #     raise ValueError("M-ary modulator N value has to be larger than 1")
        self.alphabet = load_alphabet(modulation, polar=False)
        super().__init__(self.alphabet.shape[0], **kwargs)
        self.f = carrier_f
        # self.alphabet = _gen_mary_alphabet(self.alphabet_size, gray=gray, polar=False)
        # The tree is built on rectangular coordinates (polar=False above).
        self.ktree = cKDTree(self.alphabet)

    def forward(self, binary):
        """
        Convert received symbols back into a bit stream.

        Each symbol is matched to the nearest alphabet point and the index of
        that point is expanded into ``self.N`` bits, least significant first
        (the inverse of the weighting used by ``AlphabetMod``).

        :param binary: array of received symbols; only the first two columns
            are used and are converted with ``misc.polar2rect``
        :return: flat bool array holding ``self.N`` bits per symbol
        """
        polar = binary[:, :2]  # ignore frequency
        rect = misc.polar2rect(polar)
        indices = np.asarray(self.ktree.query(rect)[1])
        # Shift-and-mask instead of np.unpackbits: unpackbits needs uint8
        # input, which silently wrapped indices >= 256.
        bit_positions = np.arange(self.N)
        bits = (indices[:, np.newaxis] >> bit_positions) & 1
        return bits.reshape((-1,)).astype(bool)