"""
Basic OAuth 2.0 Authorization Code Flow with MileIQ (Demo Only)
- Demonstrates authorization, token exchange, refresh, and an authenticated API call.
- Replace placeholders and in-memory storage before production use.
"""
import logging
import os
import uuid
from urllib.parse import urlencode
import requests
from flask import Flask, request, redirect, jsonify, make_response, abort
logger = logging.getLogger(__name__)
app = Flask(__name__)
# ----- Configuration (use environment variables in production) -----
CLIENT_ID = os.getenv("MILEIQ_CLIENT_ID", "your-client-id")
CLIENT_SECRET = os.getenv("MILEIQ_CLIENT_SECRET", "your-client-secret") # Keep secret!
REDIRECT_URI = os.getenv("MILEIQ_REDIRECT_URI", "http://localhost:3011/callback")
# OAuth / API endpoints (replace with your actual hosts if different)
OAUTH_SERVER_URL = os.getenv("MILEIQ_OAUTH_URL", "https://oauth2.mileiq.com")
AUTH_ENDPOINT = f"{OAUTH_SERVER_URL}/oauth2/auth"
TOKEN_ENDPOINT = f"{OAUTH_SERVER_URL}/oauth2/token"
API_SERVER_URL = os.getenv("MILEIQ_API_URL", "https://external-api.mileiq.com")
USER_ENDPOINT = f"{API_SERVER_URL}/v1/users/me"
DRIVES_ENDPOINT = f"{API_SERVER_URL}/v1/users/{{user_id}}/drives"
# Requested scopes (limit to what you need)
SCOPES = "drives:read:all users:read"
# Demo-only in-memory store; replace for production
_sessions = {}
HTTP_TIMEOUT = 10 # seconds
@app.route("/")
def home():
"""Simple home page explaining the OAuth flow"""
return """
<h1>MileIQ OAuth Authorization Code Flow (Demo)</h1>
<ol>
<li>Start the flow and sign in</li>
<li>Get redirected back with an authorization code</li>
<li>Exchange the code for tokens</li>
<li>Call protected APIs</li>
</ol>
<a href="/authorize">Start Authorization</a>
"""
@app.route("/authorize")
def authorize():
"""Step 1: Start OAuth Authorization Flow"""
# Create CSRF state and a demo session id
state = str(uuid.uuid4())
session_id = str(uuid.uuid4())
_sessions[session_id] = {"state": state}
# Build authorization URL
params = {
"response_type": "code",
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"scope": SCOPES,
"state": state,
}
auth_url = f"{AUTH_ENDPOINT}?{urlencode(params)}"
# Set a session cookie and redirect to the OAuth server
resp = make_response(redirect(auth_url, code=302))
# Secure attributes recommended in production: Secure, HttpOnly, SameSite
resp.set_cookie("session_id", session_id, httponly=True, samesite="Lax")
return resp
@app.route("/callback")
def callback():
"""Step 2: Handle Authorization Callback"""
code = request.args.get("code")
state = request.args.get("state")
error = request.args.get("error")
session_id = request.cookies.get("session_id")
sess = _sessions.get(session_id) if session_id else None
if not sess or state != sess.get("state"):
abort(400, description="Invalid state parameter.")
if error:
abort(400, description=f"Authorization error: {error}")
if not code:
abort(400, description="Authorization code not received.")
# Exchange code for tokens
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
}
try:
token_resp = requests.post(
TOKEN_ENDPOINT,
data=data,
auth=(CLIENT_ID, CLIENT_SECRET),
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=HTTP_TIMEOUT,
)
token_resp.raise_for_status()
token_info = token_resp.json()
except requests.RequestException as exc:
app.logger.exception("Token exchange failed")
abort(502, description=f"Failed to exchange code for token: {exc}")
# Persist tokens in session (demo only)
sess["access_token"] = token_info.get("access_token")
sess["refresh_token"] = token_info.get("refresh_token")
sess["token_type"] = token_info.get("token_type", "Bearer")
sess["expires_in"] = token_info.get("expires_in")
return """
<h1>Authorization Successful</h1>
<p>Tokens received.</p>
<a href="/user-info">Get User Info</a> |
<a href="/drives">Get User Drives</a>
"""
def _require_session():
session_id = request.cookies.get("session_id")
sess = _sessions.get(session_id) if session_id else None
if not sess or "access_token" not in sess:
return None
return sess
def _bearer_headers(token: str) -> dict:
return {"Authorization": f"Bearer {token}", "Accept": "application/json"}
def _refresh_if_needed(sess) -> None:
"""
Naive refresh helper: if a protected call fails with 401 and a refresh token exists,
attempt a single refresh and update the session.
"""
refresh_token = sess.get("refresh_token")
if not refresh_token:
return
try:
resp = requests.post(
TOKEN_ENDPOINT,
data={"grant_type": "refresh_token", "refresh_token": refresh_token},
auth=(CLIENT_ID, CLIENT_SECRET),
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=HTTP_TIMEOUT,
)
resp.raise_for_status()
token_info = resp.json()
sess["access_token"] = token_info.get("access_token", sess.get("access_token"))
# Some providers rotate refresh tokens
if "refresh_token" in token_info:
sess["refresh_token"] = token_info["refresh_token"]
except requests.RequestException:
# Leave existing tokens as-is; caller may handle failure
app.logger.warning("Token refresh attempt failed.")
def get_user_info(sess) -> dict:
"""
Get user info
"""
r = requests.get(
USER_ENDPOINT,
headers=_bearer_headers(sess["access_token"]),
timeout=HTTP_TIMEOUT,
)
if r.status_code == 401:
_refresh_if_needed(sess)
r = requests.get(
USER_ENDPOINT,
headers=_bearer_headers(sess["access_token"]),
timeout=HTTP_TIMEOUT,
)
r.raise_for_status()
return r.json()
@app.route("/user-info")
def user_info():
"""Step 3: Use Access Token to Get User Info"""
sess = _require_session()
if not sess:
return redirect("/authorize")
try:
return jsonify(get_user_info(sess))
except requests.RequestException as exc:
app.logger.exception("Fetching user info failed")
abort(502, description=f"Failed to fetch user info: {exc}")
@app.route("/drives")
def drives():
"""Step 4: Use Access Token to Get User Drives"""
sess = _require_session()
if not sess:
return redirect("/authorize")
try:
user_info = get_user_info(sess)
user_id = user_info["id"]
r = requests.get(
DRIVES_ENDPOINT.format(user_id=user_id),
headers=_bearer_headers(sess["access_token"]),
timeout=HTTP_TIMEOUT,
)
if r.status_code == 401:
_refresh_if_needed(sess)
r = requests.get(
DRIVES_ENDPOINT.format(user_id=user_id),
headers=_bearer_headers(sess["access_token"]),
timeout=HTTP_TIMEOUT,
)
r.raise_for_status()
return jsonify(r.json())
except requests.RequestException as exc:
app.logger.exception("Fetching drives failed")
abort(502, description=f"Failed to fetch drives: {exc}")
if __name__ == "__main__":
# Do not enable debug in production; bind to localhost or a proper server (gunicorn/uwsgi) behind TLS.
app.run(host="0.0.0.0", port=3011, debug=True)