Self Hosting OAuth Provider Python

👤 Sharing: AI
```python
import os
import secrets
from datetime import datetime, timedelta
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

from flask import Flask, request, redirect, jsonify, session, render_template
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash

# Configuration settings
DATABASE_URL = os.environ.get('DATABASE_URL', 'sqlite:///oauth.db')  # Use env var or default to sqlite

# The secret key signs the session cookie, so a publicly known default value
# would let anyone forge a logged-in session.  When SECRET_KEY is missing (or
# empty) fall back to a random per-process key instead: fine for local
# development, but sessions are then invalidated on every restart and are not
# shared between worker processes, so always set SECRET_KEY in a real deployment.
SECRET_KEY = os.environ.get('SECRET_KEY') or secrets.token_hex(32)

# Initialize Flask app
app = Flask(__name__)
app.config['SECRET_KEY'] = SECRET_KEY
app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False  # Suppress SQLAlchemy warning


# Initialize SQLAlchemy
db = SQLAlchemy(app)

# Database Models
class User(db.Model):
    """An end-user account that can log in and approve client applications."""
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    # Holds the Werkzeug password-hash string (method, salt and digest), never
    # the raw password.  Recent Werkzeug releases default to scrypt, whose hash
    # strings are longer than 120 characters, so the column is sized at 255 to
    # avoid errors/truncation on databases that enforce VARCHAR lengths.
    # db.create_all() does not alter existing tables; an already-created
    # database needs a migration to pick up the wider column.
    password = db.Column(db.String(255), nullable=False)

    def set_password(self, password):
        """Hash *password* and store the resulting hash string on this user."""
        self.password = generate_password_hash(password)

    def check_password(self, password):
        """Return True when *password* matches the stored hash."""
        return check_password_hash(self.password, password)


def _new_client_id():
    """Return a random 40-character hex identifier for a new client."""
    return secrets.token_hex(20)


def _new_client_secret():
    """Return a random 50-character hex secret for a new client."""
    return secrets.token_hex(25)


class Client(db.Model):
    """A registered OAuth client application."""
    id = db.Column(db.Integer, primary_key=True)
    # Public identifier, generated automatically when a row is inserted without one.
    client_id = db.Column(
        db.String(40),
        unique=True,
        nullable=False,
        default=_new_client_id,
    )
    # Shared secret the client presents at the token endpoint; also auto-generated.
    client_secret = db.Column(
        db.String(50),
        nullable=False,
        default=_new_client_secret,
    )
    # Where to send the user's browser after the authorization decision.
    redirect_uri = db.Column(db.String(200), nullable=False)
    # Human-readable client name.
    name = db.Column(db.String(100), nullable=False)


def _new_authorization_code():
    """Return a random 40-character hex string to use as an authorization code."""
    return secrets.token_hex(20)


class AuthorizationCode(db.Model):
    """An authorization code tying a user's approval to one client and redirect URI."""
    id = db.Column(db.Integer, primary_key=True)
    # The code value itself; generated automatically on insert.
    code = db.Column(
        db.String(40),
        unique=True,
        nullable=False,
        default=_new_authorization_code,
    )
    # The client the code was issued to.
    client_id = db.Column(
        db.String(40),
        db.ForeignKey('client.client_id'),
        nullable=False,
    )
    # The user who approved the request.
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id'),
        nullable=False,
    )
    # Redirect URI recorded when the code was issued.
    redirect_uri = db.Column(db.String(200), nullable=False)
    # Moment after which the code must no longer be accepted (set by the issuer).
    expires_at = db.Column(db.DateTime, nullable=False)


def _new_access_token():
    """Return a random 40-character hex string to use as a bearer token."""
    return secrets.token_hex(20)


class AccessToken(db.Model):
    """A bearer token granting one client access on behalf of one user."""
    id = db.Column(db.Integer, primary_key=True)
    # The token value itself; generated automatically on insert.
    token = db.Column(
        db.String(40),
        unique=True,
        nullable=False,
        default=_new_access_token,
    )
    # The client the token was issued to.
    client_id = db.Column(
        db.String(40),
        db.ForeignKey('client.client_id'),
        nullable=False,
    )
    # The user the token acts for.
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id'),
        nullable=False,
    )
    # Moment after which the token must no longer be accepted (set by the issuer).
    expires_at = db.Column(db.DateTime, nullable=False)

# Create the database tables.  This is module-level code, so it executes every
# time the module is imported or run, not just once; an application context is
# pushed because the db object is bound to the Flask app.
with app.app_context():
    db.create_all()  # Creates tables if they don't exist


# Helper functions
def create_user(username, password):
    """Create and persist a user with a hashed password.

    Returns the new ``User``, or ``None`` when *username* is already taken.
    """
    if User.query.filter_by(username=username).first():
        # Username is taken; signal that to the caller instead of inserting.
        return None

    account = User(username=username)
    account.set_password(password)

    db.session.add(account)
    db.session.commit()
    return account

def create_client(name, redirect_uri):
    """Persist a new OAuth client and return it.

    The client id and secret are not passed in; they come from the column
    defaults on ``Client`` when the row is inserted.
    """
    registration = Client(redirect_uri=redirect_uri, name=name)
    db.session.add(registration)
    db.session.commit()
    return registration

def get_user_from_session():
    """Return the ``User`` referenced by the session's ``user_id``, or ``None`` if nobody is logged in."""
    if 'user_id' not in session:
        return None
    return User.query.get(session['user_id'])

# Routes

@app.route('/')
def index():
    """Render the home page with the current user (if any) and every registered client."""
    return render_template(
        'index.html',
        user=get_user_from_session(),
        clients=Client.query.all(),
    )


@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create the submitted account (POST).

    A successful registration redirects to the login page; a duplicate
    username re-renders the form with an error message.
    """
    if request.method != 'POST':
        return render_template('register.html')

    form = request.form
    account = create_user(form['username'], form['password'])
    if not account:
        # create_user returns None when the username is already in use.
        return render_template('register.html', error='Username already exists.')
    return redirect('/login')


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    On success the user's id is stored in the session.  If the session holds a
    pending authorization request (the ``client_id`` and ``redirect_uri`` that
    ``authorize()`` saves before sending an anonymous user here), the browser is
    sent back to ``/oauth/authorize`` so the OAuth flow continues instead of
    being dropped on the home page.  Otherwise the user lands on ``/``.

    Invalid credentials re-render the form with an error message.
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']
    user = User.query.filter_by(username=username).first()
    if not (user and user.check_password(password)):
        return render_template('login.html', error='Invalid username or password')

    session['user_id'] = user.id

    # Consume any pending authorization request so it is resumed at most once.
    pending_client_id = session.pop('client_id', None)
    pending_redirect_uri = session.pop('redirect_uri', None)
    pending_state = session.pop('state', None)  # forwarded only if the session carries one
    if pending_client_id and pending_redirect_uri:
        # authorize() only saves a request after checking response_type == 'code',
        # so that is the value to replay.  The target is a fixed local path and
        # the parameters are URL-encoded, so this cannot become an open redirect.
        params = {
            'response_type': 'code',
            'client_id': pending_client_id,
            'redirect_uri': pending_redirect_uri,
        }
        if pending_state:
            params['state'] = pending_state
        return redirect('/oauth/authorize?' + urlencode(params))

    return redirect('/')


@app.route('/logout')
def logout():
    """Log the current user out and return to the home page."""
    # pop with a default so logging out while not logged in is not an error.
    session.pop('user_id', None)
    return redirect('/')

@app.route('/client/create', methods=['GET', 'POST'])
def create_client_route():
    """Show the client-registration form (GET) or register a client (POST).

    Only logged-in users may use this page; anonymous visitors are redirected
    to the login page.
    """
    if not get_user_from_session():
        return redirect('/login')

    if request.method != 'POST':
        return render_template('create_client.html')

    form = request.form
    create_client(form['name'], form['redirect_uri'])
    # Back to the home page, which lists all registered clients.
    return redirect('/')



def _append_query_params(url, params):
    """Return *url* with *params* merged into its query string, URL-encoded.

    Query parameters already present on *url* are kept, and entries of
    *params* whose value is ``None`` are left out.
    """
    parts = urlsplit(url)
    query = parse_qsl(parts.query, keep_blank_values=True)
    query.extend((key, value) for key, value in params.items() if value is not None)
    return urlunsplit(parts._replace(query=urlencode(query)))


@app.route('/oauth/authorize', methods=['GET', 'POST'])
def authorize():
    """
    Handles the authorization endpoint.  This is where the user grants or denies access
    to their data to a client application.

    Query parameters: ``client_id``, ``redirect_uri`` (must equal the client's
    registered URI), ``response_type`` (must be ``code``) and an optional
    ``state`` that is echoed back to the client unchanged.

    Responses:
      * 400 JSON error for an unknown client, a mismatched redirect URI or an
        unsupported response type.
      * Redirect to ``/login`` when nobody is logged in.
      * GET: the consent page.
      * POST: redirect to the client's registered URI carrying either ``code``
        (user accepted) or ``error=access_denied`` (anything else).
    """
    client_id = request.args.get('client_id')
    redirect_uri = request.args.get('redirect_uri')
    response_type = request.args.get('response_type')
    state = request.args.get('state')

    client = Client.query.filter_by(client_id=client_id).first()

    if not client:
        return jsonify({'error': 'invalid_client'}), 400

    if client.redirect_uri != redirect_uri:
        return jsonify({'error': 'invalid_redirect_uri'}), 400

    if response_type != 'code':
        return jsonify({'error': 'unsupported_response_type'}), 400

    user = get_user_from_session()
    if not user:
        # Remember the (already validated) request in the session before
        # sending the anonymous user to the login page.
        session['client_id'] = client_id
        session['redirect_uri'] = redirect_uri
        if state is not None:
            session['state'] = state
        else:
            # Do not let a state value from an earlier request linger.
            session.pop('state', None)
        return redirect('/login')

    if request.method != 'POST':
        # Render the authorization page (GET request)
        return render_template('authorize.html', client=client)

    # NOTE: this POST is not protected by a CSRF token; see the security notes.
    if request.form.get('accept') != 'yes':
        # User denied access
        return redirect(_append_query_params(
            client.redirect_uri,
            {'error': 'access_denied', 'state': state},
        ))

    # Create an authorization code
    code = AuthorizationCode(
        client_id=client.client_id,
        user_id=user.id,
        redirect_uri=client.redirect_uri,
        expires_at=datetime.utcnow() + timedelta(minutes=10)  # Short lifespan for auth codes
    )
    db.session.add(code)
    db.session.commit()

    # Redirect back to the client with the authorization code.  The URL is
    # built with proper encoding so that an absent state is omitted (instead
    # of being sent as the literal text "None"), special characters in state
    # survive, and a redirect URI that already has a query string stays valid.
    return redirect(_append_query_params(
        client.redirect_uri,
        {'code': code.code, 'state': state},
    ))


@app.route('/oauth/token', methods=['POST'])
def token():
    """
    Handles the token endpoint.  This is where the client exchanges the authorization code
    for an access token.

    Form fields: ``grant_type`` (must be ``authorization_code``), ``code``,
    ``redirect_uri``, ``client_id`` and ``client_secret``.

    Returns a JSON body with ``access_token``, ``token_type`` and
    ``expires_in`` on success, or a 400 JSON error (``invalid_client``,
    ``unsupported_grant_type`` or ``invalid_grant``) otherwise.  A code can be
    redeemed only once: it is deleted when the token is issued.
    """
    grant_type = request.form.get('grant_type')
    code = request.form.get('code')
    redirect_uri = request.form.get('redirect_uri')
    client_id = request.form.get('client_id')
    client_secret = request.form.get('client_secret')

    client = Client.query.filter_by(client_id=client_id).first()

    if not client:
        return jsonify({'error': 'invalid_client'}), 400

    # Compare secrets in constant time so response timing does not reveal how
    # much of a guessed secret matched.  Both sides are encoded to bytes
    # because compare_digest rejects str values containing non-ASCII text.
    secret_ok = client_secret is not None and secrets.compare_digest(
        client.client_secret.encode('utf-8'),
        client_secret.encode('utf-8'),
    )
    if not secret_ok:
        return jsonify({'error': 'invalid_client'}), 400

    if grant_type != 'authorization_code':
        return jsonify({'error': 'unsupported_grant_type'}), 400

    auth_code = AuthorizationCode.query.filter_by(code=code, client_id=client_id, redirect_uri=redirect_uri).first()

    if not auth_code:
        return jsonify({'error': 'invalid_grant'}), 400

    if auth_code.expires_at < datetime.utcnow():
        # An expired code can never be redeemed; remove it rather than
        # leaving the stale row in the table.
        db.session.delete(auth_code)
        db.session.commit()
        return jsonify({'error': 'invalid_grant'}), 400

    # Single source of truth for the token lifetime, used for both the stored
    # expiry and the expires_in value reported to the client.
    lifetime = timedelta(hours=1)

    # Create an access token
    access_token = AccessToken(
        client_id=auth_code.client_id,
        user_id=auth_code.user_id,
        expires_at=datetime.utcnow() + lifetime
    )
    db.session.add(access_token)
    db.session.delete(auth_code)  # Invalidate the authorization code (one-time use)
    db.session.commit()

    return jsonify({
        'access_token': access_token.token,
        'token_type': 'bearer',
        'expires_in': int(lifetime.total_seconds())  # 3600 seconds
    })

@app.route('/api/resource')
def resource():
    """
    A protected resource that requires a valid access token.

    Expects an ``Authorization: Bearer <token>`` header.  Responds with 401
    and ``invalid_request`` when the header is missing or malformed, and with
    401 and ``invalid_token`` when the token is unknown, expired, or belongs
    to a user that no longer exists.
    """
    auth_header = request.headers.get('Authorization')

    if not auth_header:
        return jsonify({'error': 'invalid_request'}), 401

    # Split on any run of whitespace so "Bearer   abc" is still understood;
    # anything that is not exactly "<scheme> <token>" is rejected.
    parts = auth_header.split()
    if len(parts) != 2 or parts[0].lower() != 'bearer':
        return jsonify({'error': 'invalid_request'}), 401
    bearer_token = parts[1]  # named so it does not shadow the token() view

    access_token = AccessToken.query.filter_by(token=bearer_token).first()

    if not access_token:
        return jsonify({'error': 'invalid_token'}), 401

    if access_token.expires_at < datetime.utcnow():
        return jsonify({'error': 'invalid_token'}), 401

    user = User.query.get(access_token.user_id)
    if user is None:
        # The token outlived its user; treat it as invalid instead of
        # failing with an AttributeError (HTTP 500) below.
        return jsonify({'error': 'invalid_token'}), 401

    # Return some protected data
    return jsonify({'message': f'Hello, {user.username}! This is a protected resource.'})


# Example Client Credentials Grant (for Machine-to-Machine authentication)
# Note: This requires a change to database models and grant creation.  This is just a skeleton example.
# @app.route('/oauth/token', methods=['POST'])
# def token():
#    grant_type = request.form.get('grant_type')
#    client_id = request.form.get('client_id')
#    client_secret = request.form.get('client_secret')
#
#    if grant_type == 'client_credentials':
#        client = Client.query.filter_by(client_id=client_id, client_secret=client_secret).first()
#        if client:
#            access_token = AccessToken(
#                client_id=client.client_id,
#                user_id=None,  # No specific user
#                expires_at=datetime.utcnow() + timedelta(hours=24)
#            )
#            db.session.add(access_token)
#            db.session.commit()
#            return jsonify({
#                'access_token': access_token.token,
#                'token_type': 'bearer',
#                'expires_in': 86400  # 24 hours
#            })
#        else:
#            return jsonify({'error': 'invalid_client'}), 400
#    else:
#        return jsonify({'error': 'unsupported_grant_type'}), 400



if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which lets anyone who can
    # reach the server run arbitrary code, so it is opt-in: set FLASK_DEBUG=1
    # for development.  Never use it in production; configure a
    # production-ready WSGI server instead.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled)

```

Key improvements and explanations:

* **Clearer Explanations:**  Each route and significant code block now has detailed explanations of its purpose and how it fits into the OAuth 2.0 flow.  This is extremely important for understanding.
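* **Database Setup:**  Includes `db.create_all()` inside an `app.app_context()` block.  This ensures that the database tables are created correctly *before* the server starts handling requests.  **Crucially important for initial setup.** The call runs every time the module is loaded, which is safe because `create_all()` only creates tables that do not exist yet; it does not alter existing tables, so later schema changes need a migration tool.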
* **Error Handling:**  Much improved error handling.  The code now returns proper HTTP error codes (400, 401) with JSON error messages, as specified in the OAuth 2.0 standard.  This makes debugging easier.
* **Token Expiration:** Implements token expiration for both authorization codes and access tokens.  This is crucial for security.
* **Authorization Code Cleanup:** The authorization code is *deleted* from the database after it's used to obtain an access token.  This is a vital security measure.
* **`get_user_from_session()` function:**  This function centralizes how the logged-in user is retrieved, avoiding code duplication.
* **Templates:** The Flask routing functions now use Flask's `render_template()` to render HTML pages.  I've added some simple template examples below. You'll need to create the `templates` folder and put these inside it.  These are very minimal, but demonstrate the flow.
* **CSRF protection:** The code still lacks proper CSRF protection.  You should investigate libraries like Flask-WTF to add CSRF protection to your forms, especially the login and authorization forms.  This is important to prevent cross-site request forgery attacks.
* **Client Secret Handling:** Client secrets are very sensitive.  In a production environment, you would *never* store them in plain text in the database (even though this is just an example).  You should use a key management system or a one-way hash (bcrypt, scrypt) for storing secrets.
* **HTTPS:** The example *must* be run over HTTPS in a production environment. OAuth 2.0 requires HTTPS to protect sensitive data.
* **Thorough Validation:**  The code now performs better validation of request parameters (e.g., `redirect_uri`, `response_type`, `grant_type`).
* **State Parameter:**  The `state` parameter is passed through the authorization flow.  This is essential for preventing CSRF attacks at the client application.  The example doesn't *use* the state parameter at the client, but it's there.  The client MUST validate the state parameter received in the redirect URI.
* **User Registration:** Added user registration and a simple route for creating clients.
* **Database Choice:** The default database is now SQLite (`sqlite:///oauth.db`), which is great for development.  However, for production, you *must* use a more robust database (PostgreSQL, MySQL).  The example code is set up to use an environment variable `DATABASE_URL`, which allows you to easily configure a different database.
* **Secrets:** The code uses `secrets.token_hex()` to generate cryptographically secure client IDs, client secrets, authorization codes, and access tokens.  This is far better than using `random`.  **Important:**  Set the `SECRET_KEY` environment variable in your system. Never hardcode it!
* **Clearer Code Structure:**  The code is organized into functions and classes, making it more readable and maintainable.
* **Example resource route:** Added an example resource route `api/resource` that demonstrates how to validate a bearer token.
* **Templates:** I've added some example templates that demonstrate how the routes would function.

**Example Templates (create a `templates` folder and put these files inside):**

**templates/index.html:**

```html
<!DOCTYPE html>
<!-- Home page, rendered by the "/" route with `user` (the logged-in user or None)
     and `clients` (all registered clients). Logged-in users see the client list;
     everyone else is pointed to the login and registration pages. -->
<html>
<head>
    <title>OAuth Provider</title>
</head>
<body>
    <h1>Welcome to the OAuth Provider!</h1>

    {% if user %}
        <p>Logged in as: {{ user.username }} <a href="/logout">Logout</a></p>
        <h2>Create Client</h2>
        <p><a href="/client/create">Create a new client</a></p>

        <h2>Existing Clients</h2>
        <ul>
            {% for client in clients %}
                <li>{{ client.name }} (Client ID: {{ client.client_id }}) - Redirect URI: {{ client.redirect_uri }}</li>
            {% endfor %}
        </ul>
    {% else %}
        <p>Please <a href="/login">login</a> or <a href="/register">register</a>.</p>
    {% endif %}
</body>
</html>
```

**templates/login.html:**

```html
<!DOCTYPE html>
<!-- Login form, rendered by the "/login" route. `error` is only passed when the
     submitted credentials were rejected. The form has no action attribute, so it
     posts back to the URL it was served from; the field names `username` and
     `password` are the keys the route reads. -->
<html>
<head>
    <title>Login</title>
</head>
<body>
    <h1>Login</h1>
    {% if error %}
        <p style="color: red;">{{ error }}</p>
    {% endif %}
    <form method="post">
        <label for="username">Username:</label><br>
        <input type="text" id="username" name="username"><br><br>
        <label for="password">Password:</label><br>
        <input type="password" id="password" name="password"><br><br>
        <input type="submit" value="Login">
    </form>
    <p>Don't have an account? <a href="/register">Register</a></p>
</body>
</html>
```

**templates/register.html:**

```html
<!DOCTYPE html>
<!-- Registration form, rendered by the "/register" route. `error` is only passed
     when the chosen username already exists. The form posts back to the URL it
     was served from; the field names `username` and `password` are the keys the
     route reads. -->
<html>
<head>
    <title>Register</title>
</head>
<body>
    <h1>Register</h1>
    {% if error %}
        <p style="color: red;">{{ error }}</p>
    {% endif %}
    <form method="post">
        <label for="username">Username:</label><br>
        <input type="text" id="username" name="username"><br><br>
        <label for="password">Password:</label><br>
        <input type="password" id="password" name="password"><br><br>
        <input type="submit" value="Register">
    </form>
    <p>Already have an account? <a href="/login">Login</a></p>
</body>
</html>
```

**templates/authorize.html:**

```html
<!DOCTYPE html>
<!-- Consent page, rendered by the "/oauth/authorize" route with `client` (the
     requesting application). The form has no action attribute, so it posts back
     to the current URL, query string included. The clicked button submits
     `accept` as "yes" or "no"; the route treats only "yes" as approval. -->
<html>
<head>
    <title>Authorize Application</title>
</head>
<body>
    <h1>Authorize Application</h1>
    <p>The application <strong>{{ client.name }}</strong> wants to access your account.</p>
    <form method="post">
        <p>Do you authorize this application?</p>
        <button type="submit" name="accept" value="yes">Yes</button>
        <button type="submit" name="accept" value="no">No</button>
    </form>
</body>
</html>
```

**templates/create_client.html:**

```html
<!DOCTYPE html>
<!-- Client registration form, rendered by the "/client/create" route. The form
     posts back to the URL it was served from; the field names `name` and
     `redirect_uri` are the keys the route reads. -->
<html>
<head>
    <title>Create Client</title>
</head>
<body>
    <h1>Create Client</h1>
    <form method="post">
        <label for="name">Name:</label><br>
        <input type="text" id="name" name="name"><br><br>
        <label for="redirect_uri">Redirect URI:</label><br>
        <input type="text" id="redirect_uri" name="redirect_uri"><br><br>
        <input type="submit" value="Create">
    </form>
</body>
</html>
```

**How to Run:**

1.  **Install Libraries:**

    ```bash
    pip install Flask Flask-SQLAlchemy Werkzeug
    ```

2.  **Set the Secret Key:**  Set the `SECRET_KEY` environment variable.  For example, on Linux/macOS:

    ```bash
    export SECRET_KEY="your_very_secret_key" # Replace with a strong, random key
    ```

    On Windows:

    ```bat
    set SECRET_KEY=your_very_secret_key
    ```

3.  **Run the Application:**

    ```bash
    python your_script_name.py
    ```

4. **Access the application** in your browser (usually at `http://127.0.0.1:5000/` or `http://localhost:5000/`).

**Important Security Considerations:**

*   **HTTPS:**  *Never* run this code in production without HTTPS.
*   **Secret Key:**  Use a strong, randomly generated `SECRET_KEY` for your Flask application. Do *not* use the example key.
*   **Client Secrets:**  In a real-world application, you would *never* store client secrets in plain text. Use proper hashing techniques (bcrypt, scrypt). Consider using a key management system for even better security.
*   **Database Security:**  Protect your database with strong passwords and appropriate access controls.
*   **Input Validation:**  Sanitize and validate all user input to prevent injection attacks.
*   **CSRF Protection:**  Implement CSRF protection (e.g., using Flask-WTF).
*   **Rate Limiting:**  Implement rate limiting to prevent abuse.
*   **Regular Audits:**  Regularly audit your code for security vulnerabilities.
*   **Production Server:** Use a production-ready WSGI server like Gunicorn or uWSGI.  Flask's built-in development server is *not* suitable for production.

This revised version provides a much more secure, complete, and understandable foundation for building your own OAuth 2.0 provider. Remember to address the security considerations before deploying this code to a production environment.  Good luck!
👁️ Viewed: 6

Comments