package com.kafka.spout;
import java.util.HashMap;
import java.util.Map;
import com.google.common.collect.Maps;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
/**
 * Entry point that wires a Kafka-to-HBase Storm topology and submits it.
 *
 * <p>Pipeline, as assembled in {@link #buildTopology()}:
 * <ol>
 *   <li>{@code spout} - a {@link KafkaSpout} configured for topic {@code "yjd"};</li>
 *   <li>{@code split} - a {@code LevelSplit} bolt, shuffle-grouped on the spout;</li>
 *   <li>{@code count} - a {@code LevelCount} bolt, fields-grouped on {@code "word"};</li>
 *   <li>{@code hbase} - an {@code HBaseBolt} targeting {@code "wc"}, shuffle-grouped on the counter.</li>
 * </ol>
 *
 * <p>When a command-line argument is given it is used as the topology name and the
 * topology is submitted through {@link StormSubmitter}; otherwise the topology runs
 * in an in-process {@link LocalCluster} for a fixed time and is then torn down.
 */
public class StormKafkaTopo {

    /** Name under which the topology runs in the in-process local cluster. */
    private static final String LOCAL_TOPOLOGY_NAME = "Topotest1121";

    /** How long, in milliseconds, the local cluster is left running before teardown. */
    private static final long LOCAL_RUN_MILLIS = 1000000L;

    /** Topology-config key under which the HBase client settings are published to the bolt. */
    private static final String HBASE_CONFIG_KEY = "hbase.conf";

    /**
     * Builds the topology and either submits it to a cluster or runs it locally.
     *
     * @param args optional; {@code args[0]}, when present, is the name used for
     *             cluster submission. With no arguments the topology runs locally.
     */
    public static void main(String[] args) {
        Config conf = new Config();
        conf.setDebug(true);
        // The HBase bolt looks its client settings up under this key (see withConfigKey below).
        conf.put(HBASE_CONFIG_KEY, buildHBaseClientConfig());

        TopologyBuilder builder = buildTopology();

        if (args != null && args.length > 0) {
            submitToCluster(args[0], conf, builder);
        } else {
            runLocally(conf, builder);
        }
    }

    /** Creates the Kafka spout: brokers discovered via ZooKeeper, messages decoded by MessageScheme. */
    private static KafkaSpout buildKafkaSpout() {
        BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd", "/storm", "kafkaspout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        return new KafkaSpout(spoutConfig);
    }

    /** Creates the HBase bolt: row key from "word", column "count" in family "result". */
    private static HBaseBolt buildHBaseBolt() {
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withColumnFamily("result");
        mapper.withColumnFields(new Fields("count"));
        mapper.withRowKeyField("word");
        return new HBaseBolt("wc", mapper).withConfigKey(HBASE_CONFIG_KEY);
    }

    /** Returns the HBase client settings that are stored in the topology config. */
    private static Map<String, Object> buildHBaseClientConfig() {
        Map<String, Object> hbaseSettings = Maps.newTreeMap();
        hbaseSettings.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
        hbaseSettings.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");
        return hbaseSettings;
    }

    /** Wires spout and bolts together; every component runs with a parallelism hint of 1. */
    private static TopologyBuilder buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", buildKafkaSpout());
        builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
        // Fields grouping routes tuples with the same "word" value to the same counter task.
        builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("hbase", buildHBaseBolt(), 1).shuffleGrouping("count");
        return builder;
    }

    /** Submits the topology to a cluster, reporting (not propagating) submission failures. */
    private static void submitToCluster(String topologyName, Config conf, TopologyBuilder builder) {
        try {
            StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
        } catch (AlreadyAliveException e) {
            System.err.println("Topology '" + topologyName + "' is already running on the cluster");
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            System.err.println("Topology '" + topologyName + "' was rejected as invalid");
            e.printStackTrace();
        }
    }

    /** Runs the topology in an in-process cluster for a fixed time, then tears it down. */
    private static void runLocally(Config conf, TopologyBuilder builder) {
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
            Utils.sleep(LOCAL_RUN_MILLIS);
            cluster.killTopology(LOCAL_TOPOLOGY_NAME);
        } finally {
            // Always release the in-process cluster, even if submission, the sleep
            // (e.g. on interrupt) or the kill throws.
            cluster.shutdown();
        }
    }
}
|