# CSV Source
A base CSV source that reads data from a CSV file and produces rows to the Kafka topic in JSON format.

The CSV source reads the file, produces the data, and exits. It doesn't keep any state, so on restart the whole file will be re-consumed.
## How To Use
To use a CSV Source, you need to create an instance of `CSVSource` and pass it to the `app.dataframe()` method.
```python
from quixstreams import Application
from quixstreams.sources.core.csv import CSVSource


def main():
    app = Application()
    # Create the Source instance with a file path and a name.
    # The name will be included in the default topic name.
    source = CSVSource(path="input.csv", name="csv")
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)
    app.run()


if __name__ == "__main__":
    main()
```
## File format
The CSV source expects the input file to have headers.

Every row will be converted to a JSON dictionary and sent to the topic.
Example file:

```csv
field1,field2,timestamp
foo1,bar1,1
foo2,bar2,2
foo3,bar3,3
```
What the source will produce:

```json lines
{"field1": "foo1", "field2": "bar1", "timestamp": "1"}
{"field1": "foo2", "field2": "bar2", "timestamp": "2"}
{"field1": "foo3", "field2": "bar3", "timestamp": "3"}
```
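
The row-to-dictionary mapping can be reproduced with Python's standard `csv.DictReader`. The following is a minimal sketch for illustration only, not the `CSVSource` implementation; the helper name `rows_as_json` and the inline `SAMPLE_CSV` string are assumptions. It shows that the header line supplies the keys and that every value stays a string, including `timestamp`.

```python
import csv
import io
import json
from typing import List

# Hypothetical inline sample; in practice this content would live in input.csv.
SAMPLE_CSV = """field1,field2,timestamp
foo1,bar1,1
foo2,bar2,2
foo3,bar3,3
"""


def rows_as_json(csv_text: str) -> List[str]:
    """Convert CSV text with a header line into one JSON string per row."""
    # DictReader takes the first line as field names and yields a dict per row.
    reader = csv.DictReader(io.StringIO(csv_text))
    return [json.dumps(row) for row in reader]


if __name__ == "__main__":
    for line in rows_as_json(SAMPLE_CSV):
        print(line)
```

Because CSV carries no type information, any numeric conversion (for example, turning `timestamp` into an integer) has to happen explicitly downstream.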
## Key and timestamp extractors
By default, the produced Kafka messages have no keys and use the current epoch time as their timestamps.
To specify keys and timestamps for the messages, you may pass `key_extractor` and `timestamp_extractor` callables:
```python
from typing import AnyStr

from quixstreams import Application
from quixstreams.sources.core.csv import CSVSource


def key_extractor(row: dict) -> AnyStr:
    return row["field1"]


def timestamp_extractor(row: dict) -> int:
    return int(row["timestamp"])


def main():
    app = Application(broker_address="localhost:9092")
    # input.csv:
    #   field1,field2,timestamp
    #   foo1,bar1,1
    #   foo2,bar2,2
    #   foo3,bar3,3

    source = CSVSource(
        path="input.csv",
        name="csv",
        # Extract field "field1" from each row and use it as a message key.
        # Keys must be either strings or bytes.
        key_extractor=key_extractor,
        # Extract field "timestamp" from each row and use it as a timestamp.
        # Timestamps must be integers in milliseconds.
        timestamp_extractor=timestamp_extractor,
    )
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)
    app.run()


if __name__ == "__main__":
    main()
```
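
CSV files often store timestamps as date strings rather than integers. As a sketch of how an extractor could handle that case, the function below converts an ISO 8601 string to epoch milliseconds using only the standard library. The column name `created_at` and the function name `iso_timestamp_extractor` are hypothetical and do not appear in the example file; values without a UTC offset are assumed to be in UTC.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def iso_timestamp_extractor(row: dict) -> int:
    """Return the row's "created_at" value as integer epoch milliseconds."""
    # "created_at" is a hypothetical column holding ISO 8601 strings,
    # e.g. "2024-01-01T00:00:00+00:00".
    parsed = datetime.fromisoformat(row["created_at"])
    if parsed.tzinfo is None:
        # Assumption: values without a UTC offset are treated as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids float rounding errors.
    return (parsed - EPOCH) // timedelta(milliseconds=1)
```

A function like this could be passed as `timestamp_extractor=iso_timestamp_extractor` in place of the integer-based extractor.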
## Topic
The default topic for the CSV source includes the source's `name` as part of the topic name, and expects keys to be strings and values to be JSON objects.
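
To make the expected message shape concrete, here is a small sketch of what "string keys and JSON object values" look like as bytes. It uses only the standard library and illustrates the encoding in general; it is not Quix Streams' own serializer code, and the helper names `encode_message` and `decode_message` are hypothetical.

```python
import json
from typing import Tuple


def encode_message(key: str, value: dict) -> Tuple[bytes, bytes]:
    """Encode a string key and a JSON-object value as UTF-8 bytes."""
    return key.encode("utf-8"), json.dumps(value).encode("utf-8")


def decode_message(key: bytes, value: bytes) -> Tuple[str, dict]:
    """Reverse encode_message: bytes back to a string key and a dict."""
    return key.decode("utf-8"), json.loads(value)


if __name__ == "__main__":
    raw_key, raw_value = encode_message("foo1", {"field1": "foo1", "field2": "bar1"})
    print(decode_message(raw_key, raw_value))
```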