Kafka 核心源码分析 - 2.1.1 必要的参数配置
第二章 生产者
2.1 客户端开发
2.1.1 必要的参数设置
参照上一节中的initConfig()
方法,在Kafka生产者客户端KafkaProducer
中有3个参数是必填的。
-
bootstrap.servers
-
key.serializer
和value.serializer
broker端接收的消息必须以字节数组(byte[]
)的形式存在。前篇代码中生产者使用的KafkaProducer<String,String>
和ProducerRecord<String,String>
中的泛型<StringString>
应的就是消息中key
和value
的类型,生产者客户端使用这种方式可以让代码具有良好的可读性,不过在发往broker之前需要将消息中对应的key
和value
做相应的序列化操作来转换成字节 数组。key.serializer
和value.serializer
这两个参数分别用来指定key
和value
序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,具体代码请参考上一节综述中的代码示例中org.apache.kafka.common.serialization.StringSerializer
,单单指定StringSerializer
是错误的。
注意到前篇示例代码中的initConfig()
方法里还设置了一个参数client.id
,这个参数用来设定KafkaProducer
对应的客户端id,默认值为""
。如果客户端不设置,则KafkaProducer
会自动生成一个非空字符串,内容形式如"producer-1" "producer-2"
,即字符串"producer-"
与数字的拼接。
KafkaProducer
中的参数众多,远非示例initConfig()
方法中的那样只有4个,开发人员可以根据业务应用的实际需求来修改这些参数的默认值,以达到灵活调配的目的。一般情况下,普通开发人员无法记住所有的参数名称,只能有个大致的印象。在实际使用过程中,诸如"key.serializer"
"max.request.size"
"interceptor.classes"
类的字符串经常由于人为因素而书写错误。
为此,我们可以直接使用客户端中的org.apache.kafka.clients.producer.ProducerConfig
类来做一定程度上的预防措施,每个参数在ProducerConfig
类中都有对应的名称,以上篇代码中的initConfig()
的方法为例,引入ProducerConfig
后的修改结果如下:
public static Properties initNewConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
return props;
}
注意到上面的代码中key.serializer
和value.serializer
参数对应类的全限定名比较长,也比较容易写错,这里通过Java中的技巧来做进一步的改进,相关代码如下:
public static Properties initPerferConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
return props;
}
如此以来,代码便能简洁许多,同时还能降低人为出错的可能性。在配置完参数之后,我们就可以使用它来创建一个生产者实例,代码示例如下:
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
KafkaProducer
是线程安全的,可以在多个线程中共享单个KafkaProducer
实例,也可以将KafkaProducer
实例进行池化供其他线程调用。
KafkaProducer
中有多个构造方法,比如在创建KafkaProducer
实例的时候并没有设定key.serializer
和value.serializer
这两个配置参数,那么就需要在构造方法中添加对应的序列化器。示例代码如下:
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
其内部原理和无序列化器的构造方法一样,不过就实际而言,一般都选用
public KafkaProducer(Properties properties) {
this(new ProducerConfig(properties), null, null, null, null);
}
这个构造方法来创建KafkaProducer
实例。
附:
KafkaProducer
的构造器一览: