Files
javis_bot/tests/test_dialogue_memory.py
javis-bot c4abf63f38
Some checks failed
Release / semantic-release (push) Successful in 59s
tests / Unit tests (Linux, Python 3.11) (push) Successful in 13m45s
Release / build-linux (push) Failing after 7m47s
Release / build-windows (push) Has been cancelled
Release / build-macos (arm64, macos-latest) (push) Has been cancelled
Release / build-macos (x64, macos-15-intel) (push) Has been cancelled
Release / release-main (push) Has been cancelled
Release / release-develop (push) Has been cancelled
Add Discord-native hybrid front-end for Jarvis (bot + bridge)
Transform isair/jarvis into a Discord-controlled voice assistant running on
the Ubuntu VNC desktop, keeping the mature ~39k-line Python brain intact.

- bot/ (Node + bun, discord.js): /자비스 slash commands (ephemeral),
  voice channel join + voice receive/playback, pluggable VNC screen broadcast
  (selfbot live / noVNC / screenshot)
- bridge/ (Python, Flask): wraps jarvis STT + run_reply_engine + Piper TTS
  behind a thin localhost HTTP API
- .env.example, scripts/ (start_bridge/start_bot/dev), README rewrite,
  docs/language-comparison.md and docs/vnc-xfce-setup.md

Language decision: hybrid (Python brain + Node/bun Discord layer) because
Discord blocks bot video; native screen broadcast only works via a Node
selfbot library.
2026-06-09 14:51:05 +09:00

630 lines
24 KiB
Python

"""Tests for dialogue memory and diary redaction functionality."""
import pytest
import time
import threading
from unittest.mock import Mock, patch
from datetime import datetime, timezone
from src.jarvis.memory.conversation import (
DialogueMemory,
update_daily_conversation_summary,
update_diary_from_dialogue_memory,
)
from src.jarvis.reply.engine import run_reply_engine
from src.jarvis.utils.redact import redact
@pytest.mark.unit
class TestDialogueMemory:
    """Test dialogue memory conversation flow preservation."""

    def test_add_interaction_basic(self):
        """Test basic interaction storage."""
        dm = DialogueMemory()
        dm.add_interaction("Hello", "Hi there!")

        chunks = dm.get_pending_chunks()

        # One chunk per speaker, each prefixed with its role.
        assert len(chunks) == 2
        assert "User: Hello" in chunks
        assert "Assistant: Hi there!" in chunks

    def test_add_interaction_preserves_order(self):
        """Test that multiple interactions preserve chronological order."""
        dm = DialogueMemory()
        dm.add_interaction("First message", "First response")
        dm.add_interaction("Second message", "Second response")

        chunks = dm.get_pending_chunks()

        assert len(chunks) == 4
        assert chunks[0] == "User: First message"
        assert chunks[1] == "Assistant: First response"
        assert chunks[2] == "User: Second message"
        assert chunks[3] == "Assistant: Second response"

    def test_add_interaction_with_conversation_flow(self):
        """Test storing full conversation flow in user_text."""
        dm = DialogueMemory()
        conversation_flow = (
            "User: london, please\n"
            "Assistant: I'll check London weather\n"
            "User: what's the temperature?\n"
            "Assistant: It's 18°C in London"
        )
        # An empty assistant text is expected to produce no assistant chunk.
        dm.add_interaction(conversation_flow, "")

        chunks = dm.get_pending_chunks()

        assert len(chunks) == 1
        assert chunks[0] == f"User: {conversation_flow}"

    def test_should_update_diary_logic(self):
        """Test diary update timing logic."""
        dm = DialogueMemory(inactivity_timeout=1.0)  # 1 second timeout

        # No interactions yet
        assert not dm.should_update_diary()

        # Add interaction
        dm.add_interaction("Hello", "Hi")
        assert not dm.should_update_diary()  # Too soon

        # Mock time passage. The target value is computed from the real clock
        # before the patch is installed.
        with patch('time.time', return_value=time.time() + 2.0):
            assert dm.should_update_diary()  # Timeout passed

    def test_clear_pending_updates(self):
        """Test clearing pending diary updates."""
        dm = DialogueMemory(inactivity_timeout=0.1)  # Short timeout for testing
        dm.add_interaction("Hello", "Hi")

        # Mock time passage to trigger diary update.
        # NOTE(review): the original nesting of the last two statements was
        # lost; they are kept under the patched clock so the final assertion
        # checks the "nothing pending" path rather than the timeout - confirm.
        with patch('time.time', return_value=time.time() + 1.0):
            assert dm.should_update_diary()

            dm.clear_pending_updates()
            assert not dm.should_update_diary()
class TestReplyEngineDialogueMemory:
    """Test reply engine dialogue memory integration."""

    @staticmethod
    def _make_mock_cfg():
        """Return the mock config object shared by the reply-engine tests."""
        cfg = Mock()
        cfg.ollama_base_url = "http://localhost:11434"
        cfg.ollama_chat_model = "test"
        cfg.voice_debug = False
        cfg.llm_tools_timeout_sec = 8.0
        cfg.llm_embed_timeout_sec = 10.0
        cfg.llm_chat_timeout_sec = 45.0
        cfg.memory_enrichment_max_results = 5
        cfg.location_ip_address = None
        cfg.location_auto_detect = False
        cfg.agentic_max_turns = 8
        return cfg

    # Stacked @patch decorators inject mocks bottom-up: the decorator closest
    # to the function supplies the first mock argument.
    @patch('src.jarvis.reply.engine.chat_with_messages')
    @patch('src.jarvis.reply.engine.extract_text_from_response')
    def test_dialogue_memory_preserves_message_order(self, mock_extract, mock_chat):
        """Test that reply engine stores conversation in correct order."""
        # Mock dependencies
        mock_extract.return_value = "Final response"
        mock_chat.return_value = {"message": {"content": "Final response"}}

        dialogue_memory = DialogueMemory()

        # Run reply engine; only its effect on dialogue memory is inspected.
        run_reply_engine(
            db=Mock(),
            cfg=self._make_mock_cfg(),
            tts=None,
            text="What's the weather in London?",
            dialogue_memory=dialogue_memory,
        )

        # Check that dialogue memory was updated
        chunks = dialogue_memory.get_pending_chunks()
        assert len(chunks) == 2  # Now stores individual messages

        # Check that both messages are stored correctly
        assert "User: What's the weather in London?" in chunks
        assert "Assistant: Final response" in chunks

    @patch('src.jarvis.reply.engine.chat_with_messages')
    @patch('src.jarvis.reply.engine.extract_text_from_response')
    @patch('src.jarvis.reply.engine.run_tool_with_retries')
    def test_dialogue_memory_filters_tool_calls(self, mock_tool, mock_extract, mock_chat):
        """Test that JSON tool calls are filtered from dialogue memory."""
        # Mock dependencies
        mock_tool.return_value = Mock(reply_text="Weather data", error_message=None)

        # Mock multi-turn conversation: structured tool call then final response
        mock_chat.side_effect = [
            {
                "message": {
                    "content": "",
                    "tool_calls": [{
                        "id": "call_12345",
                        "function": {
                            "name": "webSearch",
                            "arguments": {"query": "London weather"},
                        },
                    }],
                }
            },
            {"message": {"content": "It's sunny in London today!"}},
        ]
        mock_extract.side_effect = [
            "",  # Empty content for tool call
            "It's sunny in London today!",
        ]

        dialogue_memory = DialogueMemory()

        # Run reply engine; only its effect on dialogue memory is inspected.
        run_reply_engine(
            db=Mock(),
            cfg=self._make_mock_cfg(),
            tts=None,
            text="What's the weather in London?",
            dialogue_memory=dialogue_memory,
        )

        # Check that dialogue memory was updated
        chunks = dialogue_memory.get_pending_chunks()
        assert len(chunks) == 2  # User message and assistant response stored separately

        # Should include user input and final response
        assert "User: What's the weather in London?" in chunks
        assert "Assistant: It's sunny in London today!" in chunks

        # Should NOT include the tool call
        for chunk in chunks:
            assert 'call_12345' not in chunk
class TestDiaryRedaction:
    """Test diary redaction functionality."""

    @staticmethod
    def _run_diary_update(chunks):
        """Call update_daily_conversation_summary for *chunks* against a mock DB.

        The mock DB reports no existing summary and returns 1 from the upsert.
        The function's return value is not needed by the tests and is discarded.
        """
        mock_db = Mock()
        mock_db.get_conversation_summary.return_value = None
        mock_db.upsert_conversation_summary.return_value = 1

        update_daily_conversation_summary(
            db=mock_db,
            new_chunks=chunks,
            ollama_base_url="http://localhost:11434",
            ollama_chat_model="test",
            ollama_embed_model="test",
            source_app="test",
        )

    def test_redact_sensitive_info(self):
        """Test that sensitive information is properly redacted."""
        sensitive_text = "My email is user@example.com and my apikey: sk-abcd1234567890abcdef"

        redacted = redact(sensitive_text)

        assert "[REDACTED_EMAIL]" in redacted
        assert "[REDACTED]" in redacted  # API key pattern uses different format
        assert "user@example.com" not in redacted
        assert "sk-abcd1234567890abcdef" not in redacted

    @patch('src.jarvis.memory.conversation.generate_conversation_summary')
    def test_diary_update_redacts_chunks(self, mock_summary):
        """Test that diary updates redact sensitive information from chunks."""
        # Mock summary generation
        mock_summary.return_value = ("Daily summary", ["topic1", "topic2"])

        # Create chunks with sensitive information
        sensitive_chunks = [
            "User: My email is sensitive@example.com",
            "Assistant: I'll help you with that",
            "User: Here's my apikey: sk-abcdef123456",
        ]

        self._run_diary_update(sensitive_chunks)

        # Verify summary was called with redacted chunks
        mock_summary.assert_called_once()
        redacted_chunks = mock_summary.call_args[0][0]  # First positional argument

        # Check that sensitive info was redacted
        redacted_text = " ".join(redacted_chunks)
        assert "[REDACTED_EMAIL]" in redacted_text
        assert "[REDACTED]" in redacted_text  # API key pattern uses different format
        assert "sensitive@example.com" not in redacted_text
        assert "sk-abcdef123456" not in redacted_text

    @patch('src.jarvis.memory.conversation.generate_conversation_summary')
    def test_diary_update_preserves_conversation_flow(self, mock_summary):
        """Test that diary updates preserve conversation order after redaction."""
        # Mock summary generation
        mock_summary.return_value = ("Daily summary", ["topic1", "topic2"])

        # Create ordered conversation chunks
        chunks = [
            "User: Hello there",
            "Assistant: Hi! How can I help?",
            "User: What's the weather?",
            "Assistant: Let me check for you",
        ]

        self._run_diary_update(chunks)

        # Verify summary was called with chunks in correct order
        mock_summary.assert_called_once()
        processed_chunks = mock_summary.call_args[0][0]  # First positional argument
        assert len(processed_chunks) == 4
        assert processed_chunks[0] == "User: Hello there"
        assert processed_chunks[1] == "Assistant: Hi! How can I help?"
        assert processed_chunks[2] == "User: What's the weather?"
        assert processed_chunks[3] == "Assistant: Let me check for you"
class TestDialogueMemoryIntegration:
    """Integration tests for dialogue memory with redaction."""

    def test_full_flow_with_sensitive_data(self):
        """Test complete flow from dialogue memory to redacted diary."""
        # Create dialogue memory with sensitive information
        dm = DialogueMemory()
        sensitive_conversation = (
            "User: My email is test@example.com\n"
            "Assistant: I can help with that\n"
            "User: Here's my apikey: sk-1234567890\n"
            "Assistant: Thanks, I'll process that securely"
        )
        dm.add_interaction(sensitive_conversation, "")

        # Get chunks (should contain sensitive info)
        chunks = dm.get_pending_chunks()
        assert len(chunks) == 1
        chunk_content = chunks[0]
        assert "test@example.com" in chunk_content
        assert "sk-1234567890" in chunk_content

        # Simulate diary update redaction using the module-level `redact` import.
        redacted_chunks = [redact(chunk) for chunk in chunks]
        redacted_content = redacted_chunks[0]

        # Verify redaction worked
        assert "[REDACTED_EMAIL]" in redacted_content
        assert "[REDACTED]" in redacted_content  # API key pattern uses different format
        assert "test@example.com" not in redacted_content
        assert "sk-1234567890" not in redacted_content

        # Verify conversation flow is preserved
        assert "User: My email is [REDACTED_EMAIL]" in redacted_content
        assert "Assistant: I can help with that" in redacted_content
        assert "apikey=[REDACTED]" in redacted_content
        assert "Assistant: Thanks, I'll process that securely" in redacted_content
@pytest.mark.unit
class TestDialogueMemoryEdgeCases:
    """Test edge cases for dialogue memory thread safety and long conversations."""

    def test_thread_safety_concurrent_add_and_read(self):
        """Test that concurrent add and read operations don't cause race conditions."""
        dm = DialogueMemory()
        errors = []
        iterations = 100

        # Exceptions raised inside a worker thread would not fail the test on
        # their own, so each worker records them for the final assertion.
        def add_messages():
            for i in range(iterations):
                try:
                    dm.add_message("user", f"Message {i}")
                except Exception as e:
                    errors.append(f"add_message error: {e}")

        def read_messages():
            for _ in range(iterations):
                try:
                    dm.get_recent_messages()
                    dm.get_pending_chunks()
                    dm.has_recent_messages()
                except Exception as e:
                    errors.append(f"read error: {e}")

        # Run concurrent operations
        threads = [
            threading.Thread(target=add_messages),
            threading.Thread(target=read_messages),
            threading.Thread(target=add_messages),
            threading.Thread(target=read_messages),
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert len(errors) == 0, f"Thread safety errors: {errors}"

    def test_new_message_during_diary_update_not_lost(self):
        """Test that messages added during diary update are not incorrectly marked as saved."""
        dm = DialogueMemory(inactivity_timeout=0.1)

        # Add initial message
        dm.add_message("user", "First message")
        time.sleep(0.01)  # Small delay to ensure different timestamp
        dm.add_message("assistant", "First response")

        # Get current timestamp (simulating what update_diary_from_dialogue_memory does)
        snapshot_timestamp = time.time()

        # Get pending chunks (2 messages)
        chunks_before = dm.get_pending_chunks()
        assert len(chunks_before) == 2

        # Simulate new message arriving during LLM summarization
        time.sleep(0.01)
        dm.add_message("user", "New message during update")

        # Mark saved up to snapshot (not including new message)
        dm.mark_saved_up_to(snapshot_timestamp)

        # New message should still be pending
        chunks_after = dm.get_pending_chunks()
        assert len(chunks_after) == 1
        assert "New message during update" in chunks_after[0]

    def test_mark_saved_up_to_preserves_new_messages(self):
        """Test that mark_saved_up_to only marks messages up to the given timestamp."""
        dm = DialogueMemory()

        # Add messages at different times
        dm.add_message("user", "Old message 1")
        time.sleep(0.05)
        cutoff_time = time.time()
        time.sleep(0.05)
        dm.add_message("user", "New message 2")
        time.sleep(0.05)
        dm.add_message("user", "New message 3")

        # Mark only old messages as saved
        dm.mark_saved_up_to(cutoff_time)

        # New messages should still be pending
        pending = dm.get_pending_chunks()
        assert len(pending) == 2
        assert any("New message 2" in chunk for chunk in pending)
        assert any("New message 3" in chunk for chunk in pending)

    def test_long_conversation_forces_diary_update(self):
        """Test that very long conversations force diary update to prevent data loss."""
        dm = DialogueMemory(inactivity_timeout=300.0)  # 5 minute inactivity timeout

        # Add a message and simulate it being old (older than MAX_UNSAVED_AGE_SEC)
        dm.add_message("user", "Old message")

        # Manually adjust the message timestamp to be old
        with dm._lock:
            _, role, content = dm._messages[0]
            # Make it older than MAX_UNSAVED_AGE_SEC (which equals inactivity_timeout)
            old_ts = time.time() - (dm.MAX_UNSAVED_AGE_SEC + 60)
            dm._messages[0] = (old_ts, role, content)

        # Should trigger diary update even though user is "active" (recent _last_activity_time).
        # Checked after releasing the lock so a public method can acquire it.
        assert dm.should_update_diary()

    def test_long_conversation_does_not_force_if_recent(self):
        """Test that recent messages don't trigger forced diary update."""
        dm = DialogueMemory(inactivity_timeout=300.0)

        # Add a recent message
        dm.add_message("user", "Recent message")

        # Should not trigger diary update (not inactive and not too old)
        assert not dm.should_update_diary()

    def test_cleanup_removes_old_saved_messages(self):
        """Test that old saved messages are cleaned up from memory."""
        dm = DialogueMemory()

        # Add messages
        dm.add_message("user", "Message 1")
        time.sleep(0.01)
        dm.add_message("user", "Message 2")

        # Mark all as saved
        dm.clear_pending_updates()

        # Manually make messages old (beyond RECENT_WINDOW_SEC)
        with dm._lock:
            old_ts = time.time() - (dm.RECENT_WINDOW_SEC + 60)
            dm._messages = [
                (old_ts, role, content) for _, role, content in dm._messages
            ]

        # NOTE(review): the original nesting was lost; the cleanup is invoked
        # after releasing the lock so it works whether or not the helper
        # acquires the lock itself - confirm against DialogueMemory.
        dm._cleanup_old_messages()

        # Old saved messages should be removed
        assert len(dm._messages) == 0

    def test_cleanup_keeps_unsaved_old_messages(self):
        """Test that old unsaved messages are NOT cleaned up (needed for diary)."""
        dm = DialogueMemory()

        # Add messages
        dm.add_message("user", "Unsaved message")

        # Manually make message old but don't mark as saved
        with dm._lock:
            old_ts = time.time() - (dm.RECENT_WINDOW_SEC + 60)
            dm._messages = [
                (old_ts, role, content) for _, role, content in dm._messages
            ]

        # NOTE(review): called after releasing the lock; see the note in
        # test_cleanup_removes_old_saved_messages - confirm.
        dm._cleanup_old_messages()

        # Old unsaved messages should still exist (needed for diary update)
        assert len(dm._messages) == 1

    def test_has_pending_chunks(self):
        """Test has_pending_chunks method."""
        dm = DialogueMemory()

        # No messages yet
        assert not dm.has_pending_chunks()

        # Add message
        dm.add_message("user", "Hello")
        assert dm.has_pending_chunks()

        # Mark as saved
        dm.clear_pending_updates()
        assert not dm.has_pending_chunks()

    def test_should_update_diary_returns_false_when_no_pending(self):
        """Test that should_update_diary returns False when no pending chunks."""
        dm = DialogueMemory(inactivity_timeout=0.1)

        # No messages
        assert not dm.should_update_diary()

        # Add and save messages
        dm.add_message("user", "Hello")
        dm.clear_pending_updates()

        # Even after timeout, should return False if no pending
        time.sleep(0.15)
        assert not dm.should_update_diary()

    def test_get_pending_chunks_with_snapshot_empty(self):
        """Snapshot on a fresh DialogueMemory returns empty chunks and zero timestamp."""
        dm = DialogueMemory()

        chunks, ts = dm.get_pending_chunks_with_snapshot()

        assert chunks == []
        assert ts == 0.0

    def test_get_pending_chunks_with_snapshot_returns_unsaved_messages(self):
        """Snapshot returns chunks for unsaved messages in role.title() format."""
        dm = DialogueMemory()
        dm.add_message("user", "Hello")
        dm.add_message("assistant", "Hi there")

        chunks, _ = dm.get_pending_chunks_with_snapshot()

        assert len(chunks) == 2
        assert chunks[0] == "User: Hello"
        assert chunks[1] == "Assistant: Hi there"

    def test_get_pending_chunks_with_snapshot_excludes_saved_messages(self):
        """Snapshot excludes messages already marked as saved."""
        dm = DialogueMemory()
        dm.add_message("user", "Old message")
        dm.clear_pending_updates()
        dm.add_message("user", "New message")

        chunks, _ = dm.get_pending_chunks_with_snapshot()

        assert len(chunks) == 1
        assert "New message" in chunks[0]

    def test_get_pending_chunks_with_snapshot_monotonicity(self):
        """Snapshot timestamp is strictly less than any message added afterwards."""
        dm = DialogueMemory()
        dm.add_message("user", "Before snapshot")
        _, snapshot_ts = dm.get_pending_chunks_with_snapshot()

        dm.add_message("user", "After snapshot")

        # The message added after the snapshot must have a strictly greater timestamp.
        after_ts = dm._messages[-1][0]
        assert after_ts > snapshot_ts

    def test_get_pending_chunks_with_snapshot_consistent_with_get_pending_chunks(self):
        """get_pending_chunks() is consistent with get_pending_chunks_with_snapshot()."""
        dm = DialogueMemory()
        dm.add_message("user", "Hello")
        dm.add_message("assistant", "World")

        chunks_simple = dm.get_pending_chunks()
        chunks_snapshot, _ = dm.get_pending_chunks_with_snapshot()

        assert chunks_simple == chunks_snapshot

    @patch('src.jarvis.memory.conversation.update_daily_conversation_summary')
    def test_update_diary_preserves_new_messages_during_slow_llm(self, mock_summary):
        """Integration test: messages arriving during slow LLM call are preserved."""
        dm = DialogueMemory(inactivity_timeout=0.1)
        mock_db = Mock()

        # Add initial messages
        dm.add_message("user", "Initial message")
        dm.add_message("assistant", "Initial response")

        # Stand-in for the summarization call: a new user message arrives
        # while it is "running".
        def slow_summary(*args, **kwargs):
            dm.add_message("user", "Message during LLM call")
            return 123  # Return summary ID

        # A side_effect function's return value is what the mock returns, so
        # no separate return_value is configured.
        mock_summary.side_effect = slow_summary

        # Wait for inactivity timeout
        time.sleep(0.15)

        # Run diary update
        result = update_diary_from_dialogue_memory(
            db=mock_db,
            dialogue_memory=dm,
            ollama_base_url="http://localhost",
            ollama_chat_model="test",
            ollama_embed_model="test",
            force=True,
        )
        assert result == 123

        # New message should still be pending
        pending = dm.get_pending_chunks()
        assert len(pending) == 1
        assert "Message during LLM call" in pending[0]
@pytest.mark.unit
class TestDialogueMemoryUnifiedDurations:
    """Check that DialogueMemory derives its durations from inactivity_timeout."""

    def test_recent_window_matches_inactivity_timeout(self):
        """RECENT_WINDOW_SEC mirrors the configured inactivity_timeout."""
        timeout = 300.0
        memory = DialogueMemory(inactivity_timeout=timeout)
        assert memory.RECENT_WINDOW_SEC == timeout

    def test_max_unsaved_age_matches_inactivity_timeout(self):
        """MAX_UNSAVED_AGE_SEC mirrors the configured inactivity_timeout."""
        timeout = 300.0
        memory = DialogueMemory(inactivity_timeout=timeout)
        assert memory.MAX_UNSAVED_AGE_SEC == timeout

    def test_all_durations_unified(self):
        """Both durations equal the configured inactivity_timeout."""
        timeout = 600.0
        memory = DialogueMemory(inactivity_timeout=timeout)
        observed = (memory.RECENT_WINDOW_SEC, memory.MAX_UNSAVED_AGE_SEC)
        assert observed == (timeout, timeout)

    def test_custom_timeout_propagates(self):
        """A non-default timeout drives both durations."""
        timeout = 120.0
        memory = DialogueMemory(inactivity_timeout=timeout)
        observed = (memory.RECENT_WINDOW_SEC, memory.MAX_UNSAVED_AGE_SEC)
        assert observed == (timeout, timeout)