Files
mc_domain_proxy/proxy/handshake.py
claude-bot d10dae5cb9 feat: implement MC domain filter proxy, API, dashboard
- proxy: asyncio TCP proxy with handshake parser, domain whitelist,
  transparent backend tunneling, SQLite logging, mtime hot reload
- api: FastAPI routes for config/domains/logs/status + restart trigger
- frontend: React + Vite NPM-style dashboard (dashboard/domains/logs/settings)
- nginx: reverse proxy for /api -> api:8000 and / -> frontend:3000
- docker-compose: full stack with shared data volume
- replace spec mc-domain-filter.md with README.md
2026-05-20 16:39:18 +09:00

106 lines
3.4 KiB
Python

"""Minecraft 핸드셰이크 패킷 파서.
마인크래프트 클라이언트는 서버에 접속하면 첫 패킷으로 사용자가 입력한
서버 주소(string)를 포함한 handshake 패킷을 보낸다. 이 모듈은 그 첫
패킷을 잘라서 (protocol_version, server_address, server_port, next_state)
를 반환한다.
패킷 구조:
[varint] Packet Length
[varint] Packet ID (0x00)
[varint] Protocol Version
[varint] Server Address Length
[string] Server Address ← 클라이언트가 입력한 주소
[ushort] Server Port
[varint] Next State (1=status, 2=login)
"""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
class HandshakeError(Exception):
"""핸드셰이크 패킷이 비정상일 때."""
def read_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
"""(value, bytes_consumed) 반환. 5바이트를 넘으면 오류."""
value = 0
shift = 0
pos = offset
while True:
if pos >= len(data):
raise HandshakeError("varint truncated")
byte = data[pos]
pos += 1
value |= (byte & 0x7F) << shift
if not (byte & 0x80):
break
shift += 7
if shift >= 35:
raise HandshakeError("varint too long")
return value, pos - offset
@dataclass
class Handshake:
protocol_version: int
server_address: str
server_port: int
next_state: int # 1 = status, 2 = login
def parse_handshake(buf: bytes) -> Handshake:
"""버퍼 시작 위치에서 핸드셰이크 패킷을 파싱."""
pkt_len, n = read_varint(buf, 0)
pos = n
if len(buf) < pos + pkt_len:
raise HandshakeError("packet truncated")
pkt_id, n = read_varint(buf, pos)
pos += n
if pkt_id != 0x00:
raise HandshakeError(f"unexpected packet id 0x{pkt_id:02x}")
proto, n = read_varint(buf, pos)
pos += n
addr_len, n = read_varint(buf, pos)
pos += n
if addr_len < 0 or addr_len > 255 or pos + addr_len > len(buf):
raise HandshakeError("invalid address length")
address = buf[pos : pos + addr_len].decode("utf-8", errors="replace")
pos += addr_len
if pos + 2 > len(buf):
raise HandshakeError("port truncated")
port = int.from_bytes(buf[pos : pos + 2], "big")
pos += 2
next_state, _ = read_varint(buf, pos)
# Forge / BungeeCord 등이 \x00 으로 메타데이터를 붙이는 경우 있음
address = address.split("\x00", 1)[0].strip()
return Handshake(proto, address, port, next_state)
async def read_handshake_bytes(reader: asyncio.StreamReader, max_bytes: int = 2048) -> bytes:
"""길이 varint 를 보고 정확히 첫 패킷 분량만 읽는다.
읽은 바이트는 그대로 보존해서 백엔드로 그대로 forward 할 수 있게 한다.
"""
buf = bytearray()
# length varint: 최대 5바이트
for _ in range(5):
chunk = await reader.readexactly(1)
buf.extend(chunk)
if not (chunk[0] & 0x80):
break
else:
raise HandshakeError("packet length varint too long")
pkt_len, n = read_varint(bytes(buf), 0)
if pkt_len <= 0 or pkt_len > max_bytes:
raise HandshakeError(f"unreasonable packet length: {pkt_len}")
remaining = pkt_len - (len(buf) - n)
if remaining < 0:
raise HandshakeError("inconsistent packet length")
if remaining > 0:
rest = await reader.readexactly(remaining)
buf.extend(rest)
return bytes(buf)