vlambda博客
学习文章列表

利用Nacos服务获取配置逻辑的特点,实现动态配置kafak认证

  • 我要做什么?

实现Nacos动态配置kafka认证信息,使每个微服务读取同一个kafka配置,并生成文件注入到环境变量中。


  • 为什么要这么做?

首先我们看下

Kafka-java接入demo,如图:

1.prod_client_jaas.conf文件

KafkaClient{ org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka_1" password="密码"; };

2.cons_client_jaas.conf

KafkaClient{ org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka_1" password="密码"; };

3.producer

package com.sensetime.kafka; import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;   /** * Producer * */public class App{ public static void main( String[] args ) {   String fsPath=System.getProperty("user.dir"); System.out.println(fsPath); System.setProperty("java.security.auth.login.config", ""+fsPath+"/conf/prod_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径 System.out.println("===================配置文件地址"+fsPath+"/conf/prod_client_jaas.conf"); Properties props = new Properties(); props.put("bootstrap.servers", "ip:9092,ip:9092,ip:9092"); //此处为kafka接入点 props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty ("security.protocol", "SASL_PLAINTEXT"); props.setProperty ("sasl.mechanism", "PLAIN"); Producer producer = null; try { producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { String msg = "Message " + i; producer.send(new ProducerRecord("testtime", msg)); //此处为创建的topic System.out.println("Sent:" + msg); } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } }}

4.consumer

package com.sensetime.kafka; import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;   public class Consumer { public static void main(String[] args) {  String fsPath=System.getProperty("user.dir"); System.setProperty("java.security.auth.login.config", ""+fsPath+"/conf/cons_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径 System.out.println("===================配置文件地址"+fsPath+"/conf/cons_client_jaas.conf"); Properties props = new Properties(); props.put("bootstrap.servers", "ip:9092,ip:9092,ip:9092"); //kafka接入点 props.put("group.id", "group1"); //创建的group props.put("group.name", "1"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty ("security.protocol", "SASL_PLAINTEXT"); props.setProperty ("sasl.mechanism", "PLAIN"); KafkaConsumer kafkaConsumer = new Kafkansumer<>(props); kafkaConsumer.subscribe(Arrays.asList("testtime")); //此处为订阅的topic while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("线程1"+":"+"Partition: " + record.partition() + " Offset: " + record.offset() + " Value: " + record.value() + " ThreadID: " + Thread.currentThread().getId()); }  } } }


这个配置认证方式的痛点在于1: 需要在springboot刚启动还未进行kafka建立连接之前,将认证信息注入到环境变量里边

2:需要每个微服务都要配置认证信息的文件。考虑到我们使用Nacos作为配置中心,我的想法是利用Nacos进行配置认证信息,并在springboot启动后kafka实例化前,读取认证信息,设置到环境变量里。


  • 我是怎么做的?

首先看图所示,springboot启动时候会在refreshContext(context)之前执行初始化applyInitializers,当spring执行这个类PropertySourceBootstrapConfiguration的时候,会执行Nacos的相关获取配置解析配置的方法,所以,我只也搞一个同样的initalizer,并且改initalizer排序在Nacos执行之后,不就解决了吗,对的,就是这样的


实现代码如下:

注意:如果我们自己定义启动执行前的类需如下防范
public class MyApiApplication { public static void main(String[] args) { SpringApplication application = new SpringApplication(MyApiApplication.class); application.addInitializers(new KafkaSaslConfiguration()); application.run(args); }}


public class KafkaSaslConfiguration implements ApplicationContextInitializer<ConfigurableApplicationContext>, Ordered {
private static Logger log = LoggerFactory.getLogger(KafkaSaslConfiguration.class);
private static final String STORE_CONS_CLIENT_JAAS_PATH = "./conf/cons_client_jaas.conf";
@Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE + 11; }
@Override public void initialize(ConfigurableApplicationContext applicationContext) { ConfigurableEnvironment environment = applicationContext.getEnvironment(); String username = environment.getProperty("kafka-sasl-username"); String password = environment.getProperty("kafka-sasl-password"); if (StringUtils.isBlank(username) || StringUtils.isBlank(password)) { log.error("kafka sasl need kafka-sasl-username and kafka-sasl-password,please set value for Nacos."); System.exit(-1); } String kafkaClient = "KafkaClient{\n" + " org.apache.kafka.common.security.plain.PlainLoginModule required\n" + " username=\"" + username + "\"\n" + " password=\"" + password + "\";\n" + " };";
String clientPath = System.getProperty("user.dir") + "/conf/cons_client_jaas.conf"; System.setProperty("java.security.auth.login.config", clientPath); ResourceUtils.readStringToDisk(kafkaClient, STORE_CONS_CLIENT_JAAS_PATH); log.info("java.security.auth.login.config: {}", clientPath); }
}