606 lines
16 KiB
Python
606 lines
16 KiB
Python
|
import sys
|
||
|
import os
|
||
|
import subprocess
|
||
|
import gi
|
||
|
|
||
|
gi.require_version('Gdk', '4.0')
|
||
|
gi.require_version('Gtk', '4.0')
|
||
|
|
||
|
from gi.repository import GLib, GObject, Gdk, Gtk
|
||
|
from pydbus import SessionBus
|
||
|
|
||
|
verbose = True
|
||
|
|
||
|
remote_desktop = None
|
||
|
screen_cast = None
|
||
|
session = None
|
||
|
stream_path = None
|
||
|
done = False
|
||
|
|
||
|
def terminate():
|
||
|
sys.exit(1)
|
||
|
|
||
|
loop = None
|
||
|
|
||
|
def quit_cb(loop):
|
||
|
loop.quit()
|
||
|
print('timed out while waiting')
|
||
|
|
||
|
def wait(millis):
|
||
|
global loop
|
||
|
before = GLib.get_monotonic_time()
|
||
|
loop = GLib.MainLoop()
|
||
|
GLib.timeout_add(millis, quit_cb, loop)
|
||
|
loop.run()
|
||
|
if verbose:
|
||
|
time = (GLib.get_monotonic_time() - before) / 1000
|
||
|
print(f'waited for {time} milliseconds')
|
||
|
|
||
|
display = None
|
||
|
window = None
|
||
|
entry = None
|
||
|
expected_change = None
|
||
|
|
||
|
def key_pressed_cb (controller, keyval, keycode, state):
|
||
|
global expected_change
|
||
|
global loop
|
||
|
|
||
|
if verbose:
|
||
|
print(f'got key press: {keyval}, state {state}')
|
||
|
assert expected_change != None, 'Unexpected key press'
|
||
|
assert expected_change['type'] == 'press', 'Key press event expected'
|
||
|
assert keyval == expected_change['keyval'], 'Unexpected keyval in key press event'
|
||
|
assert state == expected_change['state'], 'Unexpected state in key press event'
|
||
|
|
||
|
expected_change = None
|
||
|
loop.quit()
|
||
|
|
||
|
def key_released_cb (controller, keyval, keycode, state):
|
||
|
global expected_change
|
||
|
global loop
|
||
|
|
||
|
if verbose:
|
||
|
print(f'got key release: {keyval}, state {state}')
|
||
|
assert expected_change != None, 'Unexpected key release'
|
||
|
assert expected_change['type'] == 'release', 'Key release event expected'
|
||
|
assert keyval == expected_change['keyval'], 'Unexpected keyval in key release event'
|
||
|
assert state == expected_change['state'], 'Unexpected state in key release event'
|
||
|
|
||
|
expected_change = None
|
||
|
loop.quit()
|
||
|
|
||
|
def motion_cb (controller, x, y):
|
||
|
global expected_change
|
||
|
global loop
|
||
|
|
||
|
if verbose:
|
||
|
print(f'got motion: {x}, {y}')
|
||
|
if expected_change != None:
|
||
|
assert expected_change['type'] == 'motion', 'Motion event expected'
|
||
|
assert x == expected_change['x'], 'Unexpected x coord in motion event'
|
||
|
assert y == expected_change['y'], 'Unexpected y coord in motion event'
|
||
|
expected_change = None
|
||
|
loop.quit()
|
||
|
|
||
|
def enter_cb (controller, x, y):
|
||
|
global expected_change
|
||
|
global loop
|
||
|
|
||
|
if verbose:
|
||
|
print(f'got enter: {x}, {y}')
|
||
|
assert expected_change != None, 'Unexpected enter'
|
||
|
assert expected_change['type'] == 'enter', 'Enter event expected'
|
||
|
assert x == expected_change['x'], 'Unexpected x coord in enter event'
|
||
|
assert y == expected_change['y'], 'Unexpected y coord in enter event'
|
||
|
|
||
|
expected_change = None
|
||
|
loop.quit()
|
||
|
|
||
|
def pressed_cb(controller, n, x, y):
|
||
|
global expected_change
|
||
|
global loop
|
||
|
|
||
|
if verbose:
|
||
|
print(f'got pressed')
|
||
|
assert expected_change != None, 'Unexpected event'
|
||
|
assert expected_change['type'] == 'press', 'Button press expected'
|
||
|
assert expected_change['button'] == controller.get_current_button(), 'Unexpected button pressed'
|
||
|
assert x == expected_change['x'], 'Unexpected x coord in motion event'
|
||
|
assert y == expected_change['y'], 'Unexpected y coord in motion event'
|
||
|
|
||
|
expected_change = None
|
||
|
loop.quit()
|
||
|
|
||
|
def released_cb(controller, n, x, y):
|
||
|
global expected_change
|
||
|
global loop
|
||
|
|
||
|
if verbose:
|
||
|
print(f'got released')
|
||
|
assert expected_change != None, 'Unexpected event'
|
||
|
assert expected_change['type'] == 'release', 'Button release expected'
|
||
|
|
||
|
expected_change = None
|
||
|
loop.quit()
|
||
|
|
||
|
def expect_key_press(keyval, state, timeout):
|
||
|
global expected_change
|
||
|
expected_change = {
|
||
|
'type' : 'press',
|
||
|
'keyval' : keyval,
|
||
|
'state' : state
|
||
|
}
|
||
|
wait(timeout)
|
||
|
assert expected_change == None, 'Expected event did not happen'
|
||
|
|
||
|
def expect_key_release(keyval, state, timeout):
|
||
|
global expected_change
|
||
|
expected_change = {
|
||
|
'type' : 'release',
|
||
|
'keyval' : keyval,
|
||
|
'state' : state
|
||
|
}
|
||
|
wait(timeout)
|
||
|
assert expected_change == None, 'Expected event did not happen'
|
||
|
|
||
|
def expect_motion(x, y, timeout):
|
||
|
global expected_change
|
||
|
expected_change = {
|
||
|
'type' : 'motion',
|
||
|
'x' : x,
|
||
|
'y' : y
|
||
|
}
|
||
|
wait(timeout)
|
||
|
assert expected_change == None, 'Expected event did not happen'
|
||
|
|
||
|
def expect_enter(x, y, timeout):
|
||
|
global expected_change
|
||
|
expected_change = {
|
||
|
'type' : 'enter',
|
||
|
'x' : x,
|
||
|
'y' : y
|
||
|
}
|
||
|
wait(timeout)
|
||
|
assert expected_change == None, 'Expected event did not happen'
|
||
|
|
||
|
def expect_button_press(button, x, y, timeout):
|
||
|
global expected_change
|
||
|
expected_change = {
|
||
|
'type' : 'press',
|
||
|
'button' : button,
|
||
|
'x' : x,
|
||
|
'y' : y
|
||
|
}
|
||
|
wait(timeout)
|
||
|
assert expected_change == None, 'Button press did not arrive'
|
||
|
|
||
|
def expect_button_release(button, x, y, timeout):
|
||
|
global expected_change
|
||
|
expected_change = {
|
||
|
'type' : 'release',
|
||
|
'button' : button,
|
||
|
'x' : x,
|
||
|
'y' : y
|
||
|
}
|
||
|
wait(timeout)
|
||
|
assert expected_change == None, 'Button release did not arrive'
|
||
|
|
||
|
def got_active(object, pspec):
|
||
|
global loop
|
||
|
object.disconnect_by_func(got_active)
|
||
|
loop.quit()
|
||
|
|
||
|
def launch_observer():
|
||
|
global display
|
||
|
global window
|
||
|
|
||
|
if verbose:
|
||
|
print('launch observer')
|
||
|
|
||
|
if display == None:
|
||
|
display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY'))
|
||
|
|
||
|
window = Gtk.Window.new()
|
||
|
|
||
|
controller = Gtk.EventControllerKey.new()
|
||
|
controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
|
||
|
controller.connect('key-pressed', key_pressed_cb)
|
||
|
controller.connect('key-released', key_released_cb)
|
||
|
window.add_controller(controller)
|
||
|
|
||
|
controller = Gtk.EventControllerMotion.new()
|
||
|
controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
|
||
|
controller.connect('enter', enter_cb)
|
||
|
controller.connect('motion', motion_cb)
|
||
|
window.add_controller(controller)
|
||
|
|
||
|
controller = Gtk.GestureClick.new()
|
||
|
controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
|
||
|
controller.connect('pressed', pressed_cb)
|
||
|
controller.connect('released', released_cb)
|
||
|
window.add_controller(controller)
|
||
|
|
||
|
window.connect('notify::is-active', got_active)
|
||
|
window.maximize()
|
||
|
window.present()
|
||
|
|
||
|
wait(2000)
|
||
|
|
||
|
assert window.is_active(), 'Observer not active'
|
||
|
assert window.get_width() == 1024, 'Window not maximized'
|
||
|
assert window.get_height() == 768, 'Window not maximized'
|
||
|
|
||
|
# we need to wait out the map animation, or pointer coords will be off
|
||
|
wait(1000)
|
||
|
|
||
|
def launch_entry():
|
||
|
global display
|
||
|
global window
|
||
|
global entry
|
||
|
|
||
|
if verbose:
|
||
|
print('launch entry')
|
||
|
|
||
|
if display == None:
|
||
|
display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY'))
|
||
|
|
||
|
window = Gtk.Window.new()
|
||
|
|
||
|
entry = Gtk.Entry.new()
|
||
|
|
||
|
window.set_child(entry)
|
||
|
|
||
|
window.connect('notify::is-active', got_active)
|
||
|
window.maximize()
|
||
|
window.present()
|
||
|
|
||
|
wait(500)
|
||
|
|
||
|
assert window.is_active(), "Observer not active"
|
||
|
assert window.get_width() == 1024, "Window not maximized"
|
||
|
assert window.get_height() == 768, "Window not maximized"
|
||
|
|
||
|
# we need to wait out the map animation, or pointer coords will be off
|
||
|
wait(1000)
|
||
|
|
||
|
def stop_observer():
|
||
|
global window
|
||
|
window.destroy()
|
||
|
window = None
|
||
|
|
||
|
def expect_entry_text(text):
|
||
|
assert text == entry.get_text(), 'Unexpected entry text: ' + entry.get_text()
|
||
|
|
||
|
def key_press(keyval):
|
||
|
if verbose:
|
||
|
print(f'press key {keyval}')
|
||
|
session.NotifyKeyboardKeysym(keyval, True)
|
||
|
|
||
|
def key_release(keyval):
|
||
|
if verbose:
|
||
|
print(f'release key {keyval}')
|
||
|
session.NotifyKeyboardKeysym(keyval, False)
|
||
|
|
||
|
buttons = {
|
||
|
1 : 0x110,
|
||
|
2 : 0x111,
|
||
|
3 : 0x112
|
||
|
}
|
||
|
|
||
|
def button_press(button):
|
||
|
if verbose:
|
||
|
print(f'press button {button}')
|
||
|
session.NotifyPointerButton(buttons[button], True)
|
||
|
|
||
|
def button_release(button):
|
||
|
if verbose:
|
||
|
print(f'release button {button}')
|
||
|
session.NotifyPointerButton(buttons[button], False)
|
||
|
|
||
|
def pointer_move(x, y):
|
||
|
if verbose:
|
||
|
print(f'pointer move {x} {y}')
|
||
|
session.NotifyPointerMotionAbsolute(stream_path, x, y)
|
||
|
|
||
|
def basic_keyboard_tests():
|
||
|
try:
|
||
|
if verbose:
|
||
|
print('Starting basic keyboard tests')
|
||
|
|
||
|
launch_observer()
|
||
|
|
||
|
key_press(Gdk.KEY_a)
|
||
|
expect_key_press(keyval=Gdk.KEY_a, state=0, timeout=100)
|
||
|
|
||
|
key_release(Gdk.KEY_a)
|
||
|
expect_key_release(keyval=Gdk.KEY_a, state=0, timeout=100)
|
||
|
|
||
|
key_press(Gdk.KEY_Control_L)
|
||
|
expect_key_press(keyval=Gdk.KEY_Control_L, state=0, timeout=100)
|
||
|
|
||
|
key_press(Gdk.KEY_x)
|
||
|
expect_key_press(keyval=Gdk.KEY_x, state=Gdk.ModifierType.CONTROL_MASK, timeout=100)
|
||
|
|
||
|
key_release(Gdk.KEY_Control_L)
|
||
|
expect_key_release(keyval=Gdk.KEY_Control_L, state=Gdk.ModifierType.CONTROL_MASK, timeout=100)
|
||
|
|
||
|
key_release(Gdk.KEY_x)
|
||
|
expect_key_release(keyval=Gdk.KEY_x, state=0, timeout=100)
|
||
|
|
||
|
stop_observer()
|
||
|
except AssertionError as e:
|
||
|
print(f'Error in basic_keyboard_tests: {e}')
|
||
|
terminate()
|
||
|
|
||
|
def quick_typing_test():
|
||
|
try:
|
||
|
launch_entry()
|
||
|
|
||
|
key_press(Gdk.KEY_T)
|
||
|
key_release(Gdk.KEY_T)
|
||
|
key_press(Gdk.KEY_e)
|
||
|
key_release(Gdk.KEY_e)
|
||
|
key_press(Gdk.KEY_s)
|
||
|
key_release(Gdk.KEY_s)
|
||
|
key_press(Gdk.KEY_t)
|
||
|
key_release(Gdk.KEY_t)
|
||
|
|
||
|
wait(100)
|
||
|
expect_entry_text('Test')
|
||
|
|
||
|
stop_observer()
|
||
|
except AssertionError as e:
|
||
|
print(f'Error in quick_typing_test: {e}')
|
||
|
terminate()
|
||
|
|
||
|
def basic_pointer_tests():
|
||
|
try:
|
||
|
if verbose:
|
||
|
print('Starting basic pointer tests')
|
||
|
|
||
|
pointer_move(-100.0, -100.0)
|
||
|
launch_observer()
|
||
|
|
||
|
# observer window is maximized, so window coords == global coords
|
||
|
pointer_move(500.0, 300.0)
|
||
|
expect_enter(x=500, y=300, timeout=200)
|
||
|
|
||
|
pointer_move(400.0, 200.0)
|
||
|
expect_motion(x=400, y=200, timeout=200)
|
||
|
|
||
|
button_press(1)
|
||
|
expect_button_press(button=1, x=400, y=200, timeout=200)
|
||
|
|
||
|
pointer_move(220.0, 200.0)
|
||
|
expect_motion(x=220, y=200, timeout=200)
|
||
|
|
||
|
button_release(1)
|
||
|
expect_button_release(button=1, x=220, y=200, timeout=200)
|
||
|
|
||
|
stop_observer()
|
||
|
except AssertionError as e:
|
||
|
print(f'Error in basic_pointer_tests: {e}')
|
||
|
terminate()
|
||
|
|
||
|
ds_window = None
|
||
|
ds = None
|
||
|
|
||
|
def drag_begin(controller, drag):
|
||
|
global expected_change
|
||
|
global loop
|
||
|
|
||
|
if verbose:
|
||
|
print('got drag begin')
|
||
|
assert expected_change != None, 'Unexpected drag begin'
|
||
|
assert expected_change['type'] == 'drag', 'Drag begin expected'
|
||
|
|
||
|
expected_change = None
|
||
|
loop.quit()
|
||
|
|
||
|
def launch_drag_source(value):
|
||
|
global display
|
||
|
global ds_window
|
||
|
global ds
|
||
|
|
||
|
if verbose:
|
||
|
print('launch drag source')
|
||
|
|
||
|
if display == None:
|
||
|
display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY'))
|
||
|
|
||
|
ds_window = Gtk.Window.new()
|
||
|
ds_window.set_title('Drag Source')
|
||
|
|
||
|
ds = Gtk.DragSource.new()
|
||
|
ds.set_content(Gdk.ContentProvider.new_for_value(value))
|
||
|
ds_window.add_controller(ds)
|
||
|
ds.connect('drag-begin', drag_begin)
|
||
|
|
||
|
controller = Gtk.GestureClick.new()
|
||
|
controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
|
||
|
controller.connect('pressed', pressed_cb)
|
||
|
controller.connect('released', released_cb)
|
||
|
ds_window.add_controller(controller)
|
||
|
|
||
|
ds_window.connect('notify::is-active', got_active)
|
||
|
ds_window.maximize()
|
||
|
ds_window.present()
|
||
|
|
||
|
wait(2000)
|
||
|
|
||
|
assert ds_window.is_active(), 'drag source not active'
|
||
|
assert ds_window.get_width() == 1024, 'Window not maximized'
|
||
|
assert ds_window.get_height() == 768, 'Window not maximized'
|
||
|
|
||
|
# we need to wait out the map animation, or pointer coords will be off
|
||
|
wait(1000)
|
||
|
|
||
|
def stop_drag_source():
|
||
|
global ds_window
|
||
|
ds_window.destroy()
|
||
|
ds_window = None
|
||
|
|
||
|
dt_window = None
|
||
|
|
||
|
def do_drop(controller, value, x, y):
|
||
|
global expected_change
|
||
|
global loop
|
||
|
|
||
|
if verbose:
|
||
|
print(f'got drop {value}')
|
||
|
assert expected_change != None, 'Unexpected drop begin'
|
||
|
assert expected_change['type'] == 'drop', 'Drop expected'
|
||
|
assert expected_change['value'] == value, 'Unexpected value dropped'
|
||
|
|
||
|
expected_change = None
|
||
|
loop.quit()
|
||
|
|
||
|
def launch_drop_target():
|
||
|
global display
|
||
|
global dt_window
|
||
|
|
||
|
if verbose:
|
||
|
print('launch drop target')
|
||
|
|
||
|
if display == None:
|
||
|
display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY'))
|
||
|
|
||
|
dt_window = Gtk.Window.new()
|
||
|
dt_window.set_title('Drop Target')
|
||
|
|
||
|
controller = Gtk.DropTarget.new(GObject.TYPE_STRING, Gdk.DragAction.COPY)
|
||
|
dt_window.add_controller(controller)
|
||
|
controller.connect('drop', do_drop)
|
||
|
|
||
|
dt_window.connect('notify::is-active', got_active)
|
||
|
dt_window.maximize()
|
||
|
dt_window.present()
|
||
|
|
||
|
wait(2000)
|
||
|
|
||
|
assert dt_window.is_active(), 'drop target not active'
|
||
|
assert dt_window.get_width() == 1024, 'Window not maximized'
|
||
|
assert dt_window.get_height() == 768, 'Window not maximized'
|
||
|
|
||
|
# we need to wait out the map animation, or pointer coords will be off
|
||
|
wait(1000)
|
||
|
|
||
|
def stop_drop_target():
|
||
|
global dt_window
|
||
|
dt_window.destroy()
|
||
|
dt_window = None
|
||
|
|
||
|
def expect_drag(timeout):
|
||
|
global expected_change
|
||
|
expected_change = {
|
||
|
'type' : 'drag',
|
||
|
}
|
||
|
wait(timeout)
|
||
|
assert expected_change == None, 'DND operation not started'
|
||
|
|
||
|
def expect_drop(value, timeout):
|
||
|
global expected_change
|
||
|
expected_change = {
|
||
|
'type' : 'drop',
|
||
|
'value' : value
|
||
|
}
|
||
|
wait(timeout)
|
||
|
assert expected_change == None, 'Drop has not happened'
|
||
|
|
||
|
def dnd_tests():
|
||
|
try:
|
||
|
if verbose:
|
||
|
print('Starting dnd tests')
|
||
|
|
||
|
pointer_move(-100, -100)
|
||
|
|
||
|
launch_drag_source('abc')
|
||
|
wait(100);
|
||
|
|
||
|
pointer_move(100, 100)
|
||
|
wait(100);
|
||
|
button_press(1)
|
||
|
expect_button_press(button=1, x=100, y=100, timeout=300)
|
||
|
# need to wait out the MIN_TIME_TO_DND
|
||
|
wait(150)
|
||
|
|
||
|
pointer_move(120, 150)
|
||
|
expect_drag(timeout=2000)
|
||
|
|
||
|
launch_drop_target()
|
||
|
wait(100);
|
||
|
button_release(1)
|
||
|
expect_drop('abc', timeout=2000)
|
||
|
|
||
|
stop_drop_target()
|
||
|
stop_drag_source()
|
||
|
except AssertionError as e:
|
||
|
print(f'Error in dnd_tests: {e}')
|
||
|
terminate()
|
||
|
|
||
|
def session_closed_cb():
|
||
|
print('Session closed')
|
||
|
|
||
|
def run_commands():
|
||
|
basic_keyboard_tests()
|
||
|
basic_pointer_tests()
|
||
|
dnd_tests()
|
||
|
quick_typing_test()
|
||
|
|
||
|
def mutter_appeared(name):
|
||
|
global remote_desktop
|
||
|
global session
|
||
|
global stream_path
|
||
|
global done
|
||
|
|
||
|
if verbose:
|
||
|
print('mutter appeared on the bus')
|
||
|
|
||
|
remote_desktop = bus.get('org.gnome.Mutter.RemoteDesktop',
|
||
|
'/org/gnome/Mutter/RemoteDesktop')
|
||
|
device_types = remote_desktop.Get('org.gnome.Mutter.RemoteDesktop', 'SupportedDeviceTypes')
|
||
|
assert device_types & 1 == 1, 'No keyboard'
|
||
|
assert device_types & 2 == 2, 'No pointer'
|
||
|
|
||
|
screen_cast = bus.get('org.gnome.Mutter.ScreenCast',
|
||
|
'/org/gnome/Mutter/ScreenCast')
|
||
|
|
||
|
session_path = remote_desktop.CreateSession()
|
||
|
session = bus.get('org.gnome.Mutter.RemoteDesktop', session_path)
|
||
|
session.onClosed = session_closed_cb
|
||
|
|
||
|
screen_cast_session_path = screen_cast.CreateSession({ 'remote-desktop-session-id' : GLib.Variant('s', session.SessionId)})
|
||
|
screen_cast_session = bus.get('org.gnome.Mutter.ScreenCast', screen_cast_session_path)
|
||
|
|
||
|
stream_path = screen_cast_session.RecordMonitor('Meta-0', {})
|
||
|
session.Start()
|
||
|
|
||
|
# work around lack of initial devices
|
||
|
key_press(Gdk.KEY_Control_L)
|
||
|
key_release(Gdk.KEY_Control_L)
|
||
|
pointer_move(-100, -100)
|
||
|
|
||
|
run_commands()
|
||
|
|
||
|
session.Stop()
|
||
|
|
||
|
if verbose:
|
||
|
print('Done running commands, exiting...')
|
||
|
done = True
|
||
|
|
||
|
def mutter_vanished():
|
||
|
global done
|
||
|
if remote_desktop != None:
|
||
|
if verbose:
|
||
|
print('mutter left the bus')
|
||
|
done = True
|
||
|
|
||
|
bus = SessionBus()
|
||
|
bus.watch_name('org.gnome.Mutter.RemoteDesktop', 0, mutter_appeared, mutter_vanished)
|
||
|
|
||
|
try:
|
||
|
while not done:
|
||
|
GLib.MainContext.default().iteration(True)
|
||
|
except KeyboardInterrupt:
|
||
|
print('Interrupted')
|