Skip to content

trulens.providers.openai.endpoint

trulens.providers.openai.endpoint

Dev Notes

This class makes use of langchain's cost tracking for openai models. Changes to the involved classes will need to be adapted here. The important classes are:

  • langchain.schema.LLMResult
  • langchain.callbacks.openai_info.OpenAICallbackHandler

Changes for openai 1.0

  • Previously we instrumented classes openai.* and their methods create and acreate. Now we instrument classes openai.resources.* and their create methods. We also instrument openai.resources.chat.* and their create. To be determined is the instrumentation of the other classes/modules under openai.resources.

  • openai methods produce structured data instead of dicts now. langchain expects dicts so we convert them to dicts.

Classes

OpenAIClient

Bases: SerialModel

A wrapper for openai clients.

This class allows wrapped clients to be serialized into json. Does not serialize API key though. You can access openai.OpenAI under the client attribute. Any attributes not defined by this wrapper are looked up from the wrapped client so you should be able to use this instance as if it were an openai.OpenAI instance.

Attributes
REDACTED_KEYS class-attribute
REDACTED_KEYS: List[str] = ['api_key', 'default_headers']

Parameters of the OpenAI client that will not be serialized because they contain secrets.

client class-attribute instance-attribute
client: Union[OpenAI, AzureOpenAI] = Field(exclude=True)

Deserialized representation.

client_cls instance-attribute
client_cls: Class

Serialized representation class.

client_kwargs instance-attribute
client_kwargs: dict

Serialized representation constructor arguments.

Functions
__rich_repr__
__rich_repr__() -> Result

Requirement for pretty printing using the rich package.

OpenAICallback

Bases: EndpointCallback

Attributes
endpoint class-attribute instance-attribute
endpoint: Endpoint = Field(exclude=True)

The endpoint owning this callback.

cost class-attribute instance-attribute
cost: Cost = Field(default_factory=Cost)

Costs tracked by this callback.

Functions
__rich_repr__
__rich_repr__() -> Result

Requirement for pretty printing using the rich package.

handle
handle(response: Any) -> None

Called after each request.

handle_chunk
handle_chunk(response: Any) -> None

Called after receiving a chunk from a request.

handle_classification
handle_classification(response: Any) -> None

Called after each classification response.

OpenAIEndpoint

Bases: Endpoint

OpenAI endpoint.

Instruments "create" methods in openai client.

PARAMETER DESCRIPTION
client

openai client to use. If not provided, a new client will be created using the provided kwargs.

TYPE: Optional[Union[OpenAI, AzureOpenAI, OpenAIClient]] DEFAULT: None

**kwargs

arguments to constructor of a new OpenAI client if client not provided.

TYPE: dict DEFAULT: {}

Attributes
tru_class_info instance-attribute
tru_class_info: Class

Class information of this pydantic object for use in deserialization.

Using this odd key to not pollute attribute names in whatever class we mix this into. Should be the same as CLASS_INFO.

instrumented_methods class-attribute
instrumented_methods: Dict[
    Any, List[Tuple[Callable, Callable, Type[Endpoint]]]
] = defaultdict(list)

Mapping of classes/module-methods that have been instrumented for cost tracking along with the wrapper methods and the class that instrumented them.

Key is the class or module owning the instrumented method. Tuple value has:

  • original function,

  • wrapped version,

  • endpoint that did the wrapping.

name instance-attribute
name: str

API/endpoint name.

rpm class-attribute instance-attribute

Requests per minute.

retries class-attribute instance-attribute
retries: int = 3

Retries (if performing requests using this class).

post_headers class-attribute instance-attribute
post_headers: Dict[str, str] = Field(
    default_factory=dict, exclude=True
)

Optional post headers for post requests if done by this class.

pace class-attribute instance-attribute
pace: Pace = Field(
    default_factory=lambda: Pace(
        marks_per_second=DEFAULT_RPM / 60.0,
        seconds_per_period=60.0,
    ),
    exclude=True,
)

Pacing instance to maintain a desired rpm.

global_callback class-attribute instance-attribute
global_callback: EndpointCallback = Field(exclude=True)

Track costs not run inside "track_cost" here.

Also note that Endpoints are singletons (one for each unique name argument) hence this global callback will track all requests for the named api even if you try to create multiple endpoints (with the same name).

callback_class class-attribute instance-attribute
callback_class: Type[EndpointCallback] = Field(exclude=True)

Callback class to use for usage tracking.

callback_name class-attribute instance-attribute
callback_name: str = Field(exclude=True)

Name of variable that stores the callback noted above.

Classes
EndpointSetup dataclass

Class for storing supported endpoint information.

See track_all_costs for usage.

Functions
get_instances classmethod
get_instances() -> Generator[InstanceRefMixin]

Get all instances of the class.

__rich_repr__
__rich_repr__() -> Result

Requirement for pretty printing using the rich package.

load staticmethod
load(obj, *args, **kwargs)

Deserialize/load this object using the class information in tru_class_info to lookup the actual class that will do the deserialization.

model_validate classmethod
model_validate(*args, **kwargs) -> Any

Deserialized a jsonized version of the app into the instance of the class it was serialized from.

Note

This process uses extra information stored in the jsonized object and handled by WithClassInfo.

pace_me
pace_me() -> float

Block until we can make a request to this endpoint to keep pace with maximum rpm. Returns time in seconds since last call to this method returned.

run_in_pace
run_in_pace(
    func: Callable[[A], B], *args, **kwargs
) -> B

Run the given func on the given args and kwargs at pace with the endpoint-specified rpm. Failures will be retried self.retries times.

run_me
run_me(thunk: Thunk[T]) -> T

DEPRECATED: Run the given thunk, returning itse output, on pace with the api. Retries request multiple times if self.retries > 0.

DEPRECATED: Use run_in_pace instead.

print_instrumented classmethod
print_instrumented()

Print out all of the methods that have been instrumented for cost tracking. This is organized by the classes/modules containing them.

track_all_costs staticmethod
track_all_costs(
    __func: CallableMaybeAwaitable[A, T],
    *args,
    with_openai: bool = True,
    with_hugs: bool = True,
    with_litellm: bool = True,
    with_bedrock: bool = True,
    with_cortex: bool = True,
    with_dummy: bool = True,
    **kwargs
) -> Tuple[T, Sequence[EndpointCallback]]

Track costs of all of the apis we can currently track, over the execution of thunk.

track_all_costs_tally staticmethod
track_all_costs_tally(
    __func: CallableMaybeAwaitable[A, T],
    *args,
    with_openai: bool = True,
    with_hugs: bool = True,
    with_litellm: bool = True,
    with_bedrock: bool = True,
    with_cortex: bool = True,
    with_dummy: bool = True,
    **kwargs
) -> Tuple[T, Thunk[Cost]]

Track costs of all of the apis we can currently track, over the execution of thunk.

RETURNS DESCRIPTION
T

Result of evaluating the thunk.

TYPE: T

Thunk[Cost]

Thunk[Cost]: A thunk that returns the total cost of all callbacks that tracked costs. This is a thunk as the costs might change after this method returns in case of Awaitable results.

track_cost
track_cost(
    __func: CallableMaybeAwaitable[..., T], *args, **kwargs
) -> Tuple[T, EndpointCallback]

Tally only the usage performed within the execution of the given thunk.

Returns the thunk's result alongside the EndpointCallback object that includes the usage information.

wrap_function
wrap_function(func)

Create a wrapper of the given function to perform cost tracking.