Logstash는 Elastic Stack에 원래 포함되어 있는 구성 요소로서, 구문을 분석하고 데이터를 보강하며 처리를 할 때 사용하던 도구이다.
Ingest Node는 Elasticsearch 5.0부터 도입되었으며 최소한의 구성으로 간단한 아키텍처를 가능하게 하며, Logstash의 상위개념이라고 볼 수 있다. 일부 기능은 중복되지만 서로의 사용처가 분명하게 다르다.
실습으로 경험해본 차이점은 Logstash의 Filter 부분을 구성할 때 .yaml 파일로 작성하게 되는데 만약 관련 설정이 길어지거나 추가사항이 많을때는 가독성과 유지보수가 힘들다.
하지만 Ingest Node를 사용하여 관리했을땐 GUI기반으로 설정하기가 매우 간단했으며 편리하게 관리할 수 있었다.
인제스트 노드(Ingest Node)
도규먼트의 가공과 정제를 위한 인제스트 파이프라인이 실행되는 노드
파이프라인을 통해 도큐먼트를 엘라스틱서치에 인덱싱하기 전에 원하는 형태로 변형할 수 있다.
Logstash의 filter와 기능적으로 유사하나, 실행 주체가 엘라스틱서치라는 점과 지원되는 기능에 일부 차이가 있다.
무엇보다 인제스트 노드를 활용하면 Logstash 설치 없이 비츠만 설치해 데이터를 수집하고, 인제스트 파이프라인을 이용해 이를 가공할 수 있다.
물론 입력되는 데이터가 많아지면 인덱싱 리퀘스트를 모아주거나, 좀 더 복잡한 가공을 수행할 수 있다는 점에서 Logstash의 역할이 필수적인 경우도 있으니
가볍고 간단한 수집프로세스 → Beats + Ingest Node
좀 더 무겁고 복잡한 가공이 필요 → + Logstash
인제스트 파이프라인(Ingest Pipelines) 만들기

Management - Stack Management - Ingest Pipelines에서 Create Pipeline
파이프라인에서 중요한 부분은 프로세서(Processors)를 추가할 수 있다는 점이다.
** Date Processor




먼저 Grok 패턴을 통해 Message에 적힌 시간을 timestamp에 할당을 한다.
그림2을 보면 @timestamp와 timestamp의 시간이 일치하지않은것을 볼 수 있는데 이는 로그가 발생한 시간과 Elasticsearch에 인입한 시간이 다르기때문에 이를 동일하게 맞춰주려면 Data Processor를 이용한다.
Message의 시간이 May 3 12:00:11와 같은 SYSLOGTIMESTAMP 형식으로 오기 때문에 그림1을 참고하여 Date의 Format은 MMM d HH:mm:ss로 맞춰주어 연결을 시켜준다.
정상적으로 연결이 되면 그림3처럼 @timestamp와 timestamp의 시간이 일치하는 것을 볼 수 있고 그림4처럼 @timestamp의 출력형식을 TIMESTMP_ISO8601의 방식으로 바꿔주려면 그림1 하단의 Output Format란에 yyyy-MM-dd HH:mm:ss.SS로 해준다.
** KV(Key Value) Processor
KV에서 Include key에 설정하는 값들은 파싱할 실제 데이터를 넣는것이고 index에서 mapping하는 값들과 일치시켜줘야 파싱한 데이터를 mapping 할 수 있다.
KV(Key-Value) 프로세서는 특정 필드를 key-value 형식으로 파싱하고 싶을 떄 사용하는 프로세서이다.

위의 사진을 참고하면 result 필드에 데이터를 파싱했는데 아래와 같은 형태로 저장된다.
{"resultCode":"0000","resultMsg":"정상","resultDate":"20240313151046","transDate":"20240313","transTime":"151045","msgNO":"21212121","mid":"amoreN0123m","amt":"3000","bankCD":"020","accountNO":"11111111","accountNM":"test","accountDesc":"(주)test"}
이를 편리하게 파싱하기 위해서 도입된것이 KV Processor이며 사용 방법은 아래와 같다.


field 구분자, value 구분자를 설정하고 중괄호 위한 Trim까지 설정해주고 나면 key:value 형식으로 데이터가 저장된다.
이때 Include keys에서 필요한 데이터 필드들만 추가할 수 있는데 Defaults는 모두 추가를 하니 유의해야한다.
** Script Processor
특정 스크립트를 적용하고 싶을 때 사용하는 프로세서이다.
painless라는 스크립팅 언어를 사용하며 엘라스틱서치에서 사용하는 언어이다.


DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
df.setTimeZone(TimeZone.getTimeZone("Asia/Seoul"));
Date date = new Date();
String datetime = df.format(date);
ZonedDateTime zdt = ZonedDateTime.parse(datetime);
int year = zdt.getYear();
int month = zdt.getMonthValue();
int day = zdt.getDayOfMonth();
if(month<10){
ctx.month = "0"+month;
}else{
ctx.month = month;
}
if(day<10){
ctx.day = "0"+day;
}else{
ctx.day= day;
}
ctx.year= year;
ctx.필드명에 값을 대입하면 필드가 생성되며 이 값들로 데이터를 만들어낼 수 있다.
필드명을 동적으로 처리하고싶으면 아래의 예시처럼 할 수 있다.
for (int i = 0; i < ctx.message.result.length; i++) {
def data = ctx.message.result[i];
def parsedData = [
"rejectRate": data.rejectRate,
"concurrentUser": data.concurrentUser,
"visitDay": data.visitDay,
"activeUser": data.activeUser,
"responseTime": data.responseTime,
"ipAddress": data.ipAddress,
"visitHour": data.visitHour,
"activeServiceRangeCount0": data.activeServiceRangeCount0,
"domainId": data.domainId,
"hitHour": data.hitHour,
"activeServiceRangeCount2": data.activeServiceRangeCount2,
"activeServiceRangeCount1": data.activeServiceRangeCount1,
"activeServiceRangeCount3": data.activeServiceRangeCount3,
"port": data.port,
"tps": data.tps,
"domainName": data.domainName,
"activeService": data.activeService,
"hitDay": data.hitDay
];
for (def temp: parsedData.entrySet()){
def key = temp.getKey();
def value = temp.getValue();
if(key != "domainName"){
String field_name = parsedData.domainName + "_" + key;
ctx[field_name] = value;
}
}
}
특정 문자열로 시작하는 조건문도 만들 수 있다.
if(ctx.body.startsWith('workType=sendRes')){
ctx.isTrue = true;
}
else{
ctx.isTrue = false;
}
** Set Processor
보통 failure handler에 사용되며 예외처리를 할 때 사용한다.

특정 프로세서가 제대로 동작하지 않을 때 어떤필드에 어떤값을 넣을건지 설정한다.
** Remove Processor
파싱한 데이터들 중 필요없는 필드를 제거할 때 사용한다.
Redact processor | Elasticsearch Guide [8.15] | Elastic
The Redact processor uses the Grok rules engine to obscure text in the input document matching the given Grok patterns. The processor can be used to obscure Personal Identifying Information (PII) by configuring it to detect known patterns such as email or
www.elastic.co
** Redact Processor
Elasticsearch의 Ingest Pipeline에 속하는 프로세서 중 하나로, 문서 내 민감한 정보(예: 개인정보)를 삭제하거나 대체할 때 사용됩니다. 예를 들어, 로그나 트랜잭션 데이터에 포함된 사용자 정보, 이메일, 전화번호 등 중요한 데이터를 노출시키지 않기 위해 사용됩니다.
이 프로세서는 정규식을 활용해 특정 패턴에 맞는 텍스트를 찾아 대체할 수 있으며, 이 과정에서 민감한 정보를 숨기거나 익명화하여 보안을 강화할 수 있습니다.
'DevOps > ELK' 카테고리의 다른 글
[Elastic Stack] Elasticsearch Command line tools (2) | 2024.03.19 |
---|---|
[Elastic Stack] Elasticsearch.yml 파일 세팅 (0) | 2024.03.18 |
[Elastic Stack] Fleet and Elastic Agent (0) | 2024.03.12 |
[Elastic Stack] Grok Pattern에 대해 (0) | 2024.03.08 |
[Elastic Stack] 검색어 자동완성 구현(Spring Data Elasticsearch) (0) | 2023.05.04 |