Common plugin parameters are prefixed with @.
1.1. @type
1.2. @id
in_monitor_agent uses this value for the plugin_id field.
1.3. @label
1.4. @log_level
See below for details.
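A minimal sketch showing the four parameters on one input plugin (the in_forward id and @MAIN label are illustrative, not from the original config):
<source>
@type forward      # plugin type
@id in_forward     # identifier; also exposed as plugin_id by in_monitor_agent
@label @MAIN       # route this source's events to the <label @MAIN> section
@log_level warn    # per-plugin log level
</source>
<label @MAIN>
<match **>
@type stdout
</match>
</label>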
These patterns are used in <match> and <filter> tags.
2.1. *
a.* matches a.b; it does not match a or a.b.c.
2.2. **
a.** matches a, a.b, and a.b.c.
2.3. Other patterns
See below for other patterns such as regex and value matching.
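A minimal sketch of the patterns above (the stdout outputs are placeholders):
# matches a.b and a.c, but not a or a.b.c
<match a.*>
@type stdout
</match>
# matches a, a.b, a.b.c, ...
<match a.**>
@type stdout
</match>
# {X,Y} lists alternatives: matches a.b and b.b
<match {a,b}.b>
@type stdout
</match>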
Ruby code can be embedded in #{...} expressions.
<match "app.#{ENV['FLUENTD_TAG']}">
@type stdout
</match>
Which input plugin to use must be specified with @type.
# Receive events from 24224/tcp
# This is used by log forwarding and the fluent-cat command
<source>
@type forward
port 24224
</source>
# http://<ip>:9880/myapp.access?json={"event":"data"}
<source>
@type http
port 9880
</source>
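For example, an event can be posted to the http source with curl (assuming Fluentd is listening on localhost):
curl -X POST -d 'json={"event":"data"}' http://localhost:9880/myapp.access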
# Match events tagged with "myapp.access" and
# store them to /var/log/fluent/access.%Y-%m-%d
# Of course, you can control how you partition your data
# with the time_slice_format option.
<match myapp.access>
@type file
path /var/log/fluent/access
</match>
A <filter> placed between a source and a match processes events before they reach the output:
# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
@type http
port 9880
</source>
<filter myapp.access>
@type record_transformer
<record>
host_param "#{Socket.gethostname}"
</record>
</filter>
<match myapp.access>
@type file
path /var/log/fluent/access
</match>
✔️ Flow of the example above
{"event":"data","host_param":"webserver1"} 값이 ouput plugin 으로 보내져 파일로 출력하게 된다.전체 설정 내용
data:
fluent.conf: |
<system>
workers 1
@log_level info
</system>
@include input-kubernetes.conf
@include filter-kubernetes.conf
<match **>
@type copy
<store>
@type kafka2
@id out_kafka
brokers logis-kafka.dev.kakaoi.io:9092
max_send_retries 10
required_acks 1
default_topic xoauth-dev
exception_backup false
compression_codec gzip
<format>
@type json
</format>
<buffer topic,tag>
@type file
path /var/log/td-agent/buffer/td
flush_interval 5s
chunk_limit_size 16M
queue_limit_length 32
retry_forever true
</buffer>
</store>
</match>
input-kubernetes.conf: |
# Prevent fluentd from handling records containing its own logs. Otherwise
# it can lead to an infinite loop, when error in sending one message generates
# another message which also fails to be sent and so on.
<match fluent.**>
@type null
</match>
<source>
@type tail
@id in_tail_container_logs
path /var/log/containers/xoauth*.log
pos_file /var/log/fluentd-containers.log.pos
tag kubernetes.*
read_from_head "#{ENV['FLUENTD_INPUT_READ_FROM_HEAD'] || 'false'}"
limit_recently_modified "#{ENV['LIMIT_RECENTLY_MODIFIED'] || '120m'}"
rotate_wait 0
enable_stat_watcher false
<parse>
@type regexp
expression /^(?<time>[^ ]+) (?<stream>stdout|stderr) (?<flags>[^ ]+) (?<message>.*)$/
</parse>
</source>
filter-kubernetes.conf: |
<filter kubernetes.**>
@type kubernetes_metadata
@id filter_kube_metadata
</filter>
<filter kubernetes.**>
@type record_modifier
tag ${record.dig("kubernetes", "container_name").gsub('-', '.')}
<record>
@timestamp ${(Time.at(time) + (60*60*9)).strftime('%Y-%m-%dT%H:%M:%S.%L+09:00')}
cluster_name "xoauth-dev"
hostname "#{ENV['K8S_NODE_NAME']}"
container_image ${record.dig("kubernetes", "container_image")}
pod_name ${record.dig("kubernetes", "pod_name")}
</record>
</filter>
<filter kubernetes.**>
@type parser
key_name message
reserve_data true
remove_key_name_field true
<parse>
@type json
</parse>
</filter>
<filter kubernetes.**>
@type record_transformer
remove_keys $.kubernetes
</filter>
input-kubernetes.conf: |
<match fluent.**>
@type null
</match>
<source>
@type tail
@id in_tail_container_logs
path /var/log/containers/xoauth*.log
pos_file /var/log/fluentd-containers.log.pos
tag kubernetes.*
read_from_head "#{ENV['FLUENTD_INPUT_READ_FROM_HEAD'] || 'false'}"
limit_recently_modified "#{ENV['LIMIT_RECENTLY_MODIFIED'] || '120m'}"
rotate_wait 0
enable_stat_watcher false
<parse>
@type regexp
expression /^(?<time>[^ ]+) (?<stream>stdout|stderr) (?<flags>[^ ]+) (?<message>.*)$/
</parse>
</source>
The tail source reads container log files matching /var/log/containers/xoauth*.log and tags events with kubernetes.*:
<source>
@type tail
@id in_tail_container_logs
path /var/log/containers/xoauth*.log
pos_file /var/log/fluentd-containers.log.pos
tag kubernetes.*
read_from_head "#{ENV['FLUENTD_INPUT_READ_FROM_HEAD'] || 'false'}"
limit_recently_modified "#{ENV['LIMIT_RECENTLY_MODIFIED'] || '120m'}"
rotate_wait 0
enable_stat_watcher false
<parse>
@type regexp
expression /^(?<time>[^ ]+) (?<stream>stdout|stderr) (?<flags>[^ ]+) (?<message>.*)$/
</parse>
</source>
A raw container log line looks like this:
2022-08-23T16:20:02.313036458+09:00 stdout F {"@timestamp":"2022-08-23T07:20:02.312+00:00","@version":"1","message":"Running with Spring Boot v2.2.0.RELEASE, Spring v5.2.0.RELEASE","logger_name":"com.kakao.xoauth.XoauthApplication","thread_name":"main","level":"DEBUG","level_value":10000}
The regexp in <parse> parses this line into time, stream, flags, and message fields.
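Roughly, an illustrative rendering of that split (message abbreviated; time becomes the event timestamp):
time    2022-08-23T16:20:02.313036458+09:00
stream  stdout
flags   F
message {"@timestamp":"2022-08-23T07:20:02.312+00:00","@version":"1","message":"Running with Spring Boot v2.2.0.RELEASE, ..."}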
Events tagged fluent.** are Fluentd's own logs; routing them to the null output prevents an infinite loop in which an error in sending one message generates another message that also fails to be sent:
<match fluent.**>
@type null
</match>
✔️ Full code
filter-kubernetes.conf: |
<filter kubernetes.**>
@type kubernetes_metadata
@id filter_kube_metadata
</filter>
<filter kubernetes.**>
@type record_modifier
tag ${record.dig("kubernetes", "container_name").gsub('-', '.')}
<record>
@timestamp ${(Time.at(time) + (60*60*9)).strftime('%Y-%m-%dT%H:%M:%S.%L+09:00')}
cluster_name "xoauth-dev"
hostname "#{ENV['K8S_NODE_NAME']}"
container_image ${record.dig("kubernetes", "container_image")}
pod_name ${record.dig("kubernetes", "pod_name")}
</record>
</filter>
<filter kubernetes.**>
@type parser
key_name message
reserve_data true
remove_key_name_field true
<parse>
@type json
</parse>
</filter>
<filter kubernetes.**>
@type record_transformer
remove_keys $.kubernetes
</filter>
The kubernetes_metadata filter enriches each log record with basic metadata about the container that emitted it (hostname, pod name, container name, and so on). It requires the fluent-plugin-kubernetes_metadata_filter plugin (https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter).
<filter kubernetes.**>
@type kubernetes_metadata
@id filter_kube_metadata
</filter>
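A sketch of the enriched record after this filter (the exact field set depends on plugin version and options; the values are illustrative, not from the original logs):
{"message":"...","kubernetes":{"pod_name":"xoauth-5d8c-abcde","namespace_name":"default","container_name":"xoauth-api","container_image":"example-registry/xoauth:1.0.0","host":"node-1"}}
The later filters rely on this field: record.dig("kubernetes", "container_name"), container_image, and pod_name are all read from it.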
✔️ When is fluent-plugin-kubernetes_metadata_filter installed?
✔️ Example input/output
Fields to add to each record are defined inside <record>. Note that ${...} expressions such as ${record.dig(...)} are evaluated per record, while "#{...}" expressions such as "#{ENV['K8S_NODE_NAME']}" are evaluated once when the configuration is loaded. The fluent-plugin-record-modifier plugin must be installed (https://github.com/repeatedly/fluent-plugin-record-modifier).
<filter kubernetes.**>
@type record_modifier
tag ${record.dig("kubernetes", "container_name").gsub('-', '.')}
<record>
@timestamp ${(Time.at(time) + (60*60*9)).strftime('%Y-%m-%dT%H:%M:%S.%L+09:00')}
cluster_name "xoauth-dev"
hostname "#{ENV['K8S_NODE_NAME']}"
container_image ${record.dig("kubernetes", "container_image")}
pod_name ${record.dig("kubernetes", "pod_name")}
</record>
</filter>
✔️ When is fluent-plugin-record-modifier installed?
✔️ How does it differ from record_transformer?
(For reference: record_transformer is bundled with Fluentd, while record_modifier is a third-party plugin that advertises lower CPU and memory overhead for similar record rewriting.)
A parser filter parses the string in the field named by key_name and replaces the event record with the parsed result. Here it parses the application log that the <source> block mapped to the message key.
<filter kubernetes.**>
@type parser
key_name message
reserve_data true
remove_key_name_field true
<parse>
@type json
</parse>
</filter>
✔️ reserve_data
With reserve_data true, the original key/value pairs are kept in the record alongside the parsed result.
<filter foo.bar>
@type parser
key_name log
reserve_data true
<parse>
@type json
</parse>
</filter>
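A sketch of the documented behavior: given the input record {"key":"value","log":"{\"user\":1,\"num\":2}"} with key_name log and a json parser,
reserve_data false → {"user":1,"num":2}
reserve_data true  → {"key":"value","log":"{\"user\":1,\"num\":2}","user":1,"num":2}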
✔️ remove_key_name_field
With remove_key_name_field true, the field named by key_name (log here) is removed from the record after it is successfully parsed.
<filter foo.bar>
@type parser
key_name log
reserve_data true
remove_key_name_field true
<parse>
@type json
</parse>
</filter>
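Continuing the sketch above with remove_key_name_field true (and reserve_data true), the parsed source field itself is dropped:
{"key":"value","log":"{\"user\":1,\"num\":2}"} → {"key":"value","user":1,"num":2}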
Finally, a record_transformer filter removes the kubernetes field, whose values were already copied to top-level fields by the record_modifier filter above:
<filter kubernetes.**>
@type record_transformer
remove_keys $.kubernetes
</filter>
The output stage matches every remaining event. copy duplicates each event to every <store>; the only store here is kafka2, which formats records as JSON, buffers them to files chunked by topic and tag, and flushes to the xoauth-dev topic every 5 seconds, retrying forever on failure.
<match **>
@type copy
<store>
@type kafka2
@id out_kafka
brokers logis-kafka.dev.kakaoi.io:9092
max_send_retries 10
required_acks 1
default_topic xoauth-dev
exception_backup false
compression_codec gzip
<format>
@type json
</format>
<buffer topic,tag>
@type file
path /var/log/td-agent/buffer/td
flush_interval 5s
chunk_limit_size 16M
queue_limit_length 32
retry_forever true
</buffer>
</store>
</match>
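Because the output uses @type copy, more <store> sections could be added next to kafka2, and every event would be duplicated to each of them. A minimal sketch (the stdout store is an illustration, not part of the original config):
<match **>
@type copy
<store>
@type kafka2
# kafka2 settings as above
</store>
<store>
@type stdout
</store>
</match>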