接前文
http://blog.itpub.net/29254281/viewspace-1344706/
http://blog.itpub.net/29254281/viewspace-1347985/
http://blog.itpub.net/29254281/viewspace-2134876/
之前的代码被大神怼了,被怒批"杂乱无章,误人子弟"。
我估计主要是因为面向对象的程度不够,过程化太明显了。
在网上找了一个Reactor模式的例子,又改了改自己的程序。
因为Oracle太笨重了,这回干脆换成了MySQL。
改写之后的程序,在我的电脑上使用2线程、最大500连接的配置时,性能最好。
-
import java.io.IOException;
-
import java.net.InetSocketAddress;
-
import java.net.SocketAddress;
-
import java.nio.ByteBuffer;
-
import java.nio.channels.SelectionKey;
-
import java.nio.channels.Selector;
-
import java.nio.channels.SocketChannel;
-
import java.nio.charset.Charset;
-
import java.sql.Connection;
-
import java.sql.DriverManager;
-
import java.sql.PreparedStatement;
-
import java.sql.SQLException;
-
import java.sql.Timestamp;
-
import java.util.ArrayList;
-
import java.util.HashSet;
-
import java.util.Iterator;
-
import java.util.List;
-
import java.util.Set;
-
import java.util.concurrent.BlockingQueue;
-
import java.util.concurrent.LinkedBlockingQueue;
-
import java.util.concurrent.atomic.AtomicInteger;
-
import java.util.regex.Matcher;
-
import java.util.regex.Pattern;
-
-
class Reactor implements Runnable {
-
public static int GETCOUNT() {
-
return COUNT.get();
-
-
}
-
-
public static int getQueueSize() {
-
return QUEUE.size();
-
}
-
-
private static final AtomicInteger COUNT = new AtomicInteger();
-
private static final AtomicInteger TASKCOUNT = new AtomicInteger();
-
-
public int startTask() {
-
return TASKCOUNT.incrementAndGet();
-
}
-
-
public int finishTask() {
-
return TASKCOUNT.decrementAndGet();
-
}
-
-
public int incrementAndGet() {
-
return COUNT.incrementAndGet();
-
}
-
-
public final Selector selector;
-
private static BlockingQueue<Task> QUEUE = new LinkedBlockingQueue<Task>();
-
-
public void addTask(Task task) {
-
try {
-
QUEUE.put(task);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
-
public Reactor() throws IOException {
-
selector = Selector.open();
-
}
-
-
@Override
-
public void run() {
-
try {
-
while (!Thread.interrupted()) {
-
int maxClient = 500;
-
Task task = null;
-
if (TASKCOUNT.get() < maxClient) {
-
while ((task = (Task) QUEUE.poll()) != null) {
-
new Connector(this, task).run();
-
if (TASKCOUNT.get() > maxClient) {
-
break;
-
}
-
}
-
}
-
-
selector.select();
-
-
Set<SelectionKey> selectionKeys = selector.selectedKeys();
-
Iterator<SelectionKey> it = selectionKeys.iterator();
-
// Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
-
while (it.hasNext()) {
-
// 来一个事件 第一次触发一个accepter线程
-
// 以后触发SocketReadHandler
-
SelectionKey selectionKey = it.next();
-
dispatch(selectionKey);
-
}
-
selectionKeys.clear();
-
}
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* 运行Acceptor或SocketReadHandler
-
*
-
* @param key
-
*/
-
void dispatch(SelectionKey key) {
-
Runnable r = (Runnable) (key.attachment());
-
if (r != null) {
-
r.run();
-
}
-
}
-
-
}
-
-
class Connector implements Runnable {
-
private Reactor reactor;
-
private Task task;
-
-
public Connector(Reactor reactor, Task task) {
-
this.reactor = reactor;
-
this.task = task;
-
}
-
-
@Override
-
public void run() {
-
try {
-
reactor.startTask();
-
task.setStarttime(System.currentTimeMillis());
-
SocketAddress addr = new InetSocketAddress(task.getHost(), 80);
-
SocketChannel socketChannel = SocketChannel.open();
-
socketChannel.configureBlocking(false);
-
socketChannel.connect(addr);
-
-
BaseHandler base = new BaseHandler();
-
base.setTask(task);
-
base.setSelector(reactor.selector);
-
base.setSocketChannel(socketChannel);
-
base.setReactor(reactor);
-
if (socketChannel != null)// 调用Handler来处理channel
-
socketChannel.register(reactor.selector, SelectionKey.OP_CONNECT, new SocketWriteHandler(base));
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
-
class BaseHandler {
-
private Selector selector;
-
private SocketChannel socketChannel;
-
private Task task;
-
private ByteBuffer byteBuffer = ByteBuffer.allocate(2400);
-
private Reactor reactor;
-
-
public Reactor getReactor() {
-
return reactor;
-
}
-
-
public void setReactor(Reactor reactor) {
-
this.reactor = reactor;
-
}
-
-
public Selector getSelector() {
-
return selector;
-
}
-
-
public void setSelector(Selector selector) {
-
this.selector = selector;
-
}
-
-
public SocketChannel getSocketChannel() {
-
return socketChannel;
-
}
-
-
public void setSocketChannel(SocketChannel socketChannel) {
-
this.socketChannel = socketChannel;
-
}
-
-
public Task getTask() {
-
return task;
-
}
-
-
public void setTask(Task task) {
-
this.task = task;
-
}
-
-
public ByteBuffer getByteBuffer() {
-
return byteBuffer;
-
}
-
}
-
-
class SocketWriteHandler implements Runnable {
-
BaseHandler baseHandler;
-
-
public SocketWriteHandler(BaseHandler baseHandler) {
-
this.baseHandler = baseHandler;
-
ByteBuffer byteBuffer = baseHandler.getByteBuffer();
-
Task task = baseHandler.getTask();
-
try {
-
byteBuffer.put(("GET " + task.getCurrentPath() + " HTTP/1.0\r\n").getBytes("utf8"));
-
byteBuffer.put(("HOST:" + task.getHost() + "\r\n").getBytes("utf8"));
-
byteBuffer.put(("Accept:*/*\r\n").getBytes("utf8"));
-
byteBuffer.put(("\r\n").getBytes("utf8"));
-
byteBuffer.flip();
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
@Override
-
public void run() {
-
try {
-
while (!baseHandler.getSocketChannel().finishConnect()) {
-
System.out.println("Waiting Connected");
-
}
-
-
baseHandler.getSocketChannel().write(baseHandler.getByteBuffer());
-
-
if (baseHandler.getByteBuffer().hasRemaining()) {
-
baseHandler.getByteBuffer().compact();
-
baseHandler.getSocketChannel().register(baseHandler.getSelector(), SelectionKey.OP_WRITE, this);
-
System.out.println("Continue Write");
-
} else {
-
baseHandler.getSocketChannel().register(baseHandler.getSelector(), SelectionKey.OP_READ,
-
new SocketReadHandler(baseHandler));
-
baseHandler.getByteBuffer().clear();
-
}
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
}
-
-
class SocketReadHandler implements Runnable {
-
Charset charset = Charset.forName("utf8");
-
Charset gbkcharset = Charset.forName("gbk");
-
BaseHandler baseHandler;
-
-
public SocketReadHandler(BaseHandler baseHandler) {
-
this.baseHandler = baseHandler;
-
}
-
-
@Override
-
public void run() {
-
try {
-
SocketChannel channel = baseHandler.getSocketChannel();
-
ByteBuffer byteBuffer = baseHandler.getByteBuffer();
-
Task task = baseHandler.getTask();
-
-
int length;
-
while ((length = channel.read(byteBuffer)) > 0) {
-
byteBuffer.flip();
-
task.getContent().append(charset.decode(charset.encode(gbkcharset.decode(byteBuffer))).toString());
-
-
byteBuffer.compact();
-
}
-
if (length == -1) {
-
channel.close();
-
task.setEndtime(System.currentTimeMillis());
-
baseHandler.getReactor().incrementAndGet();
-
baseHandler.getReactor().finishTask();
-
new ParseHandler(task, baseHandler.getReactor()).run();
-
-
} else {
-
baseHandler.getSocketChannel().register(baseHandler.getSelector(), SelectionKey.OP_READ, this);
-
}
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
-
}
-
-
}
-
-
public class Probe {
-
public static void main(String[] args) throws IOException, InterruptedException {
-
for (int i = 0; i <2; i++) {
-
Reactor reactor = new Reactor();
-
reactor.addTask(new Task("news.163.com", 80, "/index.html"));
-
new Thread(reactor, "ReactorThread_" + i).start();
-
}
-
long start = System.currentTimeMillis();
-
while (true) {
-
Thread.sleep(1000);
-
long end = System.currentTimeMillis();
-
float interval = ((end - start) / 1000);
-
int connectTotal = Reactor.GETCOUNT();
-
-
int persistenceTotal = PersistenceHandler.GETCOUNT();
-
-
int connectps = Math.round(connectTotal / interval);
-
int persistenceps = Math.round(persistenceTotal / interval);
-
System.out.print("\r连接总数:" + connectTotal + " \t每秒连接:" + connectps + "\t连接队列剩余:" + Reactor.getQueueSize()
-
+ " \t持久化总数:" + persistenceTotal + " \t每秒持久化:" + persistenceps + "\t持久化队列剩余:"
-
+ PersistenceHandler.getInstance().getSize());
-
}
-
}
-
-
}
-
-
/**
 * One page to fetch, together with the result of fetching it: target host,
 * port and path, start/end timestamps, the received content, the parsed
 * HTTP status and a validity flag used while resolving links.
 */
class Task {

    private String host;
    private int port;
    private String currentPath;
    private long starttime;
    private long endtime;

    /** File extension derived from the path; see {@link #setCurrentPath(String)}. */
    private String type;
    private StringBuilder content = new StringBuilder(2400);
    private int state;
    private boolean isValid = true;

    /** Creates an empty task; call {@link #init(String, int, String)} before use. */
    public Task() {
    }

    /**
     * @param host host name to connect to
     * @param port TCP port to connect to
     * @param path request path, optionally with a query string or fragment
     */
    public Task(String host, int port, String path) {
        init(host, port, path);
    }

    /**
     * Sets the target of this task.
     *
     * @param host host name to connect to
     * @param port TCP port to connect to
     * @param path request path, optionally with a query string or fragment
     */
    public void init(String host, int port, String path) {
        this.setCurrentPath(path);
        this.host = host;
        this.port = port;
    }

    public long getStarttime() {
        return starttime;
    }

    public void setStarttime(long starttime) {
        this.starttime = starttime;
    }

    public long getEndtime() {
        return endtime;
    }

    public void setEndtime(long endtime) {
        this.endtime = endtime;
    }

    public boolean isValid() {
        return isValid;
    }

    public void setValid(boolean isValid) {
        this.isValid = isValid;
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

    public String getCurrentPath() {
        return currentPath;
    }

    /**
     * Stores the request path and derives {@link #getType() type} from it.
     *
     * <p>The type is the file extension of the last path segment, ignoring
     * any query string or fragment, e.g. {@code "html"} for
     * {@code "/a/index.html?x=1"}. It is the empty string when the last
     * segment has no extension.
     *
     * @param currentPath the request path
     */
    public void setCurrentPath(String currentPath) {
        this.currentPath = currentPath;
        this.type = extensionOf(currentPath);
    }

    /**
     * Extracts the extension of the last path segment.
     *
     * <p>Dots that appear only in a directory name, in the query string or
     * in the fragment are ignored; taking the first dot of the whole string
     * used to fail with an index error whenever {@code '?'} came first.
     */
    private static String extensionOf(String path) {
        // End of the path proper: the first '?' or '#', whichever comes first.
        int end = path.length();
        int query = path.indexOf('?');
        if (query >= 0) {
            end = query;
        }
        int fragment = path.indexOf('#');
        if (fragment >= 0 && fragment < end) {
            end = fragment;
        }

        int dot = path.lastIndexOf('.', end - 1);
        int slash = path.lastIndexOf('/', end - 1);
        if (dot <= slash) {
            // No dot at all, or the last dot belongs to a directory name.
            return "";
        }
        return path.substring(dot + 1, end);
    }

    /** @return elapsed time between start and end timestamps, in milliseconds */
    public long getTaskTime() {
        return getEndtime() - getStarttime();
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    public StringBuilder getContent() {
        return content;
    }

    public void setContent(StringBuilder content) {
        this.content = content;
    }
}
-
-
class ParseHandler implements Runnable {
-
private static final Set SET = new HashSet();
-
-
PersistenceHandler persistencehandler = PersistenceHandler.getInstance();
-
-
List domainlist = new ArrayList();
-
-
Task task;
-
-
private interface Filter {
-
void doFilter(Task fatherTask, Task newTask, String path, Filter chain);
-
}
-
-
private class FilterChain implements Filter {
-
private List list = new ArrayList();
-
-
{
-
addFilter(new TwoLevel());
-
addFilter(new OneLevel());
-
addFilter(new FullPath());
-
addFilter(new Root());
-
addFilter(new Default());
-
}
-
-
private void addFilter(Filter filter) {
-
list.add(filter);
-
}
-
-
private Iterator it = list.iterator();
-
-
@Override
-
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
-
if (it.hasNext()) {
-
((Filter) it.next()).doFilter(fatherTask, newTask, path, chain);
-
}
-
}
-
-
}
-
-
private class TwoLevel implements Filter {
-
-
@Override
-
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
-
if (path.startsWith("../../")) {
-
String prefix = getPrefix(fatherTask.getCurrentPath(), 3);
-
newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../../", prefix));
-
} else {
-
chain.doFilter(fatherTask, newTask, path, chain);
-
}
-
-
}
-
}
-
-
private class OneLevel implements Filter {
-
-
@Override
-
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
-
if (path.startsWith("../")) {
-
String prefix = getPrefix(fatherTask.getCurrentPath(), 2);
-
newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../", prefix));
-
} else {
-
chain.doFilter(fatherTask, newTask, path, chain);
-
}
-
-
}
-
-
}
-
-
private class FullPath implements Filter {
-
-
@Override
-
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
-
if (path.startsWith("http://")) {
-
Iterator it = domainlist.iterator();
-
boolean flag = false;
-
while (it.hasNext()) {
-
String domain = (String) it.next();
-
if (path.startsWith("http://" + domain + "/")) {
-
newTask.init(domain, fatherTask.getPort(), path.replace("http://" + domain + "/", "/"));
-
flag = true;
-
break;
-
}
-
}
-
if (!flag) {
-
newTask.setValid(false);
-
}
-
} else {
-
chain.doFilter(fatherTask, newTask, path, chain);
-
}
-
}
-
-
}
-
-
private class Root implements Filter {
-
-
@Override
-
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
-
if (path.startsWith("/")) {
-
newTask.init(fatherTask.getHost(), fatherTask.getPort(), path);
-
} else {
-
chain.doFilter(fatherTask, newTask, path, chain);
-
}
-
}
-
-
}
-
-
private class Default implements Filter {
-
-
@Override
-
public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
-
if (path.contains(":")) {
-
newTask.setValid(false);
-
return;
-
}
-
String prefix = getPrefix(fatherTask.getCurrentPath(), 1);
-
newTask.init(fatherTask.getHost(), fatherTask.getPort(), prefix + "/" + path);
-
}
-
}
-
-
public ParseHandler(Task task, Reactor reactor) {
-
-
this.task = task;
-
this.reactor = reactor;
-
-
// 增加白名单
-
this.domainlist.add("news.163.com");
-
}
-
-
private Reactor reactor;
-
private Pattern pattern = Pattern.compile("\"[^\"]+\\.htm[^\"]*\"");
-
-
private void parseTaskState(Task task) {
-
if (task.getContent().toString().startsWith("HTTP/1.1")) {
-
task.setState(Integer.parseInt(task.getContent().substring(9, 12)));
-
} else {
-
task.setState(Integer.parseInt(task.getContent().substring(9, 12)));
-
}
-
}
-
-
/**
-
* @param fatherTask
-
* @param path
-
* @throws Exception
-
*/
-
private void createNewTask(Task fatherTask, String path) throws Exception {
-
Task newTask = new Task();
-
FilterChain filterchain = new FilterChain();
-
filterchain.doFilter(fatherTask, newTask, path, filterchain);
-
if (newTask.isValid()) {
-
synchronized (SET) {
-
if (SET.contains(newTask.getHost() + newTask.getCurrentPath())) {
-
return;
-
}
-
SET.add(newTask.getHost() + newTask.getCurrentPath());
-
}
-
reactor.addTask(newTask);
-
}
-
}
-
-
private String getPrefix(String s, int count) {
-
String prefix = s;
-
while (count > 0) {
-
prefix = prefix.substring(0, prefix.lastIndexOf("/"));
-
count--;
-
}
-
return "".equals(prefix) ? "/" : prefix;
-
}
-
-
@Override
-
public void run() {
-
try {
-
parseTaskState(task);
-
if (200 == task.getState()) {
-
Matcher matcher = pattern.matcher(task.getContent());
-
while (matcher.find()) {
-
String path = matcher.group();
-
if (!path.contains(" ") && !path.contains("\t") && !path.contains("(") && !path.contains(")")) {
-
path = path.substring(1, path.length() - 1);
-
-
createNewTask(task, path);
-
}
-
}
-
}
-
-
persistencehandler.addTask(task);
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
-
}
-
}
-
-
class PersistenceHandler implements Runnable {
-
private static class SingletonHandler {
-
private static PersistenceHandler obj = new PersistenceHandler();
-
}
-
-
public static PersistenceHandler getInstance() {
-
return SingletonHandler.obj;
-
}
-
-
static {
-
try {
-
Class.forName("com.mysql.jdbc.Driver");
-
} catch (ClassNotFoundException e) {
-
// TODO Auto-generated catch block
-
e.printStackTrace();
-
}
-
}
-
-
public static int GETCOUNT() {
-
return COUNT.get();
-
}
-
-
private static final AtomicInteger COUNT = new AtomicInteger();
-
private BlockingQueue persistencelist;
-
-
public PersistenceHandler() {
-
this.persistencelist = new LinkedBlockingQueue();
-
new Thread(this, "PersistenceThread").start();
-
}
-
-
public void addTask(Task task) {
-
try {
-
this.persistencelist.put(task);
-
} catch (InterruptedException e) {
-
// TODO Auto-generated catch block
-
e.printStackTrace();
-
}
-
}
-
-
public int getSize() {
-
return persistencelist.size();
-
}
-
-
private Connection conn;
-
private PreparedStatement ps;
-
-
@Override
-
public void run() {
-
try {
-
conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/mvbox", "xx", "xx");
-
conn.setAutoCommit(false);
-
ps = conn.prepareStatement(
-
"insert into probe(host,path,state,tasktime,type,length,createtime) values(?,?,?,?,?,?,?)");
-
} catch (SQLException e) {
-
// TODO Auto-generated catch block
-
e.printStackTrace();
-
}
-
while (true) {
-
this.handler();
-
COUNT.addAndGet(1);
-
}
-
}
-
-
private void handler() {
-
try {
-
Task task = (Task) persistencelist.take();
-
ps.setString(1, task.getHost());
-
ps.setString(2, task.getCurrentPath());
-
ps.setInt(3, task.getState());
-
ps.setLong(4, task.getTaskTime());
-
ps.setString(5, task.getType());
-
ps.setInt(6, task.getContent().toString().length());
-
ps.setTimestamp(7, new Timestamp(task.getEndtime()));
-
ps.addBatch();
-
if (GETCOUNT() % 500 == 0) {
-
ps.executeBatch();
-
conn.commit();
-
}
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (SQLException e) {
-
e.printStackTrace();
-
}
-
}
-
}



|