oproxy/oproxy.py
2026-02-08 01:12:41 +09:00

185 lines
6.2 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# バージョン情報: Python 3.12+ / FastAPI 0.115.0 / uvicorn 0.30.0 / httpx 0.28.0
import argparse
import asyncio
import json
import os
import re
import sys
import threading
import unicodedata
from datetime import datetime
import httpx
import uvicorn
from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse
app = FastAPI()
# --- カラー・設定 ---
C_GRAY, C_CYAN, C_GREEN, C_YELLOW, C_RED, C_RESET = (
"\033[90m",
"\033[96m",
"\033[92m",
"\033[93m",
"\033[91m",
"\033[0m",
)
DEFAULT_REMOTE_PORT, DEFAULT_LOCAL_PORT = 11430, 11434
MEM_LIMIT = 16.8
CONFIG = {
"url": f"http://127.0.0.1:{DEFAULT_REMOTE_PORT}",
"timeout": httpx.Timeout(None),
}
def get_ts():
return f"{C_GRAY}[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}]{C_RESET}"
def get_visual_width(text):
"""全角を2、半角を1として計算する"""
return sum(2 if unicodedata.east_asian_width(c) in "WF" else 1 for c in text)
def pad_right(text, width):
"""見た目の幅を揃えるためのパディング"""
return text + " " * (width - get_visual_width(text))
async def check_tool_support(client, model_name):
try:
res = await client.post(f"{CONFIG['url']}/api/show", json={"name": model_name})
if res.status_code == 200:
info = res.json()
details = str(info.get("template", "")) + str(info.get("details", ""))
return "tool" in details.lower() or "functions" in details.lower()
except:
pass
return False
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def sticky_proxy(path: str, request: Request):
print(f"\n{get_ts()} /{path}: ", end="", flush=True)
body = b""
async for chunk in request.stream():
body += chunk
print(f"{C_CYAN}^{C_RESET}", end="", flush=True)
print(f"{C_YELLOW}|{C_RESET}", end="", flush=True)
headers = {
k: v
for k, v in request.headers.items()
if k.lower() not in ["host", "content-length"]
}
async def stream_response():
try:
async with httpx.AsyncClient(timeout=CONFIG["timeout"]) as client:
async with client.stream(
request.method,
f"{CONFIG['url']}/{path}",
content=body,
headers=headers,
) as response:
if response.status_code != 200:
err_data = await response.aread()
err_msg = err_data.decode(errors="ignore")
print(f" {C_RED}{err_msg}{C_RESET}")
yield json.dumps(
{
"message": {
"role": "assistant",
"content": f"### Error\n{err_msg}",
},
"done": True,
}
).encode()
return
print(f"{C_GREEN}v:{C_RESET}", end="", flush=True)
async for chunk in response.aiter_bytes():
print(f"{C_GREEN}v{C_RESET}", end="", flush=True)
yield chunk
print(f"{C_YELLOW}*{C_RESET}", end="", flush=True)
except Exception as e:
print(f" {C_RED}[Err] {e}{C_RESET}")
return StreamingResponse(stream_response())
async def check_connection():
url = CONFIG["url"]
print(f"{get_ts()} {C_YELLOW}[Check] {url} 接続確認 & 性能分析中...{C_RESET}")
try:
async with httpx.AsyncClient(timeout=10.0) as client:
res = await client.get(f"{url}/api/tags")
if res.status_code == 200:
models = res.json().get("models", [])
header = (
f"--- リモートモデル戦力分析 (基準: {MEM_LIMIT}GiB + Tool対応) ---"
)
print(f"{get_ts()} {C_GREEN}{header}{C_RESET}")
for m in models:
name = m["name"]
size_gb = m["size"] / (1024**3)
has_tool = await check_tool_support(client, name)
reasons = []
if size_gb > MEM_LIMIT:
reasons.append("MEM_OVER")
if not has_tool:
reasons.append("NO_TOOL")
if not reasons:
color, status = C_GREEN, "✅ READY"
elif "MEM_OVER" in reasons:
color, status = C_RED, "❌ MEM "
else:
color, status = C_YELLOW, "⚠️ TOOL"
tool_mark = (
f"{C_CYAN}[TOOL]{C_RESET}"
if has_tool
else f"{C_GRAY}[----]{C_RESET}"
)
# 名前の表示幅を40に固定全角対応
display_name = pad_right(name, 45)
print(
f"{get_ts()} {color}{status:<8}{C_RESET} {tool_mark} {display_name} {size_gb:>5.1f} GiB"
)
print(
f"{get_ts()} {C_GREEN}{'-' * get_visual_width(header)}{C_RESET}\n"
)
return True
except Exception as e:
print(f"{get_ts()} {C_RED}!! 接続失敗 !!: {e}{C_RESET}\n")
return False
def wait_for_quit():
while True:
line = sys.stdin.readline()
if line and line.strip().lower() == "q":
os._exit(0)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-r", "--remote", type=int, default=DEFAULT_REMOTE_PORT)
parser.add_argument("-l", "--local", type=int, default=DEFAULT_LOCAL_PORT)
args = parser.parse_args()
CONFIG["url"] = f"http://127.0.0.1:{args.remote}"
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(check_connection())
threading.Thread(target=wait_for_quit, daemon=True).start()
uvicorn.run(app, host="127.0.0.1", port=args.local, log_level="error")
if __name__ == "__main__":
main()