ipf/ipf-safe-control.py

88 lines
3 KiB
Python

#!/usr/bin/env python3
# [2026-01-16] ipf Monitor & Switch Panel
# Client: Linuxmintmate 22.1 / Server: Proxmox
# Logic: Always show mapping from 'ipf -l' and switch with single key.
import subprocess
import sys
import tty
import termios
import re
import time
import threading
# 接続先の定義
BRAINS = {
'1': ('pve1-3B ', '11431'),
'2': ('pve2-7B ', '11432'),
'3': ('pve3-14B', '11433')
}
def get_key():
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(sys.stdin.fileno())
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
def get_mapping_status():
"""ipf -l から 11434 の現在の接続先を解析する"""
try:
res = subprocess.run(["ipf", "-l"], capture_output=True, text=True)
# 出力例 "1: 127.0.0.1:11434 -> 127.0.0.1:11432" を想定
for line in res.stdout.splitlines():
if "11434" in line:
# 矢印の先のポート番号を抽出
match = re.search(r'->.*?(\d+)$', line.strip())
if match:
port = match.group(1)
# 逆引きして名前を出す
for k, v in BRAINS.items():
if v[1] == port: return f"\033[1;32m{v[0]}\033[0m ({port})"
return f"\033[1;36mUnknown\033[0m ({port})"
except: pass
return "\033[1;31mDISCONNECTED\033[0m"
def safe_switch(target_port):
"""削除・確認・追加のシーケンス"""
while True:
res = subprocess.run(["ipf", "-l"], capture_output=True, text=True)
idx = None
for line in res.stdout.splitlines():
if "11434" in line:
m = re.search(r'^(\d+):', line.strip())
if m: idx = m.group(1); break
if not idx: break
subprocess.run(["ipf", "-d", idx], stdout=subprocess.DEVNULL)
time.sleep(0.05)
subprocess.run(["ipf", f"11434:127.0.0.1:{target_port}"], stdout=subprocess.DEVNULL)
def main():
print("\033[2J\033[H") # 画面クリア
try:
while True:
# 現在の状態を取得
current = get_mapping_status()
# 最小限の1行表示
sys.stdout.write(f"\r\033[K[1:3B 2:7B 3:14B] ACTIVE: {current} | Key? (q:quit)")
sys.stdout.flush()
# キー入力を待つ (非同期にするほどでもないので、短いタイムアウトで回すのもありですが、
# ひとまず「入力があったら更新」の形にします)
# 常に最新にしたい場合は、別のスレッドで get_mapping_status を回すと良いです
key = get_key()
if key in BRAINS:
safe_switch(BRAINS[key][1])
elif key == 'q':
print("\nClosed.")
break
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()