780 lines
30 KiB
Python
780 lines
30 KiB
Python
"""
|
||
データリポジトリ層
|
||
SQLiteデータベース操作を抽象化
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
import sqlite3
|
||
import json
|
||
from datetime import datetime
|
||
from typing import List, Optional, Dict, Any
|
||
from models.invoice_models import Invoice, Customer, InvoiceItem, DocumentType, Product
|
||
import logging
|
||
import uuid as uuid_module
|
||
import hashlib
|
||
|
||
logging.basicConfig(level=logging.INFO)
|
||
|
||
class InvoiceRepository:
|
||
"""伝票データリポジトリ"""
|
||
|
||
def __init__(self, db_path: str = "sales.db"):
|
||
self.db_path = db_path
|
||
self._init_tables()
|
||
|
||
def _init_tables(self):
|
||
"""テーブル初期化"""
|
||
with sqlite3.connect(self.db_path) as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# メタ情報(node_id等)
|
||
cursor.execute(
|
||
'''
|
||
CREATE TABLE IF NOT EXISTS meta (
|
||
key TEXT PRIMARY KEY,
|
||
value TEXT
|
||
)
|
||
'''
|
||
)
|
||
|
||
# 伝票テーブル
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS invoices (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
uuid TEXT UNIQUE,
|
||
document_type TEXT,
|
||
customer_id INTEGER,
|
||
customer_name TEXT,
|
||
customer_address TEXT,
|
||
customer_phone TEXT,
|
||
amount REAL,
|
||
tax REAL,
|
||
total_amount REAL,
|
||
date TEXT,
|
||
invoice_number TEXT,
|
||
notes TEXT,
|
||
file_path TEXT,
|
||
node_id TEXT,
|
||
payload_json TEXT,
|
||
payload_hash TEXT,
|
||
prev_chain_hash TEXT,
|
||
chain_hash TEXT,
|
||
pdf_template_version TEXT,
|
||
company_info_version TEXT,
|
||
is_offset INTEGER DEFAULT 0,
|
||
offset_target_uuid TEXT,
|
||
pdf_generated_at TEXT,
|
||
pdf_sha256 TEXT,
|
||
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
''')
|
||
|
||
# 明細テーブル
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS invoice_items (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
invoice_id INTEGER,
|
||
description TEXT,
|
||
quantity INTEGER,
|
||
unit_price INTEGER,
|
||
is_discount BOOLEAN,
|
||
product_id INTEGER,
|
||
FOREIGN KEY (invoice_id) REFERENCES invoices(id)
|
||
)
|
||
''')
|
||
|
||
# 商品マスタテーブル
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS products (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
name TEXT NOT NULL,
|
||
unit_price INTEGER NOT NULL,
|
||
description TEXT,
|
||
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
''')
|
||
|
||
conn.commit()
|
||
|
||
# 既存DB向けの軽量マイグレーション(列追加)
|
||
self._migrate_schema()
|
||
|
||
def _migrate_schema(self):
|
||
"""既存DBのスキーマを新仕様に追従(ALTER TABLEで列追加)"""
|
||
with sqlite3.connect(self.db_path) as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# meta は _init_tables で作成済みだが念のため
|
||
cursor.execute(
|
||
'''
|
||
CREATE TABLE IF NOT EXISTS meta (
|
||
key TEXT PRIMARY KEY,
|
||
value TEXT
|
||
)
|
||
'''
|
||
)
|
||
|
||
def ensure_column(table: str, col: str, col_def: str):
|
||
cursor.execute(f"PRAGMA table_info({table})")
|
||
cols = {r[1] for r in cursor.fetchall()}
|
||
if col not in cols:
|
||
cursor.execute(f"ALTER TABLE {table} ADD COLUMN {col_def}")
|
||
|
||
ensure_column("invoices", "customer_id", "customer_id INTEGER")
|
||
ensure_column("invoices", "node_id", "node_id TEXT")
|
||
ensure_column("invoices", "payload_json", "payload_json TEXT")
|
||
ensure_column("invoices", "payload_hash", "payload_hash TEXT")
|
||
ensure_column("invoices", "prev_chain_hash", "prev_chain_hash TEXT")
|
||
ensure_column("invoices", "chain_hash", "chain_hash TEXT")
|
||
ensure_column("invoices", "pdf_template_version", "pdf_template_version TEXT")
|
||
ensure_column("invoices", "company_info_version", "company_info_version TEXT")
|
||
ensure_column("invoices", "is_offset", "is_offset INTEGER DEFAULT 0")
|
||
ensure_column("invoices", "offset_target_uuid", "offset_target_uuid TEXT")
|
||
ensure_column("invoices", "pdf_generated_at", "pdf_generated_at TEXT")
|
||
ensure_column("invoices", "pdf_sha256", "pdf_sha256 TEXT")
|
||
ensure_column("invoices", "submitted_to_tax_authority", "submitted_to_tax_authority INTEGER DEFAULT 0")
|
||
|
||
conn.commit()
|
||
|
||
def get_node_id(self) -> str:
    """Return the node_id (UUID string) stored for this database.

    The value is read from the ``meta`` table under the key ``node_id``.
    When no non-empty value is stored, a new UUID4 is generated, written
    back to ``meta`` and returned.
    """
    with sqlite3.connect(self.db_path) as conn:
        cur = conn.cursor()
        cur.execute("SELECT value FROM meta WHERE key = ?", ("node_id",))
        stored = cur.fetchone()
        if stored is not None and stored[0]:
            return stored[0]

        # Nothing usable stored yet: create the id and persist it.
        fresh_id = str(uuid_module.uuid4())
        cur.execute(
            "INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)",
            ("node_id", fresh_id),
        )
        conn.commit()
        return fresh_id
|
||
|
||
def get_last_chain_hash(self, node_id: str) -> str:
    """Return the newest non-null chain_hash stored for ``node_id``.

    Args:
        node_id: Node whose chain is inspected.

    Returns:
        The ``chain_hash`` of the row with the highest ``id`` for that node,
        or the genesis value (64 zero characters) when the node has no
        chained rows or the lookup fails.
    """
    genesis = "0" * 64
    try:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT chain_hash FROM invoices WHERE node_id = ? AND chain_hash IS NOT NULL ORDER BY id DESC LIMIT 1",
                (node_id,),
            )
            row = cursor.fetchone()
    except Exception as e:
        # Still best-effort (callers get genesis), but a failed lookup is
        # no longer indistinguishable from an empty chain: record it so a
        # chain that restarts from genesis can be traced to its cause.
        logging.error(f"チェーンハッシュ取得エラー: {e}")
        return genesis

    return row[0] if row and row[0] else genesis
|
||
|
||
def verify_chain(self, node_id: Optional[str] = None, limit: Optional[int] = None) -> Dict[str, Any]:
    """Check the hash chain stored in ``invoices`` for one node (audit use).

    Rows of the node are walked in ascending ``id`` order. For each row the
    payload hash is recomputed as SHA-256 of ``payload_json`` and the chain
    hash as SHA-256 of ``"<previous chain hash>:<payload hash>"``, starting
    from a genesis value of 64 zero characters. The walk stops at the first
    row that fails a check.

    Args:
        node_id: Node to verify; when falsy, ``self.get_node_id()`` is used.
        limit: Optional maximum number of rows to read.

    Returns:
        A dict with ``node_id``, ``checked`` (rows visited, including a
        failing one), ``ok`` (True when no error was recorded) and
        ``errors`` (list of dicts describing the failure or an exception).
    """
    if not node_id:
        node_id = self.get_node_id()

    genesis = "0" * 64
    errors: List[Dict[str, Any]] = []
    checked = 0

    def problem(row_id, row_uuid, kind, **details):
        # Common shape of a per-row error entry.
        entry = {"invoice_id": row_id, "uuid": row_uuid, "type": kind}
        entry.update(details)
        return entry

    query = (
        "SELECT id, uuid, node_id, payload_json, payload_hash, prev_chain_hash, chain_hash "
        "FROM invoices WHERE node_id = ? ORDER BY id ASC"
    )

    try:
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()
            if limit is None:
                cur.execute(query, (node_id,))
            else:
                cur.execute(query + " LIMIT ?", (node_id, int(limit)))
            records = cur.fetchall()

            prev_expected = genesis
            for row_id, row_uuid, _row_node, payload, stored_payload_hash, stored_prev, stored_chain in records:
                checked += 1

                if not payload:
                    # Cannot be verified, so the chain is treated as ending here.
                    errors.append(problem(row_id, row_uuid, "missing_payload_json"))
                    break

                payload_digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
                if stored_payload_hash != payload_digest:
                    errors.append(
                        problem(
                            row_id,
                            row_uuid,
                            "payload_hash_mismatch",
                            db=stored_payload_hash,
                            expected=payload_digest,
                        )
                    )
                    break

                # An empty stored prev hash is treated as genesis.
                if (stored_prev or genesis) != prev_expected:
                    errors.append(
                        problem(
                            row_id,
                            row_uuid,
                            "prev_chain_hash_mismatch",
                            db=stored_prev,
                            expected=prev_expected,
                        )
                    )
                    break

                chain_digest = hashlib.sha256(
                    f"{prev_expected}:{payload_digest}".encode("utf-8")
                ).hexdigest()
                if stored_chain != chain_digest:
                    errors.append(
                        problem(
                            row_id,
                            row_uuid,
                            "chain_hash_mismatch",
                            db=stored_chain,
                            expected=chain_digest,
                        )
                    )
                    break

                prev_expected = chain_digest

    except Exception as e:
        errors.append({"type": "exception", "message": str(e)})

    return {
        "node_id": node_id,
        "checked": checked,
        "ok": len(errors) == 0,
        "errors": errors,
    }
|
||
|
||
def save_invoice(self, invoice: Invoice) -> bool:
    """Insert an invoice header row together with its line items.

    The node_id is taken from the invoice when it carries one, otherwise
    from ``self.get_node_id()``. amount / tax / total_amount are always
    written as 0 (computed values are not persisted), and
    ``submitted_to_tax_authority`` starts at 0.

    Args:
        invoice: Invoice to persist.

    Returns:
        True when header and items were committed; False when any error
        occurred (the error is logged).
    """
    header_sql = '''
        INSERT INTO invoices
        (uuid, document_type, customer_id, customer_name, customer_address,
         customer_phone, amount, tax, total_amount, date,
         invoice_number, notes, file_path, node_id,
         payload_json, payload_hash, prev_chain_hash, chain_hash,
         pdf_template_version, company_info_version,
         is_offset, offset_target_uuid,
         pdf_generated_at, pdf_sha256, submitted_to_tax_authority)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    '''
    item_sql = '''
        INSERT INTO invoice_items
        (invoice_id, description, quantity, unit_price, is_discount)
        VALUES (?, ?, ?, ?, ?)
    '''

    try:
        node_id = getattr(invoice, "node_id", None) or self.get_node_id()
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()

            buyer = invoice.customer
            header_values = (
                invoice.uuid,
                invoice.document_type.value,
                getattr(buyer, "id", None),
                buyer.formal_name,
                buyer.address,
                buyer.phone,
                0,  # amount - computed value is not stored
                0,  # tax - computed value is not stored
                0,  # total_amount - computed value is not stored
                invoice.date.isoformat(),
                invoice.invoice_number,
                invoice.notes,
                invoice.file_path,
                node_id,
                getattr(invoice, "payload_json", None),
                getattr(invoice, "payload_hash", None),
                getattr(invoice, "prev_chain_hash", None),
                getattr(invoice, "chain_hash", None),
                getattr(invoice, "pdf_template_version", None),
                getattr(invoice, "company_info_version", None),
                1 if getattr(invoice, "is_offset", False) else 0,
                getattr(invoice, "offset_target_uuid", None),
                getattr(invoice, "pdf_generated_at", None),
                getattr(invoice, "pdf_sha256", None),
                0,  # submitted_to_tax_authority = false by default
            )
            cur.execute(header_sql, header_values)

            # Row id of the header just inserted; links the items to it.
            header_id = cur.lastrowid

            item_values = [
                (header_id, line.description, line.quantity, line.unit_price, line.is_discount)
                for line in invoice.items
            ]
            cur.executemany(item_sql, item_values)

            conn.commit()
            return True

    except Exception as e:
        logging.error(f"伝票保存エラー: {e}")
        return False
|
||
|
||
def update_invoice(self, invoice: Invoice) -> bool:
    """Update an existing invoice (looked up by uuid) and replace its items.

    The header columns document_type, customer_id, customer_name,
    customer_address, customer_phone, date, invoice_number, notes and
    pdf_generated_at are rewritten. The customer name/address/phone columns
    are included because ``_row_to_invoice`` rebuilds the Customer from
    them; updating only customer_id would leave the previous customer's
    details attached to the invoice.

    Line items are deleted and re-inserted from ``invoice.items``.

    Args:
        invoice: Invoice carrying the new values; ``invoice.uuid`` selects
            the row.

    Returns:
        True on success; False when no row has that uuid or an error
        occurred (both are logged).
    """
    try:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            # Resolve the row first so a missing invoice is reported before
            # anything is modified.
            cursor.execute('SELECT id FROM invoices WHERE uuid = ?', (invoice.uuid,))
            found = cursor.fetchone()
            if not found:
                logging.error(f"伝票ID取得失敗: {invoice.uuid}")
                return False
            invoice_id = found[0]

            customer = invoice.customer
            cursor.execute('''
                UPDATE invoices
                SET document_type = ?, customer_id = ?, customer_name = ?,
                    customer_address = ?, customer_phone = ?, date = ?,
                    invoice_number = ?, notes = ?, pdf_generated_at = ?
                WHERE uuid = ?
            ''', (
                invoice.document_type.value,
                getattr(customer, 'id', None),
                # Same customer snapshot fields that save_invoice writes.
                customer.formal_name,
                customer.address,
                customer.phone,
                invoice.date.isoformat(),
                invoice.invoice_number,
                invoice.notes,
                getattr(invoice, 'pdf_generated_at', None),
                invoice.uuid,
            ))

            # Replace the line items wholesale.
            cursor.execute('DELETE FROM invoice_items WHERE invoice_id = ?', (invoice_id,))
            cursor.executemany('''
                INSERT INTO invoice_items (invoice_id, description, quantity, unit_price, is_discount)
                VALUES (?, ?, ?, ?, ?)
            ''', [
                (invoice_id, item.description, item.quantity, item.unit_price, item.is_discount)
                for item in invoice.items
            ])

            conn.commit()
            return True

    except Exception as e:
        logging.error(f"伝票更新エラー: {e}")
        return False
|
||
|
||
def delete_invoice_by_uuid(self, invoice_uuid: str) -> bool:
    """Delete the invoice with the given uuid together with its line items.

    Args:
        invoice_uuid: uuid of the invoice to remove.

    Returns:
        True when both DELETE statements ran (also when no row matched);
        False when an error occurred (the error is logged).
    """
    # Items go first: their lookup depends on the invoice row still existing.
    statements = (
        'DELETE FROM invoice_items WHERE invoice_id = (SELECT id FROM invoices WHERE uuid = ?)',
        'DELETE FROM invoices WHERE uuid = ?',
    )
    try:
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()
            for sql in statements:
                cur.execute(sql, (invoice_uuid,))
            conn.commit()
    except Exception as e:
        logging.error(f"伝票削除エラー: {e}")
        return False
    return True
|
||
|
||
def update_invoice_file_path(self, invoice_uuid: str, file_path: str, pdf_generated_at: str):
    """Store the PDF path and generation timestamp for one invoice.

    Args:
        invoice_uuid: uuid of the invoice row to update.
        file_path: Value written to the ``file_path`` column.
        pdf_generated_at: Value written to the ``pdf_generated_at`` column.

    Returns:
        None. Errors are logged and not re-raised.
    """
    params = (file_path, pdf_generated_at, invoice_uuid)
    try:
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()
            cur.execute(
                '''
                UPDATE invoices
                SET file_path = ?, pdf_generated_at = ?
                WHERE uuid = ?
                ''',
                params,
            )
            conn.commit()
    except Exception as e:
        logging.error(f"PDFパス更新エラー: {e}")
|
||
|
||
def get_all_invoices(self, limit: int = 100) -> List[Invoice]:
    """Return up to ``limit`` invoices, newest ``date`` first.

    Rows that ``_row_to_invoice`` cannot convert (it returns a falsy value
    for them) are left out. If a database error occurs it is logged and
    whatever was collected so far is returned.

    Args:
        limit: Maximum number of rows to read.
    """
    collected = []

    try:
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()
            cur.execute(
                '''
                SELECT * FROM invoices
                ORDER BY date DESC
                LIMIT ?
                ''',
                (limit,),
            )
            # Fetch everything up front: _row_to_invoice reuses this cursor
            # for its own query.
            records = cur.fetchall()

            converted = (self._row_to_invoice(record, cur) for record in records)
            collected.extend(filter(None, converted))

    except Exception as e:
        logging.error(f"伝票取得エラー: {e}")

    return collected
|
||
|
||
def get_invoice_by_uuid(self, invoice_uuid: str) -> Optional[Invoice]:
    """Load a single invoice by uuid (used e.g. when regenerating a PDF).

    Args:
        invoice_uuid: uuid of the invoice to load.

    Returns:
        The converted Invoice, or None when no row matches, the row cannot
        be converted, or an error occurs (errors are logged).
    """
    try:
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()
            cur.execute(
                "SELECT * FROM invoices WHERE uuid = ? LIMIT 1",
                (invoice_uuid,),
            )
            record = cur.fetchone()
            return self._row_to_invoice(record, cur) if record else None
    except Exception as e:
        logging.error(f"UUID取得エラー: {e}")
        return None
|
||
|
||
def _row_to_invoice(self, row: tuple, cursor: sqlite3.Cursor) -> Optional[Invoice]:
    """Convert one ``SELECT * FROM invoices`` row into an Invoice.

    Column positions are resolved by name through ``PRAGMA table_info``
    instead of fixed indexes. On databases upgraded by ``_migrate_schema``
    the added columns (customer_id, node_id, ...) sit at the end of the
    table, so fixed positions taken from the CREATE TABLE statement would
    read the wrong columns there.

    Args:
        row: A full invoices row, in table column order.
        cursor: Cursor on the same connection. It is reused for the column
            lookup and the line-item query, so the caller must already have
            fetched every row it needs from it.

    Returns:
        The Invoice with its items, customer snapshot and (when they can be
        set) audit attributes, or None when conversion fails (logged).
    """
    # Column order of a freshly created table; used only if table_info
    # yields nothing, which keeps the previous positional behaviour.
    default_order = (
        "id", "uuid", "document_type", "customer_id", "customer_name",
        "customer_address", "customer_phone", "amount", "tax", "total_amount",
        "date", "invoice_number", "notes", "file_path", "node_id",
        "payload_json", "payload_hash", "prev_chain_hash", "chain_hash",
        "pdf_template_version", "company_info_version", "is_offset",
        "offset_target_uuid", "pdf_generated_at", "pdf_sha256",
        "created_at", "updated_at", "submitted_to_tax_authority",
    )

    try:
        cursor.execute("PRAGMA table_info(invoices)")
        # table_info rows are (cid, name, ...): cid is the column position.
        positions = {info[1]: info[0] for info in cursor.fetchall()}
        if not positions:
            positions = {name: idx for idx, name in enumerate(default_order)}

        def col(name, default=None):
            idx = positions.get(name)
            if idx is None or idx >= len(row):
                return default
            return row[idx]

        invoice_id = col("id")
        logging.info(f"変換開始: invoice_id={invoice_id}, row_length={len(row)}")

        # Line items
        cursor.execute('''
            SELECT description, quantity, unit_price, is_discount
            FROM invoice_items
            WHERE invoice_id = ?
        ''', (invoice_id,))
        items = [
            InvoiceItem(
                description=ir[0],
                quantity=ir[1],
                unit_price=ir[2],
                is_discount=bool(ir[3])
            ) for ir in cursor.fetchall()
        ]

        # Customer snapshot stored on the invoice row
        customer_name = col("customer_name")
        customer = Customer(
            id=col("customer_id") or 0,
            name=customer_name,
            formal_name=customer_name,
            address=col("customer_address") or "",
            phone=col("customer_phone") or ""
        )

        # Document type: fall back to SALES when the stored value is unknown
        stored_type = col("document_type")
        doc_type = next(
            (dt for dt in DocumentType if dt.value == stored_type),
            DocumentType.SALES,
        )

        # Date conversion
        date_str = col("date")
        logging.info(f"日付変換: {date_str}")
        date_obj = datetime.fromisoformat(date_str)

        inv = Invoice(
            customer=customer,
            date=date_obj,
            items=items,
            file_path=col("file_path"),
            invoice_number=col("invoice_number") or "",
            notes=col("notes"),
            document_type=doc_type,
            uuid=col("uuid"),
        )

        # Audit attributes: attached best-effort, a failure must not lose
        # the invoice itself.
        try:
            for name in (
                "node_id", "payload_json", "payload_hash", "prev_chain_hash",
                "chain_hash", "pdf_template_version", "company_info_version",
            ):
                setattr(inv, name, col(name))
            inv.is_offset = bool(col("is_offset"))
            inv.offset_target_uuid = col("offset_target_uuid")
            inv.pdf_generated_at = col("pdf_generated_at")
            inv.pdf_sha256 = col("pdf_sha256")
            inv.submitted_to_tax_authority = bool(col("submitted_to_tax_authority"))
        except Exception as e:
            logging.debug(f"監査フィールド設定スキップ: {e}")

        logging.info(f"変換成功: {inv.invoice_number}")
        return inv

    except Exception as e:
        logging.error(f"変換エラー: {e}")
        return None
|
||
|
||
def delete_invoice(self, invoice_id: int) -> bool:
    """Delete an invoice and its line items by numeric row id.

    Args:
        invoice_id: ``invoices.id`` of the row to remove.

    Returns:
        True when both DELETE statements ran (also when no row matched);
        False when an error occurred (the error is logged).
    """
    # Line items are removed before the invoice row itself.
    statements = (
        'DELETE FROM invoice_items WHERE invoice_id = ?',
        'DELETE FROM invoices WHERE id = ?',
    )
    try:
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()
            for sql in statements:
                cur.execute(sql, (invoice_id,))
            conn.commit()
    except Exception as e:
        logging.error(f"削除エラー: {e}")
        return False
    return True
|
||
|
||
def get_statistics(self) -> Dict[str, Any]:
    """Return invoice count, total of total_amount, and this month's total.

    The monthly figure covers rows whose ``date`` text is on or after the
    first day of the current month. The bound is the date-only string
    ``YYYY-MM-01``: the previous bound kept the current time of day, which
    dropped invoices dated earlier on the 1st. A date-only bound compares
    correctly against both ``YYYY-MM-DD`` and ``YYYY-MM-DDTHH:MM:SS`` text.

    Returns:
        Dict with ``total_count``, ``total_amount`` and ``monthly_amount``;
        all three are 0 when an error occurs (the error is logged).
    """
    # NOTE(review): save_invoice in this class writes 0 into total_amount,
    # so these sums only reflect rows whose total_amount was populated some
    # other way - confirm where totals are expected to come from.
    try:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            # Row count
            cursor.execute('SELECT COUNT(*) FROM invoices')
            total_count = cursor.fetchone()[0]

            # Overall total (SUM yields NULL on an empty table)
            cursor.execute('SELECT SUM(total_amount) FROM invoices')
            total_amount = cursor.fetchone()[0] or 0

            # Total for the current month
            month_start = datetime.now().date().replace(day=1).isoformat()
            cursor.execute('''
                SELECT SUM(total_amount) FROM invoices
                WHERE date >= ?
            ''', (month_start,))
            monthly_amount = cursor.fetchone()[0] or 0

            return {
                'total_count': total_count,
                'total_amount': total_amount,
                'monthly_amount': monthly_amount
            }

    except Exception as e:
        logging.error(f"統計エラー: {e}")
        return {'total_count': 0, 'total_amount': 0, 'monthly_amount': 0}
|
||
|
||
|
||
class CustomerRepository:
|
||
"""顧客リポジトリ"""
|
||
|
||
def __init__(self, db_path: str = "sales.db"):
|
||
self.db_path = db_path
|
||
self._init_tables()
|
||
|
||
def _init_tables(self):
|
||
"""テーブル初期化"""
|
||
with sqlite3.connect(self.db_path) as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS customers (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
name TEXT,
|
||
formal_name TEXT,
|
||
address TEXT,
|
||
phone TEXT,
|
||
email TEXT,
|
||
created_at TEXT DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
''')
|
||
conn.commit()
|
||
|
||
def save_customer(self, customer: Customer) -> bool:
    """Save a customer, inserting or updating depending on its id.

    Args:
        customer: Customer to store. An id of 0 means it has not been
            stored yet and is inserted; any other id is updated.

    Returns:
        The result of ``_insert_customer`` or ``update_customer``.
    """
    handler = self._insert_customer if customer.id == 0 else self.update_customer
    return handler(customer)
|
||
|
||
def _insert_customer(self, customer: Customer) -> bool:
    """Insert a new customer row and write the new row id back.

    Args:
        customer: Customer to insert; its ``id`` attribute is set to the
            id of the inserted row.

    Returns:
        True on success, False when an error occurred (logged).
    """
    insert_sql = """
        INSERT INTO customers
        (name, formal_name, address, phone, email)
        VALUES (?, ?, ?, ?, ?)
    """
    try:
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()
            values = (
                customer.name,
                customer.formal_name,
                customer.address,
                customer.phone,
                customer.email,
            )
            cur.execute(insert_sql, values)

            # Hand the generated id back to the caller's object.
            customer.id = cur.lastrowid
            conn.commit()
            return True

    except Exception as e:
        logging.error(f"顧客登録エラー: {e}")
        return False
|
||
|
||
def update_customer(self, customer: Customer) -> bool:
    """Update name, formal_name, address and phone of an existing customer.

    Args:
        customer: Customer carrying the new values; ``customer.id``
            selects the row.

    Returns:
        True when the UPDATE ran (also when no row matched), False when an
        error occurred (logged).
    """
    # NOTE(review): email is written by _insert_customer but is not part of
    # this UPDATE - confirm whether that is intentional.
    update_sql = """
        UPDATE customers
        SET name = ?, formal_name = ?, address = ?, phone = ?
        WHERE id = ?
    """
    try:
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()
            values = (
                customer.name,
                customer.formal_name,
                customer.address,
                customer.phone,
                customer.id,
            )
            cur.execute(update_sql, values)
            conn.commit()
            return True
    except Exception as e:
        logging.error(f"顧客更新エラー: {e}")
        return False
|
||
|
||
def get_all_products(self) -> List[Product]:
    """Return every product, ordered by name.

    Returns:
        List of Product objects built from the first four columns of each
        row (id, name, unit_price, description); an empty list when an
        error occurs (the error is logged).
    """
    try:
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()
            cur.execute('SELECT * FROM products ORDER BY name')
            return [
                Product(
                    id=product_id,
                    name=name,
                    unit_price=unit_price,
                    description=description,
                )
                # Columns after the fourth are not used.
                for product_id, name, unit_price, description, *_ in cur.fetchall()
            ]

    except Exception as e:
        logging.error(f"商品取得エラー: {e}")
        return []
|
||
|
||
def save_product(self, product: Product) -> bool:
    """Save a product: update when it has a truthy id, insert otherwise.

    Args:
        product: Product to store.

    Returns:
        True on success, False when an error occurred (logged).
    """
    try:
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()

            if product.id:
                # Existing product: rewrite its fields and bump updated_at.
                sql = """
                    UPDATE products
                    SET name = ?, unit_price = ?, description = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """
                values = (product.name, product.unit_price, product.description, product.id)
            else:
                # New product.
                sql = """
                    INSERT INTO products (name, unit_price, description)
                    VALUES (?, ?, ?)
                """
                values = (product.name, product.unit_price, product.description)

            cur.execute(sql, values)
            conn.commit()
            return True

    except Exception as e:
        logging.error(f"商品保存エラー: {e}")
        return False
|
||
|
||
def get_all_customers(self) -> List[Customer]:
    """Return every customer, ordered by formal_name.

    Only the first five columns of each row (id, name, formal_name,
    address, phone) are used; NULL address/phone become empty strings.
    If an error occurs it is logged and whatever was collected so far is
    returned.
    """
    found = []

    try:
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.cursor()
            cur.execute('SELECT * FROM customers ORDER BY formal_name')

            for customer_id, name, formal_name, address, phone, *_ in cur.fetchall():
                found.append(
                    Customer(
                        id=customer_id,
                        name=name,
                        formal_name=formal_name,
                        address=address or "",
                        phone=phone or "",
                    )
                )

    except Exception as e:
        logging.error(f"顧客取得エラー: {e}")

    return found
|
||
|
||
|
||
# Usage example
if __name__ == "__main__":
    from models.invoice_models import create_sample_invoices

    # Set up the repository (default database file, tables created if missing)
    repo = InvoiceRepository()

    # Store the sample invoices and report each outcome
    for sample in create_sample_invoices():
        saved = repo.save_invoice(sample)
        outcome = '成功' if saved else '失敗'
        print(f"保存: {sample.customer.formal_name} - {outcome}")

    # Statistics
    print(f"\n統計: {repo.get_statistics()}")

    # Read everything back
    print(f"\n全伝票数: {len(repo.get_all_invoices())}")
|