This commit is contained in:
joe 2026-02-12 16:08:15 +09:00
commit bac1946768
4 changed files with 354 additions and 0 deletions

5
.gitignore vendored Normal file
View file

@ -0,0 +1,5 @@
user_data
gemini_profile
*.swp
gemini_profile
workspace

189
gemini_cli.py Normal file
View file

@ -0,0 +1,189 @@
import asyncio
import os
import re
import shlex
from datetime import datetime
from playwright.async_api import async_playwright
# [2026-02-11] Version: 1.9.3-Mad-Auto-Heal-Full
# [2025-07-04] Embed: Version info
# [2025-06-15] Format: Markdown Standard
class MadGeminiAgent:
def __init__(self, context, page):
self.context = context
self.page = page
self.input_selector = "div[role='textbox']"
self.container_name = "sandbox_container"
self.yolo_mode = False
self.max_iterations = 10
async def execute_in_sandbox(self, script):
"""コンテナへの電気信号送信"""
docker_cmd = f"docker exec -e OLLAMA_HOST=host.docker.internal -w /workspace {self.container_name} bash -c {shlex.quote(script)}"
proc = await asyncio.create_subprocess_shell(
docker_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
return stdout.decode('utf-8', errors='replace'), stderr.decode('utf-8', errors='replace')
async def wait_for_gemini_stable(self):
"""Geminiの打鍵が止まるまで粘り強く監視"""
print("🧠 Gemini思考中", end="", flush=True)
last_text = ""
stable_count = 0
for _ in range(120):
responses = await self.page.query_selector_all(".model-response-text")
if responses:
current_text = await responses[-1].inner_text()
if current_text == last_text and len(current_text) > 0:
stable_count += 1
else:
stable_count = 0
last_text = current_text
if stable_count >= 3:
await asyncio.sleep(1)
break
print(".", end="", flush=True)
await asyncio.sleep(1)
print(" 完了!")
def auto_heal_command(self, code):
"""
外科手術Geminiが壊したBash/C結合構文を物理的に修理する
"""
# 1. 'cat << 'EOF' > file.c#include' のように # が密着している場合、改行を挿入
code = re.sub(r"(cat <<\s*['\"]?EOF['\"]?\s*>\s*[^\s]+\.[a-z0-9]+)#", r"\1\n#", code)
# 2. 'EOF#' や 'EOFgcc' のように終端文字が次行とくっついている場合を分離
code = re.sub(r"EOF#", "EOF\n#", code)
code = re.sub(r"EOFgcc", "EOF\ngcc", code)
code = re.sub(r"EOF/", "EOF\n/", code)
# 3. その他、極端なワンライナー化の形跡があれば修正
# (必要に応じてここへ正規表現を追加)
return code
async def extract_commands(self, response_element):
"""HTMLから実行可能なコマンドを抽出し、Auto-Healを適用"""
code_elements = await response_element.query_selector_all("code")
results = []
for el in code_elements:
code_text = (await el.inner_text()).strip()
# 単語のみのタグは無視
if "\n" not in code_text and " " not in code_text:
continue
# --- Auto-Heal 適用 ---
code_text = self.auto_heal_command(code_text)
# 言語ラベルの除去
lines = code_text.split('\n')
if lines and lines[0].lower() in ['bash', 'sh', 'shell', 'c', 'python', 'cpp']:
code_text = '\n'.join(lines[1:])
if code_text.strip():
results.append(code_text.strip())
return results
async def send_feedback(self, text):
"""Geminiへのフィードバック送信"""
try:
target = self.page.locator(self.input_selector)
await target.wait_for(state="visible", timeout=10000)
await target.scroll_into_view_if_needed()
await target.click()
await target.fill(text)
await self.page.keyboard.press("Enter")
return True
except Exception as e:
print(f"\n⚠️ 通信障害: {e}")
return False
async def run_loop(self):
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 🧪 実験室オンラインHeal-Engine 1.0)。")
while True:
mode_str = "☢️ YOLO" if self.yolo_mode else "🛡️ NORMAL"
user_input = input(f"\n🧬 Master [{mode_str}] > ").strip()
if not user_input: continue
if user_input.lower() in ["exit", "quit"]: break
if user_input.lower() == "yolo on":
self.yolo_mode = True
print("☢️ YOLOモード自動修復・自動実行を開始。")
continue
elif user_input.lower() == "yolo off":
self.yolo_mode = False
print("🛡️ 安全装置:再起動。")
continue
await self.send_feedback(user_input)
iteration = 0
while iteration < self.max_iterations:
iteration += 1
await self.wait_for_gemini_stable()
responses = await self.page.query_selector_all(".model-response-text")
if not responses: break
commands = await self.extract_commands(responses[-1])
if not commands:
raw_text = await responses[-1].inner_text()
print(f"🤖 Gemini: {raw_text[:80].replace(chr(10), ' ')}...")
break
print(f"\n⚠️ {len(commands)}件の命令を確認。")
all_results = ""
for i, code in enumerate(commands):
if self.yolo_mode:
print(f"🚀 [YOLO] Auto-executing {i+1}/{len(commands)}...")
ans = 'y'
else:
print(f"\n--- 🧪 実行案 [{i+1}/{len(commands)}] ---\n{code}\n----------------")
ans = input("▶️ 実行許可? (y/n/skip): ").lower()
if ans == 'y':
print("📡 信号送信中...")
out, err = await self.execute_in_sandbox(code)
print(f"\n┏━━━━━━━━━━━━ 実行結果 ━━━━━━━━━━━━┓")
print(f"┃ [STDOUT]\n{out if out else '(空)'}")
if err:
print(f"┃ [STDERR]\n{err}")
print(f"┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛")
all_results += f"Command: {code}\nResult: {out}\nError: {err}\n"
elif ans == 'skip': continue
else: break
if all_results:
print(f"🔄 フィードバック送信中... ({iteration}/{self.max_iterations})")
feedback = f"実行結果だ。修正が必要ならコードを出せ。成功なら次のステップへ。\n{all_results}"
if not await self.send_feedback(feedback): break
else:
break
async def main():
async with async_playwright() as p:
user_data_dir = os.path.expanduser("~/dev/gchat/gemini_profile")
context = await p.chromium.launch_persistent_context(
user_data_dir, headless=False,
args=["--disable-blink-features=AutomationControlled", "--no-sandbox"]
)
page = context.pages[0]
await page.goto("https://gemini.google.com/app")
agent = MadGeminiAgent(context, page)
await agent.run_loop()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n☢️ 実験室を封鎖。")

144
gemini_wgp.py Normal file
View file

@ -0,0 +1,144 @@
import asyncio
import os
import re
import shlex
from datetime import datetime
from playwright.async_api import async_playwright
# [2026-02-12] Version: 2.1.1-WGP-Final
# [2025-07-04] Embed: Version info
# [2025-06-21] Language: Japanese
class WebGemiAgent:
def __init__(self, context, page):
self.context = context
self.page = page
self.input_selector = "div[role='textbox']"
self.container_name = "sandbox_container"
self.yolo_mode = False
async def execute_in_sandbox(self, path, body, cmd):
"""WGPパケットの展開とコンテナ内実行"""
# ヒアドキュメントを利用してファイルを生成し、コマンドを実行
write_script = f"cat << 'EOF' > {path}\n{body}\nEOF"
full_script = f"{write_script} && {cmd}"
docker_cmd = f"docker exec -e OLLAMA_HOST=host.docker.internal -w /workspace {self.container_name} bash -c {shlex.quote(full_script)}"
proc = await asyncio.create_subprocess_shell(
docker_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
return stdout.decode('utf-8', errors='replace'), stderr.decode('utf-8', errors='replace')
async def wait_for_wgp_packet(self):
"""パケットが安定するまで待機"""
print("📡 WGP-Packet Receiving", end="", flush=True)
last_text = ""
stable_count = 0
for _ in range(60):
responses = await self.page.query_selector_all(".model-response-text")
if responses:
current_text = await responses[-1].inner_text()
# S式の括弧が閉じているか、前回の確認から変化がないかチェック
if current_text == last_text and "(" in current_text and ")" in current_text:
stable_count += 1
else:
stable_count = 0
last_text = current_text
if stable_count >= 3: break
print(".", end="", flush=True)
await asyncio.sleep(1)
print(" [Locked]")
return last_text
def parse_wgp(self, text):
"""WGP/1.1 S-Expression Parser"""
# 正規表現でS式パケットを抽出
pattern = r'\(WGP1\.1\s+:ID\s+"(?P<id>.*?)"\s+:OP\s+"(?P<op>.*?)"\s+:PATH\s+"(?P<path>.*?)"\s+:CMD\s+"(?P<cmd>.*?)"\s+:BODY\s+"(?P<body>.*?)"\)'
match = re.search(pattern, text, re.DOTALL)
if match:
return match.groupdict()
return None
async def run_loop(self):
print(f"\n[{datetime.now()}] ⚡ WGP/1.1 インターフェース起動成功。")
# ハンドシェイク用プロンプト
WGP_HANDSHAKE = (
"これより通信規約 WGP/1.1 を適用する。一切の挨拶、説明を禁止する。\n"
"応答は必ず以下のS式パケット1つのみを、Markdownのlispブロックで返せ。\n"
"```lisp\n"
"(WGP1.1 :ID \"seq-id\" :OP \"EXEC\" :PATH \"/path/to/file\" :CMD \"build && run command\" :BODY \"source code here\")\n"
"```"
)
await self.page.wait_for_selector(self.input_selector)
await self.page.locator(self.input_selector).fill(WGP_HANDSHAKE)
await self.page.keyboard.press("Enter")
await asyncio.sleep(5)
while True:
mode_str = "☢️ YOLO" if self.yolo_mode else "🛡️ NORMAL"
user_input = input(f"\n🧬 Master [{mode_str}] > ").strip()
if not user_input: continue
if user_input.lower() == "exit": break
if user_input.lower() == "yolo on":
self.yolo_mode = True
print("☢️ YOLOモード全自動通信開始。")
continue
if user_input.lower() == "yolo off":
self.yolo_mode = False
continue
# 送信
target = self.page.locator(self.input_selector)
await target.fill(user_input)
await self.page.keyboard.press("Enter")
# 応答ループ
for iteration in range(10):
raw_response = await self.wait_for_wgp_packet()
packet = self.parse_wgp(raw_response)
if packet:
print(f"📦 Packet Received. ID: {packet['id']}")
if self.yolo_mode or input("▶ Execute? (y/n): ") == "y":
out, err = await self.execute_in_sandbox(packet['path'], packet['body'], packet['cmd'])
print(f"--- RESULT ---\n{out}{err}\n--------------")
# フィードバックパケットの生成
safe_out = out.replace('"', '\\"').replace('\n', '\\n')
safe_err = err.replace('"', '\\"').replace('\n', '\\n')
feedback = f"(WGP-RESULT :ID \"{packet['id']}\" :STDOUT \"{safe_out}\" :STDERR \"{safe_err}\")"
await target.fill(feedback)
await self.page.keyboard.press("Enter")
if not self.yolo_mode: break
else:
break
else:
print(f"❌ Protocol Error or Prose Detected.")
break
async def main():
async with async_playwright() as p:
user_data_dir = os.path.expanduser("~/dev/gchat/gemini_profile")
context = await p.chromium.launch_persistent_context(
user_data_dir,
headless=False,
args=["--disable-blink-features=AutomationControlled", "--no-sandbox"]
)
page = context.pages[0]
# 純粋な文字列としてのURL
await page.goto("https://gemini.google.com/app")
agent = WebGemiAgent(context, page)
await agent.run_loop()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n☢️ 実験終了。")

16
sand.sh Normal file
View file

@ -0,0 +1,16 @@
#!/bin/sh
# workspaceディレクトリを確実に作成
mkdir -p ~/dev/gchat/workspace
# 隔離された「遊び場」を起動
docker run -d \
--name sandbox_container \
--add-host=host.docker.internal:host-gateway \
-v ~/dev/gchat/workspace:/workspace \
ubuntu:22.04 \
tail -f /dev/null
# 最小限の道具Cコンパイラ、Python、curlをインストール
docker exec sandbox_container apt-get update
docker exec sandbox_container apt-get install -y build-essential curl python3