# autoencoder.py — autoencoder-based modulator/demodulator experiment
  1. import matplotlib.pyplot as plt
  2. import numpy as np
  3. import tensorflow as tf
  4. from sklearn.metrics import accuracy_score
  5. from tensorflow.keras import layers, losses
  6. from tensorflow.keras.models import Model
  7. from tensorflow.python.keras.layers import LeakyReLU
  8. import misc
  9. import defs
  10. import os
  11. from models import basic
  12. latent_dim = 64
  13. print("# GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
  14. class AutoencoderMod(defs.Modulator):
  15. def __init__(self, autoencoder):
  16. super().__init__(2**autoencoder.N)
  17. self.autoencoder = autoencoder
  18. def forward(self, binary: np.ndarray) -> np.ndarray:
  19. reshaped = binary.reshape((-1, self.N))
  20. reshaped_ho = misc.bit_matrix2one_hot(reshaped)
  21. encoded = self.autoencoder.encoder(reshaped_ho)
  22. x = encoded.numpy()
  23. x2 = x * 2 - 1
  24. f = np.zeros(x2.shape[0])
  25. x3 = misc.rect2polar(np.c_[x2[:, 0], x2[:, 1], f])
  26. return x3
  27. class AutoencoderDemod(defs.Demodulator):
  28. def __init__(self, autoencoder):
  29. super().__init__(2**autoencoder.N)
  30. self.autoencoder = autoencoder
  31. def forward(self, values: np.ndarray) -> np.ndarray:
  32. rect = misc.polar2rect(values[:, [0, 1]])
  33. decoded = self.autoencoder.decoder(rect).numpy()
  34. result = misc.int2bit_array(decoded.argmax(axis=1), self.N)
  35. return result.reshape(-1, )
  36. class Autoencoder(Model):
  37. def __init__(self, N, noise):
  38. super(Autoencoder, self).__init__()
  39. self.N = N
  40. self.encoder = tf.keras.Sequential()
  41. self.encoder.add(tf.keras.Input(shape=(2 ** N,), dtype=bool))
  42. self.encoder.add(layers.Dense(units=2 ** (N + 1)))
  43. self.encoder.add(LeakyReLU(alpha=0.001))
  44. # self.encoder.add(layers.Dropout(0.2))
  45. self.encoder.add(layers.Dense(units=2 ** (N + 1)))
  46. self.encoder.add(LeakyReLU(alpha=0.001))
  47. self.encoder.add(layers.Dense(units=2, activation="tanh"))
  48. # self.encoder.add(layers.ReLU(max_value=1.0))
  49. self.decoder = tf.keras.Sequential()
  50. self.decoder.add(tf.keras.Input(shape=(2,)))
  51. self.decoder.add(layers.Dense(units=2 ** (N + 1)))
  52. self.decoder.add(LeakyReLU(alpha=0.001))
  53. # self.decoder.add(layers.Dropout(0.2))
  54. self.decoder.add(layers.Dense(units=2 ** (N + 1)))
  55. self.decoder.add(LeakyReLU(alpha=0.001))
  56. self.decoder.add(layers.Dense(units=2 ** N, activation="softmax"))
  57. # self.randomiser = tf.random_normal_initializer(mean=0.0, stddev=0.1, seed=None)
  58. self.mod = None
  59. self.demod = None
  60. self.compiled = False
  61. # Divide by 2 because encoder outputs values between 0 and 1 instead of -1 and 1
  62. self.noise = 10 ** (noise / 10) # / 2
  63. # self.decoder.add(layers.Softmax(units=4, dtype=bool))
  64. # [
  65. # layers.Input(shape=(28, 28, 1)),
  66. # layers.Conv2D(16, (3, 3), activation='relu', padding='same', strides=2),
  67. # layers.Conv2D(8, (3, 3), activation='relu', padding='same', strides=2)
  68. # ])
  69. # self.decoder = tf.keras.Sequential([
  70. # layers.Conv2DTranspose(8, kernel_size=3, strides=2, activation='relu', padding='same'),
  71. # layers.Conv2DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'),
  72. # layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')
  73. # ])
  74. def call(self, x, **kwargs):
  75. encoded = self.encoder(x)
  76. # encoded = encoded * 2 - 1
  77. # encoded = tf.clip_by_value(encoded, clip_value_min=0, clip_value_max=1, name=None)
  78. # noise = self.randomiser(shape=(-1, 2), dtype=tf.float32)
  79. noise = np.random.normal(0, 1, (1, 2)) * self.noise
  80. noisy = tf.convert_to_tensor(noise, dtype=tf.float32)
  81. decoded = self.decoder(encoded + noisy)
  82. return decoded
  83. def fit_encoder(self, modulation, sample_size, train_size=0.8, epochs=1, batch_size=1, shuffle=False):
  84. os.chdir('../')
  85. alphabet = basic.load_alphabet(modulation, polar=False)
  86. if not alphabet.shape[0] == self.N**2:
  87. raise Exception("Cardinality of modulation scheme is different from cardinality of autoencoder!")
  88. x_train = np.random.randint(self.N**2, size=int(sample_size*train_size))
  89. y_train = alphabet[x_train]
  90. x_train_ho = np.zeros((int(sample_size*train_size), self.N**2))
  91. for idx, x in np.ndenumerate(x_train):
  92. x_train_ho[idx, x] = 1
  93. x_test = np.random.randint(self.N**2, size=int(sample_size*(1-train_size)))
  94. y_test = alphabet[x_test]
  95. x_test_ho = np.zeros((int(sample_size*(1-train_size)), self.N ** 2))
  96. for idx, x in np.ndenumerate(x_test):
  97. x_test_ho[idx, x] = 1
  98. self.encoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
  99. self.encoder.fit(x_train_ho, y_train,
  100. epochs=epochs,
  101. batch_size=batch_size,
  102. shuffle=shuffle,
  103. validation_data=(x_test_ho, y_test))
  104. pass
  105. def train(self, samples=1e6):
  106. if samples % self.N:
  107. samples += self.N - (samples % self.N)
  108. x_train = misc.generate_random_bit_array(samples).reshape((-1, self.N))
  109. x_train_ho = misc.bit_matrix2one_hot(x_train)
  110. x_test_array = misc.generate_random_bit_array(samples * 0.3)
  111. x_test = x_test_array.reshape((-1, self.N))
  112. x_test_ho = misc.bit_matrix2one_hot(x_test)
  113. if not self.compiled:
  114. self.compile(optimizer='adam', loss=losses.MeanSquaredError())
  115. self.compiled = True
  116. self.fit(x_train_ho, x_train_ho, shuffle=False, validation_data=(x_test_ho, x_test_ho))
  117. # encoded_data = self.encoder(x_test_ho)
  118. # decoded_data = self.decoder(encoded_data).numpy()
  119. def get_modulator(self):
  120. if self.mod is None:
  121. self.mod = AutoencoderMod(self)
  122. return self.mod
  123. def get_demodulator(self):
  124. if self.demod is None:
  125. self.demod = AutoencoderDemod(self)
  126. return self.demod
  127. def view_encoder(encoder, N, samples=1000):
  128. test_values = misc.generate_random_bit_array(samples).reshape((-1, N))
  129. test_values_ho = misc.bit_matrix2one_hot(test_values)
  130. mvector = np.array([2 ** i for i in range(N)], dtype=int)
  131. symbols = (test_values * mvector).sum(axis=1)
  132. encoded = encoder(test_values_ho).numpy()
  133. # encoded = misc.polar2rect(encoded)
  134. for i in range(2 ** N):
  135. xy = encoded[symbols == i]
  136. plt.plot(xy[:, 0], xy[:, 1], 'x', markersize=12, label=format(i, f'0{N}b'))
  137. plt.annotate(xy=[xy[:, 0].mean() + 0.01, xy[:, 1].mean() + 0.01], text=format(i, f'0{N}b'))
  138. plt.xlabel('Real')
  139. plt.ylabel('Imaginary')
  140. plt.title("Autoencoder generated alphabet")
  141. # plt.legend()
  142. plt.show()
  143. pass
  144. if __name__ == '__main__':
  145. # (x_train, _), (x_test, _) = fashion_mnist.load_data()
  146. #
  147. # x_train = x_train.astype('float32') / 255.
  148. # x_test = x_test.astype('float32') / 255.
  149. #
  150. # print(f"Train data: {x_train.shape}")
  151. # print(f"Test data: {x_test.shape}")
  152. n = 4
  153. # samples = 1e6
  154. # x_train = misc.generate_random_bit_array(samples).reshape((-1, n))
  155. # x_train_ho = misc.bit_matrix2one_hot(x_train)
  156. # x_test_array = misc.generate_random_bit_array(samples * 0.3)
  157. # x_test = x_test_array.reshape((-1, n))
  158. # x_test_ho = misc.bit_matrix2one_hot(x_test)
  159. autoencoder = Autoencoder(n, -8)
  160. autoencoder.fit_encoder(modulation='16qam',
  161. sample_size=1e6,
  162. train_size=0.8,
  163. epochs=50,
  164. batch_size=256,
  165. shuffle=True)
  166. view_encoder(autoencoder.encoder, n)
  167. # autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
  168. #
  169. # autoencoder.fit(x_train_ho, x_train_ho,
  170. # epochs=1,
  171. # shuffle=False,
  172. # validation_data=(x_test_ho, x_test_ho))
  173. #
  174. # encoded_data = autoencoder.encoder(x_test_ho)
  175. # decoded_data = autoencoder.decoder(encoded_data).numpy()
  176. #
  177. # result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
  178. # print("Accuracy: %.4f" % accuracy_score(x_test_array, result.reshape(-1, )))
  179. # view_encoder(autoencoder.encoder, n)
  180. pass