""" These are some unstructured tests. Feel free to use this code for anything else """ import logging import pathlib from itertools import chain from sys import stdout import defs from graphs import get_SNR, get_AWGN_ber from models import basic from models.autoencoder import Autoencoder, view_encoder import matplotlib.pyplot as plt import tensorflow as tf import misc import numpy as np from models.basic import AlphabetDemod, AlphabetMod from models.optical_channel import OpticalChannel from models.quantized_net import QuantizedNeuralNetwork def _test_optics_autoencoder(): ch = OpticalChannel( noise_level=-10, dispersion=-21.7, symbol_rate=10e9, sample_rate=400e9, length=10, pulse_shape='rcos', sqrt_out=True ) tf.executing_eagerly() aenc = Autoencoder(4, channel=ch) aenc.train(samples=1e6) plt.plot(*get_SNR( aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='AE') plt.plot(*get_SNR( AlphabetMod('4pam', 10e6), AlphabetDemod('4pam', 10e6), samples=30000, steps=50, start=-5, stop=15, length=1, pulse_shape='rcos' ), '-', label='4PAM') plt.yscale('log') plt.grid() plt.xlabel('SNR dB') plt.title("Autoencoder Performance") plt.legend() plt.savefig('optics_autoencoder.eps', format='eps') plt.show() def _test_autoencoder_pretrain(): # aenc = Autoencoder(4, -25) # aenc.train(samples=1e6) # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='Random AE') aenc = Autoencoder(4, -25) # aenc.fit_encoder('16qam', 3e4) aenc.fit_decoder('16qam', 1e5) plt.plot(*get_SNR( aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='16QAM Pre-trained AE') aenc.train(samples=3e6) plt.plot(*get_SNR( aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='16QAM Post-trained AE') plt.plot(*get_SNR( AlphabetMod('16qam', 10e6), AlphabetDemod('16qam', 10e6), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='16QAM') plt.yscale('log') plt.grid() plt.xlabel('SNR dB') plt.title("4Bit Autoencoder Performance") plt.legend() plt.show() class LiteTFMod(defs.Modulator): def __init__(self, name, autoencoder): super().__init__(2 ** autoencoder.N) self.autoencoder = autoencoder tflite_models_dir = pathlib.Path("/tmp/tflite/") tflite_model_file = tflite_models_dir / (name + ".tflite") self.interpreter = tf.lite.Interpreter(model_path=str(tflite_model_file)) self.interpreter.allocate_tensors() pass def forward(self, binary: np.ndarray): reshaped = binary.reshape((-1, (self.N * self.autoencoder.parallel))) reshaped_ho = misc.bit_matrix2one_hot(reshaped) input_index = self.interpreter.get_input_details()[0]["index"] input_dtype = self.interpreter.get_input_details()[0]["dtype"] input_shape = self.interpreter.get_input_details()[0]["shape"] output_index = self.interpreter.get_output_details()[0]["index"] output_shape = self.interpreter.get_output_details()[0]["shape"] x = np.zeros((len(reshaped_ho), output_shape[1])) for i, ho in enumerate(reshaped_ho): self.interpreter.set_tensor(input_index, ho.reshape(input_shape).astype(input_dtype)) self.interpreter.invoke() x[i] = self.interpreter.get_tensor(output_index) if self.autoencoder.bipolar: x = x * 2 - 1 if self.autoencoder.parallel > 1: x = x.reshape((-1, self.autoencoder.signal_dim)) f = np.zeros(x.shape[0]) if self.autoencoder.signal_dim <= 1: p = np.zeros(x.shape[0]) else: p = x[:, 1] x3 = misc.rect2polar(np.c_[x[:, 0], p, f]) return basic.RFSignal(x3) class LiteTFDemod(defs.Demodulator): def __init__(self, name, autoencoder): super().__init__(2 ** autoencoder.N) self.autoencoder = autoencoder tflite_models_dir = pathlib.Path("/tmp/tflite/") tflite_model_file = tflite_models_dir / (name + ".tflite") self.interpreter = tf.lite.Interpreter(model_path=str(tflite_model_file)) self.interpreter.allocate_tensors() def forward(self, values: defs.Signal) -> np.ndarray: if self.autoencoder.signal_dim <= 1: val = values.rect_x else: val = values.rect if self.autoencoder.parallel > 1: val = val.reshape((-1, self.autoencoder.parallel)) input_index = self.interpreter.get_input_details()[0]["index"] input_dtype = self.interpreter.get_input_details()[0]["dtype"] input_shape = self.interpreter.get_input_details()[0]["shape"] output_index = self.interpreter.get_output_details()[0]["index"] output_shape = self.interpreter.get_output_details()[0]["shape"] decoded = np.zeros((len(val), output_shape[1])) for i, v in enumerate(val): self.interpreter.set_tensor(input_index, v.reshape(input_shape).astype(input_dtype)) self.interpreter.invoke() decoded[i] = self.interpreter.get_tensor(output_index) result = misc.int2bit_array(decoded.argmax(axis=1), self.N * self.autoencoder.parallel) return result.reshape(-1, ) def _test_autoencoder_perf(): assert float(tf.__version__[:3]) >= 2.3 # aenc = Autoencoder(3, -15) # aenc.train(samples=1e6) # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='3Bit AE') # aenc = Autoencoder(4, -25, bipolar=True, dtype=tf.float64) # aenc.train(samples=5e5) # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='4Bit AE F64') aenc = Autoencoder(4, -25, bipolar=True) aenc.train(epoch_size=1e3, epochs=10) # # m = aenc.N * aenc.parallel x_train = misc.bit_matrix2one_hot(misc.generate_random_bit_array(100*m).reshape((-1, m))) x_train_enc = aenc.encoder(x_train) x_train = tf.cast(x_train, tf.float32) # # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='4AE F32') # # # def save_tfline(model, name, types=None, ops=None, io_types=None, train_x=None): converter = tf.lite.TFLiteConverter.from_keras_model(model) if types is not None: converter.optimizations = [tf.lite.Optimize.DEFAULT] converter.target_spec.supported_types = types if ops is not None: converter.optimizations = [tf.lite.Optimize.DEFAULT] converter.target_spec.supported_ops = ops if io_types is not None: converter.inference_input_type = io_types converter.inference_output_type = io_types if train_x is not None: def representative_data_gen(): for input_value in tf.data.Dataset.from_tensor_slices(train_x).batch(1).take(100): yield [input_value] converter.representative_dataset = representative_data_gen tflite_model = converter.convert() tflite_models_dir = pathlib.Path("/tmp/tflite/") tflite_models_dir.mkdir(exist_ok=True, parents=True) tflite_model_file = tflite_models_dir / (name + ".tflite") tflite_model_file.write_bytes(tflite_model) print("Saving models") save_tfline(aenc.encoder, "default_enc") save_tfline(aenc.decoder, "default_dec") # # save_tfline(aenc.encoder, "float16_enc", [tf.float16]) # save_tfline(aenc.decoder, "float16_dec", [tf.float16]) # # save_tfline(aenc.encoder, "bfloat16_enc", [tf.bfloat16]) # save_tfline(aenc.decoder, "bfloat16_dec", [tf.bfloat16]) INT16X8 = tf.lite.OpsSet.EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8 save_tfline(aenc.encoder, "int16x8_enc", ops=[INT16X8], train_x=x_train) save_tfline(aenc.decoder, "int16x8_dec", ops=[INT16X8], train_x=x_train_enc) # save_tfline(aenc.encoder, "int8_enc", ops=[tf.lite.OpsSet.TFLITE_BUILTINS_INT8], io_types=tf.uint8, train_x=x_train) # save_tfline(aenc.decoder, "int8_dec", ops=[tf.lite.OpsSet.TFLITE_BUILTINS_INT8], io_types=tf.uint8, train_x=x_train_enc) print("Testing BER vs SNR") plt.plot(*get_SNR( LiteTFMod("default_enc", aenc), LiteTFDemod("default_dec", aenc), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='4AE F32') # plt.plot(*get_SNR( # LiteTFMod("float16_enc", aenc), # LiteTFDemod("float16_dec", aenc), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='4AE F16') # # # plt.plot(*get_SNR( # LiteTFMod("bfloat16_enc", aenc), # LiteTFDemod("bfloat16_dec", aenc), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='4AE BF16') # plt.plot(*get_SNR( LiteTFMod("int16x8_enc", aenc), LiteTFDemod("int16x8_dec", aenc), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15 ), '-', label='4AE I16x8') # plt.plot(*get_SNR( # AlphabetMod('16qam', 10e6), # AlphabetDemod('16qam', 10e6), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15, # ), '-', label='16qam') plt.yscale('log') plt.grid() plt.xlabel('SNR dB') plt.ylabel('BER') plt.title("Autoencoder with different precision data types") plt.legend() plt.savefig('autoencoder_compression.eps', format='eps') plt.show() view_encoder(aenc.encoder, 4) # aenc = Autoencoder(5, -25) # aenc.train(samples=2e6) # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='5Bit AE') # # aenc = Autoencoder(6, -25) # aenc.train(samples=2e6) # plt.plot(*get_SNR( # aenc.get_modulator(), # aenc.get_demodulator(), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15 # ), '-', label='6Bit AE') # # for scheme in ['64qam', '32qam', '16qam', 'qpsk', '8psk']: # plt.plot(*get_SNR( # AlphabetMod(scheme, 10e6), # AlphabetDemod(scheme, 10e6), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15, # ), '-', label=scheme.upper()) # # plt.yscale('log') # plt.grid() # plt.xlabel('SNR dB') # plt.title("Autoencoder vs defined modulations") # plt.legend() # plt.show() def _test_autoencoder_perf2(): aenc = Autoencoder(2, -20) aenc.train(samples=3e6) plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='2Bit AE') aenc = Autoencoder(3, -20) aenc.train(samples=3e6) plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='3Bit AE') aenc = Autoencoder(4, -20) aenc.train(samples=3e6) plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='4Bit AE') aenc = Autoencoder(5, -20) aenc.train(samples=3e6) plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='5Bit AE') for a in ['qpsk', '8psk', '16qam', '32qam', '64qam']: try: plt.plot(*get_SNR(AlphabetMod(a, 10e6), AlphabetDemod(a, 10e6), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15,), '-', label=a.upper()) except KeyboardInterrupt: break except Exception: pass plt.yscale('log') plt.grid() plt.xlabel('SNR dB') plt.title("Autoencoder vs defined modulations") plt.legend() plt.savefig('autoencoder_mods.eps', format='eps') plt.show() # view_encoder(aenc.encoder, 2) def _test_autoencoder_perf_qnn(): fh = logging.FileHandler("model_quantizing.log", mode="w+") fh.setLevel(logging.INFO) sh = logging.StreamHandler(stream=stdout) sh.setLevel(logging.INFO) logger = logging.getLogger(__name__) logger.setLevel(level=logging.INFO) logger.addHandler(fh) logger.addHandler(sh) aenc = Autoencoder(4, -25, bipolar=True) # aenc.train(epoch_size=1e3, epochs=10) # aenc.encoder.save_weights('ae_enc.bin') # aenc.decoder.save_weights('ae_dec.bin') # aenc.encoder.load_weights('ae_enc.bin') # aenc.decoder.load_weights('ae_dec.bin') aenc.load_weights('autoencoder.bin') aenc.compile(optimizer='adam', loss=tf.losses.MeanSquaredError()) aenc.build() aenc.summary() m = aenc.N * aenc.parallel view_encoder(aenc.encoder, 4, title="FP32 Alphabet") batch_size = 25000 x_train = misc.bit_matrix2one_hot(misc.generate_random_bit_array(batch_size*m).reshape((-1, m))) x_test = misc.bit_matrix2one_hot(misc.generate_random_bit_array(5000*m).reshape((-1, m))) bits = [np.log2(i) for i in (32,)][0] alphabet_scalars = 2 # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] num_layers = sum([layer.__class__.__name__ in ('Dense',) for layer in aenc.all_layers]) # for b in (3, 6, 8, 12, 16, 24, 32, 48, 64): get_data = (sample for sample in x_train) for i in range(num_layers): get_data = chain(get_data, (sample for sample in x_train)) qnn = QuantizedNeuralNetwork( network=aenc, batch_size=batch_size, get_data=get_data, logger=logger, bits=np.log2(16), alphabet_scalar=alphabet_scalars, ) qnn.quantize_network() qnn.quantized_net.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError()) view_encoder(qnn.quantized_net, 4, title=f"Quantised {16}b alphabet") # view_encoder(qnn_enc.quantized_net, 4, title=f"Quantised {b}b alphabet") # _, q_accuracy = qnn.quantized_net.evaluate(x_test, x_test, verbose=True) pass if __name__ == '__main__': # plt.plot(*get_SNR( # AlphabetMod('16qam', 10e6), # AlphabetDemod('16qam', 10e6), # ber_func=get_AWGN_ber, # samples=100000, # steps=50, # start=-5, # stop=15, # ), '-', label='16qam AWGN') # # plt.plot(*get_SNR( # AlphabetMod('16qam', 10e6), # AlphabetDemod('16qam', 10e6), # samples=100000, # steps=50, # start=-5, # stop=15, # ), '-', label='16qam OPTICAL') # # plt.yscale('log') # plt.grid() # plt.xlabel('SNR dB') # plt.show() # _test_autoencoder_perf() _test_autoencoder_perf_qnn() # _test_autoencoder_perf2() # _test_autoencoder_pretrain() # _test_optics_autoencoder()