monop-state/irc_client.py

86 lines
2.7 KiB
Python
Raw Permalink Normal View History

"""Minimal IRC client controller for black-box bot testing."""
import socket
import time
import threading
class IRCClient:
def __init__(self, nick, host="127.0.0.1", port=6667):
self.nick = nick
self.host = host
self.port = port
self.sock = None
self.buffer = ""
self.messages = [] # collected incoming messages
self._lock = threading.Lock()
self._reader = None
self._running = False
def connect(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
self._running = True
self._reader = threading.Thread(target=self._read_loop, daemon=True)
self._reader.start()
self._send(f"NICK {self.nick}")
self._send(f"USER {self.nick} 0 * :{self.nick}")
# Wait for RPL_WELCOME (001) to confirm registration
welcome = self.wait_for(" 001 ", timeout=10)
if not welcome:
raise ConnectionError(f"{self.nick}: registration timed out")
def _send(self, line):
self.sock.sendall((line + "\r\n").encode())
def _read_loop(self):
while self._running:
try:
data = self.sock.recv(4096)
if not data:
break
self.buffer += data.decode("utf-8", errors="replace")
while "\r\n" in self.buffer:
line, self.buffer = self.buffer.split("\r\n", 1)
# Auto-respond to PING
if line.startswith("PING"):
self._send("PONG" + line[4:])
with self._lock:
self.messages.append(line)
except OSError:
break
def join(self, channel):
self._send(f"JOIN {channel}")
time.sleep(0.3)
def say(self, target, message):
self._send(f"PRIVMSG {target} :{message}")
def get_messages(self, clear=True):
with self._lock:
msgs = list(self.messages)
if clear:
self.messages.clear()
return msgs
def wait_for(self, substring, timeout=5.0):
"""Wait until a message containing substring appears. Returns it or None."""
deadline = time.time() + timeout
while time.time() < deadline:
with self._lock:
for i, msg in enumerate(self.messages):
if substring in msg:
self.messages.pop(i)
return msg
time.sleep(0.1)
return None
def quit(self):
self._running = False
try:
self._send("QUIT :bye")
self.sock.close()
except OSError:
pass