Python celery plugin #125

Merged 7 commits on Jul 7, 2021
1 change: 1 addition & 0 deletions docs/EnvVars.md
@@ -26,3 +26,4 @@ Environment Variable | Description | Default
| `SW_KAFKA_REPORTER_TOPIC_MANAGEMENT` | Specifying Kafka topic name for service instance reporting and registering. | `skywalking-managements` |
| `SW_KAFKA_REPORTER_TOPIC_SEGMENT` | Specifying Kafka topic name for Tracing data. | `skywalking-segments` |
| `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset |
| `SW_CELERY_PARAMETERS_LENGTH` | The maximum length of `celery` function parameters; values longer than this are truncated, 0 turns parameter collection off | `512` |
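As a quick illustration (a sketch, not part of this diff), the new variable can be set like any other agent setting before the agent is imported; the value 128 and the collector address below are example placeholders only:

import os

# Example values only: cap recorded celery call parameters at 128 characters
# (0 would turn parameter collection off, per the table above).
os.environ['SW_CELERY_PARAMETERS_LENGTH'] = '128'

from skywalking import agent, config  # config reads the variable at import time

config.init(collector='127.0.0.1:11800', service='my-celery-service')  # placeholder address and name
agent.start()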
7 changes: 5 additions & 2 deletions docs/Plugins.md
@@ -3,7 +3,7 @@
Library | Versions | Plugin Name
| :--- | :--- | :--- |
| [http.server](https://docs.python.org/3/library/http.server.html) | Python 3.5 ~ 3.9 | `sw_http_server` |
| [urllib.request](https://docs.python.org/3/library/urllib.request.html) | Python 3.5 ~ 3.8 | `sw_urllib_request` |
| [urllib.request](https://docs.python.org/3/library/urllib.request.html) | Python 3.5 ~ 3.9 | `sw_urllib_request` |
| [requests](https://requests.readthedocs.io/en/master/) | >= 2.9.0 < 2.15.0, >= 2.17.0 <= 2.24.0 | `sw_requests` |
| [Flask](https://flask.palletsprojects.com/en/1.1.x/) | >=1.0.4 <= 1.1.2 | `sw_flask` |
| [PyMySQL](https://pymysql.readthedocs.io/en/latest/) | 0.10.0 | `sw_pymysql` |
@@ -18,6 +18,9 @@ Library | Versions | Plugin Name
| [sanic](https://sanic.readthedocs.io/en/latest/) | >= 20.3.0 <= 20.9.1 | `sw_sanic` |
| [aiohttp](https://sanic.readthedocs.io/en/latest/) | >= 3.7.3 | `sw_aiohttp` |
| [pyramid](https://trypyramid.com) | >= 1.9 | `sw_pyramid` |
| [psycopg2](https://www.psycopg.org/) | 2.8.6 | `sw_psycopg2` |
| [psycopg2](https://www.psycopg.org/) | >= 2.8.6 | `sw_psycopg2` |
| [celery](https://docs.celeryproject.org/) | >= 4.2.1 | `sw_celery` |

* Note: A celery server started with "celery -A ..." should use the http protocol, because celery uses multiprocessing by default, which is currently not compatible with the grpc protocol implementation in skywalking. Celery clients may use whichever protocol they want.

The `Versions` column only indicates the versions that have been tested; if you find that newer versions are also supported, feel free to add them to the table.
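Relating to the note above, a celery worker could initialize the agent with the http protocol along these lines (a minimal sketch; the collector address, service name, and the assumption that the OAP http receiver listens on port 12800 are placeholders, not part of this PR):

from skywalking import agent, config

# Sketch only: force the http protocol for a celery worker, since grpc does not
# currently cope with celery's default multiprocessing model.
config.init(collector='127.0.0.1:12800', service='my-celery-worker', protocol_type='http')
agent.start()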
1 change: 1 addition & 0 deletions requirements.txt
@@ -3,6 +3,7 @@ aiofiles==0.6.0
aiohttp==3.7.3
attrs==19.3.0
blindspin==2.0.1
celery==4.4.7
certifi==2020.6.20
chardet==3.0.4
click==7.1.2
3 changes: 2 additions & 1 deletion setup.py
@@ -33,12 +33,13 @@
author="Apache",
author_email="dev@skywalking.apache.org",
license="Apache 2.0",
packages=find_packages(exclude=("tests",)),
packages=find_packages(exclude=("tests", "tests.*")),
include_package_data=True,
install_requires=[
"grpcio",
"grpcio-tools",
"packaging",
"requests",
Member:

Hi @tom-pytel, I missed this in this PR, but this makes requests a mandatory dependency of skywalking-python. Please also take a look at apache/skywalking#7282: requests depends on an LGPL-licensed dependency that we cannot ship in an ASF project. As we already have this in extras_require/http, can we just remove it? When users want to use the http protocol, they can use something like pip install skywalking-python[http].

FYI @wu-sheng
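For reference, a hedged sketch of the arrangement being described, i.e. keeping requests out of install_requires and behind an http extra (the repository's actual setup.py may differ in details):

from setuptools import setup, find_packages

# Illustrative only: mirrors the idea discussed above, not the exact setup.py.
setup(
    name="skywalking-python",
    packages=find_packages(exclude=("tests", "tests.*")),
    install_requires=["grpcio", "grpcio-tools", "packaging", "wrapt"],
    extras_require={
        # users opt in with: pip install "skywalking-python[http]"
        "http": ["requests"],
    },
)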

Member:

Why does a plugin require an agent-level dependency?

Member:

> Why does a plugin require an agent-level dependency?

We support the grpc and http protocols; for the http protocol we use requests to send HTTP requests. Since grpc is the default protocol and http is optional (it can be installed with pip install skywalking-python[http]), I think @tom-pytel missed that and wanted to test the http protocol, so he added the dependency here.

Member:

OK, got it. I'm glad we don't really depend on it.

Contributor Author:

Sure, if the license will cause problems then we can remove it from the required dependencies. I could also look into using a different communication method like urllib.request or urllib3.request?

As for the http protocol, we are doing stress testing here and finding that grpc is not entirely reliable, while the http protocol is actually a lot more stable. Not sure why this is happening; maybe grpc is not configured correctly or the timeouts are causing problems. But the main result is that you should consider the http protocol a little more than just optional at this point, since it is capable of working in scenarios where grpc breaks for us.

Member:

@tom-pytel, could you share how you tested the performance in a separate issue? From the last several weeks' perf tests, JSON really doesn't have good performance from a Java perspective, as tested in the OAP backend.

"wrapt",
],
extras_require={
Expand Down
1 change: 1 addition & 0 deletions skywalking/__init__.py
@@ -42,6 +42,7 @@ class Component(Enum):
AioHttp = 7008
Pyramid = 7009
Psycopg = 7010
Celery = 7011


class Layer(Enum):
65 changes: 51 additions & 14 deletions skywalking/agent/__init__.py
@@ -16,6 +16,7 @@
#

import atexit
import os
from queue import Queue, Full
from threading import Thread, Event
from typing import TYPE_CHECKING
@@ -28,6 +29,11 @@
from skywalking.trace.context import Segment


__started = False
__protocol = Protocol() # type: Protocol
__heartbeat_thread = __report_thread = __queue = __finished = None


def __heartbeat():
while not __finished.is_set():
if connected():
@@ -39,21 +45,26 @@ def __heartbeat():
def __report():
while not __finished.is_set():
if connected():
__protocol.report(__queue) # is blocking actually
__protocol.report(__queue) # is blocking actually, blocks for max config.QUEUE_TIMEOUT seconds

__finished.wait(1)


__heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
__report_thread = Thread(name='ReportThread', target=__report, daemon=True)
__queue = Queue(maxsize=10000)
__finished = Event()
__protocol = Protocol() # type: Protocol
__started = False
def __init_threading():
global __heartbeat_thread, __report_thread, __queue, __finished

__queue = Queue(maxsize=10000)
__finished = Event()
__heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
__report_thread = Thread(name='ReportThread', target=__report, daemon=True)

__heartbeat_thread.start()
__report_thread.start()


def __init():
global __protocol

if config.protocol == 'grpc':
from skywalking.agent.protocol.grpc import GrpcProtocol
__protocol = GrpcProtocol()
@@ -65,14 +76,40 @@ def __init():
__protocol = KafkaProtocol()

plugins.install()
__init_threading()


def __fini():
__protocol.report(__queue, False)
__queue.join()
__finished.set()


def __fork_before():
if config.protocol != 'http':
logger.warning('fork() not currently supported with %s protocol' % config.protocol)
Comment on lines +89 to +90
Member:

Is it possible to make gRPC work across fork()? Like I said in DM, recreate a brand-new gRPC channel in the forked process?

Contributor Author:

Tried closing down the channel and recreating it in both parent and child after fork. It is possible I did not do it right since I am not a grpc expert, but I got one of two results:

  1. Worked in exactly one of the forks, parent or child, but not both.
  2. Didn't work in either.

Member:

I didn't mean to close the parent channel and recreate it in the child process by reusing the agent in the parent. What I meant is to start another independent agent in the child process and leave the parent one there, because there may be other things that need to be traced in the parent process. Can you take a look at

def run(self):
    if agent.started() is False:
        config.deserialize(self._sw_config)
        agent.start()
    super(SwProcess, self).run()

... and see whether that helps? It is generally what I propose to do in forked processes.

Member:

In your current implementation, when new processes are spawned, the agent in the parent process takes no effect, right?

Contributor Author:

> I didn't mean to close the parent channel and recreate it in the child process by reusing the agent in the parent. What I meant is to start another independent agent in the child process and leave the parent one there, because there may be other things that need to be traced in the parent process. Can you take a look at

I tried several things like:

  • Not doing anything before the fork then creating new GrpcServiceManagementClient and GrpcTraceSegmentReportService in child.
  • The above but closing channel in child before creating new.
  • Closing the channel before fork then recreating in both parent and child.
  • Instead of close(), use unsubscribe().
  • Both unsubscribe() then close() before fork or after in child.
  • I did also try waiting for an empty queue before allowing the fork to proceed, but that was unnecessary since I wasn't even sending anything before the fork; it was just for form.

I also forgot to mention there was a third result I was getting sometimes: a deadlock hang. It is possible I missed some permutations or a different function to call, but in general, researching Python grpc with multiprocessing on the net, I found basically the following answers: either 1. "don't do it", or 2. "grpc must be started only after forking everything, then maybe it will work". Here are some links:

googleapis/synthtool#902
https://stackoverflow.com/questions/62798507/why-multiprocess-python-grpc-server-do-not-work

So as I said, it may be possible but I have not hit on how to do it. If you want to give it a shot I will add a simple test script to the end of this message. I also didn't test anything with Kafka and assume it will not work correctly with forking until someone validates that.

As for the current higher-level flow, keep in mind it can be modified in the future according to which protocol is in use, but for now: nothing special is done before the fork or after it in the parent. In those cases all threads, sockets, and locks continue operating as if nothing had happened. In the child, new report and heartbeat threads are started, since threads don't survive into children. And specifically for the http protocol, the duplicated sockets are closed and new ones are opened on the next heartbeat or report.
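A minimal sketch of that flow, using the same os.register_at_fork hooks that appear later in this diff (names and bodies are simplified stand-ins, not the agent's actual code):

import os
from threading import Thread

def _restart_reporting_threads():
    # Stand-in for the agent's __init_threading(): the child rebuilds its queue/event
    # and starts fresh daemon threads, because threads do not survive fork().
    Thread(name='HeartbeatThread', target=lambda: None, daemon=True).start()
    Thread(name='ReportThread', target=lambda: None, daemon=True).start()

if hasattr(os, 'register_at_fork'):  # CPython 3.7+ only
    os.register_at_fork(
        before=lambda: None,                        # parent: nothing special before fork
        after_in_parent=lambda: None,               # parent: threads/sockets keep working as-is
        after_in_child=_restart_reporting_threads,  # child: recreate reporting threads
    )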

There is a potential problem with the __queue object: a thread may have been holding an internal lock on it before the fork, and since that thread is no longer present the queue will remain in a locked state. Not sure how to resolve this yet, but it should be a very rare event. Even rarer may be the same lock problem with the __finished event, but I wouldn't expect that to happen basically ever.

Right now I have other stuff on my plate, but if you have any suggestions on what to try I may revisit this at some point in the future. Or if you want to try it yourself, here is a test script:

import multiprocessing as mp
import time

from skywalking.trace.context import get_context
from skywalking import agent, config

config.init(collector='127.0.0.1:11800', service='your awesome service')
agent.start()

def foo():
    with get_context().new_local_span('child before error'):
        pass

    time.sleep(2)  # this is needed to flush the send because Python doesn't run atexit handlers on exit in forked children

    # import atexit
    # atexit._run_exitfuncs()

if __name__ == '__main__':
    p = mp.Process(target=foo, args=())

    with get_context().new_local_span('parent before start'):
        pass

    p.start()

    time.sleep(1)

    with get_context().new_local_span('parent after start'):
        pass

    p.join()

    with get_context().new_local_span('parent after join'):
        pass

Contributor Author:

But also, the missing "failed to install plugin sw_celery" log tells me you are not running this PR.

Member:

> But also, the missing "failed to install plugin sw_celery" log tells me you are not running this PR.

I was running on the master branch, not this PR.

Member:

Same result with or without a 2-second delay.

Contributor Author:

I have tried a few more times with upstream/master, and still get bad results. I did get one run where I got all 4 spans, but the rest of the runs gave 3 spans, with a couple of deadlocks. Apart from that, upstream/master cannot possibly run correctly in a multiprocessing scenario, because on fork() no other threads are duplicated in the child (like report or heartbeat); they need to be explicitly recreated in the forked child (which I do in this PR).

I don't have time allocated now to look into the grpc issue, but I do know that the http protocol in this PR works with fork() for sure. So how do you want to proceed? I could remove that warning message if you want, or change it to something a little less absolute like "fork() may not work correctly with the grpc protocol"? But in general this PR does not change anything about how grpc worked before; it just fixes the http protocol and adds a restart of the report and heartbeat threads in the fork() child. And also the celery plugin, of course.

Contributor Author (@tom-pytel, Jun 30, 2021):

BTW, this is not the end of the road though. Our internal stress tests show problems with spans mixing or disappearing, so I need to go back into core functionality and fix all that. Maybe overhaul how span context is tracked, like in the Node agent (especially since async wasn't originally a design consideration in this agent). So treat this PR as a single step towards getting all that fixed.


# TODO: handle __queue and __finished correctly (locks, mutexes, etc...), need to lock before fork and unlock after
# if possible, or ensure they are not locked in threads (end threads and restart after fork?)

__protocol.fork_before()


def __fork_after_in_parent():
__protocol.fork_after_in_parent()


def __fork_after_in_child():
__protocol.fork_after_in_child()
__init_threading()


def start():
global __started
if __started:
return
__started = True

flag = False
try:
from gevent import monkey
@@ -82,22 +119,22 @@ def start():
if flag:
import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()
global __started
if __started:
raise RuntimeError('the agent can only be started once')

loggings.init()
config.finalize()
__started = True

__init()
__heartbeat_thread.start()
__report_thread.start()

atexit.register(__fini)

if (hasattr(os, 'register_at_fork')):
os.register_at_fork(before=__fork_before, after_in_parent=__fork_after_in_parent,
after_in_child=__fork_after_in_child)


def stop():
atexit.unregister(__fini)
__fini()
__finished.set()


def started():
11 changes: 10 additions & 1 deletion skywalking/agent/protocol/__init__.py
@@ -20,8 +20,17 @@


class Protocol(ABC):
def fork_before(self):
pass

def fork_after_in_parent(self):
pass

def fork_after_in_child(self):
pass

def connected(self):
raise NotImplementedError()
return False

def heartbeat(self):
raise NotImplementedError()
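To make the intent of these defaults concrete, here is a hedged sketch of a subclass against this base class (purely illustrative; only the method names and signatures are taken from the diff):

from queue import Empty, Queue

from skywalking.agent.protocol import Protocol

class NoopProtocol(Protocol):
    # Illustrative subclass only, not part of the PR.
    def connected(self):
        return True  # the base class now defaults to False instead of raising

    def heartbeat(self):
        pass  # a real protocol would ping the collector here

    def report(self, queue: Queue, block: bool = True):
        try:
            while True:
                queue.get(block=False)  # drain and discard queued segments
                queue.task_done()
        except Empty:
            pass

    def fork_after_in_child(self):
        pass  # recreate any sockets/sessions invalidated by fork()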
22 changes: 17 additions & 5 deletions skywalking/agent/protocol/http.py
@@ -17,7 +17,9 @@

from skywalking.loggings import logger
from queue import Queue, Empty
from time import time

from skywalking import config
from skywalking.agent import Protocol
from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService
from skywalking.trace.segment import Segment
@@ -29,20 +31,27 @@ def __init__(self):
self.service_management = HttpServiceManagementClient()
self.traces_reporter = HttpTraceSegmentReportService()

def fork_after_in_child(self):
self.service_management.fork_after_in_child()
self.traces_reporter.fork_after_in_child()

def connected(self):
return True

def heartbeat(self):
if not self.properties_sent:
self.service_management.send_instance_props()
self.properties_sent = True
self.service_management.send_heart_beat()

def connected(self):
return True

def report(self, queue: Queue, block: bool = True):
start = time()

def generator():
while True:
try:
segment = queue.get(block=block) # type: Segment
timeout = max(0, config.QUEUE_TIMEOUT - int(time() - start)) # type: int
segment = queue.get(block=block, timeout=timeout) # type: Segment
except Empty:
return

@@ -52,4 +61,7 @@ def generator():

queue.task_done()

self.traces_reporter.report(generator=generator())
try:
self.traces_reporter.report(generator=generator())
except Exception:
pass
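The pattern added here, a generator that drains the queue but never blocks past an overall deadline, can be shown in isolation like this (a standalone sketch; QUEUE_TIMEOUT stands in for config.QUEUE_TIMEOUT and the 5-second value is just an example):

from queue import Empty, Queue
from time import time

QUEUE_TIMEOUT = 5  # seconds; stand-in for config.QUEUE_TIMEOUT

def drain_with_deadline(queue: Queue):
    # Yield items until the queue stays empty past the remaining deadline.
    start = time()
    while True:
        try:
            # shrink the per-get timeout so the whole loop never exceeds QUEUE_TIMEOUT
            timeout = max(0, QUEUE_TIMEOUT - int(time() - start))
            item = queue.get(block=True, timeout=timeout)
        except Empty:
            return
        yield item
        queue.task_done()

q = Queue()
q.put('segment-1')
q.put('segment-2')
print(list(drain_with_deadline(q)))  # prints both items, then returns once the deadline passes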
22 changes: 15 additions & 7 deletions skywalking/client/http.py
@@ -25,10 +25,14 @@

class HttpServiceManagementClient(ServiceManagementClient):
def __init__(self):
self.session = requests.session()
self.session = requests.Session()

def fork_after_in_child(self):
self.session.close()
self.session = requests.Session()

def send_instance_props(self):
url = config.collector_address.rstrip('/') + '/v3/management/reportProperties'
url = 'http://' + config.collector_address.rstrip('/') + '/v3/management/reportProperties'
res = self.session.post(url, json={
'service': config.service_name,
'serviceInstance': config.service_instance,
@@ -44,7 +48,7 @@ def send_heart_beat(self):
config.service_name,
config.service_instance,
)
url = config.collector_address.rstrip('/') + '/v3/management/keepAlive'
url = 'http://' + config.collector_address.rstrip('/') + '/v3/management/keepAlive'
res = self.session.post(url, json={
'service': config.service_name,
'serviceInstance': config.service_instance,
Expand All @@ -54,10 +58,14 @@ def send_heart_beat(self):

class HttpTraceSegmentReportService(TraceSegmentReportService):
def __init__(self):
self.session = requests.session()
self.session = requests.Session()

def fork_after_in_child(self):
self.session.close()
self.session = requests.Session()

def report(self, generator):
url = config.collector_address.rstrip('/') + '/v3/segment'
url = 'http://' + config.collector_address.rstrip('/') + '/v3/segment'
for segment in generator:
res = self.session.post(url, json={
'traceId': str(segment.related_traces[0]),
@@ -76,10 +84,10 @@ def report(self, generator):
'componentId': span.component.value,
'isError': span.error_occurred,
'logs': [{
'time': log.timestamp * 1000,
'time': int(log.timestamp * 1000),
'data': [{
'key': item.key,
'value': item.val
'value': item.val,
} for item in log.items],
} for log in span.logs],
'tags': [{
3 changes: 2 additions & 1 deletion skywalking/config.py
@@ -59,13 +59,14 @@
kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092" # type: str
kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements" # type: str
kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments" # type: str
celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')


def init(
service: str = None,
instance: str = None,
collector: str = None,
protocol_type: str = 'grpc',
protocol_type: str = None,
token: str = None,
):
global service_name