144 lines
No EOL
6 KiB
Python
144 lines
No EOL
6 KiB
Python
import asyncio
|
||
import os
|
||
import re
|
||
import shlex
|
||
from datetime import datetime
|
||
from playwright.async_api import async_playwright
|
||
|
||
# [2026-02-12] Version: 2.1.1-WGP-Final
|
||
# [2025-07-04] Embed: Version info
|
||
# [2025-06-21] Language: Japanese
|
||
|
||
class WebGemiAgent:
|
||
def __init__(self, context, page):
|
||
self.context = context
|
||
self.page = page
|
||
self.input_selector = "div[role='textbox']"
|
||
self.container_name = "sandbox_container"
|
||
self.yolo_mode = False
|
||
|
||
async def execute_in_sandbox(self, path, body, cmd):
|
||
"""WGPパケットの展開とコンテナ内実行"""
|
||
# ヒアドキュメントを利用してファイルを生成し、コマンドを実行
|
||
write_script = f"cat << 'EOF' > {path}\n{body}\nEOF"
|
||
full_script = f"{write_script} && {cmd}"
|
||
|
||
docker_cmd = f"docker exec -e OLLAMA_HOST=host.docker.internal -w /workspace {self.container_name} bash -c {shlex.quote(full_script)}"
|
||
proc = await asyncio.create_subprocess_shell(
|
||
docker_cmd,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE
|
||
)
|
||
stdout, stderr = await proc.communicate()
|
||
return stdout.decode('utf-8', errors='replace'), stderr.decode('utf-8', errors='replace')
|
||
|
||
async def wait_for_wgp_packet(self):
|
||
"""パケットが安定するまで待機"""
|
||
print("📡 WGP-Packet Receiving", end="", flush=True)
|
||
last_text = ""
|
||
stable_count = 0
|
||
for _ in range(60):
|
||
responses = await self.page.query_selector_all(".model-response-text")
|
||
if responses:
|
||
current_text = await responses[-1].inner_text()
|
||
# S式の括弧が閉じているか、前回の確認から変化がないかチェック
|
||
if current_text == last_text and "(" in current_text and ")" in current_text:
|
||
stable_count += 1
|
||
else:
|
||
stable_count = 0
|
||
last_text = current_text
|
||
if stable_count >= 3: break
|
||
print(".", end="", flush=True)
|
||
await asyncio.sleep(1)
|
||
print(" [Locked]")
|
||
return last_text
|
||
|
||
def parse_wgp(self, text):
|
||
"""WGP/1.1 S-Expression Parser"""
|
||
# 正規表現でS式パケットを抽出
|
||
pattern = r'\(WGP1\.1\s+:ID\s+"(?P<id>.*?)"\s+:OP\s+"(?P<op>.*?)"\s+:PATH\s+"(?P<path>.*?)"\s+:CMD\s+"(?P<cmd>.*?)"\s+:BODY\s+"(?P<body>.*?)"\)'
|
||
match = re.search(pattern, text, re.DOTALL)
|
||
if match:
|
||
return match.groupdict()
|
||
return None
|
||
|
||
async def run_loop(self):
|
||
print(f"\n[{datetime.now()}] ⚡ WGP/1.1 インターフェース起動成功。")
|
||
|
||
# ハンドシェイク用プロンプト
|
||
WGP_HANDSHAKE = (
|
||
"これより通信規約 WGP/1.1 を適用する。一切の挨拶、説明を禁止する。\n"
|
||
"応答は必ず以下のS式パケット1つのみを、Markdownのlispブロックで返せ。\n"
|
||
"```lisp\n"
|
||
"(WGP1.1 :ID \"seq-id\" :OP \"EXEC\" :PATH \"/path/to/file\" :CMD \"build && run command\" :BODY \"source code here\")\n"
|
||
"```"
|
||
)
|
||
|
||
await self.page.wait_for_selector(self.input_selector)
|
||
await self.page.locator(self.input_selector).fill(WGP_HANDSHAKE)
|
||
await self.page.keyboard.press("Enter")
|
||
await asyncio.sleep(5)
|
||
|
||
while True:
|
||
mode_str = "☢️ YOLO" if self.yolo_mode else "🛡️ NORMAL"
|
||
user_input = input(f"\n🧬 Master [{mode_str}] > ").strip()
|
||
|
||
if not user_input: continue
|
||
if user_input.lower() == "exit": break
|
||
if user_input.lower() == "yolo on":
|
||
self.yolo_mode = True
|
||
print("☢️ YOLOモード:全自動通信開始。")
|
||
continue
|
||
if user_input.lower() == "yolo off":
|
||
self.yolo_mode = False
|
||
continue
|
||
|
||
# 送信
|
||
target = self.page.locator(self.input_selector)
|
||
await target.fill(user_input)
|
||
await self.page.keyboard.press("Enter")
|
||
|
||
# 応答ループ
|
||
for iteration in range(10):
|
||
raw_response = await self.wait_for_wgp_packet()
|
||
packet = self.parse_wgp(raw_response)
|
||
|
||
if packet:
|
||
print(f"📦 Packet Received. ID: {packet['id']}")
|
||
if self.yolo_mode or input("▶ Execute? (y/n): ") == "y":
|
||
out, err = await self.execute_in_sandbox(packet['path'], packet['body'], packet['cmd'])
|
||
print(f"--- RESULT ---\n{out}{err}\n--------------")
|
||
|
||
# フィードバックパケットの生成
|
||
safe_out = out.replace('"', '\\"').replace('\n', '\\n')
|
||
safe_err = err.replace('"', '\\"').replace('\n', '\\n')
|
||
feedback = f"(WGP-RESULT :ID \"{packet['id']}\" :STDOUT \"{safe_out}\" :STDERR \"{safe_err}\")"
|
||
|
||
await target.fill(feedback)
|
||
await self.page.keyboard.press("Enter")
|
||
if not self.yolo_mode: break
|
||
else:
|
||
break
|
||
else:
|
||
print(f"❌ Protocol Error or Prose Detected.")
|
||
break
|
||
|
||
async def main():
|
||
async with async_playwright() as p:
|
||
user_data_dir = os.path.expanduser("~/dev/gchat/gemini_profile")
|
||
context = await p.chromium.launch_persistent_context(
|
||
user_data_dir,
|
||
headless=False,
|
||
args=["--disable-blink-features=AutomationControlled", "--no-sandbox"]
|
||
)
|
||
page = context.pages[0]
|
||
# 純粋な文字列としてのURL
|
||
await page.goto("https://gemini.google.com/app")
|
||
agent = WebGemiAgent(context, page)
|
||
await agent.run_loop()
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
asyncio.run(main())
|
||
except KeyboardInterrupt:
|
||
print("\n☢️ 実験終了。") |