# Version info: Python 3.12+ / FastAPI 0.115.0 / uvicorn 0.30.0 / httpx 0.28.0
# [2026-02-10] RTX 3060 (12GB) tactical support: fully integrated, non-abridged edition
#
# NOTE(review): a chunk in the middle of this file was lost in a paste/extraction
# accident (the tail of RE_CONTENT plus the helpers get_ts / pulse /
# fetch_detailed_models and most of print_analysis). Those parts are
# reconstructed below from their surviving call sites and are individually
# marked -- verify them against the original before shipping.
import argparse
import asyncio
import json
import os
import re
import signal
import sys
import threading
from contextlib import asynccontextmanager
from datetime import datetime

import httpx
import uvicorn
from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse

# --- ANSI color constants for terminal readability ---
C_GRAY, C_CYAN, C_GREEN, C_YELLOW, C_RED, C_WHITE, C_RESET = (
    "\033[90m",
    "\033[96m",
    "\033[92m",
    "\033[93m",
    "\033[91m",
    "\033[97m",
    "\033[0m",
)

# Mutable global configuration, shared between the uvicorn event loop and the
# stdin command thread. "loop" is populated at startup by lifespan().
CONFIG = {
    "remote_port": 11432,
    "url": "http://127.0.0.1:11432",
    "timeout": httpx.Timeout(600.0, connect=10.0),
    "dump_next": False,   # one-shot flag: dump the next request/response pair
    "models_cache": [],   # filled by fetch_detailed_models()
    "loop": None,         # running asyncio loop (set in lifespan)
    "vram_total": 12.0,   # GB of VRAM on the target GPU (RTX 3060)
}

# Fast-extraction regex: pulls the "content"/"response" string value out of a
# streamed JSON chunk without a full JSON parse.  The negative look-behind
# keeps escaped quotes (\") inside the captured text.
# NOTE(review): the original pattern's tail was lost in the corruption; this
# reconstruction matches its use in stream_response() -- confirm it.
RE_CONTENT = re.compile(r'"(?:content|response)":\s*"(.*?)(?<!\\)"')


def get_ts() -> str:
    """Return a dim timestamped log prefix.

    NOTE(review): reconstructed helper (original lost); format inferred from
    the surviving call sites that prefix log lines with it.
    """
    return f"{C_GRAY}[{datetime.now().strftime('%H:%M:%S')}]{C_RESET}"


def pulse(char: str, color: str) -> None:
    """Print one colored activity character with no newline.

    NOTE(review): reconstructed helper (original lost); signature and effect
    inferred from calls like pulse("^", C_CYAN) in sticky_proxy.
    """
    print(f"{color}{char}{C_RESET}", end="", flush=True)


async def fetch_detailed_models() -> None:
    """Refresh CONFIG['models_cache'] from the upstream /api/tags endpoint.

    NOTE(review): reconstructed helper (original lost).  The cached record
    shape ({name, gb, q, ...}) is inferred from print_analysis; the field
    mapping from the Ollama /api/tags response is a best guess -- confirm.
    Best-effort: any failure leaves the existing cache untouched.
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            res = await client.get(f"{CONFIG['url']}/api/tags")
            if res.status_code != 200:
                return
            cache = []
            for m in res.json().get("models", []):
                details = m.get("details", {})
                cache.append(
                    {
                        "name": m.get("name", "?"),
                        "gb": m.get("size", 0) / (1024**3),
                        "q": details.get("quantization_level", "?"),
                        "params": details.get("parameter_size", "?"),
                    }
                )
            CONFIG["models_cache"] = cache
    except Exception:
        # The upstream server may simply not be running yet; stay silent.
        pass


def print_analysis(detail_mode: bool = False) -> None:
    """Print a table of cached models with a VRAM-fit verdict per model.

    NOTE(review): most of this function's body was lost in the corruption;
    only its final few lines survived.  The table layout and fit thresholds
    below are reconstructed around those surviving lines -- confirm against
    the original.

    :param detail_mode: when True, append the quantization level to each row.
    """
    print(f"{C_CYAN}{'=' * 88}{C_RESET}")
    for idx, meta in enumerate(CONFIG["models_cache"], 1):
        gb = meta["gb"]
        # Fit verdict relative to total VRAM (12 GB card): leave headroom for
        # context/KV cache before calling a model a comfortable fit.
        if gb <= CONFIG["vram_total"] * 0.75:
            color, verdict = C_GREEN, "FIT"
        elif gb <= CONFIG["vram_total"]:
            color, verdict = C_YELLOW, "TIGHT"
        else:
            color, verdict = C_RED, "OVER"
        model_base = meta["name"]
        # Row line 1: index + verdict + size (the tail of this print is the
        # fragment that survived the corruption).
        print(
            f" {C_GRAY}{idx:>4}{C_RESET} | {color}{verdict:>7}{C_RESET} | "
            f"{C_WHITE}{gb:>6.2f}GB{C_RESET}"
        )
        # Row line 2: model base name + quantization detail (detail_mode only).
        q_info = f" {C_GRAY}[{meta['q']}]{C_RESET}" if detail_mode else ""
        print(f" {'':<6} | {color}{model_base:<45}{C_RESET} | {q_info}")
    print(f"{C_CYAN}{'=' * 88}{C_RESET}")


# --- Real-time VRAM status ---
def show_vram_status() -> None:
    """Print per-model VRAM usage fetched from the upstream /api/ps endpoint.

    Safe to call from the stdin command thread: the HTTP fetch is scheduled
    onto the server's asyncio loop via run_coroutine_threadsafe.  A no-op if
    the loop is not up yet.
    """

    async def _fetch() -> None:
        try:
            async with httpx.AsyncClient(timeout=2.0) as client:
                res = await client.get(f"{CONFIG['url']}/api/ps")
                if res.status_code == 200:
                    models = res.json().get("models", [])
                    print(f"\n{get_ts()} {C_CYAN}--- Active VRAM Usage ---{C_RESET}")
                    if not models:
                        print(
                            f"{get_ts()} {C_GRAY}No models currently in VRAM.{C_RESET}"
                        )
                    for m in models:
                        v_gb = m.get("size_vram", 0) / (1024**3)
                        t_gb = m.get("size", 0) / (1024**3)
                        # >11 GB is effectively full on a 12 GB card; >8 GB is
                        # a warning zone.
                        v_color = (
                            C_RED if v_gb > 11 else (C_YELLOW if v_gb > 8 else C_GREEN)
                        )
                        print(
                            f"{get_ts()} {C_WHITE}{m['name']:<25}{C_RESET} "
                            f"{v_color}{v_gb:.2f}{C_RESET} / {t_gb:.2f} GB"
                        )
        except Exception:  # was a bare except: narrowed, still best-effort
            print(f"\n{C_RED}[Error] Could not fetch VRAM status.{C_RESET}")

    if CONFIG["loop"]:
        asyncio.run_coroutine_threadsafe(_fetch(), CONFIG["loop"])


# --- Proxy core (fast real-time DUMP implementation) ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: record the running loop for the stdin thread and
    kick off an initial model-cache refresh in the background."""
    CONFIG["loop"] = asyncio.get_running_loop()
    asyncio.create_task(fetch_detailed_models())
    yield


app = FastAPI(lifespan=lifespan)


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def sticky_proxy(path: str, request: Request):
    """Forward any request to the configured upstream, streaming the body back.

    When a DUMP was reserved (CONFIG['dump_next']), pretty-print the request
    body and extract the AI response text from the stream in real time.
    """
    do_dump = CONFIG["dump_next"]
    if do_dump:
        CONFIG["dump_next"] = False  # one-shot: consume the reservation
    print(f"{get_ts()} {C_WHITE}/{path: <10}{C_RESET} ", end="", flush=True)
    body = await request.body()
    if do_dump:
        print(
            f"\n{C_YELLOW}{'=' * 60}\n[DUMP REQUEST: {request.method} /{path}]\n{'=' * 60}{C_RESET}"
        )
        try:
            print(json.dumps(json.loads(body), indent=2, ensure_ascii=False))
        except Exception:  # non-JSON body: show a raw prefix instead
            print(f"{C_GRAY}Body (Raw): {body[:500]!r}{C_RESET}")
        print(f"\n{C_GREEN}[REALTIME AI RESPONSE CONTENT]{C_RESET}")
    # Upload pulse: one '^' per ~512 bytes of request body, capped at 12.
    for _ in range(max(1, min(len(body) // 512, 12))):
        pulse("^", C_CYAN)
    pulse("|", C_YELLOW)

    async def stream_response():
        async with httpx.AsyncClient(timeout=CONFIG["timeout"]) as client:
            try:
                # Strip hop-by-hop/recomputed headers before forwarding.
                headers = {
                    k: v
                    for k, v in request.headers.items()
                    if k.lower() not in ["host", "content-length", "connection"]
                }
                async with client.stream(
                    request.method,
                    f"{CONFIG['url']}/{path}",
                    content=body,
                    headers=headers,
                ) as response:
                    pulse("v", C_GREEN)
                    async for chunk in response.aiter_bytes():
                        if do_dump:
                            # Fast regex text extraction from the chunk for
                            # real-time display (no full JSON parse).
                            raw_data = chunk.decode(errors="ignore")
                            for m in RE_CONTENT.findall(raw_data):
                                try:
                                    # Unescape as a JSON string; the previous
                                    # unicode_escape round-trip corrupted
                                    # non-ASCII (e.g. Japanese) text.
                                    text = json.loads(f'"{m}"')
                                except ValueError:
                                    text = m  # show raw on malformed escapes
                                print(
                                    f"{C_WHITE}{text}{C_RESET}", end="", flush=True
                                )
                        pulse("v", C_GREEN)
                        yield chunk
                    if do_dump:
                        print(f"\n{C_GREEN}{'=' * 60}{C_RESET}")
                    pulse("*", C_YELLOW)
            except Exception as e:
                print(f" {C_RED}[Proxy Error] {e}{C_RESET}")
            finally:
                print("", flush=True)

    # NOTE(review): the upstream status code and Content-Type are not
    # propagated (clients see 200/text default) -- confirm this is intended.
    return StreamingResponse(stream_response())


# --- Keyboard command handler ---
def input_handler():
    """Blocking stdin loop (runs in a daemon thread) for operator commands.

    Commands: d/dd toggle DUMP reservation, s VRAM status, l/ll model list
    (ll = detail), a bare number switches the target port, q exits.
    """
    print(
        f"\n{C_GREEN}oproxy: Full Tactical Suite (RTX 3060 12GB Edition) Active{C_RESET}"
    )
    print(
        f"{C_GRAY}Commands: [d]Dump [s]VRAM [l]List [ll]Detail [digit]Port [q]Exit{C_RESET}\n"
    )
    while True:
        try:
            line = sys.stdin.readline().strip().lower()
            if not line:
                continue
            if line == "q":
                # Signal the main thread so uvicorn shuts down cleanly.
                os.kill(os.getpid(), signal.SIGINT)
            elif line in ["d", "dd"]:
                CONFIG["dump_next"] = not CONFIG["dump_next"]
                print(
                    f"{get_ts()} DUMP Reservation: {'ON' if CONFIG['dump_next'] else 'OFF'}"
                )
            elif line == "s":
                show_vram_status()
            elif line in ["l", "ll"]:
                # Guard against the loop not being up yet (was an unguarded
                # dereference swallowed by a bare except).
                if CONFIG["loop"]:
                    asyncio.run_coroutine_threadsafe(
                        fetch_detailed_models(), CONFIG["loop"]
                    )
                print_analysis(detail_mode=(line == "ll"))
            elif line.isdigit():
                p = int(line)
                CONFIG["remote_port"], CONFIG["url"] = p, f"http://127.0.0.1:{p}"
                print(f"{get_ts()} Target Port Switched -> {p}")
        except Exception:
            # Keep the command loop alive on any per-command failure
            # (was a bare except, which also swallowed SystemExit).
            pass


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-r", "--remote", type=int, default=11432)
    parser.add_argument("-l", "--local", type=int, default=11434)
    args = parser.parse_args()
    CONFIG.update(
        {"remote_port": args.remote, "url": f"http://127.0.0.1:{args.remote}"}
    )
    # Uvicorn on the main thread; stdin commands on a daemon side thread.
    threading.Thread(target=input_handler, daemon=True).start()
    uvicorn.run(app, host="127.0.0.1", port=args.local, log_level="error")