資料庫
Peewee 的 Database
物件代表與資料庫的連線。Database
類別會以開啟資料庫連線所需的所有資訊進行實例化,然後可用於:
開啟和關閉連線。
執行查詢。
管理交易(和儲存點)。
檢視表格、欄、索引和約束。
Peewee 支援 SQLite、MySQL、MariaDB 和 Postgres。每個資料庫類別都提供一些基本的、特定於資料庫的設定選項。
from peewee import *
# SQLite database using WAL journal mode and 64MB cache.
sqlite_db = SqliteDatabase('/path/to/app.db', pragmas={
'journal_mode': 'wal',
'cache_size': -1024 * 64})
# Connect to a MySQL database on network.
mysql_db = MySQLDatabase('my_app', user='app', password='db_password',
host='10.1.0.8', port=3306)
# Connect to a Postgres database.
pg_db = PostgresqlDatabase('my_app', user='postgres', password='secret',
host='10.1.0.9', port=5432)
Peewee 透過特定於資料庫的擴充模組,為 SQLite、Postgres 和 CockroachDB 提供進階支援。若要使用擴充功能,請匯入適當的特定於資料庫的模組,並使用提供的資料庫類別。
from playhouse.sqlite_ext import SqliteExtDatabase
# Use SQLite (will register a REGEXP function and set busy timeout to 3s).
db = SqliteExtDatabase('/path/to/app.db', regexp_function=True, timeout=3,
pragmas={'journal_mode': 'wal'})
from playhouse.postgres_ext import PostgresqlExtDatabase
# Use Postgres (and register hstore extension).
db = PostgresqlExtDatabase('my_app', user='postgres', register_hstore=True)
from playhouse.cockroachdb import CockroachDatabase
# Use CockroachDB.
db = CockroachDatabase('my_app', user='root', port=26257, host='10.1.0.8')
# CockroachDB connections may require a number of parameters, which can
# alternatively be specified using a connection-string.
db = CockroachDatabase('postgresql://...')
有關資料庫擴充功能的詳細資訊,請參閱:
Sqlcipher 後端(加密的 SQLite 資料庫)。
初始化資料庫
Database
初始化方法預期將資料庫名稱作為第一個參數。建立連線時,後續的關鍵字引數會傳遞至底層資料庫驅動程式,讓您可以輕鬆地傳遞供應商特定的參數。
例如,使用 Postgresql 時,通常需要指定 host
、user
和 password
來建立連線。這些不是標準的 Peewee Database
參數,因此建立連線時,會直接傳回至 psycopg2
。
db = PostgresqlDatabase(
'database_name', # Required by Peewee.
user='postgres', # Will be passed directly to psycopg2.
password='secret', # Ditto.
host='db.mysite.com') # Ditto.
另一個範例是,pymysql
驅動程式接受 charset
參數,這不是標準的 Peewee Database
參數。若要設定此值,只需將 charset
與其他值一起傳入。
db = MySQLDatabase('database_name', user='www-data', charset='utf8mb4')
請查閱資料庫驅動程式的文件,以瞭解可用的參數。
Postgres:psycopg2
MySQL:pymysql
MySQL:mysqlclient
SQLite:sqlite3
CockroachDB:請參閱 psycopg2
使用 Postgresql
若要連線到 Postgresql 資料庫,我們將使用 PostgresqlDatabase
。第一個參數永遠是資料庫的名稱,之後您可以指定任意的 psycopg2 參數。
psql_db = PostgresqlDatabase('my_database', user='postgres')
class BaseModel(Model):
"""A base model that will use our Postgresql database"""
class Meta:
database = psql_db
class User(BaseModel):
username = CharField()
Playhouse,Peewee 的擴充功能 包含一個 Postgresql 擴充模組,提供許多 postgres 特定的功能,例如:
如果您想使用這些出色的功能,請使用 playhouse.postgres_ext
模組中的 PostgresqlExtDatabase
from playhouse.postgres_ext import PostgresqlExtDatabase
psql_db = PostgresqlExtDatabase('my_database', user='postgres')
隔離等級
從 Peewee 3.9.7 開始,可以使用 psycopg2.extensions
中的符號常數,將隔離等級指定為初始化參數。
from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE
db = PostgresqlDatabase('my_app', user='postgres', host='db-host',
isolation_level=ISOLATION_LEVEL_SERIALIZABLE)
注意
在舊版本中,您可以手動設定底層 psycopg2 連線上的隔離等級。可以一次性完成此操作。
db = PostgresqlDatabase(...)
conn = db.connection() # returns current connection.
from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE
conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
若要讓每次建立連線時都執行此操作,請子類別化並實作 _initialize_database()
掛勾,這是為此目的而設計的。
class SerializedPostgresqlDatabase(PostgresqlDatabase):
def _initialize_connection(self, conn):
conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
使用 CockroachDB
使用 playhouse.cockroachdb
中定義的 CockroachDatabase
資料庫類別連線到 CockroachDB (CRDB)。
from playhouse.cockroachdb import CockroachDatabase
db = CockroachDatabase('my_app', user='root', port=26257, host='localhost')
如果您使用的是 Cockroach Cloud,您可能會發現使用連線字串指定連線參數會更容易。
db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')
注意
CockroachDB 需要 psycopg2
(postgres) Python 驅動程式。
注意
CockroachDB 安裝和入門指南可以在這裡找到:https://www.cockroachlabs.com/docs/stable/install-cockroachdb.html
CRDB 提供用戶端交易重試,這些重試可使用特殊的 CockroachDatabase.run_transaction()
協助程式方法。此方法接受一個可呼叫物件,負責執行任何可能需要重試的交易陳述式。
run_transaction()
最簡單的範例:
def create_user(email):
# Callable that accepts a single argument (the database instance) and
# which is responsible for executing the transactional SQL.
def callback(db_ref):
return User.create(email=email)
return db.run_transaction(callback, max_attempts=10)
huey = create_user('huey@example.com')
注意
如果經過指定次數的嘗試後,交易仍無法提交,則會引發 cockroachdb.ExceededMaxAttempts
例外狀況。如果 SQL 格式不正確、違反限制等,則函式會將例外狀況引發給呼叫者。
如需詳細資訊,請參閱:
使用 SQLite
若要連線到 SQLite 資料庫,我們將使用 SqliteDatabase
。第一個參數是包含資料庫的檔案名稱,或是字串 ':memory:'
,用於建立記憶體內的資料庫。在資料庫檔案名稱之後,您可以指定 pragma 的清單,或是任何其他任意的 sqlite3 參數。
sqlite_db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})
class BaseModel(Model):
"""A base model that will use our Sqlite database."""
class Meta:
database = sqlite_db
class User(BaseModel):
username = TextField()
# etc, etc
Peewee 包含一個 SQLite 擴充模組,提供許多 SQLite 特定的功能,例如 全文搜尋、json 擴充功能支援 以及更多功能。如果您想使用這些出色的功能,請使用 playhouse.sqlite_ext
模組中的 SqliteExtDatabase
from playhouse.sqlite_ext import SqliteExtDatabase
sqlite_db = SqliteExtDatabase('my_app.db', pragmas={
'journal_mode': 'wal', # WAL-mode.
'cache_size': -64 * 1000, # 64MB cache.
'synchronous': 0}) # Let the OS manage syncing.
PRAGMA 語句
SQLite 允許透過 PRAGMA
語句 (SQLite 文件) 執行階段設定多個參數。這些語句通常在建立新的資料庫連線時執行。若要對新連線執行一個或多個 PRAGMA
語句,您可以將其指定為字典或包含 pragma 名稱和值的 2 元組清單。
db = SqliteDatabase('my_app.db', pragmas={
'journal_mode': 'wal',
'cache_size': 10000, # 10000 pages, or ~40MB
'foreign_keys': 1, # Enforce foreign-key constraints
})
也可以使用 pragma()
方法或 SqliteDatabase
物件上公開的特殊屬性,動態設定 PRAGMA。
# Set cache size to 64MB for *current connection*.
db.pragma('cache_size', -1024 * 64)
# Same as above.
db.cache_size = -1024 * 64
# Read the value of several pragmas:
print('cache_size:', db.cache_size)
print('foreign_keys:', db.foreign_keys)
print('journal_mode:', db.journal_mode)
print('page_size:', db.page_size)
# Set foreign_keys pragma on current connection *AND* on all
# connections opened subsequently.
db.pragma('foreign_keys', 1, permanent=True)
注意
使用 pragma()
方法設定的 Pragma,預設情況下,連線關閉後不會保留。若要將 pragma 設定為每次開啟連線時執行,請指定 permanent=True
。
注意
完整的 PRAGMA 設定清單,以及其意義和可接受的值,可以在 SQLite 文件中找到:https://sqlite.dev.org.tw/pragma.html
建議設定
以下是我在典型的 Web 應用程式資料庫中與 SQLite 一起使用的設定。
pragma |
建議設定 |
說明 |
---|---|---|
journal_mode |
wal |
允許讀取器和寫入器共存 |
cache_size |
-1 * data_size_kb |
以 KiB 為單位設定頁面快取大小,例如 -32000 = 32MB |
foreign_keys |
1 |
強制執行外鍵約束 |
ignore_check_constraints |
0 |
強制執行 CHECK 約束 |
synchronous |
0 |
讓作業系統處理 fsync(請謹慎使用) |
使用上述選項的資料庫範例
db = SqliteDatabase('my_app.db', pragmas={
'journal_mode': 'wal',
'cache_size': -1 * 64000, # 64MB
'foreign_keys': 1,
'ignore_check_constraints': 0,
'synchronous': 0})
使用者自訂函式
SQLite 可以使用使用者自訂的 Python 程式碼進行擴充。 SqliteDatabase
類別支援三種使用者自訂的擴充類型
函式 - 接受任意數量的參數並返回單一值。
聚合 - 從多個列聚合參數並返回單一值。
校對 - 描述如何排序某些值。
注意
如需更多擴充支援,請參閱 SqliteExtDatabase
,它位於 playhouse.sqlite_ext
模組中。
使用者自訂函式範例
db = SqliteDatabase('analytics.db')
from urllib.parse import urlparse
@db.func('hostname')
def hostname(url):
if url is not None:
return urlparse(url).netloc
# Call this function in our code:
# The following finds the most common hostnames of referrers by count:
query = (PageView
.select(fn.hostname(PageView.referrer), fn.COUNT(PageView.id))
.group_by(fn.hostname(PageView.referrer))
.order_by(fn.COUNT(PageView.id).desc()))
使用者自訂聚合範例
from hashlib import md5
@db.aggregate('md5')
class MD5Checksum(object):
def __init__(self):
self.checksum = md5()
def step(self, value):
self.checksum.update(value.encode('utf-8'))
def finalize(self):
return self.checksum.hexdigest()
# Usage:
# The following computes an aggregate MD5 checksum for files broken
# up into chunks and stored in the database.
query = (FileChunk
.select(FileChunk.filename, fn.MD5(FileChunk.data))
.group_by(FileChunk.filename)
.order_by(FileChunk.filename, FileChunk.sequence))
校對範例
@db.collation('ireverse')
def collate_reverse(s1, s2):
# Case-insensitive reverse.
s1, s2 = s1.lower(), s2.lower()
return (s1 < s2) - (s1 > s2) # Equivalent to -cmp(s1, s2)
# To use this collation to sort books in reverse order...
Book.select().order_by(collate_reverse.collation(Book.title))
# Or...
Book.select().order_by(Book.title.asc(collation='reverse'))
使用者自訂表格值函式範例(有關詳細資訊,請參閱 TableFunction
和 table_function
)
from playhouse.sqlite_ext import TableFunction
db = SqliteDatabase('my_app.db')
@db.table_function('series')
class Series(TableFunction):
columns = ['value']
params = ['start', 'stop', 'step']
def initialize(self, start=0, stop=None, step=1):
"""
Table-functions declare an initialize() method, which is
called with whatever arguments the user has called the
function with.
"""
self.start = self.current = start
self.stop = stop or float('Inf')
self.step = step
def iterate(self, idx):
"""
Iterate is called repeatedly by the SQLite database engine
until the required number of rows has been read **or** the
function raises a `StopIteration` signalling no more rows
are available.
"""
if self.current > self.stop:
raise StopIteration
ret, self.current = self.current, self.current + self.step
return (ret,)
# Usage:
cursor = db.execute_sql('SELECT * FROM series(?, ?, ?)', (0, 5, 2))
for value, in cursor:
print(value)
# Prints:
# 0
# 2
# 4
如需詳細資訊,請參閱:
設定交易的鎖定模式
SQLite 交易可以使用三種不同的模式開啟
延遲(預設)- 僅在執行讀取或寫入時取得鎖定。第一次讀取會建立共用鎖定,而第一次寫入會建立保留鎖定。由於鎖定的取得會延遲到實際需要時,因此有可能在目前執行緒上執行 BEGIN 之後,另一個執行緒或程序可能會建立單獨的交易並寫入資料庫。
立即 - 會立即取得保留鎖定。在此模式下,沒有其他資料庫可以寫入資料庫或開啟立即或獨佔交易。但是,其他程序可以繼續從資料庫讀取。
獨佔 - 開啟獨佔鎖定,這會防止所有(除了讀取未提交之外)連線存取資料庫,直到交易完成為止。
指定鎖定模式的範例
db = SqliteDatabase('app.db')
with db.atomic('EXCLUSIVE'):
do_something()
@db.atomic('IMMEDIATE')
def some_other_function():
# This function is wrapped in an "IMMEDIATE" transaction.
do_something_else()
APSW,進階 SQLite 驅動程式
Peewee 還提供另一個 SQLite 資料庫,它使用 apsw,一個進階 sqlite 驅動程式。有關 APSW 的更多資訊,請參閱 APSW 專案網站。APSW 提供以下特殊功能:
虛擬表格、虛擬檔案系統、Blob I/O、備份和檔案控制。
連線可以在執行緒之間共用,而無需額外的鎖定。
交易由您的程式碼明確管理。
Unicode 的處理正確。
APSW 比標準程式庫 sqlite3 模組更快。
幾乎將整個 SQLite C API 公開給您的 Python 應用程式。
如果您想使用 APSW,請使用 apsw_ext 模組中的 APSWDatabase
from playhouse.apsw_ext import APSWDatabase
apsw_db = APSWDatabase('my_app.db')
使用 MariaDB
Peewee 支援 MariaDB。若要使用 MariaDB,請使用 MySQL 後端,兩者共用同一個後端。如需更多詳細資訊,請參閱 「使用 MySQL」。
使用 MySQL
若要連線到 MySQL 資料庫,我們將使用 MySQLDatabase
。在資料庫名稱之後,您可以指定將傳回給驅動程式的任意連線參數(例如 pymysql
或 mysqlclient
)。
mysql_db = MySQLDatabase('my_database')
class BaseModel(Model):
"""A base model that will use our MySQL database"""
class Meta:
database = mysql_db
class User(BaseModel):
username = CharField()
# etc, etc
驅動程式資訊
pymysql 是一個純 Python mysql 用戶端,適用於 Python 2 和 3。Peewee 會先嘗試使用 pymysql。
mysqlclient 使用 c 擴充功能並支援 Python 3。它會公開
MySQLdb
模組。如果未安裝 pymysql,Peewee 會嘗試使用此模組。mysql-python
也稱為 MySQLdb1,它已經是舊版,不應使用。由於這與 mysqlclient 共用相同的模組名稱,因此適用相同的規則。mysql-connector python 純 Python (我認為是??) 支援 Python 3。若要使用此驅動程式,您可以使用 MySQLConnectorDatabase,它來自
playhouse.mysql_ext
擴充功能。
錯誤 2006:MySQL 伺服器已斷線
當 MySQL 終止閒置資料庫連線時,可能會發生這個特定錯誤。這通常發生在未明確管理資料庫連線的 Web 應用程式中。發生的情況是您的應用程式啟動,開啟連線以處理執行的第一個查詢,而且由於該連線永遠不會關閉,因此它會保持開啟,等待更多查詢。
若要修正此問題,請確保在需要執行查詢時明確連線到資料庫,並在完成後關閉連線。在 Web 應用程式中,這通常表示您會在收到請求時開啟連線,並在傳回回應時關閉連線。
有關設定常用 Web 架構以管理資料庫連線的範例,請參閱 架構整合 一節。
使用資料庫 URL 連線
playhouse 模組 資料庫 URL 提供了一個輔助 connect()
函式,該函式接受資料庫 URL 並傳回 Database
實例。
程式碼範例
import os
from peewee import *
from playhouse.db_url import connect
# Connect to the database URL defined in the environment, falling
# back to a local Sqlite database if no database URL is specified.
db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')
class BaseModel(Model):
class Meta:
database = db
資料庫 URL 範例
sqlite:///my_database.db
將會為目前目錄中名為my_database.db
的檔案建立SqliteDatabase
實例。sqlite:///:memory:
將會建立一個記憶體中的SqliteDatabase
實例。postgresql://postgres:my_password@localhost:5432/my_database
將會建立一個PostgresqlDatabase
實例。已提供使用者名稱和密碼,以及要連線的主機和連接埠。mysql://user:passwd@ip:port/my_db
將會為本機 MySQL 資料庫 my_db 建立一個MySQLDatabase
實例。
執行階段資料庫設定
有時,資料庫連線設定在執行階段之前是未知的,這些值可能會從組態檔案或環境中載入。在這些情況下,您可以透過指定 None
作為 database_name 來延遲資料庫的初始化。
database = PostgresqlDatabase(None) # Un-initialized database.
class SomeModel(Model):
class Meta:
database = database
如果您嘗試在資料庫未初始化的情況下連線或發出任何查詢,將會收到例外狀況
>>> database.connect()
Exception: Error, database not properly initialized before opening connection
若要初始化資料庫,請使用資料庫名稱和任何其他關鍵字引數呼叫 init()
方法
database_name = input('What is the name of the db? ')
database.init(database_name, host='localhost', user='postgres')
若要對資料庫的初始化進行更多控制,請參閱下一節 動態定義資料庫。
動態定義資料庫
若要對資料庫的定義/初始化方式進行更多控制,您可以使用 DatabaseProxy
輔助程式。 DatabaseProxy
物件充當預留位置,然後在執行階段,您可以將它換成不同的物件。在下面的範例中,我們將根據應用程式的組態來換出資料庫
database_proxy = DatabaseProxy() # Create a proxy for our db.
class BaseModel(Model):
class Meta:
database = database_proxy # Use proxy for our DB.
class User(BaseModel):
username = CharField()
# Based on configuration, use a different database.
if app.config['DEBUG']:
database = SqliteDatabase('local.db')
elif app.config['TESTING']:
database = SqliteDatabase(':memory:')
else:
database = PostgresqlDatabase('mega_production_db')
# Configure our proxy to use the db we specified in config.
database_proxy.initialize(database)
警告
僅當您的實際資料庫驅動程式在執行階段有所不同時,才使用此方法。例如,如果您的測試和本機開發環境在 SQLite 上執行,但您的已部署應用程式使用 PostgreSQL,則您可以使用 DatabaseProxy
在執行階段換出引擎。
但是,如果只有連線值在執行階段有所不同,例如資料庫檔案的路徑或資料庫主機,則應改為使用 Database.init()
。如需更多詳細資訊,請參閱 執行階段資料庫設定。
注意
避免使用 DatabaseProxy
,而是使用 Database.bind()
和相關方法來設定或變更資料庫可能會更容易。如需詳細資訊,請參閱 在執行階段設定資料庫。
在執行階段設定資料庫
我們已經看到三種可以使用 Peewee 設定資料庫的方式
# The usual way:
db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})
# Specify the details at run-time:
db = SqliteDatabase(None)
...
db.init(db_filename, pragmas={'journal_mode': 'wal'})
# Or use a placeholder:
db = DatabaseProxy()
...
db.initialize(SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'}))
Peewee 也可以為您的模型類別設定或變更資料庫。Peewee 測試套件使用此技術,在執行測試時將測試模型類別繫結到各種資料庫實例。
有兩組互補方法
Database.bind()
和Model.bind()
- 將一個或多個模型繫結到資料庫。Database.bind_ctx()
和Model.bind_ctx()
- 它們的功能與各自的bind()
對應方法相同,但會返回一個上下文管理器,在需要臨時變更資料庫時非常有用。
舉例來說,我們將宣告兩個**未**指定任何資料庫的模型
class User(Model):
username = TextField()
class Tweet(Model):
user = ForeignKeyField(User, backref='tweets')
content = TextField()
timestamp = TimestampField()
在執行時將模型繫結至資料庫
postgres_db = PostgresqlDatabase('my_app', user='postgres')
sqlite_db = SqliteDatabase('my_app.db')
# At this point, the User and Tweet models are NOT bound to any database.
# Let's bind them to the Postgres database:
postgres_db.bind([User, Tweet])
# Now we will temporarily bind them to the sqlite database:
with sqlite_db.bind_ctx([User, Tweet]):
# User and Tweet are now bound to the sqlite database.
assert User._meta.database is sqlite_db
# User and Tweet are once again bound to the Postgres database.
assert User._meta.database is postgres_db
Model.bind()
和 Model.bind_ctx()
方法對於繫結給定的模型類別具有相同的作用
# Bind the user model to the sqlite db. By default, Peewee will also
# bind any models that are related to User via foreign-key as well.
User.bind(sqlite_db)
assert User._meta.database is sqlite_db
assert Tweet._meta.database is sqlite_db # Related models bound too.
# Here we will temporarily bind *just* the User model to the postgres db.
with User.bind_ctx(postgres_db, bind_backrefs=False):
assert User._meta.database is postgres_db
assert Tweet._meta.database is sqlite_db # Has not changed.
# And now User is back to being bound to the sqlite_db.
assert User._meta.database is sqlite_db
本文檔的 測試 Peewee 應用程式 章節也包含使用 bind()
方法的一些範例。
執行緒安全性和多個資料庫
如果您計劃在多執行緒應用程式中於執行時變更資料庫,將模型的資料庫儲存在執行緒區域變數中將可避免競爭條件。這可以透過自訂模型 Metadata
類別來完成(請參閱 ThreadSafeDatabaseMetadata
,包含在 playhouse.shortcuts
中)
from peewee import *
from playhouse.shortcuts import ThreadSafeDatabaseMetadata
class BaseModel(Model):
class Meta:
# Instruct peewee to use our thread-safe metadata implementation.
model_metadata_class = ThreadSafeDatabaseMetadata
現在可以使用熟悉的 Database.bind()
或 Database.bind_ctx()
方法在多執行緒環境中安全地交換資料庫。
連線管理
若要開啟與資料庫的連線,請使用 Database.connect()
方法
>>> db = SqliteDatabase(':memory:') # In-memory SQLite database.
>>> db.connect()
True
如果我們嘗試在已開啟的資料庫上呼叫 connect()
,會得到一個 OperationalError
>>> db.connect()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/charles/pypath/peewee.py", line 2390, in connect
raise OperationalError('Connection already opened.')
peewee.OperationalError: Connection already opened.
為防止引發此例外狀況,我們可以使用額外的引數 reuse_if_open
呼叫 connect()
>>> db.close() # Close connection.
True
>>> db.connect()
True
>>> db.connect(reuse_if_open=True)
False
請注意,如果資料庫連線已開啟,則呼叫 connect()
會返回 False
。
若要關閉連線,請使用 Database.close()
方法
>>> db.close()
True
在已關閉的連線上呼叫 close()
不會導致例外狀況,但會返回 False
>>> db.connect() # Open connection.
True
>>> db.close() # Close connection.
True
>>> db.close() # Connection already closed, returns False.
False
您可以使用 Database.is_closed()
方法測試資料庫是否已關閉
>>> db.is_closed()
True
使用自動連線
如果資料庫是以 autoconnect=True
(預設值) 初始化,則在使用資料庫之前,不需要明確連線到資料庫。明確地管理連線被認為是**最佳實務**,因此您可以考慮停用 autoconnect
行為。
明確說明您的連線生命週期非常有幫助。例如,如果連線失敗,則會在開啟連線時捕獲例外狀況,而不是在稍後執行查詢時的任意時間捕獲。此外,如果使用 連線池,則必須呼叫 connect()
和 close()
,以確保正確回收連線。
為了獲得最佳的正確性保證,請停用 autoconnect
db = PostgresqlDatabase('my_app', user='postgres', autoconnect=False)
執行緒安全性
Peewee 使用執行緒區域儲存來追蹤連線狀態,使得 Peewee Database
物件可以安全地與多個執行緒一起使用。每個執行緒都有其自己的連線,因此任何給定的執行緒在給定時間都只會有一個開啟的連線。
上下文管理器
資料庫物件本身可以用作上下文管理器,它會在程式碼區塊的持續時間內開啟連線。此外,會在程式碼區塊的開頭開啟一個交易,並在關閉連線之前提交交易(除非發生錯誤,在這種情況下會回滾交易)。
>>> db.is_closed()
True
>>> with db:
... print(db.is_closed()) # db is open inside context manager.
...
False
>>> db.is_closed() # db is closed.
True
如果您想要個別管理交易,可以使用 Database.connection_context()
上下文管理器。
>>> with db.connection_context():
... # db connection is open.
... pass
...
>>> db.is_closed() # db connection is closed.
True
connection_context()
方法也可以用作裝飾器
@db.connection_context()
def prepare_database():
# DB connection will be managed by the decorator, which opens
# a connection, calls function, and closes upon returning.
db.create_tables(MODELS) # Create schema.
load_fixture_data(db)
DB-API 連線物件
若要取得對基礎 DB-API 2.0 連線的參考,請使用 Database.connection()
方法。此方法會傳回目前開啟的連線物件(如果有的話),否則會開啟新的連線。
>>> db.connection()
<sqlite3.Connection object at 0x7f94e9362f10>
連線池
連線池由 pool 模組 提供,該模組包含在 playhouse 擴充程式庫中。連線池支援
連線將被回收的逾時時間。
開啟連線數量的上限。
from playhouse.pool import PooledPostgresqlExtDatabase
db = PooledPostgresqlExtDatabase(
'my_database',
max_connections=8,
stale_timeout=300,
user='postgres')
class BaseModel(Model):
class Meta:
database = db
下列集區資料庫類別可用
測試 Peewee 應用程式
在為使用 Peewee 的應用程式撰寫測試時,可能希望使用專用的測試資料庫。另一個常見的做法是對乾淨的資料庫執行測試,這表示要確保每個測試開始時資料表都是空的。
若要將模型繫結至執行時的資料庫,您可以使用下列方法
Database.bind_ctx()
,它會傳回一個上下文管理器,該管理器會在包裝區塊的持續時間內將給定模型繫結至資料庫實例。Model.bind_ctx()
,同樣地,它會傳回一個上下文管理器,該管理器會在包裝區塊的持續時間內將模型(以及選擇性的其依賴關係)繫結至給定的資料庫。Database.bind()
,這是一次性操作,會將模型(以及選擇性的其依賴關係)繫結至給定的資料庫。Model.bind()
,這是一次性操作,會將模型(以及選擇性的其依賴關係)繫結至給定的資料庫。
根據您的使用案例,這些選項之一可能更有意義。在下面的範例中,我將使用 Model.bind()
。
測試案例設定範例
# tests.py
import unittest
from my_app.models import EventLog, Relationship, Tweet, User
MODELS = [User, Tweet, EventLog, Relationship]
# use an in-memory SQLite for tests.
test_db = SqliteDatabase(':memory:')
class BaseTestCase(unittest.TestCase):
def setUp(self):
# Bind model classes to test db. Since we have a complete list of
# all models, we do not need to recursively bind dependencies.
test_db.bind(MODELS, bind_refs=False, bind_backrefs=False)
test_db.connect()
test_db.create_tables(MODELS)
def tearDown(self):
# Not strictly necessary since SQLite in-memory databases only live
# for the duration of the connection, and in the next step we close
# the connection...but a good practice all the same.
test_db.drop_tables(MODELS)
# Close connection to db.
test_db.close()
# If we wanted, we could re-bind the models to their original
# database here. But for tests this is probably not necessary.
順帶一提,根據經驗,我建議使用與生產環境中相同的資料庫後端來測試您的應用程式,以避免任何潛在的相容性問題。
如果您想查看更多有關如何使用 Peewee 執行測試的範例,請查看 Peewee 自己的 測試套件。
搭配 Gevent 的非同步處理
建議使用 gevent 搭配 Postgresql 或 MySQL 執行非同步 I/O。我偏好 gevent 的原因
不需要專用的「循環感知」重新實作*所有內容*。使用 asyncio 的第三方程式庫通常必須重新實作多層程式碼以及重新實作協定本身。
Gevent 允許您以正常、乾淨且慣用的 Python 撰寫應用程式。不需要在每一行都塞滿「async」、「await」和其他雜訊。沒有回呼、未來、任務、承諾。沒有雜亂的東西。
Gevent 可用於 Python 2 *和* Python 3。
Gevent 是 *Pythonic* 的。Asyncio 是一個不符合 Python 慣例的陋習。
除了修補套接字之外,如果您將 **MySQL** 與純 Python 驅動程式(如 pymysql)一起使用,或是以純 Python 模式使用 mysql-connector,則不需要任何特殊步驟。以 C 撰寫的 MySQL 驅動程式將需要超出本文檔範圍的特殊組態。
對於 Postgres 和 psycopg2(一個 C 擴充套件),您可以使用以下程式碼片段來註冊事件掛鉤,使您的連線變成非同步。
from gevent.socket import wait_read, wait_write
from psycopg2 import extensions
# Call this function after monkey-patching socket (etc).
def patch_psycopg2():
extensions.set_wait_callback(_psycopg2_gevent_callback)
def _psycopg2_gevent_callback(conn, timeout=None):
while True:
state = conn.poll()
if state == extensions.POLL_OK:
break
elif state == extensions.POLL_READ:
wait_read(conn.fileno(), timeout=timeout)
elif state == extensions.POLL_WRITE:
wait_write(conn.fileno(), timeout=timeout)
else:
raise ValueError('poll() returned unexpected result')
SQLite 因為是嵌入在 Python 應用程式本身,所以不會執行任何可能成為非阻塞候選的 Socket 操作。無論如何,非同步對 SQLite 資料庫都沒有任何影響。
框架整合
對於 Web 應用程式,通常會在收到請求時開啟連線,並在傳遞回應時關閉連線。在本節中,我將描述如何將掛鉤新增到您的 Web 應用程式,以確保資料庫連線得到正確處理。
這些步驟將確保無論您使用的是簡單的 SQLite 資料庫,還是多個 Postgres 連線的池,Peewee 都會正確處理連線。
注意
接收大量流量的應用程式可能會受益於使用連線池,以減少在每個請求上設定和拆除連線的成本。
Flask
Flask 和 Peewee 是絕佳的組合,也是我任何大小專案的首選。Flask 提供了兩個掛鉤,我們將用來開啟和關閉資料庫連線。我們將在收到請求時開啟連線,然後在返回回應時關閉它。
from flask import Flask
from peewee import *
database = SqliteDatabase('my_app.db')
app = Flask(__name__)
# This hook ensures that a connection is opened to handle any queries
# generated by the request.
@app.before_request
def _db_connect():
database.connect()
# This hook ensures that the connection is closed when we've finished
# processing the request.
@app.teardown_request
def _db_close(exc):
if not database.is_closed():
database.close()
Django
雖然 Peewee 與 Django 一起使用較不常見,但實際上它們兩者非常容易一起使用。要使用 Django 管理您的 Peewee 資料庫連線,我認為最簡單的方法是向您的應用程式新增一個中介軟體。中介軟體應該是中介軟體列表中的第一個,以確保在處理請求時首先執行,並在返回回應時最後執行。
如果您有一個名為 *my_blog* 的 django 專案,且您的 Peewee 資料庫定義在 my_blog.db
模組中,您可以新增以下中介軟體類別
# middleware.py
from my_blog.db import database # Import the peewee database instance.
def PeeweeConnectionMiddleware(get_response):
def middleware(request):
database.connect()
try:
response = get_response(request)
finally:
if not database.is_closed():
database.close()
return response
return middleware
# Older Django < 1.10 middleware.
class PeeweeConnectionMiddleware(object):
def process_request(self, request):
database.connect()
def process_response(self, request, response):
if not database.is_closed():
database.close()
return response
為了確保這個中介軟體被執行,請將其新增到您的 settings
模組
# settings.py
MIDDLEWARE_CLASSES = (
# Our custom middleware appears first in the list.
'my_blog.middleware.PeeweeConnectionMiddleware',
# These are the default Django 1.7 middlewares. Yours may differ,
# but the important this is that our Peewee middleware comes first.
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
)
# ... other Django settings ...
Bottle
我個人沒有使用過 Bottle,但查看文件後,我相信以下程式碼應該可以確保資料庫連線得到正確管理
# app.py
from bottle import hook #, route, etc, etc.
from peewee import *
db = SqliteDatabase('my-bottle-app.db')
@hook('before_request')
def _connect_db():
db.connect()
@hook('after_request')
def _close_db():
if not db.is_closed():
db.close()
# Rest of your bottle app goes here.
Web.py
請參閱應用程式處理器的文件。
db = SqliteDatabase('my_webpy_app.db')
def connection_processor(handler):
db.connect()
try:
return handler()
finally:
if not db.is_closed():
db.close()
app.add_processor(connection_processor)
Tornado
看來 Tornado 的 RequestHandler
類別實作了兩個掛鉤,可用於在處理請求時開啟和關閉連線。
from tornado.web import RequestHandler
db = SqliteDatabase('my_db.db')
class PeeweeRequestHandler(RequestHandler):
def prepare(self):
db.connect()
return super(PeeweeRequestHandler, self).prepare()
def on_finish(self):
if not db.is_closed():
db.close()
return super(PeeweeRequestHandler, self).on_finish()
在您的應用程式中,現在您可以擴充 PeeweeRequestHandler
,而不是擴充預設的 RequestHandler
。
請注意,這並未說明如何將 Peewee 與 Tornado 或其他事件迴圈非同步使用。
Wheezy.web
連線處理程式碼可以放在中介軟體中。
def peewee_middleware(request, following):
db.connect()
try:
response = following(request)
finally:
if not db.is_closed():
db.close()
return response
app = WSGIApplication(middleware=[
lambda x: peewee_middleware,
# ... other middlewares ...
])
感謝 GitHub 使用者 *@tuukkamustonen* 提供此程式碼。
Falcon
連線處理程式碼可以放在中介軟體元件中。
import falcon
from peewee import *
database = SqliteDatabase('my_app.db')
class PeeweeConnectionMiddleware(object):
def process_request(self, req, resp):
database.connect()
def process_response(self, req, resp, resource, req_succeeded):
if not database.is_closed():
database.close()
application = falcon.API(middleware=[
PeeweeConnectionMiddleware(),
# ... other middlewares ...
])
Pyramid
設定一個 Request 工廠,以如下方式處理資料庫連線的生命週期
from pyramid.request import Request
db = SqliteDatabase('pyramidapp.db')
class MyRequest(Request):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
db.connect()
self.add_finished_callback(self.finish)
def finish(self, request):
if not db.is_closed():
db.close()
在您的應用程式 main() 中,請確保 MyRequest 用作 request_factory
def main(global_settings, **settings):
config = Configurator(settings=settings, ...)
config.set_request_factory(MyRequest)
CherryPy
請參閱發佈/訂閱模式。
def _db_connect():
db.connect()
def _db_close():
if not db.is_closed():
db.close()
cherrypy.engine.subscribe('before_request', _db_connect)
cherrypy.engine.subscribe('after_request', _db_close)
Sanic
在 Sanic 中,連線處理程式碼可以放在請求和回應中介軟體 sanic 中介軟體中。
# app.py
@app.middleware('request')
async def handle_request(request):
db.connect()
@app.middleware('response')
async def handle_response(request, response):
if not db.is_closed():
db.close()
FastAPI
FastAPI 是一個與 asyncio 相容的框架。Peewee 依賴於執行緒本地變數(也與 gevent 相容)來管理跨請求的連線狀態。為了與 asyncio 一起使用,需要一些覆寫來將執行緒本地變數的行為替換為與 asyncio 相容的上下文本地變數。
如需完整了解如何在 FastAPI 中使用 Peewee,請參閱此處的 FastAPI 文件
https://fastapi.dev.org.tw/advanced/sql-databases-peewee/
上述文件涵蓋
新增 asyncio 感知上下文的連線狀態追蹤
每個請求的連線處理
其他框架
在這裡沒有看到您的框架嗎?請開啟一個 GitHub 工單,我會考慮新增一個章節,或者更好的是,提交一個文件 PR。
執行查詢
SQL 查詢通常會透過在 使用查詢產生器 API 建立的查詢上呼叫 execute()
來執行(或者在 Select
查詢的情況下,只需迭代查詢物件即可)。如果您希望直接執行 SQL,可以使用 Database.execute_sql()
方法。
db = SqliteDatabase('my_app.db')
db.connect()
# Example of executing a simple query and ignoring the results.
db.execute_sql("ATTACH DATABASE ':memory:' AS cache;")
# Example of iterating over the results of a query using the cursor.
cursor = db.execute_sql('SELECT * FROM users WHERE status = ?', (ACTIVE,))
for row in cursor.fetchall():
# Do something with row, which is a tuple containing column data.
pass
管理交易
Peewee 提供了多個用於處理交易的介面。最通用的是 Database.atomic()
方法,它也支援巢狀交易。 atomic()
區塊將在交易或儲存點中執行,具體取決於巢狀層級。
如果已封裝的區塊中發生未處理的例外狀況,則目前的交易/儲存點將會回滾。否則,這些陳述將在已封裝區塊的結尾提交。
範例
# Transaction will commit automatically at the end of the "with" block:
with db.atomic() as txn:
User.create(username='u1')
# Unhandled exceptions will cause transaction to be rolled-back:
with db.atomic() as txn:
User.create(username='huey')
# User has been INSERTed into the database but the transaction is not
# yet committed because we haven't left the scope of the "with" block.
raise ValueError('uh-oh')
# This exception is unhandled - the transaction will be rolled-back and
# the ValueError will be raised.
注意
在由 atomic()
上下文管理器封裝的區塊中,您可以隨時透過呼叫 Transaction.rollback()
或 Transaction.commit()
來明確回滾或提交。當您在已封裝的程式碼區塊中執行此操作時,將會自動啟動新的交易。
with db.atomic() as transaction: # Opens new transaction.
try:
save_some_objects()
except ErrorSavingData:
# Because this block of code is wrapped with "atomic", a
# new transaction will begin automatically after the call
# to rollback().
transaction.rollback()
error_saving = True
create_report(error_saving=error_saving)
# Note: no need to call commit. Since this marks the end of the
# wrapped block of code, the `atomic` context manager will
# automatically call commit for us.
注意
atomic()
可以用作上下文管理器或裝飾器。
注意
Peewee 的行為與您可能習慣的 DB-API 2.0 行為不同(有關詳細資訊,請參閱 PEP-249)。依預設,Peewee 會將所有連線放入自動提交模式,交易管理由 Peewee 處理。
上下文管理器
將 atomic
用作上下文管理器
db = SqliteDatabase(':memory:')
with db.atomic() as txn:
# This is the outer-most level, so this block corresponds to
# a transaction.
User.create(username='charlie')
with db.atomic() as nested_txn:
# This block corresponds to a savepoint.
User.create(username='huey')
# This will roll back the above create() query.
nested_txn.rollback()
User.create(username='mickey')
# When the block ends, the transaction is committed (assuming no error
# occurs). At that point there will be two users, "charlie" and "mickey".
您也可以使用 atomic
方法來執行取得或建立操作
try:
with db.atomic():
user = User.create(username=username)
return 'Success'
except peewee.IntegrityError:
return 'Failure: %s is already in use.' % username
裝飾器
將 atomic
用作裝飾器
@db.atomic()
def create_user(username):
# This statement will run in a transaction. If the caller is already
# running in an `atomic` block, then a savepoint will be used instead.
return User.create(username=username)
create_user('charlie')
巢狀交易
atomic()
提供了交易的透明巢狀。使用 atomic()
時,最外層的呼叫將會被封裝在交易中,而任何巢狀呼叫都會使用儲存點。
with db.atomic() as txn:
perform_operation()
with db.atomic() as nested_txn:
perform_another_operation()
Peewee 透過使用儲存點來支援巢狀交易(如需更多資訊,請參閱savepoint()
)。
明確交易
如果您希望在交易中明確執行程式碼,可以使用 transaction()
。與 atomic()
相同,transaction()
可以用作上下文管理器或裝飾器。
如果在已封裝的區塊中發生例外狀況,則交易將會回滾。否則,這些陳述將在已封裝區塊的結尾提交。
db = SqliteDatabase(':memory:')
with db.transaction() as txn:
# Delete the user and their associated tweets.
user.delete_instance(recursive=True)
交易可以在已封裝的區塊內明確提交或回滾。發生這種情況時,將會啟動新的交易。
with db.transaction() as txn:
User.create(username='mickey')
txn.commit() # Changes are saved and a new transaction begins.
User.create(username='huey')
# Roll back. "huey" will not be saved, but since "mickey" was already
# committed, that row will remain in the database.
txn.rollback()
with db.transaction() as txn:
User.create(username='whiskers')
# Roll back changes, which removes "whiskers".
txn.rollback()
# Create a new row for "mr. whiskers" which will be implicitly committed
# at the end of the `with` block.
User.create(username='mr. whiskers')
注意
如果您嘗試使用 transaction()
上下文管理器在 Peewee 中巢狀交易,則只會使用最外層的交易。如果巢狀區塊中發生例外狀況,則交易將不會回滾,只有浮出到最外層交易的例外狀況才會觸發回滾。
由於這可能會導致不可預測的行為,因此建議您使用 atomic()
。
明確儲存點
就像您可以明確建立交易一樣,您也可以使用 savepoint()
方法明確建立儲存點。儲存點必須發生在交易中,但可以任意深度巢狀。
with db.transaction() as txn:
with db.savepoint() as sp:
User.create(username='mickey')
with db.savepoint() as sp2:
User.create(username='zaizee')
sp2.rollback() # "zaizee" will not be saved, but "mickey" will be.
警告
如果您手動提交或回滾儲存點,則不會自動建立新的儲存點。這與 transaction
的行為不同,它會在手動提交/回滾後自動開啟新的交易。
自動提交模式
依預設,Peewee 以自動提交模式運作,因此在交易外部執行的任何陳述都會在其自己的交易中執行。若要將多個陳述群組到一個交易中,Peewee 提供了 atomic()
上下文管理器/裝飾器。這應涵蓋所有使用案例,但在極少數情況下,您想要暫時完全停用 Peewee 的交易管理,您可以使用 Database.manual_commit()
上下文管理器/裝飾器。
以下說明如何模擬 transaction()
上下文管理器的行為
with db.manual_commit():
db.begin() # Have to begin transaction explicitly.
try:
user.delete_instance(recursive=True)
except:
db.rollback() # Rollback! An error occurred.
raise
else:
try:
db.commit() # Commit changes.
except:
db.rollback()
raise
再次強調,我不認為有人會需要這個,但為了以防萬一,還是寫在這裡。
資料庫錯誤
Python DB-API 2.0 規範描述了幾種例外情況。由於大多數資料庫驅動程式都有自己對這些例外情況的實作,Peewee 提供圍繞任何特定於實作的例外類別的包裝器來簡化事情。這樣一來,您無需擔心匯入任何特殊的例外類別,只需使用來自 peewee 的例外類別即可
DatabaseError
DataError
IntegrityError
InterfaceError
InternalError
NotSupportedError
OperationalError
ProgrammingError
注意
所有這些錯誤類別都繼承自 PeeweeException
。
記錄查詢
所有查詢都會使用標準程式庫 logging
模組記錄到 *peewee* 命名空間。查詢會使用 *DEBUG* 級別記錄。如果您有興趣對查詢執行某些操作,您可以簡單地註冊一個處理程式。
# Print all queries to stderr.
import logging
logger = logging.getLogger('peewee')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
新增新的資料庫驅動程式
Peewee 內建支援 Postgres、MySQL、MariaDB 和 SQLite。這些資料庫非常受歡迎,範圍涵蓋從快速、可嵌入的資料庫到適合大型部署的重量級伺服器。話雖如此,市面上有很多很棒的資料庫,而且只要驅動程式支援 DB-API 2.0 規範,新增對您選擇的資料庫的支援應該非常容易。
警告
Peewee 需要將資料庫連線置於自動提交模式。
如果您使用過標準程式庫 sqlite3 驅動程式、psycopg2 或類似的驅動程式,您應該會熟悉 DB-API 2.0 規範。Peewee 目前依賴以下幾個部分
Connection.commit
Connection.execute
Connection.rollback
Cursor.description
Cursor.fetchone
這些方法通常封裝在更高層次的抽象中,並由 Database
公開,因此即使您的驅動程式不完全執行這些操作,您仍然可以從 peewee 中獲得很多好處。一個例子是「playhouse」模組中的 apsw sqlite 驅動程式。
第一件事是提供 Database
的子類別,它將開啟連線,並確保連線處於自動提交模式(從而停用所有 DB-API 交易語意)
from peewee import Database
import foodb # Our fictional DB-API 2.0 driver.
class FooDatabase(Database):
def _connect(self, database):
return foodb.connect(self.database, autocommit=True, **self.connect_params)
Database
提供更高層次的 API,並負責執行查詢、建立表格和索引,以及檢查資料庫以取得表格清單。上面的實作是所需的絕對最小值,儘管某些功能將無法運作 – 為了獲得最佳結果,您還需要新增一個方法,從資料庫中提取表格的表格和索引清單。我們假設 FooDB
很像 MySQL,並且有特殊的「SHOW」語句
class FooDatabase(Database):
def _connect(self):
return foodb.connect(self.database, autocommit=True, **self.connect_params)
def get_tables(self):
res = self.execute('SHOW TABLES;')
return [r[0] for r in res.fetchall()]
資料庫處理的其他事項(這裡未涵蓋)包括
param
和quote
,它們告訴 SQL 產生程式碼如何新增參數佔位符和引用實體名稱。field_types
,用於將 INT 或 TEXT 等資料類型對應到其廠商特定的類型名稱。operations
,用於將「LIKE/ILIKE」等運算對應到其資料庫等效項目
請參閱 Database
API 參考或 原始程式碼以取得詳細資訊。
注意
如果您的驅動程式符合 DB-API 2.0 規範,則應該不需要太多工作即可啟動並執行。
我們的新資料庫可以像任何其他資料庫子類別一樣使用
from peewee import *
from foodb_ext import FooDatabase
db = FooDatabase('my_database', user='foo', password='secret')
class BaseModel(Model):
class Meta:
database = db
class Blog(BaseModel):
title = CharField()
contents = TextField()
pub_date = DateTimeField()