Skip to content

Core

Chat handling utilities for async LLM requests.

Provides a simple queued interface (ChatQueue) and a static facade (ChatHandler) to interact with Azure AI Inference models, supporting both regular and streaming responses.

ChatHandler

Source code in app/core/chat_handler.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
class ChatHandler:
    """Static facade for chat requests against Azure AI Inference models.

    Async calls go through a shared ChatQueue; synchronous helpers accept
    structured message payloads that may include base64-encoded images.
    """

    # One queue shared by every caller in the process.
    queue = ChatQueue()

    @staticmethod
    async def init():
        """Start the shared queue's worker tasks. Call once at startup."""
        logger.debug("Started queue")
        await ChatHandler.queue.start()

    @staticmethod
    async def chat(
        message, context="", backend="azure", model="gpt-4o-mini", stream=False
    ):
        """Submit a chat request via the shared queue.

        Returns the response text, or an async generator of text chunks
        when stream=True.
        """
        return await ChatHandler.queue.submit(message, context, backend, model, stream)

    @staticmethod
    async def dispatch(message, context, backend, model):
        """Route chat requests to the correct backend.

        Raises:
            ValueError: if backend is not a known backend name.
        """
        logger.debug(f"Dispatch Chat to backend {backend} with {message}, {context}")
        if backend.lower() == "azure":
            return await ChatHandler._azure_chat(message, context, model)
        else:
            raise ValueError(f"Unknown chat backend: {backend}")

    @staticmethod
    async def dispatch_stream(message, context, backend, model):
        """Route a streaming chat request to the correct backend.

        Yields incremental text chunks.

        Raises:
            ValueError: if backend is not a known backend name.
        """
        logger.debug(
            f"Dispatch Chat Stream to backend {backend} with {message}, {context}"
        )
        if backend.lower() == "azure":
            async for chunk in ChatHandler._azure_chat_stream(message, context, model):
                yield chunk
        else:
            raise ValueError(f"Unknown chat backend: {backend}")

    # --- Synchronous helpers for structured message payloads (incl. images) ---
    @staticmethod
    def chat_sync_messages(
        messages: List[dict], backend: str = "azure", model: str = "gpt-4o-mini"
    ) -> str:
        """Synchronously send a list of chat messages (optionally with images) to Azure."""
        logger.debug(f"Dispatch synchronous Chat to backend {backend} with {messages}")
        if backend.lower() == "azure":
            return ChatHandler._azure_chat_messages_sync(messages, model)
        else:
            raise ValueError(f"Unknown chat backend: {backend}")

    # --- Streaming helpers for structured message payloads (incl. images) ---
    @staticmethod
    def chat_stream_messages_sync(
        messages: List[dict], backend: str = "azure", model: str = "gpt-4o-mini"
    ):
        """Synchronously stream a list of chat messages (optionally with images).

        Yields incremental text chunks from Azure as they arrive.
        """
        logger.debug("Stream chat messages synchronously")
        if backend.lower() == "azure":
            return ChatHandler._azure_chat_messages_stream_sync(messages, model)
        else:
            raise ValueError(f"Unknown chat backend: {backend}")

    # --- Shared helpers for decoding Azure streaming/response payloads ---
    @staticmethod
    def _delta_content(event):
        """Return the content payload of a streaming event's delta, or None.

        Handles both attribute-style SDK deltas and plain-dict deltas.
        """
        if not (hasattr(event, "delta") and event.delta):
            return None
        delta = event.delta
        content = getattr(delta, "content", None)
        if content is None and isinstance(delta, dict):
            content = delta.get("content")
        return content

    @staticmethod
    def _iter_text_parts(content):
        """Yield non-empty text fragments from a content payload.

        content may be a plain string, or a list whose items are strings,
        dicts with a "text" key, or SDK objects with a .text attribute.
        Anything else (including None) yields nothing.
        """
        if isinstance(content, str):
            if content:
                yield content
        elif isinstance(content, list):
            for item in content:
                if isinstance(item, str):
                    if item:
                        yield item
                elif isinstance(item, dict):
                    text = item.get("text")
                    if text:
                        yield text
                else:
                    text = getattr(item, "text", None)
                    if text:
                        yield text

    @staticmethod
    def _azure_chat_messages_stream_sync(messages: List[dict], model: str):
        """Synchronously stream structured messages to Azure, yielding text chunks."""
        client = ChatHandler._get_azure_client()
        azure_msgs = ChatHandler._to_azure_messages(messages)
        logger.debug("Azure client stream in chunks")
        stream = client.complete(model=model, messages=azure_msgs, stream=True)
        try:
            for event in stream:
                for text in ChatHandler._iter_text_parts(
                    ChatHandler._delta_content(event)
                ):
                    yield text
        finally:
            # Release the underlying HTTP response even if the consumer
            # stops iterating early.
            if hasattr(stream, "close"):
                stream.close()

    @staticmethod
    def _azure_chat_messages_sync(messages: List[dict], model: str) -> str:
        """Synchronous Azure chat that supports text and images.

        Accepts our internal message dicts of the form:
          {"role": "system"|"user", "content": str | None, "images": [base64str, ...]?}

        Converts them into Azure AI Inference message objects. For messages that
        include images, constructs a single UserMessage with a mixed content list
        containing a text item (when provided) and one input_image item per image.
        """
        client = ChatHandler._get_azure_client()
        logger.debug("Azure client init stream")
        azure_msgs = ChatHandler._to_azure_messages(messages)
        logger.debug(f"Azure message dict {azure_msgs}")
        response = client.complete(model=model, messages=azure_msgs)
        return ChatHandler._extract_azure_text_from_response(response)

    @staticmethod
    def _to_azure_messages(msgs: List[dict]) -> List[object]:
        """Convert internal message dicts to Azure AI Inference message objects."""
        azure_messages: List[object] = []

        for m in msgs:
            role = (m.get("role") or "user").lower()
            text_content = m.get("content")
            images = m.get("images") or []

            if not images:
                # Plain text message: keep the declared role.
                if role == "system":
                    azure_messages.append(SystemMessage(content=text_content or ""))
                else:
                    azure_messages.append(UserMessage(content=text_content or ""))
                continue

            # Build a mixed content list: optional text item plus one
            # data-URL image item per base64-encoded PNG.
            mixed_content: List[object] = []
            if text_content:
                mixed_content.append({"type": "text", "text": text_content})
            for b64 in images:
                if not isinstance(b64, str) or not b64:
                    continue
                mixed_content.append(
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{b64}"},
                    }
                )

            # Azure doesn't accept images in system role; downgrade to user
            azure_messages.append(UserMessage(content=mixed_content))

        return azure_messages

    @staticmethod
    def _get_azure_client():
        """Initialize Azure AI Foundry chat client.

        Raises:
            RuntimeError: if the required environment variables are missing.
        """
        endpoint = os.getenv("AZURE_OPENAI_ENDPOINT_LARGE_MODEL")
        api_key = os.getenv("AZURE_OPENAI_KEY")
        if not endpoint or not api_key:
            raise RuntimeError(
                "Missing AZURE_OPENAI_ENDPOINT_LARGE_MODEL or AZURE_OPENAI_KEY environment variables"
            )
        logger.debug("Endpoint and API key exist")
        return ChatCompletionsClient(
            endpoint=endpoint, credential=AzureKeyCredential(api_key)
        )

    @staticmethod
    async def _azure_chat(message, context, model):
        """Non-streaming chat call using Azure GPT-4o-mini.

        The blocking SDK call runs in the default executor so the event
        loop is not blocked.
        """
        client = ChatHandler._get_azure_client()
        messages = [
            SystemMessage(content=context or "You are a helpful assistant."),
            UserMessage(content=message),
        ]
        # get_running_loop() is the correct (non-deprecated) call inside a coroutine.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, lambda: client.complete(model=model, messages=messages)
        )
        logger.debug("Azure chat async works")
        return ChatHandler._extract_azure_text_from_response(response)

    @staticmethod
    async def _azure_chat_stream(message, context, model):
        """Streaming chat call using Azure GPT-4o-mini; yields text chunks.

        NOTE(review): only *opening* the stream is offloaded to a thread;
        iterating the sync stream below still runs on the event loop and may
        block it between chunks — confirm whether this matters for callers.
        """
        client = ChatHandler._get_azure_client()
        messages = [
            SystemMessage(content=context or "You are a helpful assistant."),
            UserMessage(content=message),
        ]

        loop = asyncio.get_running_loop()

        def sync_stream():
            return client.complete(
                model=model,
                messages=messages,
                stream=True,
            )

        # Run the sync call in a thread and forward chunks asynchronously
        stream = await loop.run_in_executor(None, sync_stream)
        try:
            for event in stream:
                for text in ChatHandler._iter_text_parts(
                    ChatHandler._delta_content(event)
                ):
                    yield text
            logger.debug("Azure chat stream works")
        finally:
            # Always release the underlying HTTP response, even on early exit.
            if hasattr(stream, "close"):
                stream.close()

    @staticmethod
    def _extract_azure_text_from_response(response) -> str:
        """Extract text content from Azure ChatCompletions response across SDK variants."""
        # Try choices-based response (common structure)
        try:
            choice0 = response.choices[0]
            message = getattr(choice0, "message", None) or choice0["message"]
            content = getattr(message, "content", None)
            if content is None and isinstance(message, dict):
                content = message.get("content")
            # content can be str or list of content parts
            if isinstance(content, (str, list)):
                return "".join(ChatHandler._iter_text_parts(content)).strip()
        except Exception:
            pass
        # Fallback older shape: output_message.content[0].text
        try:
            return response.output_message.content[0].text.strip()
        except Exception:
            pass
        # Last resort: str(response)
        return str(response)

chat_stream_messages_sync(messages, backend='azure', model='gpt-4o-mini') staticmethod

Synchronously stream a list of chat messages (optionally with images).

Yields incremental text chunks from Azure as they arrive.

Source code in app/core/chat_handler.py
129
130
131
132
133
134
135
136
137
138
139
140
141
@staticmethod
def chat_stream_messages_sync(
    messages: List[dict], backend: str = "azure", model: str = "gpt-4o-mini"
):
    """Stream a structured message list (text and/or images) without awaiting.

    Returns a generator that produces text chunks as Azure emits them.
    """
    logger.debug("Stream chat messages synchronously")
    if backend.lower() != "azure":
        raise ValueError(f"Unknown chat backend: {backend}")
    return ChatHandler._azure_chat_messages_stream_sync(messages, model)

chat_sync_messages(messages, backend='azure', model='gpt-4o-mini') staticmethod

Synchronously send a list of chat messages (optionally with images) to Azure.

Source code in app/core/chat_handler.py
117
118
119
120
121
122
123
124
125
126
@staticmethod
def chat_sync_messages(
    messages: List[dict], backend: str = "azure", model: str = "gpt-4o-mini"
) -> str:
    """Send a structured message list (text and/or images) to Azure, blocking until done."""
    logger.debug(f"Dispatch synchronous Chat to backend {backend} with {messages}")
    if backend.lower() != "azure":
        raise ValueError(f"Unknown chat backend: {backend}")
    return ChatHandler._azure_chat_messages_sync(messages, model)

ChatQueue

Async work queue for chat requests, supporting streaming and non-streaming calls.

Source code in app/core/chat_handler.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class ChatQueue:
    """Async work queue for chat requests, supporting streaming and non-streaming calls.

    NOTE(review): submit() dispatches inline under the semaphore and never
    enqueues onto self.queue; the worker() path only serves items put on the
    queue directly by other code — confirm whether it is still needed.
    """

    def __init__(self, num_workers=4):
        # Pending (future, message, context, backend, model, stream,
        # stream_queue) tuples consumed by worker().
        self.queue = asyncio.Queue()
        self.tasks = []
        self.num_workers = num_workers
        # Caps the number of chat requests running concurrently.
        self.semaphore = asyncio.Semaphore(num_workers)

    async def start(self):
        """Spawn num_workers worker tasks draining self.queue."""
        logger.debug("Started Chat task")
        self.tasks = [
            asyncio.create_task(self.worker()) for _ in range(self.num_workers)
        ]

    async def worker(self):
        """Process queued work items forever.

        Streaming items forward chunks to stream_queue (terminated by None);
        non-streaming items resolve the future with the response text.
        """
        while True:
            fut, message, context, backend, model, stream, stream_queue = (
                await self.queue.get()
            )
            try:
                if stream:
                    async for chunk in ChatHandler.dispatch_stream(
                        message, context, backend, model
                    ):
                        await stream_queue.put(chunk)
                    # None sentinel tells the consumer the stream is done.
                    await stream_queue.put(None)
                    fut.set_result(True)
                else:
                    result = await ChatHandler.dispatch(
                        message, context, backend, model
                    )
                    fut.set_result(result)
            except Exception as e:
                if stream:
                    # Surface the error in-band, then terminate the stream.
                    await stream_queue.put(f"[ERROR] {str(e)}")
                    await stream_queue.put(None)
                fut.set_exception(e)
            finally:
                self.queue.task_done()

    async def submit(
        self, message, context="", backend="azure", model="gpt-4o-mini", stream=False
    ):
        """Dispatch a chat request, limited to num_workers concurrent requests.

        Returns the response text, or an async generator of chunks when
        stream=True.
        """
        logger.debug(
            f"Submitted {message} message with backend {backend}, model {model}, stream {stream}"
        )
        if stream:

            async def stream_gen():
                # Hold the concurrency slot while the stream is actually
                # consumed; acquiring it outside the generator would release
                # it as soon as the generator object was returned, before any
                # streaming work had run.
                async with self.semaphore:
                    async for chunk in ChatHandler.dispatch_stream(
                        message, context, backend, model
                    ):
                        yield chunk

            return stream_gen()
        async with self.semaphore:
            return await ChatHandler.dispatch(message, context, backend, model)

render_template(jinja_environment, template_name, **kwargs)

Renders a Jinja2 template with the given arguments.

Source code in app/core/utils.py
 8
 9
10
11
12
13
14
15
16
17
18
19
def render_template(
    jinja_environment: Environment, template_name: str, **kwargs
) -> str:
    """
    Renders a Jinja2 template with the given arguments.

    Best-effort: on any failure the error is logged and an empty string is
    returned instead of propagating the exception.
    """
    try:
        # Template lookup can fail too (e.g. TemplateNotFound); keep it
        # inside the try so load and render failures are handled the same way.
        template = jinja_environment.get_template(template_name)
        return template.render(**kwargs)
    except Exception as e:
        logger.error("Template render failed: %s", str(e))
        return ""

configure_service_logging(service_name)

Set up a RotatingFileHandler for the given service name.

Reads
  • ALFIE_LOG_DIR — directory for log files (default: ./logs)
  • ALFIE_LOG_LEVEL — logging level name (default: INFO)

Creates logs/&lt;service_name&gt;.log with maxBytes=10 MB, backupCount=5. Attaches the handler to the root logger so all module loggers inherit it. Safe to call multiple times; duplicate handlers are not added.

Source code in app/core/logging.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def configure_service_logging(service_name: str) -> None:
    """Set up a RotatingFileHandler for the given service name.

    Reads:
      - ALFIE_LOG_DIR   — directory for log files (default: ./logs)
      - ALFIE_LOG_LEVEL — logging level name (default: INFO)

    Creates logs/<service_name>.log with maxBytes=10 MB, backupCount=5.
    Attaches the handler to the root logger so all module loggers inherit it.
    Safe to call multiple times; duplicate handlers are not added.
    """
    log_dir = os.getenv("ALFIE_LOG_DIR", os.path.join(os.getcwd(), "logs"))
    log_level_name = os.getenv("ALFIE_LOG_LEVEL", "INFO").upper()
    # Unknown level names fall back to INFO rather than raising.
    log_level = getattr(logging, log_level_name, logging.INFO)

    os.makedirs(log_dir, exist_ok=True)
    # Normalise to an absolute path: RotatingFileHandler.baseFilename is
    # stored absolute, so comparing a relative log_file against it would
    # silently re-add duplicate handlers on repeated calls.
    log_file = os.path.abspath(os.path.join(log_dir, f"{service_name}.log"))

    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    root_logger = logging.getLogger()
    root_logger.setLevel(log_level)

    # Avoid adding duplicate handlers if called more than once
    existing_files = {
        h.baseFilename
        for h in root_logger.handlers
        if isinstance(h, RotatingFileHandler)
    }
    if log_file in existing_files:
        return

    file_handler = RotatingFileHandler(
        log_file,
        maxBytes=_TEN_MB,
        backupCount=5,
        encoding="utf-8",
    )
    file_handler.setLevel(log_level)
    file_handler.setFormatter(formatter)
    root_logger.addHandler(file_handler)