package com.kafka.spout;
import java.util.HashMap;
import java.util.Map;
import com.google.common.collect.Maps;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
/**
 * Launcher for a word-count style Storm topology that reads from Kafka and
 * writes results through an {@code HBaseBolt}.
 *
 * <p>Wiring, as built by {@link #buildTopology()}:
 * <pre>
 *   "spout" (KafkaSpout)
 *      -> "split" (LevelSplit, shuffle grouping)
 *      -> "count" (LevelCount, fields grouping on "word")
 *      -> "hbase" (HBaseBolt, shuffle grouping)
 * </pre>
 *
 * <p>When a command-line argument is present it is used as the topology name
 * and the topology is submitted through {@link StormSubmitter}; otherwise the
 * topology is run in an in-process {@link LocalCluster} for a fixed period.
 */
public class StormKafkaTopo {

    /** ZooKeeper connect string handed to the Kafka spout's {@link ZkHosts}. */
    private static final String KAFKA_ZK_HOSTS = "zeb,yjd,ylh";

    /** Kafka topic consumed by the spout. */
    private static final String KAFKA_TOPIC = "yjd";

    /** ZooKeeper root path passed to {@link SpoutConfig}. */
    private static final String SPOUT_ZK_ROOT = "/storm";

    /** Identifier passed to {@link SpoutConfig} for this spout. */
    private static final String SPOUT_ID = "kafkaspout";

    /** Key in the topology {@link Config} under which the HBase settings map is stored. */
    private static final String HBASE_CONFIG_KEY = "hbase.conf";

    /** First constructor argument of the HBase bolt (presumably the table name - confirm against HBaseBolt). */
    private static final String HBASE_TABLE = "wc";

    /** Topology name used when running in local mode. */
    private static final String LOCAL_TOPOLOGY_NAME = "Topotest1121";

    /** How long the local-mode topology is left running before it is killed, in milliseconds. */
    private static final long LOCAL_RUN_MILLIS = 1000000L;

    /**
     * Builds the topology and either submits it to a cluster or runs it locally.
     *
     * @param args optional; when non-empty, {@code args[0]} is the name under
     *             which the topology is submitted via {@link StormSubmitter}.
     *             With no arguments the topology runs in a {@link LocalCluster}.
     * @throws IllegalStateException if the cluster submission is rejected
     *         because the topology is already alive or is invalid
     */
    public static void main(String[] args) {
        Config conf = new Config();
        conf.setDebug(true);
        // The HBase bolt is configured with withConfigKey(HBASE_CONFIG_KEY), so the
        // settings map has to be published in the topology config under that key.
        conf.put(HBASE_CONFIG_KEY, hbaseSettings());

        TopologyBuilder builder = buildTopology();

        if (args != null && args.length > 0) {
            submitToCluster(args[0], conf, builder);
        } else {
            runLocally(conf, builder);
        }
    }

    /** Creates the Kafka spout configuration, decoding messages with {@code MessageScheme}. */
    private static SpoutConfig kafkaSpoutConfig() {
        BrokerHosts brokerHosts = new ZkHosts(KAFKA_ZK_HOSTS);
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, KAFKA_TOPIC, SPOUT_ZK_ROOT, SPOUT_ID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        return spoutConfig;
    }

    /** Returns the HBase connection settings that are stored in the topology config. */
    private static Map<String, Object> hbaseSettings() {
        Map<String, Object> settings = Maps.newTreeMap();
        settings.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
        settings.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");
        return settings;
    }

    /** Creates the HBase bolt: row key from "word", column "count" in family "result". */
    private static HBaseBolt hbaseBolt() {
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withColumnFamily("result");
        mapper.withColumnFields(new Fields("count"));
        mapper.withRowKeyField("word");
        return new HBaseBolt(HBASE_TABLE, mapper).withConfigKey(HBASE_CONFIG_KEY);
    }

    /** Wires spout and bolts together; the caller turns the builder into a topology. */
    private static TopologyBuilder buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(kafkaSpoutConfig()));
        builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
        // Fields grouping on "word" routes every tuple with the same word to the same counter task.
        builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("hbase", hbaseBolt(), 1).shuffleGrouping("count");
        return builder;
    }

    /**
     * Submits the topology to the cluster under the given name.
     *
     * <p>A rejected submission is rethrown (with its cause) rather than only
     * printed, so the failure propagates out of {@code main} instead of being
     * followed by a normal return.
     */
    private static void submitToCluster(String topologyName, Config conf, TopologyBuilder builder) {
        try {
            StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
        } catch (AlreadyAliveException e) {
            throw new IllegalStateException(
                    "Topology '" + topologyName + "' is already alive on the cluster", e);
        } catch (InvalidTopologyException e) {
            throw new IllegalStateException(
                    "Topology '" + topologyName + "' was rejected as invalid", e);
        }
    }

    /**
     * Runs the topology in an in-process cluster for {@link #LOCAL_RUN_MILLIS}
     * milliseconds, then kills it.
     */
    private static void runLocally(Config conf, TopologyBuilder builder) {
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
            Utils.sleep(LOCAL_RUN_MILLIS);
            cluster.killTopology(LOCAL_TOPOLOGY_NAME);
        } finally {
            // Always tear the local cluster down, even if submit/sleep/kill failed,
            // so the in-process cluster is not left running after an error.
            cluster.shutdown();
        }
    }
}
|