From db8abe9bd7e0f7c9862fcd468f05448426b01b95 Mon Sep 17 00:00:00 2001 From: 21 <1192138141@qq.com> Date: Fri, 20 May 2022 14:23:09 +0800 Subject: [PATCH 1/8] add continue shield ddl command --- .gitignore | 3 +- .idea/21-mysql-es.iml | 9 +++ .idea/modules.xml | 8 +++ .idea/vcs.xml | 6 ++ .idea/workspace.xml | 94 ++++++++++++++++++++++++++ etc/river.toml | 149 ------------------------------------------ go.sum | 69 +++++++++++++++++++ river/rule.go | 33 ++++++++++ river/sync.go | 6 ++ var/master.info | 2 + 10 files changed, 229 insertions(+), 150 deletions(-) create mode 100644 .idea/21-mysql-es.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 .idea/workspace.xml delete mode 100644 etc/river.toml create mode 100644 var/master.info diff --git a/.gitignore b/.gitignore index c5e82d74..8cbb47ee 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -bin \ No newline at end of file +bin +etc/ \ No newline at end of file diff --git a/.idea/21-mysql-es.iml b/.idea/21-mysql-es.iml new file mode 100644 index 00000000..5e764c4f --- /dev/null +++ b/.idea/21-mysql-es.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 00000000..27f1ee36 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 00000000..94a25f7f --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 00000000..16f40dc8 --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,94 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + true + + + + + + file://$PROJECT_DIR$/river/sync.go + 67 + + + + + \ No newline at end of file diff --git a/etc/river.toml b/etc/river.toml deleted file mode 100644 index c390c862..00000000 --- a/etc/river.toml +++ /dev/null @@ -1,149 +0,0 @@ -# MySQL address, user and password -# user must have replication privilege in MySQL. -my_addr = "127.0.0.1:3306" -my_user = "root" -my_pass = "" -my_charset = "utf8" - -# Set true when elasticsearch use https -#es_https = false -# Elasticsearch address -es_addr = "127.0.0.1:9200" -# Elasticsearch user and password, maybe set by shield, nginx, or x-pack -es_user = "" -es_pass = "" - -# Path to store data, like master.info, if not set or empty, -# we must use this to support breakpoint resume syncing. -# TODO: support other storage, like etcd. -data_dir = "./var" - -# Inner Http status address -stat_addr = "127.0.0.1:12800" -stat_path = "/metrics" - -# pseudo server id like a slave -server_id = 1001 - -# mysql or mariadb -flavor = "mysql" - -# mysqldump execution path -# if not set or empty, ignore mysqldump. -mysqldump = "mysqldump" - -# if we have no privilege to use mysqldump with --master-data, -# we must skip it. -#skip_master_data = false - -# minimal items to be inserted in one bulk -bulk_size = 128 - -# force flush the pending requests if we don't have enough items >= bulk_size -flush_bulk_time = "200ms" - -# Ignore table without primary key -skip_no_pk_table = false - -# MySQL data source -[[source]] -schema = "test" - -# Only below tables will be synced into Elasticsearch. -# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023 -# I don't think it is necessary to sync all tables in a database. -tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] - -# Below is for special rule mapping - -# Very simple example -# -# desc t; -# +-------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +-------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | name | varchar(256) | YES | | NULL | | -# +-------+--------------+------+-----+---------+-------+ -# -# The table `t` will be synced to ES index `test` and type `t`. -[[rule]] -schema = "test" -table = "t" -index = "test" -type = "t" - -# Wildcard table rule, the wildcard table must be in source tables -# All tables which match the wildcard format will be synced to ES index `test` and type `t`. -# In this example, all tables must have same schema with above table `t`; -[[rule]] -schema = "test" -table = "t_[0-9]{4}" -index = "test" -type = "t" - -# Simple field rule -# -# desc tfield; -# +----------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +----------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | tags | varchar(256) | YES | | NULL | | -# | keywords | varchar(256) | YES | | NULL | | -# +----------+--------------+------+-----+---------+-------+ -# -[[rule]] -schema = "test" -table = "tfield" -index = "test" -type = "tfield" - -[rule.field] -# Map column `id` to ES field `es_id` -id="es_id" -# Map column `tags` to ES field `es_tags` with array type -tags="es_tags,list" -# Map column `keywords` to ES with array type -keywords=",list" - -# Filter rule -# -# desc tfilter; -# +-------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +-------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | c1 | int(11) | YES | | 0 | | -# | c2 | int(11) | YES | | 0 | | -# | name | varchar(256) | YES | | NULL | | -# +-------+--------------+------+-----+---------+-------+ -# -[[rule]] -schema = "test" -table = "tfilter" -index = "test" -type = "tfilter" - -# Only sync following columns -filter = ["id", "name"] - -# id rule -# -# desc tid_[0-9]{4}; -# +----------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +----------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | tag | varchar(256) | YES | | NULL | | -# | desc | varchar(256) | YES | | NULL | | -# +----------+--------------+------+-----+---------+-------+ -# -[[rule]] -schema = "test" -table = "tid_[0-9]{4}" -index = "test" -type = "t" -# The es doc's id will be `id`:`tag` -# It is useful for merge muliple table into one type while theses tables have same PK -id = ["id", "tag"] diff --git a/go.sum b/go.sum index 97c0717c..effd277d 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,54 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/juju/errors v0.0.0-20190207033735-e65537c515d7 h1:dMIPRDg6gi7CUp0Kj2+HxqJ5kTr1iAdzsXYIrLCNSmU= github.com/juju/errors v0.0.0-20190207033735-e65537c515d7/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825 h1:U9Kdnknj4n2v76Mg7wazevZ5N9U1OIaMwSNRVLEcLX0= +github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= +github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330 h1:rRMLMjIMFulCX9sGKZ1hoov/iROMsKyC8Snc02nSukw= +github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v0.9.3 h1:9iH4JKXLzFbOAdtqv/a+j8aewx2Y8lAjAydhbaScPF8= +github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM= +github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084 h1:sofwID9zm4tzrgykg80hfFph1mryUeLRsUfoocVVmRY= +github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE= @@ -12,7 +57,31 @@ github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8 github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q= github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= +github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed h1:KMgQoLJGCq1IoZpLZE3AIffh9veYWoVlsvA4ib55TMM= +github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= github.com/siddontang/go-mysql v0.0.0-20190123011128-88e9cd7f6643 h1:yzg8+Cip1iDhy6GGS1zKflqOybgRc4xp82eYwQrP+DU= github.com/siddontang/go-mysql v0.0.0-20190123011128-88e9cd7f6643/go.mod h1:/b8ZcWjAShCcHp2dWpjb1vTlNyiG03UeHEQr2jteOpI= github.com/siddontang/go-mysql v0.0.0-20190303113352-670f74e8daf5 h1:5Nr7spTeY+ziXzqk/9p+GLnvH4rIjp9BX+aRaYDbR44= github.com/siddontang/go-mysql v0.0.0-20190303113352-670f74e8daf5/go.mod h1:/b8ZcWjAShCcHp2dWpjb1vTlNyiG03UeHEQr2jteOpI= +github.com/siddontang/go-mysql v0.0.0-20190524062908-de6c3a84bcbe h1:HMx2v6kZjEt9CGzdpIubWsEEi3j8LCoBPMv2M98NxqU= +github.com/siddontang/go-mysql v0.0.0-20190524062908-de6c3a84bcbe/go.mod h1:Bl4lryU44qtIXEXNbP0k0pD646Nkw/qHn21wfZVGJx4= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 h1:mzjBh+S5frKOsOBobWIMAbXavqjmgO17k/2puhcFR94= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/river/rule.go b/river/rule.go index fb7cccb7..94c62413 100644 --- a/river/rule.go +++ b/river/rule.go @@ -32,6 +32,9 @@ type Rule struct { // Elasticsearch pipeline // To pre-process documents before indexing Pipeline string `toml:"pipeline"` + + // shield ddl command + ShieldDDL string `toml:"shield_ddl"` } func newDefaultRule(schema string, table string) *Rule { @@ -83,3 +86,33 @@ func (r *Rule) CheckFilter(field string) bool { } return false } + +// CheckHasShieldCommands Check Has Shield commands +func (r *Rule) CheckHasShieldCommands() bool { + return len(r.ShieldDDL) > 0 +} + +// CheckContinueShieldCommand check continue command +func (r *Rule) CheckContinueShieldCommand(action string) bool { + if len(r.ShieldDDL) < 1 { + return false + } + // get command []string + shieldDDLs := r.getManyShieldCommands() + for _, v := range shieldDDLs { + // action == shieldDDL + if v == action { + return true + } + } + return false +} + +func (r *Rule) getManyShieldCommands() []string { + // not have shield DDL command + if len(r.ShieldDDL) < 0 { + return []string{} + } + // command example delete,update + return strings.Split(r.ShieldDDL, ",") +} diff --git a/river/sync.go b/river/sync.go index acbcc8b0..8527a080 100644 --- a/river/sync.go +++ b/river/sync.go @@ -70,6 +70,12 @@ func (h *eventHandler) OnRow(e *canal.RowsEvent) error { return nil } + // check continue command + if rule.CheckContinueShieldCommand(e.Action) { + log.Infof("rules have continue ddl command , command event is (%v) , rows (%v)", e.Action, e.Rows) + return nil + } + var reqs []*elastic.BulkRequest var err error switch e.Action { diff --git a/var/master.info b/var/master.info new file mode 100644 index 00000000..91c66be1 --- /dev/null +++ b/var/master.info @@ -0,0 +1,2 @@ +bin_name = "binlog.000397" +bin_pos = 102631045 From 40aede7475d6ec7deba1c7a9d1c1e9b1c5108aca Mon Sep 17 00:00:00 2001 From: 21 <1192138141@qq.com> Date: Fri, 20 May 2022 14:23:25 +0800 Subject: [PATCH 2/8] add continue shield ddl command --- .gitignore | 3 +- .idea/21-mysql-es.iml | 9 ----- .idea/modules.xml | 8 ---- .idea/vcs.xml | 6 --- .idea/workspace.xml | 94 ------------------------------------------- var/master.info | 2 +- 6 files changed, 3 insertions(+), 119 deletions(-) delete mode 100644 .idea/21-mysql-es.iml delete mode 100644 .idea/modules.xml delete mode 100644 .idea/vcs.xml delete mode 100644 .idea/workspace.xml diff --git a/.gitignore b/.gitignore index 8cbb47ee..0aee042c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ bin -etc/ \ No newline at end of file +etc/ +.idea \ No newline at end of file diff --git a/.idea/21-mysql-es.iml b/.idea/21-mysql-es.iml deleted file mode 100644 index 5e764c4f..00000000 --- a/.idea/21-mysql-es.iml +++ /dev/null @@ -1,9 +0,0 @@ - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index 27f1ee36..00000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 94a25f7f..00000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml deleted file mode 100644 index 16f40dc8..00000000 --- a/.idea/workspace.xml +++ /dev/null @@ -1,94 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - true - - - - - - file://$PROJECT_DIR$/river/sync.go - 67 - - - - - \ No newline at end of file diff --git a/var/master.info b/var/master.info index 91c66be1..2f73c1c6 100644 --- a/var/master.info +++ b/var/master.info @@ -1,2 +1,2 @@ bin_name = "binlog.000397" -bin_pos = 102631045 +bin_pos = 102674450 From 0b488b071c874a0a7619391b077d0f1bfb8cb959 Mon Sep 17 00:00:00 2001 From: 21 <1192138141@qq.com> Date: Fri, 20 May 2022 14:26:00 +0800 Subject: [PATCH 3/8] remove var file path --- .gitignore | 3 ++- var/master.info | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) delete mode 100644 var/master.info diff --git a/.gitignore b/.gitignore index 0aee042c..72c85dc2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ bin etc/ -.idea \ No newline at end of file +.idea +var \ No newline at end of file diff --git a/var/master.info b/var/master.info deleted file mode 100644 index 2f73c1c6..00000000 --- a/var/master.info +++ /dev/null @@ -1,2 +0,0 @@ -bin_name = "binlog.000397" -bin_pos = 102674450 From 47372b83e2fb8af68e08e079c856b60cd4da7a7c Mon Sep 17 00:00:00 2001 From: 21 <1192138141@qq.com> Date: Fri, 20 May 2022 14:33:10 +0800 Subject: [PATCH 4/8] add README.md continue mysql ddl commands --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index 05a4878d..52a4bc4b 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,22 @@ filter = ["id", "name"] In the above example, we will only sync MySQL table tfiler's columns `id` and `name` to Elasticsearch. +## Filter mysql ddl commands +You can use `shield_ddl` to continue mysql DDL command + +``` +[[rule]] +schema = "choujiang" +table = "c_21" +index = "" +type = "" +# shield ddl mysql command +# example continue one command : delete +# example continue many command : delete,update +shield_ddl = "delete" +``` +In the above example , we will continue mysql delete command + ## Ignore table without a primary key When you sync table without a primary key, you can see below error message. ``` From 21b76ba5c9016e902c207a4eb976f0e03fb277aa Mon Sep 17 00:00:00 2001 From: 21 <1192138141@qq.com> Date: Fri, 20 May 2022 14:36:30 +0800 Subject: [PATCH 5/8] update skip mysql ddl func name --- README.md | 8 ++++---- river/rule.go | 4 ++-- river/sync.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 52a4bc4b..504723f4 100644 --- a/README.md +++ b/README.md @@ -175,7 +175,7 @@ filter = ["id", "name"] In the above example, we will only sync MySQL table tfiler's columns `id` and `name` to Elasticsearch. ## Filter mysql ddl commands -You can use `shield_ddl` to continue mysql DDL command +You can use `shield_ddl` to skip mysql DDL command ``` [[rule]] @@ -184,11 +184,11 @@ table = "c_21" index = "" type = "" # shield ddl mysql command -# example continue one command : delete -# example continue many command : delete,update +# example skip one command : delete +# example skip many command : delete,update shield_ddl = "delete" ``` -In the above example , we will continue mysql delete command +In the above example , we will skip mysql delete command ## Ignore table without a primary key When you sync table without a primary key, you can see below error message. diff --git a/river/rule.go b/river/rule.go index 94c62413..8bd56af9 100644 --- a/river/rule.go +++ b/river/rule.go @@ -92,8 +92,8 @@ func (r *Rule) CheckHasShieldCommands() bool { return len(r.ShieldDDL) > 0 } -// CheckContinueShieldCommand check continue command -func (r *Rule) CheckContinueShieldCommand(action string) bool { +// CheckSkipShieldCommand check continue command +func (r *Rule) CheckSkipShieldCommand(action string) bool { if len(r.ShieldDDL) < 1 { return false } diff --git a/river/sync.go b/river/sync.go index 8527a080..3359d5d5 100644 --- a/river/sync.go +++ b/river/sync.go @@ -71,7 +71,7 @@ func (h *eventHandler) OnRow(e *canal.RowsEvent) error { } // check continue command - if rule.CheckContinueShieldCommand(e.Action) { + if rule.CheckSkipShieldCommand(e.Action) { log.Infof("rules have continue ddl command , command event is (%v) , rows (%v)", e.Action, e.Rows) return nil } From 205ec479f99601af4e7d67bb8894297af92b1366 Mon Sep 17 00:00:00 2001 From: 21 <1192138141@qq.com> Date: Fri, 20 May 2022 14:38:58 +0800 Subject: [PATCH 6/8] update river.toml --- .gitignore | 1 - etc/river.toml | 149 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 etc/river.toml diff --git a/.gitignore b/.gitignore index 72c85dc2..7ed44333 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ bin -etc/ .idea var \ No newline at end of file diff --git a/etc/river.toml b/etc/river.toml new file mode 100644 index 00000000..23093910 --- /dev/null +++ b/etc/river.toml @@ -0,0 +1,149 @@ +# MySQL address, user and password +# user must have replication privilege in MySQL. +my_addr = "127.0.0.1:3306" +my_user = "root" +my_pass = "" +my_charset = "utf8" + +# Set true when elasticsearch use https +#es_https = false +# Elasticsearch address +es_addr = "127.0.0.1:9200" +# Elasticsearch user and password, maybe set by shield, nginx, or x-pack +es_user = "" +es_pass = "" + +# Path to store data, like master.info, if not set or empty, +# we must use this to support breakpoint resume syncing. +# TODO: support other storage, like etcd. +data_dir = "./var" + +# Inner Http status address +stat_addr = "127.0.0.1:12800" +stat_path = "/metrics" + +# pseudo server id like a slave +server_id = 1001 + +# mysql or mariadb +flavor = "mysql" + +# mysqldump execution path +# if not set or empty, ignore mysqldump. +mysqldump = "mysqldump" + +# if we have no privilege to use mysqldump with --master-data, +# we must skip it. +#skip_master_data = false + +# minimal items to be inserted in one bulk +bulk_size = 128 + +# force flush the pending requests if we don't have enough items >= bulk_size +flush_bulk_time = "200ms" + +# Ignore table without primary key +skip_no_pk_table = false + +# MySQL data source +[[source]] +schema = "test" + +# Only below tables will be synced into Elasticsearch. +# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023 +# I don't think it is necessary to sync all tables in a database. +tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] + +# Below is for special rule mapping + +# Very simple example +# +# desc t; +# +-------+--------------+------+-----+---------+-------+ +# | Field | Type | Null | Key | Default | Extra | +# +-------+--------------+------+-----+---------+-------+ +# | id | int(11) | NO | PRI | NULL | | +# | name | varchar(256) | YES | | NULL | | +# +-------+--------------+------+-----+---------+-------+ +# +# The table `t` will be synced to ES index `test` and type `t`. +[[rule]] +schema = "test" +table = "t" +index = "test" +type = "t" + +# Wildcard table rule, the wildcard table must be in source tables +# All tables which match the wildcard format will be synced to ES index `test` and type `t`. +# In this example, all tables must have same schema with above table `t`; +[[rule]] +schema = "test" +table = "t_[0-9]{4}" +index = "test" +type = "t" + +# Simple field rule +# +# desc tfield; +# +----------+--------------+------+-----+---------+-------+ +# | Field | Type | Null | Key | Default | Extra | +# +----------+--------------+------+-----+---------+-------+ +# | id | int(11) | NO | PRI | NULL | | +# | tags | varchar(256) | YES | | NULL | | +# | keywords | varchar(256) | YES | | NULL | | +# +----------+--------------+------+-----+---------+-------+ +# +[[rule]] +schema = "test" +table = "tfield" +index = "test" +type = "tfield" + +[rule.field] +# Map column `id` to ES field `es_id` +id="es_id" +# Map column `tags` to ES field `es_tags` with array type +tags="es_tags,list" +# Map column `keywords` to ES with array type +keywords=",list" + +# Filter rule +# +# desc tfilter; +# +-------+--------------+------+-----+---------+-------+ +# | Field | Type | Null | Key | Default | Extra | +# +-------+--------------+------+-----+---------+-------+ +# | id | int(11) | NO | PRI | NULL | | +# | c1 | int(11) | YES | | 0 | | +# | c2 | int(11) | YES | | 0 | | +# | name | varchar(256) | YES | | NULL | | +# +-------+--------------+------+-----+---------+-------+ +# +[[rule]] +schema = "test" +table = "tfilter" +index = "test" +type = "tfilter" + +# Only sync following columns +filter = ["id", "name"] + +# id rule +# +# desc tid_[0-9]{4}; +# +----------+--------------+------+-----+---------+-------+ +# | Field | Type | Null | Key | Default | Extra | +# +----------+--------------+------+-----+---------+-------+ +# | id | int(11) | NO | PRI | NULL | | +# | tag | varchar(256) | YES | | NULL | | +# | desc | varchar(256) | YES | | NULL | | +# +----------+--------------+------+-----+---------+-------+ +# +[[rule]] +schema = "test" +table = "tid_[0-9]{4}" +index = "test" +type = "t" +# The es doc's id will be `id`:`tag` +# It is useful for merge muliple table into one type while theses tables have same PK +id = ["id", "tag"] \ No newline at end of file From 5d1df6ca3b42d84de5ff6c63728d452be7a572f7 Mon Sep 17 00:00:00 2001 From: 21 <1192138141@qq.com> Date: Fri, 20 May 2022 14:41:00 +0800 Subject: [PATCH 7/8] update .gitignore --- .gitignore | 4 +- etc/river.toml | 149 ------------------------------------------------- go.sum | 87 ----------------------------- 3 files changed, 3 insertions(+), 237 deletions(-) delete mode 100644 etc/river.toml delete mode 100644 go.sum diff --git a/.gitignore b/.gitignore index 7ed44333..837e5f52 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ bin +etc/ .idea -var \ No newline at end of file +var +go.sum \ No newline at end of file diff --git a/etc/river.toml b/etc/river.toml deleted file mode 100644 index 23093910..00000000 --- a/etc/river.toml +++ /dev/null @@ -1,149 +0,0 @@ -# MySQL address, user and password -# user must have replication privilege in MySQL. -my_addr = "127.0.0.1:3306" -my_user = "root" -my_pass = "" -my_charset = "utf8" - -# Set true when elasticsearch use https -#es_https = false -# Elasticsearch address -es_addr = "127.0.0.1:9200" -# Elasticsearch user and password, maybe set by shield, nginx, or x-pack -es_user = "" -es_pass = "" - -# Path to store data, like master.info, if not set or empty, -# we must use this to support breakpoint resume syncing. -# TODO: support other storage, like etcd. -data_dir = "./var" - -# Inner Http status address -stat_addr = "127.0.0.1:12800" -stat_path = "/metrics" - -# pseudo server id like a slave -server_id = 1001 - -# mysql or mariadb -flavor = "mysql" - -# mysqldump execution path -# if not set or empty, ignore mysqldump. -mysqldump = "mysqldump" - -# if we have no privilege to use mysqldump with --master-data, -# we must skip it. -#skip_master_data = false - -# minimal items to be inserted in one bulk -bulk_size = 128 - -# force flush the pending requests if we don't have enough items >= bulk_size -flush_bulk_time = "200ms" - -# Ignore table without primary key -skip_no_pk_table = false - -# MySQL data source -[[source]] -schema = "test" - -# Only below tables will be synced into Elasticsearch. -# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023 -# I don't think it is necessary to sync all tables in a database. -tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] - -# Below is for special rule mapping - -# Very simple example -# -# desc t; -# +-------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +-------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | name | varchar(256) | YES | | NULL | | -# +-------+--------------+------+-----+---------+-------+ -# -# The table `t` will be synced to ES index `test` and type `t`. -[[rule]] -schema = "test" -table = "t" -index = "test" -type = "t" - -# Wildcard table rule, the wildcard table must be in source tables -# All tables which match the wildcard format will be synced to ES index `test` and type `t`. -# In this example, all tables must have same schema with above table `t`; -[[rule]] -schema = "test" -table = "t_[0-9]{4}" -index = "test" -type = "t" - -# Simple field rule -# -# desc tfield; -# +----------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +----------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | tags | varchar(256) | YES | | NULL | | -# | keywords | varchar(256) | YES | | NULL | | -# +----------+--------------+------+-----+---------+-------+ -# -[[rule]] -schema = "test" -table = "tfield" -index = "test" -type = "tfield" - -[rule.field] -# Map column `id` to ES field `es_id` -id="es_id" -# Map column `tags` to ES field `es_tags` with array type -tags="es_tags,list" -# Map column `keywords` to ES with array type -keywords=",list" - -# Filter rule -# -# desc tfilter; -# +-------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +-------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | c1 | int(11) | YES | | 0 | | -# | c2 | int(11) | YES | | 0 | | -# | name | varchar(256) | YES | | NULL | | -# +-------+--------------+------+-----+---------+-------+ -# -[[rule]] -schema = "test" -table = "tfilter" -index = "test" -type = "tfilter" - -# Only sync following columns -filter = ["id", "name"] - -# id rule -# -# desc tid_[0-9]{4}; -# +----------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +----------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | tag | varchar(256) | YES | | NULL | | -# | desc | varchar(256) | YES | | NULL | | -# +----------+--------------+------+-----+---------+-------+ -# -[[rule]] -schema = "test" -table = "tid_[0-9]{4}" -index = "test" -type = "t" -# The es doc's id will be `id`:`tag` -# It is useful for merge muliple table into one type while theses tables have same PK -id = ["id", "tag"] \ No newline at end of file diff --git a/go.sum b/go.sum deleted file mode 100644 index effd277d..00000000 --- a/go.sum +++ /dev/null @@ -1,87 +0,0 @@ -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= -github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= -github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= -github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= -github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= -github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= -github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= -github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= -github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/juju/errors v0.0.0-20190207033735-e65537c515d7 h1:dMIPRDg6gi7CUp0Kj2+HxqJ5kTr1iAdzsXYIrLCNSmU= -github.com/juju/errors v0.0.0-20190207033735-e65537c515d7/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= -github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= -github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= -github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= -github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= -github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= -github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4= -github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825 h1:U9Kdnknj4n2v76Mg7wazevZ5N9U1OIaMwSNRVLEcLX0= -github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= -github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330 h1:rRMLMjIMFulCX9sGKZ1hoov/iROMsKyC8Snc02nSukw= -github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= -github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= -github.com/prometheus/client_golang v0.9.3 h1:9iH4JKXLzFbOAdtqv/a+j8aewx2Y8lAjAydhbaScPF8= -github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= -github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= -github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= -github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= -github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM= -github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= -github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084 h1:sofwID9zm4tzrgykg80hfFph1mryUeLRsUfoocVVmRY= -github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= -github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= -github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= -github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM= -github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= -github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q= -github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= -github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed h1:KMgQoLJGCq1IoZpLZE3AIffh9veYWoVlsvA4ib55TMM= -github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= -github.com/siddontang/go-mysql v0.0.0-20190123011128-88e9cd7f6643 h1:yzg8+Cip1iDhy6GGS1zKflqOybgRc4xp82eYwQrP+DU= -github.com/siddontang/go-mysql v0.0.0-20190123011128-88e9cd7f6643/go.mod h1:/b8ZcWjAShCcHp2dWpjb1vTlNyiG03UeHEQr2jteOpI= -github.com/siddontang/go-mysql v0.0.0-20190303113352-670f74e8daf5 h1:5Nr7spTeY+ziXzqk/9p+GLnvH4rIjp9BX+aRaYDbR44= -github.com/siddontang/go-mysql v0.0.0-20190303113352-670f74e8daf5/go.mod h1:/b8ZcWjAShCcHp2dWpjb1vTlNyiG03UeHEQr2jteOpI= -github.com/siddontang/go-mysql v0.0.0-20190524062908-de6c3a84bcbe h1:HMx2v6kZjEt9CGzdpIubWsEEi3j8LCoBPMv2M98NxqU= -github.com/siddontang/go-mysql v0.0.0-20190524062908-de6c3a84bcbe/go.mod h1:Bl4lryU44qtIXEXNbP0k0pD646Nkw/qHn21wfZVGJx4= -github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= -github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= -github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= -github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= -github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 h1:mzjBh+S5frKOsOBobWIMAbXavqjmgO17k/2puhcFR94= -golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= From 7f7399cb7f21c4e4f9782bd866f31860c858201e Mon Sep 17 00:00:00 2001 From: 21 <1192138141@qq.com> Date: Fri, 20 May 2022 14:42:50 +0800 Subject: [PATCH 8/8] update .gitignore --- .gitignore | 1 - etc/river.toml | 150 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 etc/river.toml diff --git a/.gitignore b/.gitignore index 837e5f52..387675ab 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ bin -etc/ .idea var go.sum \ No newline at end of file diff --git a/etc/river.toml b/etc/river.toml new file mode 100644 index 00000000..5bc30d9d --- /dev/null +++ b/etc/river.toml @@ -0,0 +1,150 @@ +# MySQL address, user and password +# user must have replication privilege in MySQL. +my_addr = "127.0.0.1:3306" +my_user = "root" +my_pass = "" +my_charset = "utf8" + +# Set true when elasticsearch use https +#es_https = false +# Elasticsearch address +es_addr = "127.0.0.1:9200" +# Elasticsearch user and password, maybe set by shield, nginx, or x-pack +es_user = "" +es_pass = "" + +# Path to store data, like master.info, if not set or empty, +# we must use this to support breakpoint resume syncing. +# TODO: support other storage, like etcd. +data_dir = "./var" + +# Inner Http status address +stat_addr = "127.0.0.1:12800" +stat_path = "/metrics" + +# pseudo server id like a slave +server_id = 1001 + +# mysql or mariadb +flavor = "mysql" + +# mysqldump execution path +# if not set or empty, ignore mysqldump. +mysqldump = "mysqldump" + +# if we have no privilege to use mysqldump with --master-data, +# we must skip it. +#skip_master_data = false + +# minimal items to be inserted in one bulk +bulk_size = 128 + +# force flush the pending requests if we don't have enough items >= bulk_size +flush_bulk_time = "200ms" + +# Ignore table without primary key +skip_no_pk_table = false + +# MySQL data source +[[source]] +schema = "test" + +# Only below tables will be synced into Elasticsearch. +# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023 +# I don't think it is necessary to sync all tables in a database. +tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] + +# Below is for special rule mapping + +# Very simple example +# +# desc t; +# +-------+--------------+------+-----+---------+-------+ +# | Field | Type | Null | Key | Default | Extra | +# +-------+--------------+------+-----+---------+-------+ +# | id | int(11) | NO | PRI | NULL | | +# | name | varchar(256) | YES | | NULL | | +# +-------+--------------+------+-----+---------+-------+ +# +# The table `t` will be synced to ES index `test` and type `t`. +[[rule]] +schema = "test" +table = "t" +index = "test" +type = "t" +shield_ddl = "" + +# Wildcard table rule, the wildcard table must be in source tables +# All tables which match the wildcard format will be synced to ES index `test` and type `t`. +# In this example, all tables must have same schema with above table `t`; +[[rule]] +schema = "test" +table = "t_[0-9]{4}" +index = "test" +type = "t" + +# Simple field rule +# +# desc tfield; +# +----------+--------------+------+-----+---------+-------+ +# | Field | Type | Null | Key | Default | Extra | +# +----------+--------------+------+-----+---------+-------+ +# | id | int(11) | NO | PRI | NULL | | +# | tags | varchar(256) | YES | | NULL | | +# | keywords | varchar(256) | YES | | NULL | | +# +----------+--------------+------+-----+---------+-------+ +# +[[rule]] +schema = "test" +table = "tfield" +index = "test" +type = "tfield" + +[rule.field] +# Map column `id` to ES field `es_id` +id="es_id" +# Map column `tags` to ES field `es_tags` with array type +tags="es_tags,list" +# Map column `keywords` to ES with array type +keywords=",list" + +# Filter rule +# +# desc tfilter; +# +-------+--------------+------+-----+---------+-------+ +# | Field | Type | Null | Key | Default | Extra | +# +-------+--------------+------+-----+---------+-------+ +# | id | int(11) | NO | PRI | NULL | | +# | c1 | int(11) | YES | | 0 | | +# | c2 | int(11) | YES | | 0 | | +# | name | varchar(256) | YES | | NULL | | +# +-------+--------------+------+-----+---------+-------+ +# +[[rule]] +schema = "test" +table = "tfilter" +index = "test" +type = "tfilter" + +# Only sync following columns +filter = ["id", "name"] + +# id rule +# +# desc tid_[0-9]{4}; +# +----------+--------------+------+-----+---------+-------+ +# | Field | Type | Null | Key | Default | Extra | +# +----------+--------------+------+-----+---------+-------+ +# | id | int(11) | NO | PRI | NULL | | +# | tag | varchar(256) | YES | | NULL | | +# | desc | varchar(256) | YES | | NULL | | +# +----------+--------------+------+-----+---------+-------+ +# +[[rule]] +schema = "test" +table = "tid_[0-9]{4}" +index = "test" +type = "t" +# The es doc's id will be `id`:`tag` +# It is useful for merge muliple table into one type while theses tables have same PK +id = ["id", "tag"]