""" These are some unstructured tests. Feel free to use this code for anything else """ import logging import pathlib from itertools import chain from sys import stdout from tensorflow.python.framework.errors_impl import NotFoundError import defs from graphs import get_SNR, get_AWGN_ber from models import basic from models.autoencoder import Autoencoder, view_encoder import matplotlib.pyplot as plt import tensorflow as tf import misc import numpy as np from models.basic import AlphabetDemod, AlphabetMod from models.optical_channel import OpticalChannel from models.quantized_net import QuantizedNeuralNetwork def _test_optics_autoencoder(): ch = OpticalChannel( noise_level=-10, dispersion=-21.7, symbol_rate=10e9, sample_rate=400e9, length=10, pulse_shape='rcos', sqrt_out=True ) tf.executing_eagerly() aenc = Autoencoder(4, channel=ch) aenc.train(samples=1e6) plt.plot(*get_SNR( aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='AE') plt.plot(*get_SNR( AlphabetMod('4pam', 10e6), AlphabetDemod('4pam', 10e6), samples=30000, steps=50, start=-5, stop=15, length=1, pulse_shape='rcos' ), '-', label='4PAM') plt.yscale('log') plt.grid() plt.xlabel('SNR dB') plt.title("Autoencoder Performance") plt.legend() plt.savefig('optics_autoencoder.eps', format='eps') plt.show() def _test_autoencoder_pretrain(): # aenc = Autoencoder(4, -25) # aenc.train(samples=1e6) # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='Random AE') aenc = Autoencoder(4, -25) # aenc.fit_encoder('16qam', 3e4) aenc.fit_decoder('16qam', 1e5) plt.plot(*get_SNR( aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='16QAM Pre-trained AE') aenc.train(samples=3e6) plt.plot(*get_SNR( aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='16QAM Post-trained AE') plt.plot(*get_SNR( AlphabetMod('16qam', 10e6), AlphabetDemod('16qam', 10e6), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='16QAM') plt.yscale('log') plt.grid() plt.xlabel('SNR dB') plt.title("4Bit Autoencoder Performance") plt.legend() plt.show() class LiteTFMod(defs.Modulator): def __init__(self, name, autoencoder): super().__init__(2 ** autoencoder.N) self.autoencoder = autoencoder tflite_models_dir = pathlib.Path("/tmp/tflite/") tflite_model_file = tflite_models_dir / (name + ".tflite") self.interpreter = tf.lite.Interpreter(model_path=str(tflite_model_file)) self.interpreter.allocate_tensors() pass def forward(self, binary: np.ndarray): reshaped = binary.reshape((-1, (self.N * self.autoencoder.parallel))) reshaped_ho = misc.bit_matrix2one_hot(reshaped) input_index = self.interpreter.get_input_details()[0]["index"] input_dtype = self.interpreter.get_input_details()[0]["dtype"] input_shape = self.interpreter.get_input_details()[0]["shape"] output_index = self.interpreter.get_output_details()[0]["index"] output_shape = self.interpreter.get_output_details()[0]["shape"] x = np.zeros((len(reshaped_ho), output_shape[1])) for i, ho in enumerate(reshaped_ho): self.interpreter.set_tensor(input_index, ho.reshape(input_shape).astype(input_dtype)) self.interpreter.invoke() x[i] = self.interpreter.get_tensor(output_index) if self.autoencoder.bipolar: x = x * 2 - 1 if self.autoencoder.parallel > 1: x = x.reshape((-1, self.autoencoder.signal_dim)) f = np.zeros(x.shape[0]) if self.autoencoder.signal_dim <= 1: p = np.zeros(x.shape[0]) else: p = x[:, 1] x3 = misc.rect2polar(np.c_[x[:, 0], p, f]) return basic.RFSignal(x3) class LiteTFDemod(defs.Demodulator): def __init__(self, name, autoencoder): super().__init__(2 ** autoencoder.N) self.autoencoder = autoencoder tflite_models_dir = pathlib.Path("/tmp/tflite/") tflite_model_file = tflite_models_dir / (name + ".tflite") self.interpreter = tf.lite.Interpreter(model_path=str(tflite_model_file)) self.interpreter.allocate_tensors() def forward(self, values: defs.Signal) -> np.ndarray: if self.autoencoder.signal_dim <= 1: val = values.rect_x else: val = values.rect if self.autoencoder.parallel > 1: val = val.reshape((-1, self.autoencoder.parallel)) input_index = self.interpreter.get_input_details()[0]["index"] input_dtype = self.interpreter.get_input_details()[0]["dtype"] input_shape = self.interpreter.get_input_details()[0]["shape"] output_index = self.interpreter.get_output_details()[0]["index"] output_shape = self.interpreter.get_output_details()[0]["shape"] decoded = np.zeros((len(val), output_shape[1])) for i, v in enumerate(val): self.interpreter.set_tensor(input_index, v.reshape(input_shape).astype(input_dtype)) self.interpreter.invoke() decoded[i] = self.interpreter.get_tensor(output_index) result = misc.int2bit_array(decoded.argmax(axis=1), self.N * self.autoencoder.parallel) return result.reshape(-1, ) def _test_autoencoder_perf(): assert float(tf.__version__[:3]) >= 2.3 # aenc = Autoencoder(3, -15) # aenc.train(samples=1e6) # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='3Bit AE') # aenc = Autoencoder(4, -25, bipolar=True, dtype=tf.float64) # aenc.train(samples=5e5) # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='4Bit AE F64') aenc = Autoencoder(4, -25, bipolar=True) aenc.train(epoch_size=1e3, epochs=10) # # m = aenc.N * aenc.parallel x_train = misc.bit_matrix2one_hot(misc.generate_random_bit_array(100*m).reshape((-1, m))) x_train_enc = aenc.encoder(x_train) x_train = tf.cast(x_train, tf.float32) # # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='4AE F32') # # # def save_tfline(model, name, types=None, ops=None, io_types=None, train_x=None): converter = tf.lite.TFLiteConverter.from_keras_model(model) if types is not None: converter.optimizations = [tf.lite.Optimize.DEFAULT] converter.target_spec.supported_types = types if ops is not None: converter.optimizations = [tf.lite.Optimize.DEFAULT] converter.target_spec.supported_ops = ops if io_types is not None: converter.inference_input_type = io_types converter.inference_output_type = io_types if train_x is not None: def representative_data_gen(): for input_value in tf.data.Dataset.from_tensor_slices(train_x).batch(1).take(100): yield [input_value] converter.representative_dataset = representative_data_gen tflite_model = converter.convert() tflite_models_dir = pathlib.Path("/tmp/tflite/") tflite_models_dir.mkdir(exist_ok=True, parents=True) tflite_model_file = tflite_models_dir / (name + ".tflite") tflite_model_file.write_bytes(tflite_model) print("Saving models") save_tfline(aenc.encoder, "default_enc") save_tfline(aenc.decoder, "default_dec") # # save_tfline(aenc.encoder, "float16_enc", [tf.float16]) # save_tfline(aenc.decoder, "float16_dec", [tf.float16]) # # save_tfline(aenc.encoder, "bfloat16_enc", [tf.bfloat16]) # save_tfline(aenc.decoder, "bfloat16_dec", [tf.bfloat16]) INT16X8 = tf.lite.OpsSet.EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8 save_tfline(aenc.encoder, "int16x8_enc", ops=[INT16X8], train_x=x_train) save_tfline(aenc.decoder, "int16x8_dec", ops=[INT16X8], train_x=x_train_enc) # save_tfline(aenc.encoder, "int8_enc", ops=[tf.lite.OpsSet.TFLITE_BUILTINS_INT8], io_types=tf.uint8, train_x=x_train) # save_tfline(aenc.decoder, "int8_dec", ops=[tf.lite.OpsSet.TFLITE_BUILTINS_INT8], io_types=tf.uint8, train_x=x_train_enc) print("Testing BER vs SNR") plt.plot(*get_SNR( LiteTFMod("default_enc", aenc), LiteTFDemod("default_dec", aenc), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='4AE F32') # plt.plot(*get_SNR( # LiteTFMod("float16_enc", aenc), # LiteTFDemod("float16_dec", aenc), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='4AE F16') # # # plt.plot(*get_SNR( # LiteTFMod("bfloat16_enc", aenc), # LiteTFDemod("bfloat16_dec", aenc), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='4AE BF16') # plt.plot(*get_SNR( LiteTFMod("int16x8_enc", aenc), LiteTFDemod("int16x8_dec", aenc), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='4AE I16x8') # plt.plot(*get_SNR( # AlphabetMod('16qam', 10e6), # AlphabetDemod('16qam', 10e6), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15, # ), '-', label='16qam') plt.yscale('log') plt.grid() plt.xlabel('SNR dB') plt.ylabel('BER') plt.title("Autoencoder with different precision data types") plt.legend() plt.savefig('autoencoder_compression.eps', format='eps') plt.show() view_encoder(aenc.encoder, 4) # aenc = Autoencoder(5, -25) # aenc.train(samples=2e6) # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='5Bit AE') # # aenc = Autoencoder(6, -25) # aenc.train(samples=2e6) # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='6Bit AE') # # for scheme in ['64qam', '32qam', '16qam', 'qpsk', '8psk']: # plt.plot(*get_SNR( # AlphabetMod(scheme, 10e6), # AlphabetDemod(scheme, 10e6), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15, # ), '-', label=scheme.upper()) # # plt.yscale('log') # plt.grid() # plt.xlabel('SNR dB') # plt.title("Autoencoder vs defined modulations") # plt.legend() # plt.show() def _test_autoencoder_perf2(): aenc = Autoencoder(2, -20) aenc.train(samples=3e6) plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='2Bit AE') aenc = Autoencoder(3, -20) aenc.train(samples=3e6) plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='3Bit AE') aenc = Autoencoder(4, -20) aenc.train(samples=3e6) plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='4Bit AE') aenc = Autoencoder(5, -20) aenc.train(samples=3e6) plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='5Bit AE') for a in ['qpsk', '8psk', '16qam', '32qam', '64qam']: try: plt.plot(*get_SNR(AlphabetMod(a, 10e6), AlphabetDemod(a, 10e6), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15,), '-', label=a.upper()) except KeyboardInterrupt: break except Exception: pass plt.yscale('log') plt.grid() plt.xlabel('SNR dB') plt.title("Autoencoder vs defined modulations") plt.legend() plt.savefig('autoencoder_mods.eps', format='eps') plt.show() # view_encoder(aenc.encoder, 2) def _test_autoencoder_perf_qnn(): fh = logging.FileHandler("model_quantizing.log", mode="w+") fh.setLevel(logging.INFO) sh = logging.StreamHandler(stream=stdout) sh.setLevel(logging.INFO) logger = logging.getLogger(__name__) logger.setLevel(level=logging.INFO) logger.addHandler(fh) logger.addHandler(sh) aenc = Autoencoder(4, -25, bipolar=True) # aenc.encoder.save_weights('ae_enc.bin') # aenc.decoder.save_weights('ae_dec.bin') # aenc.encoder.load_weights('ae_enc.bin') # aenc.decoder.load_weights('ae_dec.bin') try: aenc.load_weights('autoencoder') except NotFoundError: aenc.train(epoch_size=1e3, epochs=10) aenc.save_weights('autoencoder') aenc.compile(optimizer='adam', loss=tf.losses.MeanSquaredError()) m = aenc.N * aenc.parallel view_encoder(aenc.encoder, 4, title="FP32 Alphabet") batch_size = 25000 x_train = misc.bit_matrix2one_hot(misc.generate_random_bit_array(batch_size*m).reshape((-1, m))) x_test = misc.bit_matrix2one_hot(misc.generate_random_bit_array(5000*m).reshape((-1, m))) bits = [np.log2(i) for i in (32,)][0] alphabet_scalars = 2 # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] num_layers = sum([layer.__class__.__name__ in ('Dense',) for layer in aenc.all_layers]) # for b in (3, 6, 8, 12, 16, 24, 32, 48, 64): get_data = (sample for sample in x_train) for i in range(num_layers): get_data = chain(get_data, (sample for sample in x_train)) qnn = QuantizedNeuralNetwork( network=aenc, batch_size=batch_size, get_data=get_data, logger=logger, bits=np.log2(16), alphabet_scalar=alphabet_scalars, ) qnn.quantize_network() qnn.quantized_net.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError()) view_encoder(qnn.quantized_net, 4, title=f"Quantised {16}b alphabet") # view_encoder(qnn_enc.quantized_net, 4, title=f"Quantised {b}b alphabet") # _, q_accuracy = qnn.quantized_net.evaluate(x_test, x_test, verbose=True) pass if __name__ == '__main__': # plt.plot(*get_SNR( # AlphabetMod('16qam', 10e6), # AlphabetDemod('16qam', 10e6), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15, # ), '-', label='16qam AWGN') # # plt.plot(*get_SNR( # AlphabetMod('16qam', 10e6), # AlphabetDemod('16qam', 10e6), # samples=100000, # steps=50, # start=-5, # stop=15, # ), '-', label='16qam OPTICAL') # # plt.yscale('log') # plt.grid() # plt.xlabel('SNR dB') # plt.show() # _test_autoencoder_perf() _test_autoencoder_perf_qnn() # _test_autoencoder_perf2() # _test_autoencoder_pretrain() # _test_optics_autoencoder()