basic.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. import defs
  2. import numpy as np
  3. import math
  4. import misc
  5. from scipy.spatial import cKDTree
  6. from os import path
  7. import tensorflow as tf
  # Directory holding the ``<name>.a`` alphabet files read by ``load_alphabet``;
  # resolved relative to the location of this module.
  ALPHABET_DIR = path.join(
      path.dirname(__file__),
      "../alphabets",
  )
  9. def load_alphabet(name, polar=True):
  10. apath = path.join(ALPHABET_DIR, name + '.a')
  11. if not path.exists(apath):
  12. raise ValueError(f"Alphabet '{name}' was not found in {path.abspath(apath)}")
  13. data = []
  14. indexes = []
  15. with open(apath, 'r') as f:
  16. header = f.readline().lower()
  17. if 'd' not in header and 'r' not in header:
  18. raise ValueError(f"Alphabet {name} header does not specify valid format")
  19. for i, row in enumerate(f.readlines()):
  20. row = row.strip()
  21. if len(row) == 0:
  22. continue
  23. cols = row.split(',')
  24. try:
  25. if len(cols) == 3:
  26. base = 2
  27. if 'i' in header:
  28. base = 10
  29. indexes.append(int(cols[0], base))
  30. x = float(cols[1])
  31. y = float(cols[2])
  32. elif len(cols) == 2:
  33. indexes.append(i)
  34. x = float(cols[0])
  35. y = float(cols[1])
  36. else:
  37. raise ValueError()
  38. if 'd' in header:
  39. p = y * math.pi / 180
  40. y = math.sin(p) * x
  41. x = math.cos(p) * x
  42. data.append((x, y))
  43. except ValueError:
  44. raise ValueError(f"Alphabet {name} line {i + 1}: '{row}' has invalid values")
  45. data2 = [None] * len(data)
  46. for i, d in enumerate(data):
  47. data2[indexes[i]] = d
  48. arr = np.array(data2, dtype=float)
  49. if polar:
  50. arr = misc.rect2polar(arr)
  51. return arr
  class RFSignal(defs.Signal):
      """
      Signal kept as separate amplitude, phase and frequency columns, one entry
      per symbol.
      """

      def __init__(self, array: np.ndarray):
          """
          :param array: array indexed as ``array[:, k]``; columns 0, 1 and 2 are
                        stored as amplitude, phase and frequency, and the number of
                        rows becomes ``symbols``
          """
          self.amplitude, self.phase, self.frequency = array[:, 0], array[:, 1], array[:, 2]
          self.symbols = array.shape[0]

      @property
      def rect(self) -> np.ndarray:
          """Amplitude/phase columns converted with ``misc.polar2rect``."""
          polar = np.c_[self.amplitude, self.phase]
          return misc.polar2rect(polar)

      def set_rect_xy(self, x_mat: np.ndarray, y_mat: np.ndarray):
          """Replace amplitude and phase from separate x and y components."""
          rect = np.c_[x_mat, y_mat]
          self.set_rect(rect)

      def set_rect(self, mat: np.ndarray):
          """
          Replace amplitude and phase from a rectangular matrix.

          ``mat`` is converted with ``misc.rect2polar``; column 0 of the result
          becomes the amplitude and column 1 the phase. Frequency is left alone.
          """
          converted = misc.rect2polar(mat)
          self.amplitude, self.phase = converted[:, 0], converted[:, 1]

      @property
      def apf(self):
          """Amplitude, phase and frequency joined column-wise into one array."""
          columns = (self.amplitude, self.phase, self.frequency)
          return np.c_[columns[0], columns[1], columns[2]]
  class BypassChannel(defs.Channel):
      """Channel that hands its input back without touching it."""

      def forward(self, values):
          """
          :param values: anything; it is not inspected
          :return: the very same ``values`` object
          """
          return values
  class AWGNChannel(defs.Channel):
      """
      Channel that adds zero-mean Gaussian noise to the rectangular components
      of a signal.
      """

      def __init__(self, noise_level, **kwargs):
          """
          :param noise_level: in dB
          :param kwargs: forwarded unchanged to the base class constructor
          """
          super().__init__(**kwargs)
          # NOTE(review): 10 ** (dB / 10) is a power ratio, but it is used below as
          # a scale on unit-variance samples (i.e. as a standard deviation). Left
          # unchanged because callers may be tuned to it - confirm whether
          # 10 ** (dB / 20) was intended.
          self.noise = 10 ** (noise_level / 10)

      def forward(self, values: RFSignal) -> RFSignal:
          """
          Add independent noise to the x and y component of every symbol.

          The signal is modified in place and the same object is returned.

          :param values: signal to disturb; ``rect_x`` / ``rect_y`` are read from it
                         (they are not defined in this file - presumably provided by
                         the ``defs.Signal`` base class, TODO confirm)
          :return: ``values`` itself, after the update
          """
          noise_x = np.random.normal(0, 1, values.symbols) * self.noise
          noise_y = np.random.normal(0, 1, values.symbols) * self.noise
          values.set_rect_xy(values.rect_x + noise_x, values.rect_y + noise_y)
          return values

      def forward_tensor(self, tensor: tf.Tensor) -> tf.Tensor:
          """
          Add noise to a tensor of rectangular samples.

          Noise is drawn with the full shape of ``tensor`` so that every element
          gets its own sample. Drawing a single ``[2]`` vector would broadcast the
          same offset onto every row of a batch.

          :param tensor: float32 tensor to disturb
          :return: new tensor holding ``tensor`` plus scaled noise
          """
          noise = tf.random.normal(tf.shape(tensor), mean=0.0, stddev=1.0, dtype=tf.dtypes.float32)
          return tensor + noise * self.noise
  class BPSKMod(defs.Modulator):
      """
      Modulator mapping each input bit to a unit-amplitude symbol whose phase is
      0 for a false bit and pi for a true bit.
      """

      def __init__(self, carrier_f, **kwargs):
          """
          :param carrier_f: carrier frequency written into every produced symbol
          :param kwargs: forwarded unchanged to the base class constructor
          """
          super().__init__(2, **kwargs)
          self.f = carrier_f

      def forward(self, binary):
          """
          :param binary: array of bits; one symbol is produced per entry along axis 0
          :return: ``RFSignal`` built from amplitude, phase and frequency columns
          """
          count = binary.shape[0]
          amplitude = np.ones(count)
          phase = np.zeros(count)
          # Element-wise comparison on purpose: it yields a boolean mask whatever
          # the dtype of ``binary`` is.
          is_set = binary == True
          phase[is_set] = np.pi
          frequency = np.zeros(count) + self.f
          return RFSignal(np.c_[amplitude, phase, frequency])
  class BPSKDemod(defs.Demodulator):
      """
      Demodulator deciding each bit from the sign of the symbol's x component.
      """

      def __init__(self, carrier_f, bandwidth, **kwargs):
          """
          :param carrier_f: Carrier frequency
          :param bandwidth: demodulator bandwidth
          :param kwargs: forwarded unchanged to the base class constructor
          """
          super().__init__(2, **kwargs)
          half_band = bandwidth / 2
          self.upper_f = carrier_f + half_band
          self.lower_f = carrier_f - half_band

      def forward(self, values):
          """
          :param values: signal exposing ``symbols`` and ``rect_x``
          :return: boolean array with one entry per symbol; ``False`` where the x
                   component is positive, ``True`` everywhere else
          """
          # TODO: Channel noise simulator for frequency component?
          # For now only amplitude and phase matter; upper_f / lower_f are unused here.
          # NOTE(review): ``rect_x`` is not defined in this file; the two-axis index
          # below assumes it is 2-D - confirm against defs.Signal.
          positive_x = values.rect_x[:, 0] > 0
          bits = np.full(values.symbols, True, dtype=bool)
          bits[positive_x] = False
          return bits
  class AlphabetMod(defs.Modulator):
      """
      Modulator that groups input bits and looks each group up in an alphabet
      loaded from file (in polar form).
      """

      def __init__(self, modulation, carrier_f):
          """
          :param modulation: alphabet name handed to ``load_alphabet``
          :param carrier_f: carrier frequency written into every produced symbol
          """
          self.alphabet = load_alphabet(modulation)
          # The base class receives the alphabet size; ``self.N`` used below is
          # presumably the bits-per-symbol it derives from that - TODO confirm.
          super().__init__(self.alphabet.shape[0])
          self.f = carrier_f
          # Bit weights 1, 2, 4, ...: the first bit of a group is the least significant.
          self.mult_mat = np.array([1 << bit for bit in range(self.N)])

      def forward(self, binary):
          """
          :param binary: 1-D array of bits; zero-padded at the end up to a multiple
                         of ``self.N``
          :return: ``RFSignal`` with one symbol per group of ``self.N`` bits
          """
          group = self.N
          leftover = binary.shape[0] % group
          if leftover > 0:
              padding = np.zeros(group - leftover, bool)
              binary = np.concatenate((binary, padding))
          symbol_count = binary.shape[0] // group
          grouped = binary.reshape((symbol_count, group))
          # Weighted sum of each row of bits gives the alphabet row to pick.
          indices = grouped @ self.mult_mat
          points = self.alphabet[indices, :]
          amplitude = points[:, 0]
          phase = points[:, 1]
          frequency = np.zeros(symbol_count) + self.f
          return RFSignal(np.c_[amplitude, phase, frequency])
  class AlphabetDemod(defs.Demodulator):
      """
      Demodulator that maps each received symbol to the nearest alphabet point
      and emits the bits of that point's index.
      """

      def __init__(self, modulation, carrier_f):
          """
          :param modulation: alphabet name handed to ``load_alphabet``
          :param carrier_f: carrier frequency, stored as ``self.f``
          """
          self.alphabet = load_alphabet(modulation, polar=False)
          # The base class receives the alphabet size; ``self.N`` used below is
          # presumably the bits-per-symbol it derives from that - TODO confirm.
          super().__init__(self.alphabet.shape[0])
          self.f = carrier_f
          # Nearest-neighbour lookup over the rectangular alphabet points.
          self.ktree = cKDTree(self.alphabet)

      def forward(self, binary):
          """
          :param binary: despite the name, the received signal; its ``rect``
                         attribute is queried against the alphabet
          :return: flat boolean array holding ``self.N`` bits per symbol, least
                   significant bit first
          """
          indices = np.atleast_1d(self.ktree.query(binary.rect)[1])
          # Shift-and-mask instead of np.unpackbits: unpackbits only accepts uint8,
          # which silently truncated indices >= 256 and capped the output at 8 bits.
          bit_positions = np.arange(self.N)
          bits = (indices[:, np.newaxis] >> bit_positions) & 1
          return bits.reshape((-1,)).astype(bool)