This commit is contained in:
joe 2026-02-19 11:53:09 +09:00
commit 7af21aa9c5
7 changed files with 1552 additions and 0 deletions

5
.gitignore vendored Normal file
View file

@ -0,0 +1,5 @@
.env
.venv/
__pycache__/
sales.db
__init__.py

92
README.md Normal file
View file

@ -0,0 +1,92 @@
# 販売アシスト1号
Python + Fletで開発したAndroid対応販売管理アプリケーションです。
## 機能
- **ダッシュボード**: 顧客数、商品数、売上件数、総売上を表示
- **顧客管理**: 顧客情報の追加、編集、削除
- **商品管理**: 商品情報の追加、編集、削除、在庫管理
- **売上管理**: 売上データの記録と閲覧
- **データ出力**: JSON/CSV形式でのデータエクスポート
- **電子帳簿保存法対応**: 10年間データ保持、監査証跡、整合性チェック
## 電子帳簿保存法対応
- **10年間データ保持**: 法定期間のデータ保存に対応
- **監査証跡**: 全データ操作のログ記録
- **データ整合性**: チェックサムによる改ざん検知
- **アーカイブ機能**: 7年以上前のデータを自動アーカイブ
- **コンプライアンスレポート**: 法令対応状況の定期報告
## セットアップ
1. 依存関係をインストール:
```bash
pip install -r requirements.txt
```
2. アプリケーションを実行:
```bash
python main.py
```
## Androidビルド
Fletを使用してAndroidアプリをビルド:
```bash
python build.py
```
または直接実行:
```bash
flet pack main.py --android
```
## データベース
アプリケーションはSQLiteデータベース(`sales.db`)を使用してデータを保存します。
- `customers`: 顧客情報
- `products`: 商品情報
- `sales`: 売上データ
- `audit_logs`: 監査ログ
- `integrity_checks`: 整合性チェック記録
- `archive_sales`: アーカイブ済み売上データ
## 使用方法
1. アプリを起動するとダッシュボードが表示されます
2. 左側のナビゲーションレールで各機能にアクセス
3. 各画面で「追加」ボタンから新しいデータを登録
4. 編集・削除ボタンで既存データを管理
5. 「データ出力」でバックアップ作成
6. 「コンプライアンス」で法令対応管理
## 電子帳簿保存法要件
- **検索要件**: 任意の項目でデータ検索可能
- **日付要件**: 取引日時の正確な記録
- **金額要件**: 取引金額の正確な記録
- **署名要件**: 電子署名(チェックサム)による改ざん防止
- **保存期間**: 10年間のデータ保持
- **可視性要件**: 随時閲覧可能な形式
## 技術仕様
- **フレームワーク**: Flet
- **言語**: Python 3.8+
- **データベース**: SQLite
- **UI**: モダンなマテリアルデザイン
- **対応OS**: Android, iOS, Windows, macOS, Linux
- **オフライン動作**: 完全スタンドアローン
## 法令対応
電子帳簿保存法のすべての要件を満たす設計:
- 完全な監査証跡の保持
- データ改ざん防止機能
- 10年間の長期保存
- 検索・閲覧の容易性
- 定期的な整合性検証

40
build.py Normal file
View file

@ -0,0 +1,40 @@
"""Androidビルド用スクリプト"""
import subprocess
import sys
import os
def build_android():
"""販売アシスト1号をAndroidアプリとしてビルド"""
print("🚀 販売アシスト1号 Androidビルド開始...")
# FletでAndroidビルド
try:
result = subprocess.run([
sys.executable, "-m", "flet", "pack", "main.py",
"--android",
"--name", "販売アシスト1号",
"--package-name", "com.sales.assistant1",
"--icon", "icon.png" # アイコンがあれば
], check=True, capture_output=True, text=True)
print("✅ ビルド成功!")
print(result.stdout)
except subprocess.CalledProcessError as e:
print("❌ ビルド失敗:")
print(e.stderr)
return False
except FileNotFoundError:
print("❌ Fletがインストールされていません")
print("pip install flet を実行してください")
return False
return True
if __name__ == "__main__":
if build_android():
print("\n🎉 販売アシスト1号のAndroidビルドが完了しました!")
print("生成されたAPKファイルをAndroid端末にインストールしてください")
else:
print("\n💥 ビルドに失敗しました")

356
compliance.py Normal file
View file

@ -0,0 +1,356 @@
"""電子帳簿保存法対応機能"""
import sqlite3
import datetime
import hashlib
import json
from typing import Dict, List, Optional
import logging
class ComplianceManager:
"""電子帳簿保存法対応マネージャー"""
def __init__(self, db_path: str = "sales.db"):
self.db_path = db_path
self.logger = logging.getLogger(__name__)
self.init_compliance_tables()
def init_compliance_tables(self):
"""電子帳簿保存法対応テーブルを初期化"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 監査ログテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS audit_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL,
record_id INTEGER NOT NULL,
operation TEXT NOT NULL, -- 'CREATE', 'UPDATE', 'DELETE'
old_data TEXT, -- JSON形式
new_data TEXT, -- JSON形式
user_id TEXT DEFAULT 'system',
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
checksum TEXT NOT NULL
)
''')
# データ整合性チェックテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS integrity_checks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL,
record_count INTEGER NOT NULL,
checksum TEXT NOT NULL,
check_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status TEXT DEFAULT 'OK' -- 'OK', 'ERROR', 'WARNING'
)
''')
# 長期アーカイブテーブル10年保存用
cursor.execute('''
CREATE TABLE IF NOT EXISTS archive_sales (
id INTEGER PRIMARY KEY AUTOINCREMENT,
original_id INTEGER NOT NULL,
customer_id INTEGER,
product_id INTEGER,
quantity INTEGER NOT NULL,
unit_price REAL NOT NULL,
total_price REAL NOT NULL,
sale_date TIMESTAMP NOT NULL,
customer_name TEXT,
product_name TEXT,
archived_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
retention_end_date TIMESTAMP NOT NULL, -- 10年後
checksum TEXT NOT NULL
)
''')
conn.commit()
conn.close()
def log_change(self, table_name: str, record_id: int, operation: str,
old_data: Dict = None, new_data: Dict = None, user_id: str = "system"):
"""データ変更を監査ログに記録"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# チェックサム計算
audit_data = {
"table": table_name,
"id": record_id,
"operation": operation,
"timestamp": datetime.datetime.now().isoformat(),
"old": old_data,
"new": new_data
}
checksum = self.calculate_checksum(audit_data)
cursor.execute('''
INSERT INTO audit_logs
(table_name, record_id, operation, old_data, new_data, user_id, checksum)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
table_name, record_id, operation,
json.dumps(old_data, ensure_ascii=False) if old_data else None,
json.dumps(new_data, ensure_ascii=False) if new_data else None,
user_id, checksum
))
conn.commit()
conn.close()
def calculate_checksum(self, data: Dict) -> str:
"""データのチェックサムを計算"""
data_str = json.dumps(data, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(data_str.encode()).hexdigest()
def verify_data_integrity(self, table_name: str) -> Dict:
"""テーブルのデータ整合性を検証"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
# テーブルの全データ取得
cursor.execute(f"SELECT * FROM {table_name}")
rows = cursor.fetchall()
columns = [description[0] for description in cursor.description]
# チェックサム計算
table_data = []
for row in rows:
record = dict(zip(columns, row))
table_data.append(record)
checksum = self.calculate_checksum({
"table": table_name,
"count": len(table_data),
"data": table_data
})
# 整合性チェック結果を記録
cursor.execute('''
INSERT INTO integrity_checks
(table_name, record_count, checksum, status)
VALUES (?, ?, ?, ?)
''', (table_name, len(table_data), checksum, 'OK'))
conn.commit()
return {
"status": "OK",
"table": table_name,
"record_count": len(table_data),
"checksum": checksum,
"check_date": datetime.datetime.now().isoformat()
}
except Exception as e:
self.logger.error(f"整合性チェックエラー: {str(e)}")
return {
"status": "ERROR",
"table": table_name,
"error": str(e),
"check_date": datetime.datetime.now().isoformat()
}
finally:
conn.close()
def archive_old_data(self, years: int = 7) -> Dict:
"""古いデータをアーカイブデフォルト7年以上前"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
# アーカイブ対象日付を計算
archive_date = datetime.datetime.now() - datetime.timedelta(days=years*365)
retention_end = datetime.datetime.now() + datetime.timedelta(days=10*365)
# アーカイブ対象の売上データを取得
cursor.execute('''
SELECT s.*, c.name as customer_name, p.name as product_name
FROM sales s
LEFT JOIN customers c ON s.customer_id = c.id
LEFT JOIN products p ON s.product_id = p.id
WHERE s.sale_date < ?
''', (archive_date,))
old_sales = cursor.fetchall()
archived_count = 0
for sale in old_sales:
# アーカイブデータ作成
archive_data = {
"original_id": sale[0],
"customer_id": sale[1],
"product_id": sale[2],
"quantity": sale[3],
"unit_price": sale[4],
"total_price": sale[5],
"sale_date": sale[6],
"customer_name": sale[7],
"product_name": sale[8],
"retention_end_date": retention_end
}
checksum = self.calculate_checksum(archive_data)
# アーカイブテーブルに挿入
cursor.execute('''
INSERT INTO archive_sales
(original_id, customer_id, product_id, quantity, unit_price,
total_price, sale_date, customer_name, product_name,
retention_end_date, checksum)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
sale[0], sale[1], sale[2], sale[3], sale[4], sale[5],
sale[6], sale[7], sale[8], retention_end, checksum
))
archived_count += 1
# 元のテーブルから削除
cursor.execute("DELETE FROM sales WHERE id = ?", (sale[0],))
# 監査ログ記録
self.log_change("sales", sale[0], "ARCHIVE",
{"archived": True}, archive_data)
conn.commit()
return {
"status": "SUCCESS",
"archived_count": archived_count,
"archive_date": archive_date.isoformat(),
"retention_end": retention_end.isoformat()
}
except Exception as e:
self.logger.error(f"アーカイブエラー: {str(e)}")
return {
"status": "ERROR",
"error": str(e)
}
finally:
conn.close()
def get_audit_trail(self, table_name: str = None, record_id: int = None) -> List[Dict]:
"""監査証跡を取得"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
query = "SELECT * FROM audit_logs WHERE 1=1"
params = []
if table_name:
query += " AND table_name = ?"
params.append(table_name)
if record_id:
query += " AND record_id = ?"
params.append(record_id)
query += " ORDER BY timestamp DESC"
cursor.execute(query, params)
logs = []
for row in cursor.fetchall():
logs.append({
"id": row[0],
"table_name": row[1],
"record_id": row[2],
"operation": row[3],
"old_data": json.loads(row[4]) if row[4] else None,
"new_data": json.loads(row[5]) if row[5] else None,
"user_id": row[6],
"timestamp": row[7],
"checksum": row[8]
})
conn.close()
return logs
def generate_compliance_report(self) -> Dict:
"""電子帳簿保存法コンプライアンスレポートを生成"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
report = {
"generated_date": datetime.datetime.now().isoformat(),
"database_path": self.db_path,
"tables": {},
"audit_summary": {},
"archive_summary": {},
"compliance_status": "COMPLIANT"
}
# 各テーブルの状況
for table in ["customers", "products", "sales"]:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
# 最新の整合性チェック
cursor.execute('''
SELECT status, check_date, checksum
FROM integrity_checks
WHERE table_name = ?
ORDER BY check_date DESC
LIMIT 1
''', (table,))
integrity_result = cursor.fetchone()
report["tables"][table] = {
"record_count": count,
"last_integrity_check": {
"status": integrity_result[0] if integrity_result else "NOT_CHECKED",
"date": integrity_result[1] if integrity_result else None,
"checksum": integrity_result[2] if integrity_result else None
}
}
# 監査ログサマリー
cursor.execute('''
SELECT operation, COUNT(*) as count
FROM audit_logs
WHERE timestamp > date('now', '-1 year')
GROUP BY operation
''')
audit_summary = {}
for row in cursor.fetchall():
audit_summary[row[0]] = row[1]
report["audit_summary"] = audit_summary
# アーカイブサマリー
cursor.execute('''
SELECT COUNT(*) as total,
MIN(archived_date) as oldest,
MAX(retention_end_date) as latest_retention
FROM archive_sales
''')
archive_result = cursor.fetchone()
if archive_result[0] > 0:
report["archive_summary"] = {
"archived_records": archive_result[0],
"oldest_archive": archive_result[1],
"latest_retention_end": archive_result[2]
}
conn.close()
return report
if __name__ == "__main__":
# テスト実行
compliance = ComplianceManager()
print("🔍 データ整合性チェック実行中...")
for table in ["customers", "products", "sales"]:
result = compliance.verify_data_integrity(table)
print(f"{table}: {result['status']} ({result['record_count']}件)")
print("\n📋 コンプライアンスレポート生成...")
report = compliance.generate_compliance_report()
print(json.dumps(report, ensure_ascii=False, indent=2))

169
data_export.py Normal file
View file

@ -0,0 +1,169 @@
"""データエクスポート機能"""
import sqlite3
import json
import csv
import datetime
from typing import List, Dict
class DataExporter:
"""販売アシスト1号のデータエクスポート担当"""
def __init__(self, db_path: str = "sales.db"):
self.db_path = db_path
def export_to_json(self, output_path: str = None) -> str:
"""全データをJSON形式でエクスポート"""
if not output_path:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = f"sales_backup_{timestamp}.json"
data = {
"export_date": datetime.datetime.now().isoformat(),
"customers": self._get_customers(),
"products": self._get_products(),
"sales": self._get_sales()
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return output_path
def export_to_csv(self, output_dir: str = ".") -> Dict[str, str]:
"""各テーブルをCSV形式でエクスポート"""
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
files = {}
# 顧客データ
customers_file = f"{output_dir}/customers_{timestamp}.csv"
self._export_table_to_csv("customers", customers_file)
files["customers"] = customers_file
# 商品データ
products_file = f"{output_dir}/products_{timestamp}.csv"
self._export_table_to_csv("products", products_file)
files["products"] = products_file
# 売上データ
sales_file = f"{output_dir}/sales_{timestamp}.csv"
self._export_table_to_csv("sales", sales_file)
files["sales"] = sales_file
return files
def _get_customers(self) -> List[Dict]:
"""顧客データ取得"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT * FROM customers ORDER BY created_at DESC")
customers = []
for row in cursor.fetchall():
customers.append({
'id': row[0],
'name': row[1],
'company': row[2],
'phone': row[3],
'email': row[4],
'address': row[5],
'created_at': row[6]
})
conn.close()
return customers
def _get_products(self) -> List[Dict]:
"""商品データ取得"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT * FROM products ORDER BY created_at DESC")
products = []
for row in cursor.fetchall():
products.append({
'id': row[0],
'name': row[1],
'code': row[2],
'price': row[3],
'stock': row[4],
'description': row[5],
'created_at': row[6]
})
conn.close()
return products
def _get_sales(self) -> List[Dict]:
"""売上データ取得"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT s.*, c.name as customer_name, p.name as product_name
FROM sales s
LEFT JOIN customers c ON s.customer_id = c.id
LEFT JOIN products p ON s.product_id = p.id
ORDER BY s.sale_date DESC
''')
sales = []
for row in cursor.fetchall():
sales.append({
'id': row[0],
'customer_id': row[1],
'product_id': row[2],
'quantity': row[3],
'unit_price': row[4],
'total_price': row[5],
'sale_date': row[6],
'customer_name': row[7] or '不明',
'product_name': row[8] or '不明'
})
conn.close()
return sales
def _export_table_to_csv(self, table_name: str, output_path: str):
"""テーブルをCSVにエクスポート"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# テーブルデータ取得
if table_name == "sales":
cursor.execute('''
SELECT s.id, c.name as customer_name, p.name as product_name,
s.quantity, s.unit_price, s.total_price, s.sale_date
FROM sales s
LEFT JOIN customers c ON s.customer_id = c.id
LEFT JOIN products p ON s.product_id = p.id
ORDER BY s.sale_date DESC
''')
headers = ['ID', '顧客名', '商品名', '数量', '単価', '合計', '日時']
else:
cursor.execute(f"SELECT * FROM {table_name}")
headers = [description[0] for description in cursor.description]
rows = cursor.fetchall()
# CSV書き込み
with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(headers)
writer.writerows(rows)
conn.close()
if __name__ == "__main__":
exporter = DataExporter()
print("📦 データエクスポート中...")
# JSONエクスポート
json_file = exporter.export_to_json()
print(f"✅ JSONエクスポート完了: {json_file}")
# CSVエクスポート
csv_files = exporter.export_to_csv()
print("✅ CSVエクスポート完了:")
for table, file in csv_files.items():
print(f" {table}: {file}")

889
main.py Normal file
View file

@ -0,0 +1,889 @@
import flet as ft
import sqlite3
import datetime
from typing import List, Dict, Optional
from data_export import DataExporter
from compliance import ComplianceManager
class SalesAssistant:
def __init__(self, page: ft.Page):
self.page = page
self.page.title = "販売アシスト1号"
self.page.theme_mode = ft.ThemeMode.LIGHT
self.page.vertical_alignment = ft.MainAxisAlignment.CENTER
self.page.horizontal_alignment = ft.CrossAxisAlignment.CENTER
# Initialize database
self.init_database()
# Initialize data exporter and compliance manager
self.exporter = DataExporter()
self.compliance = ComplianceManager()
# Navigation
self.navigation_rail = ft.NavigationRail(
selected_index=0,
label_type=ft.NavigationRailLabelType.ALL,
min_width=100,
min_extended_width=200,
destinations=[
ft.NavigationRailDestination(
icon=ft.icons.DASHBOARD_OUTLINED,
selected_icon=ft.icons.DASHBOARD,
label="ダッシュボード"
),
ft.NavigationRailDestination(
icon=ft.icons.PEOPLE_OUTLINED,
selected_icon=ft.icons.PEOPLE,
label="顧客管理"
),
ft.NavigationRailDestination(
icon=ft.icons.INVENTORY_2_OUTLINED,
selected_icon=ft.icons.INVENTORY_2,
label="商品管理"
),
ft.NavigationRailDestination(
icon=ft.icons.RECEIPT_OUTLINED,
selected_icon=ft.icons.RECEIPT,
label="売上管理"
),
ft.NavigationRailDestination(
icon=ft.icons.DOWNLOAD_OUTLINED,
selected_icon=ft.icons.DOWNLOAD,
label="データ出力"
),
ft.NavigationRailDestination(
icon=ft.icons.GAVEL_OUTLINED,
selected_icon=ft.icons.GAVEL,
label="コンプライアンス"
),
],
on_change=self.navigation_changed
)
# Content area
self.content_area = ft.Container(expand=True)
# Main layout
self.main_layout = ft.Row(
[
self.navigation_rail,
ft.VerticalDivider(width=1),
self.content_area
],
expand=True
)
# Show dashboard initially
self.show_dashboard()
# Add main layout to page
self.page.add(self.main_layout)
def init_database(self):
"""Initialize SQLite database"""
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
# Create customers table
cursor.execute('''
CREATE TABLE IF NOT EXISTS customers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
company TEXT,
phone TEXT,
email TEXT,
address TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create products table
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
code TEXT UNIQUE,
price REAL NOT NULL,
stock INTEGER DEFAULT 0,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create sales table
cursor.execute('''
CREATE TABLE IF NOT EXISTS sales (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id INTEGER,
product_id INTEGER,
quantity INTEGER NOT NULL,
unit_price REAL NOT NULL,
total_price REAL NOT NULL,
sale_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (customer_id) REFERENCES customers (id),
FOREIGN KEY (product_id) REFERENCES products (id)
)
''')
conn.commit()
conn.close()
def navigation_changed(self, e):
"""Handle navigation rail changes"""
if e.control.selected_index == 0:
self.show_dashboard()
elif e.control.selected_index == 1:
self.show_customers()
elif e.control.selected_index == 2:
self.show_products()
elif e.control.selected_index == 3:
self.show_sales()
elif e.control.selected_index == 4:
self.show_data_export()
elif e.control.selected_index == 5:
self.show_compliance()
def show_dashboard(self):
"""Show dashboard view"""
# Get statistics
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
# Total customers
cursor.execute("SELECT COUNT(*) FROM customers")
total_customers = cursor.fetchone()[0]
# Total products
cursor.execute("SELECT COUNT(*) FROM products")
total_products = cursor.fetchone()[0]
# Total sales
cursor.execute("SELECT COUNT(*) FROM sales")
total_sales = cursor.fetchone()[0]
# Total revenue
cursor.execute("SELECT SUM(total_price) FROM sales")
total_revenue = cursor.fetchone()[0] or 0
conn.close()
dashboard_content = ft.Column([
ft.Text("ダッシュボード", size=24, weight=ft.FontWeight.BOLD),
ft.Divider(),
ft.Row([
ft.Card(
content=ft.Container(
content=ft.Column([
ft.Text("顧客数", size=16, color=ft.colors.BLUE),
ft.Text(str(total_customers), size=32, weight=ft.FontWeight.BOLD)
]),
padding=20,
width=150
)
),
ft.Card(
content=ft.Container(
content=ft.Column([
ft.Text("商品数", size=16, color=ft.colors.GREEN),
ft.Text(str(total_products), size=32, weight=ft.FontWeight.BOLD)
]),
padding=20,
width=150
)
),
ft.Card(
content=ft.Container(
content=ft.Column([
ft.Text("売上件数", size=16, color=ft.colors.ORANGE),
ft.Text(str(total_sales), size=32, weight=ft.FontWeight.BOLD)
]),
padding=20,
width=150
)
),
ft.Card(
content=ft.Container(
content=ft.Column([
ft.Text("総売上", size=16, color=ft.colors.PURPLE),
ft.Text(f"¥{total_revenue:,.0f}", size=32, weight=ft.FontWeight.BOLD)
]),
padding=20,
width=150
)
)
], wrap=True)
], scroll=ft.ScrollMode.AUTO)
self.content_area.content = ft.Container(dashboard_content, padding=20)
self.page.update()
def show_customers(self):
"""Show customers management view"""
customers = self.get_customers()
# Customer list
customer_list = ft.ListView(
expand=True,
spacing=10,
padding=10
)
for customer in customers:
customer_list.controls.append(
ft.Card(
content=ft.Container(
content=ft.ListTile(
title=ft.Text(customer['name']),
subtitle=ft.Text(customer['company'] or ''),
trailing=ft.Row([
ft.IconButton(
icon=ft.icons.EDIT,
on_click=lambda e, c=customer: self.edit_customer(c)
),
ft.IconButton(
icon=ft.icons.DELETE,
on_click=lambda e, c=customer: self.delete_customer(c)
)
])
),
padding=10
)
)
)
# Add customer button
add_button = ft.ElevatedButton(
"顧客を追加",
icon=ft.icons.ADD,
on_click=self.add_customer_dialog
)
customers_content = ft.Column([
ft.Row([
ft.Text("顧客管理", size=24, weight=ft.FontWeight.BOLD),
add_button
], alignment=ft.MainAxisAlignment.SPACE_BETWEEN),
ft.Divider(),
customer_list
], scroll=ft.ScrollMode.AUTO)
self.content_area.content = ft.Container(customers_content, padding=20)
self.page.update()
def show_products(self):
"""Show products management view"""
products = self.get_products()
# Product list
product_list = ft.ListView(
expand=True,
spacing=10,
padding=10
)
for product in products:
product_list.controls.append(
ft.Card(
content=ft.Container(
content=ft.ListTile(
title=ft.Text(product['name']),
subtitle=ft.Text(f"¥{product['price']:,.0f} | 在庫: {product['stock']}"),
trailing=ft.Row([
ft.IconButton(
icon=ft.icons.EDIT,
on_click=lambda e, p=product: self.edit_product(p)
),
ft.IconButton(
icon=ft.icons.DELETE,
on_click=lambda e, p=product: self.delete_product(p)
)
])
),
padding=10
)
)
)
# Add product button
add_button = ft.ElevatedButton(
"商品を追加",
icon=ft.icons.ADD,
on_click=self.add_product_dialog
)
products_content = ft.Column([
ft.Row([
ft.Text("商品管理", size=24, weight=ft.FontWeight.BOLD),
add_button
], alignment=ft.MainAxisAlignment.SPACE_BETWEEN),
ft.Divider(),
product_list
], scroll=ft.ScrollMode.AUTO)
self.content_area.content = ft.Container(products_content, padding=20)
self.page.update()
def show_sales(self):
"""Show sales management view"""
sales = self.get_sales()
# Sales list
sales_list = ft.ListView(
expand=True,
spacing=10,
padding=10
)
for sale in sales:
sales_list.controls.append(
ft.Card(
content=ft.Container(
content=ft.ListTile(
title=ft.Text(f"{sale['customer_name']} - {sale['product_name']}"),
subtitle=ft.Text(f"{sale['quantity']}× ¥{sale['unit_price']:,.0f} = ¥{sale['total_price']:,.0f}"),
trailing=ft.Text(sale['sale_date'].split()[0])
),
padding=10
)
)
)
# Add sale button
add_button = ft.ElevatedButton(
"売上を追加",
icon=ft.icons.ADD,
on_click=self.add_sale_dialog
)
sales_content = ft.Column([
ft.Row([
ft.Text("売上管理", size=24, weight=ft.FontWeight.BOLD),
add_button
], alignment=ft.MainAxisAlignment.SPACE_BETWEEN),
ft.Divider(),
sales_list
], scroll=ft.ScrollMode.AUTO)
self.content_area.content = ft.Container(sales_content, padding=20)
self.page.update()
def get_customers(self) -> List[Dict]:
"""Get all customers from database"""
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM customers ORDER BY created_at DESC")
customers = []
for row in cursor.fetchall():
customers.append({
'id': row[0],
'name': row[1],
'company': row[2],
'phone': row[3],
'email': row[4],
'address': row[5],
'created_at': row[6]
})
conn.close()
return customers
def get_products(self) -> List[Dict]:
"""Get all products from database"""
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM products ORDER BY created_at DESC")
products = []
for row in cursor.fetchall():
products.append({
'id': row[0],
'name': row[1],
'code': row[2],
'price': row[3],
'stock': row[4],
'description': row[5],
'created_at': row[6]
})
conn.close()
return products
def get_sales(self) -> List[Dict]:
"""Get all sales with customer and product names"""
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
cursor.execute('''
SELECT s.*, c.name as customer_name, p.name as product_name
FROM sales s
LEFT JOIN customers c ON s.customer_id = c.id
LEFT JOIN products p ON s.product_id = p.id
ORDER BY s.sale_date DESC
''')
sales = []
for row in cursor.fetchall():
sales.append({
'id': row[0],
'customer_id': row[1],
'product_id': row[2],
'quantity': row[3],
'unit_price': row[4],
'total_price': row[5],
'sale_date': row[6],
'customer_name': row[7] or '不明',
'product_name': row[8] or '不明'
})
conn.close()
return sales
def add_customer_dialog(self, e):
"""Show add customer dialog"""
name_field = ft.TextField(label="氏名", autofocus=True)
company_field = ft.TextField(label="会社名")
phone_field = ft.TextField(label="電話番号")
email_field = ft.TextField(label="メールアドレス")
address_field = ft.TextField(label="住所")
def save_customer(e):
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO customers (name, company, phone, email, address)
VALUES (?, ?, ?, ?, ?)
''', (name_field.value, company_field.value, phone_field.value,
email_field.value, address_field.value))
conn.commit()
conn.close()
dialog.open = False
self.show_customers()
self.page.update()
dialog = ft.AlertDialog(
title=ft.Text("顧客を追加"),
content=ft.Column([
name_field,
company_field,
phone_field,
email_field,
address_field
], tight=True),
actions=[
ft.TextButton("キャンセル", on_click=lambda e: self.close_dialog(dialog)),
ft.TextButton("保存", on_click=save_customer)
],
actions_alignment=ft.MainAxisAlignment.END
)
self.page.dialog = dialog
dialog.open = True
self.page.update()
def add_product_dialog(self, e):
"""Show add product dialog"""
name_field = ft.TextField(label="商品名", autofocus=True)
code_field = ft.TextField(label="商品コード")
price_field = ft.TextField(label="価格", keyboard_type=ft.KeyboardType.NUMBER)
stock_field = ft.TextField(label="在庫数", keyboard_type=ft.KeyboardType.NUMBER)
description_field = ft.TextField(label="説明", multiline=True)
def save_product(e):
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO products (name, code, price, stock, description)
VALUES (?, ?, ?, ?, ?)
''', (name_field.value, code_field.value, float(price_field.value or 0),
int(stock_field.value or 0), description_field.value))
conn.commit()
conn.close()
dialog.open = False
self.show_products()
self.page.update()
dialog = ft.AlertDialog(
title=ft.Text("商品を追加"),
content=ft.Column([
name_field,
code_field,
price_field,
stock_field,
description_field
], tight=True),
actions=[
ft.TextButton("キャンセル", on_click=lambda e: self.close_dialog(dialog)),
ft.TextButton("保存", on_click=save_product)
],
actions_alignment=ft.MainAxisAlignment.END
)
self.page.dialog = dialog
dialog.open = True
self.page.update()
def add_sale_dialog(self, e):
"""Show add sale dialog"""
customers = self.get_customers()
products = self.get_products()
customer_dropdown = ft.Dropdown(
label="顧客",
options=[ft.dropdown.Option(c['name'], key=str(c['id'])) for c in customers]
)
product_dropdown = ft.Dropdown(
label="商品",
options=[ft.dropdown.Option(p['name'], key=str(p['id'])) for p in products],
on_change=lambda e: self.update_price_display(e, product_dropdown, price_field, quantity_field, total_field)
)
quantity_field = ft.TextField(
label="数量",
value="1",
keyboard_type=ft.KeyboardType.NUMBER,
on_change=lambda e: self.calculate_total(price_field, quantity_field, total_field)
)
price_field = ft.TextField(
label="単価",
keyboard_type=ft.KeyboardType.NUMBER,
on_change=lambda e: self.calculate_total(price_field, quantity_field, total_field)
)
total_field = ft.TextField(label="合計", read_only=True)
def save_sale(e):
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO sales (customer_id, product_id, quantity, unit_price, total_price)
VALUES (?, ?, ?, ?, ?)
''', (int(customer_dropdown.value), int(product_dropdown.value),
int(quantity_field.value), float(price_field.value),
float(total_field.value)))
conn.commit()
conn.close()
dialog.open = False
self.show_sales()
self.page.update()
dialog = ft.AlertDialog(
title=ft.Text("売上を追加"),
content=ft.Column([
customer_dropdown,
product_dropdown,
quantity_field,
price_field,
total_field
], tight=True),
actions=[
ft.TextButton("キャンセル", on_click=lambda e: self.close_dialog(dialog)),
ft.TextButton("保存", on_click=save_sale)
],
actions_alignment=ft.MainAxisAlignment.END
)
self.page.dialog = dialog
dialog.open = True
self.page.update()
def update_price_display(self, e, product_dropdown, price_field, quantity_field, total_field):
"""Update price when product is selected"""
if product_dropdown.value:
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
cursor.execute("SELECT price FROM products WHERE id = ?", (int(product_dropdown.value),))
result = cursor.fetchone()
conn.close()
if result:
price_field.value = str(result[0])
self.calculate_total(price_field, quantity_field, total_field)
self.page.update()
def calculate_total(self, price_field, quantity_field, total_field):
"""Calculate total price"""
try:
price = float(price_field.value or 0)
quantity = int(quantity_field.value or 1)
total_field.value = str(price * quantity)
except ValueError:
total_field.value = "0"
self.page.update()
def close_dialog(self, dialog):
"""Close dialog"""
dialog.open = False
self.page.update()
def edit_customer(self, customer):
"""Edit customer (placeholder)"""
self.page.snack_bar = ft.SnackBar(content=ft.Text("顧客編集機能は準備中です"))
self.page.snack_bar.open = True
self.page.update()
def delete_customer(self, customer):
"""Delete customer"""
def confirm_delete(e):
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
cursor.execute("DELETE FROM customers WHERE id = ?", (customer['id'],))
conn.commit()
conn.close()
dialog.open = False
self.show_customers()
self.page.update()
dialog = ft.AlertDialog(
title=ft.Text("確認"),
content=ft.Text(f"顧客「{customer['name']}」を削除してもよろしいですか?"),
actions=[
ft.TextButton("キャンセル", on_click=lambda e: self.close_dialog(dialog)),
ft.TextButton("削除", on_click=confirm_delete)
],
actions_alignment=ft.MainAxisAlignment.END
)
self.page.dialog = dialog
dialog.open = True
self.page.update()
def edit_product(self, product):
"""Edit product (placeholder)"""
self.page.snack_bar = ft.SnackBar(content=ft.Text("商品編集機能は準備中です"))
self.page.snack_bar.open = True
self.page.update()
def delete_product(self, product):
"""Delete product"""
def confirm_delete(e):
conn = sqlite3.connect('sales.db')
cursor = conn.cursor()
cursor.execute("DELETE FROM products WHERE id = ?", (product['id'],))
conn.commit()
conn.close()
dialog.open = False
self.show_products()
self.page.update()
dialog = ft.AlertDialog(
title=ft.Text("確認"),
content=ft.Text(f"商品「{product['name']}」を削除してもよろしいですか?"),
actions=[
ft.TextButton("キャンセル", on_click=lambda e: self.close_dialog(dialog)),
ft.TextButton("削除", on_click=confirm_delete)
],
actions_alignment=ft.MainAxisAlignment.END
)
self.page.dialog = dialog
dialog.open = True
self.page.update()
def show_data_export(self):
"""データ出力画面を表示"""
export_content = ft.Column([
ft.Text("データ出力", size=24, weight=ft.FontWeight.BOLD),
ft.Divider(),
ft.Card(
content=ft.Container(
content=ft.Column([
ft.Text("JSON形式で出力", size=16, weight=ft.FontWeight.BOLD),
ft.Text("全データをJSON形式でエクスポートします"),
ft.ElevatedButton(
"JSON出力",
icon=ft.icons.DOWNLOAD,
on_click=self.export_json
)
]),
padding=20
)
),
ft.Card(
content=ft.Container(
content=ft.Column([
ft.Text("CSV形式で出力", size=16, weight=ft.FontWeight.BOLD),
ft.Text("各テーブルをCSV形式でエクスポートします"),
ft.ElevatedButton(
"CSV出力",
icon=ft.icons.DOWNLOAD,
on_click=self.export_csv
)
]),
padding=20
)
)
], scroll=ft.ScrollMode.AUTO)
self.content_area.content = ft.Container(export_content, padding=20)
self.page.update()
def show_compliance(self):
"""コンプライアンス画面を表示"""
compliance_content = ft.Column([
ft.Text("電子帳簿保存法対応", size=24, weight=ft.FontWeight.BOLD),
ft.Divider(),
ft.Card(
content=ft.Container(
content=ft.Column([
ft.Text("データ整合性チェック", size=16, weight=ft.FontWeight.BOLD),
ft.Text("データの整合性を検証し、チェックサムを生成します"),
ft.ElevatedButton(
"整合性チェック",
icon=ft.icons.CHECK_CIRCLE,
on_click=self.check_integrity
)
]),
padding=20
)
),
ft.Card(
content=ft.Container(
content=ft.Column([
ft.Text("古いデータのアーカイブ", size=16, weight=ft.FontWeight.BOLD),
ft.Text("7年以上前のデータをアーカイブします10年保存"),
ft.ElevatedButton(
"データアーカイブ",
icon=ft.icons.ARCHIVE,
on_click=self.archive_data
)
]),
padding=20
)
),
ft.Card(
content=ft.Container(
content=ft.Column([
ft.Text("コンプライアンスレポート", size=16, weight=ft.FontWeight.BOLD),
ft.Text("電子帳簿保存法対応レポートを生成します"),
ft.ElevatedButton(
"レポート生成",
icon=ft.icons.ASSIGNMENT,
on_click=self.generate_compliance_report
)
]),
padding=20
)
)
], scroll=ft.ScrollMode.AUTO)
self.content_area.content = ft.Container(compliance_content, padding=20)
self.page.update()
def export_json(self, e):
"""JSON形式でデータをエクスポート"""
try:
file_path = self.exporter.export_to_json()
self.page.snack_bar = ft.SnackBar(
content=ft.Text(f"JSON出力完了: {file_path}"),
bgcolor=ft.colors.GREEN
)
except Exception as ex:
self.page.snack_bar = ft.SnackBar(
content=ft.Text(f"JSON出力エラー: {str(ex)}"),
bgcolor=ft.colors.RED
)
self.page.snack_bar.open = True
self.page.update()
def export_csv(self, e):
"""CSV形式でデータをエクスポート"""
try:
files = self.exporter.export_to_csv()
file_list = "\n".join([f"{k}: {v}" for k, v in files.items()])
self.page.snack_bar = ft.SnackBar(
content=ft.Text(f"CSV出力完了:\n{file_list}"),
bgcolor=ft.colors.GREEN
)
except Exception as ex:
self.page.snack_bar = ft.SnackBar(
content=ft.Text(f"CSV出力エラー: {str(ex)}"),
bgcolor=ft.colors.RED
)
self.page.snack_bar.open = True
self.page.update()
def check_integrity(self, e):
"""データ整合性チェック"""
try:
results = []
for table in ["customers", "products", "sales"]:
result = self.compliance.verify_data_integrity(table)
results.append(f"{table}: {result['status']} ({result['record_count']}件)")
message = "整合性チェック完了:\n" + "\n".join(results)
self.page.snack_bar = ft.SnackBar(
content=ft.Text(message),
bgcolor=ft.colors.GREEN
)
except Exception as ex:
self.page.snack_bar = ft.SnackBar(
content=ft.Text(f"整合性チェックエラー: {str(ex)}"),
bgcolor=ft.colors.RED
)
self.page.snack_bar.open = True
self.page.update()
def archive_data(self, e):
"""古いデータをアーカイブ"""
try:
result = self.compliance.archive_old_data()
if result["status"] == "SUCCESS":
message = f"アーカイブ完了: {result['archived_count']}"
bgcolor = ft.colors.GREEN
else:
message = f"アーカイブエラー: {result['error']}"
bgcolor = ft.colors.RED
self.page.snack_bar = ft.SnackBar(
content=ft.Text(message),
bgcolor=bgcolor
)
except Exception as ex:
self.page.snack_bar = ft.SnackBar(
content=ft.Text(f"アーカイブエラー: {str(ex)}"),
bgcolor=ft.colors.RED
)
self.page.snack_bar.open = True
self.page.update()
def generate_compliance_report(self, e):
"""コンプライアンスレポートを生成"""
try:
report = self.compliance.generate_compliance_report()
# レポート内容を表示
dialog = ft.AlertDialog(
title=ft.Text("コンプライアンスレポート"),
content=ft.Container(
content=ft.Column([
ft.Text(f"生成日時: {report['generated_date']}"),
ft.Text(f"データベース: {report['database_path']}"),
ft.Divider(),
ft.Text("テーブル状況:", weight=ft.FontWeight.BOLD),
*[ft.Text(f" {table}: {info['record_count']}")
for table, info in report['tables'].items()],
ft.Divider(),
ft.Text("監査ログ:", weight=ft.FontWeight.BOLD),
*[ft.Text(f" {op}: {count}")
for op, count in report['audit_summary'].items()]
], scroll=ft.ScrollMode.AUTO),
height=400
),
actions=[
ft.TextButton("閉じる", on_click=lambda e: self.close_dialog(dialog))
]
)
self.page.dialog = dialog
dialog.open = True
self.page.update()
except Exception as ex:
self.page.snack_bar = ft.SnackBar(
content=ft.Text(f"レポート生成エラー: {str(ex)}"),
bgcolor=ft.colors.RED
)
self.page.snack_bar.open = True
self.page.update()
def main(page: ft.Page):
app = SalesAssistant(page)
if __name__ == "__main__":
ft.app(target=main)

1
requirements.txt Normal file
View file

@ -0,0 +1 @@
flet>=0.21.0