mypipe
Empty data list for UPDATE INSERT DELETE queries
Hi.
My issue is quite similar to https://github.com/mardambey/mypipe/issues/15. For every kind of query run against the database, I only receive empty sets of mutations:
UPDATE test_db.some_table SET ()
INSERT INTO test_db.some_table () VALUES ()
INSERT INTO test_db.some_table () VALUES ()
UPDATE test_db.some_table SET ()
INSERT INTO test_db.some_table () VALUES ()
INSERT INTO test_db.some_table () VALUES ()
UPDATE test_db.some_table SET ()
Here is my application.conf file:
mypipe {
# Avro schema repository client class name
#schema-repo-client = "mypipe.avro.schema.SchemaRepo"
# consumers represent sources for mysql binary logs
consumers {
localhost {
# database "host:port:user:pass" array
source = "localhost:3306:mypipe:mypipe"
}
}
# data producers export data out (stdout, other stores, external services, etc.)
producers {
stdout {
class = "mypipe.producer.stdout.StdoutProducer"
}
#kafka-generic {
# class = "mypipe.producer.KafkaMutationGenericAvroProducer"
#}
}
# pipes join consumers and producers
pipes {
stdout {
consumers = ["localhost"]
producer {
stdout {}
}
# how to save and load binary log positions
binlog-position-repo {
# saved to a file, this is the default if unspecified
class = "mypipe.api.repo.ConfigurableFileBasedBinaryLogPositionRepository"
config {
file-prefix = "stdout-00" # required if binlog-position-repo is specified
data-dir = "/tmp/mypipe/data" # defaults to mypipe.data-dir if not present
}
}
}
#kafka-generic {
# enabled = true
# consumers = ["localhost"]
# producer {
# kafka-generic {
# metadata-brokers = "localhost:9092"
# }
# }
# binlog-position-repo {
# # saves to a MySQL database, make sure you use the following as well to prevent reacting on
# # inserts / updates made in the same DB being listened on for changes
# # mypipe {
# # include-event-condition = """ table != "binlogpos" """
# # error {
# # quit-on-empty-mutation-commit-failure = false
# # }
# # }
# class = "mypipe.api.repo.ConfigurableMySQLBasedBinaryLogPositionRepository"
# config {
# # database "host:port:user:pass" array
# source = "localhost:3306:mypipe:mypipe"
# database = "mypipe"
# table = "binlogpos"
# id = "kafka-generic" # used to find the row in the table for this pipe
# }
# }
#}
}
}
This is the table schema:
mysql> describe some_table;
+---------------+---------------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------------+------+-----+---------+----------------+
| id | bigint(20) unsigned | NO | PRI | NULL | auto_increment |
| tag_id | bigint(20) | NO | MUL | NULL | |
| some_field | bigint(20) | YES | MUL | NULL | |
+---------------+---------------------+------+-----+---------+----------------+
Here is the sample query:
update some_table set some_field=4 where some_field=3
Any help on this would be highly appreciated.
Maybe your account is missing some privileges. The account mypipe connects with needs to be able to query information_schema.
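One way to check this is a rough sketch like the following, run from a MySQL client while logged in as the account mypipe connects with. It assumes the database and table names from the original report (`test_db`, `some_table`) and assumes that mypipe builds its column list from `information_schema.COLUMNS`, which this thread does not confirm.

```sql
-- Show the privileges held by the account you are logged in as.
SHOW GRANTS FOR CURRENT_USER();

-- List the column metadata this account can see for the table.
-- information_schema only exposes rows for objects the account has
-- some privilege on, so an empty result here would point to missing grants.
SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'test_db'
  AND TABLE_NAME = 'some_table'
ORDER BY ORDINAL_POSITION;
```

If the second query returns the three columns (`id`, `tag_id`, `some_field`), column visibility is probably not the problem.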
Hi @pratpor, I used your configuration file as is to replace mine (application.overrides.conf in the runner) with the following table:
create table some_table (id bigint(20) unsigned auto_increment, tag_id bigint(20), some_field bigint(20), primary key (id)) engine = innodb;
then did the following:
./sbt
...
...
Multiple main classes detected, select one to run:
[1] mypipe.runner.KafkaGenericConsoleConsumer
[2] mypipe.runner.PipeRunner
Enter number: 2
After which I got:
09:46:18 INFO [mypipe.runner.PipeRunnerUtil$ ] Loading configuration for stdout pipe
09:46:18 INFO [mypipe.mysql.MySQLBinaryLogConsumer ] Using current master binlog position for consuming from localhost:3306
09:46:18 INFO [mypipe.runner.PipeRunner$ ] Connecting 1 pipes...
09:46:18 INFO [mypipe.pipe.Pipe ] Connecting pipe between localhost:3306 -> StdoutProducer
09:46:18 INFO [mypipe.mysql.MySQLBinaryLogConsumer ] Connecting client to [email protected]:localhost:3306
Feb 07, 2016 9:46:18 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at mysql-bin.000009/311 (sid:1457, cid:2)
09:46:18 INFO [mypipe.pipe.Pipe ] Pipe stdout connected!
09:46:28 INFO [m.a.r.ConfigurableFileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout-00/localhost-3306 -> Some(mysql-bin.000009:311)
Then, I opened up a MySQL client to the server, and did:
mysql> insert into some_table values (null, 1, 2);
Query OK, 1 row affected (0.13 sec)
mysql> insert into some_table values (null, 3, 4);
Query OK, 1 row affected (0.14 sec)
mysql> update some_table set some_field=5 where some_field=2;
Query OK, 1 row affected (0.10 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> update some_table set some_field=5 where some_field>1;
Query OK, 1 row affected (0.12 sec)
Rows matched: 2 Changed: 1 Warnings: 0
mysql> update some_table set some_field=9 where some_field>1;
Query OK, 2 rows affected (0.10 sec)
Rows matched: 2 Changed: 2 Warnings: 0
which produced the following in the console, since the stdout producer was active (note that I cleaned up the output a bit to remove extra logger output and other unrelated logs):
INSERT INTO mypipe.some_table (id, tag_id, some_field) VALUES (1, 1, 2)
INSERT INTO mypipe.some_table (id, tag_id, some_field) VALUES (2, 3, 4)
UPDATE mypipe.some_table SET (id=1, tag_id=1, some_field=5) WHERE (id=1)
UPDATE mypipe.some_table SET (id=2, tag_id=3, some_field=5) WHERE (id=2)
UPDATE mypipe.some_table SET (id=1, tag_id=1, some_field=9) WHERE (id=1)
UPDATE mypipe.some_table SET (id=2, tag_id=3, some_field=9) WHERE (id=2)
In other words, your configuration looks fine. Note that I tested all of this using the Docker Compose setup provided by mypipe in the docker/ sub-directory of the project, with the table created in the mypipe database.
@pratpor any news? Take a look at this for potentially fixing up permissions:
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'mypipe'@'%' IDENTIFIED BY 'mypipe';
GRANT ALL PRIVILEGES ON mypipe.* TO 'mypipe'@'%';
GRANT RELOAD ON *.* TO 'mypipe'@'%';
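Note that the grants above only cover tables in the `mypipe` database, while the output in the original report refers to `test_db.some_table`. A possible adaptation is sketched below. It assumes that `test_db` is the database being watched and that read access is enough for mypipe to look up column metadata. Neither point is confirmed in this thread.

```sql
-- Assumption: the watched tables live in test_db, as in the original report.
-- SELECT makes the tables' metadata visible to the account in information_schema.
GRANT SELECT ON test_db.* TO 'mypipe'@'%';

-- Confirm the resulting privileges for the account.
SHOW GRANTS FOR 'mypipe'@'%';
```

After changing grants, restarting the pipe and repeating one of the sample updates should show whether the mutations now carry column data.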