mysql-binlog-connector-java icon indicating copy to clipboard operation
mysql-binlog-connector-java copied to clipboard

Why does binlog monitoring stop?

Open introspection3 opened this issue 7 years ago • 1 comments
trafficstars

import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URL; import java.net.URLDecoder; import java.util.ArrayList; import java.util.Date; import java.util.Properties; import java.util.Set;

import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener; import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventData; import com.github.shyiko.mysql.binlog.event.TableMapEventData;

import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisShardInfo;

public class RedisHelper {

private static Jedis  JedisClient;
private static Properties AppConfig;
private static ArrayList<String> IgnoreDbArray=new  ArrayList<String>();


public static void main(String[] args){		
	Boolean ok=loadConfig();
	if(ok==false) {
		System.out.println("加载配置失败,请确定配置文件是否存在且配置正确");
		return;
	}
	localIgnoreDbArray();
	CreateJedis();
	initailDbMonitor();		
	System.out.println("按任意键结束..");
	InputStreamReader ir = new InputStreamReader(System.in);
	BufferedReader in = new BufferedReader(ir);
	try {
		in.readLine();
	} catch (IOException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
}

/**
 * 初始化不监控的数据库
 */
private static void localIgnoreDbArray() {
	IgnoreDbArray.add("mysql");
	IgnoreDbArray.add("CallChains.Db");
	String config=AppConfig.getProperty("ignore.db");
	if(config!=null&&config.equals("")==false) {
		String[] array=config.split(",");
		for(String db:array) {
			IgnoreDbArray.add(db);
		}
	}
}



/**
 * 获取程序的根目录
 * @return
 */
private static String getRootPath(){  
    URL url = RedisHelper.class.getProtectionDomain().getCodeSource().getLocation();  
    String filePath = null;  
    try {  
    	// 转化为utf-8编码 
        filePath = URLDecoder.decode(url.getPath(), "utf-8"); 
    } catch (Exception e) {  
        e.printStackTrace();  
    }  
    if (filePath.endsWith(".jar")) { 
        // 截取路径中的jar包名  
        filePath = filePath.substring(0, filePath.lastIndexOf("/") + 1);  
    }  
    File file = new File(filePath);  
    filePath = file.getAbsolutePath(); 
    return filePath;  
}  

private static Boolean loadConfig() {
	String location=getRootPath();
	System.out.println(location);
	 InputStream in;
	try {
		in = new BufferedInputStream(new FileInputStream(  
		         new File(location+"/app.properties")));
		 Properties prop = new Properties();  
         prop.load(in);  
         AppConfig=prop;
         return true;
	} catch (FileNotFoundException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
		return false;
	} catch (IOException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
		return false;
	}  
}

/**
 * 创建redis客户端
 */
private static void CreateJedis() {
	String host=AppConfig.getProperty("redis.host").trim();		
	String port=AppConfig.getProperty("redis.port").trim();
	String password=AppConfig.getProperty("redis.password").trim();		
	JedisShardInfo info=new JedisShardInfo(host,Integer.parseInt(port));
	info.setSoTimeout(10000);
	if(password.equals("")==false)
		info.setPassword(password);
	JedisClient=new Jedis(info);
	JedisClient.select(7);
}

/**
 * 清除对应库表的缓存
 * @param dbName 库名
 * @param tableName 表名
 */
private static void UpdateCache(String dbName,String tableName) {
	String key=dbName+"."+tableName;
	System.out.println(key);
	Set<String> hkeys=JedisClient.hkeys(key);
	System.out.println(hkeys);
 	if(hkeys.size()>0) {
		String[] hKeyArray = new String[hkeys.size()];
		JedisClient.del(hkeys.toArray(hKeyArray));
		JedisClient.del(key);
		System.out.println("缓存更新完毕..");
	} 
	
}
 
/**
 * 初始化db监控
 */
private static void initailDbMonitor() {
	String dbHost=AppConfig.getProperty("db.host").trim();		
	String dbPort=AppConfig.getProperty("db.port").trim();
	String dbUserName=AppConfig.getProperty("db.username").trim();
	String dbPassword=AppConfig.getProperty("db.password").trim();
	
	BinaryLogClient client = new BinaryLogClient(dbHost, Integer.parseInt(dbPort), dbUserName, dbPassword);
	client.registerEventListener(new EventListener() {
		@Override
		public void onEvent(Event event) {
			System.out.println("----------------------"+new Date());
			ProcessEvent(event);
		}
	});
	client.setHeartbeatInterval(1000*60);
	
	try {
		client.connect();
	} catch (IOException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
}



/**
 * 处理数据库事件
 * @param event 数据库事件
 */
private static void ProcessEvent(Event event) {
	EventData eData=event.getData();
	if(eData==null) {
		System.out.println("eData==null");
		System.out.println(event);
	}else {
		if(eData.getClass()==TableMapEventData.class) {
			TableMapEventData tableMapEventData=(TableMapEventData)eData;
			String tableName=tableMapEventData.getTable();
			String dbName=tableMapEventData.getDatabase();
			if(IgnoreDbArray.contains(dbName)==false)
				UpdateCache(dbName,tableName);
		}
	}
	
}

}

introspection3 avatar Jul 10 '18 05:07 introspection3

Will it call onDisconnect when there is no data?

introspection3 avatar Jul 10 '18 06:07 introspection3