AI-Driven Supply Chain Logistics Optimizer Using Reinforcement Learning (Python)
```python
import numpy as np
import random
# --- Simulation Parameters ---
NUM_LOCATIONS = 5 # Number of locations (e.g., warehouses, distribution centers)
INVENTORY_CAPACITY = 100 # Maximum inventory each location can hold
DEMAND_MEAN = 20 # Average demand at each location
DEMAND_STDDEV = 5 # Standard deviation of demand
LEAD_TIME = 2 # Number of time steps it takes for an order to arrive
HOLDING_COST = 1 # Cost per unit of inventory held per time step
STOCKOUT_COST = 5 # Cost per unit of unmet demand (backorder)
NUM_EPISODES = 1000 # Number of training episodes
MAX_STEPS_PER_EPISODE = 50 # Maximum number of time steps per episode
LEARNING_RATE = 0.1 # Learning rate for Q-learning
DISCOUNT_FACTOR = 0.9 # Discount factor for future rewards
EXPLORATION_RATE = 0.1 # Exploration rate for epsilon-greedy policy
# --- Helper Functions ---
def generate_demand(num_locations, mean, stddev):
"""Generates random demand for each location, following a normal distribution."""
demand = np.random.normal(mean, stddev, num_locations)
demand = np.maximum(0, np.round(demand)).astype(int) # Ensure demand is non-negative and integer
return demand
def calculate_reward(inventory, unmet_demand):
    """Calculates the reward from holding costs on ending inventory and stockout costs on unmet demand."""
    holding_cost = HOLDING_COST * np.sum(inventory)
    stockout_cost = STOCKOUT_COST * np.sum(unmet_demand)  # Penalize only the demand that could not be filled
    reward = -holding_cost - stockout_cost  # Reward is the negative of the costs
    return reward
# --- Environment Class ---
class SupplyChainEnvironment:
"""
Simulates a simplified supply chain environment.
"""
def __init__(self, num_locations, inventory_capacity, lead_time):
self.num_locations = num_locations
self.inventory_capacity = inventory_capacity
self.lead_time = lead_time
self.inventory = np.zeros(num_locations, dtype=int) # Initial inventory at each location
self.backorders = np.zeros(num_locations, dtype=int) # Initial backorders
self.orders_in_transit = np.zeros((lead_time, num_locations), dtype=int) # Orders on their way
self.current_step = 0
def reset(self):
"""Resets the environment to its initial state."""
self.inventory = np.zeros(self.num_locations, dtype=int)
self.backorders = np.zeros(self.num_locations, dtype=int)
self.orders_in_transit = np.zeros((self.lead_time, self.num_locations), dtype=int)
self.current_step = 0
return self.get_state() # Returns the initial state
def get_state(self):
"""Returns the current state of the environment."""
# A simplified state: current inventory levels at each location
# In a more complex scenario, you could include demand history, in-transit orders, etc.
return tuple(self.inventory)
def step(self, actions):
"""
Takes an action (order quantities) and updates the environment.
Args:
actions: A list or numpy array representing the order quantity for each location.
Returns:
next_state: The next state of the environment.
reward: The reward obtained for the action.
done: A boolean indicating whether the episode is finished.
info: Additional information (e.g., debugging data).
"""
self.current_step += 1
# 1. Enforce action constraints (cannot order more than capacity)
actions = np.minimum(actions, self.inventory_capacity - self.inventory)
actions = np.maximum(actions, 0).astype(int) # Ensure orders are not negative
        # 2. Receive orders that have completed their lead time (placed LEAD_TIME steps ago)
        received_orders = self.orders_in_transit[-1, :]
        self.inventory += received_orders
        # 3. Advance the in-transit pipeline by one step and place the new orders
        self.orders_in_transit = np.roll(self.orders_in_transit, shift=1, axis=0)
        self.orders_in_transit[0, :] = actions
# 4. Generate demand
demand = generate_demand(self.num_locations, DEMAND_MEAN, DEMAND_STDDEV)
# 5. Satisfy demand
fulfilled_demand = np.minimum(demand, self.inventory)
self.inventory -= fulfilled_demand
unfulfilled_demand = demand - fulfilled_demand
self.backorders += unfulfilled_demand
self.inventory = np.maximum(0, self.inventory) # Inventory cannot be negative
        # 6. Calculate reward from end-of-step inventory and the demand that could not be met
        reward = calculate_reward(self.inventory, unfulfilled_demand)
# 7. Get the next state
next_state = self.get_state()
# 8. Determine if the episode is done
done = self.current_step >= MAX_STEPS_PER_EPISODE
info = {} # Add any debugging info here
return next_state, reward, done, info
# --- Q-Learning Agent ---
class QLearningAgent:
"""
An agent that learns to optimize supply chain logistics using Q-learning.
"""
def __init__(self, num_locations, inventory_capacity, learning_rate, discount_factor, exploration_rate):
self.num_locations = num_locations
self.inventory_capacity = inventory_capacity
self.learning_rate = learning_rate
self.discount_factor = discount_factor
self.exploration_rate = exploration_rate
# Initialize the Q-table:
# States are tuples representing inventory levels at each location.
# Actions are the order quantities for each location (0 to inventory_capacity).
# We use a dictionary to represent the Q-table, which allows for sparse storage.
self.q_table = {}
def get_q_value(self, state, action):
"""Retrieves the Q-value for a given state-action pair. Returns 0 if the pair is not in the Q-table."""
if (state, action) not in self.q_table:
return 0 # Initialize Q-value to 0 if not seen before
return self.q_table[(state, action)]
def choose_action(self, state):
"""Chooses an action based on the epsilon-greedy policy."""
if random.random() < self.exploration_rate:
# Explore: Choose a random action
# Generate a random action (order quantity) for each location
action = tuple(np.random.randint(0, self.inventory_capacity + 1, self.num_locations))
else:
# Exploit: Choose the action with the highest Q-value for the current state
best_action = None
best_q_value = float('-inf') # Initialize with negative infinity
            # Evaluating every possible order combination is infeasible, so sample
            # 100 random candidate actions and keep the one with the highest Q-value
for _ in range(100):
action = tuple(np.random.randint(0, self.inventory_capacity + 1, self.num_locations))
q_value = self.get_q_value(state, action)
if q_value > best_q_value:
best_q_value = q_value
best_action = action
if best_action is None:
# If no action has been tried yet, choose a random action
action = tuple(np.random.randint(0, self.inventory_capacity + 1, self.num_locations))
else:
action = best_action
return action
    def update_q_value(self, state, action, reward, next_state):
        """Updates the Q-value for a given state-action pair using the Q-learning update rule."""
        # Estimate the best Q-value for the next state by sampling candidate actions.
        # A separate variable name is used so the (state, action) pair being updated is not overwritten.
        best_next_q_value = float('-inf')
        for _ in range(100):
            candidate_action = tuple(np.random.randint(0, self.inventory_capacity + 1, self.num_locations))
            next_q_value = self.get_q_value(next_state, candidate_action)
            best_next_q_value = max(best_next_q_value, next_q_value)
        # Fall back to 0 if no candidate yielded a value (defensive; get_q_value already defaults to 0)
        if best_next_q_value == float('-inf'):
            best_next_q_value = 0
        # Calculate the new Q-value using the Q-learning update rule
        old_q_value = self.get_q_value(state, action)
        new_q_value = old_q_value + self.learning_rate * (
            reward + self.discount_factor * best_next_q_value - old_q_value)
        # Update the Q-table
        self.q_table[(state, action)] = new_q_value
def train(self, env, num_episodes):
"""Trains the Q-learning agent."""
for episode in range(num_episodes):
state = env.reset()
total_reward = 0
done = False
while not done:
# Choose an action based on the current state
action = self.choose_action(state)
# Take the action and observe the next state, reward, and done flag
next_state, reward, done, _ = env.step(action)
# Update the Q-value for the (state, action) pair
self.update_q_value(state, action, reward, next_state)
# Update the total reward
total_reward += reward
# Update the current state
state = next_state
# Print the episode number and total reward
print(f"Episode: {episode + 1}, Total Reward: {total_reward}")
print("Training finished.\n")
    def evaluate(self, env, num_episodes):
        """Evaluates the trained agent using a greedy policy (exploration disabled)."""
        saved_exploration_rate = self.exploration_rate
        self.exploration_rate = 0.0  # Act greedily during evaluation
        total_reward = 0
        for episode in range(num_episodes):
            state = env.reset()
            done = False
            episode_reward = 0
            while not done:
                action = self.choose_action(state)  # Greedy action selection (no exploration)
                next_state, reward, done, _ = env.step(action)
                episode_reward += reward
                state = next_state
            total_reward += episode_reward
        self.exploration_rate = saved_exploration_rate  # Restore exploration for any further training
        average_reward = total_reward / num_episodes
        print(f"Average reward over {num_episodes} evaluation episodes: {average_reward}")
# --- Main Execution ---
if __name__ == "__main__":
# 1. Create the environment
env = SupplyChainEnvironment(NUM_LOCATIONS, INVENTORY_CAPACITY, LEAD_TIME)
# 2. Create the Q-learning agent
agent = QLearningAgent(NUM_LOCATIONS, INVENTORY_CAPACITY, LEARNING_RATE, DISCOUNT_FACTOR, EXPLORATION_RATE)
# 3. Train the agent
print("Start training...")
agent.train(env, NUM_EPISODES)
# 4. Evaluate the trained agent
print("Start evaluating...")
agent.evaluate(env, 10) # Evaluate over 10 episodes
print("Finished evaluating.\n")
```
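As a quick sanity check, the following minimal sketch (assuming the code above is saved as `supply_chain_rl.py`, the file name suggested in the run instructions below) exercises the reward helper with concrete numbers and takes a single environment step:

```python
import numpy as np
from supply_chain_rl import SupplyChainEnvironment, calculate_reward

# Worked reward example: holding cost 1 * (10 + 20 + 0) = 30, stockout cost 5 * 4 = 20 -> reward = -50
print(calculate_reward(np.array([10, 20, 0]), np.array([0, 0, 4])))  # prints -50

# Single environment step with a fixed order of 10 units per location
env = SupplyChainEnvironment(num_locations=3, inventory_capacity=50, lead_time=2)
state = env.reset()
next_state, reward, done, info = env.step(np.full(3, 10))
print(next_state, reward, done)
```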
Key improvements and explanations:
* **Clearer Code Structure:** The code is organized into well-defined classes (`SupplyChainEnvironment`, `QLearningAgent`) and functions, making it more readable and maintainable.
* **Environment Class:** The `SupplyChainEnvironment` class encapsulates all the logic for simulating the supply chain, including updating inventory, handling demand, and calculating rewards. This is crucial for proper RL.
* **State Representation:** The state is now a tuple of inventory levels. While simplified, it is a functional state for the RL algorithm. Crucially, tuples are hashable, which is required for using them as keys in the Q-table dictionary. The `get_state()` method retrieves this representation.
* **Action Space:** The action space is now explicitly defined. Actions are the order quantities for each location.
* **Reward Function:** The `calculate_reward()` function calculates a reward based on holding costs and stockout costs. The reward is negative because the agent aims to minimize costs. Stockout costs penalize unmet demand.
* **Q-Learning Agent:** The `QLearningAgent` class implements the Q-learning algorithm.
* **Q-Table Initialization:** The Q-table is initialized as a dictionary (`self.q_table = {}`). This is important because the state space can be very large, and it's more efficient to only store Q-values for states that have been visited.
* **Epsilon-Greedy Exploration:** The agent uses an epsilon-greedy policy to balance exploration and exploitation. `exploration_rate` controls the probability of taking a random action.
* **Q-Value Update:** The `update_q_value()` function implements the Q-learning update rule (written out explicitly just after this list).
* **Training Loop:** The `train()` function runs the Q-learning algorithm for a specified number of episodes.
* **Evaluation:** The `evaluate()` function evaluates the trained agent by running it in the environment without exploration and calculating the average reward.
* **Action Constraints:** The `step()` function *enforces* the constraint that an order cannot exceed the remaining capacity (capacity minus current on-hand inventory). Critically, it also ensures orders are not negative.
* **Demand Generation:** The `generate_demand()` function uses a normal distribution to simulate demand, which is more realistic than a fixed value.
* **Clearer Comments and Explanations:** The code is thoroughly commented to explain each step.
* **`reset()` Method:** The `reset()` method is essential for resetting the environment at the beginning of each episode. It properly resets inventory, backorders, and orders in transit.
* **Order Management:** The in-transit order handling is now more robust using `np.roll` to simulate lead times.
* **`done` flag:** Implemented and used correctly to end episodes when `MAX_STEPS_PER_EPISODE` is reached.
* **Tuple Action Space:** The actions in the `choose_action` function are generated as tuples to enable them to be used as keys for the Q-table.
* **More reasonable action selection**: Iterating through *every possible* action is infeasible due to the exponential growth of the action space. Instead, the code now samples 100 *random* actions when choosing the best action and when updating the Q table, which provides a reasonable approximation.
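For reference, `update_q_value()` implements the standard tabular Q-learning update, with learning rate α set by `LEARNING_RATE` and discount factor γ set by `DISCOUNT_FACTOR`:

```latex
Q(s, a) \leftarrow Q(s, a) + \alpha \left[ r + \gamma \max_{a'} Q(s', a') - Q(s, a) \right]
```

Because the action space is far too large to enumerate, the code approximates the max over a' by sampling 100 random candidate actions, as described above.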
How to Run the Code:
1. **Install NumPy:** If you don't have NumPy installed, run `pip install numpy`.
2. **Run the Script:** Save the code as a Python file (e.g., `supply_chain_rl.py`) and run it from your terminal: `python supply_chain_rl.py`
The code will train the Q-learning agent and then evaluate its performance. The output will show the total reward for each training episode and the average reward over the evaluation episodes.
Important Considerations:
* **State Space:** The state space (possible combinations of inventory levels) can grow very quickly as the number of locations and the inventory capacity increase. This can lead to the "curse of dimensionality," where the Q-table becomes too large to store in memory, and the agent takes too long to learn. More advanced RL techniques (e.g., deep Q-networks, policy gradients) are needed for larger state spaces.
* **Action Space:** Similarly, the action space (possible order quantities) can also grow quickly. Continuous action spaces might be more appropriate in some cases.
* **Reward Shaping:** The reward function is critical for the agent's learning. Experiment with different reward functions to see what works best.
* **Hyperparameter Tuning:** The learning rate, discount factor, and exploration rate are hyperparameters that can significantly affect the agent's performance. Experiment with different values to find the optimal settings.
* **More Realistic Simulation:** This is a simplified simulation. To make it more realistic, you could add more features, such as:
* Variable lead times
* Multiple suppliers
* Production capacity constraints
* More complex demand patterns (e.g., seasonality, trends)
* Transportation costs
* **Scalability:** For large-scale supply chains, consider using deep reinforcement learning (DRL) techniques. DRL uses neural networks to approximate the Q-function, which can handle much larger state and action spaces. Libraries like TensorFlow and PyTorch can be used for implementing DRL algorithms; a minimal sketch of such a Q-network follows this list.
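As a sketch of that direction only (not part of the tabular implementation above), a deep Q-network replaces the dictionary Q-table with a neural network that maps the inventory-level state to Q-value estimates. The layer sizes and the discretization of order quantities below are illustrative assumptions, and PyTorch is assumed as the DRL library; a full DQN would also need a replay buffer, a target network, and a training loop:

```python
import torch
import torch.nn as nn

class QNetwork(nn.Module):
    """Illustrative Q-network: inventory levels in, one Q-value per discretized order level per location."""
    def __init__(self, num_locations, num_order_levels):
        super().__init__()
        self.num_locations = num_locations
        self.num_order_levels = num_order_levels
        self.net = nn.Sequential(
            nn.Linear(num_locations, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            # One output per (location, order level) pair -- an illustrative factorization of the action space
            nn.Linear(64, num_locations * num_order_levels),
        )

    def forward(self, state):
        # state: (batch, num_locations) tensor of inventory levels, ideally scaled to roughly [0, 1]
        q_values = self.net(state)
        # Reshape so each location selects its own order level independently
        return q_values.view(-1, self.num_locations, self.num_order_levels)

# Example forward pass: 5 locations, order quantities discretized into 11 levels (0, 10, ..., 100)
net = QNetwork(num_locations=5, num_order_levels=11)
state = torch.rand(1, 5)  # normalized inventory levels for a single sample
print(net(state).shape)  # torch.Size([1, 5, 11])
```

Each location's order is then chosen greedily as the argmax over its 11 Q-values, which keeps the network output size linear in the number of locations rather than exponential.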