diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index ed69b2a2..67757405 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -2,6 +2,7 @@ import pymysql import struct +import time from distutils.version import LooseVersion from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE @@ -30,6 +31,7 @@ # 2006 MySQL server has gone away MYSQL_EXPECTED_ERROR_CODES = [2013, 2006] +PYMYSQL_VERSION_LT_06 = pymysql.__version__ < LooseVersion("0.6") class ReportSlave(object): @@ -142,20 +144,48 @@ def __init__(self, connection_settings, server_id, slave_heartbeat=None, is_mariadb=False): """ - Attributes: + Parameters: + connection_settings: a dict of parameters passed to `pymysql.connect` + or `pymysql_wrapper`, of which "db" parameter is not necessary + pymysql_wrapper: custom replacement for `pymysql.connect` ctl_connection_settings: Connection settings for cluster holding - schema information - resume_stream: Start for event from position or the latest event of - binlog or from older available event + schema information, which could be None, in which case + `connection_settings` will be used as ctl_connection_settings, + except for that "db" will be replaced to "information_schema" + resume_stream: True or False. control the start point of the returned + events, only works when `auto_position` is None. + `fetchone` will fetch data from: + 1.the beginning of `log_file`: if `resume_stream` is False + 2.`log_pos` of `log_file`: if resume_stream is True, and it's + the first time to fetch the data + 3.the event right next to the last fetched event: when resume_stream + is True and it's not the first time to fetch data + note: the log position will be set back to the beginning of `log_file` + each time the client is disconnected and then reconnected + to the mysql server (OperationalError 2006/2013) if resume_stream + is False. so it's suggested to set resume_stream to True. 
+ blocking: When master has finished reading/sending binlog it will send EOF instead of blocking connection. only_events: Array of allowed events ignored_events: Array of ignored events - log_file: Set replication start log file + log_file: Set replication start log file. if either `log_file` or + `log_pos` is None, and auto_position is None, then log_pos + and log_file will be set as the values returned by the query + "SHOW MASTER STATUS" log_pos: Set replication start log pos (resume_stream should be - true) + true). if either `log_file` or `log_pos` is None, and auto_position + is None, then log_pos and log_file will be set as the values + returned by the query "SHOW MASTER STATUS", and log_pos will + be set as 4 (the start position of any log file) if resume_stream + is a false value end_log_pos: Set replication end log pos - auto_position: Use master_auto_position gtid to set position + auto_position: a string of replicated GTIDs. all the events except + for those included in `auto_position` and those purged by + the source server will be sent to the client. a valid `auto_position` + looks like: + 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10, + 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140 only_tables: An array with the tables you want to watch (only works in binlog_format ROW) ignored_tables: An array with the tables you want to skip @@ -177,13 +207,17 @@ def __init__(self, connection_settings, server_id, for semantics is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position to point to Mariadb specific GTID. + + Notes: + the log position will be set back to the beginning of `log_file` + each time the client is disconnected and then auto-reconnected + to the mysql server (OperationalError 2006/2013) if resume_stream + is False. so it's suggested to set resume_stream to True. 
""" self.__connection_settings = connection_settings self.__connection_settings.setdefault("charset", "utf8") - self.__connected_stream = False - self.__connected_ctl = False self.__resume_stream = resume_stream self.__blocking = blocking self._ctl_connection_settings = ctl_connection_settings @@ -232,22 +266,24 @@ def __init__(self, connection_settings, server_id, def close(self): if self.__connected_stream: self._stream_connection.close() - self.__connected_stream = False - if self.__connected_ctl: + if getattr(self, '_ctl_connection', None): # break reference cycle between stream reader and underlying # mysql connection object self._ctl_connection._get_table_information = None - self._ctl_connection.close() - self.__connected_ctl = False + if self._ctl_connection.open: + self._ctl_connection.close() - def __connect_to_ctl(self): + def __connect_to_ctl(self, force_reconnect=False): + if self.__connected_ctl: + if not force_reconnect: + return + self._ctl_connection.close() if not self._ctl_connection_settings: self._ctl_connection_settings = dict(self.__connection_settings) self._ctl_connection_settings["db"] = "information_schema" self._ctl_connection_settings["cursorclass"] = DictCursor self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings) self._ctl_connection._get_table_information = self.__get_table_information - self.__connected_ctl = True def __checksum_enabled(self): """Return True if binlog-checksum = CRC32. 
Only for MySQL > 5.6""" @@ -269,7 +305,7 @@ def _register_slave(self): packet = self.report_slave.encoded(self.__server_id) - if pymysql.__version__ < LooseVersion("0.6"): + if PYMYSQL_VERSION_LT_06: self._stream_connection.wfile.write(packet) self._stream_connection.wfile.flush() self._stream_connection.read_packet() @@ -278,12 +314,29 @@ def _register_slave(self): self._stream_connection._next_seq_id = 1 self._stream_connection._read_packet() - def __connect_to_stream(self): + @property + def __connected_stream(self): + return bool(getattr(self, '_stream_connection', None) and \ + self._stream_connection.open) + + @property + def __connected_ctl(self): + return bool(getattr(self, '_ctl_connection', None) and \ + self._ctl_connection.open) + + def __connect_to_stream(self, force_reconnect=False): + if self.__connected_stream: + if not force_reconnect: + return + self._stream_connection.close() + # log_pos (4) -- position in the binlog-file to start the stream with # flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1) # server_id (4) -- server id of this slave # log_file (string.EOF) -- filename of the binlog on the master self._stream_connection = self.pymysql_wrapper(**self.__connection_settings) + if PYMYSQL_VERSION_LT_06: + self._stream_connection._read_packet = self._stream_connection.read_packet self.__use_checksum = self.__checksum_enabled() @@ -305,9 +358,7 @@ def __connect_to_stream(self): 4294967)) # If heartbeat is too low, the connection will disconnect before, # this is also the behavior in mysql - heartbeat = float(min(net_timeout/2., self.slave_heartbeat)) - if heartbeat > 4294967: - heartbeat = 4294967 + heartbeat = float(min(net_timeout/2., self.slave_heartbeat, 4294967)) # master_heartbeat_period is nanoseconds heartbeat = int(heartbeat * 1000000000) @@ -454,35 +505,33 @@ def __connect_to_stream(self): # encoded_data prelude += gtid_set.encoded() - if pymysql.__version__ < LooseVersion("0.6"): + if PYMYSQL_VERSION_LT_06: 
self._stream_connection.wfile.write(prelude) self._stream_connection.wfile.flush() else: self._stream_connection._write_bytes(prelude) self._stream_connection._next_seq_id = 1 - self.__connected_stream = True - def fetchone(self): + def fetchone(self, force_reconnect=False): + self.__prefetch(force_reconnect=force_reconnect) + return self.__fetchone() + + def __prefetch(self, force_reconnect=False): + self.__connect_to_ctl(force_reconnect=force_reconnect) + self.__connect_to_stream(force_reconnect=force_reconnect) + + def __fetchone(self): + # let `__fetchone` be as light weight as possible. while True: if self.end_log_pos and self.is_past_end_log_pos: return None - if not self.__connected_stream: - self.__connect_to_stream() - - if not self.__connected_ctl: - self.__connect_to_ctl() - try: - if pymysql.__version__ < LooseVersion("0.6"): - pkt = self._stream_connection.read_packet() - else: - pkt = self._stream_connection._read_packet() + pkt = self._stream_connection._read_packet() except pymysql.OperationalError as error: code, message = error.args if code in MYSQL_EXPECTED_ERROR_CODES: - self._stream_connection.close() - self.__connected_stream = False + self.__connect_to_stream(force_reconnect=True) continue raise @@ -597,9 +646,8 @@ def _allowed_event_list(self, only_events, ignored_events, def __get_table_information(self, schema, table): for i in range(1, 3): + self.__connect_to_ctl() try: - if not self.__connected_ctl: - self.__connect_to_ctl() cur = self._ctl_connection.cursor() cur.execute(""" @@ -617,10 +665,10 @@ def __get_table_information(self, schema, table): except pymysql.OperationalError as error: code, message = error.args if code in MYSQL_EXPECTED_ERROR_CODES: - self.__connected_ctl = False continue else: raise error def __iter__(self): - return iter(self.fetchone, None) + self.__prefetch(force_reconnect=False) + return iter(self.__fetchone, None) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 
a2ea52fa..66d51616 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -503,13 +503,27 @@ def test_end_log_pos(self): binlog = self.execute("SHOW BINARY LOGS").fetchone()[0] + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + log_pos=0, + log_file=binlog) + # fetch several events then get the end position of + # the last event + # do not use a fixed int as end position, cause that + # may be an invalid position + for i in range(13): + _ = self.stream.fetchone() + binlog = self.stream.log_file + end_position = self.stream.log_pos self.stream.close() self.stream = BinLogStreamReader( self.database, server_id=1024, log_pos=0, log_file=binlog, - end_log_pos=888) + end_log_pos=end_position) last_log_pos = 0 last_event_type = 0 @@ -517,7 +531,7 @@ def test_end_log_pos(self): last_log_pos = self.stream.log_pos last_event_type = event.event_type - self.assertEqual(last_log_pos, 888) + self.assertEqual(last_log_pos, end_position) self.assertEqual(last_event_type, TABLE_MAP_EVENT) class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index 8c0215d8..74ab64cf 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -730,7 +730,7 @@ def test_null_bitmask(self): column_type = "INT" column_definition.append(column_type) - nullability = "NOT NULL" if not RowsEvent.__is_null(bit_mask, i) else "" + nullability = "NOT NULL" if not RowsEvent._RowsEvent__is_null(None, bit_mask, i) else "" column_definition.append(nullability) columns.append(" ".join(column_definition))