Source code for tooz.drivers.file

# -*- coding: utf-8 -*-
#
# Copyright © 2015 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import contextlib
import datetime
import errno
import hashlib
import logging
import os
import shutil
import threading
import weakref

from concurrent import futures

import fasteners
from oslo_utils import encodeutils
from oslo_utils import timeutils
import six
import voluptuous

import tooz
from tooz import coordination
from tooz import locking
from tooz import utils

LOG = logging.getLogger(__name__)


class _Barrier(object):
    def __init__(self):
        self.cond = threading.Condition()
        self.owner = None


@contextlib.contextmanager
def _translate_failures():
    """Re-raise I/O and schema-validation failures as ``ToozError``.

    ``EnvironmentError`` and ``voluptuous.Invalid`` raised inside the
    managed block are converted. The original exception is kept as the
    cause.
    """
    try:
        yield
    except (EnvironmentError, voluptuous.Invalid) as exc:
        message = encodeutils.exception_to_unicode(exc)
        coordination.raise_with_cause(coordination.ToozError, message,
                                      cause=exc)


# voluptuous schemas used by _load_and_validate(), keyed by the kind of file
# being read: 'group' metadata or a 'member' record. Identifiers may be
# stored either as text or as bytes.
_SCHEMAS = dict(
    group=voluptuous.Schema({
        voluptuous.Required('group_id'): voluptuous.Any(
            six.text_type, six.binary_type),
        # NOTE(sileht): optional because tooz < 1.36 created files
        # without this key.
        voluptuous.Optional('encoded'): bool,
    }),
    member=voluptuous.Schema(
        {
            voluptuous.Required('member_id'): voluptuous.Any(
                six.text_type, six.binary_type),
            voluptuous.Required('joined_on'): datetime.datetime,
            # NOTE(sileht): optional because tooz < 1.36 created files
            # without this key.
            voluptuous.Optional('encoded'): bool,
        },
        # Member records carry extra keys (e.g. 'capabilities') that are
        # not validated here.
        extra=voluptuous.ALLOW_EXTRA,
    ),
)


def _convert_from_old_format(data):
    """Normalize a payload written by an older version of this driver.

    :param data: the deserialized mapping read from a group/member file.
    :returns: *data* unchanged, or a new dict whose bytes keys have been
              decoded to text when an old-format payload is detected
              under Python 3.
    """
    # NOTE(sileht): the previous version of the driver stored str as-is,
    # making it impossible to read from python3 something written with the
    # python2 version of the lib.
    # Now everything is stored with an explicit type, bytes or unicode. This
    # converts the old format to the new one to keep compatibility with
    # already deployed files.
    # Example of potential old python2 payloads:
    # {b"member_id": b"member"}
    # {b"member_id": u"member"}
    # Example of potential old python3 payloads:
    # {u"member_id": b"member"}
    # {u"member_id": u"member"}
    #
    # The PY3 guard must cover *both* membership tests. Without the
    # parentheses, `and` binds tighter than `or` and the group_id case
    # escaped the guard.
    if six.PY3 and (b"member_id" in data or b"group_id" in data):
        # Only bytes keys need (and can) be decoded; text keys are kept as
        # they are so a mixed payload does not raise AttributeError.
        data = dict(
            (k.decode("utf8") if isinstance(k, six.binary_type) else k, v)
            for k, v in data.items())
        # About the member_id and group_id values: if the file was written
        # with python2 in the old format, python3 cannot know whether the
        # value needs decoding, because it just sees a bytes blob.
        # We keep it as-is and hope for the best. This has a good chance of
        # breaking if the application used str in python2 and unicode in
        # python3. Member files are overwritten often, so that should be
        # fine. Group files can be very old, so they are rewritten each
        # time create_group is called.
    return data


def _load_and_validate(blob, schema_key):
    """Deserialize *blob*, upgrade legacy key types and validate the result.

    :param blob: raw bytes read from a group or member file.
    :param schema_key: key into ``_SCHEMAS`` (``'group'`` or ``'member'``).
    :returns: the deserialized (and possibly key-converted) mapping.
    :raises voluptuous.Invalid: (via the schema call) when the data does
                                not match the selected schema.
    """
    payload = _convert_from_old_format(utils.loads(blob))
    _SCHEMAS[schema_key](payload)
    return payload


def _lock_me(lock):
    """Build a decorator that runs the wrapped callable while holding *lock*.

    :param lock: any context manager; it is entered around every call.
    :returns: a decorator preserving the wrapped function's metadata.
    """
    def _decorate(func):

        @six.wraps(func)
        def _locked(*args, **kwargs):
            with lock:
                return func(*args, **kwargs)

        return _locked

    return _decorate


class FileLock(locking.Lock):
    """A file based lock.

    Mutual exclusion between processes comes from a
    ``fasteners.InterProcessLock`` on *path*. That lock is not thread aware,
    so a ``_Barrier`` is taken first to let only one thread at a time go
    after the file lock. The barrier may be shared by several ``FileLock``
    objects for the same path.
    """

    def __init__(self, path, barrier, member_id):
        super(FileLock, self).__init__(path)
        # True only between a successful acquire() and the next release().
        self.acquired = False
        self._lock = fasteners.InterProcessLock(path)
        self._barrier = barrier
        # Stored in the barrier's owner tuple while this lock holds it.
        self._member_id = member_id

    def is_still_owner(self):
        """Return whether this lock object currently holds the lock."""
        return self.acquired

    def acquire(self, blocking=True):
        """Take the thread barrier, then the inter-process file lock.

        :param blocking: ``True``/``False`` or a timeout value, normalized
                         into a ``(blocking, timeout)`` pair by
                         ``utils.convert_blocking``.
        :returns: ``True`` if the lock was acquired, ``False`` otherwise.
        """
        blocking, timeout = utils.convert_blocking(blocking)
        watch = timeutils.StopWatch(duration=timeout)
        watch.start()

        # Make the shared barrier ours first.
        with self._barrier.cond:
            while self._barrier.owner is not None:
                if not blocking or watch.expired():
                    return False
                self._barrier.cond.wait(watch.leftover(return_none=True))
            self._barrier.owner = (threading.current_thread().ident,
                                   os.getpid(), self._member_id)

        # Ok at this point we are now working in a thread safe manner,
        # and now we can try to get the actual lock...
        gotten = False
        try:
            gotten = self._lock.acquire(
                blocking=blocking,
                # Since the barrier waiting may have
                # taken a long time, we have to use
                # the leftover (and not the original).
                timeout=watch.leftover(return_none=True))
        finally:
            # NOTE(harlowja): do this in a finally block to **ensure** that
            # we release the barrier if something bad happens...
            if not gotten:
                # Release the barrier to let someone else have a go at it...
                with self._barrier.cond:
                    self._barrier.owner = None
                    self._barrier.cond.notify_all()

        self.acquired = gotten
        return gotten

    def release(self):
        """Release the inter-process file lock, then the thread barrier.

        :returns: ``False`` if this object did not hold the lock, ``True``
                  otherwise.
        """
        if not self.acquired:
            return False
        with self._barrier.cond:
            self.acquired = False
            try:
                # Drop the inter-process lock taken in acquire(). Without
                # this, other processes stay blocked on the file forever.
                self._lock.release()
            finally:
                # Always hand the barrier back, even if releasing the file
                # lock failed, so waiting threads are not stuck.
                self._barrier.owner = None
                self._barrier.cond.notify_all()
        return True

    def __del__(self):
        if self.acquired:
            LOG.warning("Unreleased lock %s garbage collected", self.name)


class FileDriver(coordination._RunWatchersMixin,
                 coordination.CoordinationDriver):
    """A file based driver.

    This driver uses files and directories (and associated file locks) to
    provide the coordination driver semantics and required API(s). It **is**
    missing some functionality but in the future these not implemented API(s)
    will be filled in.

    General recommendations/usage considerations:

    - It does **not** automatically delete members from
      groups of processes that have died, manual cleanup will be needed
      for those types of failures.

    - It is **not** distributed (or recommended to be used in those
      situations, so the developer using this should really take that into
      account when applying this driver in their app).
    """

    CHARACTERISTICS = (
        coordination.Characteristics.NON_TIMEOUT_BASED,
        coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS,
        coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES,
    )
    """
    Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
    enum member(s) that can be used to interrogate how this driver works.
    """

    HASH_ROUTINE = 'sha1'
    """This routine is used to hash a member (or group) id into a filesystem
    safe name that can be used for member lookup and group joining."""

    _barriers = weakref.WeakValueDictionary()
    """
    Barriers shared among all file driver locks, this is required
    since interprocess locking is not thread aware, so we must add the
    thread awareness on-top of it instead.
    """

    def __init__(self, member_id, parsed_url, options):
        """Initialize the file driver."""
        super(FileDriver, self).__init__()
        self._member_id = member_id
        self._dir = parsed_url.path
        self._executor = utils.ProxyExecutor.build("File", options)
        self._group_dir = os.path.join(self._dir, 'groups')
        self._driver_lock_path = os.path.join(self._dir, '.driver_lock')
        self._driver_lock = self._get_raw_lock(self._driver_lock_path,
                                               self._member_id)
        # Paths that user-requested locks must never collide with.
        self._reserved_dirs = [self._dir, self._group_dir]
        self._reserved_paths = list(self._reserved_dirs)
        self._reserved_paths.append(self._driver_lock_path)
        self._joined_groups = set()
        self._safe_member_id = self._make_filesystem_safe(member_id)

    @classmethod
    def _get_raw_lock(cls, path, member_id):
        # One barrier per path, shared by every lock object on that path
        # (kept only as long as some lock references it).
        lock_barrier = cls._barriers.setdefault(path, _Barrier())
        return FileLock(path, lock_barrier, member_id)

    def get_lock(self, name):
        """Return a :py:class:`FileLock` stored under the driver directory.

        :param name: lock name, as bytes (decoded with the default codec)
                     or as text.
        :raises ValueError: if the name resolves to a path reserved for the
                            driver itself.
        """
        # Lock names are conventionally bytes, but text names are accepted
        # too instead of failing on a missing ``decode`` attribute.
        if isinstance(name, six.binary_type):
            text_name = name.decode()
        else:
            text_name = name
        path = utils.safe_abs_path(self._dir, text_name)
        if path in self._reserved_paths:
            raise ValueError("Unable to create a lock using"
                             " reserved path '%s' for lock"
                             " with name '%s'" % (path, name))
        return self._get_raw_lock(path, self._member_id)

    @classmethod
    def _make_filesystem_safe(cls, item):
        item = utils.to_binary(item, encoding="utf8")
        return hashlib.new(cls.HASH_ROUTINE, item).hexdigest()

    def _start(self):
        for a_dir in self._reserved_dirs:
            try:
                utils.ensure_tree(a_dir)
            except OSError as e:
                raise coordination.ToozConnectionError(e)
        self._executor.start()

    def _stop(self):
        while self._joined_groups:
            self.leave_group(self._joined_groups.pop())
        self._executor.stop()

    def _update_group_metadata(self, path, group_id):
        details = {
            u'group_id': utils.to_binary(group_id, encoding="utf8")
        }
        # Record whether the id had to be converted to bytes so readers
        # know to decode it back.
        details[u'encoded'] = details[u"group_id"] != group_id
        details_blob = utils.dumps(details)
        with open(path, "wb") as fh:
            fh.write(details_blob)

    def create_group(self, group_id):
        safe_group_id = self._make_filesystem_safe(group_id)
        group_dir = os.path.join(self._group_dir, safe_group_id)
        group_meta_path = os.path.join(group_dir, '.metadata')

        @_lock_me(self._driver_lock)
        def _do_create_group():
            if os.path.isdir(group_dir):
                # NOTE(sileht): We update the group metadata even
                # they are already good, so ensure dict key are convert
                # to unicode in case of the file have been written with
                # tooz < 1.36
                self._update_group_metadata(group_meta_path, group_id)
                raise coordination.GroupAlreadyExist(group_id)
            else:
                utils.ensure_tree(group_dir)
                self._update_group_metadata(group_meta_path, group_id)

        fut = self._executor.submit(_do_create_group)
        return FileFutureResult(fut)

    def join_group(self, group_id, capabilities=b""):
        safe_group_id = self._make_filesystem_safe(group_id)
        group_dir = os.path.join(self._group_dir, safe_group_id)
        me_path = os.path.join(group_dir, "%s.raw" % self._safe_member_id)

        @_lock_me(self._driver_lock)
        def _do_join_group():
            if not os.path.isdir(group_dir):
                raise coordination.GroupNotCreated(group_id)
            if os.path.isfile(me_path):
                raise coordination.MemberAlreadyExist(group_id,
                                                      self._member_id)
            details = {
                u'capabilities': capabilities,
                u'joined_on': datetime.datetime.now(),
                u'member_id': utils.to_binary(self._member_id,
                                              encoding="utf-8")
            }
            details[u'encoded'] = details[u"member_id"] != self._member_id
            details_blob = utils.dumps(details)
            with open(me_path, "wb") as fh:
                fh.write(details_blob)
            self._joined_groups.add(group_id)

        fut = self._executor.submit(_do_join_group)
        return FileFutureResult(fut)

    def leave_group(self, group_id):
        safe_group_id = self._make_filesystem_safe(group_id)
        group_dir = os.path.join(self._group_dir, safe_group_id)
        me_path = os.path.join(group_dir, "%s.raw" % self._safe_member_id)

        @_lock_me(self._driver_lock)
        def _do_leave_group():
            if not os.path.isdir(group_dir):
                raise coordination.GroupNotCreated(group_id)
            try:
                os.unlink(me_path)
            except EnvironmentError as e:
                if e.errno != errno.ENOENT:
                    raise
                else:
                    raise coordination.MemberNotJoined(group_id,
                                                       self._member_id)
            else:
                self._joined_groups.discard(group_id)

        fut = self._executor.submit(_do_leave_group)
        return FileFutureResult(fut)

    def get_members(self, group_id):
        safe_group_id = self._make_filesystem_safe(group_id)
        group_dir = os.path.join(self._group_dir, safe_group_id)

        def _read_member_id(path):
            with open(path, 'rb') as fh:
                details = _load_and_validate(fh.read(), 'member')
            if details.get("encoded"):
                return details[u'member_id'].decode("utf-8")
            else:
                return details[u'member_id']

        @_lock_me(self._driver_lock)
        def _do_get_members():
            if not os.path.isdir(group_dir):
                raise coordination.GroupNotCreated(group_id)
            members = []
            try:
                entries = os.listdir(group_dir)
            except EnvironmentError as e:
                # Did someone manage to remove it before we got here...
                if e.errno != errno.ENOENT:
                    raise
            else:
                for entry in entries:
                    # Only member records; skip '.metadata' and others.
                    if not entry.endswith('.raw'):
                        continue
                    entry_path = os.path.join(group_dir, entry)
                    try:
                        member_id = _read_member_id(entry_path)
                    except EnvironmentError as e:
                        if e.errno != errno.ENOENT:
                            raise
                    else:
                        members.append(member_id)
            return members

        fut = self._executor.submit(_do_get_members)
        return FileFutureResult(fut)

    def get_member_capabilities(self, group_id, member_id):
        safe_group_id = self._make_filesystem_safe(group_id)
        group_dir = os.path.join(self._group_dir, safe_group_id)
        safe_member_id = self._make_filesystem_safe(member_id)
        member_path = os.path.join(group_dir, "%s.raw" % safe_member_id)

        @_lock_me(self._driver_lock)
        def _do_get_member_capabilities():
            try:
                with open(member_path, "rb") as fh:
                    contents = fh.read()
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    if not os.path.isdir(group_dir):
                        raise coordination.GroupNotCreated(group_id)
                    else:
                        raise coordination.MemberNotJoined(group_id,
                                                           member_id)
                else:
                    raise
            else:
                details = _load_and_validate(contents, 'member')
                return details.get(u"capabilities")

        fut = self._executor.submit(_do_get_member_capabilities)
        return FileFutureResult(fut)

    def delete_group(self, group_id):
        safe_group_id = self._make_filesystem_safe(group_id)
        group_dir = os.path.join(self._group_dir, safe_group_id)

        @_lock_me(self._driver_lock)
        def _do_delete_group():
            try:
                entries = os.listdir(group_dir)
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    raise coordination.GroupNotCreated(group_id)
                else:
                    raise
            else:
                # A group without members only holds its '.metadata' file.
                if len(entries) > 1:
                    raise coordination.GroupNotEmpty(group_id)
                else:
                    try:
                        shutil.rmtree(group_dir)
                    except EnvironmentError as e:
                        if e.errno != errno.ENOENT:
                            raise

        fut = self._executor.submit(_do_delete_group)
        return FileFutureResult(fut)

    def get_groups(self):

        def _read_group_id(path):
            with open(path, 'rb') as fh:
                details = _load_and_validate(fh.read(), 'group')
            if details.get("encoded"):
                return details[u'group_id'].decode("utf-8")
            else:
                return details[u'group_id']

        @_lock_me(self._driver_lock)
        def _do_get_groups():
            groups = []
            for entry in os.listdir(self._group_dir):
                path = os.path.join(self._group_dir, entry, '.metadata')
                try:
                    groups.append(_read_group_id(path))
                except EnvironmentError as e:
                    # Skip groups removed concurrently (ENOENT) and stray
                    # non-directory entries in the groups dir (ENOTDIR).
                    if e.errno not in (errno.ENOENT, errno.ENOTDIR):
                        raise
            return groups

        fut = self._executor.submit(_do_get_groups)
        return FileFutureResult(fut)

    def _init_watch_group(self, group_id):
        group_members_fut = self.get_members(group_id)
        group_members = group_members_fut.get(timeout=None)
        self._group_members[group_id].update(group_members)

    def watch_join_group(self, group_id, callback):
        self._init_watch_group(group_id)
        return super(FileDriver, self).watch_join_group(group_id, callback)

    def unwatch_join_group(self, group_id, callback):
        return super(FileDriver, self).unwatch_join_group(group_id, callback)

    def watch_leave_group(self, group_id, callback):
        self._init_watch_group(group_id)
        return super(FileDriver, self).watch_leave_group(group_id, callback)

    def unwatch_leave_group(self, group_id, callback):
        return super(FileDriver, self).unwatch_leave_group(group_id,
                                                           callback)

    @staticmethod
    def watch_elected_as_leader(group_id, callback):
        raise tooz.NotImplemented

    @staticmethod
    def unwatch_elected_as_leader(group_id, callback):
        raise tooz.NotImplemented
class FileFutureResult(coordination.CoordAsyncResult):
    """File asynchronous result that references a future."""

    def __init__(self, fut):
        self._fut = fut

    def get(self, timeout=None):
        """Wait for the wrapped future and return its result.

        :param timeout: forwarded to ``Future.result``; ``None`` waits
                        without limit.
        :raises coordination.OperationTimedOut: if the result is not ready
                                                within *timeout*.
        :raises coordination.ToozError: when the driver operation failed
                                        with ``EnvironmentError`` or
                                        ``voluptuous.Invalid``.
        """
        # Late translate the common failures since the file driver
        # may throw things that we can not catch in the callbacks where
        # it is used.
        with _translate_failures():
            try:
                return self._fut.result(timeout=timeout)
            except futures.TimeoutError as e:
                # Handle the timeout *inside* _translate_failures. Since
                # Python 3.11, futures.TimeoutError is the builtin
                # TimeoutError, which is an OSError (EnvironmentError)
                # subclass. Otherwise it would be translated into a generic
                # ToozError before it could become OperationTimedOut.
                coordination.raise_with_cause(
                    coordination.OperationTimedOut,
                    encodeutils.exception_to_unicode(e),
                    cause=e)

    def done(self):
        """Return whether the wrapped future has finished."""
        return self._fut.done()