FLINK 读取MYSQL数据-通过JDBC方式
package quickstart.batch;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
public class ReadFromMysql {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
//配置下相关信息,以及取数逻辑
DataSource<Row> dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://192.168.65.181/mbi_dev")
.setUsername("MobileBI")
.setPassword("ei2musv+@{U4")
.setQuery("select '1','a' from dual")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
.finish());
//转化输出
Table table1 = tableEnv.fromDataSet(dataSource);
DataSet<Row> rowDataSet = tableEnv.toDataSet(table1, new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
rowDataSet.print();
// result.output(JDBCOutputFormat.buildJDBCOutputFormat()
// .setDrivername("com.mysql.jdbc.Driver")
// .setDBUrl("jdbc:mysql://localhost:3306")
// .setUsername("root")
// .setPassword("root")
// .setQuery("insert into flink.test (id,type) values (?,?)")
// .setSqlTypes(new int[]{Types.INTEGER, Types.NCHAR})
// .finish());
//
// env.execute("flink-test");
}
}