main > main: src - init

This commit is contained in:
RD
2022-10-10 11:17:47 +02:00
parent 9c6ee783fd
commit e560119bb0
17 changed files with 785 additions and 0 deletions

0
src/core/__init__.py Normal file
View File

215
src/core/calls.py Normal file
View File

@@ -0,0 +1,215 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# IMPORTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
from __future__ import annotations;
from src.thirdparty.code import *;
from src.thirdparty.config import *;
from src.thirdparty.types import *;
from src.core.utils import *;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# EXPORTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
__all__ = [
'CallState',
'GetState',
'CallValue',
'CallError',
'keep_calm_and_carry_on',
'run_safely',
'to_async',
];
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# CONSTANTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# local usage only
T = TypeVar('T');
V = TypeVar('V');
ARGS = ParamSpec('ARGS');
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Class State for keeping track of results in the course of a computation
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@dataclass
class CallState(Generic[V]):
'''
An auxiliary class which keeps track of the latest state of during calls.
'''
tag: Option[str] = field(default=None);
result: Optional[V] = field(default=None, repr=False);
timestamp: str = field(default_factory=get_timestamp_string);
has_action: bool = field(default=False);
no_action: bool = field(default=False);
has_error: bool = field(default=False);
values: list[tuple[bool, dict]] = field(default_factory=list, repr=False);
errors: list[str] = field(default_factory=list, repr=False);
def __copy__(self) -> CallState:
return CallState(**asdict(self));
def __add__(self, other) -> CallState:
'''
Combines states sequentially:
- takes on latest `timestamp`.
- takes on lates `value`.
- takes on `tag` of latest non-null value.
- takes on latest value of `no_action`
- `has_action` = `true` <==> at least one action taken,
unless `no_action` = `true`.
- `has_error` = `true` <==> at least one error occurred.
'''
if isinstance(other, CallState):
no_action = other.no_action;
has_action = False if no_action else (self.has_action or other.has_action);
return CallState(
tag = other.tag or self.tag,
value = other.value,
timestamp = other.timestamp,
has_action = has_action,
no_action = no_action,
has_error = self.has_error or other.has_error,
values = self.values + other.values,
errors = self.errors + other.errors,
);
raise Exception('Cannot add states!');
## NOTE: only need __radd__ for additions of form <other> + <state>
def __radd__(self, other) -> CallState:
if other == 0:
return self.__copy__();
raise Exception('Cannot add a CallState to the right of a non-zero object!');
def get_result(self) -> V:
if self.result is not None:
return self.result;
raise Exception('No result set!');
@property
def first_data(self) -> dict:
'''
Returns data in first value collected or else defaults to empty dictionary.
'''
return self.values[0][1] if len(self.values) > 0 else dict();
@property
def data(self) -> list[dict]:
'''
Returns the data collected.
'''
return [ data for _, data in self.values ];
@property
def data_log(self) -> list[dict]:
'''
Returns the data to be logged.
'''
return [ data for log, data in self.values if log == True ];
@property
def data_log_json(self) -> list[str]:
'''
Returns the data to be logged as json.
'''
return list(map(json.dumps, self.data_log));
def GetState(result: Result[CallState, CallState]) -> CallState:
if isinstance(result, Ok):
return result.unwrap();
return result.unwrap_err();
def CallValue(
tag: str = None,
result: Optional[V] = None,
has_action: bool = True,
no_action: bool = False,
value: Option[tuple[bool, dict] | list[tuple[bool, dict]]] = Nothing(),
) -> CallState[V]:
x = [];
if isinstance(value, Some):
x = value.unwrap() or [];
x = x if isinstance(x, list) else [ x ];
X = CallState(tag=tag, result=result, values=x, has_action=has_action, no_action=no_action, has_error=False);
return X;
def CallError(
tag: str = None,
has_action: bool = True,
error: Option[str | BaseException | list[str | BaseException]] = Nothing(),
) -> CallState[V]:
x = [];
if isinstance(error, Some):
x = error.unwrap() or [];
x = x if isinstance(x, list) else [ x ];
x = list(map(str, x));
return CallState(tag=tag, errors=x, has_action=has_action, has_error=True);
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# DECORATOR - forces methods to run safely
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def run_safely(tag: str | None = None, error_message: str | None = None):
'''
Creates a decorator for an action to perform it safely.
@inputs (parameters)
- `tag` - optional string to aid error tracking.
- `error_message` - optional string for an error message.
### Example usage ###
```py
@run_safely(tag='recognise int', error_message='unrecognise string')
def action1(x: str) -> Result[int, CallState]:
return Ok(int(x));
assert action1('5') == Ok(5);
result = action1('not a number');
assert isinstance(result, Err);
err = result.unwrap_err();
assert isinstance(err, CallState);
assert err.tag == 'recognise int';
assert err.errors == ['unrecognise string'];
@run_safely('recognise int')
def action2(x: str) -> Result[int, CallState]:
return Ok(int(x));
assert action2('5') == Ok(5);
result = action2('not a number');
assert isinstance(result, Err);
err = result.unwrap_err();
assert isinstance(err, CallState);
assert err.tag == 'recognise int';
assert len(err.errors) == 1;
```
NOTE: in the second example, err.errors is a list containing
the stringified Exception generated when calling `int('not a number')`.
'''
def dec(action: Callable[ARGS, Result[V, CallState]]) -> Callable[ARGS, Result[V, CallState]]:
'''
Wraps action with return type Result[..., CallState],
so that it is performed safely a promise,
catching any internal exceptions as an Err(...)-component of the Result.
'''
@wraps(action)
def wrapped_action(*_, **__) -> Result[V, CallState]:
# NOTE: intercept Exceptions first, then flatten:
return Result.of(lambda: action(*_, **__)) \
.or_else(
lambda err: Err(CallError(
tag = tag or action.__name__,
error = Some(error_message or err),
))
) \
.flatmap(lambda value: value); # necessary to flatten result.
return wrapped_action;
return dec;

61
src/core/env.py Normal file
View File

@@ -0,0 +1,61 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# IMPORTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
from src.thirdparty.code import *;
from src.thirdparty.config import *;
from src.thirdparty.system import *;
from src.thirdparty.types import *;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# EXPORTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
__all__ = [
'get_env_string',
'get_env_optional_string',
'get_env_int',
'get_env_optional_int',
'get_env_float',
'get_env_optional_float',
];
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# AUXILIARY METHODS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def get_env_value(env: dict, key: str, default: Any = None) -> Any: # pragma: no cover
return env[key] if key in env else default;
def get_env_string(env: dict, key: str, default: Optional[str] = None) -> str:
result = Result.of(lambda: str(env[key] or default));
if default is None:
return result.unwrap();
return result.unwrap_or(default);
def get_env_optional_string(env: dict, key: str) -> Optional[str]:
result = Result.of(lambda: str(env[key]));
return result.unwrap_or(None);
def get_env_int(env: dict, key: str, default: Optional[int] = None) -> int:
result = Result.of(lambda: int(env[key] or default));
if default is None:
return result.unwrap();
return result.unwrap_or(default);
def get_env_optional_int(env: dict, key: str) -> Optional[int]:
result = Result.of(lambda: int(env[key]));
return result.unwrap_or(None);
def get_env_float(env: dict, key: str, default: Optional[float] = None) -> float:
result = Result.of(lambda: float(env[key] or default));
if default is None:
return result.unwrap();
return result.unwrap_or(default);
def get_env_optional_float(env: dict, key: str) -> Optional[float]:
result = Result.of(lambda: float(env[key]));
return result.unwrap_or(None);

163
src/core/log.py Normal file
View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# IMPORTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
from src.thirdparty.code import *;
from src.thirdparty.io import *;
from src.thirdparty.log import *;
from src.thirdparty.misc import *;
from src.thirdparty.types import *;
from src.core.calls import *;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# EXPORTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
__all__ = [
'LOG_LEVELS',
'configure_logging',
'log_info',
'log_debug',
'log_warn',
'log_error',
'log_fatal',
'log_result',
'log_dev',
'catch_fatal',
'prompt_user_input',
'prompt_secure_user_input',
'prompt_confirmation',
];
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# CONSTANTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class LOG_LEVELS(Enum): # pragma: no cover
INFO = logging.INFO;
DEBUG = logging.DEBUG;
# local usage only
_LOGGING_DEBUG_FILE: str = 'logs/debug.log';
T = TypeVar('T');
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# METHODS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def configure_logging(level: LOG_LEVELS): # pragma: no cover
logging.basicConfig(
format = '%(asctime)s [\x1b[1m%(levelname)s\x1b[0m] %(message)s',
level = level.value,
datefmt = r'%Y-%m-%d %H:%M:%S',
);
return;
def log_debug(*messages: Any):
logging.debug(*messages);
def log_info(*messages: Any):
logging.info(*messages);
def log_warn(*messages: Any):
logging.warning(*messages);
def log_error(*messages: Any):
logging.error(*messages);
def log_fatal(*messages: Any):
logging.fatal(*messages);
exit(1);
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Special Methods
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Intercept fatal errors
def catch_fatal(method: Callable[[], T]) -> T:
try:
return method();
except Exception as e:
log_fatal(e);
def log_result(result: Result[CallState, CallState], debug: bool = False):
'''
Logs safely encapsulated result of call as either debug/info or error.
@inputs
- `result` - the result of the call.
- `debug = False` (default) - if the result is okay, will be logged as an INFO message.
- `debug = True` - if the result is okay, will be logged as a DEBUG message.
'''
if isinstance(result, Ok):
value = result.unwrap();
log_debug(value);
for x in value.data_log:
log_info(x);
else:
err = result.unwrap_err();
log_debug(err);
for e in err.errors:
log_error(e);
return;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# DEBUG LOGGING FOR DEVELOPMENT
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def log_dev(*messages: Any): # pragma: no cover
with open(_LOGGING_DEBUG_FILE, 'a') as fp:
print(*messages, file=fp);
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# User Input
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def prompt_user_input(message: str, expectedformat: Callable) -> Optional[str]:
answer = None;
while True:
try:
answer = input(f'{message}: ');
## Capture Meta+C:
except KeyboardInterrupt:
print('');
return None;
## Capture Meta+D:
except EOFError:
print('');
return None;
except:
continue;
if expectedformat(answer):
break;
return answer;
def prompt_secure_user_input(message: str, expectedformat: Callable) -> Optional[str]:
answer = None;
while True:
try:
## TODO: zeige **** ohne Zeilenumbruch an.
answer = getpass(f'{message}: ', stream=None);
## Capture Meta+C:
except KeyboardInterrupt:
print('');
return None;
## Capture Meta+D:
except EOFError:
print('');
return None;
except:
continue;
if expectedformat(answer):
break;
return answer;
def prompt_confirmation(message: str, default: bool = False) -> bool:
answer = prompt_user_input(message, lambda x: not not re.match(r'^(y|yes|j|ja|n|no|nein)$', x));
if isinstance(answer, str):
return True if re.match(r'^(y|yes|j|ja)$', answer) else False;
return default;

36
src/core/utils.py Normal file
View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# IMPORTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
from src.thirdparty.code import *;
from src.thirdparty.misc import *;
from src.thirdparty.types import *;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# EXPORTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
__all__ = [
'flatten',
'get_timestamp_string',
];
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# METHODS - date, time
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def get_timestamp() -> datetime: # pragma: no cover
return datetime.now();
def get_timestamp_string() -> str: # pragma: no cover
return str(get_timestamp());
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# METHODS arrays
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def flatten(X: list[list[Any]]) -> list[Any]:
return list(itertools_chain.from_iterable(X));