本篇内容介绍了“docker-compose怎么搭建 es/kibana/logstash elk”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
一、准备两台centos7 虚拟机
二、新建 /home/docker-contains/es 再创建node文件夹
分别在node下新建 /data /logs /conf文件
在conf目录下 新建elasticsearch.yml文件如下
cluster.name: elasticsearch-cluster
node.name: es01
network.bind_host: 0.0.0.0
network.publish_host: 192.168.65.135
http.port: 9200
transport.tcp.port: 9300
http.cors.enabled: true
http.cors.allow-origin: "*"
node.master: true
node.data: true
discovery.zen.ping.unicast.hosts: ["192.168.65.135:9300","192.168.65.136:9300"]
discovery.zen.minimum_master_nodes: 1
path.logs: /usr/share/elasticsearch/logs
xpack.security.audit.enabled: true
~
回到es目录:
新建docker-compose.yml如下
version: '3'
services:
es01:
image: elasticsearch:6.6.1
container_name: es01
restart: always
volumes:
- /home/docker_container/es/master/data:/usr/share/elasticsearch/data:rw
- /home/docker_container/es/master/conf/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
- /home/docker_container/es/master/logs:/user/share/elasticsearch/logs:rw
- /home/docker_container/es/master/plugin1:/usr/share/elasticsearch/plugins:rw
ports:
- "9200:9200"
- "9300:9300"
另外一台机器重复上述过程:
新建elasticsearch.yml:
cluster.name: elasticsearch-cluster
node.name: es02
network.bind_host: 0.0.0.0
network.publish_host: 192.168.65.136
http.port: 9200
transport.tcp.port: 9300
http.cors.enabled: true
http.cors.allow-origin: "*"
node.master: true
node.data: true
discovery.zen.ping.unicast.hosts: ["192.168.65.135:9300","192.168.65.136:9300"]
discovery.zen.minimum_master_nodes: 1
path.logs: /usr/share/elasticsearch/logs
xpack.security.audit.enabled: true
新建docker-compose.yml:
version: '3'
services:
es02:
image: elasticsearch:6.6.1
container_name: es02
restart: always
volumes:
- /home/docker-container/es/node1/data:/usr/share/elasticsearch/data:rw
- /home/docker-container/es/node1/conf/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
- /home/docker-container/es/node1/logs:/user/share/elasticsearch/logs:rw
- /home/docker-container/es/node1/plugin1:/usr/share/elasticsearch/plugins:rw
ports:
- "9200:9200"
- "9300:9300"
分别在docker-compose.yml同级目录执行
docker-compose up -d
安装kibana:
docker-compose.yml
version: '3'
services:
kibana:
image: kibana:6.6.1
container_name: kibana
volumes:
- /home/docker_container/kibana/conf/kibana.yml:/usr/share/kibana/config/kibana.yml
restart: always
ports:
- 5601:5601
kibana.yml:
elasticsearch.hosts: ["http://192.168.65.135:9200"]
server.host: "0.0.0.0"
xpack.monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: zh-CN
~
如果出现 Kibana server is not ready yet
第一点:KB、ES版本不一致(网上大部分都是这么说的)
解决方法:把KB和ES版本调整为统一版本
第二点:kibana.yml中配置有问题(通过查看日志,发现了Error: No Living connections的问题)
解决方法:将配置文件kibana.yml中的elasticsearch.url改为正确的链接,默认为: http://elasticsearch:9200
改为http://自己的IP地址:9200
第三点:浏览器没有缓过来
解决方法:刷新几次浏览器(狂刷 我刷了 6遍才出来)。
logstash:
logstash采用非docker安装,上传logstash-6.4.3.tar.gz至/home/software目录
解压缩gz包,进入到解压后的logstash-6.4.3目录
编辑:vim config/pipelines.yml (如果需要连接多个 只需在后面追加pipeline.id和path.config,切记pipeline.id不能重复)
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
- pipeline.id: table1
path.config: "/home/software/logstash-6.4.3/conf/mysql_1.conf"
- pipeline.id: table2
path.config: "/home/software/logstash-6.4.3/conf/mysql.conf"
mysql.conf:
input {
jdbc {
jdbc_driver_library => "/home/software/logstash-6.4.3/sql/mysql-connector-java-5.1.46.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://ip:3306/test"
jdbc_user => "root"
jdbc_password => ""
schedule => "* * * * *"
statement => "SELECT * FROM user WHERE update_time >= :sql_last_value"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time"
last_run_metadata_path => "syncpoint_table"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["192.168.65.135:9200","192.168.65.136:9200"]
# 索引名称 可自定义
index => "user"
# 需要关联的数据库中有有一个id字段,对应类型中的id
document_id => "%{id}"
document_type => "user"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
mysql_1.conf:
input {
jdbc {
jdbc_driver_library => "/home/software/logstash-6.4.3/sql/mysql-connector-java-5.1.46.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://ip:3306/lvz_goods?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8"
jdbc_user => "root"
jdbc_password => ""
schedule => "* * * * *"
statement => "SELECT * FROM lvz_product WHERE update_time >= :sql_last_value"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time"
last_run_metadata_path => "syncpoint_table"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["192.168.65.135:9200","192.168.65.136:9200"]
# 索引名称 可自定义
index => "goods"
# 需要关联的数据库中有有一个id字段,对应类型中的id
document_id => "%{id}"
document_type => "goods"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
注意:需要把mysql连接包放到对应目录;至此三大件安装就完成了
es整合ik和拼音分词器 1下载elasticsearch-analysis-ik-6.6.1.zip
分别上传到两台机器上的node1和 node2目录 ,解压重命名为ik
2下载elasticsearch-analysis-pinyin-6.6.1.zip
分别上传到两台机器上的node1和 node2目录 ,解压重命名为pinyin
编辑docker-compose文件 挂载:
/home/docker-container/es/node1/plugin1:/usr/share/elasticsearch/plugins:rw
重启es,到kibana界面 :
执行
先删除goods索引
在goods索引自定义ik和拼音分词器,ik_smart_pinyin:
DELETE /goods
PUT /goods
{
"settings": {
"analysis": {
"analyzer": {
"ik_smart_pinyin": {
"type": "custom",
"tokenizer": "ik_smart",
"filter": ["my_pinyin", "word_delimiter"]
},
"ik_max_word_pinyin": {
"type": "custom",
"tokenizer": "ik_max_word",
"filter": ["my_pinyin", "word_delimiter"]
}
},
"filter": {
"my_pinyin": {
"type" : "pinyin",
"keep_separate_first_letter" : true,
"keep_full_pinyin" : true,
"keep_original" : true,
"limit_first_letter_length" : 16,
"lowercase" : true,
"remove_duplicated_term" : true
}
}
}
}
}
指定goods索引为ik_smart_pinyin
POST /goods/_mapping/goods
{
"goods": {
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"attribute_list": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"category_id": {
"type": "long"
},
"created_time": {
"type": "date"
},
"detail": {
"type": "text",
"analyzer":"ik_smart_pinyin",
"search_analyzer":"ik_smart_pinyin"
},
"id": {
"type": "long"
},
"main_image": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "text",
"analyzer":"ik_smart_pinyin",
"search_analyzer":"ik_smart_pinyin"
},
"revision": {
"type": "long"
},
"status": {
"type": "long"
},
"sub_images": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"subtitle": {
"type": "text",
"analyzer":"ik_smart",
"search_analyzer":"ik_smart"
},
"updated_time": {
"type": "date"
}
}
}
}
安装kafka:
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092" # kafka 把9092端口随机映射到主机的端口
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.65.135 #本机ip
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: test:1:1
volumes:
- /var/run/docker.sock:/var/run/docker.sock
启动两个节点 kafka:
docker-compose up -d --scale kafka=2 本机启动一个有两个节点的 Kafka 集群
本地项目集成,kafka和elk;
application.yml
###服务启动端口号
server:
port: 8500
###服务名称(服务注册到eureka名称)
eureka:
client:
service-url:
defaultZone: http://localhost:8100/eureka
spring:
application:
name: app-lvz-goods
redis:
host: 192.168.65.136
port: 6379
password: feilvzhang
pool:
max-idle: 100
min-idle: 1
max-active: 1000
max-wait: -1
###数据库相关连接
datasource:
username: root
password:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.125.113:3306/lvz_goods?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
data:
elasticsearch:
####集群名称
cluster-name: elasticsearch-cluster
####地址
cluster-nodes: 192.168.65.135:9300,192.168.65.136:9300
kafka:
bootstrap-servers: 192.168.65.135:32768,192.168.65.135:32769
日志切面:
package com.lvz.shop.elk.aop;
import com.alibaba.fastjson.JSONObject;
import com.lvz.shop.elk.kafka.KafkaSender;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
/**
* @description: ELK拦截日志信息
* @author: flz
* @date: 2019/8/9 15:57
*/
@Aspect
@Component
public class AopLogAspect {
@Autowired
private KafkaSender<JSONObject> kafkaSender;
// 申明一个切点 里面是 execution表达式
@Pointcut("execution(* com.lvz.shop.*.impl.*.*(..))")
private void serviceAspect() {
}
// 请求method前打印内容
@Before(value = "serviceAspect()")
public void methodBefore(JoinPoint joinPoint) {
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
// // 打印请求内容
// log.info("===============请求内容===============");
// log.info("请求地址:" + request.getRequestURL().toString());
// log.info("请求方式:" + request.getMethod());
// log.info("请求类方法:" + joinPoint.getSignature());
// log.info("请求类方法参数:" + Arrays.toString(joinPoint.getArgs()));
// log.info("===============请求内容===============");
JSONObject jsonObject = new JSONObject();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
//请求时间
jsonObject.put("request_time", df.format(new Date()));
//请求URL
jsonObject.put("request_url", request.getRequestURL().toString());
//请求方法
jsonObject.put("request_method", request.getMethod());
//请求类方法
jsonObject.put("signature", joinPoint.getSignature());
//请求参数
jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
JSONObject requestJsonObject = new JSONObject();
requestJsonObject.put("request", jsonObject);
kafkaSender.send(requestJsonObject);
}
// 在方法执行完结后打印返回内容
@AfterReturning(returning = "o", pointcut = "serviceAspect()")
public void methodAfterReturing(Object o) {
// log.info("--------------返回内容----------------");
// log.info("Response内容:" + gson.toJson(o));
// log.info("--------------返回内容----------------");
JSONObject respJSONObject = new JSONObject();
JSONObject jsonObject = new JSONObject();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
jsonObject.put("response_time", df.format(new Date()));
jsonObject.put("response_content", JSONObject.toJSONString(o));
respJSONObject.put("response", jsonObject);
kafkaSender.send(respJSONObject);
}
}
kafka消息发送:
package com.lvz.shop.elk.kafka;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @description: 生产者
* @author: flz
* @date: 2019/8/9 15:59
*/
@Component
@Slf4j
public class KafkaSender<T> {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* kafka 发送消息
*
* @param obj 消息对象
*/
public void send(T obj) {
String jsonObj = JSON.toJSONString(obj);
log.info("------------ message = {}", jsonObj);
// 发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("goods_mylog", jsonObj);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("Produce: The message failed to be sent:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
// TODO 业务处理
log.info("Produce: The message was sent successfully:");
log.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
}
});
}
}
异常切面日志:
package com.lvz.shop.elk.aop.error;
import com.alibaba.fastjson.JSONObject;
import com.lvz.shop.elk.kafka.KafkaSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @description: 全局异常处理
* @author: flz
* @date: 2019/8/9 15:56
*/
//@ControllerAdvice
@Slf4j
public class GlobalExceptionHandler {
@Autowired
private KafkaSender<JSONObject> kafkaSender;
@ExceptionHandler(RuntimeException.class)
@ResponseBody
public JSONObject exceptionHandler(Exception e) {
log.info("###全局捕获异常###,error:{}", e);
// 1.封装异常日志信息
JSONObject errorJson = new JSONObject();
JSONObject logJson = new JSONObject();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
logJson.put("request_time", df.format(new Date()));
logJson.put("error_info", e);
errorJson.put("request_error", logJson);
kafkaSender.send(errorJson);
// 2. 返回错误信息
JSONObject result = new JSONObject();
result.put("code", 500);
result.put("msg", "系统错误");
return result;
}
}
logstash 配置kafka和es:
goods_mylog.conf:
input {
kafka {
bootstrap_servers => ["192.168.65.135:32768,192.168.65.135:32769"]
topics => ["goods_mylog"]
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["192.168.65.135:9200","192.168.65.136:9200"]
index => "goods_mylog"
}
}
mylog.conf:
input {
kafka {
bootstrap_servers => ["192.168.65.135:32768,192.168.65.135:32769"]
topics => ["my_log"]
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["192.168.65.135:9200","192.168.65.136:9200"]
index => "my_log"
}
}
启动项目 访问 接口elk就会开始收集日志,在kibana下查看:

“docker-compose怎么搭建 es/kibana/logstash elk”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注天达云网站,小编将为大家输出更多高质量的实用文章!