Source code for machin.parallel.server.ordered_server

from abc import ABC, abstractmethod
from collections import OrderedDict
from threading import Lock
from copy import deepcopy
from ..distributed import RpcGroup, debug_with_process


[docs]class OrderedServerBase(ABC): # pragma: no cover """ Descendent classes of OrderedServer does not have to guarantee strong consistency, that is, even if :meth:`.OrderedServerBase.push_service`` has returned True, there are possibilities that these acknowledged push are discarded. """
[docs] @abstractmethod def push(self, key, value, version, prev_version): """ Push a new ``version`` of ``value`` in ``key`` to the ordered server. Note: If ``version = prev_version`` then there is no order guarantee. But you may exploit this feature. Args: key: Key. value: value. version: New version. prev_version: Previous version. Returns: ``True`` if success, and ``False`` if not. """ pass
[docs] @abstractmethod def pull(self, key, version=None): """ Pull a value with the specified ``version`` in ``key``. Args: key: Key. version: Target version, if ``None``, then the newest version of value of key will be pulled. Returns: ``None`` if version is not found, auto-deleted, or key is not found, otherwise returns value with the specified ``version`` in ``key``, and then ``version`` """ pass
[docs]class OrderedServerSimple(OrderedServerBase): def __init__(self, server_name: str, group: RpcGroup): self._push_service = server_name + "/_push_service" self._pull_service = server_name + "/_pull_service" self.group = group
[docs] def push(self, key, value, version, prev_version): # DOC INHERITED debug_with_process( f"calling push service {self._push_service} " f"on group [{self.group.group_name}]" ) return self.group.registered_sync( self._push_service, args=(key, value, version, prev_version) )
[docs] def pull(self, key, version=None): # DOC INHERITED debug_with_process( f"calling pull service {self._push_service} " f"on group [{self.group.group_name}]" ) return self.group.registered_sync(self._pull_service, args=(key, version))
[docs]class OrderedServerSimpleImpl: """ A simple key-value server, with strict ordered update """ def __init__(self, server_name: str, group: RpcGroup, version_depth: int = 1, **__): """ This init function must be only invoked on the runner process, and the runner process must be a member process of ``group``. Args: server_name: Name of this server, used to registered the server as a paired class of ``group``. group: Rpc group where server locates. server_runner: Name of the process serving the ordered server. By default is the first member of the group. version_depth: Storage depth of old versions of the same key. If ``depth = 1``, then only the newest version of the key will be saved. """ assert group.is_member() assert version_depth > 0 and isinstance(version_depth, int) self.server_name = server_name self.group = group self.storage = {} self.lock = Lock() self.version_depth = version_depth # pair an accessor to group self.group.pair(server_name, OrderedServerSimple(self.server_name, self.group)) self.group.register(server_name + "/_push_service", self._push_service) self.group.register(server_name + "/_pull_service", self._pull_service) def _push_service(self, key, value, version, prev_version): success = False with self.lock: if key in self.storage: ref = self.storage[key] # Check previous version consistency. if next(reversed(ref)) == prev_version: ref[version] = value success = True if len(ref) > self.version_depth + 1: ref.popitem(last=False) else: # Create a new key. ref = self.storage[key] = OrderedDict() ref[version] = value success = True return success def _pull_service(self, key, version=None): result = None with self.lock: if key in self.storage: ref = self.storage[key] # Try to find the target version. if version is not None and version in ref: result = (deepcopy(ref[version]), version) # Find the newest version. elif version is None: version = next(reversed(ref)) result = (deepcopy(ref[version]), version) return result