johnpoint

Collect logs from Tencent Cloud TKE using Logstash

Introduction#

It's been a while since I last updated my blog, and recently I've been experimenting with Logstash, so I thought I'd document it.

Why Logstash? Mainly because log collection is not enabled for Tencent Cloud TKE in our testing environment, which makes troubleshooting painful. Since I had some free time, I decided to pull the logs out and ship them to Elasticsearch (ES) to make future troubleshooting easier. Tencent Cloud's log collection rules can collect pod stdout logs and deliver them to Kafka, so I gave that a try.

Deploying Logstash#

I chose to use Docker Compose for a quick deployment of Logstash.

The steps below are adapted from the deviantony/docker-elk project.

Create Directories#

mkdir -p logstash/config logstash/pipeline

Create Environment Variables#

Path: .env

ELASTIC_VERSION=8.7.1
LOGSTASH_INTERNAL_PASSWORD='changeme'

Create Dockerfile#

Path: logstash/Dockerfile

ARG ELASTIC_VERSION

# https://www.docker.elastic.co/
FROM docker.elastic.co/logstash/logstash:${ELASTIC_VERSION}

Configuration File#

Path: logstash/config/logstash.yml

---
## Default Logstash configuration from Logstash base image.
## https://github.com/elastic/logstash/blob/main/docker/data/logstash/config/logstash-full.yml
#
http.host: 0.0.0.0

node.name: logstash

Path: logstash/pipeline/logstash.conf

input {
	beats {
		port => 5044
	}

	tcp {
		port => 50000
	}
}

## Add your filters / logstash plugins configuration here

output {
	elasticsearch {
		hosts => "elasticsearch:9200"
		user => "logstash_internal"
		password => "${LOGSTASH_INTERNAL_PASSWORD}"
		index => "logstash-%{+YYYY-MM-dd}"
	}
}
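To see which index an event ends up in, the date pattern in `index` can be mimicked outside Logstash. The sketch below is only an illustration in Python (the function name `index_name` is made up for this example). It assumes, as Logstash does, that the date is taken from the event's `@timestamp` and formatted in UTC.

```python
from datetime import datetime, timezone


def index_name(epoch_seconds: float, prefix: str = "logstash") -> str:
    """Approximate what "logstash-%{+YYYY-MM-dd}" resolves to for one event."""
    # The event timestamp is formatted in UTC, not in local time.
    ts = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return f"{prefix}-{ts:%Y-%m-%d}"


# 2023-05-16 10:10:00 +08:00 is 02:10:00 UTC on the same day.
print(index_name(1684203000.007603))
# 2023-05-16 07:00:00 +08:00 is still 2023-05-15 in UTC.
print(index_name(1684191600))
```

An event written early in the morning in UTC+8 therefore lands in the previous day's index.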

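Start the Service#

Path: docker-compose.yml

Once the file is saved, `docker compose up -d --build` builds the image and starts the container. The `depends_on` entry assumes an `elasticsearch` service is defined in the same file, as in docker-elk; drop it if Elasticsearch runs elsewhere.

version: '3.7'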

services:
  logstash:
    build:
      context: logstash/
      args:
        ELASTIC_VERSION: ${ELASTIC_VERSION}
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro,Z
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro,Z
    ports:
      - 5044:5044
      - 50000:50000/tcp
      - 50000:50000/udp
      - 9600:9600
    environment:
      LS_JAVA_OPTS: -Xms256m -Xmx256m
      LOGSTASH_INTERNAL_PASSWORD: ${LOGSTASH_INTERNAL_PASSWORD:-}
    depends_on:
      - elasticsearch
    restart: unless-stopped

Configure Logstash Pipeline#

Configure Input#

Since the logs need to be read from Kafka, a new data source must be declared in the input block.

input {
	beats {
		port => 5044
	}

	tcp {
		port => 50000
	}

	kafka {
		bootstrap_servers => "kafka address"
		client_id => "test_logstash"
		group_id => "test_logstash"
		auto_offset_reset => "latest" 
		consumer_threads => 1
		decorate_events => true 
		topics => ["tencent-tke-log"]
		type => "bhy"
	}
}

Configure Filters#

The data that Tencent Cloud delivers to Kafka is not directly usable. A sample message looks like this:

{"@timestamp":1684203000.007603,"@filepath":"/var/log/tke-log-agent/log/stdout-containerd/xxxxxxx.log","log":"2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxxxxx\"}","kubernetes":{"pod_name":"xxxxx","namespace_name":"default","pod_id":"xxxxxx","labels":{"k8s-app":"xxxxx","pod-template-hash":"xxxxx","qcloud-app":"xxxxxxxx"},"annotations":{"qcloud-redeploy-timestamp":"1681975158658","tke.cloud.tencent.com/networks-status":"json"},"host":"xxxxx","container_name":"xxxx","docker_id":"xxxxx","container_hash":"xxxxxx","container_image":"xxxxxxxx"}}

Preprocessing is needed here, because many of these fields do not need to go into ES. Logstash filters are used for the processing.
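The escaped quotes in the sample come from the log being wrapped twice: the service prints a JSON line, a timestamp and `stdout F` are put in front of it, and the result is embedded as a string in the outer JSON message. A minimal Python sketch of that wrapping, which can also be used to produce test messages (the helper name `build_tke_message` and the field values are made up; only the field names come from the sample above):

```python
import json


def build_tke_message(service_log: dict, pod_name: str, namespace: str) -> str:
    """Build a Kafka message shaped like the sample above (hypothetical helper)."""
    # The service's own log line, serialized as compact JSON.
    inner = json.dumps(service_log, separators=(",", ":"))
    # Prefix as seen in the sample: timestamp, stream name, and the "F" flag.
    line = f"2023-05-16T10:10:00.002817673+08:00 stdout F {inner}"
    outer = {
        "@timestamp": 1684203000.007603,
        "log": line,
        "kubernetes": {"pod_name": pod_name, "namespace_name": namespace},
    }
    # Serializing a second time is what escapes the inner quotes as \".
    return json.dumps(outer)


message = build_tke_message({"level": "debug", "msg": "hello"}, "demo-pod", "default")
print(message)
```

This is why the pipeline has to parse JSON twice, with a cleanup step in between.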

Save Original Data#

This filter adds a field called source that keeps a copy of the raw Kafka message.

	mutate {
		add_field => { "source" => "%{[message]}" }
  	}

Parse JSON#

The json filter parses (deserializes) the JSON string in the message field and stores the result in the kafkajson field; the message field is then removed.

	json {
		source => "message"
		remove_field => [ 
			"message"
		]
		target => "kafkajson"
	}
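For readers less familiar with the `json` filter, this is roughly what the step does to an event. The Python below is only a model of the behaviour (events are plain dicts, and `parse_json_field` is a name invented here), not Logstash code.

```python
import json


def parse_json_field(event: dict, source: str, target: str) -> dict:
    """Model of json { source => ... target => ... remove_field => [source] }."""
    event[target] = json.loads(event[source])  # string -> nested structure
    del event[source]                          # mirrors remove_field after a successful parse
    return event


event = {"message": '{"log":"...","kubernetes":{"pod_name":"demo"}}'}
event = parse_json_field(event, "message", "kafkajson")
print(event["kafkajson"]["kubernetes"]["pod_name"])
```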

Extract Log Metadata#

Move the fields that identify where a log came from (namespace, pod name, app label, container image) to the top level of the event.

	mutate {
		rename => {
			"[kafkajson][kubernetes][namespace_name]" => "namespace"
			"[kafkajson][kubernetes][pod_name]" => "podname"
			"[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app"
			"[kafkajson][kubernetes][container_image]" => "container-image"
		}
	}
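As a rough model of what `rename` does with nested field references, here is a small Python sketch. The function `rename_nested` and the sample values are invented for illustration; the one behaviour it relies on is that a rename is skipped when the source field does not exist.

```python
def rename_nested(event: dict, path: list, new_name: str) -> None:
    """Model of mutate { rename => { "[a][b][c]" => "new_name" } }."""
    parent = event
    for key in path[:-1]:
        parent = parent.get(key)
        if not isinstance(parent, dict):
            return  # source field missing: nothing is renamed
    if path[-1] in parent:
        event[new_name] = parent.pop(path[-1])


event = {"kafkajson": {"kubernetes": {"pod_name": "demo", "labels": {"k8s-app": "api"}}}}
rename_nested(event, ["kafkajson", "kubernetes", "pod_name"], "podname")
rename_nested(event, ["kafkajson", "kubernetes", "labels", "k8s-app"], "k8s-app")
rename_nested(event, ["kafkajson", "kubernetes", "namespace_name"], "namespace")  # absent
print(event)
```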

Use Regular Expressions to Process Data#

In the data provided by Tencent Cloud, a timestamp and "stdout F" are prepended to the service's own log output, so a regular expression is used to strip that prefix.

2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxx\"}

	mutate {
		gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
	}
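The effect of the pattern can be checked outside Logstash. The sketch below uses Python's `re` module, which behaves the same way for this simple pattern. The second, stricter pattern is not part of the original setup: it is one possible alternative, assuming the prefix always has the shape `<timestamp> <stdout|stderr> <F|P> `, and it avoids the greedy `.+` removing too much when the message itself contains `stdout F `.

```python
import re

line = ('2023-05-16T10:10:00.002817673+08:00 stdout F '
        '{"level":"debug","msg":"xxxxx"}')

# Pattern used in the pipeline above.
original = re.sub(r".+stdout\sF\s", "", line)
print(original)

# Hypothetical stricter variant: anchored at the start, also covers stderr and the P flag.
STRICT_PREFIX = re.compile(r"^\S+ std(?:out|err) [FP] ")
strict = STRICT_PREFIX.sub("", line, count=1)
print(strict)

# A message that happens to contain "stdout F " shows the difference.
tricky = '2023-05-16T10:10:00.0+08:00 stdout F {"msg":"saw stdout F again"}'
greedy_result = re.sub(r".+stdout\sF\s", "", tricky)
strict_result = STRICT_PREFIX.sub("", tricky, count=1)
print(greedy_result)
print(strict_result)
```

With the greedy pattern the tricky line is cut down to a fragment that is no longer valid JSON, while the anchored pattern leaves the service's JSON intact.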

Extract Service Printed Logs#

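With the prefix removed, what remains in [kafkajson][log] is the service's own JSON log. It is parsed into the data field, and the kafkajson field is then removed.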

	json {
		source => "[kafkajson][log]"
		remove_field => [ 
			"kafkajson"
		]
		target => "data"
	}
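One case worth keeping in mind: not every line a service prints is JSON (stack traces, startup banners). When parsing fails, the `json` filter tags the event with `_jsonparsefailure` and does not apply `remove_field`, so `kafkajson` stays on the event. A small Python model of that behaviour (dict-based events and the function name `parse_service_log` are illustrative only):

```python
import json


def parse_service_log(event: dict) -> dict:
    """Model of the second json filter, including the failure path."""
    raw = event.get("kafkajson", {}).get("log")
    if raw is None:
        return event  # source field missing: the filter does nothing
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        # The event is tagged and otherwise left untouched.
        event.setdefault("tags", []).append("_jsonparsefailure")
        return event
    event["data"] = data
    del event["kafkajson"]  # remove_field only happens on success
    return event


ok = parse_service_log({"kafkajson": {"log": '{"level":"debug","msg":"hi"}'}})
bad = parse_service_log({"kafkajson": {"log": "panic: something went wrong"}})
print(ok)
print(bad)
```

Events carrying the tag can then be routed or inspected separately instead of silently losing their content.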

Extract Log Level#

The last step moves the log level field to the top level of the event, where it is easier to filter on.

	mutate {
    	rename => { "[data][level]" => "level" }
  	}

Complete Pipeline#

input {
	beats {
		port => 5044
	}

	tcp {
		port => 50000
	}

	kafka {
		bootstrap_servers => "kafka address"
		client_id => "test_logstash"
		group_id => "test_logstash"
		auto_offset_reset => "latest" 
		consumer_threads => 1
		decorate_events => true 
		topics => ["tencent-tke-log"]
		type => "bhy"
	}
}

filter {
	mutate {
		add_field => { "source" => "%{[message]}" }
  	}
	json {
		source => "message"
		remove_field => [ 
			"message"
		]
		target => "kafkajson"
	}
	mutate {
		rename => {
			"[kafkajson][kubernetes][namespace_name]" => "namespace"
			"[kafkajson][kubernetes][pod_name]" => "podname"
			"[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app"
			"[kafkajson][kubernetes][container_image]" => "container-image"
		}
	}
	mutate {
		gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
	}
	json {
		source => "[kafkajson][log]"
		remove_field => [ 
			"kafkajson"
		]
		target => "data"
	}
	mutate {
    	rename => { "[data][level]" => "level" }
  	}
}

## Add your filters / logstash plugins configuration here

output {
	elasticsearch {
		hosts => "elasticsearch:9200"
		user => "logstash_internal"
		password => "${LOGSTASH_INTERNAL_PASSWORD}"
		index => "logstash-%{+YYYY-MM-dd}"
	}
}
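To check what finally reaches Elasticsearch, the whole filter chain can be replayed on a message shaped like the sample. The Python below is a rough model, not Logstash itself: events are dicts, the helper name `run_filters` and the sample values are invented, and fields Logstash adds on its own (`@timestamp`, `@version`, `type`, Kafka metadata) are left out.

```python
import json
import re


def run_filters(message: str) -> dict:
    """Replay the filter steps of the pipeline on one Kafka message."""
    event = {"message": message}
    event["source"] = event["message"]                      # mutate add_field
    event["kafkajson"] = json.loads(event.pop("message"))   # json -> kafkajson
    k8s = event["kafkajson"].get("kubernetes", {})
    renames = {                                              # mutate rename
        "namespace": k8s.pop("namespace_name", None),
        "podname": k8s.pop("pod_name", None),
        "k8s-app": k8s.get("labels", {}).pop("k8s-app", None),
        "container-image": k8s.pop("container_image", None),
    }
    event.update({k: v for k, v in renames.items() if v is not None})
    log = re.sub(r".+stdout\sF\s", "", event["kafkajson"]["log"])  # mutate gsub
    event["data"] = json.loads(log)                          # json -> data
    del event["kafkajson"]                                   # remove_field
    if "level" in event["data"]:                             # mutate rename
        event["level"] = event["data"].pop("level")
    return event


sample = json.dumps({
    "log": '2023-05-16T10:10:00.002817673+08:00 stdout F '
           '{"level":"debug","time":"2023-05-16T10:10:00.002+0800","msg":"xxxxxxxx"}',
    "kubernetes": {"pod_name": "demo", "namespace_name": "default",
                   "labels": {"k8s-app": "api"}, "container_image": "demo:1"},
})
print(json.dumps(run_filters(sample), indent=2))
```

The resulting document keeps the raw message in `source`, the remaining service fields under `data`, and `level`, `namespace`, `podname`, `k8s-app` and `container-image` at the top level.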

Special Thanks#

While configuring Logstash, I got a lot of help from my self-built Telegram bot, which uses the OpenAI API and is based on the GPT-3.5 language model.

Here is a conversation I had with it (the content has been redacted):

johnpoint, [5/15/23 2:18 PM]
I want to extract JSON data from 2023-05-15T13:51:00.004687384+08:00 stdout F {"level":"debug","time":"2023-05-15T13:51:00.002+0800","msg":"xxxxxxxx"} using Logstash. How should the Logstash pipeline be written?

openAI, [5/15/23 2:18 PM]
Assuming your log file outputs in JSON format, you need to configure a JSON parser to parse the JSON data in the logs and convert it into usable events. Here is a simple Logstash configuration file:

```
input {
  file {
    path => "/path/to/your/logfile"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout {
      codec => rubydebug
  }
}
```

In the input section of Logstash, we specify the path of the file to read and start reading data from the beginning of the file.
In the filter section, we use the `json` plugin to extract the JSON object from the logs.
Finally, we use `rubydebug` in the output section for output.

Run this Logstash configuration command:

```
bin/logstash -f /path/to/your/config/file
```

This way, Logstash will read data from the specified file and output the extracted JSON objects to the console.

johnpoint, [5/15/23 2:19 PM]
This data has JSON format data, but there is some dirty data before the JSON format data. Can it be removed?

openAI, [5/15/23 2:19 PM]
Yes, when analyzing data with Logstash, you can use the grok plugin to match the required data using regular expressions.

Here is a possible grok expression that fits the data you provided:

```
%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}
```

This regular expression means to match a timestamp represented in ISO8601 format (like "2023-05-15T13:51:00.004687384+08:00") and content ending with the string "stdout F", followed by a JSON formatted string. "%{GREEDYDATA:json_data}" indicates that this string will be saved into a field named "json_data".

The above grok expression can be used in the Logstash configuration file, for example:

```
input {
  # Configure the source from which Logstash receives data
}

filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}"
    }
  }
  # Other filters
}

output {
  # Configure the target to which Logstash outputs data
}
```

In this configuration file, the grok plugin under the filter section matches the timestamp and json_data fields. Thus, Logstash can use these data for further operations, such as outputting to Elasticsearch or stdout, achieving the desired result.

Where `%{TIMESTAMP_ISO8601:timestamp}` indicates matching the timestamp in ISO8601 format and saving it into a field named "timestamp". `%{GREEDYDATA:json_data}` indicates matching any characters, which can match the additional content at the end of the line that does not conform to JSON format. Since grok will keep matching until the end of the line, the last field (i.e., json_data) will include all content contained within.

johnpoint, [5/15/23 2:36 PM]
What does the error "Can't get text on a START_OBJECT at 1:8" mean?

openAI, [5/15/23 2:36 PM]
This error is common when trying to parse a JSON object as a string, but what you actually have is a JSON object, not a string. This means that the value begins with a `{` instead of a `"`. You can check the function or method that calls the code to ensure that the value being parsed is indeed a string. Additionally, check the source of the JSON object to ensure it is formatted as expected.

johnpoint, [5/15/23 2:53 PM]
[2023-05-15T06:52:48,511][WARN ][logstash.filters.json    ][main][xxxxxx] Error parsing json {:source=>"json_data", :raw=>"{\\\"level\\\":\\\"debug\\\",\\\"time\\\":\\\"2023-05-15T14:52:47.664+0800\\\",\\\"msg\\\":\\\"xxxxxxxx\\\"}\", \"@timestamp\"=>0.1684133567674155e10}", :exception=>#<LogStash::Json::ParserError: Unexpected character ('\' (code 92)): was expecting double-quote to start field name
What does this log indicate about Logstash?

openAI, [5/15/23 2:53 PM]
This log indicates that Logstash encountered an encoding issue while parsing the input data json_data, where the input data contains characters like `\'` that do not conform to JSON standards, causing the JSON parser to fail to parse correctly. Specifically, the input data may have used backslashes to escape quotes but did not properly escape the backslashes themselves. You can try preprocessing the input data to escape or remove any characters that do not conform to JSON standards so that Logstash can parse the data correctly.

johnpoint, [5/15/23 2:53 PM]
Can you provide the corresponding pipeline configuration?

openAI, [5/15/23 2:53 PM]
Sure, here is a simple Logstash Pipeline configuration to extract the `json_data` field from the input and filter out any invalid data:

```conf
input {
  # Configure the input plugin here
  stdin {}
}

filter {
  # Extract JSON data from the json_data field
  json {
    source => "json_data"
    # Used to ignore incomplete JSON data strings
    skip_on_invalid_json => true
  }

  # Filter out events without valid JSON data
  if "_jsonparsefailure" in [tags] {
    drop {}
  }
}

output {
  # Configure the output plugin here
  stdout {}
}
```

You need to modify and adjust according to your needs, such as changing `json_data` to the actual field name or using suitable input and output plugins.

johnpoint, [5/15/23 2:54 PM]
The JSON data extracted using grok has unnecessary escaped backslashes.

openAI, [5/15/23 2:54 PM]
Yes, this is because the Grok pattern matching treats backslashes as escape characters, so if the log already has backslashes, they will be escaped again. To resolve this issue, you can use the gsub filter in the configuration file to remove the extra backslashes. Here’s an example configuration:

```
input {
  stdin { }
}

filter {
  grok {
    match => { "message" => '^\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{WORD:log_level}\] %{GREEDYDATA:json_data}$' }
  }
  mutate {
    gsub => ["json_data", "\\\\", "\\"]
  }
  json {
    source => "json_data"
  }
}

output {
  stdout { codec => rubydebug }
}
```

In this configuration, we first use Grok to parse the JSON data in the logs and store it in the `json_data` field. Then we use the `mutate` filter's `gsub` to replace the extra backslashes, and finally, we use the `json` filter to convert it into JSON formatted events. This way, we can avoid unnecessary extra backslashes.

johnpoint, [5/15/23 3:00 PM]
An error occurred: Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"{\", \",\", \"]\" at line 36, column 34 (byte 672) after filter {\n\tjson {\n\t\tsource => \"message\"\n\t\tremove_field => [ \n\t\t\t\"message\"\n\t\t]\n\t\ttarget => \"kafkajson\"\n\t}\n\tgrok {\n\t\tmatch => {\"kafkajson\" => \"%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}\"}\n\t}\n\tmutate {\n\t\tgsub => [\"json_data\", \"\\\\\\\\\", \"", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in `block in converge_state'"]}

openAI, [5/15/23 3:00 PM]
This error indicates that in the Logstash configuration file, at line 36, column 34 (672 bytes), it expected to see one of the characters [ \t\r\n], "#", "{", ",", "]", but encountered a character that could not be parsed as expected. This may be due to a syntax error or an incorrect expression. Please check your configuration file and ensure that the syntax is correct and resolve any expression errors.

Additionally, since the error message does not provide enough context, you may also need to check other parts of the configuration file to determine if there are other errors.

johnpoint, [5/15/23 3:15 PM]
Can Logstash move data from one field to another?

openAI, [5/15/23 3:15 PM]
Yes, you can use the mutate plugin in Logstash's filter plugins to achieve this with the rename option.

For example, if the source field name is `source_field` and you want to copy it to the target field `target_field`, you can use the following configuration:

```
filter {
  mutate {
    rename => { "source_field" => "target_field" }
  }
}
```

This will copy the value of `source_field` to the new field `target_field` and remove the `source_field` itself.

johnpoint, [5/15/23 3:19 PM]
What if I want to keep this field?

openAI, [5/15/23 3:19 PM]
You can use the add_field option of the mutate plugin to create a new field in the event.

For example, if you want to create a new field "new_field" in the Logstash event and move its value from the original field "old_field":

```
filter {
   mutate {
     add_field => { "new_field" => "%{[old_field]}" }
     remove_field => [ "old_field" ]
   }
}
```

This will assign the value from "old_field" to the new "new_field" field and remove the "old_field" field. If you want to keep the source field while moving the value, you can remove the remove_field option.

johnpoint, [5/15/23 3:21 PM]
How do I delete the keyword fields that Logstash creates automatically?

openAI, [5/15/23 3:21 PM]
Logstash will create a `keyword` type subfield for each string type field by default, serving as the exact value of the original field. If you do not need this subfield, you can remove it in the index template by setting `"include_in_all": false` in the `mappings`. However, be aware that using this method to delete the `keyword` field will result in losing some precise query capabilities.

If you want to keep the `keyword` field in the log stream, you can use the remove_field filter to delete the original field but keep the `keyword` field:

```ruby
filter {
  mutate {
    remove_field => ["my_field"]
    rename => { "my_field.keyword" => "new_field" }
  }
}
```

In the above example, Logstash will delete the original `my_field` field and keep the exact value in the `my_field.keyword` subfield, finally using the rename filter to rename `my_field.keyword` to `new_field`. This way, you can use `new_field` for precise queries in Elasticsearch.

johnpoint, [5/15/23 4:18 PM]
Why does the ES index created by Logstash not have a daily rolling name?

openAI, [5/15/23 4:18 PM]
Logstash creates ES indexes in a Daily Index mode by default, with one index per day and a date suffix. For example, an index named `logstash` will be transformed into `logstash-yyyy.mm.dd`, and when writing data, it will determine the corresponding date based on UTC time. If there is no rolling, one possibility is that the configuration file has not been enabled, and another possibility is that the Elasticsearch cluster configuration file has settings that prohibit creating new indexes. You can check the relevant settings in the Elasticsearch configuration file.

johnpoint, [5/15/23 4:19 PM]
How to enable Daily Index mode in Logstash?

openAI, [5/15/23 4:19 PM]
To enable Daily Index mode, you need to modify the Logstash configuration file. In the `output` configuration, you can use the `index` configuration item to specify the name of the Elasticsearch index. To make the index name roll daily, you can use a date format placeholder, for example:

```
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "your-index-name-%{+YYYY.MM.dd}"
  }
}
```

In the above configuration, the index name format is `your-index-name-YYYY.MM.dd`, where `%{+YYYY.MM.dd}` is a date format placeholder representing the current date. With this configuration, a new index will be automatically created every day.

Also, note that to enable Daily Index mode, you need to set `action.auto_create_index` to `true` in the Elasticsearch configuration file.
