Question on Glue Streaming ETL Job that reads JSON from a Kinesis Data Stream

Hi,

I’ve set up a Kinesis Data Stream, and I use boto3’s Kinesis client (boto3.client("kinesis")) with the put_records method to send data into the stream. The data is a list of dictionaries of the form { "Data": '{"a": 1, "b": 2, "c": 3}' }.
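
For reference, the producer side looks roughly like this (simplified; the partition key is just a placeholder, and put_records requires one per record):

import json

import boto3

kinesis = boto3.client("kinesis")

# One entry per record: the JSON payload as a string under "Data",
# plus the PartitionKey that put_records requires ("pk" is a placeholder).
records = [{"Data": json.dumps({"a": 1, "b": 2, "c": 3}), "PartitionKey": "pk"}]

kinesis.put_records(StreamName="my-kinesis-stream", Records=records)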

I want to use a Glue streaming job to read these JSON records.

I set up the database and table with Terraform:

resource "aws_glue_catalog_table" "glue-table" {
  depends_on = [ aws_glue_catalog_database.glue-db ]
  database_name = aws_glue_catalog_database.glue-db.name
  name          = "glue_table"
  table_type    = "EXTERNAL_TABLE" 

  storage_descriptor {
    parameters = {
      typeOfData     = "aws-kinesis" # 'kinesis' doesn't work but this appears to
      streamARN      = "arn:aws:kinesis:us-east-1:000000000000:stream/my-kinesis-stream"
      classification = "JSON"
    }

    columns {
      name = "a"
      type = "INT"
    }
    columns {
      name = "b"
      type = "INT"
    }
    columns {
      name = "c"
      type = "INT"
    }
  }
}

My Glue job is also defined in Terraform:

resource "aws_glue_job" "etl-job" {
  name        = "etl-job"
  role_arn    = "arn:aws:iam::000000000000:role/glue-etl-role"

  command {
    name = "gluestreaming"
    script_location = "s3://etl-scripts/my-script"
  }

  default_arguments = {
    "--JOB_NAME"                         = "etl-job"
    "--JOB_BOOKMARK_OPTION"              = "JOB_STORE"
    "--enable-glue-datacatalog"          = ""
    "--continuous-log-logGroup"          = aws_cloudwatch_log_group.etl.name
    "--enable-continuous-cloudwatch-log" = "true"
    "--enable-continuous-log-filter"     = "true"
    "--enable-metrics"                   = "true"
  }
  max_retries = 1
  timeout = 28800
  glue_version = "4.0"
  worker_type = "G.1X"
  number_of_workers = 2
}

My Python script (‘my-script’) first sets up the GlueContext; that part is the standard Glue boilerplate, roughly:
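
import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Usual Glue job initialization: resolve --JOB_NAME, wrap the
# SparkContext in a GlueContext, and initialize the job.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

It then makes the statement that fails: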

kinesis_options = {
    "startingPosition": "TRIM_HORIZON",
    "useSparkDataSource": True,
    "optimizePerformance": True,
}

dyf = glueContext.create_data_frame.from_catalog(
    database="glue_db",
    table_name="glue_netflow_table",
    additional_options=kinesis_options,
    transformation_ctx="DFfromFirehose",
)

Here are the error logs from the Glue job:

"Traceback (most recent call last):"
"  File \"/tmp/script-c7708abb.py\", line 80, in <module>"
"    dyf = glueContext.create_data_frame.from_catalog(   "
"          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
"  File \"/opt/code/localstack/.venv/lib/python3.11/site-packages/awsglue/dataframereader.py\", line 32, in from_catalog"
"    return self._glue_context.create_data_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)"
"           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
"  File \"/opt/code/localstack/.venv/lib/python3.11/site-packages/awsglue/context.py\", line 216, in create_data_frame_from_catalog"
"    source = StreamingDataSource(self._ssql_ctx.getCatalogSource(db, table_name, redshift_tmp_dir, transformation_ctx,"
"                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
"  File \"/var/lib/localstack/lib/spark/3.3.0/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py\", line 1321, in __call__"
"  File \"/var/lib/localstack/lib/spark/3.3.0/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 190, in deco"
"  File \"/var/lib/localstack/lib/spark/3.3.0/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py\", line 326, in get_return_value"
"py4j.protocol.Py4JJavaError: An error occurred while calling o35.getCatalogSource."
": com.amazonaws.services.glue.util.NonFatalException: Formats not supported for SparkSQL data sources. Got JSON"

What is going wrong? It looks like Glue doesn’t like the JSON classification when reading through a Spark data source?

Thanks