# oproxy/oproxy.py
# 2026-02-10 08:59:22 +09:00

# 303 lines
# 11 KiB
# Python

# Version info: Python 3.12+ / FastAPI 0.115.0 / uvicorn 0.30.0 / httpx 0.28.0
# [2026-02-10] RTX 3060 (12GB) tactical support: all features fully integrated, unabridged version
import argparse
import asyncio
import json
import os
import re
import signal
import sys
import threading
from contextlib import asynccontextmanager
from datetime import datetime
import httpx
import uvicorn
from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse
# --- ANSI color escape sequences used to make console output easier to scan ---
C_GRAY = "\033[90m"
C_CYAN = "\033[96m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_RED = "\033[91m"
C_WHITE = "\033[97m"
# Resets all attributes; every colored span printed by this file ends with it.
C_RESET = "\033[0m"
# Mutable runtime settings shared by the proxy handler and the console thread.
CONFIG = dict(
    # Port of the upstream server; replaced by the -r option and the digit command.
    remote_port=11432,
    # Base URL every upstream request is built from; kept in step with remote_port.
    url="http://127.0.0.1:11432",
    # Timeout for proxied requests: 600 s in general, 10 s to connect.
    timeout=httpx.Timeout(600.0, connect=10.0),
    # One-shot flag: when True, the next proxied request is dumped to the console.
    dump_next=False,
    # Model list (with "meta" entries) stored by fetch_detailed_models().
    models_cache=[],
    # Running asyncio event loop, set by lifespan(); None until the app starts.
    loop=None,
    # VRAM budget in GB that print_analysis() compares model sizes against.
    vram_total=12.0,
)
# Regex for fast extraction of the string value of a "content" or "response"
# JSON key.  The value is matched the way JSON defines a string body: a run of
# characters that are neither a quote nor a backslash, or backslash escape
# pairs.  Consuming escapes as pairs (rather than only checking the character
# before the closing quote) keeps a value that ends in an escaped backslash,
# such as "C:\\", from running on into the following key.
RE_CONTENT = re.compile(r'"(?:content|response)":\s*"((?:[^"\\]|\\.)*)"')
def get_ts():
    """Return the current local time as a gray ``[HH:MM:SS]`` console prefix."""
    return "{}[{:%H:%M:%S}]{}".format(C_GRAY, datetime.now(), C_RESET)
def pulse(char, color=C_RESET):
    """Print *char* wrapped in *color*, with no newline, and flush immediately.

    Used as a lightweight activity indicator while requests are in flight.
    """
    print(color, char, C_RESET, sep="", end="", flush=True)
# --- 🧠 Strategic model analysis (two-line layout) ---
async def fetch_detailed_models():
    """Refresh ``CONFIG["models_cache"]`` from the upstream server.

    Requests ``/api/tags`` and then ``/api/show`` for each listed model.  Every
    model dict whose details could be read gets a ``"meta"`` entry with:

    * ``type`` -- ``"agent"`` when the prompt template contains "tool",
      "parameters" or "call"; otherwise ``"acp"`` when the model name contains
      "acp", "fim", "base" or "code"; otherwise ``"inst"``.
    * ``ctx`` -- the ``num_ctx`` value found in the parameters text, or
      ``"Def"`` when none is present.
    * ``q`` -- ``details.quantization_level`` from the tag entry, or ``"?"``.

    The refresh is best-effort: the cache is left untouched when the tag list
    cannot be fetched, and a model whose details request fails gets
    placeholder metadata.  Only ``Exception`` is caught, so task cancellation
    and interpreter shutdown still propagate.
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            res = await client.get(f"{CONFIG['url']}/api/tags")
            if res.status_code != 200:
                return
            models = res.json().get("models", [])
            for m in models:
                try:
                    show_res = await client.post(
                        f"{CONFIG['url']}/api/show", json={"name": m["name"]}
                    )
                    if show_res.status_code != 200:
                        # No details available; print_analysis() falls back
                        # to its own placeholder for entries without "meta".
                        continue
                    data = show_res.json()
                    # Either field may be missing or null in the reply.
                    templ = (data.get("template") or "").lower()
                    params = data.get("parameters") or ""
                    name_lower = m["name"].lower()

                    # Classify as agent / completion / plain instruct model.
                    if any(x in templ for x in ("tool", "parameters", "call")):
                        m_type = "agent"
                    elif any(x in name_lower for x in ("acp", "fim", "base", "code")):
                        m_type = "acp"
                    else:
                        m_type = "inst"

                    # Context length: the token following the last "num_ctx".
                    ctx_values = re.findall(r"num_ctx\s+(\S+)", params)
                    ctx = ctx_values[-1] if ctx_values else "Def"

                    m["meta"] = {
                        "type": m_type,
                        "ctx": ctx,
                        "q": (m.get("details") or {}).get("quantization_level", "?"),
                    }
                except Exception:
                    # One unreadable model must not abort the whole refresh.
                    m["meta"] = {"type": "inst", "ctx": "???", "q": "?"}
            CONFIG["models_cache"] = models
    except Exception as exc:
        # Upstream unreachable or reply malformed: keep the previous cache,
        # but say so instead of failing silently.
        print(
            f"\n{get_ts()} {C_RED}[Error] Could not refresh model list: {exc}{C_RESET}"
        )
def print_analysis(detail_mode=False):
    """Print the cached model list as a fixed-width, two-line-per-model table.

    Each model in ``CONFIG["models_cache"]`` is rated against
    ``CONFIG["vram_total"]``: SAFE up to 75% of the budget, LIMIT up to the
    full budget, OVER beyond it.  The first line of an entry shows the status,
    the part of the name up to the last "/", the type, the context length and
    the size; the second line shows the remaining part of the name and, when
    *detail_mode* is true, the quantization level.
    """
    vram_limit = CONFIG["vram_total"]
    placeholder_meta = {"type": "inst", "ctx": "???", "q": "?"}
    type_palette = {"agent": C_CYAN, "acp": C_YELLOW}

    print(
        f"\n{C_CYAN}--- Strategic Agent Analysis (Target: {vram_limit}GB VRAM) ---{C_RESET}"
    )
    print(
        f"{C_GRAY}{'STATUS':<7} | {'MODEL IDENTIFIER':<45} | {'TYPE':<7} | {'CTX':<7} | {'SIZE'}{C_RESET}"
    )
    print(f"{C_GRAY}{'-' * 88}{C_RESET}")

    for entry in CONFIG["models_cache"]:
        size_gb = entry.get("size", 0) / (1024**3)
        meta = entry.get("meta", placeholder_meta)

        # VRAM fit rating for the configured budget.
        if size_gb <= vram_limit * 0.75:
            tint, rating = C_GREEN, "SAFE"
        elif size_gb <= vram_limit:
            tint, rating = C_YELLOW, "LIMIT"
        else:
            tint, rating = C_RED, "OVER"

        kind = meta["type"]
        kind_tint = type_palette.get(kind, C_WHITE)

        # Split "org/path/model" into "org/path/" and "model"; a name without
        # a slash yields an empty organisation path.
        head, slash, model_base = entry["name"].rpartition("/")
        org_path = head + slash

        # Line 1: status, organisation path, type, context length, size.
        print(
            f" {tint}{rating:<6}{C_RESET} | {C_GRAY}{org_path:<45}{C_RESET} | {kind_tint}{kind:<7}{C_RESET} | {C_WHITE}{str(meta['ctx']):>7}{C_RESET} | {C_WHITE}{size_gb:>6.2f}GB{C_RESET}"
        )

        # Line 2: model base name, plus quantization detail in detail mode.
        if detail_mode:
            q_info = f" {C_GRAY}[{meta['q']}]{C_RESET}"
        else:
            q_info = ""
        print(f" {'':<6} | {tint}{model_base:<45}{C_RESET} | {q_info}")

    print(f"{C_CYAN}{'=' * 88}{C_RESET}")
# --- 📊 Real-time VRAM status ---
def show_vram_status():
    """Schedule a console printout of the models currently loaded upstream.

    The request to ``/api/ps`` runs on the event loop stored in
    ``CONFIG["loop"]``, so this function can be called from the console
    thread and returns without waiting for the result.  For each model the
    VRAM-resident size and the total size are printed in GB.
    """

    async def _fetch():
        try:
            async with httpx.AsyncClient(timeout=2.0) as client:
                res = await client.get(f"{CONFIG['url']}/api/ps")
                if res.status_code != 200:
                    print(
                        f"\n{C_RED}[Error] Could not fetch VRAM status (HTTP {res.status_code}).{C_RESET}"
                    )
                    return
                models = res.json().get("models", [])
                print(f"\n{get_ts()} {C_CYAN}--- Active VRAM Usage ---{C_RESET}")
                if not models:
                    print(
                        f"{get_ts()} {C_GRAY}No models currently in VRAM.{C_RESET}"
                    )
                for m in models:
                    v_gb = m.get("size_vram", 0) / (1024**3)
                    t_gb = m.get("size", 0) / (1024**3)
                    # Fixed thresholds in GB: above 11 red, above 8 yellow.
                    # NOTE(review): these look tuned for a 12 GB card and do
                    # not follow CONFIG["vram_total"] -- confirm intent.
                    v_color = (
                        C_RED if v_gb > 11 else (C_YELLOW if v_gb > 8 else C_GREEN)
                    )
                    print(
                        f"{get_ts()} {C_WHITE}{m.get('name', '?'):<25}{C_RESET} {v_color}{v_gb:.2f}{C_RESET} / {t_gb:.2f} GB"
                    )
        except Exception:
            # Catch Exception rather than everything so that cancelling the
            # scheduled coroutine is not reported as a fetch failure.
            print(f"\n{C_RED}[Error] Could not fetch VRAM status.{C_RESET}")

    loop = CONFIG["loop"]
    if loop is None:
        # The app has not started yet, so there is no loop to schedule on.
        print(f"{get_ts()} {C_GRAY}VRAM status unavailable: server not started yet.{C_RESET}")
        return
    asyncio.run_coroutine_threadsafe(_fetch(), loop)
# --- 🚀 Proxy core (fast real-time DUMP implementation) ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: publish the event loop and warm the model cache.

    On startup the running loop is stored in ``CONFIG["loop"]`` so the console
    thread can schedule coroutines on it, and a background refresh of the
    model list is started.  On shutdown a refresh that is still running is
    cancelled.
    """
    CONFIG["loop"] = asyncio.get_running_loop()
    # Keep a reference for the lifetime of the app: the event loop only holds
    # weak references to tasks, so an unreferenced task may be garbage
    # collected before it finishes.
    warmup_task = asyncio.create_task(fetch_detailed_models())
    try:
        yield
    finally:
        if not warmup_task.done():
            warmup_task.cancel()


app = FastAPI(lifespan=lifespan)
def _decode_json_string(fragment):
    """Return the text encoded by *fragment*, the inside of a JSON string literal.

    Parsing with ``json`` keeps non-ASCII text intact and joins escaped
    surrogate pairs.  Round-tripping through the ``unicode_escape`` codec
    instead would reinterpret the UTF-8 bytes as Latin-1 and garble such text.

    Raises:
        ValueError: if *fragment* is not a valid JSON string body.
    """
    return json.loads(f'"{fragment}"')


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def sticky_proxy(path: str, request: Request):
    """Forward one request to the upstream server and stream the reply back.

    The method, path, query string, body and headers (minus ``host``,
    ``content-length`` and ``connection``) are sent to ``CONFIG["url"]``, and
    the response bytes are relayed chunk by chunk.  Progress is shown on the
    console as pulse characters.  When a dump was reserved through
    ``CONFIG["dump_next"]``, the request body and the text found in
    "content"/"response" fields of each response chunk are printed as well.

    NOTE(review): the ``StreamingResponse`` is created before the upstream
    replies, so the upstream status code and response headers are not
    forwarded to the client.
    """
    # Consume the one-shot reservation so that only this request is dumped.
    do_dump = CONFIG["dump_next"]
    if do_dump:
        CONFIG["dump_next"] = False
    print(f"{get_ts()} {C_WHITE}/{path: <10}{C_RESET} ", end="", flush=True)
    body = await request.body()
    if do_dump:
        print(
            f"\n{C_YELLOW}{'=' * 60}\n[DUMP REQUEST: {request.method} /{path}]\n{'=' * 60}{C_RESET}"
        )
        try:
            print(json.dumps(json.loads(body), indent=2, ensure_ascii=False))
        except (ValueError, RecursionError):
            # Body is not JSON (or cannot be printed): show a raw preview.
            print(f"{C_GRAY}Body (Raw): {body[:500]!r}{C_RESET}")
        print(f"\n{C_GREEN}[REALTIME AI RESPONSE CONTENT]{C_RESET}")

    # Send pulses: one "^" per 512 bytes of request body, between 1 and 12.
    for _ in range(max(1, min(len(body) // 512, 12))):
        pulse("^", C_CYAN)
    pulse("|", C_YELLOW)

    # Keep the query string; the path parameter alone does not include it.
    target_url = f"{CONFIG['url']}/{path}"
    if request.url.query:
        target_url = f"{target_url}?{request.url.query}"

    # These headers are specific to the inbound connection; the HTTP client
    # sets its own values for the upstream request.
    dropped_headers = {"host", "content-length", "connection"}
    headers = {
        k: v for k, v in request.headers.items() if k.lower() not in dropped_headers
    }

    async def stream_response():
        async with httpx.AsyncClient(timeout=CONFIG["timeout"]) as client:
            try:
                async with client.stream(
                    request.method,
                    target_url,
                    content=body,
                    headers=headers,
                ) as response:
                    pulse("v", C_GREEN)
                    async for chunk in response.aiter_bytes():
                        if do_dump:
                            # Pull text out of the chunk with the regex for
                            # real-time display.  A value split across two
                            # chunks is not matched and is simply not shown.
                            raw_data = chunk.decode(errors="ignore")
                            for fragment in RE_CONTENT.findall(raw_data):
                                try:
                                    text = _decode_json_string(fragment)
                                    print(
                                        f"{C_WHITE}{text}{C_RESET}", end="", flush=True
                                    )
                                except ValueError:
                                    # Undecodable or unprintable fragment: the
                                    # dump is cosmetic, so skip it.
                                    continue
                        pulse("v", C_GREEN)
                        yield chunk
                    if do_dump:
                        print(f"\n{C_GREEN}{'=' * 60}{C_RESET}")
                    pulse("*", C_YELLOW)
            except Exception as e:
                print(f" {C_RED}[Proxy Error] {e}{C_RESET}")
            finally:
                # Terminate the pulse line for this request.
                print("", flush=True)

    return StreamingResponse(stream_response())
# --- ⌨️ Console command handler ---
def input_handler():
    """Read single-line commands from stdin and act on them until EOF or quit.

    Commands (case-insensitive):

    * ``q`` -- send SIGINT to this process to shut the server down.
    * ``d`` / ``dd`` -- toggle the dump reservation for the next request.
    * ``s`` -- print the models currently loaded upstream.
    * ``l`` / ``ll`` -- refresh and print the model table (``ll`` adds
      quantization detail).
    * digits -- switch the upstream port.

    Intended to run in a daemon thread next to the server.  The loop ends when
    stdin reaches EOF, so a closed or redirected stdin does not spin the CPU.
    """
    print(
        f"\n{C_GREEN}oproxy: Full Tactical Suite (RTX 3060 12GB Edition) Active{C_RESET}"
    )
    print(
        f"{C_GRAY}Commands: [d]Dump [s]VRAM [l]List [ll]Detail [digit]Port [q]Exit{C_RESET}\n"
    )
    while True:
        try:
            raw = sys.stdin.readline()
        except (OSError, ValueError):
            # stdin is closed or unusable; there is nothing left to read.
            return
        if not raw:
            # readline() returns "" only at EOF (a blank line is "\n").
            return
        line = raw.strip().lower()
        if not line:
            continue
        try:
            if line == "q":
                os.kill(os.getpid(), signal.SIGINT)
                return
            elif line in ["d", "dd"]:
                CONFIG["dump_next"] = not CONFIG["dump_next"]
                print(
                    f"{get_ts()} DUMP Reservation: {'ON' if CONFIG['dump_next'] else 'OFF'}"
                )
            elif line == "s":
                show_vram_status()
            elif line in ["l", "ll"]:
                loop = CONFIG["loop"]
                if loop is not None:
                    refresh = asyncio.run_coroutine_threadsafe(
                        fetch_detailed_models(), loop
                    )
                    try:
                        # Wait for the refresh so the table shows fresh data
                        # rather than whatever was cached before the command.
                        refresh.result(timeout=15)
                    except Exception:
                        print(
                            f"{get_ts()} {C_YELLOW}Model refresh did not finish; showing cached list.{C_RESET}"
                        )
                print_analysis(detail_mode=(line == "ll"))
            elif line.isdecimal():
                p = int(line)
                if 1 <= p <= 65535:
                    CONFIG["remote_port"], CONFIG["url"] = p, f"http://127.0.0.1:{p}"
                    print(f"{get_ts()} Target Port Switched -> {p}")
                else:
                    print(f"{get_ts()} {C_RED}Invalid port: {p}{C_RESET}")
        except Exception as exc:
            # Keep the console alive, but report the failure instead of
            # discarding it.
            print(f"{get_ts()} {C_RED}[Command Error] {exc}{C_RESET}")
if __name__ == "__main__":
    # Command line: -r/--remote is the upstream port, -l/--local the listen port.
    parser = argparse.ArgumentParser()
    parser.add_argument("-r", "--remote", default=11432, type=int)
    parser.add_argument("-l", "--local", default=11434, type=int)
    args = parser.parse_args()

    # Point the proxy at the requested upstream port.
    CONFIG["remote_port"] = args.remote
    CONFIG["url"] = f"http://127.0.0.1:{args.remote}"

    # Uvicorn runs on the main thread; console input is read on a daemon
    # thread so it never keeps the process alive on its own.
    console_thread = threading.Thread(target=input_handler, daemon=True)
    console_thread.start()
    uvicorn.run(app, host="127.0.0.1", port=args.local, log_level="error")