h-1.flet.3/services/repositories.py
2026-02-23 08:57:59 +09:00

837 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
データリポジトリ層
SQLiteデータベース操作を抽象化
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import sqlite3
import json
from datetime import datetime
from typing import List, Optional, Dict, Any
from models.invoice_models import Invoice, Customer, InvoiceItem, DocumentType, Product
import logging
import uuid as uuid_module
import hashlib
logging.basicConfig(level=logging.INFO)
class InvoiceRepository:
"""伝票データリポジトリ"""
def __init__(self, db_path: str = "sales.db"):
self.db_path = db_path
self._init_tables()
def _init_tables(self):
"""テーブル初期化"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# メタ情報node_id等
cursor.execute(
'''
CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
value TEXT
)
'''
)
# 伝票テーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS invoices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT UNIQUE,
document_type TEXT,
customer_id INTEGER,
customer_name TEXT,
customer_address TEXT,
customer_phone TEXT,
amount REAL,
tax REAL,
total_amount REAL,
date TEXT,
invoice_number TEXT,
notes TEXT,
file_path TEXT,
node_id TEXT,
payload_json TEXT,
payload_hash TEXT,
prev_chain_hash TEXT,
chain_hash TEXT,
pdf_template_version TEXT,
company_info_version TEXT,
is_offset INTEGER DEFAULT 0,
offset_target_uuid TEXT,
pdf_generated_at TEXT,
pdf_sha256 TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# 明細テーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS invoice_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
invoice_id INTEGER,
description TEXT,
quantity INTEGER,
unit_price INTEGER,
is_discount BOOLEAN,
product_id INTEGER,
FOREIGN KEY (invoice_id) REFERENCES invoices(id)
)
''')
# 商品マスタテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
unit_price INTEGER NOT NULL,
description TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
# 既存DB向けの軽量マイグレーション列追加
self._migrate_schema()
def _migrate_schema(self):
"""既存DBのスキーマを新仕様に追従ALTER TABLEで列追加"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# meta は _init_tables で作成済みだが念のため
cursor.execute(
'''
CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
value TEXT
)
'''
)
def ensure_column(table: str, col: str, col_def: str):
cursor.execute(f"PRAGMA table_info({table})")
cols = {r[1] for r in cursor.fetchall()}
if col not in cols:
cursor.execute(f"ALTER TABLE {table} ADD COLUMN {col_def}")
ensure_column("invoices", "customer_id", "customer_id INTEGER")
ensure_column("invoices", "node_id", "node_id TEXT")
ensure_column("invoices", "payload_json", "payload_json TEXT")
ensure_column("invoices", "payload_hash", "payload_hash TEXT")
ensure_column("invoices", "prev_chain_hash", "prev_chain_hash TEXT")
ensure_column("invoices", "chain_hash", "chain_hash TEXT")
ensure_column("invoices", "pdf_template_version", "pdf_template_version TEXT")
ensure_column("invoices", "company_info_version", "company_info_version TEXT")
ensure_column("invoices", "is_offset", "is_offset INTEGER DEFAULT 0")
ensure_column("invoices", "offset_target_uuid", "offset_target_uuid TEXT")
ensure_column("invoices", "pdf_generated_at", "pdf_generated_at TEXT")
ensure_column("invoices", "pdf_sha256", "pdf_sha256 TEXT")
ensure_column("invoices", "submitted_to_tax_authority", "submitted_to_tax_authority INTEGER DEFAULT 0")
# Explorer向けインデックス大量データ閲覧時の検索性能を確保
cursor.execute("CREATE INDEX IF NOT EXISTS idx_invoices_date ON invoices(date)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_invoices_customer_name ON invoices(customer_name)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_invoices_document_type ON invoices(document_type)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_invoices_updated_at ON invoices(updated_at)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_invoices_is_offset ON invoices(is_offset)")
conn.commit()
def get_node_id(self) -> str:
"""DB単位のnode_id(UUID)を取得(未設定なら生成して保存)"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT value FROM meta WHERE key = ?", ("node_id",))
row = cursor.fetchone()
if row and row[0]:
return row[0]
node_id = str(uuid_module.uuid4())
cursor.execute(
"INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)",
("node_id", node_id),
)
conn.commit()
return node_id
def get_last_chain_hash(self, node_id: str) -> str:
"""直前のchain_hashを取得無ければgenesis=all-zero"""
genesis = "0" * 64
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT chain_hash FROM invoices WHERE node_id = ? AND chain_hash IS NOT NULL ORDER BY id DESC LIMIT 1",
(node_id,),
)
row = cursor.fetchone()
return row[0] if row and row[0] else genesis
except Exception:
return genesis
def verify_chain(self, node_id: Optional[str] = None, limit: Optional[int] = None) -> Dict[str, Any]:
"""DB内の連鎖ハッシュ整合性を検証監査用
Returns:
{
'node_id': str,
'checked': int,
'ok': bool,
'errors': [ { ... } ]
}
"""
if not node_id:
node_id = self.get_node_id()
genesis = "0" * 64
errors: List[Dict[str, Any]] = []
checked = 0
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
sql = "SELECT id, uuid, node_id, payload_json, payload_hash, prev_chain_hash, chain_hash FROM invoices WHERE node_id = ? ORDER BY id ASC"
params = [node_id]
if limit is not None:
sql += " LIMIT ?"
params.append(int(limit))
cursor.execute(sql, tuple(params))
rows = cursor.fetchall()
expected_prev = genesis
for (inv_id, inv_uuid, inv_node_id, payload_json, payload_hash, prev_chain_hash, chain_hash) in rows:
checked += 1
if not payload_json:
errors.append(
{
"invoice_id": inv_id,
"uuid": inv_uuid,
"type": "missing_payload_json",
}
)
# 検証不能なのでチェーンはそこで停止扱い
break
expected_payload_hash = hashlib.sha256(payload_json.encode("utf-8")).hexdigest()
if payload_hash != expected_payload_hash:
errors.append(
{
"invoice_id": inv_id,
"uuid": inv_uuid,
"type": "payload_hash_mismatch",
"db": payload_hash,
"expected": expected_payload_hash,
}
)
break
if (prev_chain_hash or genesis) != expected_prev:
errors.append(
{
"invoice_id": inv_id,
"uuid": inv_uuid,
"type": "prev_chain_hash_mismatch",
"db": prev_chain_hash,
"expected": expected_prev,
}
)
break
expected_chain_hash = hashlib.sha256(f"{expected_prev}:{expected_payload_hash}".encode("utf-8")).hexdigest()
if chain_hash != expected_chain_hash:
errors.append(
{
"invoice_id": inv_id,
"uuid": inv_uuid,
"type": "chain_hash_mismatch",
"db": chain_hash,
"expected": expected_chain_hash,
}
)
break
expected_prev = expected_chain_hash
except Exception as e:
errors.append({"type": "exception", "message": str(e)})
return {
"node_id": node_id,
"checked": checked,
"ok": len(errors) == 0,
"errors": errors,
}
def save_invoice(self, invoice: Invoice) -> bool:
"""伝票を保存"""
try:
node_id = getattr(invoice, "node_id", None) or self.get_node_id()
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# メインデータ挿入
cursor.execute('''
INSERT INTO invoices
(uuid, document_type, customer_id, customer_name, customer_address,
customer_phone, amount, tax, total_amount, date,
invoice_number, notes, file_path, node_id,
payload_json, payload_hash, prev_chain_hash, chain_hash,
pdf_template_version, company_info_version,
is_offset, offset_target_uuid,
pdf_generated_at, pdf_sha256, submitted_to_tax_authority)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
invoice.uuid,
invoice.document_type.value,
getattr(invoice.customer, "id", None),
invoice.customer.formal_name,
invoice.customer.address,
invoice.customer.phone,
0, # amount - 計算値を保存しない
0, # tax - 計算値を保存しない
0, # total_amount - 計算値を保存しない
invoice.date.isoformat(),
invoice.invoice_number,
invoice.notes,
invoice.file_path,
node_id,
getattr(invoice, "payload_json", None),
getattr(invoice, "payload_hash", None),
getattr(invoice, "prev_chain_hash", None),
getattr(invoice, "chain_hash", None),
getattr(invoice, "pdf_template_version", None),
getattr(invoice, "company_info_version", None),
1 if getattr(invoice, "is_offset", False) else 0,
getattr(invoice, "offset_target_uuid", None),
getattr(invoice, "pdf_generated_at", None),
getattr(invoice, "pdf_sha256", None),
0 # submitted_to_tax_authority = false by default
))
invoice_id = cursor.lastrowid
# 明細データ挿入
for item in invoice.items:
cursor.execute('''
INSERT INTO invoice_items
(invoice_id, description, quantity, unit_price, is_discount)
VALUES (?, ?, ?, ?, ?)
''', (
invoice_id,
item.description,
item.quantity,
item.unit_price,
item.is_discount
))
conn.commit()
return True
except Exception as e:
logging.error(f"伝票保存エラー: {e}")
return False
def update_invoice(self, invoice: Invoice) -> bool:
"""伝票を更新"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 伝票基本情報を更新
cursor.execute('''
UPDATE invoices
SET document_type = ?, customer_id = ?, date = ?,
invoice_number = ?, notes = ?, pdf_generated_at = ?
WHERE uuid = ?
''', (
invoice.document_type.value,
invoice.customer.id if hasattr(invoice.customer, 'id') else None,
invoice.date.isoformat(),
invoice.invoice_number,
invoice.notes,
getattr(invoice, 'pdf_generated_at', None),
invoice.uuid
))
# UUIDからinvoice_idを取得
cursor.execute('SELECT id FROM invoices WHERE uuid = ?', (invoice.uuid,))
result = cursor.fetchone()
if not result:
logging.error(f"伝票ID取得失敗: {invoice.uuid}")
return False
invoice_id = result[0]
# 明細を一度削除して再挿入
cursor.execute('DELETE FROM invoice_items WHERE invoice_id = ?', (invoice_id,))
for item in invoice.items:
cursor.execute('''
INSERT INTO invoice_items (invoice_id, description, quantity, unit_price, is_discount)
VALUES (?, ?, ?, ?, ?)
''', (
invoice_id, # 正しいinvoice_idを使用
item.description,
item.quantity,
item.unit_price,
item.is_discount
))
conn.commit()
return True
except Exception as e:
logging.error(f"伝票更新エラー: {e}")
return False
def delete_invoice_by_uuid(self, invoice_uuid: str) -> bool:
"""UUIDで伝票を削除"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM invoice_items WHERE invoice_id = (SELECT id FROM invoices WHERE uuid = ?)', (invoice_uuid,))
cursor.execute('DELETE FROM invoices WHERE uuid = ?', (invoice_uuid,))
conn.commit()
return True
except Exception as e:
logging.error(f"伝票削除エラー: {e}")
return False
def update_invoice_file_path(self, invoice_uuid: str, file_path: str, pdf_generated_at: str):
"""PDFファイルパスを更新"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
UPDATE invoices
SET file_path = ?, pdf_generated_at = ?
WHERE uuid = ?
''', (file_path, pdf_generated_at, invoice_uuid))
conn.commit()
except Exception as e:
logging.error(f"PDFパス更新エラー: {e}")
def get_all_invoices(self, limit: int = 100) -> List[Invoice]:
"""全伝票を取得"""
return self.search_invoices(limit=limit)
def search_invoices(
self,
query: str = "",
date_from: Optional[str] = None,
date_to: Optional[str] = None,
sort_by: str = "date",
sort_desc: bool = True,
limit: int = 50,
offset: int = 0,
include_offsets: bool = False,
) -> List[Invoice]:
"""Explorer向けに条件検索で伝票を取得。"""
invoices: List[Invoice] = []
orderable_columns = {
"date": "date",
"invoice_number": "invoice_number",
"customer_name": "customer_name",
"document_type": "document_type",
"updated_at": "updated_at",
}
order_column = orderable_columns.get(sort_by, "date")
order_direction = "DESC" if sort_desc else "ASC"
where_clauses = ["1=1"]
params: List[Any] = []
if not include_offsets:
where_clauses.append("(is_offset IS NULL OR is_offset = 0)")
q = (query or "").strip()
if q:
like = f"%{q}%"
where_clauses.append(
"("
"invoice_number LIKE ? OR "
"customer_name LIKE ? OR "
"document_type LIKE ? OR "
"notes LIKE ?"
")"
)
params.extend([like, like, like, like])
if date_from:
where_clauses.append("date >= ?")
params.append(date_from)
if date_to:
where_clauses.append("date <= ?")
params.append(date_to)
sql = f"""
SELECT * FROM invoices
WHERE {' AND '.join(where_clauses)}
ORDER BY {order_column} {order_direction}, id DESC
LIMIT ? OFFSET ?
"""
params.extend([int(limit), int(offset)])
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(sql, tuple(params))
rows = cursor.fetchall()
for row in rows:
invoice = self._row_to_invoice(row, cursor)
if invoice:
invoices.append(invoice)
except Exception as e:
logging.error(f"伝票検索エラー: {e}")
return invoices
def get_invoice_by_uuid(self, invoice_uuid: str) -> Optional[Invoice]:
"""UUIDで伝票を取得PDF再生成等で使用"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM invoices WHERE uuid = ? LIMIT 1",
(invoice_uuid,),
)
row = cursor.fetchone()
if not row:
return None
return self._row_to_invoice(row, cursor)
except Exception as e:
logging.error(f"UUID取得エラー: {e}")
return None
def _row_to_invoice(self, row: tuple, cursor: sqlite3.Cursor) -> Optional[Invoice]:
"""DB行をInvoiceオブジェクトに変換"""
try:
invoice_id = row[0]
logging.info(f"変換開始: invoice_id={invoice_id}, row_length={len(row)}")
# 明細取得
cursor.execute('''
SELECT description, quantity, unit_price, is_discount
FROM invoice_items
WHERE invoice_id = ?
''', (invoice_id,))
item_rows = cursor.fetchall()
items = [
InvoiceItem(
description=ir[0],
quantity=ir[1],
unit_price=ir[2],
is_discount=bool(ir[3])
) for ir in item_rows
]
# 顧客情報
customer = Customer(
id=row[3] or 0, # customer_idフィールド
name=row[4], # customer_nameフィールド
formal_name=row[4], # customer_nameフィールド
address=row[5] or "", # customer_addressフィールド
phone=row[6] or "" # customer_phoneフィールド
)
# 伝票タイプ
doc_type = DocumentType.SALES
for dt in DocumentType:
if dt.value == row[2]:
doc_type = dt
break
# 日付変換
date_str = row[10] # 正しいdateフィールドのインデックス
logging.info(f"日付変換: {date_str}")
date_obj = datetime.fromisoformat(date_str)
inv = Invoice(
customer=customer,
date=date_obj,
items=items,
file_path=row[13], # 正しいfile_pathフィールドのインデックス
invoice_number=row[11] or "", # 正しいinvoice_numberフィールドのインデックス
notes=row[12], # 正しいnotesフィールドのインデックス
document_type=doc_type,
uuid=row[1],
)
# 監査用フィールド(存在していれば付与)
try:
inv.node_id = row[14]
inv.payload_json = row[15]
inv.payload_hash = row[16]
inv.prev_chain_hash = row[17]
inv.chain_hash = row[18]
inv.pdf_template_version = row[19]
inv.company_info_version = row[20]
inv.is_offset = bool(row[21])
inv.offset_target_uuid = row[22]
inv.pdf_generated_at = row[23]
inv.pdf_sha256 = row[24]
inv.submitted_to_tax_authority = bool(row[27])
except Exception:
pass
logging.info(f"変換成功: {inv.invoice_number}")
return inv
except Exception as e:
logging.error(f"変換エラー: {e}")
return None
def delete_invoice(self, invoice_id: int) -> bool:
"""伝票を削除"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 明細を先に削除
cursor.execute('DELETE FROM invoice_items WHERE invoice_id = ?', (invoice_id,))
# 伝票を削除
cursor.execute('DELETE FROM invoices WHERE id = ?', (invoice_id,))
conn.commit()
return True
except Exception as e:
logging.error(f"削除エラー: {e}")
return False
def get_statistics(self) -> Dict[str, Any]:
"""統計情報取得"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 総数
cursor.execute('SELECT COUNT(*) FROM invoices')
total_count = cursor.fetchone()[0]
# 総売上
cursor.execute('SELECT SUM(total_amount) FROM invoices')
total_amount = cursor.fetchone()[0] or 0
# 今月の売上
today = datetime.now()
first_day = today.replace(day=1).isoformat()
cursor.execute('''
SELECT SUM(total_amount) FROM invoices
WHERE date >= ?
''', (first_day,))
monthly_amount = cursor.fetchone()[0] or 0
return {
'total_count': total_count,
'total_amount': total_amount,
'monthly_amount': monthly_amount
}
except Exception as e:
logging.error(f"統計エラー: {e}")
return {'total_count': 0, 'total_amount': 0, 'monthly_amount': 0}
class CustomerRepository:
"""顧客リポジトリ"""
def __init__(self, db_path: str = "sales.db"):
self.db_path = db_path
self._init_tables()
def _init_tables(self):
"""テーブル初期化"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS customers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
formal_name TEXT,
address TEXT,
phone TEXT,
email TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
def save_customer(self, customer: Customer) -> bool:
"""顧客を保存(新規登録・更新共通)"""
# IDが0の場合は新規登録、それ以外は更新
if customer.id == 0:
return self._insert_customer(customer)
else:
return self.update_customer(customer)
def _insert_customer(self, customer: Customer) -> bool:
"""顧客を新規登録"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO customers
(name, formal_name, address, phone, email)
VALUES (?, ?, ?, ?, ?)
""", (
customer.name,
customer.formal_name,
customer.address,
customer.phone,
customer.email
))
# 挿入したIDを取得してcustomerオブジェクトを更新
customer.id = cursor.lastrowid
conn.commit()
return True
except Exception as e:
logging.error(f"顧客登録エラー: {e}")
return False
def update_customer(self, customer: Customer) -> bool:
"""顧客情報を更新"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE customers
SET name = ?, formal_name = ?, address = ?, phone = ?
WHERE id = ?
""", (
customer.name,
customer.formal_name,
customer.address,
customer.phone,
customer.id
))
conn.commit()
return True
except Exception as e:
logging.error(f"顧客更新エラー: {e}")
return False
def get_all_products(self) -> List[Product]:
"""全商品を取得"""
products = []
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM products ORDER BY name')
rows = cursor.fetchall()
for row in rows:
products.append(Product(
id=row[0],
name=row[1],
unit_price=row[2],
description=row[3]
))
return products
except Exception as e:
logging.error(f"商品取得エラー: {e}")
return []
def save_product(self, product: Product) -> bool:
"""商品を保存(新規・更新)"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
if product.id:
# 更新
cursor.execute("""
UPDATE products
SET name = ?, unit_price = ?, description = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (
product.name,
product.unit_price,
product.description,
product.id
))
else:
# 新規
cursor.execute("""
INSERT INTO products (name, unit_price, description)
VALUES (?, ?, ?)
""", (
product.name,
product.unit_price,
product.description
))
conn.commit()
return True
except Exception as e:
logging.error(f"商品保存エラー: {e}")
return False
def get_all_customers(self) -> List[Customer]:
"""全顧客を取得"""
customers = []
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM customers ORDER BY formal_name')
rows = cursor.fetchall()
for row in rows:
customers.append(Customer(
id=row[0],
name=row[1],
formal_name=row[2],
address=row[3] or "",
phone=row[4] or ""
))
except Exception as e:
logging.error(f"顧客取得エラー: {e}")
return customers
# 使用例
if __name__ == "__main__":
from models.invoice_models import create_sample_invoices
# リポジトリ初期化
invoice_repo = InvoiceRepository()
# サンプルデータ保存
sample_invoices = create_sample_invoices()
for invoice in sample_invoices:
success = invoice_repo.save_invoice(invoice)
print(f"保存: {invoice.customer.formal_name} - {'成功' if success else '失敗'}")
# 統計取得
stats = invoice_repo.get_statistics()
print(f"\n統計: {stats}")
# 全伝票取得
all_invoices = invoice_repo.get_all_invoices()
print(f"\n全伝票数: {len(all_invoices)}")