"""Backend regression tests for My Truck (MariaDB schema).

Covers:
- Auth (login, JWT, admin block, change-password)
- Users CRUD (bureau-only, role filtering)
- Folders (CRUD, permissions, status, append, stats, dates)
- Phrases
- Manual notifications (NEW feature)
- Device tokens
"""
import os
import re
import time
import uuid

import bcrypt
import pymysql
import pytest
import requests

# Backend under test: taken from the environment when set, otherwise the
# preview deployment. Trailing slashes are stripped so paths can be appended.
BASE_URL = os.getenv(
    "REACT_APP_BACKEND_URL",
    "https://fleet-care-app-2.preview.emergentagent.com",
).rstrip("/")
API = BASE_URL + "/api"

# Direct database access, used to seed the role='admin' user for the
# "admin role login blocked" test and to verify/clean rows.
DB_CFG = {
    "host": os.getenv("MYSQL_HOST", "127.0.0.1"),
    "port": int(os.getenv("MYSQL_PORT", "3306")),
    "user": os.getenv("MYSQL_USER", "cariste"),
    "password": os.getenv("MYSQL_PASSWORD", "cariste_pass"),
    "database": os.getenv("MYSQL_DB", "cariste_api"),
    "autocommit": True,
}

# (username, password) of the bureau account the suite logs in with.
DEFAULT_BUREAU = ("admin.b", "123456")
# Credentials of the role='admin' account inserted directly into the database.
WEBADMIN_USERNAME = "TEST_webadmin"
WEBADMIN_PASSWORD = "654321"


def db_conn():
    """Open and return a new PyMySQL connection built from ``DB_CFG``."""
    connection = pymysql.connect(**DB_CFG)
    return connection


def db_exec(sql, params=()):
    """Execute one SQL statement on a fresh connection.

    Returns the fetched rows when the statement produced a result set
    (``cursor.description`` is set), otherwise ``None``.
    """
    with db_conn() as connection, connection.cursor() as cursor:
        cursor.execute(sql, params)
        if not cursor.description:
            return None
        return cursor.fetchall()


def hash_pwd(pwd):
    """Return a bcrypt hash of ``pwd``, using a freshly generated salt, as text."""
    raw = pwd.encode()
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(raw, salt)
    return hashed.decode()


# ---------- Fixtures ----------
@pytest.fixture(scope="session")
def bureau_token():
    """Log in once per session as the default bureau user.

    Returns a ``(token, user)`` pair taken from the login response.
    """
    credentials = {"username": DEFAULT_BUREAU[0], "password": DEFAULT_BUREAU[1]}
    resp = requests.post(f"{API}/auth/login", json=credentials, timeout=15)
    assert resp.status_code == 200, f"Bureau login failed: {resp.status_code} {resp.text}"
    payload = resp.json()
    return payload["token"], payload["user"]


@pytest.fixture(scope="session")
def bureau_headers(bureau_token):
    """Bearer ``Authorization`` header dict built from the bureau session token."""
    jwt, _user = bureau_token
    auth_value = f"Bearer {jwt}"
    return {"Authorization": auth_value}


@pytest.fixture(scope="session", autouse=True)
def cleanup_test_data():
    """Remove TEST_-prefixed rows before & after the run.

    Cleanup is best-effort: every DELETE is attempted independently, so a
    failure on one table (reported on stdout) does not prevent the remaining
    tables from being purged.
    """
    # NOTE(review): `%%` is presumably collapsed to a single `%` by the driver's
    # %-style parameter formatting, since db_exec always passes a params tuple.
    statements = (
        "DELETE FROM users WHERE username LIKE 'TEST_%%'",
        "DELETE FROM phrases WHERE text LIKE 'TEST_%%'",
        "DELETE FROM folders WHERE plate LIKE 'TEST_%%'",
        "DELETE FROM manual_notifications WHERE title LIKE 'TEST_%%'",
    )

    def purge():
        for sql in statements:
            try:
                db_exec(sql)
            except Exception as e:
                # Deliberately broad: cleanup must never abort the test session.
                print(f"cleanup error: {e}")

    purge()
    yield
    purge()


@pytest.fixture(scope="session")
def webadmin_user():
    """Seed a role='admin' user straight into the database for the login-block tests.

    Any stale row with the same username is removed first. Yields the username
    and deletes the row again on teardown.
    """
    delete_sql = "DELETE FROM users WHERE username=%s"
    insert_sql = (
        "INSERT INTO users (username, fullname, password_hash, role, is_active, must_change_password, created_at) "
        "VALUES (%s, %s, %s, 'admin', 1, 0, NOW(6))"
    )
    key = (WEBADMIN_USERNAME,)

    db_exec(delete_sql, key)
    password_hash = hash_pwd(WEBADMIN_PASSWORD)
    db_exec(insert_sql, (WEBADMIN_USERNAME, "Web Admin", password_hash))
    yield WEBADMIN_USERNAME
    db_exec(delete_sql, key)


@pytest.fixture
def cariste_user(bureau_headers):
    """Create a fresh cariste user through the API and log it in.

    Yields a dict with the keys ``id``, ``username``, ``password``, ``token``
    and ``headers`` (a ready-made Bearer ``Authorization`` header). After the
    test, the user is deleted through the API on a best-effort basis.
    """
    uname = f"TEST_cariste_{uuid.uuid4().hex[:6]}"
    pwd = "111111"
    r = requests.post(f"{API}/users", headers=bureau_headers,
                      json={"username": uname, "fullname": "Test Cariste", "password": pwd, "role": "cariste"},
                      timeout=15)
    assert r.status_code == 200, r.text
    uid = r.json()["id"]
    # Login this cariste -> must_change_password true, but login still works
    lr = requests.post(f"{API}/auth/login", json={"username": uname, "password": pwd}, timeout=15)
    assert lr.status_code == 200, lr.text
    token = lr.json()["token"]
    yield {"id": uid, "username": uname, "password": pwd, "token": token, "headers": {"Authorization": f"Bearer {token}"}}
    # Best-effort cleanup: a network failure here must not fail the test, but
    # it is reported instead of being silently swallowed. Any leftover row is
    # TEST_-prefixed, so the session-level purge still matches it.
    try:
        requests.delete(f"{API}/users/{uid}", headers=bureau_headers, timeout=10)
    except requests.RequestException as exc:
        print(f"cariste_user cleanup error: {exc}")


# ---------- Health ----------
def test_root_ok():
    """The API root answers 200 with a JSON body whose ``status`` is ``"ok"``."""
    resp = requests.get(f"{API}/", timeout=10)
    assert resp.status_code == 200
    body = resp.json()
    assert body.get("status") == "ok"


# ---------- AUTH ----------
class TestAuth:
    """Login, token and ``/auth/me`` checks.

    The bureau credentials come from ``DEFAULT_BUREAU`` so these tests stay in
    sync with the session-level login fixture.
    """

    @staticmethod
    def _login(username, password):
        """POST the credentials to ``/auth/login`` and return the raw response."""
        return requests.post(f"{API}/auth/login",
                             json={"username": username, "password": password},
                             timeout=15)

    def test_login_default_bureau(self):
        r = self._login(*DEFAULT_BUREAU)
        assert r.status_code == 200, r.text
        data = r.json()
        assert "token" in data and isinstance(data["token"], str)
        assert data["user"]["username"].lower() == DEFAULT_BUREAU[0].lower()
        assert data["user"]["role"] == "bureau"
        assert data["user"]["must_change_password"] is False

    def test_login_wrong_password(self):
        r = self._login(DEFAULT_BUREAU[0], "000000")
        assert r.status_code == 401

    def test_login_password_not_6_digits_alpha(self):
        # The login itself doesn't enforce 6-digit format; wrong password -> 401
        r = self._login(DEFAULT_BUREAU[0], "abcdef")
        assert r.status_code == 401

    def test_login_password_short(self):
        r = self._login(DEFAULT_BUREAU[0], "12345")
        assert r.status_code == 401

    def test_login_unknown_username(self):
        r = self._login("TEST_nonexistent_xyz", "123456")
        assert r.status_code == 401

    def test_login_admin_role_blocked(self, webadmin_user):
        # webadmin_user is requested only so the role='admin' row exists.
        r = self._login(WEBADMIN_USERNAME, WEBADMIN_PASSWORD)
        # Per spec: should be 403
        assert r.status_code == 403, f"Expected 403 for admin role, got {r.status_code}"

    def test_me_with_token(self, bureau_headers):
        r = requests.get(f"{API}/auth/me", headers=bureau_headers, timeout=10)
        assert r.status_code == 200
        assert r.json()["username"].lower() == DEFAULT_BUREAU[0].lower()

    def test_me_without_token(self):
        r = requests.get(f"{API}/auth/me", timeout=10)
        assert r.status_code in (401, 403)

    def test_login_user_with_must_change_password(self, cariste_user):
        # Login again to verify the flag returned
        r = self._login(cariste_user["username"], cariste_user["password"])
        assert r.status_code == 200
        assert r.json()["user"]["must_change_password"] is True


class TestChangePassword:
    """``/auth/change-password``: rejected inputs and the successful change."""

    @staticmethod
    def _submit(user, current, new):
        """Send a change-password request authenticated as ``user``."""
        return requests.post(
            f"{API}/auth/change-password",
            headers=user["headers"],
            json={"current_password": current, "new_password": new},
            timeout=15,
        )

    def test_wrong_current(self, cariste_user):
        resp = self._submit(cariste_user, "999999", "222222")
        assert resp.status_code == 400

    def test_new_password_not_6_digits(self, cariste_user):
        resp = self._submit(cariste_user, cariste_user["password"], "abcdef")
        assert resp.status_code == 400

    def test_new_password_short(self, cariste_user):
        resp = self._submit(cariste_user, cariste_user["password"], "12345")
        assert resp.status_code == 400

    def test_change_password_success(self, cariste_user):
        replacement = "222222"
        resp = self._submit(cariste_user, cariste_user["password"], replacement)
        assert resp.status_code == 200, resp.text

        # The must_change_password flag is cleared once the password was changed.
        profile_resp = requests.get(f"{API}/auth/me", headers=cariste_user["headers"], timeout=10)
        profile = profile_resp.json()
        assert profile["must_change_password"] is False

        # The replacement password is accepted at login.
        credentials = {"username": cariste_user["username"], "password": replacement}
        relogin = requests.post(f"{API}/auth/login", json=credentials, timeout=15)
        assert relogin.status_code == 200


# ---------- USERS ----------
class TestUsers:
    """Users CRUD: bureau-only access, role filtering and self-protection rules."""

    def test_list_users_bureau_only_filters_admin(self, bureau_headers, webadmin_user):
        r = requests.get(f"{API}/users", headers=bureau_headers, timeout=10)
        assert r.status_code == 200
        users = r.json()  # parse the body once and reuse it
        roles = {u["role"] for u in users}
        assert roles.issubset({"cariste", "bureau"})
        unames = [u["username"] for u in users]
        assert WEBADMIN_USERNAME not in unames

    def test_list_users_cariste_forbidden(self, cariste_user):
        r = requests.get(f"{API}/users", headers=cariste_user["headers"], timeout=10)
        assert r.status_code == 403

    def test_create_user_bad_password(self, bureau_headers):
        r = requests.post(f"{API}/users", headers=bureau_headers,
                          json={"username": "TEST_badpwd", "fullname": "X", "password": "abc", "role": "cariste"},
                          timeout=10)
        assert r.status_code == 400

    def test_create_user_default_must_change(self, bureau_headers):
        uname = f"TEST_newu_{uuid.uuid4().hex[:6]}"
        r = requests.post(f"{API}/users", headers=bureau_headers,
                          json={"username": uname, "fullname": "U", "password": "123456", "role": "cariste"},
                          timeout=10)
        assert r.status_code == 200
        body = r.json()
        assert body["must_change_password"] is True
        assert body["role"] == "cariste"
        uid = body["id"]
        # GET to verify
        lst = requests.get(f"{API}/users", headers=bureau_headers, timeout=10).json()
        assert any(u["id"] == uid for u in lst)

    def test_create_user_duplicate_username_case_insensitive(self, bureau_headers):
        uname = f"TEST_dup_{uuid.uuid4().hex[:5]}"
        r1 = requests.post(f"{API}/users", headers=bureau_headers,
                           json={"username": uname.lower(), "fullname": "A", "password": "123456", "role": "cariste"},
                           timeout=10)
        assert r1.status_code == 200
        r2 = requests.post(f"{API}/users", headers=bureau_headers,
                           json={"username": uname.upper(), "fullname": "B", "password": "123456", "role": "cariste"},
                           timeout=10)
        assert r2.status_code == 400

    def test_patch_user_fullname_role_password(self, bureau_headers, cariste_user):
        # NOTE(review): despite the name, only fullname and password are patched
        # here; the role field is not part of the request.
        uid = cariste_user["id"]
        r = requests.patch(f"{API}/users/{uid}", headers=bureau_headers,
                           json={"fullname": "Renamed", "password": "333333"},
                           timeout=10)
        assert r.status_code == 200
        assert r.json()["fullname"] == "Renamed"
        # Login with new password
        lr = requests.post(f"{API}/auth/login",
                           json={"username": cariste_user["username"], "password": "333333"},
                           timeout=10)
        assert lr.status_code == 200

    def test_patch_cannot_downgrade_self(self, bureau_headers, bureau_token):
        _, me = bureau_token
        r = requests.patch(f"{API}/users/{me['id']}", headers=bureau_headers,
                           json={"role": "cariste"}, timeout=10)
        assert r.status_code == 400

    def test_delete_self_refused(self, bureau_headers, bureau_token):
        _, me = bureau_token
        r = requests.delete(f"{API}/users/{me['id']}", headers=bureau_headers, timeout=10)
        assert r.status_code == 400

    def test_delete_user_ok(self, bureau_headers):
        uname = f"TEST_del_{uuid.uuid4().hex[:5]}"
        r = requests.post(f"{API}/users", headers=bureau_headers,
                          json={"username": uname, "fullname": "X", "password": "123456", "role": "cariste"},
                          timeout=10)
        # Fail with the server's message if the setup step itself did not work,
        # rather than with a KeyError on the missing "id".
        assert r.status_code == 200, r.text
        uid = r.json()["id"]
        d = requests.delete(f"{API}/users/{uid}", headers=bureau_headers, timeout=10)
        assert d.status_code == 200
        # Verify gone
        rows = db_exec("SELECT id FROM users WHERE id=%s", (uid,))
        assert not rows


# ---------- FOLDERS ----------
class TestFolders:
    """Folder CRUD, permissions, status transitions, append, stats and dates.

    Cleanup requests carry a timeout and run in ``finally`` blocks so that a
    failed assertion or an unresponsive server neither leaks the folder nor
    hangs the run.
    """

    def test_folder_full_lifecycle(self, bureau_headers, cariste_user):
        # cariste creates folder
        payload = {"plate": "TEST_AB-123", "problem": "Pneu crevé", "photos": ["data:image/png;base64,AAA"]}
        r = requests.post(f"{API}/folders", headers=cariste_user["headers"], json=payload, timeout=15)
        assert r.status_code == 200, r.text
        f = r.json()
        assert f["cariste_id"] == cariste_user["id"]
        assert isinstance(f["cariste_id"], int)
        fid = f["id"]

        # cariste GET lists own folders
        lst = requests.get(f"{API}/folders", headers=cariste_user["headers"], timeout=10).json()
        assert any(x["id"] == fid for x in lst)

        # bureau GET sees all
        lst_b = requests.get(f"{API}/folders", headers=bureau_headers, timeout=10).json()
        assert any(x["id"] == fid for x in lst_b)

        # cariste GET own folder
        g = requests.get(f"{API}/folders/{fid}", headers=cariste_user["headers"], timeout=10)
        assert g.status_code == 200

        # append text+photo
        ap = requests.post(f"{API}/folders/{fid}/append",
                          headers=cariste_user["headers"],
                          json={"text": "more details", "photos": ["data:image/png;base64,BBB"]},
                          timeout=15)
        assert ap.status_code == 200
        assert len(ap.json()["photos"]) == 2

        # 30 photos boundary on append: try to push +30 (already 2 -> +30 exceeds)
        too_many = requests.post(f"{API}/folders/{fid}/append",
                                 headers=cariste_user["headers"],
                                 json={"photos": ["data:image/png;base64,X"] * 30}, timeout=15)
        assert too_many.status_code == 400

        # bureau status take -> en_cours
        s1 = requests.patch(f"{API}/folders/{fid}/status", headers=bureau_headers,
                            json={"action": "take"}, timeout=10)
        assert s1.status_code == 200 and s1.json()["status"] == "en_cours"

        # cariste cannot change status
        s_forbid = requests.patch(f"{API}/folders/{fid}/status", headers=cariste_user["headers"],
                                  json={"action": "close"}, timeout=10)
        assert s_forbid.status_code == 403

        # close
        s2 = requests.patch(f"{API}/folders/{fid}/status", headers=bureau_headers,
                            json={"action": "close"}, timeout=10)
        assert s2.status_code == 200 and s2.json()["status"] == "cloture"

        # reopen
        s3 = requests.patch(f"{API}/folders/{fid}/status", headers=bureau_headers,
                            json={"action": "reopen"}, timeout=10)
        assert s3.status_code == 200 and s3.json()["status"] == "en_attente"

        # history has entries
        full = requests.get(f"{API}/folders/{fid}", headers=bureau_headers, timeout=10).json()
        actions = [h["action"] for h in full["history"]]
        assert "created" in actions and "take" in actions and "close" in actions and "reopen" in actions

        # cariste cannot delete
        ddel = requests.delete(f"{API}/folders/{fid}", headers=cariste_user["headers"], timeout=10)
        assert ddel.status_code == 403

        # bureau delete (cascade history & photos)
        d = requests.delete(f"{API}/folders/{fid}", headers=bureau_headers, timeout=10)
        assert d.status_code == 200
        # Verify cascade
        ph = db_exec("SELECT COUNT(*) c FROM folder_photos WHERE folder_id=%s", (fid,))
        hi = db_exec("SELECT COUNT(*) c FROM folder_history WHERE folder_id=%s", (fid,))
        assert ph[0][0] == 0 and hi[0][0] == 0

    def test_cariste_cannot_see_others_folder(self, bureau_headers, cariste_user):
        # Create a folder owned by bureau (bureau is technically allowed via /folders POST)
        r = requests.post(f"{API}/folders", headers=bureau_headers,
                         json={"plate": "TEST_OTHER", "problem": "x", "photos": []},
                         timeout=10)
        assert r.status_code == 200
        fid = r.json()["id"]
        try:
            # Cariste tries to read -> 403
            g = requests.get(f"{API}/folders/{fid}", headers=cariste_user["headers"], timeout=10)
            assert g.status_code == 403
        finally:
            # cleanup (runs even when the assertion above fails)
            requests.delete(f"{API}/folders/{fid}", headers=bureau_headers, timeout=10)

    def test_folders_too_many_photos_on_create(self, cariste_user):
        r = requests.post(f"{API}/folders", headers=cariste_user["headers"],
                          json={"plate": "TEST_XX", "problem": "p", "photos": ["a"] * 31},
                          timeout=10)
        assert r.status_code == 400

    def test_stats_and_dates(self, bureau_headers, cariste_user):
        from datetime import datetime, timezone

        # create one folder to ensure non-empty
        r = requests.post(f"{API}/folders", headers=cariste_user["headers"],
                          json={"plate": "TEST_STAT", "problem": "p", "photos": []}, timeout=10)
        # Surface a failed setup with the server's message instead of a KeyError.
        assert r.status_code == 200, r.text
        fid = r.json()["id"]
        try:
            st = requests.get(f"{API}/folders/stats", headers=bureau_headers, timeout=10)
            assert st.status_code == 200
            data = st.json()
            for k in ("total", "waiting", "in_progress", "closed"):
                assert k in data and isinstance(data[k], int)
            # with date filter
            # NOTE(review): "today" is computed in UTC; this presumably matches
            # the backend's date bucketing - confirm if it flakes near midnight.
            today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
            st2 = requests.get(f"{API}/folders/stats?date={today}", headers=bureau_headers, timeout=10)
            assert st2.status_code == 200
            # dates
            d = requests.get(f"{API}/folders/dates", headers=bureau_headers, timeout=10).json()
            assert today in d["dates"]
        finally:
            requests.delete(f"{API}/folders/{fid}", headers=bureau_headers, timeout=10)


# ---------- PHRASES ----------
class TestPhrases:
    """Phrases: a cariste is refused on create; bureau can create, list and delete."""

    def test_phrases_flow(self, bureau_headers, cariste_user):
        phrases_url = f"{API}/phrases"

        # A cariste is not allowed to add a phrase.
        denied = requests.post(phrases_url, headers=cariste_user["headers"],
                               json={"text": "TEST_phrase_cariste"}, timeout=10)
        assert denied.status_code == 403

        # The bureau user can add one.
        created = requests.post(phrases_url, headers=bureau_headers,
                                json={"text": "TEST_phrase_x"}, timeout=10)
        assert created.status_code == 200
        phrase_id = created.json()["id"]

        # The new phrase shows up in the listing.
        listing = requests.get(phrases_url, headers=bureau_headers, timeout=10)
        phrases = listing.json()
        assert any(entry["id"] == phrase_id for entry in phrases)

        # And it can be removed again.
        removed = requests.delete(f"{phrases_url}/{phrase_id}", headers=bureau_headers, timeout=10)
        assert removed.status_code == 200


# ---------- MANUAL NOTIFICATIONS (new feature) ----------
class TestManualNotifications:
    """Manual notifications: sending, input validation, role rules and history."""

    @staticmethod
    def _admin_id():
        """Return the database id of the seeded role='admin' user."""
        rows = db_exec("SELECT id FROM users WHERE username=%s", (WEBADMIN_USERNAME,))
        # Guard against an opaque IndexError when the seed row is missing.
        assert rows, f"{WEBADMIN_USERNAME} not found in users table"
        return rows[0][0]

    def test_send_ok(self, bureau_headers, cariste_user):
        r = requests.post(f"{API}/notifications/send", headers=bureau_headers,
                          json={"user_ids": [cariste_user["id"]], "title": "TEST_n1", "body": "Hello"},
                          timeout=15)
        assert r.status_code == 200, r.text
        data = r.json()
        assert data.get("ok") is True
        assert data.get("recipients") >= 1
        assert isinstance(data.get("id"), int)

    def test_send_empty_user_ids(self, bureau_headers):
        r = requests.post(f"{API}/notifications/send", headers=bureau_headers,
                          json={"user_ids": [], "title": "TEST_n2", "body": "x"}, timeout=10)
        assert r.status_code == 400

    def test_send_empty_title(self, bureau_headers, cariste_user):
        r = requests.post(f"{API}/notifications/send", headers=bureau_headers,
                          json={"user_ids": [cariste_user["id"]], "title": "", "body": "x"}, timeout=10)
        assert r.status_code == 400

    def test_send_empty_body(self, bureau_headers, cariste_user):
        r = requests.post(f"{API}/notifications/send", headers=bureau_headers,
                          json={"user_ids": [cariste_user["id"]], "title": "TEST_n3", "body": ""}, timeout=10)
        assert r.status_code == 400

    def test_send_by_cariste_forbidden(self, cariste_user, bureau_token):
        # cariste cannot send
        r = requests.post(f"{API}/notifications/send", headers=cariste_user["headers"],
                          json={"user_ids": [bureau_token[1]["id"]], "title": "TEST_n4", "body": "hi"},
                          timeout=10)
        assert r.status_code == 403

    def test_send_admin_recipient_filtered_out(self, bureau_headers, cariste_user, webadmin_user):
        admin_id = self._admin_id()
        r = requests.post(f"{API}/notifications/send", headers=bureau_headers,
                          json={"user_ids": [cariste_user["id"], admin_id],
                                "title": "TEST_filter", "body": "msg"}, timeout=15)
        assert r.status_code == 200, r.text
        # Only the cariste should count
        assert r.json()["recipients"] == 1

    def test_send_admin_only_returns_400(self, bureau_headers, webadmin_user):
        admin_id = self._admin_id()
        r = requests.post(f"{API}/notifications/send", headers=bureau_headers,
                          json={"user_ids": [admin_id], "title": "TEST_only_admin", "body": "x"},
                          timeout=10)
        assert r.status_code == 400

    def test_history(self, bureau_headers, cariste_user):
        # send one
        sent = requests.post(f"{API}/notifications/send", headers=bureau_headers,
                             json={"user_ids": [cariste_user["id"]], "title": "TEST_hist", "body": "y"}, timeout=10)
        # Without this check a failed send would only surface later as a
        # confusing "title not in history" failure.
        assert sent.status_code == 200, sent.text
        r = requests.get(f"{API}/notifications/history", headers=bureau_headers, timeout=10)
        assert r.status_code == 200
        items = r.json()
        assert isinstance(items, list)
        assert any(i["title"] == "TEST_hist" for i in items)

    def test_history_cariste_forbidden(self, cariste_user):
        r = requests.get(f"{API}/notifications/history", headers=cariste_user["headers"], timeout=10)
        assert r.status_code == 403


# ---------- DEVICES ----------
class TestDevices:
    """Device-token registration and removal for a logged-in cariste."""

    def test_register_unregister(self, cariste_user):
        device_token = f"TEST_devtok_{uuid.uuid4().hex[:8]}"
        auth = cariste_user["headers"]
        register_url = f"{API}/devices/register"

        # First registration of the token.
        first = requests.post(register_url, headers=auth,
                              json={"token": device_token, "platform": "android"}, timeout=10)
        assert first.status_code == 200
        first_body = first.json()
        assert first_body["ok"] is True

        # Registering the same token again (other platform) must still succeed
        # (original note: idempotent, ON DUPLICATE KEY).
        second = requests.post(register_url, headers=auth,
                               json={"token": device_token, "platform": "ios"}, timeout=10)
        assert second.status_code == 200

        # Finally remove the token.
        removal = requests.delete(f"{API}/devices/unregister", headers=auth,
                                  json={"token": device_token, "platform": "android"}, timeout=10)
        assert removal.status_code == 200
