error uploading attchments
This Python script automates uploading files to a Part-DB instance, automatically creating parts from directory names and attaching files within those directories to the corresponding parts.
Features Recursively processes directories to create parts Uploads files as attachments Supports multiple file types (STL, SVG, JPG, PNG, GIF, BMP, TXT, XCF, PSD, 3MF) Custom MIME type handling for specialized files Interactive selection of categories and attachment types
import os
import mimetypes
import requests
import base64
API_BASE_URL = "http://parts.local:8080/api"
API_KEY = "tcp_160835b5abbb7818da705abce584a34eb11fea7e7acbdef3274e56c1f887d698"
# Register custom MIME types
mimetypes.add_type('application/sla', '.stl')
mimetypes.add_type('image/x-xcf', '.xcf')
mimetypes.add_type('image/vnd.adobe.photoshop', '.psd')
mimetypes.add_type('model/3mf', '.3mf')
# ALLOWED_EXTS list already includes images, txt, psd, xcf, stl, etc.
ALLOWED_EXTS = {'.stl', '.svg', '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.txt', '.xcf', '.psd', '.3mf'}
def get_categories():
response = requests.get(
f"{API_BASE_URL}/categories",
headers={"Authorization": f"Bearer {API_KEY}"}
)
if response.status_code != 200:
print("❌ Failed to fetch categories:", response.text)
return []
data = response.json()
return data.get("hydra:member", data)
def select_category(categories):
print("\nAvailable Categories:")
for idx, cat in enumerate(categories):
print(f"{idx+1}: {cat.get('name', cat.get('title', str(cat)))} ({cat.get('@id', cat.get('id', ''))})")
while True:
choice = input("Select category number: ").strip()
if not choice.isdigit() or not (1 <= int(choice) <= len(categories)):
print("Invalid selection. Try again.")
else:
return categories[int(choice)-1]['@id']
def create_part(part_name, category_iri):
payload = {
"name": part_name,
"description": f"Auto-created from folder {part_name}",
"category": category_iri
}
response = requests.post(
f"{API_BASE_URL}/parts",
json=payload,
headers={"Authorization": f"Bearer {API_KEY}"}
)
if response.status_code not in (200, 201):
print(f"❌ Failed to create part: {response.status_code} {response.text}")
return None
part = response.json()
return part.get("@id") or part.get("id")
def upload_attachment(part_iri, file_path, attachment_type_iri):
filename = os.path.basename(file_path)
ext = os.path.splitext(filename)[1].lower()
if ext not in ALLOWED_EXTS:
print(f"⚠️ Skipping {filename}: Not an allowed file type")
return
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
print(f"⚠️ Skipping {filename}: Unknown MIME type")
return
with open(file_path, "rb") as f:
encoded = base64.b64encode(f.read()).decode("utf-8")
payload = {
"_type": "Attachment",
"element": part_iri,
"name": filename,
"filename": filename,
"data": f"data:{mime_type};base64,{encoded}",
"private": False,
"attachment_type": attachment_type_iri,
}
response = requests.post(
f"{API_BASE_URL}/attachments",
json=payload,
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
)
if response.status_code in (200, 201):
print(f"📎 Uploaded: {filename}")
else:
print(f"⚠️ Failed to upload {filename}: {response.status_code} {response.text}")
def process_directory(root_dir, category_iri, attachment_type_iri):
for dirpath, dirnames, filenames in os.walk(root_dir):
# Ensure we get a valid folder name. os.path.normpath removes trailing separators.
part_name = os.path.basename(os.path.normpath(dirpath))
if not part_name:
print(f"⚠️ Skipping directory {dirpath}: Part name is blank")
continue
print(f"\n📦 Creating part for directory: {dirpath} (Part name: {part_name})")
part_iri = create_part(part_name, category_iri)
if not part_iri:
continue
for filename in filenames:
file_path = os.path.join(dirpath, filename)
upload_attachment(part_iri, file_path, attachment_type_iri)
def get_attachment_types():
response = requests.get(
f"{API_BASE_URL}/attachment_types",
headers={"Authorization": f"Bearer {API_KEY}"}
)
if response.status_code != 200:
print("❌ Failed to fetch attachment types:", response.text)
return []
data = response.json()
return data.get("hydra:member", data)
def select_attachment_type(types):
print("\nAvailable Attachment Types:")
for idx, t in enumerate(types):
print(f"{idx+1}: {t.get('name', t.get('title', str(t)))} ({t.get('@id', t.get('id', ''))})")
while True:
choice = input("Select attachment type number: ").strip()
if not choice.isdigit() or not (1 <= int(choice) <= len(types)):
print("Invalid selection. Try again.")
else:
return types[int(choice)-1]['@id']
def inspect_attachments():
response = requests.get(
f"{API_BASE_URL}/attachments",
headers={"Authorization": f"Bearer {API_KEY}"}
)
if response.status_code == 200:
data = response.json()
if data.get("hydra:member"):
print(f"Example attachment: {data['hydra:member'][0]}")
def find_attachments(name=None, part_id=None):
"""Find attachments by name or associated part"""
query_params = []
if name:
query_params.append(f"name={name}")
if part_id:
query_params.append(f"part={part_id}")
query_string = "&".join(query_params)
url = f"{API_BASE_URL}/attachments"
if query_string:
url = f"{url}?{query_string}"
response = requests.get(
url,
headers={"Authorization": f"Bearer {API_KEY}"}
)
if response.status_code == 200:
data = response.json()
attachments = data.get("hydra:member", [])
if attachments:
print(f"Found {len(attachments)} attachments:")
for idx, attachment in enumerate(attachments):
print(f"{idx+1}: {attachment.get('name')} ({attachment.get('@id')})")
return attachments
else:
print("No attachments found matching the criteria.")
return []
else:
print(f"❌ Failed to fetch attachments: {response.status_code} {response.text}")
return []
print("###############")
#inspect_attachments()
#find_attachments(part_id="/api/part/33") ##Replace with actual part ID
#print("---------")
def main():
root_dir = input("Enter root directory to process recursively: ").strip()
if not os.path.isdir(root_dir):
print("❌ Invalid directory.")
return
categories = get_categories()
if not categories:
print("❌ No categories available.")
return
category_iri = select_category(categories)
attachment_types = get_attachment_types()
if not attachment_types:
print("❌ No attachment types available.")
return
attachment_type_iri = select_attachment_type(attachment_types)
# Add this line to actually process the directory:
process_directory(root_dir, category_iri, attachment_type_iri)
if __name__ == "__main__":
main()
I am getting the following error. tried a bunch of fixed within the upload_attachment payload to no avail, and seems like the api wants then doesn't want the _type field.
Failed to upload 1-Nemo2-cookiecad 2in.stl: 400 {"@id":"/api/errors/400","@type":"hydra:Error","title":"An error occuurred","detail":"The type \u0022Attachment\u0022 is not a valid value.","status":400,"type":"/errors/400","description":"The type \u0022Attachment\u0022 is not a valid value.","hydra:description":"The type \u0022Attachment\u0022 is not a valid value.","hydra:title":"An error occurred"}
I got it to go through however the file is not actually uploaded to part-db. It returns file is not accessible. under the attachments tab for the the part each attachment is listed, but there's a red caution triangle that file cannot be found.
def upload_attachment(part_iri, file_path, attachment_type_iri):
filename = os.path.basename(file_path)
ext = os.path.splitext(filename)[1].lower()
if ext not in ALLOWED_EXTS:
print(f"⚠️ Skipping {filename}: Not an allowed file type")
return
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
print(f"⚠️ Skipping {filename}: Unknown MIME type")
return
with open(file_path, "rb") as f:
encoded = base64.b64encode(f.read()).decode("utf-8")
payload = {
#"type": "Attachment",
"name": filename,
"part": part_iri,
"element": part_iri,
"filename": filename,
"data": f"data:{mime_type};base64,{encoded}",
"private": False,
"attachment_type": attachment_type_iri,
"picture": True
}
response = requests.post(
f"{API_BASE_URL}/attachments",
json=payload,
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
)
if response.status_code in (200, 201):
print(f"📎 Uploaded: {filename}")
verify_attachment("attachment_id") # Replace with actual attachment ID from response if needed
else:
print(f"⚠️ Failed to upload {filename}: {response.status_code} {response.text}")
What does the full stack traces from the error logs say?
excuse my ignorance, but where can i get those. I cant even seem to find /var/log/prod.log.
i got these logs- which look like it's where the logs in the web gui are coming from... 348 was uploaded manually and it works.
! [NOTE] There are a total of 355 log entries in the DB.
+-----+---------------------+-----------------+ Page 1 / 8 -------------+--------------------------------+ | ID | Timestamp | Type | User | Target Type | Target | +-----+---------------------+-----------------+-----------+-------------+--------------------------------+ | 355 | 2025-07-29 06:33:53 | element_created | (@admin) | Attachment | 4-Bow2.png (101) | | 354 | 2025-07-29 06:33:49 | element_created | (@admin) | Attachment | 4-Bow2-cookiecad_4.06x3.97in.s | | | | | | | tl (100) | | 353 | 2025-07-29 06:33:45 | element_created | (@admin) | Attachment | 4-Bow2-cookiecad_3.56x3.48in.s | | | | | | | tl (99) | | 352 | 2025-07-29 06:33:40 | element_created | (@admin) | Attachment | 3-Bow2.svg (98) | | 351 | 2025-07-29 06:33:37 | element_created | (@admin) | Attachment | 2-Bow2.png (97) | | 350 | 2025-07-29 06:33:34 | element_created | (@admin) | Attachment | 1-Bow2.jpeg (96) | | 349 | 2025-07-29 06:33:31 | element_created | (@admin) | Part | Bow2 (80) | | 348 | 2025-07-29 06:28:48 | element_created | (@admin) | Attachment | 2-Mickey One Basketball.png (9 |
The logs, which are often very helpful, are found in the subdirectory var/log/ of the installation directory of Part-DB. Notice that it's a relative path.
So if your Part-DB resides in e.g. /var/www/Part-DB-server, the log file is are in /var/www/Part-DB-server/var/log/prod.log
i've been looking all over. all i have is /var/www/Part-DB-server/var/log/prod.log located in /var/www/Part-DB-server/var/log/prod.log
i can't find anything else that even looks similar.
The API docs suggest to structure the POST like so (I tested this successfully):
{
"upload": {
"data": "data:text/plain;base64,dGVzdGZpbGU=",
"filename": "test file2",
"private": false
},
"name": "test file2",
"element": "/api/parts/1636",
"attachment_type": "/api/attachment_types/1002"
}
Thanks- it works!
`import os import mimetypes import requests import base64
API_BASE_URL = "http://parts.local:8080/api" API_KEY = "tcp_7a55a8c54569e0b2d6fa84eeea23276ce5f48130bf3eeb0422470f2c415c85ec"
Register custom MIME types
mimetypes.add_type('application/sla', '.stl') mimetypes.add_type('image/x-xcf', '.xcf') mimetypes.add_type('image/vnd.adobe.photoshop', '.psd') mimetypes.add_type('model/3mf', '.3mf') mimetypes.add_type('model/gltf-binary', '.glb') mimetypes.add_type('model/gltf+json', '.gltf') mimetypes.add_type('image/webp', '.webp')
ALLOWED_EXTS list - fix missing dot
ALLOWED_EXTS = {'.stl', '.svg', '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.txt', '.xcf', '.psd', '.3mf', '.webp', '.webm', '.glb', '.gltf', '.obj', '.fbx', '.dae', '.zip', '.rar', '.tar', '.gz', '.7z'}
def get_categories(): response = requests.get( f"{API_BASE_URL}/categories", headers={"Authorization": f"Bearer {API_KEY}"} ) if response.status_code != 200: print("❌ Failed to fetch categories:", response.text) return [] data = response.json() return data.get("hydra:member", data)
def select_category(categories): print("\nAvailable Categories:") for idx, cat in enumerate(categories): print(f"{idx+1}: {cat.get('name', cat.get('title', str(cat)))} ({cat.get('@id', cat.get('id', ''))})") while True: choice = input("Select category number: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(categories)): print("Invalid selection. Try again.") else: return categories[int(choice)-1]['@id']
def check_part_exists(part_name): """Check if a part with the given name already exists""" response = requests.get( f"{API_BASE_URL}/parts?name={part_name}", headers={"Authorization": f"Bearer {API_KEY}"} ) if response.status_code == 200: data = response.json() parts = data.get("hydra:member", []) for part in parts: if part.get('name') == part_name: return part.get('@id') or part.get('id') return None
def create_part(part_name, category_iri): # Check if part already exists existing_part_iri = check_part_exists(part_name) if existing_part_iri: print(f"ℹ️ Part '{part_name}' already exists: {existing_part_iri}") return existing_part_iri
payload = {
"name": part_name,
"description": f"Auto-created from folder {part_name}",
"category": category_iri
}
response = requests.post(
f"{API_BASE_URL}/parts",
json=payload,
headers={"Authorization": f"Bearer {API_KEY}"}
)
if response.status_code not in (200, 201):
print(f"❌ Failed to create part: {response.status_code} {response.text}")
return None
part = response.json()
print(f"✅ Created new part: {part_name}")
return part.get("@id") or part.get("id")
def get_attachment_types(): response = requests.get( f"{API_BASE_URL}/attachment_types", headers={"Authorization": f"Bearer {API_KEY}"} ) if response.status_code != 200: print("❌ Failed to fetch attachment types:", response.text) return [] data = response.json() return data.get("hydra:member", data)
def select_attachment_type(types): print("\nAvailable Attachment Types:") for idx, t in enumerate(types): print(f"{idx+1}: {t.get('name', t.get('title', str(t)))} ({t.get('@id', t.get('id', ''))})") while True: choice = input("Select attachment type number: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(types)): print("Invalid selection. Try again.") else: return types[int(choice)-1]['@id']
def check_attachment_exists(part_iri, filename): """Check if an attachment with the given filename already exists for the part""" response = requests.get( f"{API_BASE_URL}/attachments?element={part_iri}&name={filename}", headers={"Authorization": f"Bearer {API_KEY}"} ) if response.status_code == 200: data = response.json() attachments = data.get("hydra:member", []) return len(attachments) > 0 return False
def upload_attachment(part_iri, file_path, attachment_type_iri, picture=True): filename = os.path.basename(file_path) ext = os.path.splitext(filename)[1].lower() if ext not in ALLOWED_EXTS: print(f"⚠️ Skipping {filename}: Not an allowed file type") return
# Check if attachment already exists
if check_attachment_exists(part_iri, filename):
print(f"ℹ️ Attachment '{filename}' already exists for this part. Skipping.")
return
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
print(f"⚠️ Skipping {filename}: Unknown MIME type")
return
with open(file_path, "rb") as f:
encoded = base64.b64encode(f.read()).decode("utf-8")
payload = {
"upload": {
"data": f"data:{mime_type};base64,{encoded}",
"filename": filename,
"private": False
},
"name": filename,
"element": part_iri,
"attachment_type": attachment_type_iri
}
response = requests.post(
f"{API_BASE_URL}/attachments",
json=payload,
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
)
if response.status_code in (200, 201):
print(f"📎 Uploaded: {filename}")
else:
print(f"⚠️ Failed to upload {filename}: {response.status_code} {response.text}")
def process_directory(root_dir, category_iri, attachment_type_iri): for dirpath, dirnames, filenames in os.walk(root_dir): part_name = os.path.basename(os.path.normpath(dirpath)) if not part_name: print(f"⚠️ Skipping directory {dirpath}: Part name is blank") continue print(f"\n📦 Processing directory: {dirpath} (Part name: {part_name})") part_iri = create_part(part_name, category_iri) if not part_iri: continue
preview_set = False
for filename in filenames:
file_path = os.path.join(dirpath, filename)
ext = os.path.splitext(filename)[1].lower()
# Set 'picture' True only for the first image file
if not preview_set and ext in {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'}:
upload_attachment(part_iri, file_path, attachment_type_iri, picture=True)
preview_set = True
else:
upload_attachment(part_iri, file_path, attachment_type_iri, picture=False)
def inspect_attachments(): response = requests.get( f"{API_BASE_URL}/attachments", headers={"Authorization": f"Bearer {API_KEY}"} ) if response.status_code == 200: data = response.json() if data.get("hydra:member"): print(f"Example attachment: {data['hydra:member'][0]}")
def find_attachments(name=None, part_id=None): """Find attachments by name or associated part""" query_params = []
if name:
query_params.append(f"name={name}")
if part_id:
query_params.append(f"part={part_id}")
query_string = "&".join(query_params)
url = f"{API_BASE_URL}/attachments"
if query_string:
url = f"{url}?{query_string}"
response = requests.get(
url,
headers={"Authorization": f"Bearer {API_KEY}"}
)
if response.status_code == 200:
data = response.json()
attachments = data.get("hydra:member", [])
if attachments:
print(f"Found {len(attachments)} attachments:")
for idx, attachment in enumerate(attachments):
print(f"{idx+1}: {attachment.get('name')} ({attachment.get('@id')})")
return attachments
else:
print("No attachments found matching the criteria.")
return []
else:
print(f"❌ Failed to fetch attachments: {response.status_code} {response.text}")
return []
#print("###############") #inspect_attachments() #verify_attachment("46") #find_attachments(part_id="/api/part/46") ##Replace with actual part ID #print("---------")
def main(): root_dir = input("Enter root directory to process recursively: ").strip() if not os.path.isdir(root_dir): print("❌ Invalid directory.") return categories = get_categories() if not categories: print("❌ No categories available.") return category_iri = select_category(categories)
attachment_types = get_attachment_types()
if not attachment_types:
print("❌ No attachment types available.")
return
attachment_type_iri = select_attachment_type(attachment_types)
# Add this line to actually process the directory:
process_directory(root_dir, category_iri, attachment_type_iri)
if name == "main": main()`