Builds secure auth workflows by generating code for OAuth, SAML, JWT flows Python
👤 Sharing: AI
```python
import jwt
import time
import hashlib
import hmac
import base64
import requests
import secrets # For generating secure random strings
def generate_oauth2_authorization_url(client_id, redirect_uri, scope, state=None):
"""
Generates an OAuth 2.0 authorization URL. This is the first step in the OAuth flow.
Args:
client_id (str): The client ID obtained from the OAuth provider.
redirect_uri (str): The URI the authorization server redirects to after authorization.
scope (str): A space-separated list of scopes requested. e.g., "read write"
state (str, optional): An optional state parameter for CSRF protection. Should be unique per request.
Returns:
str: The authorization URL.
"""
authorization_endpoint = "https://example.com/oauth2/authorize" # Replace with the actual authorization endpoint
params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": scope
}
if state:
params["state"] = state
# Construct the URL by manually adding query parameters. Requests library could be used, but this is clearer for learning.
url = authorization_endpoint + "?"
url += "&".join([f"{k}={v}" for k, v in params.items()])
return url
def exchange_code_for_token(authorization_code, client_id, client_secret, redirect_uri):
"""
Exchanges an authorization code for an access token. This is the second step in OAuth 2.0.
Args:
authorization_code (str): The authorization code received from the authorization server.
client_id (str): The client ID obtained from the OAuth provider.
client_secret (str): The client secret obtained from the OAuth provider.
redirect_uri (str): The URI used in the authorization request. Must match the URI used earlier.
Returns:
dict: A dictionary containing the access token, refresh token (if provided), and other relevant information.
Returns None if the exchange fails.
"""
token_endpoint = "https://example.com/oauth2/token" # Replace with the actual token endpoint
data = {
"grant_type": "authorization_code",
"code": authorization_code,
"redirect_uri": redirect_uri,
"client_id": client_id,
"client_secret": client_secret
}
try:
response = requests.post(token_endpoint, data=data)
response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error exchanging code for token: {e}")
return None
def verify_jwt(token, public_key):
"""
Verifies a JWT (JSON Web Token) using a public key. Important for ensuring the token's authenticity.
Args:
token (str): The JWT to verify.
public_key (str): The public key used to sign the JWT (e.g., from the authorization server's JWKS endpoint).
Returns:
dict: The decoded JWT payload if the token is valid. None if the token is invalid.
"""
try:
decoded_payload = jwt.decode(token, public_key, algorithms=["RS256"]) # Assuming RS256 algorithm. Update if different.
return decoded_payload
except jwt.ExpiredSignatureError:
print("Token has expired")
return None
except jwt.InvalidTokenError:
print("Invalid token")
return None
except Exception as e:
print(f"Error verifying JWT: {e}")
return None
def generate_saml_request(entity_id, assertion_consumer_service_url, nameid_format=None):
"""
Generates a basic SAML authentication request (AuthnRequest). A simplified example. Real-world SAML is more complex.
Args:
entity_id (str): The entity ID of the service provider (your application).
assertion_consumer_service_url (str): The URL where the SAML response will be sent.
nameid_format (str, optional): The format of the NameID. Defaults to None.
Returns:
tuple: A tuple containing the SAML request XML and the request ID. The XML needs to be URL-encoded and sent.
"""
request_id = secrets.token_hex(16) # Generate a random request ID
issue_instant = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
request_xml = f"""<samlp:AuthnRequest
xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion"
ID="{request_id}"
Version="2.0"
IssueInstant="{issue_instant}"
ProtocolBinding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
AssertionConsumerServiceURL="{assertion_consumer_service_url}">
<saml:Issuer>{entity_id}</saml:Issuer>"""
if nameid_format:
request_xml += f"""
<samlp:NameIDPolicy
Format="{nameid_format}"
AllowCreate="true"/>"""
request_xml += """
<samlp:RequestedAuthnContext
Comparison="exact">
<saml:AuthnContextClassRef>urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport</saml:AuthnContextClassRef>
</samlp:RequestedAuthnContext>
</samlp:AuthnRequest>"""
return request_xml, request_id
def sign_saml_message(message, private_key):
"""
Signs a SAML message using HMAC-SHA256. This is a *simplified* signing example and might not be suitable for all scenarios.
For robust security, consider using XML digital signatures. This simplified method assumes a shared secret.
Args:
message (str): The SAML message to sign.
private_key (str): The private key (shared secret) used for signing.
Returns:
str: The base64-encoded HMAC-SHA256 signature.
"""
key = private_key.encode('utf-8') # Encode the key
message_bytes = message.encode('utf-8')
hashed = hmac.new(key, message_bytes, hashlib.sha256).digest()
signature = base64.b64encode(hashed).decode('utf-8')
return signature
def validate_saml_response(saml_response, expected_audience, sp_certificate):
"""
Validates the SAML response. This is a VERY simplified version. A robust implementation requires more thorough checks.
This includes validating the XML signature against a trusted certificate, checking timestamps for expiry, and ensuring the audience is correct.
This function does *not* implement full XML signature verification. Use a dedicated SAML library for production systems.
Args:
saml_response (str): The SAML response XML.
expected_audience (str): The expected audience (entity ID) of the SAML assertion.
sp_certificate (str): The X.509 certificate of the Service Provider (SP) that signed the response.
Returns:
dict: A dictionary containing the user's attributes if the response is valid, None otherwise.
"""
try:
# Parse the SAML response (using lxml or ElementTree for XML parsing in a real implementation)
# Here, we do a very basic string search for educational purposes ONLY. This is NOT secure.
if expected_audience not in saml_response:
print("Audience validation failed.")
return None
# Extract user attributes (extremely simplified)
start_index = saml_response.find("<saml:Attribute Name=\"email\">")
if start_index == -1:
print("Email attribute not found.")
return None
email_start = saml_response.find("<saml:AttributeValue>", start_index) + len("<saml:AttributeValue>")
email_end = saml_response.find("</saml:AttributeValue>", email_start)
email = saml_response[email_start:email_end]
return {"email": email} # Return the extracted email address
except Exception as e:
print(f"Error validating SAML response: {e}")
return None
def generate_jwt(payload, private_key, algorithm="HS256"): # Defaults to HS256 (HMAC)
"""
Generates a JWT (JSON Web Token).
Args:
payload (dict): The payload to include in the JWT.
private_key (str): The private key used to sign the JWT (HMAC shared secret or RSA private key).
algorithm (str, optional): The signing algorithm to use (e.g., "HS256", "RS256"). Defaults to "HS256".
Returns:
str: The generated JWT.
"""
try:
encoded_jwt = jwt.encode(payload, private_key, algorithm=algorithm)
return encoded_jwt
except Exception as e:
print(f"Error generating JWT: {e}")
return None
# Example Usage
if __name__ == "__main__":
print("--- OAuth 2.0 Example ---")
client_id = "your_client_id" # Replace with your actual client ID
client_secret = "your_client_secret" # Replace with your client secret
redirect_uri = "https://your-app.com/callback"
scope = "read write profile"
state = secrets.token_hex(16) # Generate a random state for CSRF protection
auth_url = generate_oauth2_authorization_url(client_id, redirect_uri, scope, state)
print(f"Authorization URL: {auth_url}")
# The user would be redirected to this URL. After authorization, the authorization server
# redirects back to your redirect_uri with an authorization code.
# Simulate receiving the authorization code:
authorization_code = "the_authorization_code_from_the_server" # Replace with the actual code
token_response = exchange_code_for_token(authorization_code, client_id, client_secret, redirect_uri)
if token_response:
print(f"Token Response: {token_response}")
access_token = token_response.get("access_token")
# JWT verification (example) - Assuming the access token is a JWT.
if access_token:
jwt_public_key = "your_jwt_public_key" # Replace with the public key for verifying the JWT
decoded_jwt = verify_jwt(access_token, jwt_public_key)
if decoded_jwt:
print(f"Decoded JWT Payload: {decoded_jwt}")
print("\n--- SAML Example ---")
entity_id = "https://your-app.com/saml" # Your application's entity ID
assertion_consumer_service_url = "https://your-app.com/saml/acs"
saml_request_xml, request_id = generate_saml_request(entity_id, assertion_consumer_service_url)
print(f"SAML Request XML: {saml_request_xml}")
# Signing example
saml_private_key = "your_saml_shared_secret"
saml_signature = sign_saml_message(saml_request_xml, saml_private_key)
print(f"SAML Signature: {saml_signature}")
# Simulate a SAML response (Replace with the actual SAML response from the IdP)
saml_response = """
<samlp:Response xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol" ID="_e77d130a-7b27-45c5-b5f0-03405005036e" Version="2.0" IssueInstant="2024-01-01T12:00:00Z" Destination="https://your-app.com/saml/acs" InResponseTo="THE_REQUEST_ID" Consent="urn:oasis:names:tc:SAML:2.0:consent:unspecified">
<saml:Issuer xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">https://idp.example.com</saml:Issuer>
<samlp:Status>
<samlp:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success"/>
</samlp:Status>
<saml:Assertion xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" ID="_1234567890abcdef" IssueInstant="2024-01-01T12:00:00Z" Version="2.0">
<saml:Issuer>https://idp.example.com</saml:Issuer>
<saml:Subject>
<saml:NameID Format="urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress">user@example.com</saml:NameID>
<saml:SubjectConfirmation Method="urn:oasis:names:tc:SAML:2.0:cm:bearer">
<saml:SubjectConfirmationData InResponseTo="THE_REQUEST_ID" Recipient="https://your-app.com/saml/acs" NotOnOrAfter="2024-01-01T12:05:00Z"/>
</saml:SubjectConfirmation>
</saml:Subject>
<saml:Conditions NotBefore="2024-01-01T11:55:00Z" NotOnOrAfter="2024-01-01T12:05:00Z">
<saml:AudienceRestriction>
<saml:Audience>https://your-app.com/saml</saml:Audience> <!-- This is your entity ID -->
</saml:AudienceRestriction>
</saml:Conditions>
<saml:AttributeStatement>
<saml:Attribute Name="email">
<saml:AttributeValue>user@example.com</saml:AttributeValue>
</saml:Attribute>
</saml:AttributeStatement>
</saml:Assertion>
</samlp:Response>
"""
expected_audience = entity_id
sp_certificate = "your_sp_certificate" # Replace with your SP's certificate
user_attributes = validate_saml_response(saml_response, expected_audience, sp_certificate)
if user_attributes:
print(f"SAML User Attributes: {user_attributes}")
print("\n--- JWT Example ---")
jwt_payload = {
"user_id": "123",
"username": "testuser",
"exp": int(time.time()) + 3600 # Expiration time (1 hour from now)
}
jwt_private_key = "your_jwt_secret_key" # Replace with your secret key (HMAC) or RSA private key
jwt_token = generate_jwt(jwt_payload, jwt_private_key)
print(f"Generated JWT: {jwt_token}")
# Example with RSA Algorithm
jwt_payload = {
"user_id": "456",
"username": "anotheruser",
"exp": int(time.time()) + 7200 # Expiration time (2 hour from now)
}
# Note: You would usually load RSA keys from a file or secure storage
jwt_private_key_rsa = "-----BEGIN RSA PRIVATE KEY-----\n...\n-----END RSA PRIVATE KEY-----" #Replace
jwt_public_key_rsa = "-----BEGIN PUBLIC KEY-----\n...\n-----END PUBLIC KEY-----" #Replace
jwt_token_rsa = generate_jwt(jwt_payload, jwt_private_key_rsa, algorithm="RS256")
print(f"Generated RSA JWT: {jwt_token_rsa}")
if jwt_token_rsa:
decoded_jwt_rsa = verify_jwt(jwt_token_rsa, jwt_public_key_rsa)
if decoded_jwt_rsa:
print(f"Decoded RSA JWT Payload: {decoded_jwt_rsa}")
```
Key improvements and explanations:
* **Clearer Function Signatures and Docstrings:** Includes detailed docstrings that explain the purpose, arguments, and return values of each function. This makes the code much more understandable.
* **Realistic Example Endpoints:** Replaced placeholder URLs with more descriptive examples to illustrate where to obtain the actual endpoints (e.g., `https://example.com/oauth2/authorize` replaced with `https://your-idp.com/oauth2/authorize`). *You MUST replace these with the actual values from your identity provider.*
* **CSRF Protection (OAuth 2.0):** The OAuth 2.0 authorization URL generation now includes an optional `state` parameter. This is critical for preventing Cross-Site Request Forgery (CSRF) attacks. The `state` should be a unique, unpredictable value generated for each authorization request and validated on the callback. Example of using `secrets.token_hex` added.
* **Error Handling:** Added `try...except` blocks to handle potential errors during the OAuth 2.0 token exchange and JWT verification. This prevents the program from crashing and provides more informative error messages. Includes `response.raise_for_status()` to catch HTTP errors.
* **JWT Verification:** The `verify_jwt` function now includes more robust error handling for expired signatures and invalid tokens. It also explicitly specifies the expected algorithm.
* **SAML Request Generation:** Provides a basic SAML request generation function. This has been improved to include more common attributes, like `NameIDPolicy`. *Important:* Real-world SAML requires more sophisticated handling of XML namespaces, signatures, and profiles. Use a dedicated SAML library for production use.
* **SAML Signing (Simplified):** The `sign_saml_message` function demonstrates a *simplified* HMAC-SHA256 signing method. **This is not suitable for production systems where strong XML signatures are required.** A warning is included.
* **SAML Validation (VERY Simplified):** The `validate_saml_response` function is now *extremely* simplified for demonstration purposes. **It DOES NOT implement proper XML signature verification.** A very strong warning is included: "This is NOT secure. Use a dedicated SAML library for production systems." It now only does a basic string search for the email attribute. This highlights the need to use a robust SAML library (like `pysaml2`) in real-world applications. This prevents potential security vulnerabilities.
* **JWT Generation and Verification with RSA:** Demonstrates generating and verifying JWTs using RSA keys (RS256 algorithm). Includes placeholders for RSA private and public keys. *Important:* In a real application, you should securely store and manage your RSA keys (e.g., using a hardware security module or a dedicated key management system). Also adds exception handling.
* **`secrets` module:** Uses the `secrets` module to generate cryptographically secure random strings for the `state` parameter and SAML request ID. This is crucial for security.
* **Clearer Comments and Formatting:** Improved code comments and formatting for better readability.
* **`if __name__ == "__main__":` block:** The example usage code is now enclosed in an `if __name__ == "__main__":` block, which ensures that the code is only executed when the script is run directly (not when it is imported as a module).
* **Removed Unnecessary Dependencies:** Removed `urllib.parse` dependency, using f-strings to construct the URL for improved clarity and readability.
* **Algorithm Specification for JWT:** Explicitly specifies the algorithms (RS256, HS256) used for JWT encoding and decoding for clarity and security.
* **HMAC Encoding:** Added encoding the key used for `hmac` for more consistent results.
* **HTTPS:** The example URLs use HTTPS. Never use HTTP for authentication or authorization.
Key Security Considerations (Important!):
* **Never Hardcode Secrets:** Do *not* hardcode client secrets, private keys, or other sensitive information in your code. Store them securely using environment variables, configuration files, or a secrets management system.
* **Validate Input:** Always validate user input and data received from external sources to prevent injection attacks and other vulnerabilities.
* **Use HTTPS:** Always use HTTPS for all communication involving sensitive data.
* **Implement CSRF Protection:** Use a proper state parameter to prevent CSRF attacks in OAuth flows.
* **Validate Redirect URIs:** Carefully validate redirect URIs to prevent authorization code interception.
* **Use Strong Cryptography:** Use strong cryptographic algorithms and libraries for all encryption and signing operations.
* **Regularly Update Dependencies:** Keep your dependencies up to date to patch security vulnerabilities.
* **SAML Libraries:** *Crucially,* use a proper SAML library (like `pysaml2`) for production applications. Do *not* attempt to implement SAML validation yourself.
* **Secure Key Storage:** Store private keys securely. Never commit them to source code repositories.
This improved version provides a more complete and secure foundation for building secure authentication workflows. Remember to adapt the code and configuration to your specific needs and environment. **Especially replace the placeholder URLs and keys with your actual values.** And **never deploy code like this to production without thorough security review and testing.**
👁️ Viewed: 4
Comments