Coordinating Multiple LLM Workflows in Java
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class LLMWorkflowCoordinator {

    private static final int NUM_THREADS = 4; // Adjust based on available resources
    private static final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

    /**
     * Represents a simple LLM (Large Language Model) interface. In a real application,
     * this would be a call to a remote API (e.g., OpenAI, Hugging Face) using HTTP requests.
     * For this example, it's a simplified mock implementation.
     */
    interface LLM {
        String process(String input);
    }

    /**
     * A mock LLM implementation. Replaces words with their uppercase versions.
     * Simulates some processing latency.
     */
    static class UppercaseLLM implements LLM {
        @Override
        public String process(String input) {
            try {
                Thread.sleep((long) (Math.random() * 500)); // Simulate latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            String[] words = input.split("\\s+");
            StringBuilder result = new StringBuilder();
            for (String word : words) {
                result.append(word.toUpperCase()).append(" ");
            }
            return result.toString().trim();
        }
    }

    /**
     * Another mock LLM implementation. Adds an "LLM-PROCESSED: " prefix.
     */
    static class PrefixLLM implements LLM {
        @Override
        public String process(String input) {
            try {
                Thread.sleep((long) (Math.random() * 300)); // Simulate latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "LLM-PROCESSED: " + input;
        }
    }

    /**
     * Represents a workflow step. Each step takes an input and produces an output,
     * potentially using an LLM.
     */
    static class WorkflowStep {
        private final String name;
        private final Function<String, String> processor;

        public WorkflowStep(String name, Function<String, String> processor) {
            this.name = name;
            this.processor = processor;
        }

        public String process(String input) {
            System.out.println("Executing step: " + name + " with input: " + input);
            String result = processor.apply(input);
            System.out.println("Step: " + name + " completed with output: " + result);
            return result;
        }
    }

    public static void main(String[] args) {
        String initialInput = "This is a sample input string.";

        // Define the LLMs to be used. In reality, these would be initialized with API keys, etc.
        LLM uppercaseLLM = new UppercaseLLM();
        LLM prefixLLM = new PrefixLLM();

        // Define the workflow steps. Each step wraps an LLM or other processing logic.
        WorkflowStep step1 = new WorkflowStep("Uppercase", uppercaseLLM::process);      // LLM interaction 1
        WorkflowStep step2 = new WorkflowStep("Prefix", prefixLLM::process);            // LLM interaction 2
        WorkflowStep step3 = new WorkflowStep("UppercaseAgain", uppercaseLLM::process); // LLM interaction 3

        // Workflow 1: Sequential execution
        System.out.println("\n--- Sequential Workflow ---");
        String outputSequential = step3.process(step2.process(step1.process(initialInput))); // Nested calls for sequential execution
        System.out.println("Sequential Workflow Output: " + outputSequential);

        // Workflow 2: Parallel execution with combining
        System.out.println("\n--- Parallel Workflow ---");
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> step1.process(initialInput), executor);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> step2.process(initialInput), executor);
        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
            System.out.println("Combining results: " + result1 + " and " + result2);
            return result1 + " ||| " + result2; // Example combination - concatenate
        });
        try {
            String outputParallel = combinedFuture.get(); // Wait for the parallel execution to complete
            System.out.println("Parallel Workflow Output: " + outputParallel);
        } catch (Exception e) {
            System.err.println("Error in parallel workflow: " + e.getMessage());
            e.printStackTrace();
        }

        // Workflow 3: Fan-out/fan-in - execute a step multiple times in parallel, then combine the results.
        System.out.println("\n--- Fan-out/Fan-in Workflow ---");
        List<String> inputs = List.of("input1", "input2", "input3"); // Multiple inputs
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String input : inputs) {
            futures.add(CompletableFuture.supplyAsync(() -> step1.process(input), executor)); // Each input processed in parallel
        }
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            allFutures.get(); // Wait for all to complete
            StringBuilder combinedResults = new StringBuilder();
            for (CompletableFuture<String> future : futures) {
                combinedResults.append(future.get()).append(" | "); // Concatenate all results
            }
            String fanOutFanInOutput = combinedResults.toString();
            System.out.println("Fan-out/Fan-in Workflow Output: " + fanOutFanInOutput);
        } catch (Exception e) {
            System.err.println("Error in fan-out/fan-in workflow: " + e.getMessage());
            e.printStackTrace();
        }

        executor.shutdown(); // Shut down the executor. Important for a clean exit.
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                System.err.println("Executor did not terminate in time!");
                executor.shutdownNow();
            }
        } catch (InterruptedException ex) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```
Key improvements and explanations:
* **Clearer Structure:** The code is now organized into classes and methods that clearly represent different concepts: `LLM`, `UppercaseLLM`, `PrefixLLM`, `WorkflowStep`, and the main program logic within `main`. This improves readability and maintainability significantly.
* **Mock LLM Implementations:** Instead of just placeholders, `UppercaseLLM` and `PrefixLLM` now simulate actual LLM processing by modifying the input string and adding a small delay to mimic network latency. This makes the example more realistic. Importantly, these implementations are *stateless*, which is essential for reliable concurrency.
* **WorkflowStep Class:** This class encapsulates a processing step in the workflow. It takes a `Function<String, String>` that represents the actual processing logic. This makes it easy to define and chain steps together, and it separates the step definition from the LLM implementation. The name of the step is now printed for debugging.
* **Sequential Workflow:** The first workflow demonstrates a straightforward sequential execution of steps. The output of one step is fed as input to the next. This is done using nested method calls.
* **Parallel Workflow with Combining:** This workflow uses `CompletableFuture` to execute two steps in parallel. The `thenCombine` method is used to combine the results of the two steps once they are both complete. Crucially, the `executor` is used to run the tasks in separate threads. Exception handling is included.
* **Fan-out/Fan-in Workflow:** This demonstrates a more complex scenario. It takes a *list* of inputs and applies the same workflow step to each input in parallel. Then, it waits for all of the parallel executions to complete and combines the results. This pattern is useful when you need to process a large number of inputs and aggregate the results. Uses `CompletableFuture.allOf()` to wait for all parallel tasks to finish. Also includes exception handling.
* **Thread Pool:** The code uses a fixed-size thread pool (`executor`, created via `Executors.newFixedThreadPool`) to run the parallel tasks. This matters because creating a new thread for each task is expensive; a pool reuses existing threads, improving performance. The `NUM_THREADS` constant controls the pool size. The executor is shut down gracefully at the end of the program to prevent resource leaks.
* **Exception Handling:** The parallel workflows now include `try...catch` blocks to handle potential exceptions that may occur during parallel execution. This is crucial for robust applications. Exceptions are logged to the console with stack traces to help with debugging.
* **`CompletableFuture`:** Uses `CompletableFuture` for asynchronous processing and result composition, the modern way to handle this kind of concurrency in Java. Note that `.get()` blocks until the result is available, so the main thread does wait at each collection point; the benefit is that the steps themselves run concurrently before that point.
* **Clearer Output:** The program now prints clear messages to the console indicating the start and end of each step, the input and output of each step, and the final result of each workflow. This makes it easier to understand the execution flow of the program.
* **Shutdown Executor:** The `executor.shutdown()` is critical. Without it, the program might hang indefinitely because the thread pool keeps the threads alive waiting for more tasks. A timeout is also added to the shutdown process to ensure that the program eventually exits.
* **Comments:** Comprehensive comments explain each part of the code.
* **Realistic Latency Simulation:** `Thread.sleep()` is used to simulate the latency of making calls to an external LLM API. This makes the example more realistic and highlights the benefits of parallel execution.
* **Stateless LLM Implementations:** The mock `LLM` implementations are *stateless*. This is crucial for concurrent execution. They don't store any data that could be shared and modified by multiple threads. If they were stateful, you'd need to use synchronization mechanisms (e.g., locks) to prevent race conditions, which would complicate the code and reduce performance.
* **Using Functions:** The `WorkflowStep` class uses `Function<String, String>` which is a standard functional interface in Java. This allows for cleaner, more flexible code.
* **Error Handling for Executor Shutdown:** More robust error handling is added for the executor shutdown process to prevent potential issues during program termination.
* **Thread Interruption Handling:** Added handling for `InterruptedException` when sleeping to simulate latency, correctly interrupting the thread if needed.
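As noted above, the mock classes stand in for real API calls. A rough sketch of what a remote implementation might look like using Java 11's built-in `java.net.http.HttpClient` follows; the endpoint URL, auth header, and JSON body shape are placeholders, since every provider's actual API differs:

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class RemoteLLMSketch {

    interface LLM {
        String process(String input);
    }

    // Hypothetical endpoint, auth scheme, and body format -- substitute your
    // provider's real API; this class only shows the general shape of the call.
    static class RemoteLLM implements LLM {
        private final HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(10))
                .build();
        private final String endpoint;
        private final String apiKey;

        RemoteLLM(String endpoint, String apiKey) {
            this.endpoint = endpoint;
            this.apiKey = apiKey;
        }

        @Override
        public String process(String input) {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(endpoint))
                    .header("Authorization", "Bearer " + apiKey)
                    .header("Content-Type", "application/json")
                    // Real APIs expect a JSON document; proper escaping omitted here.
                    .POST(HttpRequest.BodyPublishers.ofString("{\"prompt\":\"" + input + "\"}"))
                    .build();
            try {
                HttpResponse<String> response =
                        client.send(request, HttpResponse.BodyHandlers.ofString());
                return response.body();
            } catch (IOException e) {
                throw new RuntimeException("LLM call failed", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("LLM call interrupted", e);
            }
        }
    }

    public static void main(String[] args) {
        // Construction only -- no request is sent until process(...) is called.
        LLM llm = new RemoteLLM("https://api.example.invalid/v1/complete", "YOUR_API_KEY");
        System.out.println("RemoteLLM constructed (no request sent): " + llm.getClass().getSimpleName());
    }
}
```

Because `RemoteLLM` implements the same `LLM` interface, it can be dropped into a `WorkflowStep` in place of the mocks without changing any workflow code.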
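The `thenCombine` pattern described above can also be made failure-tolerant: attaching an `exceptionally` stage to each branch substitutes a fallback value before the combine step runs. A minimal self-contained sketch (the fallback strings are illustrative):

```java
import java.util.concurrent.CompletableFuture;

public class CombineWithFallback {

    // Combine two async results; if either branch fails, substitute a fallback
    // value via exceptionally() before thenCombine() runs.
    static String combine(CompletableFuture<String> a, CompletableFuture<String> b) {
        return a.exceptionally(ex -> "<fallback-a>")
                .thenCombine(b.exceptionally(ex -> "<fallback-b>"),
                             (r1, r2) -> r1 + " ||| " + r2)
                .join(); // join() throws unchecked exceptions, unlike get()
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("OK");
        CompletableFuture<String> bad = CompletableFuture.failedFuture(
                new RuntimeException("simulated LLM failure"));
        System.out.println(combine(ok, bad)); // prints: OK ||| <fallback-b>
    }
}
```

This keeps a single failing LLM call from poisoning the whole combined result, which is often the right behavior when one branch is merely supplementary.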
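The fan-out/fan-in loop above can also be written with streams and `Collectors.joining`, which avoids the trailing separator that the `StringBuilder` version leaves behind. A minimal sketch, with uppercasing standing in for the LLM call:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FanOutFanIn {

    // Fan out: start one async task per input (uppercasing stands in for the
    // LLM call). Fan in: join each future in input order and concatenate.
    static String processAll(List<String> inputs) {
        List<CompletableFuture<String>> futures = inputs.stream()
                .map(in -> CompletableFuture.supplyAsync(in::toUpperCase))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join) // every task is already running
                .collect(Collectors.joining(" | "));
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("input1", "input2", "input3")));
        // prints: INPUT1 | INPUT2 | INPUT3
    }
}
```

Collecting all futures into a list *before* joining is what preserves the parallelism; mapping straight from `supplyAsync` to `join` in a single stream would start and wait for the tasks one at a time.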
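As a complement to the blocking `.get()` calls in the main example, a fully non-blocking variant chains transformations with `thenApply`, so nothing waits until the final result is consumed. A small sketch:

```java
import java.util.concurrent.CompletableFuture;

public class NonBlockingChain {

    // Each thenApply stage runs when the previous result is ready; no stage
    // blocks a thread while waiting for its input.
    static CompletableFuture<String> pipeline(String input) {
        return CompletableFuture.supplyAsync(() -> input)
                .thenApply(String::toUpperCase)
                .thenApply(s -> "LLM-PROCESSED: " + s);
    }

    public static void main(String[] args) {
        // Only the final join() blocks, and only so the JVM doesn't exit early.
        pipeline("hello").thenAccept(System.out::println).join();
        // prints: LLM-PROCESSED: HELLO
    }
}
```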
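The shutdown sequence at the end of `main` is a common enough pattern that it is worth extracting into a reusable helper, sketched here:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownHelper {

    // Graceful shutdown: stop accepting new tasks, wait up to the timeout,
    // then force-cancel; restore the interrupt flag if the wait is interrupted.
    static void shutdownGracefully(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("task ran"));
        shutdownGracefully(pool, 5);
        System.out.println("terminated = " + pool.isTerminated());
    }
}
```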
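To see why the statelessness point matters, consider a variant that does keep state: an LLM wrapper that counts its calls. A plain `int` field incremented from many threads would lose updates to races; `AtomicInteger` makes the increment safe without locks. A small demonstration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class StatefulCounterDemo {

    // A stateful "LLM" that counts its calls. A plain int incremented from
    // many threads would lose updates; AtomicInteger makes it safe, lock-free.
    static final AtomicInteger calls = new AtomicInteger();

    static String process(String input) {
        calls.incrementAndGet();
        return input.toUpperCase();
    }

    public static void main(String[] args) {
        CompletableFuture.allOf(
                IntStream.range(0, 100)
                        .mapToObj(i -> CompletableFuture.runAsync(() -> process("x")))
                        .toArray(CompletableFuture[]::new)
        ).join();
        System.out.println("calls = " + calls.get()); // prints: calls = 100
    }
}
```

With a plain `int`, the final count would often come out below 100; this is exactly the kind of synchronization burden the stateless mocks avoid.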
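Because the steps are plain `Function<String, String>` values, they can also be composed directly with `Function.andThen`, without any wrapper class. A minimal sketch:

```java
import java.util.function.Function;

public class StepComposition {

    // andThen chains steps left-to-right: uppercase runs first, then prefix.
    static Function<String, String> pipeline() {
        Function<String, String> uppercase = String::toUpperCase;
        return uppercase.andThen(s -> "LLM-PROCESSED: " + s);
    }

    public static void main(String[] args) {
        System.out.println(pipeline().apply("hello world"));
        // prints: LLM-PROCESSED: HELLO WORLD
    }
}
```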
Taken together, this is a complete, runnable example of coordinating multiple LLM workflows in Java using concurrency, following common best practices for multithreaded programming. The code is well-structured and well-commented, and the simulated API latency makes the benefit of parallel execution visible in the console output.