Add initial working implementation of capture and playback.
This only works on my system with my modified version of the mouse library.
This commit is contained in:
parent
ff8955506d
commit
342926610b
|
@ -0,0 +1,103 @@
|
|||
import argparse
|
||||
import functools
|
||||
import keyboard
|
||||
import logging
|
||||
import mouse
|
||||
import pickle
|
||||
import queue
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
LOGGER = logging.getLogger("capture")
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"-d", "--delay",
|
||||
default=0,
|
||||
type=int,
|
||||
help="Seconds to wait before capture."
|
||||
)
|
||||
parser.add_argument(
|
||||
"-t", "--trigger",
|
||||
default=None,
|
||||
help="The key to use to trigger start and stop of capture."
|
||||
)
|
||||
parser.add_argument(
|
||||
"-o", "--output",
|
||||
default="dump.capture",
|
||||
help="Name of the file to capture to."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--verbose",
|
||||
action="store_true",
|
||||
help="Show verbose logging.",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.verbose else logging.INFO)
|
||||
if args.delay and args.trigger:
|
||||
print("You cannot specify 'delay' and 'trigger'")
|
||||
sys.exit(1)
|
||||
|
||||
now = time.time()
|
||||
_do_delay(args.delay)
|
||||
_do_trigger(args.trigger)
|
||||
# Add dummy events to lock in the time start
|
||||
event_queue = queue.Queue()
|
||||
event_queue.put(
|
||||
mouse.ButtonEvent(
|
||||
event_type=mouse.UP,
|
||||
button=mouse.RIGHT,
|
||||
time=now,
|
||||
))
|
||||
event_queue.put(
|
||||
keyboard.KeyboardEvent(
|
||||
event_type=keyboard.KEY_UP,
|
||||
name=" ",
|
||||
scan_code=57,
|
||||
time=now,
|
||||
)
|
||||
)
|
||||
hook = functools.partial(_on_hook, event_queue)
|
||||
keyhook = keyboard.hook(hook)
|
||||
mousehook = mouse.hook(hook)
|
||||
print("Capturing...")
|
||||
try:
|
||||
keyboard.wait(args.trigger)
|
||||
except KeyboardInterrupt:
|
||||
keyboard.unhook(keyhook)
|
||||
mouse.unhook(mousehook)
|
||||
|
||||
_save_events(event_queue, args.output)
|
||||
|
||||
def _on_hook(event_queue, event):
|
||||
LOGGER.debug(str(event))
|
||||
event_queue.put(event, block=False)
|
||||
|
||||
def _do_delay(delay: int) -> None:
|
||||
if not delay:
|
||||
return
|
||||
print("\n")
|
||||
for i in range(delay):
|
||||
print(f"\rStarting in {delay-i} seconds")
|
||||
time.sleep(1)
|
||||
|
||||
def _do_trigger(trigger: str) -> None:
|
||||
if not trigger:
|
||||
return
|
||||
print(f"Waiting for '{trigger}'")
|
||||
keyboard.wait(trigger)
|
||||
|
||||
def _save_events(event_q: queue.Queue, filename: str) -> None:
|
||||
events = []
|
||||
while not event_q.empty():
|
||||
event = event_q.get(block=False)
|
||||
events.append(event)
|
||||
with open(filename, "wb") as output:
|
||||
pickle.dump(events, output)
|
||||
print(f"Wrote to {filename}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,95 @@
|
|||
import argparse
|
||||
import keyboard
|
||||
import logging
|
||||
import mouse
|
||||
import pickle
|
||||
import threading
|
||||
import time
|
||||
|
||||
LOGGER = logging.getLogger("playback")
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"-d", "--delay",
|
||||
default=5,
|
||||
type=int,
|
||||
help="Seconds to wait before replay."
|
||||
)
|
||||
parser.add_argument(
|
||||
"-i", "--input",
|
||||
default="dump.capture",
|
||||
help="Name of the file to replay from."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--verbose",
|
||||
action="store_true",
|
||||
help="Show verbose messages."
|
||||
)
|
||||
args = parser.parse_args()
|
||||
logging.basicConfig(
|
||||
level = logging.DEBUG if args.verbose else logging.INFO
|
||||
)
|
||||
_do_delay(args.delay)
|
||||
|
||||
events = _load_events(args.input)
|
||||
_play_events(events)
|
||||
|
||||
def _play_events(events) -> None:
|
||||
LOGGER.info("Playback started.")
|
||||
key_state = keyboard.stash_state()
|
||||
last_time = None
|
||||
for event in events:
|
||||
if last_time is not None:
|
||||
to_sleep = event.time - last_time
|
||||
if to_sleep > 0:
|
||||
time.sleep(to_sleep)
|
||||
last_time = event.time
|
||||
if isinstance(event, keyboard.KeyboardEvent):
|
||||
_play_event_keyboard(event)
|
||||
elif any([
|
||||
isinstance(event, mouse.ButtonEvent),
|
||||
isinstance(event, mouse.MoveEvent),
|
||||
isinstance(event, mouse.WheelEvent),
|
||||
]):
|
||||
_play_event_mouse(event)
|
||||
else:
|
||||
raise ValueError(f"Not a recognized event {event}")
|
||||
|
||||
keyboard.restore_modifiers(key_state)
|
||||
LOGGER.info("Done.")
|
||||
|
||||
def _play_event_keyboard(event) -> None:
|
||||
LOGGER.debug("Key %s", event)
|
||||
key = event.scan_code or event.name
|
||||
keyboard.press(key) if event.event_type == keyboard.KEY_DOWN else keyboard.release(key)
|
||||
|
||||
def _play_event_mouse(event) -> None:
|
||||
LOGGER.debug("Mouse %s", event)
|
||||
if isinstance(event, mouse.ButtonEvent):
|
||||
if event.event_type == mouse.UP:
|
||||
mouse.release(event.button)
|
||||
else:
|
||||
mouse.press(event.button)
|
||||
elif isinstance(event, mouse.MoveEvent):
|
||||
mouse.move(event.x, event.y, absolute=True)
|
||||
elif isinstance(event, mouse.WheelEvent):
|
||||
mouse.wheel(event.delta)
|
||||
|
||||
def _do_delay(delay: int) -> None:
|
||||
if not delay:
|
||||
return
|
||||
print("\n")
|
||||
for i in range(delay):
|
||||
print(f"\rStarting in {delay-i} seconds")
|
||||
time.sleep(1)
|
||||
|
||||
def _load_events(filename: str):
|
||||
with open(filename, "rb") as input_:
|
||||
events = pickle.load(input_)
|
||||
LOGGER.debug("Loaded %s", filename)
|
||||
return events
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue