Kafka 核心源码分析 - 2.1.1 必要的参数配置
第二章 生产者
2.1 客户端开发
2.1.1 必要的参数设置
参照上一节中的initConfig()方法,在Kafka生产者客户端KafkaProducer中有3个参数是必填的。
-  
   bootstrap.servers
-  
   key.serializer和value.serializer
broker端接收的消息必须以字节数组(byte[])的形式存在。前篇代码中生产者使用的KafkaProducer<String,String>和ProducerRecord<String,String>中的泛型<StringString>应的就是消息中key和value的类型,生产者客户端使用这种方式可以让代码具有良好的可读性,不过在发往broker之前需要将消息中对应的key和value做相应的序列化操作来转换成字节 数组。key.serializer和value.serializer这两个参数分别用来指定key和value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,具体代码请参考上一节综述中的代码示例中org.apache.kafka.common.serialization.StringSerializer,单单指定StringSerializer是错误的。
注意到前篇示例代码中的initConfig()方法里还设置了一个参数client.id,这个参数用来设定KafkaProducer对应的客户端id,默认值为""。如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,内容形式如"producer-1" "producer-2",即字符串"producer-"与数字的拼接。
KafkaProducer中的参数众多,远非示例initConfig()方法中的那样只有4个,开发人员可以根据业务应用的实际需求来修改这些参数的默认值,以达到灵活调配的目的。一般情况下,普通开发人员无法记住所有的参数名称,只能有个大致的印象。在实际使用过程中,诸如"key.serializer" "max.request.size" "interceptor.classes"类的字符串经常由于人为因素而书写错误。
为此,我们可以直接使用客户端中的org.apache.kafka.clients.producer.ProducerConfig类来做一定程度上的预防措施,每个参数在ProducerConfig类中都有对应的名称,以上篇代码中的initConfig()的方法为例,引入ProducerConfig后的修改结果如下:
public static Properties initNewConfig() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");return props;}
注意到上面的代码中key.serializer和value.serializer参数对应类的全限定名比较长,也比较容易写错,这里通过Java中的技巧来做进一步的改进,相关代码如下:
public static Properties initPerferConfig() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());return props;}
如此以来,代码便能简洁许多,同时还能降低人为出错的可能性。在配置完参数之后,我们就可以使用它来创建一个生产者实例,代码示例如下:
KafkaProducer<String, String> producer = new KafkaProducer<>(props);KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化供其他线程调用。
KafkaProducer中有多个构造方法,比如在创建KafkaProducer实例的时候并没有设定key.serializer和value.serializer这两个配置参数,那么就需要在构造方法中添加对应的序列化器。示例代码如下:
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());其内部原理和无序列化器的构造方法一样,不过就实际而言,一般都选用
public KafkaProducer(Properties properties) {this(new ProducerConfig(properties), null, null, null, null);}
这个构造方法来创建KafkaProducer实例。
附:
KafkaProducer的构造器一览:
