Source code for enconnect.irc.test.test_models
"""
Functions and routines associated with Enasis Network Remote Connect.
This file is part of Enasis Network software eco-system. Distribution
is permitted, for more information consult the project license file.
"""
from threading import Thread
from time import sleep as block_sleep
from encommon.types import inrepr
from encommon.types import instr
from encommon.types import lattrs
from pytest import raises
from .helpers import EVENTS
from .helpers import SVENTS
from ..client import Client
from ..models import ClientEvent
from ..params import ClientParams
from ...fixtures import IRCClientSocket
[docs]
def test_ClientEvent() -> None:
"""
Perform various tests associated with relevant routines.
"""
params = ClientParams(
server='mocked',
port=6667,
nickname='ircbot',
username='ircbot',
realname='ircbot',
ssl_enable=False)
client = Client(params)
_event = (
':server PING'
' :123456789')
event = ClientEvent(
client, _event)
attrs = lattrs(event)
assert attrs == [
'prefix',
'command',
'params',
'original',
'kind',
'isme',
'hasme',
'whome',
'author',
'recipient',
'message']
assert inrepr(
'ClientEvent(prefix',
event)
with raises(TypeError):
hash(event)
assert instr(
"prefix='server'",
event)
assert event.original
assert event.kind == 'event'
assert not event.isme
assert not event.hasme
assert not event.whome
assert not event.author
assert not event.recipient
assert not event.message
[docs]
def test_ClientEvent_cover( # noqa: CFQ001
client_ircsock: IRCClientSocket,
) -> None:
"""
Perform various tests associated with relevant routines.
.. note::
Duplicative test routine as below but with EVENTS.
:param client_ircsock: Object to mock client connection.
"""
params = ClientParams(
server='mocked',
port=6667,
nickname='ircbot',
username='ircbot',
realname='ircbot',
ssl_enable=False)
client = Client(params)
def _operate() -> None:
client_ircsock(EVENTS)
_raises = ConnectionError
with raises(_raises):
client.operate()
thread = Thread(
target=_operate)
thread.start()
mqueue = client.mqueue
item = mqueue.get()
assert item.prefix == 'mocked'
assert item.command == '001'
assert item.params == (
'ircbot :Welcome to network')
assert item.kind == 'event'
assert not item.isme
assert not item.hasme
assert item.whome == 'ircbot'
assert not item.author
assert not item.recipient
assert not item.message
assert not client.canceled
assert client.connected
assert client.nickname == 'ircbot'
item = mqueue.get()
assert item.prefix == 'mocked'
assert item.command == '376'
assert item.params == (
'ircbot :End of /MOTD command.')
assert item.kind == 'event'
assert not item.isme
assert not item.hasme
assert item.whome == 'ircbot'
assert not item.author
assert not item.recipient
assert not item.message
item = mqueue.get()
assert item.prefix == 'mocked'
assert item.command == '376'
assert item.params == (
'ircbot :End of /MOTD command.')
assert item.kind == 'event'
assert not item.isme
assert not item.hasme
assert item.whome == 'ircbot'
assert not item.author
assert not item.recipient
assert not item.message
item = mqueue.get()
assert item.prefix == 'mocked'
assert item.command == '376'
assert item.params == (
'ircbot :End of /MOTD command.')
assert item.kind == 'event'
assert not item.isme
assert not item.hasme
assert item.whome == 'ircbot'
assert not item.author
assert not item.recipient
assert not item.message
item = mqueue.get()
assert item.prefix == (
'nick!user@host')
assert item.command == 'PRIVMSG'
assert item.params == (
'ircbot :Hello ircbot')
assert item.kind == 'privmsg'
assert not item.isme
assert item.hasme
assert item.whome == 'ircbot'
assert item.author == 'nick'
assert item.recipient == 'ircbot'
assert item.message == (
'Hello ircbot')
item = mqueue.get()
assert item.prefix == (
'nick!user@host')
assert item.command == 'PRIVMSG'
assert item.params == (
'# :Hello world')
assert item.kind == 'chanmsg'
assert not item.isme
assert not item.hasme
assert item.whome == 'ircbot'
assert item.author == 'nick'
assert item.recipient == '#'
assert item.message == (
'Hello world')
item = mqueue.get()
assert item.prefix == (
'nick!user@host')
assert item.command == 'PRIVMSG'
assert item.params == (
'#funchat :Hello world')
assert item.kind == 'chanmsg'
assert not item.isme
assert not item.hasme
assert item.whome == 'ircbot'
assert item.author == 'nick'
assert item.recipient == '#funchat'
assert item.message == (
'Hello world')
item = mqueue.get()
assert item.prefix == (
'ircbot!user@host')
assert item.command == 'NICK'
assert item.params == ':botirc'
assert item.kind == 'event'
assert not item.isme
assert not item.hasme
assert item.whome == 'botirc'
assert not item.author
assert not item.recipient
assert not item.message
assert client.nickname == 'botirc'
item = mqueue.get()
assert item.prefix == (
'botirc!user@host')
assert item.command == 'PRIVMSG'
assert item.params == (
'# :Hello nick')
assert item.kind == 'chanmsg'
assert item.isme
assert not item.hasme
assert item.whome == 'botirc'
assert item.author == 'botirc'
assert item.recipient == '#'
assert item.message == (
'Hello nick')
item = mqueue.get()
assert not item.prefix
assert item.command == 'ERROR'
assert item.params == (
':Closing Link: botirc'
'[mocked] (Quit: botirc)')
assert item.kind == 'event'
assert not item.isme
assert not item.hasme
assert item.whome == 'botirc'
assert not item.author
assert not item.recipient
assert not item.message
block_sleep(1)
assert not client.canceled
assert not client.connected
assert client.nickname == 'botirc'
thread.join(10)
assert mqueue.empty()
[docs]
def test_ClientEvent_service(
client_ircsock: IRCClientSocket,
) -> None:
"""
Perform various tests associated with relevant routines.
.. note::
Duplicative test routine as above but with SVENTS.
:param client_ircsock: Object to mock client connection.
"""
sname = 'jupiter.enasis.net'
about = 'Network Services'
params = ClientParams(
server='mocked',
port=6900,
operate='service',
servername=sname,
password='password',
realname=about)
client = Client(params)
def _operate() -> None:
client_ircsock(SVENTS)
_raises = ConnectionError
with raises(_raises):
client.operate()
thread = Thread(
target=_operate)
thread.start()
mqueue = client.mqueue
for count in range(20):
item = mqueue.get()
assert not item.prefix
assert not item.command
assert not item.params
assert item.kind == 'event'
assert (
not item.isme
if count != 13
else item.isme)
assert not item.author
assert not item.recipient
assert not item.message
assert client.nickname == '42X'
thread.join(10)
assert mqueue.empty()