"""電子帳簿保存法対応機能""" import sqlite3 import datetime import hashlib import json from typing import Dict, List, Optional import logging class ComplianceManager: """電子帳簿保存法対応マネージャー""" def __init__(self, db_path: str = "sales.db"): self.db_path = db_path self.logger = logging.getLogger(__name__) self.init_compliance_tables() def init_compliance_tables(self): """電子帳簿保存法対応テーブルを初期化""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 監査ログテーブル cursor.execute(''' CREATE TABLE IF NOT EXISTS audit_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, table_name TEXT NOT NULL, record_id INTEGER NOT NULL, operation TEXT NOT NULL, -- 'CREATE', 'UPDATE', 'DELETE' old_data TEXT, -- JSON形式 new_data TEXT, -- JSON形式 user_id TEXT DEFAULT 'system', timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, checksum TEXT NOT NULL ) ''') # データ整合性チェックテーブル cursor.execute(''' CREATE TABLE IF NOT EXISTS integrity_checks ( id INTEGER PRIMARY KEY AUTOINCREMENT, table_name TEXT NOT NULL, record_count INTEGER NOT NULL, checksum TEXT NOT NULL, check_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, status TEXT DEFAULT 'OK' -- 'OK', 'ERROR', 'WARNING' ) ''') # 長期アーカイブテーブル(10年保存用) cursor.execute(''' CREATE TABLE IF NOT EXISTS archive_sales ( id INTEGER PRIMARY KEY AUTOINCREMENT, original_id INTEGER NOT NULL, customer_id INTEGER, product_id INTEGER, quantity INTEGER NOT NULL, unit_price REAL NOT NULL, total_price REAL NOT NULL, sale_date TIMESTAMP NOT NULL, customer_name TEXT, product_name TEXT, archived_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, retention_end_date TIMESTAMP NOT NULL, -- 10年後 checksum TEXT NOT NULL ) ''') conn.commit() conn.close() def log_change(self, table_name: str, record_id: int, operation: str, old_data: Dict = None, new_data: Dict = None, user_id: str = "system"): """データ変更を監査ログに記録""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # チェックサム計算 audit_data = { "table": table_name, "id": record_id, "operation": operation, "timestamp": datetime.datetime.now().isoformat(), "old": old_data, "new": new_data } checksum = self.calculate_checksum(audit_data) cursor.execute(''' INSERT INTO audit_logs (table_name, record_id, operation, old_data, new_data, user_id, checksum) VALUES (?, ?, ?, ?, ?, ?, ?) ''', ( table_name, record_id, operation, json.dumps(old_data, ensure_ascii=False) if old_data else None, json.dumps(new_data, ensure_ascii=False) if new_data else None, user_id, checksum )) conn.commit() conn.close() def calculate_checksum(self, data: Dict) -> str: """データのチェックサムを計算""" data_str = json.dumps(data, sort_keys=True, ensure_ascii=False) return hashlib.sha256(data_str.encode()).hexdigest() def verify_data_integrity(self, table_name: str) -> Dict: """テーブルのデータ整合性を検証""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # テーブルの全データ取得 cursor.execute(f"SELECT * FROM {table_name}") rows = cursor.fetchall() columns = [description[0] for description in cursor.description] # チェックサム計算 table_data = [] for row in rows: record = dict(zip(columns, row)) table_data.append(record) checksum = self.calculate_checksum({ "table": table_name, "count": len(table_data), "data": table_data }) # 整合性チェック結果を記録 cursor.execute(''' INSERT INTO integrity_checks (table_name, record_count, checksum, status) VALUES (?, ?, ?, ?) ''', (table_name, len(table_data), checksum, 'OK')) conn.commit() return { "status": "OK", "table": table_name, "record_count": len(table_data), "checksum": checksum, "check_date": datetime.datetime.now().isoformat() } except Exception as e: self.logger.error(f"整合性チェックエラー: {str(e)}") return { "status": "ERROR", "table": table_name, "error": str(e), "check_date": datetime.datetime.now().isoformat() } finally: conn.close() def archive_old_data(self, years: int = 7) -> Dict: """古いデータをアーカイブ(デフォルト7年以上前)""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # アーカイブ対象日付を計算 archive_date = datetime.datetime.now() - datetime.timedelta(days=years*365) retention_end = datetime.datetime.now() + datetime.timedelta(days=10*365) # アーカイブ対象の売上データを取得 cursor.execute(''' SELECT s.*, c.name as customer_name, p.name as product_name FROM sales s LEFT JOIN customers c ON s.customer_id = c.id LEFT JOIN products p ON s.product_id = p.id WHERE s.sale_date < ? ''', (archive_date,)) old_sales = cursor.fetchall() archived_count = 0 for sale in old_sales: # アーカイブデータ作成 archive_data = { "original_id": sale[0], "customer_id": sale[1], "product_id": sale[2], "quantity": sale[3], "unit_price": sale[4], "total_price": sale[5], "sale_date": sale[6], "customer_name": sale[7], "product_name": sale[8], "retention_end_date": retention_end } checksum = self.calculate_checksum(archive_data) # アーカイブテーブルに挿入 cursor.execute(''' INSERT INTO archive_sales (original_id, customer_id, product_id, quantity, unit_price, total_price, sale_date, customer_name, product_name, retention_end_date, checksum) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( sale[0], sale[1], sale[2], sale[3], sale[4], sale[5], sale[6], sale[7], sale[8], retention_end, checksum )) archived_count += 1 # 元のテーブルから削除 cursor.execute("DELETE FROM sales WHERE id = ?", (sale[0],)) # 監査ログ記録 self.log_change("sales", sale[0], "ARCHIVE", {"archived": True}, archive_data) conn.commit() return { "status": "SUCCESS", "archived_count": archived_count, "archive_date": archive_date.isoformat(), "retention_end": retention_end.isoformat() } except Exception as e: self.logger.error(f"アーカイブエラー: {str(e)}") return { "status": "ERROR", "error": str(e) } finally: conn.close() def get_audit_trail(self, table_name: str = None, record_id: int = None) -> List[Dict]: """監査証跡を取得""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() query = "SELECT * FROM audit_logs WHERE 1=1" params = [] if table_name: query += " AND table_name = ?" params.append(table_name) if record_id: query += " AND record_id = ?" params.append(record_id) query += " ORDER BY timestamp DESC" cursor.execute(query, params) logs = [] for row in cursor.fetchall(): logs.append({ "id": row[0], "table_name": row[1], "record_id": row[2], "operation": row[3], "old_data": json.loads(row[4]) if row[4] else None, "new_data": json.loads(row[5]) if row[5] else None, "user_id": row[6], "timestamp": row[7], "checksum": row[8] }) conn.close() return logs def generate_compliance_report(self) -> Dict: """電子帳簿保存法コンプライアンスレポートを生成""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() report = { "generated_date": datetime.datetime.now().isoformat(), "database_path": self.db_path, "tables": {}, "audit_summary": {}, "archive_summary": {}, "compliance_status": "COMPLIANT" } # 各テーブルの状況 for table in ["customers", "products", "sales"]: cursor.execute(f"SELECT COUNT(*) FROM {table}") count = cursor.fetchone()[0] # 最新の整合性チェック cursor.execute(''' SELECT status, check_date, checksum FROM integrity_checks WHERE table_name = ? ORDER BY check_date DESC LIMIT 1 ''', (table,)) integrity_result = cursor.fetchone() report["tables"][table] = { "record_count": count, "last_integrity_check": { "status": integrity_result[0] if integrity_result else "NOT_CHECKED", "date": integrity_result[1] if integrity_result else None, "checksum": integrity_result[2] if integrity_result else None } } # 監査ログサマリー cursor.execute(''' SELECT operation, COUNT(*) as count FROM audit_logs WHERE timestamp > date('now', '-1 year') GROUP BY operation ''') audit_summary = {} for row in cursor.fetchall(): audit_summary[row[0]] = row[1] report["audit_summary"] = audit_summary # アーカイブサマリー cursor.execute(''' SELECT COUNT(*) as total, MIN(archived_date) as oldest, MAX(retention_end_date) as latest_retention FROM archive_sales ''') archive_result = cursor.fetchone() if archive_result[0] > 0: report["archive_summary"] = { "archived_records": archive_result[0], "oldest_archive": archive_result[1], "latest_retention_end": archive_result[2] } conn.close() return report if __name__ == "__main__": # テスト実行 compliance = ComplianceManager() print("🔍 データ整合性チェック実行中...") for table in ["customers", "products", "sales"]: result = compliance.verify_data_integrity(table) print(f"✅ {table}: {result['status']} ({result['record_count']}件)") print("\n📋 コンプライアンスレポート生成...") report = compliance.generate_compliance_report() print(json.dumps(report, ensure_ascii=False, indent=2))