From cca9a9e82d925009c8460e3faf8413d1e8cd677f Mon Sep 17 00:00:00 2001
From: James Rochabrun
Date: Tue, 4 Jun 2024 22:32:23 -0700
Subject: [PATCH] Decoding run stream events (#46)

* Getting stream run events

* typo
---
 ...AssistantThreadConfigurationProvider.swift |  2 ++
 .../Assistants/AssistantStreamEvent.swift     | 16 +++++-----
 .../AssistantStreamEventObject.swift          |  4 +++
 .../OpenAI/Public/Service/OpenAIService.swift | 31 ++++++++++++++++++-
 4 files changed, 44 insertions(+), 9 deletions(-)

diff --git a/Examples/SwiftOpenAIExample/SwiftOpenAIExample/Assistants/AssistantThreadConfigurationProvider.swift b/Examples/SwiftOpenAIExample/SwiftOpenAIExample/Assistants/AssistantThreadConfigurationProvider.swift
index 2a8712b..bd16bac 100644
--- a/Examples/SwiftOpenAIExample/SwiftOpenAIExample/Assistants/AssistantThreadConfigurationProvider.swift
+++ b/Examples/SwiftOpenAIExample/SwiftOpenAIExample/Assistants/AssistantThreadConfigurationProvider.swift
@@ -79,6 +79,8 @@ import SwiftOpenAI
          case nil:
             print("PROVIDER: tool call nil")
          }
+      case .threadRunCompleted(let runObject):
+         print("PROVIDER: the run is completed - \(runObject)")
       default: break
       }
    }
diff --git a/Sources/OpenAI/Public/ResponseModels/Assistants/AssistantStreamEvent.swift b/Sources/OpenAI/Public/ResponseModels/Assistants/AssistantStreamEvent.swift
index 376cab2..39d7bb1 100644
--- a/Sources/OpenAI/Public/ResponseModels/Assistants/AssistantStreamEvent.swift
+++ b/Sources/OpenAI/Public/ResponseModels/Assistants/AssistantStreamEvent.swift
@@ -20,35 +20,35 @@ public enum AssistantStreamEvent {

    /// Occurs when a run moves to a queued status.
    /// - data is a run
-   case threadRunQueued
+   case threadRunQueued(RunObject)

    /// Occurs when a run moves to an in_progress status.
    /// - data is a run
-   case threadRunInProgress
+   case threadRunInProgress(RunObject)

    /// Occurs when a run moves to a requires_action status.
    /// - data is a run
-   case threadRunRequiresAction
+   case threadRunRequiresAction(RunObject)

    /// Occurs when a run is completed.
    /// - data is a run
-   case threadRunCompleted
+   case threadRunCompleted(RunObject)

    /// Occurs when a run fails.
    /// - data is a run
-   case threadRunFailed
+   case threadRunFailed(RunObject)

    /// Occurs when a run moves to a cancelling status.
    /// - data is a run
-   case threadRunCancelling
+   case threadRunCancelling(RunObject)

    /// Occurs when a run is cancelled.
    /// - data is a run
-   case threadRunCancelled
+   case threadRunCancelled(RunObject)

    /// Occurs when a run expires.
    /// - data is a run
-   case threadRunExpired
+   case threadRunExpired(RunObject)

    /// Occurs when a run step is created.
    /// - data is a run step
diff --git a/Sources/OpenAI/Public/ResponseModels/Assistants/AssistantStreamEventObject.swift b/Sources/OpenAI/Public/ResponseModels/Assistants/AssistantStreamEventObject.swift
index 3c5bfaf..d0fbb25 100644
--- a/Sources/OpenAI/Public/ResponseModels/Assistants/AssistantStreamEventObject.swift
+++ b/Sources/OpenAI/Public/ResponseModels/Assistants/AssistantStreamEventObject.swift
@@ -15,6 +15,10 @@ public enum AssistantStreamEventObject: String {
    /// - data is a [thread](https://platform.openai.com/docs/api-reference/threads/object)
    case threadCreated = "thread.created"

+   /// Occurs during the life cycle of a run.
+   /// - data is a [run](https://platform.openai.com/docs/api-reference/runs/object)
+   case threadRun = "thread.run"
+
    /// Occurs when a new run is created.
   /// - data is a [run](https://platform.openai.com/docs/api-reference/runs/object)
   case threadRunCreated = "thread.run.created"

diff --git a/Sources/OpenAI/Public/Service/OpenAIService.swift b/Sources/OpenAI/Public/Service/OpenAIService.swift
index 136956b..6835fbb 100644
--- a/Sources/OpenAI/Public/Service/OpenAIService.swift
+++ b/Sources/OpenAI/Public/Service/OpenAIService.swift
@@ -1180,6 +1180,35 @@ extension OpenAIService {
             case .threadRunStepDelta:
                let decoded = try self.decoder.decode(RunStepDeltaObject.self, from: data)
                continuation.yield(.threadRunStepDelta(decoded))
+            case .threadRun:
+               // We expect an object of type "thread.run.SOME_STATE" in the data payload.
+               // However, what we actually receive is a `thread.run` object, so we check the status
+               // of the decoded run to determine the stream event.
+               // (The event line, by contrast, does contain the expected value, e.g. "event: thread.run.step.completed".)
+               // Therefore we map the decoded run's status to the corresponding stream event below.
+               let decoded = try self.decoder.decode(RunObject.self, from: data)
+               switch RunObject.Status(rawValue: decoded.status) {
+               case .queued:
+                  continuation.yield(.threadRunQueued(decoded))
+               case .inProgress:
+                  continuation.yield(.threadRunInProgress(decoded))
+               case .requiresAction:
+                  continuation.yield(.threadRunRequiresAction(decoded))
+               case .cancelling:
+                  continuation.yield(.threadRunCancelling(decoded))
+               case .cancelled:
+                  continuation.yield(.threadRunCancelled(decoded))
+               case .failed:
+                  continuation.yield(.threadRunFailed(decoded))
+               case .completed:
+                  continuation.yield(.threadRunCompleted(decoded))
+               case .expired:
+                  continuation.yield(.threadRunExpired(decoded))
+               default:
+                  #if DEBUG
+                  print("DEBUG threadRun status not found = \(try JSONSerialization.jsonObject(with: data, options: .allowFragments) as? [String: Any])")
+                  #endif
+               }
             default:
                #if DEBUG
                print("DEBUG EVENT \(eventObject.rawValue) IGNORED = \(try JSONSerialization.jsonObject(with: data, options: .allowFragments) as? [String: Any])")
@@ -1187,7 +1216,7 @@ extension OpenAIService {
             }
          } else {
             #if DEBUG
-            print("DEBUG EVENT DECODE IGNORED= \(try JSONSerialization.jsonObject(with: data, options: .allowFragments) as? [String: Any])")
+            print("DEBUG EVENT DECODE IGNORED = \(try JSONSerialization.jsonObject(with: data, options: .allowFragments) as? [String: Any])")
             #endif
          }
      } catch let DecodingError.keyNotFound(key, context) {
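
For context, here is a minimal consumer-side sketch of what this patch enables: switching over the run life-cycle events and reading the `RunObject` now carried by each case, mirroring the provider change in the first hunk. The `createRunStream(threadID:parameters:)` call and the `RunParameter(assistantID:)` initializer are assumed names for whichever SwiftOpenAI entry point returns the `AsyncThrowingStream<AssistantStreamEvent, Error>` decoded above; they are not part of this diff, so adjust them to the actual service API.

import SwiftOpenAI

// Sketch only. Assumptions: `createRunStream(threadID:parameters:)` and
// `RunParameter(assistantID:)` are the service call and parameter type that
// produce the AsyncThrowingStream<AssistantStreamEvent, Error> this patch decodes.
func observeRun(service: some OpenAIService, threadID: String, assistantID: String) async throws {
   let stream = try await service.createRunStream(
      threadID: threadID,
      parameters: RunParameter(assistantID: assistantID)) // assumed parameter type
   for try await event in stream {
      switch event {
      case .threadRunQueued(let run), .threadRunInProgress(let run), .threadRunCancelling(let run):
         // The decoded RunObject now travels with the event; `status` is the raw string the service switched on.
         print("Run status: \(run.status)")
      case .threadRunRequiresAction(let run):
         print("Run requires action (tool outputs expected) - \(run.status)")
      case .threadRunCompleted(let run):
         print("Run completed - \(run)")
      case .threadRunFailed(let run), .threadRunCancelled(let run), .threadRunExpired(let run):
         print("Run ended without completing - \(run.status)")
      default:
         break
      }
   }
}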