kafka-logger
The kafka-logger plugin pushes request and response logs as JSON objects to Apache Kafka clusters in batches and supports the customization of log formats.
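At minimum, the plugin needs the broker addresses and a target topic. A minimal sketch, using the same broker and topic as the examples below:

{
  "kafka-logger": {
    "brokers": [
      { "host": "notkafka", "port": 29092 }
    ],
    "kafka_topic": "test2"
  }
}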
Examples
The examples below demonstrate how you can configure the kafka-logger plugin for different scenarios.
To follow along with the examples, start a sample Kafka cluster:
- Docker
- Kubernetes
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.8.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka_net
  notkafka:
    image: confluentinc/cp-kafka:7.8.0
    container_name: notkafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://notkafka:29092,PLAINTEXT_HOST://127.0.0.1:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    ports:
      - "9092:9092"
    networks:
      - kafka_net

networks:
  kafka_net:
    driver: bridge
Start containers:
docker compose up -d
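Optionally, confirm that the broker is up by listing topics with the kafka-topics CLI bundled in the cp-kafka image:

docker exec -it notkafka kafka-topics --bootstrap-server localhost:9092 --list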
Create a Kubernetes manifest file for the Zookeeper and Kafka deployments:
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: aic
  name: zookeeper
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - name: zookeeper
          image: confluentinc/cp-zookeeper:7.8.0
          env:
            - name: ZOOKEEPER_CLIENT_PORT
              value: "2181"
            - name: ZOOKEEPER_TICK_TIME
              value: "2000"
          ports:
            - containerPort: 2181
---
apiVersion: v1
kind: Service
metadata:
  namespace: aic
  name: zookeeper
spec:
  selector:
    app: zookeeper
  ports:
    - port: 2181
      targetPort: 2181
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: aic
  name: kafka-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-server
  template:
    metadata:
      labels:
        app: kafka-server
    spec:
      containers:
        - name: kafka-server
          image: confluentinc/cp-kafka:7.8.0
          env:
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "PLAINTEXT:PLAINTEXT"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "PLAINTEXT://kafka-server.aic.svc:9092"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "true"
          ports:
            - containerPort: 9092
---
apiVersion: v1
kind: Service
metadata:
  namespace: aic
  name: kafka-server
spec:
  selector:
    app: kafka-server
  ports:
    - port: 9092
      targetPort: 9092
  type: ClusterIP
Apply the manifests:
kubectl apply -f kafka-deployment.yaml
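Wait for both deployments to become ready before continuing:

kubectl -n aic rollout status deployment/zookeeper
kubectl -n aic rollout status deployment/kafka-server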
Start a consumer to watch for messages in the configured Kafka topic:
- Docker
- Kubernetes
docker exec -it notkafka kafka-console-consumer --bootstrap-server localhost:9092 --topic test2 --from-beginning
kubectl exec -n aic deploy/kafka-server -- kafka-console-consumer --bootstrap-server kafka-server.aic.svc:9092 --topic test2 --from-beginning
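Since KAFKA_AUTO_CREATE_TOPICS_ENABLE is set to true, the test2 topic is created automatically on the first write. If the consumer exits because the topic does not exist yet, you can create it explicitly first (Docker shown; for Kubernetes, run kafka-topics in the kafka-server pod against the corresponding bootstrap server):

docker exec -it notkafka kafka-topics --bootstrap-server localhost:9092 --create --topic test2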
Keep the consumer running, and open a new terminal session for the following steps, which interact with APISIX.
Log in Different Meta Log Formats
The following example demonstrates how you can enable the kafka-logger plugin on a route, which logs client requests to the route and pushes logs to Kafka. You will also understand the differences between the default and origin meta log formats.
Create a route with kafka-logger as follows:
- Admin API
- ADC
- Ingress Controller
curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"id": "kafka-logger-route",
"uri": "/get",
"plugins": {
"kafka-logger": {
"meta_format": "default",
"brokers": [
{
"host": "notkafka",
"port": 29092
}
],
"kafka_topic": "test2",
"key": "key1",
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"httpbin.org:80": 1
},
"type": "roundrobin"
}
}'
services:
  - name: httpbin
    routes:
      - uris:
          - /get
        name: kafka-logger-route
        plugins:
          kafka-logger:
            meta_format: "default"
            brokers:
              - host: "notkafka"
                port: 29092
            kafka_topic: "test2"
            key: "key1"
            batch_max_size: 1
    upstream:
      type: roundrobin
      nodes:
        - host: httpbin.org
          port: 80
          weight: 1
Synchronize the configuration to the gateway:
adc sync -f adc.yaml
- Gateway API
- APISIX CRD
apiVersion: v1
kind: Service
metadata:
  namespace: aic
  name: httpbin-external-domain
spec:
  type: ExternalName
  externalName: httpbin.org
---
apiVersion: apisix.apache.org/v1alpha1
kind: PluginConfig
metadata:
  namespace: aic
  name: kafka-logger-plugin-config
spec:
  plugins:
    - name: kafka-logger
      config:
        meta_format: "default"
        brokers:
          - host: "kafka-server.aic.svc"
            port: 9092
        kafka_topic: "test2"
        key: "key1"
        batch_max_size: 1
---
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
  namespace: aic
  name: kafka-logger-route
spec:
  parentRefs:
    - name: apisix
  rules:
    - matches:
        - path:
            type: Exact
            value: /get
      filters:
        - type: ExtensionRef
          extensionRef:
            group: apisix.apache.org
            kind: PluginConfig
            name: kafka-logger-plugin-config
      backendRefs:
        - name: httpbin-external-domain
          port: 80
apiVersion: apisix.apache.org/v2
kind: ApisixUpstream
metadata:
  namespace: aic
  name: httpbin-external-domain
spec:
  ingressClassName: apisix
  externalNodes:
    - type: Domain
      name: httpbin.org
---
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
  namespace: aic
  name: kafka-logger-route
spec:
  ingressClassName: apisix
  http:
    - name: kafka-logger-route
      match:
        paths:
          - /get
        methods:
          - GET
      upstreams:
        - name: httpbin-external-domain
      plugins:
        - name: kafka-logger
          enable: true
          config:
            meta_format: "default"
            brokers:
              - host: "kafka-server.aic.svc"
                port: 9092
            kafka_topic: "test2"
            key: "key1"
            batch_max_size: 1
Apply the configuration:
kubectl apply -f kafka-logger-ic.yaml
❶ meta_format: set to the default log format.
❷ batch_max_size: set to 1 to send the log entry immediately.
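If you are using the Admin API, you can optionally verify the route configuration before sending traffic:

curl "http://127.0.0.1:9180/apisix/admin/routes/kafka-logger-route" \
  -H "X-API-KEY: ${ADMIN_API_KEY}"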
Send a request to the route to generate a log entry:
curl -i "http://127.0.0.1:9080/get"
You should see an HTTP/1.1 200 OK response.
You should see a log entry in the Kafka topic similar to the following:
{
  "latency": 411.00001335144,
  "request": {
    "querystring": {},
    "headers": {
      "host": "127.0.0.1:9080",
      "user-agent": "curl/8.7.1",
      "accept": "*/*",
      "x-forwarded-proto": "http",
      "x-forwarded-host": "127.0.0.1",
      "x-forwarded-port": "9080"
    },
    "method": "GET",
    "size": 83,
    "uri": "/get",
    "url": "http://127.0.0.1:9080/get"
  },
  "response": {
    "headers": {
      "content-length": "233",
      "access-control-allow-credentials": "true",
      "content-type": "application/json",
      "connection": "close",
      "access-control-allow-origin": "*",
      "date": "Fri, 10 Nov 2023 06:02:44 GMT",
      "server": "APISIX/3.16.0"
    },
    "status": 200,
    "size": 475
  },
  "route_id": "kafka-logger-route",
  "client_ip": "127.0.0.1",
  "server": {
    "hostname": "apisix",
    "version": "3.16.0"
  },
  "apisix_latency": 18.00001335144,
  "service_id": "",
  "upstream_latency": 393,
  "start_time": 1699596164550,
  "upstream": "54.90.18.68:80"
}
Update the meta log format to origin:
- Admin API
- ADC
- Ingress Controller
curl "http://127.0.0.1:9180/apisix/admin/routes/kafka-logger-route" -X PATCH \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"plugins": {
"kafka-logger": {
"meta_format": "origin"
}
}
}'
Update adc.yaml to set meta_format to origin:
services:
  - name: httpbin
    routes:
      - uris:
          - /get
        name: kafka-logger-route
        plugins:
          kafka-logger:
            meta_format: "origin"
            brokers:
              - host: "notkafka"
                port: 29092
            kafka_topic: "test2"
            key: "key1"
            batch_max_size: 1
    upstream:
      type: roundrobin
      nodes:
        - host: httpbin.org
          port: 80
          weight: 1
Synchronize the configuration to the gateway:
adc sync -f adc.yaml
- Gateway API
- APISIX CRD
Update kafka-logger-ic.yaml to set meta_format to origin:
apiVersion: apisix.apache.org/v1alpha1
kind: PluginConfig
metadata:
  namespace: aic
  name: kafka-logger-plugin-config
spec:
  plugins:
    - name: kafka-logger
      config:
        meta_format: "origin"
        brokers:
          - host: "kafka-server.aic.svc"
            port: 9092
        kafka_topic: "test2"
        key: "key1"
        batch_max_size: 1
Update kafka-logger-ic.yaml to set meta_format to origin:
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
  namespace: aic
  name: kafka-logger-route
spec:
  ingressClassName: apisix
  http:
    - name: kafka-logger-route
      match:
        paths:
          - /get
        methods:
          - GET
      upstreams:
        - name: httpbin-external-domain
      plugins:
        - name: kafka-logger
          enable: true
          config:
            meta_format: "origin"
            brokers:
              - host: "kafka-server.aic.svc"
                port: 9092
            kafka_topic: "test2"
            key: "key1"
            batch_max_size: 1
Apply the updated configuration:
kubectl apply -f kafka-logger-ic.yaml
Send a request to the route again to generate a new log entry:
curl -i "http://127.0.0.1:9080/get"
You should see an HTTP/1.1 200 OK response.
You should see a log entry in the Kafka topic similar to the following:
GET /get HTTP/1.1
x-forwarded-proto: http
x-forwarded-host: 127.0.0.1
user-agent: curl/8.7.1
x-forwarded-port: 9080
host: 127.0.0.1:9080
accept: */*
Log Request and Response Headers With Plugin Metadata
The following example demonstrates how you can customize the log format using plugin metadata and built-in variables to log specific request and response headers.
In APISIX, plugin metadata is used to configure the common metadata fields of all plugin instances of the same plugin. It is useful when a plugin is enabled across multiple resources and requires a universal update to their metadata fields.
First, create a route with kafka-logger as follows:
- Admin API
- ADC
- Ingress Controller
curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"id": "kafka-logger-route",
"uri": "/get",
"plugins": {
"kafka-logger": {
"meta_format": "default",
"brokers": [
{
"host": "notkafka",
"port": 29092
}
],
"kafka_topic": "test2",
"key": "key1",
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"httpbin.org:80": 1
},
"type": "roundrobin"
}
}'
services:
  - name: httpbin
    routes:
      - uris:
          - /get
        name: kafka-logger-route
        plugins:
          kafka-logger:
            meta_format: "default"
            brokers:
              - host: "notkafka"
                port: 29092
            kafka_topic: "test2"
            key: "key1"
            batch_max_size: 1
    upstream:
      type: roundrobin
      nodes:
        - host: httpbin.org
          port: 80
          weight: 1
Synchronize the configuration to the gateway:
adc sync -f adc.yaml
- Gateway API
- APISIX CRD
apiVersion: v1
kind: Service
metadata:
  namespace: aic
  name: httpbin-external-domain
spec:
  type: ExternalName
  externalName: httpbin.org
---
apiVersion: apisix.apache.org/v1alpha1
kind: PluginConfig
metadata:
  namespace: aic
  name: kafka-logger-plugin-config
spec:
  plugins:
    - name: kafka-logger
      config:
        meta_format: "default"
        brokers:
          - host: "kafka-server.aic.svc"
            port: 9092
        kafka_topic: "test2"
        key: "key1"
        batch_max_size: 1
---
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
  namespace: aic
  name: kafka-logger-route
spec:
  parentRefs:
    - name: apisix
  rules:
    - matches:
        - path:
            type: Exact
            value: /get
      filters:
        - type: ExtensionRef
          extensionRef:
            group: apisix.apache.org
            kind: PluginConfig
            name: kafka-logger-plugin-config
      backendRefs:
        - name: httpbin-external-domain
          port: 80
apiVersion: apisix.apache.org/v2
kind: ApisixUpstream
metadata:
  namespace: aic
  name: httpbin-external-domain
spec:
  ingressClassName: apisix
  externalNodes:
    - type: Domain
      name: httpbin.org
---
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
  namespace: aic
  name: kafka-logger-route
spec:
  ingressClassName: apisix
  http:
    - name: kafka-logger-route
      match:
        paths:
          - /get
        methods:
          - GET
      upstreams:
        - name: httpbin-external-domain
      plugins:
        - name: kafka-logger
          enable: true
          config:
            meta_format: "default"
            brokers:
              - host: "kafka-server.aic.svc"
                port: 9092
            kafka_topic: "test2"
            key: "key1"
            batch_max_size: 1
Apply the configuration:
kubectl apply -f kafka-logger-ic.yaml
❶ meta_format: set to the default log format. Note that this is required if you would like to customize the log format with plugin metadata; if meta_format is set to origin, the log entries remain in the origin format and the custom log format is not applied.
❷ batch_max_size: set to 1 to send the log entry immediately.
Next, configure the plugin metadata for kafka-logger:
- Admin API
- ADC
- Ingress Controller
curl "http://127.0.0.1:9180/apisix/admin/plugin_metadata/kafka-logger" -X PUT \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"log_format": {
"host": "$host",
"@timestamp": "$time_iso8601",
"client_ip": "$remote_addr",
"env": "$http_env",
"resp_content_type": "$sent_http_Content_Type"
}
}'
plugin_metadata:
  - name: kafka-logger
    log_format:
      host: "$host"
      "@timestamp": "$time_iso8601"
      client_ip: "$remote_addr"
      env: "$http_env"
      resp_content_type: "$sent_http_Content_Type"
Synchronize the configuration to the gateway:
adc sync -f adc.yaml
apiVersion: apisix.apache.org/v1alpha1
kind: GatewayProxy
metadata:
  namespace: aic
  name: apisix-config
spec:
  provider:
    type: ControlPlane
    controlPlane:
      # ...
      # your control plane connection configuration
  pluginMetadata:
    kafka-logger:
      log_format:
        host: "$host"
        "@timestamp": "$time_iso8601"
        client_ip: "$remote_addr"
        env: "$http_env"
        resp_content_type: "$sent_http_Content_Type"
Apply the configuration:
kubectl apply -f gatewayproxy.yaml
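Optionally, verify that the resource exists (assuming the GatewayProxy CRD is installed by the Ingress Controller):

kubectl -n aic get gatewayproxy apisix-config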
❶ log the custom request header env.
❷ log the response header Content-Type.
Send a request to the route with the env header:
curl -i "http://127.0.0.1:9080/get" -H "env: dev"
You should see a log entry in the Kafka topic similar to the following:
{
  "@timestamp": "2023-11-10T23:09:04+00:00",
  "host": "127.0.0.1",
  "client_ip": "127.0.0.1",
  "route_id": "kafka-logger-route",
  "env": "dev",
  "resp_content_type": "application/json"
}
Log Request Bodies Conditionally
The following example demonstrates how you can conditionally log request bodies.
Create a route with kafka-logger as follows:
- Admin API
- ADC
- Ingress Controller
curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"id": "kafka-logger-route",
"uri": "/post",
"plugins": {
"kafka-logger": {
"brokers": [
{
"host": "notkafka",
"port": 29092
}
],
"kafka_topic": "test2",
"key": "key1",
"batch_max_size": 1,
"include_req_body": true,
"include_req_body_expr": [["arg_log_body", "==", "yes"]]
}
},
"upstream": {
"nodes": {
"httpbin.org:80": 1
},
"type": "roundrobin"
}
}'
services:
  - name: httpbin
    routes:
      - uris:
          - /post
        name: kafka-logger-route
        plugins:
          kafka-logger:
            brokers:
              - host: "notkafka"
                port: 29092
            kafka_topic: "test2"
            key: "key1"
            batch_max_size: 1
            include_req_body: true
            include_req_body_expr:
              - - "arg_log_body"
                - "=="
                - "yes"
    upstream:
      type: roundrobin
      nodes:
        - host: httpbin.org
          port: 80
          weight: 1
Synchronize the configuration to the gateway:
adc sync -f adc.yaml
- Gateway API
- APISIX CRD
apiVersion: v1
kind: Service
metadata:
  namespace: aic
  name: httpbin-external-domain
spec:
  type: ExternalName
  externalName: httpbin.org
---
apiVersion: apisix.apache.org/v1alpha1
kind: PluginConfig
metadata:
  namespace: aic
  name: kafka-logger-plugin-config
spec:
  plugins:
    - name: kafka-logger
      config:
        brokers:
          - host: "kafka-server.aic.svc"
            port: 9092
        kafka_topic: "test2"
        key: "key1"
        batch_max_size: 1
        include_req_body: true
        include_req_body_expr:
          - - "arg_log_body"
            - "=="
            - "yes"
---
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
  namespace: aic
  name: kafka-logger-route
spec:
  parentRefs:
    - name: apisix
  rules:
    - matches:
        - path:
            type: Exact
            value: /post
      filters:
        - type: ExtensionRef
          extensionRef:
            group: apisix.apache.org
            kind: PluginConfig
            name: kafka-logger-plugin-config
      backendRefs:
        - name: httpbin-external-domain
          port: 80
apiVersion: apisix.apache.org/v2
kind: ApisixUpstream
metadata:
  namespace: aic
  name: httpbin-external-domain
spec:
  ingressClassName: apisix
  externalNodes:
    - type: Domain
      name: httpbin.org
---
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
  namespace: aic
  name: kafka-logger-route
spec:
  ingressClassName: apisix
  http:
    - name: kafka-logger-route
      match:
        paths:
          - /post
        methods:
          - POST
      upstreams:
        - name: httpbin-external-domain
      plugins:
        - name: kafka-logger
          enable: true
          config:
            brokers:
              - host: "kafka-server.aic.svc"
                port: 9092
            kafka_topic: "test2"
            key: "key1"
            batch_max_size: 1
            include_req_body: true
            include_req_body_expr:
              - - "arg_log_body"
                - "=="
                - "yes"
Apply the configuration:
kubectl apply -f kafka-logger-ic.yaml
❶ include_req_body: set to true to include the request body in the log.
❷ include_req_body_expr: include the request body only when the URL query parameter log_body equals yes.
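include_req_body_expr accepts lua-resty-expr expressions over built-in variables, so the condition is not limited to query parameters. For instance, a hypothetical variant that logs the body only when the client sends an x-debug: true request header could use:

"include_req_body_expr": [["http_x_debug", "==", "true"]]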
Send a request to the route with a URL query string satisfying the condition:
curl -i "http://127.0.0.1:9080/post?log_body=yes" -X POST -d '{"env": "dev"}'
You should see the request body logged:
{
  ...,
  "request": {
    ...,
    "method": "POST",
    "body": "{\"env\": \"dev\"}",
    "size": 179
  }
}
Send a request to the route without any URL query string:
curl -i "http://127.0.0.1:9080/post" -X POST -d '{"env": "dev"}'
You should not observe the request body in the log.
If you have customized log_format in addition to setting include_req_body or include_resp_body to true, the plugin will not include the bodies in the logs.
As a workaround, you can use the NGINX variable $request_body in the log format, such as:
{
  "kafka-logger": {
    ...,
    "log_format": {"body": "$request_body"}
  }
}
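For instance, a complete plugin metadata call applying this workaround, modeled on the earlier metadata example (note that $request_body is only populated after NGINX has read the request body):

curl "http://127.0.0.1:9180/apisix/admin/plugin_metadata/kafka-logger" -X PUT \
  -H "X-API-KEY: ${ADMIN_API_KEY}" \
  -d '{
    "log_format": {
      "host": "$host",
      "client_ip": "$remote_addr",
      "body": "$request_body"
    }
  }'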