本篇文章分享如何将 Storm 1.1.3 与 Kafka 1.0.0 进行整合。小编觉得挺实用,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获。话不多说,跟着小编一起来看看吧。
package hgs.core.sk;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
/**
 * Entry point that wires a {@link KafkaSpout} to a printing bolt and submits
 * the resulting topology to a Storm cluster.
 *
 * <p>The ZooKeeper quorum, Kafka topic, offset location and topology name are
 * hard-coded below.
 */
@SuppressWarnings("deprecation")
public class StormKafkaMainTest {
    /**
     * Builds the Kafka-to-bolt topology and submits it through {@link StormSubmitter}.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if the topology cannot be submitted (the
     *         original failure is preserved as the cause)
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // ZooKeeper connection string.
        BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181");
        // SpoutConfig arguments: 1) ZooKeeper hosts, 2) Kafka topic to consume,
        // 3 and 4) ZooKeeper root and id under which the consumed offsets are
        // recorded; here they end up under /test7/consume in the ZooKeeper cluster.
        SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume");
        // Ignore the offsets stored in ZooKeeper and consume from the beginning.
        // This line can be removed, because the consumed offset can then be
        // looked up in ZooKeeper instead.
        sconfig.ignoreZkOffsets = true;
        // sconfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1);
        builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout");
        Config config = new Config();
        config.setNumWorkers(1);
        try {
            StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology());
        } catch (Exception e) {
            // Do not swallow the failure: submitting is the whole purpose of this
            // program, so propagate it (with the cause) to make the JVM exit non-zero.
            throw new IllegalStateException("Failed to submit topology storm----kafka--test", e);
        }
        // Local-mode alternative kept for reference:
        // LocalCluster cu = new LocalCluster();
        // cu.submitTopology("test", config, builder.createTopology());
    }
}
class MyboltO extends BaseRichBolt{
private static final long serialVersionUID = 1L;
OutputCollector collector = null;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
//这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容
//因为得到的内容是byte数组,所以需要转换
String out = new String((byte[])input.getValue(0));
System.out.println(out);
collector.ack(input);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
pom.xml文件的依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>hgs</groupId>
<artifactId>core.sk</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>core.sk</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-monitor</artifactId>
<version>1.2.2</version>
</dependency> -->
<!-- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency> -->
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<!-- 尝试了很多次,都会出现这个错误:
java.lang.NullPointerException at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:272)
最后将 kafka-clients 修改为与 Kafka 相对应的版本后问题得到解决,应该是此处的问题
-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2</version>
<configuration>
<archive>
<manifest>
<!-- 我运行这个jar所运行的主类 -->
<mainClass>hgs.core.sk.StormKafkaMainTest</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>
<!-- 必须是这样写 -->
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
以上就是如何进行 Storm 1.1.3 与 Kafka 1.0.0 整合的全部内容,小编相信其中部分知识点可能是我们日常工作中会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注天达云行业资讯频道。