"""
Functions and routines associated with Enasis Network Remote Connect.
This file is part of Enasis Network software eco-system. Distribution
is permitted, for more information consult the project license file.
"""
from json import dumps
from json import loads
from queue import Queue
from threading import Event
from typing import Callable
from typing import Optional
from typing import TYPE_CHECKING
from encommon.times import Timer
from encommon.types import DictStrAny
from encommon.types import NCNone
from encommon.types import sort_dict
from httpx import Response
from websockets.exceptions import ConnectionClosedOK
from websockets.sync.client import ClientConnection
from websockets.sync.client import connect
from .models import ClientEvent
from ..utils import HTTPClient
from ..utils import dumlog
from ..utils.http import _METHODS
from ..utils.http import _PAYLOAD
if TYPE_CHECKING:
from .params import ClientParams
# Keepalive frame transmitted over the websocket by the client loop;
# key order is kept as (action, seq) so the serialized JSON is unchanged.
PING = dict(action='ping', seq=1)
class Client:
    """
    Establish and maintain connection with the chat service.

    A websocket connection is used to receive events, which are
    wrapped in ``ClientEvent`` and placed on a queue, while an HTTP
    client is used for requests against the ``/api/v4`` endpoints.

    :param params: Parameters used to instantiate the class.
    :param logger: Optional callable invoked with keyword arguments
        describing each step; ``dumlog`` is used when not provided.
    """

    # Configuration model the client was instantiated with.
    __params: 'ClientParams'
    # Callable receiving keyword arguments for each logged step.
    __logger: Callable[..., None]
    # HTTP client used by the request method.
    __client: HTTPClient
    # Active websocket connection, None while not operating.
    __socket: Optional[ClientConnection]
    # Set once the websocket connection has been established.
    __conned: Event
    # Set when the peer closed the connection or sent 'discon'.
    __exited: Event
    # (username, id) for the current session, None when idle.
    __mynick: Optional[tuple[str, str]]
    # Last known (username, id), retained between sessions.
    __lsnick: Optional[tuple[str, str]]
    # Queue populated with the events received from the peer.
    __mqueue: Queue[ClientEvent]
    # Set by the stop method to request the loop to terminate.
    __cancel: Event

    def __init__(
        self,
        params: 'ClientParams',
        logger: Optional[Callable[..., None]] = None,
    ) -> None:
        """
        Initialize instance for class using provided parameters.
        """

        self.__params = params
        self.__logger = logger or dumlog

        self.__client = HTTPClient(
            timeout=params.timeout,
            verify=params.ssl_verify,
            capem=params.ssl_capem)

        self.__socket = None
        self.__conned = Event()
        self.__exited = Event()
        self.__mynick = None
        self.__lsnick = None
        self.__mqueue = Queue(params.queue_size)
        self.__cancel = Event()

    @property
    def params(
        self,
    ) -> 'ClientParams':
        """
        Return the Pydantic model containing the configuration.

        :returns: Pydantic model containing the configuration.
        """

        return self.__params

    @property
    def connected(
        self,
    ) -> bool:
        """
        Return whether the connection is established and active.

        :returns: True when the connection was established and
            has not since been marked as exited.
        """

        return (
            not self.__exited.is_set()
            and self.__conned.is_set())

    @property
    def nickname(
        self,
    ) -> Optional[tuple[str, str]]:
        """
        Return the username and identifier for the client user.

        :returns: Tuple of username and identifier for the current
            session, falling back to the last known value.
        """

        return self.__mynick or self.__lsnick

    @property
    def mqueue(
        self,
    ) -> Queue[ClientEvent]:
        """
        Return the queue populated with the received events.

        :returns: Queue populated with the received events.
        """

        return self.__mqueue

    @property
    def canceled(
        self,
    ) -> bool:
        """
        Return whether the operation loop should be terminated.

        :returns: True when a stop was requested or when the
            connection has been marked as exited.
        """

        return (
            self.__cancel.is_set()
            or self.__exited.is_set())

    def __reset(
        self,
    ) -> None:
        """
        Restore the per-session state to the initial values.

        The last known nickname is intentionally left untouched.
        """

        self.__socket = None
        self.__conned.clear()
        self.__exited.clear()
        self.__mynick = None
        self.__cancel.clear()

    def operate(
        self,
    ) -> None:
        """
        Operate the client and populate queue with the messages.

        Blocks until the client is stopped or the connection ends;
        the per-session state is reset before and after operation.

        :raises ConnectionError: When the connection was closed by
            the peer rather than by a request to stop.
        """

        logger = self.__logger

        try:

            logger(item='initial')
            self.__reset()

            logger(item='operate')
            self.__operate()

        finally:

            self.__reset()
            logger(item='finish')

    def __operate(
        self,
    ) -> None:
        """
        Operate the client and populate queue with the messages.

        :raises ConnectionError: When the connection was closed by
            the peer rather than by a request to stop.
        """

        logger = self.__logger

        self.__connect()

        socket = self.__socket

        assert socket is not None

        # Everything after the connect is guarded so the socket is
        # closed even when identify or the receive loop raises
        try:

            timer = Timer(30, start='min')

            self.__identify()

            while not self.canceled:

                receive = self.socket_recv()

                if receive is not None:
                    self.__event(receive)

                # Truthy pause skips the keepalive for this pass
                if timer.pause():
                    continue  # NOCVR

                logger(item='ping')
                self.socket_send(PING)

        finally:

            logger(item='close')
            socket.close(1000)

        if self.__exited.is_set():
            raise ConnectionError

    def __event(
        self,
        event: DictStrAny,
    ) -> None:
        """
        Wrap the received event and place it within the queue.

        :param event: Raw event received from the network peer.
        """

        logger = self.__logger

        if event.get('event') == 'hello':
            logger(item='helo')

        # NOTE(review): put blocks without timeout when the queue
        # is bounded and full, which stalls the receive loop
        self.__mqueue.put(
            ClientEvent(self, event))

    def stop(
        self,
    ) -> None:
        """
        Gracefully close the connection with the server socket.

        Only requests the termination; the operation loop closes
        the socket once it observes the request.
        """

        self.__logger(item='stop')

        self.__cancel.set()

    def __connect(
        self,
    ) -> None:
        """
        Establish the connection with the upstream using socket.
        """

        logger = self.__logger
        _params = self.__params

        # Same server and port as used for the HTTP requests, so
        # deployments on nonstandard ports reach the same service
        address = (
            f'{_params.server}'
            f':{_params.port}')

        logger(item='connect')

        # NOTE(review): ssl_verify and ssl_capem parameters are not
        # applied to the websocket, only to the HTTP client
        self.__socket = connect(
            f'wss://{address}/'
            'api/v4/websocket')

        self.__conned.set()
        self.__exited.clear()

    def __identify(
        self,
    ) -> None:
        """
        Identify the client once the connection is established.

        Sends the authentication challenge through the socket, then
        requests ``users/me`` to learn the username and identifier.
        """

        logger = self.__logger

        logger(item='identify')

        self.socket_send({
            'seq': 1,
            'action': 'authentication_challenge',
            'data': {'token': self.__params.token}})

        response = self.request(
            'get', 'users/me')

        response.raise_for_status()

        fetch = response.json()

        assert isinstance(fetch, dict)

        logger(
            item='whome',
            json=dumps(fetch))

        mynick = (
            fetch['username'].lstrip('@'),
            fetch['id'])

        self.__mynick = mynick
        self.__lsnick = mynick

    def socket_send(
        self,
        send: DictStrAny,
    ) -> None:
        """
        Transmit provided content through the socket connection.

        Nothing is sent when there is no socket. When the peer has
        already closed the connection normally, the client is only
        marked as exited and no exception is raised.

        :param send: Content which will be sent through socket.
        """

        socket = self.__socket

        if socket is None:
            return NCNone

        transmit = dumps(send)

        # NOTE(review): the identify payload carries the token and
        # is logged here verbatim; consider redacting the value
        self.__logger(
            item='transmit',
            value=transmit)

        try:
            socket.send(transmit)

        except ConnectionClosedOK:
            self.__exited.set()

        return None

    def socket_recv(  # noqa: CFQ004
        self,
    ) -> Optional[DictStrAny]:
        """
        Return the content received from the socket connection.

        Waits up to one second for content. The client is marked as
        exited when the peer closed the connection normally or when
        the received event is named ``discon``.

        :returns: Content received from the socket connection, or
            None when nothing was received.
        """

        socket = self.__socket

        if socket is None:
            return NCNone

        try:
            recv = socket.recv(1)

        except TimeoutError:
            return None

        except ConnectionClosedOK:
            self.__exited.set()
            return None

        self.__logger(
            item='receive',
            value=recv)

        event = loads(recv)

        assert isinstance(event, dict)

        if event.get('event') == 'discon':
            self.__exited.set()

        return sort_dict(event)

    def request(
        self,
        method: _METHODS,
        path: str,
        params: Optional[_PAYLOAD] = None,
        json: Optional[_PAYLOAD] = None,
        *,
        timeout: Optional[int] = None,
    ) -> Response:
        """
        Return the response for upstream request to the server.

        :param method: Method for operation with the API server.
        :param path: Path for the location to upstream endpoint.
        :param params: Optional parameters included in request.
        :param json: Optional JSON payload included in request.
        :param timeout: Timeout waiting for the server response.
            This will override the default client instantiated.
        :returns: Response from upstream request to the server.
        """

        # Copies prevent mutating the values owned by the caller
        params = dict(params or {})
        json = dict(json or {})

        _params = self.__params

        address = (
            f'{_params.server}'
            f':{_params.port}')

        headers = {
            'Authorization': f'Bearer {_params.token}',
            'Content-Type': 'application/json'}

        location = (
            f'https://{address}'
            f'/api/v4/{path}')

        # Headers are excluded from logging as they hold the token
        self.__logger(
            item='request',
            method=method,
            path=path,
            params=params,
            json=(
                dumps(json)
                if len(json) >= 1
                else None))

        return self.__client.request_block(
            method=method,
            location=location,
            params=params,
            headers=headers,
            json=json,
            timeout=timeout)