Min 4 лет назад
Родитель
Сommit
195d651bf9
2 измененных файлов с 519 добавлено и 1 удалено
  1. 0 1
      .gitignore
  2. 519 0
      tests/min_test.py

+ 0 - 1
.gitignore

@@ -5,4 +5,3 @@ __pycache__
 
 # Environments
 venv/
-tests/local_test.py

+ 519 - 0
tests/min_test.py

@@ -0,0 +1,519 @@
+"""
+These are some unstructured tests. Feel free to use this code for anything else
+"""
+
+import logging
+import pathlib
+from itertools import chain
+from sys import stdout
+
+import defs
+from graphs import get_SNR, get_AWGN_ber
+from models import basic
+from models.autoencoder import Autoencoder, view_encoder
+import matplotlib.pyplot as plt
+import tensorflow as tf
+import misc
+import numpy as np
+
+from models.basic import AlphabetDemod, AlphabetMod
+from models.optical_channel import OpticalChannel
+from models.quantized_net import QuantizedNeuralNetwork
+
+
+
+
+
+def _test_optics_autoencoder():
+    ch = OpticalChannel(
+        noise_level=-10,
+        dispersion=-21.7,
+        symbol_rate=10e9,
+        sample_rate=400e9,
+        length=10,
+        pulse_shape='rcos',
+        sqrt_out=True
+    )
+
+    tf.executing_eagerly()
+
+    aenc = Autoencoder(4, channel=ch)
+    aenc.train(samples=1e6)
+    plt.plot(*get_SNR(
+        aenc.get_modulator(),
+        aenc.get_demodulator(),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='AE')
+
+    plt.plot(*get_SNR(
+        AlphabetMod('4pam', 10e6),
+        AlphabetDemod('4pam', 10e6),
+        samples=30000,
+        steps=50,
+        start=-5,
+        stop=15,
+        length=1,
+        pulse_shape='rcos'
+    ), '-', label='4PAM')
+
+    plt.yscale('log')
+    plt.grid()
+    plt.xlabel('SNR dB')
+    plt.title("Autoencoder Performance")
+    plt.legend()
+    plt.savefig('optics_autoencoder.eps', format='eps')
+    plt.show()
+
+
+
+def _test_autoencoder_pretrain():
+    # aenc = Autoencoder(4, -25)
+    # aenc.train(samples=1e6)
+    # plt.plot(*get_SNR(
+    #     aenc.get_modulator(),
+    #     aenc.get_demodulator(),
+    #     ber_func=get_AWGN_ber,
+    #     samples=100000,
+    #     steps=50,
+    #     start=-5,
+    #     stop=15
+    # ), '-', label='Random AE')
+
+    aenc = Autoencoder(4, -25)
+    # aenc.fit_encoder('16qam', 3e4)
+    aenc.fit_decoder('16qam', 1e5)
+
+    plt.plot(*get_SNR(
+        aenc.get_modulator(),
+        aenc.get_demodulator(),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='16QAM Pre-trained AE')
+
+    aenc.train(samples=3e6)
+    plt.plot(*get_SNR(
+        aenc.get_modulator(),
+        aenc.get_demodulator(),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='16QAM Post-trained AE')
+
+    plt.plot(*get_SNR(
+        AlphabetMod('16qam', 10e6),
+        AlphabetDemod('16qam', 10e6),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='16QAM')
+
+    plt.yscale('log')
+    plt.grid()
+    plt.xlabel('SNR dB')
+    plt.title("4Bit Autoencoder Performance")
+    plt.legend()
+    plt.show()
+
+
+class LiteTFMod(defs.Modulator):
+    def __init__(self, name, autoencoder):
+        super().__init__(2 ** autoencoder.N)
+        self.autoencoder = autoencoder
+        tflite_models_dir = pathlib.Path("/tmp/tflite/")
+        tflite_model_file = tflite_models_dir / (name + ".tflite")
+        self.interpreter = tf.lite.Interpreter(model_path=str(tflite_model_file))
+        self.interpreter.allocate_tensors()
+        pass
+
+    def forward(self, binary: np.ndarray):
+        reshaped = binary.reshape((-1, (self.N * self.autoencoder.parallel)))
+        reshaped_ho = misc.bit_matrix2one_hot(reshaped)
+
+        input_index = self.interpreter.get_input_details()[0]["index"]
+        input_dtype = self.interpreter.get_input_details()[0]["dtype"]
+        input_shape = self.interpreter.get_input_details()[0]["shape"]
+        output_index = self.interpreter.get_output_details()[0]["index"]
+        output_shape = self.interpreter.get_output_details()[0]["shape"]
+
+        x = np.zeros((len(reshaped_ho), output_shape[1]))
+        for i, ho in enumerate(reshaped_ho):
+            self.interpreter.set_tensor(input_index, ho.reshape(input_shape).astype(input_dtype))
+            self.interpreter.invoke()
+            x[i] = self.interpreter.get_tensor(output_index)
+
+        if self.autoencoder.bipolar:
+            x = x * 2 - 1
+
+        if self.autoencoder.parallel > 1:
+            x = x.reshape((-1, self.autoencoder.signal_dim))
+
+        f = np.zeros(x.shape[0])
+        if self.autoencoder.signal_dim <= 1:
+            p = np.zeros(x.shape[0])
+        else:
+            p = x[:, 1]
+        x3 = misc.rect2polar(np.c_[x[:, 0], p, f])
+        return basic.RFSignal(x3)
+
+
+class LiteTFDemod(defs.Demodulator):
+    def __init__(self, name, autoencoder):
+        super().__init__(2 ** autoencoder.N)
+        self.autoencoder = autoencoder
+        tflite_models_dir = pathlib.Path("/tmp/tflite/")
+        tflite_model_file = tflite_models_dir / (name + ".tflite")
+        self.interpreter = tf.lite.Interpreter(model_path=str(tflite_model_file))
+        self.interpreter.allocate_tensors()
+
+    def forward(self, values: defs.Signal) -> np.ndarray:
+        if self.autoencoder.signal_dim <= 1:
+            val = values.rect_x
+        else:
+            val = values.rect
+        if self.autoencoder.parallel > 1:
+            val = val.reshape((-1, self.autoencoder.parallel))
+
+        input_index = self.interpreter.get_input_details()[0]["index"]
+        input_dtype = self.interpreter.get_input_details()[0]["dtype"]
+        input_shape = self.interpreter.get_input_details()[0]["shape"]
+        output_index = self.interpreter.get_output_details()[0]["index"]
+        output_shape = self.interpreter.get_output_details()[0]["shape"]
+
+        decoded = np.zeros((len(val), output_shape[1]))
+        for i, v in enumerate(val):
+            self.interpreter.set_tensor(input_index, v.reshape(input_shape).astype(input_dtype))
+            self.interpreter.invoke()
+            decoded[i] = self.interpreter.get_tensor(output_index)
+        result = misc.int2bit_array(decoded.argmax(axis=1), self.N * self.autoencoder.parallel)
+        return result.reshape(-1, )
+
+
+def _test_autoencoder_perf():
+    assert float(tf.__version__[:3]) >= 2.3
+
+    # aenc = Autoencoder(3, -15)
+    # aenc.train(samples=1e6)
+    # plt.plot(*get_SNR(
+    #     aenc.get_modulator(),
+    #     aenc.get_demodulator(),
+    #     ber_func=get_AWGN_ber,
+    #     samples=100000,
+    #     steps=50,
+    #     start=-5,
+    #     stop=15
+    # ), '-', label='3Bit AE')
+
+    # aenc = Autoencoder(4, -25, bipolar=True, dtype=tf.float64)
+    # aenc.train(samples=5e5)
+    # plt.plot(*get_SNR(
+    #     aenc.get_modulator(),
+    #     aenc.get_demodulator(),
+    #     ber_func=get_AWGN_ber,
+    #     samples=100000,
+    #     steps=50,
+    #     start=-5,
+    #     stop=15
+    # ), '-', label='4Bit AE F64')
+
+    aenc = Autoencoder(4, -25, bipolar=True)
+    aenc.train(epoch_size=1e3, epochs=10)
+    # #
+    m = aenc.N * aenc.parallel
+    x_train = misc.bit_matrix2one_hot(misc.generate_random_bit_array(100*m).reshape((-1, m)))
+    x_train_enc = aenc.encoder(x_train)
+    x_train = tf.cast(x_train, tf.float32)
+    #
+    # plt.plot(*get_SNR(
+    #     aenc.get_modulator(),
+    #     aenc.get_demodulator(),
+    #     ber_func=get_AWGN_ber,
+    #     samples=100000,
+    #     steps=50,
+    #     start=-5,
+    #     stop=15
+    # ), '-', label='4AE F32')
+    # # #
+    def save_tfline(model, name, types=None, ops=None, io_types=None, train_x=None):
+        converter = tf.lite.TFLiteConverter.from_keras_model(model)
+        if types is not None:
+            converter.optimizations = [tf.lite.Optimize.DEFAULT]
+            converter.target_spec.supported_types = types
+        if ops is not None:
+            converter.optimizations = [tf.lite.Optimize.DEFAULT]
+            converter.target_spec.supported_ops = ops
+        if io_types is not None:
+            converter.inference_input_type = io_types
+            converter.inference_output_type = io_types
+        if train_x is not None:
+            def representative_data_gen():
+                for input_value in tf.data.Dataset.from_tensor_slices(train_x).batch(1).take(100):
+                    yield [input_value]
+            converter.representative_dataset = representative_data_gen
+        tflite_model = converter.convert()
+        tflite_models_dir = pathlib.Path("/tmp/tflite/")
+        tflite_models_dir.mkdir(exist_ok=True, parents=True)
+        tflite_model_file = tflite_models_dir / (name + ".tflite")
+        tflite_model_file.write_bytes(tflite_model)
+
+    print("Saving models")
+
+    save_tfline(aenc.encoder, "default_enc")
+    save_tfline(aenc.decoder, "default_dec")
+    #
+    # save_tfline(aenc.encoder, "float16_enc", [tf.float16])
+    # save_tfline(aenc.decoder, "float16_dec", [tf.float16])
+    #
+    # save_tfline(aenc.encoder, "bfloat16_enc", [tf.bfloat16])
+    # save_tfline(aenc.decoder, "bfloat16_dec", [tf.bfloat16])
+
+    INT16X8 = tf.lite.OpsSet.EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8
+    save_tfline(aenc.encoder, "int16x8_enc", ops=[INT16X8], train_x=x_train)
+    save_tfline(aenc.decoder, "int16x8_dec", ops=[INT16X8], train_x=x_train_enc)
+
+    # save_tfline(aenc.encoder, "int8_enc", ops=[tf.lite.OpsSet.TFLITE_BUILTINS_INT8], io_types=tf.uint8, train_x=x_train)
+    # save_tfline(aenc.decoder, "int8_dec", ops=[tf.lite.OpsSet.TFLITE_BUILTINS_INT8], io_types=tf.uint8, train_x=x_train_enc)
+
+    print("Testing BER vs SNR")
+    plt.plot(*get_SNR(
+        LiteTFMod("default_enc", aenc),
+        LiteTFDemod("default_dec", aenc),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='4AE F32')
+
+    # plt.plot(*get_SNR(
+    #     LiteTFMod("float16_enc", aenc),
+    #     LiteTFDemod("float16_dec", aenc),
+    #     ber_func=get_AWGN_ber,
+    #     samples=100000,
+    #     steps=50,
+    #     start=-5,
+    #     stop=15
+    # ), '-', label='4AE F16')
+    # #
+    # plt.plot(*get_SNR(
+    #     LiteTFMod("bfloat16_enc", aenc),
+    #     LiteTFDemod("bfloat16_dec", aenc),
+    #     ber_func=get_AWGN_ber,
+    #     samples=100000,
+    #     steps=50,
+    #     start=-5,
+    #     stop=15
+    # ), '-', label='4AE BF16')
+    #
+    plt.plot(*get_SNR(
+        LiteTFMod("int16x8_enc", aenc),
+        LiteTFDemod("int16x8_dec", aenc),
+        ber_func=get_AWGN_ber,
+        samples=100000,
+        steps=50,
+        start=-5,
+        stop=15
+    ), '-', label='4AE I16x8')
+
+    # plt.plot(*get_SNR(
+    #     AlphabetMod('16qam', 10e6),
+    #     AlphabetDemod('16qam', 10e6),
+    #     ber_func=get_AWGN_ber,
+    #     samples=100000,
+    #     steps=50,
+    #     start=-5,
+    #     stop=15,
+    # ), '-', label='16qam')
+
+    plt.yscale('log')
+    plt.grid()
+    plt.xlabel('SNR dB')
+    plt.ylabel('BER')
+    plt.title("Autoencoder with different precision data types")
+    plt.legend()
+    plt.savefig('autoencoder_compression.eps', format='eps')
+    plt.show()
+
+    view_encoder(aenc.encoder, 4)
+
+    # aenc = Autoencoder(5, -25)
+    # aenc.train(samples=2e6)
+    # plt.plot(*get_SNR(
+    #     aenc.get_modulator(),
+    #     aenc.get_demodulator(),
+    #     ber_func=get_AWGN_ber,
+    #     samples=100000,
+    #     steps=50,
+    #     start=-5,
+    #     stop=15
+    # ), '-', label='5Bit AE')
+    #
+    # aenc = Autoencoder(6, -25)
+    # aenc.train(samples=2e6)
+    # plt.plot(*get_SNR(
+    #     aenc.get_modulator(),
+    #     aenc.get_demodulator(),
+    #     ber_func=get_AWGN_ber,
+    #     samples=100000,
+    #     steps=50,
+    #     start=-5,
+    #     stop=15
+    # ), '-', label='6Bit AE')
+    #
+    # for scheme in ['64qam', '32qam', '16qam', 'qpsk', '8psk']:
+    #     plt.plot(*get_SNR(
+    #         AlphabetMod(scheme, 10e6),
+    #         AlphabetDemod(scheme, 10e6),
+    #         ber_func=get_AWGN_ber,
+    #         samples=100000,
+    #         steps=50,
+    #         start=-5,
+    #         stop=15,
+    #     ), '-', label=scheme.upper())
+    #
+    # plt.yscale('log')
+    # plt.grid()
+    # plt.xlabel('SNR dB')
+    # plt.title("Autoencoder vs defined modulations")
+    # plt.legend()
+    # plt.show()
+
+
+def _test_autoencoder_perf2():
+    aenc = Autoencoder(2, -20)
+    aenc.train(samples=3e6)
+    plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='2Bit AE')
+
+    aenc = Autoencoder(3, -20)
+    aenc.train(samples=3e6)
+    plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='3Bit AE')
+
+    aenc = Autoencoder(4, -20)
+    aenc.train(samples=3e6)
+    plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='4Bit AE')
+
+    aenc = Autoencoder(5, -20)
+    aenc.train(samples=3e6)
+    plt.plot(*get_SNR(aenc.get_modulator(), aenc.get_demodulator(), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15), '-', label='5Bit AE')
+
+    for a in ['qpsk', '8psk', '16qam', '32qam', '64qam']:
+        try:
+            plt.plot(*get_SNR(AlphabetMod(a, 10e6), AlphabetDemod(a, 10e6), ber_func=get_AWGN_ber, samples=100000, steps=50, start=-5, stop=15,), '-', label=a.upper())
+        except KeyboardInterrupt:
+            break
+        except Exception:
+            pass
+
+    plt.yscale('log')
+    plt.grid()
+    plt.xlabel('SNR dB')
+    plt.title("Autoencoder vs defined modulations")
+    plt.legend()
+    plt.savefig('autoencoder_mods.eps', format='eps')
+    plt.show()
+
+    # view_encoder(aenc.encoder, 2)
+
+
+def _test_autoencoder_perf_qnn():
+    fh = logging.FileHandler("model_quantizing.log", mode="w+")
+    fh.setLevel(logging.INFO)
+    sh = logging.StreamHandler(stream=stdout)
+    sh.setLevel(logging.INFO)
+
+    logger = logging.getLogger(__name__)
+    logger.setLevel(level=logging.INFO)
+    logger.addHandler(fh)
+    logger.addHandler(sh)
+
+    aenc = Autoencoder(4, -25, bipolar=True)
+    # aenc.train(epoch_size=1e3, epochs=10)
+    # aenc.encoder.save_weights('ae_enc.bin')
+    # aenc.decoder.save_weights('ae_dec.bin')
+    # aenc.encoder.load_weights('ae_enc.bin')
+    # aenc.decoder.load_weights('ae_dec.bin')
+    aenc.load_weights('autoencoder.bin')
+    aenc.compile(optimizer='adam', loss=tf.losses.MeanSquaredError())
+    aenc.build()
+    aenc.summary()
+
+    m = aenc.N * aenc.parallel
+    view_encoder(aenc.encoder, 4, title="FP32 Alphabet")
+
+    batch_size = 25000
+    x_train = misc.bit_matrix2one_hot(misc.generate_random_bit_array(batch_size*m).reshape((-1, m)))
+    x_test = misc.bit_matrix2one_hot(misc.generate_random_bit_array(5000*m).reshape((-1, m)))
+    bits = [np.log2(i) for i in (32,)][0]
+    alphabet_scalars = 2  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+    num_layers = sum([layer.__class__.__name__ in ('Dense',) for layer in aenc.all_layers])
+
+    # for b in (3, 6, 8, 12, 16, 24, 32, 48, 64):
+    get_data = (sample for sample in x_train)
+    for i in range(num_layers):
+        get_data = chain(get_data, (sample for sample in x_train))
+
+    qnn = QuantizedNeuralNetwork(
+        network=aenc,
+        batch_size=batch_size,
+        get_data=get_data,
+        logger=logger,
+        bits=np.log2(16),
+        alphabet_scalar=alphabet_scalars,
+    )
+
+    qnn.quantize_network()
+    qnn.quantized_net.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
+    view_encoder(qnn.quantized_net, 4, title=f"Quantised {16}b alphabet")
+
+
+
+
+    # view_encoder(qnn_enc.quantized_net, 4, title=f"Quantised {b}b alphabet")
+
+
+    # _, q_accuracy = qnn.quantized_net.evaluate(x_test, x_test, verbose=True)
+    pass
+
+if __name__ == '__main__':
+
+
+    # plt.plot(*get_SNR(
+    #     AlphabetMod('16qam', 10e6),
+    #     AlphabetDemod('16qam', 10e6),
+    #     ber_func=get_AWGN_ber,
+    #     samples=100000,
+    #     steps=50,
+    #     start=-5,
+    #     stop=15,
+    # ), '-', label='16qam AWGN')
+    #
+    # plt.plot(*get_SNR(
+    #     AlphabetMod('16qam', 10e6),
+    #     AlphabetDemod('16qam', 10e6),
+    #     samples=100000,
+    #     steps=50,
+    #     start=-5,
+    #     stop=15,
+    # ), '-', label='16qam OPTICAL')
+    #
+    # plt.yscale('log')
+    # plt.grid()
+    # plt.xlabel('SNR dB')
+    # plt.show()
+
+    # _test_autoencoder_perf()
+    _test_autoencoder_perf_qnn()
+    # _test_autoencoder_perf2()
+    # _test_autoencoder_pretrain()
+    # _test_optics_autoencoder()
+