
Getting a notification when they start a private show

Open regzzzz opened this issue 1 month ago • 1 comments

I have a fan club membership for some models. Is it possible to receive a notification when they start a private show? Recording is not possible while they are in a private show, so I would like to know when one begins.

regzzzz avatar Nov 17 '25 16:11 regzzzz

Hello, have you ever run into the two issues I mentioned? How did you solve them? I'm still getting these problems very frequently right now — even after I close the CMD window and the monitoring page, they keep coming back. Sometimes they even happen after I restart the computer. It's really troubling me.

Image

NBAMVP avatar Nov 18 '25 05:11 NBAMVP

Yes, it is possible to develop such functionality. In fact, I have forked StreaMonitor and developed it for myself. I get notifications on my phone through Pusher.
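For anyone wondering what such a feature might look like, here is a minimal sketch of the general idea: poll the status, and send a message only when it changes into the private state. This is not the code from the fork mentioned above. The `Status` enum is a local stand-in for `streamonitor.enums.Status`, and `notify_on_private` and its `notify` callback are hypothetical names; the callback could wrap any push service.

```python
from enum import Enum
from typing import Callable, Iterable, List, Optional


class Status(Enum):
    # Local stand-in for streamonitor.enums.Status (assumption: only these members are needed here).
    PUBLIC = "public"
    PRIVATE = "private"
    OFFLINE = "offline"
    UNKNOWN = "unknown"


def notify_on_private(
    username: str,
    statuses: Iterable[Status],
    notify: Callable[[str], None],
) -> int:
    """Call notify() once each time the status changes into PRIVATE.

    Returns the number of notifications sent.
    """
    previous: Optional[Status] = None
    sent = 0
    for current in statuses:
        # Fire only on the transition, not on every poll that still reports PRIVATE.
        if current is Status.PRIVATE and previous is not Status.PRIVATE:
            notify(f"{username} started a private show")
            sent += 1
        previous = current
    return sent


if __name__ == "__main__":
    polled = [Status.OFFLINE, Status.PUBLIC, Status.PRIVATE, Status.PRIVATE, Status.PUBLIC]
    messages: List[str] = []
    notify_on_private("example_model", polled, messages.append)
    print(messages)
```

Note that in this sketch a model who is already in a private show on the very first poll also triggers a notification; whether that is wanted is a design choice.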

LituanusVulgaris avatar Nov 18 '25 17:11 LituanusVulgaris

Yes, it is possible to develop such functionality. In fact, I have forked StreaMonitor and developed it for myself. I get notifications on my phone through Pusher.

It would be great if you forked this :')

Hello, have you ever run into the two issues I mentioned? How did you solve them? I'm still getting these problems very frequently right now — even after I close the CMD window and the monitoring page, they keep coming back. Sometimes they even happen after I restart the computer. It's really troubling me.

A stripchat.py was posted about a month ago, and I am still using it. You can find it by scrolling back a bit in the posts.

regzzzz avatar Nov 20 '25 10:11 regzzzz

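Yes, it is possible to develop such functionality. In fact, I have forked StreaMonitor and developed it for myself. I get notifications on my phone through Pusher.

It would be great if you forked this :')

Hello, have you ever run into the two issues I mentioned? How did you solve them? I'm still getting these problems very frequently right now, even after I close the CMD window and the monitoring page; they keep coming back. Sometimes they even happen after I restart the computer. It's really troubling me.

A stripchat.py was posted about a month ago, and I am still using it. You can find it by scrolling back a bit in the posts.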

Could you please look it up again when you have time? I really couldn't find anything.

NBAMVP avatar Nov 20 '25 15:11 NBAMVP

Can you try this? Open stripchat.py and edit it.

import base64
import hashlib
import itertools
import random
import re
import time

import requests

from streamonitor.bot import Bot
from streamonitor.downloaders.hls import getVideoNativeHLS
from streamonitor.enums import Status


class StripChat(Bot):
    site = 'StripChat'
    siteslug = 'SC'

    # Class-level caches shared by every StripChat instance.
    _static_data = None
    _main_js_data = None
    _native_js_data = None
    _doppio_js_data = None
    _pkey = None
    _mouflon_keys: dict = None
    _cached_keys: dict[str, bytes] = None

    def __init__(self, username):
        # The first instance fetches the shared site data; an empty dict marks "fetch in progress".
        if StripChat._static_data is None:
            StripChat._static_data = {}
            try:
                self.getInitialData()
            except Exception:
                StripChat._static_data = None
                raise
        # Other instances wait until the fetch has finished.
        while StripChat._static_data == {}:
            time.sleep(1)
        super().__init__(username)
        self.vr = False
        self.getVideo = lambda _, url, filename: getVideoNativeHLS(self, url, filename, None)

    @classmethod
    def getInitialData(cls):
        r = requests.get('https://hu.stripchat.com/api/front/v3/config/static', headers=cls.headers)
        if r.status_code != 200:
            raise Exception("Failed to fetch static data from StripChat")
        StripChat._static_data = r.json().get('static')

        mmp_origin = StripChat._static_data['features']['MMPExternalSourceOrigin']
        mmp_version = StripChat._static_data['featuresV2']['playerModuleExternalLoading']['mmpVersion']
        mmp_base = f"{mmp_origin}/v{mmp_version}"

        r = requests.get(f"{mmp_base}/main.js", headers=cls.headers)
        if r.status_code != 200:
            raise Exception("Failed to fetch main.js from StripChat")
        StripChat._main_js_data = r.content.decode('utf-8')

        native_js_name = re.findall('require[(]"./(Native.*?[.]js)"[)]', StripChat._main_js_data)[0]
        r = requests.get(f"{mmp_base}/{native_js_name}", headers=cls.headers)
        if r.status_code != 200:
            raise Exception("Failed to fetch native.js")
        cls._native_js_data = r.content.decode('utf-8')

        # Extract pkey from Native.js
        cls._pkey = re.findall('pkey ?: ?"(.*?)"', cls._native_js_data)[0]

        doppio_js_name = re.findall('require[(]"./(Doppio.*?[.]js)"[)]', StripChat._main_js_data)[0]

        r = requests.get(f"{mmp_base}/{doppio_js_name}", headers=cls.headers)
        if r.status_code != 200:
            raise Exception("Failed to fetch doppio.js from StripChat")
        StripChat._doppio_js_data = r.content.decode('utf-8')

    @classmethod
    def m3u_decoder(cls, content):
        _mouflon_file_attr = "#EXT-X-MOUFLON:FILE:"
        _mouflon_filename = 'media.mp4'

        def _decode(encrypted_b64: str, key: str) -> str:
            # XOR the Base64-decoded bytes with the repeating SHA-256 digest of the key.
            if cls._cached_keys is None:
                cls._cached_keys = {}
            hash_bytes = cls._cached_keys[key] if key in cls._cached_keys \
                else cls._cached_keys.setdefault(key, hashlib.sha256(key.encode("utf-8")).digest())
            encrypted_data = base64.b64decode(encrypted_b64 + "==")
            return bytes(a ^ b for (a, b) in zip(encrypted_data, itertools.cycle(hash_bytes))).decode("utf-8")

        _, pkey = StripChat._getMouflonFromM3U(content)

        decoded = ''
        lines = content.splitlines()
        last_decoded_file = None
        for line in lines:
            if line.startswith(_mouflon_file_attr):
                # Remember the decoded file name; it replaces the placeholder on the next segment line.
                last_decoded_file = _decode(line[len(_mouflon_file_attr):], cls.getMouflonDecKey(pkey))
            elif line.endswith(_mouflon_filename) and last_decoded_file:
                decoded += (line.replace(_mouflon_filename, last_decoded_file)) + '\n'
                last_decoded_file = None
            else:
                decoded += line + '\n'
        return decoded

    @classmethod
    def getMouflonDecKey(cls, pkey):
        # Look up the decryption key for this pkey in Doppio.js, caching the result.
        if cls._mouflon_keys is None:
            cls._mouflon_keys = {}
        return cls._mouflon_keys[pkey] if pkey in cls._mouflon_keys \
            else cls._mouflon_keys.setdefault(pkey, re.findall(f'"{pkey}:(.*?)"', cls._doppio_js_data)[0])

    @staticmethod
    def _getMouflonFromM3U(m3u8_doc):
        # Parse the "#EXT-X-MOUFLON:" line of the playlist into (psch, pkey).
        if '#EXT-X-MOUFLON:' in m3u8_doc:
            _mouflon_start = m3u8_doc.find('#EXT-X-MOUFLON:')
            if _mouflon_start > 0:
                _mouflon = m3u8_doc[_mouflon_start:m3u8_doc.find('\n', _mouflon_start)].strip().split(':')
                psch = _mouflon[2]
                pkey = _mouflon[3]
                return psch, pkey
        return None, None

    def getWebsiteURL(self):
        return "https://stripchat.com/" + self.username

    def getVideoUrl(self):
        return self.getWantedResolutionPlaylist(None)

    def getPlaylistVariants(self, url):
        # The url argument is ignored; the master playlist URL is rebuilt from the last status info.
        url = "https://edge-hls.{host}/hls/{id}{vr}/master/{id}{vr}{auto}.m3u8?psch=v1&pkey={pkey}&playlistType=standard".format(
            host='doppiocdn.' + random.choice(['org', 'com', 'net']),
            id=self.lastInfo["streamName"],
            vr='_vr' if self.vr else '',
            auto='_auto' if not self.vr else '',
            pkey=self._pkey,
        )
        result = requests.get(url, headers=self.headers, cookies=self.cookies)
        m3u8_doc = result.content.decode("utf-8")
        psch, pkey = StripChat._getMouflonFromM3U(m3u8_doc)
        variants = super().getPlaylistVariants(m3u_data=m3u8_doc)
        # Append psch to every variant URL.
        return [variant | {'url': f'{variant["url"]}{"&" if "?" in variant["url"] else "?"}psch={psch}'}
                for variant in variants]

    @staticmethod
    def uniq(length=16):
        # Random lowercase alphanumeric string used as a cache-busting query parameter.
        chars = ''.join(chr(i) for i in range(ord('a'), ord('z')+1))
        chars += ''.join(chr(i) for i in range(ord('0'), ord('9')+1))
        return ''.join(random.choice(chars) for _ in range(length))

    def getStatus(self):
        r = requests.get(
            f'https://stripchat.com/api/front/v2/models/username/{self.username}/cam?uniq={StripChat.uniq()}',
            headers=self.headers
        )

        try:
            data = r.json()
        except requests.exceptions.JSONDecodeError:
            self.log('Failed to parse JSON response')
            return Status.UNKNOWN

        if 'cam' not in data:
            if 'error' in data:
                error = data['error']
                if error == 'Not Found':
                    return Status.NOTEXIST
                self.logger.warning(f'Status returned error: {error}')
            return Status.UNKNOWN

        self.lastInfo = {'model': data['user']['user']}
        if isinstance(data['cam'], dict):
            self.lastInfo |= data['cam']

        status = self.lastInfo['model'].get('status')
        if status == "public" and self.lastInfo["isCamAvailable"] and self.lastInfo["isCamActive"]:
            return Status.PUBLIC
        if status in ["private", "groupShow", "p2p", "virtualPrivate", "p2pVoice"]:
            return Status.PRIVATE
        if status in ["off", "idle"]:
            return Status.OFFLINE
        if self.lastInfo['model'].get('isDeleted') is True:
            return Status.NOTEXIST
        if data['user'].get('isGeoBanned') is True:
            return Status.RESTRICTED
        self.logger.warning(f'Got unknown status: {status}')
        return Status.UNKNOWN
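
In case the file-name decoding step in m3u_decoder is unclear, here is a small standalone sketch of the same idea: Base64-decode the value, then XOR it with the repeating SHA-256 digest of the key. The helper names, the key, and the file name are made up for illustration, and `encode_name` exists only to produce test input. The assumption that the playlist value arrives without Base64 padding is a guess based on the `+ "=="` in the code above.

```python
import base64
import hashlib
import itertools


def xor_with_sha256(data: bytes, key: str) -> bytes:
    """XOR data against the repeating SHA-256 digest of key."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return bytes(a ^ b for a, b in zip(data, itertools.cycle(digest)))


def encode_name(plain: str, key: str) -> str:
    """Hypothetical inverse of decode_name, used only to build sample input."""
    encoded = base64.b64encode(xor_with_sha256(plain.encode("utf-8"), key))
    return encoded.decode("ascii").rstrip("=")  # drop padding (assumed playlist form)


def decode_name(encrypted_b64: str, key: str) -> str:
    """Same steps as _decode: re-add padding, Base64-decode, XOR, decode as UTF-8."""
    raw = base64.b64decode(encrypted_b64 + "==")
    return xor_with_sha256(raw, key).decode("utf-8")


if __name__ == "__main__":
    key = "example-key"  # made-up key, not a real one
    token = encode_name("segment_0001.mp4", key)
    print(token, "->", decode_name(token, key))
```

Because XOR is its own inverse, the same `xor_with_sha256` call serves both directions; a wrong key typically yields bytes that fail to decode as UTF-8.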

regzzzz avatar Nov 21 '25 16:11 regzzzz


DEBUG - [SC] kimbeam: Stripchat status request failed or invalid JSON
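
That DEBUG line does not say whether the request itself failed or the body was not JSON. A rough way to tell these cases apart is sketched below. `diagnose_status_response` is a hypothetical helper, not part of StreaMonitor, and the "cam" and "error" field names are taken from the getStatus code posted above. It takes the HTTP status code and the response text, so it can be fed from any HTTP client.

```python
import json


def diagnose_status_response(status_code: int, body: str) -> str:
    """Return a short, human-readable reason a status poll could not be used."""
    if status_code != 200:
        return f"HTTP {status_code}"
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        # Typically an HTML error or challenge page instead of the API response.
        return "body is not JSON"
    if not isinstance(data, dict):
        return "JSON is not an object"
    if "cam" not in data:
        return f"no 'cam' field (error: {data.get('error')!r})"
    return "ok"


if __name__ == "__main__":
    print(diagnose_status_response(403, ""))
    print(diagnose_status_response(200, "<html>blocked</html>"))
    print(diagnose_status_response(200, '{"error": "Not Found"}'))
```

Logging the result of such a check, together with the first few hundred characters of the body, should show whether the site is blocking the request or the response format has changed.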

NBAMVP avatar Nov 21 '25 18:11 NBAMVP

It looks like you're still using an old version. As far as I know, you can no longer save in m3u8 format. To resolve this, remove all files, including Python and temporary files, and do a fresh install from GitHub Desktop. There's also a fork of Astro that might solve your downloading issue.

regzzzz avatar Nov 23 '25 11:11 regzzzz

It looks like you're still using an old version. As far as I know, you can no longer save in m3u8 format. To resolve this, remove all files, including Python and temporary files, and do a fresh install from GitHub Desktop. There's also a fork of Astro that might solve your downloading issue.

I think I’m already using the latest version. I downloaded it by going to https://github.com/lossless1024/StreaMonitor, clicking “Code” → “Download ZIP”. That should be the newest code, right? I also noticed the author’s last update was about three weeks ago. One more question: what exactly is this “Astro” you mentioned? Is it the name of the developer, or the name of the fork/software? Could you please tell me the specific project name or link? Thank you!

NBAMVP avatar Nov 23 '25 13:11 NBAMVP