Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit 6dcb079

Browse files
Sergei Smolianinovlukaszdudek-silvair
Sergei Smolianinov
authored andcommitted
Add CQs management methods to the client
1 parent 05a101d commit 6dcb079

File tree

3 files changed

+222
-0
lines changed

3 files changed

+222
-0
lines changed

influxdb/client.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -908,6 +908,98 @@ def get_list_privileges(self, username):
908908
text = "SHOW GRANTS FOR {0}".format(quote_ident(username))
909909
return list(self.query(text).get_points())
910910

911+
def get_list_continuous_queries(self):
912+
"""Get the list of continuous queries in InfluxDB.
913+
914+
:return: all CQs in InfluxDB
915+
:rtype: list of dictionaries
916+
917+
:Example:
918+
919+
::
920+
921+
>> cqs = client.get_list_cqs()
922+
>> cqs
923+
[
924+
{
925+
u'db1': []
926+
},
927+
{
928+
u'db2': [
929+
{
930+
u'name': u'vampire',
931+
u'query': u'CREATE CONTINUOUS QUERY vampire ON '
932+
'mydb BEGIN SELECT count(dracula) INTO '
933+
'mydb.autogen.all_of_them FROM '
934+
'mydb.autogen.one GROUP BY time(5m) END'
935+
}
936+
]
937+
}
938+
]
939+
"""
940+
query_string = "SHOW CONTINUOUS QUERIES"
941+
return [{sk[0]: list(p)} for sk, p in self.query(query_string).items()]
942+
943+
def create_continuous_query(self, name, select, database=None,
944+
resample_opts=None):
945+
r"""Create a continuous query for a database.
946+
947+
:param name: the name of continuous query to create
948+
:type name: str
949+
:param select: select statement for the continuous query
950+
:type select: str
951+
:param database: the database for which the continuous query is
952+
created. Defaults to current client's database
953+
:type database: str
954+
:param resample_opts: resample options
955+
:type resample_opts: str
956+
957+
:Example:
958+
959+
::
960+
961+
>> select_clause = 'SELECT mean("value") INTO "cpu_mean" ' \
962+
... 'FROM "cpu" GROUP BY time(1m)'
963+
>> client.create_continuous_query(
964+
... 'cpu_mean', select_clause, 'db_name', 'EVERY 10s FOR 2m'
965+
... )
966+
>> client.get_list_continuous_queries()
967+
[
968+
{
969+
'db_name': [
970+
{
971+
'name': 'cpu_mean',
972+
'query': 'CREATE CONTINUOUS QUERY "cpu_mean" '
973+
'ON "db_name" '
974+
'RESAMPLE EVERY 10s FOR 2m '
975+
'BEGIN SELECT mean("value") '
976+
'INTO "cpu_mean" FROM "cpu" '
977+
'GROUP BY time(1m) END'
978+
}
979+
]
980+
}
981+
]
982+
"""
983+
query_string = (
984+
"CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END"
985+
).format(quote_ident(name), quote_ident(database or self._database),
986+
' RESAMPLE ' + resample_opts if resample_opts else '', select)
987+
self.query(query_string)
988+
989+
def drop_continuous_query(self, name, database=None):
990+
"""Drop an existing continuous query for a database.
991+
992+
:param name: the name of continuous query to drop
993+
:type name: str
994+
:param database: the database for which the continuous query is
995+
dropped. Defaults to current client's database
996+
:type database: str
997+
"""
998+
query_string = (
999+
"DROP CONTINUOUS QUERY {0} ON {1}"
1000+
).format(quote_ident(name), quote_ident(database or self._database))
1001+
self.query(query_string)
1002+
9111003
def send_packet(self, packet, protocol='json', time_precision=None):
9121004
"""Send an UDP packet.
9131005

influxdb/tests/client_test.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,6 +1027,108 @@ def test_get_list_privileges_fails(self):
10271027
with _mocked_session(cli, 'get', 401):
10281028
cli.get_list_privileges('test')
10291029

1030+
def test_get_list_continuous_queries(self):
1031+
data = {
1032+
"results": [
1033+
{
1034+
"statement_id": 0,
1035+
"series": [
1036+
{
1037+
"name": "testdb01",
1038+
"columns": ["name", "query"],
1039+
"values": [["testname01", "testquery01"],
1040+
["testname02", "testquery02"]]
1041+
},
1042+
{
1043+
"name": "testdb02",
1044+
"columns": ["name", "query"],
1045+
"values": [["testname03", "testquery03"]]
1046+
},
1047+
{
1048+
"name": "testdb03",
1049+
"columns": ["name", "query"]
1050+
}
1051+
]
1052+
}
1053+
]
1054+
}
1055+
1056+
with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
1057+
self.assertListEqual(
1058+
self.cli.get_list_continuous_queries(),
1059+
[
1060+
{
1061+
'testdb01': [
1062+
{'name': 'testname01', 'query': 'testquery01'},
1063+
{'name': 'testname02', 'query': 'testquery02'}
1064+
]
1065+
},
1066+
{
1067+
'testdb02': [
1068+
{'name': 'testname03', 'query': 'testquery03'}
1069+
]
1070+
},
1071+
{
1072+
'testdb03': []
1073+
}
1074+
]
1075+
)
1076+
1077+
@raises(Exception)
1078+
def test_get_list_continuous_queries_fails(self):
1079+
with _mocked_session(self.cli, 'get', 400):
1080+
self.cli.get_list_continuous_queries()
1081+
1082+
def test_create_continuous_query(self):
1083+
data = {"results": [{}]}
1084+
with requests_mock.Mocker() as m:
1085+
m.register_uri(
1086+
requests_mock.GET,
1087+
"http://localhost:8086/query",
1088+
text=json.dumps(data)
1089+
)
1090+
query = 'SELECT count("value") INTO "6_months"."events" FROM ' \
1091+
'"events" GROUP BY time(10m)'
1092+
self.cli.create_continuous_query('cq_name', query, 'db_name')
1093+
self.assertEqual(
1094+
m.last_request.qs['q'][0],
1095+
'create continuous query "cq_name" on "db_name" begin select '
1096+
'count("value") into "6_months"."events" from "events" group '
1097+
'by time(10m) end'
1098+
)
1099+
self.cli.create_continuous_query('cq_name', query, 'db_name',
1100+
'EVERY 10s FOR 2m')
1101+
self.assertEqual(
1102+
m.last_request.qs['q'][0],
1103+
'create continuous query "cq_name" on "db_name" resample '
1104+
'every 10s for 2m begin select count("value") into '
1105+
'"6_months"."events" from "events" group by time(10m) end'
1106+
)
1107+
1108+
@raises(Exception)
1109+
def test_create_continuous_query_fails(self):
1110+
with _mocked_session(self.cli, 'get', 400):
1111+
self.cli.create_continuous_query('cq_name', 'select', 'db_name')
1112+
1113+
def test_drop_continuous_query(self):
1114+
data = {"results": [{}]}
1115+
with requests_mock.Mocker() as m:
1116+
m.register_uri(
1117+
requests_mock.GET,
1118+
"http://localhost:8086/query",
1119+
text=json.dumps(data)
1120+
)
1121+
self.cli.drop_continuous_query('cq_name', 'db_name')
1122+
self.assertEqual(
1123+
m.last_request.qs['q'][0],
1124+
'drop continuous query "cq_name" on "db_name"'
1125+
)
1126+
1127+
@raises(Exception)
1128+
def test_drop_continuous_query_fails(self):
1129+
with _mocked_session(self.cli, 'get', 400):
1130+
self.cli.drop_continuous_query('cq_name', 'db_name')
1131+
10301132
def test_invalid_port_fails(self):
10311133
"""Test invalid port fail for TestInfluxDBClient object."""
10321134
with self.assertRaises(ValueError):

influxdb/tests/server_tests/client_test_with_server.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,34 @@ def test_drop_retention_policy(self):
720720
rsp
721721
)
722722

723+
def test_create_continuous_query(self):
724+
self.cli.create_retention_policy('some_rp', '1d', 1)
725+
query = 'select count("value") into "some_rp"."events" from ' \
726+
'"events" group by time(10m)'
727+
self.cli.create_continuous_query('test_cq', query, 'db')
728+
cqs = self.cli.get_list_continuous_queries()
729+
expected_cqs = [
730+
{
731+
'db': [
732+
{
733+
'name': 'test_cq',
734+
'query': 'CREATE CONTINUOUS QUERY test_cq ON db '
735+
'BEGIN SELECT count(value) INTO '
736+
'db.some_rp.events FROM db.autogen.events '
737+
'GROUP BY time(10m) END'
738+
}
739+
]
740+
}
741+
]
742+
self.assertEqual(cqs, expected_cqs)
743+
744+
def test_drop_continuous_query(self):
745+
self.test_create_continuous_query()
746+
self.cli.drop_continuous_query('test_cq', 'db')
747+
cqs = self.cli.get_list_continuous_queries()
748+
expected_cqs = [{'db': []}]
749+
self.assertEqual(cqs, expected_cqs)
750+
723751
def test_issue_143(self):
724752
"""Test for PR#143 from repo."""
725753
pt = partial(point, 'a_series_name', timestamp='2015-03-30T16:16:37Z')

0 commit comments

Comments
 (0)