Skip to content

Commit cb45ad5

Browse files
committed
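Increases the timeout for the data extraction job from 86400s (24 hours) to 172800s (48 hours) to prevent errors. Updates the data transformation job to use the date associated with the data extraction job (taken from the input object key) instead of the current date. Makes the map job's output file name configurable via OUTPUT_FILE_NAME, passed as a second positional argument, rather than a hard-coded constant. Fixes the configuration of the boto3 and smart_open clients (adds the CLOUDFLARE_R2_ENDPOINT_URL endpoint) to permit file writes to Cloudflare R2.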
1 parent d711427 commit cb45ad5

8 files changed

Lines changed: 54 additions & 20 deletions

File tree

.github/workflows/deploy-gcp.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ jobs:
112112
env:
113113
CLOUDFLARE_R2_ACCESS_KEY_ID: ${{ secrets.CLOUDFLARE_R2_ACCESS_KEY_ID }}
114114
CLOUDFLARE_R2_BUCKET_URL: ${{ vars.CLOUDFLARE_R2_BUCKET_URL }}
115+
CLOUDFLARE_R2_ENDPOINT_URL: ${{ vars.CLOUDFLARE_R2_ENDPOINT_URL }}
115116
CLOUDFLARE_R2_SECRET_ACCESS_KEY: ${{ secrets.CLOUDFLARE_R2_SECRET_ACCESS_KEY }}
116117
DJANGO_PORT: ${{ vars.DJANGO_PORT }}
117118
DJANGO_SECRET_KEY: ${{ secrets.DJANGO_SECRET_KEY }}
@@ -125,6 +126,7 @@ jobs:
125126
GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }}
126127
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
127128
OUTPUT_FILE_MAX_AGE: ${{ vars.OUTPUT_FILE_MAX_AGE }}
129+
OUTPUT_FILE_NAME: ${{ vars.OUTPUT_FILE_NAME }}
128130
OUTPUT_FILE_TOTAL_MAX_ATTEMPTS: ${{ vars.OUTPUT_FILE_TOTAL_MAX_ATTEMPTS }}
129131
POSTGRES_DB: ${{ vars.POSTGRES_DB }}
130132
POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}
@@ -142,6 +144,7 @@ jobs:
142144
env:
143145
CLOUDFLARE_R2_ACCESS_KEY_ID: ${{ secrets.CLOUDFLARE_R2_ACCESS_KEY_ID }}
144146
CLOUDFLARE_R2_BUCKET_URL: ${{ vars.CLOUDFLARE_R2_BUCKET_URL }}
147+
CLOUDFLARE_R2_ENDPOINT_URL: ${{ vars.CLOUDFLARE_R2_ENDPOINT_URL }}
145148
CLOUDFLARE_R2_SECRET_ACCESS_KEY: ${{ secrets.CLOUDFLARE_R2_SECRET_ACCESS_KEY }}
146149
DJANGO_PORT: ${{ vars.DJANGO_PORT }}
147150
DJANGO_SECRET_KEY: ${{ secrets.DJANGO_SECRET_KEY }}
@@ -155,6 +158,7 @@ jobs:
155158
GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }}
156159
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
157160
OUTPUT_FILE_MAX_AGE: ${{ vars.OUTPUT_FILE_MAX_AGE }}
161+
OUTPUT_FILE_NAME: ${{ vars.OUTPUT_FILE_NAME }}
158162
OUTPUT_FILE_TOTAL_MAX_ATTEMPTS: ${{ vars.OUTPUT_FILE_TOTAL_MAX_ATTEMPTS }}
159163
POSTGRES_DB: ${{ vars.POSTGRES_DB }}
160164
POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}

infra/gcp/persistent/__main__.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from constants import (
1111
CLOUDFLARE_R2_ACCESS_KEY_ID,
1212
CLOUDFLARE_R2_BUCKET_URL,
13+
CLOUDFLARE_R2_ENDPOINT_URL,
1314
CLOUDFLARE_R2_SECRET_ACCESS_KEY,
1415
DJANGO_ALLOWED_HOST,
1516
DJANGO_API_PATH_DATA_EXTRACTION,
@@ -26,6 +27,7 @@
2627
IS_TEST,
2728
MAPPING_DIR,
2829
OUTPUT_FILE_MAX_AGE,
30+
OUTPUT_FILE_NAME,
2931
OUTPUT_FILE_TOTAL_MAX_ATTEMPTS,
3032
POSTGRES_DB,
3133
POSTGRES_PASSWORD,
@@ -982,7 +984,7 @@
982984
)
983985
],
984986
service_account=cloud_run_service_account.email,
985-
timeout="86400s",
987+
timeout="172800s",
986988
volumes=[
987989
gcp.cloudrunv2.JobTemplateTemplateVolumeArgs(
988990
name="cloudsql",
@@ -1149,6 +1151,10 @@
11491151
name="CLOUDFLARE_R2_ACCESS_KEY_ID",
11501152
value=CLOUDFLARE_R2_ACCESS_KEY_ID,
11511153
),
1154+
gcp.cloudrunv2.JobTemplateTemplateContainerEnvArgs(
1155+
name="CLOUDFLARE_R2_ENDPOINT_URL",
1156+
value=CLOUDFLARE_R2_ENDPOINT_URL,
1157+
),
11521158
gcp.cloudrunv2.JobTemplateTemplateContainerEnvArgs(
11531159
name="CLOUDFLARE_R2_SECRET_ACCESS_KEY",
11541160
value=CLOUDFLARE_R2_SECRET_ACCESS_KEY,
@@ -1551,8 +1557,9 @@
15511557
- initializeVariables:
15521558
assign:
15531559
- inputBucket: ${{ "gs://" + event.data.bucket }}
1554-
- outputBucket: {output_bucket_url}
1555-
- objectKey: ${{ event.data.name }}
1560+
- inputObjectKey: ${{ event.data.name }}
1561+
- outputBucket: {output_bucket}
1562+
- outputObjectKey: {output_object_key}
15561563
- jobPrefix: projects/{project_id}/locations/{project_region}/jobs/
15571564
- mapJobFullName: ${{ jobPrefix + "{map_job_name}" }}
15581565
- mapData:
@@ -1563,7 +1570,8 @@
15631570
overrides:
15641571
containerOverrides:
15651572
args:
1566-
- ${{ objectKey }}
1573+
- ${{ inputObjectKey }}
1574+
- ${{ outputObjectKey }}
15671575
- --input_bucket
15681576
- ${{ inputBucket }}
15691577
- --output_bucket
@@ -1574,9 +1582,10 @@
15741582
result: mapDataOperation
15751583
""",
15761584
map_job_name=map_cloud_run_job.name,
1585+
output_bucket=CLOUDFLARE_R2_BUCKET_URL,
1586+
output_object_key=OUTPUT_FILE_NAME,
15771587
project_id=PROJECT_ID,
15781588
project_region=PROJECT_REGION,
1579-
output_bucket_url=CLOUDFLARE_R2_BUCKET_URL,
15801589
),
15811590
opts=pulumi.ResourceOptions(
15821591
depends_on=enabled_services, provider=gcp_provider

infra/gcp/persistent/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
try:
108108
CLOUDFLARE_R2_ACCESS_KEY_ID = os.environ["CLOUDFLARE_R2_ACCESS_KEY_ID"]
109109
CLOUDFLARE_R2_BUCKET_URL = os.environ["CLOUDFLARE_R2_BUCKET_URL"]
110+
CLOUDFLARE_R2_ENDPOINT_URL = os.environ["CLOUDFLARE_R2_ENDPOINT_URL"]
110111
CLOUDFLARE_R2_SECRET_ACCESS_KEY = os.environ[
111112
"CLOUDFLARE_R2_SECRET_ACCESS_KEY"
112113
]
@@ -123,6 +124,7 @@
123124
EXTRACTION_PIPELINE_SCHEDULE = os.environ["EXTRACTION_PIPELINE_SCHEDULE"]
124125
GEMINI_API_KEY = os.environ["GEMINI_API_KEY"]
125126
OUTPUT_FILE_MAX_AGE = os.environ["OUTPUT_FILE_MAX_AGE"]
127+
OUTPUT_FILE_NAME = os.environ["OUTPUT_FILE_NAME"]
126128
OUTPUT_FILE_TOTAL_MAX_ATTEMPTS = os.environ[
127129
"OUTPUT_FILE_TOTAL_MAX_ATTEMPTS"
128130
]

services/map/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ container_output_dir := "app/output"
1515
project_name := $(current_dir)
1616
env_path := $(current_abs_path).env.dev
1717
input_file := cleaned.parquet
18+
output_file := debit_projects.parquet
1819

1920
# Define commands
2021
build-mapper:
@@ -27,4 +28,4 @@ run-mapper: build-mapper
2728
-v $(input_dir):/$(container_input_dir) \
2829
-v $(output_dir):/$(container_output_dir) \
2930
--env-file $(env_path) \
30-
--rm $(project_name) ${input_file}
31+
--rm $(project_name) ${input_file} ${output_file}

services/map/src/map_clean/constants.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,3 @@
77
RUNTIME_DIR = Path.cwd()
88
INPUT_DIR = RUNTIME_DIR / "input"
99
OUTPUT_DIR = RUNTIME_DIR / "output"
10-
OUTPUT_FNAME = "debit_projects.parquet"

services/map/src/map_clean/main.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import smart_open
1212

1313
# Package imports
14-
from map_clean.constants import INPUT_DIR, OUTPUT_DIR, OUTPUT_FNAME
14+
from map_clean.constants import INPUT_DIR, OUTPUT_DIR
1515
from map_clean.utils import configure_cloudflare_request_params, LoggerFactory
1616

1717

@@ -156,10 +156,15 @@ def get_document(row: pd.Series) -> str:
156156
# Parse command line arguments
157157
parser = argparse.ArgumentParser()
158158
parser.add_argument(
159-
"object_key",
159+
"input_object_key",
160160
type=str,
161161
help="The path to the input file in the storage directory or bucket.",
162162
)
163+
parser.add_argument(
164+
"output_object_key",
165+
type=str,
166+
help="The path to the output file in the storage directory or bucket.",
167+
)
163168
parser.add_argument(
164169
"--input_bucket",
165170
type=str,
@@ -179,14 +184,21 @@ def get_document(row: pd.Series) -> str:
179184
)
180185
args = parser.parse_args()
181186

182-
# Validate object key argument received
183-
if not args.object_key:
187+
# Validate positional arguments received
188+
if not args.input_object_key:
184189
logger.error(
185190
"Missing positional argument for the path to the "
186191
"input file in the storage directory or bucket."
187192
)
188193
exit(1)
189194

195+
if not args.output_object_key:
196+
logger.error(
197+
"Missing positional argument for the path to the "
198+
"output file in the storage directory or bucket."
199+
)
200+
exit(1)
201+
190202
# Determine path for input and output files
191203
if args.remote:
192204
# Validate bucket options present if files hosted remotely
@@ -198,10 +210,10 @@ def get_document(row: pd.Series) -> str:
198210
exit(1)
199211

200212
# Compose path to remote input file
201-
input_fpath = f"{args.input_bucket}/{args.object_key}"
213+
input_fpath = f"{args.input_bucket}/{args.input_object_key}"
202214

203215
# Compose path to remote output file
204-
output_fpath = f"{args.output_bucket}/{OUTPUT_FNAME}"
216+
output_fpath = f"{args.output_bucket}/{args.output_object_key}"
205217

206218
# Compose storage transport parameters
207219
try:
@@ -211,11 +223,11 @@ def get_document(row: pd.Series) -> str:
211223
exit(1)
212224
else:
213225
# Compose path to local input file
214-
input_fpath = f"{INPUT_DIR}/{args.object_key}"
226+
input_fpath = f"{INPUT_DIR}/{args.input_object_key}"
215227

216228
# Compose path to local output file
217229
Path.mkdir(OUTPUT_DIR, exist_ok=True)
218-
output_fpath = f"{OUTPUT_DIR}/{OUTPUT_FNAME}"
230+
output_fpath = f"{OUTPUT_DIR}/{args.output_object_key}"
219231

220232
# Compose storage transport parameters
221233
transport_params = {}

services/map/src/map_clean/utils/storage.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def configure_cloudflare_request_params() -> dict:
2828
# Parse environment variables
2929
try:
3030
cloudflare_access_key_id = os.environ["CLOUDFLARE_R2_ACCESS_KEY_ID"]
31+
cloudflare_r2_endpoint_url = os.environ["CLOUDFLARE_R2_ENDPOINT_URL"]
3132
cloudflare_secret_access_key = os.environ["CLOUDFLARE_R2_SECRET_ACCESS_KEY"]
3233
output_file_max_age = int(os.getenv("OUTPUT_FILE_MAX_AGE", "3600"))
3334
output_file_max_attempts = int(os.getenv("OUTPUT_FILE_TOTAL_MAX_ATTEMPTS", "3"))
@@ -53,7 +54,11 @@ def configure_cloudflare_request_params() -> dict:
5354
aws_access_key_id=cloudflare_access_key_id,
5455
aws_secret_access_key=cloudflare_secret_access_key,
5556
)
56-
client = session.client("s3", config=config)
57+
client = session.client(
58+
"s3",
59+
endpoint_url=cloudflare_r2_endpoint_url,
60+
config=config,
61+
)
5762
except Exception as e:
5863
raise RuntimeError(
5964
"Failed to configure new client for writing "

services/transform/src/clean_raw/main.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
# Standard library imports
44
import argparse
55
import logging
6-
from datetime import datetime, UTC
76
from pathlib import Path
87

98
# Third-party imports
@@ -206,10 +205,13 @@ def main(input_fpath: str, output_fpath: str, logger: logging.Logger) -> None:
206205
# Compose path to remote input file
207206
input_fpath = f"{args.input_bucket}/{args.object_key}"
208207

208+
# Parse input file name and date from input file object key
209+
input_fpath_segments = args.object_key.split("/")
210+
input_fname = input_fpath_segments[-1]
211+
input_date = input_fpath_segments[-2]
212+
209213
# Compose path to remote output file
210-
today = datetime.now(tz=UTC).strftime("%Y-%m-%d")
211-
input_fname = args.object_key.split("/")[-1]
212-
output_obj_key = f"transformation/{today}/{input_fname}.parquet"
214+
output_obj_key = f"transformation/{input_date}/{input_fname}.parquet"
213215
output_fpath = f"{args.output_bucket}/{output_obj_key}"
214216
else:
215217
# Compose path to local input file

0 commit comments

Comments
 (0)