| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597 |
- """
- These are some unstructured tests. Feel free to use this code for anything else
- """
- import logging
- import pathlib
- from itertools import chain
- from sys import stdout
- from tensorflow.python.framework.errors_impl import NotFoundError
- from tensorflow.keras import backend as K
- import defs
- from graphs import get_SNR, get_AWGN_ber, show_train_history
- from models import basic
- from models.autoencoder import Autoencoder, view_encoder
- import matplotlib.pyplot as plt
- import tensorflow as tf
- import misc
- import numpy as np
- from models.basic import AlphabetDemod, AlphabetMod
- from models.data import BinaryGenerator
- from models.layers import BitsToSymbols, SymbolsToBits
- from models.optical_channel import OpticalChannel
- from models.quantized_net import QuantizedNeuralNetwork
def _test_optics_autoencoder():
    """Train a 4-bit autoencoder over a dispersive optical channel and plot its
    BER/SNR curve against a classical 4-PAM baseline.

    Saves the figure to 'optics_autoencoder.eps' and shows it.
    """
    ch = OpticalChannel(
        noise_level=-10,
        dispersion=-21.7,  # units not shown here -- presumably ps/nm/km; TODO confirm
        symbol_rate=10e9,
        sample_rate=400e9,
        length=10,
        pulse_shape='rcos',
        sqrt_out=True
    )
    # NOTE(review): the original called tf.executing_eagerly() here and discarded
    # the boolean result -- that function only *queries* eager mode, so the call
    # was a no-op and has been removed. If forcing eager execution was intended,
    # use tf.config.run_functions_eagerly(True) instead.
    aenc = Autoencoder(4, channel=ch)
    aenc.train(samples=1e6)
    plt.plot(*get_SNR(
        aenc.get_modulator(),
        aenc.get_demodulator(),
        ber_func=get_AWGN_ber,
        samples=100000,
        steps=50,
        start=-5,
        stop=15
    ), '-', label='AE')
    plt.plot(*get_SNR(
        AlphabetMod('4pam', 10e6),
        AlphabetDemod('4pam', 10e6),
        samples=30000,
        steps=50,
        start=-5,
        stop=15,
        length=1,
        pulse_shape='rcos'
    ), '-', label='4PAM')
    plt.yscale('log')
    plt.grid()
    plt.xlabel('SNR dB')
    plt.title("Autoencoder Performance")
    plt.legend()
    plt.savefig('optics_autoencoder.eps', format='eps')
    plt.show()
def _test_autoencoder_pretrain():
    """Compare a 16QAM pre-trained 4-bit autoencoder before and after further
    training against the classical 16QAM baseline (BER vs SNR)."""

    def _ber_curve(mod, demod, label):
        # All curves on this figure share the same sweep settings.
        plt.plot(*get_SNR(
            mod,
            demod,
            ber_func=get_AWGN_ber,
            samples=100000,
            steps=50,
            start=-5,
            stop=15
        ), '-', label=label)

    aenc = Autoencoder(4, -25)
    # Seed the decoder with a classical constellation before end-to-end training.
    aenc.fit_decoder('16qam', 1e5)
    _ber_curve(aenc.get_modulator(), aenc.get_demodulator(), '16QAM Pre-trained AE')

    aenc.train(samples=3e6)
    _ber_curve(aenc.get_modulator(), aenc.get_demodulator(), '16QAM Post-trained AE')

    _ber_curve(AlphabetMod('16qam', 10e6), AlphabetDemod('16qam', 10e6), '16QAM')

    plt.yscale('log')
    plt.grid()
    plt.xlabel('SNR dB')
    plt.title("4Bit Autoencoder Performance")
    plt.legend()
    plt.show()
class LiteTFMod(defs.Modulator):
    """Modulator backed by a TFLite-compiled autoencoder encoder.

    Loads /tmp/tflite/<name>.tflite and runs it one sample at a time through a
    tf.lite.Interpreter.
    """

    def __init__(self, name, autoencoder):
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder
        model_file = pathlib.Path("/tmp/tflite/") / (name + ".tflite")
        self.interpreter = tf.lite.Interpreter(model_path=str(model_file))
        self.interpreter.allocate_tensors()

    def forward(self, binary: np.ndarray):
        bits_per_call = self.N * self.autoencoder.parallel
        one_hot = misc.bit_matrix2one_hot(binary.reshape((-1, bits_per_call)))
        in_detail = self.interpreter.get_input_details()[0]
        out_detail = self.interpreter.get_output_details()[0]
        out_shape = out_detail["shape"]
        x = np.zeros((len(one_hot), out_shape[1]))
        # The interpreter expects a single (batched-to-1) input, so invoke once
        # per one-hot row.
        for i, row in enumerate(one_hot):
            self.interpreter.set_tensor(
                in_detail["index"],
                row.reshape(in_detail["shape"]).astype(in_detail["dtype"]))
            self.interpreter.invoke()
            x[i] = self.interpreter.get_tensor(out_detail["index"])
        if self.autoencoder.bipolar:
            # Map [0, 1] outputs onto [-1, 1].
            x = x * 2 - 1
        if self.autoencoder.parallel > 1:
            x = x.reshape((-1, self.autoencoder.signal_dim))
        freq = np.zeros(x.shape[0])
        # 1-D constellations carry no quadrature component.
        if self.autoencoder.signal_dim <= 1:
            phase = np.zeros(x.shape[0])
        else:
            phase = x[:, 1]
        polar = misc.rect2polar(np.c_[x[:, 0], phase, freq])
        return basic.RFSignal(polar)
class LiteTFDemod(defs.Demodulator):
    """Demodulator backed by a TFLite-compiled autoencoder decoder.

    Loads /tmp/tflite/<name>.tflite and hard-decides symbols via argmax over
    the decoder outputs.
    """

    def __init__(self, name, autoencoder):
        super().__init__(2 ** autoencoder.N)
        self.autoencoder = autoencoder
        model_file = pathlib.Path("/tmp/tflite/") / (name + ".tflite")
        self.interpreter = tf.lite.Interpreter(model_path=str(model_file))
        self.interpreter.allocate_tensors()

    def forward(self, values: defs.Signal) -> np.ndarray:
        # 1-D constellations only use the in-phase component.
        if self.autoencoder.signal_dim <= 1:
            samples = values.rect_x
        else:
            samples = values.rect
        if self.autoencoder.parallel > 1:
            samples = samples.reshape((-1, self.autoencoder.parallel))
        in_detail = self.interpreter.get_input_details()[0]
        out_detail = self.interpreter.get_output_details()[0]
        decoded = np.zeros((len(samples), out_detail["shape"][1]))
        # One interpreter invocation per received sample.
        for i, sample in enumerate(samples):
            self.interpreter.set_tensor(
                in_detail["index"],
                sample.reshape(in_detail["shape"]).astype(in_detail["dtype"]))
            self.interpreter.invoke()
            decoded[i] = self.interpreter.get_tensor(out_detail["index"])
        # Hard decision: most likely symbol index -> flat bit array.
        bits = misc.int2bit_array(decoded.argmax(axis=1), self.N * self.autoencoder.parallel)
        return bits.reshape(-1)
def _test_autoencoder_perf():
    """Convert a trained 4-bit autoencoder to TFLite at different precisions
    and compare their BER/SNR curves.

    Saves the TFLite models under /tmp/tflite/, the figure to
    'autoencoder_compression.eps', and finally displays the learned alphabet.
    """
    # BUGFIX(review): the original check was `float(tf.__version__[:3]) >= 2.3`,
    # which mis-parses versions like "2.10" ("2.1" < 2.3) and, being an assert,
    # vanishes under `python -O`. Compare parsed (major, minor) instead.
    major, minor = (int(part) for part in tf.__version__.split('.')[:2])
    if (major, minor) < (2, 3):
        raise RuntimeError(f"TensorFlow >= 2.3 required, found {tf.__version__}")

    aenc = Autoencoder(4, -25, bipolar=True)
    aenc.train(epoch_size=1e3, epochs=10)

    m = aenc.N * aenc.parallel
    # Representative data for post-training quantization calibration.
    x_train = misc.bit_matrix2one_hot(misc.generate_random_bit_array(100 * m).reshape((-1, m)))
    x_train_enc = aenc.encoder(x_train)
    x_train = tf.cast(x_train, tf.float32)

    def save_tflite(model, name, types=None, ops=None, io_types=None, train_x=None):
        """Convert a Keras model to TFLite (optionally quantized) and save it
        as /tmp/tflite/<name>.tflite."""
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        if types is not None:
            converter.optimizations = [tf.lite.Optimize.DEFAULT]
            converter.target_spec.supported_types = types
        if ops is not None:
            converter.optimizations = [tf.lite.Optimize.DEFAULT]
            converter.target_spec.supported_ops = ops
        if io_types is not None:
            converter.inference_input_type = io_types
            converter.inference_output_type = io_types
        if train_x is not None:
            # The representative dataset drives activation range calibration.
            def representative_data_gen():
                for input_value in tf.data.Dataset.from_tensor_slices(train_x).batch(1).take(100):
                    yield [input_value]

            converter.representative_dataset = representative_data_gen
        tflite_model = converter.convert()
        tflite_models_dir = pathlib.Path("/tmp/tflite/")
        tflite_models_dir.mkdir(exist_ok=True, parents=True)
        (tflite_models_dir / (name + ".tflite")).write_bytes(tflite_model)

    print("Saving models")
    save_tflite(aenc.encoder, "default_enc")
    save_tflite(aenc.decoder, "default_dec")
    INT16X8 = tf.lite.OpsSet.EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8
    save_tflite(aenc.encoder, "int16x8_enc", ops=[INT16X8], train_x=x_train)
    save_tflite(aenc.decoder, "int16x8_dec", ops=[INT16X8], train_x=x_train_enc)

    print("Testing BER vs SNR")
    for enc_name, dec_name, label in (
            ("default_enc", "default_dec", '4AE F32'),
            ("int16x8_enc", "int16x8_dec", '4AE I16x8'),
    ):
        plt.plot(*get_SNR(
            LiteTFMod(enc_name, aenc),
            LiteTFDemod(dec_name, aenc),
            ber_func=get_AWGN_ber,
            samples=100000,
            steps=50,
            start=-5,
            stop=15
        ), '-', label=label)

    plt.yscale('log')
    plt.grid()
    plt.xlabel('SNR dB')
    plt.ylabel('BER')
    plt.title("Autoencoder with different precision data types")
    plt.legend()
    plt.savefig('autoencoder_compression.eps', format='eps')
    plt.show()
    view_encoder(aenc.encoder, 4)
def _test_autoencoder_perf2():
    """Train 2-5 bit autoencoders and plot their BER/SNR curves against
    classical modulation schemes. Saves the figure to 'autoencoder_mods.eps'.
    """
    # Identical train/plot stanzas folded into one loop; labels are unchanged.
    for n_bits in (2, 3, 4, 5):
        aenc = Autoencoder(n_bits, -20)
        aenc.train(samples=3e6)
        plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000,
                          steps=50, start=-5, stop=15), '-', label=f'{n_bits}Bit AE')
    for a in ['qpsk', '8psk', '16qam', '32qam', '64qam']:
        try:
            plt.plot(
                *get_SNR(AlphabetMod(a, 10e6), AlphabetDemod(a, 10e6), ber_func=get_AWGN_ber, samples=100000, steps=50,
                         start=-5, stop=15, ), '-', label=a.upper())
        except KeyboardInterrupt:
            break
        except Exception as exc:
            # Best-effort: skip schemes that fail, but say why instead of
            # swallowing the exception silently (as the original did).
            print(f"Skipping {a}: {exc!r}")
    plt.yscale('log')
    plt.grid()
    plt.xlabel('SNR dB')
    plt.title("Autoencoder vs defined modulations")
    plt.legend()
    plt.savefig('autoencoder_mods.eps', format='eps')
    plt.show()
def _test_autoencoder_perf_qnn():
    """Quantize a trained 4-bit autoencoder with QuantizedNeuralNetwork and
    display the resulting constellation alphabet.

    Logs quantization progress to 'model_quantizing.log' and stdout.
    """
    fh = logging.FileHandler("model_quantizing.log", mode="w+")
    fh.setLevel(logging.INFO)
    sh = logging.StreamHandler(stream=stdout)
    sh.setLevel(logging.INFO)
    logger = logging.getLogger(__name__)
    logger.setLevel(level=logging.INFO)
    logger.addHandler(fh)
    logger.addHandler(sh)

    aenc = Autoencoder(4, -25, bipolar=True)
    # Reuse cached weights when available; otherwise train briefly and cache.
    try:
        aenc.load_weights('autoencoder')
    except NotFoundError:
        aenc.train(epoch_size=1e3, epochs=10)
        aenc.save_weights('autoencoder')
    aenc.compile(optimizer='adam', loss=tf.losses.MeanSquaredError())

    m = aenc.N * aenc.parallel
    view_encoder(aenc.encoder, 4, title="FP32 Alphabet")

    batch_size = 25000
    x_train = misc.bit_matrix2one_hot(misc.generate_random_bit_array(batch_size * m).reshape((-1, m)))
    alphabet_scalars = 2
    num_layers = sum(layer.__class__.__name__ in ('Dense',) for layer in aenc.all_layers)
    # The quantizer consumes one pass over the data per Dense layer, so feed it
    # num_layers + 1 chained copies of the training set (matches the original
    # manual chain() loop). Unused locals `bits` and `x_test` were removed.
    get_data = chain.from_iterable(iter(x_train) for _ in range(num_layers + 1))
    qnn = QuantizedNeuralNetwork(
        network=aenc,
        batch_size=batch_size,
        get_data=get_data,
        logger=logger,
        bits=np.log2(16),
        alphabet_scalar=alphabet_scalars,
    )
    qnn.quantize_network()
    qnn.quantized_net.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
    view_encoder(qnn.quantized_net, 4, title="Quantised 16b alphabet")
class BitAwareAutoencoder(Autoencoder):
    """Autoencoder whose loss also penalises symbol-index distance, nudging the
    network to give nearby constellation points nearby bit patterns."""

    def __init__(self, N, channel, **kwargs):
        # BUGFIX(review): self.BITS must exist before super().__init__, because
        # the base class receives self.cost (which reads self.BITS) and might
        # evaluate it during construction. The original assigned BITS after.
        self.BITS = 2 ** N - 1
        super().__init__(N, channel, **kwargs, cost=self.cost)

    def cost(self, y_true, y_pred):
        """RMSE on the one-hot vectors plus a heavily weighted (x300) RMSE on
        the normalised argmax symbol indices."""
        y = tf.cast(y_true, dtype=tf.float32)
        # NOTE(review): argmax has no explicit axis, so it reduces over axis 0
        # (the batch axis). If a per-sample symbol index was intended this
        # should probably be axis=-1 -- TODO confirm before changing.
        z0 = tf.math.argmax(y) / self.BITS
        z1 = tf.math.argmax(y_pred) / self.BITS
        # Root-mean-square error of the one-hot reconstruction.
        error0 = y - y_pred
        sme0 = K.sqrt(K.mean(K.square(error0)))
        # Root-mean-square error of the normalised symbol indices.
        error1 = z0 - z1
        sme1 = K.sqrt(K.mean(K.square(error1)))
        return sme0 + tf.cast(sme1 * 300, dtype=tf.float32)
def _bit_aware_test():
    """Load (or fall back to untrained) a 6-bit BitAwareAutoencoder and compare
    its AWGN BER/SNR curve against classical 64QAM."""
    aenc = BitAwareAutoencoder(6, -50, bipolar=True)
    try:
        aenc.load_weights('ae_bitaware')
    except NotFoundError:
        # No cached weights yet; continue with the freshly initialised model.
        pass
    # BUGFIX(review): labels/title said "4bit"/"4Bit AE"/"16qam" although the
    # autoencoder is 6-bit and the baseline plotted is 64qam -- corrected.
    view_encoder(aenc.encoder, 6, title="6bit autoencoder alphabet")
    print("Computing BER/SNR for autoencoder")
    plt.plot(*get_SNR(
        aenc.get_modulator(),
        aenc.get_demodulator(),
        ber_func=get_AWGN_ber,
        samples=1000000, steps=40,
        start=-5, stop=15), '-', label='6Bit AE')
    print("Computing BER/SNR for 64QAM")
    plt.plot(*get_SNR(
        AlphabetMod('64qam', 10e6),
        AlphabetDemod('64qam', 10e6),
        ber_func=get_AWGN_ber,
        samples=1000000,
        steps=40,
        start=-5,
        stop=15,
    ), '-', label='64QAM AWGN')
    plt.yscale('log')
    plt.grid()
    plt.xlabel('SNR dB')
    plt.ylabel('BER')
    plt.title("64QAM vs autoencoder")
    plt.show()
def _graphs():
    """Plot previously measured BER vs fibre length results (hard-coded data)."""
    ber = [5.000000e-06, 1.343333e-04, 3.144667e-04, 1.612333e-03, 1.601600e-03, 2.255200e-03, 2.767467e-03]
    length_km = [50, 60, 75, 80, 85, 90, 90]
    plt.plot(length_km, ber, 'x')
    plt.yscale('log')
    plt.grid()
    plt.xlabel('Fibre length (km)')
    plt.ylabel('BER')
    plt.title("Autoencoder performance")
    plt.show()
if __name__ == '__main__':
    # Scratch entry point: uncomment exactly one experiment to run it.
    _graphs()
    # _bit_aware_test()
    # _test_autoencoder_perf()
    # _test_autoencoder_perf_qnn()
    # _test_autoencoder_perf2()
    # _test_autoencoder_pretrain()
    # _test_optics_autoencoder()
|