RL_trader
A TradingEnvironment class for training an RL trading agent, plus a live Binance environment that is making huge gains in learning.
import datetime as dt
import os
import time

import numpy as np
import pandas as pd
import ta
import tensorflow as tf
from binance.client import Client
from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts
tf.compat.v1.enable_v2_behavior()
class TradingEnvironment(py_environment.PyEnvironment):
    """Environment to train the agent on historical price data.

    Requires:
        initial_balance - starting cash balance
        features - object exposing `features` (indicator DataFrame) and
            `price_data` (OHLCV DataFrame with a 'Close' column)

    Goal:
        Include relevant features in the state to help the agent make the best actions.
        Reward the agent properly for each action through the _step function to optimize P/L.
    """
    def __init__(self, initial_balance, features, position_increment=0.0005, fees=0.0000005):
        self.t = 0
        self.position_increment = position_increment
        self.fees = fees
        self.positions = []
        self.features = features
        self.initial_balance = self.balance = self.cash_balance = initial_balance
        self._episode_ended = False
        # Additional features for state representation
        self.moving_average_window = 5
        # Define action and observation specs
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
        observation_shape = len(self.features.features.columns) + 1  # +1 for balance
        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(observation_shape,), dtype=np.float32, name='observation')

    def observation_spec(self):
        return self._observation_spec
    def _step(self, action):
        if self._episode_ended:
            return self.reset()
        closing_price = self.features.price_data.iloc[self.t]['Close']
        features = self.features.features.iloc[self.t]
        # Immediate rewards
        immediate_reward = 0
        if action == 0:  # Hold
            print("\033[94mAgent chose to hold.\033[0m")
            start = max(0, self.t - (self.moving_average_window - 1))
            ma5 = self.features.price_data.iloc[start:self.t + 1]['Close'].mean()
            if closing_price > ma5:
                immediate_reward += 2  # Placeholder reward for holding in an uptrend
            elif closing_price < ma5:
                immediate_reward += 1  # Placeholder reward for waiting in a downtrend
        elif action == 1:  # Buy
            print("\033[92mAgent chose to buy.\033[0m")
            # Cost of one position increment, including fees
            p = closing_price * self.position_increment * (1 + self.fees)
            if len(self.positions) > 0:
                # Flat reward for adding to an existing position,
                # whether the last price move was up or down
                immediate_reward = 1
            if p > self.cash_balance:
                immediate_reward = -1  # Penalize buying without enough cash
            else:
                # Calculate transaction cost
                transaction_cost = p * self.fees
                self.cash_balance -= p + transaction_cost  # Deduct cost plus fee
                self.positions.append(closing_price)
                immediate_reward += features['macd']
        elif action == 2:  # Sell
            print("\033[91mAgent chose to sell.\033[0m")
            if len(self.positions) == 0:
                immediate_reward = 0
            else:
                profits = 0
                for p in self.positions:
                    profits += (closing_price - p) * self.position_increment * (1 - self.fees)
                    self.cash_balance += closing_price * self.position_increment * (1 - self.fees)
                self.positions = []  # A sell liquidates every open position
                if profits > 0:
                    # Calculate transaction cost
                    transaction_cost = profits * self.fees
                    immediate_reward += profits - transaction_cost - features['macd']
                    # Reinvest profits into buying positions
                    self.cash_balance -= profits
                    num_new_positions = int(profits / (closing_price * self.position_increment))
                    for _ in range(num_new_positions):
                        self.positions.append(closing_price)
                    # Hierarchical reward based on the size of the realized profit
                    if profits > 10:  # Example threshold for higher profits
                        immediate_reward += 3  # Example bonus for higher profits
                    elif profits > 5:  # Example threshold for moderate profits
                        immediate_reward += 2  # Example bonus for moderate profits
                    else:
                        immediate_reward += 1  # Example bonus for small profits
                else:
                    immediate_reward += 0  # Do not take the loss
        # Subtract the MACD value from the reward
        reward = immediate_reward - features['macd']
        # Update balance: cash plus mark-to-market value of open positions
        self.balance = self.cash_balance
        for _ in self.positions:
            self.balance += closing_price * self.position_increment
        # Print current state
        print("Time = {}: #Positions = {}: Balance = {}: Closing Price = {}".format(
            self.t, len(self.positions), self.balance, closing_price))
        self.t += 1
        self._state = [self.balance] + self.features.features.iloc[self.t].values.tolist()
        if self.t == len(self.features.price_data) - 1:
            self._episode_ended = True
            return ts.termination(np.array(self._state, dtype=np.float32), reward=reward)
        return ts.transition(
            np.array(self._state, dtype=np.float32), reward=reward, discount=0.7)

    def action_spec(self):
        return self._action_spec
    def _reset(self):
        self.t = 0
        self._episode_ended = False
        self.profits = 0
        self.balance = self.initial_balance
        self.cash_balance = self.initial_balance
        self.positions = []
        # profits is zeroed above, so nothing carries over between episodes
        self._state = [self.balance] + self.features.features.iloc[0].values.tolist()
        return ts.restart(np.array(self._state, dtype=np.float32))
    def buy_and_hold(self):
        # Benchmark: put the entire initial balance into the asset at the first close
        amount = self.initial_balance / self.features.price_data.iloc[0]['Close']
        return self.features.price_data['Close'] * amount
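A minimal usage sketch for the training environment. It assumes a hypothetical FeatureSet container exposing the `price_data` and `features` DataFrames the environment reads, and uses tf_agents' validate_py_environment to check the specs and step/reset contract on synthetic data before any real training; the data shapes and values here are illustrative only.

import numpy as np
import pandas as pd
from tf_agents.environments import utils

class FeatureSet:
    """Hypothetical container matching what TradingEnvironment expects."""
    def __init__(self, price_data, features):
        self.price_data = price_data  # DataFrame with a 'Close' column
        self.features = features      # DataFrame of indicator columns, e.g. 'macd'

n = 200  # synthetic one-minute bars
prices = pd.DataFrame({'Close': 100 + np.cumsum(np.random.randn(n))})
indicators = pd.DataFrame({'macd': np.random.randn(n) * 0.1})

env = TradingEnvironment(initial_balance=1000.0,
                         features=FeatureSet(prices, indicators))
utils.validate_py_environment(env, episodes=1)  # raises if specs or episodes are inconsistent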
class LiveBinanceEnvironment(py_environment.PyEnvironment):
    """Environment to trade live on Binance.

    Does not use the features class to organize time series data.
    """
    def __init__(self, asset1, asset2, position_increment, fees, price_history_t,
                 mean_history_t, macd_t, fast_ema, slow_ema):
        super().__init__()
        # Initialize instance variables
        self.asset1 = asset1
        self.asset2 = asset2
        self.assetpair = asset1 + asset2
        self.position_increment = position_increment
        self.fees = fees
        self.fast_ema = fast_ema
        self.slow_ema = slow_ema
        self.price_history_t = price_history_t
        self.mean_history_t = mean_history_t
        self.macd_t = macd_t
        self.trades = []  # Open (fill_price, quantity) tuples
        self.orders = []
        # Binance credentials are read from the environment
        api_key = os.getenv("CLIENT_KEY")
        api_secret = os.getenv("SECRET_KEY")
        self.client = Client(api_key, api_secret)
        self._columns = [
            'Open time', 'Open', 'High', 'Low', 'Close', 'Volume',
            'Close time', 'Quote asset volume', 'Number of trades',
            'Taker buy base asset volume', 'Taker buy quote asset volume',
            'ignore',
        ]
        prices = self.client.get_historical_klines(
            self.assetpair, self.client.KLINE_INTERVAL_1MINUTE, "5 day ago UTC")
        prices = pd.DataFrame(prices, columns=self._columns).astype(float)
        prices['Open time'] = prices['Open time'].apply(
            lambda x: dt.datetime.fromtimestamp(int(x) / 1000))
        self.price_data = prices.set_index('Open time')
        # Binance returns balances as strings; cast to float for arithmetic
        self.initial_balance = float(self.client.get_asset_balance(asset='USDT')['free'])
        self.balance = self.initial_balance
        self.free_balance = self.initial_balance
        # One-minute returns for the last price_history_t minutes, oldest first
        self.return_history = [self.price_data.iloc[-k]['Close'] -
                               self.price_data.iloc[-k - 1]['Close']
                               for k in reversed(range(1, self.price_history_t + 1))]
        self.mean_data = self.price_data.rolling(20, min_periods=1).mean()
        # MACD line: fast EMA minus slow EMA of the close
        self.MACD_trend = (ta.trend.ema_indicator(self.price_data['Close'], self.fast_ema) -
                           ta.trend.ema_indicator(self.price_data['Close'], self.slow_ema))
        self.MACD_trend = self.MACD_trend.fillna(
            self.MACD_trend.iloc[self.slow_ema]).tolist()
        # Last macd_t MACD values, oldest first
        self.MACD = [self.MACD_trend[-k] for k in reversed(range(1, self.macd_t + 1))]
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
        # State: balance, free balance, return history, and MACD history
        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(price_history_t + macd_t + 2,), dtype=np.float32, name='observation')
    def calculate_profits(self, current_price):
        """
        Calculate profits based on the current price and open positions.

        :param current_price: The current price of the asset.
        :return: Total profits from open positions.
        """
        total_profits = 0
        # Each trade is a (fill_price, quantity) tuple
        for trade in self.trades:
            profit = (current_price - trade[0]) * trade[1] * (1 - self.fees)
            total_profits += profit
        return total_profits

    def observation_spec(self):
        return self._observation_spec
    def _step(self, action):
        reward = 0
        if action == 0:
            print("Agent chose to hold.")
        elif action == 1:
            print("Agent chose to buy.")
            avg_price_info = self.client.get_avg_price(symbol=self.assetpair)
            average_price = float(avg_price_info['price'])
            free_balance = float(self.client.get_asset_balance(asset='USDT')['free'])
            p = average_price * self.position_increment
            if p > free_balance:
                reward = -1  # Penalize trying to buy without enough USDT
            else:
                try:
                    order = self.client.order_market_buy(
                        symbol=self.assetpair, quantity=self.position_increment)
                    print("Bought {} of {}".format(self.position_increment, self.asset1))
                    # Store the fill as (price, quantity); Binance returns price as a string
                    self.trades.append(
                        (float(order['fills'][0]['price']), self.position_increment))
                    # Update free balance after buying
                    self.free_balance = free_balance - p
                    reward += 0.5 * self.MACD[-1]
                except Exception as e:
                    print("Buy failed:", e)
        elif action == 2:
            print("Agent chose to sell.")
            if len(self.trades) == 0:
                print("Not enough {} to sell. Waiting to buy more.".format(self.asset1))
            else:
                try:
                    # Total quantity held; each trade already stores its quantity
                    total_quantity_bought = sum(trade[1] for trade in self.trades)
                    print("Total quantity bought:", total_quantity_bought)
                    order = self.client.order_market_sell(
                        symbol=self.assetpair, quantity=total_quantity_bought)
                    print("Sold {} of {}".format(total_quantity_bought, self.asset1))
                    # Total cost basis across all fills
                    cost_basis = 0
                    for trade in self.trades:
                        cost_basis += trade[0] * trade[1] * (1 - self.fees)
                    sell_value = (float(order['fills'][0]['price']) *
                                  total_quantity_bought * (1 - self.fees))
                    reward = sell_value - cost_basis  # Realized P/L is the reward
                    # Update free balance after selling
                    self.free_balance += sell_value
                    # Reset trades
                    self.trades = []
                    # Reinvest gains
                    if reward > 0:
                        p = (float(self.client.get_avg_price(symbol=self.assetpair)['price'])
                             * self.position_increment)
                        num_new_positions = int(reward / p)
                        for _ in range(num_new_positions):
                            try:
                                order = self.client.order_market_buy(
                                    symbol=self.assetpair, quantity=self.position_increment)
                                print("Reinvested gains: Bought {} of {}".format(
                                    self.position_increment, self.asset1))
                                self.trades.append(
                                    (float(order['fills'][0]['price']), self.position_increment))
                                self.free_balance -= p
                            except Exception as e:
                                print("Reinvestment failed:", e)
                except Exception as e:
                    print("Sell failed:", e)
        # Update balance for all actions (buying, selling, or holding)
        self.balance = float(self.client.get_asset_balance(asset='USDT')['free'])
        self.free_balance = self.balance
        # Refresh price, return history, and MACD history
        cur_price = float(self.client.get_avg_price(symbol=self.assetpair)['price'])
        self.return_history.pop(0)
        self.return_history.append(cur_price - self.price_data.iloc[-1]['Close'])
        # Append the new price point
        self.price_data.loc[pd.Timestamp.now(), 'Close'] = cur_price
        self.MACD_trend = (ta.trend.ema_indicator(self.price_data['Close'], self.fast_ema) -
                           ta.trend.ema_indicator(self.price_data['Close'], self.slow_ema))
        self.MACD_trend = self.MACD_trend.fillna(
            self.MACD_trend.iloc[self.slow_ema]).tolist()
        self.MACD.pop(0)
        self.MACD.append(self.MACD_trend[-1])
        self._state = [self.balance, self.free_balance] + self.return_history + self.MACD
        print("State components:")
        print("Balance:", self.balance)
        print("Free Balance:", self.free_balance)
        print("Action taken:", action)
        # Wait for 1 second before proceeding to the next step
        time.sleep(1)
        return ts.transition(
            np.array(self._state, dtype=np.float32), reward=reward, discount=0.7)
    def action_spec(self):
        return self._action_spec
    def _reset(self):
        # Match the observation spec: balance, free balance, returns, MACD
        self._state = [self.balance, self.free_balance] + self.return_history + self.MACD
        return ts.restart(np.array(self._state, dtype=np.float32))
    def buy_and_hold(self):
        # Benchmark: put the entire initial balance into the asset at the first close
        amount = self.initial_balance / self.price_data.iloc[0]['Close']
        return self.price_data['Close'] * amount
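A sketch of how the live environment might be driven once constructed. The constructor values below are illustrative only, CLIENT_KEY and SECRET_KEY must be set in the environment, and tf_agents' RandomPyPolicy stands in for a trained policy purely to exercise the step loop; any policy with matching specs would slot in the same way. On a funded account these are real market orders, so Binance testnet credentials are the safer assumption.

from tf_agents.policies import random_py_policy

env = LiveBinanceEnvironment(
    asset1='BTC', asset2='USDT',
    position_increment=0.0005, fees=0.001,
    price_history_t=30, mean_history_t=20,
    macd_t=9, fast_ema=12, slow_ema=26)

# Random policy sampled from the action spec, only to demonstrate the loop
policy = random_py_policy.RandomPyPolicy(
    time_step_spec=env.time_step_spec(),
    action_spec=env.action_spec())

time_step = env.reset()
for _ in range(10):  # ten live steps, one per second
    action_step = policy.action(time_step)
    time_step = env.step(action_step.action)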