查詢建構器
Peewee 的高階 Model
和 Field
API 是建立在較低階的 Table
和 Column
對應物之上。雖然這些較低階的 API 沒有像高階 API 那樣詳細的文件說明,但本文將提供一個概述,並提供一些範例,希望可以讓您進行實驗。
我們將使用以下綱要
CREATE TABLE "person" (
"id" INTEGER NOT NULL PRIMARY KEY,
"first" TEXT NOT NULL,
"last" TEXT NOT NULL);
CREATE TABLE "note" (
"id" INTEGER NOT NULL PRIMARY KEY,
"person_id" INTEGER NOT NULL,
"content" TEXT NOT NULL,
"timestamp" DATETIME NOT NULL,
FOREIGN KEY ("person_id") REFERENCES "person" ("id"));
CREATE TABLE "reminder" (
"id" INTEGER NOT NULL PRIMARY KEY,
"note_id" INTEGER NOT NULL,
"alarm" DATETIME NOT NULL,
FOREIGN KEY ("note_id") REFERENCES "note" ("id"));
宣告資料表
我們可以使用兩種方式宣告 Table
物件來操作這些資料表
# Explicitly declare columns
Person = Table('person', ('id', 'first', 'last'))
Note = Table('note', ('id', 'person_id', 'content', 'timestamp'))
# Do not declare columns, they will be accessed using magic ".c" attribute
Reminder = Table('reminder')
通常我們會希望將資料表 bind()
到資料庫。這可以省去我們每次想對資料表執行查詢時都必須明確傳遞資料庫的麻煩
db = SqliteDatabase('my_app.db')
Person = Person.bind(db)
Note = Note.bind(db)
Reminder = Reminder.bind(db)
Select 查詢
若要選取前三筆筆記並列印其內容,我們可以寫
query = Note.select().order_by(Note.timestamp).limit(3)
for note_dict in query:
print(note_dict['content'])
注意
預設情況下,資料列會以字典的形式傳回。如果您希望,可以使用 tuples()
、namedtuples()
或 objects()
方法來指定不同的資料列資料容器。
由於我們沒有指定任何欄位,因此會選取我們在筆記的 Table
建構函式中定義的所有欄位。這對 Reminder 無效,因為我們根本沒有指定任何欄位。
若要選取 2018 年發布的所有筆記,以及建立者的名稱,我們將使用 join()
。我們還會要求資料列以 *namedtuple* 物件傳回
query = (Note
.select(Note.content, Note.timestamp, Person.first, Person.last)
.join(Person, on=(Note.person_id == Person.id))
.where(Note.timestamp >= datetime.date(2018, 1, 1))
.order_by(Note.timestamp)
.namedtuples())
for row in query:
print(row.timestamp, '-', row.content, '-', row.first, row.last)
讓我們查詢最多產的人,也就是,取得建立最多筆記的人。這會引入呼叫 SQL 函數 (COUNT),這是使用 fn
物件完成的
name = Person.first.concat(' ').concat(Person.last)
query = (Person
.select(name.alias('name'), fn.COUNT(Note.id).alias('count'))
.join(Note, JOIN.LEFT_OUTER, on=(Note.person_id == Person.id))
.group_by(name)
.order_by(fn.COUNT(Note.id).desc()))
for row in query:
print(row['name'], row['count'])
在上述查詢中有幾件事需要注意
我們將一個表達式儲存在一個變數 (
name
) 中,然後在查詢中使用它。我們使用
fn.<function>(...)
呼叫 SQL 函數,並像傳遞正常的 Python 函數一樣傳遞引數。alias()
方法用於指定欄位或計算的名稱。
作為一個更複雜的範例,我們將產生所有人員的列表,以及他們最近發布的筆記的內容和時間戳記。若要執行此操作,我們將在同一個查詢中以不同的情境使用 Note 資料表兩次,這將需要我們使用資料表別名。
# Start with the query that calculates the timestamp of the most recent
# note for each person.
NA = Note.alias('na')
max_note = (NA
.select(NA.person_id, fn.MAX(NA.timestamp).alias('max_ts'))
.group_by(NA.person_id)
.alias('max_note'))
# Now we'll select from the note table, joining on both the subquery and
# on the person table to construct the result set.
query = (Note
.select(Note.content, Note.timestamp, Person.first, Person.last)
.join(max_note, on=((max_note.c.person_id == Note.person_id) &
(max_note.c.max_ts == Note.timestamp)))
.join(Person, on=(Note.person_id == Person.id))
.order_by(Person.first, Person.last))
for row in query.namedtuples():
print(row.first, row.last, ':', row.timestamp, '-', row.content)
在 *max_note* 子查詢的聯結述詞中,我們可以使用神奇的「.c」屬性來參考子查詢中的欄位。因此,*max_note.c.max_ts* 會轉譯為「來自 max_note 子查詢的 max_ts 欄位值」。
我們也可以使用「.c」神奇屬性來存取未明確定義其欄位的資料表中的欄位,就像我們對 Reminder 資料表所做的那樣。以下是一個簡單的查詢,用於取得今天的所有提醒,以及其相關的筆記內容
today = datetime.date.today()
tomorrow = today + datetime.timedelta(days=1)
query = (Reminder
.select(Reminder.c.alarm, Note.content)
.join(Note, on=(Reminder.c.note_id == Note.id))
.where(Reminder.c.alarm.between(today, tomorrow))
.order_by(Reminder.c.alarm))
for row in query:
print(row['alarm'], row['content'])
注意
為了避免混淆,「.c」屬性不會在明確定義其欄位的資料表上運作。
Insert 查詢
插入資料很簡單。我們可以使用兩種不同的方式指定要 insert()
的資料 (在這兩種情況下,都會傳回新資料列的 ID)
# Using keyword arguments:
zaizee_id = Person.insert(first='zaizee', last='cat').execute()
# Using column: value mappings:
Note.insert({
Note.person_id: zaizee_id,
Note.content: 'meeeeowwww',
Note.timestamp: datetime.datetime.now()}).execute()
批量插入資料很容易,只需傳入
字典列表 (所有字典都必須具有相同的鍵/欄位)。
如果明確指定了欄位,則為元組列表。
範例
people = [
{'first': 'Bob', 'last': 'Foo'},
{'first': 'Herb', 'last': 'Bar'},
{'first': 'Nuggie', 'last': 'Bar'}]
# Inserting multiple rows returns the ID of the last-inserted row.
last_id = Person.insert(people).execute()
# We can also specify row tuples, so long as we tell Peewee which
# columns the tuple values correspond to:
people = [
('Bob', 'Foo'),
('Herb', 'Bar'),
('Nuggie', 'Bar')]
Person.insert(people, columns=[Person.first, Person.last]).execute()
Update 查詢
update()
查詢接受關鍵字引數或將欄位對應到值的字典,就像 insert()
一樣。
範例
# "Bob" changed his last name from "Foo" to "Baze".
nrows = (Person
.update(last='Baze')
.where((Person.first == 'Bob') &
(Person.last == 'Foo'))
.execute())
# Use dictionary mapping column to value.
nrows = (Person
.update({Person.last: 'Baze'})
.where((Person.first == 'Bob') &
(Person.last == 'Foo'))
.execute())
您也可以使用表達式作為值來執行原子更新。假設我們有一個 *PageView* 資料表,並且我們需要對某些 URL 以原子方式遞增頁面檢視計數
# Do an atomic update:
(PageView
.update({PageView.count: PageView.count + 1})
.where(PageView.url == some_url)
.execute())
Delete 查詢
delete()
查詢是最簡單的,因為它們不接受任何引數
# Delete all notes created before 2018, returning number deleted.
n = Note.delete().where(Note.timestamp < datetime.date(2018, 1, 1)).execute()
由於 DELETE (和 UPDATE) 查詢不支援聯結,因此我們可以使用子查詢來根據相關資料表中的值刪除資料列。例如,以下是如何刪除所有姓氏為「Foo」的人的所有筆記
# Get the id of all people whose last name is "Foo".
foo_people = Person.select(Person.id).where(Person.last == 'Foo')
# Delete all notes by any person whose ID is in the previous query.
Note.delete().where(Note.person_id.in_(foo_people)).execute()
查詢物件
Peewee 2.x 提供的抽象化的基本限制之一,是缺少一個表示與給定模型類別無關的結構化查詢的類別。
一個範例可能是在子查詢上計算彙總值。例如,count()
方法 (傳回任意查詢中的資料列數) 是透過包裝查詢來實作的
SELECT COUNT(1) FROM (...)
若要使用 Peewee 完成此操作,實作會以這種方式撰寫
def count(query):
# Select([source1, ... sourcen], [column1, ...columnn])
wrapped = Select(from_list=[query], columns=[fn.COUNT(SQL('1'))])
curs = wrapped.tuples().execute(db)
return curs[0][0] # Return first column from first row of result.
我們實際上可以使用 scalar()
方法更簡潔地表達這一點,該方法適用於從彙總查詢傳回值
def count(query):
wrapped = Select(from_list=[query], columns=[fn.COUNT(SQL('1'))])
return wrapped.scalar(db)
查詢範例 文件中有一個更複雜的範例,在該範例中,我們編寫了一個查詢,用於找出預訂可用插槽數量最多的設施
我們想要表達的 SQL 是
SELECT facid, total FROM (
SELECT facid, SUM(slots) AS total,
rank() OVER (order by SUM(slots) DESC) AS rank
FROM bookings
GROUP BY facid
) AS ranked
WHERE rank = 1
我們可以透過使用普通的 Select
作為外部查詢,來相當優雅地表達此查詢
# Store rank expression in variable for readability.
rank_expr = fn.rank().over(order_by=[fn.SUM(Booking.slots).desc()])
subq = (Booking
.select(Booking.facility, fn.SUM(Booking.slots).alias('total'),
rank_expr.alias('rank'))
.group_by(Booking.facility))
# Use a plain "Select" to create outer query.
query = (Select(columns=[subq.c.facid, subq.c.total])
.from_(subq)
.where(subq.c.rank == 1)
.tuples())
# Iterate over the resulting facility ID(s) and total(s):
for facid, total in query.execute(db):
print(facid, total)
對於另一個範例,讓我們建立一個遞迴通用資料表表達式,來計算前 10 個費波那契數
base = Select(columns=(
Value(1).alias('n'),
Value(0).alias('fib_n'),
Value(1).alias('next_fib_n'))).cte('fibonacci', recursive=True)
n = (base.c.n + 1).alias('n')
recursive_term = Select(columns=(
n,
base.c.next_fib_n,
base.c.fib_n + base.c.next_fib_n)).from_(base).where(n < 10)
fibonacci = base.union_all(recursive_term)
query = fibonacci.select_from(fibonacci.c.n, fibonacci.c.fib_n)
results = list(query.execute(db))
# Generates the following result list:
[{'fib_n': 0, 'n': 1},
{'fib_n': 1, 'n': 2},
{'fib_n': 1, 'n': 3},
{'fib_n': 2, 'n': 4},
{'fib_n': 3, 'n': 5},
{'fib_n': 5, 'n': 6},
{'fib_n': 8, 'n': 7},
{'fib_n': 13, 'n': 8},
{'fib_n': 21, 'n': 9},
{'fib_n': 34, 'n': 10}]
更多
有關用於描述 SQL AST 的各種類別的說明,請參閱 查詢建構器 API 文件。
如果您有興趣了解更多資訊,也可以查看 專案原始碼。