
Empty data list for UPDATE INSERT DELETE queries

Open pratpor opened this issue 9 years ago • 3 comments

Hi.

My issue is quite similar to https://github.com/mardambey/mypipe/issues/15. For every kind of query run against the database, I only receive empty sets of mutations:

UPDATE test_db.some_table SET () 
INSERT INTO test_db.some_table () VALUES ()
INSERT INTO test_db.some_table () VALUES ()
UPDATE test_db.some_table SET () 
INSERT INTO test_db.some_table () VALUES ()
INSERT INTO test_db.some_table () VALUES ()
UPDATE test_db.some_table SET ()
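
The symptom above can be spotted mechanically in captured output. The following is a minimal sketch, assuming the stdout producer prints one statement per line in the INSERT and UPDATE shapes shown in this thread; `is_empty_mutation` and `count_empty_mutations` are hypothetical helper names, not part of mypipe, and DELETE lines are not handled because their format does not appear here.

```python
import re

# Patterns for the two empty shapes shown above (assumed output format).
EMPTY_INSERT = re.compile(r"^\s*INSERT INTO \S+ \(\s*\) VALUES \(\s*\)\s*$")
EMPTY_UPDATE = re.compile(r"^\s*UPDATE \S+ SET \(\s*\)")


def is_empty_mutation(line: str) -> bool:
    """Return True if the line is an INSERT or UPDATE with an empty column list."""
    return bool(EMPTY_INSERT.match(line) or EMPTY_UPDATE.match(line))


def count_empty_mutations(lines) -> int:
    """Count how many of the given output lines are empty mutations."""
    return sum(1 for line in lines if is_empty_mutation(line))


if __name__ == "__main__":
    sample = [
        "UPDATE test_db.some_table SET () ",
        "INSERT INTO test_db.some_table () VALUES ()",
        "INSERT INTO mypipe.some_table (id, tag_id, some_field) VALUES (1, 1, 2)",
    ]
    print(count_empty_mutations(sample))
```

Piping the runner's console output through a check like this makes it easy to confirm whether a configuration or permission change actually made the column data appear.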

Here is my application.conf file:

mypipe {

  # Avro schema repository client class name
  #schema-repo-client = "mypipe.avro.schema.SchemaRepo"

  # consumers represent sources for mysql binary logs
  consumers {

    localhost {
      # database "host:port:user:pass" array
      source = "localhost:3306:mypipe:mypipe"
    }   
  }

  # data producers export data out (stdout, other stores, external services, etc.)
  producers {

    stdout {
      class = "mypipe.producer.stdout.StdoutProducer"
    }   

    #kafka-generic {
    #  class = "mypipe.producer.KafkaMutationGenericAvroProducer"
    #}  
  }

  # pipes join consumers and producers
  pipes {

    stdout {
      consumers = ["localhost"]
      producer {
        stdout {}
      }   
      # how to save and load binary log positions
      binlog-position-repo {
        # saved to a file, this is the default if unspecified
        class = "mypipe.api.repo.ConfigurableFileBasedBinaryLogPositionRepository"
        config {
          file-prefix = "stdout-00"     # required if binlog-position-repo is specified
          data-dir = "/tmp/mypipe/data" # defaults to mypipe.data-dir if not present
        }   
      }   
    }   

    #kafka-generic {
    #  enabled = true
    #  consumers = ["localhost"]
    #  producer {
    #    kafka-generic {
    #      metadata-brokers = "localhost:9092"
    #    }   
    #  }
    #  binlog-position-repo {
    #    # saves to a MySQL database, make sure you use the following as well to prevent reacting on
    #    # inserts / updates made in the same DB being listened on for changes
    #    # mypipe {
    #    #   include-event-condition = """ table != "binlogpos" """ 
    #    #   error {
    #    #     quit-on-empty-mutation-commit-failure = false
    #    #   }
    #    # }
    #    class = "mypipe.api.repo.ConfigurableMySQLBasedBinaryLogPositionRepository"
    #    config {
    #      # database "host:port:user:pass" array
    #      source = "localhost:3306:mypipe:mypipe"
    #      database = "mypipe"
    #      table = "binlogpos"
    #      id = "kafka-generic" # used to find the row in the table for this pipe
    #    }
    #  }
    #}
  }
}

This is the table schema:

mysql> describe some_table;
+---------------+---------------------+------+-----+---------+----------------+
| Field         | Type                | Null | Key | Default | Extra          |
+---------------+---------------------+------+-----+---------+----------------+
| id            | bigint(20) unsigned | NO   | PRI | NULL    | auto_increment |
| tag_id        | bigint(20)          | NO   | MUL | NULL    |                |
| some_field    | bigint(20)          | YES  | MUL | NULL    |                |
+---------------+---------------------+------+-----+---------+----------------+

Here is the sample query:

update some_table set some_field=4 where some_field=3

Any help on this would be highly appreciated.

pratpor, Jan 25 '16 14:01

Maybe your account is missing some privileges. The mypipe user needs to be able to query information_schema.

tramchamploo, Jan 29 '16 02:01

Hi @pratpor, I used your configuration file as is to replace mine (application.overrides.conf in the runner) with the following table:

create table some_table (id bigint(20) unsigned auto_increment, tag_id bigint(20), some_field bigint(20), primary key (id)) engine = innodb;

then did the following:

./sbt
...
...
Multiple main classes detected, select one to run:

[1] mypipe.runner.KafkaGenericConsoleConsumer
[2] mypipe.runner.PipeRunner

Enter number: 2

After which I got:

09:46:18 INFO  [mypipe.runner.PipeRunnerUtil$           ] Loading configuration for stdout pipe
09:46:18 INFO  [mypipe.mysql.MySQLBinaryLogConsumer     ] Using current master binlog position for consuming from localhost:3306
09:46:18 INFO  [mypipe.runner.PipeRunner$               ] Connecting 1 pipes...
09:46:18 INFO  [mypipe.pipe.Pipe                        ] Connecting pipe between localhost:3306 -> StdoutProducer
09:46:18 INFO  [mypipe.mysql.MySQLBinaryLogConsumer     ] Connecting client to [email protected]:localhost:3306
Feb 07, 2016 9:46:18 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at mysql-bin.000009/311 (sid:1457, cid:2)
09:46:18 INFO  [mypipe.pipe.Pipe                        ] Pipe stdout connected!
09:46:28 INFO  [m.a.r.ConfigurableFileBasedBinaryLogPositionRepository] Saving binlog position for pipe stdout-00/localhost-3306 -> Some(mysql-bin.000009:311)

Then, I opened up a MySQL client to the server, and did:

mysql> insert into some_table values (null, 1, 2);
Query OK, 1 row affected (0.13 sec)

mysql> insert into some_table values (null, 3, 4);
Query OK, 1 row affected (0.14 sec)

mysql> update some_table set some_field=5 where some_field=2;
Query OK, 1 row affected (0.10 sec)
Rows matched: 1 Changed: 1 Warnings: 0

mysql> update some_table set some_field=5 where some_field>1;
Query OK, 1 row affected (0.12 sec)
Rows matched: 2 Changed: 1 Warnings: 0

mysql> update some_table set some_field=9 where some_field>1;
Query OK, 2 rows affected (0.10 sec)
Rows matched: 2 Changed: 2 Warnings: 0

With the stdout producer active, this produced the following in the console (I cleaned up the output a bit to remove extra logger output and other unrelated logs):

INSERT INTO mypipe.some_table (id, tag_id, some_field) VALUES (1, 1, 2)
INSERT INTO mypipe.some_table (id, tag_id, some_field) VALUES (2, 3, 4)
UPDATE mypipe.some_table SET (id=1, tag_id=1, some_field=5) WHERE (id=1)
UPDATE mypipe.some_table SET (id=2, tag_id=3, some_field=5) WHERE (id=2)
UPDATE mypipe.some_table SET (id=1, tag_id=1, some_field=9) WHERE (id=1)
UPDATE mypipe.some_table SET (id=2, tag_id=3, some_field=9) WHERE (id=2)

This is to say that your configuration looks fine. Note that I tested all of this using the Docker Compose setup provided by mypipe in the docker/ sub-directory of the project.

mardambey, Feb 07 '16 14:02

@pratpor any news? Take a look at this for potentially fixing up permissions:

GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'mypipe'@'%' IDENTIFIED BY 'mypipe';
GRANT ALL PRIVILEGES ON mypipe.* TO 'mypipe'@'%';
GRANT RELOAD ON *.* TO 'mypipe'@'%';
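
To check whether an account already has these global privileges, the output of `SHOW GRANTS` can be compared against them. This is only a rough sketch: the required set is simply taken from the GRANT statements above (it is not a confirmed minimum for mypipe), `missing_global_privileges` is a hypothetical helper, and the parsing assumes the usual `GRANT ... ON ... TO ...` line shape without column-level privileges.

```python
import re

# Matches lines such as: GRANT RELOAD ON *.* TO 'mypipe'@'%'
GRANT_RE = re.compile(r"^GRANT (.+?) ON (\S+) TO ", re.IGNORECASE)

# Assumption: the global privileges named in the GRANT statements above.
REQUIRED_GLOBAL = {"REPLICATION SLAVE", "REPLICATION CLIENT", "RELOAD"}


def missing_global_privileges(grant_lines, required=REQUIRED_GLOBAL):
    """Return the required global (*.*) privileges absent from SHOW GRANTS lines."""
    granted = set()
    for line in grant_lines:
        match = GRANT_RE.match(line.strip())
        # Only grants on *.* count as global privileges.
        if not match or match.group(2) != "*.*":
            continue
        privileges = {p.strip().upper() for p in match.group(1).split(",")}
        if "ALL PRIVILEGES" in privileges:
            return set()  # ALL PRIVILEGES on *.* covers everything required
        granted |= privileges
    return set(required) - granted


if __name__ == "__main__":
    show_grants_output = [
        "GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'mypipe'@'%'",
        "GRANT ALL PRIVILEGES ON `mypipe`.* TO 'mypipe'@'%'",
    ]
    print(missing_global_privileges(show_grants_output))
```

The input lines would come from running `SHOW GRANTS FOR 'mypipe'@'%';` in a MySQL client and copying the result rows.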

mardambey, Feb 14 '16 19:02