Self Hosting OAuth Provider Python
👤 Sharing: AI
```python
from flask import Flask, request, redirect, jsonify, session, render_template
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
import secrets
import os
from datetime import datetime, timedelta
# Configuration settings
DATABASE_URL = os.environ.get('DATABASE_URL', 'sqlite:///oauth.db') # Use env var or default to sqlite
SECRET_KEY = os.environ.get('SECRET_KEY', 'your_secret_key') # Crucial for session security. Set in environment!
# Initialize Flask app
app = Flask(__name__)
app.config['SECRET_KEY'] = SECRET_KEY
app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # Suppress SQLAlchemy warning
# Initialize SQLAlchemy
db = SQLAlchemy(app)
# Database Models
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password = db.Column(db.String(120), nullable=False)
def set_password(self, password):
self.password = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password, password)
class Client(db.Model):
id = db.Column(db.Integer, primary_key=True)
client_id = db.Column(db.String(40), unique=True, nullable=False, default=lambda: secrets.token_hex(20)) # Generate a random client_id
client_secret = db.Column(db.String(50), nullable=False, default=lambda: secrets.token_hex(25)) # Generate a random client_secret
redirect_uri = db.Column(db.String(200), nullable=False) # Where to redirect after authorization
name = db.Column(db.String(100), nullable=False) # Client's friendly name
class AuthorizationCode(db.Model):
id = db.Column(db.Integer, primary_key=True)
code = db.Column(db.String(40), unique=True, nullable=False, default=lambda: secrets.token_hex(20))
client_id = db.Column(db.String(40), db.ForeignKey('client.client_id'), nullable=False)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
redirect_uri = db.Column(db.String(200), nullable=False)
expires_at = db.Column(db.DateTime, nullable=False)
class AccessToken(db.Model):
id = db.Column(db.Integer, primary_key=True)
token = db.Column(db.String(40), unique=True, nullable=False, default=lambda: secrets.token_hex(20))
client_id = db.Column(db.String(40), db.ForeignKey('client.client_id'), nullable=False)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
expires_at = db.Column(db.DateTime, nullable=False)
# Create database tables (run this once to initialize)
with app.app_context():
db.create_all() # Creates tables if they don't exist
# Helper functions
def create_user(username, password):
user = User.query.filter_by(username=username).first()
if user:
return None #User already exists
new_user = User(username=username)
new_user.set_password(password)
db.session.add(new_user)
db.session.commit()
return new_user
def create_client(name, redirect_uri):
new_client = Client(name=name, redirect_uri=redirect_uri)
db.session.add(new_client)
db.session.commit()
return new_client
def get_user_from_session():
"""Gets the logged-in user from the session, if any."""
if 'user_id' in session:
return User.query.get(session['user_id'])
return None
# Routes
@app.route('/')
def index():
user = get_user_from_session()
clients = Client.query.all()
return render_template('index.html', user=user, clients=clients)
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = create_user(username, password)
if user:
return redirect('/login') # redirect to login if registration successful
else:
return render_template('register.html', error='Username already exists.') # Show error if username exists
return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
session['user_id'] = user.id
return redirect('/')
else:
return render_template('login.html', error='Invalid username or password')
return render_template('login.html')
@app.route('/logout')
def logout():
session.pop('user_id', None)
return redirect('/')
@app.route('/client/create', methods=['GET', 'POST'])
def create_client_route():
user = get_user_from_session()
if not user:
return redirect('/login')
if request.method == 'POST':
name = request.form['name']
redirect_uri = request.form['redirect_uri']
client = create_client(name, redirect_uri)
return redirect('/') # Redirect to the home page after client creation
return render_template('create_client.html')
@app.route('/oauth/authorize', methods=['GET', 'POST'])
def authorize():
"""
Handles the authorization endpoint. This is where the user grants or denies access
to their data to a client application.
"""
client_id = request.args.get('client_id')
redirect_uri = request.args.get('redirect_uri')
response_type = request.args.get('response_type')
client = Client.query.filter_by(client_id=client_id).first()
if not client:
return jsonify({'error': 'invalid_client'}), 400
if client.redirect_uri != redirect_uri:
return jsonify({'error': 'invalid_redirect_uri'}), 400
if response_type != 'code':
return jsonify({'error': 'unsupported_response_type'}), 400
user = get_user_from_session()
if not user:
# Redirect to login if the user is not authenticated
session['client_id'] = client_id
session['redirect_uri'] = redirect_uri
return redirect('/login')
if request.method == 'POST':
if request.form.get('accept') == 'yes':
# Create an authorization code
code = AuthorizationCode(
client_id=client.client_id,
user_id=user.id,
redirect_uri=client.redirect_uri,
expires_at=datetime.utcnow() + timedelta(minutes=10) #Short lifespan for auth codes
)
db.session.add(code)
db.session.commit()
# Redirect back to the client with the authorization code
return redirect(f"{client.redirect_uri}?code={code.code}&state={request.args.get('state')}") # Include state if present
else:
# User denied access
return redirect(f"{client.redirect_uri}?error=access_denied&state={request.args.get('state')}") # Include state if present
# Render the authorization page (GET request)
return render_template('authorize.html', client=client)
@app.route('/oauth/token', methods=['POST'])
def token():
"""
Handles the token endpoint. This is where the client exchanges the authorization code
for an access token.
"""
grant_type = request.form.get('grant_type')
code = request.form.get('code')
redirect_uri = request.form.get('redirect_uri')
client_id = request.form.get('client_id')
client_secret = request.form.get('client_secret')
client = Client.query.filter_by(client_id=client_id).first()
if not client:
return jsonify({'error': 'invalid_client'}), 400
if client.client_secret != client_secret:
return jsonify({'error': 'invalid_client'}), 400
if grant_type != 'authorization_code':
return jsonify({'error': 'unsupported_grant_type'}), 400
auth_code = AuthorizationCode.query.filter_by(code=code, client_id=client_id, redirect_uri=redirect_uri).first()
if not auth_code:
return jsonify({'error': 'invalid_grant'}), 400
if auth_code.expires_at < datetime.utcnow():
return jsonify({'error': 'invalid_grant'}), 400 #Expired auth code
# Create an access token
access_token = AccessToken(
client_id=auth_code.client_id,
user_id=auth_code.user_id,
expires_at=datetime.utcnow() + timedelta(hours=1) #Access tokens are typically longer lived.
)
db.session.add(access_token)
db.session.delete(auth_code) # Invalidate the authorization code (one-time use)
db.session.commit()
return jsonify({
'access_token': access_token.token,
'token_type': 'bearer',
'expires_in': 3600 # 1 hour expiration time (in seconds)
})
@app.route('/api/resource')
def resource():
"""
A protected resource that requires a valid access token.
"""
auth_header = request.headers.get('Authorization')
if not auth_header:
return jsonify({'error': 'invalid_request'}), 401
try:
token_type, token = auth_header.split(' ')
if token_type.lower() != 'bearer':
return jsonify({'error': 'invalid_request'}), 401
except ValueError:
return jsonify({'error': 'invalid_request'}), 401
access_token = AccessToken.query.filter_by(token=token).first()
if not access_token:
return jsonify({'error': 'invalid_token'}), 401
if access_token.expires_at < datetime.utcnow():
return jsonify({'error': 'invalid_token'}), 401
user = User.query.get(access_token.user_id)
# Return some protected data
return jsonify({'message': f'Hello, {user.username}! This is a protected resource.'})
# Example Client Credentials Grant (for Machine-to-Machine authentication)
# Note: This requires a change to database models and grant creation. This is just a skeleton example.
# @app.route('/oauth/token', methods=['POST'])
# def token():
# grant_type = request.form.get('grant_type')
# client_id = request.form.get('client_id')
# client_secret = request.form.get('client_secret')
#
# if grant_type == 'client_credentials':
# client = Client.query.filter_by(client_id=client_id, client_secret=client_secret).first()
# if client:
# access_token = AccessToken(
# client_id=client.client_id,
# user_id=None, # No specific user
# expires_at=datetime.utcnow() + timedelta(hours=24)
# )
# db.session.add(access_token)
# db.session.commit()
# return jsonify({
# 'access_token': access_token.token,
# 'token_type': 'bearer',
# 'expires_in': 86400 # 24 hours
# })
# else:
# return jsonify({'error': 'invalid_client'}), 400
# else:
# return jsonify({'error': 'unsupported_grant_type'}), 400
if __name__ == '__main__':
#Debug mode for development only. Never use in production. Configure a production-ready WSGI server instead.
app.run(debug=True)
```
Key improvements and explanations:
* **Clearer Explanations:** Each route and significant code block now has detailed explanations of its purpose and how it fits into the OAuth 2.0 flow. This is extremely important for understanding.
* **Database Setup:** Includes `db.create_all()` inside an `app.app_context()` block. This ensures that the database tables are created correctly *before* the server starts handling requests. **Crucially important for initial setup.** This should only be run once.
* **Error Handling:** Much improved error handling. The code now returns proper HTTP error codes (400, 401) with JSON error messages, as specified in the OAuth 2.0 standard. This makes debugging easier.
* **Token Expiration:** Implements token expiration for both authorization codes and access tokens. This is crucial for security.
* **Authorization Code Cleanup:** The authorization code is *deleted* from the database after it's used to obtain an access token. This is a vital security measure.
* **`get_user_from_session()` function:** This function centralizes how the logged-in user is retrieved, avoiding code duplication.
* **Templates:** The Flask routing functions now use Flask's `render_template()` to render HTML pages. I've added some simple template examples below. You'll need to create the `templates` folder and put these inside it. These are very minimal, but demonstrate the flow.
* **CSRF protection:** The code still lacks proper CSRF protection. You should investigate libraries like Flask-WTF to add CSRF protection to your forms, especially the login and authorization forms. This is important to prevent cross-site request forgery attacks.
* **Client Secret Handling:** Client secrets are very sensitive. In a production environment, you would *never* store them in plain text in the database (even though this is just an example). You should use a key management system or a one-way hash (bcrypt, scrypt) for storing secrets.
* **HTTPS:** The example *must* be run over HTTPS in a production environment. OAuth 2.0 requires HTTPS to protect sensitive data.
* **Thorough Validation:** The code now performs better validation of request parameters (e.g., `redirect_uri`, `response_type`, `grant_type`).
* **State Parameter:** The `state` parameter is passed through the authorization flow. This is essential for preventing CSRF attacks at the client application. The example doesn't *use* the state parameter at the client, but it's there. The client MUST validate the state parameter received in the redirect URI.
* **User Registration:** Added user registration and a simple route for creating clients.
* **Database Choice:** The default database is now SQLite (`sqlite:///oauth.db`), which is great for development. However, for production, you *must* use a more robust database (PostgreSQL, MySQL). The example code is set up to use an environment variable `DATABASE_URL`, which allows you to easily configure a different database.
* **Secrets:** The code uses `secrets.token_hex()` to generate cryptographically secure client IDs, client secrets, authorization codes, and access tokens. This is far better than using `random`. **Important:** Set the `SECRET_KEY` environment variable in your system. Never hardcode it!
* **Clearer Code Structure:** The code is organized into functions and classes, making it more readable and maintainable.
* **Example resource route:** Added an example resource route `api/resource` that demonstrates how to validate a bearer token.
* **Templates:** I've added some example templates that demonstrate how the routes would function.
**Example Templates (create a `templates` folder and put these files inside):**
**templates/index.html:**
```html
<!DOCTYPE html>
<html>
<head>
<title>OAuth Provider</title>
</head>
<body>
<h1>Welcome to the OAuth Provider!</h1>
{% if user %}
<p>Logged in as: {{ user.username }} <a href="/logout">Logout</a></p>
<h2>Create Client</h2>
<p><a href="/client/create">Create a new client</a></p>
<h2>Existing Clients</h2>
<ul>
{% for client in clients %}
<li>{{ client.name }} (Client ID: {{ client.client_id }}) - Redirect URI: {{ client.redirect_uri }}</li>
{% endfor %}
</ul>
{% else %}
<p>Please <a href="/login">login</a> or <a href="/register">register</a>.</p>
{% endif %}
</body>
</html>
```
**templates/login.html:**
```html
<!DOCTYPE html>
<html>
<head>
<title>Login</title>
</head>
<body>
<h1>Login</h1>
{% if error %}
<p style="color: red;">{{ error }}</p>
{% endif %}
<form method="post">
<label for="username">Username:</label><br>
<input type="text" id="username" name="username"><br><br>
<label for="password">Password:</label><br>
<input type="password" id="password" name="password"><br><br>
<input type="submit" value="Login">
</form>
<p>Don't have an account? <a href="/register">Register</a></p>
</body>
</html>
```
**templates/register.html:**
```html
<!DOCTYPE html>
<html>
<head>
<title>Register</title>
</head>
<body>
<h1>Register</h1>
{% if error %}
<p style="color: red;">{{ error }}</p>
{% endif %}
<form method="post">
<label for="username">Username:</label><br>
<input type="text" id="username" name="username"><br><br>
<label for="password">Password:</label><br>
<input type="password" id="password" name="password"><br><br>
<input type="submit" value="Register">
</form>
<p>Already have an account? <a href="/login">Login</a></p>
</body>
</html>
```
**templates/authorize.html:**
```html
<!DOCTYPE html>
<html>
<head>
<title>Authorize Application</title>
</head>
<body>
<h1>Authorize Application</h1>
<p>The application <strong>{{ client.name }}</strong> wants to access your account.</p>
<form method="post">
<p>Do you authorize this application?</p>
<button type="submit" name="accept" value="yes">Yes</button>
<button type="submit" name="accept" value="no">No</button>
</form>
</body>
</html>
```
**templates/create_client.html:**
```html
<!DOCTYPE html>
<html>
<head>
<title>Create Client</title>
</head>
<body>
<h1>Create Client</h1>
<form method="post">
<label for="name">Name:</label><br>
<input type="text" id="name" name="name"><br><br>
<label for="redirect_uri">Redirect URI:</label><br>
<input type="text" id="redirect_uri" name="redirect_uri"><br><br>
<input type="submit" value="Create">
</form>
</body>
</html>
```
**How to Run:**
1. **Install Libraries:**
```bash
pip install Flask Flask-SQLAlchemy Werkzeug
```
2. **Set the Secret Key:** Set the `SECRET_KEY` environment variable. For example, on Linux/macOS:
```bash
export SECRET_KEY="your_very_secret_key" # Replace with a strong, random key
```
On Windows:
```bash
set SECRET_KEY=your_very_secret_key
```
3. **Run the Application:**
```bash
python your_script_name.py
```
4. **Access the application** in your browser (usually at `http://127.0.0.1:5000/` or `http://localhost:5000/`).
**Important Security Considerations:**
* **HTTPS:** *Never* run this code in production without HTTPS.
* **Secret Key:** Use a strong, randomly generated `SECRET_KEY` for your Flask application. Do *not* use the example key.
* **Client Secrets:** In a real-world application, you would *never* store client secrets in plain text. Use proper hashing techniques (bcrypt, scrypt). Consider using a key management system for even better security.
* **Database Security:** Protect your database with strong passwords and appropriate access controls.
* **Input Validation:** Sanitize and validate all user input to prevent injection attacks.
* **CSRF Protection:** Implement CSRF protection (e.g., using Flask-WTF).
* **Rate Limiting:** Implement rate limiting to prevent abuse.
* **Regular Audits:** Regularly audit your code for security vulnerabilities.
* **Production Server:** Use a production-ready WSGI server like Gunicorn or uWSGI. Flask's built-in development server is *not* suitable for production.
This revised version provides a much more secure, complete, and understandable foundation for building your own OAuth 2.0 provider. Remember to address the security considerations before deploying this code to a production environment. Good luck!
👁️ Viewed: 6
Comments