利用Nacos服务获取配置逻辑的特点,实现动态配置kafak认证
我要做什么?
实现Nacos动态配置kafka认证信息,使每个微服务读取同一个kafka配置,并生成文件注入到环境变量中。
为什么要这么做?
首先我们看下
Kafka-java接入demo,如图:
1.prod_client_jaas.conf文件
KafkaClient{org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka_1"password="密码";};
2.cons_client_jaas.conf
KafkaClient{org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka_1"password="密码";};
3.producer
package com.sensetime.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;/*** Producer**/public class App{public static void main( String[] args ){String fsPath=System.getProperty("user.dir");System.out.println(fsPath);System.setProperty("java.security.auth.login.config", ""+fsPath+"/conf/prod_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径System.out.println("===================配置文件地址"+fsPath+"/conf/prod_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "ip:9092,ip:9092,ip:9092"); //此处为kafka接入点props.put("acks", "1");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.setProperty ("security.protocol", "SASL_PLAINTEXT");props.setProperty ("sasl.mechanism", "PLAIN");Producer producer = null;try {producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {String msg = "Message " + i;producer.send(new ProducerRecord("testtime", msg)); //此处为创建的topicSystem.out.println("Sent:" + msg);}} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}}
4.consumer
package com.sensetime.kafka;import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;public class Consumer {public static void main(String[] args) {String fsPath=System.getProperty("user.dir");System.setProperty("java.security.auth.login.config", ""+fsPath+"/conf/cons_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径System.out.println("===================配置文件地址"+fsPath+"/conf/cons_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "ip:9092,ip:9092,ip:9092"); //kafka接入点props.put("group.id", "group1"); //创建的groupprops.put("group.name", "1");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "earliest");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty ("security.protocol", "SASL_PLAINTEXT");props.setProperty ("sasl.mechanism", "PLAIN");KafkaConsumer kafkaConsumer = new Kafkansumer<>(props);kafkaConsumer.subscribe(Arrays.asList("testtime")); //此处为订阅的topicwhile (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.println("线程1"+":"+"Partition: " + record.partition() + " Offset: " + record.offset() + " Value: " + record.value() + " ThreadID: " + Thread.currentThread().getId());}}}}
这个配置认证方式的痛点在于1: 需要在springboot刚启动还未进行kafka建立连接之前,将认证信息注入到环境变量里边
2:需要每个微服务都要配置认证信息的文件。考虑到我们使用Nacos作为配置中心,我的想法是利用Nacos进行配置认证信息,并在springboot启动后kafka实例化前,读取认证信息,设置到环境变量里。
我是怎么做的?
首先看图所示,springboot启动时候会在refreshContext(context)之前执行初始化applyInitializers,当spring执行这个类PropertySourceBootstrapConfiguration的时候,会执行Nacos的相关获取配置解析配置的方法,所以,我只也搞一个同样的initalizer,并且改initalizer排序在Nacos执行之后,不就解决了吗,对的,就是这样的
实现代码如下:
注意:如果我们自己定义启动执行前的类需如下防范
public class MyApiApplication {public static void main(String[] args) {SpringApplication application = new SpringApplication(MyApiApplication.class);application.addInitializers(new KafkaSaslConfiguration());application.run(args);}}public class KafkaSaslConfiguration implementsApplicationContextInitializer<ConfigurableApplicationContext>, Ordered {private static Logger log = LoggerFactory.getLogger(KafkaSaslConfiguration.class);private static final String STORE_CONS_CLIENT_JAAS_PATH = "./conf/cons_client_jaas.conf";@Overridepublic int getOrder() {return Ordered.HIGHEST_PRECEDENCE + 11;}@Overridepublic void initialize(ConfigurableApplicationContext applicationContext) {ConfigurableEnvironment environment = applicationContext.getEnvironment();String username = environment.getProperty("kafka-sasl-username");String password = environment.getProperty("kafka-sasl-password");if (StringUtils.isBlank(username) || StringUtils.isBlank(password)) {log.error("kafka sasl need kafka-sasl-username and kafka-sasl-password,please set value for Nacos.");System.exit(-1);}String kafkaClient = "KafkaClient{\n" +" org.apache.kafka.common.security.plain.PlainLoginModule required\n" +" username=\"" + username + "\"\n" +" password=\"" + password + "\";\n" +" };";String clientPath = System.getProperty("user.dir") + "/conf/cons_client_jaas.conf";System.setProperty("java.security.auth.login.config", clientPath);ResourceUtils.readStringToDisk(kafkaClient, STORE_CONS_CLIENT_JAAS_PATH);log.info("java.security.auth.login.config: {}", clientPath);}}
