"""Minecraft 핸드셰이크 패킷 파서. 마인크래프트 클라이언트는 서버에 접속하면 첫 패킷으로 사용자가 입력한 서버 주소(string)를 포함한 handshake 패킷을 보낸다. 이 모듈은 그 첫 패킷을 잘라서 (protocol_version, server_address, server_port, next_state) 를 반환한다. 패킷 구조: [varint] Packet Length [varint] Packet ID (0x00) [varint] Protocol Version [varint] Server Address Length [string] Server Address ← 클라이언트가 입력한 주소 [ushort] Server Port [varint] Next State (1=status, 2=login) """ from __future__ import annotations import asyncio from dataclasses import dataclass class HandshakeError(Exception): """핸드셰이크 패킷이 비정상일 때.""" def read_varint(data: bytes, offset: int = 0) -> tuple[int, int]: """(value, bytes_consumed) 반환. 5바이트를 넘으면 오류.""" value = 0 shift = 0 pos = offset while True: if pos >= len(data): raise HandshakeError("varint truncated") byte = data[pos] pos += 1 value |= (byte & 0x7F) << shift if not (byte & 0x80): break shift += 7 if shift >= 35: raise HandshakeError("varint too long") return value, pos - offset @dataclass class Handshake: protocol_version: int server_address: str server_port: int next_state: int # 1 = status, 2 = login def parse_handshake(buf: bytes) -> Handshake: """버퍼 시작 위치에서 핸드셰이크 패킷을 파싱.""" pkt_len, n = read_varint(buf, 0) pos = n if len(buf) < pos + pkt_len: raise HandshakeError("packet truncated") pkt_id, n = read_varint(buf, pos) pos += n if pkt_id != 0x00: raise HandshakeError(f"unexpected packet id 0x{pkt_id:02x}") proto, n = read_varint(buf, pos) pos += n addr_len, n = read_varint(buf, pos) pos += n if addr_len < 0 or addr_len > 255 or pos + addr_len > len(buf): raise HandshakeError("invalid address length") address = buf[pos : pos + addr_len].decode("utf-8", errors="replace") pos += addr_len if pos + 2 > len(buf): raise HandshakeError("port truncated") port = int.from_bytes(buf[pos : pos + 2], "big") pos += 2 next_state, _ = read_varint(buf, pos) # Forge / BungeeCord 등이 \x00 으로 메타데이터를 붙이는 경우 있음 address = address.split("\x00", 1)[0].strip() return Handshake(proto, address, port, next_state) async def read_handshake_bytes(reader: asyncio.StreamReader, max_bytes: int = 2048) -> bytes: """길이 varint 를 보고 정확히 첫 패킷 분량만 읽는다. 읽은 바이트는 그대로 보존해서 백엔드로 그대로 forward 할 수 있게 한다. """ buf = bytearray() # length varint: 최대 5바이트 for _ in range(5): chunk = await reader.readexactly(1) buf.extend(chunk) if not (chunk[0] & 0x80): break else: raise HandshakeError("packet length varint too long") pkt_len, n = read_varint(bytes(buf), 0) if pkt_len <= 0 or pkt_len > max_bytes: raise HandshakeError(f"unreasonable packet length: {pkt_len}") remaining = pkt_len - (len(buf) - n) if remaining < 0: raise HandshakeError("inconsistent packet length") if remaining > 0: rest = await reader.readexactly(remaining) buf.extend(rest) return bytes(buf)