# NOTE(review): This file reached review with its newlines stripped — the three
# physical lines below each contain many logical source lines of Python, and a
# large mid-file span (the tail of the RE_CONTENT regex plus the definitions of
# get_ts(), pulse(), fetch_detailed_models(), post_to_mattermost(), and most of
# print_analysis(), all of which are called later) was lost in the mangling.
# Because that code is missing, no behavior-preserving reformat is possible
# here; the text is preserved byte-for-byte and only this commentary is added.
#
# What the surviving code shows:
#  * An Ollama-style reverse proxy: a FastAPI app with one catch-all route
#    ("/{path:path}", GET/POST/PUT/DELETE) that forwards the request body and
#    filtered headers (host / content-length / connection removed) to
#    CONFIG["url"] via httpx.AsyncClient.stream(), re-emitting the upstream
#    bytes through a StreamingResponse.
#  * While streaming, RE_CONTENT is applied to each chunk (decoded with
#    errors="ignore") to pull "content"/"response" JSON string fragments; each
#    match is run through
#    m.encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')
#    — presumably to undo \uXXXX escaping in the raw JSON text (TODO confirm) —
#    and the accumulated text is handed to post_to_mattermost() (definition
#    lost in the gap) as a fire-and-forget asyncio.create_task().
#  * lifespan() stores the running event loop in CONFIG["loop"] so the stdin
#    thread can schedule coroutines onto it with run_coroutine_threadsafe().
#  * input_handler() runs on a daemon thread reading stdin: 'q' sends SIGINT
#    to the process, 'd'/'dd' toggles CONFIG["dump_next"] (one-shot request/
#    response dump), 's' schedules a VRAM status fetch (GET /api/ps), 'l'/'ll'
#    refreshes the model cache and prints it, and a bare number retargets the
#    upstream port.
#  * __main__ parses -r/--remote (default 11432) and -l/--local (default
#    11434), then serves uvicorn on 127.0.0.1:<local>.
#
# Review findings (to address once the original formatting is restored):
#  * SECURITY: two Mattermost webhook URLs — one literally keyed
#    "webhook_url_secret" — are hard-coded in CONFIG. These are credentials
#    committed in source; move them to environment variables and rotate them.
#  * Several bare `except:` clauses (model-name parse, dump pretty-print, the
#    VRAM fetch, and the entire input-handler loop) swallow every exception,
#    including KeyboardInterrupt/SystemExit on the worker thread; narrow them
#    to the specific exceptions expected and log failures.
#  * The unicode_escape/latin1 round-trip corrupts any match that is not pure
#    escape-encoded ASCII, and regexing over raw chunks can split multi-byte
#    UTF-8 sequences or JSON strings at chunk boundaries — TODO confirm
#    against real upstream output; parsing each streamed JSON line would be
#    more robust than RE_CONTENT.findall on arbitrary byte windows.
#  * Original inline comments are Japanese; they are left untouched below
#    because comment/code boundaries cannot be recovered from the collapsed
#    text. Gist of the main ones: header = version/tooling note; CONFIG
#    section = "Mattermost integration settings"; RE_CONTENT = "regex for
#    fast extraction"; the stream loop = "restore Unicode escapes and rebuild
#    as UTF-8 so text is not garbled"; __main__ = "uvicorn on the main
#    thread, input wait on a sub-thread".
# バージョン情報: Python 3.12+ / FastAPI 0.115.0 / uvicorn 0.30.0 / httpx 0.28.0 # [2026-02-14] 3060(12GB)戦術支援:Mattermost連携・文字化け完全対策・全ロジック非短縮版 import argparse import asyncio import json import os import re import signal import sys import threading from contextlib import asynccontextmanager from datetime import datetime import httpx import uvicorn from fastapi import FastAPI, Request from starlette.responses import StreamingResponse # --- 視認性向上のためのカラー定数 --- C_GRAY, C_CYAN, C_GREEN, C_YELLOW, C_RED, C_WHITE, C_RESET = ( "\033[90m", "\033[96m", "\033[92m", "\033[93m", "\033[91m", "\033[97m", "\033[0m", ) CONFIG = { "remote_port": 11432, "url": "http://127.0.0.1:11432", "timeout": httpx.Timeout(600.0, connect=10.0), "dump_next": False, "models_cache": [], "loop": None, "vram_total": 12.0, # --- Mattermost 連携設定 --- "webhook_url_secret": "https://mm.ka.sugeee.com/hooks/ctjisw6ugjg85nhddo9ox9zt4c", "webhook_url": "https://mm.ka.sugeee.com/hooks/7d4793kdufyad8afi9x9wof13r", "enable_webhook": True, "max_log_len": 4000 } # 高速抽出用正規表現 RE_CONTENT = re.compile(r'"(?:content|response)":\s*"(.*?)(?7}{C_RESET} | {C_WHITE}{gb:>6.2f}GB{C_RESET}" ) # 2行目: モデル本体名 + 量子化詳細 (detail_mode時) q_info = f" {C_GRAY}[{meta['q']}]{C_RESET}" if detail_mode else "" print(f" {'':<6} | {color}{model_base:<45}{C_RESET} | {q_info}") print(f"{C_CYAN}{'=' * 88}{C_RESET}") # --- 📊 VRAM リアルタイムステータス --- def show_vram_status(): async def _fetch(): try: async with httpx.AsyncClient(timeout=2.0) as client: res = await client.get(f"{CONFIG['url']}/api/ps") if res.status_code == 200: models = res.json().get("models", []) print(f"\n{get_ts()} {C_CYAN}--- Active VRAM Usage ---{C_RESET}") if not models: print( f"{get_ts()} {C_GRAY}No models currently in VRAM.{C_RESET}" ) for m in models: v_gb, t_gb = ( m.get("size_vram", 0) / (1024**3), m.get("size", 0) / (1024**3), ) v_color = ( C_RED if v_gb > 11 else (C_YELLOW if v_gb > 8 else C_GREEN) ) print( f"{get_ts()} {C_WHITE}{m['name']:<25}{C_RESET} {v_color}{v_gb:.2f}{C_RESET} 
/ {t_gb:.2f} GB" ) except: print(f"\n{C_RED}[Error] Could not fetch VRAM status.{C_RESET}") if CONFIG["loop"]: asyncio.run_coroutine_threadsafe(_fetch(), CONFIG["loop"]) # --- 🚀 Proxy Core (高速リアルタイムDUMP実装) --- @asynccontextmanager async def lifespan(app: FastAPI): CONFIG["loop"] = asyncio.get_running_loop() asyncio.create_task(fetch_detailed_models()) yield app = FastAPI(lifespan=lifespan) @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) async def sticky_proxy(path: str, request: Request): do_dump = CONFIG["dump_next"] if do_dump: CONFIG["dump_next"] = False print(f"{get_ts()} {C_WHITE}/{path: <10}{C_RESET} ", end="", flush=True) body = await request.body() # 送信時にどのモデルか判別するための抽出 model_name = "unknown" try: model_name = json.loads(body).get("model", "unknown") except: pass if do_dump: print( f"\n{C_YELLOW}{'=' * 60}\n[DUMP REQUEST: {model_name}]\n{'=' * 60}{C_RESET}" ) try: print(json.dumps(json.loads(body), indent=2, ensure_ascii=False)) except: print(f"{C_GRAY}Body (Raw): {body[:500]!r}{C_RESET}") print(f"\n{C_GREEN}[REALTIME AI RESPONSE CONTENT]{C_RESET}") # 送信パルス for _ in range(max(1, min(len(body) // 512, 12))): pulse("^", C_CYAN) pulse("|", C_YELLOW) async def stream_response(): async with httpx.AsyncClient(timeout=CONFIG["timeout"]) as client: try: headers = { k: v for k, v in request.headers.items() if k.lower() not in ["host", "content-length", "connection"] } async with client.stream( request.method, f"{CONFIG['url']}/{path}", content=body, headers=headers, ) as response: pulse("v", C_GREEN) full_text_buffer = [] async for chunk in response.aiter_bytes(): # チャンクから正規表現で高速テキスト抽出 raw_data = chunk.decode(errors="ignore") matches = RE_CONTENT.findall(raw_data) for m in matches: try: # 【最重要】Unicodeエスケープを戻し、化けないようにUTF-8で再構成 text = m.encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8') full_text_buffer.append(text) if do_dump: print(f"{C_WHITE}{text}{C_RESET}", end="", flush=True) except: # 
万が一デコードに失敗した場合はマッチした文字列をそのまま保持 full_text_buffer.append(m) pulse("v", C_GREEN) yield chunk # 通信完了後に非同期タスクとしてWebhook送信 if full_text_buffer: combined_text = "".join(full_text_buffer) asyncio.create_task(post_to_mattermost( model_name, path, body, combined_text )) if do_dump: print(f"\n{C_GREEN}{'=' * 60}{C_RESET}") pulse("*", C_YELLOW) except Exception as e: print(f" {C_RED}[Proxy Error] {e}{C_RESET}") finally: print("", flush=True) return StreamingResponse(stream_response()) # --- ⌨️ コマンド入力ハンドラ --- def input_handler(): print( f"\n{C_GREEN}oproxy: Full Tactical Suite (RTX 3060 12GB Edition) Active{C_RESET}" ) print( f"{C_GRAY}Commands: [d]Dump [s]VRAM [l]List [ll]Detail [digit]Port [q]Exit{C_RESET}\n" ) while True: try: line = sys.stdin.readline().strip().lower() if not line: continue if line == "q": os.kill(os.getpid(), signal.SIGINT) elif line in ["d", "dd"]: CONFIG["dump_next"] = not CONFIG["dump_next"] print( f"{get_ts()} DUMP Reservation: {'ON' if CONFIG['dump_next'] else 'OFF'}" ) elif line == "s": show_vram_status() elif line in ["l", "ll"]: asyncio.run_coroutine_threadsafe( fetch_detailed_models(), CONFIG["loop"] ) print_analysis(detail_mode=(line == "ll")) elif line.isdigit(): p = int(line) CONFIG["remote_port"], CONFIG["url"] = p, f"http://127.0.0.1:{p}" print(f"{get_ts()} Target Port Switched -> {p}") except: pass if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-r", "--remote", type=int, default=11432) parser.add_argument("-l", "--local", type=int, default=11434) args = parser.parse_args() CONFIG.update( {"remote_port": args.remote, "url": f"http://127.0.0.1:{args.remote}"} ) # メインスレッドでUvicorn、サブスレッドで入力待ち threading.Thread(target=input_handler, daemon=True).start() uvicorn.run(app, host="127.0.0.1", port=args.local, log_level="error")