blockbook icon indicating copy to clipboard operation
blockbook copied to clipboard

Signing and Broadcasting Zcash (ZEC) Transaction

Open TheHackitect opened this issue 4 months ago • 2 comments

" {"error":"-26: 16: mandatory-script-verify-flag-failed (Script evaluated without error but finished with a false/empty top stack element)"}"

2025-10-09 18:42:26,738:DEBUG:https://www.google.com:443 "GET / HTTP/11" 200 None 2025-10-09 18:42:42,792:DEBUG:Broadcasting ZEC tx: 0400008085202f8901ebe669a6235a0fa2e4362c97ef4f83fbc0b5a1aaca56b33b34b0c81fd496c55a000000006a47304402204d234b1793b017b9bce688b6f59f00000000001976a9141ec3af58c1c49e8f5fc1bfffff6fb8866bbccde188ac8c4f0100000000001976a914ab51ce8380f07d3f46e287f3abaf7b2ce7a9f32b88ac00000000b9352f00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000..................
2025-10-09 18:42:42,794:DEBUG:Starting new HTTPS connection (1): zcash-blockbook.twnodes.com:443 2025-10-09 18:42:44,146:DEBUG:https://zcash-blockbook.twnodes.com:443 "POST /api/v2/sendtx/ HTTP/11" 400 140 2025-10-09 18:42:44,148:ERROR:HTTP error for https://zcash-blockbook.twnodes.com/api/v2/sendtx/: 400 Client Error: Bad Request for url: https://zcash-blockbook.twnodes.com/api/v2/sendtx/ - {"error":"-26: 16: mandatory-script-verify-flag-failed (Script evaluated without error but finished with a false/empty top stack element)"}

2025-10-09 18:42:44,149:ERROR:Error broadcasting ZEC transaction: 400 Client Error: Bad Request for url: https://zcash-blockbook.twnodes.com/api/v2/sendtx/ Traceback (most recent call last): File "C:\Users\Thehackitect\Desktop\PROJECTS\crypto_manager\app\plugins\zcash_plugin.py", line 332, in broadcast_transaction
response_text = self._request("/sendtx/", method='POST', data=signed_tx_hex) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\Thehackitect\Desktop\PROJECTS\crypto_manager\app\plugins\zcash_plugin.py", line 149, in _request response.raise_for_status() File "C:\Users\Thehackitect\Desktop\PROJECTS\crypto_manager\venv\Lib\site-packages\requests\models.py", line 1024, in raise_for_status raise HTTPError(http_error_msg, response=self) requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://zcash-blockbook.twnodes.com/api/v2/sendtx/ 2025-10-09 18:42:55,370:DEBUG:Starting new HTTPS connection (1): www.google.com:443 2025-10-09 18:42:55,873:DEBUG:https://www.google.com:443 "GET / HTTP/11" 200 None 2025-10-09 18:42:56,361:DEBUG:Starting new HTTPS connection (1): www.google.com:443 2025-10-09 18:42:56,850:DEBUG:https://www.google.com:443 "GET / HTTP/11" 200 None

TheHackitect avatar Oct 10 '25 01:10 TheHackitect

Hello, can I ask what set up are you using and what library are you using for transaction signing? It would help to know more details other than error message.

etimofeeva avatar Oct 22 '25 21:10 etimofeeva

Hello, can I ask what set up are you using and what library are you using for transaction signing? It would help to know more details other than error message.

its python.. i have added my script too...

  • python HDwallet to generate wallet credentials
  • In case you dont want to download the file, i have pasted the script here (its a plugin within my python project)

`# zcash_plugin.py

from .base_plugin import BaseCryptoPlugin from hdwallet import HDWallet from hdwallet.cryptocurrencies import Zcash from hdwallet.mnemonics import BIP39Mnemonic from hdwallet.derivations import BIP44Derivation from datetime import datetime

from typing import List, Optional, Dict, Any import requests import time import hashlib import ecdsa from ecdsa.util import sigencode_der_canonize import base58 import struct import logging import math import json

A basic logger setup

try: from app.utils.logger import app_logger except ImportError: app_logger = logging.getLogger(name) if not app_logger.handlers: app_logger.setLevel(logging.DEBUG) handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) app_logger.addHandler(handler)

class ZcashPlugin(BaseCryptoPlugin): """ Robust Zcash Plugin supporting transparent (t-addr) and shielded (z-addr) addresses using Trezor's Blockbook API. """ # Switched to Trezor's Zcash Blockbook API BASE_URL = "https://zcash-blockbook.twnodes.com/api/v2" # Zcash has a standard fee for simple transparent transactions. STANDARD_FEE = 10000 # in zats

# Zcash specific constants for transaction signing (Sapling/ZIP 243)
ZEC_TRANSPARENT_VERSION = 4
ZEC_OVERWINTER_FLAG = (1 << 31)
ZEC_TX_VERSION_V4_OVERWINTERED = ZEC_TRANSPARENT_VERSION | ZEC_OVERWINTER_FLAG
ZEC_TX_VERSION_V4_OVERWINTERED_BYTES = ZEC_TX_VERSION_V4_OVERWINTERED.to_bytes(4, 'little')

ZEC_TRANSPARENT_VERSION_GROUP_ID = 0x892F2085
ZEC_TRANSPARENT_VERSION_GROUP_ID_BYTES = ZEC_TRANSPARENT_VERSION_GROUP_ID.to_bytes(4, 'little')

# Consensus Branch IDs for different Zcash network upgrades (Mainnet)
BRANCH_ID_MAP = {
    "Overwinter": (0x5ba81b19, 347500),
    "Sapling": (0x76b809bb, 419200),
    "Blossom": (0x2bb40e60, 653600),
    "Heartwood": (0xf5b9230b, 903000),
    "Canopy": (0xe9ff75a6, 1046400),
    "NU5": (0xc2d6d0b4, 1687104),  # CORRECTED: This is the correct mainnet NU5 Branch ID
}


def __init__(self, api_key: str = None, blockcypher_api_key: str = None):
    self.api_key = api_key # Note: blockcypher_api_key is unused now
    app_logger.info("ZcashPlugin initialized with Trezor Blockbook API.")

# -------------------------------
# Plugin Identity Methods
# -------------------------------
def get_coin_name(self) -> str:
    return "Zcash"

def get_coin_acronym(self) -> str:
    return "ZEC"

def get_coin_description(self) -> str:
    return "A robust Zcash plugin supporting transparent and shielded addresses via the Trezor Blockbook API."

def get_supported_features(self) -> List[str]:
    return [
        "Wallet Creation (Transparent)",
        "Balance Retrieval (Transparent & Shielded)",
        "Transaction Creation (Transparent)",
        "Transaction History (Transparent & Shielded)",
    ]

def get_version(self) -> str:
    return "7.3.0" # Definitive fix for consensus branch ID selection logic.

def get_exponent(self) -> int:
    return 10**8

def get_author(self) -> str:
    return "Gemini"

def get_network(self) -> str:
    return "mainnet"

def get_icon_url(self) -> str:
    return "https://s2.coinmarketcap.com/static/img/coins/64x64/1437.png"

def get_block_explorer_url(self, txid: str) -> str:
    return f"https://mainnet.zcashexplorer.app/transactions/{txid}"

def get_wallet_explorer_url(self, address: str) -> str:
    return f"https://mainnet.zcashexplorer.app/address/{address}"

# -------------------------------
# Plugin Lifecycle Methods
# -------------------------------
def register_plugin(self):
    app_logger.info("Registering Zcash Plugin.")

def initialize_plugin(self):
    app_logger.info("Initializing Zcash Plugin.")

def shutdown_plugin(self):
    app_logger.info("Shutting down Zcash Plugin.")

# -------------------------------
# Core Functionality Methods
# -------------------------------
def _request(self, endpoint: str, params: dict = None, method: str = 'GET', data: Any = None) -> dict:
    url = f"{self.BASE_URL}{endpoint}"
    headers = {}
    post_data = None
    
    if endpoint == "/sendtx/":
        headers['Content-Type'] = 'text/plain'
        post_data = data
    elif data is not None:
        headers['Content-Type'] = 'application/json'
        post_data = json.dumps(data)

    try:
        if method == 'GET':
            response = requests.get(url, params=params, timeout=20)
        else:  # POST
            response = requests.post(url, data=post_data, headers=headers, timeout=20)
        
        response.raise_for_status()

        if endpoint == "/sendtx/":
            try:
                return response.json() 
            except json.JSONDecodeError:
                result_text = response.text.strip()
                if len(result_text.strip('"')) == 64 and all(c in '0123456789abcdefABCDEF' for c in result_text.strip('"')):
                     return {"result": result_text.strip('"')}
                else:
                     return {"error": f"Unrecognized response from sendtx: {result_text}"}
        return response.json()
    except requests.HTTPError as http_err:
        app_logger.error(f"HTTP error for {url}: {http_err} - {http_err.response.text}")
        raise
    except Exception as err:
        app_logger.error(f"An error occurred for {url}: {err}")
        raise

def create_wallet_from_mnemonic(self, mnemonic: str, account_index: int = 0) -> Optional[Dict[str, Any]]:
    try:
        hdwallet_zec = HDWallet(cryptocurrency=Zcash)
        hdwallet_zec.from_mnemonic(mnemonic=BIP39Mnemonic(mnemonic=mnemonic))
        hdwallet_zec.clean_derivation()
        
        hdwallet_zec.from_derivation(
            BIP44Derivation(coin_type=133, account=account_index, change=False, address=0)
        )
        
        return {
            "address": hdwallet_zec.address(),
            "private_key": hdwallet_zec.private_key(),
            "public_key": hdwallet_zec.public_key(),
            "derivation_path": hdwallet_zec.path(),
            "wif": hdwallet_zec.wif(),
            "account_index": account_index
        }
    except Exception as e:
        app_logger.error(f"Failed to create ZEC wallet: {e}", exc_info=True)
        return None

def get_balance(self, address: str) -> Dict:
    data = self._request(f"/address/{address}", params={"details": "basic"})
    return {
        "confirmed_balance": int(data.get("balance", 0)),
        "unconfirmed_balance": int(data.get("unconfirmedBalance", 0)),
        "total_received": int(data.get("totalReceived", 0))
    }

def create_transaction(self, input_address: str, outputs: List[Dict], private_key_wif: str, min_fee: Optional[int] = None) -> Dict:
    app_logger.info('ZEC Transaction Creation initialized.....')
    try:
        utxos_raw = self._request(f"/utxo/{input_address}")
        if not utxos_raw:
            raise ValueError("No UTXOs available.")

        all_utxos = sorted([
            {
                'transaction_hash': utxo['txid'],
                'index': utxo['vout'],
                'value': int(utxo['value']),
                'script_hex': utxo.get('hex')
            } for utxo in utxos_raw
        ], key=lambda u: u['value'])

        for utxo in all_utxos:
            if not utxo.get('script_hex'):
                app_logger.debug(f"Fetching tx details for {utxo['transaction_hash']} to get script_hex")
                tx_details = self.get_transaction_details(utxo['transaction_hash'])
                utxo['script_hex'] = tx_details['vout'][utxo['index']]['hex']

        total_output_value = sum(int(o['amount'] * self.get_exponent()) for o in outputs)
        fee = self.STANDARD_FEE
        
        selected_utxos = []
        total_input_value = 0
        for utxo in all_utxos:
            selected_utxos.append(utxo)
            total_input_value += utxo['value']
            if total_input_value >= total_output_value + fee:
                break
        else:
            raise ValueError(f"Insufficient funds. Required {(total_output_value + fee) / self.get_exponent()} ZEC.")
        
        change = total_input_value - total_output_value - fee

        status_data = self._request("/status")
        expiry_height = status_data.get("blockbook", {}).get("bestHeight", 0) + 20

        raw_tx = {"inputs": [], "outputs": [], "expiry_height": expiry_height}
        for utxo in selected_utxos:
            raw_tx["inputs"].append({
                "prev_hash": utxo['transaction_hash'],
                "output_index": utxo['index'],
                "amount": utxo['value'],
                "scriptPubKey": utxo['script_hex']
            })
        
        for output in outputs:
            script_pubkey, _, _ = self.address_to_scriptPubKey(output['address'])
            raw_tx["outputs"].append({
                "value": int(output['amount'] * self.get_exponent()),
                "address": output['address'],
                "script_pubkey": script_pubkey
            })

        if change > 546:
            script_pubkey, _, _ = self.address_to_scriptPubKey(input_address)
            raw_tx["outputs"].append({
                "value": change, "address": input_address, "script_pubkey": script_pubkey
            })
        
        private_key_hex, compressed = self.wif_to_private_key(private_key_wif)
        signed_tx_hex = self.sign_transaction(raw_tx, private_key_hex, compressed)

        app_logger.debug(f"Final Signed ZEC TX Hex: {signed_tx_hex}")

        return {
            "transaction_fee": fee,
            "transaction_size": len(bytes.fromhex(signed_tx_hex)),
            "signed_transaction_hex": signed_tx_hex,
            "raw_transaction_data": raw_tx,
        }
    except Exception as e:
        app_logger.error(f"Error creating ZEC transaction: {e}", exc_info=True)
        raise

def get_transactions(self, address: str, limit: int = 10, txstart: int = 0) -> List[Dict]:
    try:
        page = (txstart // limit) + 1
        response = self._request(f"/address/{address}", params={'details': 'txs', 'page': page, 'pageSize': limit})
        transactions = response.get('transactions', [])
        
        for tx in transactions:
            value_in = sum(
                int(vin.get('value', 0)) for vin in tx.get('vin', [])
                if address in vin.get('addresses', [])
            )
            
            value_out = sum(
                int(vout.get('value', 0)) for vout in tx.get('vout', [])
                if address in vout.get('addresses', [])
            )
            
            tx['balance_change'] = value_out - value_in
            tx['hash'] = tx.get('txid')
            tx['time'] = datetime.fromtimestamp(tx.get('blockTime')).isoformat() if tx.get('blockTime') else None
        
        return transactions
    except Exception as e:
        app_logger.error(f"Error fetching ZEC transactions for {address}: {e}")
        return []

def get_transaction_details(self, txid: str) -> Optional[Dict]:
    try:
        return self._request(f"/tx/{txid}")
    except Exception as e:
        app_logger.error(f"Error getting ZEC transaction details: {e}")
        return None

def validate_address(self, address: str) -> bool:
    if not (address.startswith('t') or address.startswith('z')):
        return False
    if address.startswith('t'):
        try:
            decoded = base58.b58decode_check(address)
            return decoded[:2] in [b'\x1c\xb8', b'\x1c\xbd']
        except Exception:
            return False
    elif address.startswith('z'):
        return len(address) > 80
    return False

def load_private_key(self, wif: str) -> Dict:
    private_key, compressed = self.wif_to_private_key(wif)
    return {"private_key": private_key, "compressed": compressed}

def get_public_key_from_private(self, private_key_hex: str, compressed: bool) -> str:
    sk = ecdsa.SigningKey.from_string(bytes.fromhex(private_key_hex), curve=ecdsa.SECP256k1)
    return sk.get_verifying_key().to_string("compressed" if compressed else "uncompressed").hex()

def get_private_key_priority(self) -> List[str]:
    return ["wif", "private_key"]

def broadcast_transaction(self, signed_tx_hex: str) -> Optional[str]:
    try:
        app_logger.debug(f"Broadcasting ZEC tx: {signed_tx_hex}")
        response = self._request("/sendtx/", method='POST', data=signed_tx_hex)
        txid = response.get("result")
        if txid and len(txid) == 64:
            app_logger.info(f"ZEC transaction broadcasted. TXID: {txid}")
            return txid
        raise Exception(f"API broadcast error: {response.get('error', 'Unknown error')}")
    except Exception as e:
        app_logger.error(f"Error broadcasting ZEC transaction: {e}", exc_info=True)
        raise

# -------------------------------
# Helper & Signing Methods
# -------------------------------
def wif_to_private_key(self, wif: str) -> tuple[str, bool]:
    decoded = base58.b58decode_check(wif)
    if decoded[0] != 0x80:
        raise ValueError("Invalid WIF version byte for ZEC Mainnet")
    if len(decoded) == 34 and decoded[-1] == 0x01:
        return decoded[1:-1].hex(), True
    return decoded[1:].hex(), False

def address_to_scriptPubKey(self, address: str) -> tuple[str, str, None]:
    decoded = base58.b58decode_check(address)
    pubkey_hash = decoded[2:].hex()
    if decoded[:2] == b'\x1c\xb8': # P2PKH (t1)
        return f"76a914{pubkey_hash}88ac", 'p2pkh', None
    elif decoded[:2] == b'\x1c\xbd': # P2SH (t3)
        return f"a914{pubkey_hash}87", 'p2sh', None
    raise ValueError("Unsupported Zcash transparent address type for signing")

def encode_varint(self, i: int) -> str:
    if i < 0xfd: return f"{i:02x}"
    if i <= 0xffff: return "fd" + struct.pack("<H", i).hex()
    if i <= 0xffffffff: return "fe" + struct.pack("<I", i).hex()
    return "ff" + struct.pack("<Q", i).hex()

def _hash_blake2b(self, data: bytes, key: bytes) -> bytes:
    return hashlib.blake2b(data, digest_size=32, person=key).digest()

def serialize_transaction(self, raw_tx: dict) -> str:
    header = self.ZEC_TX_VERSION_V4_OVERWINTERED_BYTES + self.ZEC_TRANSPARENT_VERSION_GROUP_ID_BYTES
    
    inputs = b''
    for txin in raw_tx["inputs"]:
        inputs += bytes.fromhex(txin["prev_hash"])[::-1]
        inputs += struct.pack("<I", txin["output_index"])
        script_sig = bytes.fromhex(txin.get('scriptSig', ''))
        inputs += bytes.fromhex(self.encode_varint(len(script_sig))) + script_sig
        inputs += b'\xff\xff\xff\xff'
    
    outputs = b''
    for txout in raw_tx["outputs"]:
        outputs += struct.pack("<q", txout["value"])
        script_pubkey = bytes.fromhex(txout["script_pubkey"])
        outputs += bytes.fromhex(self.encode_varint(len(script_pubkey))) + script_pubkey
    
    lock_time = b'\x00\x00\x00\x00'
    expiry_height = struct.pack("<I", raw_tx["expiry_height"])
    value_balance = b'\x00\x00\x00\x00\x00\x00\x00\x00'
    
    n_shielded_spend = b'\x00'
    n_shielded_output = b'\x00'
    
    n_joinsplit = b'\x00'
    
    # The binding signature is only required for transactions with shielded components.
    # For a transparent-only transaction, it is a field of 64 zero-bytes.
    binding_sig = b'\x00' * 64
    
    tx_parts = [
        header,
        bytes.fromhex(self.encode_varint(len(raw_tx["inputs"]))),
        inputs,
        bytes.fromhex(self.encode_varint(len(raw_tx["outputs"]))),
        outputs,
        lock_time,
        expiry_height,
        value_balance,
        n_shielded_spend,
        n_shielded_output,
        n_joinsplit
    ]
    
    # In Sapling, there is no binding_sig for transparent-only transactions.
    # The structure is defined by ZIP 243, which omits it if there are no shielded components.
    # However, many serializers might still expect it to be zeroed out. Let's trace the preimage
    # creation in `sign_transaction` which is the source of truth. The final serialization
    # should match that structure. For now, we will assume it's omitted for pure transparent.
    # After reviewing ZIP 243, the transaction format for v4 does *not* include the binding_sig
    # field if there are no shielded spends or outputs.
    # The structure is: header, vin, vout, nLockTime, nExpiryHeight, valueBalance, 
    # vShieldedSpend, vShieldedOutput, vJoinSplit.

    tx = b"".join(tx_parts)

    app_logger.info(f"Serialized TX for broadcasting: {tx.hex()}")
    return tx.hex()


def sign_transaction(self, raw_tx: dict, private_key_hex: str, compressed: bool) -> str:
    sk = ecdsa.SigningKey.from_string(bytes.fromhex(private_key_hex), curve=ecdsa.SECP256k1)
    pubkey_hex = sk.get_verifying_key().to_string("compressed" if compressed else "uncompressed").hex()
    pubkey_bytes = bytes.fromhex(pubkey_hex)
    
    hash_type = 1

    # ** FIX IS HERE **
    # Dynamically determine the consensus branch ID based on the transaction's expiry height.
    # This makes the signing process resilient to future network upgrades.
    
    # Default to the first known branch ID if height is very low (unlikely)
    consensus_branch_id = self.BRANCH_ID_MAP["Overwinter"][0]
    
    # Sort branches by their activation height to ensure we pick the correct one.
    sorted_branches = sorted(self.BRANCH_ID_MAP.items(), key=lambda item: item[1][1])
    
    for name, (branch_id, activation_height) in sorted_branches:
        if raw_tx["expiry_height"] >= activation_height:
            consensus_branch_id = branch_id
        else:
            # Since the list is sorted, we can stop once the expiry height is less than
            # the activation height of the current branch.
            break
    
    app_logger.info(f"Using consensus branch ID: {hex(consensus_branch_id)} for expiry height {raw_tx['expiry_height']}")
    consensus_branch_id_bytes = consensus_branch_id.to_bytes(4, 'little')

    # Pre-compute hashes for different parts of the transaction as per ZIP 243
    hash_prevouts = self._hash_blake2b(b"".join(bytes.fromhex(i['prev_hash'])[::-1] + struct.pack('<I', i['output_index']) for i in raw_tx['inputs']), b'ZcashPrevoutHash')
    # CORRECTED: The 'person' parameter for BLAKE2b must be at most 16 bytes.
    # The spec name "ZcashSequenceHash" is 17 bytes, so we truncate it as per reference implementations.
    hash_sequence = self._hash_blake2b(b"".join(b'\xff\xff\xff\xff' for _ in raw_tx['inputs']), b'ZcashSequenceHash'[:16])
    
    outputs_bytes = b""
    for o in raw_tx['outputs']:
        outputs_bytes += struct.pack('<q', o['value'])
        script_bytes = bytes.fromhex(o['script_pubkey'])
        outputs_bytes += bytes.fromhex(self.encode_varint(len(script_bytes))) + script_bytes
    hash_outputs = self._hash_blake2b(outputs_bytes, b'ZcashOutputsHash')

    # Sign each input separately
    for i, txin in enumerate(raw_tx['inputs']):
        # Construct the preimage for the signature hash (sighash)
        preimage = (
            self.ZEC_TX_VERSION_V4_OVERWINTERED_BYTES +
            self.ZEC_TRANSPARENT_VERSION_GROUP_ID_BYTES +
            hash_prevouts +
            hash_sequence +
            hash_outputs +
            b'\x00' * 32 + # hashJoinSplits (zeroed as there are no shielded components)
            b'\x00' * 32 + # hashShieldedSpends (zeroed)
            b'\x00' * 32 + # hashShieldedOutputs (zeroed)
            b'\x00\x00\x00\x00' + # nLockTime
            struct.pack("<I", raw_tx["expiry_height"]) +
            b'\x00' * 8 + # valueBalance (zeroed)
            struct.pack("<I", hash_type) +
            # Input-specific data
            bytes.fromhex(txin['prev_hash'])[::-1] + struct.pack('<I', txin['output_index']) +
            bytes.fromhex(self.encode_varint(len(bytes.fromhex(txin['scriptPubKey'])))) + bytes.fromhex(txin['scriptPubKey']) +
            struct.pack("<q", txin["amount"]) +
            b'\xff\xff\xff\xff' # nSequence
        )
        
        # Personalize the BLAKE2b hash with the correct branch ID
        sighash_personalization = b'ZcashSigHash' + consensus_branch_id_bytes
        sighash = self._hash_blake2b(preimage, sighash_personalization)
        
        app_logger.debug(f"Preimage for input {i}: {preimage.hex()}")
        app_logger.debug(f"Sighash for input {i}: {sighash.hex()}")
        
        # Sign the digest
        signature = sk.sign_digest(sighash, sigencode=sigencode_der_canonize)
        
        # Append the hash type and construct the final scriptSig
        sig_plus_hashtype = signature + bytes([hash_type])
        txin['scriptSig'] = self.encode_varint(len(sig_plus_hashtype)) + sig_plus_hashtype.hex() + self.encode_varint(len(pubkey_bytes)) + pubkey_hex
        
    return self.serialize_transaction(raw_tx)

# -------------------------------
# UI and Other Methods
# -------------------------------
def get_transaction_form_fields(self) -> Dict[str, List[Dict]]:
    return {
        "max_outputs": 5,
        "notes": [
            "<b>Zcash Notice:</b> This plugin supports sending from transparent addresses (starting with 't') only. You can send to any address type."
        ],
        "required": [
            {
                "name": "address",
                "type": "string",
                "label": "Recipient Address",
                "help": "The ZEC transparent or shielded address to receive funds."
            },
            {
                "name": "amount",
                "type": "float",
                "label": "Amount (ZEC)",
                "help": "Amount in ZEC to send."
            }
        ],
        "optional": []
    }

def supports_thorchain_memo(self) -> bool:
    return False

def get_fee_estimation_type(self) -> str:
    return 'utxo'

def parse_raw_transaction_for_summary(self, raw_tx: dict, wallet_address: str) -> dict:
    recipient_address, amount_sats = "", 0
    for out in raw_tx.get('outputs', []):
        if out.get('address') != wallet_address and out.get('value', 0) > 0:
            if not recipient_address:
                recipient_address = out.get('address')
                amount_sats = out.get('value')
    return {
        "recipient_address": recipient_address,
        "amount_sats": amount_sats,
        "memo": None
    }

def get_wallet_historical_data(self, address: str, limit: int = 50) -> List[Dict]:
    try:
        txs = self.get_transactions(address, limit=limit)
        historical_data = []

        for tx in txs:
            if not isinstance(tx, dict):
                app_logger.warning(f"Skipping non-dictionary transaction item: {tx}")
                continue
            
            balance_change = tx.get('balance_change', 0)
            is_sender = balance_change < 0
            
            historical_data.append({
                'date': tx.get('time'), # Already formatted in get_transactions
                'type': 'Expenditure' if is_sender else 'Income',
                'amount': abs(self.satoshis_to_zec(balance_change)),
                'txid': tx.get('txid')
            })
        return historical_data
    except Exception as e:
        app_logger.error(f"Error fetching ZEC historical data for {address}: {e}", exc_info=True)
        return []

def satoshis_to_zec(self, satoshis: int) -> float:
    return satoshis / self.get_exponent()

`

zcash_plugin.py

TheHackitect avatar Oct 22 '25 21:10 TheHackitect