Source code for zoonado.recipes.allocator

from __future__ import unicode_literals

import collections
import itertools
import json
import logging

from tornado import gen, ioloop

from .data_watcher import DataWatcher
from .party import Party
from .lock import Lock
from .recipe import Recipe


log = logging.getLogger(__name__)


class Allocator(Recipe):

    sub_recipes = {
        "party": (Party, ["member_path", "name"]),
        "lock": (Lock, ["lock_path"]),
        "data_watcher": DataWatcher,
    }

    def __init__(self, base_path, name, allocator_fn=None):
        self.name = name

        super(Allocator, self).__init__(base_path)

        if allocator_fn is None:
            allocator_fn = round_robin

        self.allocator_fn = allocator_fn

        self.active = False

        self.full_allocation = collections.defaultdict(set)
        self.full_set = set()

    @property
    def lock_path(self):
        return self.base_path + "/lock"

    @property
    def member_path(self):
        return self.base_path + "/members"

    @property
    def allocation(self):
        return self.full_allocation[self.name]

    def validate(self, new_allocation):
        as_list = []
        for subset in new_allocation.values():
            as_list.extend(list(subset))

        # make sure there are no duplicates among the subsets
        assert len(as_list) == len(set(as_list)), (
            "duplicate items found in allocation: %s" % self.full_allocation
        )

        # make sure there's no mismatch beween the full set and allocations
        assert len(self.full_set.symmetric_difference(set(as_list))) == 0, (
            "mismatch between full set and allocation: %s vs %s" % (
                self.full_set, self.full_allocation
            )
        )

    @gen.coroutine
    def start(self):
        self.active = True

        yield self.ensure_path()

        yield self.party.join()

        self.data_watcher.add_callback(self.base_path, self.handle_data_change)

        ioloop.IOLoop.current().add_callback(self.monitor_member_changes)

    @gen.coroutine
    def add(self, new_item):
        new_set = self.full_set.copy().add(new_item)
        yield self.update_set(new_set)

    @gen.coroutine
    def remove(self, new_item):
        new_set = self.full_set.copy().remove(new_item)
        yield self.update_set(new_set)

    @gen.coroutine
    def update(self, new_items):
        new_items = set(new_items)
        data = json.dumps(list(new_items))

        with (yield self.lock.acquire()):
            yield self.client.set_data(self.base_path, data=data)

    def monitor_member_changes(self):
        while self.active:
            yield self.party.wait_for_change()
            if not self.active:
                break

            self.allocate()

    def handle_data_change(self, new_set_data):
        if new_set_data is None:
            return

        new_set_data = set(json.loads(new_set_data))
        if new_set_data == self.full_set:
            return

        self.full_set = new_set_data
        self.allocate()

    def allocate(self):
        new_allocation = self.allocator_fn(self.party.members, self.full_set)
        self.validate(new_allocation)

        self.full_allocation = new_allocation

    @gen.coroutine
    def stop(self):
        yield self.party.leave()

        self.data_watcher.remove_callback(
            self.base_path, self.handle_data_change
        )


[docs]def round_robin(members, items): """ Default allocator with a round robin approach. In this algorithm, each member of the group is cycled over and given an item until there are no items left. This assumes roughly equal capacity for each member and aims for even distribution of item counts. """ allocation = collections.defaultdict(set) for member, item in zip(itertools.cycle(members), items): allocation[member].add(item) return allocation