From b772d48432c1ad4a34eec6f1b48c36e6809e5c02 Mon Sep 17 00:00:00 2001 From: fazi Date: Sat, 27 Feb 2016 02:34:33 +0100 Subject: [PATCH] new code --- README.md | 30 +- composer.json | 3 +- example/benchmark.php | 131 +-- example/dump_events.php | 13 +- .../PackAuth.php => BinLog/BinLogAuth.php} | 47 +- src/MySQLReplication/BinLog/BinLogColumns.php | 85 +- .../BinLog/{Connect.php => BinLogConnect.php} | 150 ++- src/MySQLReplication/BinLog/BinLogEvent.php | 119 --- src/MySQLReplication/BinLog/BinLogPack.php | 478 ---------- .../BinLogServerInfo.php} | 30 +- src/MySQLReplication/BinLogStream.php | 88 ++ .../BinaryDataReader/BinaryDataReader.php | 409 ++++++++ .../BinaryDataReaderBuilder.php | 23 + .../BinaryDataReaderService.php | 22 + src/MySQLReplication/Config/Config.php | 156 +++- src/MySQLReplication/Config/ConfigBuilder.php | 202 ++++ src/MySQLReplication/Config/ConfigService.php | 80 ++ src/MySQLReplication/DTO/DeleteRowsDTO.php | 11 - src/MySQLReplication/DTO/EventDTO.php | 103 -- src/MySQLReplication/DTO/UpdateRowsDTO.php | 11 - src/MySQLReplication/DTO/WriteRowsDTO.php | 11 - .../Definitions/ConstAuth.php | 19 - .../Definitions/ConstEventsNames.php | 15 + src/MySQLReplication/Definitions/ConstMy.php | 20 - .../Event/DTO/DeleteRowsDTO.php | 20 + src/MySQLReplication/Event/DTO/EventDTO.php | 53 ++ .../{ => Event}/DTO/GTIDLogDTO.php | 34 +- .../{ => Event}/DTO/QueryDTO.php | 40 +- .../{ => Event}/DTO/RotateDTO.php | 34 +- .../{ => Event}/DTO/RowsDTO.php | 35 +- .../{ => Event}/DTO/TableMapDTO.php | 34 +- .../Event/DTO/UpdateRowsDTO.php | 20 + .../Event/DTO/WriteRowsDTO.php | 20 + .../{ => Event}/DTO/XidDTO.php | 34 +- src/MySQLReplication/Event/Event.php | 140 +++ src/MySQLReplication/Event/EventCommon.php | 31 + src/MySQLReplication/Event/EventInfo.php | 131 +++ src/MySQLReplication/Event/GtidEvent.php | 24 + src/MySQLReplication/Event/QueryEvent.php | 31 + src/MySQLReplication/Event/RotateEvent.php | 27 + .../Event/RowEvent/RowEvent.php | 882 ++++++++++++++++++ .../Event/RowEvent/RowEventBuilder.php | 67 ++ .../Event/RowEvent/RowEventService.php | 38 + src/MySQLReplication/Event/XidEvent.php | 23 + src/MySQLReplication/Gtid/GtidCollection.php | 41 + .../{Pack/Gtid.php => Gtid/GtidEntity.php} | 12 +- src/MySQLReplication/Gtid/GtidException.php | 9 + src/MySQLReplication/Gtid/GtidService.php | 28 + src/MySQLReplication/Pack/GtidSet.php | 56 -- src/MySQLReplication/Pack/RowEvent.php | 850 ----------------- .../MySQLRepository.php} | 37 +- src/MySQLReplication/Service/BinLogStream.php | 99 -- tests/{ => Integration}/TypesTest.php | 89 +- 53 files changed, 2993 insertions(+), 2202 deletions(-) rename src/MySQLReplication/{Pack/PackAuth.php => BinLog/BinLogAuth.php} (56%) rename src/MySQLReplication/BinLog/{Connect.php => BinLogConnect.php} (64%) delete mode 100755 src/MySQLReplication/BinLog/BinLogEvent.php delete mode 100755 src/MySQLReplication/BinLog/BinLogPack.php rename src/MySQLReplication/{Pack/ServerInfo.php => BinLog/BinLogServerInfo.php} (69%) create mode 100755 src/MySQLReplication/BinLogStream.php create mode 100755 src/MySQLReplication/BinaryDataReader/BinaryDataReader.php create mode 100755 src/MySQLReplication/BinaryDataReader/BinaryDataReaderBuilder.php create mode 100755 src/MySQLReplication/BinaryDataReader/BinaryDataReaderService.php create mode 100755 src/MySQLReplication/Config/ConfigBuilder.php create mode 100755 src/MySQLReplication/Config/ConfigService.php delete mode 100755 src/MySQLReplication/DTO/DeleteRowsDTO.php delete mode 100755 src/MySQLReplication/DTO/EventDTO.php delete mode 100755 src/MySQLReplication/DTO/UpdateRowsDTO.php delete mode 100755 src/MySQLReplication/DTO/WriteRowsDTO.php delete mode 100755 src/MySQLReplication/Definitions/ConstAuth.php create mode 100755 src/MySQLReplication/Definitions/ConstEventsNames.php delete mode 100755 src/MySQLReplication/Definitions/ConstMy.php create mode 100755 src/MySQLReplication/Event/DTO/DeleteRowsDTO.php create mode 100755 src/MySQLReplication/Event/DTO/EventDTO.php rename src/MySQLReplication/{ => Event}/DTO/GTIDLogDTO.php (65%) rename src/MySQLReplication/{ => Event}/DTO/QueryDTO.php (63%) rename src/MySQLReplication/{ => Event}/DTO/RotateDTO.php (66%) rename src/MySQLReplication/{ => Event}/DTO/RowsDTO.php (75%) rename src/MySQLReplication/{ => Event}/DTO/TableMapDTO.php (72%) create mode 100755 src/MySQLReplication/Event/DTO/UpdateRowsDTO.php create mode 100755 src/MySQLReplication/Event/DTO/WriteRowsDTO.php rename src/MySQLReplication/{ => Event}/DTO/XidDTO.php (59%) create mode 100755 src/MySQLReplication/Event/Event.php create mode 100755 src/MySQLReplication/Event/EventCommon.php create mode 100755 src/MySQLReplication/Event/EventInfo.php create mode 100755 src/MySQLReplication/Event/GtidEvent.php create mode 100755 src/MySQLReplication/Event/QueryEvent.php create mode 100755 src/MySQLReplication/Event/RotateEvent.php create mode 100755 src/MySQLReplication/Event/RowEvent/RowEvent.php create mode 100755 src/MySQLReplication/Event/RowEvent/RowEventBuilder.php create mode 100755 src/MySQLReplication/Event/RowEvent/RowEventService.php create mode 100755 src/MySQLReplication/Event/XidEvent.php create mode 100755 src/MySQLReplication/Gtid/GtidCollection.php rename src/MySQLReplication/{Pack/Gtid.php => Gtid/GtidEntity.php} (72%) create mode 100755 src/MySQLReplication/Gtid/GtidException.php create mode 100755 src/MySQLReplication/Gtid/GtidService.php delete mode 100755 src/MySQLReplication/Pack/GtidSet.php delete mode 100755 src/MySQLReplication/Pack/RowEvent.php rename src/MySQLReplication/{DataBase/DBHelper.php => Repository/MySQLRepository.php} (65%) delete mode 100755 src/MySQLReplication/Service/BinLogStream.php rename tests/{ => Integration}/TypesTest.php (87%) diff --git a/README.md b/README.md index 9d93d4b..a93a6f7 100755 --- a/README.md +++ b/README.md @@ -35,34 +35,8 @@ Remember to change config for your user, host and password. User should have replication privileges [ REPLICATION CLIENT, SELECT] -```php -analysisBinLog(); - if (!is_null($result)) - { - // all events got __toString() implementation - echo $result; - - // all events got JsonSerializable implementation - //echo json_encode($result, JSON_PRETTY_PRINT); - - //echo 'Memory usage ' . round(memory_get_usage() / 1048576, 2) . ' MB' . PHP_EOL; - } -} +```sh +php example/dump_events.php ``` For this SQL sessions: diff --git a/composer.json b/composer.json index 6021bdb..0874abe 100755 --- a/composer.json +++ b/composer.json @@ -10,7 +10,8 @@ "type": "library", "require": { "php": ">=5.4", - "doctrine/dbal": "^2.5" + "doctrine/dbal": "^2.5", + "doctrine/collections": "^1.3" }, "require-dev": { "phpunit/phpunit": "*" diff --git a/example/benchmark.php b/example/benchmark.php index 9326626..fe21db4 100755 --- a/example/benchmark.php +++ b/example/benchmark.php @@ -4,17 +4,14 @@ error_reporting(E_ALL); ini_set('display_errors', 1); -use MySQLReplication\Config\Config; -use MySQLReplication\DataBase\DBHelper; +use Doctrine\DBAL\DriverManager; +use MySQLReplication\BinLogStream; +use MySQLReplication\Config\ConfigService; use MySQLReplication\Definitions\ConstEventType; -use MySQLReplication\DTO\UpdateRowsDTO; -use MySQLReplication\DTO\WriteRowsDTO; -use MySQLReplication\Service\BinLogStream; - - +use MySQLReplication\Event\DTO\UpdateRowsDTO; /** - * Class Base + * Class Benchmark */ class Benchmark { @@ -22,47 +19,81 @@ class Benchmark * @var string */ private $database = 'mysqlreplication_test'; + /** - * @var \Doctrine\DBAL\Connection - */ - private $conn; - /** - * @var Config + * Benchmark constructor. */ - private $config; - public function __construct() { - $this->config = new Config('root', '192.168.1.100', 3306, 'root'); - $this->conn = (new DBHelper($this->config))->getConnection(); + $conn = $this->getConnection(); + $conn->exec("DROP DATABASE IF EXISTS " . $this->database); + $conn->exec("CREATE DATABASE " . $this->database); + $conn->exec("USE " . $this->database); + $conn->exec("CREATE TABLE test (i INT) ENGINE = MEMORY"); + $conn->exec("INSERT INTO test VALUES(1)"); + $conn->exec("CREATE TABLE test2 (i INT) ENGINE = MEMORY"); + $conn->exec("INSERT INTO test2 VALUES(1)"); + $conn->exec("RESET MASTER"); + + $this->binLogStream = new BinLogStream( + (new ConfigService())->makeConfigFromArray([ + 'user' => 'root', + 'host' => '192.168.1.100', + 'password' => 'root', + 'eventsOnly' => [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2], + 'slaveId' => 9999 + ]) + ); + } - $this->conn->exec("DROP DATABASE IF EXISTS " . $this->database); - $this->conn->exec("CREATE DATABASE " . $this->database); - $this->conn->exec("USE " . $this->database); - $this->conn->exec("CREATE TABLE test (i INT) ENGINE = MEMORY"); - $this->conn->exec("INSERT INTO test VALUES(1)"); - $this->conn->exec("CREATE TABLE test2 (i INT) ENGINE = MEMORY"); - $this->conn->exec("INSERT INTO test2 VALUES(1)"); - $this->conn->exec("RESET MASTER"); + /** + * @return \Doctrine\DBAL\Connection + * @throws \Doctrine\DBAL\DBALException + */ + private function getConnection() + { + return DriverManager::getConnection([ + 'user' => 'root', + 'password' => 'root', + 'host' => '192.168.1.100', + 'port' => 3306, + 'driver' => 'pdo_mysql', + 'dbname' => $this->database + ]); + } - $this->binLogStream = new BinLogStream( - $this->config, - '', - '', - '', - '', - [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2] - ); + /** + * + */ + public function run() + { + $pid = pcntl_fork(); + if ($pid == -1) + { + die('could not fork'); + } + else if ($pid) + { + $this->consume(); + pcntl_wait($status); + } + else + { + $this->produce(); + } } - public function consume() + /** + * + */ + private function consume() { $start = microtime(true); $i = 0; while (1) { - $result = $this->binLogStream->analysisBinLog(); + $result = $this->binLogStream->getBinLogEvent(); if ($result instanceof UpdateRowsDTO) { $i += 1; @@ -74,34 +105,22 @@ public function consume() } } - public function run() - { - $pid = pcntl_fork(); - if ($pid == -1) { - die('could not fork'); - } else if ($pid) { - $this->consume(); - pcntl_wait($status); - } else { - $this->produce(); - } - - } - - public function produce() + /** + * @throws \Doctrine\DBAL\DBALException + */ + private function produce() { - $this->conn = (new DBHelper($this->config))->getConnection(); - $this->conn->exec("USE " . $this->database); + $conn = $this->getConnection(); echo 'Start insert data' . PHP_EOL; while (1) { - - $this->conn->exec("UPDATE test SET i = i + 1;"); - $this->conn->exec("UPDATE test2 SET i = i + 1;"); + $conn->exec("UPDATE test SET i = i + 1;"); + $conn->exec("UPDATE test2 SET i = i + 1;"); } - } + $conn->close(); + } } (new Benchmark())->run(); \ No newline at end of file diff --git a/example/dump_events.php b/example/dump_events.php index 625e691..46b77cd 100755 --- a/example/dump_events.php +++ b/example/dump_events.php @@ -5,15 +5,20 @@ include __DIR__ . '/../vendor/autoload.php'; -use MySQLReplication\Service\BinLogStream; -use MySQLReplication\Config\Config; +use MySQLReplication\BinLogStream; +use MySQLReplication\Config\ConfigService; $binLogStream = new BinLogStream( - new Config('root', '192.168.1.100', 3306, 'root') + (new ConfigService())->makeConfigFromArray([ + 'user' => 'root', + 'host' => '192.168.1.100', + 'password' => 'root', + //'gtid' => '9b1c8d18-2a76-11e5-a26b-000c2976f3f3:1-177592', + ]) ); while (1) { - $result = $binLogStream->analysisBinLog(); + $result = $binLogStream->getBinLogEvent(); if (!is_null($result)) { // all events got __toString() implementation diff --git a/src/MySQLReplication/Pack/PackAuth.php b/src/MySQLReplication/BinLog/BinLogAuth.php similarity index 56% rename from src/MySQLReplication/Pack/PackAuth.php rename to src/MySQLReplication/BinLog/BinLogAuth.php index ba1634f..0a0227a 100755 --- a/src/MySQLReplication/Pack/PackAuth.php +++ b/src/MySQLReplication/BinLog/BinLogAuth.php @@ -1,28 +1,44 @@ packageMaxLength); $data .= chr(33); for ($i = 0; $i < 23; $i++) { @@ -31,10 +47,6 @@ public static function initPack($flag, $user, $pass, $salt, $db = '') $result = sha1($pass, true) ^ sha1($salt . sha1(sha1($pass, true), true), true); $data = $data . $user . chr(0) . chr(strlen($result)) . $result; - if ($db) - { - $data .= $db . chr(0); - } $str = pack('L', strlen($data)); $s = $str[0] . $str[1] . $str[2]; $data = $s . chr(1) . $data; @@ -43,25 +55,26 @@ public static function initPack($flag, $user, $pass, $salt, $db = '') } /** - * @param $pack + * @param string $packet * @return array * @throws BinLogException */ - public static function success($pack) + public function isWriteSuccessful($packet) { - $head = ord($pack[0]); - if (in_array($head, ConstAuth::$OK_PACK_HEAD)) + $head = ord($packet[0]); + if (in_array($head, $this->packageOkHeader)) { return ['status' => true, 'code' => 0, 'msg' => '']; } else { - $error_code = unpack('v', $pack[1] . $pack[2])[1]; + $error_code = unpack('v', $packet[1] . $packet[2])[1]; $error_msg = ''; - for ($i = 9; $i < strlen($pack); $i++) + for ($i = 9; $i < strlen($packet); $i++) { - $error_msg .= $pack[$i]; + $error_msg .= $packet[$i]; } + throw new BinLogException($error_msg, $error_code); } } diff --git a/src/MySQLReplication/BinLog/BinLogColumns.php b/src/MySQLReplication/BinLog/BinLogColumns.php index 6104787..6ad08de 100755 --- a/src/MySQLReplication/BinLog/BinLogColumns.php +++ b/src/MySQLReplication/BinLog/BinLogColumns.php @@ -3,84 +3,87 @@ namespace MySQLReplication\BinLog; use MySQLReplication\Definitions\ConstFieldType; +use MySQLReplication\Exception\BinLogException; +use MySQLReplication\BinaryDataReader\BinaryDataReader; /** * Class BinLogColumns + * @package MySQLReplication\BinLog */ class BinLogColumns { /** - * @var [] + * @var array */ private static $field; /** - * @param string $column_type - * @param string $column_schema - * @param BinLogPack $packet + * @param string $columnType + * @param array $columnSchema + * @param BinaryDataReader $binaryDataReader * @return array */ - public static function parse($column_type, $column_schema, BinLogPack $packet) + public static function parse($columnType, array $columnSchema, BinaryDataReader $binaryDataReader) { self::$field = []; - self::$field['type'] = $column_type; - self::$field['name'] = $column_schema['COLUMN_NAME']; - self::$field['collation_name'] = $column_schema['COLLATION_NAME']; - self::$field['character_set_name'] = $column_schema['CHARACTER_SET_NAME']; - self::$field['comment'] = $column_schema['COLUMN_COMMENT']; - self::$field['unsigned'] = stripos($column_schema['COLUMN_TYPE'], 'unsigned') === false ? false : true; + self::$field['type'] = $columnType; + self::$field['name'] = $columnSchema['COLUMN_NAME']; + self::$field['collation_name'] = $columnSchema['COLLATION_NAME']; + self::$field['character_set_name'] = $columnSchema['CHARACTER_SET_NAME']; + self::$field['comment'] = $columnSchema['COLUMN_COMMENT']; + self::$field['unsigned'] = stripos($columnSchema['COLUMN_TYPE'], 'unsigned') === false ? false : true; self::$field['type_is_bool'] = false; - self::$field['is_primary'] = $column_schema['COLUMN_KEY'] == 'PRI'; + self::$field['is_primary'] = $columnSchema['COLUMN_KEY'] == 'PRI'; if (self::$field['type'] == ConstFieldType::VARCHAR) { - self::$field['max_length'] = unpack('s', $packet->read(2))[1]; + self::$field['max_length'] = $binaryDataReader->readInt16(); } elseif (self::$field['type'] == ConstFieldType::DOUBLE) { - self::$field['size'] = $packet->readUint8(); + self::$field['size'] = $binaryDataReader->readUint8(); } elseif (self::$field['type'] == ConstFieldType::FLOAT) { - self::$field['size'] = $packet->readUint8(); + self::$field['size'] = $binaryDataReader->readUint8(); } elseif (self::$field['type'] == ConstFieldType::TIMESTAMP2) { - self::$field['fsp'] = $packet->readUint8(); + self::$field['fsp'] = $binaryDataReader->readUint8(); } elseif (self::$field['type'] == ConstFieldType::DATETIME2) { - self::$field['fsp'] = $packet->readUint8(); + self::$field['fsp'] = $binaryDataReader->readUint8(); } elseif (self::$field['type'] == ConstFieldType::TIME2) { - self::$field['fsp'] = $packet->readUint8(); + self::$field['fsp'] = $binaryDataReader->readUint8(); } - elseif (self::$field['type'] == ConstFieldType::TINY && $column_schema['COLUMN_TYPE'] == 'tinyint(1)') + elseif (self::$field['type'] == ConstFieldType::TINY && $columnSchema['COLUMN_TYPE'] == 'tinyint(1)') { self::$field['type_is_bool'] = true; } elseif (self::$field['type'] == ConstFieldType::VAR_STRING || self::$field['type'] == ConstFieldType::STRING) { - self::_read_string_metadata($packet, $column_schema); + self::getFieldSpecial($binaryDataReader, $columnSchema); } elseif (self::$field['type'] == ConstFieldType::BLOB) { - self::$field['length_size'] = $packet->readUint8(); + self::$field['length_size'] = $binaryDataReader->readUint8(); } elseif (self::$field['type'] == ConstFieldType::GEOMETRY) { - self::$field['length_size'] = $packet->readUint8(); + self::$field['length_size'] = $binaryDataReader->readUint8(); } elseif (self::$field['type'] == ConstFieldType::NEWDECIMAL) { - self::$field['precision'] = $packet->readUint8(); - self::$field['decimals'] = $packet->readUint8(); + self::$field['precision'] = $binaryDataReader->readUint8(); + self::$field['decimals'] = $binaryDataReader->readUint8(); } elseif (self::$field['type'] == ConstFieldType::BIT) { - $bits = $packet->readUint8(); - $bytes = $packet->readUint8(); + $bits = $binaryDataReader->readUint8(); + $bytes = $binaryDataReader->readUint8(); self::$field['bits'] = ($bytes * 8) + $bits; self::$field['bytes'] = (int)((self::$field['bits'] + 7) / 8); } @@ -89,19 +92,19 @@ public static function parse($column_type, $column_schema, BinLogPack $packet) } /** - * @param BinLogPack $packet - * @param $column_schema + * @param BinaryDataReader $packet + * @param array $columnSchema + * @throws BinLogException */ - private static function _read_string_metadata(BinLogPack $packet, $column_schema) + private static function getFieldSpecial(BinaryDataReader $packet, array $columnSchema) { - $metadata = ($packet->readUint8() << 8) + $packet->readUint8(); $real_type = $metadata >> 8; if ($real_type == ConstFieldType::SET || $real_type == ConstFieldType::ENUM) { self::$field['type'] = $real_type; self::$field['size'] = $metadata & 0x00ff; - self::_read_enum_metadata($column_schema); + self::getFieldSpecialValues($columnSchema); } else { @@ -110,24 +113,22 @@ private static function _read_string_metadata(BinLogPack $packet, $column_schema } /** - * @param $column_schema + * @param $columnSchema + * @throws BinLogException */ - private static function _read_enum_metadata($column_schema) + private static function getFieldSpecialValues($columnSchema) { - $enums = $column_schema['COLUMN_TYPE']; if (self::$field['type'] == ConstFieldType::ENUM) { - $enums = str_replace('enum(', '', $enums); - $enums = str_replace(')', '', $enums); - $enums = str_replace('\'', '', $enums); - self::$field['enum_values'] = explode(',', $enums); + self::$field['enum_values'] = explode(',', str_replace(['enum(', ')', '\''], '', $columnSchema['COLUMN_TYPE'])); + } + else if (self::$field['type'] == ConstFieldType::SET) + { + self::$field['set_values'] = explode(',', str_replace(['set(', ')', '\''], '', $columnSchema['COLUMN_TYPE'])); } else { - $enums = str_replace('set(', '', $enums); - $enums = str_replace(')', '', $enums); - $enums = str_replace('\'', '', $enums); - self::$field['set_values'] = explode(',', $enums); + throw new BinLogException('Type not handled! - ' . self::$field['type']); } } } \ No newline at end of file diff --git a/src/MySQLReplication/BinLog/Connect.php b/src/MySQLReplication/BinLog/BinLogConnect.php similarity index 64% rename from src/MySQLReplication/BinLog/Connect.php rename to src/MySQLReplication/BinLog/BinLogConnect.php index d2fe35b..3195155 100755 --- a/src/MySQLReplication/BinLog/Connect.php +++ b/src/MySQLReplication/BinLog/BinLogConnect.php @@ -2,103 +2,90 @@ namespace MySQLReplication\BinLog; use MySQLReplication\Config\Config; -use MySQLReplication\DataBase\DBHelper; +use MySQLReplication\Repository\MySQLRepository; use MySQLReplication\Definitions\ConstCapabilityFlags; use MySQLReplication\Definitions\ConstCommand; use MySQLReplication\Exception\BinLogException; -use MySQLReplication\Pack\GtidSet; -use MySQLReplication\Pack\PackAuth; -use MySQLReplication\Pack\ServerInfo; +use MySQLReplication\Gtid\GtidCollection; /** * Class Connect */ -class Connect +class BinLogConnect { /** * @var resource */ private $socket; - /** - * @var int - */ - private $binFilePos; - /** - * @var string - */ - private $binFileName; - /** - * @var string - */ - private $gtID; - /** - * @var int - */ - private $slaveId = 666; /** * @var bool */ private $checkSum = false; /** - * @var DBHelper + * @var MySQLRepository */ private $DBHelper; /** * @var Config */ private $config; + /** + * @var BinLogAuth + */ + private $packAuth; + /** + * @var GtidCollection + */ + private $gtidCollection; /** * @param Config $config - * @param DBHelper $DBHelper - * @param string $gtID - * @param string $logFile - * @param string $logPos - * @param string $slave_id - * @throws BinLogException + * @param MySQLRepository $DBHelper + * @param BinLogAuth $packAuth + * @param GtidCollection $gtidCollection */ public function __construct( Config $config, - DBHelper $DBHelper, - $gtID = '', - $logFile = '', - $logPos = '', - $slave_id = '' + MySQLRepository $DBHelper, + BinLogAuth $packAuth, + GtidCollection $gtidCollection ) { $this->DBHelper = $DBHelper; $this->config = $config; + $this->packAuth = $packAuth; + $this->gtidCollection = $gtidCollection; + } - $this->slaveId = empty($slave_id) ? $this->slaveId : $slave_id; - $this->gtID = $gtID; - $this->binFilePos = $logPos; - $this->binFileName = $logFile; - - $this->connectToSocket(); + public function __destruct() + { + if (true === $this->isConnected()) + { + socket_shutdown($this->socket); + socket_close($this->socket); + } } /** * @return bool */ - public function getCheckSum() + public function isConnected() { - return $this->checkSum; + return is_resource($this->socket); } - public function __destruct() + /** + * @return bool + */ + public function getCheckSum() { - if (true === $this->isConnected()) - { - socket_shutdown($this->socket); - socket_close($this->socket); - } - $this->socket = null; + return $this->checkSum; } /** * @throws BinLogException * @return self */ - private function connectToSocket() + public function connectToStream() { if (false === ($this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP))) { @@ -119,7 +106,7 @@ private function connectToSocket() private function serverInfo() { - ServerInfo::run($this->getPacket(false)); + BinLogServerInfo::parsePackage($this->getPacket(false)); } /** @@ -139,28 +126,19 @@ public function getPacket($checkForOkByte = true) $result = $this->readFromSocket($dataLength); if (true === $checkForOkByte) { - PackAuth::success($result); + $this->packAuth->isWriteSuccessful($result); } return $result; } /** - * @return bool - */ - private function isConnected() - { - return is_resource($this->socket); - } - - /** - * @param $data_len + * @param $length * @return string * @throws BinLogException */ - private function readFromSocket($data_len) + private function readFromSocket($length) { - // server gone away - if ($data_len == 5) + if ($length == 5) { throw new BinLogException('read 5 bytes from mysql server has gone away'); } @@ -169,9 +147,9 @@ private function readFromSocket($data_len) { $bytes_read = 0; $body = ''; - while ($bytes_read < $data_len) + while ($bytes_read < $length) { - $resp = socket_read($this->socket, $data_len - $bytes_read); + $resp = socket_read($this->socket, $length - $bytes_read); if ($resp === false) { throw new BinLogException(socket_strerror(socket_last_error()), socket_last_error()); @@ -180,18 +158,17 @@ private function readFromSocket($data_len) // server kill connection or server gone away if (strlen($resp) === 0) { - throw new BinLogException('read less ' . ($data_len - strlen($body))); + throw new BinLogException('read less ' . ($length - strlen($body))); } $body .= $resp; $bytes_read += strlen($resp); } - if (strlen($body) < $data_len) + if (strlen($body) < $length) { - throw new BinLogException('read less ' . ($data_len - strlen($body))); + throw new BinLogException('read less ' . ($length - strlen($body))); } return $body; - } - catch (\Exception $e) + } catch (\Exception $e) { throw new BinLogException(var_export($e, true)); } @@ -202,7 +179,7 @@ private function readFromSocket($data_len) */ private function auth() { - $data = PackAuth::initPack(ConstCapabilityFlags::getCapabilities(), $this->config->getUser(), $this->config->getPassword(), ServerInfo::getSalt()); + $data = $this->packAuth->createAuthenticationPacket(ConstCapabilityFlags::getCapabilities(), $this->config->getUser(), $this->config->getPassword(), BinLogServerInfo::getSalt()); $this->writeToSocket($data); $this->getPacket(); @@ -210,7 +187,6 @@ private function auth() /** * @param $data - * @return bool * @throws BinLogException */ private function writeToSocket($data) @@ -232,37 +208,37 @@ private function getBinlogStream() $this->execute('SET @master_binlog_checksum=@@global.binlog_checksum'); } - if ('' === $this->gtID) + if (0 === $this->gtidCollection->count()) { - if ('' === $this->binFilePos || '' === $this->binFileName) + $binFilePos = $this->config->getBinLogPosition(); + $binFileName = $this->config->getBinLogFileName(); + + if ('' === $binFilePos || '' === $binFileName) { $master = $this->DBHelper->getMasterStatus(); - $this->binFilePos = $master['Position']; - $this->binFileName = $master['File']; + $binFilePos = $master['Position']; + $binFileName = $master['File']; } - $prelude = pack('i', strlen($this->binFileName) + 11) . chr(ConstCommand::COM_BINLOG_DUMP); - $prelude .= pack('I', $this->binFilePos); + $prelude = pack('i', strlen($binFileName) + 11) . chr(ConstCommand::COM_BINLOG_DUMP); + $prelude .= pack('I', $binFilePos); $prelude .= pack('v', 0); - $prelude .= pack('I', $this->slaveId); - $prelude .= $this->binFileName; + $prelude .= pack('I', $this->config->getSlaveId()); + $prelude .= $binFileName; } else { - $gtID = new GtidSet($this->gtID); - $encoded_data_size = $gtID->encoded_length(); - - $prelude = pack('l', 26 + $encoded_data_size) . chr(ConstCommand::COM_BINLOG_DUMP_GTID); + $prelude = pack('l', 26 + $this->gtidCollection->getEncodedPacketLength()) . chr(ConstCommand::COM_BINLOG_DUMP_GTID); $prelude .= pack('S', 0); - $prelude .= pack('I', $this->slaveId); + $prelude .= pack('I', $this->config->getSlaveId()); $prelude .= pack('I', 3); $prelude .= chr(0); $prelude .= chr(0); $prelude .= chr(0); $prelude .= pack('Q', 4); - $prelude .= pack('I', $gtID->encoded_length()); - $prelude .= $gtID->encoded(); + $prelude .= pack('I', $this->gtidCollection->getEncodedPacketLength()); + $prelude .= $this->gtidCollection->getEncodedPacket(); } $this->writeToSocket($prelude); diff --git a/src/MySQLReplication/BinLog/BinLogEvent.php b/src/MySQLReplication/BinLog/BinLogEvent.php deleted file mode 100755 index 9f41417..0000000 --- a/src/MySQLReplication/BinLog/BinLogEvent.php +++ /dev/null @@ -1,119 +0,0 @@ -unpackUInt64(self::$BinLogPack->read(6) . chr(0) . chr(0)); - } - - /** - * @param string $bitmap - * @return int - */ - protected static function bitCount($bitmap) - { - $n = 0; - for ($i = 0; $i < strlen($bitmap); $i++) - { - $bit = $bitmap[$i]; - if (is_string($bit)) - { - $bit = ord($bit); - } - $n += self::$bitCountInByte[$bit]; - } - - return $n; - } -} \ No newline at end of file diff --git a/src/MySQLReplication/BinLog/BinLogPack.php b/src/MySQLReplication/BinLog/BinLogPack.php deleted file mode 100755 index 8678c14..0000000 --- a/src/MySQLReplication/BinLog/BinLogPack.php +++ /dev/null @@ -1,478 +0,0 @@ -DBHelper = $DBHelper; - } - - /** - * @param $pack - * @param bool|true $checkSum - * @param array $onlyEvents - * @param array $ignoredEvents - * @param array $onlyTables - * @param array $onlyDatabases - * @return WriteRowsDTO|UpdateRowsDTO|DeleteRowsDTO|XidDTO|EventDTO|QueryDTO|GTIDLogDTO|RotateDTO|TableMapDTO - */ - public function init( - $pack, - $checkSum = true, - array $onlyEvents = [], - array $ignoredEvents = [], - array $onlyTables = [], - array $onlyDatabases = [] - ) { - $this->buffer = $pack; - $this->readBytes = 0; - $this->eventInfo = []; - - // "ok" value on first byte - $this->advance(1); - - $this->eventInfo = unpack('Vtime/Ctype/Vid/Vsize/Vpos/vflag', $this->read(19)); - $this->eventInfo['date'] = (new \DateTime())->setTimestamp($this->eventInfo['time'])->format('c'); - - $event_size_without_header = true === $checkSum ? ($this->eventInfo['size'] - 23) : ($this->eventInfo['size'] - 19); - - if ($this->eventInfo['type'] == ConstEventType::TABLE_MAP_EVENT) - { - return RowEvent::tableMap($this, $this->DBHelper, $this->eventInfo, $event_size_without_header); - } - - if (!empty($onlyEvents) && !in_array($this->eventInfo['type'], $onlyEvents)) - { - return null; - } - - if (in_array($this->eventInfo['type'], $ignoredEvents)) - { - return null; - } - - if (in_array($this->eventInfo['type'], [ - ConstEventType::UPDATE_ROWS_EVENT_V1, - ConstEventType::UPDATE_ROWS_EVENT_V2 - ])) - { - return RowEvent::updateRow($this, $this->eventInfo, $event_size_without_header, $onlyTables, $onlyDatabases); - } - elseif (in_array($this->eventInfo['type'], [ - ConstEventType::WRITE_ROWS_EVENT_V1, - ConstEventType::WRITE_ROWS_EVENT_V2 - ])) - { - return RowEvent::addRow($this, $this->eventInfo, $event_size_without_header, $onlyTables, $onlyDatabases); - } - elseif (in_array($this->eventInfo['type'], [ - ConstEventType::DELETE_ROWS_EVENT_V1, - ConstEventType::DELETE_ROWS_EVENT_V2 - ])) - { - return RowEvent::delRow($this, $this->eventInfo, $event_size_without_header, $onlyTables, $onlyDatabases); - } - elseif ($this->eventInfo['type'] == ConstEventType::XID_EVENT) - { - return new XidDTO( - $this->eventInfo['date'], - $this->eventInfo['pos'], - $this->eventInfo['size'], - $event_size_without_header, - $this->readUInt64() - ); - } - elseif ($this->eventInfo['type'] == ConstEventType::ROTATE_EVENT) - { - $pos = $this->readUInt64(); - $binFileName = $this->read($event_size_without_header - 8); - - return new RotateDTO( - $this->eventInfo['date'], - $this->eventInfo['pos'], - $this->eventInfo['size'], - $event_size_without_header, - $pos, - $binFileName - ); - } - elseif ($this->eventInfo['type'] == ConstEventType::GTID_LOG_EVENT) - { - //gtid event - $commit_flag = $this->readUInt8() == 1; - $sid = unpack('H*', $this->read(16))[1]; - $gno = $this->readUInt64(); - - return new GTIDLogDTO( - $this->eventInfo['date'], - $this->eventInfo['pos'], - $this->eventInfo['size'], - $event_size_without_header, - $commit_flag, - vsprintf('%s%s%s%s%s%s%s%s-%s%s%s%s-%s%s%s%s-%s%s%s%s-%s%s%s%s%s%s%s%s%s%s%s%s', str_split($sid)) . ':' . $gno - ); - } - else if ($this->eventInfo['type'] == ConstEventType::QUERY_EVENT) - { - $this->advance(4); - $execution_time = $this->readUInt32(); - $schema_length = $this->readUInt8(); - $this->advance(2); - $status_vars_length = $this->readUInt16(); - $this->advance($status_vars_length); - $schema = $this->read($schema_length); - $this->advance(1); - $query = $this->read($this->eventInfo['size'] - 36 - $status_vars_length - $schema_length - 1); - - return new QueryDTO( - $this->eventInfo['date'], - $this->eventInfo['pos'], - $this->eventInfo['size'], - $event_size_without_header, - $schema, - $execution_time, - $query - ); - } - - return null; - } - - /** - * @param int $length - */ - public function advance($length) - { - $this->read($length); - } - - /** - * @param int $length - * @return string - * @throws BinLogException - */ - public function read($length) - { - $length = (int)$length; - $return = substr($this->buffer, 0, $length); - $this->readBytes += $length; - $this->buffer = substr($this->buffer, $length); - return $return; - } - - /** - * @return int - */ - public function readUInt64() - { - return $this->unpackUInt64($this->read(8)); - } - - /** - * @param string $data - * @return string - */ - public function unpackUInt64($data) - { - $data = unpack('V*', $data); - return bcadd($data[1], bcmul($data[2], bcpow(2, 32))); - } - - /** - * @return int - */ - public function readUInt8() - { - return unpack('C', $this->read(1))[1]; - } - - /** - * @return int - */ - public function readUInt32() - { - return unpack('I', $this->read(4))[1]; - } - - /** - * @return int - */ - public function readUInt16() - { - return unpack('v', $this->read(2))[1]; - } - - /** - * Push again data in data buffer. It's use when you want - * to extract a bit from a value a let the rest of the code normally - * read the data - * - * @param string $data - */ - public function unread($data) - { - $this->readBytes -= strlen($data); - $this->buffer = $data . $this->buffer; - } - - /** - * @see read a 'Length Coded Binary' number from the data buffer. - * Length coded numbers can be anywhere from 1 to 9 bytes depending - * on the value of the first byte. - * From PyMYSQL source code - * - * @return int|string - */ - public function readCodedBinary() - { - $c = ord($this->read(1)); - if ($c == ConstMy::NULL_COLUMN) - { - return ''; - } - if ($c < ConstMy::UNSIGNED_CHAR_COLUMN) - { - return $c; - } - elseif ($c == ConstMy::UNSIGNED_SHORT_COLUMN) - { - return $this->readUInt16(); - - } - elseif ($c == ConstMy::UNSIGNED_INT24_COLUMN) - { - return $this->readUInt24(); - } - elseif ($c == ConstMy::UNSIGNED_INT64_COLUMN) - { - return $this->readUInt64(); - } - return $c; - } - - /** - * @return int - */ - public function readUInt24() - { - $data = unpack('C3', $this->read(3)); - return $data[1] + ($data[2] << 8) + ($data[3] << 16); - } - - /** - * @return int - */ - public function readInt24() - { - $data = unpack('C3', $this->read(3)); - - $res = $data[1] | ($data[2] << 8) | ($data[3] << 16); - if ($res >= 0x800000) - { - $res -= 0x1000000; - } - return $res; - } - - /** - * @return string - */ - public function readInt64() - { - $data = unpack('V*', $this->read(8)); - return bcadd($data[1], ($data[2] << 32)); - } - - /** - * @param int $size - * @return string - * @throws BinLogException - */ - public function readLengthCodedPascalString($size) - { - return $this->read($this->readUIntBySize($size)); - } - - /** - * Read a little endian integer values based on byte number - * - * @param $size - * @return mixed - * @throws BinLogException - */ - public function readUIntBySize($size) - { - if ($size == 1) - { - return $this->readUInt8(); - } - elseif ($size == 2) - { - return $this->readUInt16(); - } - elseif ($size == 3) - { - return $this->readUInt24(); - } - elseif ($size == 4) - { - return $this->readUInt32(); - } - elseif ($size == 5) - { - return $this->readUInt40(); - } - elseif ($size == 6) - { - return $this->readUInt48(); - } - elseif ($size == 7) - { - return $this->readUInt56(); - } - elseif ($size == 8) - { - return $this->readUInt64(); - } - - throw new BinLogException('$size ' . $size . ' not handled'); - } - - /** - * @return mixed - */ - public function readUInt40() - { - $data = unpack('CI', $this->read(5)); - return $data[1] + ($data[2] << 8); - } - - /** - * @return mixed - */ - public function readUInt48() - { - $data = unpack('v3', $this->read(6)); - return $data[1] + ($data[2] << 16) + ($data[3] << 32); - } - - /** - * @return mixed - */ - public function readUInt56() - { - $data = unpack('CSI', $this->read(7)); - return $data[1] + ($data[2] << 8) + ($data[3] << 24); - } - - /** - * Read a big endian integer values based on byte number - * - * @param int $size - * @return int - * @throws BinLogException - */ - public function readIntBeBySize($size) - { - if ($size == 1) - { - return unpack('c', $this->read($size))[1]; - } - elseif ($size == 2) - { - return unpack('n', $this->read($size))[1]; - } - elseif ($size == 3) - { - return $this->readInt24Be(); - } - elseif ($size == 4) - { - return unpack('i', strrev($this->read(4)))[1]; - } - elseif ($size == 5) - { - return $this->readInt40Be(); - } - elseif ($size == 8) - { - return unpack('l', $this->read($size))[1]; - } - - throw new BinLogException('$size ' . $size . ' not handled'); - } - - /** - * @return int - */ - public function readInt24Be() - { - $data = unpack('C3', $this->read(3)); - $res = ($data[1] << 16) | ($data[2] << 8) | $data[3]; - if ($res >= 0x800000) - { - $res -= 0x1000000; - } - return $res; - } - - /** - * @return int - */ - public function readInt40Be() - { - $data1 = unpack('N', $this->read(4))[1]; - $data2 = unpack('C', $this->read(1))[1]; - return $data2 + ($data1 << 8); - } - - /** - * @param int $size - * @return bool - */ - public function isComplete($size) - { - if ($this->readBytes + 1 - 20 < $size) - { - return false; - } - return true; - } -} diff --git a/src/MySQLReplication/Pack/ServerInfo.php b/src/MySQLReplication/BinLog/BinLogServerInfo.php similarity index 69% rename from src/MySQLReplication/Pack/ServerInfo.php rename to src/MySQLReplication/BinLog/BinLogServerInfo.php index d793eab..62d71db 100755 --- a/src/MySQLReplication/Pack/ServerInfo.php +++ b/src/MySQLReplication/BinLog/BinLogServerInfo.php @@ -1,30 +1,30 @@ connection = DriverManager::getConnection([ + 'user' => $config->getUser(), + 'password' => $config->getPassword(), + 'host' => $config->getHost(), + 'port' => $config->getPort(), + 'driver' => 'pdo_mysql', + ]); + $this->binLogAuth = new BinLogAuth(); + $this->MySQLRepository = new MySQLRepository($this->connection); + $this->GtidCollection = (new GtidService())->makeCollectionFromString($config->getGtid()); + $this->binLogConnect = new BinLogConnect($config, $this->MySQLRepository, $this->binLogAuth, $this->GtidCollection); + $this->binLogConnect->connectToStream(); + $this->packageService = new BinaryDataReaderService(); + $this->rowEventService = new RowEventService($config, $this->MySQLRepository); + $this->binLogPack = new Event($config, $this->binLogConnect, $this->MySQLRepository, $this->packageService, $this->rowEventService); + } + + /** + * @return \Doctrine\DBAL\Connection + */ + public function getDbConnection() + { + return $this->connection; + } + + /** + * @return DeleteRowsDTO|EventDTO|GTIDLogDTO|QueryDTO|\MySQLReplication\Event\DTO\RotateDTO|TableMapDTO|UpdateRowsDTO|WriteRowsDTO|\MySQLReplication\Event\DTO\XidDTO + * @throws \MySQLReplication\Exception\BinLogException + */ + public function getBinLogEvent() + { + return $this->binLogPack->consume(); + } +} \ No newline at end of file diff --git a/src/MySQLReplication/BinaryDataReader/BinaryDataReader.php b/src/MySQLReplication/BinaryDataReader/BinaryDataReader.php new file mode 100755 index 0000000..9852f32 --- /dev/null +++ b/src/MySQLReplication/BinaryDataReader/BinaryDataReader.php @@ -0,0 +1,409 @@ +binaryData = $binaryData; + } + + /** + * @param int $length + */ + public function advance($length) + { + $this->read($length); + } + + /** + * @param int $length + * @return string + * @throws BinLogException + */ + public function read($length) + { + $length = (int)$length; + $return = substr($this->binaryData, 0, $length); + $this->readBytes += $length; + $this->binaryData = substr($this->binaryData, $length); + + return $return; + } + + /** + * @return int + */ + public function readInt16() + { + return unpack('s', $this->read(self::UNSIGNED_SHORT_LENGTH))[1]; + } + + /** + * Push again data in data buffer. It's use when you want + * to extract a bit from a value a let the rest of the code normally + * read the data + * + * @param string $data + */ + public function unread($data) + { + $this->readBytes -= strlen($data); + $this->binaryData = $data . $this->binaryData; + } + + /** + * @see read a 'Length Coded Binary' number from the data buffer. + * Length coded numbers can be anywhere from 1 to 9 bytes depending + * on the value of the first byte. + * From PyMYSQL source code + * + * @return int|string + */ + public function readCodedBinary() + { + $c = ord($this->read(self::UNSIGNED_CHAR_LENGTH)); + if ($c == self::NULL_COLUMN) + { + return ''; + } + if ($c < self::UNSIGNED_CHAR_COLUMN) + { + return $c; + } + elseif ($c == self::UNSIGNED_SHORT_COLUMN) + { + return $this->readUInt16(); + + } + elseif ($c == self::UNSIGNED_INT24_COLUMN) + { + return $this->readUInt24(); + } + elseif ($c == self::UNSIGNED_INT64_COLUMN) + { + return $this->readUInt64(); + } + + return $c; + } + + /** + * @return int + */ + public function readUInt16() + { + return unpack('v', $this->read(self::UNSIGNED_SHORT_LENGTH))[1]; + } + + /** + * @return int + */ + public function readUInt24() + { + $data = unpack('C3', $this->read(self::UNSIGNED_INT24_LENGTH)); + return $data[1] + ($data[2] << 8) + ($data[3] << 16); + } + + /** + * @return int + */ + public function readUInt64() + { + return $this->unpackUInt64($this->read(self::UNSIGNED_INT64_LENGTH)); + } + + /** + * @param string $data + * @return string + */ + public function unpackUInt64($data) + { + $data = unpack('V*', $data); + return bcadd($data[1], bcmul($data[2], bcpow(2, 32))); + } + + /** + * @return int + */ + public function readInt24() + { + $data = unpack('C3', $this->read(self::UNSIGNED_INT24_LENGTH)); + + $res = $data[1] | ($data[2] << 8) | ($data[3] << 16); + if ($res >= 0x800000) + { + $res -= 0x1000000; + } + return $res; + } + + /** + * @return string + */ + public function readInt64() + { + $data = unpack('V*', $this->read(self::UNSIGNED_INT64_LENGTH)); + return bcadd($data[1], ($data[2] << 32)); + } + + /** + * @param int $size + * @return string + * @throws BinLogException + */ + public function readLengthCodedPascalString($size) + { + return $this->read($this->readUIntBySize($size)); + } + + /** + * Read a little endian integer values based on byte number + * + * @param $size + * @return mixed + * @throws BinLogException + */ + public function readUIntBySize($size) + { + if ($size == self::UNSIGNED_CHAR_LENGTH) + { + return $this->readUInt8(); + } + elseif ($size == self::UNSIGNED_SHORT_LENGTH) + { + return $this->readUInt16(); + } + elseif ($size == self::UNSIGNED_INT24_LENGTH) + { + return $this->readUInt24(); + } + elseif ($size == self::UNSIGNED_INT32_LENGTH) + { + return $this->readUInt32(); + } + elseif ($size == self::UNSIGNED_INT40_LENGTH) + { + return $this->readUInt40(); + } + elseif ($size == self::UNSIGNED_INT48_LENGTH) + { + return $this->readUInt48(); + } + elseif ($size == self::UNSIGNED_INT56_LENGTH) + { + return $this->readUInt56(); + } + elseif ($size == self::UNSIGNED_INT64_LENGTH) + { + return $this->readUInt64(); + } + + throw new BinLogException('$size ' . $size . ' not handled'); + } + + /** + * @return int + */ + public function readUInt8() + { + return unpack('C', $this->read(self::UNSIGNED_CHAR_LENGTH))[1]; + } + + /** + * @return int + */ + public function readUInt32() + { + return unpack('I', $this->read(self::UNSIGNED_INT32_LENGTH))[1]; + } + + /** + * @return mixed + */ + public function readUInt40() + { + $data = unpack('CI', $this->read(self::UNSIGNED_INT40_LENGTH)); + return $data[1] + ($data[2] << 8); + } + + /** + * @return mixed + */ + public function readUInt48() + { + $data = unpack('v3', $this->read(self::UNSIGNED_INT48_LENGTH)); + return $data[1] + ($data[2] << 16) + ($data[3] << 32); + } + + /** + * @return mixed + */ + public function readUInt56() + { + $data = unpack('CSI', $this->read(self::UNSIGNED_INT56_LENGTH)); + return $data[1] + ($data[2] << 8) + ($data[3] << 24); + } + + /** + * Read a big endian integer values based on byte number + * + * @param int $size + * @return int + * @throws BinLogException + */ + public function readIntBeBySize($size) + { + if ($size == self::UNSIGNED_CHAR_LENGTH) + { + return $this->readInt8(); + } + elseif ($size == self::UNSIGNED_SHORT_LENGTH) + { + return $this->readInt16Be(); + } + elseif ($size == self::UNSIGNED_INT24_LENGTH) + { + return $this->readInt24Be(); + } + elseif ($size == self::UNSIGNED_INT32_LENGTH) + { + return $this->readInt32Be(); + } + elseif ($size == self::UNSIGNED_INT40_LENGTH) + { + return $this->readInt40Be(); + } + + throw new BinLogException('$size ' . $size . ' not handled'); + } + + /** + * @return int + */ + public function readInt8() + { + return unpack('c', $this->read(self::UNSIGNED_CHAR_LENGTH))[1]; + } + + /** + * @return mixed + */ + public function readInt16Be() + { + return unpack('n', $this->read(self::UNSIGNED_SHORT_LENGTH))[1]; + } + + /** + * @return int + */ + public function readInt24Be() + { + $data = unpack('C3', $this->read(self::UNSIGNED_INT24_LENGTH)); + $res = ($data[1] << 16) | ($data[2] << 8) | $data[3]; + if ($res >= 0x800000) + { + $res -= 0x1000000; + } + return $res; + } + + /** + * @return int + */ + public function readInt32Be() + { + return unpack('i', strrev($this->read(self::UNSIGNED_INT32_LENGTH)))[1]; + } + + /** + * @return int + */ + public function readInt40Be() + { + $data1 = unpack('N', $this->read(self::UNSIGNED_INT32_LENGTH))[1]; + $data2 = unpack('C', $this->read(self::UNSIGNED_CHAR_LENGTH))[1]; + return $data2 + ($data1 << 8); + } + + /** + * @return int + */ + public function readInt32() + { + return unpack('i', $this->read(self::UNSIGNED_INT32_LENGTH))[1]; + } + + /** + * @return float + */ + public function readFloat() + { + return unpack('f', $this->read(self::UNSIGNED_FLOAT_LENGTH))[1]; + } + + /** + * @return double + */ + public function readDouble() + { + return unpack('d', $this->read(self::UNSIGNED_DOUBLE_LENGTH))[1]; + } + + /** + * @return string + */ + public function readTableId() + { + return $this->unpackUInt64($this->read(self::UNSIGNED_INT48_LENGTH) . chr(0) . chr(0)); + } + + /** + * @param int $size + * @return bool + */ + public function isComplete($size) + { + if ($this->readBytes + 1 - 20 < $size) + { + return false; + } + return true; + } +} \ No newline at end of file diff --git a/src/MySQLReplication/BinaryDataReader/BinaryDataReaderBuilder.php b/src/MySQLReplication/BinaryDataReader/BinaryDataReaderBuilder.php new file mode 100755 index 0000000..007e875 --- /dev/null +++ b/src/MySQLReplication/BinaryDataReader/BinaryDataReaderBuilder.php @@ -0,0 +1,23 @@ +binaryData = $binaryData; + } + + public function build() + { + return new BinaryDataReader( + $this->binaryData + ); + } +} \ No newline at end of file diff --git a/src/MySQLReplication/BinaryDataReader/BinaryDataReaderService.php b/src/MySQLReplication/BinaryDataReader/BinaryDataReaderService.php new file mode 100755 index 0000000..c73e9b5 --- /dev/null +++ b/src/MySQLReplication/BinaryDataReader/BinaryDataReaderService.php @@ -0,0 +1,22 @@ +withBinaryData($binaryData); + + return $packageBuilder->build(); + } +} \ No newline at end of file diff --git a/src/MySQLReplication/Config/Config.php b/src/MySQLReplication/Config/Config.php index 45c3800..72fb632 100755 --- a/src/MySQLReplication/Config/Config.php +++ b/src/MySQLReplication/Config/Config.php @@ -2,9 +2,6 @@ namespace MySQLReplication\Config; -/** - * Class Config - */ /** * Class Config * @package MySQLReplication\Config @@ -35,23 +32,71 @@ class Config * @var string */ private $charset; + /** + * @var string + */ + private $gtid; + /** + * @var int + */ + private $slaveId; + /** + * @var string + */ + private $binLogFileName; + /** + * @var int + */ + private $binLogPosition; + /** + * @var array + */ + private $eventsOnly; + /** + * @var array + */ + private $eventsIgnore; + /** + * @var array + */ + private $tablesOnly; + /** + * @var array + */ + private $databasesOnly; /** * Config constructor. - * @param $user - * @param $host - * @param $port - * @param $password + * @param string $user + * @param string $host + * @param int $port + * @param string $password * @param string $dbName * @param string $charset + * @param string $gtid + * @param int $slaveId + * @param string $binLogFileName + * @param $binLogPosition + * @param array $eventsOnly + * @param array $eventsIgnore + * @param array $tablesOnly + * @param array $databasesOnly */ public function __construct( $user, $host, $port, $password, - $dbName = '', - $charset = '' + $dbName, + $charset, + $gtid, + $slaveId, + $binLogFileName, + $binLogPosition, + array $eventsOnly, + array $eventsIgnore, + array $tablesOnly, + array $databasesOnly ) { $this->user = $user; $this->host = $host; @@ -59,18 +104,26 @@ public function __construct( $this->password = $password; $this->dbName = $dbName; $this->charset = $charset; + $this->gtid = $gtid; + $this->slaveId = $slaveId; + $this->binLogFileName = $binLogFileName; + $this->binLogPosition = $binLogPosition; + $this->eventsOnly = $eventsOnly; + $this->eventsIgnore = $eventsIgnore; + $this->tablesOnly = $tablesOnly; + $this->databasesOnly = $databasesOnly; } /** - * @return mixed + * @return string */ - public function getCharset() + public function getUser() { - return $this->charset; + return $this->user; } /** - * @return mixed + * @return string */ public function getHost() { @@ -78,7 +131,7 @@ public function getHost() } /** - * @return mixed + * @return int */ public function getPort() { @@ -86,7 +139,7 @@ public function getPort() } /** - * @return mixed + * @return string */ public function getPassword() { @@ -94,7 +147,7 @@ public function getPassword() } /** - * @return mixed + * @return string */ public function getDbName() { @@ -102,10 +155,75 @@ public function getDbName() } /** - * @return mixed + * @return string */ - public function getUser() + public function getCharset() { - return $this->user; + return $this->charset; + } + + /** + * @return string + */ + public function getGtid() + { + return $this->gtid; } + + /** + * @return int + */ + public function getSlaveId() + { + return $this->slaveId; + } + + /** + * @return string + */ + public function getBinLogFileName() + { + return $this->binLogFileName; + } + + /** + * @return int + */ + public function getBinLogPosition() + { + return $this->binLogPosition; + } + + /** + * @return array + */ + public function getEventsOnly() + { + return $this->eventsOnly; + } + + /** + * @return array + */ + public function getEventsIgnore() + { + return $this->eventsIgnore; + } + + /** + * @return array + */ + public function getTablesOnly() + { + return $this->tablesOnly; + } + + /** + * @return array + */ + public function getDatabasesOnly() + { + return $this->databasesOnly; + } + } \ No newline at end of file diff --git a/src/MySQLReplication/Config/ConfigBuilder.php b/src/MySQLReplication/Config/ConfigBuilder.php new file mode 100755 index 0000000..660a1d8 --- /dev/null +++ b/src/MySQLReplication/Config/ConfigBuilder.php @@ -0,0 +1,202 @@ +user, + $this->host, + $this->port, + $this->password, + $this->dbName, + $this->charset, + $this->gtid, + $this->slaveId, + $this->binLogFileName, + $this->binLogPosition, + $this->eventsOnly, + $this->eventsIgnore, + $this->tablesOnly, + $this->databasesOnly + ); + } + /** + * @param string $user + */ + public function withUser($user) + { + $this->user = $user; + } + + /** + * @param string $host + */ + public function withHost($host) + { + $this->host = $host; + } + + /** + * @param int $port + */ + public function withPort($port) + { + $this->port = $port; + } + + /** + * @param string $password + */ + public function withPassword($password) + { + $this->password = $password; + } + + /** + * @param string $dbName + */ + public function withDbName($dbName) + { + $this->dbName = $dbName; + } + + /** + * @param string $charset + */ + public function withCharset($charset) + { + $this->charset = $charset; + } + + /** + * @param string $gtid + */ + public function withGtid($gtid) + { + $this->gtid = $gtid; + } + + /** + * @param int $slaveId + */ + public function withSlaveId($slaveId) + { + $this->slaveId = $slaveId; + } + + /** + * @param string $binLogFileName + */ + public function withBinLogFileName($binLogFileName) + { + $this->binLogFileName = $binLogFileName; + } + + /** + * @param int $binLogPosition + */ + public function withBinLogPosition($binLogPosition) + { + $this->binLogPosition = $binLogPosition; + } + + /** + * @param array $eventsOnly + */ + public function withEventsOnly(array $eventsOnly) + { + $this->eventsOnly = $eventsOnly; + } + + /** + * @param array $eventsIgnore + */ + public function withEventsIgnore(array $eventsIgnore) + { + $this->eventsIgnore = $eventsIgnore; + } + + /** + * @param array $tablesOnly + */ + public function withTablesOnly(array $tablesOnly) + { + $this->tablesOnly = $tablesOnly; + } + + /** + * @param array $databasesOnly + */ + public function withDatabasesOnly(array $databasesOnly) + { + $this->databasesOnly = $databasesOnly; + } + +} \ No newline at end of file diff --git a/src/MySQLReplication/Config/ConfigService.php b/src/MySQLReplication/Config/ConfigService.php new file mode 100755 index 0000000..321b1ad --- /dev/null +++ b/src/MySQLReplication/Config/ConfigService.php @@ -0,0 +1,80 @@ + $v) + { + if ('user' === $k) + { + $configBuilder->withUser($v); + } + if ('host' === $k) + { + $configBuilder->withHost($v); + } + if ('port' === $k) + { + $configBuilder->withPort($v); + } + if ('password' === $k) + { + $configBuilder->withPassword($v); + } + if ('dbName' === $k) + { + $configBuilder->withDbName($v); + } + if ('charset' === $k) + { + $configBuilder->withCharset($v); + } + if ('gtid' === $k) + { + $configBuilder->withGtid($v); + } + if ('slaveId' === $k) + { + $configBuilder->withSlaveId($v); + } + if ('binLogFileName' === $k) + { + $configBuilder->withBinLogFileName($v); + } + if ('binLogPosition' === $k) + { + $configBuilder->withBinLogPosition($v); + } + if ('eventsOnly' === $k) + { + $configBuilder->withEventsOnly($v); + } + if ('eventsIgnore' === $k) + { + $configBuilder->withEventsIgnore($v); + } + if ('tablesOnly' === $k) + { + $configBuilder->withTablesOnly($v); + } + if ('databasesOnly' === $k) + { + $configBuilder->withDatabasesOnly($v); + } + } + + return $configBuilder->build(); + } +} \ No newline at end of file diff --git a/src/MySQLReplication/DTO/DeleteRowsDTO.php b/src/MySQLReplication/DTO/DeleteRowsDTO.php deleted file mode 100755 index a448c09..0000000 --- a/src/MySQLReplication/DTO/DeleteRowsDTO.php +++ /dev/null @@ -1,11 +0,0 @@ -date = $date; - $this->binLogPos = $binLogPos; - $this->eventSize = $eventSize; - $this->readBytes = $readBytes; - } - - /** - * @return int - */ - public function getDate() - { - return $this->date; - } - - /** - * @return int - */ - public function getBinLogPos() - { - return $this->binLogPos; - } - - /** - * @return int - */ - public function getEventSize() - { - return $this->eventSize; - } - - /** - * @return int - */ - public function getReadBytes() - { - return $this->readBytes; - } - - /** - * @return string - */ - public function __toString() - { - return PHP_EOL . - '=== ' . get_class($this) . ' === ' . PHP_EOL . - 'Date: ' . $this->date . PHP_EOL . - 'Log position: ' . $this->binLogPos . PHP_EOL . - 'Event size: ' . $this->eventSize . PHP_EOL . - 'Read bytes: ' . $this->readBytes . PHP_EOL; - } - - /** - * Specify data which should be serialized to JSON - * @link http://php.net/manual/en/jsonserializable.jsonserialize.php - * @return mixed data which can be serialized by json_encode, - * which is a value of any type other than a resource. - * @since 5.4.0 - */ - public function jsonSerialize() - { - return get_object_vars($this); - } -} \ No newline at end of file diff --git a/src/MySQLReplication/DTO/UpdateRowsDTO.php b/src/MySQLReplication/DTO/UpdateRowsDTO.php deleted file mode 100755 index 6f50e38..0000000 --- a/src/MySQLReplication/DTO/UpdateRowsDTO.php +++ /dev/null @@ -1,11 +0,0 @@ -eventInfo = $eventInfo; + } + + /** + * @return EventInfo + */ + public function getEventInfo() + { + return $this->eventInfo; + } + + /** + * @return string + */ + abstract public function getType(); + + /** + * @return string + */ + abstract public function __toString(); + /** + * Specify data which should be serialized to JSON + * @link http://php.net/manual/en/jsonserializable.jsonserialize.php + * @return mixed data which can be serialized by json_encode, + * which is a value of any type other than a resource. + * @since 5.4.0 + */ + abstract public function jsonSerialize(); +} \ No newline at end of file diff --git a/src/MySQLReplication/DTO/GTIDLogDTO.php b/src/MySQLReplication/Event/DTO/GTIDLogDTO.php similarity index 65% rename from src/MySQLReplication/DTO/GTIDLogDTO.php rename to src/MySQLReplication/Event/DTO/GTIDLogDTO.php index c20d139..d4ba500 100755 --- a/src/MySQLReplication/DTO/GTIDLogDTO.php +++ b/src/MySQLReplication/Event/DTO/GTIDLogDTO.php @@ -1,6 +1,9 @@ commit = $commit; $this->gtid = $gtid; @@ -56,17 +53,24 @@ public function getGtid() return $this->gtid; } + /** + * @return string + */ + public function getType() + { + return ConstEventsNames::GTID; + } + /** * @return string */ public function __toString() { return PHP_EOL . - '=== ' . __CLASS__ . ' === ' . PHP_EOL . - 'Date: ' . $this->date . PHP_EOL . - 'Log position: ' . $this->binLogPos . PHP_EOL . - 'Event size: ' . $this->eventSize . PHP_EOL . - 'Read bytes: ' . $this->readBytes . PHP_EOL . + '=== Event ' . $this->getType() . ' === ' . PHP_EOL . + 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL . + 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL . + 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL . 'Commit: ' . var_export($this->commit, true) . PHP_EOL . 'GTID NEXT: ' . $this->gtid . PHP_EOL; } diff --git a/src/MySQLReplication/DTO/QueryDTO.php b/src/MySQLReplication/Event/DTO/QueryDTO.php similarity index 63% rename from src/MySQLReplication/DTO/QueryDTO.php rename to src/MySQLReplication/Event/DTO/QueryDTO.php index 4021a4b..341df3d 100755 --- a/src/MySQLReplication/DTO/QueryDTO.php +++ b/src/MySQLReplication/Event/DTO/QueryDTO.php @@ -1,6 +1,9 @@ executionTime = $executionTime; $this->query = $query; @@ -71,20 +68,27 @@ public function getQuery() return $this->query; } + /** + * @return string + */ + public function getType() + { + return ConstEventsNames::QUERY; + } + /** * @return string */ public function __toString() { return PHP_EOL . - '=== ' . get_class($this) . ' === ' . PHP_EOL . - 'Date: ' . $this->date . PHP_EOL . - 'Log position: ' . $this->binLogPos . PHP_EOL . - 'Event size: ' . $this->eventSize . PHP_EOL . - 'Read bytes: ' . $this->readBytes . PHP_EOL . - 'Database: ' . $this->database . PHP_EOL . - 'Execution time: ' . $this->executionTime . PHP_EOL . - 'Query: ' . $this->query . PHP_EOL; + '=== Event ' . $this->getType() . ' === ' . PHP_EOL . + 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL . + 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL . + 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL . + 'Database: ' . $this->database . PHP_EOL . + 'Execution time: ' . $this->executionTime . PHP_EOL . + 'Query: ' . $this->query . PHP_EOL; } /** diff --git a/src/MySQLReplication/DTO/RotateDTO.php b/src/MySQLReplication/Event/DTO/RotateDTO.php similarity index 66% rename from src/MySQLReplication/DTO/RotateDTO.php rename to src/MySQLReplication/Event/DTO/RotateDTO.php index 4fa1688..6678b7d 100755 --- a/src/MySQLReplication/DTO/RotateDTO.php +++ b/src/MySQLReplication/Event/DTO/RotateDTO.php @@ -1,6 +1,9 @@ position = $position; $this->next_binlog = $next_binlog; @@ -56,17 +53,24 @@ public function getNextBinlog() return $this->next_binlog; } + /** + * @return string + */ + public function getType() + { + return ConstEventsNames::ROTATE; + } + /** * @return string */ public function __toString() { return PHP_EOL . - '=== ' . get_class($this) . ' === ' . PHP_EOL . - 'Date: ' . $this->date . PHP_EOL . - 'Log position: ' . $this->binLogPos . PHP_EOL . - 'Event size: ' . $this->eventSize . PHP_EOL . - 'Read bytes: ' . $this->readBytes . PHP_EOL . + '=== Event ' . $this->getType() . ' === ' . PHP_EOL . + 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL . + 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL . + 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL . 'Binlog position: ' . $this->position . PHP_EOL . 'Binlog filename: ' . $this->next_binlog . PHP_EOL; } diff --git a/src/MySQLReplication/DTO/RowsDTO.php b/src/MySQLReplication/Event/DTO/RowsDTO.php similarity index 75% rename from src/MySQLReplication/DTO/RowsDTO.php rename to src/MySQLReplication/Event/DTO/RowsDTO.php index 19ba2d9..9038810 100755 --- a/src/MySQLReplication/DTO/RowsDTO.php +++ b/src/MySQLReplication/Event/DTO/RowsDTO.php @@ -1,6 +1,8 @@ database = $database; $this->table = $table; @@ -101,17 +97,24 @@ public function getChangedRows() return $this->changedRows; } + /** + * @return string + */ + public function getType() + { + return ''; + } + /** * @return string */ public function __toString() { return PHP_EOL . - '=== ' . get_class($this) . ' === ' . PHP_EOL . - 'Date: ' . $this->date . PHP_EOL . - 'Log position: ' . $this->binLogPos . PHP_EOL . - 'Event size: ' . $this->eventSize . PHP_EOL . - 'Read bytes: ' . $this->readBytes . PHP_EOL . + '=== Event ' . $this->getType() . ' === ' . PHP_EOL . + 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL . + 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL . + 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL . 'Table: ' . $this->table . PHP_EOL . 'Affected columns: ' . $this->affected . PHP_EOL . 'Changed rows: ' . $this->changedRows . PHP_EOL . diff --git a/src/MySQLReplication/DTO/TableMapDTO.php b/src/MySQLReplication/Event/DTO/TableMapDTO.php similarity index 72% rename from src/MySQLReplication/DTO/TableMapDTO.php rename to src/MySQLReplication/Event/DTO/TableMapDTO.php index 087feec..92ebbac 100755 --- a/src/MySQLReplication/DTO/TableMapDTO.php +++ b/src/MySQLReplication/Event/DTO/TableMapDTO.php @@ -1,6 +1,9 @@ tableId = $tableId; $this->database = $database; @@ -86,17 +83,24 @@ public function getColumns() return $this->columns; } + /** + * @return string + */ + public function getType() + { + return ConstEventsNames::TABLE_MAP; + } + /** * @return string */ public function __toString() { return PHP_EOL . - '=== ' . get_class($this) . ' === ' . PHP_EOL . - 'Date: ' . $this->date . PHP_EOL . - 'Log position: ' . $this->binLogPos . PHP_EOL . - 'Event size: ' . $this->eventSize . PHP_EOL . - 'Read bytes: ' . $this->readBytes . PHP_EOL . + '=== Event ' . $this->getType() . ' === ' . PHP_EOL . + 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL . + 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL . + 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL . 'Table: ' . $this->table . PHP_EOL . 'Database: ' . $this->database . PHP_EOL . 'Table Id: ' . $this->tableId . PHP_EOL . diff --git a/src/MySQLReplication/Event/DTO/UpdateRowsDTO.php b/src/MySQLReplication/Event/DTO/UpdateRowsDTO.php new file mode 100755 index 0000000..b0190bf --- /dev/null +++ b/src/MySQLReplication/Event/DTO/UpdateRowsDTO.php @@ -0,0 +1,20 @@ +xid = $xid; } @@ -41,17 +38,24 @@ public function getXid() return $this->xid; } + /** + * @return string + */ + public function getType() + { + return ConstEventsNames::XID; + } + /** * @return string */ public function __toString() { return PHP_EOL . - '=== ' . get_class($this) . ' === ' . PHP_EOL . - 'Date: ' . $this->date . PHP_EOL . - 'Log position: ' . $this->binLogPos . PHP_EOL . - 'Event size: ' . $this->eventSize . PHP_EOL . - 'Read bytes: ' . $this->readBytes . PHP_EOL . + '=== Event ' . $this->getType() . ' === ' . PHP_EOL . + 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL . + 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL . + 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL . 'Transaction ID: ' . $this->xid . PHP_EOL; } diff --git a/src/MySQLReplication/Event/Event.php b/src/MySQLReplication/Event/Event.php new file mode 100755 index 0000000..a63ab1f --- /dev/null +++ b/src/MySQLReplication/Event/Event.php @@ -0,0 +1,140 @@ +mySQLRepository = $mySQLRepository; + $this->config = $config; + $this->binLogConnect = $binLogConnect; + $this->packageService = $packageService; + $this->rowEventService = $rowEventService; + } + + /** + * @return DeleteRowsDTO|EventDTO|GTIDLogDTO|QueryDTO|RotateDTO|TableMapDTO|UpdateRowsDTO|WriteRowsDTO|XidDTO + */ + public function consume() + { + $binaryDataReader = $this->packageService->makePackageFromBinaryData($this->binLogConnect->getPacket()); + + // "ok" value on first byte continue + $binaryDataReader->advance(1); + + // decode all events data + $eventInfo = new EventInfo( + $binaryDataReader->readInt32(), + $binaryDataReader->readUInt8(), + $binaryDataReader->readInt32(), + $binaryDataReader->readInt32(), + $binaryDataReader->readInt32(), + $binaryDataReader->readUInt16(), + $this->binLogConnect->getCheckSum() + ); + + if ($eventInfo->getType() == ConstEventType::TABLE_MAP_EVENT) + { + $rowEvent = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo); + return $rowEvent->makeTableMapDTO(); + } + + if ([] !== $this->config->getEventsOnly() && !in_array($eventInfo->getType(), $this->config->getEventsOnly())) + { + return null; + } + + if (in_array($eventInfo->getType(), $this->config->getEventsIgnore())) + { + return null; + } + + if (in_array($eventInfo->getType(), [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2])) + { + $rowEvent = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo); + return $rowEvent->makeUpdateRowsDTO(); + } + elseif (in_array($eventInfo->getType(), [ConstEventType::WRITE_ROWS_EVENT_V1, ConstEventType::WRITE_ROWS_EVENT_V2])) + { + $rowEvent = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo); + return $rowEvent->makeWriteRowsDTO(); + } + elseif (in_array($eventInfo->getType(), [ConstEventType::DELETE_ROWS_EVENT_V1, ConstEventType::DELETE_ROWS_EVENT_V2])) + { + $rowEvent = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo); + return $rowEvent->makeDeleteRowsDTO(); + } + elseif ($eventInfo->getType() == ConstEventType::XID_EVENT) + { + return (new XidEvent($eventInfo, $binaryDataReader))->makeXidDTO(); + } + elseif ($eventInfo->getType() == ConstEventType::ROTATE_EVENT) + { + return (new RotateEvent($eventInfo, $binaryDataReader))->makeRotateEventDTO(); + } + elseif ($eventInfo->getType() == ConstEventType::GTID_LOG_EVENT) + { + return (new GtidEvent($eventInfo, $binaryDataReader))->makeGTIDLogDTO(); + } + else if ($eventInfo->getType() == ConstEventType::QUERY_EVENT) + { + return (new QueryEvent($eventInfo, $binaryDataReader))->makeQueryDTO(); + } + + return null; + } +} diff --git a/src/MySQLReplication/Event/EventCommon.php b/src/MySQLReplication/Event/EventCommon.php new file mode 100755 index 0000000..d5a1787 --- /dev/null +++ b/src/MySQLReplication/Event/EventCommon.php @@ -0,0 +1,31 @@ +eventInfo = $eventInfo; + $this->binaryDataReader = $binaryDataReader; + } +} \ No newline at end of file diff --git a/src/MySQLReplication/Event/EventInfo.php b/src/MySQLReplication/Event/EventInfo.php new file mode 100755 index 0000000..bbe24e3 --- /dev/null +++ b/src/MySQLReplication/Event/EventInfo.php @@ -0,0 +1,131 @@ +timestamp = $timestamp; + $this->type = $type; + $this->id = $id; + $this->size = $size; + $this->pos = $pos; + $this->flag = $flag; + $this->checkSum = $checkSum; + } + + /** + * @return string + */ + public function getDateTime() + { + return (new \DateTime())->setTimestamp($this->timestamp)->format('c'); + } + + /** + * @return string + */ + public function getSizeNoHeader() + { + return (true === $this->checkSum ? $this->size - 23 : $this->size - 19); + } + + /** + * @return int + */ + public function getTimestamp() + { + return $this->timestamp; + } + + /** + * @return string + */ + public function getType() + { + return $this->type; + } + + /** + * @return int + */ + public function getId() + { + return $this->id; + } + + /** + * @return int + */ + public function getSize() + { + return $this->size; + } + + /** + * @return int + */ + public function getPos() + { + return $this->pos; + } + + /** + * @return string + */ + public function getFlag() + { + return $this->flag; + } +} \ No newline at end of file diff --git a/src/MySQLReplication/Event/GtidEvent.php b/src/MySQLReplication/Event/GtidEvent.php new file mode 100755 index 0000000..b2679db --- /dev/null +++ b/src/MySQLReplication/Event/GtidEvent.php @@ -0,0 +1,24 @@ +binaryDataReader->readUInt8() == 1; + $sid = unpack('H*', $this->binaryDataReader->read(16))[1]; + $gno = $this->binaryDataReader->readUInt64(); + + return new GTIDLogDTO( + $this->eventInfo, + $commit_flag, + vsprintf('%s%s%s%s%s%s%s%s-%s%s%s%s-%s%s%s%s-%s%s%s%s-%s%s%s%s%s%s%s%s%s%s%s%s', str_split($sid)) . ':' . $gno + ); + } +} \ No newline at end of file diff --git a/src/MySQLReplication/Event/QueryEvent.php b/src/MySQLReplication/Event/QueryEvent.php new file mode 100755 index 0000000..ed8143f --- /dev/null +++ b/src/MySQLReplication/Event/QueryEvent.php @@ -0,0 +1,31 @@ +binaryDataReader->advance(4); + $execution_time = $this->binaryDataReader->readUInt32(); + $schema_length = $this->binaryDataReader->readUInt8(); + $this->binaryDataReader->advance(2); + $status_vars_length = $this->binaryDataReader->readUInt16(); + $this->binaryDataReader->advance($status_vars_length); + $schema = $this->binaryDataReader->read($schema_length); + $this->binaryDataReader->advance(1); + $query = $this->binaryDataReader->read($this->eventInfo->getSize() - 36 - $status_vars_length - $schema_length - 1); + + return new QueryDTO( + $this->eventInfo, + $schema, + $execution_time, + $query + ); + } +} \ No newline at end of file diff --git a/src/MySQLReplication/Event/RotateEvent.php b/src/MySQLReplication/Event/RotateEvent.php new file mode 100755 index 0000000..76d2733 --- /dev/null +++ b/src/MySQLReplication/Event/RotateEvent.php @@ -0,0 +1,27 @@ +binaryDataReader->readUInt64(); + $binFileName = $this->binaryDataReader->read($this->eventInfo->getSizeNoHeader() - 8); + + return new RotateDTO( + $this->eventInfo, + $pos, + $binFileName + ); + } +} \ No newline at end of file diff --git a/src/MySQLReplication/Event/RowEvent/RowEvent.php b/src/MySQLReplication/Event/RowEvent/RowEvent.php new file mode 100755 index 0000000..6d5b2d7 --- /dev/null +++ b/src/MySQLReplication/Event/RowEvent/RowEvent.php @@ -0,0 +1,882 @@ +MySQLRepository = $MySQLRepository; + $this->config = $config; + } + + /** + * This evenement describe the structure of a table. + * It's send before a change append on a table. + * A end user of the lib should have no usage of this + * + * @return TableMapDTO + */ + public function makeTableMapDTO() + { + $tableId = $this->binaryDataReader->readTableId(); + $this->binaryDataReader->advance(2); + + $data = []; + $data['schema_length'] = $this->binaryDataReader->readUInt8(); + $data['schema_name'] = $this->tableMapDatabase = $this->binaryDataReader->read($data['schema_length']); + $this->binaryDataReader->advance(1); + + $data['table_length'] = $this->binaryDataReader->readUInt8(); + $data['table_name'] = $this->tableMapTableName = $this->binaryDataReader->read($data['table_length']); + $this->binaryDataReader->advance(1); + + $this->tableMapColumnsAmount = $this->binaryDataReader->readCodedBinary(); + + $column_type_def = $this->binaryDataReader->read($this->tableMapColumnsAmount); + + // automatically clear array to save memory + if (count(self::$tableMapCache) >= 200) + { + self::$tableMapCache = array_slice(self::$tableMapCache, 100, -1, true); + } + + $tableMapDTO = new TableMapDTO( + $this->eventInfo, + $tableId, + $data['schema_name'], + $data['table_name'], + $this->tableMapColumnsAmount + ); + + if (isset(self::$tableMapCache[$tableId])) + { + return $tableMapDTO; + } + + $this->binaryDataReader->readCodedBinary(); + + self::$tableMapCache[$tableId]['fields'] = []; + self::$tableMapCache[$tableId]['database'] = $data['schema_name']; + self::$tableMapCache[$tableId]['table_name'] = $data['table_name']; + + $columns = $this->MySQLRepository->getFields($data['schema_name'], $data['table_name']); + // if you drop tables and parse of logs you will get empty scheme + if (empty($columns)) + { + return $tableMapDTO; + } + + for ($i = 0; $i < strlen($column_type_def); $i++) + { + // this a dirty hack to prevent row events containing columns which have been dropped + if (!isset($columns[$i])) + { + $columns[$i] = [ + 'COLUMN_NAME' => 'DROPPED_COLUMN_' . $i, + 'COLLATION_NAME' => null, + 'CHARACTER_SET_NAME' => null, + 'COLUMN_COMMENT' => null, + 'COLUMN_TYPE' => 'BLOB', + 'COLUMN_KEY' => '', + ]; + $type = ConstFieldType::IGNORE; + } + else + { + $type = ord($column_type_def[$i]); + } + + self::$tableMapCache[$tableId]['fields'][$i] = BinLogColumns::parse($type, $columns[$i], $this->binaryDataReader); + } + + return $tableMapDTO; + } + + /** + * @return WriteRowsDTO + */ + public function makeWriteRowsDTO() + { + $this->rowInit(); + + if (false === $this->process) + { + return null; + } + + $values = $this->getValues(); + + return new WriteRowsDTO( + $this->eventInfo, + $this->tableMapDatabase, + $this->tableMapTableName, + $this->tableMapColumnsAmount, + count($values), + $values + ); + } + + /** + * + */ + private function rowInit() + { + $this->process = true; + + $tableId = $this->binaryDataReader->readTableId(); + $this->binaryDataReader->advance(2); + + if (in_array($this->eventInfo->getType(), [ + ConstEventType::DELETE_ROWS_EVENT_V2, + ConstEventType::WRITE_ROWS_EVENT_V2, + ConstEventType::UPDATE_ROWS_EVENT_V2 + ])) + { + $this->binaryDataReader->read($this->binaryDataReader->readUInt16() / 8); + } + + $this->tableMapColumnsAmount = $this->binaryDataReader->readCodedBinary(); + + + $this->tableMapFields = []; + if (isset(self::$tableMapCache[$tableId])) + { + $this->tableMapFields = self::$tableMapCache[$tableId]['fields']; + + if ([] !== $this->config->getTablesOnly() && !in_array(self::$tableMapCache[$tableId]['table_name'], $this->config->getTablesOnly())) + { + $this->process = false; + } + + if ([] !== $this->config->getDatabasesOnly() && !in_array(self::$tableMapCache[$tableId]['database'], $this->config->getDatabasesOnly())) + { + $this->process = false; + } + } + if ([] == $this->tableMapFields) + { + //remove cache can be empty (drop table) + unset(self::$tableMapCache[$tableId]); + $this->process = false; + } + } + + /** + * @param int $colsBitmap + * @return array + * @throws \Exception + */ + private function getColumnData($colsBitmap) + { + $values = []; + + $l = $this->getColumnsBinarySize($this->bitCount($colsBitmap)); + + // null bitmap length = (bits set in 'columns-present-bitmap'+7)/8 + // see http://dev.mysql.com/doc/internals/en/rows-event.html + $null_bitmap = $this->binaryDataReader->read($l); + $nullBitmapIndex = 0; + + foreach ($this->tableMapFields as $i => $column) + { + $name = $column['name']; + $unsigned = $column['unsigned']; + + if ($this->bitGet($colsBitmap, $i) == 0) + { + $values[$name] = null; + continue; + } + + if ($this->checkNull($null_bitmap, $nullBitmapIndex)) + { + $values[$name] = null; + } + elseif ($column['type'] == ConstFieldType::IGNORE) + { + $values[$name] = null; + } + elseif ($column['type'] == ConstFieldType::TINY) + { + if ($unsigned) + { + $values[$name] = $this->binaryDataReader->readUInt8(); + } + else + { + $values[$name] = $this->binaryDataReader->readInt8(); + } + } + elseif ($column['type'] == ConstFieldType::SHORT) + { + if ($unsigned) + { + $values[$name] = $this->binaryDataReader->readUInt16(); + } + else + { + $values[$name] = $this->binaryDataReader->readInt16(); + } + } + elseif ($column['type'] == ConstFieldType::LONG) + { + if ($unsigned) + { + $values[$name] = $this->binaryDataReader->readUInt32(); + } + else + { + $values[$name] = $this->binaryDataReader->readInt32(); + } + } + elseif ($column['type'] == ConstFieldType::LONGLONG) + { + if ($unsigned) + { + $values[$name] = $this->binaryDataReader->readUInt64(); + } + else + { + $values[$name] = $this->binaryDataReader->readInt64(); + } + } + elseif ($column['type'] == ConstFieldType::INT24) + { + if ($unsigned) + { + $values[$name] = $this->binaryDataReader->readUInt24(); + } + else + { + $values[$name] = $this->binaryDataReader->readInt24(); + } + } + elseif ($column['type'] == ConstFieldType::FLOAT) + { + // http://dev.mysql.com/doc/refman/5.7/en/floating-point-types.html FLOAT(7,4) + $values[$name] = round($this->binaryDataReader->readFloat(), 4); + } + elseif ($column['type'] == ConstFieldType::DOUBLE) + { + $values[$name] = $this->binaryDataReader->readDouble(); + } + elseif ($column['type'] == ConstFieldType::VARCHAR || $column['type'] == ConstFieldType::STRING) + { + if ($column['max_length'] > 255) + { + $values[$name] = $this->getString(2, $column); + } + else + { + $values[$name] = $this->getString(1, $column); + } + } + elseif ($column['type'] == ConstFieldType::NEWDECIMAL) + { + $values[$name] = $this->getDecimal($column); + } + elseif ($column['type'] == ConstFieldType::BLOB) + { + $values[$name] = $this->getString($column['length_size'], $column); + } + elseif ($column['type'] == ConstFieldType::DATETIME) + { + $values[$name] = $this->getDatetime(); + } + elseif ($column['type'] == ConstFieldType::DATETIME2) + { + $values[$name] = $this->getDatetime2($column); + } + elseif ($column['type'] == ConstFieldType::TIME2) + { + $values[$name] = $this->getTime2($column); + } + elseif ($column['type'] == ConstFieldType::TIMESTAMP2) + { + $values[$name] = $this->getTimestamp2($column); + } + elseif ($column['type'] == ConstFieldType::DATE) + { + $values[$name] = $this->getDate(); + } + elseif ($column['type'] == ConstFieldType::YEAR) + { + $values[$name] = $this->binaryDataReader->readUInt8() + 1900; + } + elseif ($column['type'] == ConstFieldType::ENUM) + { + $values[$name] = $column['enum_values'][$this->binaryDataReader->readUIntBySize($column['size']) - 1]; + } + elseif ($column['type'] == ConstFieldType::SET) + { + $values[$name] = $this->getSet($column); + } + elseif ($column['type'] == ConstFieldType::BIT) + { + $values[$name] = $this->getBit($column); + } + elseif ($column['type'] == ConstFieldType::GEOMETRY) + { + $values[$name] = $this->binaryDataReader->readLengthCodedPascalString($column['length_size']); + } + else + { + throw new BinLogException('Unknown row type: ' . $column['type']); + } + + $nullBitmapIndex += 1; + } + + return $values; + } + + /** + * @param int $columns + * @return int + */ + private function getColumnsBinarySize($columns) + { + return (int)(($columns + 7) / 8); + } + + /** + * @param string $bitmap + * @return int + */ + protected function bitCount($bitmap) + { + $n = 0; + for ($i = 0; $i < strlen($bitmap); $i++) + { + $bit = $bitmap[$i]; + if (is_string($bit)) + { + $bit = ord($bit); + } + $n += $this->bitCountInByte[$bit]; + } + + return $n; + } + + /** + * @param string $bitmap + * @param int $position + * @return int + */ + private function bitGet($bitmap, $position) + { + $bit = $bitmap[(int)($position / 8)]; + if (is_string($bit)) + { + $bit = ord($bit); + } + + return $bit & (1 << ($position & 7)); + } + + /** + * @param string $nullBitmap + * @param int $position + * @return int + */ + private function checkNull($nullBitmap, $position) + { + $bit = $nullBitmap[intval($position / 8)]; + if (is_string($bit)) + { + $bit = ord($bit); + } + + return $bit & (1 << ($position % 8)); + } + + /** + * @param int $size + * @param array $column + * @return string + */ + private function getString($size, array $column) + { + $string = $this->binaryDataReader->readLengthCodedPascalString($size); + if ($column['character_set_name']) + { + // convert strings? + } + + return $string; + } + + /** + * Read MySQL's new decimal format introduced in MySQL 5 + * @param array $column + * @return string + */ + private function getDecimal(array $column) + { + $digits_per_integer = 9; + $compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4]; + $integral = $column['precision'] - $column['decimals']; + $uncomp_integral = (int)($integral / $digits_per_integer); + $uncomp_fractional = (int)($column['decimals'] / $digits_per_integer); + $comp_integral = $integral - ($uncomp_integral * $digits_per_integer); + $comp_fractional = $column['decimals'] - ($uncomp_fractional * $digits_per_integer); + + $value = $this->binaryDataReader->readUInt8(); + if (0 != ($value & 0x80)) + { + $mask = 0; + $res = ''; + } + else + { + $mask = -1; + $res = '-'; + } + $this->binaryDataReader->unread(pack('C', ($value ^ 0x80))); + + $size = $compressed_bytes[$comp_integral]; + if ($size > 0) + { + $value = $this->binaryDataReader->readIntBeBySize($size) ^ $mask; + $res .= $value; + } + + for ($i = 0; $i < $uncomp_integral; $i++) + { + $value = $this->binaryDataReader->readInt32Be() ^ $mask; + $res .= sprintf('%09d', $value); + } + + $res .= '.'; + + for ($i = 0; $i < $uncomp_fractional; $i++) + { + $value = $this->binaryDataReader->readInt32Be() ^ $mask; + $res .= sprintf('%09d', $value); + } + + $size = $compressed_bytes[$comp_fractional]; + if ($size > 0) + { + $value = $this->binaryDataReader->readIntBeBySize($size) ^ $mask; + $res .= sprintf('%0' . $comp_fractional . 'd', $value); + } + + return bcmul($res, 1, $column['precision']); + } + + /** + * @return float|null + */ + private function getDatetime() + { + $value = $this->binaryDataReader->readUInt64(); + // nasty mysql 0000-00-00 dates + if ($value == 0) + { + return null; + } + + $date = $value / 1000000; + $year = (int)($date / 10000); + $month = (int)(($date % 10000) / 100); + $day = (int)($date % 100); + if ($year == 0 || $month == 0 || $day == 0) + { + return null; + } + + return (new \DateTime())->setDate($year, $month, $day)->format('Y-m-d'); + } + + /** + * Date Time + * 1 bit sign (1= non-negative, 0= negative) + * 17 bits year*13+month (year 0-9999, month 0-12) + * 5 bits day (0-31) + * 5 bits hour (0-23) + * 6 bits minute (0-59) + * 6 bits second (0-59) + * --------------------------- + * 40 bits = 5 bytes + * @param array $column + * @return string + * @throws \Exception + */ + private function getDatetime2(array $column) + { + $data = $this->binaryDataReader->readIntBeBySize(5); + + $year_month = $this->getBinarySlice($data, 1, 17, 40); + + $year = (int)($year_month / 13); + $month = $year_month % 13; + $day = $this->getBinarySlice($data, 18, 5, 40); + $hour = $this->getBinarySlice($data, 23, 5, 40); + $minute = $this->getBinarySlice($data, 28, 6, 40); + $second = $this->getBinarySlice($data, 34, 6, 40); + + $date = new \DateTime($year . '-' . $month . '-' . $day . ' ' . $hour . ':' . $minute . ':' . $second); + if (array_sum($date->getLastErrors()) > 0) + { + return null; + } + + return $date->format('Y-m-d H:i:s') . $this->getFSP($column); + } + + /** + * Read a part of binary data and extract a number + * binary: the data + * start: From which bit (1 to X) + * size: How many bits should be read + * data_length: data size + * + * @param int $binary + * @param int $start + * @param int $size + * @param int $data_length + * @return int + */ + private function getBinarySlice($binary, $start, $size, $data_length) + { + $binary = $binary >> $data_length - ($start + $size); + $mask = ((1 << $size) - 1); + + return $binary & $mask; + } + + /** + * Read and add the fractional part of time + * For more details about new date format: + * http://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html + * + * @param array $column + * @return int|string + * @throws \Exception + */ + private function getFSP(array $column) + { + $read = 0; + $time = ''; + if ($column['fsp'] == 1 || $column['fsp'] == 2) + { + $read = 1; + } + elseif ($column['fsp'] == 3 || $column['fsp'] == 4) + { + $read = 2; + } + elseif ($column ['fsp'] == 5 || $column['fsp'] == 6) + { + $read = 3; + } + if ($read > 0) + { + $microsecond = $this->binaryDataReader->readIntBeBySize($read); + if ($column['fsp'] % 2) + { + $time = (int)($microsecond / 10); + } + else + { + $time = $microsecond; + } + } + + return $time; + } + + /** + * TIME encoding for nonfractional part: + * 1 bit sign (1= non-negative, 0= negative) + * 1 bit unused (reserved for future extensions) + * 10 bits hour (0-838) + * 6 bits minute (0-59) + * 6 bits second (0-59) + * --------------------- + * 24 bits = 3 bytes + * + * @param array $column + * @return string + */ + private function getTime2(array $column) + { + $data = $this->binaryDataReader->readInt24Be(); + + $hour = $this->getBinarySlice($data, 2, 10, 24); + $minute = $this->getBinarySlice($data, 12, 6, 24); + $second = $this->getBinarySlice($data, 18, 6, 24); + + return (new \DateTime())->setTime($hour, $minute, $second)->format('H:i:s') . $this->getFSP($column); + } + + /** + * @param array $column + * @return bool|string + * @throws BinLogException + */ + private function getTimestamp2(array $column) + { + $time = date('Y-m-d H:i:s', $this->binaryDataReader->readInt32Be()); + $fsp = $this->getFSP($column); + if ('' !== $fsp) + { + $time .= '.' . $fsp; + } + return $time; + } + + /** + * @return string + */ + private function getDate() + { + $time = $this->binaryDataReader->readUInt24(); + if (0 == $time) + { + return null; + } + + $year = ($time & ((1 << 15) - 1) << 9) >> 9; + $month = ($time & ((1 << 4) - 1) << 5) >> 5; + $day = ($time & ((1 << 5) - 1)); + if ($year == 0 || $month == 0 || $day == 0) + { + return null; + } + + return (new \DateTime())->setDate($year, $month, $day)->format('Y-m-d'); + } + + /** + * @param array $column + * @return array + * @throws BinLogException + */ + private function getSet(array $column) + { + // we read set columns as a bitmap telling us which options are enabled + $bit_mask = $this->binaryDataReader->readUIntBySize($column['size']); + $sets = []; + foreach ($column['set_values'] as $k => $item) + { + if ($bit_mask & pow(2, $k)) + { + $sets[] = $item; + } + } + + return $sets; + } + + /** + * Read MySQL BIT type + * @param array $column + * @return string + */ + private function getBit(array $column) + { + $res = ''; + for ($byte = 0; $byte < $column['bytes']; $byte++) + { + $current_byte = ''; + $data = $this->binaryDataReader->readUInt8(); + if (0 === $byte) + { + if (1 === $column['bytes']) + { + $end = $column['bits']; + } + else + { + $end = $column['bits'] % 8; + if (0 === $end) + { + $end = 8; + } + } + } + else + { + $end = 8; + } + + for ($bit = 0; $bit < $end; $bit++) + { + if ($data & (1 << $bit)) + { + $current_byte .= '1'; + } + else + { + $current_byte .= '0'; + } + + } + $res .= strrev($current_byte); + } + + return $res; + } + + /** + * @return DeleteRowsDTO + */ + public function makeDeleteRowsDTO() + { + $this->rowInit(); + + if (false === $this->process) + { + return null; + } + + $values = $this->getValues(); + + return new DeleteRowsDTO( + $this->eventInfo, + $this->tableMapDatabase, + $this->tableMapTableName, + $this->tableMapColumnsAmount, + count($values), + $values + ); + } + + /** + * @return UpdateRowsDTO + */ + public function makeUpdateRowsDTO() + { + $this->rowInit(); + + if (false === $this->process) + { + return null; + } + + $columnsBinarySize = $this->getColumnsBinarySize($this->tableMapColumnsAmount); + $beforeBinaryData = $this->binaryDataReader->read($columnsBinarySize); + $afterBinaryData = $this->binaryDataReader->read($columnsBinarySize); + + $values = []; + while (false === $this->binaryDataReader->isComplete($this->eventInfo->getSizeNoHeader())) + { + $values[] = [ + 'before' => $this->getColumnData($beforeBinaryData), + 'after' => $this->getColumnData($afterBinaryData) + ]; + } + + return new UpdateRowsDTO( + $this->eventInfo, + $this->tableMapDatabase, + $this->tableMapTableName, + $this->tableMapColumnsAmount, + count($values), + $values + ); + } + + /** + * @return array + * @throws BinLogException + */ + private function getValues() + { + $columnsBinarySize = $this->getColumnsBinarySize($this->tableMapColumnsAmount); + $binaryData = $this->binaryDataReader->read($columnsBinarySize); + + $values = []; + while (!$this->binaryDataReader->isComplete($this->eventInfo->getSizeNoHeader())) + { + $values[] = $this->getColumnData($binaryData); + } + + return $values; + } +} diff --git a/src/MySQLReplication/Event/RowEvent/RowEventBuilder.php b/src/MySQLReplication/Event/RowEvent/RowEventBuilder.php new file mode 100755 index 0000000..2444f8f --- /dev/null +++ b/src/MySQLReplication/Event/RowEvent/RowEventBuilder.php @@ -0,0 +1,67 @@ +MySQLRepository = $MySQLRepository; + $this->config = $config; + } + + /** + * @param BinaryDataReader $package + */ + public function withPackage(BinaryDataReader $package) + { + $this->package = $package; + } + + /** + * @return RowEvent + */ + public function build() + { + return new RowEvent( + $this->config, + $this->MySQLRepository, + $this->package, + $this->eventInfo + ); + } + + /** + * @param EventInfo $eventInfo + */ + public function withEventInfo(EventInfo $eventInfo) + { + $this->eventInfo = $eventInfo; + } +} \ No newline at end of file diff --git a/src/MySQLReplication/Event/RowEvent/RowEventService.php b/src/MySQLReplication/Event/RowEvent/RowEventService.php new file mode 100755 index 0000000..bef98ba --- /dev/null +++ b/src/MySQLReplication/Event/RowEvent/RowEventService.php @@ -0,0 +1,38 @@ +rowEventBuilder = new RowEventBuilder($config, $mySQLRepository); + } + + /** + * @param BinaryDataReader $package + * @param EventInfo $eventInfo + * @return RowEvent + */ + public function makeRowEvent(BinaryDataReader $package, EventInfo $eventInfo) + { + $this->rowEventBuilder->withPackage($package); + $this->rowEventBuilder->withEventInfo($eventInfo); + + return $this->rowEventBuilder->build(); + } +} \ No newline at end of file diff --git a/src/MySQLReplication/Event/XidEvent.php b/src/MySQLReplication/Event/XidEvent.php new file mode 100755 index 0000000..a466bfc --- /dev/null +++ b/src/MySQLReplication/Event/XidEvent.php @@ -0,0 +1,23 @@ +eventInfo, + $this->binaryDataReader->readUInt64() + ); + } +} \ No newline at end of file diff --git a/src/MySQLReplication/Gtid/GtidCollection.php b/src/MySQLReplication/Gtid/GtidCollection.php new file mode 100755 index 0000000..d724bc5 --- /dev/null +++ b/src/MySQLReplication/Gtid/GtidCollection.php @@ -0,0 +1,41 @@ +toArray() as $gtid) + { + $l += $gtid->encoded_length(); + } + + return $l; + } + + /** + * @return string + */ + public function getEncodedPacket() + { + $s = pack('Q', $this->count()); + /** @var GtidEntity $gtid */ + foreach ($this->toArray() as $gtid) + { + $s .= $gtid->encode(); + } + + return $s; + } +} \ No newline at end of file diff --git a/src/MySQLReplication/Pack/Gtid.php b/src/MySQLReplication/Gtid/GtidEntity.php similarity index 72% rename from src/MySQLReplication/Pack/Gtid.php rename to src/MySQLReplication/Gtid/GtidEntity.php index 3af5f54..c9f0901 100755 --- a/src/MySQLReplication/Pack/Gtid.php +++ b/src/MySQLReplication/Gtid/GtidEntity.php @@ -1,11 +1,11 @@ gtid = $gtid; - preg_match('/^([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})((?::[0-9-]+)+)$/', $this->gtid, $matches); + if (false === (bool)preg_match('/^([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})((?::[0-9-]+)+)$/', $this->gtid, $matches)) + { + throw new GtidException(GtidException::INCORRECT_GTID_MESSAGE, GtidException::INCORRECT_GTID_CODE); + } $this->sid = $matches[1]; foreach (array_filter(explode(':', $matches[2])) as $k) @@ -68,6 +72,6 @@ public function encode() */ public function encoded_length() { - return (16 + 8 + 2 * 8 * count($this->intervals)); + return (40 * count($this->intervals)); } } \ No newline at end of file diff --git a/src/MySQLReplication/Gtid/GtidException.php b/src/MySQLReplication/Gtid/GtidException.php new file mode 100755 index 0000000..c65f3c0 --- /dev/null +++ b/src/MySQLReplication/Gtid/GtidException.php @@ -0,0 +1,9 @@ +GtidCollection = new GtidCollection(); + } + + /** + * @param string $gtids + * @return GtidCollection + */ + public function makeCollectionFromString($gtids) + { + foreach (array_filter(explode(',', $gtids)) as $gtid) + { + $this->GtidCollection->add(new GtidEntity($gtid)); + } + + return $this->GtidCollection; + } +} \ No newline at end of file diff --git a/src/MySQLReplication/Pack/GtidSet.php b/src/MySQLReplication/Pack/GtidSet.php deleted file mode 100755 index 0b845fa..0000000 --- a/src/MySQLReplication/Pack/GtidSet.php +++ /dev/null @@ -1,56 +0,0 @@ -gtids[] = new Gtid($gtid); - } - } - - /** - * @return int - */ - public function encoded_length() - { - $l = 8; - - foreach ($this->gtids as $gtid) - { - $l += $gtid->encoded_length(); - } - - return $l; - } - - /** - * @return string - */ - public function encoded() - { - $s = pack('Q', count($this->gtids)); - - foreach ($this->gtids as $gtid) - { - $s .= $gtid->encode(); - } - - return $s; - } -} \ No newline at end of file diff --git a/src/MySQLReplication/Pack/RowEvent.php b/src/MySQLReplication/Pack/RowEvent.php deleted file mode 100755 index 878a32a..0000000 --- a/src/MySQLReplication/Pack/RowEvent.php +++ /dev/null @@ -1,850 +0,0 @@ -read(2))[1]; - - $data = []; - $data['schema_length'] = unpack('C', $pack->read(1))[1]; - $data['schema_name'] = self::$SCHEMA_NAME = $pack->read($data['schema_length']); - - self::$BinLogPack->advance(1); - - $data['table_length'] = unpack('C', self::$BinLogPack->read(1))[1]; - $data['table_name'] = self::$TABLE_NAME = $pack->read($data['table_length']); - - self::$BinLogPack->advance(1); - - self::$COLUMNS_NUM = self::$BinLogPack->readCodedBinary(); - - $column_type_def = self::$BinLogPack->read(self::$COLUMNS_NUM); - - // automatyczne czyszczenie starych danych - if (count(self::$TABLE_MAP) >= 200) { - self::$TABLE_MAP = array_slice(self::$TABLE_MAP, 100, -1, true); - } - - $tableMapDTO = new TableMapDTO( - $eventInfo['date'], - $eventInfo['pos'], - $eventInfo['size'], - $size, - self::$TABLE_ID, - $data['schema_name'], - $data['table_name'], - self::$COLUMNS_NUM - ); - - if (isset(self::$TABLE_MAP[self::$TABLE_ID])) - { - return $tableMapDTO; - } - - self::$BinLogPack->readCodedBinary(); - - $columns = $DBHelper->getFields($data['schema_name'], $data['table_name']); - - self::$TABLE_MAP[self::$TABLE_ID]['fields'] = []; - self::$TABLE_MAP[self::$TABLE_ID]['database'] = $data['schema_name']; - self::$TABLE_MAP[self::$TABLE_ID]['table_name'] = $data['table_name']; - - // if you drop tables and parse of logs you will get empty scheme - if (empty($columns)) - { - return null; - } - - for ($i = 0; $i < strlen($column_type_def); $i++) - { - // this a dirty hack to prevent row events containing columns which have been dropped prior - // to pymysqlreplication start, but replayed from binlog from blowing up the service. - if (!isset($columns[$i])) - { - $columns[$i] = [ - 'COLUMN_NAME' => 'DROPPED_COLUMN_' . $i, - 'COLLATION_NAME' => null, - 'CHARACTER_SET_NAME' => null, - 'COLUMN_COMMENT' => null, - 'COLUMN_TYPE' => 'BLOB', - 'COLUMN_KEY' => '', - ]; - $type = ConstFieldType::IGNORE; - } - else - { - $type = ord($column_type_def[$i]); - } - - self::$TABLE_MAP[self::$TABLE_ID]['fields'][$i] = BinLogColumns::parse($type, $columns[$i], self::$BinLogPack); - } - - return $tableMapDTO; - } - - /** - * @param BinLogPack $pack - * @param array $eventInfo - * @param $size - * @param $onlyTables - * @param $onlyDatabases - * @return mixed - */ - public static function addRow(BinLogPack $pack, array $eventInfo, $size, $onlyTables, $onlyDatabases) - { - self::rowInit($pack, $eventInfo['type'], $size, $onlyTables, $onlyDatabases); - - if (false === self::$process) - { - return null; - } - - $values = self::_getAddRows(['bitmap' => self::$BinLogPack->read(self::getColumnsAmount(self::$COLUMNS_NUM))]); - - return new WriteRowsDTO( - $eventInfo['date'], - $eventInfo['pos'], - $eventInfo['size'], - $size, - self::$SCHEMA_NAME, - self::$TABLE_NAME, - self::$COLUMNS_NUM, - count($values), - $values - ); - } - - /** - * @param BinLogPack $pack - * @param $event_type - * @param $size - * @param array $onlyTables - * @param array $onlyDatabases - */ - private static function rowInit(BinLogPack $pack, $event_type, $size, array $onlyTables, array $onlyDatabases) - { - self::$process = true; - - parent::init($pack, $event_type, $size); - - self::$TABLE_ID = self::readTableId(); - - self::$FLAGS = self::$BinLogPack->readUIntBySize(2); - - if (in_array(self::$EVENT_TYPE, [ - ConstEventType::DELETE_ROWS_EVENT_V2, - ConstEventType::WRITE_ROWS_EVENT_V2, - ConstEventType::UPDATE_ROWS_EVENT_V2 - ])) - { - self::$EXTRA_DATA_LENGTH = self::$BinLogPack->readUIntBySize(2); - - self::$EXTRA_DATA = self::$BinLogPack->read(self::$EXTRA_DATA_LENGTH / 8); - } - - self::$COLUMNS_NUM = self::$BinLogPack->readCodedBinary(); - - - self::$fields = []; - if (self::$TABLE_MAP[self::$TABLE_ID]) - { - self::$fields = self::$TABLE_MAP[self::$TABLE_ID]['fields']; - - if (!empty($onlyTables) && !in_array(self::$TABLE_MAP[self::$TABLE_ID]['table_name'], $onlyTables)) - { - self::$process = false; - } - - if (!empty($onlyTables) && !in_array(self::$TABLE_MAP[self::$TABLE_ID]['database'], $onlyDatabases)) - { - self::$process = false; - } - } - if ([] == self::$fields) - { - //remove cache can be empty (drop table) - unset(self::$TABLE_MAP[self::$TABLE_ID]); - - self::$process = false; - } - } - - /** - * @param int $columns - * @return int - */ - private static function getColumnsAmount($columns) - { - return (int)(($columns + 7) / 8); - } - - /** - * @param array $result - * @return array - */ - private static function _getAddRows(array $result) - { - $rows = []; - while (!self::$BinLogPack->isComplete(self::$PACK_SIZE)) - { - $rows[] = self::_read_column_data($result['bitmap']); - } - - return $rows; - } - - /** - * @param $cols_bitmap - * @return array - * @throws \Exception - */ - private static function _read_column_data($cols_bitmap) - { - $values = []; - - $l = self::getColumnsAmount(self::bitCount($cols_bitmap)); - - // null bitmap length = (bits set in 'columns-present-bitmap'+7)/8 - // see http://dev.mysql.com/doc/internals/en/rows-event.html - $null_bitmap = self::$BinLogPack->read($l); - $nullBitmapIndex = 0; - - foreach (self::$fields as $i => $column) - { - $name = $column['name']; - $unsigned = $column['unsigned']; - - if (self::bitGet($cols_bitmap, $i) == 0) - { - $values[$name] = null; - continue; - } - - if (self::_is_null($null_bitmap, $nullBitmapIndex)) - { - $values[$name] = null; - } - elseif ($column['type'] == ConstFieldType::IGNORE) - { - $values[$name] = null; - } - elseif ($column['type'] == ConstFieldType::TINY) - { - if ($unsigned) - { - $values[$name] = unpack('C', self::$BinLogPack->read(1))[1]; - } - else - { - $values[$name] = unpack('c', self::$BinLogPack->read(1))[1]; - } - } - elseif ($column['type'] == ConstFieldType::SHORT) - { - if ($unsigned) - { - $values[$name] = unpack('v', self::$BinLogPack->read(2))[1]; - } - else - { - $values[$name] = unpack('s', self::$BinLogPack->read(2))[1]; - } - } - elseif ($column['type'] == ConstFieldType::LONG) - { - if ($unsigned) - { - $values[$name] = unpack('I', self::$BinLogPack->read(4))[1]; - } - else - { - $values[$name] = unpack('i', self::$BinLogPack->read(4))[1]; - } - } - elseif ($column['type'] == ConstFieldType::INT24) - { - if ($unsigned) - { - $values[$name] = self::$BinLogPack->readUInt24(); - } - else - { - $values[$name] = self::$BinLogPack->readInt24(); - } - } - elseif ($column['type'] == ConstFieldType::FLOAT) - { - // http://dev.mysql.com/doc/refman/5.7/en/floating-point-types.html FLOAT(7,4) - $values[$name] = round(unpack('f', self::$BinLogPack->read(4))[1], 4); - } - elseif ($column['type'] == ConstFieldType::DOUBLE) - { - $values[$name] = unpack('d', self::$BinLogPack->read(8))[1]; - } - elseif ($column['type'] == ConstFieldType::VARCHAR || $column['type'] == ConstFieldType::STRING) - { - if ($column['max_length'] > 255) - { - $values[$name] = self::_read_string(2, $column); - } - else - { - $values[$name] = self::_read_string(1, $column); - } - } - elseif ($column['type'] == ConstFieldType::NEWDECIMAL) - { - $values[$name] = self::__read_new_decimal($column); - } - elseif ($column['type'] == ConstFieldType::BLOB) - { - $values[$name] = self::_read_string($column['length_size'], $column); - } - elseif ($column['type'] == ConstFieldType::DATETIME) - { - $values[$name] = self::_read_datetime(); - } - elseif ($column['type'] == ConstFieldType::DATETIME2) - { - $values[$name] = self::_read_datetime2($column); - } - elseif ($column['type'] == ConstFieldType::TIME2) - { - $values[$name] = self::_read_time2($column); - } - elseif ($column['type'] == ConstFieldType::TIMESTAMP2) - { - $time = date('Y-m-d H:i:s', self::$BinLogPack->readIntBeBySize(4)); - $fsp = self::_add_fsp_to_time($column); - if ('' !== $fsp) - { - $time .= '.' . $fsp; - } - $values[$name] = $time; - } - elseif ($column['type'] == ConstFieldType::DATE) - { - $values[$name] = self::_read_date(); - } - elseif ($column['type'] == ConstFieldType::LONGLONG) - { - if ($unsigned) - { - $values[$name] = self::$BinLogPack->readUInt64(); - } - else - { - $values[$name] = self::$BinLogPack->readInt64(); - } - } - elseif ($column['type'] == ConstFieldType::YEAR) - { - $values[$name] = self::$BinLogPack->readUInt8() + 1900; - } - elseif ($column['type'] == ConstFieldType::ENUM) - { - $values[$name] = $column['enum_values'][self::$BinLogPack->readUIntBySize($column['size']) - 1]; - } - elseif ($column['type'] == ConstFieldType::SET) - { - // we read set columns as a bitmap telling us which options are enabled - $bit_mask = self::$BinLogPack->readUIntBySize($column['size']); - $sets = []; - foreach ($column['set_values'] as $k => $item) - { - if ($bit_mask & pow(2, $k)) - { - $sets[] = $item; - } - } - $values[$name] = $sets; - } - elseif ($column['type'] == ConstFieldType::BIT) - { - $values[$name] = self::_read_bit($column); - } - elseif ($column['type'] == ConstFieldType::GEOMETRY) - { - $values[$name] = self::$BinLogPack->readLengthCodedPascalString($column['length_size']); - } - else - { - throw new BinLogException('Unknown row type: ' . $column['type']); - } - - $nullBitmapIndex += 1; - } - - return $values; - } - - /** - * @param $bitmap - * @param $position - * @return int - */ - private static function bitGet($bitmap, $position) - { - $bit = $bitmap[(int)($position / 8)]; - if (is_string($bit)) - { - $bit = ord($bit); - } - - return $bit & (1 << ($position & 7)); - } - - /** - * @param $null_bitmap - * @param $position - * @return int - */ - private static function _is_null($null_bitmap, $position) - { - $bit = $null_bitmap[intval($position / 8)]; - if (is_string($bit)) - { - $bit = ord($bit); - } - - return $bit & (1 << ($position % 8)); - } - - /** - * @param int $size - * @param array $column - * @return string - */ - private static function _read_string($size, array $column) - { - $string = self::$BinLogPack->readLengthCodedPascalString($size); - if ($column['character_set_name']) - { - // convert strings? - } - - return $string; - } - - /** - * Read MySQL's new decimal format introduced in MySQL 5 - * @param $column - * @return string - */ - private static function __read_new_decimal(array $column) - { - $digits_per_integer = 9; - $compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4]; - $integral = $column['precision'] - $column['decimals']; - $uncomp_integral = (int)($integral / $digits_per_integer); - $uncomp_fractional = (int)($column['decimals'] / $digits_per_integer); - $comp_integral = $integral - ($uncomp_integral * $digits_per_integer); - $comp_fractional = $column['decimals'] - ($uncomp_fractional * $digits_per_integer); - - $value = self::$BinLogPack->readUInt8(); - if (0 != ($value & 0x80)) - { - $mask = 0; - $res = ''; - } - else - { - $mask = -1; - $res = '-'; - } - self::$BinLogPack->unread(pack('C', ($value ^ 0x80))); - - $size = $compressed_bytes[$comp_integral]; - if ($size > 0) - { - $value = self::$BinLogPack->readIntBeBySize($size) ^ $mask; - $res .= $value; - } - - for ($i = 0; $i < $uncomp_integral; $i++) - { - $value = self::$BinLogPack->readIntBeBySize(4) ^ $mask; - $res .= sprintf('%09d', $value); - } - - $res .= '.'; - - for ($i = 0; $i < $uncomp_fractional; $i++) - { - $value = self::$BinLogPack->readIntBeBySize(4) ^ $mask; - $res .= sprintf('%09d', $value); - } - - $size = $compressed_bytes[$comp_fractional]; - if ($size > 0) - { - $value = self::$BinLogPack->readIntBeBySize($size) ^ $mask; - $res .= sprintf('%0' . $comp_fractional . 'd', $value); - } - - return bcmul($res, 1, $column['precision']); - } - - /** - * @return float|null - */ - private static function _read_datetime() - { - $value = self::$BinLogPack->readUInt64(); - if ($value == 0) # nasty mysql 0000-00-00 dates - { - return null; - } - - $date = $value / 1000000; - $year = (int)($date / 10000); - $month = (int)(($date % 10000) / 100); - $day = (int)($date % 100); - if ($year == 0 || $month == 0 || $day == 0) - { - return ''; - } - - $date = new \DateTime(); - $date->setDate($year, $month, $day); - return $date->format('Y-m-d'); - } - - /** - * Date Time - * 1 bit sign (1= non-negative, 0= negative) - * 17 bits year*13+month (year 0-9999, month 0-12) - * 5 bits day (0-31) - * 5 bits hour (0-23) - * 6 bits minute (0-59) - * 6 bits second (0-59) - * --------------------------- - * 40 bits = 5 bytes - * @param $column - * @return string - * @throws \Exception - */ - private static function _read_datetime2(array $column) - { - $data = self::$BinLogPack->readIntBeBySize(5); - - $year_month = self::_read_binary_slice($data, 1, 17, 40); - - $year = (int)($year_month / 13); - $month = $year_month % 13; - $day = self::_read_binary_slice($data, 18, 5, 40); - $hour = self::_read_binary_slice($data, 23, 5, 40); - $minute = self::_read_binary_slice($data, 28, 6, 40); - $second = self::_read_binary_slice($data, 34, 6, 40); - - $date = new \DateTime($year . '-' . $month . '-' . $day . ' ' . $hour . ':' . $minute . ':' . $second); - if (array_sum($date->getLastErrors()) > 0) - { - return null; - } - return $date->format('Y-m-d H:i:s') . self::_add_fsp_to_time($column); - } - - /** - * Read a part of binary data and extract a number - * binary: the data - * start: From which bit (1 to X) - * size: How many bits should be read - * data_length: data size - * - * @param $binary - * @param $start - * @param $size - * @param $data_length - * @return int - */ - private static function _read_binary_slice($binary, $start, $size, $data_length) - { - $binary = $binary >> $data_length - ($start + $size); - $mask = ((1 << $size) - 1); - return $binary & $mask; - } - - /** - * Read and add the fractional part of time - * For more details about new date format: - * http://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html - * - * @param array $column - * @return int|string - * @throws \Exception - */ - private static function _add_fsp_to_time(array $column) - { - $read = 0; - $time = ''; - if ($column['fsp'] == 1 || $column['fsp'] == 2) - { - $read = 1; - } - elseif ($column['fsp'] == 3 || $column['fsp'] == 4) - { - $read = 2; - } - elseif ($column ['fsp'] == 5 || $column['fsp'] == 6) - { - $read = 3; - } - if ($read > 0) - { - $microsecond = self::$BinLogPack->readIntBeBySize($read); - if ($column['fsp'] % 2) - { - $time = (int)($microsecond / 10); - } - else - { - $time = $microsecond; - } - } - return $time; - } - - /** - * TIME encoding for nonfractional part: - * 1 bit sign (1= non-negative, 0= negative) - * 1 bit unused (reserved for future extensions) - * 10 bits hour (0-838) - * 6 bits minute (0-59) - * 6 bits second (0-59) - * --------------------- - * 24 bits = 3 bytes - * - * @param array $column - * @return string - */ - private static function _read_time2(array $column) - { - $data = self::$BinLogPack->readIntBeBySize(3); - - $hour = self::_read_binary_slice($data, 2, 10, 24); - $minute = self::_read_binary_slice($data, 12, 6, 24); - $second = self::_read_binary_slice($data, 18, 6, 24); - - $date = new \DateTime(); - $date->setTime($hour, $minute, $second); - - return $date->format('H:i:s') . self::_add_fsp_to_time($column); - } - - /** - * @return string - */ - private static function _read_date() - { - $time = self::$BinLogPack->readUInt24(); - if (0 == $time) - { - return null; - } - - $year = ($time & ((1 << 15) - 1) << 9) >> 9; - $month = ($time & ((1 << 4) - 1) << 5) >> 5; - $day = ($time & ((1 << 5) - 1)); - if ($year == 0 || $month == 0 || $day == 0) - { - return null; - } - - $date = new \DateTime(); - $date->setDate($year, $month, $day); - return $date->format('Y-m-d'); - } - - /** - * Read MySQL BIT type - * @param array $column - * @return string - */ - private static function _read_bit(array $column) - { - $res = ''; - for ($byte = 0; $byte < $column['bytes']; $byte++) - { - $current_byte = ''; - $data = self::$BinLogPack->readUInt8(); - if (0 === $byte) - { - if (1 === $column['bytes']) - { - $end = $column['bits']; - } - else - { - $end = $column['bits'] % 8; - if (0 === $end) - { - $end = 8; - } - } - } - else - { - $end = 8; - } - - for ($bit = 0; $bit < $end; $bit++) - { - if ($data & (1 << $bit)) - { - $current_byte .= '1'; - } - else - { - $current_byte .= '0'; - } - - } - $res .= strrev($current_byte); - } - - return $res; - } - - /** - * @param BinLogPack $pack - * @param $eventInfo - * @param $size - * @param array $onlyTables - * @param array $onlyDatabases - * @return mixed - */ - public static function delRow(BinLogPack $pack, $eventInfo, $size, array $onlyTables, array $onlyDatabases) - { - self::rowInit($pack, $eventInfo['type'], $size, $onlyTables, $onlyDatabases); - - if (false === self::$process) - { - return null; - } - - $values = self::_getDelRows(['bitmap' => self::$BinLogPack->read(self::getColumnsAmount(self::$COLUMNS_NUM))]); - - return new DeleteRowsDTO( - $eventInfo['date'], - $eventInfo['pos'], - $eventInfo['size'], - $size, - self::$SCHEMA_NAME, - self::$TABLE_NAME, - self::$COLUMNS_NUM, - count($values), - $values - ); - } - - /** - * @param array $result - * @return array - */ - private static function _getDelRows(array $result) - { - $rows = []; - while (!self::$BinLogPack->isComplete(self::$PACK_SIZE)) - { - $rows[] = self::_read_column_data($result['bitmap']); - } - - return $rows; - } - - /** - * @param BinLogPack $pack - * @param array $eventInfo - * @param $size - * @param array $onlyTables - * @param array $onlyDatabases - * @return mixed - */ - public static function updateRow(BinLogPack $pack, array $eventInfo, $size, array $onlyTables, array $onlyDatabases) - { - self::rowInit($pack, $eventInfo['type'], $size, $onlyTables, $onlyDatabases); - - if (false === self::$process) - { - return null; - } - - $len = self::getColumnsAmount(self::$COLUMNS_NUM); - - $values = self::_getUpdateRows(['bitmap1' => self::$BinLogPack->read($len), 'bitmap2' => self::$BinLogPack->read($len)]); - - return new UpdateRowsDTO( - $eventInfo['date'], - $eventInfo['pos'], - $eventInfo['size'], - $size, - self::$SCHEMA_NAME, - self::$TABLE_NAME, - self::$COLUMNS_NUM, - count($values), - $values - - ); - } - - /** - * @param array $result - * @return array - */ - private static function _getUpdateRows(array $result) - { - $rows = []; - while (!self::$BinLogPack->isComplete(self::$PACK_SIZE)) - { - $rows[] = [ - 'before' => self::_read_column_data($result['bitmap1']), - 'after' => self::_read_column_data($result['bitmap2']) - ]; - } - - return $rows; - } -} diff --git a/src/MySQLReplication/DataBase/DBHelper.php b/src/MySQLReplication/Repository/MySQLRepository.php similarity index 65% rename from src/MySQLReplication/DataBase/DBHelper.php rename to src/MySQLReplication/Repository/MySQLRepository.php index 4f9ce03..fa840cf 100755 --- a/src/MySQLReplication/DataBase/DBHelper.php +++ b/src/MySQLReplication/Repository/MySQLRepository.php @@ -1,32 +1,28 @@ '', - 'user' => $config->getUser(), - 'password' => $config->getPassword(), - 'host' => $config->getHost(), - 'port' => $config->getPort(), - 'driver' => 'pdo_mysql', - ]; - $this->conn = DriverManager::getConnection($config); + $this->conn = $connection; + } + + public function __destruct() + { + $this->conn->close(); } /** @@ -34,6 +30,11 @@ public function __construct(Config $config) */ public function getConnection() { + if (false === $this->conn->ping()) + { + $this->conn->close(); + $this->conn->connect(); + } return $this->conn; } @@ -59,7 +60,7 @@ public function getFields($schema, $table) AND `table_name` = ? "; - return $this->conn->fetchAll($sql, [$schema, $table]); + return $this->getConnection()->fetchAll($sql, [$schema, $table]); } /** @@ -68,7 +69,7 @@ public function getFields($schema, $table) public function isCheckSum() { $sql = "SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'"; - $res = $this->conn->fetchAssoc($sql); + $res = $this->getConnection()->fetchAssoc($sql); if ($res['Value']) { return true; @@ -88,6 +89,6 @@ public function isCheckSum() public function getMasterStatus() { $sql = "SHOW MASTER STATUS"; - return $this->conn->fetchAssoc($sql); + return $this->getConnection()->fetchAssoc($sql); } } diff --git a/src/MySQLReplication/Service/BinLogStream.php b/src/MySQLReplication/Service/BinLogStream.php deleted file mode 100755 index eb5d4be..0000000 --- a/src/MySQLReplication/Service/BinLogStream.php +++ /dev/null @@ -1,99 +0,0 @@ -dbHelper = new DBHelper($config); - $this->connect = new Connect($config, $this->dbHelper, $gtID, $logFile, $logPos, $slave_id); - $this->binLogPack = new BinLogPack($this->dbHelper); - - $this->ignoredEvents = $ignoredEvents; - $this->onlyTables = $onlyTables; - $this->onlyDatabases = $onlyDatabases; - $this->onlyEvents = $onlyEvents; - } - - /** - * @return DeleteRowsDTO|EventDTO|GTIDLogDTO|QueryDTO|UpdateRowsDTO|WriteRowsDTO|TableMapDTO|null - */ - public function analysisBinLog() - { - $result = $this->binLogPack->init( - $this->connect->getPacket(), - $this->connect->getCheckSum(), - $this->onlyEvents, - $this->ignoredEvents, - $this->onlyTables, - $this->onlyDatabases - ); - - if (!empty($result)) - { - return $result; - } - return null; - } -} \ No newline at end of file diff --git a/tests/TypesTest.php b/tests/Integration/TypesTest.php similarity index 87% rename from tests/TypesTest.php rename to tests/Integration/TypesTest.php index e9340be..7b7ccb0 100755 --- a/tests/TypesTest.php +++ b/tests/Integration/TypesTest.php @@ -1,14 +1,13 @@ config = new Config('root', '192.168.1.100', 3306, 'root'); - $this->conn = (new DBHelper($this->config))->getConnection(); - } - - private function createAndInsertValue($create_query, $insert_query) - { - $binLogStream = new BinLogStream($this->config); + $config = (new ConfigService())->makeConfigFromArray([ + 'user' => 'root', + 'host' => '192.168.1.100', + 'password' => 'root' + ]); + $this->binLogStream = new BinLogStream($config); + $this->conn = $this->binLogStream->getDbConnection(); $this->conn->exec("SET GLOBAL time_zone = 'UTC'"); $this->conn->exec("DROP DATABASE IF EXISTS " . $this->database); $this->conn->exec("CREATE DATABASE " . $this->database); $this->conn->exec("USE " . $this->database); + } + + + /** + * @param $create_query + * @param $insert_query + * @return \MySQLReplication\Event\DTO\DeleteRowsDTO|\MySQLReplication\Event\DTO\EventDTO|\MySQLReplication\Event\DTO\GTIDLogDTO|\MySQLReplication\Event\DTO\QueryDTO|\MySQLReplication\Event\DTO\RotateDTO|\MySQLReplication\Event\DTO\TableMapDTO|\MySQLReplication\Event\DTO\UpdateRowsDTO|\MySQLReplication\Event\DTO\WriteRowsDTO|\MySQLReplication\Event\DTO\XidDTO + * @throws \Doctrine\DBAL\DBALException + */ + private function createAndInsertValue($create_query, $insert_query) + { $this->conn->exec($create_query); $this->conn->exec($insert_query); - ; + $this->assertEquals(null, $this->binLogStream->getBinLogEvent()); + $this->assertInstanceOf('MySQLReplication\Event\DTO\GTIDLogDTO', $this->binLogStream->getBinLogEvent()); + $this->assertInstanceOf('MySQLReplication\Event\DTO\QueryDTO', $this->binLogStream->getBinLogEvent()); + $this->assertInstanceOf('MySQLReplication\Event\DTO\GTIDLogDTO', $this->binLogStream->getBinLogEvent()); + $this->assertInstanceOf('MySQLReplication\Event\DTO\QueryDTO', $this->binLogStream->getBinLogEvent()); + $this->assertInstanceOf('MySQLReplication\Event\DTO\GTIDLogDTO', $this->binLogStream->getBinLogEvent()); + $this->assertInstanceOf('MySQLReplication\Event\DTO\QueryDTO', $this->binLogStream->getBinLogEvent()); + $this->assertInstanceOf('MySQLReplication\Event\DTO\GTIDLogDTO', $this->binLogStream->getBinLogEvent()); + $this->assertInstanceOf('MySQLReplication\Event\DTO\QueryDTO', $this->binLogStream->getBinLogEvent()); + $this->assertInstanceOf('MySQLReplication\Event\DTO\TableMapDTO', $this->binLogStream->getBinLogEvent()); - $this->assertEquals(null, $binLogStream->analysisBinLog()); - $this->assertInstanceOf('MySQLReplication\DTO\GTIDLogDTO', $binLogStream->analysisBinLog()); - $this->assertInstanceOf('MySQLReplication\DTO\QueryDTO', $binLogStream->analysisBinLog()); - $this->assertInstanceOf('MySQLReplication\DTO\GTIDLogDTO', $binLogStream->analysisBinLog()); - $this->assertInstanceOf('MySQLReplication\DTO\QueryDTO', $binLogStream->analysisBinLog()); - $this->assertInstanceOf('MySQLReplication\DTO\GTIDLogDTO', $binLogStream->analysisBinLog()); - $this->assertInstanceOf('MySQLReplication\DTO\QueryDTO', $binLogStream->analysisBinLog()); - $this->assertInstanceOf('MySQLReplication\DTO\GTIDLogDTO', $binLogStream->analysisBinLog()); - $this->assertInstanceOf('MySQLReplication\DTO\QueryDTO', $binLogStream->analysisBinLog()); - $this->assertInstanceOf('MySQLReplication\DTO\TableMapDTO', $binLogStream->analysisBinLog()); - return $binLogStream->analysisBinLog(); + return $this->binLogStream->getBinLogEvent(); } /** @@ -532,20 +541,20 @@ public function shouldBeVarChar() */ public function shouldBeBit() { - $create_query = "CREATE TABLE test (test BIT(6), - test2 BIT(16), - test3 BIT(12), - test4 BIT(9), - test5 BIT(64) - ); - "; + $create_query = "CREATE TABLE test ( + test BIT(6), + test2 BIT(16), + test3 BIT(12), + test4 BIT(9), + test5 BIT(64) + );"; $insert_query = "INSERT INTO test VALUES( - b'100010', - b'1000101010111000', - b'100010101101', - b'101100111', - b'1101011010110100100111100011010100010100101110111011101011011010') - "; + b'100010', + b'1000101010111000', + b'100010101101', + b'101100111', + b'1101011010110100100111100011010100010100101110111011101011011010' + )"; $event = $this->createAndInsertValue($create_query, $insert_query);