Source code for enhomie.ubiquiti.test.test_origin

"""
Functions and routines associated with Enasis Network Homie Automate.

This file is part of Enasis Network software eco-system. Distribution
is permitted, for more information consult the project license file.
"""



from typing import TYPE_CHECKING

from encommon.types import DictStrAny
from encommon.types import lattrs
from encommon.utils import load_sample
from encommon.utils import prep_sample
from encommon.utils.sample import ENPYRWS

from . import SAMPLES
from ..origin import UbiqOrigin
from ..update import UbiqUpdateItem

if TYPE_CHECKING:
    from ...homie import Homie
    from ...homie import HomieService
    from ...utils import TestBodies



[docs] def test_UbiqOrigin( homie: 'Homie', replaces: DictStrAny, bodies: 'TestBodies', ) -> None: """ Perform various tests associated with relevant routines. :param homie: Primary class instance for Homie Automate. :param replaces: Mapping of what to replace in samples. :param bodies: Locations and groups for use in testing. """ childs = homie.childs origins = childs.origins family = 'ubiquiti' samples = SAMPLES / 'origin' planets = bodies.planets for planet in planets: name = f'{planet}_{family}' origin = origins[name] assert isinstance( origin, UbiqOrigin) attrs = lattrs(origin) assert attrs == [ '_HomieChild__homie', '_HomieChild__name', '_HomieChild__params', '_UbiqOrigin__router', '_UbiqOrigin__fetch', '_UbiqOrigin__merge'] origin.validate() assert origin.homie assert origin.name == name assert origin.family == family assert origin.kind == 'origin' assert origin.params assert origin.dumped assert origin.router assert not origin.fetch assert not origin.merge assert origin.refresh() assert origin.fetch assert origin.merge sample_path = ( f'{samples}/fetch' f'/{planet}.json') sample = load_sample( path=sample_path, update=ENPYRWS, content=origin.fetch, replace=replaces) expect = prep_sample( content=origin.fetch, replace=replaces) assert expect == sample sample_path = ( f'{samples}/merge' f'/{planet}.json') sample = load_sample( path=sample_path, update=ENPYRWS, content=origin.merge, replace=replaces) expect = prep_sample( content=origin.merge, replace=replaces) assert expect == sample
[docs] def test_UbiqOrigin_update( homie: 'Homie', service: 'HomieService', bodies: 'TestBodies', ) -> None: """ Perform various tests associated with relevant routines. :param homie: Primary class instance for Homie Automate. :param service: Ancilary Homie Automate class instance. :param bodies: Locations and groups for use in testing. """ assert service.updates childs = homie.childs origins = childs.origins member = service.updates uqueue = member.uqueue planets = bodies.planets for planet in planets: origin = origins[ f'{planet}_ubiquiti'] assert isinstance( origin, UbiqOrigin) assert origin.refresh() fetch = origin.fetch uitem = origin.get_update() assert isinstance( uitem, UbiqUpdateItem) uitem.fetch['foo'] = 'bar' origin.set_update(uitem) origin.put_update(uqueue) _fetch = origin.fetch assert _fetch != fetch assert uqueue.qsize == 2
[docs] def test_UbiqOrigin_source( homie: 'Homie', bodies: 'TestBodies', ) -> None: """ Perform various tests associated with relevant routines. :param homie: Primary class instance for Homie Automate. :param bodies: Locations and groups for use in testing. """ childs = homie.childs origins = childs.origins planets = bodies.planets for planet in planets: origin = origins[ f'{planet}_ubiquiti'] origin.refresh() source = origin.source( label='Nothing') assert source is None source1 = origin.source( unique='192.168.1.120') source2 = origin.source( unique='192.168.2.120') assert source1 or source2 source = origin.source( label='Phone') assert source is not None