# Version info: Python 3.12+ / FastAPI 0.115.0 / uvicorn 0.30.0 / httpx 0.28.0
#
# A local "sticky" reverse proxy in front of an Ollama server.  It forwards
# every request to the configured upstream while drawing a character-per-chunk
# traffic visualization on stdout, and offers an interactive shell to switch
# the upstream port and re-run a model capability analysis.
import argparse
import asyncio
import json
import os
import re
import sys
import threading
import unicodedata
from datetime import datetime

import httpx
import uvicorn
from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse

app = FastAPI()

# --- Colors and configuration ---
C_GRAY, C_CYAN, C_GREEN, C_YELLOW, C_RED, C_RESET = (
    "\033[90m",
    "\033[96m",
    "\033[92m",
    "\033[93m",
    "\033[91m",
    "\033[0m",
)
B_GREEN, B_YELLOW, B_RED = "\033[42;30m", "\033[43;30m", "\033[41;37m"
# Models larger than this (GiB) are flagged as not fitting in memory.
MEM_LIMIT = 16.8
# Display width budget for model names in the analysis table.
NAME_MAX_WIDTH = 50
# Mutable runtime state: the current upstream base URL (changed via `:p`).
CONFIG = {"url": "http://127.0.0.1:11430"}


def get_ts() -> str:
    """Return a gray, millisecond-precision timestamp prefix for log lines."""
    return f"{C_GRAY}[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}]{C_RESET}"


def get_visual_width(text: str) -> int:
    """Return the terminal column width of *text*.

    East-Asian Wide/Fullwidth/Ambiguous characters are counted as 2 columns
    so that CJK model names align in the table.
    """
    return sum(
        2 if unicodedata.east_asian_width(c) in ("W", "F", "A") else 1 for c in text
    )


def pad_right(text: str, width: int) -> str:
    """Pad *text* with spaces to *width* visual columns.

    ANSI escape sequences are stripped before measuring so colored text
    still lines up.
    """
    plain_text = re.sub(r"\033\[[0-9;]*m", "", text)
    return text + " " * max(0, width - get_visual_width(plain_text))


def draw_progress(current: int, total: int, model_name: str = "") -> None:
    """Redraw an in-place scanning progress bar on stdout.

    *total* may be 0 (no models); the bar is then drawn as complete instead
    of raising ZeroDivisionError.
    """
    width = 30
    # Guard against division by zero when the model list is empty.
    ratio = current / total if total else 1.0
    filled = int(width * ratio)
    bar = "█" * filled + "░" * (width - filled)
    percent = ratio * 100
    sys.stdout.write(
        f"\r{get_ts()} {C_CYAN}[Scanning] |{bar}| {percent:>3.0f}% {C_GRAY}({model_name[:20]}...){C_RESET}"
    )
    sys.stdout.flush()


async def check_tool_support(client: httpx.AsyncClient, model_name: str) -> bool:
    """Best-effort probe of whether *model_name* supports tool calling.

    Fetches the model's metadata via `/api/show` and searches the template,
    system prompt and modelfile for tool-related keywords.  Any failure
    (network, JSON, missing keys) is treated as "no tool support".
    """
    try:
        res = await client.post(f"{CONFIG['url']}/api/show", json={"name": model_name})
        if res.status_code == 200:
            info = res.json()
            content = " ".join(
                [
                    info.get("template", ""),
                    info.get("system", ""),
                    info.get("modelfile", ""),
                ]
            ).lower()
            return any(x in content for x in ["tool", "function", "call", "assistant"])
    except Exception:
        # Deliberate best-effort: a failed probe simply means "unknown/False".
        pass
    return False


def run_analyze() -> None:
    """Thread entry point: run the async model analysis to completion."""
    asyncio.run(analyze_models())


async def analyze_models() -> None:
    """Fetch the upstream model list, score each model, and print a table.

    Score 0 = fits in memory AND supports tools (READY),
    score 1 = fits in memory but no tools (TOOL),
    score 2 = too large for memory (MEM).
    """
    url = CONFIG["url"]
    print(f"\n{get_ts()} {C_YELLOW}[Analyze] {url} 接続開始...{C_RESET}")
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            res = await client.get(f"{url}/api/tags")
            if res.status_code != 200:
                print(f"{get_ts()} {C_RED}分析エラー: HTTP {res.status_code}{C_RESET}")
                return
            models_data = res.json().get("models", [])
            total = len(models_data)
            enriched = []
            for i, m in enumerate(models_data, 1):
                full_name = m["name"]
                draw_progress(i, total, full_name.split("/")[-1])
                size_gb = m["size"] / (1024**3)
                has_tool = await check_tool_support(client, full_name)
                score = (
                    0
                    if size_gb <= MEM_LIMIT and has_tool
                    else (1 if size_gb <= MEM_LIMIT else 2)
                )
                enriched.append(
                    {
                        "full_name": full_name,
                        "display_name": full_name.split("/")[-1],
                        "size_gb": size_gb,
                        "has_tool": has_tool,
                        "score": score,
                    }
                )
            print("\n")
            # Best score first, then alphabetical, largest size breaking ties.
            enriched.sort(key=lambda x: (x["score"], x["display_name"], -x["size_gb"]))
            print(
                f"{get_ts()} {C_GREEN}--- リモートモデル戦力分析 (Target: {url}) ---{C_RESET}"
            )
            prefix_width = 32
            for em in enriched:
                status = (
                    f"{B_GREEN} READY{C_RESET}"
                    if em["score"] == 0
                    else (
                        f"{B_YELLOW} TOOL {C_RESET}"
                        if em["score"] == 1
                        else f"{B_RED} MEM  {C_RESET}"
                    )
                )
                tool = (
                    f"{C_CYAN}[TOOL]{C_RESET}"
                    if em["has_tool"]
                    else f"{C_GRAY}[----]{C_RESET}"
                )
                name, size = em["display_name"], f"{em['size_gb']:>5.1f} GiB"
                if get_visual_width(name) > NAME_MAX_WIDTH:
                    # Name too wide: print a truncated row plus a wrapped tail.
                    print(f"{get_ts()} {status} {tool} {name[:NAME_MAX_WIDTH]} {size}")
                    print(
                        f"{get_ts()} {' ' * (prefix_width - 15)} {C_GRAY}└ {name[NAME_MAX_WIDTH:]}{C_RESET}"
                    )
                else:
                    print(
                        f"{get_ts()} {status} {tool} {pad_right(name, NAME_MAX_WIDTH)} {size}"
                    )
            print(f"{get_ts()} {C_GREEN}{'-' * 80}{C_RESET}")
            show_help()
    except Exception as e:
        print(f"\n{get_ts()} {C_RED}分析失敗: {e}{C_RESET}")


def show_help() -> None:
    """Print the interactive command reference."""
    print(f"\n{C_CYAN}[Command Help]{C_RESET}")
    print(f"  {C_YELLOW}:p [port]{C_RESET} - 転送先(Ollama)のポートを切り替えて再分析")
    print(f"  {C_YELLOW}?{C_RESET}         - このヘルプを表示")
    print(f"  {C_YELLOW}q{C_RESET}         - プロキシを終了")
    print(f"{C_GRAY}------------------------------------------{C_RESET}\n")


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def sticky_proxy(path: str, request: Request) -> StreamingResponse:
    """Forward any request to the configured upstream, visualizing traffic.

    Request chunks are echoed as cyan `^`, response chunks as green `v`,
    with `|` and `*` marking the request/response boundaries.
    """
    target_url = f"{CONFIG['url']}/{path}"
    # 1. Visualize the request body being drained.
    print(f"\n{get_ts()} /{path} -> {CONFIG['url']}: ", end="", flush=True)
    # Accumulate chunks in a list and join once — bytes += is quadratic.
    parts = []
    async for chunk in request.stream():
        parts.append(chunk)
        print(f"{C_CYAN}^{C_RESET}", end="", flush=True)
    body = b"".join(parts)
    print(f"{C_YELLOW}|{C_RESET}", end="", flush=True)
    # Drop headers that httpx must recompute for the upstream request.
    headers = {
        k: v
        for k, v in request.headers.items()
        if k.lower() not in ["host", "content-length"]
    }

    async def stream_response():
        try:
            async with httpx.AsyncClient(timeout=None) as client:
                async with client.stream(
                    request.method, target_url, content=body, headers=headers
                ) as response:
                    # 2. Visualize the response being emitted.
                    print(f"{C_GREEN}v:{C_RESET}", end="", flush=True)
                    async for chunk in response.aiter_bytes():
                        print(f"{C_GREEN}v{C_RESET}", end="", flush=True)
                        yield chunk
                    print(f"{C_YELLOW}*{C_RESET}", end="", flush=True)
        except Exception as e:
            # Upstream failure mid-stream: log and end the response body.
            print(f" {C_RED}[Err] {e}{C_RESET}")

    # NOTE(review): the upstream status code and Content-Type are not
    # forwarded (StreamingResponse defaults apply) — confirm downstream
    # clients tolerate this before hardening.
    return StreamingResponse(stream_response())


def interactive_shell() -> None:
    """Blocking stdin command loop (runs in a daemon thread).

    Commands: `q` quits, `?` shows help, `:p <port>` switches the upstream
    port and re-runs the analysis in a background thread.
    """
    while True:
        try:
            line = sys.stdin.readline().strip().lower()
            if not line:
                continue
            if line == "q":
                # Hard exit: uvicorn's event loop runs on the main thread,
                # so a plain sys.exit() here would not stop the server.
                os._exit(0)
            elif line == "?":
                show_help()
            elif line.startswith(":p"):
                parts = line.split()
                # Require a numeric port; reject ":p" alone or ":p abc".
                if len(parts) > 1 and parts[1].isdigit():
                    new_port = parts[1]
                    CONFIG["url"] = f"http://127.0.0.1:{new_port}"
                    threading.Thread(target=run_analyze, daemon=True).start()
                else:
                    print(f"{C_RED}ポート指定ミス: :p 11435{C_RESET}")
            else:
                print(f"{C_GRAY}未知のコマンド: '{line}'{C_RESET}")
        except EOFError:
            break


def main() -> None:
    """Parse CLI args, run the initial analysis, then serve the proxy."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-r", "--remote", type=int, default=11430)
    parser.add_argument("-l", "--local", type=int, default=11434)
    args = parser.parse_args()
    CONFIG["url"] = f"http://127.0.0.1:{args.remote}"
    asyncio.run(analyze_models())
    threading.Thread(target=interactive_shell, daemon=True).start()
    uvicorn.run(app, host="127.0.0.1", port=args.local, log_level="error")


if __name__ == "__main__":
    main()