查詢
本節將介紹關聯式資料庫中常用的基本 CRUD 操作
Model.create()
,用於執行 INSERT 查詢。Model.save()
和Model.update()
,用於執行 UPDATE 查詢。Model.delete_instance()
和Model.delete()
,用於執行 DELETE 查詢。Model.select()
,用於執行 SELECT 查詢。
注意
還有大量範例查詢取自 Postgresql Exercises 網站。範例列在 查詢範例 文件中。
建立新紀錄
您可以使用 Model.create()
建立新的模型實例。此方法接受關鍵字引數,其中鍵對應於模型欄位的名稱。會傳回新的實例,並將資料列新增至表格。
>>> User.create(username='Charlie')
<__main__.User object at 0x2529350>
這會在資料庫中 INSERT 一個新的資料列。主鍵將會自動擷取並儲存在模型實例上。
或者,您可以透過程式設計方式建立模型實例,然後呼叫 save()
>>> user = User(username='Charlie')
>>> user.save() # save() returns the number of rows modified.
1
>>> user.id
1
>>> huey = User()
>>> huey.username = 'Huey'
>>> huey.save()
1
>>> huey.id
2
當模型具有外鍵時,您可以在建立新紀錄時,直接將模型實例指派給外鍵欄位。
>>> tweet = Tweet.create(user=huey, message='Hello!')
您也可以使用相關物件主鍵的值
>>> tweet = Tweet.create(user=2, message='Hello again!')
如果您只想插入資料而不需要建立模型實例,可以使用 Model.insert()
>>> User.insert(username='Mickey').execute()
3
執行插入查詢後,會傳回新資料列的主鍵。
注意
您可以使用幾種方法來加速批量插入操作。請查看 批量插入 食譜章節以取得更多資訊。
批量插入
您可以使用幾種方法來快速載入大量資料。天真的方法是簡單地在迴圈中呼叫 Model.create()
data_source = [
{'field1': 'val1-1', 'field2': 'val1-2'},
{'field1': 'val2-1', 'field2': 'val2-2'},
# ...
]
for data_dict in data_source:
MyModel.create(**data_dict)
上述方法速度很慢,原因有幾個
如果您沒有將迴圈包裝在交易中,那麼每次呼叫
create()
都會在自己的交易中發生。這將非常緩慢!有相當多的 Python 邏輯阻礙您,而且每個
InsertQuery
都必須產生並剖析為 SQL。您正在傳送大量資料(以原始 SQL 位元組而言)到資料庫進行剖析。
我們正在擷取上次插入的 ID,這在某些情況下會導致執行額外的查詢。
您可以透過簡單地使用 atomic()
將其包裝在交易中,來獲得顯著的加速。
# This is much faster.
with db.atomic():
for data_dict in data_source:
MyModel.create(**data_dict)
上述程式碼仍然有第 2、3 和 4 點的問題。我們可以透過使用 insert_many()
來獲得另一個很大的提升。此方法接受元組或字典的清單,並在單一查詢中插入多個資料列
data_source = [
{'field1': 'val1-1', 'field2': 'val1-2'},
{'field1': 'val2-1', 'field2': 'val2-2'},
# ...
]
# Fastest way to INSERT multiple rows.
MyModel.insert_many(data_source).execute()
insert_many()
方法也接受資料列元組的清單,前提是您也要指定對應的欄位
# We can INSERT tuples as well...
data = [('val1-1', 'val1-2'),
('val2-1', 'val2-2'),
('val3-1', 'val3-2')]
# But we need to indicate which fields the values correspond to.
MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()
將批量插入包裝在交易中也是一個好習慣
# You can, of course, wrap this in a transaction as well:
with db.atomic():
MyModel.insert_many(data, fields=fields).execute()
注意
使用批量插入時,SQLite 使用者應注意一些注意事項。具體而言,您的 SQLite3 版本必須為 3.7.11.0 或更新版本,才能利用批量插入 API。此外,依預設,對於 3.32.0 (2020-05-22) 之前的 SQLite 版本,SQLite 將 SQL 查詢中繫結變數的數量限制為 999
個,對於 3.32.0 之後的 SQLite 版本,則限制為 32766 個。
分批插入資料列
根據資料來源中的資料列數量,您可能需要將其分解成區塊。尤其是 SQLite 通常具有 每個查詢限制 999 或 32766 個 變數(然後批次大小將為 999 // 資料列長度或 32766 // 資料列長度)。
您可以編寫一個迴圈,將您的資料批次處理成區塊(在這種情況下,強烈建議您使用交易)
# Insert rows 100 at a time.
with db.atomic():
for idx in range(0, len(data_source), 100):
MyModel.insert_many(data_source[idx:idx+100]).execute()
Peewee 隨附 chunked()
輔助函數,您可以使用它來有效地將通用可迭代物件分塊成一系列批次大小的可迭代物件
from peewee import chunked
# Insert rows 100 at a time.
with db.atomic():
for batch in chunked(data_source, 100):
MyModel.insert_many(batch).execute()
替代方案
Model.bulk_create()
方法的行為很像 Model.insert_many()
,但它接受要插入的未儲存模型實例清單,並且可選擇性地接受批次大小參數。若要使用 bulk_create()
API
# Read list of usernames from a file, for example.
with open('user_list.txt') as fh:
# Create a list of unsaved User instances.
users = [User(username=line.strip()) for line in fh.readlines()]
# Wrap the operation in a transaction and batch INSERT the users
# 100 at a time.
with db.atomic():
User.bulk_create(users, batch_size=100)
注意
如果您使用的是 Postgresql(它支援 RETURNING
子句),那麼先前未儲存的模型實例將會自動填入其新的主鍵值。
此外,Peewee 也提供 Model.bulk_update()
,它可以有效率地更新模型清單中的一個或多個欄位。例如
# First, create 3 users with usernames u1, u2, u3.
u1, u2, u3 = [User.create(username='u%s' % i) for i in (1, 2, 3)]
# Now we'll modify the user instances.
u1.username = 'u1-x'
u2.username = 'u2-y'
u3.username = 'u3-z'
# Update all three users with a single UPDATE query.
User.bulk_update([u1, u2, u3], fields=[User.username])
這會導致執行下列 SQL
UPDATE "users" SET "username" = CASE "users"."id"
WHEN 1 THEN "u1-x"
WHEN 2 THEN "u2-y"
WHEN 3 THEN "u3-z" END
WHERE "users"."id" IN (1, 2, 3);
注意
對於大型物件清單,您應該指定合理的 batch_size,並使用 Database.atomic()
包裝呼叫 bulk_update()
with database.atomic():
User.bulk_update(list_of_users, fields=['username'], batch_size=50)
警告
Model.bulk_update()
可能不是更新大量記錄的最有效率方法。此功能的實作方式是,我們會為使用 SQL CASE
陳述式更新的所有資料列建立主鍵到對應欄位值的「對應」。
或者,您可以使用 Database.batch_commit()
輔助程式,在批次大小的交易中處理資料列區塊。當必須取得新建立的資料列的主鍵時,此方法也為 Postgresql 以外的資料庫提供變通辦法。
# List of row data to insert.
row_data = [{'username': 'u1'}, {'username': 'u2'}, ...]
# Assume there are 789 items in row_data. The following code will result in
# 8 total transactions (7x100 rows + 1x89 rows).
for row in db.batch_commit(row_data, 100):
User.create(**row)
從另一個表格批量載入
如果您想要批量載入的資料儲存在另一個表格中,您也可以建立來源為 SELECT 查詢的 INSERT 查詢。使用 Model.insert_from()
方法
res = (TweetArchive
.insert_from(
Tweet.select(Tweet.user, Tweet.message),
fields=[TweetArchive.user, TweetArchive.message])
.execute())
上述查詢等同於下列 SQL
INSERT INTO "tweet_archive" ("user_id", "message")
SELECT "user_id", "message" FROM "tweet";
更新現有紀錄
一旦模型實例具有主鍵,後續任何呼叫 save()
都會導致 UPDATE 而不是另一個 INSERT。模型的主鍵不會變更
>>> user.save() # save() returns the number of rows modified.
1
>>> user.id
1
>>> user.save()
>>> user.id
1
>>> huey.save()
1
>>> huey.id
2
如果您想要更新多個記錄,請發出 UPDATE 查詢。下列範例將更新所有 Tweet
物件,如果它們是在今天之前建立的,則將其標示為已發佈。Model.update()
接受關鍵字引數,其中鍵對應於模型的欄位名稱
>>> today = datetime.today()
>>> query = Tweet.update(is_published=True).where(Tweet.creation_date < today)
>>> query.execute() # Returns the number of rows that were updated.
4
如需更多資訊,請參閱 Model.update()
、Update
和 Model.bulk_update()
的文件。
注意
如果您想要更多關於執行原子更新(例如遞增欄位的值)的資訊,請查看 原子更新 食譜。
原子更新
Peewee 允許您執行原子更新。假設我們需要更新一些計數器。最簡單的方法是寫成這樣:
>>> for stat in Stat.select().where(Stat.url == request.url):
... stat.counter += 1
... stat.save()
不要這樣做! 這不僅速度慢,而且如果多個進程同時更新計數器,還容易出現競爭條件。
相反地,您可以使用 update()
來原子地更新計數器
>>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()
您可以讓這些更新語句盡可能複雜。讓我們給所有員工一個獎金,金額等於他們之前的獎金加上他們薪水的 10%:
>>> query = Employee.update(bonus=(Employee.bonus + (Employee.salary * .1)))
>>> query.execute() # Give everyone a bonus!
我們甚至可以使用子查詢來更新欄位的值。假設在 User
模型上有一個非正規化的欄位,儲存了使用者發過的推文數量,並且我們定期更新這個值。以下是您可能編寫此查詢的方式:
>>> subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id)
>>> update = User.update(num_tweets=subquery)
>>> update.execute()
Upsert
Peewee 支援各種 upsert 功能。對於 3.24.0 之前的 SQLite 和 MySQL,Peewee 提供了 replace()
,它允許您插入一筆記錄,或者在發生約束違規時,替換現有的記錄。對於 Sqlite 3.24+ 和 Postgres,peewee 完全支援 ON CONFLICT
查詢。
使用 replace()
和 on_conflict_replace()
的範例:
class User(Model):
username = TextField(unique=True)
last_login = DateTimeField(null=True)
# Insert or update the user. The "last_login" value will be updated
# regardless of whether the user existed previously.
user_id = (User
.replace(username='the-user', last_login=datetime.now())
.execute())
# This query is equivalent:
user_id = (User
.insert(username='the-user', last_login=datetime.now())
.on_conflict_replace()
.execute())
注意
除了 replace 之外,SQLite、MySQL 和 Postgresql 還提供了 ignore 操作(請參閱:on_conflict_ignore()
),如果您只想插入並忽略任何潛在的約束違規。
MySQL 透過 ON DUPLICATE KEY UPDATE 子句支援 upsert。例如:
class User(Model):
username = TextField(unique=True)
last_login = DateTimeField(null=True)
login_count = IntegerField()
# Insert a new user.
User.create(username='huey', login_count=0)
# Simulate the user logging in. The login count and timestamp will be
# either created or updated correctly.
now = datetime.now()
rowid = (User
.insert(username='huey', last_login=now, login_count=1)
.on_conflict(
preserve=[User.last_login], # Use the value we would have inserted.
update={User.login_count: User.login_count + 1})
.execute())
在上面的範例中,我們可以安全地多次調用 upsert 查詢。登入計數將以原子方式遞增,最後登入欄位將會更新,並且不會建立重複的列。
Postgresql 和 SQLite (3.24.0 及更新版本) 提供了不同的語法,可以更精細地控制哪個約束違規應觸發衝突解決,以及應該更新或保留哪些值。
使用 on_conflict()
執行 Postgresql 風格的 upsert(或 SQLite 3.24+)的範例:
class User(Model):
username = TextField(unique=True)
last_login = DateTimeField(null=True)
login_count = IntegerField()
# Insert a new user.
User.create(username='huey', login_count=0)
# Simulate the user logging in. The login count and timestamp will be
# either created or updated correctly.
now = datetime.now()
rowid = (User
.insert(username='huey', last_login=now, login_count=1)
.on_conflict(
conflict_target=[User.username], # Which constraint?
preserve=[User.last_login], # Use the value we would have inserted.
update={User.login_count: User.login_count + 1})
.execute())
在上面的範例中,我們可以安全地多次調用 upsert 查詢。登入計數將以原子方式遞增,最後登入欄位將會更新,並且不會建立重複的列。
注意
MySQL 和 Postgresql/SQLite 之間的主要區別在於,Postgresql 和 SQLite 要求您指定一個 conflict_target
。
這裡有一個更進階(如果有點牽強)的範例,使用了 EXCLUDED
命名空間。EXCLUDED
幫助程式允許我們引用衝突資料中的值。在我們的範例中,我們假設一個簡單的表格,將唯一鍵(字串)映射到一個值(整數):
class KV(Model):
key = CharField(unique=True)
value = IntegerField()
# Create one row.
KV.create(key='k1', value=1)
# Demonstrate usage of EXCLUDED.
# Here we will attempt to insert a new value for a given key. If that
# key already exists, then we will update its value with the *sum* of its
# original value and the value we attempted to insert -- provided that
# the new value is larger than the original value.
query = (KV.insert(key='k1', value=10)
.on_conflict(conflict_target=[KV.key],
update={KV.value: KV.value + EXCLUDED.value},
where=(EXCLUDED.value > KV.value)))
# Executing the above query will result in the following data being
# present in the "kv" table:
# (key='k1', value=11)
query.execute()
# If we attempted to execute the query *again*, then nothing would be
# updated, as the new value (10) is now less than the value in the
# original row (11).
使用 ON CONFLICT
時,有幾個重要的概念需要理解:
conflict_target=
:哪個(些)欄位具有 UNIQUE 約束。對於使用者表格,這可能是使用者的電子郵件。preserve=
:如果發生衝突,此參數用於指示我們希望更新哪些新資料的值。update=
:如果發生衝突,這是一個將套用至已存在列的資料映射。EXCLUDED
:這個「神奇」的命名空間允許您引用如果約束沒有失敗,原本會插入的新資料。
完整範例:
class User(Model):
email = CharField(unique=True) # Unique identifier for user.
last_login = DateTimeField()
login_count = IntegerField(default=0)
ip_log = TextField(default='')
# Demonstrates the above 4 concepts.
def login(email, ip):
rowid = (User
.insert({User.email: email,
User.last_login: datetime.now(),
User.login_count: 1,
User.ip_log: ip})
.on_conflict(
# If the INSERT fails due to a constraint violation on the
# user email, then perform an UPDATE instead.
conflict_target=[User.email],
# Set the "last_login" to the value we would have inserted
# (our call to datetime.now()).
preserve=[User.last_login],
# Increment the user's login count and prepend the new IP
# to the user's ip history.
update={User.login_count: User.login_count + 1,
User.ip_log: fn.CONCAT(EXCLUDED.ip_log, ',', User.ip_log)})
.execute())
return rowid
# This will insert the initial row, returning the new row id (1).
print(login('test@example.com', '127.1'))
# Because test@example.com exists, this will trigger the UPSERT. The row id
# from above is returned again (1).
print(login('test@example.com', '127.2'))
u = User.get()
print(u.login_count, u.ip_log)
# Prints "2 127.2,127.1"
如需更多資訊,請參閱 Insert.on_conflict()
和 OnConflict
。
刪除記錄
若要刪除單個模型實例,您可以使用 Model.delete_instance()
快捷方式。delete_instance()
將刪除給定的模型實例,並且可以選擇性地遞迴刪除任何相依物件(透過指定 recursive=True)。
>>> user = User.get(User.id == 1)
>>> user.delete_instance() # Returns the number of rows deleted.
1
>>> User.get(User.id == 1)
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."id" = ?
PARAMS: [1]
若要刪除任意一組列,您可以發出 DELETE 查詢。以下將刪除所有超過一年的 Tweet
物件:
>>> query = Tweet.delete().where(Tweet.creation_date < one_year_ago)
>>> query.execute() # Returns the number of rows deleted.
7
如需更多資訊,請參閱文件:
DeleteQuery
選取單個記錄
您可以使用 Model.get()
方法來檢索與給定查詢匹配的單個實例。對於主鍵查找,您還可以使用快捷方法 Model.get_by_id()
。
此方法是一個快捷方式,會使用給定的查詢呼叫 Model.select()
,但會將結果集限制為單列。此外,如果沒有模型符合給定的查詢,則會引發 DoesNotExist
例外。
>>> User.get(User.id == 1)
<__main__.User object at 0x25294d0>
>>> User.get_by_id(1) # Same as above.
<__main__.User object at 0x252df10>
>>> User[1] # Also same as above.
<__main__.User object at 0x252dd10>
>>> User.get(User.id == 1).username
u'Charlie'
>>> User.get(User.username == 'Charlie')
<__main__.User object at 0x2529410>
>>> User.get(User.username == 'nobody')
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."username" = ?
PARAMS: ['nobody']
對於更進階的操作,您可以使用 SelectBase.get()
。以下查詢會檢索名為 charlie 的使用者的最新推文:
>>> (Tweet
... .select()
... .join(User)
... .where(User.username == 'charlie')
... .order_by(Tweet.created_date.desc())
... .get())
<__main__.Tweet object at 0x2623410>
如需更多資訊,請參閱文件:
Model.get_or_none()
- 如果找不到匹配的列,則傳回None
。SelectBase.first()
- 傳回結果集的第一筆記錄,如果結果集為空,則傳回None
。
建立或取得
Peewee 有一個協助程式方法來執行「取得/建立」類型的操作:Model.get_or_create()
,它會先嘗試檢索匹配的列。如果失敗,則會建立一個新列。
對於「建立或取得」類型的邏輯,通常會依賴 unique 約束或主鍵來防止建立重複的物件。舉例來說,假設我們希望使用範例 User 模型來實作註冊新使用者帳戶。User 模型在 username 欄位上有一個 unique 約束,因此我們將依賴資料庫的完整性保證,以確保我們不會得到重複的 username:
try:
with db.atomic():
return User.create(username=username)
except peewee.IntegrityError:
# `username` is a unique column, so this username already exists,
# making it safe to call .get().
return User.get(User.username == username)
您可以輕鬆地將此類型的邏輯封裝為您自己的 Model
類別上的 classmethod
。
上面的範例首先嘗試建立,然後回退到檢索,並依賴資料庫來強制執行唯一約束。如果您希望先嘗試檢索記錄,則可以使用 get_or_create()
。此方法的實作方式與 Django 中同名的函式相同。您可以使用 Django 風格的關鍵字引數篩選器來指定您的 WHERE
條件。該函式會傳回一個 2 元組,其中包含實例和一個布林值,指示是否已建立該物件。
以下是您可能使用 get_or_create()
實作使用者帳戶建立的方式:
user, created = User.get_or_create(username=username)
假設我們有一個不同的模型 Person
,並且想要取得或建立一個 person 物件。當檢索 Person
時,我們唯一關心的條件是他們的名字和姓氏,但是,如果我們最終需要建立一個新記錄,我們還將指定他們的出生日期和最喜歡的顏色:
person, created = Person.get_or_create(
first_name=first_name,
last_name=last_name,
defaults={'dob': dob, 'favorite_color': 'green'})
傳遞給 get_or_create()
的任何關鍵字引數都將在邏輯的 get()
部分中使用,除了 defaults
字典之外,該字典將用於填入新建立實例的值。
更多詳細資訊請閱讀 Model.get_or_create()
的文件。
選取多筆記錄
我們可以使用 Model.select()
從資料表中檢索列。當您建構一個 SELECT 查詢時,資料庫將傳回符合您查詢的任何列。Peewee 允許您迭代這些列,以及使用索引和切片操作。
>>> query = User.select()
>>> [user.username for user in query]
['Charlie', 'Huey', 'Peewee']
>>> query[1]
<__main__.User at 0x7f83e80f5550>
>>> query[1].username
'Huey'
>>> query[:2]
[<__main__.User at 0x7f83e80f53a8>, <__main__.User at 0x7f83e80f5550>]
Select
查詢很聰明,您可以多次迭代、索引和切片查詢,但查詢只會執行一次。
在以下範例中,我們將簡單地呼叫 select()
並迭代傳回值,這是一個 Select
的實例。這將傳回 User 表格中的所有列。
>>> for user in User.select():
... print(user.username)
...
Charlie
Huey
Peewee
注意
後續對同一查詢的迭代不會存取資料庫,因為結果已快取。要停用此行為(以減少記憶體使用量),在迭代時呼叫 Select.iterator()
。
當迭代包含外鍵的模型時,請小心存取相關模型值的方式。意外解析外鍵或迭代回溯參照可能會導致 N+1 查詢行為。
當您建立一個外鍵時,例如 Tweet.user
,您可以使用 backref 來建立回溯參照 (User.tweets
)。回溯參照會作為 Select
實例公開。
>>> tweet = Tweet.get()
>>> tweet.user # Accessing a foreign key returns the related model.
<tw.User at 0x7f3ceb017f50>
>>> user = User.get()
>>> user.tweets # Accessing a back-reference returns a query.
<peewee.ModelSelect at 0x7f73db3bafd0>
您可以像任何其他 Select
一樣迭代 user.tweets
回溯參照。
>>> for tweet in user.tweets:
... print(tweet.message)
...
hello world
this is fun
look at this picture of my food
除了傳回模型實例外,Select
查詢還可以傳回字典、元組和具名元組。根據您的使用情況,您可能會發現使用字典作為列會更容易。
>>> query = User.select().dicts()
>>> for row in query:
... print(row)
{'id': 1, 'username': 'Charlie'}
{'id': 2, 'username': 'Huey'}
{'id': 3, 'username': 'Peewee'}
請參閱 namedtuples()
、tuples()
、dicts()
以獲取更多資訊。
迭代大型結果集
預設情況下,peewee 在迭代 Select
查詢時會快取傳回的列。這是一種優化,允許進行多次迭代,以及索引和切片,而不會導致額外的查詢。但是,當您計劃迭代大量列時,此快取可能會出現問題。
為了減少 peewee 在迭代查詢時使用的記憶體量,請使用 iterator()
方法。此方法允許您在不快取每個傳回的模型的情況下進行迭代,從而在迭代大型結果集時使用更少的記憶體。
# Let's assume we've got 10 million stat objects to dump to a csv file.
stats = Stat.select()
# Our imaginary serializer class
serializer = CSVSerializer()
# Loop over all the stats and serialize.
for stat in stats.iterator():
serializer.serialize_object(stat)
對於簡單的查詢,您可以透過將列作為字典、具名元組或元組傳回,來進一步提高速度。以下方法可用於任何 Select
查詢,以變更結果列類型。
不要忘記附加 iterator()
方法呼叫,以減少記憶體消耗。例如,上述程式碼可能如下所示:
# Let's assume we've got 10 million stat objects to dump to a csv file.
stats = Stat.select()
# Our imaginary serializer class
serializer = CSVSerializer()
# Loop over all the stats (rendered as tuples, without caching) and serialize.
for stat_tuple in stats.tuples().iterator():
serializer.serialize_tuple(stat_tuple)
當迭代包含來自多個表格的列的大量列時,peewee 會為每個傳回的列重建模型圖。對於複雜的圖形,此操作可能會很慢。例如,如果我們選擇一系列推文以及推文作者的使用者名稱和頭像,Peewee 必須為每一列建立兩個物件(一個推文和一個使用者)。除了上述列類型之外,還有第四種方法 objects()
,它會將列作為模型實例傳回,但不會嘗試解析模型圖。
例如:
query = (Tweet
.select(Tweet, User) # Select tweet and user data.
.join(User))
# Note that the user columns are stored in a separate User instance
# accessible at tweet.user:
for tweet in query:
print(tweet.user.username, tweet.content)
# Using ".objects()" will not create the tweet.user object and assigns all
# user attributes to the tweet instance:
for tweet in query.objects():
print(tweet.username, tweet.content)
為了獲得最佳效能,您可以執行查詢,然後使用底層資料庫游標迭代結果。Database.execute()
接受查詢物件,執行查詢,並傳回一個 DB-API 2.0 Cursor
物件。游標將傳回原始列元組。
query = Tweet.select(Tweet.content, User.username).join(User)
cursor = database.execute(query)
for (content, username) in cursor:
print(username, '->', content)
篩選記錄
您可以使用一般的 python 運算子篩選特定記錄。Peewee 支援各種 查詢運算子。
>>> user = User.get(User.username == 'Charlie')
>>> for tweet in Tweet.select().where(Tweet.user == user, Tweet.is_published == True):
... print(tweet.user.username, '->', tweet.message)
...
Charlie -> hello world
Charlie -> this is fun
>>> for tweet in Tweet.select().where(Tweet.created_date < datetime.datetime(2011, 1, 1)):
... print(tweet.message, tweet.created_date)
...
Really old tweet 2010-01-01 00:00:00
您也可以跨聯結篩選。
>>> for tweet in Tweet.select().join(User).where(User.username == 'Charlie'):
... print(tweet.message)
hello world
this is fun
look at this picture of my food
如果您想要表達複雜的查詢,請使用括號和 python 的位元運算子 or 和 and。
>>> Tweet.select().join(User).where(
... (User.username == 'Charlie') |
... (User.username == 'Peewee Herman'))
注意
請注意,Peewee 使用 **位元** 運算子 (&
和 |
),而不是邏輯運算子 (and
和 or
)。這樣做的原因是 Python 會將邏輯運算的傳回值強制轉換為布林值。這也是為什麼「IN」查詢必須使用 .in_()
而不是 in
運算子來表達的原因。
請查看 查詢運算表格,以了解可能有哪些查詢類型。
注意
查詢的 where 子句中可以放入許多有趣的東西,例如:
欄位運算式,例如
User.username == 'Charlie'
函式運算式,例如
fn.Lower(fn.Substr(User.username, 1, 1)) == 'a'
一個欄位與另一個欄位的比較,例如
Employee.salary < (Employee.tenure * 1000) + 40000
您也可以巢狀查詢,例如使用者名稱以「a」開頭的使用者的推文。
# get users whose username starts with "a"
a_users = User.select().where(fn.Lower(fn.Substr(User.username, 1, 1)) == 'a')
# the ".in_()" method signifies an "IN" query
a_user_tweets = Tweet.select().where(Tweet.user.in_(a_users))
更多查詢範例
注意
如需各種查詢範例,請參閱 查詢範例 文件,其中顯示如何實作來自 PostgreSQL Exercises 網站的查詢。
取得活動使用者
User.select().where(User.active == True)
取得是員工或超級使用者的使用者
User.select().where(
(User.is_staff == True) | (User.is_superuser == True))
取得名為「charlie」的使用者的推文
Tweet.select().join(User).where(User.username == 'charlie')
取得員工或超級使用者的推文(假設有 FK 關係)
Tweet.select().join(User).where(
(User.is_staff == True) | (User.is_superuser == True))
使用子查詢取得員工或超級使用者的推文
staff_super = User.select(User.id).where(
(User.is_staff == True) | (User.is_superuser == True))
Tweet.select().where(Tweet.user.in_(staff_super))
排序記錄
若要依順序傳回列,請使用 order_by()
方法。
>>> for t in Tweet.select().order_by(Tweet.created_date):
... print(t.pub_date)
...
2010-01-01 00:00:00
2011-06-07 14:08:48
2011-06-07 14:12:57
>>> for t in Tweet.select().order_by(Tweet.created_date.desc()):
... print(t.pub_date)
...
2011-06-07 14:12:57
2011-06-07 14:08:48
2010-01-01 00:00:00
您也可以使用 +
和 -
前置運算子來指示排序。
# The following queries are equivalent:
Tweet.select().order_by(Tweet.created_date.desc())
Tweet.select().order_by(-Tweet.created_date) # Note the "-" prefix.
# Similarly you can use "+" to indicate ascending order, though ascending
# is the default when no ordering is otherwise specified.
User.select().order_by(+User.username)
您也可以跨聯結排序。假設您想要依作者的使用者名稱排序推文,然後依 created_date 排序:
query = (Tweet
.select()
.join(User)
.order_by(User.username, Tweet.created_date.desc()))
SELECT t1."id", t1."user_id", t1."message", t1."is_published", t1."created_date"
FROM "tweet" AS t1
INNER JOIN "user" AS t2
ON t1."user_id" = t2."id"
ORDER BY t2."username", t1."created_date" DESC
在計算值上排序時,您可以包含必要的 SQL 運算式,或參照指派給該值的別名。以下是說明這些方法的兩個範例:
# Let's start with our base query. We want to get all usernames and the number of
# tweets they've made. We wish to sort this list from users with most tweets to
# users with fewest tweets.
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username))
您可以使用 select
子句中使用的相同 COUNT 運算式來排序。在以下範例中,我們按推文 ID 的 COUNT()
降序排序。
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(fn.COUNT(Tweet.id).desc()))
或者,您可以參照在 select
子句中指派給計算值的別名。此方法的好處是更容易閱讀。請注意,我們不是直接參照具名別名,而是使用 SQL
輔助程式將其包裝起來。
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(SQL('num_tweets').desc()))
或者,以「peewee」的方式執行操作:
ntweets = fn.COUNT(Tweet.id)
query = (User
.select(User.username, ntweets.alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(ntweets.desc())
取得隨機記錄
有時您可能想要從資料庫中提取隨機記錄。您可以透過依 random 或 rand 函式排序來完成此操作(取決於您的資料庫)。
Postgresql 和 Sqlite 使用 Random 函式
# Pick 5 lucky winners:
LotteryNumber.select().order_by(fn.Random()).limit(5)
MySQL 使用 Rand
# Pick 5 lucky winners:
LotteryNumber.select().order_by(fn.Rand()).limit(5)
分頁記錄
paginate()
方法可以輕鬆地取得記錄的「頁面」。paginate()
接受兩個參數,page_number
和 items_per_page
。
注意
頁碼是以 1 為基礎的,因此第一個結果頁面將是第 1 頁。
>>> for tweet in Tweet.select().order_by(Tweet.id).paginate(2, 10):
... print(tweet.message)
...
tweet 10
tweet 11
tweet 12
tweet 13
tweet 14
tweet 15
tweet 16
tweet 17
tweet 18
tweet 19
計算記錄
您可以計算任何 select 查詢中的行數
>>> Tweet.select().count()
100
>>> Tweet.select().where(Tweet.id > 50).count()
50
Peewee 會將您的查詢包裝在執行計數的外層查詢中,這會產生類似以下的 SQL
SELECT COUNT(1) FROM ( ... your query ... );
彙總記錄
假設您有一些使用者,並且想要取得一份包含每個使用者推文計數的清單。
query = (User
.select(User, fn.Count(Tweet.id).alias('count'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User))
產生的查詢將會回傳 User 物件,其中包含所有正常屬性,外加一個額外屬性 count,其中會包含每個使用者的推文計數。我們使用左外部聯結來包含沒有推文的使用者。
假設您有一個標記應用程式,並且想要找到具有特定數量相關物件的標籤。在此範例中,我們將在多對多配置中使用一些不同的模型
class Photo(Model):
image = CharField()
class Tag(Model):
name = CharField()
class PhotoTag(Model):
photo = ForeignKeyField(Photo)
tag = ForeignKeyField(Tag)
現在假設我們想要找到至少有 5 張相關照片的標籤
query = (Tag
.select()
.join(PhotoTag)
.join(Photo)
.group_by(Tag)
.having(fn.Count(Photo.id) > 5))
此查詢等同於以下 SQL
SELECT t1."id", t1."name"
FROM "tag" AS t1
INNER JOIN "phototag" AS t2 ON t1."id" = t2."tag_id"
INNER JOIN "photo" AS t3 ON t2."photo_id" = t3."id"
GROUP BY t1."id", t1."name"
HAVING Count(t3."id") > 5
假設我們想要抓取相關的計數並將其儲存在標籤上
query = (Tag
.select(Tag, fn.Count(Photo.id).alias('count'))
.join(PhotoTag)
.join(Photo)
.group_by(Tag)
.having(fn.Count(Photo.id) > 5))
擷取純量值
您可以呼叫 Query.scalar()
來擷取純量值。例如
>>> PageView.select(fn.Count(fn.Distinct(PageView.url))).scalar()
100
您可以透過傳遞 as_tuple=True
來擷取多個純量值
>>> Employee.select(
... fn.Min(Employee.salary), fn.Max(Employee.salary)
... ).scalar(as_tuple=True)
(30000, 50000)
視窗函式
Window
函式指的是在 SELECT
查詢中,針對正在處理的滑動資料視窗進行操作的彙總函式。視窗函式可以實現以下功能
針對結果集的子集執行彙總。
計算執行總計。
結果排名。
將列值與前一個(或後續!)列中的值進行比較。
peewee 支援 SQL 視窗函式,可以透過呼叫 Function.over()
並傳入您的分割或排序參數來建立。
對於以下範例,我們將使用以下模型和範例資料
class Sample(Model):
counter = IntegerField()
value = FloatField()
data = [(1, 10),
(1, 20),
(2, 1),
(2, 3),
(3, 100)]
Sample.insert_many(data, fields=[Sample.counter, Sample.value]).execute()
我們的範例表格現在包含
id |
counter |
value |
---|---|---|
1 |
1 |
10.0 |
2 |
1 |
20.0 |
3 |
2 |
1.0 |
4 |
2 |
3.0 |
5 |
3 |
100.0 |
排序視窗
讓我們計算 value
欄位的執行總和。為了使其成為「執行」總和,我們需要對其進行排序,因此我們將依照 Sample 的 id
欄位進行排序
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(order_by=[Sample.id]).alias('total'))
for sample in query:
print(sample.counter, sample.value, sample.total)
# 1 10. 10.
# 1 20. 30.
# 2 1. 31.
# 2 3. 34.
# 3 100 134.
另一個範例,我們將計算依 id
排序時,目前值與前一個值之間的差異
difference = Sample.value - fn.LAG(Sample.value, 1).over(order_by=[Sample.id])
query = Sample.select(
Sample.counter,
Sample.value,
difference.alias('diff'))
for sample in query:
print(sample.counter, sample.value, sample.diff)
# 1 10. NULL
# 1 20. 10. -- (20 - 10)
# 2 1. -19. -- (1 - 20)
# 2 3. 2. -- (3 - 1)
# 3 100 97. -- (100 - 3)
分割視窗
讓我們計算每個不同的「counter」值的平均 value
。請注意,counter
欄位有三個可能的值(1、2 和 3)。我們可以透過計算 value
欄位的 AVG()
,該視窗會根據 counter
欄位進行分割。
query = Sample.select(
Sample.counter,
Sample.value,
fn.AVG(Sample.value).over(partition_by=[Sample.counter]).alias('cavg'))
for sample in query:
print(sample.counter, sample.value, sample.cavg)
# 1 10. 15.
# 1 20. 15.
# 2 1. 2.
# 2 3. 2.
# 3 100 100.
我們可以透過指定 order_by
和 partition_by
參數在分割區內使用排序。舉例來說,讓我們依照每個不同的 counter
群組中的值來為樣本排名。
query = Sample.select(
Sample.counter,
Sample.value,
fn.RANK().over(
order_by=[Sample.value],
partition_by=[Sample.counter]).alias('rank'))
for sample in query:
print(sample.counter, sample.value, sample.rank)
# 1 10. 1
# 1 20. 2
# 2 1. 1
# 2 3. 2
# 3 100 1
有界視窗
預設情況下,視窗函式會使用視窗的無限前導起始,以及目前列作為結尾進行評估。我們可以透過在呼叫 Function.over()
時指定 start
和/或 end
來變更彙總函式所操作視窗的界限。此外,Peewee 在 Window
物件上提供了輔助方法,可產生適當的邊界參考。
Window.CURRENT_ROW
- 參考目前列的屬性。Window.preceding()
- 指定前導的列數,或省略數目以指示所有前導列。Window.following()
- 指定後續的列數,或省略數目以指示所有後續列。
為了檢查邊界如何運作,我們將計算 value
欄位的執行總計,並依照 id
進行排序,但我們只會查看目前列及其前兩列的執行總計
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(
order_by=[Sample.id],
start=Window.preceding(2),
end=Window.CURRENT_ROW).alias('rsum'))
for sample in query:
print(sample.counter, sample.value, sample.rsum)
# 1 10. 10.
# 1 20. 30. -- (20 + 10)
# 2 1. 31. -- (1 + 20 + 10)
# 2 3. 24. -- (3 + 1 + 20)
# 3 100 104. -- (100 + 3 + 1)
注意
技術上,我們不需要指定 end=Window.CURRENT
,因為這是預設值。在範例中顯示它只是為了示範。
讓我們來看另一個範例。在此範例中,我們將計算執行總計的「相反」,其中所有值的總和會依 id
排序的樣本值遞減。為了實現此目的,我們將計算從目前列到最後一列的總和。
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(
order_by=[Sample.id],
start=Window.CURRENT_ROW,
end=Window.following()).alias('rsum'))
# 1 10. 134. -- (10 + 20 + 1 + 3 + 100)
# 1 20. 124. -- (20 + 1 + 3 + 100)
# 2 1. 104. -- (1 + 3 + 100)
# 2 3. 103. -- (3 + 100)
# 3 100 100. -- (100)
篩選的彙總
彙總函式也可能支援篩選函式(Postgres 和 Sqlite 3.25+),這些函式會轉換為 FILTER (WHERE...)
子句。篩選運算式會透過 Function.filter()
方法新增至彙總函式。
舉例來說,我們將計算 value
欄位相對於 id
的執行總和,但我們將篩除任何 counter=2
的樣本。
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).filter(Sample.counter != 2).over(
order_by=[Sample.id]).alias('csum'))
for sample in query:
print(sample.counter, sample.value, sample.csum)
# 1 10. 10.
# 1 20. 30.
# 2 1. 30.
# 2 3. 30.
# 3 100 130.
重複使用視窗定義
如果您打算將相同的視窗定義用於多個彙總,則可以建立 Window
物件。Window
物件會採用與 Function.over()
相同的參數,並且可以傳遞至 over()
方法,以取代個別參數。
在這裡,我們將宣告一個依照樣本 id
排序的單一視窗,並使用該視窗定義呼叫數個視窗函式
win = Window(order_by=[Sample.id])
query = Sample.select(
Sample.counter,
Sample.value,
fn.LEAD(Sample.value).over(win),
fn.LAG(Sample.value).over(win),
fn.SUM(Sample.value).over(win)
).window(win) # Include our window definition in query.
for row in query.tuples():
print(row)
# counter value lead() lag() sum()
# 1 10. 20. NULL 10.
# 1 20. 1. 10. 30.
# 2 1. 3. 20. 31.
# 2 3. 100. 1. 34.
# 3 100. NULL 3. 134.
多個視窗定義
在先前的範例中,我們看到了如何宣告 Window
定義,並將其重複用於多個不同的彙總。您可以在查詢中包含所需的任意多個視窗定義,但必須確保每個視窗都有唯一的別名
w1 = Window(order_by=[Sample.id]).alias('w1')
w2 = Window(partition_by=[Sample.counter]).alias('w2')
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(w1).alias('rsum'), # Running total.
fn.AVG(Sample.value).over(w2).alias('cavg') # Avg per category.
).window(w1, w2) # Include our window definitions.
for sample in query:
print(sample.counter, sample.value, sample.rsum, sample.cavg)
# counter value rsum cavg
# 1 10. 10. 15.
# 1 20. 30. 15.
# 2 1. 31. 2.
# 2 3. 34. 2.
# 3 100 134. 100.
同樣地,如果您有多個共用類似定義的視窗定義,則可以擴充先前定義的視窗定義。例如,在這裡,我們將依計數器值分割資料集,因此我們將針對計數器執行彙總。然後,我們將定義第二個視窗來擴充此分割,並新增排序子句
w1 = Window(partition_by=[Sample.counter]).alias('w1')
# By extending w1, this window definition will also be partitioned
# by "counter".
w2 = Window(extends=w1, order_by=[Sample.value.desc()]).alias('w2')
query = (Sample
.select(Sample.counter, Sample.value,
fn.SUM(Sample.value).over(w1).alias('group_sum'),
fn.RANK().over(w2).alias('revrank'))
.window(w1, w2)
.order_by(Sample.id))
for sample in query:
print(sample.counter, sample.value, sample.group_sum, sample.revrank)
# counter value group_sum revrank
# 1 10. 30. 2
# 1 20. 30. 1
# 2 1. 4. 2
# 2 3. 4. 1
# 3 100. 100. 1
框架類型:範圍 vs 列 vs 群組
根據框架類型,資料庫將以不同的方式處理排序的群組。讓我們建立另外兩個 Sample
列,以視覺化差異
>>> Sample.create(counter=1, value=20.)
<Sample 6>
>>> Sample.create(counter=2, value=1.)
<Sample 7>
我們的表格現在包含
id |
counter |
value |
---|---|---|
1 |
1 |
10.0 |
2 |
1 |
20.0 |
3 |
2 |
1.0 |
4 |
2 |
3.0 |
5 |
3 |
100.0 |
6 |
1 |
20.0 |
7 |
2 |
1.0 |
讓我們透過計算樣本的「執行總和」來檢查差異,並依照 counter
和 value
欄位進行排序。若要指定框架類型,我們可以使用
當有邏輯重複項時,RANGE
的行為可能會導致非預期的結果
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(
order_by=[Sample.counter, Sample.value],
frame_type=Window.RANGE).alias('rsum'))
for sample in query.order_by(Sample.counter, Sample.value):
print(sample.counter, sample.value, sample.rsum)
# counter value rsum
# 1 10. 10.
# 1 20. 50.
# 1 20. 50.
# 2 1. 52.
# 2 1. 52.
# 2 3. 55.
# 3 100 155.
包含新列後,我們現在有一些列具有重複的 category
和 value
值。RANGE
框架類型會導致這些重複項一起評估,而不是個別評估。
更預期的結果可以透過使用 ROWS
作為框架類型來實現
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(
order_by=[Sample.counter, Sample.value],
frame_type=Window.ROWS).alias('rsum'))
for sample in query.order_by(Sample.counter, Sample.value):
print(sample.counter, sample.value, sample.rsum)
# counter value rsum
# 1 10. 10.
# 1 20. 30.
# 1 20. 50.
# 2 1. 51.
# 2 1. 52.
# 2 3. 55.
# 3 100 155.
Peewee 使用這些規則來判斷要使用的框架類型
如果使用者指定了
frame_type
,則會使用該框架類型。如果指定了
start
和/或end
邊界,則 Peewee 預設會使用ROWS
。如果使用者未指定框架類型或起始/結束邊界,Peewee 將使用資料庫預設值,即
RANGE
。
Window.GROUPS
框架類型會根據排序條件,以行的群組來看待視窗範圍的規範。使用 GROUPS
,我們可以定義框架,使其涵蓋不同的行群組。讓我們來看一個範例
query = (Sample
.select(Sample.counter, Sample.value,
fn.SUM(Sample.value).over(
order_by=[Sample.counter, Sample.value],
frame_type=Window.GROUPS,
start=Window.preceding(1)).alias('gsum'))
.order_by(Sample.counter, Sample.value))
for sample in query:
print(sample.counter, sample.value, sample.gsum)
# counter value gsum
# 1 10 10
# 1 20 50
# 1 20 50 (10) + (20+0)
# 2 1 42
# 2 1 42 (20+20) + (1+1)
# 2 3 5 (1+1) + 3
# 3 100 103 (3) + 100
您應該可以推斷出,視窗是根據其排序條件 (counter, value)
來分組的。我們正在查看一個視窗,其範圍介於前一個群組和目前的群組之間。
檢索列元組/字典/具名元組
有時您不需要建立模型實例的額外開銷,而只想在不需要 Model
提供的所有 API 的情況下,遍歷列資料。若要執行此操作,請使用
objects()
– 接受任意建構函式,該函式會使用列元組呼叫。
stats = (Stat
.select(Stat.url, fn.Count(Stat.url))
.group_by(Stat.url)
.tuples())
# iterate over a list of 2-tuples containing the url and count
for stat_url, stat_count in stats:
print(stat_url, stat_count)
同樣地,您可以使用 dicts()
從游標傳回列作為字典。
stats = (Stat
.select(Stat.url, fn.Count(Stat.url).alias('ct'))
.group_by(Stat.url)
.dicts())
# iterate over a list of 2-tuples containing the url and count
for stat in stats:
print(stat['url'], stat['ct'])
傳回子句
PostgresqlDatabase
支援 UPDATE
、INSERT
和 DELETE
查詢上的 RETURNING
子句。指定 RETURNING
子句可讓您遍歷查詢存取的列。
依預設,執行不同查詢時的傳回值為
INSERT
- 新插入列的自動遞增主索引鍵值。當未使用自動遞增主索引鍵時,Postgres 會傳回新列的主索引鍵,但 SQLite 和 MySQL 不會。UPDATE
- 修改的列數DELETE
- 刪除的列數
當使用傳回子句時,執行查詢後的傳回值將為可迭代的游標物件。
Postgresql 允許透過 RETURNING
子句,從查詢插入或修改的列傳回資料。
例如,假設您有一個 Update
,會停用所有註冊已過期的使用者帳戶。停用這些帳戶後,您想向每位使用者發送一封電子郵件,告知他們其帳戶已停用。您可以透過單一具有 RETURNING
子句的 UPDATE
查詢來完成此操作,而不是編寫兩個查詢,一個 SELECT
和一個 UPDATE
。
query = (User
.update(is_active=False)
.where(User.registration_expired == True)
.returning(User))
# Send an email to every user that was deactivated.
for deactivate_user in query.execute():
send_deactivation_email(deactivated_user.email)
RETURNING
子句也可用於 Insert
和 Delete
。當與 INSERT
一起使用時,將會傳回新建立的列。當與 DELETE
一起使用時,將會傳回已刪除的列。
RETURNING
子句的唯一限制是,它只能包含查詢的 FROM
子句中列出的表格中的資料行。若要選取特定表格的所有資料行,您可以直接傳入 Model
類別。
作為另一個範例,讓我們新增一位使用者,並將其建立日期設定為伺服器產生的目前時間戳記。我們將在單一查詢中建立並檢索新使用者的 ID、電子郵件和建立時間戳記
query = (User
.insert(email='foo@bar.com', created=fn.now())
.returning(User)) # Shorthand for all columns on User.
# When using RETURNING, execute() returns a cursor.
cursor = query.execute()
# Get the user object we just inserted and log the data:
user = cursor[0]
logger.info('Created user %s (id=%s) at %s', user.email, user.id, user.created)
依預設,游標將傳回 Model
實例,但您可以指定不同的列類型
data = [{'name': 'charlie'}, {'name': 'huey'}, {'name': 'mickey'}]
query = (User
.insert_many(data)
.returning(User.id, User.username)
.dicts())
for new_user in query.execute():
print('Added user "%s", id=%s' % (new_user['username'], new_user['id']))
通用表格表達式
Peewee 支援在所有類型的查詢中包含通用表格表達式 (CTE)。CTE 可能適用於
分解出共用子查詢。
依 CTE 結果集中衍生的資料行進行分組或篩選。
編寫遞迴查詢。
若要宣告要作為 CTE 使用的 Select
查詢,請使用 cte()
方法,該方法會將查詢包裝在 CTE
物件中。若要指示應該將 CTE
作為查詢的一部分包含進來,請使用 Query.with_cte()
方法,並傳遞 CTE 物件的清單。
簡單範例
舉例來說,假設我們有一些資料點,其中包含索引鍵和浮點數值。讓我們定義我們的模型並填入一些測試資料
class Sample(Model):
key = TextField()
value = FloatField()
data = (
('a', (1.25, 1.5, 1.75)),
('b', (2.1, 2.3, 2.5, 2.7, 2.9)),
('c', (3.5, 3.5)))
# Populate data.
for key, values in data:
Sample.insert_many([(key, value) for value in values],
fields=[Sample.key, Sample.value]).execute()
讓我們使用 CTE 來計算每個不同的索引鍵,哪些數值高於該索引鍵的平均值。
# First we'll declare the query that will be used as a CTE. This query
# simply determines the average value for each key.
cte = (Sample
.select(Sample.key, fn.AVG(Sample.value).alias('avg_value'))
.group_by(Sample.key)
.cte('key_avgs', columns=('key', 'avg_value')))
# Now we'll query the sample table, using our CTE to find rows whose value
# exceeds the average for the given key. We'll calculate how far above the
# average the given sample's value is, as well.
query = (Sample
.select(Sample.key, Sample.value)
.join(cte, on=(Sample.key == cte.c.key))
.where(Sample.value > cte.c.avg_value)
.order_by(Sample.value)
.with_cte(cte))
我們可以遍歷查詢傳回的樣本,以查看哪些樣本在其給定群組中具有高於平均值的數值
>>> for sample in query:
... print(sample.key, sample.value)
# 'a', 1.75
# 'b', 2.7
# 'b', 2.9
複雜範例
若要獲得更完整的範例,讓我們考慮以下查詢,該查詢使用多個 CTE 來尋找僅在頂級銷售區域中的每產品銷售總額。我們的模型如下所示
class Order(Model):
region = TextField()
amount = FloatField()
product = TextField()
quantity = IntegerField()
以下是如何在 SQL 中撰寫查詢。此範例可在 postgresql 文件中找到。
WITH regional_sales AS (
SELECT region, SUM(amount) AS total_sales
FROM orders
GROUP BY region
), top_regions AS (
SELECT region
FROM regional_sales
WHERE total_sales > (SELECT SUM(total_sales) / 10 FROM regional_sales)
)
SELECT region,
product,
SUM(quantity) AS product_units,
SUM(amount) AS product_sales
FROM orders
WHERE region IN (SELECT region FROM top_regions)
GROUP BY region, product;
使用 Peewee,我們會寫成
reg_sales = (Order
.select(Order.region,
fn.SUM(Order.amount).alias('total_sales'))
.group_by(Order.region)
.cte('regional_sales'))
top_regions = (reg_sales
.select(reg_sales.c.region)
.where(reg_sales.c.total_sales > (
reg_sales.select(fn.SUM(reg_sales.c.total_sales) / 10)))
.cte('top_regions'))
query = (Order
.select(Order.region,
Order.product,
fn.SUM(Order.quantity).alias('product_units'),
fn.SUM(Order.amount).alias('product_sales'))
.where(Order.region.in_(top_regions.select(top_regions.c.region)))
.group_by(Order.region, Order.product)
.with_cte(reg_sales, top_regions))
遞迴 CTE
Peewee 支援遞迴 CTE。當您有一個由父系連結外鍵表示的樹狀資料結構時,遞迴 CTE 很有用。例如,假設我們有一個線上書店的類別階層。我們希望產生一個表格,其中顯示所有類別及其絕對深度,以及從根到類別的路徑。
我們假設以下模型定義,其中每個類別都有一個指向其直接父類別的外鍵
class Category(Model):
name = TextField()
parent = ForeignKeyField('self', backref='children', null=True)
若要列出所有類別及其深度和父系,我們可以使用遞迴 CTE
# Define the base case of our recursive CTE. This will be categories that
# have a null parent foreign-key.
Base = Category.alias()
level = Value(1).alias('level')
path = Base.name.alias('path')
base_case = (Base
.select(Base.id, Base.name, Base.parent, level, path)
.where(Base.parent.is_null())
.cte('base', recursive=True))
# Define the recursive terms.
RTerm = Category.alias()
rlevel = (base_case.c.level + 1).alias('level')
rpath = base_case.c.path.concat('->').concat(RTerm.name).alias('path')
recursive = (RTerm
.select(RTerm.id, RTerm.name, RTerm.parent, rlevel, rpath)
.join(base_case, on=(RTerm.parent == base_case.c.id)))
# The recursive CTE is created by taking the base case and UNION ALL with
# the recursive term.
cte = base_case.union_all(recursive)
# We will now query from the CTE to get the categories, their levels, and
# their paths.
query = (cte
.select_from(cte.c.name, cte.c.level, cte.c.path)
.order_by(cte.c.path))
# We can now iterate over a list of all categories and print their names,
# absolute levels, and path from root -> category.
for category in query:
print(category.name, category.level, category.path)
# Example output:
# root, 1, root
# p1, 2, root->p1
# c1-1, 3, root->p1->c1-1
# c1-2, 3, root->p1->c1-2
# p2, 2, root->p2
# c2-1, 3, root->p2->c2-1
資料修改 CTE
Peewee 支援資料修改 CTE。
使用資料修改 CTE 將資料從一個表格移動到封存表格的範例,使用單一查詢
class Event(Model):
name = CharField()
timestamp = DateTimeField()
class Archive(Model):
name = CharField()
timestamp = DateTimeField()
# Move rows older than 24 hours from the Event table to the Archive.
cte = (Event
.delete()
.where(Event.timestamp < (datetime.now() - timedelta(days=1)))
.returning(Event)
.cte('moved_rows'))
# Create a simple SELECT to get the resulting rows from the CTE.
src = Select((cte,), (cte.c.id, cte.c.name, cte.c.timestamp))
# Insert into the archive table whatever data was returned by the DELETE.
res = (Archive
.insert_from(src, (Archive.id, Archive.name, Archive.timestamp))
.with_cte(cte)
.execute())
以上對應到大約以下的 SQL
WITH "moved_rows" AS (
DELETE FROM "event" WHERE ("timestamp" < XXXX-XX-XXTXX:XX:XX)
RETURNING "id", "name", "timestamp")
INSERT INTO "archive" ("id", "name", "timestamp")
SELECT "moved_rows"."id", "moved_rows"."name", "moved_rows"."timestamp"
FROM "moved_rows";
如需其他範例,請參閱 models.py
和 sql.py
中的測試
外鍵和聯結
本節已移至其自己的文件中:關聯性和聯結。