oproxy/oproxy.py
2026-02-14 13:42:20 +09:00

351 lines
No EOL
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# バージョン情報: Python 3.12+ / FastAPI 0.115.0 / uvicorn 0.30.0 / httpx 0.28.0
# [2026-02-14] 3060(12GB)戦術支援Mattermost Webhook連携・遅延送信実装版
import argparse
import asyncio
import json
import os
import re
import signal
import sys
import threading
from contextlib import asynccontextmanager
from datetime import datetime
import httpx
import uvicorn
from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse
# --- 視認性向上のためのカラー定数 ---
C_GRAY, C_CYAN, C_GREEN, C_YELLOW, C_RED, C_WHITE, C_RESET = (
"\033[90m",
"\033[96m",
"\033[92m",
"\033[93m",
"\033[91m",
"\033[97m",
"\033[0m",
)
CONFIG = {
"remote_port": 11432,
"url": "http://127.0.0.1:11432",
"timeout": httpx.Timeout(600.0, connect=10.0),
"dump_next": False,
"models_cache": [],
"loop": None,
"vram_total": 12.0,
# --- Mattermost 連携設定 ---
"webhook_url": "https://mm.ka.sugeee.com/hooks/ctjisw6ugjg85nhddo9ox9zt4c",
"enable_webhook": True,
"max_log_len": 4000
}
# 高速抽出用正規表現
RE_CONTENT = re.compile(r'"(?:content|response)":\s*"(.*?)(?<!\\)"')
def get_ts():
return f"{C_GRAY}[{datetime.now().strftime('%H:%M:%S')}]{C_RESET}"
def pulse(char, color=C_RESET):
print(f"{color}{char}{C_RESET}", end="", flush=True)
# --- 🛰️ Mattermost 送信エンジン (非同期・遅延送信) ---
async def post_to_mattermost(model_name, path, req_body, resp_text):
"""通信完了後にバックグラウンドでMattermostへログを飛ばす"""
if not CONFIG["webhook_url"] or not CONFIG["enable_webhook"]:
return
try:
req_obj = json.loads(req_body)
req_clean = json.dumps(req_obj, indent=2, ensure_ascii=False)
except:
req_clean = req_body
resp_display = (
resp_text if len(resp_text) < CONFIG["max_log_len"]
else resp_text[:CONFIG["max_log_len"]] + "\n\n*(Truncated due to length)*"
)
payload = {
"username": f"oproxy:{model_name}",
"icon_url": "https://ollama.com/public/ollama.png",
"text": (
f"### 🛡️ LLM Traffic Log: `{path}`\n"
f"- **Model:** `{model_name}` | **Time:** {datetime.now().strftime('%H:%M:%S')}\n"
f"#### 📥 Request\n```json\n{req_clean[:800]}\n```\n"
f"#### 📤 Response\n{resp_display}"
)
}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
await client.post(CONFIG["webhook_url"], json=payload)
except Exception as e:
print(f"\n{C_RED}[Webhook Fail] {e}{C_RESET}")
# --- 🧠 戦略적モデル分析 (2行分割レイアウト) ---
async def fetch_detailed_models():
"""Ollamaの内部情報を深掘りし、メタデータを完全取得する"""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
res = await client.get(f"{CONFIG['url']}/api/tags")
if res.status_code != 200:
return
models = res.json().get("models", [])
for m in models:
try:
show_res = await client.post(
f"{CONFIG['url']}/api/show", json={"name": m["name"]}
)
if show_res.status_code == 200:
data = show_res.json()
templ = data.get("template", "").lower()
params = data.get("parameters", "")
# エージェント/補完タイプの判定
m_type = "inst"
if any(x in templ for x in ["tool", "parameters", "call"]):
m_type = "agent"
elif any(
x in m["name"].lower()
for x in ["acp", "fim", "base", "code"]
):
m_type = "acp"
# コンテキスト長の抽出
ctx = "Def"
if "num_ctx" in params:
ctx = params.split("num_ctx")[-1].strip().split()[0]
m["meta"] = {
"type": m_type,
"ctx": ctx,
"q": m.get("details", {}).get("quantization_level", "?"),
}
except:
m["meta"] = {"type": "inst", "ctx": "???", "q": "?"}
CONFIG["models_cache"] = models
except:
pass
def print_analysis(detail_mode=False):
"""ガタつきを排した重厚な2行リスト表示"""
print(
f"\n{C_CYAN}--- Strategic Agent Analysis (Target: {CONFIG['vram_total']}GB VRAM) ---{C_RESET}"
)
print(
f"{C_GRAY}{'STATUS':<7} | {'MODEL IDENTIFIER':<45} | {'TYPE':<7} | {'CTX':<7} | {'SIZE'}{C_RESET}"
)
print(f"{C_GRAY}{'-' * 88}{C_RESET}")
for m in CONFIG["models_cache"]:
gb = m.get("size", 0) / (1024**3)
meta = m.get("meta", {"type": "inst", "ctx": "???", "q": "?"})
# 3060(12GB)用 VRAM判定
if gb <= CONFIG["vram_total"] * 0.75:
color, status = C_GREEN, "SAFE"
elif gb <= CONFIG["vram_total"]:
color, status = C_YELLOW, "LIMIT"
else:
color, status = C_RED, "OVER"
type_c = (
C_CYAN
if meta["type"] == "agent"
else (C_YELLOW if meta["type"] == "acp" else C_WHITE)
)
full_name = m["name"]
if "/" in full_name:
parts = full_name.rsplit("/", 1)
org_path, model_base = parts[0] + "/", parts[1]
else:
org_path, model_base = "", full_name
# 1行目: ステータス、組織パス、タイプ、CTX、サイズ
print(
f" {color}{status:<6}{C_RESET} | {C_GRAY}{org_path:<45}{C_RESET} | {type_c}{meta['type']:<7}{C_RESET} | {C_WHITE}{str(meta['ctx']):>7}{C_RESET} | {C_WHITE}{gb:>6.2f}GB{C_RESET}"
)
# 2行目: モデル本体名 + 量子化詳細 (detail_mode時)
q_info = f" {C_GRAY}[{meta['q']}]{C_RESET}" if detail_mode else ""
print(f" {'':<6} | {color}{model_base:<45}{C_RESET} | {q_info}")
print(f"{C_CYAN}{'=' * 88}{C_RESET}")
# --- 📊 VRAM リアルタイムステータス ---
def show_vram_status():
async def _fetch():
try:
async with httpx.AsyncClient(timeout=2.0) as client:
res = await client.get(f"{CONFIG['url']}/api/ps")
if res.status_code == 200:
models = res.json().get("models", [])
print(f"\n{get_ts()} {C_CYAN}--- Active VRAM Usage ---{C_RESET}")
if not models:
print(
f"{get_ts()} {C_GRAY}No models currently in VRAM.{C_RESET}"
)
for m in models:
v_gb, t_gb = (
m.get("size_vram", 0) / (1024**3),
m.get("size", 0) / (1024**3),
)
v_color = (
C_RED if v_gb > 11 else (C_YELLOW if v_gb > 8 else C_GREEN)
)
print(
f"{get_ts()} {C_WHITE}{m['name']:<25}{C_RESET} {v_color}{v_gb:.2f}{C_RESET} / {t_gb:.2f} GB"
)
except:
print(f"\n{C_RED}[Error] Could not fetch VRAM status.{C_RESET}")
if CONFIG["loop"]:
asyncio.run_coroutine_threadsafe(_fetch(), CONFIG["loop"])
# --- 🚀 Proxy Core (高速リアルタイムDUMP実装) ---
@asynccontextmanager
async def lifespan(app: FastAPI):
CONFIG["loop"] = asyncio.get_running_loop()
asyncio.create_task(fetch_detailed_models())
yield
app = FastAPI(lifespan=lifespan)
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def sticky_proxy(path: str, request: Request):
do_dump = CONFIG["dump_next"]
if do_dump:
CONFIG["dump_next"] = False
print(f"{get_ts()} {C_WHITE}/{path: <10}{C_RESET} ", end="", flush=True)
body = await request.body()
# モデル名の特定
model_name = "unknown"
try:
model_name = json.loads(body).get("model", "unknown")
except:
pass
if do_dump:
print(
f"\n{C_YELLOW}{'=' * 60}\n[DUMP REQUEST: {request.method} /{path}]\n{'=' * 60}{C_RESET}"
)
try:
print(json.dumps(json.loads(body), indent=2, ensure_ascii=False))
except:
print(f"{C_GRAY}Body (Raw): {body[:500]!r}{C_RESET}")
print(f"\n{C_GREEN}[REALTIME AI RESPONSE CONTENT]{C_RESET}")
# 送信パルス
for _ in range(max(1, min(len(body) // 512, 12))):
pulse("^", C_CYAN)
pulse("|", C_YELLOW)
async def stream_response():
async with httpx.AsyncClient(timeout=CONFIG["timeout"]) as client:
try:
headers = {
k: v
for k, v in request.headers.items()
if k.lower() not in ["host", "content-length", "connection"]
}
async with client.stream(
request.method,
f"{CONFIG['url']}/{path}",
content=body,
headers=headers,
) as response:
pulse("v", C_GREEN)
full_text_buffer = []
async for chunk in response.aiter_bytes():
raw_data = chunk.decode(errors="ignore")
matches = RE_CONTENT.findall(raw_data)
for m in matches:
try:
text = m.encode().decode("unicode_escape", errors="ignore")
full_text_buffer.append(text)
if do_dump:
print(f"{C_WHITE}{text}{C_RESET}", end="", flush=True)
except:
pass
pulse("v", C_GREEN)
yield chunk
# 通信完了後に非同期タスクとしてWebhook送信
if full_text_buffer:
combined_text = "".join(full_text_buffer)
asyncio.create_task(post_to_mattermost(
model_name, path, body.decode(errors='ignore'), combined_text
))
if do_dump:
print(f"\n{C_GREEN}{'=' * 60}{C_RESET}")
pulse("*", C_YELLOW)
except Exception as e:
print(f" {C_RED}[Proxy Error] {e}{C_RESET}")
finally:
print("", flush=True)
return StreamingResponse(stream_response())
# --- ⌨️ コマンド入力ハンドラ ---
def input_handler():
print(
f"\n{C_GREEN}oproxy: Full Tactical Suite (RTX 3060 12GB Edition) Active{C_RESET}"
)
print(
f"{C_GRAY}Commands: [d]Dump [s]VRAM [l]List [ll]Detail [digit]Port [q]Exit{C_RESET}\n"
)
while True:
try:
line = sys.stdin.readline().strip().lower()
if not line:
continue
if line == "q":
os.kill(os.getpid(), signal.SIGINT)
elif line in ["d", "dd"]:
CONFIG["dump_next"] = not CONFIG["dump_next"]
print(
f"{get_ts()} DUMP Reservation: {'ON' if CONFIG['dump_next'] else 'OFF'}"
)
elif line == "s":
show_vram_status()
elif line in ["l", "ll"]:
asyncio.run_coroutine_threadsafe(
fetch_detailed_models(), CONFIG["loop"]
)
print_analysis(detail_mode=(line == "ll"))
elif line.isdigit():
p = int(line)
CONFIG["remote_port"], CONFIG["url"] = p, f"http://127.0.0.1:{p}"
print(f"{get_ts()} Target Port Switched -> {p}")
except:
pass
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-r", "--remote", type=int, default=11432)
parser.add_argument("-l", "--local", type=int, default=11434)
args = parser.parse_args()
CONFIG.update(
{"remote_port": args.remote, "url": f"http://127.0.0.1:{args.remote}"}
)
# メインスレッドでUvicorn、サブスレッドで入力待ち
threading.Thread(target=input_handler, daemon=True).start()
uvicorn.run(app, host="127.0.0.1", port=args.local, log_level="error")