继续把 Kafka 捋一捋,还剩两个主要的组件了,分别为 Kafka Connect 和 Kafka Streams。而其中的 Kafka Connect 是在 Kafka 0.9.0.0 开始加入的我,Connect 的出现让 Kafka 与外部世界更紧密连接起来了,进而可以让其他外围组件通过 Connect 的 Source 与 Sink 紧密的团结在以 Kafka 为核心的消息中心。从此不再总是以标准的 Kafka Consumer 和 Producer 与外部联络。
Kafka Connect 主要由两部分组成,Source Connector 和 Sink Connector,这两个来自于 Akka Stream 这一 Reactive 框架的概念,即往 Kafka 流入数据的 Connector 是 Source, 从 Kafka 导出数据的是 Sink。 要自己实现 Kafka 的 Connector 需要用到org.apache.kafka:connect-api组件,不包含在 kafka-clients 依赖中,其中定义了两个主要抽像类- org.apache.kafka.connect.source.SourceConnector extends Connector
- org.apache.kafka.connect.sink.SinkConnector extends Connector
Read More
Kafka 默认情况下是没有启用安全机制,这让能连接到 Broker 的客户端可以为所欲为,自 Kafka 0.9.0.0 版本引入了安全配置,但是需要进行一些配置来开启它。Kafka 安全主要包含三个方面:认证(authentication),授权(authorization), 和信道加密(encryption)。其中认证机制和授权分别通过 SASL(Simple Authentication and Security Layer)和 ACL(Access Control List) 来实现。本篇主要演示 SASL + ACL 的配置,未涉及 SSL 信道加道,所以没有配置 Kerberos, 客户端与 Broker 之间的数据传输仍然以明文(PLAINTEXT) 传输,这在内网使用 Kafka 基本没问题。
开启 SASL 和 ACL 需要在 Broker 和 Client 端进行相应的配置,要为两端创建包含用户认证信息的 JAAS(Java Authentication and Authorization Service) 文件。听其名就是为 Java 代码服务的,不知客户端要支持别的语言(如 Python) 时应该如何配置客户端。 Read More

由于数据安全,网速等要求,许多公司都会建立多个数据中心,每个数据中心有独立的 Kafka 集群。为保持不同中心间的数据同步,就有必要在 Kafka 集群间进行数据镜像。
kafka-mirror-maker命令或应用 Kafka Connect 可用于在多个 Kafka 集群相同的 Topic 之间互间同步数据。这里就来体验一下不同的 Kafka 集群间如何用
kafka-mirror-maker进行 topic 数据镜像。测试环境选择用两个 Vagrant 虚拟机,当然同一个主机上在不同的 ZooKeeper chroot 或不同的端口中也能演示同样的功能。首先要两启两个 Vagrant 虚拟机,这里用的是 Ubuntu Server 18.04。需要在本地建立两个目录, 分别是 ubuntu-server-1 和 ubuntu-server-2, 在各自目录中建立 Vagrantfile 文件,内容如下:
1Vagrant.configure("2") do |config| 2 config.vm.box = "ubuntu/disco64" 3 config.vm.provider "virtualbox" do |vb| 4 vb.memory = "2048" 5 end 6 config.vm.network "private_network", type: "dhcp" 7 config.vm.hostname = "ubuntu-server-1" # 另一台机器指定 ubuntu-server-2 8end以下启动 Vagrant 虚拟机,安装 JDK8 和 启动 ZooKeeper, Kafka 分别要在两个目录 (ubuntu-server-1 和 ubuntu-server-2) 中各执行一遍。 Read More
以下内容完全毫无章序,是阅读 《redis设计与实现(第二版)》一书所划的一些自己助记用的重点。本不访放到博客上来,只称放在个人 Evernote 当中,于此纯粹为了自己往后查阅,所以请不要读它。OBJECT ENCODING key 可以查数据存储的底层结构类型如redis> SADD numbers 1 3 5 7 9redis> OBJECT ENCODING numbers"intset"redis> ZADD fruit-price 5 bananaredis> OBJECT ENCODING fruit-price"quicklist"Redis 的对象带有访问时间记录信息,该信息可用于计算数据库键的空转时长redis> type msg 返回值对象的类型字符串对象编码可用 int,raw 或者 embstrRead More
承接近两年前的 用 PreparedStatement 向 SqlServer 中一次性插入多条记录,其文后用 User-Defined Type 可用下面简单的代码把 Java 本地内存中表格数据一股脑的刷入到 SQL Server 数据库表格中
String sql = "INSERT INTO Customers SELECT * FROM ?";
SQLServerPreparedStatement pstmt = (SQLServerPreparedStatement) conn.prepareStatement(sql);
SQLServerDataTable dataTable = ..... // 生成好的本地表格数据
pstmt.setStructured(1, "CustomersTableType", dataTable);
pstmt.execute();上面的 dataTable 本地表格类型变量容易生成,关键是必须在正式数据库数须预先用
CREATE TYPE创建好CustomersTableType这个用户自定义类型,这会受权限的约束。如果由 DBA 预先完全依照目标表来创建好这个用户自定义类型,又无法确定是否总是要操作该目标表的所有字段。数据库是允许我们创建临时的用户自定义类型 Read More

现实中有这样的用法,创建一批在线程池中运行的 CompletableFuture 实例,然后等待它们全部执行完再继续后面的操作。比如说 AWS 的 Lambda, 单单提交任务到线程池,不等待所有任务全部完成便退出主线程的话,AWS 便认为 Lambda 执行完毕,无视线程池中正在执行的任务而强行结束该 Lambda 实例。
以往我们通常的作法如下
1ExecutorService threadPool = Executors.newFixedThreadPool(10); 2List<CompletableFuture<Void>> futures = IntStream.rangeClosed(1, 10000) 3 .mapToObj(n -> 4 CompletableFuture.runAsync(() -> { 5 System.out.println("done " + n); 6 }, threadPool)).collect(toList()); 7 8futures.forEach(CompletableFuture::join); 9 10System.out.println("all done");如果所有的任务均无异常,上面的代码能得到预想的结果,只要上面打印出
all done的话真的就是表明所有的任务都完成了。但是在循环 joinfutures中的每一个 CompletableFuture 时,只要碰到任意一个任务有异常时,便立即抛出给外部线程,不在乎是否还有其他任务正在执行。此时,如果外部未予捕获,当然 Read More
Redis 自 2.6 版本起加入了服务端的 Lua 脚本支持,即增添了EVAL,EVALSHA,SCRIPT相关命令。Lua 为何物,Lua 是一个非常轻量级,强大,高效,可内嵌的脚本语言; 产自于巴西,源码和二进制包都只有 200 多 KB。当前版本的 Redis 5.0.5 中 Lua 引擎版本是 Lua 5.1(自 Redis 2.6 起就没变,当前 Lua 为 5.3.5),可用 Redis 命令eval "return _VERSION" 0查看到。
本文就要探究一下如何在 Redis 中使用 Lua 脚本,以及如何简化与 Redis 的交互。比如说在 Redis 中要先获一个值,然后根据这个值再去 Redis 中获得另一个相关联的值,如果不使用 Lua 脚本就会有两次与 Redis 交互,引入 Lua 脚本可以只用一次操作。
本文不具体讲述 Lua 语言本身,只涉及到与 Redis 相关的 Lua 特性。现在来体验下 Lua 中嵌入 Lua 脚本的基本操作。 Read More
在 Python 中如果把函数定义写在调用的下方可能会出错,例如下面的代码
1foo() 2 3def foo(): 4 print("hello")执行时会报出错误
NameError: name 'foo' is not defined
这时候要把
foo()调用代码放到该函数的声明后面1def foo(): 2 print("hello") 3 4foo()这样执行就一切正常了。这仿佛像是 C 语言中的函数调用需要提前声明一般,例如在 C 语言中要调用后头的定义的函数要写成 Read More
使用 Python 书写 AWS Lambda 的一个好处就是能够在控制台中直接编辑源代码,非常方便进行快速验证测试 AWS 环境相关的。这只限于使用 AWS 为 Python Lambda 运行时提供的默认组件(比如 boto3),尚若需要在自己的 Python Lambda 中使用其他的组件(如 redis), 就不得不把自己的代码及依赖打成一个 zip 包再部署,这时候就无法在控制台直接编辑代码了,也只能坠入本地修改代码,重新打包上传测试的循环当中。
欲了解 Python Lambda 中除了 boto3 外还能直接使用别的什么组件,可点击此链接 https://gist.github.com/gene1wood/4a052f39490fae00e0c3 查看当前。该 gist 也还提供了代码 code to run in Lambda 来获得所有依赖。试了下在 Python Lambda 中,用通常的help('modules') # 或
竟然连大名鼎鼎的 boto3 都无法列出来。
help('modules package')
回到正题来,如果既想用第三方的依赖,又想要在控制台中直接编辑代码进行测试,是否有他法呢?有,那就是 AWS 在 2018 年 11 月推出的 Lambda 层。见 AWS Lambda Now Supports Custom Runtimes and Enables Sharing Common Code Between Functions, 这里的层除了能用来提供 Python 依赖,还许自定义运行时,如 C++ 或 Rust 等写 Lambda 都不是梦。
AWS 的服务就像个大口袋,何时偷偷的加添了什么服务,或出了什么新的我,不时关注它的 What's New with AWS 必是个好习惯。 Read More
在用 Python 编写 AWS 服务时,要用到 Boto 3 组件,而像 boto3.client('s3') 获得的对象只能被 IDE 识别为一个 BaseClient, 具体包含什么操作方法是在运行时由参数
s3指示的基于 JSON 文件所描述的。因此 IDE 对s3 = boto3.client('s3')的s3对象无法提供有效的智能提示,每次用 Boto 3 时不得不打开 Boto 3 的在线 API 文档来对照。长此以往,总觉麻烦且效率低下,有种一直摸着石头过河的感觉。那么,是否有办法让 IDE 智能提示出各种 boto3.client('<service>') 的实际操作呢?网上找了找,确实有这个需求,解决办法有- botostubs: 与 boto3 API 保持更新(每三天),并支持众多 IDE, 试过在 IntelliJ IDEA 和 Visual Studio Code 中可用
- pyboto3: 上次更新在两年前, https://github.com/wavycloud/pyboto3, 只在 Python 2.7 下测试过
- autoboto: 需有智能提示,但改变了应用 Boto 3 组件的方式,不建议使用
本文重点推荐 botostubs, 下面会叙说具体理由,在进入正是之前,不妨来回顾一下直接使用 Boto 3 时没有好的智能提示的问题 Read More