# NOTE: scraped page metadata ("356 lines / 13 KiB / Python") removed.
"""電子帳簿保存法対応機能"""
|
||
import sqlite3
|
||
import datetime
|
||
import hashlib
|
||
import json
|
||
from typing import Dict, List, Optional
|
||
import logging
|
||
|
||
class ComplianceManager:
    """Compliance manager for the Japanese Electronic Book Preservation Act.

    Keeps an audit trail, table-integrity checksums, and a long-term
    (10-year) archive in a SQLite database so bookkeeping records stay
    verifiable over time.
    """

    def __init__(self, db_path: str = "sales.db"):
        # Remember where the database lives, grab a module-scoped logger,
        # and make sure the compliance tables exist before any use.
        self.db_path = db_path
        self.logger = logging.getLogger(__name__)
        self.init_compliance_tables()
def init_compliance_tables(self):
|
||
"""電子帳簿保存法対応テーブルを初期化"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
|
||
# 監査ログテーブル
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS audit_logs (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
table_name TEXT NOT NULL,
|
||
record_id INTEGER NOT NULL,
|
||
operation TEXT NOT NULL, -- 'CREATE', 'UPDATE', 'DELETE'
|
||
old_data TEXT, -- JSON形式
|
||
new_data TEXT, -- JSON形式
|
||
user_id TEXT DEFAULT 'system',
|
||
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
checksum TEXT NOT NULL
|
||
)
|
||
''')
|
||
|
||
# データ整合性チェックテーブル
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS integrity_checks (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
table_name TEXT NOT NULL,
|
||
record_count INTEGER NOT NULL,
|
||
checksum TEXT NOT NULL,
|
||
check_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
status TEXT DEFAULT 'OK' -- 'OK', 'ERROR', 'WARNING'
|
||
)
|
||
''')
|
||
|
||
# 長期アーカイブテーブル(10年保存用)
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS archive_sales (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
original_id INTEGER NOT NULL,
|
||
customer_id INTEGER,
|
||
product_id INTEGER,
|
||
quantity INTEGER NOT NULL,
|
||
unit_price REAL NOT NULL,
|
||
total_price REAL NOT NULL,
|
||
sale_date TIMESTAMP NOT NULL,
|
||
customer_name TEXT,
|
||
product_name TEXT,
|
||
archived_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
retention_end_date TIMESTAMP NOT NULL, -- 10年後
|
||
checksum TEXT NOT NULL
|
||
)
|
||
''')
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
def log_change(self, table_name: str, record_id: int, operation: str,
|
||
old_data: Dict = None, new_data: Dict = None, user_id: str = "system"):
|
||
"""データ変更を監査ログに記録"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
|
||
# チェックサム計算
|
||
audit_data = {
|
||
"table": table_name,
|
||
"id": record_id,
|
||
"operation": operation,
|
||
"timestamp": datetime.datetime.now().isoformat(),
|
||
"old": old_data,
|
||
"new": new_data
|
||
}
|
||
checksum = self.calculate_checksum(audit_data)
|
||
|
||
cursor.execute('''
|
||
INSERT INTO audit_logs
|
||
(table_name, record_id, operation, old_data, new_data, user_id, checksum)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||
''', (
|
||
table_name, record_id, operation,
|
||
json.dumps(old_data, ensure_ascii=False) if old_data else None,
|
||
json.dumps(new_data, ensure_ascii=False) if new_data else None,
|
||
user_id, checksum
|
||
))
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
def calculate_checksum(self, data: Dict) -> str:
|
||
"""データのチェックサムを計算"""
|
||
data_str = json.dumps(data, sort_keys=True, ensure_ascii=False)
|
||
return hashlib.sha256(data_str.encode()).hexdigest()
|
||
|
||
def verify_data_integrity(self, table_name: str) -> Dict:
|
||
"""テーブルのデータ整合性を検証"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
|
||
try:
|
||
# テーブルの全データ取得
|
||
cursor.execute(f"SELECT * FROM {table_name}")
|
||
rows = cursor.fetchall()
|
||
columns = [description[0] for description in cursor.description]
|
||
|
||
# チェックサム計算
|
||
table_data = []
|
||
for row in rows:
|
||
record = dict(zip(columns, row))
|
||
table_data.append(record)
|
||
|
||
checksum = self.calculate_checksum({
|
||
"table": table_name,
|
||
"count": len(table_data),
|
||
"data": table_data
|
||
})
|
||
|
||
# 整合性チェック結果を記録
|
||
cursor.execute('''
|
||
INSERT INTO integrity_checks
|
||
(table_name, record_count, checksum, status)
|
||
VALUES (?, ?, ?, ?)
|
||
''', (table_name, len(table_data), checksum, 'OK'))
|
||
|
||
conn.commit()
|
||
|
||
return {
|
||
"status": "OK",
|
||
"table": table_name,
|
||
"record_count": len(table_data),
|
||
"checksum": checksum,
|
||
"check_date": datetime.datetime.now().isoformat()
|
||
}
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"整合性チェックエラー: {str(e)}")
|
||
return {
|
||
"status": "ERROR",
|
||
"table": table_name,
|
||
"error": str(e),
|
||
"check_date": datetime.datetime.now().isoformat()
|
||
}
|
||
finally:
|
||
conn.close()
|
||
|
||
def archive_old_data(self, years: int = 7) -> Dict:
|
||
"""古いデータをアーカイブ(デフォルト7年以上前)"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
|
||
try:
|
||
# アーカイブ対象日付を計算
|
||
archive_date = datetime.datetime.now() - datetime.timedelta(days=years*365)
|
||
retention_end = datetime.datetime.now() + datetime.timedelta(days=10*365)
|
||
|
||
# アーカイブ対象の売上データを取得
|
||
cursor.execute('''
|
||
SELECT s.*, c.name as customer_name, p.name as product_name
|
||
FROM sales s
|
||
LEFT JOIN customers c ON s.customer_id = c.id
|
||
LEFT JOIN products p ON s.product_id = p.id
|
||
WHERE s.sale_date < ?
|
||
''', (archive_date,))
|
||
|
||
old_sales = cursor.fetchall()
|
||
|
||
archived_count = 0
|
||
for sale in old_sales:
|
||
# アーカイブデータ作成
|
||
archive_data = {
|
||
"original_id": sale[0],
|
||
"customer_id": sale[1],
|
||
"product_id": sale[2],
|
||
"quantity": sale[3],
|
||
"unit_price": sale[4],
|
||
"total_price": sale[5],
|
||
"sale_date": sale[6],
|
||
"customer_name": sale[7],
|
||
"product_name": sale[8],
|
||
"retention_end_date": retention_end
|
||
}
|
||
|
||
checksum = self.calculate_checksum(archive_data)
|
||
|
||
# アーカイブテーブルに挿入
|
||
cursor.execute('''
|
||
INSERT INTO archive_sales
|
||
(original_id, customer_id, product_id, quantity, unit_price,
|
||
total_price, sale_date, customer_name, product_name,
|
||
retention_end_date, checksum)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
''', (
|
||
sale[0], sale[1], sale[2], sale[3], sale[4], sale[5],
|
||
sale[6], sale[7], sale[8], retention_end, checksum
|
||
))
|
||
|
||
archived_count += 1
|
||
|
||
# 元のテーブルから削除
|
||
cursor.execute("DELETE FROM sales WHERE id = ?", (sale[0],))
|
||
|
||
# 監査ログ記録
|
||
self.log_change("sales", sale[0], "ARCHIVE",
|
||
{"archived": True}, archive_data)
|
||
|
||
conn.commit()
|
||
|
||
return {
|
||
"status": "SUCCESS",
|
||
"archived_count": archived_count,
|
||
"archive_date": archive_date.isoformat(),
|
||
"retention_end": retention_end.isoformat()
|
||
}
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"アーカイブエラー: {str(e)}")
|
||
return {
|
||
"status": "ERROR",
|
||
"error": str(e)
|
||
}
|
||
finally:
|
||
conn.close()
|
||
|
||
def get_audit_trail(self, table_name: str = None, record_id: int = None) -> List[Dict]:
|
||
"""監査証跡を取得"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
|
||
query = "SELECT * FROM audit_logs WHERE 1=1"
|
||
params = []
|
||
|
||
if table_name:
|
||
query += " AND table_name = ?"
|
||
params.append(table_name)
|
||
|
||
if record_id:
|
||
query += " AND record_id = ?"
|
||
params.append(record_id)
|
||
|
||
query += " ORDER BY timestamp DESC"
|
||
|
||
cursor.execute(query, params)
|
||
logs = []
|
||
|
||
for row in cursor.fetchall():
|
||
logs.append({
|
||
"id": row[0],
|
||
"table_name": row[1],
|
||
"record_id": row[2],
|
||
"operation": row[3],
|
||
"old_data": json.loads(row[4]) if row[4] else None,
|
||
"new_data": json.loads(row[5]) if row[5] else None,
|
||
"user_id": row[6],
|
||
"timestamp": row[7],
|
||
"checksum": row[8]
|
||
})
|
||
|
||
conn.close()
|
||
return logs
|
||
|
||
def generate_compliance_report(self) -> Dict:
|
||
"""電子帳簿保存法コンプライアンスレポートを生成"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
|
||
report = {
|
||
"generated_date": datetime.datetime.now().isoformat(),
|
||
"database_path": self.db_path,
|
||
"tables": {},
|
||
"audit_summary": {},
|
||
"archive_summary": {},
|
||
"compliance_status": "COMPLIANT"
|
||
}
|
||
|
||
# 各テーブルの状況
|
||
for table in ["customers", "products", "sales"]:
|
||
cursor.execute(f"SELECT COUNT(*) FROM {table}")
|
||
count = cursor.fetchone()[0]
|
||
|
||
# 最新の整合性チェック
|
||
cursor.execute('''
|
||
SELECT status, check_date, checksum
|
||
FROM integrity_checks
|
||
WHERE table_name = ?
|
||
ORDER BY check_date DESC
|
||
LIMIT 1
|
||
''', (table,))
|
||
|
||
integrity_result = cursor.fetchone()
|
||
|
||
report["tables"][table] = {
|
||
"record_count": count,
|
||
"last_integrity_check": {
|
||
"status": integrity_result[0] if integrity_result else "NOT_CHECKED",
|
||
"date": integrity_result[1] if integrity_result else None,
|
||
"checksum": integrity_result[2] if integrity_result else None
|
||
}
|
||
}
|
||
|
||
# 監査ログサマリー
|
||
cursor.execute('''
|
||
SELECT operation, COUNT(*) as count
|
||
FROM audit_logs
|
||
WHERE timestamp > date('now', '-1 year')
|
||
GROUP BY operation
|
||
''')
|
||
|
||
audit_summary = {}
|
||
for row in cursor.fetchall():
|
||
audit_summary[row[0]] = row[1]
|
||
|
||
report["audit_summary"] = audit_summary
|
||
|
||
# アーカイブサマリー
|
||
cursor.execute('''
|
||
SELECT COUNT(*) as total,
|
||
MIN(archived_date) as oldest,
|
||
MAX(retention_end_date) as latest_retention
|
||
FROM archive_sales
|
||
''')
|
||
|
||
archive_result = cursor.fetchone()
|
||
if archive_result[0] > 0:
|
||
report["archive_summary"] = {
|
||
"archived_records": archive_result[0],
|
||
"oldest_archive": archive_result[1],
|
||
"latest_retention_end": archive_result[2]
|
||
}
|
||
|
||
conn.close()
|
||
return report
|
||
|
||
if __name__ == "__main__":
    # Smoke test: run integrity checks and emit a compliance report.
    compliance = ComplianceManager()

    print("🔍 データ整合性チェック実行中...")
    for table in ["customers", "products", "sales"]:
        result = compliance.verify_data_integrity(table)
        if result["status"] == "OK":
            print(f"✅ {table}: {result['status']} ({result['record_count']}件)")
        else:
            # verify_data_integrity omits record_count on ERROR; the
            # original unconditionally indexed it and raised KeyError.
            print(f"❌ {table}: {result['status']} ({result.get('error', '')})")

    print("\n📋 コンプライアンスレポート生成...")
    report = compliance.generate_compliance_report()
    print(json.dumps(report, ensure_ascii=False, indent=2))