zookeeper,跨集群数据拷贝

八 20th, 2014

有这么一个场景,多套测试的zookeeper集群之间数据拷贝,按之前的理解,如果可以的话,直接拷贝zk的data文件可以解决问题。

但是最近碰到这么一个事情,只是无意中删除了一个集群某一个路径下的数据,由于data数据并不可读,不能有选择的copy。所以,在这场景下,只能通过代码的方式解决了。

 

我所用到的2个场景:

1. 误删除数据,从其它zookeeper集群拷贝

2. 搭建测试环境,直接从线上导入部分节点数据到本地

 

zookeeper管理node,基本和我们的文件系统一致,这个需求就变得非常简单,可以直接转换为递归一个路径,然后在新的集群下创建。简单的实现如下:

 

import com.metaboy.common.zk.dao.ZkDaoImpl;

import com.netflix.curator.framework.CuratorFramework;

import com.netflix.curator.framework.CuratorFrameworkFactory;

import com.netflix.curator.retry.RetryNTimes;

 

import java.util.List;

 

/**

 * @author yuxiong.wangy

 *         Date: 14-8-14

 *         Time: 下午7:33

 */

public class ZookeeperDemo {

    protected static CuratorFramework client_src;

    protected static CuratorFramework client_dst;

    protected static String  namespace_src;

    protected static String  namespace_dst;

    protected static String zkRoot_src = “/app”;

 

    public static void main(String[] args) throws InterruptedException {

        String zkConnectionStr_src = “*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181”;

        client_src = getZKClient(zkConnectionStr_src,namespace_src,60000);

        String zkConnectionStr_dst = “*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181”;

        client_dst = getZKClient(zkConnectionStr_dst,namespace_dst,60000);

 

        copyDataRecursion(zkRoot_src);

    }

 

    /*

     *       递归拷贝数据

     */

    public static void  copyDataRecursion(String parent){

        List<String> groups = ZkDaoImpl.getChildren(client_src, parent);

        if(groups.size() > 0){

            for(String group: groups){

                String path = parent+”/”+group;

 

                if(ZkDaoImpl.getData(client_src, path) != null){

                    ZkDaoImpl.createPersistentFile(client_dst, path, ZkDaoImpl.getData(client_src, path));

                    System.out.println(“[” + path + “]:” + ZkDaoImpl.getData(client_src,path));

                }else{

                    ZkDaoImpl.createPersistentFile(client_dst, path);

                    System.out.println(“[” + path + “]:”);

                }

 

                if(ZkDaoImpl.getChildren(client_src,path).size() > 0){

                    copyDataRecursion(path);

                }

            }

        }else {

            ZkDaoImpl.createPersistentFile(client_dst,parent,ZkDaoImpl.getData(client_src,parent));

        }

    }

 

    public static CuratorFramework getZKClient(String zkConnectionStr, String namespace, int sessionTimeout) throws InterruptedException {

        int connectTimeout = 60000;

        int retry = 3;

        int retryTimeout = 10000;

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkConnectionStr)

                .retryPolicy(new RetryNTimes(retry, retryTimeout)).connectionTimeoutMs(connectTimeout)

                .sessionTimeoutMs(sessionTimeout).namespace(namespace).build();

        client.start();

        client.getZookeeperClient().blockUntilConnectedOrTimedOut();

        return client;

    }

}

 

ZkDaoImpl封装一个对ZK的操作的集合,类似下面这种,将原生的方法包装了下,使用上是差不多:

    /**

     * 获取文件数据

     * 

     * @param path

     *            文件路径

     * @return 文件内容

     */

    public static String getData(CuratorFramework client, String path) {

        try {

            return new String(client.getData().forPath(path));

        } catch (Exception e) {

            throw new ZkException(ZkErrors.GET_DATA_EXCEPTION, “path:” + path, e);

        }

    }





除非注明,本站文章均为原创。本文基于 BY-NC-SA 协议进行授权,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名 metaboy(包含链接).

本文链接地址: http://blog.wangyuxiong.com/archives/52339

订阅本站:http://www.wangyuxiong.com/feed

分类: 分布式系统与计算         标签:
目前还没有任何评论.

无觅相关文章插件,快速提升流量