| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- import defs
- import numpy as np
- import math
- import misc
- from scipy.spatial import cKDTree
- from os import path
- import tensorflow as tf
- ALPHABET_DIR = "./alphabets"
- def load_alphabet(name, polar=True):
- apath = path.join(ALPHABET_DIR, name + '.a')
- if not path.exists(apath):
- raise ValueError(f"Alphabet '{name}' was not found in {path.abspath(apath)}")
- data = []
- indexes = []
- with open(apath, 'r') as f:
- header = f.readline().lower()
- if 'd' not in header and 'r' not in header:
- raise ValueError(f"Alphabet {name} header does not specify valid format")
- for i, row in enumerate(f.readlines()):
- row = row.strip()
- if len(row) == 0:
- continue
- cols = row.split(',')
- try:
- if len(cols) == 3:
- base = 2
- if 'i' in header:
- base = 10
- indexes.append(int(cols[0], base))
- x = float(cols[1])
- y = float(cols[2])
- elif len(cols) == 2:
- indexes.append(i)
- x = float(cols[0])
- y = float(cols[1])
- else:
- raise ValueError()
- if 'd' in header:
- p = y * math.pi / 180
- y = math.sin(p) * x
- x = math.cos(p) * x
- data.append((x, y))
- except ValueError:
- raise ValueError(f"Alphabet {name} line {i + 1}: '{row}' has invalid values")
- data2 = [None] * len(data)
- for i, d in enumerate(data):
- data2[indexes[i]] = d
- arr = np.array(data2, dtype=float)
- if polar:
- arr = misc.rect2polar(arr)
- return arr
- class RFSignal(defs.Signal):
- def __init__(self, array: np.ndarray):
- self.amplitude = array[:, 0]
- self.phase = array[:, 1]
- self.frequency = array[:, 2]
- self.symbols = array.shape[0]
- @property
- def rect(self) -> np.ndarray:
- return misc.polar2rect(np.c_[self.amplitude, self.phase])
- def set_rect_xy(self, x_mat: np.ndarray, y_mat: np.ndarray):
- self.set_rect(np.c_[x_mat, y_mat])
- def set_rect(self, mat: np.ndarray):
- polar = misc.rect2polar(mat)
- self.amplitude = polar[:, 0]
- self.phase = polar[:, 1]
- @property
- def apf(self):
- return np.c_[self.amplitude, self.phase, self.frequency]
- class BypassChannel(defs.Channel):
- def forward(self, values):
- return values
- class AWGNChannel(defs.Channel):
- def __init__(self, noise_level, **kwargs):
- """
- :param noise_level: in dB
- """
- super().__init__(**kwargs)
- self.noise = 10 ** (noise_level / 10)
- def forward(self, values: RFSignal) -> RFSignal:
- values.set_rect_xy(
- values.rect_x + np.random.normal(0, 1, values.symbols) * self.noise,
- values.rect_y + np.random.normal(0, 1, values.symbols) * self.noise,
- )
- return values
- def forward_tensor(self, tensor: tf.Tensor) -> tf.Tensor:
- noise = tf.random.normal([2], mean=0.0, stddev=1.0, dtype=tf.dtypes.float32, seed=None, name=None)
- tensor += noise * self.noise
- return tensor
- class BPSKMod(defs.Modulator):
- def __init__(self, carrier_f, **kwargs):
- super().__init__(2, **kwargs)
- self.f = carrier_f
- def forward(self, binary):
- a = np.ones(binary.shape[0])
- p = np.zeros(binary.shape[0])
- p[binary == True] = np.pi
- f = np.zeros(binary.shape[0]) + self.f
- return RFSignal(np.c_[a, p, f])
- class BPSKDemod(defs.Demodulator):
- def __init__(self, carrier_f, bandwidth, **kwargs):
- """
- :param carrier_f: Carrier frequency
- :param bandwidth: demodulator bandwidth
- """
- super().__init__(2, **kwargs)
- self.upper_f = carrier_f + bandwidth / 2
- self.lower_f = carrier_f - bandwidth / 2
- def forward(self, values):
- # TODO: Channel noise simulator for frequency component?
- # for now we only care about amplitude and phase
- # ap = np.delete(values, 2, 1)
- # ap = misc.polar2rect(ap)
- result = np.ones(values.symbols, dtype=bool)
- result[values.rect_x[:, 0] > 0] = False
- return result
- class AlphabetMod(defs.Modulator):
- def __init__(self, modulation, carrier_f):
- # if N < 2:
- # raise ValueError("M-ary modulator N value has to be larger than 1")
- self.alphabet = load_alphabet(modulation)
- super().__init__(self.alphabet.shape[0])
- self.f = carrier_f
- self.mult_mat = np.array([2 ** i for i in range(self.N)])
- def forward(self, binary):
- if binary.shape[0] % self.N > 0:
- to_add = self.N - binary.shape[0] % self.N
- binary = np.concatenate((binary, np.zeros(to_add, bool)))
- reshaped = binary.reshape((binary.shape[0] // self.N, self.N))
- indices = np.matmul(reshaped, self.mult_mat)
- values = self.alphabet[indices, :]
- a = values[:, 0]
- p = values[:, 1]
- f = np.zeros(reshaped.shape[0]) + self.f
- return RFSignal(np.c_[a, p, f]) # , indices
- class AlphabetDemod(defs.Demodulator):
- def __init__(self, modulation, carrier_f):
- # if N < 2:
- # raise ValueError("M-ary modulator N value has to be larger than 1")
- self.alphabet = load_alphabet(modulation, polar=False)
- super().__init__(self.alphabet.shape[0])
- self.f = carrier_f
- # self.alphabet = _gen_mary_alphabet(self.alphabet_size, gray=gray, polar=False)
- self.ktree = cKDTree(self.alphabet)
- def forward(self, binary):
- # binary = binary[:, :2] # ignore frequency
- # rbin = misc.polar2rect(binary)
- indices = self.ktree.query(binary.rect)[1]
- # Converting indices to bite array
- # FIXME: unpackbits requires 8bit inputs, thus largest demodulation is 256-QAM
- values = np.unpackbits(np.array([indices], dtype=np.uint8).T, bitorder='little', axis=1)
- return values[:, :self.N].reshape((-1,)).astype(bool) # , indices
|