vlambda博客
学习文章列表

Kafka 核心源码分析 - 2.1.1 必要的参数配置

第二章 生产者

2.1 客户端开发

2.1.1 必要的参数设置

参照上一节中的initConfig()方法,在Kafka生产者客户端KafkaProducer中有3个参数是必填的。

  • bootstrap.servers
  • key.serializervalue.serializer

broker端接收的消息必须以字节数组(byte[])的形式存在。前篇代码中生产者使用的KafkaProducer<String,String>ProducerRecord<String,String>中的泛型<StringString>应的就是消息中keyvalue的类型,生产者客户端使用这种方式可以让代码具有良好的可读性,不过在发往broker之前需要将消息中对应的keyvalue做相应的序列化操作来转换成字节 数组。key.serializervalue.serializer这两个参数分别用来指定keyvalue序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,具体代码请参考上一节综述中的代码示例中org.apache.kafka.common.serialization.StringSerializer,单单指定StringSerializer是错误的。


注意到前篇示例代码中的initConfig()方法里还设置了一个参数client.id,这个参数用来设定KafkaProducer对应的客户端id,默认值为""。如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,内容形式如"producer-1" "producer-2",即字符串"producer-"与数字的拼接。

KafkaProducer中的参数众多,远非示例initConfig()方法中的那样只有4个,开发人员可以根据业务应用的实际需求来修改这些参数的默认值,以达到灵活调配的目的。一般情况下,普通开发人员无法记住所有的参数名称,只能有个大致的印象。在实际使用过程中,诸如"key.serializer" "max.request.size" "interceptor.classes"类的字符串经常由于人为因素而书写错误。

为此,我们可以直接使用客户端中的org.apache.kafka.clients.producer.ProducerConfig类来做一定程度上的预防措施,每个参数在ProducerConfig类中都有对应的名称,以上篇代码中的initConfig()的方法为例,引入ProducerConfig后的修改结果如下:

 public static Properties initNewConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo"); return props; }

注意到上面的代码中key.serializervalue.serializer参数对应类的全限定名比较长,也比较容易写错,这里通过Java中的技巧来做进一步的改进,相关代码如下:

 public static Properties initPerferConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return props; }

如此以来,代码便能简洁许多,同时还能降低人为出错的可能性。在配置完参数之后,我们就可以使用它来创建一个生产者实例,代码示例如下:

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化供其他线程调用。

KafkaProducer中有多个构造方法,比如在创建KafkaProducer实例的时候并没有设定key.serializervalue.serializer这两个配置参数,那么就需要在构造方法中添加对应的序列化器。示例代码如下:

KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

其内部原理和无序列化器的构造方法一样,不过就实际而言,一般都选用

public KafkaProducer(Properties properties) { this(new ProducerConfig(properties), null, null, null, null); }

这个构造方法来创建KafkaProducer实例。

附:KafkaProducer的构造器一览: