From 2bd07544f35466d42627efcb75537b862181a9f9 Mon Sep 17 00:00:00 2001 From: raj_mathe Date: Thu, 30 Jun 2022 05:44:16 +0200 Subject: [PATCH] =?UTF-8?q?woche12=20>=20master:=20code=20py=20-=20random?= =?UTF-8?q?=20walks=20erg=C3=A4nzt=20-=20stopkriterien=20-=20logging?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/algorithms/random_walk/algorithms.py | 71 +++++++++++++++---- .../src/models/random_walk/landscape.py | 14 ++++ 2 files changed, 70 insertions(+), 15 deletions(-) diff --git a/code/python/src/algorithms/random_walk/algorithms.py b/code/python/src/algorithms/random_walk/algorithms.py index 7a8ed45..744b594 100644 --- a/code/python/src/algorithms/random_walk/algorithms.py +++ b/code/python/src/algorithms/random_walk/algorithms.py @@ -5,8 +5,9 @@ # IMPORTS # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -from src.thirdparty.types import *; from src.thirdparty.maths import *; +from src.thirdparty.plots import *; +from src.thirdparty.types import *; from models.generated.config import *; from models.generated.commands import *; @@ -25,6 +26,12 @@ __all__ = [ 'metropolis_walk_algorithm', ]; +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# CONSTANTS +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +MAX_ITERATIONS = 1000; # um endlose Schleifen zu verhindern + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # METHOD adaptive walk # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -50,13 +57,15 @@ def adaptive_walk_algorithm( label = lambda x: landscape.label(*x); # initialisiere + steps = []; x = coords_init; fx = f(x); fy = fx; N = nbhd(x); # führe walk aus: - while True: + k = 0; + while k < MAX_ITERATIONS: # Wähle zufälligen Punkt und berechne fitness-Wert: y = uniform_random_choice(N); fy = f(y); @@ -67,14 +76,20 @@ def adaptive_walk_algorithm( x = y; fx = fy; N = nbhd(x); + step = Step(coords=x, label=label(x), improved=True, changed=True); else: - # Nichts machen! - pass; + # Nichts (außer logging) machen! + step = Step(coords=x, label=label(x)); # Nur dann (erfolgreich) abbrechen, wenn f-Wert lokal Min: if fx <= min([f(y) for y in N], default=fx): + step.stopped = True; + steps.append(step); break; + steps.append(step); + k += 1; + return x; # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -102,6 +117,7 @@ def gradient_walk_algorithm( label = lambda x: landscape.label(*x); # initialisiere + steps = []; x = coords_init; fx = landscape.fitness(*x); fy = fx; @@ -111,7 +127,8 @@ def gradient_walk_algorithm( Z = [y for y, fy in zip(N, f_values) if fy == fmin]; # führe walk aus: - while True: + k = 0; + while k < MAX_ITERATIONS: # Wähle zufälligen Punkt mit steilstem Abstieg und berechne fitness-Wert: y = uniform_random_choice(Z); fy = fmin; @@ -125,14 +142,20 @@ def gradient_walk_algorithm( f_values = [f(y) for y in N]; fmin = min(f_values); Z = [y for y, fy in zip(N, f_values) if fy == fmin]; + step = Step(coords=x, label=label(x), improved=True, changed=True); else: - # Nichts machen! - pass; + # Nichts (außer logging) machen! + step = Step(coords=x, label=label(x)); # Nur dann (erfolgreich) abbrechen, wenn f-Wert lokal Min: if fx <= min([f(y) for y in N], default=fx): + step.stopped = True; + steps.append(step); break; + steps.append(step); + k += 1; + return x; # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -161,46 +184,64 @@ def metropolis_walk_algorithm( nbhd = lambda x: landscape.neighbourhood(*x, r=r, strict=True); label = lambda x: landscape.label(*x); + # definiere anzahl der hinreichenden Schritt für Stabilität: + n_stable = 2*(3**(landscape.dim) - 1); + # initialisiere x = coords_init; fx = f(x); fy = fx; nbhd_x = nbhd(x); + steps = []; + step = Step(coords=x, label=label(x)); # führe walk aus: k = 0; - while True: + n_unchanged = 0; + while k < MAX_ITERATIONS: # Wähle zufälligen Punkt und berechne fitness-Wert: y = uniform_random_choice(nbhd_x); - r = uniform(0,1); fy = f(y); + p = math.exp(-abs(fy-fx)/T); + u = random_binary(p); - # Nur dann aktualisieren, wenn sich f-Wert verbessert: - if fy < fx or r < math.exp(-(fy-fx)/T): + # Aktualisieren, wenn sich f-Wert verbessert + # oder mit einer Wahrscheinlichkeit von p: + if fy < fx or u: # Punkt + Umgebung + f-Wert aktualisieren x = y; fx = fy; nbhd_x = nbhd(x); + n_unchanged = 0; + step = Step(coords=x, label=label(x), improved=(fy < fx), chance=u, probability=p, changed=True); else: - # Nichts machen! - pass; + # Nichts (außer logging) machen! + n_unchanged += 1; + step = Step(coords=x, label=label(x)); # »Temperatur« ggf. abkühlen: if annealing: T = cool_temperature(T, k); # Nur dann (erfolgreich) abbrechen, wenn f-Wert lokal Min: - if fx <= min([f(y) for y in nbhd_x], default=fx): + if n_unchanged >= n_stable: + step.stopped = True; + steps.append(step); break; + steps.append(step); k += 1; + if verbose: + for step in steps: + print(step); + return x; # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # AUXILIARY METHODS # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def cool_temperature(T: float, k: int, const: float = 1.) -> float: +def cool_temperature(T: float, k: int, const: float = 2.) -> float: harm = const*(k + 1); return T/(1 + T/harm); diff --git a/code/python/src/models/random_walk/landscape.py b/code/python/src/models/random_walk/landscape.py index dad9239..9332daa 100644 --- a/code/python/src/models/random_walk/landscape.py +++ b/code/python/src/models/random_walk/landscape.py @@ -58,9 +58,23 @@ class Landscape(): def coords_middle(self) -> tuple: return tuple(math.floor(s/2) for s in self.shape); + @property + def values(self) -> np.ndarray: + return self._fct; + def fitness(self, *x: int) -> float: return self._fct[x]; + def axis_label(self, i: int, x: int) -> str: + if self._one_based: + x = x + 1; + name = self._labels[i]; + return f'{name}{x}'; + + def axis_labels(self, i: int) -> str: + s = self.shape[i]; + return [ self.axis_label(i, x) for x in range(s) ]; + def label(self, *x: int) -> str: if self._one_based: x = tuple(xx + 1 for xx in x);