Files
javis_bot/tests/test_desktop_app.py
javis-bot c4abf63f38
Some checks failed
Release / semantic-release (push) Successful in 59s
tests / Unit tests (Linux, Python 3.11) (push) Successful in 13m45s
Release / build-linux (push) Failing after 7m47s
Release / build-windows (push) Has been cancelled
Release / build-macos (arm64, macos-latest) (push) Has been cancelled
Release / build-macos (x64, macos-15-intel) (push) Has been cancelled
Release / release-main (push) Has been cancelled
Release / release-develop (push) Has been cancelled
Add Discord-native hybrid front-end for Jarvis (bot + bridge)
Transform isair/jarvis into a Discord-controlled voice assistant running on
the Ubuntu VNC desktop, keeping the mature ~39k-line Python brain intact.

- bot/ (Node + bun, discord.js): /자비스 slash commands (ephemeral),
  voice channel join + voice receive/playback, pluggable VNC screen broadcast
  (selfbot live / noVNC / screenshot)
- bridge/ (Python, Flask): wraps jarvis STT + run_reply_engine + Piper TTS
  behind a thin localhost HTTP API
- .env.example, scripts/ (start_bridge/start_bot/dev), README rewrite,
  docs/language-comparison.md and docs/vnc-xfce-setup.md

Language decision: hybrid (Python brain + Node/bun Discord layer) because
Discord blocks bot video; native screen broadcast only works via a Node
selfbot library.
2026-06-09 14:51:05 +09:00

1058 lines
41 KiB
Python

"""
Tests for desktop_app.py functionality.
Tests crash detection, model support checking, and other utility functions.
Note: GUI components are not tested here - only the underlying logic.
"""
import os
import pytest
import subprocess
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
class TestEntryPointImports:
"""Guardrails for the PyInstaller entry point (src/desktop_app/app.py).
PyInstaller freezes app.py as __main__ with no parent package, so any
relative import (`from .foo import ...`) raises ImportError at launch
and the bundled app exits silently. Regression guard for the #242 bug
where `from .paths import get_log_dir` inside get_crash_paths() broke
every macOS launch.
"""
def test_app_py_has_no_relative_imports(self):
"""app.py is the frozen entry point — must use absolute imports only."""
import ast
from pathlib import Path
app_py = Path(__file__).parent.parent / "src" / "desktop_app" / "app.py"
tree = ast.parse(app_py.read_text(encoding="utf-8"))
relative_imports = [
f"line {node.lineno}: from {'.' * node.level}{node.module or ''} import ..."
for node in ast.walk(tree)
if isinstance(node, ast.ImportFrom) and node.level > 0
]
assert not relative_imports, (
"app.py is the PyInstaller entry point and runs as __main__ with "
"no package context. Relative imports will raise ImportError at "
"launch. Use `from desktop_app.X import ...` instead.\n"
"Offenders:\n " + "\n ".join(relative_imports)
)
class TestGetCrashPaths:
"""Tests for get_crash_paths() function."""
def test_returns_three_paths(self):
"""get_crash_paths() should return a tuple of 3 paths."""
from desktop_app import get_crash_paths
result = get_crash_paths()
assert isinstance(result, tuple)
assert len(result) == 3
def test_all_paths_are_path_objects(self):
"""All returned paths should be Path objects."""
from desktop_app import get_crash_paths
crash_log, crash_marker, previous_crash = get_crash_paths()
assert isinstance(crash_log, Path)
assert isinstance(crash_marker, Path)
assert isinstance(previous_crash, Path)
def test_paths_have_expected_names(self):
"""Paths should have the expected filenames."""
from desktop_app import get_crash_paths
crash_log, crash_marker, previous_crash = get_crash_paths()
assert crash_log.name == "jarvis_desktop_crash.log"
assert crash_marker.name == ".crash_marker"
assert previous_crash.name == "previous_crash.log"
def test_paths_share_same_parent_directory(self):
"""All crash paths should be in the same directory."""
from desktop_app import get_crash_paths
crash_log, crash_marker, previous_crash = get_crash_paths()
assert crash_log.parent == crash_marker.parent == previous_crash.parent
@patch("sys.platform", "darwin")
def test_macos_uses_library_logs(self):
"""On macOS, should use ~/Library/Logs/Jarvis."""
# Note: This is tricky because the function reads sys.platform at runtime
from desktop_app import get_crash_paths
crash_log, _, _ = get_crash_paths()
if sys.platform == "darwin":
assert "Library" in str(crash_log) or "Logs" in str(crash_log)
class TestCrashMarkerFunctions:
"""Tests for mark_session_started() and mark_session_clean_exit()."""
def test_mark_session_started_creates_marker(self):
"""mark_session_started() should create the crash marker file."""
from desktop_app import get_crash_paths, mark_session_started, mark_session_clean_exit
_, crash_marker, _ = get_crash_paths()
# Clean up first
crash_marker.unlink(missing_ok=True)
assert not crash_marker.exists()
# Start session
mark_session_started()
assert crash_marker.exists()
# Clean up
mark_session_clean_exit()
def test_mark_session_clean_exit_removes_marker(self):
"""mark_session_clean_exit() should remove the crash marker file."""
from desktop_app import get_crash_paths, mark_session_started, mark_session_clean_exit
_, crash_marker, _ = get_crash_paths()
# Create marker
mark_session_started()
assert crash_marker.exists()
# Clean exit
mark_session_clean_exit()
assert not crash_marker.exists()
def test_mark_session_clean_exit_handles_missing_marker(self):
"""mark_session_clean_exit() should not error if marker doesn't exist."""
from desktop_app import get_crash_paths, mark_session_clean_exit
_, crash_marker, _ = get_crash_paths()
crash_marker.unlink(missing_ok=True)
# Should not raise
mark_session_clean_exit()
class TestCheckPreviousCrash:
"""Tests for check_previous_crash() function."""
def test_returns_none_when_no_marker(self):
"""check_previous_crash() should return None if no crash marker exists."""
from desktop_app import get_crash_paths, check_previous_crash, mark_session_clean_exit
# Ensure clean state
mark_session_clean_exit()
result = check_previous_crash()
assert result is None
def test_returns_none_when_marker_but_no_crash_log(self):
"""check_previous_crash() should return None if marker exists but no crash content."""
from desktop_app import get_crash_paths, check_previous_crash, mark_session_started
crash_log, crash_marker, _ = get_crash_paths()
# Create marker but empty/missing crash log
mark_session_started()
crash_log.unlink(missing_ok=True)
result = check_previous_crash()
# Marker should be removed even if no crash content
assert not crash_marker.exists()
def test_returns_content_when_crash_detected(self):
"""check_previous_crash() should return crash content when crash is detected."""
from desktop_app import get_crash_paths, check_previous_crash
crash_log, crash_marker, previous_crash = get_crash_paths()
# Simulate a crash: marker exists and crash log has error content
crash_marker.touch()
crash_content = "Fatal error: Something went wrong\nTraceback (most recent call last):\n File test.py"
crash_log.write_text(crash_content, encoding='utf-8')
result = check_previous_crash()
# Should return the crash content
assert result is not None
assert "Fatal" in result or "Traceback" in result
# Marker should be removed
assert not crash_marker.exists()
# Previous crash should be saved
assert previous_crash.exists()
# Clean up
crash_log.unlink(missing_ok=True)
previous_crash.unlink(missing_ok=True)
def test_ignores_normal_log_content(self):
"""check_previous_crash() should ignore logs without error indicators."""
from desktop_app import get_crash_paths, check_previous_crash
crash_log, crash_marker, _ = get_crash_paths()
# Create marker with normal (non-crash) log content
crash_marker.touch()
crash_log.write_text("Normal startup log\nEverything is fine", encoding='utf-8')
result = check_previous_crash()
# Should return None since no crash indicators
assert result is None
# Marker should still be removed
assert not crash_marker.exists()
# Clean up
crash_log.unlink(missing_ok=True)
class TestCheckModelSupport:
"""Tests for check_model_support() function."""
@patch("jarvis.config.load_config")
def test_returns_none_for_supported_model(self, mock_load_config):
"""check_model_support() should return None for supported models."""
from desktop_app import check_model_support
from jarvis.config import DEFAULT_CHAT_MODEL
mock_load_config.return_value = {"ollama_chat_model": DEFAULT_CHAT_MODEL}
result = check_model_support()
assert result is None
@patch("jarvis.config.load_config")
def test_returns_model_name_for_unsupported_model(self, mock_load_config):
"""check_model_support() should return model name for unsupported models."""
from desktop_app import check_model_support
mock_load_config.return_value = {"ollama_chat_model": "some-unsupported-model:7b"}
result = check_model_support()
assert result == "some-unsupported-model:7b"
@patch("jarvis.config.load_config")
def test_matches_base_model_name(self, mock_load_config):
"""check_model_support() should match base model names without tags."""
from desktop_app import check_model_support
from jarvis.config import SUPPORTED_CHAT_MODELS
# Get a supported model and use just its base name
supported_model = next(iter(SUPPORTED_CHAT_MODELS.keys()))
base_name = supported_model.split(":")[0]
mock_load_config.return_value = {"ollama_chat_model": base_name}
result = check_model_support()
assert result is None # Should be recognized as supported
@patch("jarvis.config.load_config")
def test_handles_config_error_gracefully(self, mock_load_config):
"""check_model_support() should return None on config errors."""
from desktop_app import check_model_support
mock_load_config.side_effect = Exception("Config error")
result = check_model_support()
assert result is None
@patch("jarvis.config.load_config")
def test_uses_default_when_not_configured(self, mock_load_config):
"""check_model_support() should use default model when not in config."""
from desktop_app import check_model_support
mock_load_config.return_value = {} # No ollama_chat_model key
result = check_model_support()
# Default model is supported, so should return None
assert result is None
class TestModelSupportIntegration:
"""Integration tests for model support checking."""
def test_all_supported_models_pass_check(self):
"""All models in SUPPORTED_CHAT_MODELS should pass the support check."""
from desktop_app import check_model_support
from jarvis.config import SUPPORTED_CHAT_MODELS
for model_id in SUPPORTED_CHAT_MODELS:
with patch("jarvis.config.load_config") as mock_config:
mock_config.return_value = {"ollama_chat_model": model_id}
result = check_model_support()
assert result is None, f"Model {model_id} should be supported"
class TestLogViewerReportIssue:
"""Tests for report issue URL generation logic.
Note: We test the URL generation logic directly rather than through the
LogViewerWindow class because Qt GUI components require a display server
and block in test environments.
"""
def test_report_issue_url_generation(self):
"""Report issue should generate correct GitHub issue URL with redacted content."""
import urllib.parse
import webbrowser
from jarvis import get_version
from jarvis.utils.redact import redact
# Simulate what _report_issue does
log_content = (
"Starting Jarvis...\n"
"API token: sk-secret-key-12345\n"
"User email: user@example.com\n"
"Error: Something went wrong\n"
)
# Apply same redaction as the actual method
redacted_logs = redact(log_content, max_len=6000)
try:
version = get_version()
except Exception:
version = "unknown"
# Build URL same as the actual method
title = "Bug Report"
body = f"""## Bug Report
**Version:** {version}
**Platform:** {sys.platform}
### Description
(Please describe what went wrong or what you expected to happen)
### Steps to Reproduce
1.
2.
3.
<details>
<summary>📋 Logs (click to expand)</summary>
```
{redacted_logs}
```
</details>
### Additional Context
(Any other relevant information)
"""
params = urllib.parse.urlencode({
'title': title,
'body': body,
'labels': 'bug'
})
url = f"https://github.com/isair/jarvis/issues/new?{params}"
# Parse and verify
assert url.startswith("https://github.com/isair/jarvis/issues/new?")
parsed = urllib.parse.urlparse(url)
params_parsed = urllib.parse.parse_qs(parsed.query)
# Check title and labels
assert params_parsed['title'][0] == "Bug Report"
assert params_parsed['labels'][0] == "bug"
# Check body contains expected sections
body_decoded = params_parsed['body'][0]
assert "## Bug Report" in body_decoded
assert "### Description" in body_decoded
assert "### Steps to Reproduce" in body_decoded
assert "<details>" in body_decoded
assert "📋 Logs (click to expand)" in body_decoded
# Check that sensitive data was redacted
assert "user@example.com" not in body_decoded
assert "[REDACTED_EMAIL]" in body_decoded
def test_report_issue_truncates_long_logs(self):
"""Report issue should truncate long logs, keeping init section + tail."""
from desktop_app.app import _truncate_logs_for_report, _LOG_SEPARATOR
# Simulate realistic log: header + separator + init + separator + operational logs
init_block = (
"🚀 Jarvis Log Viewer Ready\n"
f"{_LOG_SEPARATOR}\n"
"\n"
"✓ Daemon started\n"
"🧠 Using chat model: llama3.2\n"
"🎤 Using whisper model: large-v3-turbo\n"
"📡 No MCP servers configured\n"
"💾 Initializing dialogue memory...\n"
"✓ Dialogue memory initialized\n"
"📍 Location services disabled\n"
"🔊 Initializing TTS engine (piper)...\n"
"✓ TTS engine started\n"
"🎤 Initializing voice listener...\n"
"✓ Voice listener thread started\n"
f"{_LOG_SEPARATOR}\n"
)
operational = "\n".join([f"[2024-01-{i:02d}] Processing request {i}" for i in range(1, 500)])
long_content = init_block + operational
result = _truncate_logs_for_report(long_content, 5000)
# Verify truncation happened and fits within budget
assert len(result) <= 5000
assert "... (truncated) ..." in result
# Verify init section is preserved (up to last separator)
assert "Jarvis Log Viewer Ready" in result
assert "Using chat model" in result
assert "Voice listener thread started" in result
# Verify recent/tail lines are preserved (end of log)
assert "Processing request 499" in result
def test_report_issue_truncation_preserves_tail(self):
"""Truncation should keep recent logs, not early logs."""
from desktop_app.app import _truncate_logs_for_report, _LOG_SEPARATOR
init_block = f"Header\n{_LOG_SEPARATOR}\n"
lines = [f"line {i}: {'x' * 40}" for i in range(200)]
long_content = init_block + "\n".join(lines)
result = _truncate_logs_for_report(long_content, 3000)
# Last line should be preserved (most recent)
assert "line 199" in result
# Init section should be preserved
assert "Header" in result
assert _LOG_SEPARATOR in result
# Middle lines should be truncated
assert "line 50" not in result
def test_report_issue_no_truncation_when_short(self):
"""Short logs should not be truncated."""
from desktop_app.app import _truncate_logs_for_report
short_content = "line 1\nline 2\nline 3"
result = _truncate_logs_for_report(short_content, 5000)
assert result == short_content
assert "truncated" not in result
def test_report_issue_truncation_no_separator(self):
"""Without a separator, truncation should just keep the tail."""
from desktop_app.app import _truncate_logs_for_report
# No separator (e.g. crash logs)
lines = [f"line {i}: content" for i in range(500)]
long_content = "\n".join(lines)
result = _truncate_logs_for_report(long_content, 3000)
assert len(result) <= 3000
# Tail (recent lines) should be preserved
assert "line 499" in result
# Early lines should be truncated
assert "line 0:" not in result
def test_faulthandler_dump_preserves_fatal_error_line(self):
"""Faulthandler crash dumps should preserve 'Fatal Python error' and current thread."""
from desktop_app.app import _truncate_logs_for_report, _LOG_SEPARATOR
# Simulate realistic crash log: init section + separator + faulthandler dump
init_block = (
"=== Jarvis Desktop App Crash Log ===\n"
"Timestamp: 2026-04-13\n"
"Platform: win32\n"
"==================================================\n"
"\nStarting Jarvis Desktop App...\n"
"Creating QApplication...\n"
"🚀 Jarvis daemon started\n"
f"{_LOG_SEPARATOR}\n"
)
# Faulthandler dump: Fatal error + current thread + many other threads + extension modules
fatal_line = "Fatal Python error: Segmentation fault\n"
current_thread = (
"\nCurrent thread 0x00007c54 (most recent call first):\n"
" File \"some_module.py\", line 42 in critical_function\n"
" File \"app.py\", line 100 in main\n"
)
other_threads = "\n".join([
f"\nThread 0x0000{i:04x} (most recent call first):\n"
f" File \"threading.py\", line 331 in wait\n"
f" File \"module_{i}.py\", line {i * 10} in some_func\n"
f" File \"threading.py\", line 1045 in _bootstrap_inner\n"
f" File \"threading.py\", line 1002 in _bootstrap\n"
for i in range(20)
])
# Large extension modules list (~1700 chars)
ext_modules = "Extension modules: " + ", ".join([f"mod_{i}.sub_{i}" for i in range(120)]) + " (total: 120)\n"
crash_log = init_block + fatal_line + current_thread + other_threads + ext_modules
result = _truncate_logs_for_report(crash_log, 4000)
assert len(result) <= 4000
# Critical: the Fatal error line and current thread MUST be preserved
assert "Fatal Python error: Segmentation fault" in result
assert "critical_function" in result
# Init section should be preserved
assert "Jarvis Desktop App Crash Log" in result
# Extension modules should be summarised, not fully listed
assert "mod_119.sub_119" not in result
def test_faulthandler_extension_modules_trimmed(self):
"""Extension modules line in faulthandler dumps should be shortened."""
from desktop_app.app import _truncate_logs_for_report
# Short log with a huge Extension modules line — should be trimmed even if total is within budget
fatal = "Fatal Python error: Aborted\n\nCurrent thread 0x1234:\n File \"x.py\", line 1\n\n"
ext_modules = "Extension modules: " + ", ".join([f"mod_{i}" for i in range(100)]) + " (total: 100)\n"
log = fatal + ext_modules
result = _truncate_logs_for_report(log, 4000)
# Fatal error should be preserved
assert "Fatal Python error: Aborted" in result
# The full module list should be trimmed
assert "mod_99" not in result
# But summary count should remain
assert "100" in result
def test_faulthandler_budget_too_tight_caps_fatal_section(self):
"""When fatal section exceeds the budget, output must still respect max_len."""
from desktop_app.app import _truncate_logs_for_report, _LOG_SEPARATOR
# Simulate a deep recursion crash: huge fatal section (~5000 chars)
init_block = f"Header\n{_LOG_SEPARATOR}\n"
fatal_line = "Fatal Python error: maximum recursion depth exceeded\n"
deep_stack = "\n".join(
[f" File \"module.py\", line {i} in func_{i}" for i in range(300)]
)
current_thread = f"\nCurrent thread 0x1234 (most recent call first):\n{deep_stack}\n"
other_thread = "\nThread 0x5678 (most recent call first):\n File \"t.py\", line 1\n"
crash_log = init_block + fatal_line + current_thread + other_thread
result = _truncate_logs_for_report(crash_log, 2000)
# Must never exceed the budget
assert len(result) <= 2000
# The fatal error line itself should still be present
assert "Fatal Python error: maximum recursion depth exceeded" in result
def test_faulthandler_fatal_without_thread_headers(self):
"""Fatal error without any 'Thread 0x' headers should extract up to 500 chars."""
from desktop_app.app import _extract_fatal_section
fatal = "Fatal Python error: Aborted\n\nCurrent thread 0x1234 (most recent call first):\n File \"x.py\", line 1 in main\n"
result = _extract_fatal_section(fatal)
assert "Fatal Python error: Aborted" in result
assert "main" in result
def test_faulthandler_extension_modules_without_total(self):
"""Extension modules line without '(total: N)' should use the fallback trim."""
from desktop_app.app import _trim_extension_modules
# No "(total: N)" suffix — should trigger the fallback regex
ext_line = "Extension modules: " + ", ".join([f"mod_{i}" for i in range(100)]) + "\n"
log = "Some log content\n" + ext_line
result = _trim_extension_modules(log)
# Should be trimmed (much shorter than original)
assert len(result) < len(log)
assert "... (trimmed)" in result
# Should keep the first ~80 chars of modules
assert "mod_0" in result
# Should not contain later modules
assert "mod_99" not in result
def test_redaction_handles_multiple_sensitive_patterns(self):
"""Redaction should handle multiple types of sensitive data."""
from jarvis.utils.redact import redact
log_content = (
"Config loaded:\n"
" email: admin@company.com\n"
" jwt_value: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.test\n"
" password: secret123\n"
" hash: a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4\n"
)
redacted = redact(log_content)
# Email should be redacted
assert "admin@company.com" not in redacted
assert "[REDACTED_EMAIL]" in redacted
# JWT should be redacted (when not preceded by token=)
assert "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" not in redacted
assert "[REDACTED_JWT]" in redacted
# Password assignment should be redacted
assert "secret123" not in redacted
assert "[REDACTED]" in redacted
# Long hex string should be redacted
assert "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4" not in redacted
assert "[REDACTED_HEX]" in redacted
class TestDiaryIPCProtocol:
"""Tests for diary dialog IPC protocol parsing.
Note: Qt tests require a QApplication which may conflict with pytest fixtures.
These tests focus on the IPC protocol parsing logic.
"""
def test_diary_ipc_prefix_constant(self):
"""Diary IPC prefix should be a valid string constant."""
from desktop_app.diary_dialog import DIARY_IPC_PREFIX
assert isinstance(DIARY_IPC_PREFIX, str)
assert len(DIARY_IPC_PREFIX) > 0
# Prefix should be unique enough to not conflict with normal log lines
assert DIARY_IPC_PREFIX == "__DIARY__:"
def test_ipc_event_format_is_parseable(self):
"""IPC event format should be valid JSON after prefix."""
import json
from desktop_app.diary_dialog import DIARY_IPC_PREFIX
# Test various event types
events = [
{"type": "chunks", "data": ["chunk1", "chunk2"]},
{"type": "token", "data": "hello"},
{"type": "status", "data": "Writing..."},
{"type": "complete", "data": True},
]
for event in events:
line = f"{DIARY_IPC_PREFIX}{json.dumps(event)}"
# Should be parseable
assert line.startswith(DIARY_IPC_PREFIX)
json_str = line[len(DIARY_IPC_PREFIX):]
parsed = json.loads(json_str)
assert parsed == event
def test_normal_log_lines_dont_match_prefix(self):
"""Normal daemon log lines should not start with IPC prefix."""
from desktop_app.diary_dialog import DIARY_IPC_PREFIX
# Common log patterns that should NOT be intercepted
normal_logs = [
"Starting Jarvis daemon...",
"✓ Daemon started",
"📝 Updating diary...",
"🔄 Daemon shutting down...",
"✅ Diary update complete",
"",
"DEBUG: some message",
]
for log in normal_logs:
assert not log.startswith(DIARY_IPC_PREFIX), f"Log line should not match prefix: {log}"
class TestDaemonExitLogMessage:
"""Tests for the DaemonThread exit log message logic.
Verifies that a graceful stop (via request_stop) emits a success message,
while an unexpected exit emits a warning message. Tests the guard logic
directly to avoid importing the daemon module (which has heavy side effects).
"""
def _simulate_exit_log(self, stop_requested):
"""Replicate the DaemonThread.run() exit log logic."""
emitted = []
def mock_emit(msg):
emitted.append(msg)
# Replicate the logic from app.py DaemonThread.run()
if stop_requested:
mock_emit("✅ Daemon stopped gracefully\n")
else:
mock_emit("⚠️ Daemon exited unexpectedly\n")
return emitted
def test_graceful_stop_emits_success_message(self):
"""When is_stop_requested() is True, should emit graceful stop message."""
emitted = self._simulate_exit_log(stop_requested=True)
assert len(emitted) == 1
assert "gracefully" in emitted[0]
assert "" in emitted[0]
def test_unexpected_exit_emits_warning_message(self):
"""When is_stop_requested() is False, should emit unexpected exit message."""
emitted = self._simulate_exit_log(stop_requested=False)
assert len(emitted) == 1
assert "unexpectedly" in emitted[0]
assert "⚠️" in emitted[0]
def test_graceful_stop_does_not_emit_warning(self):
"""Graceful stop should not contain 'unexpected' wording."""
emitted = self._simulate_exit_log(stop_requested=True)
assert "unexpected" not in emitted[0].lower()
def test_unexpected_exit_does_not_emit_success(self):
"""Unexpected exit should not contain 'gracefully' wording."""
emitted = self._simulate_exit_log(stop_requested=False)
assert "gracefully" not in emitted[0].lower()
class TestSingleInstanceLock:
"""Tests for the single-instance locking mechanism.
Focuses on the regression where 'w' mode truncated the lock file before
the lock attempt, destroying the existing instance's PID.
"""
def test_get_existing_instance_pid_reads_pid(self, tmp_path):
"""get_existing_instance_pid() should return the PID stored in the lock file."""
from desktop_app.app import get_existing_instance_pid
lock_file = tmp_path / "jarvis_desktop.lock"
lock_file.write_bytes(b"12345")
with patch("desktop_app.app.get_lock_file_path", return_value=lock_file):
pid = get_existing_instance_pid()
assert pid == 12345
def test_get_existing_instance_pid_returns_none_when_empty(self, tmp_path):
"""get_existing_instance_pid() should return None for an empty lock file."""
from desktop_app.app import get_existing_instance_pid
lock_file = tmp_path / "jarvis_desktop.lock"
lock_file.write_bytes(b"")
with patch("desktop_app.app.get_lock_file_path", return_value=lock_file):
pid = get_existing_instance_pid()
assert pid is None
def test_get_existing_instance_pid_returns_none_when_missing(self, tmp_path):
"""get_existing_instance_pid() should return None when the lock file is absent."""
from desktop_app.app import get_existing_instance_pid
lock_file = tmp_path / "jarvis_desktop.lock"
with patch("desktop_app.app.get_lock_file_path", return_value=lock_file):
pid = get_existing_instance_pid()
assert pid is None
def test_lock_file_not_truncated_on_failed_lock_attempt(self, tmp_path):
"""The existing PID must still be readable after a failed lock attempt.
This is the core regression: opening with 'w' truncated the file before
the lock call, so get_existing_instance_pid() returned None and the
'close existing' flow broke with "Could not find existing instance PID."
"""
from desktop_app.app import get_existing_instance_pid
lock_file = tmp_path / "jarvis_desktop.lock"
existing_pid = 99999
lock_file.write_bytes(str(existing_pid).encode())
# Simulate a failed lock attempt by opening the file in append+read binary
# mode (the fixed mode) and then locking failure — the file must be intact.
fh = open(lock_file, 'a+b')
try:
# Verify the file still has the original PID content after being
# opened non-destructively.
fh.seek(0)
content = fh.read().decode().strip()
assert content == str(existing_pid), (
f"Lock file was truncated on open — PID {existing_pid} was lost. "
"This reproduces the bug where 'w' mode destroyed the PID before "
"the lock attempt completed."
)
finally:
fh.close()
with patch("desktop_app.app.get_lock_file_path", return_value=lock_file):
pid = get_existing_instance_pid()
assert pid == existing_pid, (
"get_existing_instance_pid() should still return the existing PID "
"after a failed lock attempt."
)
def test_acquire_lock_writes_current_pid(self, tmp_path):
"""acquire_single_instance_lock() should write the current process PID."""
import desktop_app.app as app_module
lock_file = tmp_path / "jarvis_desktop.lock"
original_handle = app_module._lock_file_handle
try:
with patch("desktop_app.app.get_lock_file_path", return_value=lock_file):
result = app_module.acquire_single_instance_lock()
assert result is True
# PID should be readable from a separate handle because the lock
# is at _LOCK_OFFSET, not at byte 0.
content = lock_file.read_text().strip()
assert content == str(os.getpid()), (
f"Lock file should contain current PID {os.getpid()}, got {content!r}"
)
finally:
# Release lock so the file handle is closed
if app_module._lock_file_handle and app_module._lock_file_handle is not original_handle:
try:
app_module._lock_file_handle.close()
except Exception:
pass
app_module._lock_file_handle = original_handle
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific lock test")
def test_lock_blocks_second_process_and_pid_readable(self, tmp_path):
"""On Windows, the lock must block a second process while keeping the PID readable."""
import desktop_app.app as app_module
import subprocess
lock_file = tmp_path / "jarvis_desktop.lock"
original_handle = app_module._lock_file_handle
try:
with patch("desktop_app.app.get_lock_file_path", return_value=lock_file):
result = app_module.acquire_single_instance_lock()
assert result is True
# Child process: try to acquire the same lock and read the PID
child_code = '''
import msvcrt, sys
LOCK_OFFSET = 1024
lock_path = r"""''' + str(lock_file) + '''"""
fh = open(lock_path, "a+b")
fh.seek(LOCK_OFFSET)
try:
msvcrt.locking(fh.fileno(), msvcrt.LK_NBLCK, 1)
print("LOCK_ACQUIRED")
except OSError:
print("LOCK_BLOCKED")
fh.close()
try:
pid = open(lock_path).read().strip()
print("PID_READ=" + pid)
except Exception as e:
print("PID_FAILED=" + str(e))
'''
proc = subprocess.run(
[sys.executable, "-c", child_code],
capture_output=True, text=True, timeout=10,
)
lines = proc.stdout.strip().splitlines()
assert "LOCK_BLOCKED" in lines, (
f"Child should have been blocked from acquiring lock, got: {lines}"
)
pid_line = [l for l in lines if l.startswith("PID_READ=")]
assert pid_line, f"Child should have read the PID, got: {lines}"
assert pid_line[0] == f"PID_READ={os.getpid()}"
finally:
if app_module._lock_file_handle and app_module._lock_file_handle is not original_handle:
try:
app_module._lock_file_handle.close()
except Exception:
pass
app_module._lock_file_handle = original_handle
class TestCudaRecoveryAction:
"""The tray exposes a 'Reinstall GPU libraries' action when the user has an
NVIDIA GPU but the runtime CUDA probe failed. The recovery flow is the only
way to retry the CUDA download from the user's perspective: the original
Inno Setup task only fires once, and the marker file used to prevent
re-runs even after a half-successful install.
These tests cover the platform-gating logic and the command-line shape so
we can change the implementation without breaking the contract.
"""
def test_action_hidden_off_windows(self):
from desktop_app.cuda_recovery import cuda_recovery_action
with patch("sys.platform", "darwin"):
assert cuda_recovery_action(install_root=Path("/fake")) is None
with patch("sys.platform", "linux"):
assert cuda_recovery_action(install_root=Path("/fake")) is None
def test_action_hidden_when_no_nvidia_gpu(self, tmp_path):
from desktop_app.cuda_recovery import cuda_recovery_action
# No nvcuda.dll, no NVIDIA driver -> no point offering the action.
with patch("sys.platform", "win32"), patch(
"desktop_app.cuda_recovery._has_nvidia_driver", return_value=False
):
assert cuda_recovery_action(install_root=tmp_path) is None
def test_action_hidden_when_install_script_missing(self, tmp_path):
from desktop_app.cuda_recovery import cuda_recovery_action
# On dev machines (running from source) the Inno Setup-bundled script
# does not exist; the menu action would be a dead button.
with patch("sys.platform", "win32"), patch(
"desktop_app.cuda_recovery._has_nvidia_driver", return_value=True
):
assert cuda_recovery_action(install_root=tmp_path) is None
def test_action_present_when_windows_gpu_and_script_exist(self, tmp_path):
from desktop_app.cuda_recovery import cuda_recovery_action
script = tmp_path / "install_cuda.ps1"
script.write_text("# placeholder\n", encoding="utf-8")
with patch("sys.platform", "win32"), patch(
"desktop_app.cuda_recovery._has_nvidia_driver", return_value=True
):
action = cuda_recovery_action(install_root=tmp_path)
assert action is not None
assert action.script_path == script
assert action.target_dir == tmp_path / "cuda"
assert "Reinstall GPU libraries" in action.label
# Command is what gets handed to ShellExecute / subprocess; the test
# pins the structure so we don't accidentally drop -ExecutionPolicy
# Bypass and silently fail under restricted policies.
assert action.executable.lower().endswith("powershell.exe")
assert "-ExecutionPolicy" in action.arguments
assert "Bypass" in action.arguments
assert "-File" in action.arguments
assert str(script) in action.arguments
assert str(tmp_path / "cuda") in action.arguments
assert "-LogPath" in action.arguments
def test_quote_arg_handles_trailing_backslash(self):
"""Trailing backslashes inside quoted args must not eat the closing quote.
Windows argv parsing collapses 2n backslashes before a `"` into n
backslashes plus a string terminator, so a path like
`C:\\Program Files\\Jarvis\\` quoted naively becomes
`"C:\\Program Files\\Jarvis\\"` which CommandLineToArgvW reads as
`C:\\Program Files\\Jarvis"` — quote eaten, next arg fused on. The
canonical fix is to double trailing backslashes.
"""
from desktop_app.cuda_recovery import _quote_arg
# Trailing backslash + space gets doubled inside the quotes.
result = _quote_arg(r"C:\Program Files\Jarvis\\")
assert result.endswith('\\\\\\\\"'), (
f"trailing backslashes must be doubled before the closing quote; got {result!r}"
)
# An embedded quote escapes correctly.
assert _quote_arg('a"b') == '"a\\"b"'
# Plain paths with spaces get the simple quoted form.
assert _quote_arg(r"C:\Users\me\file") == r"C:\Users\me\file"
assert _quote_arg(r"C:\Program Files\App") == r'"C:\Program Files\App"'
# Empty string round-trips to "" so ShellExecute doesn't drop the slot.
assert _quote_arg("") == '""'
def test_run_uses_elevation_on_windows(self, tmp_path):
"""The script writes into Program Files; without elevation it silently
no-ops. Make sure the run path requests UAC explicitly."""
from desktop_app.cuda_recovery import CudaRecoveryAction, run_action
action = CudaRecoveryAction(
label="🎮 Reinstall GPU libraries",
script_path=tmp_path / "install_cuda.ps1",
target_dir=tmp_path / "cuda",
executable=r"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe",
arguments=[
"-NoProfile", "-ExecutionPolicy", "Bypass",
"-File", str(tmp_path / "install_cuda.ps1"),
"-TargetDir", str(tmp_path / "cuda"),
"-LogPath", str(tmp_path / "cuda" / "install.log"),
],
)
action.script_path.write_text("# placeholder\n", encoding="utf-8")
captured = {}
def fake_shell_execute(hwnd, verb, file, params, directory, show):
captured["verb"] = verb
captured["file"] = file
captured["params"] = params
return 42 # ShellExecuteW returns >32 on success
with patch("sys.platform", "win32"), patch(
"desktop_app.cuda_recovery._shell_execute", side_effect=fake_shell_execute
):
run_action(action)
assert captured.get("verb") == "runas", (
"must request UAC elevation; install_cuda.ps1 writes to Program Files"
)
assert captured["file"].lower().endswith("powershell.exe")
# The argument string should reference the script and the target dir.
assert "install_cuda.ps1" in captured["params"]
assert "-LogPath" in captured["params"]
class TestMemoryViewerModulePath:
"""Tests to verify memory viewer module references are valid.
These tests catch issues like wrong module paths in subprocess calls
without requiring actual GUI/server components.
"""
def test_memory_viewer_module_is_importable(self):
"""The module used for subprocess mode should be importable."""
import importlib
pytest.importorskip("flask")
# This is the module path used in MemoryViewerWindow.start_server()
# If this fails, the subprocess command will fail at runtime
module = importlib.import_module("desktop_app.memory_viewer")
assert hasattr(module, "app"), "memory_viewer should have Flask 'app' attribute"
assert hasattr(module, "main"), "memory_viewer should have 'main' function"
def test_memory_viewer_subprocess_module_runs(self):
"""The module should be runnable with python -m (with correct PYTHONPATH)."""
pytest.importorskip("flask")
# Set PYTHONPATH the same way start_server() does
src_path = Path(__file__).parent.parent / "src"
env = os.environ.copy()
env["PYTHONPATH"] = str(src_path)
# Test that the module can at least be imported in subprocess
result = subprocess.run(
[sys.executable, "-c", "import desktop_app.memory_viewer"],
capture_output=True,
text=True,
timeout=10,
env=env,
)
assert result.returncode == 0, f"Module import failed: {result.stderr}"
def test_memory_viewer_module_path_matches_code(self):
"""Verify the module path in start_server matches the actual location."""
import re
from pathlib import Path
# Read the actual code to find the module path used
app_py = Path(__file__).parent.parent / "src" / "desktop_app" / "app.py"
content = app_py.read_text(encoding="utf-8")
# Find the subprocess module path
match = re.search(r'"-m",\s*"([^"]+)"', content)
assert match, "Could not find subprocess module path in app.py"
module_path = match.group(1)
assert module_path == "desktop_app.memory_viewer", (
f"Module path should be 'desktop_app.memory_viewer', found '{module_path}'"
)