h-1.flet.3/compliance.py
2026-02-19 11:53:09 +09:00

356 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""電子帳簿保存法対応機能"""
import sqlite3
import datetime
import hashlib
import json
from typing import Dict, List, Optional
import logging
class ComplianceManager:
    """Manager for Electronic Books Preservation Act (電子帳簿保存法) support.

    Provides three facilities on top of the SQLite database at ``db_path``:

    * an append-only audit log (``audit_logs``) with a per-row SHA-256
      checksum,
    * table snapshots checksums (``integrity_checks``),
    * a long-term archive for old sales rows (``archive_sales``).

    Every public method opens its own short-lived connection and always
    closes it, even when an exception is raised.
    """

    def __init__(self, db_path: str = "sales.db"):
        """Remember the database path and create the compliance tables.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.logger = logging.getLogger(__name__)
        self.init_compliance_tables()

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier.

        Table names cannot be bound as ``?`` parameters, so they are quoted
        (embedded quotes doubled) before being placed into SQL text. A
        hostile name then simply refers to a table that does not exist.
        """
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _shift_years(moment: datetime.datetime, years: int) -> datetime.datetime:
        """Return *moment* moved by *years* calendar years.

        Unlike ``timedelta(days=years * 365)`` this does not lose the leap
        days. A Feb 29 source date maps to Feb 28 when the target year is
        not a leap year.
        """
        target_year = moment.year + years
        try:
            return moment.replace(year=target_year)
        except ValueError:
            return moment.replace(year=target_year, month=2, day=28)

    def init_compliance_tables(self):
        """Create the audit, integrity-check and archive tables if missing."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # Audit log table
            cursor.execute('''
CREATE TABLE IF NOT EXISTS audit_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL,
record_id INTEGER NOT NULL,
operation TEXT NOT NULL, -- 'CREATE', 'UPDATE', 'DELETE'
old_data TEXT, -- JSON形式
new_data TEXT, -- JSON形式
user_id TEXT DEFAULT 'system',
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
checksum TEXT NOT NULL
)
''')
            # Data integrity check table
            cursor.execute('''
CREATE TABLE IF NOT EXISTS integrity_checks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL,
record_count INTEGER NOT NULL,
checksum TEXT NOT NULL,
check_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status TEXT DEFAULT 'OK' -- 'OK', 'ERROR', 'WARNING'
)
''')
            # Long-term archive table (10-year retention)
            cursor.execute('''
CREATE TABLE IF NOT EXISTS archive_sales (
id INTEGER PRIMARY KEY AUTOINCREMENT,
original_id INTEGER NOT NULL,
customer_id INTEGER,
product_id INTEGER,
quantity INTEGER NOT NULL,
unit_price REAL NOT NULL,
total_price REAL NOT NULL,
sale_date TIMESTAMP NOT NULL,
customer_name TEXT,
product_name TEXT,
archived_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
retention_end_date TIMESTAMP NOT NULL, -- 10年後
checksum TEXT NOT NULL
)
''')
            conn.commit()
        finally:
            conn.close()

    def _insert_audit_log(self, cursor, table_name: str, record_id: int,
                          operation: str, old_data: Optional[Dict],
                          new_data: Optional[Dict], user_id: str) -> None:
        """Insert one audit row through *cursor* without committing.

        The caller owns the transaction, so the audit row can be committed
        (or rolled back) together with the change it describes.

        The timestamp is generated here, in the same UTC
        ``YYYY-MM-DD HH:MM:SS`` format that ``CURRENT_TIMESTAMP`` produces,
        and is both hashed and stored. The checksum can therefore be
        recomputed later from the stored row.
        """
        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        checksum = self.calculate_checksum({
            "table": table_name,
            "id": record_id,
            "operation": operation,
            "timestamp": timestamp,
            "old": old_data,
            "new": new_data,
        })
        # "is not None" keeps an empty dict distinguishable from "no data",
        # matching what was hashed above.
        old_json = (
            json.dumps(old_data, ensure_ascii=False)
            if old_data is not None else None
        )
        new_json = (
            json.dumps(new_data, ensure_ascii=False)
            if new_data is not None else None
        )
        cursor.execute('''
INSERT INTO audit_logs
(table_name, record_id, operation, old_data, new_data, user_id, timestamp, checksum)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
            table_name, record_id, operation,
            old_json, new_json, user_id, timestamp, checksum,
        ))

    def log_change(self, table_name: str, record_id: int, operation: str,
                   old_data: Optional[Dict] = None,
                   new_data: Optional[Dict] = None,
                   user_id: str = "system") -> None:
        """Record a data change in the audit log.

        Args:
            table_name: Name of the table that was changed.
            record_id: Primary key of the changed record.
            operation: Operation label stored as-is (e.g. 'CREATE').
            old_data: JSON-serializable state before the change, if any.
            new_data: JSON-serializable state after the change, if any.
            user_id: Identifier of the acting user.

        Raises:
            TypeError: If ``old_data``/``new_data`` is not JSON serializable.
            sqlite3.Error: If the insert fails.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            self._insert_audit_log(
                conn.cursor(), table_name, record_id, operation,
                old_data, new_data, user_id,
            )
            conn.commit()
        finally:
            conn.close()

    def calculate_checksum(self, data: Dict) -> str:
        """Return the SHA-256 hex digest of *data* serialized as JSON.

        Keys are sorted so that equal dictionaries always hash equally.
        """
        data_str = json.dumps(data, sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(data_str.encode()).hexdigest()

    def verify_data_integrity(self, table_name: str) -> Dict:
        """Checksum the full contents of a table and record the result.

        Args:
            table_name: Table to snapshot.

        Returns:
            On success a dict with ``status`` "OK", ``table``,
            ``record_count``, ``checksum`` and ``check_date``. On any
            failure a dict with ``status`` "ERROR", ``table``, ``error``
            and ``check_date`` (no ``record_count`` key).
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # The identifier is quoted because it cannot be a bound parameter.
            cursor.execute(
                f"SELECT * FROM {self._quote_identifier(table_name)}"
            )
            rows = cursor.fetchall()
            columns = [description[0] for description in cursor.description]
            table_data = [dict(zip(columns, row)) for row in rows]
            checksum = self.calculate_checksum({
                "table": table_name,
                "count": len(table_data),
                "data": table_data,
            })
            # Persist the snapshot so the report can show the latest check.
            cursor.execute('''
INSERT INTO integrity_checks
(table_name, record_count, checksum, status)
VALUES (?, ?, ?, ?)
''', (table_name, len(table_data), checksum, 'OK'))
            conn.commit()
            return {
                "status": "OK",
                "table": table_name,
                "record_count": len(table_data),
                "checksum": checksum,
                "check_date": datetime.datetime.now().isoformat(),
            }
        except Exception as e:
            self.logger.error(f"整合性チェックエラー: {str(e)}")
            return {
                "status": "ERROR",
                "table": table_name,
                "error": str(e),
                "check_date": datetime.datetime.now().isoformat(),
            }
        finally:
            conn.close()

    def archive_old_data(self, years: int = 7) -> Dict:
        """Move sales older than *years* years into ``archive_sales``.

        Each archived row is copied with the joined customer/product name,
        deleted from ``sales`` and recorded in the audit log. All of this
        happens in one transaction on one connection, so a failure leaves
        the database unchanged.

        Args:
            years: Age threshold in calendar years (default 7).

        Returns:
            A dict with ``status`` "SUCCESS", ``archived_count``,
            ``archive_date`` and ``retention_end``; or ``status`` "ERROR"
            and ``error`` on failure.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            now = datetime.datetime.now()
            archive_date = self._shift_years(now, -years)
            retention_end = self._shift_years(now, 10)
            # Explicit text in the "YYYY-MM-DD HH:MM:SS.ffffff" form that the
            # implicit datetime adapter used to emit; the adapter itself is
            # deprecated, and a plain string is also JSON serializable.
            archive_cutoff = archive_date.isoformat(" ")
            retention_end_text = retention_end.isoformat(" ")

            cursor.execute('''
SELECT s.*, c.name as customer_name, p.name as product_name
FROM sales s
LEFT JOIN customers c ON s.customer_id = c.id
LEFT JOIN products p ON s.product_id = p.id
WHERE s.sale_date < ?
''', (archive_cutoff,))
            old_sales = cursor.fetchall()

            archived_count = 0
            for sale in old_sales:
                # NOTE(review): assumes the first seven columns of `sales`
                # are id, customer_id, product_id, quantity, unit_price,
                # total_price, sale_date in that order - confirm against the
                # schema. The joined names are always the last two columns.
                (sale_id, customer_id, product_id, quantity,
                 unit_price, total_price, sale_date) = sale[:7]
                customer_name, product_name = sale[-2], sale[-1]

                archive_data = {
                    "original_id": sale_id,
                    "customer_id": customer_id,
                    "product_id": product_id,
                    "quantity": quantity,
                    "unit_price": unit_price,
                    "total_price": total_price,
                    "sale_date": sale_date,
                    "customer_name": customer_name,
                    "product_name": product_name,
                    "retention_end_date": retention_end_text,
                }
                checksum = self.calculate_checksum(archive_data)

                cursor.execute('''
INSERT INTO archive_sales
(original_id, customer_id, product_id, quantity, unit_price,
total_price, sale_date, customer_name, product_name,
retention_end_date, checksum)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
                    sale_id, customer_id, product_id, quantity, unit_price,
                    total_price, sale_date, customer_name, product_name,
                    retention_end_text, checksum,
                ))
                archived_count += 1

                cursor.execute("DELETE FROM sales WHERE id = ?", (sale_id,))

                # Written through the same cursor: a second connection would
                # block on this transaction's write lock.
                self._insert_audit_log(
                    cursor, "sales", sale_id, "ARCHIVE",
                    {"archived": True}, archive_data, "system",
                )

            conn.commit()
            return {
                "status": "SUCCESS",
                "archived_count": archived_count,
                "archive_date": archive_date.isoformat(),
                "retention_end": retention_end.isoformat(),
            }
        except Exception as e:
            conn.rollback()
            self.logger.error(f"アーカイブエラー: {str(e)}")
            return {
                "status": "ERROR",
                "error": str(e),
            }
        finally:
            conn.close()

    def get_audit_trail(self, table_name: str = None,
                        record_id: int = None) -> List[Dict]:
        """Return audit log entries, newest first.

        Args:
            table_name: If given (non-empty), only entries for this table.
            record_id: If not ``None``, only entries for this record id
                (``0`` is a valid filter value).

        Returns:
            A list of dicts with keys ``id``, ``table_name``, ``record_id``,
            ``operation``, ``old_data``, ``new_data``, ``user_id``,
            ``timestamp`` and ``checksum``. The JSON columns are decoded.
        """
        query = "SELECT * FROM audit_logs WHERE 1=1"
        params = []
        if table_name:
            query += " AND table_name = ?"
            params.append(table_name)
        if record_id is not None:
            query += " AND record_id = ?"
            params.append(record_id)
        # id breaks ties between rows written within the same second.
        query += " ORDER BY timestamp DESC, id DESC"

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            return [
                {
                    "id": row[0],
                    "table_name": row[1],
                    "record_id": row[2],
                    "operation": row[3],
                    "old_data": json.loads(row[4]) if row[4] else None,
                    "new_data": json.loads(row[5]) if row[5] else None,
                    "user_id": row[6],
                    "timestamp": row[7],
                    "checksum": row[8],
                }
                for row in cursor.fetchall()
            ]
        finally:
            conn.close()

    def generate_compliance_report(self) -> Dict:
        """Build a summary report of tables, audit activity and archives.

        Returns:
            A dict with ``generated_date``, ``database_path``, ``tables``
            (row count and latest integrity check per table),
            ``audit_summary`` (operation counts for the last year),
            ``archive_summary`` (empty when nothing is archived) and
            ``compliance_status``.

        Raises:
            sqlite3.Error: If one of the queried tables does not exist.
        """
        # NOTE(review): compliance_status is always "COMPLIANT"; nothing in
        # this method derives it from the collected data.
        report = {
            "generated_date": datetime.datetime.now().isoformat(),
            "database_path": self.db_path,
            "tables": {},
            "audit_summary": {},
            "archive_summary": {},
            "compliance_status": "COMPLIANT",
        }
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Per-table row count and most recent integrity check
            for table in ["customers", "products", "sales"]:
                cursor.execute(
                    f"SELECT COUNT(*) FROM {self._quote_identifier(table)}"
                )
                count = cursor.fetchone()[0]
                cursor.execute('''
SELECT status, check_date, checksum
FROM integrity_checks
WHERE table_name = ?
ORDER BY check_date DESC, id DESC
LIMIT 1
''', (table,))
                integrity_result = cursor.fetchone()
                report["tables"][table] = {
                    "record_count": count,
                    "last_integrity_check": {
                        "status": integrity_result[0] if integrity_result else "NOT_CHECKED",
                        "date": integrity_result[1] if integrity_result else None,
                        "checksum": integrity_result[2] if integrity_result else None,
                    },
                }

            # Audit log summary for the last year
            cursor.execute('''
SELECT operation, COUNT(*) as count
FROM audit_logs
WHERE timestamp > date('now', '-1 year')
GROUP BY operation
''')
            report["audit_summary"] = {
                row[0]: row[1] for row in cursor.fetchall()
            }

            # Archive summary
            cursor.execute('''
SELECT COUNT(*) as total,
MIN(archived_date) as oldest,
MAX(retention_end_date) as latest_retention
FROM archive_sales
''')
            archive_result = cursor.fetchone()
            if archive_result[0] > 0:
                report["archive_summary"] = {
                    "archived_records": archive_result[0],
                    "oldest_archive": archive_result[1],
                    "latest_retention_end": archive_result[2],
                }
            return report
        finally:
            conn.close()
if __name__ == "__main__":
    # Manual smoke test: run an integrity check on each core table, then
    # print the compliance report as JSON.
    compliance = ComplianceManager()
    print("🔍 データ整合性チェック実行中...")
    for table in ["customers", "products", "sales"]:
        result = compliance.verify_data_integrity(table)
        if result["status"] == "OK":
            print(f"{table}: {result['status']} ({result['record_count']}件)")
        else:
            # The ERROR result has no 'record_count' key; show the message
            # instead of crashing with KeyError.
            print(f"{table}: {result['status']} ({result['error']})")
    print("\n📋 コンプライアンスレポート生成...")
    report = compliance.generate_compliance_report()
    print(json.dumps(report, ensure_ascii=False, indent=2))