RL_trader icon indicating copy to clipboard operation
RL_trader copied to clipboard

class TradingEnvironment in the live environment is making huge gains in learning

Open vpetrea71 opened this issue 11 months ago • 0 comments

import datetime as dt
import os
import time

import numpy as np
import pandas as pd
import ta
import tensorflow as tf
from binance.client import Client
from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts

tf.compat.v1.enable_v2_behavior()

class TradingEnvironment(py_environment.PyEnvironment):
    """Environment used to train a trading agent.

    Requires:
        initial_balance - starting balance
        features - data holder exposing a ``features`` table (one row of
            indicators per step) and a ``price_data`` table with a
            ``'Close'`` column

    Goal:
        Include relevant features in the state to help the agent choose the
        best actions.
        Reward the agent properly for each action through the _step function
        to optimize P/L.
    """

def __init__(self, initial_balance, features, position_increment=0.0005, fees=0.0000005):
    """Set up the simulated account, the market data and the TF-Agents specs.

    Args:
        initial_balance: Starting cash; also used as the initial total balance.
        features: Data holder exposing two attributes read by this class:
            ``features`` (per-step indicator table) and ``price_data``
            (price table with a ``'Close'`` column).
        position_increment: Asset quantity bought by a single buy action.
        fees: Proportional fee applied to each trade.
    """
    # Initialise the PyEnvironment base class, as LiveBinanceEnvironment does.
    super().__init__()

    self.t = 0
    self.position_increment = position_increment
    self.fees = fees
    self.positions = []
    self.features = features
    self.initial_balance = self.balance = self.cash_balance = initial_balance

    # _step reads this flag; define it up front so it exists before the
    # first reset.
    self._episode_ended = False

    # Additional features for state representation
    self.moving_average_window = 5

    # Define action and observation specs: three discrete actions
    # (0 = hold, 1 = buy, 2 = sell).
    self._action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
    # One slot per feature column plus one leading slot for the balance.
    observation_shape = len(self.features.features.columns) + 1
    self._observation_spec = array_spec.BoundedArraySpec(
        shape=(observation_shape,), dtype=np.float32, name="observation")

def observation_spec(self):
    """Return the observation spec built in ``__init__``."""
    spec = self._observation_spec
    return spec

def _step(self, action):
    """Apply one action at the current bar, then advance to the next bar.

    Args:
        action: 0 = hold, 1 = buy one ``position_increment`` at the current
            close, 2 = sell every open position at the current close.

    Returns:
        A ``ts.transition`` (discount 0.7) carrying the next observation
        ``[balance] + feature row``, or a ``ts.termination`` once the last
        usable row has been reached. If the previous step already ended the
        episode, the environment is reset instead.

    Reward:
        ``immediate_reward - macd`` where ``macd`` is the current row's
        ``'macd'`` feature and ``immediate_reward`` depends on the action
        (see the inline comments).
    """
    if self._episode_ended:
        # The previous step ended the episode; start a new one.
        return self.reset()

    closing_price = self.features.price_data.iloc[self.t]['Close']
    features = self.features.features.iloc[self.t]
    macd = features['macd']
    # Value of a single position at the current close, before fees.
    position_value = closing_price * self.position_increment

    # Immediate rewards
    immediate_reward = 0

    if action == 0:  # Hold
        print("\033[94mAgent chose to hold.\033[0m")
        # Clamp the window start at 0: a negative slice start would count
        # from the end of the table and yield an empty or wrong window for
        # the first few steps.
        window_start = max(0, self.t - self.moving_average_window + 1)
        moving_average = self.features.price_data.iloc[
            window_start:self.t + 1]['Close'].mean()
        if closing_price > moving_average:
            # Positive reward for holding in an uptrend
            immediate_reward += 2
        elif closing_price < moving_average:
            # Positive reward for waiting in a downtrend
            immediate_reward += 1

    elif action == 1:  # Buy
        print("\033[92mAgent chose to buy.\033[0m")
        p = position_value * (1 + self.fees)
        if p > self.cash_balance:
            # Not enough cash for one more position.
            immediate_reward = -1
        else:
            # NOTE(review): the fee is applied to a price that already
            # includes (1 + fees); kept as in the original accounting.
            transaction_cost = p * self.fees
            self.cash_balance -= p + transaction_cost
            self.positions.append(closing_price)
            # Flat +1 for a successful buy regardless of the last price
            # move (the original assigned 1 in both branches), plus MACD.
            immediate_reward = 1 + macd

    elif action == 2:  # Sell
        print("\033[91mAgent chose to sell.\033[0m")
        profits = sum(
            (closing_price - entry) * self.position_increment * (1 - self.fees)
            for entry in self.positions)
        if profits > 0:
            # Liquidate every open position exactly once: credit the
            # proceeds and clear the list so the same lots cannot be sold
            # again on a later step.
            self.cash_balance += (
                len(self.positions) * position_value * (1 - self.fees))
            self.positions = []

            transaction_cost = profits * self.fees
            immediate_reward += profits - transaction_cost - macd

            # Reinvest profits into as many whole positions as they cover,
            # paying only for the positions actually opened.
            num_new_positions = int(
                profits / closing_price / self.position_increment)
            self.cash_balance -= num_new_positions * position_value
            self.positions.extend([closing_price] * num_new_positions)

            # Tiered bonus: +3 above 10, otherwise +2.
            immediate_reward += 3 if profits > 10 else 2
        # Otherwise (no positions, or selling would not be profitable) the
        # sale is not taken: positions and cash stay untouched and the
        # immediate reward stays 0.

    # Subtract MACD value from reward
    reward = immediate_reward - macd

    # Update balance: cash plus open positions marked at the current close.
    self.balance = self.cash_balance + len(self.positions) * position_value

    # Print current state
    print("Time = {}: #Positions = {}: Balance = {}: Closing Price = {}".format(
        self.t, len(self.positions), self.balance, closing_price))

    self.t += 1

    if self.t >= len(self.features.price_data) - 1:
        self._episode_ended = True

    self._state = [self.balance] + self.features.features.iloc[self.t].values.tolist()
    observation = np.array(self._state, dtype=np.float32)
    if self._episode_ended:
        # Mark the final step as terminal so the episode boundary is
        # visible to the caller.
        return ts.termination(observation, reward=reward)
    return ts.transition(observation, reward=reward, discount=0.7)

def action_spec(self):
    """Return the action spec built in ``__init__``."""
    spec = self._action_spec
    return spec

def _reset(self):
    """Restore the starting account state and return the first time step.

    Rewinds to the first row, clears open positions and resets both balances
    to ``initial_balance``. The observation is ``[balance]`` followed by the
    first feature row.
    """
    self.t = 0
    self._episode_ended = False
    # The original followed this with an `if self.profits > 0` reinvestment
    # branch, which could never run because profits had just been zeroed; it
    # has been removed.
    self.profits = 0
    self.balance = self.initial_balance
    self.cash_balance = self.initial_balance
    self.positions = []
    self._state = [self.balance] + self.features.features.iloc[0].values.tolist()

    return ts.restart(np.array(self._state, dtype=np.float32))

def buy_and_hold(self):
    """Return the price table scaled to a buy-and-hold position.

    The whole ``initial_balance`` is converted into the asset at the first
    ``'Close'`` price, and every value of the price table is multiplied by
    that quantity.

    Returns:
        ``price_data * (initial_balance / first close)``.
    """
    # This environment keeps its prices on the features holder; it has no
    # ``self.price_data`` attribute of its own.
    price_data = self.features.price_data
    amount = self.initial_balance / price_data['Close'].iloc[0]
    return price_data * amount

class LiveBinanceEnvironment(py_environment.PyEnvironment):
    """Environment that places trades on Binance.

    It does not use a features class to organize its time series data.
    """

def __init__(self, asset1, asset2, position_increment, fees, price_history_t, mean_history_t, macd_t, fast_ema, slow_ema):
    """Connect to Binance, load recent prices and build the initial state.

    Args:
        asset1: Asset that is bought and sold; concatenated with ``asset2``
            to form the traded symbol.
        asset2: Second half of the traded symbol.
        position_increment: Quantity of ``asset1`` per market buy.
        fees: Proportional fee used in the profit and reward calculations.
        price_history_t: Number of recent close-to-close price changes kept
            in the observation.
        mean_history_t: Stored on the instance; not otherwise used here.
        macd_t: Number of recent MACD values kept in the observation.
        fast_ema: Window of the fast EMA used for the MACD series.
        slow_ema: Window of the slow EMA used for the MACD series.

    The API credentials are read from the ``CLIENT_KEY`` and ``SECRET_KEY``
    environment variables.
    """
    super().__init__()  # Call superclass's __init__ method
    # Initialize instance variables
    self.asset1 = asset1
    self.asset2 = asset2
    self.assetpair = asset1 + asset2
    self.position_increment = position_increment
    self.fees = fees
    self.fast_ema = fast_ema
    self.slow_ema = slow_ema
    self.price_history_t = price_history_t
    self.mean_history_t = mean_history_t
    self.macd_t = macd_t
    self.trades = []
    self.orders = []  # Initialize orders list

    api_key = os.getenv("CLIENT_KEY")
    api_secret = os.getenv("SECRET_KEY")
    self.client = Client(api_key, api_secret)
    self._columns = [
        'Open time',
        'Open',
        'High',
        'Low',
        'Close',
        'Volume',
        'Close time',
        'Quote asset volume',
        'Number of trades',
        'Taker buy base asset volume',
        'Taker buy quote asset volume',
        'ignore',
    ]

    # One-minute candles for the last five days.
    prices = self.client.get_historical_klines(
        self.assetpair, self.client.KLINE_INTERVAL_1MINUTE, "5 DAY ago UTC")

    prices = pd.DataFrame(prices, columns=self._columns).astype(float)

    # 'Open time' is divided by 1000 before conversion, i.e. treated as
    # milliseconds.
    prices['Open time'] = prices['Open time'].apply(
        lambda x: dt.datetime.fromtimestamp(int(x) / 1000))

    self.price_data = prices.set_index('Open time')

    # Cast to float like the other values read from the API in this class,
    # so the balance can be used in arithmetic (e.g. by buy_and_hold).
    self.initial_balance = float(
        self.client.get_asset_balance(asset='USDT')['free'])
    self.balance = self.initial_balance
    self.free_balance = self.initial_balance  # Initialize free balance attribute

    # Most recent close-to-close changes, oldest first. k runs from
    # price_history_t down to 1; k == 0 must be excluded because iloc[-0] is
    # iloc[0] and would compare the first row against the last.
    closes = self.price_data['Close']
    self.return_history = [
        closes.iloc[-k] - closes.iloc[-k - 1]
        for k in range(self.price_history_t, 0, -1)
    ]

    self.mean_data = self.price_data.rolling(20, min_periods=1).mean()
    # MACD line: fast EMA minus slow EMA of the close.
    self.MACD_trend = ta.trend.ema_indicator(
        closes, self.fast_ema) - ta.trend.ema_indicator(closes, self.slow_ema)
    # Replace missing values with the value at position slow_ema.
    self.MACD_trend = self.MACD_trend.fillna(
        self.MACD_trend.iloc[self.slow_ema]).tolist()

    # Last macd_t MACD values, oldest first (k == 0 excluded for the same
    # reason as above: [-0] is the first element).
    self.MACD = [self.MACD_trend[-k] for k in range(self.macd_t, 0, -1)]
    self._action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
    # Observation layout: [balance] + return_history + MACD.
    self._observation_spec = array_spec.BoundedArraySpec(
        shape=(price_history_t + macd_t + 1,), dtype=np.float32, name="observation")

def calculate_profits(self, current_price):
    """
    Calculate profits based on the current price and open positions.

    Each entry of ``self.trades`` is a ``(price, quantity)`` pair. The price
    is converted with ``float()`` because the buy sites store the fill price
    exactly as the order response provides it, without a cast.

    :param current_price: The current price of the asset.
    :return: Total profits from open positions, net of ``self.fees``
        (0 when there are no trades).
    """
    return sum(
        (current_price - float(price)) * quantity * (1 - self.fees)
        for price, quantity in self.trades
    )

def observation_spec(self):
    """Return the observation spec built in ``__init__``."""
    spec = self._observation_spec
    return spec

def _step(self, action):
    """Execute one live trading action and return the next time step.

    Args:
        action: 0 = hold, 1 = market-buy ``position_increment`` of
            ``asset1``, 2 = market-sell everything recorded in
            ``self.trades`` and re-buy with any realised gain.

    Returns:
        A ``ts.transition`` (discount 0.7) whose observation is
        ``[balance] + return_history + MACD``, matching the observation
        spec and ``_reset``.

    Reward:
        hold: 0; buy: -1 if the balance cannot cover it, otherwise half the
        latest MACD value; sell: sale value minus cost basis.
        A failed order leaves the reward at 0.
    """
    reward = 0

    if action == 0:
        print("Agent chose to hold.")

    elif action == 1:
        print("Agent chose to buy.")
        avg_price_info = self.client.get_avg_price(symbol=self.assetpair)
        average_price = float(avg_price_info['price'])
        free_balance = float(
            self.client.get_asset_balance(asset='USDT')['free'])

        p = average_price * self.position_increment
        if p > free_balance:
            reward = -1
        else:
            try:
                order = self.client.order_market_buy(
                    symbol=self.assetpair, quantity=self.position_increment)
                print("Bought {} of {}".format(
                    self.position_increment, self.asset1))

                # Store the buy as (price, quantity). The price is cast so
                # later profit arithmetic works on numbers.
                self.trades.append(
                    (float(order['fills'][0]['price']), self.position_increment))

                # Update free balance after buying
                self.free_balance = free_balance - p

                reward += 0.5 * (self.MACD[-1])
            except Exception as e:
                # Best effort: a failed order must not stop the loop.
                print("Buy failed:", e)

    elif action == 2:
        print("Agent chose to sell.")
        if len(self.trades) == 0:
            print("Not enough {} to sell. Waiting to buy more.".format(self.asset1))
        else:
            try:
                # Each trade already stores its quantity, so the sum is the
                # amount to sell; it must not be scaled by
                # position_increment a second time.
                total_quantity_bought = sum(trade[1] for trade in self.trades)
                print("Total quantity bought:", total_quantity_bought)

                order = self.client.order_market_sell(
                    symbol=self.assetpair, quantity=total_quantity_bought)
                print("Sold {} of {}".format(total_quantity_bought, self.asset1))

                # NOTE(review): the cost basis is reduced by the fee factor,
                # as in the original; confirm whether (1 + fees) was meant.
                cost_basis = sum(
                    float(trade[0]) * trade[1] * (1 - self.fees)
                    for trade in self.trades)

                # Only the first fill's price is used for the sale value.
                sell_value = float(
                    order['fills'][0]['price']) * total_quantity_bought * (1 - self.fees)

                reward = sell_value - cost_basis  # Calculate the reward

                # Update free balance after selling
                self.free_balance = float(self.free_balance) + sell_value
                self.balance = float(
                    self.client.get_asset_balance(asset='USDT')['free'])

                # Reset trades
                self.trades = []

                # Reinvest gains in whole position increments
                if reward > 0:
                    p = float(self.client.get_avg_price(symbol=self.assetpair)[
                        'price']) * self.position_increment
                    num_new_positions = int(reward / p)
                    for _ in range(num_new_positions):
                        try:
                            order = self.client.order_market_buy(
                                symbol=self.assetpair, quantity=self.position_increment)
                            print("Reinvested gains: Bought {} of {}".format(
                                self.position_increment, self.asset1))
                            self.trades.append(
                                (float(order['fills'][0]['price']), self.position_increment))
                            self.free_balance -= p
                        except Exception as e:
                            print("Reinvestment failed:", e)
            except Exception as e:
                print("Sell failed:", e)

    # Update balance for all actions (buying, selling, or holding)
    self.balance = float(self.client.get_asset_balance(asset='USDT')['free'])
    self.free_balance = self.balance

    cur_price = float(self.client.get_avg_price(
        symbol=self.assetpair)['price'])

    # Slide the return window: drop the oldest change and append the change
    # from the last stored close to the current price.
    self.return_history.pop(0)
    self.return_history.append(
        cur_price - self.price_data.iloc[-1]['Close'])

    # Append the new price as a row. A Series is aligned on the column
    # labels, so only 'Close' is set and the other columns become NaN.
    self.price_data.loc[pd.Timestamp.now()] = pd.Series({'Close': cur_price})

    # Recompute the MACD line as fast EMA minus slow EMA, the same
    # definition used in __init__.
    closes = self.price_data['Close']
    macd_series = ta.trend.ema_indicator(
        closes, self.fast_ema) - ta.trend.ema_indicator(closes, self.slow_ema)
    self.MACD_trend = macd_series.fillna(
        macd_series.iloc[self.slow_ema]).tolist()

    self.MACD.pop(0)
    self.MACD.append(self.MACD_trend[-1])
    # Same layout and length as the observation spec and _reset.
    self._state = [self.balance] + self.return_history + self.MACD
    print("State components:")
    print("Balance:", self.balance)
    print("Free Balance:", self.free_balance)
    print("Action taken:", action)
    # Wait for 1 second before proceeding to the next step
    time.sleep(1)

    return ts.transition(
        np.array(self._state, dtype=np.float32), reward=reward, discount=0.7)

def action_spec(self):
    """Return the action spec built in ``__init__``."""
    spec = self._action_spec
    return spec

def _reset(self):
    """Rebuild the state from the current balance and histories and restart.

    No account or market data is reloaded; the observation is assembled from
    the values already held on the instance.
    """
    observation = [self.balance]
    observation = observation + self.return_history + self.MACD
    self._state = observation
    first_step = ts.restart(np.array(self._state, dtype=np.float32))
    return first_step

def buy_and_hold(self):
    """Return the price table scaled to a buy-and-hold position.

    The whole ``initial_balance`` is converted into the asset at the first
    ``'Close'`` price, and every value of the price table is multiplied by
    that quantity.

    Returns:
        ``price_data * (initial_balance / first close)``.
    """
    # __init__ stores the balance exactly as the API returned it, without a
    # cast; convert it so the division also works for a string balance.
    amount = float(self.initial_balance) / self.price_data['Close'].iloc[0]
    return self.price_data * amount

vpetrea71 avatar Mar 19 '24 03:03 vpetrea71