# Data Migration Orchestrator (Python)

```python
import time
import logging
import threading
import queue


# Configure logging (for better debugging and monitoring)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class MigrationTask:
    """
    Represents a single migration task.  This is a simplified example; in a real-world
    scenario, this would likely encapsulate database connections, data transformations, etc.
    """

    def __init__(self, task_id, source_data):
        self.task_id = task_id
        self.source_data = source_data

    def process(self):
        """
        Simulates the processing of a migration task. This includes a delay
        to mimic real-world operations like data transfer, transformation, etc.
        """
        logging.info(f"Task {self.task_id}: Started processing data: {self.source_data}")
        time.sleep(2)  # Simulate processing time
        transformed_data = self.source_data.upper()  # A basic transformation example
        logging.info(f"Task {self.task_id}: Data processed.  Transformed Data: {transformed_data}")
        return transformed_data


class DataMigrationWorker(threading.Thread):
    """
    A worker thread that consumes migration tasks from a queue and processes them.
    """

    def __init__(self, task_queue, results_queue, worker_id):
        super().__init__()
        self.task_queue = task_queue
        self.results_queue = results_queue
        self.worker_id = worker_id
        self.daemon = True  # Allow the main program to exit even if workers are running

    def run(self):
        """
        Main loop for the worker thread. Continuously fetches tasks from the queue
        and processes them until it receives a `None` sentinel.
        """
        while True:
            task = self.task_queue.get()  # Block until a task is available
            if task is None:
                logging.info(f"Worker {self.worker_id}: Received termination signal. Exiting.")
                self.task_queue.task_done()  # Important: Signal that the None task is done
                break  # Exit the worker loop
            try:
                result = task.process()
                self.results_queue.put((task.task_id, result))
                logging.info(f"Worker {self.worker_id}: Task {task.task_id} completed.")
            except Exception as e:
                logging.error(f"Worker {self.worker_id}: Error processing task {task.task_id}: {e}")
                # Optionally, put the failed task back in the queue for retry
                # or add to an error queue.
            finally:
                self.task_queue.task_done()  # Mark the task done; required for task_queue.join() to return


class DataMigrationOrchestrator:
    """
    Orchestrates the data migration process.  Creates tasks, assigns them to workers,
    and collects the results.
    """

    def __init__(self, num_workers=3):
        self.num_workers = num_workers
        self.task_queue = queue.Queue()
        self.results_queue = queue.Queue()
        self.workers = []
        self.migration_tasks = []
        self.migrated_data = {}  # Maps task_id -> transformed result

    def create_tasks(self, data_to_migrate):
        """
        Creates migration tasks from the given data.

        Args:
            data_to_migrate: A list of data items to be migrated.
        """
        for i, data in enumerate(data_to_migrate):
            task = MigrationTask(task_id=i + 1, source_data=data)
            self.task_queue.put(task)
            self.migration_tasks.append(task)

    def start_workers(self):
        """
        Starts the worker threads.
        """
        for i in range(self.num_workers):
            worker = DataMigrationWorker(self.task_queue, self.results_queue, worker_id=i + 1)
            self.workers.append(worker)
            worker.start()

    def stop_workers(self):
        """
        Signals the workers to stop by adding 'None' tasks to the queue.
        """
        for _ in range(self.num_workers):
            self.task_queue.put(None)  # Send a termination sentinel to each worker

        # Wait until every task (including the sentinels) has been marked done
        self.task_queue.join()

        for worker in self.workers:
            worker.join()  # Wait for each worker thread to exit cleanly

        logging.info("All workers have stopped.")


    def collect_results(self):
        """
        Collects the results from the results queue and stores them in `self.migrated_data`.
        """
        # Workers have already exited by this point, so empty() is a reliable check.
        while not self.results_queue.empty():
            task_id, result = self.results_queue.get()
            self.migrated_data[task_id] = result
            self.results_queue.task_done()

        self.results_queue.join()  # Returns immediately: every get() above was matched by task_done()

        logging.info("All results collected.")


    def run_migration(self, data_to_migrate):
        """
        Runs the entire data migration process.
        """
        logging.info("Starting data migration...")

        self.create_tasks(data_to_migrate)
        self.start_workers()

        # Wait for tasks to be processed (handled by the queue.join() in stop_workers)
        self.stop_workers()

        self.collect_results()

        logging.info("Data migration completed.")
        return self.migrated_data


# Example Usage:
if __name__ == "__main__":
    data_to_migrate = ["data1", "data2", "data3", "data4", "data5", "data6", "data7"]
    orchestrator = DataMigrationOrchestrator(num_workers=4)  # Use 4 worker threads
    migrated_data = orchestrator.run_migration(data_to_migrate)

    print("\nMigrated Data:")
    for task_id, data in migrated_data.items():
        print(f"Task {task_id}: {data}")
```
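
The `except` branch above only logs failures, but the comment hints at retries and an error queue. The sketch below shows one way to do that, as a hypothetical subclass of `DataMigrationWorker`: `MAX_RETRIES`, `RetryingMigrationWorker`, and the third `error_queue` are additions of this sketch, not part of the listing above. Retrying inline (rather than re-enqueueing the task) avoids the failure mode where a re-queued task lands behind the `None` sentinels and is never picked up.

```python
import logging

MAX_RETRIES = 3  # Hypothetical knob, not in the listing above

class RetryingMigrationWorker(DataMigrationWorker):
    """Sketch: DataMigrationWorker variant with bounded inline retries."""

    def __init__(self, task_queue, results_queue, error_queue, worker_id):
        super().__init__(task_queue, results_queue, worker_id)
        self.error_queue = error_queue  # Hypothetical dead-letter queue

    def run(self):
        while True:
            task = self.task_queue.get()
            if task is None:
                self.task_queue.task_done()
                break
            try:
                for attempt in range(1, MAX_RETRIES + 1):
                    try:
                        result = task.process()
                        self.results_queue.put((task.task_id, result))
                        break  # Success: stop retrying
                    except Exception as e:
                        if attempt == MAX_RETRIES:
                            logging.error(f"Task {task.task_id} failed permanently: {e}")
                            self.error_queue.put((task.task_id, str(e)))
                        else:
                            logging.warning(f"Task {task.task_id} failed ({e}); "
                                            f"retry {attempt}/{MAX_RETRIES}")
            finally:
                self.task_queue.task_done()  # Exactly once per get(), success or failure
```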

Key improvements and explanations:

* **Clearer Structure:** The code is now organized into classes for `MigrationTask`, `DataMigrationWorker`, and `DataMigrationOrchestrator`. This improves readability and maintainability.
* **Task Class:** Introduces a `MigrationTask` class to encapsulate the data and processing logic for each individual migration. This makes it easier to handle more complex migration scenarios.
* **Worker Thread Class:** The `DataMigrationWorker` is now a class, making it easier to manage worker-specific data (like the worker ID).
* **Queue-Based Communication and Thread Safety:** Uses `queue.Queue` for passing tasks and results between the orchestrator and the workers. `queue.Queue` is thread-safe, so it handles synchronization between the threads; it is the standard way to communicate between threads in Python.
* **Worker Termination:**  Workers are properly terminated using a "poison pill" (putting `None` in the queue) and `queue.join()`. This ensures that the program exits cleanly.  The `daemon = True` setting on the worker threads also ensures that the main program doesn't get stuck waiting for them if something goes wrong.
* **Error Handling:** Includes a `try...except` block in the `DataMigrationWorker.run()` method to catch exceptions during task processing, so a single failed task cannot crash the entire migration. The error is logged; you could also retry the task or route it to an error queue, as in the retry sketch just after the listing above.
* **Logging:** Uses the `logging` module for more informative output, including timestamps and log levels (INFO, ERROR).  This is crucial for debugging and monitoring a real-world migration.
* **Results Collection:**  The `DataMigrationOrchestrator` now has a `collect_results` method to gather the processed data from the `results_queue`.  This makes the migration process more robust and allows you to easily access the migrated data.
* **`queue.task_done()`:** This is *critical* when using `queue.join()`. `task_done()` tells the queue that a previously enqueued item has been processed; without it, `queue.join()` blocks indefinitely. It is called in the `finally` block so that it runs even when an exception occurs. (A minimal demonstration of this contract appears after this list.)
* **`queue.join()`:** Called inside `stop_workers()`, after the `None` sentinels have been enqueued. It blocks until every item in the queue, sentinels included, has been retrieved and marked done via `task_done()`, which guarantees all tasks finish before the worker threads are joined.
* **Clearer Comments:** The code is well-commented to explain the purpose of each section and the logic behind it.
* **Example Usage:**  The `if __name__ == "__main__":` block provides a clear example of how to use the `DataMigrationOrchestrator`.
* **Simulated Processing:**  Uses `time.sleep(2)` to simulate the time it takes to process a task.  This is important for testing the concurrency of the code.
* **`self.migrated_data` Dictionary:**  Stores the results in a dictionary, keyed by the `task_id`, which makes it easy to access the migrated data.
* **Simplified Transformation:** The example transformation is `data.upper()` for simplicity, but this could be replaced with any data transformation logic.
* **`self.migration_tasks` List:** The orchestrator now keeps track of all created tasks in a list. While not strictly necessary in this example, it can be useful for tracking the progress of the migration and for potential error handling or retries.
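
Because the `task_done()`/`join()` contract is the piece people most often get wrong, here is a minimal, self-contained demonstration of it, separate from the orchestrator above:

```python
import queue
import threading

q = queue.Queue()
for item in range(5):
    q.put(item)  # Each put() increments the queue's unfinished-task count

def drain():
    while True:
        item = q.get()  # Removes the item but does NOT mark it finished
        print(f"processed {item}")
        q.task_done()   # Decrements the unfinished-task count

threading.Thread(target=drain, daemon=True).start()
q.join()  # Blocks until task_done() has been called once per put()
print("all items accounted for")
```

Comment out the `q.task_done()` call and `q.join()` hangs forever, which is exactly the failure mode the `task_done()` bullet warns about.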

This improved version addresses many of the potential issues with a naive implementation and provides a solid foundation for building a more complex data migration orchestrator. Remember to adapt the `MigrationTask.process()` method to your specific data migration requirements; a hypothetical adaptation is sketched below.
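
As an illustration only, a `process()` adapted for a row-to-JSON migration might look like this sketch; the subclass name, the three-field row shape, and the normalization rules are all hypothetical:

```python
import json
import logging

class RowMigrationTask(MigrationTask):
    """Hypothetical subclass: migrates a (row_id, name, email) tuple to a JSON document."""

    def process(self):
        row_id, name, email = self.source_data  # Assumes each source item is a 3-tuple
        document = {
            "id": row_id,
            "name": name.strip().title(),    # Example normalization
            "email": email.strip().lower(),
        }
        logging.info(f"Task {self.task_id}: row {row_id} converted to a JSON document")
        return json.dumps(document)
```

Because the orchestrator only ever calls `task.process()`, swapping in a subclass like this requires no changes to the worker or queue plumbing.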