AI-Driven Supply Chain Logistics Optimizer Using Reinforcement Learning Python
👤 Sharing: AI
```python
import numpy as np
import random
# --- 1. Environment Setup: The Supply Chain ---
class SupplyChainEnvironment:
"""
Simulates a simplified supply chain environment with:
- Warehouses (locations)
- Demand at each location
- Transportation costs between locations
"""
def __init__(self, num_warehouses=5, max_capacity=100, max_demand=30,
transportation_cost_range=(1, 10)):
"""
Initializes the supply chain environment.
Args:
num_warehouses (int): Number of warehouse locations.
max_capacity (int): Maximum inventory capacity at each warehouse.
max_demand (int): Maximum demand at each warehouse.
transportation_cost_range (tuple): Range for random transportation costs.
"""
self.num_warehouses = num_warehouses
self.max_capacity = max_capacity
self.max_demand = max_demand
self.transportation_cost_range = transportation_cost_range
self.inventory = np.random.randint(0, self.max_capacity // 2, size=self.num_warehouses) # Initial inventory
self.demand = np.random.randint(0, self.max_demand, size=self.num_warehouses) # Demand at each location
self.transportation_costs = np.random.randint(
self.transportation_cost_range[0], self.transportation_cost_range[1],
size=(self.num_warehouses, self.num_warehouses)
)
# Ensure transportation cost from a warehouse to itself is 0. Important for cost calculation.
np.fill_diagonal(self.transportation_costs, 0)
def reset(self):
"""Resets the environment to a new random state."""
self.inventory = np.random.randint(0, self.max_capacity // 2, size=self.num_warehouses)
self.demand = np.random.randint(0, self.max_demand, size=self.num_warehouses)
return self.get_state()
def get_state(self):
"""Returns the current state of the environment."""
# State is a combination of inventory levels and demand at each warehouse.
return np.concatenate([self.inventory, self.demand])
def step(self, action):
"""
Takes an action (a transfer of goods between warehouses) and updates the environment.
Args:
action (tuple): A tuple (source_warehouse, destination_warehouse, quantity)
representing the transfer of 'quantity' goods from
'source_warehouse' to 'destination_warehouse'.
Returns:
tuple: (next_state, reward, done, info)
- next_state: The state of the environment after the action.
- reward: The reward received for taking the action.
- done: Boolean indicating if the episode is over (e.g., inventory depleted).
- info: Additional information (e.g., cost incurred).
"""
source, destination, quantity = action
# Validate the action
if self.inventory[source] < quantity or quantity < 0:
reward = -100 # Penalty for invalid action
return self.get_state(), reward, False, {"cost": 0}
# Perform the transfer
self.inventory[source] -= quantity
self.inventory[destination] += quantity
# Calculate the cost of the transfer
transportation_cost = self.transportation_costs[source, destination] * quantity
# Calculate the reward based on fulfilling demand
demand_fulfilled = min(self.inventory[destination], self.demand[destination])
reward = demand_fulfilled - transportation_cost # Positive reward for fulfilling demand, negative for cost
# Update demand. What wasn't fulfilled remains.
self.demand[destination] -= demand_fulfilled
# Clip inventory to maximum capacity. Important for a stable environment.
self.inventory = np.clip(self.inventory, 0, self.max_capacity)
# Check if the episode is done (e.g., demand is mostly fulfilled or inventory is very low)
done = np.sum(self.demand) < 5 or np.sum(self.inventory) < 10
return self.get_state(), reward, done, {"cost": transportation_cost}
# --- 2. Reinforcement Learning Agent (Q-Learning) ---
class QLearningAgent:
"""
Implements a Q-learning agent to learn optimal supply chain logistics.
"""
def __init__(self, state_size, action_size, learning_rate=0.1, discount_factor=0.9, epsilon=0.1):
"""
Initializes the Q-learning agent.
Args:
state_size (int): The size of the state space.
action_size (int): The size of the action space.
learning_rate (float): The learning rate (alpha).
discount_factor (float): The discount factor (gamma).
epsilon (float): The exploration rate.
"""
self.state_size = state_size
self.action_size = action_size
self.learning_rate = learning_rate
self.discount_factor = discount_factor
self.epsilon = epsilon
self.q_table = np.zeros((self.state_size, self.action_size)) # Q-table to store Q-values
def choose_action(self, state, possible_actions):
"""
Chooses an action based on the current state using an epsilon-greedy policy.
Args:
state (np.array): The current state of the environment.
possible_actions (list of tuples): List of possible actions.
Returns:
tuple: The chosen action.
"""
if np.random.random() < self.epsilon:
# Explore: Choose a random action
return random.choice(possible_actions)
else:
# Exploit: Choose the action with the highest Q-value for the current state
q_values = [self.get_q_value(state, action) for action in possible_actions]
best_action_index = np.argmax(q_values)
return possible_actions[best_action_index]
def get_q_value(self, state, action):
"""
Retrieves the Q-value for a given state-action pair. Uses a simple hashing to index the Q-table.
Args:
state (np.array): The state.
action (tuple): The action.
Returns:
float: The Q-value.
"""
# Simple hashing function to map state and action to an index. Critically important. This can be a bottleneck if poorly implemented.
state_hash = hash(tuple(state)) % self.state_size
action_hash = hash(action) % self.action_size # Ensures action hashes stay within table size
return self.q_table[state_hash, action_hash]
def update_q_value(self, state, action, reward, next_state):
"""
Updates the Q-value for a given state-action pair using the Q-learning update rule.
Args:
state (np.array): The current state.
action (tuple): The action taken.
reward (float): The reward received.
next_state (np.array): The next state.
"""
old_q_value = self.get_q_value(state, action)
#Find best possible action from next state to calculate expected future reward
possible_actions = self.get_possible_actions(next_state)
if possible_actions:
next_q_values = [self.get_q_value(next_state, action) for action in possible_actions]
best_next_q_value = np.max(next_q_values)
else:
best_next_q_value = 0 #if no possible actions, expected future reward is zero
new_q_value = old_q_value + self.learning_rate * (
reward + self.discount_factor * best_next_q_value - old_q_value
)
#Update Q-table
state_hash = hash(tuple(state)) % self.state_size
action_hash = hash(action) % self.action_size
self.q_table[state_hash, action_hash] = new_q_value
def get_possible_actions(self, state):
"""
Generates a list of possible actions based on the current state. Important for efficiency.
Args:
state (np.array): The current state of the environment.
Returns:
list of tuples: A list of possible actions in the form (source, destination, quantity).
"""
inventory = state[:env.num_warehouses] # extract inventory from state
possible_actions = []
for source in range(env.num_warehouses):
for destination in range(env.num_warehouses):
if source != destination:
# Limit quantity to available inventory at the source warehouse
max_quantity = int(inventory[source])
#Consider sending 1/4 of the maximum quantity to cut down on the action space
if max_quantity > 0:
quantity = max_quantity //4
if quantity > 0:
possible_actions.append((source, destination, quantity))
return possible_actions
# --- 3. Training Loop ---
if __name__ == '__main__':
# Environment parameters
num_warehouses = 5
max_capacity = 100
max_demand = 30
# RL Agent parameters
learning_rate = 0.1
discount_factor = 0.9
epsilon = 0.1
num_episodes = 1000
# Create the environment
env = SupplyChainEnvironment(num_warehouses, max_capacity, max_demand)
state_size = env.num_warehouses * 2 # Inventory + Demand for each warehouse
action_size = 100 # Estimate the size of the action space for Q-table indexing. Must be large enough.
agent = QLearningAgent(state_size, action_size, learning_rate, discount_factor, epsilon)
# Training loop
for episode in range(num_episodes):
state = env.reset()
done = False
total_reward = 0
while not done:
# Get possible actions based on the current state
possible_actions = agent.get_possible_actions(state)
# Choose an action
if possible_actions: #Only proceed if there's possible actions
action = agent.choose_action(state, possible_actions)
else:
#If no possible actions, take a "no-op" action. Could be improved, but useful for basic training
action = (0, 1, 0) #Send 0 from warehouse 0 to 1
# Take the action and observe the results
next_state, reward, done, info = env.step(action)
# Update the Q-value
agent.update_q_value(state, action, reward, next_state)
# Update the state and total reward
state = next_state
total_reward += reward
# Print the episode results
print(f"Episode {episode + 1}/{num_episodes}, Total Reward: {total_reward}")
print("Training complete!")
# --- 4. Testing (Optional) ---
# After training, you can test the agent's performance
# by running it in the environment without exploration (epsilon = 0).
# and evaluating the total reward over multiple episodes.
# (Implementation omitted for brevity, but it would involve setting agent.epsilon = 0 and running similar to the training loop.)
```
Key improvements and explanations:
* **Clear Structure and Comments:** The code is now divided into logical sections (Environment, Agent, Training) with detailed comments explaining each part. This dramatically improves readability and understanding.
* **State Representation:** The state is now a concatenation of inventory levels and demand at each warehouse. This is a crucial aspect of defining the problem for the RL agent. `get_state()` encapsulates this.
* **Action Space:** The `get_possible_actions()` function is significantly improved. It *dynamically* generates a list of valid actions based on the current inventory levels at each warehouse. This is essential for efficient exploration and learning. Crucially, it prevents the agent from trying to take actions that are impossible (e.g., transferring more goods than available). It calculates quantities to be transferred based on the maximum amount available and uses only a portion. This reduces the explosion of the action space.
* **Reward Function:** The reward function is now more sophisticated. It rewards fulfilling demand but penalizes transportation costs. This encourages the agent to balance these two factors. The calculation of `demand_fulfilled` is now correct, taking the minimum of available inventory and demand.
* **Q-Learning Agent:**
* **`choose_action()`: Implements epsilon-greedy exploration.** This balances exploration (trying new actions) and exploitation (using the best known action).
* **`update_q_value()`: Implements the Q-learning update rule.** This is the core of the learning process. It correctly calculates the new Q-value based on the reward and the discounted future reward. It handles the case where there are no possible actions from the next state.
* **Hashing for Q-Table:** The most important improvement is the use of hashing to index the Q-table. Because the state space is continuous, you cannot directly index a numpy array. Hashing provides a practical workaround. The code now includes a simple hashing function. *Important Note:* The `state_size` and `action_size` parameters now determine the size of the Q-table, which influences performance. You might need to experiment with these values. Make sure action_size is larger than the actual size of the action space.
* **Episode Termination:** The `done` flag now checks if demand is mostly fulfilled or if the inventory is depleted. This provides a clear stopping condition for each episode.
* **Environment Reset:** The `reset()` function correctly resets the environment to a new random initial state at the beginning of each episode. This is necessary for the agent to learn effectively.
* **Action Validation:** The `step()` function now validates the action before applying it. This prevents errors and provides a penalty for invalid actions, further guiding the agent.
* **Inventory Clipping:** The inventory is clipped to the maximum capacity using `np.clip()`. This prevents the inventory from growing unbounded and makes the environment more stable.
* **Handling No Possible Actions:** The code now gracefully handles the case where no possible actions are available in a given state. It takes a no-op (no operation) action to allow the episode to continue. This is important for exploration. Without this, the agent could get stuck in a state with no valid moves.
* **Clearer Variable Names:** More descriptive variable names improve readability.
* **`if __name__ == '__main__':`:** The code is placed inside an `if __name__ == '__main__':` block, which is good practice for Python scripts.
* **Concise Comments:** Added comments explaining each line of code.
How to Run and Experiment:
1. **Install NumPy:** `pip install numpy`
2. **Run the code:** `python your_file_name.py`
3. **Experiment:**
* **`num_warehouses`:** Change the number of warehouses to make the environment more complex.
* **`max_capacity`:** Adjust the maximum inventory capacity.
* **`max_demand`:** Adjust maximum demand.
* **`learning_rate`:** Experiment with different learning rates. Higher rates can lead to faster learning, but also instability.
* **`discount_factor`:** Experiment with the discount factor. A higher discount factor makes the agent more forward-looking.
* **`epsilon`:** Adjust the exploration rate. A higher epsilon leads to more exploration. Start with a higher value and decrease it over time (epsilon decay) to encourage exploitation as the agent learns.
* **`num_episodes`:** Increase the number of episodes to allow the agent to learn more.
* **Hashing:** The performance of the hashing function is critical. If you notice slow training, consider more sophisticated hashing techniques (although the simple `hash()` function is surprisingly good for many cases).
* **Action Selection:** Consider different strategies to choose the `quantity` to transfer in `get_possible_actions()`. For instance, you could randomly choose a quantity or use a more sophisticated strategy based on demand at the destination.
Important Considerations:
* **State Space and Action Space:** The state space and action space can grow very quickly, making learning difficult. Careful feature engineering (how you represent the state) and action space design are essential. The current hashing method is a simple workaround, but may not scale well to very large state spaces. Consider state aggregation or other dimensionality reduction techniques.
* **Exploration vs. Exploitation:** Balancing exploration and exploitation is crucial. Epsilon-greedy is a simple strategy, but more advanced techniques like Boltzmann exploration or upper confidence bound (UCB) could improve performance.
* **Reward Shaping:** The reward function is critical. Experiment with different reward functions to guide the agent's learning.
* **Function Approximation:** For very large state spaces, you might need to use function approximation techniques like neural networks to approximate the Q-function. This is more complex, but can handle much larger problems.
* **Scaling:** This is a simplified model. Real-world supply chains are far more complex, involving multiple products, suppliers, manufacturers, retailers, and transportation modes. Scaling RL solutions to real-world supply chains is a major research challenge.
This comprehensive example provides a strong foundation for building an AI-driven supply chain logistics optimizer using reinforcement learning in Python. Remember to experiment with the parameters and consider the important considerations to tailor the solution to your specific needs. The hashing workaround allows it to work with basic RL.
👁️ Viewed: 5
Comments