diff --git a/cockpit.py b/cockpit.py new file mode 100644 index 0000000..640d9c9 --- /dev/null +++ b/cockpit.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +import sys, tty, termios, subprocess, re, time + +# [2026-01-16] ipf Multi-Brain Cockpit by Gemini3.0flash +# OS: Linuxmintmate 22.1 / Server: Proxmo\x + +def get_key(): + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setraw(sys.stdin.fileno()) + ch = sys.stdin.read(1) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + return ch + +def get_status(targets): + """ipf -l の実体から現在の接続先を特定してラベルを返す""" + try: + res = subprocess.run(["ipf", "-l"], capture_output=True, text=True) + for line in res.stdout.splitlines(): + if "11434" in line: + for key, label, port in targets: + if port in line: + return f"\033[1;32m[{label}]\033[0m" + return "\033[1;31m[UNKNOWN]\033[0m" + except: return "???" + +def safe_switch(target_port): + """iptables/ipfのインデックスを掃除して張り直す""" + while True: + res = subprocess.run(["ipf", "-l"], capture_output=True, text=True) + idx = next((re.search(r'^(\d+):', l).group(1) for l in res.stdout.splitlines() if "11434" in l), None) + if not idx: break + subprocess.run(["ipf", "-d", idx], stdout=subprocess.DEVNULL) + time.sleep(0.05) + subprocess.run(["ipf", f"11434:127.0.0.1:{target_port}"], stdout=subprocess.DEVNULL) + +def main(): + # 引数から設定を動的に生成 (形式: key:label:port) + # 例: 1:3b:11431 2:7b:11432 + if len(sys.argv) < 2: + print("Usage: cockpit.py ...") + sys.exit(1) + + targets = [arg.split(':') for arg in sys.argv[1:]] + mapping = {t[0]: (t[1], t[2]) for t in targets} + + # 画面を1行に固定してスタイリッシュに表示 + try: + while True: + cur_label = get_status(targets) + menu = " ".join([f"{k}:{l}" for k, l, p in targets]) + + # 計器のようなデザイン + sys.stdout.write(f"\r\033[K \033[1;34mBRAIN_CTRL\033[0m >> {menu} | \033[1;35mACTIVE\033[0m >> {cur_label} ") + sys.stdout.flush() + + key = get_key() + if key in mapping: + safe_switch(mapping[key][1]) + elif key == 'q': + break + except KeyboardInterrupt: pass + print("\n") + +if __name__ == "__main__": + main() diff --git a/ipf-safe-control.py b/ipf-safe-control.py new file mode 100644 index 0000000..3a9f4bb --- /dev/null +++ b/ipf-safe-control.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +# [2026-01-16] ipf Monitor & Switch Panel +# Client: Linuxmintmate 22.1 / Server: Proxmox +# Logic: Always show mapping from 'ipf -l' and switch with single key. + +import subprocess +import sys +import tty +import termios +import re +import time +import threading + +# 接続先の定義 +BRAINS = { + '1': ('pve1-3B ', '11431'), + '2': ('pve2-7B ', '11432'), + '3': ('pve3-14B', '11433') +} + +def get_key(): + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setraw(sys.stdin.fileno()) + ch = sys.stdin.read(1) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + return ch + +def get_mapping_status(): + """ipf -l から 11434 の現在の接続先を解析する""" + try: + res = subprocess.run(["ipf", "-l"], capture_output=True, text=True) + # 出力例 "1: 127.0.0.1:11434 -> 127.0.0.1:11432" を想定 + for line in res.stdout.splitlines(): + if "11434" in line: + # 矢印の先のポート番号を抽出 + match = re.search(r'->.*?(\d+)$', line.strip()) + if match: + port = match.group(1) + # 逆引きして名前を出す + for k, v in BRAINS.items(): + if v[1] == port: return f"\033[1;32m{v[0]}\033[0m ({port})" + return f"\033[1;36mUnknown\033[0m ({port})" + except: pass + return "\033[1;31mDISCONNECTED\033[0m" + +def safe_switch(target_port): + """削除・確認・追加のシーケンス""" + while True: + res = subprocess.run(["ipf", "-l"], capture_output=True, text=True) + idx = None + for line in res.stdout.splitlines(): + if "11434" in line: + m = re.search(r'^(\d+):', line.strip()) + if m: idx = m.group(1); break + if not idx: break + subprocess.run(["ipf", "-d", idx], stdout=subprocess.DEVNULL) + time.sleep(0.05) + + subprocess.run(["ipf", f"11434:127.0.0.1:{target_port}"], stdout=subprocess.DEVNULL) + +def main(): + print("\033[2J\033[H") # 画面クリア + try: + while True: + # 現在の状態を取得 + current = get_mapping_status() + + # 最小限の1行表示 + sys.stdout.write(f"\r\033[K[1:3B 2:7B 3:14B] ACTIVE: {current} | Key? (q:quit)") + sys.stdout.flush() + + # キー入力を待つ (非同期にするほどでもないので、短いタイムアウトで回すのもありですが、 + # ひとまず「入力があったら更新」の形にします) + # 常に最新にしたい場合は、別のスレッドで get_mapping_status を回すと良いです + key = get_key() + if key in BRAINS: + safe_switch(BRAINS[key][1]) + elif key == 'q': + print("\nClosed.") + break + except KeyboardInterrupt: + pass + +if __name__ == "__main__": + main()