Kafka
Connect Kafka to your preprocessing pipeline, and use the Unstructured Ingest CLI or the Unstructured Ingest Python library to batch process all your documents and store structured outputs locally on your filesystem.
The requirements are as follows.
- A Kafka cluster, such as ones provided by Confluent Cloud, Amazon Managed Streaming for Apache Kafka (Amazon MSK), or Google Cloud Managed Service for Apache Kafka.
- The hostname of the bootstrap Kafka cluster to connect to.
- The port number of the cluster.
- The name of the topic to read messages from and write messages to on the cluster.
- If you use Kafka API keys and secrets for authentication, the key and secret values.
The Kafka connector dependencies:
CLI, Python
You might also need to install additional dependencies, depending on your needs. Learn more.
The following environment variables:
- KAFKA_BOOTSTRAP_SERVER - The hostname of the bootstrap Kafka cluster to connect to, represented by --bootstrap-server (CLI) or bootstrap_server (Python).
- KAFKA_PORT - The port number of the cluster, represented by --port (CLI) or port (Python).
- KAFKA_TOPIC - The unique name of the topic to read messages from and write messages to on the cluster, represented by --topic (CLI) or topic (Python).
If you use Kafka API keys and secrets for authentication:
- KAFKA_API_KEY - The Kafka API key value, represented by --kafka-api-key (CLI) or kafka_api_key (Python).
- KAFKA_SECRET - The secret value for the Kafka API key, represented by --secret (CLI) or secret (Python).
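As a minimal sketch of how these variables might be gathered before calling the connector, the snippet below reads them from the environment and checks that the required ones are present. The helper name load_kafka_settings is hypothetical and not part of the Unstructured Ingest library; converting the port to an integer and requiring the API key and secret to be set together are assumptions made for illustration.

```python
import os
from typing import Mapping, Optional

# Variable names as listed above.
REQUIRED_VARS = ("KAFKA_BOOTSTRAP_SERVER", "KAFKA_PORT", "KAFKA_TOPIC")


def load_kafka_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Hypothetical helper: collect Kafka settings from environment variables.

    The returned keys reuse the Python parameter names from this page
    (bootstrap_server, port, topic, kafka_api_key, secret).
    """
    env = os.environ if env is None else env

    # Fail early if any required variable is unset or empty.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(missing))

    settings = {
        "bootstrap_server": env["KAFKA_BOOTSTRAP_SERVER"],
        "port": int(env["KAFKA_PORT"]),  # assumption: the port is handled as an integer
        "topic": env["KAFKA_TOPIC"],
    }

    # The API key and secret are optional, but (by assumption) only make sense as a pair.
    api_key = env.get("KAFKA_API_KEY")
    secret = env.get("KAFKA_SECRET")
    if bool(api_key) != bool(secret):
        raise ValueError("Set both KAFKA_API_KEY and KAFKA_SECRET, or neither.")
    if api_key:
        settings["kafka_api_key"] = api_key
        settings["secret"] = secret

    return settings
```

Calling load_kafka_settings() with no arguments reads from os.environ; passing a plain dictionary instead makes the helper easy to exercise in tests.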
Additional settings include:
- --confluent (CLI) or confluent (Python): True to indicate that the cluster is running Confluent Kafka.
- --num-messages-to-consume (CLI) or num_messages_to_consume (Python): The maximum number of messages to get from the topic. The default is 1 if not otherwise specified.
- --timeout (CLI) or timeout (Python): The maximum amount of time to wait for the response of a request to the topic, expressed in seconds. The default is 1.0 if not otherwise specified.
- --group-id (CLI) or group_id (Python): The ID of the consumer group, if any, that is associated with the target Kafka cluster. (A consumer group is a way to allow a pool of consumers to divide the consumption of data over topics and partitions.) The default is default_group_id if not otherwise specified.
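The documented defaults for the value-taking settings can be summarized in a short sketch. The KafkaReadOptions class and its to_cli_flags method are hypothetical names used only for illustration and are not part of the Unstructured Ingest library; the validation rules are likewise assumptions. The confluent setting is left out because this page does not state its default.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class KafkaReadOptions:
    """Hypothetical container mirroring the optional settings and their documented defaults."""

    num_messages_to_consume: int = 1     # documented default: 1
    timeout: float = 1.0                 # documented default: 1.0 (seconds)
    group_id: str = "default_group_id"   # documented default: default_group_id

    def __post_init__(self) -> None:
        # Assumed sanity checks; the connector's own validation may differ.
        if self.num_messages_to_consume < 1:
            raise ValueError("num_messages_to_consume must be at least 1")
        if self.timeout <= 0:
            raise ValueError("timeout must be a positive number of seconds")

    def to_cli_flags(self) -> List[str]:
        """Render the settings using the CLI flag names listed above."""
        return [
            "--num-messages-to-consume", str(self.num_messages_to_consume),
            "--timeout", str(self.timeout),
            "--group-id", self.group_id,
        ]
```

With the default of 1, a run reads at most one message from the topic, so processing a backlog likely means raising num_messages_to_consume, and possibly the timeout, to suit the topic.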
These environment variables:
- UNSTRUCTURED_API_KEY - Your Unstructured API key value.
- UNSTRUCTURED_API_URL - Your Unstructured API URL.
Now call the Unstructured Ingest CLI or the Unstructured Ingest Python library. The destination connector can be any of the ones supported. This example uses the local destination connector: