diff --git a/README.md b/README.md
index 9d93d4b..a93a6f7 100755
--- a/README.md
+++ b/README.md
@@ -35,34 +35,8 @@ Remember to change config for your user, host and password.
User should have replication privileges [ REPLICATION CLIENT, SELECT]
-```php
-analysisBinLog();
- if (!is_null($result))
- {
- // all events got __toString() implementation
- echo $result;
-
- // all events got JsonSerializable implementation
- //echo json_encode($result, JSON_PRETTY_PRINT);
-
- //echo 'Memory usage ' . round(memory_get_usage() / 1048576, 2) . ' MB' . PHP_EOL;
- }
-}
+```sh
+php example/dump_events.php
```
For this SQL sessions:
diff --git a/composer.json b/composer.json
index 6021bdb..0874abe 100755
--- a/composer.json
+++ b/composer.json
@@ -10,7 +10,8 @@
"type": "library",
"require": {
"php": ">=5.4",
- "doctrine/dbal": "^2.5"
+ "doctrine/dbal": "^2.5",
+ "doctrine/collections": "^1.3"
},
"require-dev": {
"phpunit/phpunit": "*"
diff --git a/example/benchmark.php b/example/benchmark.php
index 9326626..fe21db4 100755
--- a/example/benchmark.php
+++ b/example/benchmark.php
@@ -4,17 +4,14 @@
error_reporting(E_ALL);
ini_set('display_errors', 1);
-use MySQLReplication\Config\Config;
-use MySQLReplication\DataBase\DBHelper;
+use Doctrine\DBAL\DriverManager;
+use MySQLReplication\BinLogStream;
+use MySQLReplication\Config\ConfigService;
use MySQLReplication\Definitions\ConstEventType;
-use MySQLReplication\DTO\UpdateRowsDTO;
-use MySQLReplication\DTO\WriteRowsDTO;
-use MySQLReplication\Service\BinLogStream;
-
-
+use MySQLReplication\Event\DTO\UpdateRowsDTO;
/**
- * Class Base
+ * Class Benchmark
*/
class Benchmark
{
@@ -22,47 +19,81 @@ class Benchmark
* @var string
*/
private $database = 'mysqlreplication_test';
+
/**
- * @var \Doctrine\DBAL\Connection
- */
- private $conn;
- /**
- * @var Config
+ * Benchmark constructor.
*/
- private $config;
-
public function __construct()
{
- $this->config = new Config('root', '192.168.1.100', 3306, 'root');
- $this->conn = (new DBHelper($this->config))->getConnection();
+ $conn = $this->getConnection();
+ $conn->exec("DROP DATABASE IF EXISTS " . $this->database);
+ $conn->exec("CREATE DATABASE " . $this->database);
+ $conn->exec("USE " . $this->database);
+ $conn->exec("CREATE TABLE test (i INT) ENGINE = MEMORY");
+ $conn->exec("INSERT INTO test VALUES(1)");
+ $conn->exec("CREATE TABLE test2 (i INT) ENGINE = MEMORY");
+ $conn->exec("INSERT INTO test2 VALUES(1)");
+ $conn->exec("RESET MASTER");
+
+ $this->binLogStream = new BinLogStream(
+ (new ConfigService())->makeConfigFromArray([
+ 'user' => 'root',
+ 'host' => '192.168.1.100',
+ 'password' => 'root',
+ 'eventsOnly' => [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2],
+ 'slaveId' => 9999
+ ])
+ );
+ }
- $this->conn->exec("DROP DATABASE IF EXISTS " . $this->database);
- $this->conn->exec("CREATE DATABASE " . $this->database);
- $this->conn->exec("USE " . $this->database);
- $this->conn->exec("CREATE TABLE test (i INT) ENGINE = MEMORY");
- $this->conn->exec("INSERT INTO test VALUES(1)");
- $this->conn->exec("CREATE TABLE test2 (i INT) ENGINE = MEMORY");
- $this->conn->exec("INSERT INTO test2 VALUES(1)");
- $this->conn->exec("RESET MASTER");
+ /**
+ * @return \Doctrine\DBAL\Connection
+ * @throws \Doctrine\DBAL\DBALException
+ */
+ private function getConnection()
+ {
+ return DriverManager::getConnection([
+ 'user' => 'root',
+ 'password' => 'root',
+ 'host' => '192.168.1.100',
+ 'port' => 3306,
+ 'driver' => 'pdo_mysql',
+ 'dbname' => $this->database
+ ]);
+ }
- $this->binLogStream = new BinLogStream(
- $this->config,
- '',
- '',
- '',
- '',
- [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2]
- );
+ /**
+ *
+ */
+ public function run()
+ {
+ $pid = pcntl_fork();
+ if ($pid == -1)
+ {
+ die('could not fork');
+ }
+ else if ($pid)
+ {
+ $this->consume();
+ pcntl_wait($status);
+ }
+ else
+ {
+ $this->produce();
+ }
}
- public function consume()
+ /**
+ *
+ */
+ private function consume()
{
$start = microtime(true);
$i = 0;
while (1)
{
- $result = $this->binLogStream->analysisBinLog();
+ $result = $this->binLogStream->getBinLogEvent();
if ($result instanceof UpdateRowsDTO)
{
$i += 1;
@@ -74,34 +105,22 @@ public function consume()
}
}
- public function run()
- {
- $pid = pcntl_fork();
- if ($pid == -1) {
- die('could not fork');
- } else if ($pid) {
- $this->consume();
- pcntl_wait($status);
- } else {
- $this->produce();
- }
-
- }
-
- public function produce()
+ /**
+ * @throws \Doctrine\DBAL\DBALException
+ */
+ private function produce()
{
- $this->conn = (new DBHelper($this->config))->getConnection();
- $this->conn->exec("USE " . $this->database);
+ $conn = $this->getConnection();
echo 'Start insert data' . PHP_EOL;
while (1)
{
-
- $this->conn->exec("UPDATE test SET i = i + 1;");
- $this->conn->exec("UPDATE test2 SET i = i + 1;");
+ $conn->exec("UPDATE test SET i = i + 1;");
+ $conn->exec("UPDATE test2 SET i = i + 1;");
}
- }
+ $conn->close();
+ }
}
(new Benchmark())->run();
\ No newline at end of file
diff --git a/example/dump_events.php b/example/dump_events.php
index 625e691..46b77cd 100755
--- a/example/dump_events.php
+++ b/example/dump_events.php
@@ -5,15 +5,20 @@
include __DIR__ . '/../vendor/autoload.php';
-use MySQLReplication\Service\BinLogStream;
-use MySQLReplication\Config\Config;
+use MySQLReplication\BinLogStream;
+use MySQLReplication\Config\ConfigService;
$binLogStream = new BinLogStream(
- new Config('root', '192.168.1.100', 3306, 'root')
+ (new ConfigService())->makeConfigFromArray([
+ 'user' => 'root',
+ 'host' => '192.168.1.100',
+ 'password' => 'root',
+ //'gtid' => '9b1c8d18-2a76-11e5-a26b-000c2976f3f3:1-177592',
+ ])
);
while (1)
{
- $result = $binLogStream->analysisBinLog();
+ $result = $binLogStream->getBinLogEvent();
if (!is_null($result))
{
// all events got __toString() implementation
diff --git a/src/MySQLReplication/Pack/PackAuth.php b/src/MySQLReplication/BinLog/BinLogAuth.php
similarity index 56%
rename from src/MySQLReplication/Pack/PackAuth.php
rename to src/MySQLReplication/BinLog/BinLogAuth.php
index ba1634f..0a0227a 100755
--- a/src/MySQLReplication/Pack/PackAuth.php
+++ b/src/MySQLReplication/BinLog/BinLogAuth.php
@@ -1,28 +1,44 @@
packageMaxLength);
$data .= chr(33);
for ($i = 0; $i < 23; $i++)
{
@@ -31,10 +47,6 @@ public static function initPack($flag, $user, $pass, $salt, $db = '')
$result = sha1($pass, true) ^ sha1($salt . sha1(sha1($pass, true), true), true);
$data = $data . $user . chr(0) . chr(strlen($result)) . $result;
- if ($db)
- {
- $data .= $db . chr(0);
- }
$str = pack('L', strlen($data));
$s = $str[0] . $str[1] . $str[2];
$data = $s . chr(1) . $data;
@@ -43,25 +55,26 @@ public static function initPack($flag, $user, $pass, $salt, $db = '')
}
/**
- * @param $pack
+ * @param string $packet
* @return array
* @throws BinLogException
*/
- public static function success($pack)
+ public function isWriteSuccessful($packet)
{
- $head = ord($pack[0]);
- if (in_array($head, ConstAuth::$OK_PACK_HEAD))
+ $head = ord($packet[0]);
+ if (in_array($head, $this->packageOkHeader))
{
return ['status' => true, 'code' => 0, 'msg' => ''];
}
else
{
- $error_code = unpack('v', $pack[1] . $pack[2])[1];
+ $error_code = unpack('v', $packet[1] . $packet[2])[1];
$error_msg = '';
- for ($i = 9; $i < strlen($pack); $i++)
+ for ($i = 9; $i < strlen($packet); $i++)
{
- $error_msg .= $pack[$i];
+ $error_msg .= $packet[$i];
}
+
throw new BinLogException($error_msg, $error_code);
}
}
diff --git a/src/MySQLReplication/BinLog/BinLogColumns.php b/src/MySQLReplication/BinLog/BinLogColumns.php
index 6104787..6ad08de 100755
--- a/src/MySQLReplication/BinLog/BinLogColumns.php
+++ b/src/MySQLReplication/BinLog/BinLogColumns.php
@@ -3,84 +3,87 @@
namespace MySQLReplication\BinLog;
use MySQLReplication\Definitions\ConstFieldType;
+use MySQLReplication\Exception\BinLogException;
+use MySQLReplication\BinaryDataReader\BinaryDataReader;
/**
* Class BinLogColumns
+ * @package MySQLReplication\BinLog
*/
class BinLogColumns
{
/**
- * @var []
+ * @var array
*/
private static $field;
/**
- * @param string $column_type
- * @param string $column_schema
- * @param BinLogPack $packet
+ * @param string $columnType
+ * @param array $columnSchema
+ * @param BinaryDataReader $binaryDataReader
* @return array
*/
- public static function parse($column_type, $column_schema, BinLogPack $packet)
+ public static function parse($columnType, array $columnSchema, BinaryDataReader $binaryDataReader)
{
self::$field = [];
- self::$field['type'] = $column_type;
- self::$field['name'] = $column_schema['COLUMN_NAME'];
- self::$field['collation_name'] = $column_schema['COLLATION_NAME'];
- self::$field['character_set_name'] = $column_schema['CHARACTER_SET_NAME'];
- self::$field['comment'] = $column_schema['COLUMN_COMMENT'];
- self::$field['unsigned'] = stripos($column_schema['COLUMN_TYPE'], 'unsigned') === false ? false : true;
+ self::$field['type'] = $columnType;
+ self::$field['name'] = $columnSchema['COLUMN_NAME'];
+ self::$field['collation_name'] = $columnSchema['COLLATION_NAME'];
+ self::$field['character_set_name'] = $columnSchema['CHARACTER_SET_NAME'];
+ self::$field['comment'] = $columnSchema['COLUMN_COMMENT'];
+ self::$field['unsigned'] = stripos($columnSchema['COLUMN_TYPE'], 'unsigned') === false ? false : true;
self::$field['type_is_bool'] = false;
- self::$field['is_primary'] = $column_schema['COLUMN_KEY'] == 'PRI';
+ self::$field['is_primary'] = $columnSchema['COLUMN_KEY'] == 'PRI';
if (self::$field['type'] == ConstFieldType::VARCHAR)
{
- self::$field['max_length'] = unpack('s', $packet->read(2))[1];
+ self::$field['max_length'] = $binaryDataReader->readInt16();
}
elseif (self::$field['type'] == ConstFieldType::DOUBLE)
{
- self::$field['size'] = $packet->readUint8();
+ self::$field['size'] = $binaryDataReader->readUint8();
}
elseif (self::$field['type'] == ConstFieldType::FLOAT)
{
- self::$field['size'] = $packet->readUint8();
+ self::$field['size'] = $binaryDataReader->readUint8();
}
elseif (self::$field['type'] == ConstFieldType::TIMESTAMP2)
{
- self::$field['fsp'] = $packet->readUint8();
+ self::$field['fsp'] = $binaryDataReader->readUint8();
}
elseif (self::$field['type'] == ConstFieldType::DATETIME2)
{
- self::$field['fsp'] = $packet->readUint8();
+ self::$field['fsp'] = $binaryDataReader->readUint8();
}
elseif (self::$field['type'] == ConstFieldType::TIME2)
{
- self::$field['fsp'] = $packet->readUint8();
+ self::$field['fsp'] = $binaryDataReader->readUint8();
}
- elseif (self::$field['type'] == ConstFieldType::TINY && $column_schema['COLUMN_TYPE'] == 'tinyint(1)')
+ elseif (self::$field['type'] == ConstFieldType::TINY && $columnSchema['COLUMN_TYPE'] == 'tinyint(1)')
{
self::$field['type_is_bool'] = true;
}
elseif (self::$field['type'] == ConstFieldType::VAR_STRING || self::$field['type'] == ConstFieldType::STRING)
{
- self::_read_string_metadata($packet, $column_schema);
+ self::getFieldSpecial($binaryDataReader, $columnSchema);
}
elseif (self::$field['type'] == ConstFieldType::BLOB)
{
- self::$field['length_size'] = $packet->readUint8();
+ self::$field['length_size'] = $binaryDataReader->readUint8();
}
elseif (self::$field['type'] == ConstFieldType::GEOMETRY)
{
- self::$field['length_size'] = $packet->readUint8();
+ self::$field['length_size'] = $binaryDataReader->readUint8();
}
elseif (self::$field['type'] == ConstFieldType::NEWDECIMAL)
{
- self::$field['precision'] = $packet->readUint8();
- self::$field['decimals'] = $packet->readUint8();
+ self::$field['precision'] = $binaryDataReader->readUint8();
+ self::$field['decimals'] = $binaryDataReader->readUint8();
}
elseif (self::$field['type'] == ConstFieldType::BIT)
{
- $bits = $packet->readUint8();
- $bytes = $packet->readUint8();
+ $bits = $binaryDataReader->readUint8();
+ $bytes = $binaryDataReader->readUint8();
self::$field['bits'] = ($bytes * 8) + $bits;
self::$field['bytes'] = (int)((self::$field['bits'] + 7) / 8);
}
@@ -89,19 +92,19 @@ public static function parse($column_type, $column_schema, BinLogPack $packet)
}
/**
- * @param BinLogPack $packet
- * @param $column_schema
+ * @param BinaryDataReader $packet
+ * @param array $columnSchema
+ * @throws BinLogException
*/
- private static function _read_string_metadata(BinLogPack $packet, $column_schema)
+ private static function getFieldSpecial(BinaryDataReader $packet, array $columnSchema)
{
-
$metadata = ($packet->readUint8() << 8) + $packet->readUint8();
$real_type = $metadata >> 8;
if ($real_type == ConstFieldType::SET || $real_type == ConstFieldType::ENUM)
{
self::$field['type'] = $real_type;
self::$field['size'] = $metadata & 0x00ff;
- self::_read_enum_metadata($column_schema);
+ self::getFieldSpecialValues($columnSchema);
}
else
{
@@ -110,24 +113,22 @@ private static function _read_string_metadata(BinLogPack $packet, $column_schema
}
/**
- * @param $column_schema
+ * @param $columnSchema
+ * @throws BinLogException
*/
- private static function _read_enum_metadata($column_schema)
+ private static function getFieldSpecialValues($columnSchema)
{
- $enums = $column_schema['COLUMN_TYPE'];
if (self::$field['type'] == ConstFieldType::ENUM)
{
- $enums = str_replace('enum(', '', $enums);
- $enums = str_replace(')', '', $enums);
- $enums = str_replace('\'', '', $enums);
- self::$field['enum_values'] = explode(',', $enums);
+ self::$field['enum_values'] = explode(',', str_replace(['enum(', ')', '\''], '', $columnSchema['COLUMN_TYPE']));
+ }
+ else if (self::$field['type'] == ConstFieldType::SET)
+ {
+ self::$field['set_values'] = explode(',', str_replace(['set(', ')', '\''], '', $columnSchema['COLUMN_TYPE']));
}
else
{
- $enums = str_replace('set(', '', $enums);
- $enums = str_replace(')', '', $enums);
- $enums = str_replace('\'', '', $enums);
- self::$field['set_values'] = explode(',', $enums);
+ throw new BinLogException('Type not handled! - ' . self::$field['type']);
}
}
}
\ No newline at end of file
diff --git a/src/MySQLReplication/BinLog/Connect.php b/src/MySQLReplication/BinLog/BinLogConnect.php
similarity index 64%
rename from src/MySQLReplication/BinLog/Connect.php
rename to src/MySQLReplication/BinLog/BinLogConnect.php
index d2fe35b..3195155 100755
--- a/src/MySQLReplication/BinLog/Connect.php
+++ b/src/MySQLReplication/BinLog/BinLogConnect.php
@@ -2,103 +2,90 @@
namespace MySQLReplication\BinLog;
use MySQLReplication\Config\Config;
-use MySQLReplication\DataBase\DBHelper;
+use MySQLReplication\Repository\MySQLRepository;
use MySQLReplication\Definitions\ConstCapabilityFlags;
use MySQLReplication\Definitions\ConstCommand;
use MySQLReplication\Exception\BinLogException;
-use MySQLReplication\Pack\GtidSet;
-use MySQLReplication\Pack\PackAuth;
-use MySQLReplication\Pack\ServerInfo;
+use MySQLReplication\Gtid\GtidCollection;
/**
* Class Connect
*/
-class Connect
+class BinLogConnect
{
/**
* @var resource
*/
private $socket;
- /**
- * @var int
- */
- private $binFilePos;
- /**
- * @var string
- */
- private $binFileName;
- /**
- * @var string
- */
- private $gtID;
- /**
- * @var int
- */
- private $slaveId = 666;
/**
* @var bool
*/
private $checkSum = false;
/**
- * @var DBHelper
+ * @var MySQLRepository
*/
private $DBHelper;
/**
* @var Config
*/
private $config;
+ /**
+ * @var BinLogAuth
+ */
+ private $packAuth;
+ /**
+ * @var GtidCollection
+ */
+ private $gtidCollection;
/**
* @param Config $config
- * @param DBHelper $DBHelper
- * @param string $gtID
- * @param string $logFile
- * @param string $logPos
- * @param string $slave_id
- * @throws BinLogException
+ * @param MySQLRepository $DBHelper
+ * @param BinLogAuth $packAuth
+ * @param GtidCollection $gtidCollection
*/
public function __construct(
Config $config,
- DBHelper $DBHelper,
- $gtID = '',
- $logFile = '',
- $logPos = '',
- $slave_id = ''
+ MySQLRepository $DBHelper,
+ BinLogAuth $packAuth,
+ GtidCollection $gtidCollection
) {
$this->DBHelper = $DBHelper;
$this->config = $config;
+ $this->packAuth = $packAuth;
+ $this->gtidCollection = $gtidCollection;
+ }
- $this->slaveId = empty($slave_id) ? $this->slaveId : $slave_id;
- $this->gtID = $gtID;
- $this->binFilePos = $logPos;
- $this->binFileName = $logFile;
-
- $this->connectToSocket();
+ public function __destruct()
+ {
+ if (true === $this->isConnected())
+ {
+ socket_shutdown($this->socket);
+ socket_close($this->socket);
+ }
}
/**
* @return bool
*/
- public function getCheckSum()
+ public function isConnected()
{
- return $this->checkSum;
+ return is_resource($this->socket);
}
- public function __destruct()
+ /**
+ * @return bool
+ */
+ public function getCheckSum()
{
- if (true === $this->isConnected())
- {
- socket_shutdown($this->socket);
- socket_close($this->socket);
- }
- $this->socket = null;
+ return $this->checkSum;
}
/**
* @throws BinLogException
* @return self
*/
- private function connectToSocket()
+ public function connectToStream()
{
if (false === ($this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)))
{
@@ -119,7 +106,7 @@ private function connectToSocket()
private function serverInfo()
{
- ServerInfo::run($this->getPacket(false));
+ BinLogServerInfo::parsePackage($this->getPacket(false));
}
/**
@@ -139,28 +126,19 @@ public function getPacket($checkForOkByte = true)
$result = $this->readFromSocket($dataLength);
if (true === $checkForOkByte)
{
- PackAuth::success($result);
+ $this->packAuth->isWriteSuccessful($result);
}
return $result;
}
/**
- * @return bool
- */
- private function isConnected()
- {
- return is_resource($this->socket);
- }
-
- /**
- * @param $data_len
+ * @param $length
* @return string
* @throws BinLogException
*/
- private function readFromSocket($data_len)
+ private function readFromSocket($length)
{
- // server gone away
- if ($data_len == 5)
+ if ($length == 5)
{
throw new BinLogException('read 5 bytes from mysql server has gone away');
}
@@ -169,9 +147,9 @@ private function readFromSocket($data_len)
{
$bytes_read = 0;
$body = '';
- while ($bytes_read < $data_len)
+ while ($bytes_read < $length)
{
- $resp = socket_read($this->socket, $data_len - $bytes_read);
+ $resp = socket_read($this->socket, $length - $bytes_read);
if ($resp === false)
{
throw new BinLogException(socket_strerror(socket_last_error()), socket_last_error());
@@ -180,18 +158,17 @@ private function readFromSocket($data_len)
// server kill connection or server gone away
if (strlen($resp) === 0)
{
- throw new BinLogException('read less ' . ($data_len - strlen($body)));
+ throw new BinLogException('read less ' . ($length - strlen($body)));
}
$body .= $resp;
$bytes_read += strlen($resp);
}
- if (strlen($body) < $data_len)
+ if (strlen($body) < $length)
{
- throw new BinLogException('read less ' . ($data_len - strlen($body)));
+ throw new BinLogException('read less ' . ($length - strlen($body)));
}
return $body;
- }
- catch (\Exception $e)
+ } catch (\Exception $e)
{
throw new BinLogException(var_export($e, true));
}
@@ -202,7 +179,7 @@ private function readFromSocket($data_len)
*/
private function auth()
{
- $data = PackAuth::initPack(ConstCapabilityFlags::getCapabilities(), $this->config->getUser(), $this->config->getPassword(), ServerInfo::getSalt());
+ $data = $this->packAuth->createAuthenticationPacket(ConstCapabilityFlags::getCapabilities(), $this->config->getUser(), $this->config->getPassword(), BinLogServerInfo::getSalt());
$this->writeToSocket($data);
$this->getPacket();
@@ -210,7 +187,6 @@ private function auth()
/**
* @param $data
- * @return bool
* @throws BinLogException
*/
private function writeToSocket($data)
@@ -232,37 +208,37 @@ private function getBinlogStream()
$this->execute('SET @master_binlog_checksum=@@global.binlog_checksum');
}
- if ('' === $this->gtID)
+ if (0 === $this->gtidCollection->count())
{
- if ('' === $this->binFilePos || '' === $this->binFileName)
+ $binFilePos = $this->config->getBinLogPosition();
+ $binFileName = $this->config->getBinLogFileName();
+
+ if ('' === $binFilePos || '' === $binFileName)
{
$master = $this->DBHelper->getMasterStatus();
- $this->binFilePos = $master['Position'];
- $this->binFileName = $master['File'];
+ $binFilePos = $master['Position'];
+ $binFileName = $master['File'];
}
- $prelude = pack('i', strlen($this->binFileName) + 11) . chr(ConstCommand::COM_BINLOG_DUMP);
- $prelude .= pack('I', $this->binFilePos);
+ $prelude = pack('i', strlen($binFileName) + 11) . chr(ConstCommand::COM_BINLOG_DUMP);
+ $prelude .= pack('I', $binFilePos);
$prelude .= pack('v', 0);
- $prelude .= pack('I', $this->slaveId);
- $prelude .= $this->binFileName;
+ $prelude .= pack('I', $this->config->getSlaveId());
+ $prelude .= $binFileName;
}
else
{
- $gtID = new GtidSet($this->gtID);
- $encoded_data_size = $gtID->encoded_length();
-
- $prelude = pack('l', 26 + $encoded_data_size) . chr(ConstCommand::COM_BINLOG_DUMP_GTID);
+ $prelude = pack('l', 26 + $this->gtidCollection->getEncodedPacketLength()) . chr(ConstCommand::COM_BINLOG_DUMP_GTID);
$prelude .= pack('S', 0);
- $prelude .= pack('I', $this->slaveId);
+ $prelude .= pack('I', $this->config->getSlaveId());
$prelude .= pack('I', 3);
$prelude .= chr(0);
$prelude .= chr(0);
$prelude .= chr(0);
$prelude .= pack('Q', 4);
- $prelude .= pack('I', $gtID->encoded_length());
- $prelude .= $gtID->encoded();
+ $prelude .= pack('I', $this->gtidCollection->getEncodedPacketLength());
+ $prelude .= $this->gtidCollection->getEncodedPacket();
}
$this->writeToSocket($prelude);
diff --git a/src/MySQLReplication/BinLog/BinLogEvent.php b/src/MySQLReplication/BinLog/BinLogEvent.php
deleted file mode 100755
index 9f41417..0000000
--- a/src/MySQLReplication/BinLog/BinLogEvent.php
+++ /dev/null
@@ -1,119 +0,0 @@
-unpackUInt64(self::$BinLogPack->read(6) . chr(0) . chr(0));
- }
-
- /**
- * @param string $bitmap
- * @return int
- */
- protected static function bitCount($bitmap)
- {
- $n = 0;
- for ($i = 0; $i < strlen($bitmap); $i++)
- {
- $bit = $bitmap[$i];
- if (is_string($bit))
- {
- $bit = ord($bit);
- }
- $n += self::$bitCountInByte[$bit];
- }
-
- return $n;
- }
-}
\ No newline at end of file
diff --git a/src/MySQLReplication/BinLog/BinLogPack.php b/src/MySQLReplication/BinLog/BinLogPack.php
deleted file mode 100755
index 8678c14..0000000
--- a/src/MySQLReplication/BinLog/BinLogPack.php
+++ /dev/null
@@ -1,478 +0,0 @@
-DBHelper = $DBHelper;
- }
-
- /**
- * @param $pack
- * @param bool|true $checkSum
- * @param array $onlyEvents
- * @param array $ignoredEvents
- * @param array $onlyTables
- * @param array $onlyDatabases
- * @return WriteRowsDTO|UpdateRowsDTO|DeleteRowsDTO|XidDTO|EventDTO|QueryDTO|GTIDLogDTO|RotateDTO|TableMapDTO
- */
- public function init(
- $pack,
- $checkSum = true,
- array $onlyEvents = [],
- array $ignoredEvents = [],
- array $onlyTables = [],
- array $onlyDatabases = []
- ) {
- $this->buffer = $pack;
- $this->readBytes = 0;
- $this->eventInfo = [];
-
- // "ok" value on first byte
- $this->advance(1);
-
- $this->eventInfo = unpack('Vtime/Ctype/Vid/Vsize/Vpos/vflag', $this->read(19));
- $this->eventInfo['date'] = (new \DateTime())->setTimestamp($this->eventInfo['time'])->format('c');
-
- $event_size_without_header = true === $checkSum ? ($this->eventInfo['size'] - 23) : ($this->eventInfo['size'] - 19);
-
- if ($this->eventInfo['type'] == ConstEventType::TABLE_MAP_EVENT)
- {
- return RowEvent::tableMap($this, $this->DBHelper, $this->eventInfo, $event_size_without_header);
- }
-
- if (!empty($onlyEvents) && !in_array($this->eventInfo['type'], $onlyEvents))
- {
- return null;
- }
-
- if (in_array($this->eventInfo['type'], $ignoredEvents))
- {
- return null;
- }
-
- if (in_array($this->eventInfo['type'], [
- ConstEventType::UPDATE_ROWS_EVENT_V1,
- ConstEventType::UPDATE_ROWS_EVENT_V2
- ]))
- {
- return RowEvent::updateRow($this, $this->eventInfo, $event_size_without_header, $onlyTables, $onlyDatabases);
- }
- elseif (in_array($this->eventInfo['type'], [
- ConstEventType::WRITE_ROWS_EVENT_V1,
- ConstEventType::WRITE_ROWS_EVENT_V2
- ]))
- {
- return RowEvent::addRow($this, $this->eventInfo, $event_size_without_header, $onlyTables, $onlyDatabases);
- }
- elseif (in_array($this->eventInfo['type'], [
- ConstEventType::DELETE_ROWS_EVENT_V1,
- ConstEventType::DELETE_ROWS_EVENT_V2
- ]))
- {
- return RowEvent::delRow($this, $this->eventInfo, $event_size_without_header, $onlyTables, $onlyDatabases);
- }
- elseif ($this->eventInfo['type'] == ConstEventType::XID_EVENT)
- {
- return new XidDTO(
- $this->eventInfo['date'],
- $this->eventInfo['pos'],
- $this->eventInfo['size'],
- $event_size_without_header,
- $this->readUInt64()
- );
- }
- elseif ($this->eventInfo['type'] == ConstEventType::ROTATE_EVENT)
- {
- $pos = $this->readUInt64();
- $binFileName = $this->read($event_size_without_header - 8);
-
- return new RotateDTO(
- $this->eventInfo['date'],
- $this->eventInfo['pos'],
- $this->eventInfo['size'],
- $event_size_without_header,
- $pos,
- $binFileName
- );
- }
- elseif ($this->eventInfo['type'] == ConstEventType::GTID_LOG_EVENT)
- {
- //gtid event
- $commit_flag = $this->readUInt8() == 1;
- $sid = unpack('H*', $this->read(16))[1];
- $gno = $this->readUInt64();
-
- return new GTIDLogDTO(
- $this->eventInfo['date'],
- $this->eventInfo['pos'],
- $this->eventInfo['size'],
- $event_size_without_header,
- $commit_flag,
- vsprintf('%s%s%s%s%s%s%s%s-%s%s%s%s-%s%s%s%s-%s%s%s%s-%s%s%s%s%s%s%s%s%s%s%s%s', str_split($sid)) . ':' . $gno
- );
- }
- else if ($this->eventInfo['type'] == ConstEventType::QUERY_EVENT)
- {
- $this->advance(4);
- $execution_time = $this->readUInt32();
- $schema_length = $this->readUInt8();
- $this->advance(2);
- $status_vars_length = $this->readUInt16();
- $this->advance($status_vars_length);
- $schema = $this->read($schema_length);
- $this->advance(1);
- $query = $this->read($this->eventInfo['size'] - 36 - $status_vars_length - $schema_length - 1);
-
- return new QueryDTO(
- $this->eventInfo['date'],
- $this->eventInfo['pos'],
- $this->eventInfo['size'],
- $event_size_without_header,
- $schema,
- $execution_time,
- $query
- );
- }
-
- return null;
- }
-
- /**
- * @param int $length
- */
- public function advance($length)
- {
- $this->read($length);
- }
-
- /**
- * @param int $length
- * @return string
- * @throws BinLogException
- */
- public function read($length)
- {
- $length = (int)$length;
- $return = substr($this->buffer, 0, $length);
- $this->readBytes += $length;
- $this->buffer = substr($this->buffer, $length);
- return $return;
- }
-
- /**
- * @return int
- */
- public function readUInt64()
- {
- return $this->unpackUInt64($this->read(8));
- }
-
- /**
- * @param string $data
- * @return string
- */
- public function unpackUInt64($data)
- {
- $data = unpack('V*', $data);
- return bcadd($data[1], bcmul($data[2], bcpow(2, 32)));
- }
-
- /**
- * @return int
- */
- public function readUInt8()
- {
- return unpack('C', $this->read(1))[1];
- }
-
- /**
- * @return int
- */
- public function readUInt32()
- {
- return unpack('I', $this->read(4))[1];
- }
-
- /**
- * @return int
- */
- public function readUInt16()
- {
- return unpack('v', $this->read(2))[1];
- }
-
- /**
- * Push again data in data buffer. It's use when you want
- * to extract a bit from a value a let the rest of the code normally
- * read the data
- *
- * @param string $data
- */
- public function unread($data)
- {
- $this->readBytes -= strlen($data);
- $this->buffer = $data . $this->buffer;
- }
-
- /**
- * @see read a 'Length Coded Binary' number from the data buffer.
- * Length coded numbers can be anywhere from 1 to 9 bytes depending
- * on the value of the first byte.
- * From PyMYSQL source code
- *
- * @return int|string
- */
- public function readCodedBinary()
- {
- $c = ord($this->read(1));
- if ($c == ConstMy::NULL_COLUMN)
- {
- return '';
- }
- if ($c < ConstMy::UNSIGNED_CHAR_COLUMN)
- {
- return $c;
- }
- elseif ($c == ConstMy::UNSIGNED_SHORT_COLUMN)
- {
- return $this->readUInt16();
-
- }
- elseif ($c == ConstMy::UNSIGNED_INT24_COLUMN)
- {
- return $this->readUInt24();
- }
- elseif ($c == ConstMy::UNSIGNED_INT64_COLUMN)
- {
- return $this->readUInt64();
- }
- return $c;
- }
-
- /**
- * @return int
- */
- public function readUInt24()
- {
- $data = unpack('C3', $this->read(3));
- return $data[1] + ($data[2] << 8) + ($data[3] << 16);
- }
-
- /**
- * @return int
- */
- public function readInt24()
- {
- $data = unpack('C3', $this->read(3));
-
- $res = $data[1] | ($data[2] << 8) | ($data[3] << 16);
- if ($res >= 0x800000)
- {
- $res -= 0x1000000;
- }
- return $res;
- }
-
- /**
- * @return string
- */
- public function readInt64()
- {
- $data = unpack('V*', $this->read(8));
- return bcadd($data[1], ($data[2] << 32));
- }
-
- /**
- * @param int $size
- * @return string
- * @throws BinLogException
- */
- public function readLengthCodedPascalString($size)
- {
- return $this->read($this->readUIntBySize($size));
- }
-
- /**
- * Read a little endian integer values based on byte number
- *
- * @param $size
- * @return mixed
- * @throws BinLogException
- */
- public function readUIntBySize($size)
- {
- if ($size == 1)
- {
- return $this->readUInt8();
- }
- elseif ($size == 2)
- {
- return $this->readUInt16();
- }
- elseif ($size == 3)
- {
- return $this->readUInt24();
- }
- elseif ($size == 4)
- {
- return $this->readUInt32();
- }
- elseif ($size == 5)
- {
- return $this->readUInt40();
- }
- elseif ($size == 6)
- {
- return $this->readUInt48();
- }
- elseif ($size == 7)
- {
- return $this->readUInt56();
- }
- elseif ($size == 8)
- {
- return $this->readUInt64();
- }
-
- throw new BinLogException('$size ' . $size . ' not handled');
- }
-
- /**
- * @return mixed
- */
- public function readUInt40()
- {
- $data = unpack('CI', $this->read(5));
- return $data[1] + ($data[2] << 8);
- }
-
- /**
- * @return mixed
- */
- public function readUInt48()
- {
- $data = unpack('v3', $this->read(6));
- return $data[1] + ($data[2] << 16) + ($data[3] << 32);
- }
-
- /**
- * @return mixed
- */
- public function readUInt56()
- {
- $data = unpack('CSI', $this->read(7));
- return $data[1] + ($data[2] << 8) + ($data[3] << 24);
- }
-
- /**
- * Read a big endian integer values based on byte number
- *
- * @param int $size
- * @return int
- * @throws BinLogException
- */
- public function readIntBeBySize($size)
- {
- if ($size == 1)
- {
- return unpack('c', $this->read($size))[1];
- }
- elseif ($size == 2)
- {
- return unpack('n', $this->read($size))[1];
- }
- elseif ($size == 3)
- {
- return $this->readInt24Be();
- }
- elseif ($size == 4)
- {
- return unpack('i', strrev($this->read(4)))[1];
- }
- elseif ($size == 5)
- {
- return $this->readInt40Be();
- }
- elseif ($size == 8)
- {
- return unpack('l', $this->read($size))[1];
- }
-
- throw new BinLogException('$size ' . $size . ' not handled');
- }
-
- /**
- * @return int
- */
- public function readInt24Be()
- {
- $data = unpack('C3', $this->read(3));
- $res = ($data[1] << 16) | ($data[2] << 8) | $data[3];
- if ($res >= 0x800000)
- {
- $res -= 0x1000000;
- }
- return $res;
- }
-
- /**
- * @return int
- */
- public function readInt40Be()
- {
- $data1 = unpack('N', $this->read(4))[1];
- $data2 = unpack('C', $this->read(1))[1];
- return $data2 + ($data1 << 8);
- }
-
- /**
- * @param int $size
- * @return bool
- */
- public function isComplete($size)
- {
- if ($this->readBytes + 1 - 20 < $size)
- {
- return false;
- }
- return true;
- }
-}
diff --git a/src/MySQLReplication/Pack/ServerInfo.php b/src/MySQLReplication/BinLog/BinLogServerInfo.php
similarity index 69%
rename from src/MySQLReplication/Pack/ServerInfo.php
rename to src/MySQLReplication/BinLog/BinLogServerInfo.php
index d793eab..62d71db 100755
--- a/src/MySQLReplication/Pack/ServerInfo.php
+++ b/src/MySQLReplication/BinLog/BinLogServerInfo.php
@@ -1,30 +1,30 @@
connection = DriverManager::getConnection([
+ 'user' => $config->getUser(),
+ 'password' => $config->getPassword(),
+ 'host' => $config->getHost(),
+ 'port' => $config->getPort(),
+ 'driver' => 'pdo_mysql',
+ ]);
+ $this->binLogAuth = new BinLogAuth();
+ $this->MySQLRepository = new MySQLRepository($this->connection);
+ $this->GtidCollection = (new GtidService())->makeCollectionFromString($config->getGtid());
+ $this->binLogConnect = new BinLogConnect($config, $this->MySQLRepository, $this->binLogAuth, $this->GtidCollection);
+ $this->binLogConnect->connectToStream();
+ $this->packageService = new BinaryDataReaderService();
+ $this->rowEventService = new RowEventService($config, $this->MySQLRepository);
+ $this->binLogPack = new Event($config, $this->binLogConnect, $this->MySQLRepository, $this->packageService, $this->rowEventService);
+ }
+
+ /**
+ * @return \Doctrine\DBAL\Connection
+ */
+ public function getDbConnection()
+ {
+ return $this->connection;
+ }
+
+ /**
+ * @return DeleteRowsDTO|EventDTO|GTIDLogDTO|QueryDTO|\MySQLReplication\Event\DTO\RotateDTO|TableMapDTO|UpdateRowsDTO|WriteRowsDTO|\MySQLReplication\Event\DTO\XidDTO
+ * @throws \MySQLReplication\Exception\BinLogException
+ */
+ public function getBinLogEvent()
+ {
+ return $this->binLogPack->consume();
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/BinaryDataReader/BinaryDataReader.php b/src/MySQLReplication/BinaryDataReader/BinaryDataReader.php
new file mode 100755
index 0000000..9852f32
--- /dev/null
+++ b/src/MySQLReplication/BinaryDataReader/BinaryDataReader.php
@@ -0,0 +1,409 @@
+binaryData = $binaryData;
+ }
+
+ /**
+ * @param int $length
+ */
+ public function advance($length)
+ {
+ $this->read($length);
+ }
+
+ /**
+ * @param int $length
+ * @return string
+ * @throws BinLogException
+ */
+ public function read($length)
+ {
+ $length = (int)$length;
+ $return = substr($this->binaryData, 0, $length);
+ $this->readBytes += $length;
+ $this->binaryData = substr($this->binaryData, $length);
+
+ return $return;
+ }
+
+ /**
+ * @return int
+ */
+ public function readInt16()
+ {
+ return unpack('s', $this->read(self::UNSIGNED_SHORT_LENGTH))[1];
+ }
+
+ /**
+ * Push again data in data buffer. It's use when you want
+ * to extract a bit from a value a let the rest of the code normally
+ * read the data
+ *
+ * @param string $data
+ */
+ public function unread($data)
+ {
+ $this->readBytes -= strlen($data);
+ $this->binaryData = $data . $this->binaryData;
+ }
+
+ /**
+ * @see read a 'Length Coded Binary' number from the data buffer.
+ * Length coded numbers can be anywhere from 1 to 9 bytes depending
+ * on the value of the first byte.
+ * From PyMYSQL source code
+ *
+ * @return int|string
+ */
+ public function readCodedBinary()
+ {
+ $c = ord($this->read(self::UNSIGNED_CHAR_LENGTH));
+ if ($c == self::NULL_COLUMN)
+ {
+ return '';
+ }
+ if ($c < self::UNSIGNED_CHAR_COLUMN)
+ {
+ return $c;
+ }
+ elseif ($c == self::UNSIGNED_SHORT_COLUMN)
+ {
+ return $this->readUInt16();
+
+ }
+ elseif ($c == self::UNSIGNED_INT24_COLUMN)
+ {
+ return $this->readUInt24();
+ }
+ elseif ($c == self::UNSIGNED_INT64_COLUMN)
+ {
+ return $this->readUInt64();
+ }
+
+ return $c;
+ }
+
+ /**
+ * @return int
+ */
+ public function readUInt16()
+ {
+ return unpack('v', $this->read(self::UNSIGNED_SHORT_LENGTH))[1];
+ }
+
+ /**
+ * @return int
+ */
+ public function readUInt24()
+ {
+ $data = unpack('C3', $this->read(self::UNSIGNED_INT24_LENGTH));
+ return $data[1] + ($data[2] << 8) + ($data[3] << 16);
+ }
+
+ /**
+ * @return int
+ */
+ public function readUInt64()
+ {
+ return $this->unpackUInt64($this->read(self::UNSIGNED_INT64_LENGTH));
+ }
+
+ /**
+ * @param string $data
+ * @return string
+ */
+ public function unpackUInt64($data)
+ {
+ $data = unpack('V*', $data);
+ return bcadd($data[1], bcmul($data[2], bcpow(2, 32)));
+ }
+
+ /**
+ * @return int
+ */
+ public function readInt24()
+ {
+ $data = unpack('C3', $this->read(self::UNSIGNED_INT24_LENGTH));
+
+ $res = $data[1] | ($data[2] << 8) | ($data[3] << 16);
+ if ($res >= 0x800000)
+ {
+ $res -= 0x1000000;
+ }
+ return $res;
+ }
+
+ /**
+ * @return string
+ */
+ public function readInt64()
+ {
+ $data = unpack('V*', $this->read(self::UNSIGNED_INT64_LENGTH));
+ return bcadd($data[1], ($data[2] << 32));
+ }
+
+ /**
+ * @param int $size
+ * @return string
+ * @throws BinLogException
+ */
+ public function readLengthCodedPascalString($size)
+ {
+ return $this->read($this->readUIntBySize($size));
+ }
+
+ /**
+ * Read a little endian integer values based on byte number
+ *
+ * @param $size
+ * @return mixed
+ * @throws BinLogException
+ */
+ public function readUIntBySize($size)
+ {
+ if ($size == self::UNSIGNED_CHAR_LENGTH)
+ {
+ return $this->readUInt8();
+ }
+ elseif ($size == self::UNSIGNED_SHORT_LENGTH)
+ {
+ return $this->readUInt16();
+ }
+ elseif ($size == self::UNSIGNED_INT24_LENGTH)
+ {
+ return $this->readUInt24();
+ }
+ elseif ($size == self::UNSIGNED_INT32_LENGTH)
+ {
+ return $this->readUInt32();
+ }
+ elseif ($size == self::UNSIGNED_INT40_LENGTH)
+ {
+ return $this->readUInt40();
+ }
+ elseif ($size == self::UNSIGNED_INT48_LENGTH)
+ {
+ return $this->readUInt48();
+ }
+ elseif ($size == self::UNSIGNED_INT56_LENGTH)
+ {
+ return $this->readUInt56();
+ }
+ elseif ($size == self::UNSIGNED_INT64_LENGTH)
+ {
+ return $this->readUInt64();
+ }
+
+ throw new BinLogException('$size ' . $size . ' not handled');
+ }
+
+ /**
+ * @return int
+ */
+ public function readUInt8()
+ {
+ return unpack('C', $this->read(self::UNSIGNED_CHAR_LENGTH))[1];
+ }
+
+ /**
+ * @return int
+ */
+ public function readUInt32()
+ {
+ return unpack('I', $this->read(self::UNSIGNED_INT32_LENGTH))[1];
+ }
+
+ /**
+ * @return mixed
+ */
+ public function readUInt40()
+ {
+ $data = unpack('CI', $this->read(self::UNSIGNED_INT40_LENGTH));
+ return $data[1] + ($data[2] << 8);
+ }
+
+ /**
+ * @return mixed
+ */
+ public function readUInt48()
+ {
+ $data = unpack('v3', $this->read(self::UNSIGNED_INT48_LENGTH));
+ return $data[1] + ($data[2] << 16) + ($data[3] << 32);
+ }
+
+ /**
+ * @return mixed
+ */
+ public function readUInt56()
+ {
+ $data = unpack('CSI', $this->read(self::UNSIGNED_INT56_LENGTH));
+ return $data[1] + ($data[2] << 8) + ($data[3] << 24);
+ }
+
+ /**
+ * Read a big endian integer values based on byte number
+ *
+ * @param int $size
+ * @return int
+ * @throws BinLogException
+ */
+ public function readIntBeBySize($size)
+ {
+ if ($size == self::UNSIGNED_CHAR_LENGTH)
+ {
+ return $this->readInt8();
+ }
+ elseif ($size == self::UNSIGNED_SHORT_LENGTH)
+ {
+ return $this->readInt16Be();
+ }
+ elseif ($size == self::UNSIGNED_INT24_LENGTH)
+ {
+ return $this->readInt24Be();
+ }
+ elseif ($size == self::UNSIGNED_INT32_LENGTH)
+ {
+ return $this->readInt32Be();
+ }
+ elseif ($size == self::UNSIGNED_INT40_LENGTH)
+ {
+ return $this->readInt40Be();
+ }
+
+ throw new BinLogException('$size ' . $size . ' not handled');
+ }
+
+ /**
+ * @return int
+ */
+ public function readInt8()
+ {
+ return unpack('c', $this->read(self::UNSIGNED_CHAR_LENGTH))[1];
+ }
+
+ /**
+ * @return mixed
+ */
+ public function readInt16Be()
+ {
+ return unpack('n', $this->read(self::UNSIGNED_SHORT_LENGTH))[1];
+ }
+
+ /**
+ * @return int
+ */
+ public function readInt24Be()
+ {
+ $data = unpack('C3', $this->read(self::UNSIGNED_INT24_LENGTH));
+ $res = ($data[1] << 16) | ($data[2] << 8) | $data[3];
+ if ($res >= 0x800000)
+ {
+ $res -= 0x1000000;
+ }
+ return $res;
+ }
+
+ /**
+ * @return int
+ */
+ public function readInt32Be()
+ {
+ return unpack('i', strrev($this->read(self::UNSIGNED_INT32_LENGTH)))[1];
+ }
+
+ /**
+ * @return int
+ */
+ public function readInt40Be()
+ {
+ $data1 = unpack('N', $this->read(self::UNSIGNED_INT32_LENGTH))[1];
+ $data2 = unpack('C', $this->read(self::UNSIGNED_CHAR_LENGTH))[1];
+ return $data2 + ($data1 << 8);
+ }
+
+ /**
+ * @return int
+ */
+ public function readInt32()
+ {
+ return unpack('i', $this->read(self::UNSIGNED_INT32_LENGTH))[1];
+ }
+
+ /**
+ * @return float
+ */
+ public function readFloat()
+ {
+ return unpack('f', $this->read(self::UNSIGNED_FLOAT_LENGTH))[1];
+ }
+
+ /**
+ * @return double
+ */
+ public function readDouble()
+ {
+ return unpack('d', $this->read(self::UNSIGNED_DOUBLE_LENGTH))[1];
+ }
+
+ /**
+ * @return string
+ */
+ public function readTableId()
+ {
+ return $this->unpackUInt64($this->read(self::UNSIGNED_INT48_LENGTH) . chr(0) . chr(0));
+ }
+
+ /**
+ * @param int $size
+ * @return bool
+ */
+ public function isComplete($size)
+ {
+ if ($this->readBytes + 1 - 20 < $size)
+ {
+ return false;
+ }
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/BinaryDataReader/BinaryDataReaderBuilder.php b/src/MySQLReplication/BinaryDataReader/BinaryDataReaderBuilder.php
new file mode 100755
index 0000000..007e875
--- /dev/null
+++ b/src/MySQLReplication/BinaryDataReader/BinaryDataReaderBuilder.php
@@ -0,0 +1,23 @@
+binaryData = $binaryData;
+ }
+
+ public function build()
+ {
+ return new BinaryDataReader(
+ $this->binaryData
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/BinaryDataReader/BinaryDataReaderService.php b/src/MySQLReplication/BinaryDataReader/BinaryDataReaderService.php
new file mode 100755
index 0000000..c73e9b5
--- /dev/null
+++ b/src/MySQLReplication/BinaryDataReader/BinaryDataReaderService.php
@@ -0,0 +1,22 @@
+withBinaryData($binaryData);
+
+ return $packageBuilder->build();
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Config/Config.php b/src/MySQLReplication/Config/Config.php
index 45c3800..72fb632 100755
--- a/src/MySQLReplication/Config/Config.php
+++ b/src/MySQLReplication/Config/Config.php
@@ -2,9 +2,6 @@
namespace MySQLReplication\Config;
-/**
- * Class Config
- */
/**
* Class Config
* @package MySQLReplication\Config
@@ -35,23 +32,71 @@ class Config
* @var string
*/
private $charset;
+ /**
+ * @var string
+ */
+ private $gtid;
+ /**
+ * @var int
+ */
+ private $slaveId;
+ /**
+ * @var string
+ */
+ private $binLogFileName;
+ /**
+ * @var int
+ */
+ private $binLogPosition;
+ /**
+ * @var array
+ */
+ private $eventsOnly;
+ /**
+ * @var array
+ */
+ private $eventsIgnore;
+ /**
+ * @var array
+ */
+ private $tablesOnly;
+ /**
+ * @var array
+ */
+ private $databasesOnly;
/**
* Config constructor.
- * @param $user
- * @param $host
- * @param $port
- * @param $password
+ * @param string $user
+ * @param string $host
+ * @param int $port
+ * @param string $password
* @param string $dbName
* @param string $charset
+ * @param string $gtid
+ * @param int $slaveId
+ * @param string $binLogFileName
+ * @param $binLogPosition
+ * @param array $eventsOnly
+ * @param array $eventsIgnore
+ * @param array $tablesOnly
+ * @param array $databasesOnly
*/
public function __construct(
$user,
$host,
$port,
$password,
- $dbName = '',
- $charset = ''
+ $dbName,
+ $charset,
+ $gtid,
+ $slaveId,
+ $binLogFileName,
+ $binLogPosition,
+ array $eventsOnly,
+ array $eventsIgnore,
+ array $tablesOnly,
+ array $databasesOnly
) {
$this->user = $user;
$this->host = $host;
@@ -59,18 +104,26 @@ public function __construct(
$this->password = $password;
$this->dbName = $dbName;
$this->charset = $charset;
+ $this->gtid = $gtid;
+ $this->slaveId = $slaveId;
+ $this->binLogFileName = $binLogFileName;
+ $this->binLogPosition = $binLogPosition;
+ $this->eventsOnly = $eventsOnly;
+ $this->eventsIgnore = $eventsIgnore;
+ $this->tablesOnly = $tablesOnly;
+ $this->databasesOnly = $databasesOnly;
}
/**
- * @return mixed
+ * @return string
*/
- public function getCharset()
+ public function getUser()
{
- return $this->charset;
+ return $this->user;
}
/**
- * @return mixed
+ * @return string
*/
public function getHost()
{
@@ -78,7 +131,7 @@ public function getHost()
}
/**
- * @return mixed
+ * @return int
*/
public function getPort()
{
@@ -86,7 +139,7 @@ public function getPort()
}
/**
- * @return mixed
+ * @return string
*/
public function getPassword()
{
@@ -94,7 +147,7 @@ public function getPassword()
}
/**
- * @return mixed
+ * @return string
*/
public function getDbName()
{
@@ -102,10 +155,75 @@ public function getDbName()
}
/**
- * @return mixed
+ * @return string
*/
- public function getUser()
+ public function getCharset()
{
- return $this->user;
+ return $this->charset;
+ }
+
+ /**
+ * @return string
+ */
+ public function getGtid()
+ {
+ return $this->gtid;
}
+
+ /**
+ * @return int
+ */
+ public function getSlaveId()
+ {
+ return $this->slaveId;
+ }
+
+ /**
+ * @return string
+ */
+ public function getBinLogFileName()
+ {
+ return $this->binLogFileName;
+ }
+
+ /**
+ * @return int
+ */
+ public function getBinLogPosition()
+ {
+ return $this->binLogPosition;
+ }
+
+ /**
+ * @return array
+ */
+ public function getEventsOnly()
+ {
+ return $this->eventsOnly;
+ }
+
+ /**
+ * @return array
+ */
+ public function getEventsIgnore()
+ {
+ return $this->eventsIgnore;
+ }
+
+ /**
+ * @return array
+ */
+ public function getTablesOnly()
+ {
+ return $this->tablesOnly;
+ }
+
+ /**
+ * @return array
+ */
+ public function getDatabasesOnly()
+ {
+ return $this->databasesOnly;
+ }
+
}
\ No newline at end of file
diff --git a/src/MySQLReplication/Config/ConfigBuilder.php b/src/MySQLReplication/Config/ConfigBuilder.php
new file mode 100755
index 0000000..660a1d8
--- /dev/null
+++ b/src/MySQLReplication/Config/ConfigBuilder.php
@@ -0,0 +1,202 @@
+user,
+ $this->host,
+ $this->port,
+ $this->password,
+ $this->dbName,
+ $this->charset,
+ $this->gtid,
+ $this->slaveId,
+ $this->binLogFileName,
+ $this->binLogPosition,
+ $this->eventsOnly,
+ $this->eventsIgnore,
+ $this->tablesOnly,
+ $this->databasesOnly
+ );
+ }
+ /**
+ * @param string $user
+ */
+ public function withUser($user)
+ {
+ $this->user = $user;
+ }
+
+ /**
+ * @param string $host
+ */
+ public function withHost($host)
+ {
+ $this->host = $host;
+ }
+
+ /**
+ * @param int $port
+ */
+ public function withPort($port)
+ {
+ $this->port = $port;
+ }
+
+ /**
+ * @param string $password
+ */
+ public function withPassword($password)
+ {
+ $this->password = $password;
+ }
+
+ /**
+ * @param string $dbName
+ */
+ public function withDbName($dbName)
+ {
+ $this->dbName = $dbName;
+ }
+
+ /**
+ * @param string $charset
+ */
+ public function withCharset($charset)
+ {
+ $this->charset = $charset;
+ }
+
+ /**
+ * @param string $gtid
+ */
+ public function withGtid($gtid)
+ {
+ $this->gtid = $gtid;
+ }
+
+ /**
+ * @param int $slaveId
+ */
+ public function withSlaveId($slaveId)
+ {
+ $this->slaveId = $slaveId;
+ }
+
+ /**
+ * @param string $binLogFileName
+ */
+ public function withBinLogFileName($binLogFileName)
+ {
+ $this->binLogFileName = $binLogFileName;
+ }
+
+ /**
+ * @param int $binLogPosition
+ */
+ public function withBinLogPosition($binLogPosition)
+ {
+ $this->binLogPosition = $binLogPosition;
+ }
+
+ /**
+ * @param array $eventsOnly
+ */
+ public function withEventsOnly(array $eventsOnly)
+ {
+ $this->eventsOnly = $eventsOnly;
+ }
+
+ /**
+ * @param array $eventsIgnore
+ */
+ public function withEventsIgnore(array $eventsIgnore)
+ {
+ $this->eventsIgnore = $eventsIgnore;
+ }
+
+ /**
+ * @param array $tablesOnly
+ */
+ public function withTablesOnly(array $tablesOnly)
+ {
+ $this->tablesOnly = $tablesOnly;
+ }
+
+ /**
+ * @param array $databasesOnly
+ */
+ public function withDatabasesOnly(array $databasesOnly)
+ {
+ $this->databasesOnly = $databasesOnly;
+ }
+
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Config/ConfigService.php b/src/MySQLReplication/Config/ConfigService.php
new file mode 100755
index 0000000..321b1ad
--- /dev/null
+++ b/src/MySQLReplication/Config/ConfigService.php
@@ -0,0 +1,80 @@
+ $v)
+ {
+ if ('user' === $k)
+ {
+ $configBuilder->withUser($v);
+ }
+ if ('host' === $k)
+ {
+ $configBuilder->withHost($v);
+ }
+ if ('port' === $k)
+ {
+ $configBuilder->withPort($v);
+ }
+ if ('password' === $k)
+ {
+ $configBuilder->withPassword($v);
+ }
+ if ('dbName' === $k)
+ {
+ $configBuilder->withDbName($v);
+ }
+ if ('charset' === $k)
+ {
+ $configBuilder->withCharset($v);
+ }
+ if ('gtid' === $k)
+ {
+ $configBuilder->withGtid($v);
+ }
+ if ('slaveId' === $k)
+ {
+ $configBuilder->withSlaveId($v);
+ }
+ if ('binLogFileName' === $k)
+ {
+ $configBuilder->withBinLogFileName($v);
+ }
+ if ('binLogPosition' === $k)
+ {
+ $configBuilder->withBinLogPosition($v);
+ }
+ if ('eventsOnly' === $k)
+ {
+ $configBuilder->withEventsOnly($v);
+ }
+ if ('eventsIgnore' === $k)
+ {
+ $configBuilder->withEventsIgnore($v);
+ }
+ if ('tablesOnly' === $k)
+ {
+ $configBuilder->withTablesOnly($v);
+ }
+ if ('databasesOnly' === $k)
+ {
+ $configBuilder->withDatabasesOnly($v);
+ }
+ }
+
+ return $configBuilder->build();
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/DTO/DeleteRowsDTO.php b/src/MySQLReplication/DTO/DeleteRowsDTO.php
deleted file mode 100755
index a448c09..0000000
--- a/src/MySQLReplication/DTO/DeleteRowsDTO.php
+++ /dev/null
@@ -1,11 +0,0 @@
-date = $date;
- $this->binLogPos = $binLogPos;
- $this->eventSize = $eventSize;
- $this->readBytes = $readBytes;
- }
-
- /**
- * @return int
- */
- public function getDate()
- {
- return $this->date;
- }
-
- /**
- * @return int
- */
- public function getBinLogPos()
- {
- return $this->binLogPos;
- }
-
- /**
- * @return int
- */
- public function getEventSize()
- {
- return $this->eventSize;
- }
-
- /**
- * @return int
- */
- public function getReadBytes()
- {
- return $this->readBytes;
- }
-
- /**
- * @return string
- */
- public function __toString()
- {
- return PHP_EOL .
- '=== ' . get_class($this) . ' === ' . PHP_EOL .
- 'Date: ' . $this->date . PHP_EOL .
- 'Log position: ' . $this->binLogPos . PHP_EOL .
- 'Event size: ' . $this->eventSize . PHP_EOL .
- 'Read bytes: ' . $this->readBytes . PHP_EOL;
- }
-
- /**
- * Specify data which should be serialized to JSON
- * @link http://php.net/manual/en/jsonserializable.jsonserialize.php
- * @return mixed data which can be serialized by json_encode,
- * which is a value of any type other than a resource.
- * @since 5.4.0
- */
- public function jsonSerialize()
- {
- return get_object_vars($this);
- }
-}
\ No newline at end of file
diff --git a/src/MySQLReplication/DTO/UpdateRowsDTO.php b/src/MySQLReplication/DTO/UpdateRowsDTO.php
deleted file mode 100755
index 6f50e38..0000000
--- a/src/MySQLReplication/DTO/UpdateRowsDTO.php
+++ /dev/null
@@ -1,11 +0,0 @@
-eventInfo = $eventInfo;
+ }
+
+ /**
+ * @return EventInfo
+ */
+ public function getEventInfo()
+ {
+ return $this->eventInfo;
+ }
+
+ /**
+ * @return string
+ */
+ abstract public function getType();
+
+ /**
+ * @return string
+ */
+ abstract public function __toString();
+ /**
+ * Specify data which should be serialized to JSON
+ * @link http://php.net/manual/en/jsonserializable.jsonserialize.php
+ * @return mixed data which can be serialized by json_encode,
+ * which is a value of any type other than a resource.
+ * @since 5.4.0
+ */
+ abstract public function jsonSerialize();
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/DTO/GTIDLogDTO.php b/src/MySQLReplication/Event/DTO/GTIDLogDTO.php
similarity index 65%
rename from src/MySQLReplication/DTO/GTIDLogDTO.php
rename to src/MySQLReplication/Event/DTO/GTIDLogDTO.php
index c20d139..d4ba500 100755
--- a/src/MySQLReplication/DTO/GTIDLogDTO.php
+++ b/src/MySQLReplication/Event/DTO/GTIDLogDTO.php
@@ -1,6 +1,9 @@
commit = $commit;
$this->gtid = $gtid;
@@ -56,17 +53,24 @@ public function getGtid()
return $this->gtid;
}
+ /**
+ * @return string
+ */
+ public function getType()
+ {
+ return ConstEventsNames::GTID;
+ }
+
/**
* @return string
*/
public function __toString()
{
return PHP_EOL .
- '=== ' . __CLASS__ . ' === ' . PHP_EOL .
- 'Date: ' . $this->date . PHP_EOL .
- 'Log position: ' . $this->binLogPos . PHP_EOL .
- 'Event size: ' . $this->eventSize . PHP_EOL .
- 'Read bytes: ' . $this->readBytes . PHP_EOL .
+ '=== Event ' . $this->getType() . ' === ' . PHP_EOL .
+ 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL .
+ 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL .
+ 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL .
'Commit: ' . var_export($this->commit, true) . PHP_EOL .
'GTID NEXT: ' . $this->gtid . PHP_EOL;
}
diff --git a/src/MySQLReplication/DTO/QueryDTO.php b/src/MySQLReplication/Event/DTO/QueryDTO.php
similarity index 63%
rename from src/MySQLReplication/DTO/QueryDTO.php
rename to src/MySQLReplication/Event/DTO/QueryDTO.php
index 4021a4b..341df3d 100755
--- a/src/MySQLReplication/DTO/QueryDTO.php
+++ b/src/MySQLReplication/Event/DTO/QueryDTO.php
@@ -1,6 +1,9 @@
executionTime = $executionTime;
$this->query = $query;
@@ -71,20 +68,27 @@ public function getQuery()
return $this->query;
}
+ /**
+ * @return string
+ */
+ public function getType()
+ {
+ return ConstEventsNames::QUERY;
+ }
+
/**
* @return string
*/
public function __toString()
{
return PHP_EOL .
- '=== ' . get_class($this) . ' === ' . PHP_EOL .
- 'Date: ' . $this->date . PHP_EOL .
- 'Log position: ' . $this->binLogPos . PHP_EOL .
- 'Event size: ' . $this->eventSize . PHP_EOL .
- 'Read bytes: ' . $this->readBytes . PHP_EOL .
- 'Database: ' . $this->database . PHP_EOL .
- 'Execution time: ' . $this->executionTime . PHP_EOL .
- 'Query: ' . $this->query . PHP_EOL;
+ '=== Event ' . $this->getType() . ' === ' . PHP_EOL .
+ 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL .
+ 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL .
+ 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL .
+ 'Database: ' . $this->database . PHP_EOL .
+ 'Execution time: ' . $this->executionTime . PHP_EOL .
+ 'Query: ' . $this->query . PHP_EOL;
}
/**
diff --git a/src/MySQLReplication/DTO/RotateDTO.php b/src/MySQLReplication/Event/DTO/RotateDTO.php
similarity index 66%
rename from src/MySQLReplication/DTO/RotateDTO.php
rename to src/MySQLReplication/Event/DTO/RotateDTO.php
index 4fa1688..6678b7d 100755
--- a/src/MySQLReplication/DTO/RotateDTO.php
+++ b/src/MySQLReplication/Event/DTO/RotateDTO.php
@@ -1,6 +1,9 @@
position = $position;
$this->next_binlog = $next_binlog;
@@ -56,17 +53,24 @@ public function getNextBinlog()
return $this->next_binlog;
}
+ /**
+ * @return string
+ */
+ public function getType()
+ {
+ return ConstEventsNames::ROTATE;
+ }
+
/**
* @return string
*/
public function __toString()
{
return PHP_EOL .
- '=== ' . get_class($this) . ' === ' . PHP_EOL .
- 'Date: ' . $this->date . PHP_EOL .
- 'Log position: ' . $this->binLogPos . PHP_EOL .
- 'Event size: ' . $this->eventSize . PHP_EOL .
- 'Read bytes: ' . $this->readBytes . PHP_EOL .
+ '=== Event ' . $this->getType() . ' === ' . PHP_EOL .
+ 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL .
+ 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL .
+ 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL .
'Binlog position: ' . $this->position . PHP_EOL .
'Binlog filename: ' . $this->next_binlog . PHP_EOL;
}
diff --git a/src/MySQLReplication/DTO/RowsDTO.php b/src/MySQLReplication/Event/DTO/RowsDTO.php
similarity index 75%
rename from src/MySQLReplication/DTO/RowsDTO.php
rename to src/MySQLReplication/Event/DTO/RowsDTO.php
index 19ba2d9..9038810 100755
--- a/src/MySQLReplication/DTO/RowsDTO.php
+++ b/src/MySQLReplication/Event/DTO/RowsDTO.php
@@ -1,6 +1,8 @@
database = $database;
$this->table = $table;
@@ -101,17 +97,24 @@ public function getChangedRows()
return $this->changedRows;
}
+ /**
+ * @return string
+ */
+ public function getType()
+ {
+ return '';
+ }
+
/**
* @return string
*/
public function __toString()
{
return PHP_EOL .
- '=== ' . get_class($this) . ' === ' . PHP_EOL .
- 'Date: ' . $this->date . PHP_EOL .
- 'Log position: ' . $this->binLogPos . PHP_EOL .
- 'Event size: ' . $this->eventSize . PHP_EOL .
- 'Read bytes: ' . $this->readBytes . PHP_EOL .
+ '=== Event ' . $this->getType() . ' === ' . PHP_EOL .
+ 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL .
+ 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL .
+ 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL .
'Table: ' . $this->table . PHP_EOL .
'Affected columns: ' . $this->affected . PHP_EOL .
'Changed rows: ' . $this->changedRows . PHP_EOL .
diff --git a/src/MySQLReplication/DTO/TableMapDTO.php b/src/MySQLReplication/Event/DTO/TableMapDTO.php
similarity index 72%
rename from src/MySQLReplication/DTO/TableMapDTO.php
rename to src/MySQLReplication/Event/DTO/TableMapDTO.php
index 087feec..92ebbac 100755
--- a/src/MySQLReplication/DTO/TableMapDTO.php
+++ b/src/MySQLReplication/Event/DTO/TableMapDTO.php
@@ -1,6 +1,9 @@
tableId = $tableId;
$this->database = $database;
@@ -86,17 +83,24 @@ public function getColumns()
return $this->columns;
}
+ /**
+ * @return string
+ */
+ public function getType()
+ {
+ return ConstEventsNames::TABLE_MAP;
+ }
+
/**
* @return string
*/
public function __toString()
{
return PHP_EOL .
- '=== ' . get_class($this) . ' === ' . PHP_EOL .
- 'Date: ' . $this->date . PHP_EOL .
- 'Log position: ' . $this->binLogPos . PHP_EOL .
- 'Event size: ' . $this->eventSize . PHP_EOL .
- 'Read bytes: ' . $this->readBytes . PHP_EOL .
+ '=== Event ' . $this->getType() . ' === ' . PHP_EOL .
+ 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL .
+ 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL .
+ 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL .
'Table: ' . $this->table . PHP_EOL .
'Database: ' . $this->database . PHP_EOL .
'Table Id: ' . $this->tableId . PHP_EOL .
diff --git a/src/MySQLReplication/Event/DTO/UpdateRowsDTO.php b/src/MySQLReplication/Event/DTO/UpdateRowsDTO.php
new file mode 100755
index 0000000..b0190bf
--- /dev/null
+++ b/src/MySQLReplication/Event/DTO/UpdateRowsDTO.php
@@ -0,0 +1,20 @@
+xid = $xid;
}
@@ -41,17 +38,24 @@ public function getXid()
return $this->xid;
}
+ /**
+ * @return string
+ */
+ public function getType()
+ {
+ return ConstEventsNames::XID;
+ }
+
/**
* @return string
*/
public function __toString()
{
return PHP_EOL .
- '=== ' . get_class($this) . ' === ' . PHP_EOL .
- 'Date: ' . $this->date . PHP_EOL .
- 'Log position: ' . $this->binLogPos . PHP_EOL .
- 'Event size: ' . $this->eventSize . PHP_EOL .
- 'Read bytes: ' . $this->readBytes . PHP_EOL .
+ '=== Event ' . $this->getType() . ' === ' . PHP_EOL .
+ 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL .
+ 'Log position: ' . $this->eventInfo->getPos() . PHP_EOL .
+ 'Event size: ' . $this->eventInfo->getSize() . PHP_EOL .
'Transaction ID: ' . $this->xid . PHP_EOL;
}
diff --git a/src/MySQLReplication/Event/Event.php b/src/MySQLReplication/Event/Event.php
new file mode 100755
index 0000000..a63ab1f
--- /dev/null
+++ b/src/MySQLReplication/Event/Event.php
@@ -0,0 +1,140 @@
+mySQLRepository = $mySQLRepository;
+ $this->config = $config;
+ $this->binLogConnect = $binLogConnect;
+ $this->packageService = $packageService;
+ $this->rowEventService = $rowEventService;
+ }
+
+ /**
+ * @return DeleteRowsDTO|EventDTO|GTIDLogDTO|QueryDTO|RotateDTO|TableMapDTO|UpdateRowsDTO|WriteRowsDTO|XidDTO
+ */
+ public function consume()
+ {
+ $binaryDataReader = $this->packageService->makePackageFromBinaryData($this->binLogConnect->getPacket());
+
+ // "ok" value on first byte continue
+ $binaryDataReader->advance(1);
+
+ // decode all events data
+ $eventInfo = new EventInfo(
+ $binaryDataReader->readInt32(),
+ $binaryDataReader->readUInt8(),
+ $binaryDataReader->readInt32(),
+ $binaryDataReader->readInt32(),
+ $binaryDataReader->readInt32(),
+ $binaryDataReader->readUInt16(),
+ $this->binLogConnect->getCheckSum()
+ );
+
+ if ($eventInfo->getType() == ConstEventType::TABLE_MAP_EVENT)
+ {
+ $rowEvent = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo);
+ return $rowEvent->makeTableMapDTO();
+ }
+
+ if ([] !== $this->config->getEventsOnly() && !in_array($eventInfo->getType(), $this->config->getEventsOnly()))
+ {
+ return null;
+ }
+
+ if (in_array($eventInfo->getType(), $this->config->getEventsIgnore()))
+ {
+ return null;
+ }
+
+ if (in_array($eventInfo->getType(), [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2]))
+ {
+ $rowEvent = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo);
+ return $rowEvent->makeUpdateRowsDTO();
+ }
+ elseif (in_array($eventInfo->getType(), [ConstEventType::WRITE_ROWS_EVENT_V1, ConstEventType::WRITE_ROWS_EVENT_V2]))
+ {
+ $rowEvent = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo);
+ return $rowEvent->makeWriteRowsDTO();
+ }
+ elseif (in_array($eventInfo->getType(), [ConstEventType::DELETE_ROWS_EVENT_V1, ConstEventType::DELETE_ROWS_EVENT_V2]))
+ {
+ $rowEvent = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo);
+ return $rowEvent->makeDeleteRowsDTO();
+ }
+ elseif ($eventInfo->getType() == ConstEventType::XID_EVENT)
+ {
+ return (new XidEvent($eventInfo, $binaryDataReader))->makeXidDTO();
+ }
+ elseif ($eventInfo->getType() == ConstEventType::ROTATE_EVENT)
+ {
+ return (new RotateEvent($eventInfo, $binaryDataReader))->makeRotateEventDTO();
+ }
+ elseif ($eventInfo->getType() == ConstEventType::GTID_LOG_EVENT)
+ {
+ return (new GtidEvent($eventInfo, $binaryDataReader))->makeGTIDLogDTO();
+ }
+ else if ($eventInfo->getType() == ConstEventType::QUERY_EVENT)
+ {
+ return (new QueryEvent($eventInfo, $binaryDataReader))->makeQueryDTO();
+ }
+
+ return null;
+ }
+}
diff --git a/src/MySQLReplication/Event/EventCommon.php b/src/MySQLReplication/Event/EventCommon.php
new file mode 100755
index 0000000..d5a1787
--- /dev/null
+++ b/src/MySQLReplication/Event/EventCommon.php
@@ -0,0 +1,31 @@
+eventInfo = $eventInfo;
+ $this->binaryDataReader = $binaryDataReader;
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Event/EventInfo.php b/src/MySQLReplication/Event/EventInfo.php
new file mode 100755
index 0000000..bbe24e3
--- /dev/null
+++ b/src/MySQLReplication/Event/EventInfo.php
@@ -0,0 +1,131 @@
+timestamp = $timestamp;
+ $this->type = $type;
+ $this->id = $id;
+ $this->size = $size;
+ $this->pos = $pos;
+ $this->flag = $flag;
+ $this->checkSum = $checkSum;
+ }
+
+ /**
+ * @return string
+ */
+ public function getDateTime()
+ {
+ return (new \DateTime())->setTimestamp($this->timestamp)->format('c');
+ }
+
+ /**
+ * @return string
+ */
+ public function getSizeNoHeader()
+ {
+ return (true === $this->checkSum ? $this->size - 23 : $this->size - 19);
+ }
+
+ /**
+ * @return int
+ */
+ public function getTimestamp()
+ {
+ return $this->timestamp;
+ }
+
+ /**
+ * @return string
+ */
+ public function getType()
+ {
+ return $this->type;
+ }
+
+ /**
+ * @return int
+ */
+ public function getId()
+ {
+ return $this->id;
+ }
+
+ /**
+ * @return int
+ */
+ public function getSize()
+ {
+ return $this->size;
+ }
+
+ /**
+ * @return int
+ */
+ public function getPos()
+ {
+ return $this->pos;
+ }
+
+ /**
+ * @return string
+ */
+ public function getFlag()
+ {
+ return $this->flag;
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Event/GtidEvent.php b/src/MySQLReplication/Event/GtidEvent.php
new file mode 100755
index 0000000..b2679db
--- /dev/null
+++ b/src/MySQLReplication/Event/GtidEvent.php
@@ -0,0 +1,24 @@
+binaryDataReader->readUInt8() == 1;
+ $sid = unpack('H*', $this->binaryDataReader->read(16))[1];
+ $gno = $this->binaryDataReader->readUInt64();
+
+ return new GTIDLogDTO(
+ $this->eventInfo,
+ $commit_flag,
+ vsprintf('%s%s%s%s%s%s%s%s-%s%s%s%s-%s%s%s%s-%s%s%s%s-%s%s%s%s%s%s%s%s%s%s%s%s', str_split($sid)) . ':' . $gno
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Event/QueryEvent.php b/src/MySQLReplication/Event/QueryEvent.php
new file mode 100755
index 0000000..ed8143f
--- /dev/null
+++ b/src/MySQLReplication/Event/QueryEvent.php
@@ -0,0 +1,31 @@
+binaryDataReader->advance(4);
+ $execution_time = $this->binaryDataReader->readUInt32();
+ $schema_length = $this->binaryDataReader->readUInt8();
+ $this->binaryDataReader->advance(2);
+ $status_vars_length = $this->binaryDataReader->readUInt16();
+ $this->binaryDataReader->advance($status_vars_length);
+ $schema = $this->binaryDataReader->read($schema_length);
+ $this->binaryDataReader->advance(1);
+ $query = $this->binaryDataReader->read($this->eventInfo->getSize() - 36 - $status_vars_length - $schema_length - 1);
+
+ return new QueryDTO(
+ $this->eventInfo,
+ $schema,
+ $execution_time,
+ $query
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Event/RotateEvent.php b/src/MySQLReplication/Event/RotateEvent.php
new file mode 100755
index 0000000..76d2733
--- /dev/null
+++ b/src/MySQLReplication/Event/RotateEvent.php
@@ -0,0 +1,27 @@
+binaryDataReader->readUInt64();
+ $binFileName = $this->binaryDataReader->read($this->eventInfo->getSizeNoHeader() - 8);
+
+ return new RotateDTO(
+ $this->eventInfo,
+ $pos,
+ $binFileName
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Event/RowEvent/RowEvent.php b/src/MySQLReplication/Event/RowEvent/RowEvent.php
new file mode 100755
index 0000000..6d5b2d7
--- /dev/null
+++ b/src/MySQLReplication/Event/RowEvent/RowEvent.php
@@ -0,0 +1,882 @@
+MySQLRepository = $MySQLRepository;
+ $this->config = $config;
+ }
+
+ /**
+ * This evenement describe the structure of a table.
+ * It's send before a change append on a table.
+ * A end user of the lib should have no usage of this
+ *
+ * @return TableMapDTO
+ */
+ public function makeTableMapDTO()
+ {
+ $tableId = $this->binaryDataReader->readTableId();
+ $this->binaryDataReader->advance(2);
+
+ $data = [];
+ $data['schema_length'] = $this->binaryDataReader->readUInt8();
+ $data['schema_name'] = $this->tableMapDatabase = $this->binaryDataReader->read($data['schema_length']);
+ $this->binaryDataReader->advance(1);
+
+ $data['table_length'] = $this->binaryDataReader->readUInt8();
+ $data['table_name'] = $this->tableMapTableName = $this->binaryDataReader->read($data['table_length']);
+ $this->binaryDataReader->advance(1);
+
+ $this->tableMapColumnsAmount = $this->binaryDataReader->readCodedBinary();
+
+ $column_type_def = $this->binaryDataReader->read($this->tableMapColumnsAmount);
+
+ // automatically clear array to save memory
+ if (count(self::$tableMapCache) >= 200)
+ {
+ self::$tableMapCache = array_slice(self::$tableMapCache, 100, -1, true);
+ }
+
+ $tableMapDTO = new TableMapDTO(
+ $this->eventInfo,
+ $tableId,
+ $data['schema_name'],
+ $data['table_name'],
+ $this->tableMapColumnsAmount
+ );
+
+ if (isset(self::$tableMapCache[$tableId]))
+ {
+ return $tableMapDTO;
+ }
+
+ $this->binaryDataReader->readCodedBinary();
+
+ self::$tableMapCache[$tableId]['fields'] = [];
+ self::$tableMapCache[$tableId]['database'] = $data['schema_name'];
+ self::$tableMapCache[$tableId]['table_name'] = $data['table_name'];
+
+ $columns = $this->MySQLRepository->getFields($data['schema_name'], $data['table_name']);
+ // if you drop tables and parse of logs you will get empty scheme
+ if (empty($columns))
+ {
+ return $tableMapDTO;
+ }
+
+ for ($i = 0; $i < strlen($column_type_def); $i++)
+ {
+ // this a dirty hack to prevent row events containing columns which have been dropped
+ if (!isset($columns[$i]))
+ {
+ $columns[$i] = [
+ 'COLUMN_NAME' => 'DROPPED_COLUMN_' . $i,
+ 'COLLATION_NAME' => null,
+ 'CHARACTER_SET_NAME' => null,
+ 'COLUMN_COMMENT' => null,
+ 'COLUMN_TYPE' => 'BLOB',
+ 'COLUMN_KEY' => '',
+ ];
+ $type = ConstFieldType::IGNORE;
+ }
+ else
+ {
+ $type = ord($column_type_def[$i]);
+ }
+
+ self::$tableMapCache[$tableId]['fields'][$i] = BinLogColumns::parse($type, $columns[$i], $this->binaryDataReader);
+ }
+
+ return $tableMapDTO;
+ }
+
+ /**
+ * @return WriteRowsDTO
+ */
+ public function makeWriteRowsDTO()
+ {
+ $this->rowInit();
+
+ if (false === $this->process)
+ {
+ return null;
+ }
+
+ $values = $this->getValues();
+
+ return new WriteRowsDTO(
+ $this->eventInfo,
+ $this->tableMapDatabase,
+ $this->tableMapTableName,
+ $this->tableMapColumnsAmount,
+ count($values),
+ $values
+ );
+ }
+
+ /**
+ *
+ */
+ private function rowInit()
+ {
+ $this->process = true;
+
+ $tableId = $this->binaryDataReader->readTableId();
+ $this->binaryDataReader->advance(2);
+
+ if (in_array($this->eventInfo->getType(), [
+ ConstEventType::DELETE_ROWS_EVENT_V2,
+ ConstEventType::WRITE_ROWS_EVENT_V2,
+ ConstEventType::UPDATE_ROWS_EVENT_V2
+ ]))
+ {
+ $this->binaryDataReader->read($this->binaryDataReader->readUInt16() / 8);
+ }
+
+ $this->tableMapColumnsAmount = $this->binaryDataReader->readCodedBinary();
+
+
+ $this->tableMapFields = [];
+ if (isset(self::$tableMapCache[$tableId]))
+ {
+ $this->tableMapFields = self::$tableMapCache[$tableId]['fields'];
+
+ if ([] !== $this->config->getTablesOnly() && !in_array(self::$tableMapCache[$tableId]['table_name'], $this->config->getTablesOnly()))
+ {
+ $this->process = false;
+ }
+
+ if ([] !== $this->config->getDatabasesOnly() && !in_array(self::$tableMapCache[$tableId]['database'], $this->config->getDatabasesOnly()))
+ {
+ $this->process = false;
+ }
+ }
+ if ([] == $this->tableMapFields)
+ {
+ //remove cache can be empty (drop table)
+ unset(self::$tableMapCache[$tableId]);
+ $this->process = false;
+ }
+ }
+
+ /**
+ * @param int $colsBitmap
+ * @return array
+ * @throws \Exception
+ */
+ private function getColumnData($colsBitmap)
+ {
+ $values = [];
+
+ $l = $this->getColumnsBinarySize($this->bitCount($colsBitmap));
+
+ // null bitmap length = (bits set in 'columns-present-bitmap'+7)/8
+ // see http://dev.mysql.com/doc/internals/en/rows-event.html
+ $null_bitmap = $this->binaryDataReader->read($l);
+ $nullBitmapIndex = 0;
+
+ foreach ($this->tableMapFields as $i => $column)
+ {
+ $name = $column['name'];
+ $unsigned = $column['unsigned'];
+
+ if ($this->bitGet($colsBitmap, $i) == 0)
+ {
+ $values[$name] = null;
+ continue;
+ }
+
+ if ($this->checkNull($null_bitmap, $nullBitmapIndex))
+ {
+ $values[$name] = null;
+ }
+ elseif ($column['type'] == ConstFieldType::IGNORE)
+ {
+ $values[$name] = null;
+ }
+ elseif ($column['type'] == ConstFieldType::TINY)
+ {
+ if ($unsigned)
+ {
+ $values[$name] = $this->binaryDataReader->readUInt8();
+ }
+ else
+ {
+ $values[$name] = $this->binaryDataReader->readInt8();
+ }
+ }
+ elseif ($column['type'] == ConstFieldType::SHORT)
+ {
+ if ($unsigned)
+ {
+ $values[$name] = $this->binaryDataReader->readUInt16();
+ }
+ else
+ {
+ $values[$name] = $this->binaryDataReader->readInt16();
+ }
+ }
+ elseif ($column['type'] == ConstFieldType::LONG)
+ {
+ if ($unsigned)
+ {
+ $values[$name] = $this->binaryDataReader->readUInt32();
+ }
+ else
+ {
+ $values[$name] = $this->binaryDataReader->readInt32();
+ }
+ }
+ elseif ($column['type'] == ConstFieldType::LONGLONG)
+ {
+ if ($unsigned)
+ {
+ $values[$name] = $this->binaryDataReader->readUInt64();
+ }
+ else
+ {
+ $values[$name] = $this->binaryDataReader->readInt64();
+ }
+ }
+ elseif ($column['type'] == ConstFieldType::INT24)
+ {
+ if ($unsigned)
+ {
+ $values[$name] = $this->binaryDataReader->readUInt24();
+ }
+ else
+ {
+ $values[$name] = $this->binaryDataReader->readInt24();
+ }
+ }
+ elseif ($column['type'] == ConstFieldType::FLOAT)
+ {
+ // http://dev.mysql.com/doc/refman/5.7/en/floating-point-types.html FLOAT(7,4)
+ $values[$name] = round($this->binaryDataReader->readFloat(), 4);
+ }
+ elseif ($column['type'] == ConstFieldType::DOUBLE)
+ {
+ $values[$name] = $this->binaryDataReader->readDouble();
+ }
+ elseif ($column['type'] == ConstFieldType::VARCHAR || $column['type'] == ConstFieldType::STRING)
+ {
+ if ($column['max_length'] > 255)
+ {
+ $values[$name] = $this->getString(2, $column);
+ }
+ else
+ {
+ $values[$name] = $this->getString(1, $column);
+ }
+ }
+ elseif ($column['type'] == ConstFieldType::NEWDECIMAL)
+ {
+ $values[$name] = $this->getDecimal($column);
+ }
+ elseif ($column['type'] == ConstFieldType::BLOB)
+ {
+ $values[$name] = $this->getString($column['length_size'], $column);
+ }
+ elseif ($column['type'] == ConstFieldType::DATETIME)
+ {
+ $values[$name] = $this->getDatetime();
+ }
+ elseif ($column['type'] == ConstFieldType::DATETIME2)
+ {
+ $values[$name] = $this->getDatetime2($column);
+ }
+ elseif ($column['type'] == ConstFieldType::TIME2)
+ {
+ $values[$name] = $this->getTime2($column);
+ }
+ elseif ($column['type'] == ConstFieldType::TIMESTAMP2)
+ {
+ $values[$name] = $this->getTimestamp2($column);
+ }
+ elseif ($column['type'] == ConstFieldType::DATE)
+ {
+ $values[$name] = $this->getDate();
+ }
+ elseif ($column['type'] == ConstFieldType::YEAR)
+ {
+ $values[$name] = $this->binaryDataReader->readUInt8() + 1900;
+ }
+ elseif ($column['type'] == ConstFieldType::ENUM)
+ {
+ $values[$name] = $column['enum_values'][$this->binaryDataReader->readUIntBySize($column['size']) - 1];
+ }
+ elseif ($column['type'] == ConstFieldType::SET)
+ {
+ $values[$name] = $this->getSet($column);
+ }
+ elseif ($column['type'] == ConstFieldType::BIT)
+ {
+ $values[$name] = $this->getBit($column);
+ }
+ elseif ($column['type'] == ConstFieldType::GEOMETRY)
+ {
+ $values[$name] = $this->binaryDataReader->readLengthCodedPascalString($column['length_size']);
+ }
+ else
+ {
+ throw new BinLogException('Unknown row type: ' . $column['type']);
+ }
+
+ $nullBitmapIndex += 1;
+ }
+
+ return $values;
+ }
+
+ /**
+ * @param int $columns
+ * @return int
+ */
+ private function getColumnsBinarySize($columns)
+ {
+ return (int)(($columns + 7) / 8);
+ }
+
+ /**
+ * @param string $bitmap
+ * @return int
+ */
+ protected function bitCount($bitmap)
+ {
+ $n = 0;
+ for ($i = 0; $i < strlen($bitmap); $i++)
+ {
+ $bit = $bitmap[$i];
+ if (is_string($bit))
+ {
+ $bit = ord($bit);
+ }
+ $n += $this->bitCountInByte[$bit];
+ }
+
+ return $n;
+ }
+
+ /**
+ * @param string $bitmap
+ * @param int $position
+ * @return int
+ */
+ private function bitGet($bitmap, $position)
+ {
+ $bit = $bitmap[(int)($position / 8)];
+ if (is_string($bit))
+ {
+ $bit = ord($bit);
+ }
+
+ return $bit & (1 << ($position & 7));
+ }
+
+ /**
+ * @param string $nullBitmap
+ * @param int $position
+ * @return int
+ */
+ private function checkNull($nullBitmap, $position)
+ {
+ $bit = $nullBitmap[intval($position / 8)];
+ if (is_string($bit))
+ {
+ $bit = ord($bit);
+ }
+
+ return $bit & (1 << ($position % 8));
+ }
+
+ /**
+ * @param int $size
+ * @param array $column
+ * @return string
+ */
+ private function getString($size, array $column)
+ {
+ $string = $this->binaryDataReader->readLengthCodedPascalString($size);
+ if ($column['character_set_name'])
+ {
+ // convert strings?
+ }
+
+ return $string;
+ }
+
+ /**
+ * Read MySQL's new decimal format introduced in MySQL 5
+ * @param array $column
+ * @return string
+ */
+ private function getDecimal(array $column)
+ {
+ $digits_per_integer = 9;
+ $compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4];
+ $integral = $column['precision'] - $column['decimals'];
+ $uncomp_integral = (int)($integral / $digits_per_integer);
+ $uncomp_fractional = (int)($column['decimals'] / $digits_per_integer);
+ $comp_integral = $integral - ($uncomp_integral * $digits_per_integer);
+ $comp_fractional = $column['decimals'] - ($uncomp_fractional * $digits_per_integer);
+
+ $value = $this->binaryDataReader->readUInt8();
+ if (0 != ($value & 0x80))
+ {
+ $mask = 0;
+ $res = '';
+ }
+ else
+ {
+ $mask = -1;
+ $res = '-';
+ }
+ $this->binaryDataReader->unread(pack('C', ($value ^ 0x80)));
+
+ $size = $compressed_bytes[$comp_integral];
+ if ($size > 0)
+ {
+ $value = $this->binaryDataReader->readIntBeBySize($size) ^ $mask;
+ $res .= $value;
+ }
+
+ for ($i = 0; $i < $uncomp_integral; $i++)
+ {
+ $value = $this->binaryDataReader->readInt32Be() ^ $mask;
+ $res .= sprintf('%09d', $value);
+ }
+
+ $res .= '.';
+
+ for ($i = 0; $i < $uncomp_fractional; $i++)
+ {
+ $value = $this->binaryDataReader->readInt32Be() ^ $mask;
+ $res .= sprintf('%09d', $value);
+ }
+
+ $size = $compressed_bytes[$comp_fractional];
+ if ($size > 0)
+ {
+ $value = $this->binaryDataReader->readIntBeBySize($size) ^ $mask;
+ $res .= sprintf('%0' . $comp_fractional . 'd', $value);
+ }
+
+ return bcmul($res, 1, $column['precision']);
+ }
+
+ /**
+ * @return float|null
+ */
+ private function getDatetime()
+ {
+ $value = $this->binaryDataReader->readUInt64();
+ // nasty mysql 0000-00-00 dates
+ if ($value == 0)
+ {
+ return null;
+ }
+
+ $date = $value / 1000000;
+ $year = (int)($date / 10000);
+ $month = (int)(($date % 10000) / 100);
+ $day = (int)($date % 100);
+ if ($year == 0 || $month == 0 || $day == 0)
+ {
+ return null;
+ }
+
+ return (new \DateTime())->setDate($year, $month, $day)->format('Y-m-d');
+ }
+
+ /**
+ * Date Time
+ * 1 bit sign (1= non-negative, 0= negative)
+ * 17 bits year*13+month (year 0-9999, month 0-12)
+ * 5 bits day (0-31)
+ * 5 bits hour (0-23)
+ * 6 bits minute (0-59)
+ * 6 bits second (0-59)
+ * ---------------------------
+ * 40 bits = 5 bytes
+ * @param array $column
+ * @return string
+ * @throws \Exception
+ */
+ private function getDatetime2(array $column)
+ {
+ $data = $this->binaryDataReader->readIntBeBySize(5);
+
+ $year_month = $this->getBinarySlice($data, 1, 17, 40);
+
+ $year = (int)($year_month / 13);
+ $month = $year_month % 13;
+ $day = $this->getBinarySlice($data, 18, 5, 40);
+ $hour = $this->getBinarySlice($data, 23, 5, 40);
+ $minute = $this->getBinarySlice($data, 28, 6, 40);
+ $second = $this->getBinarySlice($data, 34, 6, 40);
+
+ $date = new \DateTime($year . '-' . $month . '-' . $day . ' ' . $hour . ':' . $minute . ':' . $second);
+ if (array_sum($date->getLastErrors()) > 0)
+ {
+ return null;
+ }
+
+ return $date->format('Y-m-d H:i:s') . $this->getFSP($column);
+ }
+
+ /**
+ * Read a part of binary data and extract a number
+ * binary: the data
+ * start: From which bit (1 to X)
+ * size: How many bits should be read
+ * data_length: data size
+ *
+ * @param int $binary
+ * @param int $start
+ * @param int $size
+ * @param int $data_length
+ * @return int
+ */
+ private function getBinarySlice($binary, $start, $size, $data_length)
+ {
+ $binary = $binary >> $data_length - ($start + $size);
+ $mask = ((1 << $size) - 1);
+
+ return $binary & $mask;
+ }
+
+ /**
+ * Read and add the fractional part of time
+ * For more details about new date format:
+ * http://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html
+ *
+ * @param array $column
+ * @return int|string
+ * @throws \Exception
+ */
+ private function getFSP(array $column)
+ {
+ $read = 0;
+ $time = '';
+ if ($column['fsp'] == 1 || $column['fsp'] == 2)
+ {
+ $read = 1;
+ }
+ elseif ($column['fsp'] == 3 || $column['fsp'] == 4)
+ {
+ $read = 2;
+ }
+ elseif ($column ['fsp'] == 5 || $column['fsp'] == 6)
+ {
+ $read = 3;
+ }
+ if ($read > 0)
+ {
+ $microsecond = $this->binaryDataReader->readIntBeBySize($read);
+ if ($column['fsp'] % 2)
+ {
+ $time = (int)($microsecond / 10);
+ }
+ else
+ {
+ $time = $microsecond;
+ }
+ }
+
+ return $time;
+ }
+
+ /**
+ * TIME encoding for nonfractional part:
+ * 1 bit sign (1= non-negative, 0= negative)
+ * 1 bit unused (reserved for future extensions)
+ * 10 bits hour (0-838)
+ * 6 bits minute (0-59)
+ * 6 bits second (0-59)
+ * ---------------------
+ * 24 bits = 3 bytes
+ *
+ * @param array $column
+ * @return string
+ */
+ private function getTime2(array $column)
+ {
+ $data = $this->binaryDataReader->readInt24Be();
+
+ $hour = $this->getBinarySlice($data, 2, 10, 24);
+ $minute = $this->getBinarySlice($data, 12, 6, 24);
+ $second = $this->getBinarySlice($data, 18, 6, 24);
+
+ return (new \DateTime())->setTime($hour, $minute, $second)->format('H:i:s') . $this->getFSP($column);
+ }
+
+ /**
+ * @param array $column
+ * @return bool|string
+ * @throws BinLogException
+ */
+ private function getTimestamp2(array $column)
+ {
+ $time = date('Y-m-d H:i:s', $this->binaryDataReader->readInt32Be());
+ $fsp = $this->getFSP($column);
+ if ('' !== $fsp)
+ {
+ $time .= '.' . $fsp;
+ }
+ return $time;
+ }
+
+ /**
+ * @return string
+ */
+ private function getDate()
+ {
+ $time = $this->binaryDataReader->readUInt24();
+ if (0 == $time)
+ {
+ return null;
+ }
+
+ $year = ($time & ((1 << 15) - 1) << 9) >> 9;
+ $month = ($time & ((1 << 4) - 1) << 5) >> 5;
+ $day = ($time & ((1 << 5) - 1));
+ if ($year == 0 || $month == 0 || $day == 0)
+ {
+ return null;
+ }
+
+ return (new \DateTime())->setDate($year, $month, $day)->format('Y-m-d');
+ }
+
+ /**
+ * @param array $column
+ * @return array
+ * @throws BinLogException
+ */
+ private function getSet(array $column)
+ {
+ // we read set columns as a bitmap telling us which options are enabled
+ $bit_mask = $this->binaryDataReader->readUIntBySize($column['size']);
+ $sets = [];
+ foreach ($column['set_values'] as $k => $item)
+ {
+ if ($bit_mask & pow(2, $k))
+ {
+ $sets[] = $item;
+ }
+ }
+
+ return $sets;
+ }
+
+ /**
+ * Read MySQL BIT type
+ * @param array $column
+ * @return string
+ */
+ private function getBit(array $column)
+ {
+ $res = '';
+ for ($byte = 0; $byte < $column['bytes']; $byte++)
+ {
+ $current_byte = '';
+ $data = $this->binaryDataReader->readUInt8();
+ if (0 === $byte)
+ {
+ if (1 === $column['bytes'])
+ {
+ $end = $column['bits'];
+ }
+ else
+ {
+ $end = $column['bits'] % 8;
+ if (0 === $end)
+ {
+ $end = 8;
+ }
+ }
+ }
+ else
+ {
+ $end = 8;
+ }
+
+ for ($bit = 0; $bit < $end; $bit++)
+ {
+ if ($data & (1 << $bit))
+ {
+ $current_byte .= '1';
+ }
+ else
+ {
+ $current_byte .= '0';
+ }
+
+ }
+ $res .= strrev($current_byte);
+ }
+
+ return $res;
+ }
+
+ /**
+ * @return DeleteRowsDTO
+ */
+ public function makeDeleteRowsDTO()
+ {
+ $this->rowInit();
+
+ if (false === $this->process)
+ {
+ return null;
+ }
+
+ $values = $this->getValues();
+
+ return new DeleteRowsDTO(
+ $this->eventInfo,
+ $this->tableMapDatabase,
+ $this->tableMapTableName,
+ $this->tableMapColumnsAmount,
+ count($values),
+ $values
+ );
+ }
+
+ /**
+ * @return UpdateRowsDTO
+ */
+ public function makeUpdateRowsDTO()
+ {
+ $this->rowInit();
+
+ if (false === $this->process)
+ {
+ return null;
+ }
+
+ $columnsBinarySize = $this->getColumnsBinarySize($this->tableMapColumnsAmount);
+ $beforeBinaryData = $this->binaryDataReader->read($columnsBinarySize);
+ $afterBinaryData = $this->binaryDataReader->read($columnsBinarySize);
+
+ $values = [];
+ while (false === $this->binaryDataReader->isComplete($this->eventInfo->getSizeNoHeader()))
+ {
+ $values[] = [
+ 'before' => $this->getColumnData($beforeBinaryData),
+ 'after' => $this->getColumnData($afterBinaryData)
+ ];
+ }
+
+ return new UpdateRowsDTO(
+ $this->eventInfo,
+ $this->tableMapDatabase,
+ $this->tableMapTableName,
+ $this->tableMapColumnsAmount,
+ count($values),
+ $values
+ );
+ }
+
+ /**
+ * @return array
+ * @throws BinLogException
+ */
+ private function getValues()
+ {
+ $columnsBinarySize = $this->getColumnsBinarySize($this->tableMapColumnsAmount);
+ $binaryData = $this->binaryDataReader->read($columnsBinarySize);
+
+ $values = [];
+ while (!$this->binaryDataReader->isComplete($this->eventInfo->getSizeNoHeader()))
+ {
+ $values[] = $this->getColumnData($binaryData);
+ }
+
+ return $values;
+ }
+}
diff --git a/src/MySQLReplication/Event/RowEvent/RowEventBuilder.php b/src/MySQLReplication/Event/RowEvent/RowEventBuilder.php
new file mode 100755
index 0000000..2444f8f
--- /dev/null
+++ b/src/MySQLReplication/Event/RowEvent/RowEventBuilder.php
@@ -0,0 +1,67 @@
+MySQLRepository = $MySQLRepository;
+ $this->config = $config;
+ }
+
+ /**
+ * @param BinaryDataReader $package
+ */
+ public function withPackage(BinaryDataReader $package)
+ {
+ $this->package = $package;
+ }
+
+ /**
+ * @return RowEvent
+ */
+ public function build()
+ {
+ return new RowEvent(
+ $this->config,
+ $this->MySQLRepository,
+ $this->package,
+ $this->eventInfo
+ );
+ }
+
+ /**
+ * @param EventInfo $eventInfo
+ */
+ public function withEventInfo(EventInfo $eventInfo)
+ {
+ $this->eventInfo = $eventInfo;
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Event/RowEvent/RowEventService.php b/src/MySQLReplication/Event/RowEvent/RowEventService.php
new file mode 100755
index 0000000..bef98ba
--- /dev/null
+++ b/src/MySQLReplication/Event/RowEvent/RowEventService.php
@@ -0,0 +1,38 @@
+rowEventBuilder = new RowEventBuilder($config, $mySQLRepository);
+ }
+
+ /**
+ * @param BinaryDataReader $package
+ * @param EventInfo $eventInfo
+ * @return RowEvent
+ */
+ public function makeRowEvent(BinaryDataReader $package, EventInfo $eventInfo)
+ {
+ $this->rowEventBuilder->withPackage($package);
+ $this->rowEventBuilder->withEventInfo($eventInfo);
+
+ return $this->rowEventBuilder->build();
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Event/XidEvent.php b/src/MySQLReplication/Event/XidEvent.php
new file mode 100755
index 0000000..a466bfc
--- /dev/null
+++ b/src/MySQLReplication/Event/XidEvent.php
@@ -0,0 +1,23 @@
+eventInfo,
+ $this->binaryDataReader->readUInt64()
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Gtid/GtidCollection.php b/src/MySQLReplication/Gtid/GtidCollection.php
new file mode 100755
index 0000000..d724bc5
--- /dev/null
+++ b/src/MySQLReplication/Gtid/GtidCollection.php
@@ -0,0 +1,41 @@
+toArray() as $gtid)
+ {
+ $l += $gtid->encoded_length();
+ }
+
+ return $l;
+ }
+
+ /**
+ * @return string
+ */
+ public function getEncodedPacket()
+ {
+ $s = pack('Q', $this->count());
+ /** @var GtidEntity $gtid */
+ foreach ($this->toArray() as $gtid)
+ {
+ $s .= $gtid->encode();
+ }
+
+ return $s;
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Pack/Gtid.php b/src/MySQLReplication/Gtid/GtidEntity.php
similarity index 72%
rename from src/MySQLReplication/Pack/Gtid.php
rename to src/MySQLReplication/Gtid/GtidEntity.php
index 3af5f54..c9f0901 100755
--- a/src/MySQLReplication/Pack/Gtid.php
+++ b/src/MySQLReplication/Gtid/GtidEntity.php
@@ -1,11 +1,11 @@
gtid = $gtid;
- preg_match('/^([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})((?::[0-9-]+)+)$/', $this->gtid, $matches);
+ if (false === (bool)preg_match('/^([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})((?::[0-9-]+)+)$/', $this->gtid, $matches))
+ {
+ throw new GtidException(GtidException::INCORRECT_GTID_MESSAGE, GtidException::INCORRECT_GTID_CODE);
+ }
$this->sid = $matches[1];
foreach (array_filter(explode(':', $matches[2])) as $k)
@@ -68,6 +72,6 @@ public function encode()
*/
public function encoded_length()
{
- return (16 + 8 + 2 * 8 * count($this->intervals));
+ return (40 * count($this->intervals));
}
}
\ No newline at end of file
diff --git a/src/MySQLReplication/Gtid/GtidException.php b/src/MySQLReplication/Gtid/GtidException.php
new file mode 100755
index 0000000..c65f3c0
--- /dev/null
+++ b/src/MySQLReplication/Gtid/GtidException.php
@@ -0,0 +1,9 @@
+GtidCollection = new GtidCollection();
+ }
+
+ /**
+ * @param string $gtids
+ * @return GtidCollection
+ */
+ public function makeCollectionFromString($gtids)
+ {
+ foreach (array_filter(explode(',', $gtids)) as $gtid)
+ {
+ $this->GtidCollection->add(new GtidEntity($gtid));
+ }
+
+ return $this->GtidCollection;
+ }
+}
\ No newline at end of file
diff --git a/src/MySQLReplication/Pack/GtidSet.php b/src/MySQLReplication/Pack/GtidSet.php
deleted file mode 100755
index 0b845fa..0000000
--- a/src/MySQLReplication/Pack/GtidSet.php
+++ /dev/null
@@ -1,56 +0,0 @@
-gtids[] = new Gtid($gtid);
- }
- }
-
- /**
- * @return int
- */
- public function encoded_length()
- {
- $l = 8;
-
- foreach ($this->gtids as $gtid)
- {
- $l += $gtid->encoded_length();
- }
-
- return $l;
- }
-
- /**
- * @return string
- */
- public function encoded()
- {
- $s = pack('Q', count($this->gtids));
-
- foreach ($this->gtids as $gtid)
- {
- $s .= $gtid->encode();
- }
-
- return $s;
- }
-}
\ No newline at end of file
diff --git a/src/MySQLReplication/Pack/RowEvent.php b/src/MySQLReplication/Pack/RowEvent.php
deleted file mode 100755
index 878a32a..0000000
--- a/src/MySQLReplication/Pack/RowEvent.php
+++ /dev/null
@@ -1,850 +0,0 @@
-read(2))[1];
-
- $data = [];
- $data['schema_length'] = unpack('C', $pack->read(1))[1];
- $data['schema_name'] = self::$SCHEMA_NAME = $pack->read($data['schema_length']);
-
- self::$BinLogPack->advance(1);
-
- $data['table_length'] = unpack('C', self::$BinLogPack->read(1))[1];
- $data['table_name'] = self::$TABLE_NAME = $pack->read($data['table_length']);
-
- self::$BinLogPack->advance(1);
-
- self::$COLUMNS_NUM = self::$BinLogPack->readCodedBinary();
-
- $column_type_def = self::$BinLogPack->read(self::$COLUMNS_NUM);
-
- // automatyczne czyszczenie starych danych
- if (count(self::$TABLE_MAP) >= 200) {
- self::$TABLE_MAP = array_slice(self::$TABLE_MAP, 100, -1, true);
- }
-
- $tableMapDTO = new TableMapDTO(
- $eventInfo['date'],
- $eventInfo['pos'],
- $eventInfo['size'],
- $size,
- self::$TABLE_ID,
- $data['schema_name'],
- $data['table_name'],
- self::$COLUMNS_NUM
- );
-
- if (isset(self::$TABLE_MAP[self::$TABLE_ID]))
- {
- return $tableMapDTO;
- }
-
- self::$BinLogPack->readCodedBinary();
-
- $columns = $DBHelper->getFields($data['schema_name'], $data['table_name']);
-
- self::$TABLE_MAP[self::$TABLE_ID]['fields'] = [];
- self::$TABLE_MAP[self::$TABLE_ID]['database'] = $data['schema_name'];
- self::$TABLE_MAP[self::$TABLE_ID]['table_name'] = $data['table_name'];
-
- // if you drop tables and parse of logs you will get empty scheme
- if (empty($columns))
- {
- return null;
- }
-
- for ($i = 0; $i < strlen($column_type_def); $i++)
- {
- // this a dirty hack to prevent row events containing columns which have been dropped prior
- // to pymysqlreplication start, but replayed from binlog from blowing up the service.
- if (!isset($columns[$i]))
- {
- $columns[$i] = [
- 'COLUMN_NAME' => 'DROPPED_COLUMN_' . $i,
- 'COLLATION_NAME' => null,
- 'CHARACTER_SET_NAME' => null,
- 'COLUMN_COMMENT' => null,
- 'COLUMN_TYPE' => 'BLOB',
- 'COLUMN_KEY' => '',
- ];
- $type = ConstFieldType::IGNORE;
- }
- else
- {
- $type = ord($column_type_def[$i]);
- }
-
- self::$TABLE_MAP[self::$TABLE_ID]['fields'][$i] = BinLogColumns::parse($type, $columns[$i], self::$BinLogPack);
- }
-
- return $tableMapDTO;
- }
-
- /**
- * @param BinLogPack $pack
- * @param array $eventInfo
- * @param $size
- * @param $onlyTables
- * @param $onlyDatabases
- * @return mixed
- */
- public static function addRow(BinLogPack $pack, array $eventInfo, $size, $onlyTables, $onlyDatabases)
- {
- self::rowInit($pack, $eventInfo['type'], $size, $onlyTables, $onlyDatabases);
-
- if (false === self::$process)
- {
- return null;
- }
-
- $values = self::_getAddRows(['bitmap' => self::$BinLogPack->read(self::getColumnsAmount(self::$COLUMNS_NUM))]);
-
- return new WriteRowsDTO(
- $eventInfo['date'],
- $eventInfo['pos'],
- $eventInfo['size'],
- $size,
- self::$SCHEMA_NAME,
- self::$TABLE_NAME,
- self::$COLUMNS_NUM,
- count($values),
- $values
- );
- }
-
- /**
- * @param BinLogPack $pack
- * @param $event_type
- * @param $size
- * @param array $onlyTables
- * @param array $onlyDatabases
- */
- private static function rowInit(BinLogPack $pack, $event_type, $size, array $onlyTables, array $onlyDatabases)
- {
- self::$process = true;
-
- parent::init($pack, $event_type, $size);
-
- self::$TABLE_ID = self::readTableId();
-
- self::$FLAGS = self::$BinLogPack->readUIntBySize(2);
-
- if (in_array(self::$EVENT_TYPE, [
- ConstEventType::DELETE_ROWS_EVENT_V2,
- ConstEventType::WRITE_ROWS_EVENT_V2,
- ConstEventType::UPDATE_ROWS_EVENT_V2
- ]))
- {
- self::$EXTRA_DATA_LENGTH = self::$BinLogPack->readUIntBySize(2);
-
- self::$EXTRA_DATA = self::$BinLogPack->read(self::$EXTRA_DATA_LENGTH / 8);
- }
-
- self::$COLUMNS_NUM = self::$BinLogPack->readCodedBinary();
-
-
- self::$fields = [];
- if (self::$TABLE_MAP[self::$TABLE_ID])
- {
- self::$fields = self::$TABLE_MAP[self::$TABLE_ID]['fields'];
-
- if (!empty($onlyTables) && !in_array(self::$TABLE_MAP[self::$TABLE_ID]['table_name'], $onlyTables))
- {
- self::$process = false;
- }
-
- if (!empty($onlyTables) && !in_array(self::$TABLE_MAP[self::$TABLE_ID]['database'], $onlyDatabases))
- {
- self::$process = false;
- }
- }
- if ([] == self::$fields)
- {
- //remove cache can be empty (drop table)
- unset(self::$TABLE_MAP[self::$TABLE_ID]);
-
- self::$process = false;
- }
- }
-
- /**
- * @param int $columns
- * @return int
- */
- private static function getColumnsAmount($columns)
- {
- return (int)(($columns + 7) / 8);
- }
-
- /**
- * @param array $result
- * @return array
- */
- private static function _getAddRows(array $result)
- {
- $rows = [];
- while (!self::$BinLogPack->isComplete(self::$PACK_SIZE))
- {
- $rows[] = self::_read_column_data($result['bitmap']);
- }
-
- return $rows;
- }
-
- /**
- * @param $cols_bitmap
- * @return array
- * @throws \Exception
- */
- private static function _read_column_data($cols_bitmap)
- {
- $values = [];
-
- $l = self::getColumnsAmount(self::bitCount($cols_bitmap));
-
- // null bitmap length = (bits set in 'columns-present-bitmap'+7)/8
- // see http://dev.mysql.com/doc/internals/en/rows-event.html
- $null_bitmap = self::$BinLogPack->read($l);
- $nullBitmapIndex = 0;
-
- foreach (self::$fields as $i => $column)
- {
- $name = $column['name'];
- $unsigned = $column['unsigned'];
-
- if (self::bitGet($cols_bitmap, $i) == 0)
- {
- $values[$name] = null;
- continue;
- }
-
- if (self::_is_null($null_bitmap, $nullBitmapIndex))
- {
- $values[$name] = null;
- }
- elseif ($column['type'] == ConstFieldType::IGNORE)
- {
- $values[$name] = null;
- }
- elseif ($column['type'] == ConstFieldType::TINY)
- {
- if ($unsigned)
- {
- $values[$name] = unpack('C', self::$BinLogPack->read(1))[1];
- }
- else
- {
- $values[$name] = unpack('c', self::$BinLogPack->read(1))[1];
- }
- }
- elseif ($column['type'] == ConstFieldType::SHORT)
- {
- if ($unsigned)
- {
- $values[$name] = unpack('v', self::$BinLogPack->read(2))[1];
- }
- else
- {
- $values[$name] = unpack('s', self::$BinLogPack->read(2))[1];
- }
- }
- elseif ($column['type'] == ConstFieldType::LONG)
- {
- if ($unsigned)
- {
- $values[$name] = unpack('I', self::$BinLogPack->read(4))[1];
- }
- else
- {
- $values[$name] = unpack('i', self::$BinLogPack->read(4))[1];
- }
- }
- elseif ($column['type'] == ConstFieldType::INT24)
- {
- if ($unsigned)
- {
- $values[$name] = self::$BinLogPack->readUInt24();
- }
- else
- {
- $values[$name] = self::$BinLogPack->readInt24();
- }
- }
- elseif ($column['type'] == ConstFieldType::FLOAT)
- {
- // http://dev.mysql.com/doc/refman/5.7/en/floating-point-types.html FLOAT(7,4)
- $values[$name] = round(unpack('f', self::$BinLogPack->read(4))[1], 4);
- }
- elseif ($column['type'] == ConstFieldType::DOUBLE)
- {
- $values[$name] = unpack('d', self::$BinLogPack->read(8))[1];
- }
- elseif ($column['type'] == ConstFieldType::VARCHAR || $column['type'] == ConstFieldType::STRING)
- {
- if ($column['max_length'] > 255)
- {
- $values[$name] = self::_read_string(2, $column);
- }
- else
- {
- $values[$name] = self::_read_string(1, $column);
- }
- }
- elseif ($column['type'] == ConstFieldType::NEWDECIMAL)
- {
- $values[$name] = self::__read_new_decimal($column);
- }
- elseif ($column['type'] == ConstFieldType::BLOB)
- {
- $values[$name] = self::_read_string($column['length_size'], $column);
- }
- elseif ($column['type'] == ConstFieldType::DATETIME)
- {
- $values[$name] = self::_read_datetime();
- }
- elseif ($column['type'] == ConstFieldType::DATETIME2)
- {
- $values[$name] = self::_read_datetime2($column);
- }
- elseif ($column['type'] == ConstFieldType::TIME2)
- {
- $values[$name] = self::_read_time2($column);
- }
- elseif ($column['type'] == ConstFieldType::TIMESTAMP2)
- {
- $time = date('Y-m-d H:i:s', self::$BinLogPack->readIntBeBySize(4));
- $fsp = self::_add_fsp_to_time($column);
- if ('' !== $fsp)
- {
- $time .= '.' . $fsp;
- }
- $values[$name] = $time;
- }
- elseif ($column['type'] == ConstFieldType::DATE)
- {
- $values[$name] = self::_read_date();
- }
- elseif ($column['type'] == ConstFieldType::LONGLONG)
- {
- if ($unsigned)
- {
- $values[$name] = self::$BinLogPack->readUInt64();
- }
- else
- {
- $values[$name] = self::$BinLogPack->readInt64();
- }
- }
- elseif ($column['type'] == ConstFieldType::YEAR)
- {
- $values[$name] = self::$BinLogPack->readUInt8() + 1900;
- }
- elseif ($column['type'] == ConstFieldType::ENUM)
- {
- $values[$name] = $column['enum_values'][self::$BinLogPack->readUIntBySize($column['size']) - 1];
- }
- elseif ($column['type'] == ConstFieldType::SET)
- {
- // we read set columns as a bitmap telling us which options are enabled
- $bit_mask = self::$BinLogPack->readUIntBySize($column['size']);
- $sets = [];
- foreach ($column['set_values'] as $k => $item)
- {
- if ($bit_mask & pow(2, $k))
- {
- $sets[] = $item;
- }
- }
- $values[$name] = $sets;
- }
- elseif ($column['type'] == ConstFieldType::BIT)
- {
- $values[$name] = self::_read_bit($column);
- }
- elseif ($column['type'] == ConstFieldType::GEOMETRY)
- {
- $values[$name] = self::$BinLogPack->readLengthCodedPascalString($column['length_size']);
- }
- else
- {
- throw new BinLogException('Unknown row type: ' . $column['type']);
- }
-
- $nullBitmapIndex += 1;
- }
-
- return $values;
- }
-
- /**
- * @param $bitmap
- * @param $position
- * @return int
- */
- private static function bitGet($bitmap, $position)
- {
- $bit = $bitmap[(int)($position / 8)];
- if (is_string($bit))
- {
- $bit = ord($bit);
- }
-
- return $bit & (1 << ($position & 7));
- }
-
- /**
- * @param $null_bitmap
- * @param $position
- * @return int
- */
- private static function _is_null($null_bitmap, $position)
- {
- $bit = $null_bitmap[intval($position / 8)];
- if (is_string($bit))
- {
- $bit = ord($bit);
- }
-
- return $bit & (1 << ($position % 8));
- }
-
- /**
- * @param int $size
- * @param array $column
- * @return string
- */
- private static function _read_string($size, array $column)
- {
- $string = self::$BinLogPack->readLengthCodedPascalString($size);
- if ($column['character_set_name'])
- {
- // convert strings?
- }
-
- return $string;
- }
-
- /**
- * Read MySQL's new decimal format introduced in MySQL 5
- * @param $column
- * @return string
- */
- private static function __read_new_decimal(array $column)
- {
- $digits_per_integer = 9;
- $compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4];
- $integral = $column['precision'] - $column['decimals'];
- $uncomp_integral = (int)($integral / $digits_per_integer);
- $uncomp_fractional = (int)($column['decimals'] / $digits_per_integer);
- $comp_integral = $integral - ($uncomp_integral * $digits_per_integer);
- $comp_fractional = $column['decimals'] - ($uncomp_fractional * $digits_per_integer);
-
- $value = self::$BinLogPack->readUInt8();
- if (0 != ($value & 0x80))
- {
- $mask = 0;
- $res = '';
- }
- else
- {
- $mask = -1;
- $res = '-';
- }
- self::$BinLogPack->unread(pack('C', ($value ^ 0x80)));
-
- $size = $compressed_bytes[$comp_integral];
- if ($size > 0)
- {
- $value = self::$BinLogPack->readIntBeBySize($size) ^ $mask;
- $res .= $value;
- }
-
- for ($i = 0; $i < $uncomp_integral; $i++)
- {
- $value = self::$BinLogPack->readIntBeBySize(4) ^ $mask;
- $res .= sprintf('%09d', $value);
- }
-
- $res .= '.';
-
- for ($i = 0; $i < $uncomp_fractional; $i++)
- {
- $value = self::$BinLogPack->readIntBeBySize(4) ^ $mask;
- $res .= sprintf('%09d', $value);
- }
-
- $size = $compressed_bytes[$comp_fractional];
- if ($size > 0)
- {
- $value = self::$BinLogPack->readIntBeBySize($size) ^ $mask;
- $res .= sprintf('%0' . $comp_fractional . 'd', $value);
- }
-
- return bcmul($res, 1, $column['precision']);
- }
-
- /**
- * @return float|null
- */
- private static function _read_datetime()
- {
- $value = self::$BinLogPack->readUInt64();
- if ($value == 0) # nasty mysql 0000-00-00 dates
- {
- return null;
- }
-
- $date = $value / 1000000;
- $year = (int)($date / 10000);
- $month = (int)(($date % 10000) / 100);
- $day = (int)($date % 100);
- if ($year == 0 || $month == 0 || $day == 0)
- {
- return '';
- }
-
- $date = new \DateTime();
- $date->setDate($year, $month, $day);
- return $date->format('Y-m-d');
- }
-
- /**
- * Date Time
- * 1 bit sign (1= non-negative, 0= negative)
- * 17 bits year*13+month (year 0-9999, month 0-12)
- * 5 bits day (0-31)
- * 5 bits hour (0-23)
- * 6 bits minute (0-59)
- * 6 bits second (0-59)
- * ---------------------------
- * 40 bits = 5 bytes
- * @param $column
- * @return string
- * @throws \Exception
- */
- private static function _read_datetime2(array $column)
- {
- $data = self::$BinLogPack->readIntBeBySize(5);
-
- $year_month = self::_read_binary_slice($data, 1, 17, 40);
-
- $year = (int)($year_month / 13);
- $month = $year_month % 13;
- $day = self::_read_binary_slice($data, 18, 5, 40);
- $hour = self::_read_binary_slice($data, 23, 5, 40);
- $minute = self::_read_binary_slice($data, 28, 6, 40);
- $second = self::_read_binary_slice($data, 34, 6, 40);
-
- $date = new \DateTime($year . '-' . $month . '-' . $day . ' ' . $hour . ':' . $minute . ':' . $second);
- if (array_sum($date->getLastErrors()) > 0)
- {
- return null;
- }
- return $date->format('Y-m-d H:i:s') . self::_add_fsp_to_time($column);
- }
-
- /**
- * Read a part of binary data and extract a number
- * binary: the data
- * start: From which bit (1 to X)
- * size: How many bits should be read
- * data_length: data size
- *
- * @param $binary
- * @param $start
- * @param $size
- * @param $data_length
- * @return int
- */
- private static function _read_binary_slice($binary, $start, $size, $data_length)
- {
- $binary = $binary >> $data_length - ($start + $size);
- $mask = ((1 << $size) - 1);
- return $binary & $mask;
- }
-
- /**
- * Read and add the fractional part of time
- * For more details about new date format:
- * http://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html
- *
- * @param array $column
- * @return int|string
- * @throws \Exception
- */
- private static function _add_fsp_to_time(array $column)
- {
- $read = 0;
- $time = '';
- if ($column['fsp'] == 1 || $column['fsp'] == 2)
- {
- $read = 1;
- }
- elseif ($column['fsp'] == 3 || $column['fsp'] == 4)
- {
- $read = 2;
- }
- elseif ($column ['fsp'] == 5 || $column['fsp'] == 6)
- {
- $read = 3;
- }
- if ($read > 0)
- {
- $microsecond = self::$BinLogPack->readIntBeBySize($read);
- if ($column['fsp'] % 2)
- {
- $time = (int)($microsecond / 10);
- }
- else
- {
- $time = $microsecond;
- }
- }
- return $time;
- }
-
- /**
- * TIME encoding for nonfractional part:
- * 1 bit sign (1= non-negative, 0= negative)
- * 1 bit unused (reserved for future extensions)
- * 10 bits hour (0-838)
- * 6 bits minute (0-59)
- * 6 bits second (0-59)
- * ---------------------
- * 24 bits = 3 bytes
- *
- * @param array $column
- * @return string
- */
- private static function _read_time2(array $column)
- {
- $data = self::$BinLogPack->readIntBeBySize(3);
-
- $hour = self::_read_binary_slice($data, 2, 10, 24);
- $minute = self::_read_binary_slice($data, 12, 6, 24);
- $second = self::_read_binary_slice($data, 18, 6, 24);
-
- $date = new \DateTime();
- $date->setTime($hour, $minute, $second);
-
- return $date->format('H:i:s') . self::_add_fsp_to_time($column);
- }
-
- /**
- * @return string
- */
- private static function _read_date()
- {
- $time = self::$BinLogPack->readUInt24();
- if (0 == $time)
- {
- return null;
- }
-
- $year = ($time & ((1 << 15) - 1) << 9) >> 9;
- $month = ($time & ((1 << 4) - 1) << 5) >> 5;
- $day = ($time & ((1 << 5) - 1));
- if ($year == 0 || $month == 0 || $day == 0)
- {
- return null;
- }
-
- $date = new \DateTime();
- $date->setDate($year, $month, $day);
- return $date->format('Y-m-d');
- }
-
- /**
- * Read MySQL BIT type
- * @param array $column
- * @return string
- */
- private static function _read_bit(array $column)
- {
- $res = '';
- for ($byte = 0; $byte < $column['bytes']; $byte++)
- {
- $current_byte = '';
- $data = self::$BinLogPack->readUInt8();
- if (0 === $byte)
- {
- if (1 === $column['bytes'])
- {
- $end = $column['bits'];
- }
- else
- {
- $end = $column['bits'] % 8;
- if (0 === $end)
- {
- $end = 8;
- }
- }
- }
- else
- {
- $end = 8;
- }
-
- for ($bit = 0; $bit < $end; $bit++)
- {
- if ($data & (1 << $bit))
- {
- $current_byte .= '1';
- }
- else
- {
- $current_byte .= '0';
- }
-
- }
- $res .= strrev($current_byte);
- }
-
- return $res;
- }
-
- /**
- * @param BinLogPack $pack
- * @param $eventInfo
- * @param $size
- * @param array $onlyTables
- * @param array $onlyDatabases
- * @return mixed
- */
- public static function delRow(BinLogPack $pack, $eventInfo, $size, array $onlyTables, array $onlyDatabases)
- {
- self::rowInit($pack, $eventInfo['type'], $size, $onlyTables, $onlyDatabases);
-
- if (false === self::$process)
- {
- return null;
- }
-
- $values = self::_getDelRows(['bitmap' => self::$BinLogPack->read(self::getColumnsAmount(self::$COLUMNS_NUM))]);
-
- return new DeleteRowsDTO(
- $eventInfo['date'],
- $eventInfo['pos'],
- $eventInfo['size'],
- $size,
- self::$SCHEMA_NAME,
- self::$TABLE_NAME,
- self::$COLUMNS_NUM,
- count($values),
- $values
- );
- }
-
- /**
- * @param array $result
- * @return array
- */
- private static function _getDelRows(array $result)
- {
- $rows = [];
- while (!self::$BinLogPack->isComplete(self::$PACK_SIZE))
- {
- $rows[] = self::_read_column_data($result['bitmap']);
- }
-
- return $rows;
- }
-
- /**
- * @param BinLogPack $pack
- * @param array $eventInfo
- * @param $size
- * @param array $onlyTables
- * @param array $onlyDatabases
- * @return mixed
- */
- public static function updateRow(BinLogPack $pack, array $eventInfo, $size, array $onlyTables, array $onlyDatabases)
- {
- self::rowInit($pack, $eventInfo['type'], $size, $onlyTables, $onlyDatabases);
-
- if (false === self::$process)
- {
- return null;
- }
-
- $len = self::getColumnsAmount(self::$COLUMNS_NUM);
-
- $values = self::_getUpdateRows(['bitmap1' => self::$BinLogPack->read($len), 'bitmap2' => self::$BinLogPack->read($len)]);
-
- return new UpdateRowsDTO(
- $eventInfo['date'],
- $eventInfo['pos'],
- $eventInfo['size'],
- $size,
- self::$SCHEMA_NAME,
- self::$TABLE_NAME,
- self::$COLUMNS_NUM,
- count($values),
- $values
-
- );
- }
-
- /**
- * @param array $result
- * @return array
- */
- private static function _getUpdateRows(array $result)
- {
- $rows = [];
- while (!self::$BinLogPack->isComplete(self::$PACK_SIZE))
- {
- $rows[] = [
- 'before' => self::_read_column_data($result['bitmap1']),
- 'after' => self::_read_column_data($result['bitmap2'])
- ];
- }
-
- return $rows;
- }
-}
diff --git a/src/MySQLReplication/DataBase/DBHelper.php b/src/MySQLReplication/Repository/MySQLRepository.php
similarity index 65%
rename from src/MySQLReplication/DataBase/DBHelper.php
rename to src/MySQLReplication/Repository/MySQLRepository.php
index 4f9ce03..fa840cf 100755
--- a/src/MySQLReplication/DataBase/DBHelper.php
+++ b/src/MySQLReplication/Repository/MySQLRepository.php
@@ -1,32 +1,28 @@
'',
- 'user' => $config->getUser(),
- 'password' => $config->getPassword(),
- 'host' => $config->getHost(),
- 'port' => $config->getPort(),
- 'driver' => 'pdo_mysql',
- ];
- $this->conn = DriverManager::getConnection($config);
+ $this->conn = $connection;
+ }
+
+ public function __destruct()
+ {
+ $this->conn->close();
}
/**
@@ -34,6 +30,11 @@ public function __construct(Config $config)
*/
public function getConnection()
{
+ if (false === $this->conn->ping())
+ {
+ $this->conn->close();
+ $this->conn->connect();
+ }
return $this->conn;
}
@@ -59,7 +60,7 @@ public function getFields($schema, $table)
AND
`table_name` = ?
";
- return $this->conn->fetchAll($sql, [$schema, $table]);
+ return $this->getConnection()->fetchAll($sql, [$schema, $table]);
}
/**
@@ -68,7 +69,7 @@ public function getFields($schema, $table)
public function isCheckSum()
{
$sql = "SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'";
- $res = $this->conn->fetchAssoc($sql);
+ $res = $this->getConnection()->fetchAssoc($sql);
if ($res['Value'])
{
return true;
@@ -88,6 +89,6 @@ public function isCheckSum()
public function getMasterStatus()
{
$sql = "SHOW MASTER STATUS";
- return $this->conn->fetchAssoc($sql);
+ return $this->getConnection()->fetchAssoc($sql);
}
}
diff --git a/src/MySQLReplication/Service/BinLogStream.php b/src/MySQLReplication/Service/BinLogStream.php
deleted file mode 100755
index eb5d4be..0000000
--- a/src/MySQLReplication/Service/BinLogStream.php
+++ /dev/null
@@ -1,99 +0,0 @@
-dbHelper = new DBHelper($config);
- $this->connect = new Connect($config, $this->dbHelper, $gtID, $logFile, $logPos, $slave_id);
- $this->binLogPack = new BinLogPack($this->dbHelper);
-
- $this->ignoredEvents = $ignoredEvents;
- $this->onlyTables = $onlyTables;
- $this->onlyDatabases = $onlyDatabases;
- $this->onlyEvents = $onlyEvents;
- }
-
- /**
- * @return DeleteRowsDTO|EventDTO|GTIDLogDTO|QueryDTO|UpdateRowsDTO|WriteRowsDTO|TableMapDTO|null
- */
- public function analysisBinLog()
- {
- $result = $this->binLogPack->init(
- $this->connect->getPacket(),
- $this->connect->getCheckSum(),
- $this->onlyEvents,
- $this->ignoredEvents,
- $this->onlyTables,
- $this->onlyDatabases
- );
-
- if (!empty($result))
- {
- return $result;
- }
- return null;
- }
-}
\ No newline at end of file
diff --git a/tests/TypesTest.php b/tests/Integration/TypesTest.php
similarity index 87%
rename from tests/TypesTest.php
rename to tests/Integration/TypesTest.php
index e9340be..7b7ccb0 100755
--- a/tests/TypesTest.php
+++ b/tests/Integration/TypesTest.php
@@ -1,14 +1,13 @@
config = new Config('root', '192.168.1.100', 3306, 'root');
- $this->conn = (new DBHelper($this->config))->getConnection();
- }
-
- private function createAndInsertValue($create_query, $insert_query)
- {
- $binLogStream = new BinLogStream($this->config);
+ $config = (new ConfigService())->makeConfigFromArray([
+ 'user' => 'root',
+ 'host' => '192.168.1.100',
+ 'password' => 'root'
+ ]);
+ $this->binLogStream = new BinLogStream($config);
+ $this->conn = $this->binLogStream->getDbConnection();
$this->conn->exec("SET GLOBAL time_zone = 'UTC'");
$this->conn->exec("DROP DATABASE IF EXISTS " . $this->database);
$this->conn->exec("CREATE DATABASE " . $this->database);
$this->conn->exec("USE " . $this->database);
+ }
+
+
+ /**
+ * @param $create_query
+ * @param $insert_query
+ * @return \MySQLReplication\Event\DTO\DeleteRowsDTO|\MySQLReplication\Event\DTO\EventDTO|\MySQLReplication\Event\DTO\GTIDLogDTO|\MySQLReplication\Event\DTO\QueryDTO|\MySQLReplication\Event\DTO\RotateDTO|\MySQLReplication\Event\DTO\TableMapDTO|\MySQLReplication\Event\DTO\UpdateRowsDTO|\MySQLReplication\Event\DTO\WriteRowsDTO|\MySQLReplication\Event\DTO\XidDTO
+ * @throws \Doctrine\DBAL\DBALException
+ */
+ private function createAndInsertValue($create_query, $insert_query)
+ {
$this->conn->exec($create_query);
$this->conn->exec($insert_query);
- ;
+ $this->assertEquals(null, $this->binLogStream->getBinLogEvent());
+ $this->assertInstanceOf('MySQLReplication\Event\DTO\GTIDLogDTO', $this->binLogStream->getBinLogEvent());
+ $this->assertInstanceOf('MySQLReplication\Event\DTO\QueryDTO', $this->binLogStream->getBinLogEvent());
+ $this->assertInstanceOf('MySQLReplication\Event\DTO\GTIDLogDTO', $this->binLogStream->getBinLogEvent());
+ $this->assertInstanceOf('MySQLReplication\Event\DTO\QueryDTO', $this->binLogStream->getBinLogEvent());
+ $this->assertInstanceOf('MySQLReplication\Event\DTO\GTIDLogDTO', $this->binLogStream->getBinLogEvent());
+ $this->assertInstanceOf('MySQLReplication\Event\DTO\QueryDTO', $this->binLogStream->getBinLogEvent());
+ $this->assertInstanceOf('MySQLReplication\Event\DTO\GTIDLogDTO', $this->binLogStream->getBinLogEvent());
+ $this->assertInstanceOf('MySQLReplication\Event\DTO\QueryDTO', $this->binLogStream->getBinLogEvent());
+ $this->assertInstanceOf('MySQLReplication\Event\DTO\TableMapDTO', $this->binLogStream->getBinLogEvent());
- $this->assertEquals(null, $binLogStream->analysisBinLog());
- $this->assertInstanceOf('MySQLReplication\DTO\GTIDLogDTO', $binLogStream->analysisBinLog());
- $this->assertInstanceOf('MySQLReplication\DTO\QueryDTO', $binLogStream->analysisBinLog());
- $this->assertInstanceOf('MySQLReplication\DTO\GTIDLogDTO', $binLogStream->analysisBinLog());
- $this->assertInstanceOf('MySQLReplication\DTO\QueryDTO', $binLogStream->analysisBinLog());
- $this->assertInstanceOf('MySQLReplication\DTO\GTIDLogDTO', $binLogStream->analysisBinLog());
- $this->assertInstanceOf('MySQLReplication\DTO\QueryDTO', $binLogStream->analysisBinLog());
- $this->assertInstanceOf('MySQLReplication\DTO\GTIDLogDTO', $binLogStream->analysisBinLog());
- $this->assertInstanceOf('MySQLReplication\DTO\QueryDTO', $binLogStream->analysisBinLog());
- $this->assertInstanceOf('MySQLReplication\DTO\TableMapDTO', $binLogStream->analysisBinLog());
- return $binLogStream->analysisBinLog();
+ return $this->binLogStream->getBinLogEvent();
}
/**
@@ -532,20 +541,20 @@ public function shouldBeVarChar()
*/
public function shouldBeBit()
{
- $create_query = "CREATE TABLE test (test BIT(6),
- test2 BIT(16),
- test3 BIT(12),
- test4 BIT(9),
- test5 BIT(64)
- );
- ";
+ $create_query = "CREATE TABLE test (
+ test BIT(6),
+ test2 BIT(16),
+ test3 BIT(12),
+ test4 BIT(9),
+ test5 BIT(64)
+ );";
$insert_query = "INSERT INTO test VALUES(
- b'100010',
- b'1000101010111000',
- b'100010101101',
- b'101100111',
- b'1101011010110100100111100011010100010100101110111011101011011010')
- ";
+ b'100010',
+ b'1000101010111000',
+ b'100010101101',
+ b'101100111',
+ b'1101011010110100100111100011010100010100101110111011101011011010'
+ )";
$event = $this->createAndInsertValue($create_query, $insert_query);