Source code for tsx.utils

import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from tqdm import trange
from itertools import combinations
from typing import Union

def to_float32(X):
    if isinstance(X, np.ndarray):
        return X.astype(np.float32)
    if isinstance(X, torch.Tensor):
        return X.float()

    raise RuntimeError(f'Cannot cast type {type(X)} to float32')

def to_int64(X):
    if isinstance(X, np.ndarray):
        return X.astype(np.int64)
    if isinstance(X, torch.Tensor):
        return X.long()

    raise RuntimeError(f'Cannot cast type {type(X)} to int64')

class NSGA2:
    # based on implementation from https://github.com/haris989/NSGA-II/blob/master/NSGA%20II.py
    # NSGA2 is configured to always minimize (with 0 being optimal), so make sure to configure your criteria functions accordingly

    def __init__(self, parent_size=10, offspring_size=10, dimensions=3, generations=10, log_generations=False):
        self.parent_size = parent_size
        self.offspring_size = offspring_size
        self.dimensions = dimensions
        self.generations = generations
        self.log_generations = log_generations

    def set_criterias(self, criterias):
        self.criterias = criterias

    def _random_individuals(self, n):
        return np.random.binomial(1, p=0.5, size=3*n).reshape(n, 3)

    def _apply_functions(self, X):
        result = np.zeros((len(X), len(self.criterias)))
        for i, f in enumerate(self.criterias):
            result[:, i] = f(X)

        return result

    # input: (batch_size, len(self.criterias))
    def fast_non_dominated_sort(self, individual_performs):
        fronts = []
        indices = np.arange(len(individual_performs))
        while len(indices) != 0:
            permutations = list(combinations(np.arange(len(indices)), 2))
            domination_count = np.zeros(len(indices))
            dominates = [[] for _ in range(len(indices))]
            for (ix1, ix2) in permutations:
                if self._dominates(individual_performs[ix1], individual_performs[ix2]):
                    domination_count[ix2] += 1
                    dominates[ix1].append(ix2)
                if self._dominates(individual_performs[ix2], individual_performs[ix1]):
                    domination_count[ix1] += 1
                    dominates[ix2].append(ix1)

            domination_mask = domination_count == 0
            inv_domination_mask = np.logical_not(domination_mask)
            non_dominated = np.where(domination_mask)[0]
            for ix in non_dominated:
                for i in dominates[ix]:
                    domination_count[i] -= 1
            if np.all(inv_domination_mask):
                fronts.append(indices[inv_domination_mask])
                return fronts
            fronts.append(indices[non_dominated])
            indices = indices[inv_domination_mask]
        return fronts

    def recombination(self, x):
        return self._random_individuals(len(x))

    def mutation(self, x):
        return self._random_individuals(len(x))

    def run(self, guide=None):
        parents = self._random_individuals(self.parent_size, guide=guide)
        offspring = self._random_individuals(self.offspring_size, guide=guide)

        mean_metrics = np.zeros((self.generations, len(self.criterias)))
        var_metrics = np.zeros((self.generations, len(self.criterias)))

        range_generations = range(self.generations)
        if not self.log_generations:
            range_generations = trange(self.generations)

        for g in range_generations:
            offspring = self.recombination(parents.copy())
            offspring = self.mutation(offspring)
            population = np.concatenate((parents, offspring))

            population = np.unique(population, axis=0)
            assert len(population) >= self.parent_size

            evaluation = self._apply_functions(population)
            fronts = self.fast_non_dominated_sort(evaluation)

            parent_indices = []
            for i, front in enumerate(fronts):

                if (len(front) + len(parent_indices)) <= self.parent_size:
                    # take entire front
                    parent_indices.extend(list(front))
                else:
                    # do selection
                    cd = self.crowding_distance(evaluation[front])

                    # randomized argsort descending
                    perm = np.random.permutation(len(cd))
                    sorted_indices = np.argsort(cd[perm])
                    sorted_indices = np.flip(perm[sorted_indices])
                    sorted_indices = sorted_indices[:(self.parent_size - len(parent_indices))]
                    parent_indices.extend(list(front[sorted_indices]))

                    # because of duplicate removal: maybe needs another round
                    if len(parent_indices) == self.parent_size:
                        break
                    if len(parent_indices) > self.parent_size:
                        raise RuntimeError("Cannot have more parents than specified")

            parents = population[parent_indices]
            evaluation = evaluation[parent_indices]

            mean_metrics[g] = np.mean(evaluation, axis=0)
            var_metrics[g] = np.var(evaluation, axis=0)

            if self.log_generations:
                print("GENERATION {}".format(g+1))
                print(evaluation)
                print("-"*20)
        return evaluation, parents

    def crowding_distance(self, individuals):
        n_ind, n_obj = individuals.shape

        eps = 1e-9 # to prevent possible division by zero
        distances = np.zeros(n_ind)

        for c in range(n_obj):

            sorted_indices = np.argsort(individuals[:, c])
            distances[sorted_indices[0]] += np.inf
            distances[sorted_indices[-1]] += np.inf
            normalization = np.max(individuals[:, c]) - np.min(individuals[:, c])
            normalization = eps if normalization == 0 else normalization

            for j in range(1, n_ind-1):
                dist = individuals[sorted_indices[j+1]][c] - individuals[sorted_indices[j-1]][c]
                dist /= normalization
                distances[sorted_indices[j]] += dist

        return distances
        
    def _dominates(self, a, b):
        return np.all(a <= b) and np.any(a < b)

def to_numpy(x):
    if isinstance(x, type(torch.zeros(1))):
        if x.requires_grad:
            return x.detach().numpy()
        else:
            return x.numpy()
    if isinstance(x, type(pd.Series(data=[1,2]))):
        return x.to_numpy()
    if isinstance(x, type(np.zeros(1))):
        return x

    raise ValueError("Input of type {} cannot be converted to numpy array".format(type(x)))

def sigmoid(x):
    return 1/(1 + np.exp(-x)) 

def prepare_for_pytorch(x, batch=True, channel=True):
    if isinstance(x, type(np.zeros(0))):
        x = torch.from_numpy(x)
    
    # Missing batch and channel information
    if batch and len(x.shape) == 1:
        x = x.unsqueeze(0)

    if channel and len(x.shape) == 2:
        x = x.unsqueeze(1)

    return x

[docs]def to_random_state(rs: Union[int, None, np.random.Generator]): ''' Return `np.random.Generator` object from input Args: rs: Something that `np.random.default_rng` can process. Returns: A `np.random.default_rng` object ''' rs = np.random.default_rng(rs) return rs
[docs]def get_device(): ''' Return the "best" device in the following order: If a GPU is available, return "cuda". If Metal is available, return "mps" Otherwise, return "cpu" Returns: String indicating best possible device for Torch Tensors ''' device = 'cpu' device = 'mps' if torch.backends.mps.is_available() and torch.backends.mps.is_built() else device device = 'cuda' if torch.cuda.is_available() else device return device