Issues with s3 cp and EMR steps

Hey, I’m having issues running my current pipeline code against LocalStack’s EMR.

These steps work fine in real AWS EMR, but when running against LocalStack EMR, the pipeline_entry_point.py file is never made available to the subsequent steps.
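For context, this is roughly how the steps are wired up in the DAG. The step dicts are taken from the AddJobFlowSteps payload in the logs below (the second step’s args are trimmed here); the DAG id, task ids and job flow overrides are simplified placeholders, and the operator names come from the User-Agent in the logs.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
)

# Step dicts as sent in the AddJobFlowSteps request (see logs below).
UPLOAD_STEP = {
    "Name": "Upload entry point",
    "ActionOnFailure": "TERMINATE_CLUSTER",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        # Copy the entry point from S3 onto the master node for the spark-submit step.
        "Args": ["aws", "s3", "cp", "s3://apps/pipeline/pipeline_entry_point.py", "/home/hadoop"],
    },
}

RUN_STEP = {
    "Name": "Run ETL for test",
    "ActionOnFailure": "TERMINATE_CLUSTER",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        # Full argument list is in the logs; trimmed here for brevity.
        "Args": ["spark-submit", "--master", "yarn", "/home/hadoop/pipeline_entry_point.py",
                 "etl", "run-create-service-events", "--date", "20240127"],
    },
}

# DAG id, task ids and overrides are placeholders, not the real ones.
with DAG("pipeline_dag", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_cluster",
        job_flow_overrides={"Name": "json-create", "ReleaseLabel": "emr-6.4.0"},  # trimmed
    )
    add_steps = EmrAddStepsOperator(
        task_id="add_steps",
        job_flow_id=create_cluster.output,
        steps=[UPLOAD_STEP, RUN_STEP],
    )
    create_cluster >> add_steps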

The logs suggest the file was downloaded from LocalStack S3 as tmp9e6hnx73 and then removed. After that, the s3 cp command appears to have been rewritten to copy from ‘/tmp/8c0f78a6.py’, which then fails to run. As a result the file is never copied to /home/hadoop and isn’t available to the next step, so the whole thing fails.
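My reading of the logs (the snippet below is my own pseudocode, not LocalStack’s actual implementation) is that the EMR provider downloads every s3:// argument in a step to a local temp file and substitutes the local path back into the command. For most steps that’s probably fine, but for an `aws s3 cp` step it leaves two local paths, which the CLI rejects with “Invalid argument type”:

import tempfile

# Rough sketch of what the EMR provider *appears* to do with step args, based on
# the DEBUG lines in the logs below. All names here are mine, not LocalStack's.
def rewrite_step_args(args):
    rewritten = []
    for arg in args:
        if arg.startswith("s3://"):
            # e.g. s3://apps/pipeline/pipeline_entry_point.py is downloaded to
            # /tmp/tmp9e6hnx73, and the command gets a local temp path instead
            # (the logs show a second temp name, /tmp/8c0f78a6.py, being substituted).
            local_path = tempfile.mkstemp(suffix=".py")[1]
            # download_from_s3(arg, local_path)   # hypothetical helper
            rewritten.append(local_path)
        else:
            rewritten.append(arg)
    return rewritten

step_args = ["aws", "s3", "cp", "s3://apps/pipeline/pipeline_entry_point.py", "/home/hadoop"]
print(rewrite_step_args(step_args))
# -> ['aws', 's3', 'cp', '/tmp/....py', '/home/hadoop']
# Both cp arguments are now local paths, so `aws s3 cp` exits 255 with
# "Invalid argument type", the file never lands in /home/hadoop, and the
# spark-submit step can't find it.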

This all works on actual AWS though, so I’m a bit stumped.

Full logs are below; any help would be appreciated.

2024-01-27T19:28:26.228  INFO --- [   asgi_gw_2] localstack.request.aws     : AWS emr.RunJobFlow => 200; 000000000000/eu-west-1; RunJobFlowInput(
  {
    'Name': 'json-create', 
    'LogUri': 's3://logs/mwaa_emr/', 
    'ReleaseLabel': 'emr-6.4.0', 
    'Instances': {
      'InstanceGroups': [
        {'Name': 'Master nodes', 'Market': 'ON_DEMAND', 'InstanceRole': 'MASTER', 'InstanceType': 'c6g.xlarge', 'InstanceCount': 1}, 
        {'Name': 'Core nodes', 'Market': 'ON_DEMAND', 'InstanceRole': 'CORE', 'InstanceType': 'c6g.xlarge', 'InstanceCount': 3}
      ], 
      'KeepJobFlowAliveWhenNoSteps': False, 
      'TerminationProtected': False, 
      'Ec2SubnetId': 'subnet-d073b1ce'
    }, 
    'Steps': [
      {
        'Name': 'Upload pipeline entry point', 
        'ActionOnFailure': 'TERMINATE_CLUSTER', 
        'HadoopJarStep': {
          'Jar': 'command-runner.jar', 
          'Args': [
            'aws', 's3', 'cp', 's3://apps/pipeline/pipeline_entry_point.py', '/home/hadoop'
          ]
        }
      }
    ], 
    'BootstrapActions': [
      {
        'Name': 'Install pipeline library', 
        'ScriptBootstrapAction': {
          'Path': 's3://apps/pipeline/pipeline_bootstrap.sh'
        }
      }
    ], 
    'Applications': [{'Name': 'Spark'}], 
    'Configurations': [
      {
        'Classification': 'spark-hive-site', 
        'Properties': {'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'}
      }, 
      {'Classification': 'spark', 'Properties': {'maximizeResourceAllocation': 'true'}}, 
      {'Classification': 'spark-env', 'Configurations': [], 'Properties': {}}
    ], 
    'VisibleToAllUsers': True, 
    'JobFlowRole': 'EMR_EC2_DefaultRole', 
    'ServiceRole': 'EMR_DefaultRole', 
    'Tags': [
      {'Key': 'app', 'Value': 'analytics'}, 
      {'Key': 'environment', 'Value': 'development'}
    ]
  }, 
  headers={
    'Host': 'localstack:4566', 
    'Accept-Encoding': 'identity', 
    'X-Amz-Target': 'ElasticMapReduce.RunJobFlow', 
    'Content-Type': 'application/x-amz-json-1.1', 
    'User-Agent': 'Boto3/1.26.51 Python/3.10.9 Linux/5.10.104-linuxkit Botocore/1.29.51 Airflow/2.5.1 AmPP/7.1.0 Caller/EmrCreateJobFlowOperator DagRunKey/45e217f4-4a30-5d5d-93b0-74e2d9bbbd31', 
    'X-Amz-Date': '20240127T192804Z', 
    'Authorization': 'AWS4-HMAC-SHA256 Credential=000000000000/20240127/eu-west-1/elasticmapreduce/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-target, Signature=413aa887999e1556ca85b3815b44fc67b3553092e4d3642dc0b553fb236d834d', 
    'amz-sdk-invocation-id': '0cee92f2-b02f-47e3-ad96-ae2faf922c7e', 
    'amz-sdk-request': 'attempt=1', 
    'Content-Length': '1609', 
    'x-moto-account-id': '000000000000'
  }
); 

RunJobFlowOutput(
  {'JobFlowId': 'j-6IBURSO6BFEP1', 'ClusterArn': 'arn:aws:elasticmapreduce:eu-west-1:000000000000:cluster/j-6IBURSO6BFEP1'},
  headers={'Content-Type': 'application/x-amz-json-1.1', 'Content-Length': '121', 'x-amzn-requestid': '59bbec31-0f51-4d4e-91e1-342371491eeb'}
)



2024-01-27T19:28:33.417  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS emr.AddJobFlowSteps => 200; 000000000000/eu-west-1; AddJobFlowStepsInput(
  {
    'JobFlowId': 'j-6IBURSO6BFEP1', 
    'Steps': [
      {
        'Name': 'Upload entry point', 
        'ActionOnFailure': 'TERMINATE_CLUSTER', 
        'HadoopJarStep': {
          'Jar': 'command-runner.jar', 
          'Args': ['aws', 's3', 'cp', 's3://apps/pipeline/pipeline_entry_point.py', '/home/hadoop']
        }
      }, 
      {
        'Name': 'Run ETL for test', 
        'ActionOnFailure': 'TERMINATE_CLUSTER', 
        'HadoopJarStep': {
          'Jar': 'command-runner.jar', 
          'Args': ['spark-submit', '--master', 'yarn', '/home/hadoop/pipeline_entry_point.py', 'etl', 'run-create-service-events', '--raw-path', 's3://data/test/', '--product', 'test', '--raw-out', 's3://out-data/raw_data/', '--json-out', 's3://queue-out-data/test/20240127/', '--chunk-size', '5000', '--date', '20240127']
        }
      }
    ]
  },
  headers={'Host': 'localstack:4566', 'Accept-Encoding': 'identity', 'X-Amz-Target': 'ElasticMapReduce.AddJobFlowSteps', 'Content-Type': 'application/x-amz-json-1.1', 'User-Agent': 'Boto3/1.26.51 Python/3.10.9 Linux/5.10.104-linuxkit Botocore/1.29.51 Airflow/2.5.1 AmPP/7.1.0 Caller/EmrAddStepsOperator DagRunKey/45e217f4-4a30-5d5d-93b0-74e2d9bbbd31', 'X-Amz-Date': '20240127T192833Z', 'Authorization': 'AWS4-HMAC-SHA256 Credential=000000000000/20240127/eu-west-1/elasticmapreduce/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-target, Signature=e1a083da0f0b1f9aa1ce4c8fc542ce6604acf77a050b28b5d5ee97ab75e87daf', 'amz-sdk-invocation-id': '479b6c4d-602a-45eb-92dd-68b3f226d66c', 'amz-sdk-request': 'attempt=1', 'Content-Length': '870', 'x-moto-account-id': '000000000000'}
);

AddJobFlowStepsOutput(
  {'StepIds': ['s-N76XWNX3W92B5', 's-L6XYKHZULX1HK']}, 
  headers={'Content-Type': 'application/x-amz-json-1.1', 'Content-Length': '51', 'x-amzn-requestid': '50a55b58-b575-4a90-add1-64be2075316e'}
)

2024-01-27T19:28:33.420 DEBUG --- [uncthread121] l.services.emr.provider    : Downloading S3 file for EMR job: bucket 
  "apps", key "pipeline/pipeline_entry_point.py", local file "/tmp/tmp9e6hnx73"

2024-01-27T19:28:33.460 DEBUG --- [   asgi_gw_5] l.aws.protocol.serializer  : Determined accept type (None) is not supported by this serializer. Using default of this serializer: application/xml

2024-01-27T19:28:33.470  INFO --- [   asgi_gw_5] l.request.internal.aws     : AWS s3.HeadObject => 200

2024-01-27T19:28:33.524 DEBUG --- [uncthread124] localstack.utils.run       : Executing command: 
  ['/var/lib/localstack/lib/hive/2.3.9/bin/hive', '--service', 'metastore']

2024-01-27T19:28:33.528 DEBUG --- [   asgi_gw_9] l.aws.protocol.serializer  : Determined accept type (None) is not supported by this serializer. Using default of this serializer: application/xml

2024-01-27T19:28:33.535  INFO --- [   asgi_gw_9] l.request.internal.aws     : AWS s3.GetObject => 200

2024-01-27T19:28:33.582 DEBUG --- [uncthread121] localstack.utils.run       : Executing command: rm -rf "/tmp/tmp9e6hnx73"

2024-01-27T19:28:33.611  INFO --- [uncthread121] l.services.emr.provider    : EMR job running (using SPARK_HOME=/var/lib/localstack/lib/spark/3.1.2): 
  ['aws', 's3', 'cp', '/tmp/8c0f78a6.py', '/home/hadoop'] - 
  Env: {'AWS_REGION': 'eu-west-1', 'AWS_ACCESS_KEY_ID': '000000000000', 'AWS_SECRET_ACCESS_KEY': 'test', 'LOCALSTACK_HOSTNAME': 'localhost', 'EDGE_PORT': 4566, 'PATH': '/var/lib/localstack/lib/spark/3.1.2/bin:/opt/code/localstack/.venv/bin:/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-11/bin', 'SPARK_HOME': '/var/lib/localstack/lib/spark/3.1.2'}

2024-01-27T19:28:33.612 DEBUG --- [uncthread121] localstack.utils.run       : Executing command: 
  ['aws', 's3', 'cp', '/tmp/8c0f78a6.py', '/home/hadoop']

ERROR: '['aws', 's3', 'cp', '/tmp/8c0f78a6.py', '/home/hadoop']': exit code 255; 
  output: b'\nusage: aws s3 cp <LocalPath> <S3Uri> or <S3Uri> <LocalPath> or <S3Uri> <S3Uri>\nError: Invalid argument type\n'

2024-01-27T19:28:35.345 DEBUG --- [uncthread121] l.services.emr.provider    : Execution of EMR job step failed: 
  {
    'Name': 'Upload entry point', 
    'ActionOnFailure': 'TERMINATE_CLUSTER', 
    'HadoopJarStep': {
      'Jar': 'command-runner.jar', 
      'Args': ['aws', 's3', 'cp', 's3://apps/pipeline/pipeline_entry_point.py', '/home/hadoop']
    }
  } - Command '['aws', 's3', 'cp', '/tmp/8c0f78a6.py', '/home/hadoop']' returned non-zero exit status 255.

2024-01-27T19:28:35.348 DEBUG --- [uncthread121] l.services.emr.provider    : Downloading S3 file for EMR job: 
  bucket "data", key "test/", local file "/tmp/tmp6i03rjev"

2024-01-27T19:28:35.377 DEBUG --- [   asgi_gw_7] l.aws.protocol.serializer  : Determined accept type (None) is not supported by this serializer. Using default of this serializer: application/xml

2024-01-27T19:28:35.379  INFO --- [   asgi_gw_7] l.request.internal.aws     : AWS s3.HeadObject => 404 (NoSuchKey)

2024-01-27T19:28:35.402 DEBUG --- [uncthread121] l.services.emr.provider    : Downloading S3 file for EMR job: 
  bucket "out-data", key "raw_data/", local file "/tmp/tmpvih4xve3"

2024-01-27T19:28:35.429 DEBUG --- [   asgi_gw_3] l.aws.protocol.serializer  : Determined accept type (None) is not supported by this serializer. Using default of this serializer: application/xml

2024-01-27T19:28:35.431  INFO --- [   asgi_gw_3] l.request.internal.aws     : AWS s3.HeadObject => 404 (NoSuchKey)

2024-01-27T19:28:35.438 DEBUG --- [uncthread121] l.services.emr.provider    : Downloading S3 file for EMR job: 
  bucket "queue-out-data", key "test/20240127/", local file "/tmp/tmpctq06mn5"

2024-01-27T19:28:35.461 DEBUG --- [   asgi_gw_8] l.aws.protocol.serializer  : Determined accept type (None) is not supported by this serializer. Using default of this serializer: application/xml

2024-01-27T19:28:35.462  INFO --- [   asgi_gw_8] l.request.internal.aws     : AWS s3.HeadObject => 404 (NoSuchKey)

2024-01-27T19:28:35.476  INFO --- [uncthread121] l.services.emr.provider    : EMR job running (using SPARK_HOME=/var/lib/localstack/lib/spark/3.1.2): 
  ['spark-submit', '--conf', 'spark.yarn.dist.files=/var/lib/localstack/lib/hive/2.3.9/conf/hive-site.xml', '--conf', 'spark.hadoop.dfs.client.datanode-restart.timeout=30', '--master', 'yarn', '/home/hadoop/pipeline_entry_point.py', 'etl', 'run-create-service-events', '--raw-path', 's3://data/test/', '--product', 'test', '--raw-out', 's3://out-data/raw_data/', '--json-out', 's3://queue-out-data/test/20240127/', '--chunk-size', '5000', '--date', '20240127'] - Env: {'AWS_REGION': 'eu-west-1', 'AWS_ACCESS_KEY_ID': '000000000000', 'AWS_SECRET_ACCESS_KEY': 'test', 'LOCALSTACK_HOSTNAME': 'localhost', 'EDGE_PORT': 4566, 'PATH': '/var/lib/localstack/lib/spark/3.1.2/bin:/opt/code/localstack/.venv/bin:/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-11/bin', 'SPARK_HOME': '/var/lib/localstack/lib/spark/3.1.2'}

2024-01-27T19:28:35.477 DEBUG --- [uncthread121] localstack.utils.run       : Executing command: 
  ['spark-submit', '--conf', 'spark.yarn.dist.files=/var/lib/localstack/lib/hive/2.3.9/conf/hive-site.xml', '--conf', 'spark.hadoop.dfs.client.datanode-restart.timeout=30', '--master', 'yarn', '/home/hadoop/pipeline_entry_point.py', 'etl', 'run-create-service-events', '--raw-path', 's3://data/test/', '--product', 'test', '--raw-out', 's3://out-data/raw_data/', '--json-out', 's3://queue-out-data/test/20240127/', '--chunk-size', '5000', '--date', '20240127']

2024-01-27T19:28:36.543 DEBUG --- [   asgi_gw_1] l.aws.protocol.serializer  : No accept header given. Using request's Content-Type (application/x-amz-json-1.1) as preferred response Content-Type.

2024-01-27T19:28:36.545  INFO --- [   asgi_gw_1] localstack.request.aws     : AWS emr.DescribeStep => 200; 000000000000/eu-west-1; 
  DescribeStepInput(
    {'ClusterId': 'j-6IBURSO6BFEP1', 'StepId': 's-N76XWNX3W92B5'}, headers={'Host': 'localstack:4566', 'Accept-Encoding': 'identity', 'X-Amz-Target': 'ElasticMapReduce.DescribeStep', 'Content-Type': 'application/x-amz-json-1.1', 'User-Agent': 'Boto3/1.26.51 Python/3.10.9 Linux/5.10.104-linuxkit Botocore/1.29.51 Airflow/2.5.1 AmPP/7.1.0 Caller/EmrStepSensor DagRunKey/45e217f4-4a30-5d5d-93b0-74e2d9bbbd31', 'X-Amz-Date': '20240127T192836Z', 'Authorization': 'AWS4-HMAC-SHA256 Credential=000000000000/20240127/eu-west-1/elasticmapreduce/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-target, Signature=db709fc3eaf9bb7a6381d36dd871886e3327ac7d8e5ea7f827555fedbdc6bfb7', 'amz-sdk-invocation-id': 'eac2e095-1017-4839-b7b9-9b485aa3d07f', 'amz-sdk-request': 'attempt=1', 'Content-Length': '61', 'x-moto-account-id': '000000000000'}); DescribeStepOutput({'Step': {'Id': 's-N76XWNX3W92B5', 'Name': 'Upload entry point', 'Config': {'Jar': 'command-runner.jar', 'Properties': {}, 'Args': ['aws', 's3', 'cp', 's3://apps/pipeline/pipeline_entry_point.py', '/home/hadoop']}, 'ActionOnFailure': 'TERMINATE_CLUSTER', 'Status': {'State': 'FAILED', 'FailureDetails': {}, 'Timeline': {'CreationDateTime': datetime.datetime(2024, 1, 27, 19, 28, 33, 411825, tzinfo=tzlocal())}}}}, headers={'Content-Type': 'application/x-amz-json-1.1', 'Content-Length': '412', 'x-amzn-requestid': '748783b8-c8d7-4cdb-94db-b9bd80199987'})

ERROR: '['spark-submit', '--conf', 'spark.yarn.dist.files=/var/lib/localstack/lib/hive/2.3.9/conf/hive-site.xml', '--conf', 'spark.hadoop.dfs.client.datanode-restart.timeout=30', '--master', 'yarn', '/home/hadoop/pipeline_entry_point.py', 'etl', 'run-create-service-events', '--raw-path', 's3://data/test/', '--product', 'test', '--raw-out', 's3://out-data/raw_data/', '--json-out', 's3://queue-out-data/test/20240127/', '--chunk-size', '5000', '--date', '20240127']': exit code 2; 
  output: b"WARNING: An illegal reflective access operation has occurred\nWARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/var/lib/localstack/lib/spark/3.1.2/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)\nWARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\nWARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\nWARNING: All illegal access operations will be denied in a future release\n2024-01-27 19:28:40,896 
  WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n
  python3: can't open file '/home/hadoop/pipeline_entry_point.py': 
  [Errno 2] No such file or directory\n2024-01-27 19:28:41,610 INFO util.ShutdownHookManager: Shutdown hook called\n2024-01-27 19:28:41,612 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-e000740c-b7b5-4541-abbf-0e871cbd4a67\n"

2024-01-27T19:28:41.687 DEBUG --- [uncthread121] l.services.emr.provider    : Execution of EMR job step failed: 
  {'Name': 'Run ETL for test', 'ActionOnFailure': 'TERMINATE_CLUSTER', 'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': ['spark-submit', '--master', 'yarn', '/home/hadoop/pipeline_entry_point.py', 'etl', 'run-create-service-events', '--raw-path', 's3://data/test/', '--product', 'test', '--raw-out', 's3://out-data/raw_data/', '--json-out', 's3://queue-out-data/test/20240127/', '--chunk-size', '5000', '--date', '20240127']}} - Command '['spark-submit', '--conf', 'spark.yarn.dist.files=/var/lib/localstack/lib/hive/2.3.9/conf/hive-site.xml', '--conf', 'spark.hadoop.dfs.client.datanode-restart.timeout=30', '--master', 'yarn', '/home/hadoop/pipeline_entry_point.py', 'etl', 'run-create-service-events', '--raw-path', 's3://data/test/', '--product', 'test', '--raw-out', 's3://out-data/raw_data/', '--json-out', 's3://queue-out-data/test/20240127/', '--chunk-size', '5000', '--date', '20240127']' returned non-zero exit status 2.