#!/usr/bin/env python3
"""
XHS Shadow Ban Checker
Intercepts XHS creator backend API responses to read the hidden 'level' field.

Usage:
    python3 check_shadowban.py              # Check all notes (opens Chrome)
    python3 check_shadowban.py --url <xhs_note_url>  # Check specific note
"""

import argparse
import asyncio
import json
import os

from playwright.async_api import async_playwright

# Chrome directory handed to Playwright's launch_persistent_context(),
# presumably so the run can reuse an already logged-in browser session.
# Set the XHS_CHROME_PROFILE environment variable to point at a different
# directory; when it is unset or empty the original hard-coded path is used.
# NOTE(review): Playwright documents user_data_dir as the *parent* of the
# profile path shown in chrome://version — confirm ".../Default" is intended.
CHROME_PROFILE = (
    os.environ.get("XHS_CHROME_PROFILE")
    or "/Users/bowang/Library/Application Support/Google/Chrome/Default"
)

# Human-readable label for each known value of the hidden ``level`` field.
LEVEL_LABELS = {
    4:    "✅ Normal — recommended",
    2:    "🟡 Basically normal",
    1:    "🟡 Basically normal",
    0:    "⚠️  Unknown / no data",
    -1:   "🔶 Mild shadow ban (limited distribution)",
    -100: "🔴 Severe shadow ban",
    -101: "🔴 Severe shadow ban",
    -102: "🔴 Severe shadow ban (IRREVERSIBLE)",
}


def interpret_level(level):
    """Return the human-readable status label for a note's ``level`` value.

    Args:
        level: Numeric level read from an API response, or ``None`` when no
            level was found.

    Returns:
        The label from ``LEVEL_LABELS`` for an exactly matching level.
        Values not listed in the table are mapped by range: ``>= 4`` normal,
        ``>= 1`` basically normal, ``>= 0`` unknown, ``>= -1`` mild shadow
        ban, anything lower the generic severe label.
    """
    if level is None:
        return "❓ No level data found"

    # Exact matches win, so every value listed in the table gets its own
    # label (1 is "basically normal", and only -102 is "IRREVERSIBLE").
    exact = LEVEL_LABELS.get(level)
    if exact is not None:
        return exact

    # Range fallback for values the table does not list.
    if level >= 4:
        return LEVEL_LABELS[4]
    if level >= 1:
        return LEVEL_LABELS[1]
    if level >= 0:
        return LEVEL_LABELS[0]
    if level >= -1:
        return LEVEL_LABELS[-1]
    # Unlisted negative value: report it as severe without claiming the
    # irreversibility that the table attributes only to -102.
    return LEVEL_LABELS[-100]


# URL fragments of the XHS creator API endpoints that are inspected for
# note level data.
_CREATOR_API_PATHS = (
    "/api/sns/web/v1/creator/note/list",
    "/api/sns/web/v1/creator/note_flow",
    "/api/sns/web/v2/creator/stat/note",
    "/web_api/sns/v3/page/notes",
    "creator.xiaohongshu.com/api",
)


def _note_title(note, width):
    """Return a printable title for *note*, truncated to *width* characters."""
    # Fall back to the id when the title is missing or empty, and coerce to
    # str because slicing a non-string value (e.g. a numeric id) would raise.
    return str(note.get("title") or note.get("id") or "Unknown")[:width]


def _print_summary(results):
    """Print per-category counts for *results* and list the severe notes."""
    normal, unknown, mild_ban, severe = [], [], [], []
    for note in results:
        level = note.get("level", 0)
        if level is None:
            level = 0
        # Boundaries follow LEVEL_LABELS: 1 and above is normal, 0 is
        # unknown, -1 is a mild ban, anything lower is severe.
        if level >= 1:
            normal.append(note)
        elif level >= 0:
            unknown.append(note)
        elif level >= -1:
            mild_ban.append(note)
        else:
            severe.append(note)

    print(f"\n{'='*60}")
    print(f"SUMMARY: {len(results)} notes checked")
    print(f"{'='*60}")

    print(f"  ✅ Normal (level ≥ 1):             {len(normal)}")
    print(f"  ⚠️  Unknown / no data (level 0):    {len(unknown)}")
    print(f"  🔶 Mild shadow ban (level -1):     {len(mild_ban)}")
    print(f"  🔴 Severe shadow ban (level < -1): {len(severe)}")

    if severe:
        print("\n⚠️  SEVERE SHADOW BANNED NOTES:")
        for note in severe:
            print(f"  Level {note.get('level')}: {_note_title(note, 60)}")


async def check_notes(target_url=None, *, wait_ms=30000):
    """Open Chrome, capture creator API responses and report note levels.

    A persistent Chrome context is launched from ``CHROME_PROFILE``. Every
    response whose URL contains one of ``_CREATOR_API_PATHS`` is parsed as
    JSON and passed to ``extract_notes``; each note found is printed with
    its level and status as it arrives. After the wait, a summary is printed.

    Args:
        target_url: Page to open. When ``None`` the XHS Creator Center note
            page is opened instead.
        wait_ms: How long, in milliseconds, to keep the page open while
            responses are captured.

    Returns:
        None. All results are printed to stdout.
    """
    results = []
    intercepted_urls = []

    async def handle_response(response):
        url = response.url
        if not any(path in url for path in _CREATOR_API_PATHS):
            return
        try:
            body = await response.json()
        except Exception:
            # Best effort: matching URLs also serve non-JSON or body-less
            # responses, and an error raised inside an event callback would
            # only add noise. Only the parse is guarded, so mistakes in the
            # reporting code below are no longer hidden.
            return

        intercepted_urls.append(url)
        for note in extract_notes(body):
            results.append(note)
            level = note.get("level")
            print(f"  [{str(level):>5}] {interpret_level(level)}")
            print(f"         Title: {_note_title(note, 40)}")
            print()

    async with async_playwright() as p:
        ctx = await p.chromium.launch_persistent_context(
            CHROME_PROFILE,
            channel="chrome",
            headless=False,
            args=["--no-sandbox", "--disable-blink-features=AutomationControlled"],
            ignore_default_args=["--enable-automation"],
        )
        try:
            page = ctx.pages[0] if ctx.pages else await ctx.new_page()
            page.on("response", handle_response)

            if target_url:
                print(f"Navigating to: {target_url}")
                await page.goto(target_url, wait_until="domcontentloaded", timeout=30000)
            else:
                print("Navigating to XHS Creator Center...")
                print("→ Log in if prompted, then navigate to your note management page.")
                print("→ The checker will capture level data automatically from API responses.\n")
                await page.goto(
                    "https://creator.xiaohongshu.com/creator/note",
                    wait_until="domcontentloaded",
                    timeout=30000,
                )

            print(
                f"Waiting for API responses ({wait_ms / 1000:g}s)... "
                "scroll through your notes list.\n"
            )
            await page.wait_for_timeout(wait_ms)
        finally:
            # Close the browser even when navigation or the wait fails.
            await ctx.close()

    if not results:
        print("\nNo level data captured.")
        if intercepted_urls:
            print(f"Intercepted {len(intercepted_urls)} API responses but no level fields found.")
            print("Raw URLs intercepted:")
            for url in intercepted_urls:
                print(f"  {url}")
        else:
            print("No creator API calls intercepted. Make sure you're on the note management page.")
        return

    _print_summary(results)


def extract_notes(body):
    """Collect note-like objects carrying a numeric ``level`` from *body*.

    The decoded JSON value is walked recursively through dicts and lists,
    down to a nesting depth of 8. Every dict with a numeric ``level`` key
    yields one entry; its children are still scanned afterwards.

    Args:
        body: Decoded JSON value (dict, list or scalar) from an API response.

    Returns:
        A list of dicts with keys ``level`` (the numeric value), ``title``
        (always a ``str``: the first non-empty of ``title``, ``desc``,
        ``note_id``, ``id``, else ``""``) and ``id`` (``note_id`` or ``id``,
        else ``""``), in the order encountered.
    """
    notes = []

    def scan(obj, depth=0):
        if depth > 8:
            return
        if isinstance(obj, dict):
            level = obj.get("level")
            # bool is a subclass of int, but a JSON true/false flag named
            # "level" is not a distribution level, so it is skipped.
            if isinstance(level, (int, float)) and not isinstance(level, bool):
                note_id = obj.get("note_id") or obj.get("id") or ""
                title = obj.get("title") or obj.get("desc") or note_id
                # The title is coerced to str so callers can slice it even
                # when it fell back to a numeric id.
                notes.append({"level": level, "title": str(title), "id": note_id})
            for value in obj.values():
                scan(value, depth + 1)
        elif isinstance(obj, list):
            for item in obj:
                scan(item, depth + 1)

    scan(body)
    return notes


if __name__ == "__main__":
    # Command-line entry point: parse the optional --url flag, then run the
    # asynchronous checker to completion.
    cli = argparse.ArgumentParser(description="XHS Shadow Ban Checker")
    cli.add_argument(
        "--url",
        default=None,
        help="Specific XHS note URL to check",
    )
    options = cli.parse_args()
    checker = check_notes(target_url=options.url)
    asyncio.run(checker)
