TypeError: AsyncClient.__init__() got an unexpected keyword argument 'proxy'
# Credentials and the webhook address are read from environment variables.
# A missing variable yields an empty string (falsy) instead of the literal
# text 'None', so an unset value can be detected with a plain truth test.
TWITTER_EMAIL = os.getenv('TWITTER_EMAIL', '')
TWITTER_USERNAME = os.getenv('TWITTER_USERNAME', '')
TWITTER_PASSWORD = os.getenv('TWITTER_PASSWORD', '')
WECHAT_WEBHOOK = os.getenv('WECHAT_WEBHOOK', '')
MUSK_USER_ID = '44196397'  # Musk's Twitter user ID
# Initialize the Twitter client
# Module-level client shared by every coroutine in this file.
# NOTE(review): the reported "AsyncClient.__init__() got an unexpected keyword
# argument 'proxy'" presumably originates inside this constructor call, i.e.
# the Twitter library passes `proxy=` to an httpx.AsyncClient that does not
# know that keyword. httpx added the `proxy` argument in 0.26, so an older
# installed httpx would explain it - TODO confirm the installed versions.
client = Client('en-US')
client.language = 'en-US'
async def initialize():
    """Log the shared client in and return the user for MUSK_USER_ID.

    Returns:
        Whatever ``client.get_user_by_id(MUSK_USER_ID)`` resolves to.
    """
    credentials = {
        'auth_info_1': TWITTER_EMAIL,
        'auth_info_2': TWITTER_USERNAME,
        'password': TWITTER_PASSWORD,
    }
    await client.login(**credentials)
    user = await client.get_user_by_id(MUSK_USER_ID)
    return user
# Fetch the Musk user object
async def get_musk():
    """Return the user object produced by initialize()."""
    user = await initialize()
    return user
# Runs when the module is loaded: blocks until get_musk() finishes and binds
# its result to the module-level name `musk`.
# NOTE(review): asyncio.run() creates a fresh event loop and closes it on
# return, and this file calls asyncio.run() a second time for the monitoring
# loop - confirm the shared client tolerates being used across event loops.
musk = asyncio.run(get_musk())
# Format a time value
def format_time(timestamp):
    """Format a point in time as ``YYYY-MM-DD HH:MM:SS`` in local time.

    Args:
        timestamp: One of

            * an ``int``/``float`` POSIX timestamp (the original behaviour),
            * a ``datetime`` - aware values are converted to local time,
              naive values are formatted unchanged,
            * a ``str`` holding either a numeric POSIX timestamp or a date
              laid out like ``'Wed Oct 10 20:19:24 +0000 2018'``.

    Returns:
        The formatted string.

    Raises:
        ValueError: If a string matches neither supported layout.
        TypeError: If the value is of an unsupported type.
    """
    # NOTE(review): callers in this file pass the `created_at` attribute of
    # tweet objects; presumably that is a string in the layout above rather
    # than a number - confirm against the Twitter library.
    if isinstance(timestamp, datetime):
        if timestamp.tzinfo is not None:
            moment = timestamp.astimezone()
        else:
            moment = timestamp
    elif isinstance(timestamp, str):
        text = timestamp.strip()
        try:
            moment = datetime.fromtimestamp(float(text))
        except ValueError:
            # Not numeric: parse the textual layout, then shift to local time.
            # %a/%b are locale-dependent; English names are assumed here.
            moment = datetime.strptime(text, '%a %b %d %H:%M:%S %z %Y').astimezone()
    else:
        moment = datetime.fromtimestamp(timestamp)
    return moment.strftime('%Y-%m-%d %H:%M:%S')
# Send to WeCom (Enterprise WeChat)
def send_to_wechat(message):
    """POST *message* as a markdown payload to WECHAT_WEBHOOK.

    Args:
        message: Markdown text sent under ``markdown.content``.

    Raises:
        requests.exceptions.RequestException: Propagated from
            ``requests.post`` (including a timeout after 10 seconds).
    """
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "content": message
        }
    }
    # Without an explicit timeout the call can wait indefinitely, which would
    # stall the caller for as long as the remote end stays silent.
    response = requests.post(WECHAT_WEBHOOK, json=payload, timeout=10)
    if not response.ok:
        # Report a rejected delivery instead of dropping it silently.
        print(f"Error: webhook responded with HTTP {response.status_code}")
# Monitoring function
async def _latest_item(kind):
    """Return the first item of ``musk.get_tweets(kind)``, or None if empty."""
    items = await musk.get_tweets(kind)
    try:
        return items[0]
    except IndexError:
        return None


def _item_time(item):
    """Return ``item.created_at`` via format_time, or verbatim if that fails."""
    raw = item.created_at
    try:
        return format_time(raw)
    except (TypeError, ValueError, OverflowError, OSError):
        # Show the raw value rather than losing the whole notification.
        return str(raw)


async def _check_feed(kind, title, last_id):
    """Notify when the newest *kind* item changed; return the id to remember.

    The first item ever seen (``last_id is None``) only sets the baseline and
    sends nothing. An empty result leaves ``last_id`` untouched.
    """
    item = await _latest_item(kind)
    if item is None:
        return last_id
    if last_id is not None and item.id != last_id:
        message = f"**{title}**\n" \
                  f"**时间/Time:** {_item_time(item)}\n" \
                  f"**内容/Content:**\n{item.text}\n"
        # If sending raises, last_id is not advanced, so the item is retried.
        send_to_wechat(message)
    return item.id


async def monitor_musk():
    """Poll the monitored account forever and push changes to the webhook.

    Every 60 seconds this checks, in order: the newest entry of
    ``get_tweets('Tweets')``, the newest entry of ``get_tweets('Likes')``,
    the profile image URL and the profile description. Each detected change
    is sent through send_to_wechat(). Any exception inside one polling round
    is printed and followed by a 300-second pause before the next round.

    This coroutine never returns normally.
    """
    last_tweet_id = None
    last_like_id = None
    last_avatar = musk.profile_image_url
    last_bio = musk.description

    while True:
        try:
            # Check for a new tweet.
            last_tweet_id = await _check_feed('Tweets', 'New Tweet', last_tweet_id)

            # Check for a new like (likes are fetched through get_tweets).
            last_like_id = await _check_feed('Likes', 'New Like', last_like_id)

            # Re-fetch the profile: the module-level `musk` object is loaded
            # once and never refreshed, so comparing against it alone could
            # never detect an avatar or bio change.
            profile = await client.get_user_by_id(MUSK_USER_ID)

            # Check for an avatar change.
            if profile.profile_image_url != last_avatar:
                message = f"**Avatar Changed**\n" \
                          f"**时间/Time:** {format_time(time.time())}\n" \
                          f"**新头像/New Avatar:** {profile.profile_image_url}\n"
                send_to_wechat(message)
                last_avatar = profile.profile_image_url

            # Check for a bio change.
            if profile.description != last_bio:
                message = f"**Bio Changed**\n" \
                          f"**时间/Time:** {format_time(time.time())}\n" \
                          f"**新简介/New Bio:** {profile.description}\n"
                send_to_wechat(message)
                last_bio = profile.description

            await asyncio.sleep(60)  # poll once per minute
        except Exception as e:
            # Top-level boundary of a long-running loop: report and keep going.
            print(f"Error: {e}")
            await asyncio.sleep(300)  # wait 5 minutes before retrying
# Start the monitoring loop only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(monitor_musk())
报下面这些错
PS E:\projects\mask3> python mask_twitter_monitor/monitor.py
Traceback (most recent call last):
File "E:\projects\mask3\mask_twitter_monitor\monitor.py", line 16, in <module>