メモリ&ツール対応 W判定版

This commit is contained in:
user 2026-02-08 01:07:27 +09:00
parent 9107c7f811
commit a6a107bd6f

141
oproxy.py
View file

@ -1,6 +1,14 @@
# バージョン情報: Python 3.12+ / FastAPI 0.115.0 / uvicorn 0.30.0 / httpx 0.28.0
import httpx, asyncio, json, sys, threading, os, argparse, re
import argparse
import asyncio
import json
import os
import re
import sys
import threading
from datetime import datetime
import httpx
import uvicorn
from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse
@ -8,17 +16,41 @@ from starlette.responses import StreamingResponse
app = FastAPI()
# --- カラー・設定 ---
C_GRAY = "\033[90m" # タイムスタンプ用
C_CYAN, C_GREEN, C_YELLOW, C_RED, C_RESET = "\033[96m", "\033[92m", "\033[93m", "\033[91m", "\033[0m"
C_GRAY, C_CYAN, C_GREEN, C_YELLOW, C_RED, C_RESET = (
"\033[90m",
"\033[96m",
"\033[92m",
"\033[93m",
"\033[91m",
"\033[0m",
)
DEFAULT_REMOTE_PORT, DEFAULT_LOCAL_PORT = 11430, 11434
MEM_LIMIT = 16.8
CONFIG = {"url": f"http://127.0.0.1:{DEFAULT_REMOTE_PORT}", "timeout": httpx.Timeout(None)}
CONFIG = {
"url": f"http://127.0.0.1:{DEFAULT_REMOTE_PORT}",
"timeout": httpx.Timeout(None),
}
def get_ts():
    """Return the current time as a gray-colored "[HH:MM:SS.mmm]" timestamp."""
    # strftime('%f') yields microseconds; drop the last 3 digits for milliseconds.
    stamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    return f"{C_GRAY}[{stamp}]{C_RESET}"
async def check_tool_support(client, model_name):
    """Probe whether *model_name* supports tools (function calling).

    Best-effort check: POSTs to the Ollama ``/api/show`` endpoint via the
    given ``httpx.AsyncClient`` and scans the returned template/details
    text for tool-related keywords.

    Args:
        client: an open async HTTP client (httpx.AsyncClient).
        model_name: name of the model to inspect.

    Returns:
        True if the model's metadata mentions tool/function support,
        False otherwise — including on any request/parse failure, since
        this probe must never break the caller.
    """
    try:
        res = await client.post(f"{CONFIG['url']}/api/show", json={"name": model_name})
        if res.status_code == 200:
            info = res.json()
            # Search the chat template and details blob for 'tool'/'functions' markers.
            details = (str(info.get("template", "")) + str(info.get("details", ""))).lower()
            return "tool" in details or "functions" in details
    except Exception:
        # BUG FIX: was a bare `except:`, which in async code also swallows
        # asyncio.CancelledError / KeyboardInterrupt / SystemExit.
        # `except Exception` keeps the best-effort semantics without
        # blocking task cancellation or interpreter shutdown.
        pass
    return False
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def sticky_proxy(path: str, request: Request):
print(f"\n{get_ts()} /{path}: ", end="", flush=True)
@ -28,25 +60,37 @@ async def sticky_proxy(path: str, request: Request):
print(f"{C_CYAN}^{C_RESET}", end="", flush=True)
print(f"{C_YELLOW}|{C_RESET}", end="", flush=True)
headers = {k: v for k, v in request.headers.items() if k.lower() not in ["host", "content-length"]}
headers = {
k: v
for k, v in request.headers.items()
if k.lower() not in ["host", "content-length"]
}
async def stream_response():
try:
async with httpx.AsyncClient(timeout=CONFIG["timeout"]) as client:
async with client.stream(request.method, f"{CONFIG['url']}/{path}", content=body, headers=headers) as response:
async with client.stream(
request.method,
f"{CONFIG['url']}/{path}",
content=body,
headers=headers,
) as response:
if response.status_code != 200:
err_data = await response.aread()
err_msg = err_data.decode(errors='ignore')
memory_match = re.findall(r"(\d+\.?\d*\s*[GM]iB)", err_msg)
report_content = f"### ❌ Ollama Error (HTTP {response.status_code})\n\n> {err_msg}\n\n"
if "memory" in err_msg.lower() and len(memory_match) >= 2:
report_content += f"**分析:** 要求 `{memory_match[0]}` に対し、空き `{memory_match[1]}`。モデルが大きすぎます。"
elif "tools" in err_msg.lower():
report_content += f"**分析:** このモデルは Cline が必要とする 'Tools (Function Calling)' に対応していません。"
err_msg = err_data.decode(errors="ignore")
# レポート生成
report = f"### ❌ Ollama Error ({response.status_code})\n\n> {err_msg}\n\n"
if "tools" in err_msg.lower():
report += "#### 💡 原因: ツール非対応\nこのモデルは Cline の自動操作に必要な `Tools` 機能を持っていません。Llama 3.1 等への変更を推奨します。"
print(f" {C_RED}{err_msg}{C_RESET}")
fake_response = json.dumps({"message": {"role": "assistant", "content": report_content}, "done": True})
yield fake_response.encode()
yield json.dumps(
{
"message": {"role": "assistant", "content": report},
"done": True,
}
).encode()
return
print(f"{C_GREEN}v:{C_RESET}", end="", flush=True)
@ -59,36 +103,65 @@ async def sticky_proxy(path: str, request: Request):
return StreamingResponse(stream_response())
async def check_connection():
url = CONFIG["url"]
print(f"{get_ts()} {C_YELLOW}[Check] {url} への接続確認中...{C_RESET}")
print(f"{get_ts()} {C_YELLOW}[Check] {url} 接続確認 & 性能分析中...{C_RESET}")
try:
async with httpx.AsyncClient(timeout=5.0) as client:
async with httpx.AsyncClient(timeout=10.0) as client:
res = await client.get(f"{url}/api/tags")
if res.status_code == 200:
models = res.json().get('models', [])
print(f"{get_ts()} {C_GREEN}--- リモートモデル一覧 (判定基準: {MEM_LIMIT} GiB) ---{C_RESET}")
models = res.json().get("models", [])
print(
f"{get_ts()} {C_GREEN}--- リモートモデル戦力分析 (基準: {MEM_LIMIT}GiB + Tool対応) ---{C_RESET}"
)
for m in models:
name = m['name']
size_gb = m['size'] / (1024**3)
name = m["name"]
size_gb = m["size"] / (1024**3)
# ツール対応を非同期でチェック
has_tool = await check_tool_support(client, name)
# 判定ロジック
reasons = []
if size_gb > MEM_LIMIT:
color, status = C_RED, "❌ OVER"
elif size_gb > MEM_LIMIT * 0.8:
color, status = C_YELLOW, "⚠️ RISKY"
reasons.append("MEM_OVER")
if not has_tool:
reasons.append("NO_TOOL")
if not reasons:
color, status = C_GREEN, "✅ READY"
elif "MEM_OVER" in reasons:
color, status = C_RED, "❌ MEM "
else:
color, status = C_GREEN, "✅ OK"
print(f"{get_ts()} {color}{status:<8} {name:<40} {size_gb:>6.1f} GiB{C_RESET}")
print(f"{get_ts()} {C_GREEN}---------------------------------------------------{C_RESET}\n")
color, status = C_YELLOW, "⚠️ TOOL" # メモリはOKだがツールがない
tool_mark = (
f"{C_CYAN}[TOOL]{C_RESET}"
if has_tool
else f"{C_GRAY}[----]{C_RESET}"
)
print(
f"{get_ts()} {color}{status:<8}{C_RESET} {tool_mark} {name:<35} {size_gb:>5.1f} GiB"
)
print(
f"{get_ts()} {C_GREEN}------------------------------------------------------------{C_RESET}\n"
)
return True
except Exception as e:
print(f"{get_ts()} {C_RED}!! 接続失敗 !!: {e}{C_RESET}\n")
return False
# (wait_for_quit と main は前回のロジックと同じですが、printを調整)
# (main, wait_for_quit は変更なし)
def wait_for_quit():
while True:
line = sys.stdin.readline()
if line and line.strip().lower() == 'q': os._exit(0)
if line and line.strip().lower() == "q":
os._exit(0)
def main():
parser = argparse.ArgumentParser()
@ -96,10 +169,12 @@ def main():
parser.add_argument("-l", "--local", type=int, default=DEFAULT_LOCAL_PORT)
args = parser.parse_args()
CONFIG["url"] = f"http://127.0.0.1:{args.remote}"
loop = asyncio.new_event_loop(); asyncio.set_event_loop(loop)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(check_connection())
threading.Thread(target=wait_for_quit, daemon=True).start()
print(f"{get_ts()} L:{args.local} -> R:{args.remote} ('q'で終了)")
uvicorn.run(app, host="127.0.0.1", port=args.local, log_level="error")
if __name__ == "__main__": main()
if __name__ == "__main__":
main()