mysql-binlog-connector-java
mysql-binlog-connector-java copied to clipboard
why monitor binlog stopped
import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URL; import java.net.URLDecoder; import java.util.ArrayList; import java.util.Date; import java.util.Properties; import java.util.Set;
import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener; import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventData; import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisShardInfo;
public class RedisHelper {
private static Jedis JedisClient;
private static Properties AppConfig;
private static ArrayList<String> IgnoreDbArray=new ArrayList<String>();
public static void main(String[] args){
Boolean ok=loadConfig();
if(ok==false) {
System.out.println("加载配置失败,请确定配置文件是否存在且配置正确");
return;
}
localIgnoreDbArray();
CreateJedis();
initailDbMonitor();
System.out.println("按任意键结束..");
InputStreamReader ir = new InputStreamReader(System.in);
BufferedReader in = new BufferedReader(ir);
try {
in.readLine();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 初始化不监控的数据库
*/
private static void localIgnoreDbArray() {
IgnoreDbArray.add("mysql");
IgnoreDbArray.add("CallChains.Db");
String config=AppConfig.getProperty("ignore.db");
if(config!=null&&config.equals("")==false) {
String[] array=config.split(",");
for(String db:array) {
IgnoreDbArray.add(db);
}
}
}
/**
* 获取程序的根目录
* @return
*/
private static String getRootPath(){
URL url = RedisHelper.class.getProtectionDomain().getCodeSource().getLocation();
String filePath = null;
try {
// 转化为utf-8编码
filePath = URLDecoder.decode(url.getPath(), "utf-8");
} catch (Exception e) {
e.printStackTrace();
}
if (filePath.endsWith(".jar")) {
// 截取路径中的jar包名
filePath = filePath.substring(0, filePath.lastIndexOf("/") + 1);
}
File file = new File(filePath);
filePath = file.getAbsolutePath();
return filePath;
}
private static Boolean loadConfig() {
String location=getRootPath();
System.out.println(location);
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(
new File(location+"/app.properties")));
Properties prop = new Properties();
prop.load(in);
AppConfig=prop;
return true;
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return false;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return false;
}
}
/**
* 创建redis客户端
*/
private static void CreateJedis() {
String host=AppConfig.getProperty("redis.host").trim();
String port=AppConfig.getProperty("redis.port").trim();
String password=AppConfig.getProperty("redis.password").trim();
JedisShardInfo info=new JedisShardInfo(host,Integer.parseInt(port));
info.setSoTimeout(10000);
if(password.equals("")==false)
info.setPassword(password);
JedisClient=new Jedis(info);
JedisClient.select(7);
}
/**
* 清除对应库表的缓存
* @param dbName 库名
* @param tableName 表名
*/
private static void UpdateCache(String dbName,String tableName) {
String key=dbName+"."+tableName;
System.out.println(key);
Set<String> hkeys=JedisClient.hkeys(key);
System.out.println(hkeys);
if(hkeys.size()>0) {
String[] hKeyArray = new String[hkeys.size()];
JedisClient.del(hkeys.toArray(hKeyArray));
JedisClient.del(key);
System.out.println("缓存更新完毕..");
}
}
/**
* 初始化db监控
*/
private static void initailDbMonitor() {
String dbHost=AppConfig.getProperty("db.host").trim();
String dbPort=AppConfig.getProperty("db.port").trim();
String dbUserName=AppConfig.getProperty("db.username").trim();
String dbPassword=AppConfig.getProperty("db.password").trim();
BinaryLogClient client = new BinaryLogClient(dbHost, Integer.parseInt(dbPort), dbUserName, dbPassword);
client.registerEventListener(new EventListener() {
@Override
public void onEvent(Event event) {
System.out.println("----------------------"+new Date());
ProcessEvent(event);
}
});
client.setHeartbeatInterval(1000*60);
try {
client.connect();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 处理数据库事件
* @param event 数据库事件
*/
private static void ProcessEvent(Event event) {
EventData eData=event.getData();
if(eData==null) {
System.out.println("eData==null");
System.out.println(event);
}else {
if(eData.getClass()==TableMapEventData.class) {
TableMapEventData tableMapEventData=(TableMapEventData)eData;
String tableName=tableMapEventData.getTable();
String dbName=tableMapEventData.getDatabase();
if(IgnoreDbArray.contains(dbName)==false)
UpdateCache(dbName,tableName);
}
}
}
}
whether will it call onDisconnect when no data