Can Towhee take binary data as input?

Open yfq512 opened this issue 1 year ago • 17 comments

Is there an existing issue for this?

  • [X] I have searched the existing issues.

Is your feature request related to a problem? Please describe.

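Looking through the related examples, I found that the inputs for image and video feature extraction are almost always file paths. In my actual use case, many inputs are image or video URLs, and I don't want to download them to disk. Does Towhee support URLs or downloaded binary data as input?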

Describe the solution you'd like.

No response

Describe an alternate solution.

No response

Anything else? (Additional Context)

No response

yfq512 avatar Aug 08 '22 01:08 yfq512

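Also, the examples I've seen mostly do batch feature extraction. Does Towhee have anything aimed at streaming data? Otherwise the model has to be loaded again for every incoming item, which wastes a lot of time.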

yfq512 avatar Aug 08 '22 01:08 yfq512
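
For the per-item reload concern above, a common plain-Python pattern is to cache the loaded model so only the first item pays the loading cost. This is a minimal sketch, not Towhee's API: `get_model` and `handle` are hypothetical names, and the "model" is a stand-in function rather than a real network.

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def get_model(name: str):
    """Hypothetical loader standing in for an expensive model load."""
    # A real loader would build the network and load its weights here.
    return lambda item: f"{name}:{len(item)}"


def handle(item: bytes) -> str:
    # Called once per incoming item; the loader body runs on the first call only.
    return get_model("resnet50")(item)
```

Every later call to `handle` reuses the cached object, so a long-running consumer process loads the model once per model name.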

Hi, @yfq512. The data pipeline can be converted into a callback function. You can refer to the code in the last section of our image search example: https://github.com/towhee-io/examples/blob/main/image/reverse_image_search/1_build_image_search_engine.ipynb

reiase avatar Aug 08 '22 08:08 reiase

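@reiase Thanks for the reply. I found a way to load an image without writing it to disk:

from PIL import Image as PILImage
from towhee import DataCollection
from towhee.utils.pil_utils import from_pil

path = '1.jpg'
img = PILImage.open(path)
feature = (DataCollection([from_pil(img)]).image_embedding.timm(model_name='resnet50').tensor_normalize().to_list()[0].tolist())

Is there a way to do the same for video?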

yfq512 avatar Aug 10 '22 02:08 yfq512

@junjiejiangjjj Do we support decoding video streams from HTTP addresses?

reiase avatar Aug 11 '22 03:08 reiase

The op https://towhee.io/video-decode/ffmpeg can load video from HTTP addresses, and image decode also supports HTTP addresses.

junjiejiangjjj avatar Aug 11 '22 07:08 junjiejiangjjj

Got it working, thank you all 🌹

yfq512 avatar Aug 11 '22 08:08 yfq512

@junjiejiangjjj Thanks for the reply. I've run into another problem: if the input is an HTTP address, how do I set a timeout parameter?

yfq512 avatar Aug 11 '22 09:08 yfq512

... another problem: if the input is an HTTP address, how do I set a timeout parameter?

The decoders don't support it yet. You can open a new issue to track it.

junjiejiangjjj avatar Aug 11 '22 09:08 junjiejiangjjj
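
Until the decoders accept a timeout, one possible workaround is to download the bytes yourself with a timeout and keep them in memory. The sketch below uses only the Python standard library; `fetch_bytes` is a hypothetical helper name, and whether a given Towhee operator accepts the resulting bytes directly is not confirmed in this thread.

```python
import urllib.request


def fetch_bytes(url: str, timeout: float = 5.0) -> bytes:
    """Download a resource into memory without touching the disk.

    `timeout` bounds each blocking socket operation (connect, read),
    not the total download time.
    """
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read()
```

For images, the returned bytes can be wrapped in `io.BytesIO` and opened with PIL, which then fits the `from_pil` approach shown earlier in the thread.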

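@junjiejiangjjj Does Towhee support multithreading? I used Python's built-in thread pool to process multiple tasks in parallel, but saw almost no speedup. Is there a multithreading example?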

yfq512 avatar Aug 17 '22 01:08 yfq512

@Chiiizzzy

junjiejiangjjj avatar Aug 17 '22 02:08 junjiejiangjjj

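@junjiejiangjjj Does Towhee support multithreading? I used Python's built-in thread pool to process multiple tasks in parallel, but saw almost no speedup. Is there a multithreading example?

We provide a function, set_parallel, for multithreaded execution. Here is an example: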

import towhee

dc = (
    towhee.read_csv('reverse_image_search.csv')
        .set_parallel(5)
        .runas_op['id', 'id'](func=lambda x: int(x))
        .image_decode['path', 'img']()
        .image_embedding.timm['img', 'vec'](model_name='resnet50')
        .to_list()
)

Chiiizzzy avatar Aug 17 '22 02:08 Chiiizzzy

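@Chiiizzzy Thanks for the reply. My actual scenario is a producer-consumer setup: each time I consume a Kafka message I process just one image, but many images can arrive within a short time. The example above doesn't seem to fit this well (it looks more like processing a batch of data that is already known). In practice I only use the x3d model to extract image and video features. (I would actually prefer to run inference directly on PyTorch to get the video and image features, but I couldn't find a relevant example 😂) Is there an example for this kind of streaming scenario?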

yfq512 avatar Aug 17 '22 02:08 yfq512

Hello @yfq512, I suggest using a generator as the dc input, for example:

import towhee

def read_kafka():
    """Read image paths (or URLs) from Kafka."""
    while True:
        path = kafka.read()  # `kafka` is a placeholder for your own consumer
        yield path

def output_function(e):
    print(e)

(
    # The stream has a single column, so the `id` conversion from the CSV example is omitted.
    towhee.dc['path'](read_kafka())
        .set_parallel(5)
        .image_decode['path', 'img']()
        .image_embedding.timm['img', 'vec'](model_name='resnet50')
        .runas_op(output_function)
)

reiase avatar Aug 17 '22 03:08 reiase
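
The generator idea above can be tried without Kafka or Towhee installed. This is a minimal sketch assuming a hypothetical blocking `poll()` callable that returns the next message, or `None` once the consumer is closed; a real Kafka client will have its own API.

```python
from typing import Callable, Iterator, Optional


def stream(poll: Callable[[], Optional[str]]) -> Iterator[str]:
    """Turn a blocking `poll()` call into a generator of messages.

    `poll` is a hypothetical stand-in for a Kafka consumer read.
    """
    while True:
        msg = poll()
        if msg is None:  # assumed "consumer closed" signal
            return
        yield msg
```

Because the generator is lazy, nothing is read from the source until the downstream pipeline asks for the next item, which is what lets a pipeline process an unbounded stream one message at a time.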

@reiase This is what I'm doing:

from core import url2img_pil  # my own helper module
from towhee import DataCollection
from towhee.utils.pil_utils import from_pil

def img2feature(url):  # shape = 2048
    try:
        img = from_pil(url2img_pil(url))
        feature = DataCollection([DataCollection([[img]]).action_classification.pytorchvideo(model_name='x3d_m')[0][2]]).tensor_normalize().to_list()[0].tolist()
        return feature
    except Exception as e:
        print(e)
        return None

def read_kafka():
    """Read image URLs from Kafka."""
    while True:
        url = kafka.read()
        feature = img2feature(url)  # multithreading???
        # write to Milvus
        # write to SQL ...

Is using Towhee this way very inefficient? With Python's built-in thread pool enabled, it is even slower than a single thread 😄

yfq512 avatar Aug 17 '22 03:08 yfq512

Hi @yfq512, Towhee can parallelize the code and handle the exceptions for you. All you need to do is reorganize the code as a data processing pipeline:

import towhee
from core import url2img_pil  # the poster's own helper module
from towhee.utils.pil_utils import from_pil

def read_kafka():
    """Read image URLs from Kafka."""
    while True:
        url = kafka.read()
        yield url

collection = create_some_milvus_collection()

(
    towhee.dc['url'](read_kafka())  # read data from kafka as a stream
        .set_parallel(5)          # parallel execution, towhee will open a thread pool of size 5 to execute the pipeline
        .exception_safe()       # make the execution exception safe, which will drop unexpected errors
        .runas_op['url', 'img'](lambda url: [from_pil(url2img_pil(url)),])      # load image as frame sequence
        .action_classification.pytorchvideo['img', 'feature'](model_name='x3d_m')      # run the model
        .runas_op['feature', 'feature'](lambda x: x[2])          # extract feature from model output
        .tensor_normalize()                      # tensor normalization
        .drop_empty()                             # drop unexpected errors
        .ann_insert.milvus[('url', 'feature'), 'mr'](collection=collection)    # insert the result into milvus
        .runas_op(write_sql)          # write result into SQL database
        .run()            # consume data from the kafka data stream and execute the pipeline
)

reiase avatar Aug 17 '22 05:08 reiase
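
To make the comments in the pipeline above concrete, here is a rough plain-Python illustration of "a thread pool of size 5" combined with "drop unexpected errors" on a stream. It is only a sketch of the idea and makes no claim about how Towhee implements `set_parallel`, `exception_safe`, or `drop_empty`; `parallel_map_safe` and `_result` are hypothetical names.

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def _result(future: Future) -> List:
    """Return the result in a one-item list, or an empty list if the call raised."""
    try:
        return [future.result()]
    except Exception:
        return []


def parallel_map_safe(func: Callable[[T], R], items: Iterable[T], workers: int = 5) -> Iterator[R]:
    """Apply `func` to a stream with a bounded thread pool, keeping input order.

    Items whose call raises are dropped instead of stopping the stream.
    """
    pending: deque = deque()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for item in items:
            pending.append(pool.submit(func, item))
            # Keep at most `workers` calls in flight so an endless
            # stream is never read ahead without bound.
            if len(pending) >= workers:
                yield from _result(pending.popleft())
        # The input is exhausted: drain what is still running.
        while pending:
            yield from _result(pending.popleft())
```

Unlike `ThreadPoolExecutor.map`, which submits every input up front, this version pulls from the input lazily, so it also works with a generator that never ends. Threads tend to help most when the per-item work spends its time waiting on I/O such as downloads.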

In my opinion, if your pipeline is relatively simple and your workload is driven by incoming traffic rather than batch processing, you can use Towhee v0.8.0 to generate a service Docker image with one click, and then scale out horizontally and elastically through Docker.

With an internal application gateway, it is easier to improve performance while maintaining flexibility. @yfq512

P.S. Besides discussing in issues, you can communicate in Chinese in the WeChat groups; if you are used to English, Slack will be a better place to go.

soulteary avatar Aug 19 '22 03:08 soulteary

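@reiase When I use set_parallel(5) to enable multithreading, the program sometimes does not work; if I delete the set_parallel(5) line, it works 100% of the time. Why is that, and how can I fix it?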

yfq512 avatar Aug 22 '22 09:08 yfq512

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Close the stale issues and pull requests after 7 days of inactivity. Reopen the issue with /reopen.

stale[bot] avatar Oct 17 '22 05:10 stale[bot]