444 lines
14 KiB
Python
444 lines
14 KiB
Python
"""
|
|
Integration test: play a real game against monop-irc, periodically dump
|
|
state via .print and .own, and compare against the parser's tracked state.
|
|
"""
|
|
|
|
import sys
|
|
import time
|
|
import re
|
|
|
|
sys.path.insert(0, '.')
|
|
from irc_client import IRCClient
|
|
from monop_parser import MonopParser
|
|
|
|
BOT = 'monop'
|
|
CHANNEL = '#monop'
|
|
|
|
|
|
def extract_bot_msgs(client, wait=1.5):
|
|
"""Collect all PRIVMSG lines from monop bot."""
|
|
time.sleep(wait)
|
|
lines = []
|
|
for m in client.get_messages():
|
|
if f'PRIVMSG {CHANNEL} :' in m:
|
|
sender = m.split('!')[0][1:]
|
|
text = m.split(f'PRIVMSG {CHANNEL} :', 1)[1]
|
|
lines.append((sender, text))
|
|
return lines
|
|
|
|
|
|
def say(client, msg):
|
|
"""Send a message to the channel."""
|
|
client.say(CHANNEL, msg)
|
|
|
|
|
|
def feed_to_parser(parser, lines):
|
|
"""Feed lines to the parser, return the lines for logging."""
|
|
from datetime import datetime
|
|
ts = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
|
|
for sender, text in lines:
|
|
# Parser expects tab-delimited log format: timestamp\tsender\tmessage
|
|
log_line = f"{ts}\t{sender}\t{text}"
|
|
parser.parse_line(log_line)
|
|
return lines
|
|
|
|
|
|
def get_current_player_name(parser):
|
|
"""Get the name of the current player from parser state."""
|
|
state = parser.get_state()
|
|
if not state.get('players'):
|
|
return None
|
|
cp = state.get('currentPlayer')
|
|
if cp is None:
|
|
return None
|
|
for p in state['players']:
|
|
if p['number'] == cp:
|
|
return p['name']
|
|
return None
|
|
|
|
|
|
def parse_print_output(lines):
|
|
"""
|
|
Parse the output of .print command into a dict of square states.
|
|
Format: 'Name Own Price Mg # Rent'
|
|
Each row has two squares side by side separated by 4 spaces.
|
|
"""
|
|
squares = {} # id -> {owner, mortgaged, houses, rent}
|
|
|
|
header_re = re.compile(r'Name\s+Own\s+Price\s+Mg\s+#\s+Rent')
|
|
|
|
# Each printsq outputs: %-10.10s then type-specific fields
|
|
# Property: ' owner group cost mg houses rent'
|
|
# Unowned: ' - group cost '
|
|
# Railroad owned: ' owner Railroad 200 mg count rent'
|
|
# Utility owned: ' owner 150 mg count'
|
|
|
|
board_names = [
|
|
"=== GO ===", "Mediterran", "Community ", "Baltic ave",
|
|
"Income Tax", "Reading RR", "Oriental a", "Chance i",
|
|
"Vermont av", "Connecticu", "Just Visit", "St. Charle",
|
|
"Electric C", "States ave", "Virginia a", "Pennsylvan",
|
|
"St. James ", "Community ", "Tennessee ", "New York a",
|
|
"Free Parki", "Kentucky a", "Chance ii", "Indiana av",
|
|
"Illinois a", "B&O RR", "Atlantic a", "Ventnor av",
|
|
"Water Work", "Marvin Gar", "GO TO JAIL", "Pacific av",
|
|
"N. Carolin", "Community ", "Pennsylvan", "Short Line",
|
|
"Chance iii", "Park place", "Luxury Tax", "Boardwalk "
|
|
]
|
|
|
|
# We'll parse the raw text more carefully
|
|
data_lines = []
|
|
for sender, text in lines:
|
|
if sender == BOT and not header_re.search(text):
|
|
data_lines.append(text)
|
|
|
|
# Each data line has two squares: left (chars 0-33) and right (chars 38+)
|
|
for idx, line in enumerate(data_lines):
|
|
if idx >= 20:
|
|
break
|
|
left_id = idx
|
|
right_id = idx + 20
|
|
|
|
# Parse left half (first ~34 chars)
|
|
left = line[:34] if len(line) >= 34 else line
|
|
right = line[38:] if len(line) >= 38 else ""
|
|
|
|
sq_left = parse_square_str(left, left_id)
|
|
if sq_left:
|
|
squares[left_id] = sq_left
|
|
|
|
sq_right = parse_square_str(right, right_id)
|
|
if sq_right:
|
|
squares[right_id] = sq_right
|
|
|
|
return squares
|
|
|
|
|
|
def parse_square_str(s, sq_id):
|
|
"""Parse a single square string from .print output."""
|
|
if not s or len(s.strip()) == 0:
|
|
return None
|
|
|
|
result = {'id': sq_id}
|
|
|
|
# Property with owner: 'Mediterran 1 Purple 60 0 10'
|
|
# Property no owner: 'Mediterran - Purple 60 '
|
|
# Railroad with owner: 'Reading RR 1 Railroad 200 2 100'
|
|
# Utility with owner: 'Electric C 1 150 2'
|
|
# Safe/CC/Chance: '=== GO === (blanks)'
|
|
|
|
# Try to extract owner
|
|
m = re.match(r'.{10}\s+(\d+|-)\s+(.+)', s)
|
|
if not m:
|
|
# Non-ownable square
|
|
return {'id': sq_id, 'owner': None}
|
|
|
|
owner_str = m.group(1)
|
|
rest = m.group(2).strip()
|
|
|
|
if owner_str == '-':
|
|
result['owner'] = None
|
|
else:
|
|
result['owner'] = int(owner_str)
|
|
|
|
# Try to extract mortgage flag and houses
|
|
# Look for ' * ' (mortgaged) or ' ' (not mortgaged) after cost
|
|
result['mortgaged'] = ' * ' in s[10:] if len(s) > 10 else False
|
|
|
|
# Houses: look for digit before rent at end
|
|
# Format after mortgage flag: 'houses rent' e.g. '2 390' or 'H 1500'
|
|
tail = s[28:].strip() if len(s) > 28 else ''
|
|
if tail:
|
|
hm = re.match(r'(\d|H)\s+(\d+)', tail)
|
|
if hm:
|
|
h = hm.group(1)
|
|
result['houses'] = 5 if h == 'H' else int(h)
|
|
result['rent'] = int(hm.group(2))
|
|
|
|
return result
|
|
|
|
|
|
def parse_own_output(lines):
|
|
"""
|
|
Parse output of .own command.
|
|
Format:
|
|
{name}'s ({n}) holdings (Total worth: ${worth}):
|
|
${money}[, N get-out-of-jail-free card[s]]
|
|
Name Own Price Mg # Rent
|
|
<property lines>
|
|
"""
|
|
result = {}
|
|
current_player = None
|
|
|
|
for sender, text in lines:
|
|
if sender != BOT:
|
|
continue
|
|
|
|
# Holdings header
|
|
m = re.match(r"(\S+(?:\s+\S+)*)'s \((\d+)\) holdings \(Total worth: \$(\d+)\):", text)
|
|
if m:
|
|
current_player = {
|
|
'name': m.group(1),
|
|
'number': int(m.group(2)),
|
|
'totalWorth': int(m.group(3)),
|
|
'money': None,
|
|
'gojf': 0,
|
|
'properties': []
|
|
}
|
|
result[current_player['number']] = current_player
|
|
continue
|
|
|
|
# Money line
|
|
if current_player and text.strip().startswith('$'):
|
|
mm = re.match(r'\s*\$(\d+)', text)
|
|
if mm:
|
|
current_player['money'] = int(mm.group(1))
|
|
gm = re.search(r'(\d+) get-out-of-jail-free card', text)
|
|
if gm:
|
|
current_player['gojf'] = int(gm.group(1))
|
|
continue
|
|
|
|
# Property line (starts with spaces then square name)
|
|
if current_player and text.startswith(' ') and 'Name' not in text:
|
|
current_player['properties'].append(text.strip())
|
|
|
|
return result
|
|
|
|
|
|
def compare_states(parser, print_data, own_data, log):
|
|
"""Compare parser state against .print and .own dumps."""
|
|
state = parser.get_state()
|
|
errors = []
|
|
|
|
# Compare square ownership from .print
|
|
for sq_id, print_sq in print_data.items():
|
|
if 'owner' not in print_sq:
|
|
continue
|
|
parser_sq = state['squares'][sq_id] if sq_id < len(state['squares']) else None
|
|
if parser_sq is None:
|
|
continue
|
|
|
|
parser_owner = parser_sq.get('owner')
|
|
print_owner = print_sq.get('owner')
|
|
|
|
if parser_owner != print_owner:
|
|
errors.append(
|
|
f"Square {sq_id} ({state['squares'][sq_id]['name']}): "
|
|
f"parser owner={parser_owner}, print owner={print_owner}"
|
|
)
|
|
|
|
if 'mortgaged' in print_sq and 'mortgaged' in parser_sq:
|
|
if parser_sq['mortgaged'] != print_sq['mortgaged']:
|
|
errors.append(
|
|
f"Square {sq_id}: parser mortgaged={parser_sq['mortgaged']}, "
|
|
f"print mortgaged={print_sq['mortgaged']}"
|
|
)
|
|
|
|
if 'houses' in print_sq and 'houses' in parser_sq:
|
|
if parser_sq['houses'] != print_sq.get('houses', 0):
|
|
errors.append(
|
|
f"Square {sq_id}: parser houses={parser_sq['houses']}, "
|
|
f"print houses={print_sq.get('houses')}"
|
|
)
|
|
|
|
# Compare player money/gojf from .own
|
|
for pnum, own_p in own_data.items():
|
|
parser_p = None
|
|
for p in state['players']:
|
|
if p['number'] == pnum:
|
|
parser_p = p
|
|
break
|
|
if parser_p is None:
|
|
errors.append(f"Player {pnum} in .own but not in parser")
|
|
continue
|
|
|
|
if own_p['money'] is not None and parser_p['money'] != own_p['money']:
|
|
errors.append(
|
|
f"Player {own_p['name']}: parser money=${parser_p['money']}, "
|
|
f"own money=${own_p['money']}"
|
|
)
|
|
|
|
if parser_p.get('getOutOfJailFreeCards', 0) != own_p['gojf']:
|
|
errors.append(
|
|
f"Player {own_p['name']}: parser gojf={parser_p.get('getOutOfJailFreeCards', 0)}, "
|
|
f"own gojf={own_p['gojf']}"
|
|
)
|
|
|
|
if errors:
|
|
log.append(f" === MISMATCHES ({len(errors)}) ===")
|
|
for e in errors:
|
|
log.append(f" {e}")
|
|
else:
|
|
log.append(f" === ALL MATCH ===")
|
|
|
|
return len(errors)
|
|
|
|
|
|
def play_turn(player_client, observer, parser, log, auto_buy=True):
|
|
"""Play one turn for the given player. Returns lines from bot."""
|
|
name = player_client.nick
|
|
|
|
# Roll
|
|
say(player_client, '.')
|
|
lines = extract_bot_msgs(observer, 2)
|
|
all_lines = list(lines)
|
|
feed_to_parser(parser, lines)
|
|
|
|
# Check if we need to respond to a buy prompt
|
|
for sender, text in lines:
|
|
if text == 'Do you want to buy?':
|
|
if auto_buy:
|
|
say(player_client, '.y')
|
|
else:
|
|
say(player_client, '.n')
|
|
more = extract_bot_msgs(observer, 1)
|
|
all_lines.extend(more)
|
|
feed_to_parser(parser, more)
|
|
break
|
|
|
|
# Log the turn
|
|
for sender, text in all_lines:
|
|
log.append(f"<{sender}> {text}")
|
|
|
|
return all_lines
|
|
|
|
|
|
def dump_and_compare(observer, current_player_client, parser, log, turn_num):
|
|
"""Send .print and .own, parse results, compare to parser state."""
|
|
log.append(f"\n--- State dump after turn {turn_num} ---")
|
|
|
|
# .print
|
|
say(current_player_client, '.print')
|
|
print_lines = extract_bot_msgs(observer, 2)
|
|
feed_to_parser(parser, print_lines)
|
|
|
|
# .own for each player (use .holdings which shows all)
|
|
say(current_player_client, '.holdings')
|
|
own_lines = extract_bot_msgs(observer, 2)
|
|
feed_to_parser(parser, own_lines)
|
|
|
|
print_data = parse_print_output(print_lines)
|
|
own_data = parse_own_output(own_lines)
|
|
|
|
return compare_states(parser, print_data, own_data, log)
|
|
|
|
|
|
def main():
|
|
parser = MonopParser()
|
|
log = []
|
|
total_errors = 0
|
|
total_checks = 0
|
|
|
|
# Connect clients
|
|
alice = IRCClient('alice')
|
|
bob = IRCClient('bob')
|
|
charlie = IRCClient('charlie')
|
|
observer = IRCClient('observer') # dedicated message collector
|
|
|
|
for c in [alice, bob, charlie, observer]:
|
|
c.connect()
|
|
c.join(CHANNEL)
|
|
time.sleep(1)
|
|
|
|
players = {'alice': alice, 'bob': bob, 'charlie': charlie}
|
|
player_order = [] # filled after setup
|
|
|
|
# Collect greeting
|
|
lines = extract_bot_msgs(observer, 2)
|
|
feed_to_parser(parser, lines)
|
|
for _, text in lines:
|
|
log.append(f"<{BOT}> {text}")
|
|
|
|
# Setup: 3 players
|
|
say(alice, '.3')
|
|
lines = extract_bot_msgs(observer, 1)
|
|
feed_to_parser(parser, lines)
|
|
for _, t in lines:
|
|
log.append(f"<{BOT}> {t}")
|
|
|
|
say(alice, '.me')
|
|
lines = extract_bot_msgs(observer, 1)
|
|
feed_to_parser(parser, lines)
|
|
for _, t in lines:
|
|
log.append(f"<{BOT}> {t}")
|
|
|
|
say(bob, '.me')
|
|
lines = extract_bot_msgs(observer, 1)
|
|
feed_to_parser(parser, lines)
|
|
for _, t in lines:
|
|
log.append(f"<{BOT}> {t}")
|
|
|
|
say(charlie, '.me')
|
|
lines = extract_bot_msgs(observer, 3)
|
|
feed_to_parser(parser, lines)
|
|
for _, t in lines:
|
|
log.append(f"<{BOT}> {t}")
|
|
|
|
# Determine turn order from parser
|
|
state = parser.get_state()
|
|
if state.get('players'):
|
|
for p in state['players']:
|
|
player_order.append(p['name'].lower())
|
|
log.append(f"\nTurn order: {player_order}")
|
|
|
|
# Play turns with periodic state dumps
|
|
num_turns = 30
|
|
turn = 0
|
|
for i in range(num_turns):
|
|
if not player_order:
|
|
break
|
|
|
|
current_name = player_order[turn % len(player_order)]
|
|
current_client = players.get(current_name)
|
|
if not current_client:
|
|
turn += 1
|
|
continue
|
|
|
|
log.append(f"\n=== Turn {i+1}: {current_name} ===")
|
|
turn_lines = play_turn(current_client, observer, parser, log)
|
|
|
|
# Check for doubles (gets another turn)
|
|
got_doubles = False
|
|
for sender, text in turn_lines:
|
|
if 'rolled doubles' in text.lower():
|
|
got_doubles = True
|
|
|
|
if not got_doubles:
|
|
turn += 1
|
|
|
|
# Dump state every 5 turns
|
|
if (i + 1) % 5 == 0:
|
|
total_checks += 1
|
|
errs = dump_and_compare(observer, current_client, parser, log, i + 1)
|
|
total_errors += errs
|
|
|
|
# Final dump
|
|
current_name = player_order[turn % len(player_order)]
|
|
current_client = players[current_name]
|
|
total_checks += 1
|
|
errs = dump_and_compare(observer, current_client, parser, log, num_turns)
|
|
total_errors += errs
|
|
|
|
# Print results
|
|
print("=" * 60)
|
|
print("INTEGRATION TEST RESULTS")
|
|
print("=" * 60)
|
|
for line in log:
|
|
print(line)
|
|
print("=" * 60)
|
|
print(f"\nState comparisons: {total_checks}")
|
|
print(f"Total mismatches: {total_errors}")
|
|
if total_errors == 0:
|
|
print("PASS: Parser state matches monop internal state perfectly")
|
|
else:
|
|
print(f"FAIL: {total_errors} mismatches found")
|
|
|
|
# Cleanup
|
|
for c in [alice, bob, charlie, observer]:
|
|
c.quit()
|
|
|
|
return total_errors
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(0 if main() == 0 else 1)
|