# Source code for pytector.langchain

from __future__ import annotations

from typing import Any, Optional

try:
    from langchain_core.runnables import RunnableConfig, RunnableSerializable
except ImportError as exc:
    raise ImportError(
        "LangChain support is optional. Install it with: pip install pytector[langchain]"
    ) from exc

from pydantic import Field, PrivateAttr

from .detector import PromptInjectionDetector


class PromptInjectionBlockedError(ValueError):
    """Raised when a prompt is blocked by the guard.

    Subclasses ``ValueError``, so callers that already catch ``ValueError``
    around a chain invocation will also catch a blocked prompt.
    """
class PytectorGuard(RunnableSerializable[str, str]):
    """LangChain Runnable that blocks unsafe prompts before downstream steps run.

    For safe inputs the original string is passed through unchanged. For a
    blocked input the guard returns ``fallback_message`` when one is set and
    raises ``PromptInjectionBlockedError`` otherwise.
    """

    # Forwarded to PromptInjectionDetector as ``model_name_or_url``.
    model_name_or_url: str = "deberta"
    # Passed to ``detect_injection`` when the detector is not in Groq mode.
    threshold: float = 0.5
    # Forwarded to PromptInjectionDetector as ``use_groq``.
    use_groq: bool = False
    # Forwarded to PromptInjectionDetector as ``api_key``.
    api_key: Optional[str] = None
    # Forwarded to PromptInjectionDetector as ``groq_model``.
    groq_model: str = "openai/gpt-oss-safeguard-20b"
    # When not None, returned for a blocked prompt instead of raising.
    fallback_message: Optional[str] = None
    # When True, an undetermined API verdict (``None``) blocks the prompt.
    block_on_api_error: bool = True
    # Extra detector constructor arguments; applied last, so they override
    # the values derived from the fields above.
    detector_kwargs: dict[str, Any] = Field(default_factory=dict)

    # Detector instance, created on first use and then reused.
    _detector: Optional[PromptInjectionDetector] = PrivateAttr(default=None)

    def _get_detector(self) -> PromptInjectionDetector:
        """Return the detector, constructing it on the first call."""
        if self._detector is None:
            init_kwargs: dict[str, Any] = {
                "model_name_or_url": self.model_name_or_url,
                "use_groq": self.use_groq,
                "api_key": self.api_key,
                "groq_model": self.groq_model,
            }
            # User-supplied kwargs win over the field-derived defaults.
            init_kwargs.update(self.detector_kwargs)
            self._detector = PromptInjectionDetector(**init_kwargs)
        return self._detector

    def _block(self, message: str) -> str:
        """Handle a blocked prompt.

        Returns:
            ``fallback_message`` when it is not None.

        Raises:
            PromptInjectionBlockedError: With ``message`` when no fallback
                message is configured.
        """
        if self.fallback_message is not None:
            return self.fallback_message
        raise PromptInjectionBlockedError(message)

    def invoke(
        self,
        input: str,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> str:
        """Check ``input`` and pass it through unchanged when it is judged safe.

        Args:
            input: Prompt text to check.
            config: Accepted as part of the ``invoke`` signature; unused.
            **kwargs: Accepted as part of the ``invoke`` signature; unused.

        Returns:
            ``input`` unchanged when it is not blocked, otherwise
            ``fallback_message`` if one is configured.

        Raises:
            TypeError: If ``input`` is not a ``str``.
            PromptInjectionBlockedError: If the prompt is blocked and no
                ``fallback_message`` is configured.
        """
        del config, kwargs  # intentionally ignored
        if not isinstance(input, str):
            raise TypeError(f"PytectorGuard expects string input, got {type(input)!r}.")

        detector = self._get_detector()

        # Read the flag from the detector rather than from self:
        # ``detector_kwargs`` may have overridden ``use_groq``.
        if detector.use_groq:
            is_safe = detector.detect_injection_api(input)
            if is_safe is None:
                # No verdict available; block_on_api_error decides between
                # blocking and letting the prompt through.
                if self.block_on_api_error:
                    return self._block(
                        "Security alert: prompt safety could not be determined due to an API error."
                    )
                return input
            if is_safe is False:
                return self._block("Security alert: unsafe prompt blocked by safeguard model.")
            return input

        is_injection, score = detector.detect_injection(input, threshold=self.threshold)
        if is_injection:
            score_text = "unknown" if score is None else f"{score:.4f}"
            return self._block(
                "Security alert: prompt injection detected "
                f"(score={score_text}, threshold={self.threshold:.4f})."
            )
        return input