Compare commits

...

4 Commits

  1. 18
      code/python/models/commands-schema.yaml
  2. 143
      code/python/src/algorithms/random_walk/algorithms.py
  3. 16
      code/python/src/endpoints/ep_algorithm_random_walk.py
  4. 24
      code/python/src/models/random_walk/landscape.py
  5. 4
      code/python/src/thirdparty/maths.py

18
code/python/models/commands-schema.yaml

@ -170,9 +170,21 @@ components:
$ref: '#/components/schemas/DataTypeLandscapeGeometry'
optimise:
$ref: '#/components/schemas/EnumOptimiseMode'
coords-init:
description: Initial co-ordinates to start the algorithm.
type: array
items:
type: integer
minItems: 1
temperature-init:
type: float
default: 1.
annealing:
type: boolean
default: false
one-based:
type: boolean
default: false
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Algorithm: Genetic Algorithm
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -243,10 +255,16 @@ components:
type: object
required:
- neighbourhoods
- labels
- values
properties:
neighbourhoods:
$ref: '#/components/schemas/DataTypeLandscapeNeighbourhoods'
labels:
type: array
items:
type: string
minItems: 1
values:
$ref: '#/components/schemas/DataTypeLandscapeValues'
DataTypeLandscapeNeighbourhoods:

143
code/python/src/algorithms/random_walk/algorithms.py

@ -32,14 +32,50 @@ __all__ = [
def adaptive_walk_algorithm(
landscape: Landscape,
r: float,
coords_init: tuple,
optimise: EnumOptimiseMode,
verbose: bool,
):
'''
Führt den Adapative-Walk Algorithmus aus, um ein lokales Minimum zu bestimmen.
'''
log_warn('Noch nicht implementiert!');
return;
# lege Fitness- und Umgebungsfunktionen fest:
match optimise:
case EnumOptimiseMode.max:
f = lambda x: -landscape.fitness(*x);
case _:
f = lambda x: landscape.fitness(*x);
nbhd = lambda x: landscape.neighbourhood(*x, r=r, strict=True);
label = lambda x: landscape.label(*x);
# initialisiere
x = coords_init;
fx = f(x);
fy = fx;
N = nbhd(x);
# führe walk aus:
while True:
# Wähle zufälligen Punkt und berechne fitness-Wert:
y = uniform_random_choice(N);
fy = f(y);
# Nur dann aktualisieren, wenn sich f-Wert verbessert:
if fy < fx:
# Punkt + Umgebung + f-Wert aktualisieren
x = y;
fx = fy;
N = nbhd(x);
else:
# Nichts machen!
pass;
# Nur dann (erfolgreich) abbrechen, wenn f-Wert lokal Min:
if fx <= min([f(y) for y in N], default=fx):
break;
return x;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# METHOD gradient walk
@ -48,14 +84,56 @@ def adaptive_walk_algorithm(
def gradient_walk_algorithm(
landscape: Landscape,
r: float,
coords_init: tuple,
optimise: EnumOptimiseMode,
verbose: bool,
):
'''
Führt den Gradient-Descent (bzw. Ascent) Algorithmus aus, um ein lokales Minimum zu bestimmen.
'''
log_warn('Noch nicht implementiert!');
return;
# lege Fitness- und Umgebungsfunktionen fest:
match optimise:
case EnumOptimiseMode.max:
f = lambda x: -landscape.fitness(*x);
case _:
f = lambda x: landscape.fitness(*x);
nbhd = lambda x: landscape.neighbourhood(*x, r=r, strict=True);
label = lambda x: landscape.label(*x);
# initialisiere
x = coords_init;
fx = landscape.fitness(*x);
fy = fx;
N = nbhd(x);
f_values = [f(y) for y in N];
fmin = min(f_values);
Z = [y for y, fy in zip(N, f_values) if fy == fmin];
# führe walk aus:
while True:
# Wähle zufälligen Punkt mit steilstem Abstieg und berechne fitness-Wert:
y = uniform_random_choice(Z);
fy = fmin;
# Nur dann aktualisieren, wenn sich f-Wert verbessert:
if fy < fx:
# Punkt + Umgebung + f-Wert aktualisieren
x = y;
fx = fy;
N = nbhd(y);
f_values = [f(y) for y in N];
fmin = min(f_values);
Z = [y for y, fy in zip(N, f_values) if fy == fmin];
else:
# Nichts machen!
pass;
# Nur dann (erfolgreich) abbrechen, wenn f-Wert lokal Min:
if fx <= min([f(y) for y in N], default=fx):
break;
return x;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# METHOD metropolis walk
@ -64,6 +142,8 @@ def gradient_walk_algorithm(
def metropolis_walk_algorithm(
landscape: Landscape,
r: float,
coords_init: tuple,
T: float,
annealing: bool,
optimise: EnumOptimiseMode,
verbose: bool,
@ -71,5 +151,56 @@ def metropolis_walk_algorithm(
'''
Führt den Metropolis-Walk Algorithmus aus, um ein lokales Minimum zu bestimmen.
'''
log_warn('Noch nicht implementiert!');
return;
# lege Fitness- und Umgebungsfunktionen fest:
match optimise:
case EnumOptimiseMode.max:
f = lambda x: -landscape.fitness(*x);
case _:
f = lambda x: landscape.fitness(*x);
nbhd = lambda x: landscape.neighbourhood(*x, r=r, strict=True);
label = lambda x: landscape.label(*x);
# initialisiere
x = coords_init;
fx = f(x);
fy = fx;
nbhd_x = nbhd(x);
# führe walk aus:
k = 0;
while True:
# Wähle zufälligen Punkt und berechne fitness-Wert:
y = uniform_random_choice(nbhd_x);
r = uniform(0,1);
fy = f(y);
# Nur dann aktualisieren, wenn sich f-Wert verbessert:
if fy < fx or r < math.exp(-(fy-fx)/T):
# Punkt + Umgebung + f-Wert aktualisieren
x = y;
fx = fy;
nbhd_x = nbhd(x);
else:
# Nichts machen!
pass;
# »Temperatur« ggf. abkühlen:
if annealing:
T = cool_temperature(T, k);
# Nur dann (erfolgreich) abbrechen, wenn f-Wert lokal Min:
if fx <= min([f(y) for y in nbhd_x], default=fx):
break;
k += 1;
return x;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# AUXILIARY METHODS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def cool_temperature(T: float, k: int, const: float = 1.) -> float:
harm = const*(k + 1);
return T/(1 + T/harm);

16
code/python/src/endpoints/ep_algorithm_random_walk.py

@ -27,15 +27,28 @@ __all__ = [
@run_safely()
def endpoint_random_walk(command: CommandRandomWalk) -> Result[CallResult, CallError]:
# Compute landscape (fitness fct + topology) + initial co-ordinates:
one_based = command.one_based;
landscape = Landscape(
values = command.landscape.values,
labels = command.landscape.labels,
metric = command.landscape.neighbourhoods.metric,
one_based = one_based,
);
if isinstance(command.coords_init, list):
coords_init = tuple(command.coords_init);
if one_based:
coords_init = tuple(xx - 1 for xx in coords_init);
assert len(coords_init) == landscape.dim, 'Dimension of initial co-ordinations inconsistent with landscape!';
else:
coords_init = landscape.coords_middle;
match command.algorithm:
case EnumWalkMode.adaptive:
result = adaptive_walk_algorithm(
landscape = landscape,
r = command.landscape.neighbourhoods.radius,
coords_init = coords_init,
optimise = command.optimise,
verbose = config.OPTIONS.random_walk.verbose
);
@ -43,6 +56,7 @@ def endpoint_random_walk(command: CommandRandomWalk) -> Result[CallResult, CallE
result = gradient_walk_algorithm(
landscape = landscape,
r = command.landscape.neighbourhoods.radius,
coords_init = coords_init,
optimise = command.optimise,
verbose = config.OPTIONS.random_walk.verbose
);
@ -50,6 +64,8 @@ def endpoint_random_walk(command: CommandRandomWalk) -> Result[CallResult, CallE
result = metropolis_walk_algorithm(
landscape = landscape,
r = command.landscape.neighbourhoods.radius,
coords_init = coords_init,
T = command.temperature_init,
annealing = command.annealing,
optimise = command.optimise,
verbose = config.OPTIONS.random_walk.verbose

24
code/python/src/models/random_walk/landscape.py

@ -27,16 +27,23 @@ __all__ = [
class Landscape():
_fct: np.ndarray;
_labels: list[str];
_metric: EnumLandscapeMetric;
_radius: float;
_one_based: bool;
def __init__(
self,
values: DataTypeLandscapeValues,
labels: List[str],
metric: EnumLandscapeMetric = EnumLandscapeMetric.maximum,
one_based: bool = False,
):
self._fct = convert_to_nparray(values);
assert len(labels) == self.dim, 'A label is required for each axis/dimension!';
self._labels = labels;
self._metric = metric;
self._one_based = one_based;
return;
@property
@ -47,14 +54,27 @@ class Landscape():
def dim(self) -> int:
return len(self._fct.shape);
@property
def coords_middle(self) -> tuple:
return tuple(math.floor(s/2) for s in self.shape);
def fitness(self, *x: int) -> float:
return self._fct[x];
def label(self, *x: int) -> str:
if self._one_based:
x = tuple(xx + 1 for xx in x);
expr = ','.join([ f'{name}{xx}' for name, xx in zip(self._labels, x)]);
if self.dim > 1:
expr = f'({expr})';
return expr;
def neighbourhood(self, *x: int, r: float, strict: bool = False) -> List[tuple]:
r = int(r);
sides = [
[ xx - j for j in range(1,r+1) if xx - j in range(s) ]
[ xx - j for j in range(1, r+1) if xx - j in range(s) ]
+ ([ xx ] if xx in range(s) else [])
+ [ xx + j for j in range(1,r+1) if xx + j in range(s) ]
+ [ xx + j for j in range(1, r+1) if xx + j in range(s) ]
for xx, s in zip(x, self.shape)
];
match self._metric:

4
code/python/src/thirdparty/maths.py vendored

@ -10,6 +10,8 @@ import math;
import numpy as np;
import pandas as pd;
import random;
from random import uniform;
from random import choice as uniform_random_choice;
from tabulate import tabulate;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -22,5 +24,7 @@ __all__ = [
'np',
'pd',
'random',
'uniform',
'uniform_random_choice',
'tabulate',
];

Loading…
Cancel
Save