85 lines
2.7 KiB
Python
85 lines
2.7 KiB
Python
"""Minimal IRC client controller for black-box bot testing."""
|
|
|
|
import socket
|
|
import time
|
|
import threading
|
|
|
|
|
|
class IRCClient:
|
|
def __init__(self, nick, host="127.0.0.1", port=6667):
|
|
self.nick = nick
|
|
self.host = host
|
|
self.port = port
|
|
self.sock = None
|
|
self.buffer = ""
|
|
self.messages = [] # collected incoming messages
|
|
self._lock = threading.Lock()
|
|
self._reader = None
|
|
self._running = False
|
|
|
|
def connect(self):
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.sock.connect((self.host, self.port))
|
|
self._running = True
|
|
self._reader = threading.Thread(target=self._read_loop, daemon=True)
|
|
self._reader.start()
|
|
self._send(f"NICK {self.nick}")
|
|
self._send(f"USER {self.nick} 0 * :{self.nick}")
|
|
# Wait for RPL_WELCOME (001) to confirm registration
|
|
welcome = self.wait_for(" 001 ", timeout=10)
|
|
if not welcome:
|
|
raise ConnectionError(f"{self.nick}: registration timed out")
|
|
|
|
def _send(self, line):
|
|
self.sock.sendall((line + "\r\n").encode())
|
|
|
|
def _read_loop(self):
|
|
while self._running:
|
|
try:
|
|
data = self.sock.recv(4096)
|
|
if not data:
|
|
break
|
|
self.buffer += data.decode("utf-8", errors="replace")
|
|
while "\r\n" in self.buffer:
|
|
line, self.buffer = self.buffer.split("\r\n", 1)
|
|
# Auto-respond to PING
|
|
if line.startswith("PING"):
|
|
self._send("PONG" + line[4:])
|
|
with self._lock:
|
|
self.messages.append(line)
|
|
except OSError:
|
|
break
|
|
|
|
def join(self, channel):
|
|
self._send(f"JOIN {channel}")
|
|
time.sleep(0.3)
|
|
|
|
def say(self, target, message):
|
|
self._send(f"PRIVMSG {target} :{message}")
|
|
|
|
def get_messages(self, clear=True):
|
|
with self._lock:
|
|
msgs = list(self.messages)
|
|
if clear:
|
|
self.messages.clear()
|
|
return msgs
|
|
|
|
def wait_for(self, substring, timeout=5.0):
|
|
"""Wait until a message containing substring appears. Returns it or None."""
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
with self._lock:
|
|
for i, msg in enumerate(self.messages):
|
|
if substring in msg:
|
|
self.messages.pop(i)
|
|
return msg
|
|
time.sleep(0.1)
|
|
return None
|
|
|
|
def quit(self):
|
|
self._running = False
|
|
try:
|
|
self._send("QUIT :bye")
|
|
self.sock.close()
|
|
except OSError:
|
|
pass
|