-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
160 lines (124 loc) · 4.58 KB
/
Copy pathapp.py
File metadata and controls
160 lines (124 loc) · 4.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Import required libraries
from flask import Flask, request, jsonify, render_template, make_response, send_from_directory
from inference_sdk import InferenceHTTPClient
from PIL import Image
import io
import os
from werkzeug.utils import secure_filename
import base64
from dotenv import load_dotenv
load_dotenv()
# Initialize Flask app
app = Flask(__name__)

# Load configuration from environment variables.
api_key = os.environ.get("API_KEY")
workspace_name = os.environ.get("WORKSPACE_NAME")
# NOTE: despite its name, this holds the value of WORKFLOW_ID and is passed
# as ``workflow_id`` to the Roboflow client. The name is kept because the
# route handlers reference it.
workspace_id = os.environ.get("WORKFLOW_ID")
# Fall back to a local "uploads" directory when UPLOAD_FOLDER is unset or
# empty; os.makedirs(None) raises TypeError and os.makedirs("") raises
# FileNotFoundError, either of which would abort the import of this module.
UPLOAD_FOLDER = os.environ.get("UPLOAD_FOLDER") or "uploads"
# May be None or a string; it is handed to app.run() unchanged.
port = os.environ.get("PORT")

# Initialize Roboflow client
client = InferenceHTTPClient(
    api_url="https://detect.roboflow.com",
    api_key=api_key,
)

# Create upload folder if it doesn't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
@app.route("/")
def home():
    """Serve the application's landing page.

    Returns:
        str: The HTML produced by rendering the ``index.html`` template.
    """
    landing_page = render_template("index.html")
    return landing_page
# Pillow image modes that the JPEG encoder can write without conversion.
_JPEG_WRITABLE_MODES = ("L", "RGB", "CMYK")


def _encode_upload_as_jpeg(upload):
    """Re-encode an uploaded image as JPEG and return it as base64 text.

    The upload is decoded entirely in memory. Nothing is written to disk, so
    there is no temporary file to clean up, no collision between concurrent
    uploads that share a filename, and no failure on an empty filename.

    Args:
        upload: The uploaded file object taken from ``request.files``.

    Returns:
        str: The base64-encoded JPEG bytes, decoded as UTF-8 text.

    Raises:
        OSError: If Pillow cannot identify or decode the data (this includes
            an empty upload).
    """
    image = Image.open(io.BytesIO(upload.read()))
    # JPEG has no alpha channel or palette; saving e.g. an RGBA or P image
    # directly raises "cannot write mode ... as JPEG".
    if image.mode not in _JPEG_WRITABLE_MODES:
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def _detection_response(upload):
    """Run the Roboflow workflow on an upload and build the JSON response.

    Args:
        upload: The uploaded file object taken from ``request.files``.

    Returns:
        A JSON response with the keys ``ai_response``, ``image`` and
        ``detected_count`` read from the first workflow result, or a 400
        response with a ``message`` when the upload is not a readable image.
    """
    try:
        img_base64 = _encode_upload_as_jpeg(upload)
    except OSError:
        # Only the decode step is guarded, so failures of the inference call
        # are not mistaken for a bad upload.
        return make_response(
            {"message": "The uploaded file is not a readable image"}, 400
        )

    # Send image to Roboflow for inference
    result = client.run_workflow(
        workspace_name=workspace_name,
        workflow_id=workspace_id,
        images={"image": img_base64, "data": img_base64},
    )

    # Extract results from the first workflow output
    first = result[0]
    return jsonify(
        {
            "ai_response": first["open_ai"]["output"],
            "image": first["label_visualization"],
            "detected_count": first["property_definition"],
        }
    )


@app.route("/detect", methods=["POST"])
def detect():
    """Handle image detection requests.

    Expects a multipart form upload under the field ``cameraInput``.

    Returns:
        A JSON response with ``ai_response``, ``image`` and
        ``detected_count``, or a 400 response if the upload cannot be decoded
        as an image.
    """
    return _detection_response(request.files["cameraInput"])


@app.route("/final_detect", methods=["POST"])
def final_detect():
    """Handle final detection requests.

    Expects a multipart form upload under the field ``finalCameraInput`` and
    a ``before_image`` form value. ``before_image`` is only used to check
    that phase 1 ran; it is not sent for inference and is not echoed back in
    the response.

    Returns:
        A JSON response with ``ai_response``, ``image`` and
        ``detected_count``; a 500 response with a ``message`` when
        ``before_image`` is missing; or a 400 response if the upload cannot
        be decoded as an image.
    """
    # Get previous image and new image file
    before_img_base64 = request.form.get("before_image", "false")
    upload = request.files["finalCameraInput"]

    # Validate that phase 1 detection was done.
    # NOTE(review): this is a client error, so 400 would be more accurate;
    # 500 is kept because existing clients may depend on it.
    if before_img_base64 == "false":
        return make_response(
            {"message": "You have to run the phase 1 detection first"}, 500
        )

    return _detection_response(upload)
@app.route("/download/<filename>")
def download_file(filename):
    """Send a file from the ``files`` directory under the app root.

    Args:
        filename: Name of the requested file, taken from the URL path.

    Returns:
        The response produced by ``send_from_directory`` for that file.
    """
    # Files are served from <app root>/files.
    return send_from_directory(os.path.join(app.root_path, "files"), filename)
# Run the Flask app
if __name__ == "__main__":
    # The server binds to all interfaces, and the Werkzeug interactive
    # debugger allows arbitrary code execution from the browser. Debug mode
    # is therefore off unless explicitly requested via FLASK_DEBUG.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(host="0.0.0.0", port=port, debug=debug_enabled)