class ChatHandler:
    """Static facade that queues chat requests and routes them to LLM backends.

    All async requests funnel through the shared ``ChatQueue``; synchronous
    helpers talk to the backend directly. Only the "azure" backend (Azure AI
    Inference ChatCompletions) is currently implemented.
    """

    # Shared queue; all async chat traffic is serialized through it.
    queue = ChatQueue()

    @staticmethod
    async def init():
        """Start the shared request queue. Call once at application startup."""
        logger.debug("Started queue")
        await ChatHandler.queue.start()

    @staticmethod
    async def chat(
        message, context="", backend="azure", model="gpt-4o-mini", stream=False
    ):
        """Submit a chat request to the queue and await its result."""
        return await ChatHandler.queue.submit(message, context, backend, model, stream)

    @staticmethod
    async def dispatch(message, context, backend, model):
        """Route a non-streaming chat request to the correct backend.

        Returns the backend's response text.

        Raises:
            ValueError: if *backend* is not recognized.
        """
        logger.debug(f"Dispatch Chat to backend {backend} with {message}, {context}")
        if backend.lower() == "azure":
            return await ChatHandler._azure_chat(message, context, model)
        raise ValueError(f"Unknown chat backend: {backend}")

    @staticmethod
    async def dispatch_stream(message, context, backend, model):
        """Route a streaming chat request; yields incremental text chunks.

        Raises:
            ValueError: if *backend* is not recognized.
        """
        logger.debug(
            f"Dispatch Chat Stream to backend {backend} with {message}, {context}"
        )
        if backend.lower() == "azure":
            async for chunk in ChatHandler._azure_chat_stream(message, context, model):
                yield chunk
        else:
            raise ValueError(f"Unknown chat backend: {backend}")

    # --- Synchronous helpers for structured message payloads (incl. images) ---
    @staticmethod
    def chat_sync_messages(
        messages: List[dict], backend: str = "azure", model: str = "gpt-4o-mini"
    ) -> str:
        """Synchronously send a list of chat messages (optionally with images) to Azure.

        Raises:
            ValueError: if *backend* is not recognized.
        """
        logger.debug(f"Dispatch synchronous Chat to backend {backend} with {messages}")
        if backend.lower() == "azure":
            return ChatHandler._azure_chat_messages_sync(messages, model)
        raise ValueError(f"Unknown chat backend: {backend}")

    # --- Streaming helpers for structured message payloads (incl. images) ---
    @staticmethod
    def chat_stream_messages_sync(
        messages: List[dict], backend: str = "azure", model: str = "gpt-4o-mini"
    ):
        """Synchronously stream a list of chat messages (optionally with images).

        Returns a generator yielding incremental text chunks from Azure as
        they arrive.

        Raises:
            ValueError: if *backend* is not recognized.
        """
        logger.debug("Stream chat messages synchronously")
        if backend.lower() == "azure":
            return ChatHandler._azure_chat_messages_stream_sync(messages, model)
        raise ValueError(f"Unknown chat backend: {backend}")

    @staticmethod
    def _part_text(part):
        """Extract text from one content part (str, dict, or SDK object), or None."""
        if isinstance(part, str):
            return part
        if isinstance(part, dict):
            return part.get("text")
        return getattr(part, "text", None)

    @staticmethod
    def _iter_event_texts(event):
        """Yield every non-empty text fragment carried by one streaming event.

        Tolerates delta as an attribute object or a plain dict, and content
        as a plain string or a list of parts (see ``_part_text``).
        """
        delta = getattr(event, "delta", None)
        if not delta:
            return
        content = getattr(delta, "content", None)
        if content is None and isinstance(delta, dict):
            content = delta.get("content")
        if isinstance(content, str):
            if content:
                yield content
        elif isinstance(content, list):
            for item in content:
                text = ChatHandler._part_text(item)
                if text:
                    yield text

    @staticmethod
    def _azure_chat_messages_stream_sync(messages: List[dict], model: str):
        """Stream structured messages to Azure, yielding text chunks."""
        client = ChatHandler._get_azure_client()
        azure_msgs = ChatHandler._to_azure_messages(messages)
        logger.debug("Azure client stream in chunks")
        stream = client.complete(model=model, messages=azure_msgs, stream=True)
        try:
            for event in stream:
                yield from ChatHandler._iter_event_texts(event)
        finally:
            # Close even if the consumer abandons the generator mid-stream.
            if hasattr(stream, "close"):
                stream.close()

    @staticmethod
    def _azure_chat_messages_sync(messages: List[dict], model: str) -> str:
        """Synchronous Azure chat that supports text and images.

        Accepts our internal message dicts of the form:
            {"role": "system"|"user", "content": str | None, "images": [base64str, ...]?}

        Converts them into Azure AI Inference message objects (see
        ``_to_azure_messages``) and returns the response text.
        """
        client = ChatHandler._get_azure_client()
        logger.debug("Azure client init stream")
        azure_msgs = ChatHandler._to_azure_messages(messages)
        logger.debug(f"Azure message dict {azure_msgs}")
        response = client.complete(model=model, messages=azure_msgs)
        return ChatHandler._extract_azure_text_from_response(response)

    @staticmethod
    def _to_azure_messages(msgs: List[dict]) -> List[object]:
        """Convert internal message dicts to Azure AI Inference message objects.

        Messages without images become SystemMessage/UserMessage with plain
        text. Messages with images become a single UserMessage whose content
        is a mixed list: one text item (when provided) plus one image_url
        item per base64-encoded image.
        """
        azure_messages: List[object] = []
        for m in msgs:
            role = (m.get("role") or "user").lower()
            text_content = m.get("content")
            images = m.get("images") or []
            if not images:
                if role == "system":
                    azure_messages.append(SystemMessage(content=text_content or ""))
                else:
                    azure_messages.append(UserMessage(content=text_content or ""))
                continue
            mixed_content: List[object] = []
            if text_content:
                mixed_content.append({"type": "text", "text": text_content})
            for b64 in images:
                # Defensively skip empty or non-string image entries.
                if not isinstance(b64, str) or not b64:
                    continue
                mixed_content.append(
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{b64}"},
                    }
                )
            # Azure doesn't accept images in system role; downgrade to user
            azure_messages.append(UserMessage(content=mixed_content))
        return azure_messages

    @staticmethod
    def _get_azure_client():
        """Initialize the Azure AI Foundry chat client from environment variables.

        Raises:
            RuntimeError: if either required environment variable is unset.
        """
        endpoint = os.getenv("AZURE_OPENAI_ENDPOINT_LARGE_MODEL")
        api_key = os.getenv("AZURE_OPENAI_KEY")
        if not endpoint or not api_key:
            raise RuntimeError(
                "Missing AZURE_OPENAI_ENDPOINT_LARGE_MODEL or AZURE_OPENAI_KEY environment variables"
            )
        logger.debug("Endpoint and API Key exist")
        return ChatCompletionsClient(
            endpoint=endpoint, credential=AzureKeyCredential(api_key)
        )

    @staticmethod
    async def _azure_chat(message, context, model):
        """Non-streaming chat call against Azure; returns the response text."""
        client = ChatHandler._get_azure_client()
        messages = [
            SystemMessage(content=context or "You are a helpful assistant."),
            UserMessage(content=message),
        ]
        # Run the blocking SDK call in a worker thread, off the event loop.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, lambda: client.complete(model=model, messages=messages)
        )
        logger.debug("Azure chat async works")
        return ChatHandler._extract_azure_text_from_response(response)

    @staticmethod
    async def _azure_chat_stream(message, context, model):
        """Streaming chat call against Azure; yields incremental text chunks."""
        client = ChatHandler._get_azure_client()
        messages = [
            SystemMessage(content=context or "You are a helpful assistant."),
            UserMessage(content=message),
        ]
        loop = asyncio.get_running_loop()

        def open_stream():
            return client.complete(
                model=model,
                messages=messages,
                stream=True,
            )

        # Open the sync stream in a worker thread, then also pull each event
        # in a worker thread so the blocking iteration never stalls the loop.
        stream = await loop.run_in_executor(None, open_stream)
        try:
            sentinel = object()
            iterator = iter(stream)
            while True:
                event = await loop.run_in_executor(None, next, iterator, sentinel)
                if event is sentinel:
                    break
                for text in ChatHandler._iter_event_texts(event):
                    yield text
            logger.debug("Azure chat stream works")
        finally:
            # Close even if the consumer abandons the generator mid-stream.
            if hasattr(stream, "close"):
                stream.close()

    @staticmethod
    def _extract_azure_text_from_response(response) -> str:
        """Extract text content from an Azure ChatCompletions response.

        Tolerates several SDK response shapes: ``choices[0].message.content``
        as a string or a list of content parts, the older
        ``output_message.content[0].text`` shape, and finally falls back to
        ``str(response)``. Broad excepts are deliberate best-effort probing
        of alternative shapes.
        """
        # Try choices-based response (common structure)
        try:
            choice0 = response.choices[0]
            message = getattr(choice0, "message", None) or choice0["message"]
            content = getattr(message, "content", None)
            if content is None and isinstance(message, dict):
                content = message.get("content")
            # content can be str or list of content parts
            if isinstance(content, str):
                return content.strip()
            if isinstance(content, list):
                parts = [
                    text
                    for text in (ChatHandler._part_text(p) for p in content)
                    if text
                ]
                return "".join(parts).strip()
        except Exception:
            pass
        # Fallback older shape: output_message.content[0].text
        try:
            return response.output_message.content[0].text.strip()
        except Exception:
            pass
        # Last resort: stringify the whole response.
        return str(response)