import flet as ft import sqlite3 import signal import sys import logging import random from datetime import datetime from typing import List, Dict, Optional class ErrorHandler: """グローバルエラーハンドラ""" @staticmethod def handle_error(error: Exception, context: str = ""): """エラーを一元処理""" error_msg = f"{context}: {str(error)}" logging.error(error_msg) print(f"❌ {error_msg}") try: if hasattr(ErrorHandler, 'current_page'): ErrorHandler.show_snackbar(ErrorHandler.current_page, error_msg, ft.Colors.RED) except: pass @staticmethod def show_snackbar(page, message: str, color: ft.Colors = ft.Colors.RED): """SnackBarを表示""" try: page.snack_bar = ft.SnackBar( content=ft.Text(message), bgcolor=color ) page.snack_bar.open = True page.update() except: pass class DummyDataGenerator: """テスト用ダミーデータ生成""" @staticmethod def generate_customers(count: int = 100) -> List[Dict]: """ダミー顧客データ生成""" first_names = ["田中", "佐藤", "鈴木", "高橋", "伊藤", "渡辺", "山本", "中村", "小林", "加藤"] last_names = ["太郎", "次郎", "三郎", "花子", "美子", "健一", "恵子", "大輔", "由美", "翔太"] customers = [] for i in range(count): name = f"{random.choice(first_names)} {random.choice(last_names)}" customers.append({ 'id': i + 1, 'name': name, 'phone': f"090-{random.randint(1000, 9999)}-{random.randint(1000, 9999)}", 'email': f"customer{i+1}@example.com", 'address': f"東京都{random.choice(['渋谷区', '新宿区', '港区', '千代田区'])}{random.randint(1, 50)}-{random.randint(1, 10)}" }) return customers @staticmethod def generate_products(count: int = 50) -> List[Dict]: """ダミー商品データ生成""" categories = ["電子機器", "衣料品", "食品", "書籍", "家具"] products = [] for i in range(count): category = random.choice(categories) products.append({ 'id': i + 1, 'name': f"{category}{i+1}", 'category': category, 'price': random.randint(100, 50000), 'stock': random.randint(0, 100) }) return products @staticmethod def generate_sales(count: int = 200) -> List[Dict]: """ダミー売上データ生成""" customers = DummyDataGenerator.generate_customers(20) products = DummyDataGenerator.generate_products(30) sales = [] for i in range(count): customer = random.choice(customers) product = random.choice(products) quantity = random.randint(1, 10) sales.append({ 'id': i + 1, 'customer_id': customer['id'], 'customer_name': customer['name'], 'product_id': product['id'], 'product_name': product['name'], 'quantity': quantity, 'unit_price': product['price'], 'total_price': quantity * product['price'], 'date': datetime.now().strftime("%Y-%m-%d"), 'created_at': datetime.now().isoformat() }) return sales class DashboardView: """ダッシュボードビュー""" def __init__(self, page: ft.Page): self._page = None # UI構築 self.title = ft.Text("ダッシュボード", size=24, weight=ft.FontWeight.BOLD, color=ft.Colors.BLUE_900) # 統計カード self.stats_row1 = ft.Row([], spacing=10) self.stats_row2 = ft.Row([], spacing=10) # キーボードショートカット表示 self.shortcuts = ft.Container( content=ft.Column([ ft.Text("キーボードショートカット", size=16, weight=ft.FontWeight.BOLD), ft.Text("1: ダッシュボード", size=14), ft.Text("2: 売上管理", size=14), ft.Text("3: 顧客管理", size=14), ft.Text("4: 商品管理", size=14), ft.Text("ESC: ダッシュボードに戻る", size=14), ]), padding=10, bgcolor=ft.Colors.GREY_100, border_radius=10 ) # データ読み込み self.load_stats() @property def page(self): return self._page @page.setter def page(self, value): self._page = value def build(self): """UIを構築して返す""" return ft.Column([ self.title, ft.Divider(), self.stats_row1, self.stats_row2, ft.Divider(), self.shortcuts ], expand=True, spacing=15) def load_stats(self): """統計データ読み込み""" try: conn = sqlite3.connect('sales.db') cursor = conn.cursor() # 各テーブルの件数取得 cursor.execute("SELECT COUNT(*) FROM customers") customers = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM products") products = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*), COALESCE(SUM(total_price), 0) FROM sales") sales_result = cursor.fetchone() sales_count = sales_result[0] total_sales = sales_result[1] conn.close() # カードを更新 self.stats_row1.controls = [ self._stat_card("総顧客数", customers, ft.Colors.BLUE), self._stat_card("総商品数", products, ft.Colors.GREEN), ] self.stats_row2.controls = [ self._stat_card("総売上件数", sales_count, ft.Colors.ORANGE), self._stat_card("総売上高", f"¥{total_sales:,.0f}", ft.Colors.PURPLE), ] except Exception as e: ErrorHandler.handle_error(e, "統計データ取得エラー") def _stat_card(self, title: str, value: str, color: ft.Colors): """統計カード作成""" return ft.Card( content=ft.Container( content=ft.Column([ ft.Text(title, size=16, color=color), ft.Text(value, size=20, weight=ft.FontWeight.BOLD), ]), padding=15 ), width=200 ) class SalesView: """売上管理ビュー""" def __init__(self, page: ft.Page): self._page = None # UI部品 self.title = ft.Text("売上管理", size=24, weight=ft.FontWeight.BOLD, color=ft.Colors.BLUE_900) self.customer_tf = ft.TextField(label="顧客名", width=200, autofocus=True) self.product_tf = ft.TextField(label="商品名", width=200) self.amount_tf = ft.TextField(label="金額", width=150, keyboard_type=ft.KeyboardType.NUMBER) self.add_btn = ft.Button("追加", on_click=self.add_sale, bgcolor=ft.Colors.BLUE, color=ft.Colors.WHITE) # 売上一覧 self.sales_list = ft.Column([], scroll=ft.ScrollMode.AUTO, height=300) # データ読み込み self.load_sales() @property def page(self): return self._page @page.setter def page(self, value): self._page = value def build(self): """UIを構築して返す""" return ft.Column([ self.title, ft.Divider(), ft.Row([self.customer_tf, self.product_tf, self.amount_tf, self.add_btn]), ft.Divider(), ft.Text("売上一覧", size=18), ft.Text("キーボード操作: TABでフォーカス移動, SPACE/ENTERで追加", size=12, color=ft.Colors.GREY_600), self.sales_list ], expand=True, spacing=15) def load_sales(self): """売上データ読み込み""" try: conn = sqlite3.connect('sales.db') cursor = conn.cursor() cursor.execute(''' SELECT customer_name, product_name, total_price, date FROM sales ORDER BY created_at DESC LIMIT 20 ''') sales = cursor.fetchall() conn.close() # リストを更新 self.sales_list.controls.clear() for sale in sales: customer, product, amount, date = sale self.sales_list.controls.append( ft.Text(f"{date}: {customer} - {product}: ¥{amount:,.0f}") ) except Exception as e: ErrorHandler.handle_error(e, "売上データ読み込みエラー") def add_sale(self, e): """売上データ追加""" if self.customer_tf.value and self.product_tf.value and self.amount_tf.value: try: # 保存前に値を取得 customer_val = self.customer_tf.value product_val = self.product_tf.value amount_val = float(self.amount_tf.value) # データベースに保存 conn = sqlite3.connect('sales.db') cursor = conn.cursor() cursor.execute(''' INSERT INTO sales (customer_name, product_name, quantity, unit_price, total_price, date) VALUES (?, ?, ?, ?, ?, ?) ''', (customer_val, product_val, 1, amount_val, amount_val, datetime.now().strftime("%Y-%m-%d"))) conn.commit() conn.close() # フィールドをクリア self.customer_tf.value = "" self.product_tf.value = "" self.amount_tf.value = "" # リスト更新 self.load_sales() # 成功メッセージ ErrorHandler.show_snackbar(self.page, "保存しました", ft.Colors.GREEN) logging.info(f"売上データ追加: {customer_val} {product_val} {amount_val}") except Exception as ex: ErrorHandler.handle_error(ex, "売上データ保存エラー") class CustomerView: """顧客管理ビュー""" def __init__(self, page: ft.Page): self._page = None # UI部品 self.title = ft.Text("顧客管理", size=24, weight=ft.FontWeight.BOLD, color=ft.Colors.BLUE_900) # 登録フォーム self.name_tf = ft.TextField(label="顧客名", width=200, autofocus=True) self.phone_tf = ft.TextField(label="電話番号", width=200) self.email_tf = ft.TextField(label="メールアドレス", width=250) self.address_tf = ft.TextField(label="住所", width=300) self.add_btn = ft.Button("新規追加", on_click=self.add_customer, bgcolor=ft.Colors.GREEN, color=ft.Colors.WHITE) self.import_btn = ft.Button("一括インポート", bgcolor=ft.Colors.BLUE, color=ft.Colors.WHITE) # 顧客一覧 self.customer_list = ft.Column([], scroll=ft.ScrollMode.AUTO, height=250) # データ読み込み self.load_customers() @property def page(self): return self._page @page.setter def page(self, value): self._page = value def build(self): """UIを構築して返す""" return ft.Column([ self.title, ft.Divider(), ft.Text("新規登録", size=18, weight=ft.FontWeight.BOLD), ft.Row([self.name_tf, self.phone_tf], spacing=10), ft.Row([self.email_tf, self.address_tf], spacing=10), ft.Row([self.add_btn, self.import_btn], spacing=10), ft.Divider(), ft.Text("顧客一覧", size=18), ft.Text("キーボード操作: TABでフォーカス移動, SPACE/ENTERで追加", size=12, color=ft.Colors.GREY_600), self.customer_list ], expand=True, spacing=15) def add_customer(self, e): """顧客データ追加""" if self.name_tf.value: try: conn = sqlite3.connect('sales.db') cursor = conn.cursor() cursor.execute(''' INSERT INTO customers (name, phone, email, address) VALUES (?, ?, ?, ?) ''', (self.name_tf.value, self.phone_tf.value, self.email_tf.value, self.address_tf.value)) conn.commit() conn.close() # フィールドをクリア self.name_tf.value = "" self.phone_tf.value = "" self.email_tf.value = "" self.address_tf.value = "" # リスト更新 self.load_customers() # 成功メッセージ ErrorHandler.show_snackbar(self.page, "顧客を追加しました", ft.Colors.GREEN) logging.info(f"顧客データ追加: {self.name_tf.value}") except Exception as ex: ErrorHandler.handle_error(ex, "顧客データ保存エラー") def load_customers(self): """顧客データ読み込み""" try: conn = sqlite3.connect('sales.db') cursor = conn.cursor() cursor.execute(''' SELECT name, phone, email FROM customers ORDER BY name LIMIT 20 ''') customers = cursor.fetchall() conn.close() # リストを更新 self.customer_list.controls.clear() for customer in customers: name, phone, email = customer self.customer_list.controls.append( ft.Container( content=ft.Column([ ft.Text(f"氏名: {name}", weight=ft.FontWeight.BOLD), ft.Text(f"電話: {phone}", size=12), ft.Text(f"メール: {email}", size=12), ]), padding=10, bgcolor=ft.Colors.GREY_50, border_radius=5, margin=ft.margin.only(bottom=5) ) ) except Exception as e: ErrorHandler.handle_error(e, "顧客データ読み込みエラー") class ProductView: """商品管理ビュー""" def __init__(self, page: ft.Page): self._page = None # UI部品 self.title = ft.Text("商品管理", size=24, weight=ft.FontWeight.BOLD, color=ft.Colors.BLUE_900) # 登録フォーム self.name_tf = ft.TextField(label="商品名", width=200, autofocus=True) self.category_tf = ft.TextField(label="カテゴリ", width=150) self.price_tf = ft.TextField(label="価格", width=100, keyboard_type=ft.KeyboardType.NUMBER) self.stock_tf = ft.TextField(label="在庫数", width=100, keyboard_type=ft.KeyboardType.NUMBER) self.add_btn = ft.Button("新規追加", on_click=self.add_product, bgcolor=ft.Colors.GREEN, color=ft.Colors.WHITE) self.import_btn = ft.Button("一括インポート", bgcolor=ft.Colors.BLUE, color=ft.Colors.WHITE) # 商品一覧 self.product_list = ft.Column([], scroll=ft.ScrollMode.AUTO, height=250) # データ読み込み self.load_products() @property def page(self): return self._page @page.setter def page(self, value): self._page = value def build(self): """UIを構築して返す""" return ft.Column([ self.title, ft.Divider(), ft.Text("新規登録", size=18, weight=ft.FontWeight.BOLD), ft.Row([self.name_tf, self.category_tf, self.price_tf, self.stock_tf], spacing=10), ft.Row([self.add_btn, self.import_btn], spacing=10), ft.Divider(), ft.Text("商品一覧", size=18), ft.Text("キーボード操作: TABでフォーカス移動, SPACE/ENTERで追加", size=12, color=ft.Colors.GREY_600), self.product_list ], expand=True, spacing=15) def add_product(self, e): """商品データ追加""" if self.name_tf.value and self.price_tf.value: try: conn = sqlite3.connect('sales.db') cursor = conn.cursor() cursor.execute(''' INSERT INTO products (name, category, price, stock) VALUES (?, ?, ?, ?) ''', (self.name_tf.value, self.category_tf.value, float(self.price_tf.value), int(self.stock_tf.value) if self.stock_tf.value else 0)) conn.commit() conn.close() # フィールドをクリア self.name_tf.value = "" self.category_tf.value = "" self.price_tf.value = "" self.stock_tf.value = "" # リスト更新 self.load_products() # 成功メッセージ ErrorHandler.show_snackbar(self.page, "商品を追加しました", ft.Colors.GREEN) logging.info(f"商品データ追加: {self.name_tf.value}") except Exception as ex: ErrorHandler.handle_error(ex, "商品データ保存エラー") def load_products(self): """商品データ読み込み""" try: conn = sqlite3.connect('sales.db') cursor = conn.cursor() cursor.execute(''' SELECT name, category, price, stock FROM products ORDER BY name LIMIT 20 ''') products = cursor.fetchall() conn.close() # リストを更新 self.product_list.controls.clear() for product in products: name, category, price, stock = product self.product_list.controls.append( ft.Container( content=ft.Column([ ft.Text(f"商品名: {name}", weight=ft.FontWeight.BOLD), ft.Text(f"カテゴリ: {category}", size=12), ft.Text(f"価格: ¥{price:,}", size=12), ft.Text(f"在庫: {stock}個", size=12), ]), padding=10, bgcolor=ft.Colors.GREY_50, border_radius=5, margin=ft.margin.only(bottom=5) ) ) except Exception as e: ErrorHandler.handle_error(e, "商品データ読み込みエラー") class SalesAssistantApp: """メインアプリケーション""" def __init__(self, page: ft.Page): self.page = page ErrorHandler.current_page = page # ログ設定 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('app.log'), logging.StreamHandler() ] ) # シグナルハンドラ設定 signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) # データベース初期化 self._init_database() self._generate_dummy_data() # ビュー作成 self.dashboard_view = DashboardView(page) self.sales_view = SalesView(page) self.customer_view = CustomerView(page) self.product_view = ProductView(page) # ナビゲーション設定 self._build_navigation() # キーボードショートカット設定 self._setup_keyboard_shortcuts() # 初期表示 self.dashboard_container.visible = True self.sales_container.visible = False self.customer_container.visible = False self.product_container.visible = False logging.info("アプリケーション起動完了") print("🚀 Compiz対応販売アシスト1号起動完了") def _signal_handler(self, signum, frame): """シグナルハンドラ""" print(f"\nシグナル {signum} を受信しました") self._cleanup_resources() sys.exit(0) def _cleanup_resources(self): """リソースクリーンアップ""" try: logging.info("アプリケーション終了処理開始") print("✅ 正常終了処理完了") logging.info("アプリケーション正常終了") except Exception as e: logging.error(f"クリーンアップエラー: {e}") print(f"❌ クリーンアップエラー: {e}") def _init_database(self): """データベース初期化""" try: conn = sqlite3.connect('sales.db') cursor = conn.cursor() # 各テーブル作成 cursor.execute(''' CREATE TABLE IF NOT EXISTS customers ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, phone TEXT, email TEXT, address TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, category TEXT, price REAL NOT NULL, stock INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS sales ( id INTEGER PRIMARY KEY AUTOINCREMENT, customer_name TEXT NOT NULL, product_name TEXT NOT NULL, quantity INTEGER NOT NULL, unit_price REAL NOT NULL, total_price REAL NOT NULL, date TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() conn.close() logging.info("データベース初期化完了") except Exception as e: ErrorHandler.handle_error(e, "データベース初期化エラー") def _generate_dummy_data(self): """ダミーデータ生成""" try: conn = sqlite3.connect('sales.db') cursor = conn.cursor() # 既存データチェック cursor.execute("SELECT COUNT(*) FROM customers") if cursor.fetchone()[0] == 0: print("📊 ダミーデータを生成中...") # 顧客データ customers = DummyDataGenerator.generate_customers(50) for customer in customers: cursor.execute(''' INSERT INTO customers (name, phone, email, address) VALUES (?, ?, ?, ?) ''', (customer['name'], customer['phone'], customer['email'], customer['address'])) # 商品データ products = DummyDataGenerator.generate_products(30) for product in products: cursor.execute(''' INSERT INTO products (name, category, price, stock) VALUES (?, ?, ?, ?) ''', (product['name'], product['category'], product['price'], product['stock'])) # 売上データ sales = DummyDataGenerator.generate_sales(100) for sale in sales: cursor.execute(''' INSERT INTO sales (customer_name, product_name, quantity, unit_price, total_price, date) VALUES (?, ?, ?, ?, ?, ?) ''', (sale['customer_name'], sale['product_name'], sale['quantity'], sale['unit_price'], sale['total_price'], sale['date'])) conn.commit() print("✅ ダミーデータ生成完了") conn.close() except Exception as e: ErrorHandler.handle_error(e, "ダミーデータ生成エラー") def _build_navigation(self): """ナビゲーション構築""" try: # NavigationBar(Compiz対応) self.page.navigation_bar = ft.NavigationBar( selected_index=0, # 最初はダッシュボード destinations=[ ft.NavigationBarDestination( icon=ft.Icons.DASHBOARD, label="ダッシュボード" ), ft.NavigationBarDestination( icon=ft.Icons.SHOPPING_CART, label="売上" ), ft.NavigationBarDestination( icon=ft.Icons.PEOPLE, label="顧客" ), ft.NavigationBarDestination( icon=ft.Icons.INVENTORY, label="商品" ) ], on_change=self._on_nav_change ) # コンテナ作成 self.dashboard_container = ft.Container( content=self.dashboard_view.build(), visible=True ) self.sales_container = ft.Container( content=self.sales_view.build(), visible=False ) self.customer_container = ft.Container( content=self.customer_view.build(), visible=False ) self.product_container = ft.Container( content=self.product_view.build(), visible=False ) # 全てのコンテナをページに追加 self.page.add( ft.Container( content=ft.Column([ self.dashboard_container, self.sales_container, self.customer_container, self.product_container ], expand=True), padding=10, expand=True ) ) except Exception as e: ErrorHandler.handle_error(e, "ナビゲーション構築エラー") def _on_nav_change(self, e): """ナビゲーション変更イベント""" try: index = e.control.selected_index # 全てのコンテナを非表示 self.dashboard_container.visible = False self.sales_container.visible = False self.customer_container.visible = False self.product_container.visible = False # 選択されたコンテナを表示 if index == 0: self.dashboard_container.visible = True self.dashboard_view.load_stats() # データ更新 elif index == 1: self.sales_container.visible = True self.sales_view.load_sales() # データ更新 elif index == 2: self.customer_container.visible = True self.customer_view.load_customers() # データ更新 elif index == 3: self.product_container.visible = True self.product_view.load_products() # データ更新 self.page.update() logging.info(f"ページ遷移: index={index}") except Exception as ex: ErrorHandler.handle_error(ex, "ナビゲーション変更エラー") def _setup_keyboard_shortcuts(self): """キーボードショートカット設定""" try: def on_keyboard(e: ft.KeyboardEvent): # SPACEまたはENTERで追加 if e.key == " " or e.key == "Enter": if self.sales_container.visible: self.sales_view.add_sale(None) elif self.customer_container.visible: self.customer_view.add_customer(None) elif self.product_container.visible: self.product_view.add_product(None) # 数字キーでページ遷移 elif e.key == "1": self.page.navigation_bar.selected_index = 0 self._on_nav_change(ft.ControlEvent(control=self.page.navigation_bar)) elif e.key == "2": self.page.navigation_bar.selected_index = 1 self._on_nav_change(ft.ControlEvent(control=self.page.navigation_bar)) elif e.key == "3": self.page.navigation_bar.selected_index = 2 self._on_nav_change(ft.ControlEvent(control=self.page.navigation_bar)) elif e.key == "4": self.page.navigation_bar.selected_index = 3 self._on_nav_change(ft.ControlEvent(control=self.page.navigation_bar)) # ESCでダッシュボードに戻る elif e.key == "Escape": self.page.navigation_bar.selected_index = 0 self._on_nav_change(ft.ControlEvent(control=self.page.navigation_bar)) self.page.on_keyboard_event = on_keyboard logging.info("キーボードショートカット設定完了") except Exception as e: ErrorHandler.handle_error(e, "キーボードショートカット設定エラー") def main(page: ft.Page): """メイン関数""" try: # ウィンドウ設定 page.title = "販売アシスト1号 (Compiz対応)" page.window_width = 800 page.window_height = 600 page.theme_mode = ft.ThemeMode.LIGHT # ウィンドウクローズイベント page.on_window_close = lambda _: SalesAssistantApp(page)._cleanup_resources() # アプリケーション起動 app = SalesAssistantApp(page) except Exception as e: ErrorHandler.handle_error(e, "アプリケーション起動エラー") if __name__ == "__main__": ft.run(main)