h-1.flet.3/app_robust.py

681 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import flet as ft
import sqlite3
import signal
import sys
import logging
import random
from datetime import datetime
from typing import List, Dict, Optional
class ErrorHandler:
"""グローバルエラーハンドラ"""
@staticmethod
def handle_error(error: Exception, context: str = ""):
"""エラーを一元処理"""
error_msg = f"{context}: {str(error)}"
logging.error(error_msg)
print(f"{error_msg}")
# SnackBarでユーザーに通知
try:
# グローバルページ参照用
if hasattr(ErrorHandler, 'current_page'):
ErrorHandler.show_snackbar(ErrorHandler.current_page, error_msg, ft.Colors.RED)
except:
pass
@staticmethod
def show_snackbar(page, message: str, color: ft.Colors = ft.Colors.RED):
"""SnackBarを表示"""
try:
page.snack_bar = ft.SnackBar(
content=ft.Text(message),
bgcolor=color
)
page.snack_bar.open = True
page.update()
except:
pass
class DummyDataGenerator:
"""テスト用ダミーデータ生成"""
@staticmethod
def generate_customers(count: int = 100) -> List[Dict]:
"""ダミー顧客データ生成"""
first_names = ["田中", "佐藤", "鈴木", "高橋", "伊藤", "渡辺", "山本", "中村", "小林", "加藤"]
last_names = ["太郎", "次郎", "三郎", "花子", "美子", "健一", "恵子", "大輔", "由美", "翔太"]
customers = []
for i in range(count):
name = f"{random.choice(first_names)} {random.choice(last_names)}"
customers.append({
'id': i + 1,
'name': name,
'phone': f"090-{random.randint(1000, 9999)}-{random.randint(1000, 9999)}",
'email': f"customer{i+1}@example.com",
'address': f"東京都{random.choice(['渋谷区', '新宿区', '港区', '千代田区'])}{random.randint(1, 50)}-{random.randint(1, 10)}"
})
return customers
@staticmethod
def generate_products(count: int = 50) -> List[Dict]:
"""ダミー商品データ生成"""
categories = ["電子機器", "衣料品", "食品", "書籍", "家具"]
products = []
for i in range(count):
category = random.choice(categories)
products.append({
'id': i + 1,
'name': f"{category}{i+1}",
'category': category,
'price': random.randint(100, 50000),
'stock': random.randint(0, 100)
})
return products
@staticmethod
def generate_sales(count: int = 200) -> List[Dict]:
"""ダミー売上データ生成"""
customers = DummyDataGenerator.generate_customers(20)
products = DummyDataGenerator.generate_products(30)
sales = []
for i in range(count):
customer = random.choice(customers)
product = random.choice(products)
quantity = random.randint(1, 10)
sales.append({
'id': i + 1,
'customer_id': customer['id'],
'customer_name': customer['name'],
'product_id': product['id'],
'product_name': product['name'],
'quantity': quantity,
'unit_price': product['price'],
'total_price': quantity * product['price'],
'date': datetime.now().strftime("%Y-%m-%d"),
'created_at': datetime.now().isoformat()
})
return sales
class NavigationHistory:
"""ナビゲーション履歴管理"""
def __init__(self):
self.history: List[Dict] = []
self.max_history = 10
def add_to_history(self, page_name: str, page_data: Dict = None):
"""履歴に追加"""
history_item = {
'page': page_name,
'data': page_data,
'timestamp': datetime.now().isoformat()
}
# 重複を避ける
self.history = [item for item in self.history if item['page'] != page_name]
self.history.insert(0, history_item)
# 履歴数を制限
if len(self.history) > self.max_history:
self.history = self.history[:self.max_history]
def get_last_page(self) -> Optional[Dict]:
"""最後のページを取得"""
return self.history[0] if self.history else None
def get_history(self) -> List[Dict]:
"""履歴を取得"""
return self.history
class SafePageManager:
"""安全なページマネージャー"""
def __init__(self, page: ft.Page):
self.page = page
self.current_page = None
self.navigation_history = NavigationHistory()
ErrorHandler.current_page = page # グローバル参照用
def safe_navigate(self, page_name: str, page_builder):
"""安全なページ遷移"""
try:
# 現在のページ情報を保存
current_data = {}
if self.current_page:
current_data = self._get_page_data()
self.navigation_history.add_to_history(page_name, current_data)
# 新しいページを構築
page_instance = page_builder(self)
new_page = page_instance.build()
# 安全なページ切り替え
self._safe_page_transition(new_page, page_name)
except Exception as e:
ErrorHandler.handle_error(e, f"ページ遷移エラー ({page_name})")
def _get_page_data(self) -> Dict:
"""現在のページデータを取得"""
try:
if hasattr(self.current_page, 'get_data'):
return self.current_page.get_data()
return {}
except:
return {}
def _safe_page_transition(self, new_page, page_name: str):
"""安全なページ切り替え"""
try:
# 古いコンテンツをクリア
self.page.controls.clear()
# 新しいコンテンツを追加
self.page.add(new_page)
# 現在のページを更新
self.current_page = new_page
# ページを更新
self.page.update()
logging.info(f"ページ遷移成功: {page_name}")
except Exception as e:
ErrorHandler.handle_error(e, f"ページ表示エラー ({page_name})")
def go_back(self):
"""前のページに戻る"""
try:
history = self.navigation_history.get_history()
if len(history) > 1:
# 前のページに戻る
previous_page = history[1]
# ページビルダーを呼び出し
if previous_page['page'] == 'dashboard':
self.safe_navigate('dashboard', DashboardPage)
elif previous_page['page'] == 'sales':
self.safe_navigate('sales', SalesPage)
elif previous_page['page'] == 'customers':
self.safe_navigate('customers', CustomerPage)
elif previous_page['page'] == 'products':
self.safe_navigate('products', ProductPage)
# 履歴を更新
self.navigation_history.history.pop(0) # 現在の履歴を削除
except Exception as e:
ErrorHandler.handle_error(e, "戻る処理エラー")
class DashboardPage:
"""ダッシュボードページ"""
def __init__(self, page_manager):
self.page_manager = page_manager
def build(self):
"""ダッシュボードUI構築"""
try:
# 統計データ取得
stats = self._get_statistics()
return ft.Container(
content=ft.Column([
ft.Text("ダッシュボード", size=24, weight=ft.FontWeight.BOLD),
ft.Divider(),
ft.Row([
self._stat_card("総顧客数", stats['customers'], ft.Colors.BLUE),
self._stat_card("総商品数", stats['products'], ft.Colors.GREEN),
], spacing=10),
ft.Row([
self._stat_card("総売上件数", stats['sales'], ft.Colors.ORANGE),
self._stat_card("総売上高", f"¥{stats['total_sales']:,.0f}", ft.Colors.PURPLE),
], spacing=10),
]),
padding=20
)
except Exception as e:
ErrorHandler.handle_error(e, "ダッシュボード構築エラー")
return ft.Text("ダッシュボード読み込みエラー")
def _stat_card(self, title: str, value: str, color: ft.Colors):
"""統計カード作成"""
return ft.Card(
content=ft.Container(
content=ft.Column([
ft.Text(title, size=16, color=color),
ft.Text(value, size=20, weight=ft.FontWeight.BOLD),
]),
padding=15
),
width=200
)
def _get_statistics(self) -> Dict:
"""統計データ取得"""
try:
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
# 各テーブルの件数取得
cursor.execute("SELECT COUNT(*) FROM customers")
customers = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM products")
products = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*), COALESCE(SUM(total_price), 0) FROM sales")
sales_result = cursor.fetchone()
sales_count = sales_result[0]
total_sales = sales_result[1]
conn.close()
return {
'customers': customers,
'products': products,
'sales': sales_count,
'total_sales': total_sales
}
except Exception as e:
ErrorHandler.handle_error(e, "統計データ取得エラー")
return {'customers': 0, 'products': 0, 'sales': 0, 'total_sales': 0}
def get_data(self) -> Dict:
"""ページデータ取得"""
return {'page': 'dashboard'}
class SalesPage:
"""売上管理ページ"""
def __init__(self, page_manager):
self.page_manager = page_manager
def build(self):
"""売上管理UI構築"""
try:
return ft.Container(
content=ft.Column([
ft.Text("売上管理", size=24, weight=ft.FontWeight.BOLD),
ft.Divider(),
ft.Row([
ft.TextField(label="顧客名", width=200),
ft.TextField(label="商品名", width=200),
ft.TextField(label="金額", width=150),
ft.Button("追加", bgcolor=ft.Colors.BLUE, color=ft.Colors.WHITE),
]),
ft.Divider(),
ft.Text("売上一覧", size=18),
ft.Container(
content=ft.Column([], scroll=ft.ScrollMode.AUTO),
height=300
)
]),
padding=20
)
except Exception as e:
ErrorHandler.handle_error(e, "売上管理ページ構築エラー")
return ft.Text("売上管理ページ読み込みエラー")
def get_data(self) -> Dict:
"""ページデータ取得"""
return {'page': 'sales'}
class CustomerPage:
"""顧客管理ページ"""
def __init__(self, page_manager):
self.page_manager = page_manager
def build(self):
"""顧客管理UI構築"""
try:
return ft.Container(
content=ft.Column([
ft.Text("顧客管理", size=24, weight=ft.FontWeight.BOLD),
ft.Divider(),
ft.Row([
ft.Button("新規追加", bgcolor=ft.Colors.GREEN, color=ft.Colors.WHITE),
ft.Button("一括インポート", bgcolor=ft.Colors.BLUE, color=ft.Colors.WHITE),
]),
ft.Divider(),
ft.Text("顧客一覧", size=18),
ft.Container(
content=ft.Column([], scroll=ft.ScrollMode.AUTO),
height=300
)
]),
padding=20
)
except Exception as e:
ErrorHandler.handle_error(e, "顧客管理ページ構築エラー")
return ft.Text("顧客管理ページ読み込みエラー")
def get_data(self) -> Dict:
"""ページデータ取得"""
return {'page': 'customers'}
class ProductPage:
"""商品管理ページ"""
def __init__(self, page_manager):
self.page_manager = page_manager
def build(self):
"""商品管理UI構築"""
try:
return ft.Container(
content=ft.Column([
ft.Text("商品管理", size=24, weight=ft.FontWeight.BOLD),
ft.Divider(),
ft.Row([
ft.Button("新規追加", bgcolor=ft.Colors.GREEN, color=ft.Colors.WHITE),
ft.Button("一括インポート", bgcolor=ft.Colors.BLUE, color=ft.Colors.WHITE),
]),
ft.Divider(),
ft.Text("商品一覧", size=18),
ft.Container(
content=ft.Column([], scroll=ft.ScrollMode.AUTO),
height=300
)
]),
padding=20
)
except Exception as e:
ErrorHandler.handle_error(e, "商品管理ページ構築エラー")
return ft.Text("商品管理ページ読み込みエラー")
def get_data(self) -> Dict:
"""ページデータ取得"""
return {'page': 'products'}
class SalesAssistantApp:
"""メインアプリケーション"""
def __init__(self, page: ft.Page):
self.page = page
self.page_manager = SafePageManager(page)
# ログ設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler()
]
)
# シグナルハンドラ設定
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
# データベース初期化
self._init_database()
self._generate_dummy_data()
# ナビゲーションバー構築
self._build_navigation()
# 初期ページ表示
self.page_manager.safe_navigate('dashboard', DashboardPage)
def _signal_handler(self, signum, frame):
"""シグナルハンドラ"""
print(f"\nシグナル {signum} を受信しました")
self._cleanup_resources()
sys.exit(0)
def _cleanup_resources(self):
"""リソースクリーンアップ"""
try:
logging.info("アプリケーション終了処理開始")
print("✅ 正常終了処理完了")
logging.info("アプリケーション正常終了")
except Exception as e:
logging.error(f"クリーンアップエラー: {e}")
print(f"❌ クリーンアップエラー: {e}")
def _init_database(self):
"""データベース初期化"""
try:
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
# 各テーブル作成
cursor.execute('''
CREATE TABLE IF NOT EXISTS customers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
phone TEXT,
email TEXT,
address TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
category TEXT,
price REAL NOT NULL,
stock INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS sales (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id INTEGER,
customer_name TEXT NOT NULL,
product_id INTEGER,
product_name TEXT NOT NULL,
quantity INTEGER NOT NULL,
unit_price REAL NOT NULL,
total_price REAL NOT NULL,
date TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (customer_id) REFERENCES customers(id),
FOREIGN KEY (product_id) REFERENCES products(id)
)
''')
conn.commit()
conn.close()
logging.info("データベース初期化完了")
except Exception as e:
ErrorHandler.handle_error(e, "データベース初期化エラー")
def _generate_dummy_data(self):
"""ダミーデータ生成"""
try:
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
# 既存データチェック
cursor.execute("SELECT COUNT(*) FROM customers")
if cursor.fetchone()[0] == 0:
print("📊 ダミーデータを生成中...")
# 顧客データ
customers = DummyDataGenerator.generate_customers(50)
for customer in customers:
cursor.execute('''
INSERT INTO customers (name, phone, email, address)
VALUES (?, ?, ?, ?)
''', (customer['name'], customer['phone'], customer['email'], customer['address']))
# 商品データ
products = DummyDataGenerator.generate_products(30)
for product in products:
cursor.execute('''
INSERT INTO products (name, category, price, stock)
VALUES (?, ?, ?, ?)
''', (product['name'], product['category'], product['price'], product['stock']))
# 売上データ
sales = DummyDataGenerator.generate_sales(100)
for sale in sales:
cursor.execute('''
INSERT INTO sales (customer_id, customer_name, product_id, product_name, quantity, unit_price, total_price, date)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (sale['customer_id'], sale['customer_name'], sale['product_id'],
sale['product_name'], sale['quantity'], sale['unit_price'],
sale['total_price'], sale['date']))
conn.commit()
print("✅ ダミーデータ生成完了")
conn.close()
except Exception as e:
ErrorHandler.handle_error(e, "ダミーデータ生成エラー")
def _build_navigation(self):
"""ナビゲーションバー構築"""
try:
# ナビゲーションボタン
nav_buttons = [
ft.ElevatedButton(
"ダッシュボード",
on_click=lambda _: self.page_manager.safe_navigate('dashboard', DashboardPage),
bgcolor=ft.Colors.BLUE,
color=ft.Colors.WHITE
),
ft.ElevatedButton(
"売上管理",
on_click=lambda _: self.page_manager.safe_navigate('sales', SalesPage),
bgcolor=ft.Colors.GREEN,
color=ft.Colors.WHITE
),
ft.ElevatedButton(
"顧客管理",
on_click=lambda _: self.page_manager.safe_navigate('customers', CustomerPage),
bgcolor=ft.Colors.ORANGE,
color=ft.Colors.WHITE
),
ft.ElevatedButton(
"商品管理",
on_click=lambda _: self.page_manager.safe_navigate('products', ProductPage),
bgcolor=ft.Colors.PURPLE,
color=ft.Colors.WHITE
),
ft.ElevatedButton(
"戻る",
on_click=lambda _: self.page_manager.go_back(),
bgcolor=ft.Colors.RED,
color=ft.Colors.WHITE
)
]
# ナビゲーションバー
self.page.navigation_bar = ft.NavigationBar(
destinations=[
ft.NavigationBarDestination(
icon=ft.Icons.DASHBOARD,
label="ダッシュボード"
),
ft.NavigationBarDestination(
icon=ft.Icons.SHOPPING_CART,
label="売上"
),
ft.NavigationBarDestination(
icon=ft.Icons.PEOPLE,
label="顧客"
),
ft.NavigationBarDestination(
icon=ft.Icons.INVENTORY,
label="商品"
)
],
on_change=self._on_nav_change
)
# 代替ナビゲーションNavigationBarが動かない場合
self.page.add(
ft.Container(
content=ft.Row([
ft.Button(
"ダッシュボード",
on_click=lambda _: self.page_manager.safe_navigate('dashboard', DashboardPage),
bgcolor=ft.Colors.BLUE,
color=ft.Colors.WHITE
),
ft.Button(
"売上管理",
on_click=lambda _: self.page_manager.safe_navigate('sales', SalesPage),
bgcolor=ft.Colors.GREEN,
color=ft.Colors.WHITE
),
ft.Button(
"顧客管理",
on_click=lambda _: self.page_manager.safe_navigate('customers', CustomerPage),
bgcolor=ft.Colors.ORANGE,
color=ft.Colors.WHITE
),
ft.Button(
"商品管理",
on_click=lambda _: self.page_manager.safe_navigate('products', ProductPage),
bgcolor=ft.Colors.PURPLE,
color=ft.Colors.WHITE
),
ft.Button(
"戻る",
on_click=lambda _: self.page_manager.go_back(),
bgcolor=ft.Colors.RED,
color=ft.Colors.WHITE
)
], spacing=5),
padding=10,
bgcolor=ft.Colors.GREY_100
)
)
except Exception as e:
ErrorHandler.handle_error(e, "ナビゲーション構築エラー")
def _on_nav_change(self, e):
"""ナビゲーション変更イベント"""
try:
index = e.control.selected_index
pages = [DashboardPage, SalesPage, CustomerPage, ProductPage]
page_names = ['dashboard', 'sales', 'customers', 'products']
if 0 <= index < len(pages):
self.page_manager.safe_navigate(page_names[index], pages[index])
except Exception as ex:
ErrorHandler.handle_error(ex, "ナビゲーション変更エラー")
def main(page: ft.Page):
"""メイン関数"""
try:
# ウィンドウ設定
page.title = "販売アシスト1号"
page.window_width = 800
page.window_height = 600
page.theme_mode = ft.ThemeMode.LIGHT
# ウィンドウクローズイベント
page.on_window_close = lambda _: SalesAssistantApp(page)._cleanup_resources()
# アプリケーション起動
app = SalesAssistantApp(page)
logging.info("アプリケーション起動完了")
print("🚀 頑健な販売アシスト1号起動完了")
except Exception as e:
ErrorHandler.handle_error(e, "アプリケーション起動エラー")
if __name__ == "__main__":
ft.run(main)