fix: close old LLM clients when event loop changes
This fixes a critical bug where old AsyncAzureOpenAI and AsyncAnthropicBedrock clients were not being closed when the event loop changed, causing:

1. Connection pool exhaustion → "couldn't get a connection after 30.00 sec"
2. RuntimeError: Event loop is closed during httpx client cleanup

Root cause: In LLMClient.call(), when the event loop changed, new clients were created but the old clients were never closed, leaking their connections.

Fix:
- Added await client.close() for both openai_client and anthropic_client before creating new instances
- Added comprehensive unit tests to verify proper cleanup

Impact:
- Resolves ~150+ test generation failures (500 errors)
- Fixes event loop closure errors in aiservice logs

Trace IDs affected: 04500fbd-88e0-44e4-8d20-32f6a0dc06cc (and many others)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
parent c2feaf91f0
commit 322d8736c9
2 changed files with 186 additions and 0 deletions
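The commit message above describes a recreate-on-loop-change pattern; here is a minimal standalone sketch of it. DummyAsyncClient and LoopAwareClientHolder are illustrative stand-ins for the provider clients and LLMClient, not this codebase's actual classes.

    from __future__ import annotations

    import asyncio


    class DummyAsyncClient:
        """Illustrative stand-in: the real AsyncAzureOpenAI/AsyncAnthropicBedrock
        clients hold httpx connection pools bound to their creation loop."""

        def __init__(self) -> None:
            self.closed = False

        async def close(self) -> None:
            self.closed = True


    class LoopAwareClientHolder:
        """The recreate-and-close pattern from this commit, sketched standalone."""

        def __init__(self) -> None:
            self.client_loop: asyncio.AbstractEventLoop | None = None
            self.client: DummyAsyncClient | None = None

        async def get_client(self) -> DummyAsyncClient:
            loop = asyncio.get_running_loop()
            if loop is not self.client_loop:
                # Without this close(), the previous client's connections leak
                # and its eventual httpx cleanup hits "Event loop is closed".
                if self.client is not None:
                    await self.client.close()
                self.client_loop = loop
                self.client = DummyAsyncClient()
            return self.client


    holder = LoopAwareClientHolder()
    first = asyncio.run(holder.get_client())   # loop 1 creates a client
    second = asyncio.run(holder.get_client())  # loop 2 closes it, creates a new one
    assert first.closed and second is not first and not second.closed

Each asyncio.run() call creates and closes a fresh event loop, which is exactly the loop churn a Django/ASGI deployment produces across requests.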
@@ -93,6 +93,12 @@ class LLMClient:
         # Recreate provider clients when the event loop changes (stale connections)
         loop = asyncio.get_running_loop()
         if loop is not self.client_loop:
+            # Close old clients to prevent connection leaks and event loop closure errors
+            if self.openai_client is not None:
+                await self.openai_client.close()
+            if self.anthropic_client is not None:
+                await self.anthropic_client.close()
+
             self.client_loop = loop
             self.background_tasks = set()
             self.openai_client = AsyncAzureOpenAI() if has_openai else None
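The new test module below patches both provider client classes and leans on unittest.mock's side_effect-as-factory idiom, so every "construction" yields a distinct, separately assertable instance. A minimal sketch of that idiom:

    from unittest.mock import AsyncMock, MagicMock

    def make_client(*args, **kwargs):
        # A new mock per call, each with an awaitable close(), mirroring how
        # the tests below stub AsyncAzureOpenAI and AsyncAnthropicBedrock.
        mock = MagicMock()
        mock.close = AsyncMock()
        return mock

    factory = MagicMock(side_effect=make_client)
    a, b = factory(), factory()
    assert a is not b                 # fresh instance per construction
    assert factory.call_count == 2    # constructions remain countable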
180 django/aiservice/tests/aiservice/test_llm_client.py (new file)
@@ -0,0 +1,180 @@
+"""Tests for LLM client event loop handling and connection cleanup."""
+
+from __future__ import annotations
+
+import asyncio
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from aiservice.llm import LLMClient
+
+
+class TestLLMClientEventLoopHandling:
+    """Test that LLMClient properly handles event loop changes."""
+
+    @pytest.mark.asyncio
+    async def test_old_clients_are_closed_when_event_loop_changes(self) -> None:
+        """Test that old HTTP clients are closed when switching event loops.
+
+        This test reproduces the bug where old AsyncAzureOpenAI and
+        AsyncAnthropicBedrock clients are not closed when the event loop
+        changes, causing connection leaks and 'Event loop is closed' errors.
+        """
+        from aiservice.llm_models import LLM
+
+        client = LLMClient()
+
+        # Mock the API key check so we can test the cleanup logic
+        with (
+            patch("aiservice.llm.has_openai", True),
+            patch("aiservice.llm.has_anthropic", True),
+            patch("aiservice.llm.AsyncAzureOpenAI") as mock_openai_class,
+            patch("aiservice.llm.AsyncAnthropicBedrock") as mock_anthropic_class,
+        ):
+            # Create mock instances with close methods.
+            # Need a new instance on each call to simulate real client creation.
+            def create_openai_mock(*args, **kwargs):
+                mock = MagicMock()
+                mock.close = AsyncMock()
+                mock.chat = MagicMock()
+                mock.chat.completions = MagicMock()
+                mock.chat.completions.create = AsyncMock(
+                    return_value=MagicMock(
+                        choices=[MagicMock(message=MagicMock(content="test"))],
+                        usage=MagicMock(prompt_tokens=10, completion_tokens=20),
+                    )
+                )
+                return mock
+
+            def create_anthropic_mock(*args, **kwargs):
+                mock = MagicMock()
+                mock.close = AsyncMock()
+                return mock
+
+            mock_openai_class.side_effect = create_openai_mock
+            mock_anthropic_class.side_effect = create_anthropic_mock
+
+            # Make first call in event loop 1
+            test_llm = LLM(
+                name="gpt-4.1",
+                model_type="openai",
+                input_cost=2.0,
+                output_cost=8.0,
+                cached_input_cost=None,
+            )
+
+            await client.call(
+                llm=test_llm,
+                messages=[{"role": "user", "content": "test"}],
+                call_type="test",
+                trace_id="test-trace-1",
+            )
+
+            # Save references to the first clients
+            first_openai_client = client.openai_client
+            first_anthropic_client = client.anthropic_client
+            first_loop = client.client_loop
+
+            assert first_openai_client is not None
+            assert first_anthropic_client is not None
+            assert first_loop == asyncio.get_running_loop()
+
+            # Simulate an event loop change by creating a new loop and running in it.
+            # In Django/ASGI, this happens when requests are handled by different workers.
+            def make_call_in_new_loop():
+                async def inner():
+                    # Create a fresh LLM object in the new loop
+                    new_llm = LLM(
+                        name="gpt-4.1",
+                        model_type="openai",
+                        input_cost=2.0,
+                        output_cost=8.0,
+                        cached_input_cost=None,
+                    )
+                    await client.call(
+                        llm=new_llm,
+                        messages=[{"role": "user", "content": "test2"}],
+                        call_type="test",
+                        trace_id="test-trace-2",
+                    )
+
+                new_loop = asyncio.new_event_loop()
+                try:
+                    new_loop.run_until_complete(inner())
+                finally:
+                    new_loop.close()
+
+            # Make the call in the new event loop
+            await asyncio.to_thread(make_call_in_new_loop)
+
+            # Check that the old clients were closed.
+            # THIS WILL FAIL with the buggy code - old clients are NOT closed.
+            first_openai_client.close.assert_called_once()
+            first_anthropic_client.close.assert_called_once()
+
+            # Verify new clients were created
+            assert client.openai_client is not first_openai_client
+            assert client.anthropic_client is not first_anthropic_client
+            assert client.client_loop != first_loop
+
+    @pytest.mark.asyncio
+    async def test_clients_are_not_recreated_in_same_event_loop(self) -> None:
+        """Test that clients are reused when called in the same event loop."""
+        from aiservice.llm_models import LLM
+
+        client = LLMClient()
+
+        with (
+            patch("aiservice.llm.has_openai", True),
+            patch("aiservice.llm.has_anthropic", True),
+            patch("aiservice.llm.AsyncAzureOpenAI") as mock_openai_class,
+            patch("aiservice.llm.AsyncAnthropicBedrock") as mock_anthropic_class,
+        ):
+            mock_openai_instance = MagicMock()
+            mock_openai_instance.chat = MagicMock()
+            mock_openai_instance.chat.completions = MagicMock()
+            mock_openai_instance.chat.completions.create = AsyncMock(
+                return_value=MagicMock(
+                    choices=[MagicMock(message=MagicMock(content="test"))],
+                    usage=MagicMock(prompt_tokens=10, completion_tokens=20),
+                )
+            )
+
+            mock_openai_class.return_value = mock_openai_instance
+            mock_anthropic_class.return_value = MagicMock()
+
+            test_llm = LLM(
+                name="gpt-4.1",
+                model_type="openai",
+                input_cost=2.0,
+                output_cost=8.0,
+                cached_input_cost=None,
+            )
+
+            # Make first call
+            await client.call(
+                llm=test_llm,
+                messages=[{"role": "user", "content": "test1"}],
+                call_type="test",
+                trace_id="test-trace-1",
+            )
+
+            first_openai_client = client.openai_client
+            first_anthropic_client = client.anthropic_client
+
+            # Make second call in the same event loop
+            await client.call(
+                llm=test_llm,
+                messages=[{"role": "user", "content": "test2"}],
+                call_type="test",
+                trace_id="test-trace-2",
+            )
+
+            # Clients should be the same instances (not recreated)
+            assert client.openai_client is first_openai_client
+            assert client.anthropic_client is first_anthropic_client
+
+            # Constructors should only be called once
+            assert mock_openai_class.call_count == 1
+            assert mock_anthropic_class.call_count == 1
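A note on the loop-switching trick in the first test: asyncio.to_thread runs a plain function in a worker thread, where a brand-new loop can be created and driven to completion, so code under test observes a different asyncio.get_running_loop() than pytest's. A distilled sketch, with run_in_fresh_loop and which_loop as hypothetical helper names:

    import asyncio

    async def which_loop():
        return asyncio.get_running_loop()

    async def run_in_fresh_loop(coro_factory):
        # Same trick as make_call_in_new_loop above: a worker thread hosts a
        # brand-new event loop, distinct from the caller's running loop.
        def runner():
            loop = asyncio.new_event_loop()
            try:
                return loop.run_until_complete(coro_factory())
            finally:
                loop.close()
        return await asyncio.to_thread(runner)

    async def main():
        assert await run_in_fresh_loop(which_loop) is not asyncio.get_running_loop()

    asyncio.run(main())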