scratchattach icon indicating copy to clipboard operation
scratchattach copied to clipboard

[Bug]: Unexpected Function Call Triggered by return in Cloud Requests

Open Okaz02 opened this issue 3 months ago • 16 comments

scratchattach version

2.1.13

What happened?

In Cloud Requests, returning a value from a request function with `return` sometimes triggers a function that was never called, even when the returned value does not exceed the 3000-character limit. It seems that Cloud Requests mistakenly invokes an unrelated function because of the returned value.

Your code.

import scratchattach as sa
import cv2
import zlib
import base64
import numpy as np
import struct
import requests
from dotenv import load_dotenv
import os
# In-memory cache of encoded images shared by all request handlers.
# Layout: chash["IDs"][Id] = {"Img": <list of chunk strings from encode_image>}
# (A `global` statement at module level has no effect, so none is needed here.)
chash = {"IDs": {}}
def zigzag(block):
    """Flatten an 8x8 block into a list by walking its anti-diagonals.

    Diagonals are visited in order of increasing ``row + col``.  Even
    diagonals are walked with the row index increasing, odd diagonals with
    the column index increasing, which yields a zigzag scan of the block.

    Args:
        block: 2-D array-like exposing ``.shape`` and ``block[row, col]``
            indexing; its shape must be exactly (8, 8).

    Returns:
        A list with the 64 elements of ``block`` in scan order.

    Raises:
        AssertionError: if the block is not 8x8.
    """
    rows, cols = block.shape
    assert rows == 8 and cols == 8
    ordered = []
    for diag in range(rows + cols - 1):
        if diag % 2 == 0:
            # Even diagonal: row grows, column shrinks.
            coords = ((r, diag - r) for r in range(diag + 1))
        else:
            # Odd diagonal: column grows, row shrinks.
            coords = ((diag - c, c) for c in range(diag + 1))
        # Coordinates outside the block are simply skipped.
        ordered.extend(block[r, c] for r, c in coords if c < cols and r < rows)
    return ordered

def process_block(block):
    """Transform one 8x8x3 block into quantised DC and AC coefficient lists.

    Each of the three channels (indices 0, 1, 2) is converted to float32,
    passed through ``cv2.dct``, quantised as ``round(coeff / 100 + 128)``
    cast to uint8, and flattened with ``zigzag``.

    Args:
        block: array indexed as ``block[:, :, channel]`` with three 8x8
            channels.

    Returns:
        ``(dc, ac)`` where ``dc`` holds the first scanned coefficient of each
        channel (3 values) and ``ac`` holds the remaining coefficients of
        channel 0, then channel 1, then channel 2.
    """
    scanned = []
    for channel in range(3):
        plane = block[:, :, channel].astype(np.float32)
        coeffs = cv2.dct(plane)
        # Scale down and re-centre around 128 so the value is stored as uint8.
        quantised = np.round(coeffs / 100 + 128).astype(np.uint8)
        scanned.append(zigzag(quantised))

    dc = [sequence[0] for sequence in scanned]
    ac = []
    for sequence in scanned:
        ac += sequence[1:]
    return dc, ac

def compress_and_encode(values):
    """Split ``values`` into up to 16 slices and deflate + base64 each one.

    Args:
        values: sequence of integers, each packable as an unsigned byte
            (``struct`` format ``B``).

    Returns:
        A list of base64 strings, one per non-empty slice, in order.  Empty
        slices are skipped, so the list can hold fewer than 16 entries (and
        is empty when ``values`` is empty).
    """
    parts = 16
    # Ceiling division so that `parts` slices are enough to cover every value.
    per_part = (len(values) + parts - 1) // parts
    encoded_chunks = []
    for index in range(parts):
        piece = values[index * per_part:(index + 1) * per_part]
        if piece:
            raw = struct.pack(f'{len(piece)}B', *piece)
            # Strip the first 2 and last 4 bytes of the zlib stream (its
            # header and checksum), leaving only the raw deflate data.
            deflated = zlib.compress(raw, level=9)[2:-4]
            encoded_chunks.append(base64.b64encode(deflated).decode('utf-8'))
    return encoded_chunks

def encode_image(img):
    """Encode a BGR image into 16 text chunks of compressed DCT coefficients.

    The image is converted to YCrCb, cut into 8x8 blocks and each full block
    is run through ``process_block``.  All DC values and all AC values are
    then compressed separately with ``compress_and_encode``.

    Args:
        img: image array accepted by ``cv2.cvtColor`` with ``COLOR_BGR2RGB``.

    Returns:
        A list of 16 strings.  The first is ``"h:w:ac0:dc0"`` and the rest
        are ``"ac_i:dc_i"``.  ``h`` and ``w`` are the full image dimensions,
        even though blocks that are not a full 8x8 (right/bottom edges) are
        skipped.

    Raises:
        IndexError: if the image is too small for either coefficient stream
            to fill all 16 chunks (``compress_and_encode`` drops empty
            chunks).  The exception type matches what the unchecked indexing
            raised before; only the message is new.
    """
    img_ycrcb = cv2.cvtColor(cv2.cvtColor(img, cv2.COLOR_BGR2RGB), cv2.COLOR_RGB2YCrCb)
    h, w, _ = img_ycrcb.shape
    dc_values = []
    ac_values = []

    for y in range(0, h, 8):
        for x in range(0, w, 8):
            block = img_ycrcb[y:y+8, x:x+8]
            # Slices at the image border may be smaller than 8x8; skip those.
            if block.shape[:2] == (8, 8):
                dc, ac = process_block(block)
                dc_values.extend(dc)
                ac_values.extend(ac)

    encoded_dc = compress_and_encode(dc_values)[:16]
    encoded_ac = compress_and_encode(ac_values)[:16]
    if len(encoded_dc) < 16 or len(encoded_ac) < 16:
        raise IndexError(
            f"image of size {h}x{w} is too small to fill all 16 chunks"
        )

    result = [f"{h}:{w}:{encoded_ac[0]}:{encoded_dc[0]}"]
    result.extend(
        f"{ac_chunk}:{dc_chunk}"
        for ac_chunk, dc_chunk in zip(encoded_ac[1:], encoded_dc[1:])
    )
    return result


# Load variables from a .env file into the environment (python-dotenv), so the
# credentials read below do not have to be hard-coded.
load_dotenv()
# NOTE(review): "Passward" looks like a typo of "Password"; it is a runtime key
# and must match the variable name actually used in the environment/.env file.
session = sa.login(os.getenv("User") , os.getenv("Passward"))
# Cloud connection and request handler for the project ("****" is a redacted id).
cloud = session.connect_cloud("****")
client = cloud.requests()
# Second cloud connection and request handler; judging by `get_tw_cloud` this is
# presumably the TurboWarp cloud of the same project — confirm against the
# scratchattach documentation.
twcloud = sa.get_tw_cloud("****")
twclient = twcloud.requests()
@client.request
def AI(Text, Id, Index):
    """Cloud request: generate an image for ``Text`` and serve it chunk by chunk.

    A call with ``Index == 0`` asks the remote service for a new image,
    encodes it with ``encode_image``, caches the chunks under ``Id`` (in
    ``chash`` and in ``./{Id}.txt``) and returns the first chunk.  Calls with
    any other index only read the cache; the remote service is no longer
    contacted for them, because its result was never used for those calls.

    Args:
        Text: prompt sent to the image service.
        Id: key under which the encoded image is cached; also used as the
            file name, so it must not contain a directory component.
        Index: chunk number to return, as an integer or integer string.

    Returns:
        The requested chunk string, or ``"retry"`` when nothing is cached
        for ``Id`` (including when generation at index 0 yields no data).

    Raises:
        ValueError: if ``Index`` is not an integer, or if ``Id`` contains a
            directory component when a new image would be stored.
        IndexError: if ``Index`` is outside the cached chunk list.
    """
    print([Text, Id, Index])
    index = int(Index)

    if index == 0:
        # Id comes from the cloud request and ends up in a file path below;
        # refuse values such as "../x" that would escape the working directory.
        if os.path.basename(str(Id)) != str(Id):
            raise ValueError(f"invalid Id: {Id!r}")

        url = '****'
        data = {'action': 'generate_image', 'prompt': Text,'aspect_ratio':'Select Aspect Ratio','hd':'1'}
        response = requests.post(url, data=data).json()
        if "data" in response:
            image_response = requests.get(response["data"]["image_link"])
            img_array = np.frombuffer(image_response.content, np.uint8)
            img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
            img = encode_image(img)
            chash["IDs"][Id] = {"Img": img}
            print(Id)
            # Persist the chunks; the leading "@" marks a chunked image file.
            with open(f"./{Id}.txt", "w", encoding="utf-8") as f:
                f.write("@"+"|".join(img))
            return img[0]

    # Either a follow-up chunk was requested or generation returned no data:
    # fall back to whatever is cached for this Id.
    if Id in chash["IDs"]:
        return chash["IDs"][Id]["Img"][index]
    return "retry"
@client.request
def Shair(Id, Index):
    """Cloud request: return stored content for ``Id`` from ``./{Id}.txt``.

    Files whose content starts with ``"@"`` hold ``"|"``-separated chunks;
    for those, the chunk at position ``Index`` is returned.  Any other
    content is returned whole, prefixed with ``"~"``.

    Args:
        Id: name (without extension) of the stored file; must not contain a
            directory component.
        Index: chunk number, as an integer or integer string.  Only used for
            chunked files.

    Returns:
        The selected chunk, or ``"~"`` followed by the full file content.

    Raises:
        ValueError: if ``Id`` contains a directory component, or ``Index`` is
            not an integer for a chunked file.
        OSError: if the file cannot be opened (e.g. it does not exist).
        IndexError: if ``Index`` is outside the stored chunk list.
    """
    # Id comes from the cloud request and is interpolated into a file path;
    # refuse values such as "../x" so requesters cannot read other .txt files.
    if os.path.basename(str(Id)) != str(Id):
        raise ValueError(f"invalid Id: {Id!r}")

    with open(f"./{Id}.txt", "r", encoding="utf-8") as f:
        img = f.read()

    # startswith() also copes with an empty file, where img[0] would raise.
    if img.startswith("@"):
        return img[1:].split("|")[int(Index)]
    return "~"+img
@client.event
def on_ready():
    """Print a status line when ``client`` fires its ``on_ready`` event."""
    status = "Request handler is running"
    print(status)
@twclient.request
def AI(Text, Id, Index):
    """Cloud request (``twclient``): generate an image and serve it in chunks.

    A call with ``Index == 0`` asks the remote service for a new image,
    encodes it with ``encode_image``, caches the chunks under ``Id`` (in
    ``chash`` and in ``./{Id}.txt``) and returns the first chunk.  Calls with
    any other index only read the cache; the remote service is no longer
    contacted for them, because its result was never used for those calls.

    Args:
        Text: prompt sent to the image service.
        Id: key under which the encoded image is cached; also used as the
            file name, so it must not contain a directory component.
        Index: chunk number to return, as an integer or integer string.

    Returns:
        The requested chunk string, or ``"retry"`` when nothing is cached
        for ``Id`` (including when generation at index 0 yields no data).

    Raises:
        ValueError: if ``Index`` is not an integer, or if ``Id`` contains a
            directory component when a new image would be stored.
        IndexError: if ``Index`` is outside the cached chunk list.
    """
    index = int(Index)

    if index == 0:
        # Id comes from the cloud request and ends up in a file path below;
        # refuse values such as "../x" that would escape the working directory.
        if os.path.basename(str(Id)) != str(Id):
            raise ValueError(f"invalid Id: {Id!r}")

        url = '****'
        data = {'action': 'generate_image', 'prompt': Text,'aspect_ratio':'Select Aspect Ratio','hd':'1'}
        response = requests.post(url, data=data).json()
        if "data" in response:
            image_response = requests.get(response["data"]["image_link"])
            img_array = np.frombuffer(image_response.content, np.uint8)
            img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
            img = encode_image(img)
            chash["IDs"][Id] = {"Img": img}
            print(Id)
            # Persist the chunks; the leading "@" marks a chunked image file.
            with open(f"./{Id}.txt", "w", encoding="utf-8") as f:
                f.write("@"+"|".join(img))
            return img[0]

    # Either a follow-up chunk was requested or generation returned no data:
    # fall back to whatever is cached for this Id.
    if Id in chash["IDs"]:
        return chash["IDs"][Id]["Img"][index]
    return "retry"
@twclient.request
def Shair(Id, Index):
    """Cloud request (``twclient``): return stored content from ``./{Id}.txt``.

    Files whose content starts with ``"@"`` hold ``"|"``-separated chunks;
    for those, the chunk at position ``Index`` is returned.  Any other
    content is returned whole, prefixed with ``"~"``.

    Args:
        Id: name (without extension) of the stored file; must not contain a
            directory component.
        Index: chunk number, as an integer or integer string.  Only used for
            chunked files.

    Returns:
        The selected chunk, or ``"~"`` followed by the full file content.

    Raises:
        ValueError: if ``Id`` contains a directory component, or ``Index`` is
            not an integer for a chunked file.
        OSError: if the file cannot be opened (e.g. it does not exist).
        IndexError: if ``Index`` is outside the stored chunk list.
    """
    # Id comes from the cloud request and is interpolated into a file path;
    # refuse values such as "../x" so requesters cannot read other .txt files.
    if os.path.basename(str(Id)) != str(Id):
        raise ValueError(f"invalid Id: {Id!r}")

    with open(f"./{Id}.txt", "r", encoding="utf-8") as f:
        img = f.read()

    # startswith() also copes with an empty file, where img[0] would raise.
    if img.startswith("@"):
        return img[1:].split("|")[int(Index)]
    return "~"+img
@twclient.event
def on_ready():
    """Print a status line when ``twclient`` fires its ``on_ready`` event."""
    status = "Request handler is running"
    print(status)
# Start both request handlers. thread=True is passed to each start() call,
# presumably so the first call returns instead of blocking and the second
# handler can be started as well — confirm against the scratchattach docs.
client.start(thread=True)
twclient.start(thread=True)

Traceback

Warning: Client received an unknown request called ''

Okaz02 avatar Aug 04 '25 04:08 Okaz02