"""LLM provider HTTP adapters.""" from __future__ import annotations import logging from typing import TYPE_CHECKING import httpx if TYPE_CHECKING: from collections.abc import Sequence from python.ebook_search.config import EbookSearchConfig, RerankConfig logger = logging.getLogger(__name__) def auth_headers(api_key: str) -> dict[str, str]: """Build authorization headers when an API key is configured.""" if api_key == "not-needed": return {} return {"Authorization": f"Bearer {api_key}"} def request_embeddings(texts: Sequence[str], config: EbookSearchConfig) -> list[list[float]]: """Request embeddings from the configured OpenAI-compatible endpoint.""" try: response = httpx.post( f"{config.embedding_base_url.rstrip('/')}/embeddings", headers=auth_headers(config.embedding_api_key), json={"model": config.embedding_model, "input": list(texts)}, timeout=60, ) response.raise_for_status() return embedding_vectors_from_response(response.json()) except (httpx.HTTPError, ValueError, KeyError, TypeError) as error: logger.exception( "ebook_embed_request_failed base_url=%s model=%s count=%s", config.embedding_base_url, config.embedding_model, len(texts), ) msg = f"Embedding request failed. base_url={config.embedding_base_url} model={config.embedding_model}" raise RuntimeError(msg) from error def embedding_vectors_from_response(body: object) -> list[list[float]]: """Extract embedding vectors from an OpenAI-compatible embedding response.""" if not isinstance(body, dict): msg = "Embedding response is not an object" raise TypeError(msg) data = body["data"] if not isinstance(data, list): msg = "Embedding response data is not a list" raise TypeError(msg) vectors: list[list[float]] = [] for item in data: if not isinstance(item, dict): msg = "Embedding item is not an object" raise TypeError(msg) embedding = item["embedding"] if not isinstance(embedding, list): msg = "Embedding value is not a list" raise TypeError(msg) vectors.append([float(value) for value in embedding]) return vectors def request_rerank( query: str, documents: Sequence[str], config: RerankConfig, ) -> object | None: """Request rerank scores from the configured vLLM endpoint.""" payload = { "model": config.model, "query": query, "documents": list(documents), } response = httpx.post( f"{config.base_url.rstrip('/')}/rerank", json=payload, timeout=config.timeout_seconds, ) response.raise_for_status() try: return response.json() except ValueError: logger.debug("ebook_rerank_response_invalid_json", extra={"response": response.text}) return None def request_chat_completion( config: EbookSearchConfig, messages: Sequence[dict[str, str]], ) -> str: """Request a chat completion from the configured OpenAI-compatible endpoint.""" try: response = httpx.post( f"{config.vllm_base_url.rstrip('/')}/chat/completions", headers=auth_headers(config.vllm_api_key), json={ "model": config.chat_model, "messages": list(messages), "temperature": 0, }, timeout=60, ) response.raise_for_status() return chat_content_from_response(response.json()) except (httpx.HTTPError, ValueError, KeyError, TypeError) as error: msg = f"Chat request failed. base_url={config.vllm_base_url} model={config.chat_model}" raise RuntimeError(msg) from error def chat_content_from_response(body: object) -> str: """Extract text content from an OpenAI-compatible chat response.""" if not isinstance(body, dict): msg = "Chat response is not an object" raise TypeError(msg) choices = body["choices"] if not isinstance(choices, list) or not choices: msg = "Chat response has no choices" raise ValueError(msg) first = choices[0] if not isinstance(first, dict): msg = "Chat choice is not an object" raise TypeError(msg) message = first["message"] if not isinstance(message, dict): msg = "Chat message is not an object" raise TypeError(msg) content = message.get("content") or "" if not isinstance(content, str): msg = "Chat content is not text" raise TypeError(msg) return content