Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

214 changes: 214 additions & 0 deletions insecure-api/main-2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
from fastapi import FastAPI, HTTPException, Header, Request
from typing import Optional
from models import VideoGame, User
from database import video_games, users
import sqlite3
import os
import requests
from fastapi.responses import RedirectResponse

app = FastAPI(
title="Intentionally Insecure Video Game API",
description="An API designed for security education, demonstrating common vulnerabilities.",
version="1.0.0",
contact={
"name": "Your Name",
"email": "[email protected]",
},
)

# Initialize the SQLite database
def init_db():
if not os.path.exists('videogames.db'):
conn = sqlite3.connect('videogames.db')
cursor = conn.cursor()
# Create table
cursor.execute('''
CREATE TABLE video_games (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
developer TEXT NOT NULL,
publisher TEXT NOT NULL,
year_published INTEGER NOT NULL,
sales INTEGER NOT NULL
)
''')
# Insert data
for game in video_games:
cursor.execute('''
INSERT INTO video_games (id, title, developer, publisher, year_published, sales)
VALUES (?, ?, ?, ?, ?, ?)
''', (game.id, game.title, game.developer, game.publisher, game.year_published, game.sales))
conn.commit()
conn.close()

# Call the init_db function when the app starts
@app.on_event("startup")
def startup_event():
init_db()

# Public endpoint to get basic video game info
@app.get("/games")
def get_games():
return video_games

# Vulnerable endpoint: No authentication required to get sensitive sales data
@app.get("/games/{game_id}/sales")
def get_game_sales(game_id: int):
# Vulnerability: No authentication or authorization checks (API1:2019 - Broken Object Level Authorization)
for game in video_games:
if game.id == game_id:
return {"title": game.title, "sales": game.sales}
raise HTTPException(status_code=404, detail="Game not found")

# Vulnerable endpoint: Weak authentication and improper authorization
@app.post("/games")
def add_game(game: VideoGame, Authorization: Optional[str] = Header(None)):
# Vulnerability: Token sent in Authorization header without proper validation (API2:2019 - Broken Authentication)
if not Authorization:
raise HTTPException(status_code=401, detail="Authorization header required")

# Extract Bearer token
if not Authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid Authorization header format")
token = Authorization.split(" ")[1]

# Vulnerability: Insecure token handling and authorization (API5:2019 - Broken Function Level Authorization)
for user in users:
if user.token == token:
if user.is_admin:
video_games.append(game)
return {"message": "Game added"}
else:
raise HTTPException(status_code=403, detail="Not authorized")
raise HTTPException(status_code=401, detail="Invalid token")

# Vulnerable endpoint: Exposes sensitive user data
@app.get("/users")
def get_users():
# Vulnerability: Exposes tokens and admin status (API3:2019 - Excessive Data Exposure)
return users

# Additional vulnerable endpoint: No rate limiting implemented
@app.post("/login")
def login(username: str):
# Vulnerability: No rate limiting allows brute-force attacks (API4:2019 - Lack of Resources & Rate Limiting)
for user in users:
if user.username == username:
return {"token": user.token}
raise HTTPException(status_code=404, detail="User not found")

# Additional vulnerable endpoint: Mass assignment
@app.put("/games/{game_id}")
def update_game(game_id: int, updated_game: VideoGame):
# Vulnerability: Mass assignment allows overwriting of unintended fields (API6:2019 - Mass Assignment)
for i, game in enumerate(video_games):
if game.id == game_id:
video_games[i] = updated_game
return {"message": "Game updated"}
raise HTTPException(status_code=404, detail="Game not found")

# Additional vulnerable endpoint: SQL Injection
@app.get("/search")
def search_games(query: str):
# Vulnerability: User input is not sanitized (API8:2019 - Injection)
conn = sqlite3.connect('videogames.db')
cursor = conn.cursor()
try:
sql_query = f"SELECT * FROM video_games WHERE title = '{query}'"
cursor.execute(sql_query)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semgrep identified an issue in your code:

Detected possible formatted SQL query. Use parameterized queries instead.

To resolve this comment:

🔧 No guidance has been designated for this issue. Fix according to your organization's approved methods.

💬 Ignore this finding

Reply with Semgrep commands to ignore this finding.

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by formatted-sql-query.

You can view more details about this finding in the Semgrep AppSec Platform.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semgrep identified an issue in your code:

Untrusted input might be used to build a database query, which can lead to a SQL injection vulnerability. An attacker can execute malicious SQL statements and gain unauthorized access to sensitive data, modify, delete data, or execute arbitrary system commands. To prevent this vulnerability, use prepared statements that do not concatenate user-controllable strings and use parameterized queries where SQL commands and user data are strictly separated. Also, consider using an object-relational (ORM) framework to operate with safer abstractions.

Dataflow graph
flowchart LR
    classDef invis fill:white, stroke: none
    classDef default fill:#e7f5ff, color:#1c7fd6, stroke: none

    subgraph File0["<b>insecure-api/main-2.py</b>"]
        direction LR
        %% Source

        subgraph Source
            direction LR

            v0["<a href=https://github.com/latiotech/insecure-kubernetes-deployments/blob/a53589ae67e1029cd632a301ec9c8900283faded/insecure-api/main-2.py#L113 target=_blank style='text-decoration:none; color:#1c7fd6'>[Line: 113] query</a>"]
        end
        %% Intermediate

        subgraph Traces0[Traces]
            direction TB

            v2["<a href=https://github.com/latiotech/insecure-kubernetes-deployments/blob/a53589ae67e1029cd632a301ec9c8900283faded/insecure-api/main-2.py#L113 target=_blank style='text-decoration:none; color:#1c7fd6'>[Line: 113] query</a>"]

            v3["<a href=https://github.com/latiotech/insecure-kubernetes-deployments/blob/a53589ae67e1029cd632a301ec9c8900283faded/insecure-api/main-2.py#L118 target=_blank style='text-decoration:none; color:#1c7fd6'>[Line: 118] sql_query</a>"]
        end
            v2 --> v3
        %% Sink

        subgraph Sink
            direction LR

            v1["<a href=https://github.com/latiotech/insecure-kubernetes-deployments/blob/a53589ae67e1029cd632a301ec9c8900283faded/insecure-api/main-2.py#L119 target=_blank style='text-decoration:none; color:#1c7fd6'>[Line: 119] cursor.execute(sql_query)</a>"]
        end
    end
    %% Class Assignment
    Source:::invis
    Sink:::invis

    Traces0:::invis
    File0:::invis

    %% Connections

    Source --> Traces0
    Traces0 --> Sink


Loading

To resolve this comment:

🔧 No guidance has been designated for this issue. Fix according to your organization's approved methods.

💬 Ignore this finding

Reply with Semgrep commands to ignore this finding.

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by fastapi-without-url-path-prestodb-sqli.

You can view more details about this finding in the Semgrep AppSec Platform.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semgrep identified an issue in your code:

Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

To resolve this comment:

🔧 No guidance has been designated for this issue. Fix according to your organization's approved methods.

💬 Ignore this finding

Reply with Semgrep commands to ignore this finding.

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by sqlalchemy-execute-raw-query.

You can view more details about this finding in the Semgrep AppSec Platform.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semgrep identified an issue in your code:

Untrusted input might be used to build a database query, which can lead to a SQL injection vulnerability. An attacker can execute malicious SQL statements and gain unauthorized access to sensitive data, modify, delete data, or execute arbitrary system commands. To prevent this vulnerability, use prepared statements that do not concatenate user-controllable strings and use parameterized queries where SQL commands and user data are strictly separated. Also, consider using an object-relational (ORM) framework to operate with safer abstractions.

Dataflow graph
flowchart LR
    classDef invis fill:white, stroke: none
    classDef default fill:#e7f5ff, color:#1c7fd6, stroke: none

    subgraph File0["<b>insecure-api/main-2.py</b>"]
        direction LR
        %% Source

        subgraph Source
            direction LR

            v0["<a href=https://github.com/latiotech/insecure-kubernetes-deployments/blob/a53589ae67e1029cd632a301ec9c8900283faded/insecure-api/main-2.py#L113 target=_blank style='text-decoration:none; color:#1c7fd6'>[Line: 113] query</a>"]
        end
        %% Intermediate

        subgraph Traces0[Traces]
            direction TB

            v2["<a href=https://github.com/latiotech/insecure-kubernetes-deployments/blob/a53589ae67e1029cd632a301ec9c8900283faded/insecure-api/main-2.py#L113 target=_blank style='text-decoration:none; color:#1c7fd6'>[Line: 113] query</a>"]

            v3["<a href=https://github.com/latiotech/insecure-kubernetes-deployments/blob/a53589ae67e1029cd632a301ec9c8900283faded/insecure-api/main-2.py#L118 target=_blank style='text-decoration:none; color:#1c7fd6'>[Line: 118] sql_query</a>"]
        end
            v2 --> v3
        %% Sink

        subgraph Sink
            direction LR

            v1["<a href=https://github.com/latiotech/insecure-kubernetes-deployments/blob/a53589ae67e1029cd632a301ec9c8900283faded/insecure-api/main-2.py#L119 target=_blank style='text-decoration:none; color:#1c7fd6'>[Line: 119] cursor.execute(sql_query)</a>"]
        end
    end
    %% Class Assignment
    Source:::invis
    Sink:::invis

    Traces0:::invis
    File0:::invis

    %% Connections

    Source --> Traces0
    Traces0 --> Sink


Loading

To resolve this comment:

🔧 No guidance has been designated for this issue. Fix according to your organization's approved methods.

💬 Ignore this finding

Reply with Semgrep commands to ignore this finding.

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by fastapi-prestodb-sqli.

You can view more details about this finding in the Semgrep AppSec Platform.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security control: Static Code Analysis Python Semgrep

Sqlalchemy Raw Sql Query Concatenation Risks Sql Injection

Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Severity: HIGH

Learn more about this issue


Jit Bot commands and options (e.g., ignore issue)

You can trigger Jit actions by commenting on this PR review:

  • #jit_ignore_fp Ignore and mark this specific single instance of finding as “False Positive”
  • #jit_ignore_accept Ignore and mark this specific single instance of finding as “Accept Risk”
  • #jit_ignore_type_in_file Ignore any finding of type "SQLAlchemy raw SQL query concatenation risks SQL Injection" in insecure-api/main-2.py; future occurrences will also be ignored.
  • #jit_undo_ignore Undo ignore command

rows = cursor.fetchall()
except Exception as e:
# Return the exception message for educational purposes (not recommended in production)
return {"error": str(e)}
finally:
conn.close()
# Convert rows to list of dictionaries
results = []
for row in rows:
results.append({
"id": row[0],
"title": row[1],
"developer": row[2],
"publisher": row[3],
"year_published": row[4],
"sales": row[5],
})
return results

# Additional vulnerable endpoint: Improper assets management
@app.get("/.env")
def get_env():
# Vulnerability: Sensitive files are exposed (API9:2019 - Improper Assets Management)
return {"SECRET_KEY": "supersecretkey"}

# Additional vulnerable endpoint: Insufficient logging and monitoring
@app.post("/admin/delete_game")
def delete_game(game_id: int, Authorization: Optional[str] = Header(None)):
# Vulnerability: Actions are not logged (API10:2019 - Insufficient Logging & Monitoring)
if not Authorization:
raise HTTPException(status_code=401, detail="Authorization header required")

# Extract Bearer token
if not Authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid Authorization header format")
token = Authorization.split(" ")[1]

for user in users:
if user.token == token and user.is_admin:
for i, game in enumerate(video_games):
if game.id == game_id:
deleted_game = video_games.pop(i)
# No logging of the deletion action
return {"message": f"Game '{deleted_game.title}' deleted"}
raise HTTPException(status_code=404, detail="Game not found")
raise HTTPException(status_code=403, detail="Not authorized")

@app.post("/feedback")
def submit_feedback(feedback: str):
# Vulnerability: User input is not sanitized before rendering (API7:2019 - Security Misconfiguration)
response = HTMLResponse(content=f"<html><body><h1>Feedback Received</h1><p>{feedback}</p></body></html>")
return response

# Additional vulnerable endpoint: Insecure Direct Object References (IDOR)
@app.get("/user_profile")
def get_user_profile(user_id: int):
# Vulnerability: No authorization checks (API1:2019 - Broken Object Level Authorization)
for user in users:
if user.username == f"user{user_id}":
return user
raise HTTPException(status_code=404, detail="User not found")

# Additional vulnerable endpoint: Cross-Site Request Forgery (CSRF)
@app.post("/update_profile")
def update_profile(username: str, email: str, Authorization: Optional[str] = Header(None)):
# Vulnerability: No CSRF protection (API5:2019 - Broken Function Level Authorization)
if not Authorization:
raise HTTPException(status_code=401, detail="Authorization header required")
# Extract Bearer token
if not Authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid Authorization header format")
token = Authorization.split(" ")[1]
# Simulate updating user profile
for user in users:
if user.token == token:
user.username = username
user.email = email # Assuming 'email' field exists in User model
return {"message": "Profile updated"}
raise HTTPException(status_code=401, detail="Invalid token")

# Additional vulnerable endpoint: Server-Side Request Forgery (SSRF)
@app.get("/fetch_url")
def fetch_url_content(url: str):
# Vulnerability: No validation of the URL (API10:2019 - Unsafe Consumption of APIs)
try:
response = requests.get(url)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security control: Static Code Analysis Python Semgrep

Potential Ssrf With Request Data In Server-Side Requests

Data from request object is passed to a new server-side request. This could lead to a server-side request forgery (SSRF). To mitigate, ensure that schemes and hosts are validated against an allowlist, do not forward the response to the user, and ensure proper authentication and transport-layer security in the proxied request.

Severity: HIGH

Learn more about this issue


Jit Bot commands and options (e.g., ignore issue)

You can trigger Jit actions by commenting on this PR review:

  • #jit_ignore_fp Ignore and mark this specific single instance of finding as “False Positive”
  • #jit_ignore_accept Ignore and mark this specific single instance of finding as “Accept Risk”
  • #jit_ignore_type_in_file Ignore any finding of type "Potential SSRF with request data in server-side requests" in insecure-api/main-2.py; future occurrences will also be ignored.
  • #jit_undo_ignore Undo ignore command

return {"content": response.text}
except Exception as e:
return {"error": str(e)}

# Additional vulnerable endpoint: Unvalidated Redirects and Forwards
@app.get("/redirect")
def unsafe_redirect(next: str):
# Vulnerability: Unvalidated redirect (API10:2019 - Unsafe Consumption of APIs)
return RedirectResponse(url=next)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semgrep identified an issue in your code:

The application builds a URL using user-controlled input which can lead to an open redirect vulnerability. An attacker can manipulate the URL and redirect users to an arbitrary domain. Open redirect vulnerabilities can lead to issues such as Cross-site scripting (XSS) or redirecting to a malicious domain for activities such as phishing to capture users' credentials. To prevent this vulnerability perform strict input validation of the domain against an allowlist of approved domains. Notify a user in your application that they are leaving the website. Display a domain where they are redirected to the user. A user can then either accept or deny the redirect to an untrusted site.

Dataflow graph
flowchart LR
    classDef invis fill:white, stroke: none
    classDef default fill:#e7f5ff, color:#1c7fd6, stroke: none

    subgraph File0["<b>insecure-api/main-2.py</b>"]
        direction LR
        %% Source

        subgraph Source
            direction LR

            v0["<a href=https://github.com/latiotech/insecure-kubernetes-deployments/blob/a53589ae67e1029cd632a301ec9c8900283faded/insecure-api/main-2.py#L212 target=_blank style='text-decoration:none; color:#1c7fd6'>[Line: 212] next</a>"]
        end
        %% Intermediate

        subgraph Traces0[Traces]
            direction TB

            v2["<a href=https://github.com/latiotech/insecure-kubernetes-deployments/blob/a53589ae67e1029cd632a301ec9c8900283faded/insecure-api/main-2.py#L212 target=_blank style='text-decoration:none; color:#1c7fd6'>[Line: 212] next</a>"]
        end
        %% Sink

        subgraph Sink
            direction LR

            v1["<a href=https://github.com/latiotech/insecure-kubernetes-deployments/blob/a53589ae67e1029cd632a301ec9c8900283faded/insecure-api/main-2.py#L214 target=_blank style='text-decoration:none; color:#1c7fd6'>[Line: 214] next</a>"]
        end
    end
    %% Class Assignment
    Source:::invis
    Sink:::invis

    Traces0:::invis
    File0:::invis

    %% Connections

    Source --> Traces0
    Traces0 --> Sink


Loading

To resolve this comment:

🔧 No guidance has been designated for this issue. Fix according to your organization's approved methods.

💬 Ignore this finding

Reply with Semgrep commands to ignore this finding.

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by tainted-redirect-fastapi.

You can view more details about this finding in the Semgrep AppSec Platform.

13 changes: 12 additions & 1 deletion insecure-api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,18 @@ def startup_event():

# Public endpoint to get basic video game info
@app.get("/games")
def get_games():
def get_games(query: str):
conn = sqlite3.connect('videogames.db')
cursor = conn.cursor()
try:
sql_query = f"SELECT * FROM tiles WHERE title = '{query}'"
cursor.execute(sql_query)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security control: Static Code Analysis Python Semgrep

Sqlalchemy Raw Sql Query Concatenation Risks Sql Injection

Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Severity: HIGH

Learn more about this issue


Jit Bot commands and options (e.g., ignore issue)

You can trigger Jit actions by commenting on this PR review:

  • #jit_ignore_fp Ignore and mark this specific single instance of finding as “False Positive”
  • #jit_ignore_accept Ignore and mark this specific single instance of finding as “Accept Risk”
  • #jit_ignore_type_in_file Ignore any finding of type "SQLAlchemy raw SQL query concatenation risks SQL Injection" in insecure-api/main.py; future occurrences will also be ignored.
  • #jit_undo_ignore Undo ignore command

video_games = cursor.fetchall()
except Exception as e:
# Return the exception message for educational purposes (not recommended in production)
return {"error": str(e)}
finally:
conn.close()
return video_games

# Vulnerable endpoint: No authentication required to get sensitive sales data
Expand Down
Loading
Loading