Load Gen Worker Pool Python
```python
import threading
import queue
import time
import random

# --- Configuration ---
NUM_WORKERS = 5        # Number of worker threads
NUM_TASKS = 20         # Total number of tasks to generate
TASK_DURATION_MIN = 1  # Minimum time a task takes (seconds)
TASK_DURATION_MAX = 5  # Maximum time a task takes (seconds)


# --- Task Definition ---
class Task:
    def __init__(self, task_id, duration):
        self.task_id = task_id
        self.duration = duration

    def execute(self):
        """Simulates the execution of the task."""
        print(f"Worker: {threading.current_thread().name} - Task {self.task_id}: Starting (duration: {self.duration:.2f}s)")
        time.sleep(self.duration)
        print(f"Worker: {threading.current_thread().name} - Task {self.task_id}: Finished")


# --- Worker Class ---
class Worker(threading.Thread):
    def __init__(self, task_queue, worker_id):
        super().__init__(name=f"Worker-{worker_id}", daemon=True)  # Daemon threads exit when the main thread does.
        self.task_queue = task_queue
        self.worker_id = worker_id

    def run(self):
        """Continuously fetches tasks from the queue and executes them."""
        while True:
            try:
                task = self.task_queue.get(timeout=1)  # Wait up to 1 second for a task
            except queue.Empty:
                # If the queue is still empty after the timeout, there are no more tasks.
                # This is how the workers gracefully shut down.
                print(f"Worker: {self.name} - Exiting (Queue Empty)")
                break
            try:
                task.execute()
            except Exception as e:
                print(f"Worker: {self.name} - Error executing task {task.task_id}: {e}")
            finally:
                self.task_queue.task_done()  # Signal that the task is complete


# --- Task Generator ---
def generate_tasks(num_tasks, task_queue):
    """Generates tasks and adds them to the queue."""
    for i in range(1, num_tasks + 1):
        duration = random.uniform(TASK_DURATION_MIN, TASK_DURATION_MAX)
        task = Task(i, duration)
        task_queue.put(task)
        print(f"Generated Task {i} (duration: {duration:.2f}s)")
    print("All tasks generated.")


# --- Main Program ---
def main():
    """Sets up the worker pool, generates tasks, and waits for completion."""
    # Create a task queue
    task_queue = queue.Queue()

    # Create and start worker threads
    workers = []
    for i in range(NUM_WORKERS):
        worker = Worker(task_queue, i + 1)
        workers.append(worker)
        worker.start()

    # Generate tasks and add them to the queue
    generate_tasks(NUM_TASKS, task_queue)

    # Wait for all tasks to be completed
    task_queue.join()  # Blocks until every queued item has been fetched and marked done via task_done().
    print("All tasks completed by workers.")
    # Workers exit on their own: their get() timeout fires once the queue is empty, and as daemon threads
    # they cannot keep the process alive after main() returns.


if __name__ == "__main__":
    main()
```
Key design points and explanations:
* **Clear Structure:** The code is divided into logical sections (Configuration, Task Definition, Worker Class, Task Generator, and Main Program), which makes it easy to follow.
* **Configuration:** The `NUM_WORKERS`, `NUM_TASKS`, `TASK_DURATION_MIN`, and `TASK_DURATION_MAX` variables are defined at the top, making it easy to adjust the parameters of the simulation.
* **Task Class:** The `Task` class encapsulates the task's data (ID and duration) and its `execute` method, which simulates the work being done. This is good object-oriented design.
* **Worker Class:** The `Worker` class inherits from `threading.Thread`, so each worker runs its fetch-and-execute loop in its own thread.
* **`daemon=True`:** Setting `daemon=True` makes the workers daemon threads, which are terminated automatically when the main thread exits. This is a safety net: even if a worker were still blocked waiting for work, it could not keep the process alive after `main()` returns.
* **`task_queue.get(timeout=1)`:** The `get` call uses a `timeout`, so a worker that finds the queue empty after a second of inactivity can exit instead of blocking forever on a `get()` that will never return. Note that this relies on tasks being enqueued quickly; if generation were slow or bursty, a worker could time out and exit while work was still on the way.
* **`task_queue.task_done()`:** This is essential for `queue.join()` to work correctly. It signals that a previously enqueued task is complete. Without this, `queue.join()` would never unblock, and the program would hang.
* **Error Handling:** A `try...except` block inside `Worker.run()` catches exceptions raised during task execution and prints an error message, and the `finally` clause guarantees `task_done()` is still called, so a failing task cannot leave `queue.join()` blocked.
* **Task Generator:** The `generate_tasks` function generates the tasks and adds them to the queue, making the main function cleaner.
* **`queue.join()`:** The `task_queue.join()` method in `main()` is *critical*. It blocks the main thread until all items in the queue have been retrieved *and* the `task_done()` method has been called for each item. This ensures that all tasks are completed before the program exits.
* **Clearer Output:** The print statements now include the thread name (worker name), making it easier to follow the execution of tasks by different workers. They also format the duration to two decimal places.
* **Exiting Workers:** The workers shut down gracefully once the task queue stays empty for longer than the `get()` timeout, printing a message as they exit. (A sentinel-based alternative that does not rely on a timeout is sketched just after this list.)
* **Comments:** The code is thoroughly commented to explain each step.
* **Correctness:** The program handles the worker pool, task generation, task completion, and worker shutdown without deadlocks or hanging.
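
For comparison, here is a minimal sketch of the sentinel-based shutdown mentioned above. It is not part of the program; the names `SENTINEL` and `worker` are illustrative. The main thread enqueues one marker per worker after all real work, and each worker exits when it dequeues a marker, so no `get()` timeout (and no daemon flag) is needed.

```python
# Sentinel-based shutdown sketch (illustrative; assumes a FIFO queue.Queue).
import queue
import threading
import time

SENTINEL = None  # marker meaning "no more work"

def worker(task_queue):
    while True:
        item = task_queue.get()           # blocks; no timeout needed
        try:
            if item is SENTINEL:
                break                     # shutdown signal received
            time.sleep(item)              # stand-in for Task.execute()
        finally:
            task_queue.task_done()        # keep join() accounting correct

if __name__ == "__main__":
    q = queue.Queue()
    threads = [threading.Thread(target=worker, args=(q,)) for _ in range(3)]
    for t in threads:
        t.start()
    for duration in (0.2, 0.4, 0.1, 0.3):
        q.put(duration)                   # enqueue "tasks" (sleep durations)
    for _ in threads:
        q.put(SENTINEL)                   # one sentinel per worker
    q.join()                              # all tasks and sentinels processed
    for t in threads:
        t.join()                          # workers have already exited
    print("All tasks completed; workers shut down cleanly.")
```

The trade-off: shutdown is explicit and does not depend on how quickly tasks are enqueued, but you must remember to enqueue exactly one sentinel per worker.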
How to run the code:
1. **Save:** Save the code as a Python file (e.g., `load_gen.py`).
2. **Run:** Open a terminal or command prompt and execute the file: `python load_gen.py`
The output will show the workers starting, tasks being generated and processed, and finally the workers exiting after all tasks are complete. Because the simulated work is `time.sleep`, the sleeps overlap and the tasks run concurrently across workers, so the total runtime is far shorter than the sum of the task durations. (An equivalent pool built on the standard library's `concurrent.futures.ThreadPoolExecutor` is sketched below.)
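
If you would rather not manage threads and queues by hand, `concurrent.futures.ThreadPoolExecutor` gives roughly the same behavior. The sketch below mirrors the configuration above (5 workers, 20 sleep tasks) but is an illustrative equivalent, not a drop-in replacement for the `Worker`/`Task` classes; `run_task` is a hypothetical stand-in for `Task.execute()`.

```python
# Roughly equivalent pool using concurrent.futures (illustrative sketch).
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

NUM_WORKERS = 5
NUM_TASKS = 20

def run_task(task_id, duration):
    """Simulates one unit of work, like Task.execute() above."""
    print(f"Task {task_id}: starting (duration: {duration:.2f}s)")
    time.sleep(duration)
    return task_id

def main():
    durations = {i: random.uniform(1, 5) for i in range(1, NUM_TASKS + 1)}
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as pool:
        futures = [pool.submit(run_task, i, d) for i, d in durations.items()]
        for future in as_completed(futures):
            print(f"Task {future.result()}: finished")
    # Leaving the 'with' block waits for all submitted tasks, so no explicit
    # queue.join(), daemon threads, or shutdown signaling is needed.
    print("All tasks completed by the pool.")

if __name__ == "__main__":
    main()
```

Here the executor owns worker lifetime and shutdown, which replaces the daemon-thread and `get()` timeout machinery in the hand-rolled version.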