""" データリポジトリ層 SQLiteデータベース操作を抽象化 """ import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import sqlite3 import json from datetime import datetime from typing import List, Optional, Dict, Any from models.invoice_models import Invoice, Customer, InvoiceItem, DocumentType, Product import logging import uuid as uuid_module import hashlib logging.basicConfig(level=logging.INFO) class InvoiceRepository: """伝票データリポジトリ""" def __init__(self, db_path: str = "sales.db"): self.db_path = db_path self._init_tables() def _init_tables(self): """テーブル初期化""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # メタ情報(node_id等) cursor.execute( ''' CREATE TABLE IF NOT EXISTS meta ( key TEXT PRIMARY KEY, value TEXT ) ''' ) # 伝票テーブル cursor.execute(''' CREATE TABLE IF NOT EXISTS invoices ( id INTEGER PRIMARY KEY AUTOINCREMENT, uuid TEXT UNIQUE, document_type TEXT, customer_id INTEGER, customer_name TEXT, customer_address TEXT, customer_phone TEXT, amount REAL, tax REAL, total_amount REAL, date TEXT, invoice_number TEXT, notes TEXT, file_path TEXT, node_id TEXT, payload_json TEXT, payload_hash TEXT, prev_chain_hash TEXT, chain_hash TEXT, pdf_template_version TEXT, company_info_version TEXT, is_offset INTEGER DEFAULT 0, offset_target_uuid TEXT, pdf_generated_at TEXT, pdf_sha256 TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) ''') # 明細テーブル cursor.execute(''' CREATE TABLE IF NOT EXISTS invoice_items ( id INTEGER PRIMARY KEY AUTOINCREMENT, invoice_id INTEGER, description TEXT, quantity INTEGER, unit_price INTEGER, is_discount BOOLEAN, product_id INTEGER, FOREIGN KEY (invoice_id) REFERENCES invoices(id) ) ''') # 商品マスタテーブル cursor.execute(''' CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, unit_price INTEGER NOT NULL, description TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() # 既存DB向けの軽量マイグレーション(列追加) self._migrate_schema() def _migrate_schema(self): """既存DBのスキーマを新仕様に追従(ALTER TABLEで列追加)""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # meta は _init_tables で作成済みだが念のため cursor.execute( ''' CREATE TABLE IF NOT EXISTS meta ( key TEXT PRIMARY KEY, value TEXT ) ''' ) def ensure_column(table: str, col: str, col_def: str): cursor.execute(f"PRAGMA table_info({table})") cols = {r[1] for r in cursor.fetchall()} if col not in cols: cursor.execute(f"ALTER TABLE {table} ADD COLUMN {col_def}") ensure_column("invoices", "customer_id", "customer_id INTEGER") ensure_column("invoices", "node_id", "node_id TEXT") ensure_column("invoices", "payload_json", "payload_json TEXT") ensure_column("invoices", "payload_hash", "payload_hash TEXT") ensure_column("invoices", "prev_chain_hash", "prev_chain_hash TEXT") ensure_column("invoices", "chain_hash", "chain_hash TEXT") ensure_column("invoices", "pdf_template_version", "pdf_template_version TEXT") ensure_column("invoices", "company_info_version", "company_info_version TEXT") ensure_column("invoices", "is_offset", "is_offset INTEGER DEFAULT 0") ensure_column("invoices", "offset_target_uuid", "offset_target_uuid TEXT") ensure_column("invoices", "pdf_generated_at", "pdf_generated_at TEXT") ensure_column("invoices", "pdf_sha256", "pdf_sha256 TEXT") ensure_column("invoices", "submitted_to_tax_authority", "submitted_to_tax_authority INTEGER DEFAULT 0") ensure_column("invoices", "is_draft", "is_draft INTEGER DEFAULT 0") # Explorer向けインデックス(大量データ閲覧時の検索性能を確保) cursor.execute("CREATE INDEX IF NOT EXISTS idx_invoices_date ON invoices(date)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_invoices_customer_name ON invoices(customer_name)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_invoices_document_type ON invoices(document_type)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_invoices_updated_at ON invoices(updated_at)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_invoices_is_offset ON invoices(is_offset)") conn.commit() def get_node_id(self) -> str: """DB単位のnode_id(UUID)を取得(未設定なら生成して保存)""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT value FROM meta WHERE key = ?", ("node_id",)) row = cursor.fetchone() if row and row[0]: return row[0] node_id = str(uuid_module.uuid4()) cursor.execute( "INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)", ("node_id", node_id), ) conn.commit() return node_id def get_last_chain_hash(self, node_id: str) -> str: """直前のchain_hashを取得(無ければgenesis=all-zero)""" genesis = "0" * 64 try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( "SELECT chain_hash FROM invoices WHERE node_id = ? AND chain_hash IS NOT NULL ORDER BY id DESC LIMIT 1", (node_id,), ) row = cursor.fetchone() return row[0] if row and row[0] else genesis except Exception: return genesis def verify_chain(self, node_id: Optional[str] = None, limit: Optional[int] = None) -> Dict[str, Any]: """DB内の連鎖ハッシュ整合性を検証(監査用) Returns: { 'node_id': str, 'checked': int, 'ok': bool, 'errors': [ { ... } ] } """ if not node_id: node_id = self.get_node_id() genesis = "0" * 64 errors: List[Dict[str, Any]] = [] checked = 0 try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() sql = "SELECT id, uuid, node_id, payload_json, payload_hash, prev_chain_hash, chain_hash FROM invoices WHERE node_id = ? ORDER BY id ASC" params = [node_id] if limit is not None: sql += " LIMIT ?" params.append(int(limit)) cursor.execute(sql, tuple(params)) rows = cursor.fetchall() expected_prev = genesis for (inv_id, inv_uuid, inv_node_id, payload_json, payload_hash, prev_chain_hash, chain_hash) in rows: checked += 1 if not payload_json: errors.append( { "invoice_id": inv_id, "uuid": inv_uuid, "type": "missing_payload_json", } ) # 検証不能なのでチェーンはそこで停止扱い break expected_payload_hash = hashlib.sha256(payload_json.encode("utf-8")).hexdigest() if payload_hash != expected_payload_hash: errors.append( { "invoice_id": inv_id, "uuid": inv_uuid, "type": "payload_hash_mismatch", "db": payload_hash, "expected": expected_payload_hash, } ) break if (prev_chain_hash or genesis) != expected_prev: errors.append( { "invoice_id": inv_id, "uuid": inv_uuid, "type": "prev_chain_hash_mismatch", "db": prev_chain_hash, "expected": expected_prev, } ) break expected_chain_hash = hashlib.sha256(f"{expected_prev}:{expected_payload_hash}".encode("utf-8")).hexdigest() if chain_hash != expected_chain_hash: errors.append( { "invoice_id": inv_id, "uuid": inv_uuid, "type": "chain_hash_mismatch", "db": chain_hash, "expected": expected_chain_hash, } ) break expected_prev = expected_chain_hash except Exception as e: errors.append({"type": "exception", "message": str(e)}) return { "node_id": node_id, "checked": checked, "ok": len(errors) == 0, "errors": errors, } def save_invoice(self, invoice: Invoice) -> bool: """伝票を保存""" try: node_id = getattr(invoice, "node_id", None) or self.get_node_id() with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # メインデータ挿入 cursor.execute(''' INSERT INTO invoices (uuid, document_type, customer_id, customer_name, customer_address, customer_phone, amount, tax, total_amount, date, invoice_number, notes, file_path, node_id, payload_json, payload_hash, prev_chain_hash, chain_hash, pdf_template_version, company_info_version, is_offset, offset_target_uuid, pdf_generated_at, pdf_sha256, submitted_to_tax_authority, is_draft) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( invoice.uuid, invoice.document_type.value, getattr(invoice.customer, "id", None), invoice.customer.formal_name, invoice.customer.address, invoice.customer.phone, 0, # amount - 計算値を保存しない 0, # tax - 計算値を保存しない 0, # total_amount - 計算値を保存しない invoice.date.isoformat(), invoice.invoice_number, invoice.notes, invoice.file_path, node_id, getattr(invoice, "payload_json", None), getattr(invoice, "payload_hash", None), getattr(invoice, "prev_chain_hash", None), getattr(invoice, "chain_hash", None), getattr(invoice, "pdf_template_version", None), getattr(invoice, "company_info_version", None), 1 if getattr(invoice, "is_offset", False) else 0, getattr(invoice, "offset_target_uuid", None), getattr(invoice, "pdf_generated_at", None), getattr(invoice, "pdf_sha256", None), 0, # submitted_to_tax_authority = false by default 1 if getattr(invoice, "is_draft", False) else 0 )) invoice_id = cursor.lastrowid # 明細データ挿入 for item in invoice.items: cursor.execute(''' INSERT INTO invoice_items (invoice_id, description, quantity, unit_price, is_discount) VALUES (?, ?, ?, ?, ?) ''', ( invoice_id, item.description, item.quantity, item.unit_price, item.is_discount )) conn.commit() return True except Exception as e: logging.error(f"伝票保存エラー: {e}") return False def update_invoice(self, invoice: Invoice) -> bool: """伝票を更新""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 伝票基本情報を更新 cursor.execute(''' UPDATE invoices SET document_type = ?, customer_id = ?, customer_name = ?, customer_address = ?, customer_phone = ?, date = ?, invoice_number = ?, notes = ?, pdf_generated_at = ?, is_draft = ?, updated_at = CURRENT_TIMESTAMP WHERE uuid = ? ''', ( invoice.document_type.value, invoice.customer.id if hasattr(invoice.customer, 'id') else None, getattr(invoice.customer, 'formal_name', None), getattr(invoice.customer, 'address', None), getattr(invoice.customer, 'phone', None), invoice.date.isoformat(), invoice.invoice_number, invoice.notes, getattr(invoice, 'pdf_generated_at', None), 1 if getattr(invoice, 'is_draft', False) else 0, invoice.uuid )) # UUIDからinvoice_idを取得 cursor.execute('SELECT id FROM invoices WHERE uuid = ?', (invoice.uuid,)) result = cursor.fetchone() if not result: logging.warning(f"伝票ID取得失敗: {invoice.uuid} -> 新規保存にフォールバックします") conn.rollback() return self.save_invoice(invoice) invoice_id = result[0] # 明細を一度削除して再挿入 cursor.execute('DELETE FROM invoice_items WHERE invoice_id = ?', (invoice_id,)) for item in invoice.items: cursor.execute(''' INSERT INTO invoice_items (invoice_id, description, quantity, unit_price, is_discount) VALUES (?, ?, ?, ?, ?) ''', ( invoice_id, # 正しいinvoice_idを使用 item.description, item.quantity, item.unit_price, item.is_discount )) conn.commit() return True except Exception as e: logging.error(f"伝票更新エラー: {e}") return False def delete_invoice_by_uuid(self, invoice_uuid: str) -> bool: """UUIDで伝票を削除""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute('DELETE FROM invoice_items WHERE invoice_id = (SELECT id FROM invoices WHERE uuid = ?)', (invoice_uuid,)) cursor.execute('DELETE FROM invoices WHERE uuid = ?', (invoice_uuid,)) conn.commit() return True except Exception as e: logging.error(f"伝票削除エラー: {e}") return False def update_invoice_file_path(self, invoice_uuid: str, file_path: str, pdf_generated_at: str): """PDFファイルパスを更新""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' UPDATE invoices SET file_path = ?, pdf_generated_at = ? WHERE uuid = ? ''', (file_path, pdf_generated_at, invoice_uuid)) conn.commit() except Exception as e: logging.error(f"PDFパス更新エラー: {e}") def get_all_invoices(self, limit: int = 100) -> List[Invoice]: """全伝票を取得""" return self.search_invoices(limit=limit) def search_invoices( self, query: str = "", date_from: Optional[str] = None, date_to: Optional[str] = None, sort_by: str = "date", sort_desc: bool = True, limit: int = 50, offset: int = 0, include_offsets: bool = False, ) -> List[Invoice]: """Explorer向けに条件検索で伝票を取得。""" invoices: List[Invoice] = [] orderable_columns = { "date": "date", "invoice_number": "invoice_number", "customer_name": "customer_name", "document_type": "document_type", "updated_at": "updated_at", } order_column = orderable_columns.get(sort_by, "date") order_direction = "DESC" if sort_desc else "ASC" where_clauses = ["1=1"] params: List[Any] = [] if not include_offsets: where_clauses.append("(is_offset IS NULL OR is_offset = 0)") q = (query or "").strip() if q: like = f"%{q}%" where_clauses.append( "(" "invoice_number LIKE ? OR " "customer_name LIKE ? OR " "document_type LIKE ? OR " "notes LIKE ?" ")" ) params.extend([like, like, like, like]) if date_from: where_clauses.append("date >= ?") params.append(date_from) if date_to: where_clauses.append("date <= ?") params.append(date_to) sql = f""" SELECT * FROM invoices WHERE {' AND '.join(where_clauses)} ORDER BY {order_column} {order_direction}, id DESC LIMIT ? OFFSET ? """ params.extend([int(limit), int(offset)]) try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(sql, tuple(params)) rows = cursor.fetchall() for row in rows: invoice = self._row_to_invoice(row, cursor) if invoice: invoices.append(invoice) except Exception as e: logging.error(f"伝票検索エラー: {e}") return invoices def get_invoice_by_uuid(self, invoice_uuid: str) -> Optional[Invoice]: """UUIDで伝票を取得(PDF再生成等で使用)""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM invoices WHERE uuid = ? LIMIT 1", (invoice_uuid,), ) row = cursor.fetchone() if not row: return None return self._row_to_invoice(row, cursor) except Exception as e: logging.error(f"UUID取得エラー: {e}") return None def _row_to_invoice(self, row: tuple, cursor: sqlite3.Cursor) -> Optional[Invoice]: """DB行をInvoiceオブジェクトに変換""" try: invoice_id = row[0] col_map = {desc[0]: idx for idx, desc in enumerate(cursor.description or [])} def val(name, default=None): idx = col_map.get(name) if idx is None or idx >= len(row): return default return row[idx] # 基本フィールド doc_type_val = val("document_type", DocumentType.INVOICE.value) doc_type = DocumentType(doc_type_val) if doc_type_val in DocumentType._value2member_map_ else DocumentType.INVOICE date_str = val("date") or datetime.now().isoformat() date_obj = datetime.fromisoformat(date_str) customer_name = val("customer_name", "") customer = Customer( id=val("customer_id", 0) or 0, name=customer_name or "", formal_name=customer_name or "", address=val("customer_address", "") or "", phone=val("customer_phone", "") or "", ) is_draft = bool(val("is_draft", 0)) # 明細取得は別カーソルで行い、元カーソルのdescriptionを汚染しない item_cursor = cursor.connection.cursor() item_cursor.execute( 'SELECT description, quantity, unit_price, is_discount, product_id FROM invoice_items WHERE invoice_id = ?', (invoice_id,), ) item_rows = item_cursor.fetchall() items = [ InvoiceItem( description=ir[0], quantity=ir[1], unit_price=ir[2], is_discount=bool(ir[3]), ) for ir in item_rows ] inv = Invoice( customer=customer, date=date_obj, items=items, file_path=val("file_path"), invoice_number=val("invoice_number", ""), notes=val("notes", ""), document_type=doc_type, uuid=val("uuid"), is_draft=is_draft, ) # 監査・チェーン関連のフィールドは動的属性として保持 inv.node_id = val("node_id") inv.payload_json = val("payload_json") inv.payload_hash = val("payload_hash") inv.prev_chain_hash = val("prev_chain_hash") inv.chain_hash = val("chain_hash") inv.pdf_template_version = val("pdf_template_version") inv.company_info_version = val("company_info_version") inv.is_offset = bool(val("is_offset", 0)) inv.offset_target_uuid = val("offset_target_uuid") inv.pdf_generated_at = val("pdf_generated_at") inv.pdf_sha256 = val("pdf_sha256") inv.submitted_to_tax_authority = bool(val("submitted_to_tax_authority", 0)) return inv except Exception as e: logging.error(f"変換エラー: {e}") return None def delete_invoice(self, invoice_id: int) -> bool: """伝票を削除""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 明細を先に削除 cursor.execute('DELETE FROM invoice_items WHERE invoice_id = ?', (invoice_id,)) # 伝票を削除 cursor.execute('DELETE FROM invoices WHERE id = ?', (invoice_id,)) conn.commit() return True except Exception as e: logging.error(f"削除エラー: {e}") return False def get_statistics(self) -> Dict[str, Any]: """統計情報取得""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 総数 cursor.execute('SELECT COUNT(*) FROM invoices') total_count = cursor.fetchone()[0] # 総売上 cursor.execute('SELECT SUM(total_amount) FROM invoices') total_amount = cursor.fetchone()[0] or 0 # 今月の売上 today = datetime.now() first_day = today.replace(day=1).isoformat() cursor.execute(''' SELECT SUM(total_amount) FROM invoices WHERE date >= ? ''', (first_day,)) monthly_amount = cursor.fetchone()[0] or 0 return { 'total_count': total_count, 'total_amount': total_amount, 'monthly_amount': monthly_amount } except Exception as e: logging.error(f"統計エラー: {e}") return {'total_count': 0, 'total_amount': 0, 'monthly_amount': 0} class CustomerRepository: """顧客リポジトリ""" def __init__(self, db_path: str = "sales.db"): self.db_path = db_path self._init_tables() def _init_tables(self): """テーブル初期化""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS customers ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, formal_name TEXT, address TEXT, phone TEXT, email TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() def save_customer(self, customer: Customer) -> bool: """顧客を保存(新規登録・更新共通)""" # IDが0の場合は新規登録、それ以外は更新 if customer.id == 0: return self._insert_customer(customer) else: return self.update_customer(customer) def _insert_customer(self, customer: Customer) -> bool: """顧客を新規登録""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO customers (name, formal_name, address, phone, email) VALUES (?, ?, ?, ?, ?) """, ( customer.name, customer.formal_name, customer.address, customer.phone, customer.email )) # 挿入したIDを取得してcustomerオブジェクトを更新 customer.id = cursor.lastrowid conn.commit() return True except Exception as e: logging.error(f"顧客登録エラー: {e}") return False def update_customer(self, customer: Customer) -> bool: """顧客情報を更新""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" UPDATE customers SET name = ?, formal_name = ?, address = ?, phone = ? WHERE id = ? """, ( customer.name, customer.formal_name, customer.address, customer.phone, customer.id )) conn.commit() return True except Exception as e: logging.error(f"顧客更新エラー: {e}") return False def get_all_products(self) -> List[Product]: """全商品を取得""" products = [] try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute('SELECT * FROM products ORDER BY name') rows = cursor.fetchall() for row in rows: products.append(Product( id=row[0], name=row[1], unit_price=row[2], description=row[3] )) return products except Exception as e: logging.error(f"商品取得エラー: {e}") return [] def save_product(self, product: Product) -> bool: """商品を保存(新規・更新)""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() if product.id: # 更新 cursor.execute(""" UPDATE products SET name = ?, unit_price = ?, description = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, ( product.name, product.unit_price, product.description, product.id )) else: # 新規 cursor.execute(""" INSERT INTO products (name, unit_price, description) VALUES (?, ?, ?) """, ( product.name, product.unit_price, product.description )) conn.commit() return True except Exception as e: logging.error(f"商品保存エラー: {e}") return False def get_all_customers(self) -> List[Customer]: """全顧客を取得""" customers = [] try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute('SELECT * FROM customers ORDER BY formal_name') rows = cursor.fetchall() for row in rows: customers.append(Customer( id=row[0], name=row[1], formal_name=row[2], address=row[3] or "", phone=row[4] or "" )) except Exception as e: logging.error(f"顧客取得エラー: {e}") return customers # 使用例 if __name__ == "__main__": from models.invoice_models import create_sample_invoices # リポジトリ初期化 invoice_repo = InvoiceRepository() # サンプルデータ保存 sample_invoices = create_sample_invoices() for invoice in sample_invoices: success = invoice_repo.save_invoice(invoice) print(f"保存: {invoice.customer.formal_name} - {'成功' if success else '失敗'}") # 統計取得 stats = invoice_repo.get_statistics() print(f"\n統計: {stats}") # 全伝票取得 all_invoices = invoice_repo.get_all_invoices() print(f"\n全伝票数: {len(all_invoices)}")