import os, base64, subprocess, asyncio, firebase_admin, json, shutil, time, urllib.parse, uuid
from firebase_admin import credentials, firestore
from telethon import TelegramClient, types
from telethon.sessions import StringSession
from google.cloud.firestore_v1.base_query import FieldFilter

# --- [ Configuration ] ---
from dotenv import load_dotenv
# Load environment variables from the file named by ENV_FILE (default ".env").
load_dotenv(os.getenv("ENV_FILE", ".env"))

# Directory containing this script; used as the default location of firebase.json.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Service-account file: GOOGLE_APPLICATION_CREDENTIALS wins, else firebase.json next to this script.
FIREBASE_CREDENTIALS = os.getenv(
    "GOOGLE_APPLICATION_CREDENTIALS",
    os.path.join(BASE_DIR, "firebase.json")
)
# Bandwidth cap (megabits per second) applied to non-premium tasks.
FREE_NETWORK_MBPS = float(os.getenv("FREE_NETWORK_MBPS", "10"))
# Number of concurrent worker coroutines per lane; each is clamped to at least 1.
PREMIUM_WORKERS = max(1, int(os.getenv("PREMIUM_WORKERS", "1")))
FREE_WORKERS = max(1, int(os.getenv("FREE_WORKERS", "1")))

# Initialise the Firebase app only when no app has been registered yet.
if not firebase_admin._apps:
    cred = credentials.Certificate(FIREBASE_CREDENTIALS)
    firebase_admin.initialize_app(cred)

# Shared Firestore client used by every helper below.
db = firestore.client()

# Telegram API credentials and the serialized session string, all read from the environment.
API_ID = int(os.getenv("API_ID", "0"))
API_HASH = os.getenv("API_HASH")
SESSION_STRING = os.getenv("SESSION_STRING")
# Single Telethon client shared by all worker coroutines.
client = TelegramClient(StringSession(SESSION_STRING), API_ID, API_HASH)

# User-facing message templates keyed by language code ('my', 'en'), then by message id.
MSG_DB = {
    'my': {'dl_fail': "⚠️ ဗီဒီယိုဒေါင်း၍မရပါ။", 'process_err': "⚠️ အမှားအယွင်းရှိပါသည်။", 'part_label': "အပိုင်း", 'end_label': " (ဇာတ်သိမ်း) ✅", 'blocked_msg': "⛔ သင့်အကောင့်သည် Block ခံထားရပါပြီ။ Admin ကိုဆက်သွယ်ပါ။"},
    'en': {'dl_fail': "⚠️ Download failed.", 'process_err': "⚠️ Error occurred.", 'part_label': "Part", 'end_label': " (End) ✅", 'blocked_msg': "⛔ Your account has been blocked. Please contact admin."}
}

# Lower-case substrings that is_recipient_unreachable() searches for in an
# exception's class name and message to decide the recipient cannot be reached.
RECIPIENT_UNREACHABLE_MARKERS = (
    "blocked",
    "forbidden",
    "peeridinvalid",
    "peer id invalid",
    "privacy",
    "deactivated",
    "can't write",
    "cannot write",
    "not enough rights",
)

def is_recipient_unreachable(error):
    """Return True when *error* looks like a recipient-side delivery failure.

    The exception's class name and message are joined, lower-cased and scanned
    for any substring listed in RECIPIENT_UNREACHABLE_MARKERS.
    """
    haystack = " ".join((error.__class__.__name__, str(error))).lower()
    for marker in RECIPIENT_UNREACHABLE_MARKERS:
        if marker in haystack:
            return True
    return False

def mark_task_failed(doc_ref, error, detail):
    """Write a failed state to the task document.

    Args:
        doc_ref: Firestore document reference of the task.
        error: Short error code; stored as-is and embedded in the progress text.
        detail: Longer description; only its first 500 characters are stored.
    """
    failure_fields = {
        'status': 'failed',
        'stage': 'failed',
        'progress_text': f"Failed: {error}",
        'error': error,
    }
    failure_fields['error_detail'] = detail[:500]
    failure_fields['failedAt'] = firestore.SERVER_TIMESTAMP
    doc_ref.update(failure_fields)

def get_blocked_user_lang(uid):
    """Look up whether user *uid* is blocked and which language they use.

    Returns:
        A ``(is_blocked, lang)`` tuple. ``(False, 'my')`` is returned when the
        user document does not exist or the lookup raises.
    """
    fallback = (False, 'my')
    try:
        snapshot = db.collection('users').document(str(uid)).get()
        if snapshot.exists:
            profile = snapshot.to_dict() or {}
            # Only a literal True counts as blocked; truthy non-bool values do not.
            blocked = profile.get('is_blocked', False) is True
            return blocked, profile.get('lang', 'my')
        return fallback
    except Exception as e:
        print(f"⚠️ Failed to check blocked user status: {e}", flush=True)
        return fallback

def get_duration(file):
    """Return the duration of *file* in seconds as reported by ffprobe.

    Returns 0 when ffprobe cannot be launched, prints nothing on stdout, or
    prints something that is not a number (for example "N/A").
    """
    cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        file,
    ]
    try:
        res = subprocess.run(cmd, capture_output=True, text=True)
        return float(res.stdout) if res.stdout else 0
    except (OSError, ValueError, TypeError, subprocess.SubprocessError):
        # Narrow on purpose: a bare except would also swallow KeyboardInterrupt/SystemExit.
        return 0

def file_size_mb(file):
    """Return the size of *file* in mebibytes, or 0 when the path does not exist."""
    bytes_per_mb = 1024 * 1024
    return os.path.getsize(file) / bytes_per_mb if os.path.exists(file) else 0

def download_mbps(bytes_count, elapsed):
    """Return the average rate in megabits per second for *bytes_count* bytes over *elapsed* seconds.

    A non-positive *elapsed* yields 0.
    """
    if elapsed > 0:
        bits = bytes_count * 8
        return bits / elapsed / 1_000_000
    return 0

def mbps_to_ytdlp_rate(mbps):
    """Convert a megabit-per-second limit to a bytes-per-second string.

    Returns None when *mbps* is falsy or not positive; otherwise the integer
    number of bytes per second (at least 1) as a string.
    """
    if mbps and mbps > 0:
        bytes_per_second = int((mbps * 1_000_000) / 8)
        return str(max(1, bytes_per_second))
    return None

def throttle_transfer(current_bytes, started_at, max_mbps):
    """Sleep (at most 2 seconds) when a transfer is running ahead of *max_mbps*.

    Args:
        current_bytes: Bytes transferred so far.
        started_at: ``time.time()`` value taken when the transfer began.
        max_mbps: Allowed rate in megabits per second; falsy or non-positive disables throttling.
    """
    if max_mbps and max_mbps > 0:
        # Time the transfer should have taken so far at exactly the allowed rate.
        budget = (current_bytes * 8) / (max_mbps * 1_000_000)
        lag = budget - (time.time() - started_at)
        if lag > 0:
            time.sleep(min(lag, 2))

def update_task_progress(doc_ref, stage, text, **extra):
    """Store the current stage and progress text on the task document and log them.

    Any keyword arguments in *extra* are written as additional fields and
    override the base fields when the names collide.
    """
    base_fields = {
        'stage': stage,
        'progress_text': text,
        'updatedAt': firestore.SERVER_TIMESTAMP
    }
    doc_ref.update({**base_fields, **extra})
    print(f"📍 {stage}: {text}", flush=True)

def count_video_parts(total_sec, split_sec):
    """Return how many parts a video of *total_sec* seconds is cut into.

    Each part is *split_sec* long, except that a trailing remainder shorter
    than 70% of *split_sec* is merged into the preceding part.

    Returns 0 when *total_sec* or *split_sec* is not positive. Without the
    *split_sec* guard the cursor would never advance and the loop would not end.
    """
    if split_sec <= 0:
        return 0

    position = 0
    parts = 0
    while position < total_sec:
        cut = position + split_sec
        # Absorb a short tail into this part instead of emitting a tiny final one.
        if (total_sec - cut) < (split_sec * 0.7):
            cut = total_sec
        parts += 1
        position = cut
    return parts

def task_matches_lane(data, worker_lane):
    """Return True when the task described by *data* belongs to *worker_lane*.

    A task is premium only if its 'is_premium' field is literally True. The
    "premium" lane takes premium tasks; any other lane takes the rest.
    """
    premium_task = data.get('is_premium', False) is True
    if worker_lane == "premium":
        return premium_task
    return not premium_task

def find_next_task(worker_lane, allow_processing_resume):
    """Return the oldest task snapshot this worker should handle, or None.

    Lookup order:
      1. (only when *allow_processing_resume*) the oldest 'processing' task of the lane,
      2. the oldest 'queued' task of the lane,
      3. (free lane only) the first of the 10 oldest queued tasks, regardless of
         their 'is_premium' value, that task_matches_lane() accepts.
    """
    wanted_premium = worker_lane == "premium"

    def oldest_for_lane(status):
        # One-document query: given status, this lane's is_premium flag, oldest first.
        return (
            db.collection('tasks')
            .where(filter=FieldFilter("status", "==", status))
            .where(filter=FieldFilter("is_premium", "==", wanted_premium))
            .order_by("createdAt", direction=firestore.Query.ASCENDING)
            .limit(1)
            .get()
        )

    statuses = ("processing", "queued") if allow_processing_resume else ("queued",)
    for status in statuses:
        hits = oldest_for_lane(status)
        if hits:
            return hits[0]

    if worker_lane != "free":
        return None

    # Second pass without the is_premium equality filter.
    # NOTE(review): presumably meant to catch tasks with no is_premium field — confirm.
    candidates = (
        db.collection('tasks')
        .where(filter=FieldFilter("status", "==", "queued"))
        .order_by("createdAt", direction=firestore.Query.ASCENDING)
        .limit(10)
        .get()
    )
    return next(
        (doc for doc in candidates if task_matches_lane(doc.to_dict() or {}, worker_lane)),
        None,
    )

@firestore.transactional
def claim_queued_task(transaction, doc_ref, worker_id, worker_lane):
    """Claim a queued task for this worker inside a Firestore transaction.

    Returns:
        The task data (with 'status' set to 'processing') when the claim
        succeeded, or None when the document is missing, is no longer queued,
        or does not belong to *worker_lane*.
    """
    snapshot = doc_ref.get(transaction=transaction)
    if not snapshot.exists:
        return None

    data = snapshot.to_dict() or {}
    claimable = data.get("status") == "queued" and task_matches_lane(data, worker_lane)
    if not claimable:
        return None

    claim_fields = {
        'status': 'processing',
        'stage': 'processing',
        'progress_text': 'Worker picked this task',
        'worker_id': worker_id,
        'worker_lane': worker_lane,
        'processing_started_at': firestore.SERVER_TIMESTAMP,
        'updatedAt': firestore.SERVER_TIMESTAMP
    }
    transaction.update(doc_ref, claim_fields)

    # Mirror the new status in the copy handed back to the caller.
    data['status'] = 'processing'
    return data

def parse_telegram_message_link(url):
    """Split a Telegram message link into candidate chat identifiers and a message id.

    Accepted forms (host ``t.me`` or ``telegram.me``, optional ``www.`` prefix,
    optional leading ``/s/`` path segment, scheme optional):
      * ``https://t.me/<username>/<message_id>``
      * ``https://t.me/c/<channel_id>/<message_id>``

    Returns:
        ``(chats, message_id)``. *chats* is ``[username]`` for username links,
        or ``[PeerChannel(channel_id), int("-100<channel_id>")]`` for ``/c/`` links.

    Raises:
        ValueError: if the host is not a Telegram host or the path is malformed.
    """
    raw = url.strip()
    # urlparse puts "t.me/chan/1" entirely into .path when there is no scheme,
    # so supply one for bare links.
    if "://" not in raw:
        raw = "https://" + raw.lstrip("/")
    parsed = urllib.parse.urlparse(raw)

    host = parsed.netloc.lower()
    # Strip the literal "www." prefix only. str.lstrip("www.") removes any run of
    # 'w' and '.' characters and would turn e.g. "wt.me" into "t.me".
    if host.startswith("www."):
        host = host[len("www."):]
    if host not in ("t.me", "telegram.me"):
        raise ValueError("Not a Telegram message link")

    parts = [urllib.parse.unquote(part) for part in parsed.path.split("/") if part]
    if parts and parts[0] == "s":
        parts = parts[1:]

    if len(parts) < 2:
        raise ValueError("Telegram message link must include a message id")

    if parts[0] == "c":
        if len(parts) < 3 or not parts[1].isdigit() or not parts[2].isdigit():
            raise ValueError("Invalid private Telegram message link")
        channel_id = int(parts[1])
        message_id = int(parts[2])
        # Two candidate peers for the same channel; callers try them in order.
        return [types.PeerChannel(channel_id), int(f"-100{channel_id}")], message_id

    username = parts[0].lstrip("@")
    if not parts[1].isdigit():
        raise ValueError("Invalid public Telegram message link")
    return [username], int(parts[1])

async def download_telegram_link(url, output_path):
    """Download the media of a Telegram message link to *output_path*.

    Each candidate chat from parse_telegram_message_link() is tried in turn.

    Returns:
        True as soon as one candidate yields a file at *output_path*.

    Raises:
        ValueError: if *url* cannot be parsed (raised by the parser).
        RuntimeError: if every candidate failed; the message joins the per-chat errors.
    """
    chats, message_id = parse_telegram_message_link(url)
    failures = []

    for chat in chats:
        try:
            message = await client.get_messages(chat, ids=message_id)
            if not message:
                raise ValueError("Message not found")
            saved_path = await client.download_media(message, output_path)
            if not (saved_path and os.path.exists(output_path)):
                raise ValueError("Message has no downloadable media")
            return True
        except Exception as e:
            failures.append(f"{chat}: {type(e).__name__}: {e}")

    raise RuntimeError("; ".join(failures))

async def download_telegram_link_with_progress(url, output_path, doc_ref, network_limit_mbps=None):
    """Download a Telegram message's media while reporting progress to Firestore.

    Args:
        url: Telegram message link understood by parse_telegram_message_link().
        output_path: Destination file path.
        doc_ref: Task document that receives 'downloading' progress updates.
        network_limit_mbps: Optional cap in megabits per second; falsy or
            non-positive disables throttling.

    Returns:
        True as soon as one candidate chat yields a file at *output_path*.

    Raises:
        ValueError: if *url* cannot be parsed (raised by the parser).
        RuntimeError: if every candidate failed; the message joins the per-chat errors.
    """
    chats, message_id = parse_telegram_message_link(url)
    errors = []

    for chat in chats:
        try:
            msg = await client.get_messages(chat, ids=message_id)
            if not msg:
                raise ValueError("Message not found")

            started = time.time()
            last_update = {'time': 0}

            # The callback is a coroutine so throttling can yield with asyncio.sleep.
            # A blocking time.sleep here would freeze the event loop, stalling every
            # other worker and Telethon's own networking for the length of the pause.
            async def progress_callback(current, total):
                if network_limit_mbps and network_limit_mbps > 0:
                    expected_elapsed = (current * 8) / (network_limit_mbps * 1_000_000)
                    delay = expected_elapsed - (time.time() - started)
                    if delay > 0:
                        await asyncio.sleep(min(delay, 2))

                now = time.time()
                # At most one Firestore write per 5 seconds, plus the final 100% update.
                if now - last_update['time'] < 5 and current != total:
                    return
                last_update['time'] = now
                percent = (current / total * 100) if total else 0
                speed = download_mbps(current, now - started)
                text = f"Downloading {percent:.1f}% at {speed:.1f} Mbps"
                update_task_progress(
                    doc_ref,
                    'downloading',
                    text,
                    download_bytes=current,
                    download_total_bytes=total,
                    download_mbps=round(speed, 2)
                )

            downloaded = await client.download_media(
                msg,
                output_path,
                progress_callback=progress_callback
            )
            if downloaded and os.path.exists(output_path):
                return True
            raise ValueError("Message has no downloadable media")
        except Exception as e:
            errors.append(f"{chat}: {type(e).__name__}: {e}")

    raise RuntimeError("; ".join(errors))

async def send_file_with_progress(chat_id, file_path, caption, doc_ref, part_num, total_parts, network_limit_mbps=None):
    """Upload *file_path* to *chat_id* while reporting progress to Firestore.

    Args:
        chat_id: Telegram recipient.
        file_path: Local file to send.
        caption: Caption attached to the file.
        doc_ref: Task document that receives 'uploading' progress updates.
        part_num: Number of the part being sent (used in progress text).
        total_parts: Total number of parts (used in progress text).
        network_limit_mbps: Optional cap in megabits per second; falsy or
            non-positive disables throttling.

    Returns:
        Whatever ``client.send_file`` returns.
    """
    started = time.time()
    last_update = {'time': 0}

    # The callback is a coroutine so throttling can yield with asyncio.sleep.
    # A blocking time.sleep here would freeze the event loop, stalling every
    # other worker and Telethon's own networking for the length of the pause.
    async def progress_callback(current, total):
        if network_limit_mbps and network_limit_mbps > 0:
            expected_elapsed = (current * 8) / (network_limit_mbps * 1_000_000)
            delay = expected_elapsed - (time.time() - started)
            if delay > 0:
                await asyncio.sleep(min(delay, 2))

        now = time.time()
        # At most one Firestore write per 5 seconds, plus the final 100% update.
        if now - last_update['time'] < 5 and current != total:
            return
        last_update['time'] = now
        percent = (current / total * 100) if total else 0
        speed = download_mbps(current, now - started)
        update_task_progress(
            doc_ref,
            'uploading',
            f"Uploading part {part_num}/{total_parts}: {percent:.1f}% at {speed:.1f} Mbps",
            upload_bytes=current,
            upload_total_bytes=total,
            upload_mbps=round(speed, 2),
            current_part=part_num
        )

    return await client.send_file(
        chat_id,
        file_path,
        caption=caption,
        progress_callback=progress_callback
    )

def download_with_ytdlp(url, output_path, doc_ref, network_limit_mbps=None):
    """Download *url* with the yt-dlp command-line tool, reporting progress to Firestore.

    This function blocks until yt-dlp exits.

    Args:
        url: Source URL handed to yt-dlp.
        output_path: Value passed to yt-dlp's ``-o`` option.
        doc_ref: Task document that receives progress updates.
        network_limit_mbps: Optional cap in megabits per second, passed to
            yt-dlp as ``--limit-rate`` in bytes per second.

    Returns:
        True when yt-dlp exits with status 0, otherwise False.
    """
    started = time.time()
    update_task_progress(doc_ref, 'downloading', 'Starting yt-dlp download')
    # NOTE(review): --no-check-certificate disables TLS verification; confirm this is intended.
    cmd = [
        'yt-dlp',
        '--no-check-certificate',
        '--newline',
        '--progress-template',
        'download:%(progress._percent_str)s|%(progress._speed_str)s|%(progress._eta_str)s',
    ]
    rate = mbps_to_ytdlp_rate(network_limit_mbps)
    if rate:
        cmd += ['--limit-rate', rate]
    # "--" ends option parsing, so a task URL that begins with "-" is always
    # treated as a URL and never as a yt-dlp option.
    cmd += ['-o', output_path, '--', url]

    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1
    )
    last_update = 0
    prefix = "download:"

    try:
        for raw_line in proc.stdout:
            line = raw_line.strip()
            if not line:
                continue
            print(f"yt-dlp: {line}", flush=True)
            # Forward at most one progress line every 5 seconds to Firestore.
            if line.startswith(prefix) and time.time() - last_update >= 5:
                last_update = time.time()
                parts = line[len(prefix):].split("|")
                percent = parts[0].strip() if len(parts) > 0 else "?"
                speed = parts[1].strip() if len(parts) > 1 else "?"
                eta = parts[2].strip() if len(parts) > 2 else "?"
                update_task_progress(
                    doc_ref,
                    'downloading',
                    f"Downloading {percent} at {speed}, ETA {eta}"
                )

        return_code = proc.wait()
    finally:
        # If the loop was left by an exception, do not leave yt-dlp running or the pipe open.
        if proc.poll() is None:
            proc.kill()
            proc.wait()
        if proc.stdout:
            proc.stdout.close()

    if return_code != 0:
        return False

    elapsed = time.time() - started
    size_bytes = os.path.getsize(output_path) if os.path.exists(output_path) else 0
    speed = download_mbps(size_bytes, elapsed)
    update_task_progress(
        doc_ref,
        'downloaded',
        f"Downloaded {file_size_mb(output_path):.1f} MB in {elapsed:.1f}s ({speed:.1f} Mbps)",
        download_mbps=round(speed, 2),
        download_size_mb=round(file_size_mb(output_path), 2)
    )
    return True

def _parse_split_seconds(raw_len):
    """Convert a "MM:SS" (or bare "MM") split length to seconds; return 0 when it cannot be parsed."""
    pieces = str(raw_len or "5:00").split(':')
    try:
        return int(pieces[0]) * 60 + (int(pieces[1]) if len(pieces) > 1 else 0)
    except ValueError:
        return 0


async def worker_engine(worker_lane="free", worker_index=1):
    """Run one worker loop: pick a task, download the video, split it and send the parts.

    Per task the worker:
      1. finds and (for queued tasks) transactionally claims a task for its lane,
      2. fails the task early when the recipient id is missing or the user is blocked,
      3. downloads the source via Telegram or yt-dlp,
      4. cuts the video into parts with ffmpeg (watermark / optional logo overlay)
         and sends each part as soon as it is encoded, skipping parts already
         recorded in 'last_sent_index',
      5. deletes the task document on success.

    Unexpected errors leave the task in 'processing' so it can be resumed; only
    the worker with ``worker_index == 1`` resumes 'processing' tasks.

    Args:
        worker_lane: "premium" or "free"; decides which tasks are taken.
        worker_index: 1-based index of this worker within its lane.
    """
    start_time = time.time()
    worker_id = f"{worker_lane}-{worker_index}-{uuid.uuid4().hex[:8]}"
    print(f"🚀 Luxury {worker_lane.title()} Worker Started: {worker_id}", flush=True)
    loop = asyncio.get_running_loop()
    logo_positions = {"tr": "W-w-15:15", "tl": "15:15", "br": "W-w-15:H-h-15", "bl": "15:H-h-15"}

    while True:
        # NOTE(review): 20700 s (5 h 45 min) presumably keeps the process under an
        # external runtime cap — confirm.
        if time.time() - start_time > 20700:
            break
        uid = None
        doc_ref = None
        current_files = []
        parts_dir = None

        try:
            print(f"🔍 {worker_id} searching for {worker_lane} tasks...", flush=True)

            doc = find_next_task(worker_lane, worker_index == 1)

            if not doc:
                print(f"💤 No {worker_lane} tasks found. Waiting...", flush=True)
                await asyncio.sleep(15)
                continue

            doc_ref = doc.reference
            data = doc.to_dict() or {}

            if data.get('status') == 'queued':
                transaction = db.transaction()
                data = claim_queued_task(transaction, doc_ref, worker_id, worker_lane)
                if not data:
                    print(f"↪️ {worker_id} task was already claimed. Looking again...", flush=True)
                    await asyncio.sleep(2)
                    continue
            elif not task_matches_lane(data, worker_lane):
                await asyncio.sleep(2)
                continue

            # A malformed recipient id is treated like a missing one, so the task is
            # failed below instead of raising and being retried forever.
            try:
                uid = int(data.get('recipient_id') or data.get('chat_id') or data.get('user_id') or 0)
            except (TypeError, ValueError):
                uid = 0
            lang = data.get('lang', 'my')
            # Fall back to the default language table for unsupported language codes.
            msgs = MSG_DB.get(lang, MSG_DB['my'])
            v_url = str(data.get('value') or '').strip()
            last_idx = data.get('last_sent_index', -1)
            is_premium_val = data.get('is_premium', False)
            network_limit_mbps = None if is_premium_val else FREE_NETWORK_MBPS

            print(f"📦 Task ID: {doc.id} | UID: {uid} | Lane: {worker_lane} | Last Sent: {last_idx}", flush=True)

            if uid <= 0:
                doc_ref.update({'status': 'failed', 'error': 'Missing Telegram recipient id'})
                doc_ref = None
                continue

            account_blocked, blocked_lang = get_blocked_user_lang(uid)
            if account_blocked:
                print(f"⛔ Skipping blocked account: {uid}", flush=True)
                try:
                    await client.send_message(uid, MSG_DB.get(blocked_lang, MSG_DB['my'])['blocked_msg'])
                except Exception as e:
                    print(f"⚠️ Could not notify blocked account {uid}: {e}", flush=True)
                mark_task_failed(doc_ref, 'account_blocked_by_admin', f'User {uid} is blocked by admin')
                doc_ref = None
                continue

            # Set the status to processing (for a resumed task this just rewrites the same fields).
            update_task_progress(
                doc_ref,
                'processing',
                'Worker picked this task',
                status='processing',
                worker_id=worker_id,
                worker_lane=worker_lane,
                network_limit_mbps=network_limit_mbps or 0
            )

            safe_task_id = "".join(ch if ch.isalnum() else "_" for ch in doc.id)
            main_vid = f"vid_{safe_task_id}_{uid}.mp4"
            current_files.append(main_vid)

            # Download phase
            success = False
            if "t.me/" in v_url or "telegram.me/" in v_url:
                try:
                    update_task_progress(doc_ref, 'downloading', 'Starting Telegram download')
                    started = time.time()
                    success = await download_telegram_link_with_progress(v_url, main_vid, doc_ref, network_limit_mbps)
                    elapsed = time.time() - started
                    size_bytes = os.path.getsize(main_vid) if os.path.exists(main_vid) else 0
                    speed = download_mbps(size_bytes, elapsed)
                    update_task_progress(
                        doc_ref,
                        'downloaded',
                        f"Downloaded {file_size_mb(main_vid):.1f} MB in {elapsed:.1f}s ({speed:.1f} Mbps)",
                        download_mbps=round(speed, 2),
                        download_size_mb=round(file_size_mb(main_vid), 2)
                    )
                    print("✅ Downloaded from Telegram.", flush=True)
                except Exception as e:
                    print(f"❌ TG DL Error: {e}", flush=True)
            else:
                try:
                    # yt-dlp blocks until the download ends; run it in a thread so the
                    # event loop (other workers, Telethon) keeps running meanwhile.
                    success = await loop.run_in_executor(
                        None, download_with_ytdlp, v_url, main_vid, doc_ref, network_limit_mbps
                    )
                    if success:
                        print("✅ Downloaded via yt-dlp.", flush=True)
                except Exception as e:
                    print(f"❌ yt-dlp Error: {e}", flush=True)

            if not success or not os.path.exists(main_vid):
                mark_task_failed(doc_ref, 'download_failed', v_url)
                try:
                    await client.send_message(uid, msgs['dl_fail'])
                    doc_ref = None
                except Exception as e:
                    if is_recipient_unreachable(e):
                        print(f"🚫 Recipient unreachable while sending download failure notice: {e}", flush=True)
                        mark_task_failed(doc_ref, 'recipient_unreachable', str(e))
                        doc_ref = None
                    else:
                        raise
                continue

            # Splitting + delivery phase: send each part as soon as it is ready.
            total_sec = get_duration(main_vid)
            if total_sec <= 0:
                # Without a duration no part would be produced; fail the task rather
                # than reporting success and deleting it with nothing delivered.
                mark_task_failed(doc_ref, 'invalid_duration', v_url)
                try:
                    await client.send_message(uid, msgs['process_err'])
                except Exception as e:
                    print(f"⚠️ Could not notify {uid} about processing error: {e}", flush=True)
                doc_ref = None
                continue

            split_sec = _parse_split_seconds(data.get('len'))
            if split_sec <= 0:
                mark_task_failed(doc_ref, 'invalid_split_length', str(data.get('len', '')))
                doc_ref = None
                continue

            parts_dir = f"parts_{safe_task_id}_{uid}"
            os.makedirs(parts_dir, exist_ok=True)
            total_parts = count_video_parts(total_sec, split_sec)
            update_task_progress(
                doc_ref,
                'splitting',
                f"Video ready: {file_size_mb(main_vid):.1f} MB, {total_parts} parts",
                duration_sec=total_sec,
                split_sec=split_sec,
                total_parts=total_parts,
                download_size_mb=round(file_size_mb(main_vid), 2)
            )

            # NOTE(review): only single quotes are escaped in the user watermark; other
            # ffmpeg filter metacharacters (':', ',', '\\', '%') are passed through — confirm.
            u_wm = str(data.get('wm') or '').replace("'", "\\'")
            user_draw = f"drawtext=text='{u_wm}':x='(w-text_w)/2+((w-text_w)/3)*sin(t/3)':y='(h-text_h)/2+((h-text_h)/3)*cos(t/2)':fontfile='Pyidaungsu.ttf':fontcolor=white@0.5:fontsize=75" if u_wm else "null"
            bot_draw = ",drawtext=text='https\\://t.me/tt_uploader_bot':x='(w-text_w)-mod(t*120\\,w-text_w)':y='h-text_h-35-((h-text_h)*0.16)*(0.5+0.5*cos(t/2.3))':fontfile='Pyidaungsu.ttf':fontcolor=white@0.5:fontsize=45" if not is_premium_val else ""
            final_vf = f"{user_draw}{bot_draw}"
            if is_premium_val:
                encode_args = ['-c:v', 'libx264', '-preset', 'veryfast', '-crf', '20', '-c:a', 'copy']
                quality_label = "Premium high quality"
            else:
                encode_args = ['-c:v', 'libx264', '-preset', 'superfast', '-crf', '30', '-c:a', 'aac', '-b:a', '96k']
                quality_label = "Free compressed quality"

            logo_data = data.get('logo_data')
            logo_img = f"logo_{safe_task_id}_{uid}.png"
            logo_written = False
            pos = logo_positions.get(data.get('pos', 'tr'), "W-w-15:15")

            curr, p_num = 0, 1
            delivery_failed = False
            while curr < total_sec:
                end = curr + split_sec
                # A tail shorter than 70% of a part is merged into this part.
                if (total_sec - end) < (split_sec * 0.7):
                    end = total_sec
                out = f"{parts_dir}/p_{p_num:03d}.mp4"

                # If this part was already sent, skip FFmpeg entirely (resume optimization).
                if p_num <= last_idx + 1:
                    print(f"⏩ Skipping part {p_num} (Already Sent)", flush=True)
                else:
                    part_started = time.time()
                    update_task_progress(
                        doc_ref,
                        'splitting',
                        f"Splitting part {p_num}/{total_parts}",
                        current_part=p_num
                    )
                    cmd = ['ffmpeg', '-y', '-ss', str(curr), '-to', str(end), '-i', main_vid]
                    if logo_data:
                        if not logo_written:
                            # Decode the logo once per task; register it for cleanup first.
                            current_files.append(logo_img)
                            with open(logo_img, "wb") as f:
                                # Keep only the payload after the data-URL prefix, if any.
                                f.write(base64.b64decode(logo_data.split(",", 1)[-1]))
                            logo_written = True
                        cmd += ['-i', logo_img, '-filter_complex', f"[1:v]scale=150:-1,format=rgba,colorchannelmixer=aa=0.6[l];[0:v]{final_vf}[v1];[v1][l]overlay={pos}"]
                    else:
                        cmd += ['-vf', final_vf]
                    cmd += encode_args + [out]
                    try:
                        # Encode in a thread so the event loop is not blocked for the
                        # whole ffmpeg run.
                        await loop.run_in_executor(
                            None, lambda: subprocess.run(cmd, check=True, capture_output=True)
                        )
                    except subprocess.CalledProcessError as e:
                        stderr_tail = (e.stderr or b"")[-500:].decode("utf-8", "replace")
                        print(f"❌ FFmpeg failed for part {p_num}: {stderr_tail}", flush=True)
                        raise
                    split_elapsed = time.time() - part_started
                    update_task_progress(
                        doc_ref,
                        'uploading',
                        f"Uploading part {p_num}/{total_parts} ({file_size_mb(out):.1f} MB, {quality_label})",
                        current_part=p_num,
                        current_part_size_mb=round(file_size_mb(out), 2)
                    )

                    cap = f"🎬 {data.get('name','Movie')} - {msgs['part_label']} ({p_num})"
                    if end >= total_sec:
                        cap += msgs['end_label']

                    try:
                        await send_file_with_progress(uid, out, cap, doc_ref, p_num, total_parts, network_limit_mbps)
                    except Exception as e:
                        if is_recipient_unreachable(e):
                            print(f"🚫 Recipient unreachable while sending part {p_num}: {e}", flush=True)
                            mark_task_failed(doc_ref, 'recipient_unreachable', str(e))
                            doc_ref = None
                            delivery_failed = True
                            break
                        raise

                    # Persist the 0-based index of the last delivered part together
                    # with the progress text in a single write.
                    update_task_progress(
                        doc_ref,
                        'sent_part',
                        f"Sent part {p_num}/{total_parts} in {time.time() - part_started:.1f}s",
                        last_sent_index=p_num - 1,
                        last_part_split_seconds=round(split_elapsed, 1)
                    )
                    print(f"✅ Sent Part {p_num}/{total_parts} in {time.time() - part_started:.1f}s", flush=True)
                    if os.path.exists(out):
                        os.remove(out)

                curr, p_num = end, p_num + 1

            if delivery_failed:
                continue

            update_task_progress(doc_ref, 'done', f"Finished {total_parts} parts")
            doc_ref.delete()
            doc_ref = None
            print("⭐ Task Finished Successfully.", flush=True)

        except Exception as e:
            print(f"⚠️ Error: {e}", flush=True)
            if doc_ref and is_recipient_unreachable(e):
                print("🚫 Marking task as failed because recipient is unreachable.", flush=True)
                mark_task_failed(doc_ref, 'recipient_unreachable', str(e))
                doc_ref = None
            # Otherwise the status is deliberately left as 'processing' so the task
            # can be resumed later. For severe errors the user should still be told.
            await asyncio.sleep(5)
        finally:
            # Best-effort cleanup: a failure here must not terminate the worker loop.
            if parts_dir and os.path.exists(parts_dir):
                shutil.rmtree(parts_dir, ignore_errors=True)
            for f in current_files:
                try:
                    if os.path.exists(f):
                        os.remove(f)
                except OSError as cleanup_err:
                    print(f"⚠️ Could not remove {f}: {cleanup_err}", flush=True)

async def main():
    """Start the Telegram client, then run all premium and free workers concurrently."""
    await client.start()

    workers = []
    # Premium workers are created first, then free workers; indices are 1-based.
    for lane, count in (("premium", PREMIUM_WORKERS), ("free", FREE_WORKERS)):
        for slot in range(1, count + 1):
            workers.append(worker_engine(lane, slot))

    await asyncio.gather(*workers)

# Script entry point: run the async main() until every worker coroutine returns.
if __name__ == "__main__":
    asyncio.run(main())
