Apache Spark는 Scala 프로그래밍 언어로 작성되었습니다
<Apache Spark는 본래 Scala 언어로 만들어졌지만 Java, R, Python 등과 같은 다양한 프로그래밍 언어 API를 제공한다.>
또한, PySpark를 사용하면 Apache Spark와 Python 프로그래밍 언어로 RDD(Resilient Distributed Datasets)에 접속하는 데 도움이 됩니다. 이를 위해 Py4j 라이브러리를 활용했습니다.
Py4J는 PySpark에 내장된 대중적인 라이브러리이며 JVM 개체를 사용해 Python의 동적인 인터페이스를 허용합니다. PySpark에는 효율적인 프로그램을 쓰는 데 좋은 라이브러리가 꽤 많습니다. 또한 호환되는 외부 라이브러리도 다양합니다.
출처 : https://databricks.com/kr/glossary/pyspark
Apache Spark
- 대용량 데이터 병렬 처리를 위한 통합 분석 엔진
RDD
- Apache Spark의 기본 자료 구조
- Pandas처럼 사용가능
Pyspark 메소드
<스파크 세션>
from pytz import timezone
from datetime import datetime, timedelta
# 스파크 세션
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# 최근 날짜
STAMP_DATE = (datetime.now(timezone("Asia/Seoul")) - timedelta(days=1)).strftime("%Y-%m-%d")
def reference_date(delta):
return (datetime.now(timezone("Asia/Seoul")) - timedelta(days=delta)).strftime("%Y-%m-%d")
<groupBy 와 partitionBy의 차이>
* groupBy()
첫째, 동일한 값을 여러 개 갖고 있는 열의 이름을 GROUP BY 절에 적어줌으로써 데이터가 그룹 지어질 수 있는 기준으로 제공합니다.
둘째, 집계 함수가 동일한 값을 하나의 값으로 합치기 위해 그 행들의 값을 계산합니다.
셋째, 집계 함수를 통해 값을 합치는 과정에서 기존의 행들은 사라지게 됩니다. 집계 함수를 통해 구한 값들을 볼 수는 있어도 기존에 있던 정보를 함께 볼 수는 없습니다.
기존 행에 있던 세세한 정보를 더 이상 보지 못하는 것은 대부분의 경우는 괜찮습니다. 우리는 집계 함수를 통해 보다 유의미한 분석 포인트를 발견하고 싶었던 것이니까요.
하지만, 때론, 집계 함수로 새로 구한 값과 원래 기존의 세세한 행들을 같이 보면서 분석을 해야 할 때가 생깁니다.
이는 서브 쿼리를 활용해 해결할 수 있지만, 서브 쿼리로 해결하다 보면 쿼리문도 길어질 수도 있습니다.
* partitionBy()
PARTITION BY를 통해 특정 기준에 한정하여 집계된 값을 계산해 줄 수 있습니다.
여러 행의 집계된 값을 구하고자 PARTITION BY는 OVER절과 윈도우 함수와 함께 사용됩니다.
이는 GROUP BY와 집계 함수가 하는 역할과 거의 유사하지만, 차이점이 1가지 존재합니다.
여러분이 PARTITION BY를 사용하면, GROUP BY와는 달리 기존 행의 세세한 정보들은 사라지지 않고 그대로 유지됩니다.
즉, 기존의 데이터와 집계된 값을 함께 나란히 볼 수 있다는 이야기입니다.
1. 유사점: PARTITION BY와 GROUP BY 모두 집계된 값을 반환할 때 사용합니다.
2. 차이점
(1) GROUP BY를 사용하면 기존 행들이 합쳐집니다. 집계된 값을 반환하면서 원래 행에 있었던 값을 함께 볼 수 없습니다.
(2) 반면, PARTITION BY를 사용할 경우 집계된 값을 반환하면서 동시에 기존 행의 값들도 함께 볼 수 있습니다.
(3) 또한, PARTITION BY는 OVER()와 윈도우 함수와 함께 사용됩니다. = over.(Window.partitionBy())
+ Window.partitionBy: Creates a WindowSpec with the partitioning defined.
+ row_number()에 의해 '1'부터 차례대로 넘버링 된다.
순위 동률을 메기는 RANK()와는 다르게 모든 행의 번호를 고유하게 순차적으로 지정하는 것이 특징
+ over() : 쿼리 결과 집합 내의 윈도우 또는 사용자 지정 행 집합을 정의한다.
over 절에 윈도우 함수를 사용하여 이동 평균, 누적 집계, 누계 또는 그룹 결과당 상위 N개 결과 등의 집계된 값을 계산할 수 있다.
+ posexplode_outer: 지정된 배열 또는 맵의 위치(index?)와 함께 각 요소의 새 행을 반환합니다.
final.withColumn("splited", split(col("description"), "<"))
.select(
"product_id",
posexplode_outer("splited").alias("index", "splited") # pos, col ==> "index", "splited"(= enumerate?)
)
.withColumn("splited", regexp_extract(col("splited"), imgExp, 1))
.filter(col("splited") != "") # filter 이미지 정보가 없는 경우는 제외
.withColumn("splited", regexp_replace(col("splited"), "^//", "https://")) # //로 시작하는 경우 https://로 변경
.filter(col("splited").startswith("http")) # http로 시작하는 것 = (do not use a regex ^)
.withColumn(
"index", row_number().over(Window.partitionBy("product_id").orderBy("index"))
) # 프로덕션아이디 별로 파티션 새로생성, 프러덕트 아이디가 달라질때마다 인덱스 새로시작
.select("product_id", "index", "splited")
'''
posexplode_outer({"x": 1.0}) ==> pos key value 이렇게 세 개 필드 생성됨(= 0 x 1.0)
posexplode_outer(["foo", "bar"]) ==> pos col
posexplode와 달리 어레이/맵이 null이거나 비어 있으면 행(null, null)이 생성됩니다.(= 빈 값에 null 적용 안 해도되)
'''
* explode()
select explode(map('Tom',10,'Jerry',20,'Emily',30)) as (name, age)
'''
+--------+------+--+
| name | age |
+--------+------+--+
| Tom | 10 |
| Jerry | 20 |
| Emily | 30 |
+--------+------+--+
'''
'IT 개인학습 > Memo' 카테고리의 다른 글
Big O 표기법 (0) | 2022.06.04 |
---|---|
Pillow image mode (0) | 2022.05.07 |
Encoding 인코딩 (0) | 2022.05.07 |
Tempfile (0) | 2022.05.07 |
정규표현식 (0) | 2022.05.07 |
댓글