@@ -0,0 +1,523 @@
+"""
+These are some unstructured tests. Feel free to reuse this code for anything else.
+"""
+
+import logging
+import pathlib
+from itertools import chain
+from sys import stdout
+
+import matplotlib.pyplot as plt
+import numpy as np
+import tensorflow as tf
+from tensorflow.python.framework.errors_impl import NotFoundError
+
+import defs
+import misc
+from graphs import get_SNR, get_AWGN_ber
+from models import basic
+from models.autoencoder import Autoencoder, view_encoder
+from models.basic import AlphabetDemod, AlphabetMod
+from models.optical_channel import OpticalChannel
+from models.quantized_net import QuantizedNeuralNetwork
+
+
+def _test_optics_autoencoder():
+    """Train an autoencoder over an optical channel model and compare its BER vs. SNR against a 4-PAM baseline."""
+    ch = OpticalChannel(
+        noise_level=-10,
+        dispersion=-21.7,
+        symbol_rate=10e9,
+        sample_rate=400e9,
+        length=10,
+        pulse_shape='rcos',
+        sqrt_out=True
+    )
+
+    # Sanity check: TF2 eager execution is expected here
+    assert tf.executing_eagerly()
+
+    aenc = Autoencoder(4, channel=ch)
+    aenc.train(samples=1e6)
+    plt.plot(*get_SNR(
+        aenc.get_modulator(),
+        aenc.get_demodulator(),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='AE')
+
+    plt.plot(*get_SNR(
+        AlphabetMod('4pam', 10e6),
+        AlphabetDemod('4pam', 10e6),
+        samples=30000,
+        steps=50,
+        start=-5,
+        stop=15,
+        length=1,
+        pulse_shape='rcos'
+    ), '-', label='4PAM')
+
+    plt.yscale('log')
+    plt.grid()
+    plt.xlabel('SNR dB')
+    plt.ylabel('BER')
+    plt.title("Autoencoder Performance")
+    plt.legend()
+    plt.savefig('optics_autoencoder.eps', format='eps')
+    plt.show()
+
+
+def _test_autoencoder_pretrain():
+    """Pre-train the autoencoder's decoder on a 16QAM alphabet, then train end-to-end and compare BER vs. SNR."""
+    # aenc = Autoencoder(4, -25)
+    # aenc.train(samples=1e6)
+    # plt.plot(*get_SNR(
+    # aenc.get_modulator(),
+    # aenc.get_demodulator(),
+    # ber_func=get_AWGN_ber,
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15
+    # ), '-', label='Random AE')
+
+    aenc = Autoencoder(4, -25)
+    # aenc.fit_encoder('16qam', 3e4)
+    aenc.fit_decoder('16qam', 1e5)
+
+    plt.plot(*get_SNR(
+        aenc.get_modulator(),
+        aenc.get_demodulator(),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='16QAM Pre-trained AE')
+
+    aenc.train(samples=3e6)
+    plt.plot(*get_SNR(
+        aenc.get_modulator(),
+        aenc.get_demodulator(),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='16QAM Post-trained AE')
+
+    plt.plot(*get_SNR(
+        AlphabetMod('16qam', 10e6),
+        AlphabetDemod('16qam', 10e6),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='16QAM')
+
+    plt.yscale('log')
+    plt.grid()
+    plt.xlabel('SNR dB')
+    plt.ylabel('BER')
+    plt.title("4-Bit Autoencoder Performance")
+    plt.legend()
+    plt.show()
+
+
+class LiteTFMod(defs.Modulator):
+    """Modulator that runs the autoencoder's encoder as a converted TFLite model."""
+
+    def __init__(self, name, autoencoder):
+        super().__init__(2 ** autoencoder.N)
+        self.autoencoder = autoencoder
+        tflite_models_dir = pathlib.Path("/tmp/tflite/")
+        tflite_model_file = tflite_models_dir / (name + ".tflite")
+        self.interpreter = tf.lite.Interpreter(model_path=str(tflite_model_file))
+        self.interpreter.allocate_tensors()
+
+    def forward(self, binary: np.ndarray):
+        reshaped = binary.reshape((-1, (self.N * self.autoencoder.parallel)))
+        reshaped_ho = misc.bit_matrix2one_hot(reshaped)
+
+        input_index = self.interpreter.get_input_details()[0]["index"]
+        input_dtype = self.interpreter.get_input_details()[0]["dtype"]
+        input_shape = self.interpreter.get_input_details()[0]["shape"]
+        output_index = self.interpreter.get_output_details()[0]["index"]
+        output_shape = self.interpreter.get_output_details()[0]["shape"]
+
+        # Run the TFLite encoder one one-hot symbol at a time
+        x = np.zeros((len(reshaped_ho), output_shape[1]))
+        for i, ho in enumerate(reshaped_ho):
+            self.interpreter.set_tensor(input_index, ho.reshape(input_shape).astype(input_dtype))
+            self.interpreter.invoke()
+            x[i] = self.interpreter.get_tensor(output_index)
+
+        if self.autoencoder.bipolar:
+            x = x * 2 - 1
+
+        if self.autoencoder.parallel > 1:
+            x = x.reshape((-1, self.autoencoder.signal_dim))
+
+        # Assemble rectangular components (third column zero) and convert to a polar RF signal
+        f = np.zeros(x.shape[0])
+        if self.autoencoder.signal_dim <= 1:
+            p = np.zeros(x.shape[0])
+        else:
+            p = x[:, 1]
+        x3 = misc.rect2polar(np.c_[x[:, 0], p, f])
+        return basic.RFSignal(x3)
+
+
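+# Hedged illustration only (an addition, not used by the tests above): a minimal
+# NumPy sketch of the bit-matrix -> one-hot mapping that LiteTFMod.forward assumes
+# misc.bit_matrix2one_hot performs. The MSB-first bit order is an assumption made
+# for this example, not something taken from the project code.
+def _one_hot_from_bits(bits: np.ndarray) -> np.ndarray:
+    # Interpret each row of bits as an integer index (MSB first) ...
+    idx = bits.astype(int).dot(1 << np.arange(bits.shape[1] - 1, -1, -1))
+    # ... and pick the matching row of an identity matrix as the one-hot vector
+    return np.eye(2 ** bits.shape[1], dtype=np.float32)[idx]
+
+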
+class LiteTFDemod(defs.Demodulator):
+    """Demodulator that runs the autoencoder's decoder as a converted TFLite model."""
+
+    def __init__(self, name, autoencoder):
+        super().__init__(2 ** autoencoder.N)
+        self.autoencoder = autoencoder
+        tflite_models_dir = pathlib.Path("/tmp/tflite/")
+        tflite_model_file = tflite_models_dir / (name + ".tflite")
+        self.interpreter = tf.lite.Interpreter(model_path=str(tflite_model_file))
+        self.interpreter.allocate_tensors()
+
+    def forward(self, values: defs.Signal) -> np.ndarray:
+        if self.autoencoder.signal_dim <= 1:
+            val = values.rect_x
+        else:
+            val = values.rect
+        if self.autoencoder.parallel > 1:
+            val = val.reshape((-1, self.autoencoder.parallel))
+
+        input_index = self.interpreter.get_input_details()[0]["index"]
+        input_dtype = self.interpreter.get_input_details()[0]["dtype"]
+        input_shape = self.interpreter.get_input_details()[0]["shape"]
+        output_index = self.interpreter.get_output_details()[0]["index"]
+        output_shape = self.interpreter.get_output_details()[0]["shape"]
+
+        # Run the TFLite decoder one received symbol at a time
+        decoded = np.zeros((len(val), output_shape[1]))
+        for i, v in enumerate(val):
+            self.interpreter.set_tensor(input_index, v.reshape(input_shape).astype(input_dtype))
+            self.interpreter.invoke()
+            decoded[i] = self.interpreter.get_tensor(output_index)
+
+        # Hard decision: map the most likely symbol index back to a flat bit array
+        result = misc.int2bit_array(decoded.argmax(axis=1), self.N * self.autoencoder.parallel)
+        return result.reshape(-1)
+
+
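+# Hedged debugging helper (an addition, not part of the original test flow):
+# print shape, dtype and quantization parameters of a converted model's input
+# and output tensors before wrapping it in LiteTFMod / LiteTFDemod. Only the
+# standard tf.lite.Interpreter API is used here.
+def _describe_tflite_model(model_path):
+    interpreter = tf.lite.Interpreter(model_path=str(model_path))
+    interpreter.allocate_tensors()
+    for kind, details in (("input", interpreter.get_input_details()),
+                          ("output", interpreter.get_output_details())):
+        for d in details:
+            scale, zero_point = d["quantization"]
+            print(f"{kind}: shape={tuple(d['shape'])}, dtype={np.dtype(d['dtype']).name}, "
+                  f"scale={scale}, zero_point={zero_point}")
+
+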
+def _test_autoencoder_perf():
+    """Convert a trained autoencoder to TFLite at several precisions and compare BER vs. SNR."""
+    # Compare full (major, minor) tuples so that e.g. '2.10.x' is not misread as 2.1
+    major, minor = (int(v) for v in tf.__version__.split('.')[:2])
+    assert (major, minor) >= (2, 3)
+
+    # aenc = Autoencoder(3, -15)
+    # aenc.train(samples=1e6)
+    # plt.plot(*get_SNR(
+    # aenc.get_modulator(),
+    # aenc.get_demodulator(),
+    # ber_func=get_AWGN_ber,
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15
+    # ), '-', label='3Bit AE')
+
+    # aenc = Autoencoder(4, -25, bipolar=True, dtype=tf.float64)
+    # aenc.train(samples=5e5)
+    # plt.plot(*get_SNR(
+    # aenc.get_modulator(),
+    # aenc.get_demodulator(),
+    # ber_func=get_AWGN_ber,
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15
+    # ), '-', label='4Bit AE F64')
+
+    aenc = Autoencoder(4, -25, bipolar=True)
+    aenc.train(epoch_size=1e3, epochs=10)
+    # #
+    m = aenc.N * aenc.parallel
+    x_train = misc.bit_matrix2one_hot(misc.generate_random_bit_array(100*m).reshape((-1, m)))
+    x_train_enc = aenc.encoder(x_train)
+    x_train = tf.cast(x_train, tf.float32)
+    #
+    # plt.plot(*get_SNR(
+    # aenc.get_modulator(),
+    # aenc.get_demodulator(),
+    # ber_func=get_AWGN_ber,
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15
+    # ), '-', label='4AE F32')
+    # # #
+    def save_tflite(model, name, types=None, ops=None, io_types=None, train_x=None):
+        """Convert a Keras model to TFLite (optionally quantized) and write it to /tmp/tflite/<name>.tflite."""
+        converter = tf.lite.TFLiteConverter.from_keras_model(model)
+        if types is not None:
+            converter.optimizations = [tf.lite.Optimize.DEFAULT]
+            converter.target_spec.supported_types = types
+        if ops is not None:
+            converter.optimizations = [tf.lite.Optimize.DEFAULT]
+            converter.target_spec.supported_ops = ops
+        if io_types is not None:
+            converter.inference_input_type = io_types
+            converter.inference_output_type = io_types
+        if train_x is not None:
+            def representative_data_gen():
+                for input_value in tf.data.Dataset.from_tensor_slices(train_x).batch(1).take(100):
+                    yield [input_value]
+            converter.representative_dataset = representative_data_gen
+        tflite_model = converter.convert()
+        tflite_models_dir = pathlib.Path("/tmp/tflite/")
+        tflite_models_dir.mkdir(exist_ok=True, parents=True)
+        tflite_model_file = tflite_models_dir / (name + ".tflite")
+        tflite_model_file.write_bytes(tflite_model)
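+    # Hedged usage note (mirrors the calls below, some of which are left
+    # commented out): a float16 conversion only needs `types`, e.g.
+    #   save_tflite(aenc.encoder, "float16_enc", types=[tf.float16])
+    # while the full-integer paths (int16x8, int8) also need a representative
+    # dataset passed via `train_x` so the converter can calibrate activation ranges.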
+
+    print("Saving models")
+
+    save_tflite(aenc.encoder, "default_enc")
+    save_tflite(aenc.decoder, "default_dec")
+    #
+    # save_tflite(aenc.encoder, "float16_enc", [tf.float16])
+    # save_tflite(aenc.decoder, "float16_dec", [tf.float16])
+    #
+    # save_tflite(aenc.encoder, "bfloat16_enc", [tf.bfloat16])
+    # save_tflite(aenc.decoder, "bfloat16_dec", [tf.bfloat16])
+
+    INT16X8 = tf.lite.OpsSet.EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8
+    save_tflite(aenc.encoder, "int16x8_enc", ops=[INT16X8], train_x=x_train)
+    save_tflite(aenc.decoder, "int16x8_dec", ops=[INT16X8], train_x=x_train_enc)
+
+    # save_tflite(aenc.encoder, "int8_enc", ops=[tf.lite.OpsSet.TFLITE_BUILTINS_INT8], io_types=tf.uint8, train_x=x_train)
+    # save_tflite(aenc.decoder, "int8_dec", ops=[tf.lite.OpsSet.TFLITE_BUILTINS_INT8], io_types=tf.uint8, train_x=x_train_enc)
+
+    print("Testing BER vs SNR")
+    plt.plot(*get_SNR(
+        LiteTFMod("default_enc", aenc),
+        LiteTFDemod("default_dec", aenc),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='4AE F32')
+
+    # plt.plot(*get_SNR(
+    # LiteTFMod("float16_enc", aenc),
+    # LiteTFDemod("float16_dec", aenc),
+    # ber_func=get_AWGN_ber,
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15
+    # ), '-', label='4AE F16')
+    # #
+    # plt.plot(*get_SNR(
+    # LiteTFMod("bfloat16_enc", aenc),
+    # LiteTFDemod("bfloat16_dec", aenc),
+    # ber_func=get_AWGN_ber,
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15
+    # ), '-', label='4AE BF16')
+    #
+    plt.plot(*get_SNR(
+        LiteTFMod("int16x8_enc", aenc),
+        LiteTFDemod("int16x8_dec", aenc),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='4AE I16x8')
+
+    # plt.plot(*get_SNR(
+    # AlphabetMod('16qam', 10e6),
+    # AlphabetDemod('16qam', 10e6),
+    # ber_func=get_AWGN_ber,
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15,
+    # ), '-', label='16qam')
+
+    plt.yscale('log')
+    plt.grid()
+    plt.xlabel('SNR dB')
+    plt.ylabel('BER')
+    plt.title("Autoencoder with different precision data types")
+    plt.legend()
+    plt.savefig('autoencoder_compression.eps', format='eps')
+    plt.show()
+
+    view_encoder(aenc.encoder, 4)
+
+    # aenc = Autoencoder(5, -25)
+    # aenc.train(samples=2e6)
+    # plt.plot(*get_SNR(
+    # aenc.get_modulator(),
+    # aenc.get_demodulator(),
+    # ber_func=get_AWGN_ber,
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15
+    # ), '-', label='5Bit AE')
+    #
+    # aenc = Autoencoder(6, -25)
+    # aenc.train(samples=2e6)
+    # plt.plot(*get_SNR(
+    # aenc.get_modulator(),
+    # aenc.get_demodulator(),
+    # ber_func=get_AWGN_ber,
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15
+    # ), '-', label='6Bit AE')
+    #
+    # for scheme in ['64qam', '32qam', '16qam', 'qpsk', '8psk']:
+    # plt.plot(*get_SNR(
+    # AlphabetMod(scheme, 10e6),
+    # AlphabetDemod(scheme, 10e6),
+    # ber_func=get_AWGN_ber,
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15,
+    # ), '-', label=scheme.upper())
+    #
+    # plt.yscale('log')
+    # plt.grid()
+    # plt.xlabel('SNR dB')
+    # plt.title("Autoencoder vs defined modulations")
+    # plt.legend()
+    # plt.show()
+
+
+def _test_autoencoder_perf2():
+    """Train autoencoders of 2-5 bits per symbol and compare their BER vs. SNR against standard alphabets."""
+    aenc = Autoencoder(2, -20)
+    aenc.train(samples=3e6)
+    plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber,
+                      samples=100000, steps=50, start=-5, stop=15), '-', label='2Bit AE')
+
+    aenc = Autoencoder(3, -20)
+    aenc.train(samples=3e6)
+    plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber,
+                      samples=100000, steps=50, start=-5, stop=15), '-', label='3Bit AE')
+
+    aenc = Autoencoder(4, -20)
+    aenc.train(samples=3e6)
+    plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber,
+                      samples=100000, steps=50, start=-5, stop=15), '-', label='4Bit AE')
+
+    aenc = Autoencoder(5, -20)
+    aenc.train(samples=3e6)
+    plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber,
+                      samples=100000, steps=50, start=-5, stop=15), '-', label='5Bit AE')
+
+    for a in ['qpsk', '8psk', '16qam', '32qam', '64qam']:
+        try:
+            plt.plot(*get_SNR(AlphabetMod(a, 10e6), AlphabetDemod(a, 10e6), ber_func=get_AWGN_ber,
+                              samples=100000, steps=50, start=-5, stop=15), '-', label=a.upper())
+        except KeyboardInterrupt:
+            break
+        except Exception:
+            # Skip alphabets that fail to simulate and carry on with the rest
+            pass
+
+    plt.yscale('log')
+    plt.grid()
+    plt.xlabel('SNR dB')
+    plt.ylabel('BER')
+    plt.title("Autoencoder vs defined modulations")
+    plt.legend()
+    plt.savefig('autoencoder_mods.eps', format='eps')
+    plt.show()
+
+    # view_encoder(aenc.encoder, 2)
+
+
+def _test_autoencoder_perf_qnn():
+    """Quantize a trained autoencoder with QuantizedNeuralNetwork and inspect the resulting alphabet."""
+    fh = logging.FileHandler("model_quantizing.log", mode="w+")
+    fh.setLevel(logging.INFO)
+    sh = logging.StreamHandler(stream=stdout)
+    sh.setLevel(logging.INFO)
+
+    logger = logging.getLogger(__name__)
+    logger.setLevel(level=logging.INFO)
+    logger.addHandler(fh)
+    logger.addHandler(sh)
+
+    aenc = Autoencoder(4, -25, bipolar=True)
+    # aenc.encoder.save_weights('ae_enc.bin')
+    # aenc.decoder.save_weights('ae_dec.bin')
+    # aenc.encoder.load_weights('ae_enc.bin')
+    # aenc.decoder.load_weights('ae_dec.bin')
+    try:
+        aenc.load_weights('autoencoder')
+    except NotFoundError:
+        # No saved checkpoint yet: train from scratch and cache the weights
+        aenc.train(epoch_size=1e3, epochs=10)
+        aenc.save_weights('autoencoder')
+
+    aenc.compile(optimizer='adam', loss=tf.losses.MeanSquaredError())
+
+    m = aenc.N * aenc.parallel
+    view_encoder(aenc.encoder, 4, title="FP32 Alphabet")
+
+    batch_size = 25000
+    x_train = misc.bit_matrix2one_hot(misc.generate_random_bit_array(batch_size*m).reshape((-1, m)))
+    x_test = misc.bit_matrix2one_hot(misc.generate_random_bit_array(5000*m).reshape((-1, m)))
+    bits = np.log2(32)  # left over from the bit-width sweep below; the QNN is built with np.log2(16)
+    alphabet_scalars = 2  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+    num_layers = sum(layer.__class__.__name__ == 'Dense' for layer in aenc.all_layers)
+
+    # for b in (3, 6, 8, 12, 16, 24, 32, 48, 64):
+    # Provide roughly one pass over x_train per Dense layer for the layer-wise quantiser
+    get_data = (sample for sample in x_train)
+    for _ in range(num_layers):
+        get_data = chain(get_data, (sample for sample in x_train))
+
+    qnn = QuantizedNeuralNetwork(
+        network=aenc,
+        batch_size=batch_size,
+        get_data=get_data,
+        logger=logger,
+        bits=np.log2(16),
+        alphabet_scalar=alphabet_scalars,
+    )
+
+    qnn.quantize_network()
+    qnn.quantized_net.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
+    view_encoder(qnn.quantized_net, 4, title="Quantised 16b alphabet")
+
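+    # Hedged sketch (an addition; it assumes both the Autoencoder and
+    # qnn.quantized_net are Keras models that map one-hot inputs to one-hot-like
+    # reconstructions, as the commented-out evaluate() call below suggests):
+    # compare the quantised network against the full-precision one on held-out data.
+    y_fp32 = aenc.predict(x_test, verbose=0)
+    y_quant = qnn.quantized_net.predict(x_test, verbose=0)
+    logger.info("Mean absolute output difference (FP32 vs quantised): %f",
+                float(np.abs(y_fp32 - y_quant).mean()))
+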
+    # view_encoder(qnn_enc.quantized_net, 4, title=f"Quantised {b}b alphabet")
+
+    # _, q_accuracy = qnn.quantized_net.evaluate(x_test, x_test, verbose=True)
+
+
+if __name__ == '__main__':
+    # plt.plot(*get_SNR(
+    # AlphabetMod('16qam', 10e6),
+    # AlphabetDemod('16qam', 10e6),
+    # ber_func=get_AWGN_ber,
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15,
+    # ), '-', label='16qam AWGN')
+    #
+    # plt.plot(*get_SNR(
+    # AlphabetMod('16qam', 10e6),
+    # AlphabetDemod('16qam', 10e6),
+    # samples=100000,
+    # steps=50,
+    # start=-5,
+    # stop=15,
+    # ), '-', label='16qam OPTICAL')
+    #
+    # plt.yscale('log')
+    # plt.grid()
+    # plt.xlabel('SNR dB')
+    # plt.show()
+
+    # _test_autoencoder_perf()
+    _test_autoencoder_perf_qnn()
+    # _test_autoencoder_perf2()
+    # _test_autoencoder_pretrain()
+    # _test_optics_autoencoder()