oproxy/oproxy.py
2026-02-14 13:57:02 +09:00

358 lines
No EOL
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# バージョン情報: Python 3.12+ / FastAPI 0.115.0 / uvicorn 0.30.0 / httpx 0.28.0
# [2026-02-14] 3060(12GB)戦術支援Mattermost連携・文字化け完全対策・全ロジック非短縮版
import argparse
import asyncio
import json
import os
import re
import signal
import sys
import threading
from contextlib import asynccontextmanager
from datetime import datetime
import httpx
import uvicorn
from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse
# --- 視認性向上のためのカラー定数 ---
C_GRAY, C_CYAN, C_GREEN, C_YELLOW, C_RED, C_WHITE, C_RESET = (
"\033[90m",
"\033[96m",
"\033[92m",
"\033[93m",
"\033[91m",
"\033[97m",
"\033[0m",
)
CONFIG = {
"remote_port": 11432,
"url": "http://127.0.0.1:11432",
"timeout": httpx.Timeout(600.0, connect=10.0),
"dump_next": False,
"models_cache": [],
"loop": None,
"vram_total": 12.0,
# --- Mattermost 連携設定 ---
"webhook_url": "https://mm.ka.sugeee.com/hooks/ctjisw6ugjg85nhddo9ox9zt4c",
"enable_webhook": True,
"max_log_len": 4000
}
# 高速抽出用正規表現
RE_CONTENT = re.compile(r'"(?:content|response)":\s*"(.*?)(?<!\\)"')
def get_ts():
return f"{C_GRAY}[{datetime.now().strftime('%H:%M:%S')}]{C_RESET}"
def pulse(char, color=C_RESET):
print(f"{color}{char}{C_RESET}", end="", flush=True)
# --- 🛰️ Mattermost 送信エンジン (文字化け対策済み) ---
async def post_to_mattermost(model_name, path, req_body, resp_text):
"""通信完了後にバックグラウンドでMattermostへログを飛ばす。生日本語を保持する。"""
if not CONFIG["webhook_url"] or not CONFIG["enable_webhook"]:
return
try:
# リクエストのデコード(生日本語を維持)
try:
req_obj = json.loads(req_body)
req_clean = json.dumps(req_obj, indent=2, ensure_ascii=False)
except:
req_clean = req_body.decode('utf-8', errors='replace')
# レスポンスのカットオフ
resp_display = (
resp_text if len(resp_text) < CONFIG["max_log_len"]
else resp_text[:CONFIG["max_log_len"]] + "\n\n*(長文のため中略...)*"
)
payload = {
"username": f"oproxy:{model_name}",
"icon_url": "https://ollama.com/public/ollama.png",
"text": (
f"### 🛡️ LLM Traffic Log: `{path}`\n"
f"- **Model:** `{model_name}` | **Time:** {datetime.now().strftime('%H:%M:%S')}\n"
f"#### 📥 Request preview\n```json\n{req_clean[:800]}\n```\n"
f"#### 📤 Response\n{resp_display}"
)
}
async with httpx.AsyncClient(timeout=15.0) as client:
# ensure_ascii=False で Mattermost に生の日本語を届ける
encoded_payload = json.dumps(payload, ensure_ascii=False).encode('utf-8')
await client.post(CONFIG["webhook_url"], content=encoded_payload, headers={"Content-Type": "application/json"})
except Exception as e:
print(f"\n{C_RED}[Webhook Fail] {e}{C_RESET}")
# --- 🧠 戦略的モデル分析 (2行分割レイアウト・非短縮版) ---
async def fetch_detailed_models():
"""Ollamaの内部情報を深掘りし、メタデータを完全取得する"""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
res = await client.get(f"{CONFIG['url']}/api/tags")
if res.status_code != 200:
return
models = res.json().get("models", [])
for m in models:
try:
show_res = await client.post(
f"{CONFIG['url']}/api/show", json={"name": m["name"]}
)
if show_res.status_code == 200:
data = show_res.json()
templ = data.get("template", "").lower()
params = data.get("parameters", "")
# エージェント/補完タイプの判定
m_type = "inst"
if any(x in templ for x in ["tool", "parameters", "call"]):
m_type = "agent"
elif any(
x in m["name"].lower()
for x in ["acp", "fim", "base", "code"]
):
m_type = "acp"
# コンテキスト長の抽出
ctx = "Def"
if "num_ctx" in params:
ctx = params.split("num_ctx")[-1].strip().split()[0]
m["meta"] = {
"type": m_type,
"ctx": ctx,
"q": m.get("details", {}).get("quantization_level", "?"),
}
except:
m["meta"] = {"type": "inst", "ctx": "???", "q": "?"}
CONFIG["models_cache"] = models
except:
pass
def print_analysis(detail_mode=False):
"""ガタつきを排した重厚な2行リスト表示"""
print(
f"\n{C_CYAN}--- Strategic Agent Analysis (Target: {CONFIG['vram_total']}GB VRAM) ---{C_RESET}"
)
print(
f"{C_GRAY}{'STATUS':<7} | {'MODEL IDENTIFIER':<45} | {'TYPE':<7} | {'CTX':<7} | {'SIZE'}{C_RESET}"
)
print(f"{C_GRAY}{'-' * 88}{C_RESET}")
for m in CONFIG["models_cache"]:
gb = m.get("size", 0) / (1024**3)
meta = m.get("meta", {"type": "inst", "ctx": "???", "q": "?"})
# 3060(12GB)用 VRAM判定
if gb <= CONFIG["vram_total"] * 0.75:
color, status = C_GREEN, "SAFE"
elif gb <= CONFIG["vram_total"]:
color, status = C_YELLOW, "LIMIT"
else:
color, status = C_RED, "OVER"
type_c = (
C_CYAN
if meta["type"] == "agent"
else (C_YELLOW if meta["type"] == "acp" else C_WHITE)
)
full_name = m["name"]
if "/" in full_name:
parts = full_name.rsplit("/", 1)
org_path, model_base = parts[0] + "/", parts[1]
else:
org_path, model_base = "", full_name
# 1行目: ステータス、組織パス、タイプ、CTX、サイズ
print(
f" {color}{status:<6}{C_RESET} | {C_GRAY}{org_path:<45}{C_RESET} | {type_c}{meta['type']:<7}{C_RESET} | {C_WHITE}{str(meta['ctx']):>7}{C_RESET} | {C_WHITE}{gb:>6.2f}GB{C_RESET}"
)
# 2行目: モデル本体名 + 量子化詳細 (detail_mode時)
q_info = f" {C_GRAY}[{meta['q']}]{C_RESET}" if detail_mode else ""
print(f" {'':<6} | {color}{model_base:<45}{C_RESET} | {q_info}")
print(f"{C_CYAN}{'=' * 88}{C_RESET}")
# --- 📊 VRAM リアルタイムステータス ---
def show_vram_status():
async def _fetch():
try:
async with httpx.AsyncClient(timeout=2.0) as client:
res = await client.get(f"{CONFIG['url']}/api/ps")
if res.status_code == 200:
models = res.json().get("models", [])
print(f"\n{get_ts()} {C_CYAN}--- Active VRAM Usage ---{C_RESET}")
if not models:
print(
f"{get_ts()} {C_GRAY}No models currently in VRAM.{C_RESET}"
)
for m in models:
v_gb, t_gb = (
m.get("size_vram", 0) / (1024**3),
m.get("size", 0) / (1024**3),
)
v_color = (
C_RED if v_gb > 11 else (C_YELLOW if v_gb > 8 else C_GREEN)
)
print(
f"{get_ts()} {C_WHITE}{m['name']:<25}{C_RESET} {v_color}{v_gb:.2f}{C_RESET} / {t_gb:.2f} GB"
)
except:
print(f"\n{C_RED}[Error] Could not fetch VRAM status.{C_RESET}")
if CONFIG["loop"]:
asyncio.run_coroutine_threadsafe(_fetch(), CONFIG["loop"])
# --- 🚀 Proxy Core (高速リアルタイムDUMP実装) ---
@asynccontextmanager
async def lifespan(app: FastAPI):
CONFIG["loop"] = asyncio.get_running_loop()
asyncio.create_task(fetch_detailed_models())
yield
app = FastAPI(lifespan=lifespan)
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def sticky_proxy(path: str, request: Request):
do_dump = CONFIG["dump_next"]
if do_dump:
CONFIG["dump_next"] = False
print(f"{get_ts()} {C_WHITE}/{path: <10}{C_RESET} ", end="", flush=True)
body = await request.body()
# 送信時にどのモデルか判別するための抽出
model_name = "unknown"
try:
model_name = json.loads(body).get("model", "unknown")
except:
pass
if do_dump:
print(
f"\n{C_YELLOW}{'=' * 60}\n[DUMP REQUEST: {model_name}]\n{'=' * 60}{C_RESET}"
)
try:
print(json.dumps(json.loads(body), indent=2, ensure_ascii=False))
except:
print(f"{C_GRAY}Body (Raw): {body[:500]!r}{C_RESET}")
print(f"\n{C_GREEN}[REALTIME AI RESPONSE CONTENT]{C_RESET}")
# 送信パルス
for _ in range(max(1, min(len(body) // 512, 12))):
pulse("^", C_CYAN)
pulse("|", C_YELLOW)
async def stream_response():
async with httpx.AsyncClient(timeout=CONFIG["timeout"]) as client:
try:
headers = {
k: v
for k, v in request.headers.items()
if k.lower() not in ["host", "content-length", "connection"]
}
async with client.stream(
request.method,
f"{CONFIG['url']}/{path}",
content=body,
headers=headers,
) as response:
pulse("v", C_GREEN)
full_text_buffer = []
async for chunk in response.aiter_bytes():
# チャンクから正規表現で高速テキスト抽出
raw_data = chunk.decode(errors="ignore")
matches = RE_CONTENT.findall(raw_data)
for m in matches:
try:
# 【最重要】Unicodeエスケープを戻し、化けないようにUTF-8で再構成
text = m.encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')
full_text_buffer.append(text)
if do_dump:
print(f"{C_WHITE}{text}{C_RESET}", end="", flush=True)
except:
# 万が一デコードに失敗した場合はマッチした文字列をそのまま保持
full_text_buffer.append(m)
pulse("v", C_GREEN)
yield chunk
# 通信完了後に非同期タスクとしてWebhook送信
if full_text_buffer:
combined_text = "".join(full_text_buffer)
asyncio.create_task(post_to_mattermost(
model_name, path, body, combined_text
))
if do_dump:
print(f"\n{C_GREEN}{'=' * 60}{C_RESET}")
pulse("*", C_YELLOW)
except Exception as e:
print(f" {C_RED}[Proxy Error] {e}{C_RESET}")
finally:
print("", flush=True)
return StreamingResponse(stream_response())
# --- ⌨️ コマンド入力ハンドラ ---
def input_handler():
print(
f"\n{C_GREEN}oproxy: Full Tactical Suite (RTX 3060 12GB Edition) Active{C_RESET}"
)
print(
f"{C_GRAY}Commands: [d]Dump [s]VRAM [l]List [ll]Detail [digit]Port [q]Exit{C_RESET}\n"
)
while True:
try:
line = sys.stdin.readline().strip().lower()
if not line:
continue
if line == "q":
os.kill(os.getpid(), signal.SIGINT)
elif line in ["d", "dd"]:
CONFIG["dump_next"] = not CONFIG["dump_next"]
print(
f"{get_ts()} DUMP Reservation: {'ON' if CONFIG['dump_next'] else 'OFF'}"
)
elif line == "s":
show_vram_status()
elif line in ["l", "ll"]:
asyncio.run_coroutine_threadsafe(
fetch_detailed_models(), CONFIG["loop"]
)
print_analysis(detail_mode=(line == "ll"))
elif line.isdigit():
p = int(line)
CONFIG["remote_port"], CONFIG["url"] = p, f"http://127.0.0.1:{p}"
print(f"{get_ts()} Target Port Switched -> {p}")
except:
pass
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-r", "--remote", type=int, default=11432)
parser.add_argument("-l", "--local", type=int, default=11434)
args = parser.parse_args()
CONFIG.update(
{"remote_port": args.remote, "url": f"http://127.0.0.1:{args.remote}"}
)
# メインスレッドでUvicorn、サブスレッドで入力待ち
threading.Thread(target=input_handler, daemon=True).start()
uvicorn.run(app, host="127.0.0.1", port=args.local, log_level="error")