AWS logs are not written to CloudWatch after a certain step


I have an AWS job that reads a CSV file in PySpark and writes its logs to AWS CloudWatch. The logs appear for the initial steps, but after a certain step nothing more is written.

```
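        # snippet from inside a larger script; it assumes imports such as
        # "from pyspark.sql import Window" and "import pyspark.sql.functions as f"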

        for i, key in enumerate(keys):
            source_path = "s3://"+source_bucket+"/"+key
            if conf_format =='csv' or conf_format == 'text':
                #source_df = spark.read.option("header",conf_header).option("inferSchema","true").option("delimiter",conf_delimiter).csv(source_path)
                validated_df_2=schema_validation(source_path)
                source_df_2=validated_df_2.filter(validated_df_2.valid_rec == "Y")
                print("printing source_df")
                source_df=source_df_2.drop(source_df_2.valid_rec)
                print("printing source_df after drop dolumn")
                source_df.printSchema()
                source_df.show(5)
            elif conf_format =='json':
                source_df = spark.read.option("multiline", "true").json(source_path)
            elif conf_format =='avro':
                source_df = spark.read.format("com.databricks.spark.avro").load(source_path)
            if i==0:
                target_df = source_df
            else:
                target_df = target_df.union(source_df)
        ct_before = target_df.count()
        # drop rows where every column is null (na.drop returns a new DataFrame, so reassign)
        target_df = target_df.na.drop("all")
        #Convert column names to lower case
        lower_df = target_df.toDF(*[c.lower() for c in target_df.columns])
        #Convert slash into hyphen in column name 
        col_df = lower_df.toDF(*list(map(lambda col : col if '/' not in col else col[1:].replace('/', '-'), lower_df.columns)))
        #Convert whitespace into empty in column name
        final_df = col_df.toDF(*(c.replace(' ', '') for c in col_df.columns))
        #remove duplicates
        col = final_df.columns
        col1 = final_df.columns[0]
        print(col)
        print(col1)
        win = Window.partitionBy(final_df.columns).orderBy(col1)
        df_with_rn = final_df.withColumn("row_num", f.row_number().over(win))
        df_with_rn.createOrReplaceTempView("t_stage")
        deduped_df = spark.sql(""" select * from t_stage where row_num = 1
                               """)
        delta_df = deduped_df.drop(deduped_df.row_num)
        print("show delta df schema and data")
        delta_df.printSchema()  # logs appear in CloudWatch up to this line; after it nothing is logged and the job runs forever
        delta_df.show(5)
# ... the full script continues for more than 1000 lines
```
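
In Spark, print() and printSchema() run on the driver without touching the data, so those lines reach CloudWatch right away; delta_df.show(5) is the first action that actually executes the window-based dedup over the unioned data, so a hang at that point usually means the job is stuck (or very slow) in that shuffle stage rather than in logging itself. Below is a minimal, self-contained sketch, not taken from the job above (the sample data and the checkpoint helper are made up for illustration), of how one might force each intermediate step with an action and flush the prints so CloudWatch shows how far the pipeline really gets:

```
# Illustrative sketch only: the sample data and the "checkpoint" helper are hypothetical.
import pyspark.sql.functions as f
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.appName("dedup-debug").getOrCreate()

# Tiny stand-in for the real S3 data, with a duplicate row and messy column names.
df = spark.createDataFrame(
    [("a", 1), ("a", 1), ("b", 2)],
    ["Col One", "Col/Two"],
)

def checkpoint(label, frame):
    # count() is an action: it forces everything built so far to run,
    # so the print only reaches the log once that stage has finished.
    print(f"{label}: {frame.count()} rows", flush=True)
    return frame

df = checkpoint("raw", df)
lower_df = checkpoint("lowercased", df.toDF(*[c.lower() for c in df.columns]))
final_df = checkpoint(
    "cleaned column names",
    lower_df.toDF(*[c.replace("/", "-").replace(" ", "") for c in lower_df.columns]),
)

# Same dedup pattern as in the question: row_number over a window partitioned by all columns.
win = Window.partitionBy(final_df.columns).orderBy(final_df.columns[0])
deduped_df = (
    final_df.withColumn("row_num", f.row_number().over(win))
            .filter("row_num = 1")
            .drop("row_num")
)
deduped_df = checkpoint("deduped", deduped_df)
deduped_df.show(5)
```

If the "deduped" line never shows up, the stall is in that shuffle-heavy window stage (partitioning by every column of a large unioned dataset forces a full shuffle) rather than in CloudWatch logging.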

Jul 30, 2021 in Apache Spark by Anjali

