Skip to content

Commit 4b7e9fb

Browse files
async examples
1 parent 8add69f commit 4b7e9fb

12 files changed

+805
-6
lines changed

examples/alias_migration.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
will have index set to the concrete index whereas the class refers to the
3636
alias.
3737
"""
38+
import os
3839
from datetime import datetime
3940
from fnmatch import fnmatch
4041

@@ -126,7 +127,7 @@ def migrate(move_data=True, update_alias=True):
126127

127128
if __name__ == "__main__":
128129
# initiate the default connection to elasticsearch
129-
connections.create_connection()
130+
connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]])
130131

131132
# create the empty index
132133
setup()

examples/async/alias_migration.py

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
# Licensed to Elasticsearch B.V. under one or more contributor
2+
# license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright
4+
# ownership. Elasticsearch B.V. licenses this file to you under
5+
# the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""
19+
Simple example with a single Document demonstrating how schema can be managed,
20+
including upgrading with reindexing.
21+
22+
Key concepts:
23+
24+
* setup() function to first initialize the schema (as index template) in
25+
elasticsearch. Can be called any time (recommended with every deploy of
26+
your app).
27+
28+
* migrate() function to be called any time when the schema changes - it
29+
will create a new index (by incrementing the version) and update the alias.
30+
By default it will also (before flipping the alias) move the data from the
31+
previous index to the new one.
32+
33+
* BlogPost._matches() class method is required for this code to work since
34+
otherwise BlogPost will not be used to deserialize the documents as those
35+
will have index set to the concrete index whereas the class refers to the
36+
alias.
37+
"""
import asyncio
import os
from datetime import datetime
from fnmatch import fnmatch

from elasticsearch_dsl import AsyncDocument, Date, Keyword, Text, async_connections
44+
ALIAS = "test-blog"
45+
PATTERN = ALIAS + "-*"
46+
47+
48+
class BlogPost(AsyncDocument):
    """A blog post stored behind the ``test-blog`` alias.

    The document class points at the alias (``Index.name = ALIAS``) while the
    concrete indices follow the ``test-blog-*`` pattern, which is why
    ``_matches`` is overridden below.
    """

    title = Text()
    published = Date()
    tags = Keyword(multi=True)
    content = Text()

    def is_published(self):
        # a post counts as published once its ``published`` date is set and
        # lies in the past
        return self.published and datetime.now() > self.published

    @classmethod
    def _matches(cls, hit):
        # override _matches to match indices in a pattern instead of just ALIAS
        # hit is the raw dict as returned by elasticsearch
        return fnmatch(hit["_index"], PATTERN)

    class Index:
        # we will use an alias instead of the index
        name = ALIAS
        # set settings and possibly other attributes of the index like
        # analyzers
        settings = {"number_of_shards": 1, "number_of_replicas": 0}
69+
70+
71+
async def setup():
    """Install the index template and bootstrap the first concrete index.

    Safe to run at any time, ideally on every deploy: saving the template
    simply overwrites any previous version, and the initial index is only
    created when it does not exist yet.
    """
    # build the index template from the document's mappings/settings and
    # upload it, potentially overriding the one already there
    template = BlogPost._index.as_template(ALIAS, PATTERN)
    await template.save()

    # bootstrap the very first index if the alias doesn't resolve yet
    if not await BlogPost._index.exists():
        await migrate(move_data=False)
86+
87+
88+
async def migrate(move_data=True, update_alias=True):
    """
    Upgrade function that creates a new index for the data. Optionally it also can
    (and by default will) reindex previous copy of the data into the new index
    (specify ``move_data=False`` to skip this step) and update the alias to
    point to the latest index (set ``update_alias=False`` to skip).

    Note that while this function is running the application can still perform
    any and all searches without any loss of functionality. It should, however,
    not perform any writes at this time as those might be lost.
    """
    # derive a fresh index name from the pattern by substituting the current
    # timestamp for the wildcard
    new_index = PATTERN.replace("*", datetime.now().strftime("%Y%m%d%H%M%S%f"))

    # grab the low level client from the default connection
    client = async_connections.get_connection()

    # the new index picks up mappings/settings from the index template
    await client.indices.create(index=new_index)

    if move_data:
        # copy the documents currently behind the alias into the new index;
        # reindexing can be slow, hence the generous timeout
        await client.options(request_timeout=3600).reindex(
            body={"source": {"index": ALIAS}, "dest": {"index": new_index}}
        )
        # refresh the index to make the changes visible
        await client.indices.refresh(index=new_index)

    if update_alias:
        # atomically flip the alias from all old indices to the new one
        await client.indices.update_aliases(
            body={
                "actions": [
                    {"remove": {"alias": ALIAS, "index": PATTERN}},
                    {"add": {"alias": ALIAS, "index": new_index}},
                ]
            }
        )
126+
127+
128+
async def main():
    """Run the full demo: set up the schema, index a document and migrate."""
    # initiate the default connection to elasticsearch; read the URL from the
    # environment for consistency with the other examples
    async_connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]])

    # create the empty index
    await setup()

    # create a new document using this very script as its content; close the
    # file deterministically instead of leaking the handle
    with open(__file__) as f:
        bp = BlogPost(
            _id=0,
            title="Hello World!",
            tags=["testing", "dummy"],
            content=f.read(),
        )
    await bp.save(refresh=True)

    # create new index
    await migrate()

    # close the connection
    await async_connections.get_connection().close()


if __name__ == "__main__":
    asyncio.run(main())

examples/async/completion.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Licensed to Elasticsearch B.V. under one or more contributor
2+
# license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright
4+
# ownership. Elasticsearch B.V. licenses this file to you under
5+
# the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""
19+
Example ``Document`` with completion suggester.
20+
21+
In the ``Person`` class we index the person's name to allow auto completing in
22+
any order ("first last", "middle last first", ...). For the weight we use a
23+
value from the ``popularity`` field which is a long.
24+
25+
To make the suggestions work in different languages we added a custom analyzer
26+
that does ascii folding.
27+
"""
28+
29+
import asyncio
30+
import os
31+
from itertools import permutations
32+
33+
from elasticsearch_dsl import (
34+
AsyncDocument,
35+
Completion,
36+
Keyword,
37+
Long,
38+
Text,
39+
analyzer,
40+
async_connections,
41+
token_filter,
42+
)
43+
44+
# custom analyzer for names
45+
ascii_fold = analyzer(
46+
"ascii_fold",
47+
# we don't want to split O'Brian or Toulouse-Lautrec
48+
tokenizer="whitespace",
49+
filter=["lowercase", token_filter("ascii_fold", "asciifolding")],
50+
)
51+
52+
53+
class Person(AsyncDocument):
    """Document indexing a person's name with a completion suggester.

    ``clean`` fills the ``suggest`` field automatically before each save.
    """

    name = Text(fields={"keyword": Keyword()})
    popularity = Long()

    # completion field with a custom analyzer
    suggest = Completion(analyzer=ascii_fold)

    def clean(self):
        """
        Automatically construct the suggestion input and weight by taking all
        possible permutation of Person's name as ``input`` and taking their
        popularity as ``weight``.
        """
        self.suggest = {
            "input": [" ".join(p) for p in permutations(self.name.split())],
            "weight": self.popularity,
        }

    class Index:
        name = "test-suggest"
        settings = {"number_of_shards": 1, "number_of_replicas": 0}
74+
75+
76+
async def main():
    """Create the suggest index, index sample people and run completions."""
    # initiate the default connection to elasticsearch
    async_connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]])

    # create the empty index
    await Person.init()

    # index some sample data; use ``doc_id`` so we don't shadow the builtin ``id``
    for doc_id, (name, popularity) in enumerate(
        [("Henri de Toulouse-Lautrec", 42), ("Jára Cimrman", 124)]
    ):
        await Person(_id=doc_id, name=name, popularity=popularity).save()

    # refresh index manually to make changes live
    await Person._index.refresh()

    # run some suggestions
    for text in ("já", "Jara Cimr", "tou", "de hen"):
        s = Person.search()
        s = s.suggest("auto_complete", text, completion={"field": "suggest"})
        response = await s.execute()

        # print out all the options we got
        for option in response.suggest.auto_complete[0].options:
            print("%10s: %25s (%d)" % (text, option._source.name, option._score))

    # close the connection
    await async_connections.get_connection().close()


if __name__ == "__main__":
    asyncio.run(main())

examples/async/composite_agg.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Licensed to Elasticsearch B.V. under one or more contributor
2+
# license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright
4+
# ownership. Elasticsearch B.V. licenses this file to you under
5+
# the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import asyncio
19+
import os
20+
21+
from elasticsearch_dsl import A, AsyncSearch, async_connections
22+
23+
24+
async def scan_aggs(search, source_aggs, inner_aggs=None, size=10):
    """
    Helper function used to iterate over all possible bucket combinations of
    ``source_aggs``, returning results of ``inner_aggs`` for each. Uses the
    ``composite`` aggregation under the hood to perform this.

    :param search: the base search to derive the aggregation request from
    :param source_aggs: dict of name -> aggregation used as composite sources
    :param inner_aggs: optional dict of name -> aggregation computed per bucket
    :param size: number of buckets fetched per composite page
    """
    # use None instead of a mutable ``{}`` default (shared across calls)
    if inner_aggs is None:
        inner_aggs = {}

    async def run_search(**kwargs):
        # hits are irrelevant here, we only care about the aggregations
        s = search[:0]
        s.aggs.bucket("comp", "composite", sources=source_aggs, size=size, **kwargs)
        for agg_name, agg in inner_aggs.items():
            s.aggs["comp"][agg_name] = agg
        return await s.execute()

    response = await run_search()
    while response.aggregations.comp.buckets:
        for b in response.aggregations.comp.buckets:
            yield b
        # page through using ``after_key`` when the server provides it,
        # otherwise fall back to the key of the last bucket seen
        if "after_key" in response.aggregations.comp:
            after = response.aggregations.comp.after_key
        else:
            after = response.aggregations.comp.buckets[-1].key
        response = await run_search(after=after)
47+
48+
49+
async def main():
    """Connect to elasticsearch and report per-file commit statistics."""
    # initiate the default connection to elasticsearch
    async_connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]])

    # one composite source per file name, plus a min-date metric per bucket
    file_source = {"files": A("terms", field="files")}
    first_seen_metric = {"first_seen": A("min", field="committed_date")}

    async for bucket in scan_aggs(AsyncSearch(index="git"), file_source, first_seen_metric):
        print(
            "File %s has been modified %d times, first seen at %s."
            % (bucket.key.files, bucket.doc_count, bucket.first_seen.value_as_string)
        )

    # close the connection
    await async_connections.get_connection().close()


if __name__ == "__main__":
    asyncio.run(main())

0 commit comments

Comments
 (0)