Source code for enhomie.homie.threads.stream

"""
Functions and routines associated with Enasis Network Homie Automate.

This file is part of Enasis Network software eco-system. Distribution
is permitted, for more information consult the project license file.
"""



from copy import deepcopy
from dataclasses import dataclass
from typing import TYPE_CHECKING

from encommon.types import DictStrAny

from .thread import HomieThread
from .thread import HomieThreadItem

if TYPE_CHECKING:
    from ..childs import HomieOrigin
    from ..members import HomieStreams



[docs] @dataclass class HomieStreamItem(HomieThreadItem): """ Contain information for sharing using the Python queue. """ event: DictStrAny def __init__( self, origin: 'HomieOrigin', event: DictStrAny, ) -> None: """ Initialize instance for class using provided parameters. """ event = deepcopy(event) self.event = event super().__init__(origin)
[docs] class HomieStream(HomieThread): """ Common methods and routines for Homie Automate threads. """ @property def member( self, ) -> 'HomieStreams': """ Return the value for the attribute from class instance. :returns: Value for the attribute from class instance. """ from ..members import ( HomieStreams) member = super().member assert isinstance( member, HomieStreams) return member
[docs] def operate( self, ) -> None: """ Perform the operation related to Homie service threads. """ member = self.member vacate = member.vacate if vacate.is_set(): return None self.operate_streams()
[docs] def operate_streams( self, ) -> None: """ Perform the operation related to Homie service threads. """ raise NotImplementedError