
· 3 min read


Usually, data systems provide extensibility by letting users define functions (UDFs). In Nebula, we let users define custom columns instead, for a few reasons:

  • A custom column is simpler. For reuse, a user defines a function and then applies it in transformations, so there are more places to handle the customized behavior. A custom column lets us simply focus on computing the extra column.
  • A custom column is essentially what the user is looking for, hence more straightforward. As mentioned above, user-defined functions mostly exist for reuse, which is not a problem Nebula tries to solve.
  • Nebula's core problem here is to let users define transformations in a literal language: JavaScript.
  • This aligns with another of Nebula's goals: enabling users to run simple distributed machine-learning algorithms, where a custom column serves as feature extraction.

Feature Support

Nebula supports custom columns as its extensibility mechanism in two scenarios:

  • Runtime compute - the user provides a custom column in the query, and we compute its value for them. In this scenario, the column definition is logical: the column value is computed on the fly by observers, and no persistence is required, so the custom column is not included in the table schema definition. For example:

    ```js
    // schema definition
    // table = ROW(a:int, b:long, c:string);

    // custom column definition
    const cc = () => nebula.column("a") * 2 + 3;
    nebula.register("cc", Types.INT, cc);

    // use the custom column in query by referencing "cc"
    ```
  • Ingestion compute - the user provides a custom column hook in the ingestion definition, meaning the original data source doesn't have the column; instead we define it with a lambda. Such a column, computed at ingestion time, is persisted and hence included in the table schema.

    ```yaml
    nebula.test:
      # max 10G RAM assignment
      max-mb: 10000
      # max 10 days assignment
      max-hr: 240
      schema: "ROW<id:int, price:double, price2:double>"
      columns:
        id:
          bloom_filter: true
        price2:
          expr: "[price] * 2"
      time:
        type: static
        # get it from linux by "date +%s"
        value: 1565994194
    ```

    Note that in this example, `[price]` is just a convenience expression; at runtime it is converted to a real function satisfying a JavaScript function definition, such as `const price2 = () => nebula.column("price") * 2`.

Internally, we support ES2015-standard JavaScript expressions, which should give users enough flexibility to transform data into whatever they want.

More To Expect

Nebula shall support custom aggregation columns through JavaScript definitions as well. However, that is considerably more work, and we want a performant implementation of the major aggregation functions first, so it is not on the priority list for now.

· 3 min read

Nebula analytics relies on aggregations, which are expressed through:

  • Expressions: including constant ops, arithmetic ops, logical ops and column ops. These operations are the basics of a programming language, hence fundamental to the Nebula query engine.
  • UDF: Nebula provides native UDFs in its own package, implemented in native code. We plan to add customized UDFs through the user interface, available in JavaScript; the V8 engine will execute them with interoperability with Nebula's runtime data structures. Basic built-in UDFs are "in", "not in", "like", "ilike", etc.
  • UDAF: the core analytic capability is "aggregating metrics by dimensions". Nebula provides efficient implementations for these; UDAFs like "count", "sum", "avg", "median", "percentiles" and "NDV" shall be available.

Plugging a new UDF / UDAF into the Nebula code base should be straightforward; feel free to submit a request through an issue or PR.


The UDAF "avg" is an interesting topic; I have been thinking a lot about how to implement it. There are basically two major approaches (essentially the same concept): 1) keep SUM / COUNT columns at computing time and convert them to an AVG column in the last step - this flexes the schema and operates at the query level; 2) use a "special" column type such as a binary buffer or a wider type like int128 - similar to #1 in that it flexes the schema, but it doesn't change the column count.

I think #1 is a fairly generic solution for many similar problems - extending the data store at runtime by adding more columns. #2 sounds a bit hacky, but because it keeps the schema width the same, a lot of checks and type conversions become naturally easy. So for the first try, I chose to implement AVG using approach #2 without changing much in the query planning itself.

A very important data structure in Nebula is called "HashFlat": it is basically a row-based data set with a hash function that keeps unique rows in a contiguous memory block. It requires the client to provide interfaces to process data of different types:

  • Store: convert a UDAF's inner expression value into the store type in the hash flat.
  • Merge: merge two rows with the same keys.
  • Finalize: convert the store type into the desired type.

A UDAF defines these 3 functions, and when a hash flat is set up, a wrapper of this function set is provided. When HashFlat receives a duplicate row (same keys), it triggers the merge function; a new row triggers the store method. In most cases, since the store type equals the native type, the store and finalize methods are empty. UDF + Types
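The flow above can be sketched as follows; the function names and data shapes here are illustrative assumptions, not Nebula's actual C++ API:

```javascript
// Minimal sketch of the HashFlat idea: unique rows keyed by hash, with
// client-provided store/merge/finalize hooks.
function hashFlat(rows, keyOf, valueOf, hooks) {
  const flat = new Map();
  for (const row of rows) {
    const key = keyOf(row);
    if (!flat.has(key)) {
      // new key: trigger the store method
      flat.set(key, hooks.store(valueOf(row)));
    } else {
      // duplicate key: trigger the merge method
      flat.set(key, hooks.merge(flat.get(key), hooks.store(valueOf(row))));
    }
  }
  // final step: convert every store-typed state into its desired value
  return new Map([...flat].map(([k, s]) => [k, hooks.finalize(s)]));
}

// AVG hooks: the store type is a (sum, count) pair, finalized to sum / count
const avgHooks = {
  store: (v) => ({ sum: v, count: 1 }),
  merge: (a, b) => ({ sum: a.sum + b.sum, count: a.count + b.count }),
  finalize: (s) => s.sum / s.count,
};

const rows = [["a", 2], ["a", 4], ["b", 10]];
const result = hashFlat(rows, (r) => r[0], (r) => r[1], avgHooks);
// result.get("a") === 3, result.get("b") === 10
```

For a plain SUM, the store type equals the native type, so store is the identity and finalize is a no-op - exactly the "empty store and finalize" case mentioned above.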

Percentiles, NDV and others

Following the same pattern, any UDAF that requires state management can use the same design as AVG: a store type for temporary state, and a finalize method to convert that temporary state into the final value.
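For instance, a naive NDV (distinct count) fits the same store/merge/finalize shape; this is only an illustration - a real implementation would use a probabilistic sketch such as HyperLogLog rather than a full set of values:

```javascript
// Naive NDV via store/merge/finalize: the temporary state is a set of seen values.
const ndv = {
  store: (v) => new Set([v]),                 // wrap one value into a state
  merge: (a, b) => new Set([...a, ...b]),     // union two states
  finalize: (s) => s.size,                    // distinct count
};

const state = [1, 2, 2, 3].map(ndv.store).reduce(ndv.merge);
const distinct = ndv.finalize(state); // 3
```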

· 5 min read

We need an extensible model to support adding any customized data structure for special data aggregations. It is challenging to make a decent design that fits these requirements; let's take a look at the current system:

  • UDAF/UDF are modeled as generic expressions, represented by ValueEval or TypeValueEval at runtime.
    • ValueEval is extensible as long as it is visible; it has methods "eval" for value evaluation, "merge" for value merging, and "stack" for value accumulation between different types.
    • A UDAF needs to differentiate input type, runtime type and schema type so it can complete the aggregation life cycle.
  • RowData is the generic interface used to exchange data between compute layers.
    • Expression details are hidden behind the RowData interface, which supports only readXXX methods for each individual type in our schema system.
  • Sometimes it is not performant if we don't expose the "input type" to the compute layer when a customized type is involved.

For example, suppose we're building a TDigest sketch for a given expression typed as integer, and we have a TDigestUDAF that defines these types:

  • input type: INTEGER
  • runtime type: a TDigest object modeled as "void*", or better, std::shared_ptr<Serializable>
  • schema type: VARCHAR

At computing time, when we iterate over all rows, remember that the UDAF expression is hidden behind a RowData interface that allows a given column (the one containing the UDAF) to expose only one type. Usually we expose the runtime type, which means we only have RowData.readPointer(). The implication is that we might end up constructing/destructing a huge number of TDigest objects, which doesn't sound correct and is definitely not scalable for "customized" sketches.
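The churn can be illustrated with a toy sketch class (an assumption for illustration, not Nebula or TDigest code): one object allocated per row versus a single reused aggregator:

```javascript
// Count allocations to contrast per-row object construction with reuse.
let allocations = 0;
class ToySketch {
  constructor() { allocations += 1; this.values = []; }
  add(v) { this.values.push(v); }
}

const rows = [1, 2, 3, 4];

// naive path: a fresh sketch per row (what readPointer() per row can imply)
rows.map((v) => { const s = new ToySketch(); s.add(v); return s; });
const naiveAllocations = allocations; // 4: one object per row

// reuse path: a single sketch aggregates all rows in place
allocations = 0;
const shared = new ToySketch();
rows.forEach((v) => shared.add(v));
const reusedAllocations = allocations; // 1
```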

Where to put aggregation

(External Compute: HB)

To overcome this hurdle, we have a few options to consider:

  1. Expose the UDAF as its inner expression, and leave its aggregation logic to the external compute.
    1. e.g. SUM(EXPR1) -> EXPR1 through the RowData interface
    2. Of course, the external compute will have enough metadata to understand how to invoke its aggregation logic.
    3. Pros & cons: very straightforward and leaves the data layer clean. The downside is inconsistency in the schema, since most of the time the UDAF result value type differs from its inner expression type.
  2. Expose a different interface in RowData to allow the external compute to fetch the inner expression value instead.
    1. This approach doesn't change the current runtime schema.
    2. It introduces some brokenness into the RowData interface.
    3. It is difficult to manage the life cycle of the custom object, since the UDAF itself creates the object internally and passes it to the external compute to manage.

Neither of the above approaches is ideal, and it is difficult to sort out an ideal one for now. Here is what I propose to do to make this a bit cleaner:

  1. Mainly take approach 1 to keep the UDAF interface clean and simple.
  2. Push complexity into HB by introducing an aggregator object for each defined UDAF.
  3. The aggregator may or may not require extra object allocation, depending on whether it needs an assistant data structure:
    1. CreateObject
    2. Aggregate
    3. Serde
  4. We need to allow every UDAF to have a different runtime schema, with the final schema corrected in the last stage.
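The proposed aggregator contract (CreateObject / Aggregate / Serde) can be sketched as a hypothetical AVG aggregator; the class and method names are assumptions for illustration, not Nebula's actual interface:

```javascript
// Hypothetical AVG aggregator following the CreateObject / Aggregate / Serde contract.
class AvgAggregator {
  constructor() { this.sum = 0; this.count = 0; }            // CreateObject
  aggregate(value) { this.sum += value; this.count += 1; }   // Aggregate one input value
  merge(other) { this.sum += other.sum; this.count += other.count; } // aggregate another aggregator
  serialize() { return JSON.stringify({ sum: this.sum, count: this.count }); } // Serde: out
  static deserialize(s) {                                    // Serde: in
    return Object.assign(new AvgAggregator(), JSON.parse(s));
  }
  finalize() { return this.sum / this.count; }               // temporary state -> final value
}

// usage: aggregate on one node, serde across the wire, keep aggregating, finalize
const partial = new AvgAggregator();
partial.aggregate(3);
partial.aggregate(5);
const restored = AvgAggregator.deserialize(partial.serialize());
restored.aggregate(10);
const avg = restored.finalize(); // (3 + 5 + 10) / 3 === 6
```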

Hopefully this change will boost performance as well, for example for AVG. Since this is going to be a large refactoring, I will leave it to the next diff and report back what the result looks like - the basic measure is to ensure the AVG UDAF works as well as or better than before in the new design.


Throughout the Nebula code base, every boundary communicates through RowData and its cursor type RowCursor. It is extremely difficult to model a customized object (aggregator) on plain storage wrapped by a RowData, while also being efficient enough to avoid object allocations across rows. I also looked at the "struct" type, to see whether we could reuse it to represent any customized sketch with different functions attached. Again, keeping the RowData interface intact is a good idea, but it faces two big challenges:

  1. constructing the sketch object on a given memory chunk represented by the struct (preventing object serde per row)
  2. avoiding object creation for non-aggregated rows.

Given all these thoughts and this exploration, I decided to change the RowData interface, adding a new method to fetch an aggregator object given a column name/index. The client will get a valid aggregator object if

  • the row represents an aggregated row
  • the column is an aggregation column

otherwise it will return a nullptr.

The aggregation column's type will be the same as its inner expression type, so we know whether a row can return a normal value to be aggregated, or an aggregator that aggregates other incoming values during the block computing phase.

In the merge phase - an aggregator aggregates other aggregators.

In the final phase - an aggregator provides a finalize method to return its desired value.

In the new architecture, any sketch is attached and can be detached. As an optimization, it may share memory as the schema indicates, controlled by column width. If the column is an aggregation field, alignment space is produced for the column whether or not it holds a null value.

At serialization time, the sketch is serialized into the data space; deserialization reconstructs the sketch for each aggregation column.

· 3 min read

From a service deployment perspective, there are 4 major components backing the Nebula service:

  1. Nebula Server
  2. Nebula Nodes
  3. Nebula Service Gateway (Envoy)
  4. Nebula Web

Today these components are architected in this way. Components V1

In this mode, we expose Nebula's raw service through Envoy as a service gateway, which usually listens on some port for HTTP traffic and maintains connections with the Nebula service over HTTP2 (gRPC).

The other open service is the web server, which listens on the standard port 80 for browser requests. This service is backed by a Node.js server; it serves static content and client logic for the Nebula Web UI, which provides a query interface for data analysis and graphics/charting functions for display.

Why did we architect it this way from the beginning? Well, there are a few advantages:

  1. It keeps the web service lightweight, for the least maintenance cost and high QPS.
  2. It sets up the Web UI as a completely independent component, replaceable by any tech stack or any new client consuming the same service.
  3. It provides complete UI/service separation for better maintenance.

I appreciate this architecture: its decoupled design gives us the best flexibility to make changes independently.

Now, I'm making a slight change to the Web UI by routing service consumption behind the web server. The reasons are:

  • Authentication. Due to the isolation, we would need to implement authentication in both interfaces (web + Envoy) for a single web client. By moving the service behind the web server, we only need to maintain one, for web only.
  • Security. This change doesn't mean Envoy doesn't need authentication; we need to implement it anyway, it's just not a priority for now.

After the change, the architecture will look like this: Components V2

The drawbacks of this change are obvious:

  1. Each query goes through two hops rather than one (Q: client -> web -> (envoy -> server) -> web -> client). Given that the extra hop happens on the same server in most cases, the impact should be minimal.
  2. It adds the complexity of query/result transfer logic to the web tier, which mostly does translation.
  3. There is extra serde (JSON) between the server's reply object and the web client.

After this change, the Envoy gateway is still open, but it will not be consumed directly by the web client. Also note that since the pros/cons of v1 vs. v2 are clear, we want to make it easy to switch back and forth. Currently this is supported by a variable archMode in web.js; we may move it into the Nebula config in the future.

  • archMode = 1 (v1): the web client connects to the Nebula server for queries.
  • archMode = 2 (v2): the web client connects to the web API as a proxy for queries.
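As an illustration of the switch (the endpoint URLs here are made up, not the actual deployment values, and the real web.js may differ):

```javascript
// Pick the query endpoint based on archMode:
// v1 - browser talks to the Envoy gateway directly;
// v2 - browser calls the web API, which proxies to the Nebula server.
function queryTarget(archMode) {
  return archMode === 1
    ? "http://envoy-gateway:8080/query" // hypothetical gateway address
    : "/api/query";                     // hypothetical same-origin web API path
}

const v1 = queryTarget(1); // "http://envoy-gateway:8080/query"
const v2 = queryTarget(2); // "/api/query"
```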


· 3 min read

We know that JavaScript can only represent up to 2^53 - 1 as its maximum safe integer; any value larger than that loses precision during data exchange in a JavaScript runtime. Most of the time, we have to pay the cost of serializing 64-bit values in string format.
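The cutoff is easy to demonstrate in any JavaScript runtime:

```javascript
// IEEE-754 doubles represent integers exactly only up to 2^53 - 1.
const max = Number.MAX_SAFE_INTEGER; // 9007199254740991 === 2^53 - 1

// Above the cutoff, adjacent integers collapse to the same double:
const collapsed = (2 ** 53 === 2 ** 53 + 1); // true

// JSON.parse drifts for out-of-range integers:
const drifted = JSON.parse("9007199254740993"); // becomes 9007199254740992

// BigInt keeps the exact value, but only if the source is still a string:
const exact = BigInt("9007199254740993"); // 9007199254740993n
```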

Nebula faces this issue too. In this short note, I summarize all the places where this concern applies and how we handle them for now. This doesn't mean our handling is ideal; in fact, we are still looking for improvements.

No 1. rapidjson

We have been using RapidJSON as the JSON serde library in our C++ runtime because it handles this concern well: we don't need the long<-->string conversion trick. Hence we removed that trick from our code base, specifically in the InExpression serde code path.

No 2. node.js

node.js is the web server runtime that Nebula Web resides in; it is definitely the hottest spot for this issue. We have been using JSON as the data format to exchange requests/responses between the web client and the Nebula Server. Here is what we have done:

  1. We're using the json-bigint package for JSON parsing, so if the input JSON includes bigint values, their precision won't be lost.
  2. In the Nebula proto, we added two more predicate value types, so a client can use one of:
    1. value: string type
    2. nvalue: int64 type
    3. dvalue: double type

With this change, we don't pay for string->int or string->double data conversion. (In fact, protobuf does this for us - it is uncertain whether perf is better, but the interface is at least cleaner.)

The serde work is done by protobuf-js, which still uses Number for all numeric values, so the problem remains: if a bigint value is passed in a gRPC call defined as int64 in nebula.proto, the Nebula Server may receive a wrong value. I haven't seen any sign of this being fixed soon, so we will keep our eyes open for alternative solutions.

As a work-around, we use a "string" representation for nvalue by appending [jstype = JS_STRING] to its proto declaration. After that, we require clients to send a string list for this field for now. This is admittedly neither convenient nor efficient.
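A minimal sketch of that work-around, with a made-up predicate shape rather than the actual nebula.proto message:

```javascript
// nvalue is carried as strings so the JS runtime never holds the raw int64.
const predicate = { column: "id", op: "in", nvalue: ["9007199254740993", "42"] };

// Server-side (or any non-lossy runtime) converts back to exact integers:
const exact = predicate.nvalue.map(BigInt); // [9007199254740993n, 42n] - no precision loss
```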

No 3. web

In the Nebula UI (i.e., the browser), we haven't handled anything like this: since values are for display purposes, all bigint values are returned as strings. Client code (browser JS) needs to convert them if they are used in further computation.

No 4. other clients

This is obvious but worth mentioning: other clients not running in a JavaScript runtime should not have this issue. For example, if I use curl to post a JSON blob containing a bigint array, the Nebula API (node.js) won't lose precision. Other clients like Java should be fine too.