# autoencoder.py

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn.metrics import accuracy_score
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model

import misc
import defs
import pyswarms as ps

latent_dim = 64  # currently unused

print("# GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
class AutoencoderMod(defs.Modulator):
    """Modulator adapter: maps bit groups to constellation points via the trained encoder."""

    def __init__(self, autoencoder):
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder

    def forward(self, binary: np.ndarray) -> np.ndarray:
        reshaped = binary.reshape((-1, self.N))
        reshaped_ho = misc.bit_matrix2one_hot(reshaped)
        encoded = self.autoencoder.encoder(reshaped_ho)
        x = encoded.numpy()
        # Map the sigmoid output from [0, 1] to [-1, 1].
        x2 = x * 2 - 1
        # misc.rect2polar expects a third column; it is zeroed here.
        f = np.zeros(x2.shape[0])
        x3 = misc.rect2polar(np.c_[x2[:, 0], x2[:, 1], f])
        return x3
class AutoencoderDemod(defs.Demodulator):
    """Demodulator adapter: maps received symbols back to bits via the trained decoder."""

    def __init__(self, autoencoder):
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder

    def forward(self, values: np.ndarray) -> np.ndarray:
        rect = misc.polar2rect(values[:, [0, 1]])
        decoded = self.autoencoder.decoder(rect).numpy()
        # The softmax picks the most likely symbol index; convert it back to bits.
        result = misc.int2bit_array(decoded.argmax(axis=1), self.N)
        return result.reshape(-1)
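
# A hedged sketch of what misc.bit_matrix2one_hot is assumed to do (the real
# helper may differ): interpret each row of N bits as an integer, with bit i
# weighted 2**i to match the mvector used in view_encoder below, then one-hot
# encode that integer over 2**N classes.
def _bit_matrix2one_hot_sketch(bits: np.ndarray) -> np.ndarray:
    rows, n = bits.shape
    weights = 2 ** np.arange(n)         # bit i carries weight 2**i
    idx = (bits * weights).sum(axis=1)  # per-row symbol index
    one_hot = np.zeros((rows, 2 ** n), dtype=bool)
    one_hot[np.arange(rows), idx] = True
    return one_hot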
class Autoencoder(Model):
    def __init__(self, N, noise):
        super().__init__()
        self.N = N
        # Encoder: one-hot symbol (2**N classes) -> 2D constellation point.
        self.encoder = tf.keras.Sequential()
        self.encoder.add(tf.keras.Input(shape=(2 ** N,), dtype=bool))
        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
        # self.encoder.add(layers.Dropout(0.2))
        self.encoder.add(layers.Dense(units=2 ** (N + 1)))
        self.encoder.add(layers.Dense(units=2, activation="sigmoid"))
        # self.encoder.add(layers.ReLU(max_value=1.0))
        # Decoder: noisy 2D point -> softmax over the 2**N symbols.
        self.decoder = tf.keras.Sequential()
        self.decoder.add(tf.keras.Input(shape=(2,)))
        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
        # self.decoder.add(layers.Dropout(0.2))
        self.decoder.add(layers.Dense(units=2 ** (N + 1)))
        self.decoder.add(layers.Dense(units=2 ** N, activation="softmax"))
        self.mod = None
        self.demod = None
        self.compiled = False
        # `noise` is given in dB; store the linear scale factor. (Dividing by 2
        # would compensate for the encoder emitting values in [0, 1] rather
        # than [-1, 1], but that correction is left disabled.)
        self.noise = 10 ** (noise / 10)  # / 2
    def call(self, x, **kwargs):
        encoded = self.encoder(x)
        # Map the sigmoid output from [0, 1] to [-1, 1].
        encoded = encoded * 2 - 1
        # AWGN channel: draw fresh per-element noise on every call. (The
        # earlier NumPy version drew a single (1, 2) sample, so one noise
        # value was shared by the whole batch and frozen at tracing time.)
        noise = tf.random.normal(shape=tf.shape(encoded), stddev=self.noise)
        decoded = self.decoder(encoded + noise)
        return decoded
    def train(self, samples=1e6):
        samples = int(samples)
        # Pad so the bit count is divisible by the bits per symbol N.
        if samples % self.N:
            samples += self.N - (samples % self.N)
        x_train = misc.generate_random_bit_array(samples).reshape((-1, self.N))
        x_train_ho = misc.bit_matrix2one_hot(x_train)
        # Keep the test set a whole number of symbols as well.
        test_samples = int(samples * 0.3)
        test_samples -= test_samples % self.N
        x_test_array = misc.generate_random_bit_array(test_samples)
        x_test = x_test_array.reshape((-1, self.N))
        x_test_ho = misc.bit_matrix2one_hot(x_test)
        if not self.compiled:
            self.compile(optimizer='adam', loss=losses.MeanSquaredError())
            self.compiled = True
        # The autoencoder learns to reconstruct its own one-hot input.
        self.fit(x_train_ho, x_train_ho, shuffle=False, validation_data=(x_test_ho, x_test_ho))
    def get_modulator(self):
        if self.mod is None:
            self.mod = AutoencoderMod(self)
        return self.mod

    def get_demodulator(self):
        if self.demod is None:
            self.demod = AutoencoderDemod(self)
        return self.demod
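
# A minimal round-trip sketch (an illustration, not part of the original
# pipeline): train a model, push random bits through the learned modulator and
# demodulator, and report bit accuracy. The function name and its defaults are
# illustrative; it assumes misc.generate_random_bit_array takes a bit count and
# returns a flat boolean array, as the code above implies. No channel noise is
# injected between mod and demod, so this checks the noiseless round trip only.
def example_roundtrip(n=4, noise_db=-8, bits=12_000):
    ae = Autoencoder(n, noise_db)
    ae.train(samples=1e5)
    tx = misc.generate_random_bit_array(bits)
    symbols = ae.get_modulator().forward(tx)    # bits -> constellation points
    rx = ae.get_demodulator().forward(symbols)  # constellation points -> bits
    print("Round-trip accuracy: %.4f" % accuracy_score(tx, rx))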
def view_encoder(encoder, N, samples=1000):
    """Scatter-plot the constellation the trained encoder produces."""
    samples -= samples % N  # keep a whole number of symbols
    test_values = misc.generate_random_bit_array(samples).reshape((-1, N))
    test_values_ho = misc.bit_matrix2one_hot(test_values)
    # Integer symbol index for each row of bits (bit i weighted 2**i).
    mvector = np.array([2 ** i for i in range(N)], dtype=int)
    symbols = (test_values * mvector).sum(axis=1)
    encoded = encoder(test_values_ho).numpy()
    for i in range(2 ** N):
        xy = encoded[symbols == i]
        plt.plot(xy[:, 0], xy[:, 1], 'x', markersize=12, label=format(i, f'0{N}b'))
        plt.annotate(text=format(i, f'0{N}b'), xy=[xy[:, 0].mean() + 0.01, xy[:, 1].mean() + 0.01])
    plt.xlabel('Real')
    plt.ylabel('Imaginary')
    plt.title("Autoencoder generated alphabet")
    plt.show()
def test_batch_sizes(autoencoder, sizes):
    """Train one epoch per batch size and plot validation accuracy.

    Relies on x_train_ho, x_test_ho, x_test_array and n being defined at
    module level (see __main__ below). The runs are cumulative: the model
    keeps its weights between iterations.
    """
    accuracy = []
    for batch_size in sizes:
        autoencoder.fit(x_train_ho, x_train_ho,
                        epochs=1,
                        batch_size=batch_size,
                        shuffle=False,
                        validation_data=(x_test_ho, x_test_ho))
        encoded_data = autoencoder.encoder(x_test_ho)
        decoded_data = autoencoder.decoder(encoded_data).numpy()
        result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
        acc = accuracy_score(x_test_array, result.reshape(-1))
        print("Accuracy: %.4f" % acc)
        accuracy.append(acc)
    plt.plot(sizes, accuracy)
    plt.xscale('log')
    plt.grid()
    plt.xlabel('Batch size')
    plt.ylabel('Accuracy')
    plt.show()
def test_epoch_sizes(autoencoder, sizes):
    """Train for each epoch count in turn and plot validation accuracy.

    Uses the same module-level data as test_batch_sizes, and is likewise
    cumulative across iterations.
    """
    accuracy = []
    for epoch_size in sizes:
        autoencoder.fit(x_train_ho, x_train_ho,
                        epochs=epoch_size,
                        shuffle=False,
                        validation_data=(x_test_ho, x_test_ho))
        encoded_data = autoencoder.encoder(x_test_ho)
        decoded_data = autoencoder.decoder(encoded_data).numpy()
        result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
        acc = accuracy_score(x_test_array, result.reshape(-1))
        print("Accuracy: %.4f" % acc)
        accuracy.append(acc)
    plt.plot(sizes, accuracy)
    plt.xscale('log')
    plt.grid()
    plt.xlabel('Number of epochs')
    plt.ylabel('Accuracy')
    plt.show()
def sigmoid(z):
    return 1. / (1. + np.exp(-z))
# Forward propagation
def logits_function(params, n, train):
    """Compute the forward pass of the two-layer network.

    It receives a flat parameter vector that is rolled back into the
    corresponding weights and biases.

    Inputs
    ------
    params: np.ndarray
        An unrolled (flat) vector of all weights and biases.
    n: int
        Bits per symbol; the network has 2**n inputs.
    train: np.ndarray
        Input matrix of shape (samples, 2**n).

    Returns
    -------
    np.ndarray
        The logits of the output layer, shape (samples, n_classes).
    """
    # Neural network architecture
    n_inputs = 2 ** n
    n_hidden = 2 ** (n + 1)
    n_classes = 2
    # Roll the flat parameter vector back into weights and biases
    W1 = params[0:n_inputs * n_hidden].reshape((n_inputs, n_hidden))
    b1 = params[n_inputs * n_hidden:n_inputs * n_hidden + n_hidden].reshape((n_hidden,))
    W2 = params[n_inputs * n_hidden + n_hidden:
                n_inputs * n_hidden + n_hidden + n_hidden * n_classes].reshape((n_hidden, n_classes))
    b2 = params[n_inputs * n_hidden + n_hidden + n_hidden * n_classes:
                n_inputs * n_hidden + n_hidden + n_hidden * n_classes + n_classes].reshape((n_classes,))
    # Perform forward propagation
    z1 = train.dot(W1) + b1  # Pre-activation in layer 1
    a1 = np.tanh(z1)         # Activation in layer 1
    z2 = a1.dot(W2) + b2     # Pre-activation in layer 2
    logits = z2              # Logits for layer 2
    return logits
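
# A small sketch showing how the flat parameter vector above is sized; the
# helper name is illustrative only. The total dimension must match the slicing
# in logits_function exactly.
def example_random_params(n):
    n_inputs, n_hidden, n_classes = 2 ** n, 2 ** (n + 1), 2
    dims = (n_inputs * n_hidden) + n_hidden + (n_hidden * n_classes) + n_classes
    return np.random.randn(dims)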
# Forward propagation
def forward_prop(params, n, train, samples):
    """Forward propagation as an objective function.

    Computes the forward pass of the network and the loss for a single
    parameter vector.

    Inputs
    ------
    params: np.ndarray
        An unrolled (flat) vector of all weights and biases.

    Returns
    -------
    float
        The computed negative log-likelihood loss given the parameters.
    """
    samples = int(samples)
    logits = logits_function(params, n, train)
    # Softmax of the logits
    exp_scores = np.exp(logits)
    probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
    # Negative log likelihood. NOTE: this indexing assumes `train` is a vector
    # of integer class labels in [0, n_classes); the one-hot matrix built in
    # __main__ would need converting (e.g. with argmax) before use here.
    correct_logprobs = -np.log(probs[range(samples), train])
    loss = np.sum(correct_logprobs) / samples
    return loss
def f(x, n, train, samples):
    """Evaluate forward_prop for every particle in the swarm.

    Inputs
    ------
    x: numpy.ndarray of shape (n_particles, dimensions)
        The swarm performing the search.

    Returns
    -------
    numpy.ndarray of shape (n_particles,)
        The computed loss for each particle.
    """
    n_particles = x.shape[0]
    j = [forward_prop(x[i], n, train, samples) for i in range(n_particles)]
    return np.array(j)
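
# A sketch of wiring f into pyswarms (it mirrors the disabled block in
# __main__): the objective is passed as a callable, and pyswarms forwards the
# extra keyword arguments to it on every evaluation. The particle count,
# iteration count and PSO options are illustrative values.
def example_pso(n, train, samples, iters=100):
    n_inputs, n_hidden, n_classes = 2 ** n, 2 ** (n + 1), 2
    dimensions = (n_inputs * n_hidden) + n_hidden + (n_hidden * n_classes) + n_classes
    options = {'c1': 0.5, 'c2': 0.3, 'w': 0.9}
    optimizer = ps.single.GlobalBestPSO(n_particles=100, dimensions=dimensions, options=options)
    cost, pos = optimizer.optimize(f, iters=iters, n=n, train=train, samples=samples)
    return cost, pos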
def predict(pos, n, train):
    """Use the trained weights to perform class predictions.

    Inputs
    ------
    pos: numpy.ndarray
        Position found by the swarm; rolled back into weights and biases.
    """
    logits = logits_function(pos, n, train)
    y_pred = np.argmax(logits, axis=1)
    return y_pred
if __name__ == '__main__':
    n = 4
    n_inputs = 2 ** n
    n_hidden = 2 ** (n + 1)
    n_classes = 2
    samples = 100_000
    x_train = misc.generate_random_bit_array(samples).reshape((-1, n))
    x_train_ho = misc.bit_matrix2one_hot(x_train)
    x_test_array = misc.generate_random_bit_array(int(samples * 0.3))
    x_test = x_test_array.reshape((-1, n))
    x_test_ho = misc.bit_matrix2one_hot(x_test)
  256. """
  257. # Initialize swarm
  258. options = {'c1': 0.5, 'c2': 0.3, 'w': 0.9}
  259. # Call instance of PSO
  260. dimensions = (n_inputs * n_hidden) + (n_hidden * n_classes) + n_hidden + n_classes
  261. optimizer = ps.single.GlobalBestPSO(n_particles=100, dimensions=dimensions, options=options)
  262. # Perform optimization
  263. cost, pos = optimizer.optimize(f(x, n, x_train_ho, samples), iters=1000)
  264. print((predict(pos) == x_train_ho).mean())
  265. pass
  266. """
    autoencoder = Autoencoder(n, -8)
    autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
    batch_sizes = [10, 20, 30, 40, 100, 200, 300, 400, 500, 1000, 2000, 3000, 4000, 5000]
    epoch_sizes = [1, 10, 20, 50, 100, 200, 500]
    test_epoch_sizes(autoencoder, epoch_sizes)
  273. """
  274. autoencoder.fit(x_train_ho, x_train_ho,
  275. epochs=1,
  276. shuffle=False,
  277. validation_data=(x_test_ho, x_test_ho))
  278. encoded_data = autoencoder.encoder(x_test_ho)
  279. decoded_data = autoencoder.decoder(encoded_data).numpy()
  280. result = misc.int2bit_array(decoded_data.argmax(axis=1), n)
  281. print("Accuracy: %.4f" % accuracy_score(x_test_array, result.reshape(-1, )))
  282. view_encoder(autoencoder.encoder, n)
  283. """