h2 icon indicating copy to clipboard operation
h2 copied to clipboard

Accepting the PUSH_PROMISE frame at the client

Open chanh1964 opened this issue 4 years ago • 2 comments

Dear developers,

I'm investigating hyper-h2 with the Server Push feature.

I'm trying the following flow:

  • Client sends a GET request (path: /file.txt)
  • Server responses along with a PUSH_PROMISE frame (path: /push.text)
  • Client receives the response and accepts the PUSH_PROMISE frame
  • Server sends the pushed response

However, my code is stuck at the client accepting the PUSH_PROMISE frame. Therefore, the server never sends the pushed response and the client never receives the pushed response.

Please let me know what I have missed in the codes or whether I understand the flow right.

Here are the codes:

Server (based on "Writing Your Server"):

import json
import socket

import h2.connection
import h2.events
import h2.config

def send_response(conn, event):
    stream_id = event.stream_id
    response_data = json.dumps(dict(event.headers)).encode('utf-8')
    
    response_headers = [
            (':status', '200'),
            ('server', 'test-h2-server'),
            ('content-length', str(len(response_data))),
            ('content-type', 'application/json'),
            ]

    send_push(conn, event)

    conn.send_headers(
        stream_id=stream_id,
        headers=response_headers,
    )
    conn.send_data(
        stream_id=stream_id,
        data=response_data,
        end_stream=True
    )

def send_push(conn, event):
    push_id=conn.get_next_available_stream_id()
    request_headers = event.headers
    push_headers = []
    
    for entry in request_headers:
        if ':path' in entry:
            if '/file.txt' in entry:
                push_headers.append((':path','/push.text'))
            else:
                return
        else:
            push_headers.append(entry)
    
    conn.push_stream(
        stream_id=event.stream_id,
        promised_stream_id=push_id,
        request_headers=push_headers
    )
    
def handle(sock):
    config = h2.config.H2Configuration(client_side=False)
    conn = h2.connection.H2Connection(config=config)
    conn.initiate_connection()
    sock.sendall(conn.data_to_send())
    while True:
        data = sock.recv(65535)
        if not data:
            break

        events = conn.receive_data(data)
        for event in events:
            print event
            if isinstance(event, h2.events.RequestReceived):
                send_response(conn, event)                

        data_to_send = conn.data_to_send()
        if data_to_send:
            sock.sendall(data_to_send)

print("TEST HTTP/2 SERVER")
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('0.0.0.0', 8080))
sock.listen(5)

while True:
    handle(sock.accept()[0])

Client (based on Hyper as suggested in "Writing Your Server"):

from hyper import HTTP20Connection as H

def Req(conn, path):
    conn.request('GET',path)
    r = conn.get_response()
    if r:        
        print r.headers
        for push in conn.get_pushes():
            print push.path         
            pr = push.get_response()
            print pr.headers

c = H('localhost:8080',enable_push=True)

Req(c,'/file.txt')

c.close()

Output at the server:

<RemoteSettingsChanged changed_settings:{ChangedSetting(setting=SettingCodes.HEADER_TABLE_SIZE, original_value=4096, new_value=4096), ChangedSetting(setting=SettingCodes.ENABLE_PUSH, original_value=1, new_value=1), ChangedSetting(setting=SettingCodes.MAX_CONCURRENT_STREAMS, original_value=None, new_value=100), ChangedSetting(setting=SettingCodes.INITIAL_WINDOW_SIZE, original_value=65535, new_value=65535), ChangedSetting(setting=SettingCodes._max_frame_size, original_value=16384, new_value=16384), ChangedSetting(setting=SettingCodes._max_header_list_size, original_value=None, new_value=65536)}>
<RemoteSettingsChanged changed_settings:{ChangedSetting(setting=SettingCodes.ENABLE_PUSH, original_value=1, new_value=1)}>
<SettingsAcknowledged changed_settings:{}>
<RequestReceived stream_id:1, headers:[(u':method', u'GET'), (u':scheme', u'http'), (u':authority', u'localhost'), (u':path', u'/file.txt')]>
<StreamEnded stream_id:1>

No event related to the PUSH_PROMISE is shown.

Output at the client:

HTTPHeaderMap([('server', 'test-h2-server'), ('content-length', '86'), ('content-type', 'application/json')])
/push.text

The code pauses at pr = push.get_response(). I think it is waiting for the response of the PUSH_PROMISE from the server, but nothing is returned.

I look forward to hearing from you soon. Thank you very much for your time. Best regards.

chanh1964 avatar Apr 30 '20 14:04 chanh1964

Some updates on this: I have tried again by modifying the asyncio-sever.py. This time I used Chrome to request the index.html file. The server sent the response and pushed the style.css file.

=> The index.html file was successfully sent but the style.css file was stalled forever. image

Code:

# -*- coding: utf-8 -*-
import asyncio
import io
import json
import ssl
import collections
from typing import List, Tuple

from h2.config import H2Configuration
from h2.connection import H2Connection
from h2.events import (
    ConnectionTerminated, DataReceived, RemoteSettingsChanged,
    RequestReceived, StreamEnded, StreamReset, WindowUpdated
)
from h2.errors import ErrorCodes
from h2.exceptions import ProtocolError, StreamClosedError
from h2.settings import SettingCodes

import os
import sys


RequestData = collections.namedtuple('RequestData', ['headers', 'data'])


class H2Protocol(asyncio.Protocol):
    def __init__(self):
        config = H2Configuration(client_side=False, header_encoding='utf-8')
        self.conn = H2Connection(config=config)
        self.transport = None
        self.stream_data = {}
        self.flow_control_futures = {}

    def connection_made(self, transport: asyncio.Transport):
        self.transport = transport
        self.conn.initiate_connection()
        self.transport.write(self.conn.data_to_send())

    def connection_lost(self, exc):
        for future in self.flow_control_futures.values():
            future.cancel()
        self.flow_control_futures = {}

    def data_received(self, data: bytes):
        try:
            events = self.conn.receive_data(data)
        except ProtocolError as e:
            self.transport.write(self.conn.data_to_send())
            self.transport.close()
        else:
            self.transport.write(self.conn.data_to_send())
            for event in events:
                print(event)
                if isinstance(event, RequestReceived):
                    self.request_received(event.headers, event.stream_id)
                elif isinstance(event, DataReceived):
                    self.receive_data(event.data, event.stream_id)
                elif isinstance(event, StreamEnded):
                    self.stream_complete(event.stream_id)
                elif isinstance(event, ConnectionTerminated):
                    self.transport.close()
                elif isinstance(event, StreamReset):
                    self.stream_reset(event.stream_id)
                elif isinstance(event, WindowUpdated):
                    self.window_updated(event.stream_id, event.delta)
                elif isinstance(event, RemoteSettingsChanged):
                    if SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
                        self.window_updated(None, 0)

                self.transport.write(self.conn.data_to_send())

    def request_received(self, headers: List[Tuple[str, str]], stream_id: int):
        headers = collections.OrderedDict(headers)
        method = headers[':method']

        # Store off the request data.
        try:
            dt = open("."+headers[':path'], "rb")
            request_data = RequestData(headers, dt.read())
        except FileNotFoundError:
            request_data = RequestData(headers,"File Not Found".encode('utf-8'))
        self.stream_data[stream_id] = request_data

    def stream_complete(self, stream_id: int):
        """
        When a stream is complete, we can send our response.
        """
        try:
            request_data = self.stream_data[stream_id]
        except KeyError:
            # Just return, we probably 405'd this already
            return

        headers = request_data.headers
        body = request_data.data.decode('utf-8')

        data = request_data.data
        content_type = 'text/html' if (headers[':path'].endswith('html')) else 'text/css'

        response_headers = (
            (':status', '200'),
            ('content-type', content_type),
            ('content-length', str(len(data))),
            ('server', 'asyncio-h2'),
        )
        self.send_push(headers,stream_id)
        self.conn.send_headers(stream_id, response_headers)
        asyncio.ensure_future(self.send_data(data, stream_id))
        
    def send_push(self,headers,stream_id):
        if '/index.html' not in headers[':path']:
            return
        push_id = self.conn.get_next_available_stream_id()
        push_headers = [];
        
        entries = [':method',':scheme',':authority',':path']
        
        for entry in entries:
            if ':path' in entry:
                push_headers.append((entry,'/style.css'))
            else:
                push_headers.append((entry,headers[entry]))       
                        
        
        self.conn.push_stream(
            stream_id=stream_id,
            promised_stream_id=push_id,
            request_headers=push_headers
        )        
        
    def receive_data(self, data: bytes, stream_id: int):
        """
        We've received some data on a stream. If that stream is one we're
        expecting data on, save it off. Otherwise, reset the stream.
        """
        try:
            stream_data = self.stream_data[stream_id]
        except KeyError:
            self.conn.reset_stream(
                stream_id, error_code=ErrorCodes.PROTOCOL_ERROR
            )
        else:
            stream_data.data.write(data)

    def stream_reset(self, stream_id):
        """
        A stream reset was sent. Stop sending data.
        """
        if stream_id in self.flow_control_futures:
            future = self.flow_control_futures.pop(stream_id)
            future.cancel()

    async def send_data(self, data, stream_id):
        """
        Send data according to the flow control rules.
        """
        while data:
            while self.conn.local_flow_control_window(stream_id) < 1:
                try:
                    await self.wait_for_flow_control(stream_id)
                except asyncio.CancelledError:
                    return

            chunk_size = min(
                self.conn.local_flow_control_window(stream_id),
                len(data),
                self.conn.max_outbound_frame_size,
            )

            try:
                self.conn.send_data(
                    stream_id,
                    data[:chunk_size],
                    end_stream=(chunk_size == len(data))
                )
            except (StreamClosedError, ProtocolError):
                # The stream got closed and we didn't get told. We're done
                # here.
                break

            self.transport.write(self.conn.data_to_send())
            data = data[chunk_size:]

    async def wait_for_flow_control(self, stream_id):
        """
        Waits for a Future that fires when the flow control window is opened.
        """
        f = asyncio.Future()
        self.flow_control_futures[stream_id] = f
        await f

    def window_updated(self, stream_id, delta):
        """
        A window update frame was received. Unblock some number of flow control
        Futures.
        """
        if stream_id and stream_id in self.flow_control_futures:
            f = self.flow_control_futures.pop(stream_id)
            f.set_result(delta)
        elif not stream_id:
            for f in self.flow_control_futures.values():
                f.set_result(delta)

            self.flow_control_futures = {}


ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.options |= (
    ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION
)
ssl_context.load_cert_chain(certfile="cert.crt", keyfile="cert.key")
ssl_context.set_alpn_protocols(["h2"])

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(H2Protocol, '127.0.0.1', 8443, ssl=ssl_context)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

Output:

Serving on ('127.0.0.1', 8443)
<RemoteSettingsChanged changed_settings:{ChangedSetting(setting=SettingCodes.HEADER_TABLE_SIZE, original_value=4096, new_value=65536), ChangedSetting(setting=SettingCodes.MAX_CONCURRENT_STREAMS, original_value=None, new_value=1000), ChangedSetting(setting=SettingCodes.INITIAL_WINDOW_SIZE, original_value=65535, new_value=6291456), ChangedSetting(setting=SettingCodes.MAX_HEADER_LIST_SIZE, original_value=None, new_value=262144)}>
<WindowUpdated stream_id:0, delta:15663105>
<RequestReceived stream_id:1, headers:[(':method', 'GET'), (':authority', 'localhost:8443'), (':scheme', 'https'), (':path', '/index.html'), ('cache-control', 'max-age=0'), ('upgrade-insecure-requests', '1'), ('user-agent', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'), ('accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9'), ('sec-fetch-site', 'cross-site'), ('sec-fetch-mode', 'navigate'), ('sec-fetch-user', '?1'), ('sec-fetch-dest', 'document'), ('accept-encoding', 'gzip, deflate, br'), ('accept-language', 'en-US,en;q=0.9')]>
<StreamEnded stream_id:1>
<PriorityUpdated stream_id:1, weight:256, depends_on:0, exclusive:True>
<SettingsAcknowledged changed_settings:{}>
<PriorityUpdated stream_id:2, weight:110, depends_on:1, exclusive:True>

chanh1964 avatar May 07 '20 11:05 chanh1964

I think you are missing a call to send_data with your style.css content. Once you send the PUSH_PROMISE, the client expects to receive DATA frames - but I think your server is not sending any.

Kriechi avatar Jul 31 '20 12:07 Kriechi