Kafka is a distributed message queue with high throughput, high reliability, and high scalability, while GreptimeDB is an open-source time-series database designed for storing time-series data. Both excel in their respective fields, but how can they be efficiently connected to achieve seamless data transmission and processing?
Vector, a high-speed and scalable data pipeline tool, bridges the two. It collects and transforms data from multiple sources (such as application logs and system metrics) and sends that data to different destinations (such as databases and monitoring systems).
With GreptimeDB now fully supporting log data storage and analysis, a GreptimeDB log sink has been integrated into Vector, allowing users to write data from any Vector source into GreptimeDB through the greptimedb_logs sink. For details and sample code, refer to this article: Vector now Support GreptimeDB Log Sink, Enabling Seamless Data Pipelines from Multiple Sources.
In this article, we will show you how to use Vector to read log data from Kafka and write it into GreptimeDB, including specific implementation steps and sample code.
Preparation
Assume we already have a Kafka cluster with a topic named test_topic that stores log data. A sample message in the topic looks like this:
127.0.0.1 - - [04/Sep/2024:15:46:13 -0700] "GET / HTTP/1.1" 200 615 "-" "Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0"
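To try the pipeline without a real web server, test messages in the same layout can be generated with a few lines of Python. This is a minimal sketch: the function name format_access_log is hypothetical, and publishing the resulting lines to test_topic is left to whichever Kafka client you already use.

```python
from datetime import datetime, timedelta, timezone


def format_access_log(ip, when, method, path, protocol, status, size, user_agent):
    """Render one access-log line in the same layout as the sample above."""
    # %z renders the UTC offset as +HHMM / -HHMM, matching the sample.
    timestamp = when.strftime("%d/%b/%Y:%H:%M:%S %z")
    return (
        f'{ip} - - [{timestamp}] "{method} {path} {protocol}" '
        f'{status} {size} "-" "{user_agent}"'
    )


# Rebuild the sample message from its individual fields.
line = format_access_log(
    ip="127.0.0.1",
    when=datetime(2024, 9, 4, 15, 46, 13, tzinfo=timezone(timedelta(hours=-7))),
    method="GET",
    path="/",
    protocol="HTTP/1.1",
    status=200,
    size=615,
    user_agent="Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
)
print(line)
```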
Next, we need to install Vector and GreptimeDB.
Install and Configure Vector
Vector is an open-source data collection tool that supports reading data from various data sources and writing data to various destinations. We can use Vector to read data from Kafka and write it into GreptimeDB.
Installing Vector is simple and can be done from a binary, a container image, and so on. For specific installation steps, please refer to the official Vector documentation.
After installation, we need to configure Vector to read data from Kafka and write it into GreptimeDB. Below is a simple Vector configuration file:
[sources.mq]
type = "kafka"
group_id = "vector0"
topics = ["test_topic"]
bootstrap_servers = "kafka:9092"
[sinks.console]
type = "console"
inputs = [ "mq" ]
encoding.codec = "text"
[sinks.sink_greptime_logs]
type = "greptimedb_logs"
table = "demo_logs"
pipeline_name = "demo_pipeline"
compression = "gzip"
inputs = [ "mq" ]
endpoint = "http://greptimedb:4000"
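In the above configuration file, we defined a source named mq to read data from Kafka. We also defined a sink named sink_greptime_logs to write data into GreptimeDB. The console sink prints the same events to standard output, which is useful for checking that messages are arriving.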
Install and Configure GreptimeDB
GreptimeDB is an open-source time-series database designed for storing time-series data. We can use GreptimeDB to store log data read from Kafka.
Installing GreptimeDB is also straightforward and can be done from a binary, a container image, and so on. For specific installation steps, please refer to the official GreptimeDB documentation.
After installation, use the default configuration. Since log data is diverse, GreptimeDB provides a Pipeline feature to process and filter log data, retaining only the parts of each log we care about. We will share the implementation principles and steps of the Pipeline engine in subsequent technical blogs, so stay tuned.
The following Pipeline configuration file parses the nginx access log format shown in the sample above.
processors:
  - dissect:
      fields:
        - message
      patterns:
        - '%{ip} - - [%{datetime}] "%{method} %{path} %{protocol}" %{status} %{size} "-" "%{user_agent}"'
  - date:
      fields:
        - datetime
      formats:
        - "%d/%b/%Y:%H:%M:%S %z"
  - date:
      fields:
        - timestamp
      formats:
        - "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
  - fields:
      - ip
    type: string
  - fields:
      - method
      - protocol
    type: string
    index: tag
  - fields:
      - path
      - user_agent
    type: string
    index: fulltext
  - fields:
      - status
    type: uint32
    index: tag
  - fields:
      - size
    type: uint32
  - fields:
      - datetime
    type: timestamp
    index: timestamp
  - fields:
      - timestamp
    type: timestamp
In the above Pipeline configuration file, we use the dissect processor to parse the log data. The originally unstructured log line is split into structured data containing ip, datetime, method, path, protocol, status, size, and user_agent. Then, we use the date processor to parse the two time fields, each in its own format. Finally, we use transform to convert the field types and set the indexes.
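The effect of these processors can be approximated in plain Python to see what the structured record looks like. The sketch below is not how the Pipeline engine is implemented: it swaps the dissect pattern for a regular expression with one named group per field, and it assumes the timestamp field arrives as an ISO-style string such as "2024-10-28T03:39:29.982Z" (inferred from the second date format). The function names are hypothetical.

```python
import re
from datetime import datetime, timezone

# Regex stand-in for the dissect pattern: each %{name} becomes a named group.
LOG_PATTERN = re.compile(
    r'(?P<ip>\S+) - - \[(?P<datetime>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" '
    r'(?P<status>\d+) (?P<size>\d+) "-" "(?P<user_agent>[^"]*)"'
)


def parse_line(message):
    """Split one access-log line into typed fields; return None if it does not match."""
    match = LOG_PATTERN.fullmatch(message)
    if match is None:
        return None
    record = match.groupdict()
    # Same layout as the first date format in the Pipeline file.
    record["datetime"] = datetime.strptime(record["datetime"], "%d/%b/%Y:%H:%M:%S %z")
    # Mirrors the uint32 conversions in the transform section.
    record["status"] = int(record["status"])
    record["size"] = int(record["size"])
    return record


def parse_event_timestamp(value):
    """Parse a millisecond-precision UTC timestamp (assumed input shape)."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


sample = (
    '127.0.0.1 - - [04/Sep/2024:15:46:13 -0700] "GET / HTTP/1.1" 200 615 "-" '
    '"Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0"'
)
record = parse_line(sample)
event_time = parse_event_timestamp("2024-10-28T03:39:29.982Z")
```

Lines that do not fit the pattern return None here, which makes it easy to spot messages whose layout differs from the expected one before writing a pattern for them.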
For the index, we designated method, protocol, and status as tag fields, primarily to enable efficient querying. Fields whose set of values is unbounded or very large should not be set as tags, because this leads to high cardinality. Therefore, neither ip nor size has been set as a tag field.
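One rough way to judge whether a field is a reasonable tag is to count its distinct values in a sample of records. The sketch below does this for a handful of rows in the shape of the demo data; the helper names and the threshold of 5 are arbitrary choices for illustration, not GreptimeDB recommendations.

```python
from collections import defaultdict

FIELDS = ("ip", "method", "protocol", "status", "size")

# A small sample of records in the shape of the demo data.
SAMPLE = [
    ("37.254.223.207", "DELETE", "HTTP/2.0", 201, 495),
    ("113.26.47.170", "PUT", "HTTP/2.0", 404, 183),
    ("33.80.49.13", "PUT", "HTTP/2.0", 500, 150),
    ("240.14.156.37", "DELETE", "HTTP/1.1", 200, 155),
    ("210.90.39.41", "POST", "HTTP/2.0", 201, 188),
    ("219.88.194.150", "DELETE", "HTTP/1.1", 404, 704),
    ("130.255.0.241", "DELETE", "HTTP/1.1", 500, 816),
    ("168.144.155.215", "POST", "HTTP/1.1", 500, 511),
    ("28.112.30.158", "GET", "HTTP/1.1", 200, 842),
    ("166.9.187.104", "GET", "HTTP/2.0", 201, 970),
]


def distinct_counts(rows):
    """Count how many distinct values each field takes across the rows."""
    seen = defaultdict(set)
    for row in rows:
        for field, value in zip(FIELDS, row):
            seen[field].add(value)
    return {field: len(values) for field, values in seen.items()}


def tag_candidates(rows, max_distinct):
    """Return fields with at most max_distinct distinct values (hypothetical threshold)."""
    counts = distinct_counts(rows)
    return sorted(field for field, n in counts.items() if n <= max_distinct)


counts = distinct_counts(SAMPLE)
candidates = tag_candidates(SAMPLE, max_distinct=5)
print(counts)
print(candidates)
```

In this sample, ip and size take a new value in every row, while method, protocol, and status stay within a small fixed set, which matches the choice of tags above.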
We added a full-text index for the path and user_agent fields to facilitate fuzzy searching for relevant content. For detailed query syntax, please refer to the GreptimeDB documentation.
Upload the above configuration file to GreptimeDB through its HTTP interface to create a Pipeline named demo_pipeline. GreptimeDB stores the Pipeline and uses it to parse and trim incoming logs.
curl -X 'POST' 'http://greptimedb:4000/v1/events/pipelines/demo_pipeline' -F 'file=@/config_data/pipeline.yaml' -v
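If you would rather upload the Pipeline from a script, the same request can be assembled with the Python standard library. This is a sketch under assumptions: it mirrors the multipart form that curl -F produces (a single part named file), the helper name build_pipeline_upload is hypothetical, and the request is only built here, not sent.

```python
import urllib.request
import uuid


def build_pipeline_upload(endpoint, pipeline_name, yaml_text):
    """Build (but do not send) a multipart POST mirroring the curl command above."""
    boundary = uuid.uuid4().hex
    # One form part named "file", as produced by curl's -F 'file=@...'.
    body = (
        f"--{boundary}\r\n"
        'Content-Disposition: form-data; name="file"; filename="pipeline.yaml"\r\n'
        "Content-Type: application/octet-stream\r\n"
        "\r\n"
        f"{yaml_text}\r\n"
        f"--{boundary}--\r\n"
    ).encode("utf-8")
    return urllib.request.Request(
        url=f"{endpoint}/v1/events/pipelines/{pipeline_name}",
        data=body,
        method="POST",
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
    )


# Placeholder YAML; in practice, read the contents of pipeline.yaml instead.
request = build_pipeline_upload(
    "http://greptimedb:4000", "demo_pipeline", "processors: []\n"
)
# To send it: urllib.request.urlopen(request)
```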
Run Vector & GreptimeDB
Now you are ready to run Vector and GreptimeDB. Once successful, Vector will read data from Kafka and write it into GreptimeDB.
You can connect to GreptimeDB through the MySQL protocol to view the data.
mysql> show tables;
+-----------+
| Tables    |
+-----------+
| demo_logs |
| numbers   |
+-----------+
2 rows in set (0.00 sec)
mysql> select * from demo_logs order by timestamp desc limit 10;
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| ip | method | protocol | path | user_agent | status | size | datetime | timestamp |
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| 37.254.223.207 | DELETE | HTTP/2.0 | /about | Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World) | 201 | 495 | 2024-10-28 03:39:29 | 2024-10-28 03:39:29.982000 |
| 113.26.47.170 | PUT | HTTP/2.0 | /contact | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER) | 404 | 183 | 2024-10-28 03:39:26 | 2024-10-28 03:39:26.977000 |
| 33.80.49.13 | PUT | HTTP/2.0 | /about | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11 | 500 | 150 | 2024-10-28 03:39:23 | 2024-10-28 03:39:23.973000 |
| 240.14.156.37 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13 | 200 | 155 | 2024-10-28 03:39:20 | 2024-10-28 03:39:20.969000 |
| 210.90.39.41 | POST | HTTP/2.0 | /about | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER) | 201 | 188 | 2024-10-28 03:39:17 | 2024-10-28 03:39:17.964000 |
| 219.88.194.150 | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0) | 404 | 704 | 2024-10-28 03:39:14 | 2024-10-28 03:39:14.963000 |
| 130.255.0.241 | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1) | 500 | 816 | 2024-10-28 03:39:11 | 2024-10-28 03:39:11.959000 |
| 168.144.155.215 | POST | HTTP/1.1 | / | Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5 | 500 | 511 | 2024-10-28 03:39:08 | 2024-10-28 03:39:08.954000 |
| 28.112.30.158 | GET | HTTP/1.1 | /about | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; en) Opera 9.50 | 200 | 842 | 2024-10-28 03:39:05 | 2024-10-28 03:39:05.950000 |
| 166.9.187.104 | GET | HTTP/2.0 | /blog | Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36 | 201 | 970 | 2024-10-28 03:39:02 | 2024-10-28 03:39:02.946000 |
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
10 rows in set (0.00 sec)
mysql> desc demo_logs;
+------------+---------------------+------+------+---------+---------------+
| Column     | Type                | Key  | Null | Default | Semantic Type |
+------------+---------------------+------+------+---------+---------------+
| ip         | String              |      | YES  |         | FIELD         |
| method     | String              | PRI  | YES  |         | TAG           |
| protocol   | String              | PRI  | YES  |         | TAG           |
| path       | String              |      | YES  |         | FIELD         |
| user_agent | String              |      | YES  |         | FIELD         |
| status     | UInt32              | PRI  | YES  |         | TAG           |
| size       | UInt32              |      | YES  |         | FIELD         |
| datetime   | TimestampNanosecond | PRI  | NO   |         | TIMESTAMP     |
| timestamp  | TimestampNanosecond |      | YES  |         | FIELD         |
+------------+---------------------+------+------+---------+---------------+
9 rows in set (0.00 sec)
mysql> show create table demo_logs;
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| demo_logs | CREATE TABLE IF NOT EXISTS `demo_logs` (
  `ip` STRING NULL,
  `method` STRING NULL,
  `protocol` STRING NULL,
  `path` STRING NULL FULLTEXT WITH(analyzer = 'English', case_sensitive = 'false'),
  `user_agent` STRING NULL FULLTEXT WITH(analyzer = 'English', case_sensitive = 'false'),
  `status` INT UNSIGNED NULL,
  `size` INT UNSIGNED NULL,
  `datetime` TIMESTAMP(9) NOT NULL,
  `timestamp` TIMESTAMP(9) NULL,
  TIME INDEX (`datetime`),
  PRIMARY KEY (`method`, `protocol`, `status`)
)
ENGINE=mito
WITH(
  append_mode = 'true'
) |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
Now that the data is stored in the database, you can use GreptimeDB's query features to quickly filter for data of interest. For example, using full-text search, we can perform a fuzzy match on the user_agent field to quickly find records where the user agent contains "Android".
mysql> SELECT * FROM demo_logs WHERE MATCHES(user_agent, 'Android') limit 10;
+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| ip | method | protocol | path | user_agent | status | size | datetime | timestamp |
+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| 240.14.156.37 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13 | 200 | 155 | 2024-10-28 03:39:20 | 2024-10-28 03:39:20.969000 |
| 186.44.204.29 | DELETE | HTTP/1.1 | / | Opera/9.80 (Android 2.3.4; Linux; Opera Mobi/build-1107180945; U; en-GB) Presto/2.8.149 Version/11.10 | 201 | 343 | 2024-10-28 03:45:33 | 2024-10-28 03:45:33.459000 |
| 75.246.111.167 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 404 | 869 | 2024-10-28 03:38:59 | 2024-10-28 03:38:59.942000 |
| 236.239.192.109 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.2.1; zh-cn; HTC_Wildfire_A3333 Build/FRG83D) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 500 | 892 | 2024-10-28 03:38:53 | 2024-10-28 03:38:53.934000 |
| 232.232.14.176 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 500 | 644 | 2024-10-28 03:46:42 | 2024-10-28 03:46:42.550000 |
| 135.16.130.172 | DELETE | HTTP/2.0 | / | MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 404 | 177 | 2024-10-28 03:47:27 | 2024-10-28 03:47:27.613000 |
| 69.23.7.123 | GET | HTTP/1.1 | /blog | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 201 | 770 | 2024-10-28 03:45:09 | 2024-10-28 03:45:09.425000 |
| 37.61.6.211 | GET | HTTP/1.1 | /blog | MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 404 | 298 | 2024-10-28 03:45:21 | 2024-10-28 03:45:21.442000 |
| 244.166.255.46 | GET | HTTP/2.0 | /blog | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 201 | 963 | 2024-10-28 03:45:48 | 2024-10-28 03:45:48.478000 |
| 35.169.107.238 | GET | HTTP/2.0 | /blog | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 404 | 249 | 2024-10-28 03:46:48 | 2024-10-28 03:46:48.558000 |
+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
10 rows in set (0.01 sec)
When compiling statistics or troubleshooting, you often need to break traffic down by user channel and URL. For example, you may want to select users on the Android channel who accessed the blog page and check the distribution of HTTP status codes. The following SQL does this in a single query:
mysql> SELECT method,status,count(*) FROM demo_logs WHERE MATCHES(user_agent, 'Android') and MATCHES(path, 'blog') group by method,status;
+--------+--------+----------+
| method | status | COUNT(*) |
+--------+--------+----------+
| GET | 404 | 2 |
| GET | 201 | 3 |
| PUT | 500 | 2 |
| POST | 404 | 1 |
+--------+--------+----------+
4 rows in set (0.01 sec)
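To make the shape of this query concrete, here is an in-memory analogue in Python. It is an approximation for illustration only: a case-insensitive substring test stands in for MATCHES, which is not necessarily how GreptimeDB's full-text index evaluates terms, and the user agent strings are abbreviated versions of rows from the output above.

```python
from collections import Counter

# Abbreviated rows in the shape of demo_logs.
ROWS = [
    {"method": "GET", "path": "/blog", "status": 201, "user_agent": "Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91)"},
    {"method": "GET", "path": "/blog", "status": 404, "user_agent": "MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22)"},
    {"method": "GET", "path": "/blog", "status": 201, "user_agent": "Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91)"},
    {"method": "GET", "path": "/blog", "status": 404, "user_agent": "Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91)"},
    {"method": "DELETE", "path": "/contact", "status": 200, "user_agent": "Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39)"},
    {"method": "GET", "path": "/blog", "status": 201, "user_agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36"},
]


def status_distribution(rows, ua_term, path_term):
    """Count rows per (method, status) where both terms occur, ignoring case."""
    ua_term, path_term = ua_term.lower(), path_term.lower()
    counts = Counter()
    for row in rows:
        if ua_term in row["user_agent"].lower() and path_term in row["path"].lower():
            counts[(row["method"], row["status"])] += 1
    return dict(counts)


distribution = status_distribution(ROWS, "Android", "blog")
print(distribution)
```

The SQL version pushes both the filtering and the grouping into the database, so only the aggregated counts travel back to the client.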
We have packaged this process into a Docker Compose file. Visit the GitHub demo-scene repo for the source code and a guide: https://github.com/GreptimeTeam/demo-scene/tree/main/kafka-ingestion
Summary
This article showed how to use Vector to read log data from Kafka and write it into GreptimeDB. Vector is an open-source data collection tool that reads from many kinds of sources and writes to many kinds of destinations. Because it now includes a GreptimeDB log sink, it offers a convenient way to move log data from an existing system into GreptimeDB, combining GreptimeDB's strengths in storing and analyzing time-series data with Vector's flexibility.
GreptimeDB's powerful log storage and query capabilities provide reliable assurance for log analysis, whether building a log management system or performing real-time monitoring and analysis. The combination of Kafka + Vector + GreptimeDB can help users achieve efficient time-series data flow and processing. In the future, we will further introduce how to achieve more complex log processing and data filtering through GreptimeDB's Pipeline engine, so stay tuned!