Hacks
收集使用 peewee 的小技巧。有很酷的小技巧想要分享嗎?在 GitHub 上開啟 一個 Issue 或 聯絡我。
樂觀鎖定
在您通常會使用 SELECT FOR UPDATE (或在 SQLite 中使用 BEGIN IMMEDIATE) 的情況下,樂觀鎖定會很有用。例如,您可能會從資料庫提取使用者記錄,進行一些修改,然後儲存修改後的使用者記錄。通常這種情況需要我們在事務期間鎖定使用者記錄,從我們選擇它的那一刻起,到我們儲存變更的那一刻。
另一方面,在樂觀鎖定中,我們不取得任何鎖,而是依賴於我們正在修改的行中的內部版本欄位。在讀取時,我們會看到該行目前處於哪個版本,而在儲存時,我們確保只有在版本與我們最初讀取的版本相同時才會進行更新。如果版本較高,那麼一定是其他程序偷偷地更改了該行 – 儲存我們修改後的版本可能會導致重要變更的遺失。
在 Peewee 中實作樂觀鎖定非常簡單,這裡有一個基底類別,您可以將其用作起點。
from peewee import *
class ConflictDetectedException(Exception): pass
class BaseVersionedModel(Model):
version = IntegerField(default=1, index=True)
def save_optimistic(self):
if not self.id:
# This is a new record, so the default logic is to perform an
# INSERT. Ideally your model would also have a unique
# constraint that made it impossible for two INSERTs to happen
# at the same time.
return self.save()
# Update any data that has changed and bump the version counter.
field_data = dict(self.__data__)
current_version = field_data.pop('version', 1)
self._populate_unsaved_relations(field_data)
field_data = self._prune_fields(field_data, self.dirty_fields)
if not field_data:
raise ValueError('No changes have been made.')
ModelClass = type(self)
field_data['version'] = ModelClass.version + 1 # Atomic increment.
query = ModelClass.update(**field_data).where(
(ModelClass.version == current_version) &
(ModelClass.id == self.id))
if query.execute() == 0:
# No rows were updated, indicating another process has saved
# a new version. How you handle this situation is up to you,
# but for simplicity I'm just raising an exception.
raise ConflictDetectedException()
else:
# Increment local version to match what is now in the db.
self.version += 1
return True
這是一個說明其運作方式的範例。假設我們有以下模型定義。請注意,username 上有一個唯一約束 – 這很重要,因為它可以防止重複插入。
class User(BaseVersionedModel):
username = CharField(unique=True)
favorite_animal = CharField()
範例
>>> u = User(username='charlie', favorite_animal='cat')
>>> u.save_optimistic()
True
>>> u.version
1
>>> u.save_optimistic()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "x.py", line 18, in save_optimistic
raise ValueError('No changes have been made.')
ValueError: No changes have been made.
>>> u.favorite_animal = 'kitten'
>>> u.save_optimistic()
True
# Simulate a separate thread coming in and updating the model.
>>> u2 = User.get(User.username == 'charlie')
>>> u2.favorite_animal = 'macaw'
>>> u2.save_optimistic()
True
# Now, attempt to change and re-save the original instance:
>>> u.favorite_animal = 'little parrot'
>>> u.save_optimistic()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "x.py", line 30, in save_optimistic
raise ConflictDetectedException()
ConflictDetectedException: current version is out of sync
每個群組的頂層物件
這些範例描述了幾種查詢每個群組的單個頂層項目的方法。如需各種技術的詳細討論,請查看我的部落格文章使用 Peewee ORM 查詢群組的頂層項目。如果您對查詢頂層 N 個項目的更一般性問題感興趣,請參閱以下章節 每個群組的頂層 N 個物件。
在這些範例中,我們將使用 User 和 Tweet 模型來查找每個使用者及其最近的推文。
我在測試中發現最有效的方法是使用 MAX()
聚合函數。
我們將在非相關子查詢中執行聚合,因此我們可以確信此方法將具有良好的效能。這個想法是,我們將選取貼文,按其作者分組,其時間戳記等於該使用者觀察到的最大時間戳記。
# When referencing a table multiple times, we'll call Model.alias() to create
# a secondary reference to the table.
TweetAlias = Tweet.alias()
# Create a subquery that will calculate the maximum Tweet created_date for each
# user.
subquery = (TweetAlias
.select(
TweetAlias.user,
fn.MAX(TweetAlias.created_date).alias('max_ts'))
.group_by(TweetAlias.user)
.alias('tweet_max_subquery'))
# Query for tweets and join using the subquery to match the tweet's user
# and created_date.
query = (Tweet
.select(Tweet, User)
.join(User)
.switch(Tweet)
.join(subquery, on=(
(Tweet.created_date == subquery.c.max_ts) &
(Tweet.user == subquery.c.user_id))))
SQLite 和 MySQL 更為寬鬆,允許按選定欄位的子集進行分組。這表示我們可以去除子查詢,並非常簡潔地表達它。
query = (Tweet
.select(Tweet, User)
.join(User)
.group_by(Tweet.user)
.having(Tweet.created_date == fn.MAX(Tweet.created_date)))
每個群組的頂層 N 個物件
這些範例描述了幾種合理有效率地查詢每個群組的頂層 N 個項目之方法。如需各種技術的詳細討論,請查看我的部落格文章 使用 Peewee ORM 查詢每個群組的頂層 N 個物件。
在這些範例中,我們將使用 User 和 Tweet 模型來查找每個使用者及其最近的三則推文。
Postgres 側向聯結
側向聯結 是 Postgres 的一個巧妙功能,它允許相當有效率的相關子查詢。它們通常被描述為 SQL for each
迴圈。
所需的 SQL 是
SELECT * FROM
(SELECT id, username FROM user) AS uq
LEFT JOIN LATERAL
(SELECT message, created_date
FROM tweet
WHERE (user_id = uq.id)
ORDER BY created_date DESC LIMIT 3)
AS pq ON true
使用 peewee 完成此操作非常簡單
subq = (Tweet
.select(Tweet.message, Tweet.created_date)
.where(Tweet.user == User.id)
.order_by(Tweet.created_date.desc())
.limit(3))
query = (User
.select(User, subq.c.content, subq.c.created_date)
.join(subq, JOIN.LEFT_LATERAL)
.order_by(User.username, subq.c.created_date.desc()))
# We queried from the "perspective" of user, so the rows are User instances
# with the addition of a "content" and "created_date" attribute for each of
# the (up-to) 3 most-recent tweets for each user.
for row in query:
print(row.username, row.content, row.created_date)
若要從 Tweet 模型的「角度」實作等效查詢,我們可以改寫
# subq is the same as the above example.
subq = (Tweet
.select(Tweet.message, Tweet.created_date)
.where(Tweet.user == User.id)
.order_by(Tweet.created_date.desc())
.limit(3))
query = (Tweet
.select(User.username, subq.c.content, subq.c.created_date)
.from_(User)
.join(subq, JOIN.LEFT_LATERAL)
.order_by(User.username, subq.c.created_date.desc()))
# Each row is a "tweet" instance with an additional "username" attribute.
# This will print the (up-to) 3 most-recent tweets from each user.
for tweet in query:
print(tweet.username, tweet.content, tweet.created_date)
視窗函數
所需的 SQL 是
SELECT subq.message, subq.username
FROM (
SELECT
t2.message,
t3.username,
RANK() OVER (
PARTITION BY t2.user_id
ORDER BY t2.created_date DESC
) AS rnk
FROM tweet AS t2
INNER JOIN user AS t3 ON (t2.user_id = t3.id)
) AS subq
WHERE (subq.rnk <= 3)
若要使用 peewee 完成此操作,我們將把已排名的推文包裝在執行篩選的外部查詢中。
TweetAlias = Tweet.alias()
# The subquery will select the relevant data from the Tweet and
# User table, as well as ranking the tweets by user from newest
# to oldest.
subquery = (TweetAlias
.select(
TweetAlias.message,
User.username,
fn.RANK().over(
partition_by=[TweetAlias.user],
order_by=[TweetAlias.created_date.desc()]).alias('rnk'))
.join(User, on=(TweetAlias.user == User.id))
.alias('subq'))
# Since we can't filter on the rank, we are wrapping it in a query
# and performing the filtering in the outer query.
query = (Tweet
.select(subquery.c.message, subquery.c.username)
.from_(subquery)
.where(subquery.c.rnk <= 3))
其他方法
如果您沒有使用 Postgres,那麼很遺憾,您只能使用效能不太理想的選項。如需常見方法的更完整概述,請查看這篇部落格文章。以下我將總結這些方法和對應的 SQL。
使用 COUNT
,我們可以取得所有時間戳記較新的推文少於 N 則的推文
TweetAlias = Tweet.alias()
# Create a correlated subquery that calculates the number of
# tweets with a higher (newer) timestamp than the tweet we're
# looking at in the outer query.
subquery = (TweetAlias
.select(fn.COUNT(TweetAlias.id))
.where(
(TweetAlias.created_date >= Tweet.created_date) &
(TweetAlias.user == Tweet.user)))
# Wrap the subquery and filter on the count.
query = (Tweet
.select(Tweet, User)
.join(User)
.where(subquery <= 3))
我們可以透過執行自我聯結並在 HAVING
子句中執行篩選來達到類似的結果
TweetAlias = Tweet.alias()
# Use a self-join and join predicates to count the number of
# newer tweets.
query = (Tweet
.select(Tweet.id, Tweet.message, Tweet.user, User.username)
.join(User)
.switch(Tweet)
.join(TweetAlias, on=(
(TweetAlias.user == Tweet.user) &
(TweetAlias.created_date >= Tweet.created_date)))
.group_by(Tweet.id, Tweet.content, Tweet.user, User.username)
.having(fn.COUNT(Tweet.id) <= 3))
最後一個範例在相關子查詢中使用 LIMIT
子句。
TweetAlias = Tweet.alias()
# The subquery here will calculate, for the user who created the
# tweet in the outer loop, the three newest tweets. The expression
# will evaluate to `True` if the outer-loop tweet is in the set of
# tweets represented by the inner query.
query = (Tweet
.select(Tweet, User)
.join(User)
.where(Tweet.id << (
TweetAlias
.select(TweetAlias.id)
.where(TweetAlias.user == Tweet.user)
.order_by(TweetAlias.created_date.desc())
.limit(3))))
使用 SQLite 撰寫自訂函數
SQLite 非常容易使用 Python 撰寫的自訂函數進行擴充,然後可以從您的 SQL 陳述式中呼叫這些函數。透過使用 SqliteExtDatabase
和 func()
裝飾器,您可以非常輕鬆地定義自己的函數。
這是一個範例函數,它會產生使用者提供的密碼的雜湊版本。我們也可以使用它來實作 login
功能,以比對使用者和密碼。
from hashlib import sha1
from random import random
from playhouse.sqlite_ext import SqliteExtDatabase
db = SqliteExtDatabase('my-blog.db')
def get_hexdigest(salt, raw_password):
data = salt + raw_password
return sha1(data.encode('utf8')).hexdigest()
@db.func()
def make_password(raw_password):
salt = get_hexdigest(str(random()), str(random()))[:5]
hsh = get_hexdigest(salt, raw_password)
return '%s$%s' % (salt, hsh)
@db.func()
def check_password(raw_password, enc_password):
salt, hsh = enc_password.split('$', 1)
return hsh == get_hexdigest(salt, raw_password)
以下是如何使用此函數新增新使用者,儲存雜湊密碼的方式
query = User.insert(
username='charlie',
password=fn.make_password('testing')).execute()
如果我們從資料庫中擷取使用者,則儲存的密碼會被雜湊和加鹽處理
>>> user = User.get(User.username == 'charlie')
>>> print(user.password)
b76fa$88be1adcde66a1ac16054bc17c8a297523170949
若要實作 login
類型功能,您可以撰寫如下內容
def login(username, password):
try:
return (User
.select()
.where(
(User.username == username) &
(fn.check_password(password, User.password) == True))
.get())
except User.DoesNotExist:
# Incorrect username and/or password.
return False
日期運算
Peewee 支援的每個資料庫都實作了自己的一組日期/時間算術函數和語意。
本節將提供一個簡短的情境和範例程式碼,示範您如何使用 Peewee 在 SQL 中執行動態日期操作。
情境:我們需要每隔 X 秒執行某些任務,並且任務間隔和任務本身都在資料庫中定義。我們需要撰寫一些程式碼,告訴我們在給定時間應該執行哪些任務
class Schedule(Model):
interval = IntegerField() # Run this schedule every X seconds.
class Task(Model):
schedule = ForeignKeyField(Schedule, backref='tasks')
command = TextField() # Run this command.
last_run = DateTimeField() # When was this run last?
我們的邏輯基本上會歸結為
# e.g., if the task was last run at 12:00:05, and the associated interval
# is 10 seconds, the next occurrence should be 12:00:15. So we check
# whether the current time (now) is 12:00:15 or later.
now >= task.last_run + schedule.interval
因此我們可以撰寫以下程式碼
next_occurrence = something # ??? how do we define this ???
# We can express the current time as a Python datetime value, or we could
# alternatively use the appropriate SQL function/name.
now = Value(datetime.datetime.now()) # Or SQL('current_timestamp'), e.g.
query = (Task
.select(Task, Schedule)
.join(Schedule)
.where(now >= next_occurrence))
對於 Postgresql,我們將將靜態的 1 秒間隔相乘,以動態計算偏移量
second = SQL("INTERVAL '1 second'")
next_occurrence = Task.last_run + (Schedule.interval * second)
對於 MySQL,我們可以參考排程的間隔直接
from peewee import NodeList # Needed to construct sql entity.
interval = NodeList((SQL('INTERVAL'), Schedule.interval, SQL('SECOND')))
next_occurrence = fn.date_add(Task.last_run, interval)
對於 SQLite,情況有點棘手,因為 SQLite 沒有專用的日期時間類型。因此,對於 SQLite,我們將轉換為 Unix 時間戳記,加入排程的秒數,然後轉換回可比較的日期時間表示法
next_ts = fn.strftime('%s', Task.last_run) + Schedule.interval
next_occurrence = fn.datetime(next_ts, 'unixepoch')