"""Minimal IRC client controller for black-box bot testing."""

import codecs
import socket
import threading
import time
from typing import List, Optional


class IRCClient:
    """Tiny blocking IRC client that collects incoming lines on a reader thread.

    Incoming lines (without the trailing CRLF) are appended to ``messages``
    by a daemon thread started in :meth:`connect`; tests consume them through
    :meth:`get_messages` or :meth:`wait_for`. Server ``PING`` lines are
    answered automatically and are also recorded in ``messages``.
    """

    def __init__(self, nick, host="127.0.0.1", port=6667):
        """Store connection parameters; no network I/O happens here.

        Args:
            nick: Nickname used for NICK/USER registration.
            host: Server address to connect to.
            port: Server TCP port.
        """
        self.nick = nick
        self.host = host
        self.port = port
        self.sock: Optional[socket.socket] = None
        self.buffer = ""  # decoded text received but not yet terminated by CRLF
        self.messages: List[str] = []  # collected incoming messages
        self._lock = threading.Lock()  # guards self.messages
        self._reader: Optional[threading.Thread] = None
        self._running = False
        # Incremental decoder keeps a partial multi-byte UTF-8 sequence across
        # recv() chunk boundaries instead of turning each half into U+FFFD.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    def connect(self):
        """Open the socket, start the reader thread and register with the server.

        Raises:
            OSError: If the TCP connection or the registration send fails.
            ConnectionError: If RPL_WELCOME (001) is not seen within 10 seconds.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
        except OSError:
            sock.close()  # do not leak the descriptor on a failed connect
            raise
        self.sock = sock
        # Drop any partial line / decoder state left from a previous connection.
        self.buffer = ""
        self._decoder.reset()
        self._running = True
        self._reader = threading.Thread(target=self._read_loop, daemon=True)
        self._reader.start()
        try:
            self._send(f"NICK {self.nick}")
            self._send(f"USER {self.nick} 0 * :{self.nick}")
            # Wait for RPL_WELCOME (001) to confirm registration
            welcome = self.wait_for(" 001 ", timeout=10)
        except OSError:
            self.quit()
            raise
        if welcome is None:
            # Tear down the socket and reader before reporting the failure.
            self.quit()
            raise ConnectionError(f"{self.nick}: registration timed out")

    def _send(self, line):
        """Send one protocol line, appending the CRLF terminator."""
        self.sock.sendall((line + "\r\n").encode())

    def _read_loop(self):
        """Receive data until EOF, error or stop request, splitting it into lines.

        Each complete line is appended to ``messages``; lines starting with
        ``PING`` are answered with the matching ``PONG`` first.
        """
        try:
            while self._running:
                data = self.sock.recv(4096)
                if not data:
                    break  # peer closed the connection
                self.buffer += self._decoder.decode(data)
                while "\r\n" in self.buffer:
                    line, self.buffer = self.buffer.split("\r\n", 1)
                    # Auto-respond to PING
                    if line.startswith("PING"):
                        self._send("PONG" + line[4:])
                    with self._lock:
                        self.messages.append(line)
        except OSError:
            # Socket closed or broken (e.g. by quit()); just stop reading.
            pass

    def join(self, channel):
        """Send JOIN for ``channel`` and pause briefly so the server can react."""
        self._send(f"JOIN {channel}")
        time.sleep(0.3)

    def say(self, target, message):
        """Send ``message`` to ``target`` (channel or nick) as a PRIVMSG."""
        self._send(f"PRIVMSG {target} :{message}")

    def get_messages(self, clear=True):
        """Return a snapshot of the collected messages.

        Args:
            clear: When true, the internal queue is emptied after the snapshot.

        Returns:
            A new list holding the messages collected so far, oldest first.
        """
        with self._lock:
            msgs = list(self.messages)
            if clear:
                self.messages.clear()
        return msgs

    def wait_for(self, substring, timeout=5.0):
        """Wait until a message containing ``substring`` appears.

        The queue is always scanned at least once, so an already-collected
        message is found even with ``timeout=0``.

        Args:
            substring: Text to look for inside each collected message.
            timeout: Maximum number of seconds to wait.

        Returns:
            The first matching message (removed from the queue), or ``None``
            if nothing matched before the timeout expired.
        """
        # Monotonic clock: the deadline must not move if the wall clock is adjusted.
        deadline = time.monotonic() + timeout
        while True:
            with self._lock:
                for i, msg in enumerate(self.messages):
                    if substring in msg:
                        return self.messages.pop(i)
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            # Never sleep past the deadline.
            time.sleep(min(0.1, remaining))

    def quit(self):
        """Stop the reader, send QUIT (best effort) and close the socket.

        Safe to call on a client that never connected and safe to call twice.
        """
        self._running = False
        sock = self.sock
        if sock is None:
            return  # never connected; nothing to tear down
        try:
            self._send("QUIT :bye")
        except OSError:
            pass  # connection already gone; still close below
        finally:
            try:
                # Wake a reader thread that may be blocked in recv().
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            try:
                sock.close()
            except OSError:
                pass