Hi,
I’ve set up a Kinesis Data Stream and I use boto3’s Kinesis client (boto3.client("kinesis")) with the put_records method to send data into the stream. The data is a list of dictionaries of the form { "Data": '{"a": 1, "b": 2, "c": 3}' }.
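For reference, the producer side is essentially this (the stream name is a placeholder, and note that put_records also requires a PartitionKey per record, which I left out of the format above):

import json
import boto3

kinesis = boto3.client("kinesis")

# Each record carries the JSON payload as a string under "Data";
# put_records additionally requires a PartitionKey per record.
records = [
    {"Data": json.dumps({"a": 1, "b": 2, "c": 3}), "PartitionKey": "pk-1"},
]
kinesis.put_records(StreamName="my-kinesis-stream", Records=records)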
I want to use a Glue Streaming job to read these JSONs.
I set up the database and table with Terraform:
resource "aws_glue_catalog_table" "glue-table" {
depends_on = [ aws_glue_catalog_database.glue-db ]
database_name = aws_glue_catalog_database.glue-db.name
name = "glue_table"
table_type = "EXTERNAL_TABLE"
storage_descriptor {
parameters = {
typeOfData = "aws-kinesis" # 'kinesis' doesn't work but this appears to
streamARN = "arn:aws:kinesis:us-east-1:000000000000:stream/my-kinesis-stream"
classification: "JSON"
}
columns {
name = "a"
type = "INT"
}
columns {
name = "b"
type = "INT"
}
columns {
name = "c"
type = "INT"
}
}
}
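Side note: to double-check what actually lands in the catalog, the table can be inspected with boto3 along these lines (the database name "glue_db" is assumed here, matching what the script below uses):

import boto3

glue = boto3.client("glue")

# Fetch the catalog table and print the storage-descriptor parameters
# (typeOfData, streamARN, classification) as Glue sees them.
table = glue.get_table(DatabaseName="glue_db", Name="glue_table")
print(table["Table"]["StorageDescriptor"]["Parameters"])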
My Glue job is also defined in Terraform:
resource "aws_glue_job" "etl-job" {
name = "etl-job"
role_arn = "arn:aws:iam::000000000000:role/glue-etl-role"
command {
name = "gluestreaming"
script_location = "s3://etl-scripts/my-script"
}
default_arguments = {
"--JOB_NAME" = "etl-job"
"--JOB_BOOKMARK_OPTION" = "JOB_STORE"
"--enable-glue-datacatalog": ""
"--continuous-log-logGroup" = aws_cloudwatch_log_group.etl.name
"--enable-continuous-cloudwatch-log" = "true"
"--enable-continuous-log-filter" = "true"
"--enable-metrics" = "true"
}
max_retries = 1
timeout = 28800
glue_version = "4.0"
worker_type = "G.1X"
number_of_workers = 2
}
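For completeness, the job is started with a plain start_job_run call, roughly:

import boto3

glue = boto3.client("glue")

# Kick off the streaming job and poll its state once.
run = glue.start_job_run(JobName="etl-job")
status = glue.get_job_run(JobName="etl-job", RunId=run["JobRunId"])
print(status["JobRun"]["JobRunState"])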
My Python script (‘my-script’) makes the following call after setting up the GlueContext:
kinesis_options = {
    "startingPosition": "TRIM_HORIZON",
    "useSparkDataSource": True,
    "optimizePerformance": True,
}

dyf = glueContext.create_data_frame.from_catalog(
    database="glue_db",
    table_name="glue_table",
    additional_options=kinesis_options,
    transformation_ctx="DFfromFirehose",
)
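(The GlueContext setup above that call is the standard Glue boilerplate, along these lines:)

import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Standard Glue job initialization: resolve --JOB_NAME and build the contexts.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)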
Here are the error logs from the Glue job:
"Traceback (most recent call last):"
" File \"/tmp/script-c7708abb.py\", line 80, in <module>"
" dyf = glueContext.create_data_frame.from_catalog( "
" ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
" File \"/opt/code/localstack/.venv/lib/python3.11/site-packages/awsglue/dataframereader.py\", line 32, in from_catalog"
" return self._glue_context.create_data_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)"
" ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
" File \"/opt/code/localstack/.venv/lib/python3.11/site-packages/awsglue/context.py\", line 216, in create_data_frame_from_catalog"
" source = StreamingDataSource(self._ssql_ctx.getCatalogSource(db, table_name, redshift_tmp_dir, transformation_ctx,"
" ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
" File \"/var/lib/localstack/lib/spark/3.3.0/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py\", line 1321, in __call__"
" File \"/var/lib/localstack/lib/spark/3.3.0/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 190, in deco"
" File \"/var/lib/localstack/lib/spark/3.3.0/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py\", line 326, in get_return_value"
"py4j.protocol.Py4JJavaError: An error occurred while calling o35.getCatalogSource."
": com.amazonaws.services.glue.util.NonFatalException: Formats not supported for SparkSQL data sources. Got JSON"
What is going wrong? It looks like Glue doesn’t like the JSON format?
Thanks