oproxy/oproxy.py
2026-02-08 01:14:05 +09:00

128 lines
5.7 KiB
Python
Executable file

# バージョン情報: Python 3.12+ / FastAPI 0.115.0 / uvicorn 0.30.0 / httpx 0.28.0
import httpx, asyncio, json, sys, threading, os, argparse, re, unicodedata
from datetime import datetime
import uvicorn
from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse
app = FastAPI()
C_GRAY, C_CYAN, C_GREEN, C_YELLOW, C_RED, C_RESET = "\033[90m", "\033[96m", "\033[92m", "\033[93m", "\033[91m", "\033[0m"
DEFAULT_REMOTE_PORT, DEFAULT_LOCAL_PORT = 11430, 11434
MEM_LIMIT = 16.8
CONFIG = {"url": f"http://127.0.0.1:{DEFAULT_REMOTE_PORT}", "timeout": httpx.Timeout(None)}
def get_ts():
return f"{C_GRAY}[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}]{C_RESET}"
def clean_text(text):
"""特殊な空白文字(\xa0等)を通常の空白に置換し、前後のゴミを取る"""
return "".join(c for c in text if unicodedata.category(c) != 'Cf').replace('\xa0', ' ').strip()
def get_visual_width(text):
"""文字の表示幅を計算 (全角2, 半角1)"""
width = 0
for c in text:
if unicodedata.east_asian_width(c) in ('W', 'F', 'A'):
width += 2
else:
width += 1
return width
def pad_right(text, width):
"""表示幅に基づいて正確に右パディングする"""
t = clean_text(text)
curr_w = get_visual_width(t)
return t + ' ' * max(0, width - curr_w)
# --- 以下、ロジック部分は維持しつつ表示部のみ微調整 ---
async def check_tool_support(client, model_name):
try:
res = await client.post(f"{CONFIG['url']}/api/show", json={"name": model_name})
if res.status_code == 200:
info = res.json()
# template, system, modelfile 等から tool 関連のキーワードを執念深く探す
look_in = [info.get("template", ""), info.get("system", ""), info.get("modelfile", "")]
content = " ".join(look_in).lower()
return any(x in content for x in ["tool", "function", "call", "assistant"])
except: pass
return False
async def check_connection():
url = CONFIG["url"]
print(f"{get_ts()} {C_YELLOW}[Check] {url} 分析中...{C_RESET}")
try:
async with httpx.AsyncClient(timeout=10.0) as client:
res = await client.get(f"{url}/api/tags")
if res.status_code == 200:
models = res.json().get('models', [])
header_text = f"--- リモートモデル戦力分析 (基準: {MEM_LIMIT}GiB + Tool対応) ---"
print(f"{get_ts()} {C_GREEN}{header_text}{C_RESET}")
for m in models:
name = m['name']
size_gb = m['size'] / (1024**3)
has_tool = await check_tool_support(client, name)
if size_gb > MEM_LIMIT: color, status = C_RED, "❌ MEM "
elif not has_tool: color, status = C_YELLOW, "⚠️ TOOL"
else: color, status = C_GREEN, "✅ READY"
tool_mark = f"{C_CYAN}[TOOL]{C_RESET}" if has_tool else f"{C_GRAY}[----]{C_RESET}"
# カラム幅を微調整 (名前: 70)
display_name = pad_right(name, 70)
print(f"{get_ts()} {color}{status:<8}{C_RESET} {tool_mark} {display_name} {size_gb:>5.1f} GiB")
print(f"{get_ts()} {C_GREEN}{'-' * 100}{C_RESET}\n")
return True
except Exception as e:
print(f"{get_ts()} {C_RED}!! 接続失敗 !!: {e}{C_RESET}")
return False
# --- API Route, main, wait_for_quit --- (省略せず統合してください)
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def sticky_proxy(path: str, request: Request):
print(f"\n{get_ts()} /{path}: ", end="", flush=True)
body = b""; async for chunk in request.stream():
body += chunk; print(f"{C_CYAN}^{C_RESET}", end="", flush=True)
print(f"{C_YELLOW}|{C_RESET}", end="", flush=True)
headers = {k: v for k, v in request.headers.items() if k.lower() not in ["host", "content-length"]}
async def stream_response():
try:
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream(request.method, f"{CONFIG['url']}/{path}", content=body, headers=headers) as response:
if response.status_code != 200:
err_data = await response.aread(); err_msg = err_data.decode(errors='ignore')
print(f" {C_RED}{err_msg}{C_RESET}")
yield json.dumps({"message": {"role": "assistant", "content": f"### Error\n{err_msg}"}, "done": True}).encode()
return
print(f"{C_GREEN}v:{C_RESET}", end="", flush=True)
async for chunk in response.aiter_bytes():
print(f"{C_GREEN}v{C_RESET}", end="", flush=True)
yield chunk
print(f"{C_YELLOW}*{C_RESET}", end="", flush=True)
except Exception as e: print(f" {C_RED}[Err] {e}{C_RESET}")
return StreamingResponse(stream_response())
def wait_for_quit():
while True:
line = sys.stdin.readline()
if line and line.strip().lower() == 'q': os._exit(0)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-r", "--remote", type=int, default=DEFAULT_REMOTE_PORT)
parser.add_argument("-l", "--local", type=int, default=DEFAULT_LOCAL_PORT)
args = parser.parse_args()
CONFIG["url"] = f"http://127.0.0.1:{args.remote}"
loop = asyncio.new_event_loop(); asyncio.set_event_loop(loop)
loop.run_until_complete(check_connection())
threading.Thread(target=wait_for_quit, daemon=True).start()
uvicorn.run(app, host="127.0.0.1", port=args.local, log_level="error")
if __name__ == "__main__": main()