
由于数据安全,网速等要求,许多公司都会建立多个数据中心,每个数据中心有独立的 Kafka 集群。为保持不同中心间的数据同步,就有必要在 Kafka 集群间进行数据镜像。
kafka-mirror-maker命令或应用 Kafka Connect 可用于在多个 Kafka 集群相同的 Topic 之间互间同步数据。这里就来体验一下不同的 Kafka 集群间如何用
kafka-mirror-maker进行 topic 数据镜像。测试环境选择用两个 Vagrant 虚拟机,当然同一个主机上在不同的 ZooKeeper chroot 或不同的端口中也能演示同样的功能。首先要两启两个 Vagrant 虚拟机,这里用的是 Ubuntu Server 18.04。需要在本地建立两个目录, 分别是 ubuntu-server-1 和 ubuntu-server-2, 在各自目录中建立 Vagrantfile 文件,内容如下:
1Vagrant.configure("2") do |config| 2 config.vm.box = "ubuntu/disco64" 3 config.vm.provider "virtualbox" do |vb| 4 vb.memory = "2048" 5 end 6 config.vm.network "private_network", type: "dhcp" 7 config.vm.hostname = "ubuntu-server-1" # 另一台机器指定 ubuntu-server-2 8end以下启动 Vagrant 虚拟机,安装 JDK8 和 启动 ZooKeeper, Kafka 分别要在两个目录 (ubuntu-server-1 和 ubuntu-server-2) 中各执行一遍。 Read More
以下内容完全毫无章序,是阅读 《redis设计与实现(第二版)》一书所划的一些自己助记用的重点。本不访放到博客上来,只称放在个人 Evernote 当中,于此纯粹为了自己往后查阅,所以请不要读它。OBJECT ENCODING key 可以查数据存储的底层结构类型如redis> SADD numbers 1 3 5 7 9redis> OBJECT ENCODING numbers"intset"redis> ZADD fruit-price 5 bananaredis> OBJECT ENCODING fruit-price"quicklist"Redis 的对象带有访问时间记录信息,该信息可用于计算数据库键的空转时长redis> type msg 返回值对象的类型字符串对象编码可用 int,raw 或者 embstrRead More
承接近两年前的 用 PreparedStatement 向 SqlServer 中一次性插入多条记录,其文后用 User-Defined Type 可用下面简单的代码把 Java 本地内存中表格数据一股脑的刷入到 SQL Server 数据库表格中
String sql = "INSERT INTO Customers SELECT * FROM ?";
SQLServerPreparedStatement pstmt = (SQLServerPreparedStatement) conn.prepareStatement(sql);
SQLServerDataTable dataTable = ..... // 生成好的本地表格数据
pstmt.setStructured(1, "CustomersTableType", dataTable);
pstmt.execute();上面的 dataTable 本地表格类型变量容易生成,关键是必须在正式数据库数须预先用
CREATE TYPE创建好CustomersTableType这个用户自定义类型,这会受权限的约束。如果由 DBA 预先完全依照目标表来创建好这个用户自定义类型,又无法确定是否总是要操作该目标表的所有字段。数据库是允许我们创建临时的用户自定义类型 Read More

现实中有这样的用法,创建一批在线程池中运行的 CompletableFuture 实例,然后等待它们全部执行完再继续后面的操作。比如说 AWS 的 Lambda, 单单提交任务到线程池,不等待所有任务全部完成便退出主线程的话,AWS 便认为 Lambda 执行完毕,无视线程池中正在执行的任务而强行结束该 Lambda 实例。
以往我们通常的作法如下
1ExecutorService threadPool = Executors.newFixedThreadPool(10); 2List<CompletableFuture<Void>> futures = IntStream.rangeClosed(1, 10000) 3 .mapToObj(n -> 4 CompletableFuture.runAsync(() -> { 5 System.out.println("done " + n); 6 }, threadPool)).collect(toList()); 7 8futures.forEach(CompletableFuture::join); 9 10System.out.println("all done");如果所有的任务均无异常,上面的代码能得到预想的结果,只要上面打印出
all done的话真的就是表明所有的任务都完成了。但是在循环 joinfutures中的每一个 CompletableFuture 时,只要碰到任意一个任务有异常时,便立即抛出给外部线程,不在乎是否还有其他任务正在执行。此时,如果外部未予捕获,当然 Read More
Redis 自 2.6 版本起加入了服务端的 Lua 脚本支持,即增添了EVAL,EVALSHA,SCRIPT相关命令。Lua 为何物,Lua 是一个非常轻量级,强大,高效,可内嵌的脚本语言; 产自于巴西,源码和二进制包都只有 200 多 KB。当前版本的 Redis 5.0.5 中 Lua 引擎版本是 Lua 5.1(自 Redis 2.6 起就没变,当前 Lua 为 5.3.5),可用 Redis 命令eval "return _VERSION" 0查看到。
本文就要探究一下如何在 Redis 中使用 Lua 脚本,以及如何简化与 Redis 的交互。比如说在 Redis 中要先获一个值,然后根据这个值再去 Redis 中获得另一个相关联的值,如果不使用 Lua 脚本就会有两次与 Redis 交互,引入 Lua 脚本可以只用一次操作。
本文不具体讲述 Lua 语言本身,只涉及到与 Redis 相关的 Lua 特性。现在来体验下 Lua 中嵌入 Lua 脚本的基本操作。 Read More
在 Python 中如果把函数定义写在调用的下方可能会出错,例如下面的代码
1foo() 2 3def foo(): 4 print("hello")执行时会报出错误
NameError: name 'foo' is not defined
这时候要把
foo()调用代码放到该函数的声明后面1def foo(): 2 print("hello") 3 4foo()这样执行就一切正常了。这仿佛像是 C 语言中的函数调用需要提前声明一般,例如在 C 语言中要调用后头的定义的函数要写成 Read More
使用 Python 书写 AWS Lambda 的一个好处就是能够在控制台中直接编辑源代码,非常方便进行快速验证测试 AWS 环境相关的。这只限于使用 AWS 为 Python Lambda 运行时提供的默认组件(比如 boto3),尚若需要在自己的 Python Lambda 中使用其他的组件(如 redis), 就不得不把自己的代码及依赖打成一个 zip 包再部署,这时候就无法在控制台直接编辑代码了,也只能坠入本地修改代码,重新打包上传测试的循环当中。
欲了解 Python Lambda 中除了 boto3 外还能直接使用别的什么组件,可点击此链接 https://gist.github.com/gene1wood/4a052f39490fae00e0c3 查看当前。该 gist 也还提供了代码 code to run in Lambda 来获得所有依赖。试了下在 Python Lambda 中,用通常的help('modules') # 或
竟然连大名鼎鼎的 boto3 都无法列出来。
help('modules package')
回到正题来,如果既想用第三方的依赖,又想要在控制台中直接编辑代码进行测试,是否有他法呢?有,那就是 AWS 在 2018 年 11 月推出的 Lambda 层。见 AWS Lambda Now Supports Custom Runtimes and Enables Sharing Common Code Between Functions, 这里的层除了能用来提供 Python 依赖,还许自定义运行时,如 C++ 或 Rust 等写 Lambda 都不是梦。
AWS 的服务就像个大口袋,何时偷偷的加添了什么服务,或出了什么新的我,不时关注它的 What's New with AWS 必是个好习惯。 Read More
在用 Python 编写 AWS 服务时,要用到 Boto 3 组件,而像 boto3.client('s3') 获得的对象只能被 IDE 识别为一个 BaseClient, 具体包含什么操作方法是在运行时由参数
s3指示的基于 JSON 文件所描述的。因此 IDE 对s3 = boto3.client('s3')的s3对象无法提供有效的智能提示,每次用 Boto 3 时不得不打开 Boto 3 的在线 API 文档来对照。长此以往,总觉麻烦且效率低下,有种一直摸着石头过河的感觉。那么,是否有办法让 IDE 智能提示出各种 boto3.client('<service>') 的实际操作呢?网上找了找,确实有这个需求,解决办法有- botostubs: 与 boto3 API 保持更新(每三天),并支持众多 IDE, 试过在 IntelliJ IDEA 和 Visual Studio Code 中可用
- pyboto3: 上次更新在两年前, https://github.com/wavycloud/pyboto3, 只在 Python 2.7 下测试过
- autoboto: 需有智能提示,但改变了应用 Boto 3 组件的方式,不建议使用
本文重点推荐 botostubs, 下面会叙说具体理由,在进入正是之前,不妨来回顾一下直接使用 Boto 3 时没有好的智能提示的问题 Read More

Java 中有关抽象的可遍历的对象有 Iterator, Iterable 和 Java 8 的 Stream, Iterable 可简单的用如下代码转换为 Stream
StreamSupport.stream(iterable.spliterator(), false)
再回过头来,为什么要把 Iterator 或 Iterable 转换为 Stream, 因为 Iterator 和 Iterable 只提供有限的遍历操作,如 Iterator 接口的全部四个方法
hasNext()
next()
forEachRemaining(consumer)
remove()同样 Iterable 也只有
iterator(),forEach(consumer), 和spliterator()方法。而 Java 8 的 Stream 就大不一样的,带有大量的链式操作方法,如 filter, map, flatMap, collect 等。因此如果我们已有一个 Iterator 类型,能够被转换为 Stream 类型的话将会大大简化后续的转换,处理操作。具体的从 Iterator 到 Stream 的转换方式有两种 Read More
在进行数据流处理过程中,需要一个高效苗条的流处理组件,比如对输入流能进行分组(窗口),能进行流量控制(Back Pressure - 背压),这也就涉及到响应式编程,流处理框架。这方面如果直接基于 Akka actor 来构建 Akka ActorSystem 也是比较复杂,依赖的组件也不少。还有构筑在 Akka actor 之上的 Akka Streams,再往上的 Flink Streaming,它们都有像滑动,滚动窗口的概念,但是依赖更不得了。一个基本的 Flink Streaming 的项目会依赖到 45 M 以上的第三方组件,如果用它来写一个数据流处理的共享组件,那真是要命。Spring 5 也开始带上了自己的 Reactive-Streams 实现 Spring Reactor, 想要把它从 Spring 中单独抽离出也非易事。Flink Streaming 组件依赖:org.apache.fling:flink-streaming-java_2.12:1.80, 会依赖于其他诸如 akka-stream, akka-actor, flink-core, flink-clients, scala-library 等非常多的东西
而另一个著名的响应式框架 RxJava 2 就清爽多了,完全没有第三方依赖,要说有也就是定义了四个接口的 reactive-streams(2 KB 大小),就自身那个 rxjava-2.2.9.jar 包只有 2.3 M,这才叫轻量级。因为它设计来是能被应用于 Android 客户端应用的,Andriod 上的 rxandriod-1.2.1.aar 只有 9 K。所以 RxJava 2.x 太适合用来写一些小的共享组件了。 Read More