Skip to content

Commit 4f3b9ab

Browse files
committed
Merged protocol.py
1 parent e0c65ee commit 4f3b9ab

File tree

1 file changed

+86
-97
lines changed

1 file changed

+86
-97
lines changed

cassandra/protocol.py

Lines changed: 86 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
UnsupportedOperation, UserFunctionDescriptor,
3131
UserAggregateDescriptor, SchemaTargetType)
3232
from cassandra.marshal import (int32_pack, int32_unpack, uint16_pack, uint16_unpack,
33-
int8_pack, int8_unpack, uint64_pack, header_pack,
33+
uint8_pack, int8_unpack, uint64_pack, header_pack,
3434
v3_header_pack, uint32_pack)
3535
from cassandra.cqltypes import (AsciiType, BytesType, BooleanType,
3636
CounterColumnType, DateType, DecimalType,
@@ -387,6 +387,11 @@ def to_exception(self):
387387
return AlreadyExists(**self.info)
388388

389389

390+
class ClientWriteError(RequestExecutionException):
391+
summary = 'Client write failure.'
392+
error_code = 0x8000
393+
394+
390395
class StartupMessage(_MessageType):
391396
opcode = 0x01
392397
name = 'STARTUP'
@@ -512,31 +517,34 @@ def recv_body(cls, f, *args):
512517
_PAGE_SIZE_FLAG = 0x04
513518
_WITH_PAGING_STATE_FLAG = 0x08
514519
_WITH_SERIAL_CONSISTENCY_FLAG = 0x10
515-
_PROTOCOL_TIMESTAMP = 0x20
520+
_PROTOCOL_TIMESTAMP_FLAG = 0x20
521+
_NAMES_FOR_VALUES_FLAG = 0x40 # not used here
516522
_WITH_KEYSPACE_FLAG = 0x80
517523
_PREPARED_WITH_KEYSPACE_FLAG = 0x01
524+
_PAGE_SIZE_BYTES_FLAG = 0x40000000
525+
_PAGING_OPTIONS_FLAG = 0x80000000
518526

519527

520-
class QueryMessage(_MessageType):
521-
opcode = 0x07
522-
name = 'QUERY'
528+
class _QueryMessage(_MessageType):
523529

524-
def __init__(self, query, consistency_level, serial_consistency_level=None,
525-
fetch_size=None, paging_state=None, timestamp=None, keyspace=None):
526-
self.query = query
530+
def __init__(self, query_params, consistency_level,
531+
serial_consistency_level=None, fetch_size=None,
532+
paging_state=None, timestamp=None, skip_meta=False,
533+
continuous_paging_options=None, keyspace=None):
534+
self.query_params = query_params
527535
self.consistency_level = consistency_level
528536
self.serial_consistency_level = serial_consistency_level
529537
self.fetch_size = fetch_size
530538
self.paging_state = paging_state
531539
self.timestamp = timestamp
540+
self.skip_meta = skip_meta
541+
self.continuous_paging_options = continuous_paging_options
532542
self.keyspace = keyspace
533-
self._query_params = None # only used internally. May be set to a list of native-encoded values to have them sent with the request.
534543

535-
def send_body(self, f, protocol_version):
536-
write_longstring(f, self.query)
544+
def _write_query_params(self, f, protocol_version):
537545
write_consistency_level(f, self.consistency_level)
538546
flags = 0x00
539-
if self._query_params is not None:
547+
if self.query_params is not None:
540548
flags |= _VALUES_FLAG # also v2+, but we're only setting params internally right now
541549

542550
if self.serial_consistency_level:
@@ -565,26 +573,33 @@ def send_body(self, f, protocol_version):
565573
"2 or higher. Consider setting Cluster.protocol_version to 2.")
566574

567575
if self.timestamp is not None:
568-
flags |= _PROTOCOL_TIMESTAMP
576+
flags |= _PROTOCOL_TIMESTAMP_FLAG
577+
578+
if self.continuous_paging_options:
579+
if ProtocolVersion.has_continuous_paging_support(protocol_version):
580+
flags |= _PAGING_OPTIONS_FLAG
581+
else:
582+
raise UnsupportedOperation(
583+
"Continuous paging may only be used with protocol version "
584+
"ProtocolVersion.DSE_V1 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V1.")
569585

570586
if self.keyspace is not None:
571587
if ProtocolVersion.uses_keyspace_flag(protocol_version):
572588
flags |= _WITH_KEYSPACE_FLAG
573589
else:
574590
raise UnsupportedOperation(
575591
"Keyspaces may only be set on queries with protocol version "
576-
"5 or higher. Consider setting Cluster.protocol_version to 5.")
592+
"DSE_V2 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V2.")
577593

578594
if ProtocolVersion.uses_int_query_flags(protocol_version):
579595
write_uint(f, flags)
580596
else:
581597
write_byte(f, flags)
582598

583-
if self._query_params is not None:
584-
write_short(f, len(self._query_params))
585-
for param in self._query_params:
599+
if self.query_params is not None:
600+
write_short(f, len(self.query_params))
601+
for param in self.query_params:
586602
write_value(f, param)
587-
588603
if self.fetch_size:
589604
write_int(f, self.fetch_size)
590605
if self.paging_state:
@@ -595,6 +610,49 @@ def send_body(self, f, protocol_version):
595610
write_long(f, self.timestamp)
596611
if self.keyspace is not None:
597612
write_string(f, self.keyspace)
613+
if self.continuous_paging_options:
614+
self._write_paging_options(f, self.continuous_paging_options, protocol_version)
615+
616+
def _write_paging_options(self, f, paging_options, protocol_version):
617+
write_int(f, paging_options.max_pages)
618+
write_int(f, paging_options.max_pages_per_second)
619+
if ProtocolVersion.has_continuous_paging_next_pages(protocol_version):
620+
write_int(f, paging_options.max_queue_size)
621+
622+
623+
class QueryMessage(_QueryMessage):
624+
opcode = 0x07
625+
name = 'QUERY'
626+
627+
def __init__(self, query, consistency_level, serial_consistency_level=None,
628+
fetch_size=None, paging_state=None, timestamp=None, continuous_paging_options=None, keyspace=None):
629+
self.query = query
630+
super(QueryMessage, self).__init__(None, consistency_level, serial_consistency_level, fetch_size,
631+
paging_state, timestamp, False, continuous_paging_options, keyspace)
632+
633+
def send_body(self, f, protocol_version):
634+
write_longstring(f, self.query)
635+
self._write_query_params(f, protocol_version)
636+
637+
638+
class ExecuteMessage(_QueryMessage):
639+
opcode = 0x0A
640+
name = 'EXECUTE'
641+
642+
def __init__(self, query_id, query_params, consistency_level,
643+
serial_consistency_level=None, fetch_size=None,
644+
paging_state=None, timestamp=None, skip_meta=False,
645+
continuous_paging_options=None, result_metadata_id=None):
646+
self.query_id = query_id
647+
self.result_metadata_id = result_metadata_id
648+
super(ExecuteMessage, self).__init__(query_params, consistency_level, serial_consistency_level, fetch_size,
649+
paging_state, timestamp, skip_meta, continuous_paging_options)
650+
651+
def send_body(self, f, protocol_version):
652+
write_string(f, self.query_id)
653+
if ProtocolVersion.uses_prepared_metadata(protocol_version):
654+
write_string(f, self.result_metadata_id)
655+
self._write_query_params(f, protocol_version)
598656

599657

600658
CUSTOM_TYPE = object()
@@ -611,6 +669,8 @@ class ResultMessage(_MessageType):
611669
name = 'RESULT'
612670

613671
kind = None
672+
results = None
673+
paging_state = None
614674

615675
# Names match type name in module scope. Most are imported from cassandra.cqltypes (except CUSTOM_TYPE)
616676
type_codes = _cqltypes_by_code = dict((v, globals()[k]) for k, v in type_codes.__dict__.items() if not k.startswith('_'))
@@ -622,6 +682,8 @@ class ResultMessage(_MessageType):
622682
_CONTINUOUS_PAGING_LAST_FLAG = 0x80000000
623683
_METADATA_ID_FLAG = 0x0008
624684

685+
kind = None
686+
625687
# These are all the things a result message might contain. They are populated according to 'kind'
626688
column_names = None
627689
column_types = None
@@ -635,7 +697,6 @@ class ResultMessage(_MessageType):
635697
bind_metadata = None
636698
pk_indexes = None
637699
schema_change_event = None
638-
result_metadata_id = None
639700

640701
def __init__(self, kind):
641702
self.kind = kind
@@ -818,7 +879,7 @@ def send_body(self, f, protocol_version):
818879
else:
819880
raise UnsupportedOperation(
820881
"Keyspaces may only be set on queries with protocol version "
821-
"5 or higher. Consider setting Cluster.protocol_version to 5.")
882+
"5 or DSE_V2 or higher. Consider setting Cluster.protocol_version.")
822883

823884
if ProtocolVersion.uses_prepare_flags(protocol_version):
824885
write_uint(f, flags)
@@ -829,87 +890,14 @@ def send_body(self, f, protocol_version):
829890
"Attempted to set flags with value {flags:0=#8x} on"
830891
"protocol version {pv}, which doesn't support flags"
831892
"in prepared statements."
832-
"Consider setting Cluster.protocol_version to 5."
893+
"Consider setting Cluster.protocol_version to 5 or DSE_V2."
833894
"".format(flags=flags, pv=protocol_version))
834895

835896
if ProtocolVersion.uses_keyspace_flag(protocol_version):
836897
if self.keyspace:
837898
write_string(f, self.keyspace)
838899

839900

840-
class ExecuteMessage(_MessageType):
841-
opcode = 0x0A
842-
name = 'EXECUTE'
843-
844-
def __init__(self, query_id, query_params, consistency_level,
845-
serial_consistency_level=None, fetch_size=None,
846-
paging_state=None, timestamp=None, skip_meta=False,
847-
result_metadata_id=None):
848-
self.query_id = query_id
849-
self.query_params = query_params
850-
self.consistency_level = consistency_level
851-
self.serial_consistency_level = serial_consistency_level
852-
self.fetch_size = fetch_size
853-
self.paging_state = paging_state
854-
self.timestamp = timestamp
855-
self.skip_meta = skip_meta
856-
self.result_metadata_id = result_metadata_id
857-
858-
def send_body(self, f, protocol_version):
859-
write_string(f, self.query_id)
860-
if ProtocolVersion.uses_prepared_metadata(protocol_version):
861-
write_string(f, self.result_metadata_id)
862-
if protocol_version == 1:
863-
if self.serial_consistency_level:
864-
raise UnsupportedOperation(
865-
"Serial consistency levels require the use of protocol version "
866-
"2 or higher. Consider setting Cluster.protocol_version to 2 "
867-
"to support serial consistency levels.")
868-
if self.fetch_size or self.paging_state:
869-
raise UnsupportedOperation(
870-
"Automatic query paging may only be used with protocol version "
871-
"2 or higher. Consider setting Cluster.protocol_version to 2.")
872-
write_short(f, len(self.query_params))
873-
for param in self.query_params:
874-
write_value(f, param)
875-
write_consistency_level(f, self.consistency_level)
876-
else:
877-
write_consistency_level(f, self.consistency_level)
878-
flags = _VALUES_FLAG
879-
if self.serial_consistency_level:
880-
flags |= _WITH_SERIAL_CONSISTENCY_FLAG
881-
if self.fetch_size:
882-
flags |= _PAGE_SIZE_FLAG
883-
if self.paging_state:
884-
flags |= _WITH_PAGING_STATE_FLAG
885-
if self.timestamp is not None:
886-
if protocol_version >= 3:
887-
flags |= _PROTOCOL_TIMESTAMP
888-
else:
889-
raise UnsupportedOperation(
890-
"Protocol-level timestamps may only be used with protocol version "
891-
"3 or higher. Consider setting Cluster.protocol_version to 3.")
892-
if self.skip_meta:
893-
flags |= _SKIP_METADATA_FLAG
894-
895-
if ProtocolVersion.uses_int_query_flags(protocol_version):
896-
write_uint(f, flags)
897-
else:
898-
write_byte(f, flags)
899-
900-
write_short(f, len(self.query_params))
901-
for param in self.query_params:
902-
write_value(f, param)
903-
if self.fetch_size:
904-
write_int(f, self.fetch_size)
905-
if self.paging_state:
906-
write_longstring(f, self.paging_state)
907-
if self.serial_consistency_level:
908-
write_consistency_level(f, self.serial_consistency_level)
909-
if self.timestamp is not None:
910-
write_long(f, self.timestamp)
911-
912-
913901
class BatchMessage(_MessageType):
914902
opcode = 0x0D
915903
name = 'BATCH'
@@ -945,7 +933,7 @@ def send_body(self, f, protocol_version):
945933
if self.serial_consistency_level:
946934
flags |= _WITH_SERIAL_CONSISTENCY_FLAG
947935
if self.timestamp is not None:
948-
flags |= _PROTOCOL_TIMESTAMP
936+
flags |= _PROTOCOL_TIMESTAMP_FLAG
949937
if self.keyspace:
950938
if ProtocolVersion.uses_keyspace_flag(protocol_version):
951939
flags |= _WITH_KEYSPACE_FLAG
@@ -1146,7 +1134,7 @@ def decode_message(cls, protocol_version, user_type_map, stream_id, flags, opcod
11461134
else:
11471135
custom_payload = None
11481136

1149-
flags &= USE_BETA_MASK # will only be set if we asserted it in connection estabishment
1137+
flags &= USE_BETA_MASK # will only be set if we asserted it in connection estabishment
11501138

11511139
if flags:
11521140
log.warning("Unknown protocol flags set: %02x. May cause problems.", flags)
@@ -1164,6 +1152,7 @@ def decode_message(cls, protocol_version, user_type_map, stream_id, flags, opcod
11641152

11651153
return msg
11661154

1155+
11671156
def cython_protocol_handler(colparser):
11681157
"""
11691158
Given a column parser to deserialize ResultMessages, return a suitable
@@ -1229,7 +1218,7 @@ def read_byte(f):
12291218

12301219

12311220
def write_byte(f, b):
1232-
f.write(int8_pack(b))
1221+
f.write(uint8_pack(b))
12331222

12341223

12351224
def read_int(f):

0 commit comments

Comments
 (0)