Kafka Connect 介绍和使用

继续把 Kafka 捋一捋,还剩两个主要的组件了,分别为 Kafka Connect 和 Kafka Streams。而其中的 Kafka Connect 是在 Kafka 0.9.0.0 开始加入的我,Connect 的出现让 Kafka 与外部世界更紧密连接起来了,进而可以让其他外围组件通过 Connect 的 Source 与 Sink 紧密的团结在以 Kafka 为核心的消息中心。从此不再总是以标准的 Kafka Consumer 和 Producer 与外部联络。

Kafka Connect 主要由两部分组成,Source Connector 和  Sink Connector,这两个来自于 Akka Stream 这一 Reactive 框架的概念,即往 Kafka 流入数据的 Connector 是 Source, 从 Kafka 导出数据的是 Sink。 要自己实现 Kafka 的 Connector  需要用到 org.apache.kafka:connect-api 组件,不包含在 kafka-clients 依赖中,其中定义了两个主要抽像类

  1. org.apache.kafka.connect.source.SourceConnector extends Connector
  2. org.apache.kafka.connect.sink.SinkConnector extends Connector

阅读全文 >>

类别: Kafka. 标签: . 阅读(46). 评论(0) »

启用并测试 Kafka 的 SASL + ACL 认证授权

Kafka 默认情况下是没有启用安全机制,这让能连接到 Broker 的客户端可以为所欲为,自 Kafka 0.9.0.0 版本引入了安全配置,但是需要进行一些配置来开启它。Kafka 安全主要包含三个方面:认证(authentication),授权(authorization), 和信道加密(encryption)。其中认证机制和授权分别通过 SASL(Simple Authentication and Security Layer)和  ACL(Access Control List) 来实现。本篇主要演示 SASL + ACL 的配置,未涉及 SSL 信道加道,所以没有配置 Kerberos, 客户端与 Broker 之间的数据传输仍然以明文(PLAINTEXT) 传输,这在内网使用 Kafka 基本没问题。

开启 SASL 和 ACL 需要在 Broker 和 Client 端进行相应的配置,要为两端创建包含用户认证信息的 JAAS(Java Authentication and Authorization Service) 文件。听其名就是为 Java 代码服务的,不知客户端要支持别的语言(如 Python) 时应该如何配置客户端。 阅读全文 >>

类别: Kafka. 标签: , , . 阅读(35). 评论(0) »

Kafka 集群间数据镜像实测

由于数据安全,网速等要求,许多公司都会建立多个数据中心,每个数据中心有独立的 Kafka 集群。为保持不同中心间的数据同步,就有必要在 Kafka 集群间进行数据镜像。kafka-mirror-maker 命令或应用 Kafka Connect 可用于在多个 Kafka 集群相同的 Topic 之间互间同步数据。

这里就来体验一下不同的 Kafka 集群间如何用 kafka-mirror-maker 进行 topic 数据镜像。测试环境选择用两个 Vagrant 虚拟机,当然同一个主机上在不同的 ZooKeeper chroot 或不同的端口中也能演示同样的功能。

首先要两启两个 Vagrant 虚拟机,这里用的是 Ubuntu Server 18.04。需要在本地建立两个目录, 分别是 ubuntu-server-1 和 ubuntu-server-2, 在各自目录中建立 Vagrantfile 文件,内容如下:

以下启动 Vagrant 虚拟机,安装 JDK8 和 启动 ZooKeeper, Kafka 分别要在两个目录 (ubuntu-server-1 和 ubuntu-server-2) 中各执行一遍。 阅读全文 >>

类别: Kafka. 标签: , . 阅读(15). 评论(0) »

Redis 知识点乱记

以下内容完全毫无章序,是阅读 《redis设计与实现(第二版)》一书所划的一些自己助记用的重点。本不访放到博客上来,只称放在个人 Evernote 当中,于此纯粹为了自己往后查阅,所以请不要读它。

OBJECT ENCODING key    可以查数据存储的底层结构类型
 
redis> SADD numbers 1 3 5 7 9
redis> OBJECT ENCODING numbers
"intset"
 
redis> ZADD fruit-price 5 banana
redis> OBJECT ENCODING fruit-price
"quicklist"
 
Redis 的对象带有访问时间记录信息,该信息可用于计算数据库键的空转时长
 
redis> type msg    返回值对象的类型
 
字符串对象编码可用 int,raw 或者 embstr

阅读全文 >>

类别: Redis. 标签: , . 阅读(35). 评论(0) »

用 Java 把内存中的表格数据合并到 SQL Server 表中

承接近两年前的 用 PreparedStatement 向 SqlServer 中一次性插入多条记录,其文后用 User-Defined Type 可用下面简单的代码把 Java 本地内存中表格数据一股脑的刷入到 SQL Server 数据库表格中

String sql = "INSERT INTO Customers SELECT * FROM ?";
SQLServerPreparedStatement pstmt = (SQLServerPreparedStatement) conn.prepareStatement(sql);
SQLServerDataTable dataTable = ..... // 生成好的本地表格数据
pstmt.setStructured(1, "CustomersTableType", dataTable);
pstmt.execute();

上面的 dataTable 本地表格类型变量容易生成,关键是必须在正式数据库数须预先用 CREATE TYPE 创建好 CustomersTableType 这个用户自定义类型,这会受权限的约束。如果由 DBA 预先完全依照目标表来创建好这个用户自定义类型,又无法确定是否总是要操作该目标表的所有字段。

数据库是允许我们创建临时的用户自定义类型 阅读全文 >>

类别: Database, Java/JEE. 标签: . 阅读(26). 评论(0) »

等待所有的 CompletableFuture 完成

现实中有这样的用法,创建一批在线程池中运行的  CompletableFuture 实例,然后等待它们全部执行完再继续后面的操作。比如说 AWS 的 Lambda, 单单提交任务到线程池,不等待所有任务全部完成便退出主线程的话,AWS 便认为 Lambda 执行完毕,无视线程池中正在执行的任务而强行结束该 Lambda 实例。

以往我们通常的作法如下

如果所有的任务均无异常,上面的代码能得到预想的结果,只要上面打印出 all done 的话真的就是表明所有的任务都完成了。但是在循环 join futures 中的每一个 CompletableFuture 时,只要碰到任意一个任务有异常时,便立即抛出给外部线程,不在乎是否还有其他任务正在执行。此时,如果外部未予捕获,当然 阅读全文 >>

类别: Java/JEE. 标签: . 阅读(44). 评论(0) »

Redis 中使用服务端 Lua 脚本

Redis 自 2.6 版本起加入了服务端的 Lua 脚本支持,即增添了 EVAL, EVALSHA, SCRIPT 相关命令。Lua 为何物,Lua 是一个非常轻量级,强大,高效,可内嵌的脚本语言; 产自于巴西,源码和二进制包都只有 200 多 KB。当前版本的 Redis 5.0.5 中 Lua 引擎版本是 Lua 5.1(自 Redis 2.6 起就没变,当前 Lua 为 5.3.5),可用 Redis 命令 eval "return _VERSION" 0 查看到。

本文就要探究一下如何在 Redis 中使用 Lua 脚本,以及如何简化与 Redis 的交互。比如说在 Redis 中要先获一个值,然后根据这个值再去 Redis 中获得另一个相关联的值,如果不使用 Lua 脚本就会有两次与 Redis 交互,引入 Lua 脚本可以只用一次操作。

本文不具体讲述 Lua 语言本身,只涉及到与 Redis 相关的 Lua 特性。现在来体验下 Lua 中嵌入 Lua 脚本的基本操作。 阅读全文 >>

类别: Mid-Ware. 标签: , . 阅读(23). 评论(0) »

Python 执行系统命令 - subprocess 模块的使用

Python 可信手拈来写系统脚本,那么在 Python 中调用系统命令应该会比较便捷。所以本文来看看 Python 有几种方式调用系统命令,以及与回味一下其他几种脚本语言的类似操作。简单说来,Python 执行系统命令的方式有四种方式,即

  1. os.system(cmd) (建议用 subprocess 模块)
  2. os.popen(cmd) (Python 3 中还能用,但不推荐使用了)
  3. commands 模块(在 Python 3 中已移除了该模块,基本是不必去了解它)
  4. subprocess 模块(总是上面的矛头全指向它的,重点)

os.system(cmd)

启动一个子进程来执行系统命令,可以获得标准输入,不能获到命令输出, 但可以得到一个状态码 阅读全文 >>

类别: Python. 标签: , . 阅读(53). 评论(0) »

Python 函数声明先后顺序的问题

在 Python 中如果把函数定义写在调用的下方可能会出错,例如下面的代码

foo()

def foo():
    print("hello")

执行时会报出错误

NameError: name 'foo' is not defined

这时候要把 foo() 调用代码放到该函数的声明后面

def foo():
    print("hello")

foo()

这样执行就一切正常了。这仿佛像是 C 语言中的函数调用需要提前声明一般,例如在 C 语言中要调用后头的定义的函数要写成 阅读全文 >>

类别: Python. 标签: , . 阅读(52). 评论(3) »

AWS Python Lambda 使用 Layer

使用 Python 书写 AWS Lambda 的一个好处就是能够在控制台中直接编辑源代码,非常方便进行快速验证测试 AWS 环境相关的。这只限于使用 AWS 为 Python Lambda 运行时提供的默认组件(比如 boto3),尚若需要在自己的 Python Lambda 中使用其他的组件(如 redis), 就不得不把自己的代码及依赖打成一个 zip 包再部署,这时候就无法在控制台直接编辑代码了,也只能坠入本地修改代码,重新打包上传测试的循环当中。

欲了解 Python Lambda 中除了 boto3 外还能直接使用别的什么组件,可点击此链接 https://gist.github.com/gene1wood/4a052f39490fae00e0c3 查看当前。该 gist 也还提供了代码 code to run in Lambda 来获得所有依赖。试了下在 Python Lambda 中,用通常的

help('modules')    # 或
help('modules package')

竟然连大名鼎鼎的 boto3 都无法列出来。

回到正题来,如果既想用第三方的依赖,又想要在控制台中直接编辑代码进行测试,是否有他法呢?有,那就是 AWS 在 2018 年 11 月推出的 Lambda 层。见 AWS Lambda Now Supports Custom Runtimes and Enables Sharing Common Code Between Functions, 这里的层除了能用来提供 Python 依赖,还许自定义运行时,如 C++ 或 Rust 等写 Lambda 都不是梦。

AWS 的服务就像个大口袋,何时偷偷的加添了什么服务,或出了什么新的我,不时关注它的 What's New with AWS 必是个好习惯。 阅读全文 >>

类别: AWS, Python. 标签: , . 阅读(41). 评论(0) »